Skip to content

Commit 7fa4f78

Browse files
authored
[FLINK-36938][Runtime] Provide hooks before and after the watermark processing (#25824)
1 parent 1523f2c commit 7fa4f78

File tree

5 files changed

+335
-16
lines changed

5 files changed

+335
-16
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ public RecordContext<K> buildContext(Object record, K key, boolean inheritEpoch)
216216
key,
217217
this::disposeContext,
218218
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism),
219-
inheritEpoch
219+
inheritEpoch && currentContext != null
220220
? epochManager.onEpoch(currentContext.getEpoch())
221221
: epochManager.onRecord());
222222
}
@@ -225,7 +225,7 @@ public RecordContext<K> buildContext(Object record, K key, boolean inheritEpoch)
225225
key,
226226
this::disposeContext,
227227
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism),
228-
inheritEpoch
228+
inheritEpoch && currentContext != null
229229
? epochManager.onEpoch(currentContext.getEpoch())
230230
: epochManager.onRecord());
231231
}
@@ -456,7 +456,11 @@ public void processNonRecord(
456456
? null
457457
: () -> {
458458
try {
459+
// We clear the current context since this is a non-record context.
460+
RecordContext<K> previousContext = currentContext;
461+
currentContext = null;
459462
triggerAction.run();
463+
currentContext = previousContext;
460464
} catch (Exception e) {
461465
exceptionHandler.handleException(
462466
"Failed to process non-record.", e);
@@ -466,7 +470,10 @@ public void processNonRecord(
466470
? null
467471
: () -> {
468472
try {
473+
RecordContext<K> previousContext = currentContext;
474+
currentContext = null;
469475
finalAction.run();
476+
currentContext = previousContext;
470477
} catch (Exception e) {
471478
exceptionHandler.handleException(
472479
"Failed to process non-record.", e);

flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java

Lines changed: 58 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -327,16 +327,65 @@ protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
327327
// Watermark handling
328328
// ------------------------------------------------------------------------
329329

330+
/**
331+
* A hook that is triggered when a watermark is received. Async state access can safely be
332+
* performed within this method. Returns the watermark that should then be processed normally.
333+
*
334+
* @param watermark the received watermark.
335+
* @return the watermark that should be processed, or null if no further processing is
336+
* needed.
337+
*/
338+
public Watermark preProcessWatermark(Watermark watermark) throws Exception {
339+
return watermark;
340+
}
341+
342+
/**
343+
* A hook that is invoked after the watermark has been advanced. Accessing async state here is
344+
* not recommended; only synchronous logic should be performed.
345+
*
346+
* @param watermark the advanced watermark.
347+
*/
348+
public void postProcessWatermark(Watermark watermark) throws Exception {}
349+
350+
/**
351+
* Processes a watermark when it is received. Do not override this method, since the async
352+
* processing logic is difficult to write correctly. Override the hooks instead; see
353+
* {@link #preProcessWatermark(Watermark)} and {@link #postProcessWatermark(Watermark)}.
354+
* The basic logic of processWatermark with hooks, in synchronous form, would be:
355+
*
356+
* <pre>
357+
* Watermark watermark = preProcessWatermark(mark);
358+
* if (watermark != null) {
359+
* super.processWatermark(watermark);
360+
* postProcessWatermark(watermark);
361+
* }
362+
* </pre>
363+
*/
330364
@Override
331-
public void processWatermark(Watermark mark) throws Exception {
365+
public final void processWatermark(Watermark mark) throws Exception {
332366
if (!isAsyncStateProcessingEnabled()) {
333367
// If async state processing is disabled, fallback to the super class.
334-
super.processWatermark(mark);
368+
Watermark watermark = preProcessWatermark(mark);
369+
if (watermark != null) {
370+
super.processWatermark(watermark);
371+
postProcessWatermark(watermark);
372+
}
335373
return;
336374
}
375+
AtomicReference<Watermark> watermarkRef = new AtomicReference<>(null);
337376
asyncExecutionController.processNonRecord(
338-
timeServiceManager == null ? null : () -> timeServiceManager.advanceWatermark(mark),
339-
() -> output.emitWatermark(mark));
377+
() -> {
378+
watermarkRef.set(preProcessWatermark(mark));
379+
if (timeServiceManager != null && watermarkRef.get() != null) {
380+
timeServiceManager.advanceWatermark(watermarkRef.get());
381+
}
382+
},
383+
() -> {
384+
if (watermarkRef.get() != null) {
385+
output.emitWatermark(watermarkRef.get());
386+
postProcessWatermark(watermarkRef.get());
387+
}
388+
});
340389
}
341390

342391
@Override
@@ -364,15 +413,18 @@ protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index
364413
wasIdle.set(combinedWatermark.isIdle());
365414
// index is 0-based
366415
if (combinedWatermark.updateStatus(index, watermarkStatus.isIdle())) {
367-
watermarkRef.set(new Watermark(combinedWatermark.getCombinedWatermark()));
368-
if (timeServiceManager != null) {
416+
watermarkRef.set(
417+
preProcessWatermark(
418+
new Watermark(combinedWatermark.getCombinedWatermark())));
419+
if (timeServiceManager != null && watermarkRef.get() != null) {
369420
timeServiceManager.advanceWatermark(watermarkRef.get());
370421
}
371422
}
372423
},
373424
() -> {
374425
if (watermarkRef.get() != null) {
375426
output.emitWatermark(watermarkRef.get());
427+
postProcessWatermark(watermarkRef.get());
376428
}
377429
if (wasIdle.get() != combinedWatermark.isIdle()) {
378430
output.emitWatermarkStatus(watermarkStatus);

flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -260,15 +260,66 @@ protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
260260
// ------------------------------------------------------------------------
261261
// Watermark handling
262262
// ------------------------------------------------------------------------
263+
264+
/**
265+
* A hook that is triggered when a watermark is received. Async state access can safely be
266+
* performed within this method. Returns the watermark that should then be processed normally.
267+
*
268+
* @param watermark the received watermark.
269+
* @return the watermark that should be processed, or null if no further processing is
270+
* needed.
271+
*/
272+
public Watermark preProcessWatermark(Watermark watermark) throws Exception {
273+
return watermark;
274+
}
275+
276+
/**
277+
* A hook that is invoked after the watermark has been advanced. Accessing async state here is
278+
* not recommended; only synchronous logic should be performed.
279+
*
280+
* @param watermark the advanced watermark.
281+
*/
282+
public void postProcessWatermark(Watermark watermark) throws Exception {}
283+
284+
/**
285+
* Processes a watermark when it is received. Do not override this method, since the async
286+
* processing logic is difficult to write correctly. Override the hooks instead; see
287+
* {@link #preProcessWatermark(Watermark)} and {@link #postProcessWatermark(Watermark)}.
288+
* The basic logic of processWatermark with hooks, in synchronous form, would be:
289+
*
290+
* <pre>
291+
* Watermark watermark = preProcessWatermark(mark);
292+
* if (watermark != null) {
293+
* super.processWatermark(watermark);
294+
* postProcessWatermark(watermark);
295+
* }
296+
* </pre>
297+
*/
263298
@Override
264-
public void processWatermark(Watermark mark) throws Exception {
299+
public final void processWatermark(Watermark mark) throws Exception {
265300
if (!isAsyncStateProcessingEnabled()) {
266-
super.processWatermark(mark);
301+
// If async state processing is disabled, fallback to the super class.
302+
Watermark watermark = preProcessWatermark(mark);
303+
if (watermark != null) {
304+
super.processWatermark(watermark);
305+
postProcessWatermark(watermark);
306+
}
267307
return;
268308
}
309+
AtomicReference<Watermark> watermarkRef = new AtomicReference<>(null);
269310
asyncExecutionController.processNonRecord(
270-
timeServiceManager == null ? null : () -> timeServiceManager.advanceWatermark(mark),
271-
() -> output.emitWatermark(mark));
311+
() -> {
312+
watermarkRef.set(preProcessWatermark(mark));
313+
if (timeServiceManager != null && watermarkRef.get() != null) {
314+
timeServiceManager.advanceWatermark(watermarkRef.get());
315+
}
316+
},
317+
() -> {
318+
if (watermarkRef.get() != null) {
319+
output.emitWatermark(watermarkRef.get());
320+
postProcessWatermark(watermarkRef.get());
321+
}
322+
});
272323
}
273324

274325
@Override
@@ -284,8 +335,10 @@ public void processWatermarkStatus(WatermarkStatus watermarkStatus, int inputId)
284335
() -> {
285336
wasIdle.set(combinedWatermark.isIdle());
286337
if (combinedWatermark.updateStatus(inputId - 1, watermarkStatus.isIdle())) {
287-
watermarkRef.set(new Watermark(combinedWatermark.getCombinedWatermark()));
288-
if (timeServiceManager != null) {
338+
watermarkRef.set(
339+
preProcessWatermark(
340+
new Watermark(combinedWatermark.getCombinedWatermark())));
341+
if (timeServiceManager != null && watermarkRef.get() != null) {
289342
timeServiceManager.advanceWatermark(watermarkRef.get());
290343
}
291344
}

flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.flink.streaming.util.TestHarnessUtil;
4343
import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness;
4444
import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedTwoInputStreamOperatorTestHarness;
45+
import org.apache.flink.util.function.FunctionWithException;
4546
import org.apache.flink.util.function.ThrowingConsumer;
4647

4748
import org.junit.jupiter.api.Test;
@@ -282,6 +283,68 @@ void testWatermark() throws Exception {
282283
}
283284
}
284285

286+
@Test
287+
void testWatermarkHooks() throws Exception {
288+
final WatermarkTestingOperator testOperator = new WatermarkTestingOperator();
289+
290+
AtomicInteger counter = new AtomicInteger(0);
291+
testOperator.setPreProcessFunction(
292+
(watermark) -> {
293+
testOperator.asyncProcessWithKey(
294+
1L,
295+
() -> {
296+
testOperator.output(watermark.getTimestamp() + 1000L);
297+
});
298+
if (counter.incrementAndGet() % 2 == 0) {
299+
return null;
300+
} else {
301+
return new Watermark(watermark.getTimestamp() + 1L);
302+
}
303+
});
304+
305+
testOperator.setPostProcessFunction(
306+
(watermark) -> {
307+
testOperator.output(watermark.getTimestamp() + 100L);
308+
});
309+
310+
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
311+
KeySelector<Long, Integer> dummyKeySelector = l -> 0;
312+
try (AsyncKeyedTwoInputStreamOperatorTestHarness<Integer, Long, Long, Long> testHarness =
313+
AsyncKeyedTwoInputStreamOperatorTestHarness.create(
314+
testOperator,
315+
dummyKeySelector,
316+
dummyKeySelector,
317+
BasicTypeInfo.INT_TYPE_INFO,
318+
1,
319+
1,
320+
0)) {
321+
testHarness.setup();
322+
testHarness.open();
323+
testHarness.processElement1(1L, 1L);
324+
testHarness.processElement1(3L, 3L);
325+
testHarness.processElement1(4L, 4L);
326+
testHarness.processWatermark1(new Watermark(2L));
327+
testHarness.processWatermark2(new Watermark(2L));
328+
expectedOutput.add(new StreamRecord<>(1002L));
329+
expectedOutput.add(new StreamRecord<>(1L));
330+
expectedOutput.add(new StreamRecord<>(3L));
331+
expectedOutput.add(new Watermark(3L));
332+
expectedOutput.add(new StreamRecord<>(103L));
333+
testHarness.processWatermark1(new Watermark(4L));
334+
testHarness.processWatermark2(new Watermark(4L));
335+
expectedOutput.add(new StreamRecord<>(1004L));
336+
testHarness.processWatermark1(new Watermark(5L));
337+
testHarness.processWatermark2(new Watermark(5L));
338+
expectedOutput.add(new StreamRecord<>(1005L));
339+
expectedOutput.add(new StreamRecord<>(4L));
340+
expectedOutput.add(new Watermark(6L));
341+
expectedOutput.add(new StreamRecord<>(106L));
342+
343+
TestHarnessUtil.assertOutputEquals(
344+
"Output was not correct", expectedOutput, testHarness.getOutput());
345+
}
346+
}
347+
285348
@Test
286349
void testWatermarkStatus() throws Exception {
287350
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
@@ -498,6 +561,24 @@ private static class WatermarkTestingOperator extends AbstractAsyncStateStreamOp
498561

499562
private transient InternalTimerService<VoidNamespace> timerService;
500563

564+
private FunctionWithException<Watermark, Watermark, Exception> preProcessFunction;
565+
566+
private ThrowingConsumer<Watermark, Exception> postProcessFunction;
567+
568+
public void setPreProcessFunction(
569+
FunctionWithException<Watermark, Watermark, Exception> preProcessFunction) {
570+
this.preProcessFunction = preProcessFunction;
571+
}
572+
573+
public void setPostProcessFunction(
574+
ThrowingConsumer<Watermark, Exception> postProcessFunction) {
575+
this.postProcessFunction = postProcessFunction;
576+
}
577+
578+
public void output(Long o) {
579+
output.collect(new StreamRecord<>(o));
580+
}
581+
501582
@Override
502583
public void open() throws Exception {
503584
super.open();
@@ -506,6 +587,18 @@ public void open() throws Exception {
506587
getInternalTimerService("test-timers", VoidNamespaceSerializer.INSTANCE, this);
507588
}
508589

590+
@Override
591+
public Watermark preProcessWatermark(Watermark watermark) throws Exception {
592+
return preProcessFunction == null ? watermark : preProcessFunction.apply(watermark);
593+
}
594+
595+
@Override
596+
public void postProcessWatermark(Watermark watermark) throws Exception {
597+
if (postProcessFunction != null) {
598+
postProcessFunction.accept(watermark);
599+
}
600+
}
601+
509602
@Override
510603
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
511604
output.collect(new StreamRecord<>(timer.getTimestamp()));

0 commit comments

Comments
 (0)