Skip to content

[FLINK-37410][runtime/metrics] Split level Watermark metrics #26276

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions docs/content/docs/ops/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -1741,6 +1741,56 @@ Note that the metrics are only available via reporters.
<td>The total number of InputSplits this data source has processed (if the operator is a data source).</td>
<td>Gauge</td>
</tr>
<tr>
<th rowspan="7"><strong>Split</strong></th>
<td>watermark.currentWatermark</td>
<td>
The last watermark this split has received (in milliseconds).
</td>
<td>Gauge</td>
</tr>
<tr>
<td>watermark.activeTimeMsPerSecond</td>
<td>
The time (in milliseconds) this split has been active (i.e. not paused due to watermark alignment or idle due to idleness detection) per second.
</td>
<td>Gauge</td>
</tr>
<tr>
<td>watermark.pausedTimeMsPerSecond</td>
<td>
The time (in milliseconds) this split has been paused due to watermark alignment per second.
</td>
<td>Gauge</td>
</tr>
<tr>
<td>watermark.idleTimeMsPerSecond</td>
<td>
The time (in milliseconds) this split has been marked idle by idleness detection per second.
</td>
<td>Gauge</td>
</tr>
<tr>
<td>watermark.accumulatedActiveTimeMs</td>
<td>
Accumulated time (in milliseconds) this split was active since it was registered
</td>
<td>Gauge</td>
</tr>
<tr>
<td>watermark.accumulatedPausedTimeMs</td>
<td>
Accumulated time (in milliseconds) this split was paused since it was registered
</td>
<td>Gauge</td>
</tr>
<tr>
<td>watermark.accumulatedIdleTimeMs</td>
<td>
Accumulated time (in milliseconds) this split was idle since it was registered
</td>
<td>Gauge</td>
</tr>
</tbody>
</table>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ private long getWatermark() {
* <p>Setting a watermark will clear the idleness flag.
*/
public boolean setWatermark(long watermark) {
this.idle = false;
setIdle(false);
final boolean updated = watermark > this.watermark;
if (updated) {
this.onWatermarkUpdate.onWatermarkUpdate(watermark);
Expand All @@ -133,6 +133,7 @@ private boolean isIdle() {

public void setIdle(boolean idle) {
this.idle = idle;
this.onWatermarkUpdate.onIdleUpdate(idle);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.api.common.eventtime;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer.NoOpWatermarkUpdateListener;

import java.util.stream.IntStream;

Expand All @@ -43,7 +44,9 @@ public static IndexedCombinedWatermarkStatus forInputsCount(int inputsCount) {
CombinedWatermarkStatus.PartialWatermark[] partialWatermarks =
IntStream.range(0, inputsCount)
.mapToObj(
i -> new CombinedWatermarkStatus.PartialWatermark(watermark -> {}))
i ->
new CombinedWatermarkStatus.PartialWatermark(
new NoOpWatermarkUpdateListener()))
.toArray(CombinedWatermarkStatus.PartialWatermark[]::new);
CombinedWatermarkStatus combinedWatermarkStatus = new CombinedWatermarkStatus();
for (CombinedWatermarkStatus.PartialWatermark partialWatermark : partialWatermarks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@
@Internal
public class WatermarkOutputMultiplexer {
/** A callback for propagating changes to split based watermarks. */
@FunctionalInterface
@Internal
public interface WatermarkUpdateListener {
/** Called when the watermark increases. */
void onWatermarkUpdate(long watermark);

/** Called when the idle state changes. */
void onIdleUpdate(boolean idle);
}

/**
Expand Down Expand Up @@ -95,6 +97,11 @@ public void registerNewOutput(String id, WatermarkUpdateListener onWatermarkUpda
combinedWatermarkStatus.add(outputState);
}

/** Registers a new multiplexed output with a no-op listener. */
public void registerNewOutput(String id) {
registerNewOutput(id, new NoOpWatermarkUpdateListener());
}

public boolean unregisterOutput(String id) {
final PartialWatermark output = watermarkPerOutputId.remove(id);
if (output != null) {
Expand Down Expand Up @@ -223,4 +230,14 @@ public void markActive() {
state.setIdle(false);
}
}

/** A No-op implementation of {@link WatermarkUpdateListener}. */
@Internal
public static class NoOpWatermarkUpdateListener implements WatermarkUpdateListener {
@Override
public void onWatermarkUpdate(long watermark) {}

@Override
public void onIdleUpdate(boolean idle) {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ void immediateUpdateOnSameOutputAsDeferredUpdateDoesNotRegress() {
new WatermarkOutputMultiplexer(underlyingWatermarkOutput);

final String id = "test-id";
multiplexer.registerNewOutput(id, watermark -> {});
multiplexer.registerNewOutput(id);
WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(id);
WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(id);

Expand All @@ -287,7 +287,7 @@ void lowerImmediateUpdateOnSameOutputDoesNotEmitCombinedUpdate() {
new WatermarkOutputMultiplexer(underlyingWatermarkOutput);

final String id = "1234-test";
multiplexer.registerNewOutput(id, watermark -> {});
multiplexer.registerNewOutput(id);
WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(id);
WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(id);

Expand All @@ -305,8 +305,8 @@ void testRemoveUnblocksWatermarks() {
final long lowTimestamp = 156765L;
final long highTimestamp = lowTimestamp + 10;

multiplexer.registerNewOutput("lower", watermark -> {});
multiplexer.registerNewOutput("higher", watermark -> {});
multiplexer.registerNewOutput("lower");
multiplexer.registerNewOutput("higher");
multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));

multiplexer.unregisterOutput("lower");
Expand All @@ -324,8 +324,8 @@ void testRemoveOfLowestDoesNotImmediatelyAdvanceWatermark() {
final long lowTimestamp = -4343L;
final long highTimestamp = lowTimestamp + 10;

multiplexer.registerNewOutput("lower", watermark -> {});
multiplexer.registerNewOutput("higher", watermark -> {});
multiplexer.registerNewOutput("lower");
multiplexer.registerNewOutput("higher");
multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));
multiplexer.getImmediateOutput("higher").emitWatermark(new Watermark(highTimestamp));

Expand All @@ -343,11 +343,11 @@ void testRemoveOfHighestDoesNotRetractWatermark() {
final long lowTimestamp = 1L;
final long highTimestamp = 2L;

multiplexer.registerNewOutput("higher", watermark -> {});
multiplexer.registerNewOutput("higher");
multiplexer.getImmediateOutput("higher").emitWatermark(new Watermark(highTimestamp));
multiplexer.unregisterOutput("higher");

multiplexer.registerNewOutput("lower", watermark -> {});
multiplexer.registerNewOutput("lower");
multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));

assertThat(underlyingWatermarkOutput.lastWatermark().getTimestamp())
Expand All @@ -359,7 +359,7 @@ void testRemoveRegisteredReturnValue() {
final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
final WatermarkOutputMultiplexer multiplexer =
new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
multiplexer.registerNewOutput("does-exist", watermark -> {});
multiplexer.registerNewOutput("does-exist");

final boolean unregistered = multiplexer.unregisterOutput("does-exist");

Expand All @@ -385,7 +385,7 @@ void testNotEmittingIdleAfterAllSplitsRemoved() {

Watermark emittedWatermark = new Watermark(1);
final String id = UUID.randomUUID().toString();
multiplexer.registerNewOutput(id, watermark -> {});
multiplexer.registerNewOutput(id);
WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(id);
immediateOutput.emitWatermark(emittedWatermark);
multiplexer.unregisterOutput(id);
Expand All @@ -401,7 +401,7 @@ void testNotEmittingIdleAfterAllSplitsRemoved() {
*/
private static WatermarkOutput createImmediateOutput(WatermarkOutputMultiplexer multiplexer) {
final String id = UUID.randomUUID().toString();
multiplexer.registerNewOutput(id, watermark -> {});
multiplexer.registerNewOutput(id);
return multiplexer.getImmediateOutput(id);
}

Expand All @@ -411,7 +411,7 @@ private static WatermarkOutput createImmediateOutput(WatermarkOutputMultiplexer
*/
private static WatermarkOutput createDeferredOutput(WatermarkOutputMultiplexer multiplexer) {
final String id = UUID.randomUUID().toString();
multiplexer.registerNewOutput(id, watermark -> {});
multiplexer.registerNewOutput(id);
return multiplexer.getDeferredOutput(id);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.metrics.groups;

import org.apache.flink.annotation.PublicEvolving;

/**
* Pre-defined metrics for {@code SplitEnumerator}.
*
* <p>You should only update the metrics in the main operator thread.
*/
@PublicEvolving
public interface SourceSplitMetricGroup extends OperatorCoordinatorMetricGroup {
long getCurrentWatermark();

double getActiveTimePerSecond();

long getPausedTimePerSecond();

long getIdleTimePerSecond();

double getAccumulatedActiveTime();

long getAccumulatedPausedTime();

long getAccumulatedIdleTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,13 @@ public static String currentInputWatermarkName(int index) {
public static final String FAILED_COMMITTABLES = "failedCommittables";
public static final String RETRIED_COMMITTABLES = "retriedCommittables";
public static final String PENDING_COMMITTABLES = "pendingCommittables";

// FLIP-513 split level metrics
public static final String SPLIT_CURRENT_WATERMARK = "currentWatermark";
public static final String SPLIT_ACTIVE_TIME = "activeTimeMs" + SUFFIX_RATE;
public static final String SPLIT_PAUSED_TIME = "pausedTimeMs" + SUFFIX_RATE;
public static final String SPLIT_IDLE_TIME = "idleTimeMs" + SUFFIX_RATE;
public static final String ACC_SPLIT_PAUSED_TIME = "accumulatedPausedTimeMs";
public static final String ACC_SPLIT_ACTIVE_TIME = "accumulatedActiveTimeMs";
public static final String ACC_SPLIT_IDLE_TIME = "accumulatedIdleTimeMs";
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ public synchronized long getCount() {
return currentCount;
}

@VisibleForTesting
public synchronized boolean isMeasuring() {
return currentMeasurementStartTS != 0;
}
Expand Down
Loading