
Commit fafd86c

JunRuiLee authored and zhuzhurk committed

[FLINK-36850][runtime] Disable adaptive batch execution when batch job progress recovery is enabled.

1 parent: 80c748f

3 files changed: +52 −34
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java (+4 −1)

```diff
@@ -269,7 +269,10 @@ public static AdaptiveBatchScheduler createScheduler(
 
         AdaptiveExecutionHandler adaptiveExecutionHandler =
                 AdaptiveExecutionHandlerFactory.create(
-                        executionPlan, userCodeLoader, futureExecutor);
+                        executionPlan,
+                        jobRecoveryHandler instanceof DefaultBatchJobRecoveryHandler,
+                        userCodeLoader,
+                        futureExecutor);
 
         return new AdaptiveBatchScheduler(
                 log,
```
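Design note: the scheduler does not thread a new configuration flag through; it derives "recovery enabled" from state it already holds, passing `jobRecoveryHandler instanceof DefaultBatchJobRecoveryHandler` so the handler factory sees `true` exactly when a real recovery handler is in play (presumably a no-op handler is used when recovery is disabled, though that implementation is not shown in this diff).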

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandlerFactory.java (+14 −2)

```diff
@@ -36,8 +36,14 @@ public class AdaptiveExecutionHandlerFactory {
     /**
      * Creates an instance of {@link AdaptiveExecutionHandler} based on the provided execution plan.
      *
+     * <p>TODO: Currently, adaptive execution cannot work with batch job progress recovery, so we
+     * always use {@link NonAdaptiveExecutionHandler} if batch job recovery is enabled. This
+     * limitation will be removed in the future when we adapt adaptive batch execution to batch job
+     * recovery.
+     *
      * @param executionPlan The execution plan, which can be either a {@link JobGraph} or a {@link
      *     StreamGraph}.
+     * @param enableBatchJobRecovery Whether to enable batch job recovery.
      * @param userClassLoader The class loader for the user code.
      * @param serializationExecutor The executor used for serialization tasks.
      * @return An instance of {@link AdaptiveExecutionHandler}.
@@ -46,15 +52,21 @@ public class AdaptiveExecutionHandlerFactory {
      */
     public static AdaptiveExecutionHandler create(
             ExecutionPlan executionPlan,
+            boolean enableBatchJobRecovery,
             ClassLoader userClassLoader,
             Executor serializationExecutor)
             throws DynamicCodeLoadingException {
         if (executionPlan instanceof JobGraph) {
             return new NonAdaptiveExecutionHandler((JobGraph) executionPlan);
         } else {
             checkState(executionPlan instanceof StreamGraph, "Unsupported execution plan.");
-            return new DefaultAdaptiveExecutionHandler(
-                    userClassLoader, (StreamGraph) executionPlan, serializationExecutor);
+            if (enableBatchJobRecovery) {
+                StreamGraph streamGraph = (StreamGraph) executionPlan;
+                return new NonAdaptiveExecutionHandler(streamGraph.getJobGraph(userClassLoader));
+            } else {
+                return new DefaultAdaptiveExecutionHandler(
+                        userClassLoader, (StreamGraph) executionPlan, serializationExecutor);
+            }
         }
     }
 }
```
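To make the new dispatch rule concrete, here is a small self-contained sketch of the decision the factory now implements. The types below are illustrative stand-ins, not the real Flink classes:

```java
// Sketch of the dispatch rule introduced above: batch job recovery forces the
// non-adaptive handler even when the execution plan is a StreamGraph.
interface ExecutionPlan {}

final class JobGraphPlan implements ExecutionPlan {}

final class StreamGraphPlan implements ExecutionPlan {}

final class HandlerDispatch {
    static String choose(ExecutionPlan plan, boolean enableBatchJobRecovery) {
        if (plan instanceof JobGraphPlan) {
            // A pre-compiled JobGraph can never be adjusted adaptively.
            return "NonAdaptiveExecutionHandler";
        }
        if (!(plan instanceof StreamGraphPlan)) {
            throw new IllegalStateException("Unsupported execution plan.");
        }
        // New in this commit: recovery wins over adaptiveness until the two
        // features are made compatible (see the TODO in the Javadoc above).
        return enableBatchJobRecovery
                ? "NonAdaptiveExecutionHandler (built from the StreamGraph's JobGraph)"
                : "DefaultAdaptiveExecutionHandler";
    }

    public static void main(String[] args) {
        System.out.println(choose(new StreamGraphPlan(), true));  // non-adaptive
        System.out.println(choose(new StreamGraphPlan(), false)); // adaptive
    }
}
```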

flink-tests/src/test/java/org/apache/flink/test/scheduling/JMFailoverITCase.java (+34 −31)

```diff
@@ -41,13 +41,12 @@
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
 import org.apache.flink.runtime.io.network.partition.PartitionedFile;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobType;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.TestingMiniCluster;
@@ -57,7 +56,6 @@
 import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
@@ -165,12 +163,12 @@ public void setup() throws Exception {
 
     @Test
     void testRecoverFromJMFailover() throws Exception {
-        JobGraph jobGraph = prepareEnvAndGetJobGraph();
+        StreamGraph streamGraph = prepareEnvAndGetStreamGraph();
 
         // blocking all sink
         StubRecordSink.blockSubTasks(0, 1, 2, 3);
 
-        JobID jobId = flinkCluster.submitJob(jobGraph).get().getJobID();
+        JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID();
 
         // wait until sink is running.
         tryWaitUntilCondition(() -> StubRecordSink.attemptIds.size() > 0);
@@ -188,21 +186,25 @@ void testRecoverFromJMFailover() throws Exception {
 
     @Test
     void testSourceNotAllFinished() throws Exception {
-        JobGraph jobGraph = prepareEnvAndGetJobGraph();
+        StreamGraph streamGraph = prepareEnvAndGetStreamGraph();
 
         // blocking source 0
         SourceTail.blockSubTasks(0);
 
-        JobID jobId = flinkCluster.submitJob(jobGraph).get().getJobID();
+        JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID();
 
         // wait until source is running.
         tryWaitUntilCondition(() -> SourceTail.attemptIds.size() == SOURCE_PARALLELISM);
 
-        JobVertex source = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
         while (true) {
             AccessExecutionGraph executionGraph = flinkCluster.getExecutionGraph(jobId).get();
+            AccessExecutionJobVertex source =
+                    executionGraph.getAllVertices().values().stream()
+                            .filter(jobVertex -> jobVertex.getName().contains("Source"))
+                            .findFirst()
+                            .get();
             long finishedTasks =
-                    Arrays.stream(executionGraph.getJobVertex(source.getID()).getTaskVertices())
+                    Arrays.stream(source.getTaskVertices())
                             .filter(task -> task.getExecutionState() == ExecutionState.FINISHED)
                             .count();
             if (finishedTasks == SOURCE_PARALLELISM - 1) {
@@ -228,12 +230,12 @@ void testTaskExecutorNotRegisterOnTime() throws Exception {
         Configuration configuration = new Configuration();
         configuration.set(
                 BatchExecutionOptions.JOB_RECOVERY_PREVIOUS_WORKER_RECOVERY_TIMEOUT, Duration.ZERO);
-        JobGraph jobGraph = prepareEnvAndGetJobGraph(configuration);
+        StreamGraph streamGraph = prepareEnvAndGetStreamGraph(configuration);
 
         // blocking all sink
         StubRecordSink.blockSubTasks(0, 1, 2, 3);
 
-        JobID jobId = flinkCluster.submitJob(jobGraph).get().getJobID();
+        JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID();
 
         // wait until sink is running.
         tryWaitUntilCondition(() -> StubRecordSink.attemptIds.size() > 0);
@@ -251,12 +253,12 @@ void testTaskExecutorNotRegisterOnTime() throws Exception {
 
     @Test
     void testPartitionNotFoundTwice() throws Exception {
-        JobGraph jobGraph = prepareEnvAndGetJobGraph();
+        StreamGraph streamGraph = prepareEnvAndGetStreamGraph();
 
         // blocking map 0 and map 1.
         StubMapFunction.blockSubTasks(0, 1);
 
-        JobID jobId = flinkCluster.submitJob(jobGraph).get().getJobID();
+        JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID();
 
         // wait until map deploying, which indicates all source finished.
         tryWaitUntilCondition(() -> StubMapFunction.attemptIds.size() > 0);
@@ -286,12 +288,12 @@ void testPartitionNotFoundTwice() throws Exception {
 
     @Test
     void testPartitionNotFoundAndOperatorCoordinatorNotSupportBatchSnapshot() throws Exception {
-        JobGraph jobGraph = prepareEnvAndGetJobGraph(false);
+        StreamGraph streamGraph = prepareEnvAndGetStreamGraph(false);
 
         // blocking all map task
         StubMapFunction2.blockSubTasks(0, 1, 2, 3);
 
-        JobID jobId = flinkCluster.submitJob(jobGraph).get().getJobID();
+        JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID();
 
         // wait until map deploying, which indicates all source finished.
         tryWaitUntilCondition(() -> StubMapFunction2.attemptIds.size() > 0);
@@ -312,12 +314,12 @@ void testPartitionNotFoundAndOperatorCoordinatorNotSupportBatchSnapshot() throws
 
     @Test
     void testPartitionNotFoundAndOperatorCoordinatorSupportBatchSnapshot() throws Exception {
-        JobGraph jobGraph = prepareEnvAndGetJobGraph();
+        StreamGraph streamGraph = prepareEnvAndGetStreamGraph();
 
         // blocking map 0.
         StubMapFunction.blockSubTasks(0);
 
-        JobID jobId = flinkCluster.submitJob(jobGraph).get().getJobID();
+        JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID();
 
         // wait until map deploying, which indicates all source finished.
         tryWaitUntilCondition(() -> StubMapFunction.attemptIds.size() > 0);
@@ -336,28 +338,29 @@ void testPartitionNotFoundAndOperatorCoordinatorSupportBatchSnapshot() throws Ex
         checkCountResults();
     }
 
-    private JobGraph prepareEnvAndGetJobGraph() throws Exception {
+    private StreamGraph prepareEnvAndGetStreamGraph() throws Exception {
         Configuration configuration = new Configuration();
         configuration.set(
                 BatchExecutionOptions.JOB_RECOVERY_PREVIOUS_WORKER_RECOVERY_TIMEOUT,
                 previousWorkerRecoveryTimeout);
-        return prepareEnvAndGetJobGraph(configuration, true);
+        return prepareEnvAndGetStreamGraph(configuration, true);
     }
 
-    private JobGraph prepareEnvAndGetJobGraph(Configuration config) throws Exception {
-        return prepareEnvAndGetJobGraph(config, true);
+    private StreamGraph prepareEnvAndGetStreamGraph(Configuration config) throws Exception {
+        return prepareEnvAndGetStreamGraph(config, true);
     }
 
-    private JobGraph prepareEnvAndGetJobGraph(boolean operatorCoordinatorsSupportsBatchSnapshot)
-            throws Exception {
+    private StreamGraph prepareEnvAndGetStreamGraph(
+            boolean operatorCoordinatorsSupportsBatchSnapshot) throws Exception {
         Configuration configuration = new Configuration();
         configuration.set(
                 BatchExecutionOptions.JOB_RECOVERY_PREVIOUS_WORKER_RECOVERY_TIMEOUT,
                 previousWorkerRecoveryTimeout);
-        return prepareEnvAndGetJobGraph(configuration, operatorCoordinatorsSupportsBatchSnapshot);
+        return prepareEnvAndGetStreamGraph(
+                configuration, operatorCoordinatorsSupportsBatchSnapshot);
     }
 
-    private JobGraph prepareEnvAndGetJobGraph(
+    private StreamGraph prepareEnvAndGetStreamGraph(
             Configuration config, boolean operatorCoordinatorsSupportsBatchSnapshot)
             throws Exception {
         flinkCluster =
@@ -371,8 +374,8 @@ private JobGraph prepareEnvAndGetJobGraph(
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
 
         return operatorCoordinatorsSupportsBatchSnapshot
-                ? createJobGraph(env, methodName)
-                : createJobGraphWithUnsupportedBatchSnapshotOperatorCoordinator(env, methodName);
+                ? createStreamGraph(env, methodName)
+                : createStreamGraphWithUnsupportedBatchSnapshotOperatorCoordinator(env, methodName);
     }
 
     private TestingMiniClusterConfiguration getMiniClusterConfiguration(Configuration config)
@@ -446,7 +449,7 @@ private void releaseResultPartitionOfSource() {
                         new File(flinkConfiguration.get(CoreOptions.TMP_DIRS)));
     }
 
-    private JobGraph createJobGraph(StreamExecutionEnvironment env, String jobName) {
+    private StreamGraph createStreamGraph(StreamExecutionEnvironment env, String jobName) {
         TupleTypeInfo<Tuple2<Integer, Integer>> typeInfo =
                 new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
 
@@ -468,10 +471,10 @@ private JobGraph createJobGraph(StreamExecutionEnvironment env, String jobName)
         streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
         streamGraph.setJobType(JobType.BATCH);
         streamGraph.setJobName(jobName);
-        return StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        return streamGraph;
     }
 
-    private JobGraph createJobGraphWithUnsupportedBatchSnapshotOperatorCoordinator(
+    private StreamGraph createStreamGraphWithUnsupportedBatchSnapshotOperatorCoordinator(
             StreamExecutionEnvironment env, String jobName) throws Exception {
 
         TupleTypeInfo<Tuple2<Integer, Integer>> typeInfo =
@@ -500,7 +503,7 @@ private JobGraph createJobGraphWithUnsupportedBatchSnapshotOperatorCoordinator(
         streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
         streamGraph.setJobType(JobType.BATCH);
         streamGraph.setJobName(jobName);
-        return StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        return streamGraph;
     }
 
     private static void setSubtaskBlocked(
```
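The most substantive test change is in `testSourceNotAllFinished`: because the tests now submit a `StreamGraph` directly instead of a pre-compiled `JobGraph`, there is no client-side `JobVertex` handle to query, so the source vertex is located in the runtime execution graph by its display name. A hedged sketch of that lookup, lifted from the diff above (the helper class and method name are illustrative, not part of the commit):

```java
import java.util.Arrays;

import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;

final class SourceProgress {
    // Finds the source vertex by display name and counts its FINISHED subtasks,
    // mirroring the loop body in testSourceNotAllFinished.
    static long countFinishedSourceTasks(AccessExecutionGraph executionGraph) {
        AccessExecutionJobVertex source =
                executionGraph.getAllVertices().values().stream()
                        .filter(v -> v.getName().contains("Source"))
                        .findFirst()
                        .orElseThrow(IllegalStateException::new);
        return Arrays.stream(source.getTaskVertices())
                .filter(task -> task.getExecutionState() == ExecutionState.FINISHED)
                .count();
    }
}
```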
