37
37
import org .apache .flink .runtime .scheduler .ExecutionGraphInfo ;
38
38
import org .apache .flink .runtime .state .SharedStateRegistry ;
39
39
import org .apache .flink .runtime .state .SharedStateRegistryFactory ;
40
+ import org .apache .flink .testutils .executor .TestExecutorExtension ;
40
41
import org .apache .flink .util .FlinkException ;
41
42
import org .apache .flink .util .Preconditions ;
42
43
import org .apache .flink .util .SerializedThrowable ;
43
44
import org .apache .flink .util .concurrent .Executors ;
44
45
import org .apache .flink .util .function .ThrowingConsumer ;
45
46
46
47
import org .junit .jupiter .api .Test ;
48
+ import org .junit .jupiter .api .extension .RegisterExtension ;
47
49
48
50
import java .time .Duration ;
49
51
import java .util .Objects ;
50
52
import java .util .concurrent .CompletableFuture ;
51
53
import java .util .concurrent .ExecutionException ;
52
54
import java .util .concurrent .Executor ;
53
- import java .util .concurrent .ForkJoinPool ;
55
+ import java .util .concurrent .ExecutorService ;
54
56
import java .util .function .Function ;
55
57
56
58
import static org .apache .flink .core .testutils .FlinkAssertions .assertThatFuture ;
63
65
*/
64
66
class CheckpointResourcesCleanupRunnerTest {
65
67
68
+ @ RegisterExtension
69
+ private static final TestExecutorExtension <ExecutorService > EXECUTOR_EXTENSION =
70
+ new TestExecutorExtension <>(java .util .concurrent .Executors ::newCachedThreadPool );
71
+
66
72
private static final Duration TIMEOUT_FOR_REQUESTS = Duration .ofMillis (0 );
67
73
68
74
private static final ThrowingConsumer <CheckpointResourcesCleanupRunner , ? extends Exception >
@@ -120,7 +126,7 @@ void testSuccessfulCloseAsyncAfterStart() throws Exception {
120
126
final CheckpointResourcesCleanupRunner testInstance =
121
127
new TestInstanceBuilder ()
122
128
.withCheckpointRecoveryFactory (checkpointRecoveryFactory )
123
- .withExecutor (ForkJoinPool . commonPool ())
129
+ .withExecutor (EXECUTOR_EXTENSION . getExecutor ())
124
130
.build ();
125
131
testInstance .start ();
126
132
@@ -169,7 +175,7 @@ void testCloseAsyncAfterStartAndErrorInCompletedCheckpointStoreShutdown() throws
169
175
final CheckpointResourcesCleanupRunner testInstance =
170
176
new TestInstanceBuilder ()
171
177
.withCheckpointRecoveryFactory (checkpointRecoveryFactory )
172
- .withExecutor (ForkJoinPool . commonPool ())
178
+ .withExecutor (EXECUTOR_EXTENSION . getExecutor ())
173
179
.build ();
174
180
testInstance .start ();
175
181
@@ -214,7 +220,7 @@ void testCloseAsyncAfterStartAndErrorInCheckpointIDCounterShutdown() throws Exce
214
220
final CheckpointResourcesCleanupRunner testInstance =
215
221
new TestInstanceBuilder ()
216
222
.withCheckpointRecoveryFactory (checkpointRecoveryFactory )
217
- .withExecutor (ForkJoinPool . commonPool ())
223
+ .withExecutor (EXECUTOR_EXTENSION . getExecutor ())
218
224
.build ();
219
225
testInstance .start ();
220
226
@@ -242,7 +248,7 @@ void testCloseAsyncAfterStartAndErrorInCheckpointIDCounterShutdown() throws Exce
242
248
@ Test
243
249
void testCancellationBeforeStart () throws Exception {
244
250
final CheckpointResourcesCleanupRunner testInstance =
245
- new TestInstanceBuilder ().withExecutor (ForkJoinPool . commonPool ()).build ();
251
+ new TestInstanceBuilder ().withExecutor (EXECUTOR_EXTENSION . getExecutor ()).build ();
246
252
247
253
assertThatFuture (testInstance .cancel (TIMEOUT_FOR_REQUESTS ))
248
254
.eventuallyFailsWith (ExecutionException .class )
@@ -262,7 +268,7 @@ void testCancellationAfterStart() throws Exception {
262
268
final CheckpointResourcesCleanupRunner testInstance =
263
269
new TestInstanceBuilder ()
264
270
.withCheckpointRecoveryFactory (checkpointRecoveryFactory )
265
- .withExecutor (ForkJoinPool . commonPool ())
271
+ .withExecutor (EXECUTOR_EXTENSION . getExecutor ())
266
272
.build ();
267
273
AFTER_START .accept (testInstance );
268
274
assertThatFuture (testInstance .cancel (TIMEOUT_FOR_REQUESTS ))
@@ -278,7 +284,7 @@ void testCancellationAfterStart() throws Exception {
278
284
@ Test
279
285
void testCancellationAfterClose () throws Exception {
280
286
final CheckpointResourcesCleanupRunner testInstance =
281
- new TestInstanceBuilder ().withExecutor (ForkJoinPool . commonPool ()).build ();
287
+ new TestInstanceBuilder ().withExecutor (EXECUTOR_EXTENSION . getExecutor ()).build ();
282
288
AFTER_CLOSE .accept (testInstance );
283
289
assertThatFuture (testInstance .cancel (TIMEOUT_FOR_REQUESTS ))
284
290
.eventuallyFailsWith (ExecutionException .class )
0 commit comments