Skip to content

Commit 9eb2df9

Browse files
committed
[FLINK-37410] Split level Watermark metrics
1 parent 506ac18 commit 9eb2df9

File tree

15 files changed

+669
-18
lines changed

15 files changed

+669
-18
lines changed

docs/content/docs/ops/metrics.md

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1741,6 +1741,56 @@ Note that the metrics are only available via reporters.
17411741
<td>The total number of InputSplits this data source has processed (if the operator is a data source).</td>
17421742
<td>Gauge</td>
17431743
</tr>
1744+
<tr>
1745+
<th rowspan="7"><strong>Split</strong></th>
1746+
<td>watermark.currentWatermark</td>
1747+
<td>
1748+
The last watermark this split has received (in milliseconds).
1749+
</td>
1750+
<td>Gauge</td>
1751+
</tr>
1752+
<tr>
1753+
<td>watermark.activeTimeMsPerSecond</td>
1754+
<td>
1755+
The time (in milliseconds) this split is active (i.e. not paused due to watermark alignment or idle due to idleness detection) per second.
1756+
</td>
1757+
<td>Gauge</td>
1758+
</tr>
1759+
<tr>
1760+
<td>watermark.pausedTimeMsPerSecond</td>
1761+
<td>
1762+
The time (in milliseconds) this split is paused due to watermark alignment per second.
1763+
</td>
1764+
<td>Gauge</td>
1765+
</tr>
1766+
<tr>
1767+
<td>watermark.idleTimeMsPerSecond</td>
1768+
<td>
1769+
The time (in milliseconds) this split is marked idle by idleness detection per second.
1770+
</td>
1771+
<td>Gauge</td>
1772+
</tr>
1773+
<tr>
1774+
<td>watermark.accumulatedActiveTimeMs</td>
1775+
<td>
1776+
Accumulated time (in milliseconds) this split was active since registered
1777+
</td>
1778+
<td>Gauge</td>
1779+
</tr>
1780+
<tr>
1781+
<td>watermark.accumulatedPausedTimeMs</td>
1782+
<td>
1783+
Accumulated time (in milliseconds) this split was paused since registered
1784+
</td>
1785+
<td>Gauge</td>
1786+
</tr>
1787+
<tr>
1788+
<td>watermark.accumulatedIdleTimeMs</td>
1789+
<td>
1790+
Accumulated time (in milliseconds) this split was idle since registered
1791+
</td>
1792+
<td>Gauge</td>
1793+
</tr>
17441794
</tbody>
17451795
</table>
17461796

flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ private long getWatermark() {
118118
* <p>Setting a watermark will clear the idleness flag.
119119
*/
120120
public boolean setWatermark(long watermark) {
121-
this.idle = false;
121+
setIdle(false);
122122
final boolean updated = watermark > this.watermark;
123123
if (updated) {
124124
this.onWatermarkUpdate.onWatermarkUpdate(watermark);
@@ -133,6 +133,7 @@ private boolean isIdle() {
133133

134134
public void setIdle(boolean idle) {
135135
this.idle = idle;
136+
this.onWatermarkUpdate.onIdleUpdate(idle);
136137
}
137138
}
138139
}

flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,13 @@
4949
@Internal
5050
public class WatermarkOutputMultiplexer {
5151
/** A callback for propagating changes to split based watermarks. */
52-
@FunctionalInterface
5352
@Internal
5453
public interface WatermarkUpdateListener {
5554
/** Called when the watermark increases. */
5655
void onWatermarkUpdate(long watermark);
56+
57+
/** Called when the idle state changes. */
58+
void onIdleUpdate(boolean idle);
5759
}
5860

5961
/**
@@ -95,6 +97,11 @@ public void registerNewOutput(String id, WatermarkUpdateListener onWatermarkUpda
9597
combinedWatermarkStatus.add(outputState);
9698
}
9799

100+
/** Registers a new multiplexed output with a no-op listener. */
101+
public void registerNewOutput(String id) {
102+
registerNewOutput(id, new NoOpWatermarkUpdateListener());
103+
}
104+
98105
public boolean unregisterOutput(String id) {
99106
final PartialWatermark output = watermarkPerOutputId.remove(id);
100107
if (output != null) {
@@ -223,4 +230,14 @@ public void markActive() {
223230
state.setIdle(false);
224231
}
225232
}
233+
234+
/** A No-op implementation of {@link WatermarkUpdateListener}. */
235+
@Internal
236+
public static class NoOpWatermarkUpdateListener implements WatermarkUpdateListener {
237+
@Override
238+
public void onWatermarkUpdate(long watermark) {}
239+
240+
@Override
241+
public void onIdleUpdate(boolean idle) {}
242+
}
226243
}

flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ void immediateUpdateOnSameOutputAsDeferredUpdateDoesNotRegress() {
263263
new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
264264

265265
final String id = "test-id";
266-
multiplexer.registerNewOutput(id, watermark -> {});
266+
multiplexer.registerNewOutput(id);
267267
WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(id);
268268
WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(id);
269269

@@ -287,7 +287,7 @@ void lowerImmediateUpdateOnSameOutputDoesNotEmitCombinedUpdate() {
287287
new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
288288

289289
final String id = "1234-test";
290-
multiplexer.registerNewOutput(id, watermark -> {});
290+
multiplexer.registerNewOutput(id);
291291
WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(id);
292292
WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(id);
293293

@@ -305,8 +305,8 @@ void testRemoveUnblocksWatermarks() {
305305
final long lowTimestamp = 156765L;
306306
final long highTimestamp = lowTimestamp + 10;
307307

308-
multiplexer.registerNewOutput("lower", watermark -> {});
309-
multiplexer.registerNewOutput("higher", watermark -> {});
308+
multiplexer.registerNewOutput("lower");
309+
multiplexer.registerNewOutput("higher");
310310
multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));
311311

312312
multiplexer.unregisterOutput("lower");
@@ -324,8 +324,8 @@ void testRemoveOfLowestDoesNotImmediatelyAdvanceWatermark() {
324324
final long lowTimestamp = -4343L;
325325
final long highTimestamp = lowTimestamp + 10;
326326

327-
multiplexer.registerNewOutput("lower", watermark -> {});
328-
multiplexer.registerNewOutput("higher", watermark -> {});
327+
multiplexer.registerNewOutput("lower");
328+
multiplexer.registerNewOutput("higher");
329329
multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));
330330
multiplexer.getImmediateOutput("higher").emitWatermark(new Watermark(highTimestamp));
331331

@@ -343,11 +343,11 @@ void testRemoveOfHighestDoesNotRetractWatermark() {
343343
final long lowTimestamp = 1L;
344344
final long highTimestamp = 2L;
345345

346-
multiplexer.registerNewOutput("higher", watermark -> {});
346+
multiplexer.registerNewOutput("higher");
347347
multiplexer.getImmediateOutput("higher").emitWatermark(new Watermark(highTimestamp));
348348
multiplexer.unregisterOutput("higher");
349349

350-
multiplexer.registerNewOutput("lower", watermark -> {});
350+
multiplexer.registerNewOutput("lower");
351351
multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));
352352

353353
assertThat(underlyingWatermarkOutput.lastWatermark().getTimestamp())
@@ -359,7 +359,7 @@ void testRemoveRegisteredReturnValue() {
359359
final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
360360
final WatermarkOutputMultiplexer multiplexer =
361361
new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
362-
multiplexer.registerNewOutput("does-exist", watermark -> {});
362+
multiplexer.registerNewOutput("does-exist");
363363

364364
final boolean unregistered = multiplexer.unregisterOutput("does-exist");
365365

@@ -385,7 +385,7 @@ void testNotEmittingIdleAfterAllSplitsRemoved() {
385385

386386
Watermark emittedWatermark = new Watermark(1);
387387
final String id = UUID.randomUUID().toString();
388-
multiplexer.registerNewOutput(id, watermark -> {});
388+
multiplexer.registerNewOutput(id);
389389
WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(id);
390390
immediateOutput.emitWatermark(emittedWatermark);
391391
multiplexer.unregisterOutput(id);
@@ -401,7 +401,7 @@ void testNotEmittingIdleAfterAllSplitsRemoved() {
401401
*/
402402
private static WatermarkOutput createImmediateOutput(WatermarkOutputMultiplexer multiplexer) {
403403
final String id = UUID.randomUUID().toString();
404-
multiplexer.registerNewOutput(id, watermark -> {});
404+
multiplexer.registerNewOutput(id);
405405
return multiplexer.getImmediateOutput(id);
406406
}
407407

@@ -411,7 +411,7 @@ private static WatermarkOutput createImmediateOutput(WatermarkOutputMultiplexer
411411
*/
412412
private static WatermarkOutput createDeferredOutput(WatermarkOutputMultiplexer multiplexer) {
413413
final String id = UUID.randomUUID().toString();
414-
multiplexer.registerNewOutput(id, watermark -> {});
414+
multiplexer.registerNewOutput(id);
415415
return multiplexer.getDeferredOutput(id);
416416
}
417417

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.metrics.groups;
19+
20+
import org.apache.flink.annotation.PublicEvolving;
21+
22+
/**
23+
* Pre-defined metrics for {@code SplitEnumerator}.
24+
*
25+
* <p>You should only update the metrics in the main operator thread.
26+
*/
27+
@PublicEvolving
28+
public interface SourceSplitMetricGroup extends OperatorCoordinatorMetricGroup {
29+
long getCurrentWatermark();
30+
31+
double getActiveTimePerSecond();
32+
33+
long getPausedTimePerSecond();
34+
35+
long getIdleTimePerSecond();
36+
37+
double getAccumulatedActiveTime();
38+
39+
long getAccumulatedPausedTime();
40+
41+
long getAccumulatedIdleTime();
42+
}

flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,4 +145,13 @@ public static String currentInputWatermarkName(int index) {
145145
public static final String FAILED_COMMITTABLES = "failedCommittables";
146146
public static final String RETRIED_COMMITTABLES = "retriedCommittables";
147147
public static final String PENDING_COMMITTABLES = "pendingCommittables";
148+
149+
// FLIP-513 split level metrics
150+
public static final String SPLIT_CURRENT_WATERMARK = "currentWatermark";
151+
public static final String SPLIT_ACTIVE_TIME = "activeTimeMs" + SUFFIX_RATE;
152+
public static final String SPLIT_PAUSED_TIME = "pausedTimeMs" + SUFFIX_RATE;
153+
public static final String SPLIT_IDLE_TIME = "idleTimeMs" + SUFFIX_RATE;
154+
public static final String ACC_SPLIT_PAUSED_TIME = "accumulatedPausedTimeMs";
155+
public static final String ACC_SPLIT_ACTIVE_TIME = "accumulatedActiveTimeMs";
156+
public static final String ACC_SPLIT_IDLE_TIME = "accumulatedIdleTimeMs";
148157
}

flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,6 @@ public synchronized long getCount() {
194194
return currentCount;
195195
}
196196

197-
@VisibleForTesting
198197
public synchronized boolean isMeasuring() {
199198
return currentMeasurementStartTS != 0;
200199
}

0 commit comments

Comments
 (0)