diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index 253c68ec86654..7aa66ab064bd6 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -1741,6 +1741,56 @@ Note that the metrics are only available via reporters.
The total number of InputSplits this data source has processed (if the operator is a data source). |
Gauge |
+
+ Split |
+ watermark.currentWatermark |
+
+ The last watermark this split has received (in milliseconds).
+ |
+ Gauge |
+
+
+ watermark.activeTimeMsPerSecond |
+
+ The time (in milliseconds) this split has been active (i.e. not paused due to watermark alignment or idle due to idleness detection) per second.
+ |
+ Gauge |
+
+
+ watermark.pausedTimeMsPerSecond |
+
+ The time (in milliseconds) this split has been paused due to watermark alignment per second.
+ |
+ Gauge |
+
+
+ watermark.idleTimeMsPerSecond |
+
+ The time (in milliseconds) this split has been marked idle by idleness detection per second.
+ |
+ Gauge |
+
+
+ watermark.accumulatedActiveTimeMs |
+
+ Accumulated time (in milliseconds) this split was active since it was registered
+ |
+ Gauge |
+
+
+ watermark.accumulatedPausedTimeMs |
+
+ Accumulated time (in milliseconds) this split was paused since it was registered
+ |
+ Gauge |
+
+
+ watermark.accumulatedIdleTimeMs |
+
+ Accumulated time (in milliseconds) this split was idle since it was registered
+ |
+ Gauge |
+
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java
index 6f07b73d0858a..9b76882736a6f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java
@@ -118,7 +118,7 @@ private long getWatermark() {
* Setting a watermark will clear the idleness flag.
*/
public boolean setWatermark(long watermark) {
- this.idle = false;
+ setIdle(false);
final boolean updated = watermark > this.watermark;
if (updated) {
this.onWatermarkUpdate.onWatermarkUpdate(watermark);
@@ -133,6 +133,7 @@ private boolean isIdle() {
public void setIdle(boolean idle) {
this.idle = idle;
+ this.onWatermarkUpdate.onIdleUpdate(idle);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/IndexedCombinedWatermarkStatus.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/IndexedCombinedWatermarkStatus.java
index 3241b9044da2a..c49171a354eec 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/IndexedCombinedWatermarkStatus.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/IndexedCombinedWatermarkStatus.java
@@ -19,6 +19,7 @@
package org.apache.flink.api.common.eventtime;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer.NoOpWatermarkUpdateListener;
import java.util.stream.IntStream;
@@ -43,7 +44,9 @@ public static IndexedCombinedWatermarkStatus forInputsCount(int inputsCount) {
CombinedWatermarkStatus.PartialWatermark[] partialWatermarks =
IntStream.range(0, inputsCount)
.mapToObj(
- i -> new CombinedWatermarkStatus.PartialWatermark(watermark -> {}))
+ i ->
+ new CombinedWatermarkStatus.PartialWatermark(
+ new NoOpWatermarkUpdateListener()))
.toArray(CombinedWatermarkStatus.PartialWatermark[]::new);
CombinedWatermarkStatus combinedWatermarkStatus = new CombinedWatermarkStatus();
for (CombinedWatermarkStatus.PartialWatermark partialWatermark : partialWatermarks) {
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
index 375b60c04eb3d..8a459db86b757 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
@@ -49,11 +49,13 @@
@Internal
public class WatermarkOutputMultiplexer {
/** A callback for propagating changes to split based watermarks. */
- @FunctionalInterface
@Internal
public interface WatermarkUpdateListener {
/** Called when the watermark increases. */
void onWatermarkUpdate(long watermark);
+
+ /** Called when the idle state changes. */
+ void onIdleUpdate(boolean idle);
}
/**
@@ -95,6 +97,11 @@ public void registerNewOutput(String id, WatermarkUpdateListener onWatermarkUpda
combinedWatermarkStatus.add(outputState);
}
+ /** Registers a new multiplexed output with a no-op listener. */
+ public void registerNewOutput(String id) {
+ registerNewOutput(id, new NoOpWatermarkUpdateListener());
+ }
+
public boolean unregisterOutput(String id) {
final PartialWatermark output = watermarkPerOutputId.remove(id);
if (output != null) {
@@ -223,4 +230,14 @@ public void markActive() {
state.setIdle(false);
}
}
+
+ /** A No-op implementation of {@link WatermarkUpdateListener}. */
+ @Internal
+ public static class NoOpWatermarkUpdateListener implements WatermarkUpdateListener {
+ @Override
+ public void onWatermarkUpdate(long watermark) {}
+
+ @Override
+ public void onIdleUpdate(boolean idle) {}
+ }
}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
index a5dae3d5e79f0..eaf0794a88af2 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
@@ -263,7 +263,7 @@ void immediateUpdateOnSameOutputAsDeferredUpdateDoesNotRegress() {
new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
final String id = "test-id";
- multiplexer.registerNewOutput(id, watermark -> {});
+ multiplexer.registerNewOutput(id);
WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(id);
WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(id);
@@ -287,7 +287,7 @@ void lowerImmediateUpdateOnSameOutputDoesNotEmitCombinedUpdate() {
new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
final String id = "1234-test";
- multiplexer.registerNewOutput(id, watermark -> {});
+ multiplexer.registerNewOutput(id);
WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(id);
WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(id);
@@ -305,8 +305,8 @@ void testRemoveUnblocksWatermarks() {
final long lowTimestamp = 156765L;
final long highTimestamp = lowTimestamp + 10;
- multiplexer.registerNewOutput("lower", watermark -> {});
- multiplexer.registerNewOutput("higher", watermark -> {});
+ multiplexer.registerNewOutput("lower");
+ multiplexer.registerNewOutput("higher");
multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));
multiplexer.unregisterOutput("lower");
@@ -324,8 +324,8 @@ void testRemoveOfLowestDoesNotImmediatelyAdvanceWatermark() {
final long lowTimestamp = -4343L;
final long highTimestamp = lowTimestamp + 10;
- multiplexer.registerNewOutput("lower", watermark -> {});
- multiplexer.registerNewOutput("higher", watermark -> {});
+ multiplexer.registerNewOutput("lower");
+ multiplexer.registerNewOutput("higher");
multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));
multiplexer.getImmediateOutput("higher").emitWatermark(new Watermark(highTimestamp));
@@ -343,11 +343,11 @@ void testRemoveOfHighestDoesNotRetractWatermark() {
final long lowTimestamp = 1L;
final long highTimestamp = 2L;
- multiplexer.registerNewOutput("higher", watermark -> {});
+ multiplexer.registerNewOutput("higher");
multiplexer.getImmediateOutput("higher").emitWatermark(new Watermark(highTimestamp));
multiplexer.unregisterOutput("higher");
- multiplexer.registerNewOutput("lower", watermark -> {});
+ multiplexer.registerNewOutput("lower");
multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));
assertThat(underlyingWatermarkOutput.lastWatermark().getTimestamp())
@@ -359,7 +359,7 @@ void testRemoveRegisteredReturnValue() {
final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
final WatermarkOutputMultiplexer multiplexer =
new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
- multiplexer.registerNewOutput("does-exist", watermark -> {});
+ multiplexer.registerNewOutput("does-exist");
final boolean unregistered = multiplexer.unregisterOutput("does-exist");
@@ -385,7 +385,7 @@ void testNotEmittingIdleAfterAllSplitsRemoved() {
Watermark emittedWatermark = new Watermark(1);
final String id = UUID.randomUUID().toString();
- multiplexer.registerNewOutput(id, watermark -> {});
+ multiplexer.registerNewOutput(id);
WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(id);
immediateOutput.emitWatermark(emittedWatermark);
multiplexer.unregisterOutput(id);
@@ -401,7 +401,7 @@ void testNotEmittingIdleAfterAllSplitsRemoved() {
*/
private static WatermarkOutput createImmediateOutput(WatermarkOutputMultiplexer multiplexer) {
final String id = UUID.randomUUID().toString();
- multiplexer.registerNewOutput(id, watermark -> {});
+ multiplexer.registerNewOutput(id);
return multiplexer.getImmediateOutput(id);
}
@@ -411,7 +411,7 @@ private static WatermarkOutput createImmediateOutput(WatermarkOutputMultiplexer
*/
private static WatermarkOutput createDeferredOutput(WatermarkOutputMultiplexer multiplexer) {
final String id = UUID.randomUUID().toString();
- multiplexer.registerNewOutput(id, watermark -> {});
+ multiplexer.registerNewOutput(id);
return multiplexer.getDeferredOutput(id);
}
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SourceSplitMetricGroup.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SourceSplitMetricGroup.java
new file mode 100644
index 0000000000000..2be5ce91df8ef
--- /dev/null
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SourceSplitMetricGroup.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Pre-defined metrics for {@code SplitEnumerator}.
+ *
+ *
You should only update the metrics in the main operator thread.
+ */
+@PublicEvolving
+public interface SourceSplitMetricGroup extends OperatorCoordinatorMetricGroup {
+ long getCurrentWatermark();
+
+ double getActiveTimePerSecond();
+
+ long getPausedTimePerSecond();
+
+ long getIdleTimePerSecond();
+
+ double getAccumulatedActiveTime();
+
+ long getAccumulatedPausedTime();
+
+ long getAccumulatedIdleTime();
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index 96e826846ea77..22f59ee4e9bbe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -145,4 +145,13 @@ public static String currentInputWatermarkName(int index) {
public static final String FAILED_COMMITTABLES = "failedCommittables";
public static final String RETRIED_COMMITTABLES = "retriedCommittables";
public static final String PENDING_COMMITTABLES = "pendingCommittables";
+
+ // FLIP-513 split level metrics
+ public static final String SPLIT_CURRENT_WATERMARK = "currentWatermark";
+ public static final String SPLIT_ACTIVE_TIME = "activeTimeMs" + SUFFIX_RATE;
+ public static final String SPLIT_PAUSED_TIME = "pausedTimeMs" + SUFFIX_RATE;
+ public static final String SPLIT_IDLE_TIME = "idleTimeMs" + SUFFIX_RATE;
+ public static final String ACC_SPLIT_PAUSED_TIME = "accumulatedPausedTimeMs";
+ public static final String ACC_SPLIT_ACTIVE_TIME = "accumulatedActiveTimeMs";
+ public static final String ACC_SPLIT_IDLE_TIME = "accumulatedIdleTimeMs";
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java
index 8f9a2ffd99f8d..157cf473e1dfe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java
@@ -194,7 +194,6 @@ public synchronized long getCount() {
return currentCount;
}
- @VisibleForTesting
public synchronized boolean isMeasuring() {
return currentMeasurementStartTS != 0;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
new file mode 100644
index 0000000000000..0fc548509b762
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.metrics.groups.SourceSplitMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.TimerGauge;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Special {@link MetricGroup} representing an {@link SplitEnumerator}. */
+@Internal
+public class InternalSourceSplitMetricGroup extends ProxyMetricGroup
+ implements SourceSplitMetricGroup {
+
+ static final Logger LOG = LoggerFactory.getLogger(InternalSourceSplitMetricGroup.class);
+ private final TimerGauge pausedTimePerSecond;
+ private final TimerGauge idleTimePerSecond;
+ private final Gauge currentWatermarkGauge;
+ private final Clock clock;
+ private static final String SPLIT = "split";
+ private static final String WATERMARK = "watermark";
+ private static final long SPLIT_NOT_STARTED = -1L;
+ private long splitStartTime = SPLIT_NOT_STARTED;
+
+ private InternalSourceSplitMetricGroup(
+ MetricGroup parentMetricGroup,
+ Clock clock,
+ String splitId,
+ Gauge currentWatermark) {
+ super(parentMetricGroup);
+ this.clock = clock;
+ MetricGroup splitWatermarkMetricGroup =
+ parentMetricGroup.addGroup(SPLIT, splitId).addGroup(WATERMARK);
+ pausedTimePerSecond =
+ splitWatermarkMetricGroup.gauge(
+ MetricNames.SPLIT_PAUSED_TIME, new TimerGauge(clock));
+ idleTimePerSecond =
+ splitWatermarkMetricGroup.gauge(MetricNames.SPLIT_IDLE_TIME, new TimerGauge(clock));
+ splitWatermarkMetricGroup.gauge(
+ MetricNames.SPLIT_ACTIVE_TIME, this::getActiveTimePerSecond);
+ splitWatermarkMetricGroup.gauge(
+ MetricNames.ACC_SPLIT_PAUSED_TIME, this::getAccumulatedPausedTime);
+ splitWatermarkMetricGroup.gauge(
+ MetricNames.ACC_SPLIT_ACTIVE_TIME, this::getAccumulatedActiveTime);
+ splitWatermarkMetricGroup.gauge(
+ MetricNames.ACC_SPLIT_IDLE_TIME, this::getAccumulatedIdleTime);
+ currentWatermarkGauge =
+ splitWatermarkMetricGroup.gauge(
+ MetricNames.SPLIT_CURRENT_WATERMARK, currentWatermark);
+ }
+
+ public static InternalSourceSplitMetricGroup wrap(
+ OperatorMetricGroup operatorMetricGroup, String splitId, Gauge currentWatermark) {
+ return new InternalSourceSplitMetricGroup(
+ operatorMetricGroup, SystemClock.getInstance(), splitId, currentWatermark);
+ }
+
+ @VisibleForTesting
+ public static InternalSourceSplitMetricGroup mock(
+ MetricGroup metricGroup, String splitId, Gauge currentWatermakr) {
+ return new InternalSourceSplitMetricGroup(
+ metricGroup, SystemClock.getInstance(), splitId, currentWatermakr);
+ }
+
+ @VisibleForTesting
+ public static InternalSourceSplitMetricGroup wrap(
+ OperatorMetricGroup operatorMetricGroup,
+ Clock clock,
+ String splitId,
+ Gauge currentWatermark) {
+ return new InternalSourceSplitMetricGroup(
+ operatorMetricGroup, clock, splitId, currentWatermark);
+ }
+
+ public void markSplitStart() {
+ splitStartTime = clock.absoluteTimeMillis();
+ }
+
+ public void maybeMarkSplitStart() {
+ if (splitStartTime == SPLIT_NOT_STARTED) {
+ markSplitStart();
+ }
+ }
+
+ public long getCurrentWatermark() {
+ return this.currentWatermarkGauge.getValue();
+ }
+
+ public void markPaused() {
+ maybeMarkSplitStart();
+ if (isIdle()) {
+ // If a split got paused it means it emitted records,
+ // hence it shouldn't be considered idle anymore
+ markNotIdle();
+ LOG.warn("Split marked paused while still idle");
+ }
+ this.pausedTimePerSecond.markStart();
+ }
+
+ public void markIdle() {
+ maybeMarkSplitStart();
+ if (isPaused()) {
+ // If a split is marked idle, it has no records to emit.
+ // hence it shouldn't be considered paused anymore
+ markNotPaused();
+ LOG.warn("Split marked idle while still paused");
+ }
+ this.idleTimePerSecond.markStart();
+ }
+
+ public void markNotPaused() {
+ maybeMarkSplitStart();
+ this.pausedTimePerSecond.markEnd();
+ }
+
+ public void markNotIdle() {
+ maybeMarkSplitStart();
+ this.idleTimePerSecond.markEnd();
+ }
+
+ public double getActiveTimePerSecond() {
+ if (splitStartTime == SPLIT_NOT_STARTED) {
+ return 0L;
+ }
+ double activeTimePerSecond = 1000.0 - getPausedTimePerSecond() - getIdleTimePerSecond();
+ return Math.max(activeTimePerSecond, 0);
+ }
+
+ public double getAccumulatedActiveTime() {
+ if (splitStartTime == SPLIT_NOT_STARTED) {
+ return 0L;
+ }
+ return Math.max(
+ clock.absoluteTimeMillis()
+ - splitStartTime
+ - getAccumulatedPausedTime()
+ - getAccumulatedIdleTime(),
+ 0);
+ }
+
+ public long getAccumulatedIdleTime() {
+ return idleTimePerSecond.getAccumulatedCount();
+ }
+
+ public long getIdleTimePerSecond() {
+ return idleTimePerSecond.getValue();
+ }
+
+ public long getPausedTimePerSecond() {
+ return pausedTimePerSecond.getValue();
+ }
+
+ public long getAccumulatedPausedTime() {
+ return pausedTimePerSecond.getAccumulatedCount();
+ }
+
+ public Boolean isPaused() {
+ return pausedTimePerSecond.isMeasuring();
+ }
+
+ public Boolean isIdle() {
+ return idleTimePerSecond.isMeasuring();
+ }
+
+ public Boolean isActive() {
+ return !isPaused() && !isIdle();
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index f4764ff685990..9495a31a0e938 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -38,6 +38,7 @@
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
+import org.apache.flink.runtime.metrics.groups.InternalSourceSplitMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
@@ -175,8 +176,11 @@ public class SourceOperator extends AbstractStr
private final List splitsToInitializeOutput = new ArrayList<>();
private final Map splitCurrentWatermarks = new HashMap<>();
+
private final Set currentlyPausedSplits = new HashSet<>();
+ private final Map splitMetricGroups = new HashMap<>();
+
private enum OperatingMode {
READING,
WAITING_FOR_ALIGNMENT,
@@ -350,6 +354,22 @@ public InternalSourceReaderMetricGroup getSourceMetricGroup() {
return sourceMetricGroup;
}
+ protected InternalSourceSplitMetricGroup getOrCreateSplitMetricGroup(String splitId) {
+ if (!this.splitMetricGroups.containsKey(splitId)) {
+ InternalSourceSplitMetricGroup splitMetricGroup =
+ InternalSourceSplitMetricGroup.wrap(
+ getMetricGroup(), splitId, () -> splitCurrentWatermarks.get(splitId));
+ splitMetricGroup.markSplitStart();
+ this.splitMetricGroups.put(splitId, splitMetricGroup);
+ }
+ return this.splitMetricGroups.get(splitId);
+ }
+
+ @VisibleForTesting
+ public InternalSourceSplitMetricGroup getSplitMetricGroup(String splitId) {
+ return this.splitMetricGroups.get(splitId);
+ }
+
@Override
public void open() throws Exception {
mainInputActivityClock = new PausableRelativeClock(getProcessingTimeService().getClock());
@@ -381,6 +401,9 @@ public void open() throws Exception {
final List splits = CollectionUtil.iterableToList(readerState.get());
if (!splits.isEmpty()) {
LOG.info("Restoring state for {} split(s) to reader.", splits.size());
+ for (SplitT s : splits) {
+ getOrCreateSplitMetricGroup(s.splitId());
+ }
splitsToInitializeOutput.addAll(splits);
sourceReader.addSplits(splits);
}
@@ -647,6 +670,7 @@ private void handleAddSplitsEvent(AddSplitEvent event) {
createOutputForSplits(newSplits);
}
sourceReader.addSplits(newSplits);
+ createMetricGroupForSplits(newSplits);
} catch (IOException e) {
throw new FlinkRuntimeException("Failed to deserialize the splits.", e);
}
@@ -658,6 +682,12 @@ private void createOutputForSplits(List newSplits) {
}
}
+ private void createMetricGroupForSplits(List newSplits) {
+ for (SplitT split : newSplits) {
+ getOrCreateSplitMetricGroup(split.splitId());
+ }
+ }
+
private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) {
currentMaxDesiredWatermark = event.getMaxWatermark();
sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark);
@@ -683,9 +713,19 @@ public void updateCurrentSplitWatermark(String splitId, long watermark) {
}
}
+ @Override
+ public void updateCurrentSplitIdle(String splitId, boolean idle) {
+ if (idle) {
+ this.getOrCreateSplitMetricGroup(splitId).markIdle();
+ } else {
+ this.getOrCreateSplitMetricGroup(splitId).markNotIdle();
+ }
+ }
+
@Override
public void splitFinished(String splitId) {
splitCurrentWatermarks.remove(splitId);
+ this.splitMetricGroups.remove(splitId);
}
/**
@@ -718,6 +758,7 @@ private void pauseOrResumeSplits(
try {
sourceReader.pauseOrResumeSplits(splitsToPause, splitsToResume);
eventTimeLogic.pauseOrResumeSplits(splitsToPause, splitsToResume);
+ reportPausedOrResumed(splitsToPause, splitsToResume);
} catch (UnsupportedOperationException e) {
if (!allowUnalignedSourceSplits) {
throw e;
@@ -725,6 +766,16 @@ private void pauseOrResumeSplits(
}
}
+ private void reportPausedOrResumed(
+ Collection splitsToPause, Collection splitsToResume) {
+ for (String splitId : splitsToResume) {
+ getOrCreateSplitMetricGroup(splitId).markNotPaused();
+ }
+ for (String splitId : splitsToPause) {
+ getOrCreateSplitMetricGroup(splitId).markPaused();
+ }
+ }
+
private void checkWatermarkAlignment() {
if (operatingMode == OperatingMode.READING) {
checkState(waitingForAlignmentFuture.isDone());
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
index 5b96c5dc0c969..99ffd6b05db54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
@@ -272,9 +272,17 @@ SourceOutput createOutputForSplit(String splitId) {
PausableRelativeClock inputActivityClock = createInputActivityClock(splitId);
watermarkMultiplexer.registerNewOutput(
splitId,
- watermark ->
- watermarkUpdateListener.updateCurrentSplitWatermark(
- splitId, watermark));
+ new WatermarkOutputMultiplexer.WatermarkUpdateListener() {
+ @Override
+ public void onWatermarkUpdate(long watermark) {
+ watermarkUpdateListener.updateCurrentSplitWatermark(splitId, watermark);
+ }
+
+ @Override
+ public void onIdleUpdate(boolean idle) {
+ watermarkUpdateListener.updateCurrentSplitIdle(splitId, idle);
+ }
+ });
final WatermarkOutput onEventOutput = watermarkMultiplexer.getImmediateOutput(splitId);
final WatermarkOutput periodicOutput = watermarkMultiplexer.getDeferredOutput(splitId);
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
index 4d96e53e2e4b2..9da5a9274d18c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
@@ -63,6 +63,9 @@ interface WatermarkUpdateListener {
/** Notifies about changes to per split watermarks. */
void updateCurrentSplitWatermark(String splitId, long watermark);
+ /** Notifies about changes to per split idleness. */
+ void updateCurrentSplitIdle(String splitId, boolean idle);
+
/** Notifies that split has finished. */
void splitFinished(String splitId);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
index 4fcf46ca9f968..a283443634a35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
@@ -54,6 +54,9 @@ public void updateCurrentEffectiveWatermark(long watermark) {}
@Override
public void updateCurrentSplitWatermark(String splitId, long watermark) {}
+ @Override
+ public void updateCurrentSplitIdle(String splitId, boolean isIdle) {}
+
@Override
public void splitFinished(String splitId) {}
});
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroupTest.java
new file mode 100644
index 0000000000000..5dd63dcdaa518
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroupTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link InternalSourceSplitMetricGroup}. */
+class InternalSourceSplitMetricGroupTest {
+ private static final MetricRegistry registry = TestingMetricRegistry.builder().build();
+
+ InternalSourceSplitMetricGroup getMetricGroupWithClock(ManualClock clock) {
+ return InternalSourceSplitMetricGroup.wrap(
+ UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(),
+ clock,
+ "splitId",
+ null);
+ }
+
+ @Test
+ void testClocksStartTickingAfterSplitStarted() throws InterruptedException {
+ ManualClock clock = new ManualClock(System.currentTimeMillis());
+ InternalSourceSplitMetricGroup metricGroup = getMetricGroupWithClock(clock);
+ long timeBeforeSplitStart = 4L;
+ clock.advanceTime(Duration.ofMillis(timeBeforeSplitStart));
+ // assert clocks aren't ticking before the split has started:
+ assertThat(
+ metricGroup.getAccumulatedIdleTime()
+ + metricGroup.getAccumulatedPausedTime()
+ + metricGroup.getAccumulatedActiveTime())
+ .isEqualTo(0);
+
+ // start split
+ metricGroup.markSplitStart();
+ long timeAfterSplitStart = 6L;
+ clock.advanceTime(Duration.ofMillis(timeAfterSplitStart));
+ assertThat(
+ metricGroup.getAccumulatedIdleTime()
+ + metricGroup.getAccumulatedPausedTime()
+ + metricGroup.getAccumulatedActiveTime())
+ .isEqualTo(timeAfterSplitStart);
+ }
+
+ @Test
+ void testConsistencyOfTime() throws InterruptedException {
+ ManualClock clock = new ManualClock(System.currentTimeMillis());
+ InternalSourceSplitMetricGroup metricGroup = getMetricGroupWithClock(clock);
+ metricGroup.markSplitStart();
+ final long startTime = clock.absoluteTimeMillis();
+
+ long pausedTime = 2L;
+ metricGroup.markPaused();
+ clock.advanceTime(Duration.ofMillis(pausedTime));
+ metricGroup.markNotPaused();
+
+ long activeTime = 4L;
+ clock.advanceTime(Duration.ofMillis(activeTime));
+
+ long idleTime = 1000 - pausedTime - activeTime;
+ metricGroup.markIdle();
+ clock.advanceTime(Duration.ofMillis(idleTime));
+ metricGroup.markNotIdle();
+
+ assertThat(metricGroup.getAccumulatedPausedTime()).isEqualTo(pausedTime);
+ assertThat(metricGroup.getAccumulatedActiveTime()).isEqualTo(activeTime);
+ assertThat(metricGroup.getAccumulatedIdleTime()).isEqualTo(idleTime);
+
+ long totalDuration = clock.absoluteTimeMillis() - startTime;
+ assertThat((double) metricGroup.getAccumulatedPausedTime())
+ .isEqualTo(
+ totalDuration
+ - metricGroup.getAccumulatedActiveTime()
+ - metricGroup.getAccumulatedIdleTime());
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
index 2b6fe1fee633e..d3226c651c2b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
@@ -39,6 +39,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -55,6 +56,8 @@ Licensed to the Apache Software Foundation (ASF) under one
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -303,6 +306,142 @@ void testSplitWatermarkAlignmentWithFinishedSplit() throws Exception {
assertThat(sourceReader.getPausedSplits()).isEmpty();
}
+ @Test
+ void testStateReportingForMultiSplitWatermarkAlignmentAndIdleness() throws Exception {
+ long idleTimeout = 100;
+ MockSourceReader sourceReader =
+ new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true);
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+ SourceOperator operator =
+ createAndOpenSourceOperatorWithIdleness(
+ sourceReader, processingTimeService, idleTimeout);
+
+ MockSourceSplit split0 = new MockSourceSplit(0, 0, 10);
+ MockSourceSplit split1 = new MockSourceSplit(1, 10, 20);
+ int allowedWatermark4 = 4;
+ int allowedWatermark7 = 7;
+ int allowedWatermark10 = 10;
+ split0.addRecord(5);
+ split1.addRecord(3);
+ split0.addRecord(6);
+ split1.addRecord(8);
+ operator.handleOperatorEvent(
+ new AddSplitEvent<>(
+ Arrays.asList(split0, split1), new MockSourceSplitSerializer()));
+ CollectingDataOutput dataOutput = new CollectingDataOutput<>();
+
+ // at this point, both splits are neither paused nor idle
+ assertThat(operator.getSplitMetricGroup(split0.splitId()).isActive()).isTrue();
+ assertThat(operator.getSplitMetricGroup(split1.splitId()).isActive()).isTrue();
+
+ operator.emitNext(dataOutput); // split0 emits 5
+ operator.emitNext(dataOutput); // split1 emits 3
+
+ operator.handleOperatorEvent(
+ new WatermarkAlignmentEvent(allowedWatermark4)); // blocks split0
+ assertThat(operator.getSplitMetricGroup(split0.splitId()).isPaused()).isTrue();
+ assertThat(operator.getSplitMetricGroup(split1.splitId()).isActive()).isTrue();
+
+ processingTimeService.advance(idleTimeout - 1);
+ operator.emitNext(dataOutput); // split0 emits 6
+ for (int i = 0; i < 10; i++) {
+ processingTimeService.advance(idleTimeout); // split1 eventually turns idle
+ }
+ assertThat(operator.getSplitMetricGroup(split0.splitId()).isPaused()).isTrue();
+ assertThat(operator.getSplitMetricGroup(split1.splitId()).isIdle()).isTrue();
+
+ operator.handleOperatorEvent(
+ new WatermarkAlignmentEvent(allowedWatermark7)); // unblocks split0
+ assertThat(operator.getSplitMetricGroup(split0.splitId()).isActive()).isTrue();
+ assertThat(operator.getSplitMetricGroup(split1.splitId()).isIdle()).isTrue();
+
+ operator.emitNext(dataOutput); // split1 emits 8
+ assertThat(operator.getSplitMetricGroup(split1.splitId()).isPaused()).isTrue();
+
+ operator.handleOperatorEvent(
+ new WatermarkAlignmentEvent(allowedWatermark10)); // unblocks split0
+ assertThat(operator.getSplitMetricGroup(split0.splitId()).isActive()).isTrue();
+ assertThat(operator.getSplitMetricGroup(split1.splitId()).isActive()).isTrue();
+ }
+
+ @Test
+ void testStateReportingForSingleSplitWatermarkAlignmentAndIdleness() throws Exception {
+ long idleTimeout = 100;
+ MockSourceReader sourceReader =
+ new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, true, true);
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+ SourceOperator operator =
+ createAndOpenSourceOperatorWithIdleness(
+ sourceReader, processingTimeService, idleTimeout);
+
+ MockSourceSplit split0 = new MockSourceSplit(0, 0, 10);
+ int allowedWatermark4 = 4;
+ int allowedWatermark5 = 5;
+ int allowedWatermark7 = 7;
+ split0.addRecord(5);
+ split0.addRecord(6);
+ split0.addRecord(7);
+ operator.handleOperatorEvent(
+ new AddSplitEvent<>(Arrays.asList(split0), new MockSourceSplitSerializer()));
+ CollectingDataOutput actualOutput = new CollectingDataOutput<>();
+
+ operator.emitNext(actualOutput);
+ assertOutput(actualOutput, Arrays.asList(5));
+ assertThat(operator.getSplitMetricGroup(split0.splitId()).isActive()).isTrue();
+
+ // testing transition active -> paused
+ operator.handleOperatorEvent(new WatermarkAlignmentEvent(allowedWatermark4));
+ assertThat(operator.getSplitMetricGroup(split0.splitId()).isPaused()).isTrue();
+
+ // Testing the split doesn't become idle after idle timeout if paused
+ for (int i = 0; i < 10; i++) {
+ processingTimeService.advance(idleTimeout);
+ }
+ assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isFalse();
+ assertThat(operator.getSplitMetricGroup(split0.splitId()).isPaused()).isTrue();
+
+ // testing transition paused -> active
+ operator.handleOperatorEvent(
+ new WatermarkAlignmentEvent(allowedWatermark5)); // unblocks split0
+ assertThat(operator.getSplitMetricGroup(split0.splitId()).isActive()).isTrue();
+
+ // testing transition active -> idle
+ for (int i = 0; i < 10; i++) {
+ processingTimeService.advance(idleTimeout);
+ }
+ assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue();
+
+ // testing transition idle -> paused
+ operator.emitNext(actualOutput);
+ assertOutput(actualOutput, Arrays.asList(5, 6));
+ assertThat(operator.getSplitMetricGroup(split0.splitId()).isPaused()).isTrue();
+ assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isFalse();
+
+ operator.handleOperatorEvent(
+ new WatermarkAlignmentEvent(allowedWatermark7)); // unblocks split0
+ assertThat(operator.getSplitMetricGroup(split0.splitId()).isActive()).isTrue();
+
+ // testing transition idle -> active
+ for (int i = 0; i < 10; i++) {
+ processingTimeService.advance(idleTimeout);
+ }
+ assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue();
+ operator.emitNext(actualOutput);
+ assertOutput(actualOutput, Arrays.asList(5, 6, 7));
+ assertThat(operator.getSplitMetricGroup(split0.splitId()).isActive()).isTrue();
+ }
+
+ private void assertOutput(
+ CollectingDataOutput actualOutput, List expectedOutput) {
+ assertThat(
+ actualOutput.getEvents().stream()
+ .filter(o -> o instanceof StreamRecord)
+ .mapToInt(object -> ((StreamRecord) object).getValue())
+ .boxed()
+ .collect(Collectors.toList()))
+ .containsExactly(expectedOutput.toArray(new Integer[0]));
+ }
+
private SourceOperator createAndOpenSourceOperatorWithIdleness(
MockSourceReader sourceReader,
TestProcessingTimeService processingTimeService,
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 84c10c3e86314..b8e827ca3c287 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -57,6 +57,8 @@ Licensed to the Apache Software Foundation (ASF) under one
import java.util.concurrent.CompletableFuture;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
/** Unit test for {@link SourceOperator}. */
@SuppressWarnings("serial")
@@ -235,6 +237,42 @@ void testHandleBacklogEvent() throws Exception {
new RecordAttributes(false));
}
+ @Test
+ public void testMetricGroupIsCreatedForNewSplit() throws Exception {
+ operator.initializeState(context.createStateContext());
+ operator.open();
+ MockSourceSplit newSplit = new MockSourceSplit((2));
+ operator.handleOperatorEvent(
+ new AddSplitEvent<>(
+ Collections.singletonList(newSplit), new MockSourceSplitSerializer()));
+ assertNotNull(operator.getSplitMetricGroup(newSplit.splitId()));
+ }
+
+ @Test
+ public void testMetricGroupIsCreatedForRestoredSplit() throws Exception {
+ MockSourceSplit restoredSplit = new MockSourceSplit((2));
+ StateInitializationContext stateContext =
+ context.createStateContext(Collections.singletonList(restoredSplit));
+ operator.initializeState(stateContext);
+ operator.open();
+ assertNotNull(operator.getSplitMetricGroup(restoredSplit.splitId()));
+ }
+
+ @Test
+ public void testMetricGroupTracksSplitWatermark() throws Exception {
+ long expectedWatermark = 1000;
+ operator.initializeState(context.createStateContext());
+ operator.open();
+ MockSourceSplit split = new MockSourceSplit((2));
+ operator.handleOperatorEvent(
+ new AddSplitEvent<>(
+ Collections.singletonList(split), new MockSourceSplitSerializer()));
+ operator.updateCurrentSplitWatermark(split.splitId(), expectedWatermark);
+ assertEquals(
+ operator.getSplitMetricGroup(split.splitId()).getCurrentWatermark(),
+ expectedWatermark);
+ }
+
private static class DataOutputToOutput implements PushingAsyncDataInput.DataOutput {
private final Output> output;