Skip to content

Commit fb58b02

Browse files
committed
fixup! [FLINK-37410] Split level Watermark metrics
1 parent 028379e commit fb58b02

File tree

5 files changed

+36
-34
lines changed

5 files changed

+36
-34
lines changed

flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,16 @@
1717
*/
1818

1919
package org.apache.flink.api.common.eventtime;
20-
import static org.apache.flink.util.Preconditions.checkState;
21-
22-
import java.util.HashMap;
23-
import java.util.Map;
2420

2521
import org.apache.flink.annotation.Internal;
2622
import org.apache.flink.api.common.eventtime.CombinedWatermarkStatus.PartialWatermark;
2723
import org.apache.flink.util.Preconditions;
2824

25+
import java.util.HashMap;
26+
import java.util.Map;
27+
28+
import static org.apache.flink.util.Preconditions.checkState;
29+
2930
/**
3031
* A {@link WatermarkOutputMultiplexer} combines the watermark (and idleness) updates of multiple
3132
* partitions/shards/splits into one combined watermark update and forwards it to an underlying

flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SourceSplitMetricGroup.java

+25-26
Original file line numberDiff line numberDiff line change
@@ -15,29 +15,28 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.flink.metrics.groups;
19-
20-
import org.apache.flink.annotation.PublicEvolving;
21-
22-
/**
23-
* Pre-defined metrics for {@code SplitEnumerator}.
24-
*
25-
* <p>You should only update the metrics in the main operator thread.
26-
*/
27-
@PublicEvolving
28-
public interface SourceSplitMetricGroup extends OperatorCoordinatorMetricGroup {
29-
long getCurrentWatermark();
30-
31-
double getActiveTimePerSecond();
32-
33-
long getPausedTimePerSecond();
34-
35-
long getIdleTimePerSecond();
36-
37-
double getAccumulatedActiveTime();
38-
39-
long getAccumulatedPausedTime();
40-
41-
long getAccumulatedIdleTime();
42-
}
43-
18+
package org.apache.flink.metrics.groups;
19+
20+
import org.apache.flink.annotation.PublicEvolving;
21+
22+
/**
23+
* Pre-defined metrics for {@code SplitEnumerator}.
24+
*
25+
* <p>You should only update the metrics in the main operator thread.
26+
*/
27+
@PublicEvolving
28+
public interface SourceSplitMetricGroup extends OperatorCoordinatorMetricGroup {
29+
long getCurrentWatermark();
30+
31+
double getActiveTimePerSecond();
32+
33+
long getPausedTimePerSecond();
34+
35+
long getIdleTimePerSecond();
36+
37+
double getAccumulatedActiveTime();
38+
39+
long getAccumulatedPausedTime();
40+
41+
long getAccumulatedIdleTime();
42+
}

flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
178178
private final Map<String, Long> splitCurrentWatermarks = new HashMap<>();
179179

180180
private final Set<String> currentlyPausedSplits = new HashSet<>();
181-
181+
182182
private final Map<String, InternalSourceSplitMetricGroup> splitMetricGroups = new HashMap<>();
183183

184184
private enum OperatingMode {
@@ -775,7 +775,7 @@ private void reportPausedOrResumed(
775775
getOrCreateSplitMetricGroup(splitId).markPaused();
776776
}
777777
}
778-
778+
779779
private void checkWatermarkAlignment() {
780780
if (operatingMode == OperatingMode.READING) {
781781
checkState(waitingForAlignmentFuture.isDone());

flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,13 @@ Licensed to the Apache Software Foundation (ASF) under one
3939
import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
4040
import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
4141
import org.apache.flink.streaming.runtime.io.DataInputStatus;
42+
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
4243
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
4344
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
4445
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
4546
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
4647
import org.apache.flink.streaming.util.MockOutput;
4748
import org.apache.flink.streaming.util.MockStreamConfig;
48-
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
4949

5050
import org.assertj.core.api.Condition;
5151
import org.junit.jupiter.api.Test;

flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ Licensed to the Apache Software Foundation (ASF) under one
4444
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
4545
import org.apache.flink.streaming.util.CollectorOutput;
4646
import org.apache.flink.util.CollectionUtil;
47-
import static org.apache.flink.api.common.eventtime.WatermarkMatchers.watermark;
47+
4848
import org.junit.jupiter.api.AfterEach;
4949
import org.junit.jupiter.api.BeforeEach;
5050
import org.junit.jupiter.api.Test;
@@ -56,6 +56,7 @@ Licensed to the Apache Software Foundation (ASF) under one
5656
import java.util.List;
5757
import java.util.concurrent.CompletableFuture;
5858

59+
import static org.apache.flink.api.common.eventtime.WatermarkMatchers.watermark;
5960
import static org.assertj.core.api.Assertions.assertThat;
6061

6162
/** Unit test for {@link SourceOperator}. */
@@ -270,6 +271,7 @@ public void testMetricGroupTracksSplitWatermark() throws Exception {
270271
operator.getSplitMetricGroup(split.splitId()).getCurrentWatermark(),
271272
expectedWatermark);
272273
}
274+
273275
private static class DataOutputToOutput<T> implements PushingAsyncDataInput.DataOutput<T> {
274276

275277
private final Output<StreamRecord<T>> output;

0 commit comments

Comments
 (0)