24
24
import org .apache .flink .runtime .asyncprocessing .AsyncExecutionController ;
25
25
import org .apache .flink .runtime .asyncprocessing .StateRequestType ;
26
26
import org .apache .flink .runtime .jobgraph .OperatorID ;
27
- import org .apache .flink .runtime .state .CheckpointStorageLocationReference ;
28
27
import org .apache .flink .runtime .state .VoidNamespace ;
29
28
import org .apache .flink .runtime .state .VoidNamespaceSerializer ;
30
29
import org .apache .flink .runtime .state .hashmap .HashMapStateBackend ;
33
32
import org .apache .flink .streaming .api .operators .InternalTimerServiceAsyncImpl ;
34
33
import org .apache .flink .streaming .api .operators .OneInputStreamOperator ;
35
34
import org .apache .flink .streaming .api .operators .Triggerable ;
35
+ import org .apache .flink .streaming .api .operators .TwoInputStreamOperator ;
36
36
import org .apache .flink .streaming .api .watermark .Watermark ;
37
37
import org .apache .flink .streaming .runtime .io .RecordProcessorUtils ;
38
38
import org .apache .flink .streaming .runtime .operators .asyncprocessing .ElementOrder ;
39
39
import org .apache .flink .streaming .runtime .streamrecord .LatencyMarker ;
40
40
import org .apache .flink .streaming .runtime .streamrecord .StreamRecord ;
41
41
import org .apache .flink .streaming .runtime .watermarkstatus .WatermarkStatus ;
42
- import org .apache .flink .streaming .util .KeyedOneInputStreamOperatorTestHarness ;
42
+ import org .apache .flink .streaming .util .TestHarnessUtil ;
43
+ import org .apache .flink .streaming .util .asyncprocessing .AsyncKeyedOneInputStreamOperatorTestHarness ;
44
+ import org .apache .flink .streaming .util .asyncprocessing .AsyncKeyedTwoInputStreamOperatorTestHarness ;
43
45
import org .apache .flink .util .function .ThrowingConsumer ;
44
46
45
47
import org .junit .jupiter .api .Test ;
46
48
47
- import java .util .concurrent .ExecutorService ;
48
- import java .util .concurrent .Executors ;
49
+ import java .util .concurrent .CompletableFuture ;
50
+ import java .util .concurrent .ConcurrentLinkedQueue ;
49
51
import java .util .concurrent .atomic .AtomicInteger ;
50
52
51
53
import static org .apache .flink .runtime .state .StateBackendTestUtils .buildAsyncStateBackend ;
54
56
/** Basic tests for {@link AbstractAsyncStateStreamOperator}. */
55
57
public class AbstractAsyncStateStreamOperatorTest {
56
58
57
- protected KeyedOneInputStreamOperatorTestHarness <Integer , Tuple2 <Integer , String >, String >
59
+ protected AsyncKeyedOneInputStreamOperatorTestHarness <Integer , Tuple2 <Integer , String >, String >
58
60
createTestHarness (
59
61
int maxParalelism , int numSubtasks , int subtaskIndex , ElementOrder elementOrder )
60
62
throws Exception {
61
63
TestOperator testOperator = new TestOperator (elementOrder );
62
- KeyedOneInputStreamOperatorTestHarness <Integer , Tuple2 <Integer , String >, String >
64
+ AsyncKeyedOneInputStreamOperatorTestHarness <Integer , Tuple2 <Integer , String >, String >
63
65
testHarness =
64
- new KeyedOneInputStreamOperatorTestHarness <> (
66
+ AsyncKeyedOneInputStreamOperatorTestHarness . create (
65
67
testOperator ,
66
68
new TestKeySelector (),
67
69
BasicTypeInfo .INT_TYPE_INFO ,
@@ -74,7 +76,7 @@ public class AbstractAsyncStateStreamOperatorTest {
74
76
75
77
@ Test
76
78
void testCreateAsyncExecutionController () throws Exception {
77
- try (KeyedOneInputStreamOperatorTestHarness <Integer , Tuple2 <Integer , String >, String >
79
+ try (AsyncKeyedOneInputStreamOperatorTestHarness <Integer , Tuple2 <Integer , String >, String >
78
80
testHarness = createTestHarness (128 , 1 , 0 , ElementOrder .RECORD_ORDER )) {
79
81
testHarness .open ();
80
82
assertThat (testHarness .getOperator ())
@@ -93,51 +95,32 @@ void testCreateAsyncExecutionController() throws Exception {
93
95
94
96
@ Test
95
97
void testRecordProcessorWithFirstStateOrder () throws Exception {
96
- try (KeyedOneInputStreamOperatorTestHarness <Integer , Tuple2 <Integer , String >, String >
98
+ try (AsyncKeyedOneInputStreamOperatorTestHarness <Integer , Tuple2 <Integer , String >, String >
97
99
testHarness = createTestHarness (128 , 1 , 0 , ElementOrder .FIRST_STATE_ORDER )) {
98
100
testHarness .open ();
99
101
TestOperator testOperator = (TestOperator ) testHarness .getOperator ();
100
- ThrowingConsumer <StreamRecord <Tuple2 <Integer , String >>, Exception > processor =
101
- RecordProcessorUtils .getRecordProcessor (testOperator );
102
- ExecutorService anotherThread = Executors .newSingleThreadExecutor ();
103
- // Trigger the processor
104
- anotherThread .execute (
105
- () -> {
106
- try {
107
- processor .accept (new StreamRecord <>(Tuple2 .of (5 , "5" )));
108
- } catch (Exception e ) {
109
- }
110
- });
102
+ CompletableFuture <Void > future =
103
+ testHarness .processElementInternal (new StreamRecord <>(Tuple2 .of (5 , "5" )));
111
104
112
105
Thread .sleep (1000 );
113
106
assertThat (testOperator .getProcessed ()).isEqualTo (1 );
114
107
assertThat (testOperator .getCurrentProcessingContext ().getReferenceCount ()).isEqualTo (1 );
115
108
116
109
// Proceed processing
117
110
testOperator .proceed ();
118
- anotherThread .shutdown ();
119
- Thread .sleep (1000 );
111
+ future .get ();
120
112
assertThat (testOperator .getCurrentProcessingContext ().getReferenceCount ()).isEqualTo (0 );
121
113
}
122
114
}
123
115
124
116
@ Test
125
117
void testRecordProcessorWithRecordOrder () throws Exception {
126
- try (KeyedOneInputStreamOperatorTestHarness <Integer , Tuple2 <Integer , String >, String >
118
+ try (AsyncKeyedOneInputStreamOperatorTestHarness <Integer , Tuple2 <Integer , String >, String >
127
119
testHarness = createTestHarness (128 , 1 , 0 , ElementOrder .RECORD_ORDER )) {
128
120
testHarness .open ();
129
121
TestOperator testOperator = (TestOperator ) testHarness .getOperator ();
130
- ThrowingConsumer <StreamRecord <Tuple2 <Integer , String >>, Exception > processor =
131
- RecordProcessorUtils .getRecordProcessor (testOperator );
132
- ExecutorService anotherThread = Executors .newSingleThreadExecutor ();
133
- // Trigger the processor
134
- anotherThread .execute (
135
- () -> {
136
- try {
137
- processor .accept (new StreamRecord <>(Tuple2 .of (5 , "5" )));
138
- } catch (Exception e ) {
139
- }
140
- });
122
+ CompletableFuture <Void > future =
123
+ testHarness .processElementInternal (new StreamRecord <>(Tuple2 .of (5 , "5" )));
141
124
142
125
Thread .sleep (1000 );
143
126
assertThat (testOperator .getProcessed ()).isEqualTo (1 );
@@ -147,19 +130,19 @@ void testRecordProcessorWithRecordOrder() throws Exception {
147
130
148
131
// Proceed processing
149
132
testOperator .proceed ();
150
- anotherThread .shutdown ();
151
- Thread .sleep (1000 );
133
+ future .get ();
152
134
assertThat (testOperator .getCurrentProcessingContext ().getReferenceCount ()).isEqualTo (0 );
153
135
}
154
136
}
155
137
156
138
@ Test
157
139
void testAsyncProcessWithKey () throws Exception {
140
+
158
141
TestOperatorWithDirectAsyncProcess testOperator =
159
142
new TestOperatorWithDirectAsyncProcess (ElementOrder .RECORD_ORDER );
160
- KeyedOneInputStreamOperatorTestHarness <Integer , Tuple2 <Integer , String >, String >
143
+ AsyncKeyedOneInputStreamOperatorTestHarness <Integer , Tuple2 <Integer , String >, String >
161
144
testHarness =
162
- new KeyedOneInputStreamOperatorTestHarness <> (
145
+ AsyncKeyedOneInputStreamOperatorTestHarness . create (
163
146
testOperator ,
164
147
new TestKeySelector (),
165
148
BasicTypeInfo .INT_TYPE_INFO ,
@@ -169,17 +152,8 @@ void testAsyncProcessWithKey() throws Exception {
169
152
testHarness .setStateBackend (buildAsyncStateBackend (new HashMapStateBackend ()));
170
153
try {
171
154
testHarness .open ();
172
- ThrowingConsumer <StreamRecord <Tuple2 <Integer , String >>, Exception > processor =
173
- RecordProcessorUtils .getRecordProcessor (testOperator );
174
- ExecutorService anotherThread = Executors .newSingleThreadExecutor ();
175
- // Trigger the processor
176
- anotherThread .execute (
177
- () -> {
178
- try {
179
- processor .accept (new StreamRecord <>(Tuple2 .of (5 , "5" )));
180
- } catch (Exception e ) {
181
- }
182
- });
155
+ CompletableFuture <Void > future =
156
+ testHarness .processElementInternal (new StreamRecord <>(Tuple2 .of (5 , "5" )));
183
157
184
158
Thread .sleep (1000 );
185
159
assertThat (testOperator .getProcessed ()).isEqualTo (0 );
@@ -189,8 +163,7 @@ void testAsyncProcessWithKey() throws Exception {
189
163
190
164
// Proceed processing
191
165
testOperator .proceed ();
192
- anotherThread .shutdown ();
193
- Thread .sleep (1000 );
166
+ future .get ();
194
167
assertThat (testOperator .getCurrentProcessingContext ().getReferenceCount ()).isEqualTo (0 );
195
168
196
169
// We don't have the mailbox executor actually running, so the new context is blocked
@@ -203,11 +176,9 @@ void testAsyncProcessWithKey() throws Exception {
203
176
204
177
@ Test
205
178
void testCheckpointDrain () throws Exception {
206
- try (KeyedOneInputStreamOperatorTestHarness <Integer , Tuple2 <Integer , String >, String >
179
+ try (AsyncKeyedOneInputStreamOperatorTestHarness <Integer , Tuple2 <Integer , String >, String >
207
180
testHarness = createTestHarness (128 , 1 , 0 , ElementOrder .RECORD_ORDER )) {
208
181
testHarness .open ();
209
- CheckpointStorageLocationReference locationReference =
210
- CheckpointStorageLocationReference .getDefault ();
211
182
AsyncExecutionController asyncExecutionController =
212
183
((AbstractAsyncStateStreamOperator ) testHarness .getOperator ())
213
184
.getAsyncExecutionController ();
@@ -218,14 +189,14 @@ void testCheckpointDrain() throws Exception {
218
189
((AbstractAsyncStateStreamOperator <String >) testHarness .getOperator ())
219
190
.postProcessElement ();
220
191
assertThat (asyncExecutionController .getInFlightRecordNum ()).isEqualTo (1 );
221
- testHarness .getOperator (). prepareSnapshotPreBarrier ( 1 );
192
+ testHarness .drainStateRequests ( );
222
193
assertThat (asyncExecutionController .getInFlightRecordNum ()).isEqualTo (0 );
223
194
}
224
195
}
225
196
226
197
@ Test
227
198
void testTimerServiceIsAsync () throws Exception {
228
- try (KeyedOneInputStreamOperatorTestHarness <Integer , Tuple2 <Integer , String >, String >
199
+ try (AsyncKeyedOneInputStreamOperatorTestHarness <Integer , Tuple2 <Integer , String >, String >
229
200
testHarness = createTestHarness (128 , 1 , 0 , ElementOrder .RECORD_ORDER )) {
230
201
testHarness .open ();
231
202
assertThat (testHarness .getOperator ())
@@ -249,22 +220,14 @@ public void onProcessingTime(InternalTimer timer) throws Exception {}
249
220
250
221
@ Test
251
222
void testNonRecordProcess () throws Exception {
252
- try (KeyedOneInputStreamOperatorTestHarness <Integer , Tuple2 <Integer , String >, String >
223
+ try (AsyncKeyedOneInputStreamOperatorTestHarness <Integer , Tuple2 <Integer , String >, String >
253
224
testHarness = createTestHarness (128 , 1 , 0 , ElementOrder .RECORD_ORDER )) {
254
225
testHarness .open ();
255
226
TestOperator testOperator = (TestOperator ) testHarness .getOperator ();
256
- ThrowingConsumer <StreamRecord <Tuple2 <Integer , String >>, Exception > processor =
257
- RecordProcessorUtils .getRecordProcessor (testOperator );
258
- ExecutorService anotherThread = Executors .newSingleThreadExecutor ();
259
- anotherThread .execute (
260
- () -> {
261
- try {
262
- processor .accept (new StreamRecord <>(Tuple2 .of (5 , "5" )));
263
- testOperator .processLatencyMarker (
264
- new LatencyMarker (1234 , new OperatorID (), 0 ));
265
- } catch (Exception e ) {
266
- }
267
- });
227
+ testHarness .processElementInternal (new StreamRecord <>(Tuple2 .of (5 , "5" )));
228
+ CompletableFuture <Void > future =
229
+ testHarness .processLatencyMarkerInternal (
230
+ new LatencyMarker (1234 , new OperatorID (), 0 ));
268
231
269
232
Thread .sleep (1000 );
270
233
assertThat (testOperator .getProcessed ()).isEqualTo (1 );
@@ -274,32 +237,24 @@ void testNonRecordProcess() throws Exception {
274
237
275
238
// Proceed processing
276
239
testOperator .proceed ();
277
- anotherThread .shutdown ();
278
- Thread .sleep (1000 );
240
+ future .get ();
279
241
assertThat (testOperator .getCurrentProcessingContext ().getReferenceCount ()).isEqualTo (0 );
280
242
assertThat (testOperator .getLatencyProcessed ()).isEqualTo (1 );
281
243
}
282
244
}
283
245
284
246
@ Test
285
247
void testWatermarkStatus () throws Exception {
286
- try (KeyedOneInputStreamOperatorTestHarness <Integer , Tuple2 <Integer , String >, String >
248
+ try (AsyncKeyedOneInputStreamOperatorTestHarness <Integer , Tuple2 <Integer , String >, String >
287
249
testHarness = createTestHarness (128 , 1 , 0 , ElementOrder .RECORD_ORDER )) {
288
250
testHarness .open ();
289
251
TestOperator testOperator = (TestOperator ) testHarness .getOperator ();
290
252
ThrowingConsumer <StreamRecord <Tuple2 <Integer , String >>, Exception > processor =
291
253
RecordProcessorUtils .getRecordProcessor (testOperator );
292
- ExecutorService anotherThread = Executors .newSingleThreadExecutor ();
293
- anotherThread .execute (
294
- () -> {
295
- try {
296
- processor .accept (new StreamRecord <>(Tuple2 .of (5 , "5" )));
297
- testOperator .processWatermark1 (new Watermark (205L ));
298
- testOperator .processWatermark2 (new Watermark (105L ));
299
- testOperator .processWatermarkStatus (WatermarkStatus .IDLE , 1 );
300
- } catch (Exception e ) {
301
- }
302
- });
254
+ testHarness .processElementInternal (new StreamRecord <>(Tuple2 .of (5 , "5" )));
255
+ testHarness .processWatermarkInternal (new Watermark (205L ));
256
+ CompletableFuture <Void > future =
257
+ testHarness .processWatermarkStatusInternal (WatermarkStatus .IDLE );
303
258
304
259
Thread .sleep (1000 );
305
260
assertThat (testOperator .getProcessed ()).isEqualTo (1 );
@@ -310,15 +265,57 @@ void testWatermarkStatus() throws Exception {
310
265
311
266
// Proceed processing
312
267
testOperator .proceed ();
313
- anotherThread .shutdown ();
314
- Thread .sleep (1000 );
268
+ future .get ();
315
269
assertThat (testOperator .getCurrentProcessingContext ().getReferenceCount ()).isEqualTo (0 );
316
270
assertThat (testOperator .watermarkStatus .isActive ()).isFalse ();
317
271
assertThat (testHarness .getOutput ())
318
272
.containsExactly (
319
273
new StreamRecord <>("EventTimer-5-105" ),
320
- new Watermark (105L ),
321
- new Watermark (205L ));
274
+ new Watermark (205L ),
275
+ WatermarkStatus .IDLE );
276
+ }
277
+ }
278
+
279
+ @ Test
280
+ void testIdleWatermarkHandling () throws Exception {
281
+ final WatermarkTestingOperator testOperator = new WatermarkTestingOperator ();
282
+
283
+ ConcurrentLinkedQueue <Object > expectedOutput = new ConcurrentLinkedQueue <>();
284
+ KeySelector <Long , Integer > dummyKeySelector = l -> 0 ;
285
+ try (AsyncKeyedTwoInputStreamOperatorTestHarness <Integer , Long , Long , Long > testHarness =
286
+ AsyncKeyedTwoInputStreamOperatorTestHarness .create (
287
+ testOperator ,
288
+ dummyKeySelector ,
289
+ dummyKeySelector ,
290
+ BasicTypeInfo .INT_TYPE_INFO ,
291
+ 1 ,
292
+ 1 ,
293
+ 0 )) {
294
+ testHarness .setup ();
295
+ testHarness .open ();
296
+ testHarness .processElement1 (1L , 1L );
297
+ testHarness .processElement1 (3L , 3L );
298
+ testHarness .processElement1 (4L , 4L );
299
+ testHarness .processWatermark1 (new Watermark (1L ));
300
+ assertThat (testHarness .getOutput ()).isEmpty ();
301
+
302
+ testHarness .processWatermarkStatus2 (WatermarkStatus .IDLE );
303
+ expectedOutput .add (new StreamRecord <>(1L ));
304
+ expectedOutput .add (new Watermark (1L ));
305
+ TestHarnessUtil .assertOutputEquals (
306
+ "Output was not correct" , expectedOutput , testHarness .getOutput ());
307
+
308
+ testHarness .processWatermark1 (new Watermark (3L ));
309
+ expectedOutput .add (new StreamRecord <>(3L ));
310
+ expectedOutput .add (new Watermark (3L ));
311
+ TestHarnessUtil .assertOutputEquals (
312
+ "Output was not correct" , expectedOutput , testHarness .getOutput ());
313
+
314
+ testHarness .processWatermarkStatus2 (WatermarkStatus .ACTIVE );
315
+ // the other input is active now, we should not emit the watermark
316
+ testHarness .processWatermark1 (new Watermark (4L ));
317
+ TestHarnessUtil .assertOutputEquals (
318
+ "Output was not correct" , expectedOutput , testHarness .getOutput ());
322
319
}
323
320
}
324
321
@@ -420,18 +417,56 @@ private static class TestOperatorWithDirectAsyncProcess extends TestOperator {
420
417
421
418
@ Override
422
419
public void processElement (StreamRecord <Tuple2 <Integer , String >> element ) throws Exception {
420
+ System .out .println ("processElement " + Thread .currentThread ().getName ());
423
421
asyncProcessWithKey (
424
422
element .getValue ().f0 ,
425
423
() -> {
424
+ System .out .println (
425
+ "asyncProcessWithKey " + Thread .currentThread ().getName ());
426
426
processed .incrementAndGet ();
427
427
});
428
428
synchronized (objectToWait ) {
429
429
objectToWait .wait ();
430
430
}
431
+ System .out .println ("processElement " + Thread .currentThread ().getName ());
431
432
processed .incrementAndGet ();
432
433
}
433
434
}
434
435
436
+ private static class WatermarkTestingOperator extends AbstractAsyncStateStreamOperator <Long >
437
+ implements TwoInputStreamOperator <Long , Long , Long >,
438
+ Triggerable <Integer , VoidNamespace > {
439
+
440
+ private transient InternalTimerService <VoidNamespace > timerService ;
441
+
442
+ @ Override
443
+ public void open () throws Exception {
444
+ super .open ();
445
+
446
+ this .timerService =
447
+ getInternalTimerService ("test-timers" , VoidNamespaceSerializer .INSTANCE , this );
448
+ }
449
+
450
+ @ Override
451
+ public void onEventTime (InternalTimer <Integer , VoidNamespace > timer ) throws Exception {
452
+ output .collect (new StreamRecord <>(timer .getTimestamp ()));
453
+ }
454
+
455
+ @ Override
456
+ public void onProcessingTime (InternalTimer <Integer , VoidNamespace > timer )
457
+ throws Exception {}
458
+
459
+ @ Override
460
+ public void processElement1 (StreamRecord <Long > element ) throws Exception {
461
+ timerService .registerEventTimeTimer (VoidNamespace .INSTANCE , element .getValue ());
462
+ }
463
+
464
+ @ Override
465
+ public void processElement2 (StreamRecord <Long > element ) throws Exception {
466
+ timerService .registerEventTimeTimer (VoidNamespace .INSTANCE , element .getValue ());
467
+ }
468
+ }
469
+
435
470
/** {@link KeySelector} for tests. */
436
471
public static class TestKeySelector implements KeySelector <Tuple2 <Integer , String >, Integer > {
437
472
private static final long serialVersionUID = 1L ;
0 commit comments