Skip to content

Commit 3764e62

Browse files
committed
[FLINK-35552][runtime] Rework how CheckpointStatsTracker is constructed.
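The CheckpointStatsTracker is no longer created inside the DefaultExecutionGraphFactory; it is now created by the schedulers (SchedulerBase and AdaptiveScheduler) and passed into the factory when the ExecutionGraph is built. This will allow the AdaptiveScheduler to react to checkpoint-related events.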
1 parent 1577ecc commit 3764e62

File tree

14 files changed

+100
-145
lines changed

14 files changed

+100
-145
lines changed

flink-core/src/main/java/org/apache/flink/util/function/CachingSupplier.java

Lines changed: 0 additions & 42 deletions
This file was deleted.

flink-core/src/test/java/org/apache/flink/util/function/CachingSupplierTest.java

Lines changed: 0 additions & 38 deletions
This file was deleted.

flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -559,12 +559,6 @@ public CheckpointCoordinator getCheckpointCoordinator() {
559559
return checkpointCoordinator;
560560
}
561561

562-
@Nullable
563-
@Override
564-
public CheckpointStatsTracker getCheckpointStatsTracker() {
565-
return checkpointStatsTracker;
566-
}
567-
568562
@Override
569563
public KvStateLocationRegistry getKvStateLocationRegistry() {
570564
return kvStateLocationRegistry;

flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@
6161
import java.util.List;
6262
import java.util.concurrent.Executor;
6363
import java.util.concurrent.ScheduledExecutorService;
64-
import java.util.function.Supplier;
6564

6665
import static org.apache.flink.configuration.StateChangelogOptions.STATE_CHANGE_LOG_STORAGE;
6766
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -92,7 +91,7 @@ public static DefaultExecutionGraph buildGraph(
9291
long initializationTimestamp,
9392
VertexAttemptNumberStore vertexAttemptNumberStore,
9493
VertexParallelismStore vertexParallelismStore,
95-
Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory,
94+
CheckpointStatsTracker checkpointStatsTracker,
9695
boolean isDynamicGraph,
9796
ExecutionJobVertex.Factory executionJobVertexFactory,
9897
MarkPartitionFinishedStrategy markPartitionFinishedStrategy,
@@ -342,7 +341,7 @@ public static DefaultExecutionGraph buildGraph(
342341
completedCheckpointStore,
343342
rootBackend,
344343
rootStorage,
345-
checkpointStatsTrackerFactory.get(),
344+
checkpointStatsTracker,
346345
checkpointsCleaner,
347346
jobManagerConfig.get(STATE_CHANGE_LOG_STORAGE));
348347
}

flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,6 @@ void enableCheckpointing(
9696
@Nullable
9797
CheckpointCoordinator getCheckpointCoordinator();
9898

99-
@Nullable
100-
CheckpointStatsTracker getCheckpointStatsTracker();
101-
10299
KvStateLocationRegistry getKvStateLocationRegistry();
103100

104101
void setJsonPlan(String jsonPlan);

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,12 @@
2020

2121
import org.apache.flink.api.common.time.Time;
2222
import org.apache.flink.configuration.Configuration;
23-
import org.apache.flink.configuration.WebOptions;
2423
import org.apache.flink.runtime.blob.BlobWriter;
2524
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
2625
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
2726
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
2827
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
2928
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
30-
import org.apache.flink.runtime.checkpoint.DefaultCheckpointStatsTracker;
3129
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
3230
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder;
3331
import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
@@ -43,14 +41,12 @@
4341
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter;
4442
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
4543
import org.apache.flink.runtime.shuffle.ShuffleMaster;
46-
import org.apache.flink.util.function.CachingSupplier;
4744

4845
import org.slf4j.Logger;
4946

5047
import java.util.HashSet;
5148
import java.util.concurrent.Executor;
5249
import java.util.concurrent.ScheduledExecutorService;
53-
import java.util.function.Supplier;
5450

5551
import static org.apache.flink.util.Preconditions.checkNotNull;
5652

@@ -67,7 +63,6 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
6763
private final BlobWriter blobWriter;
6864
private final ShuffleMaster<?> shuffleMaster;
6965
private final JobMasterPartitionTracker jobMasterPartitionTracker;
70-
private final Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory;
7166
private final boolean isDynamicGraph;
7267
private final ExecutionJobVertex.Factory executionJobVertexFactory;
7368

@@ -124,12 +119,6 @@ public DefaultExecutionGraphFactory(
124119
this.blobWriter = blobWriter;
125120
this.shuffleMaster = shuffleMaster;
126121
this.jobMasterPartitionTracker = jobMasterPartitionTracker;
127-
this.checkpointStatsTrackerFactory =
128-
new CachingSupplier<>(
129-
() ->
130-
new DefaultCheckpointStatsTracker(
131-
configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE),
132-
jobManagerJobMetricGroup));
133122
this.isDynamicGraph = isDynamicGraph;
134123
this.executionJobVertexFactory = checkNotNull(executionJobVertexFactory);
135124
this.nonFinishedHybridPartitionShouldBeUnknown = nonFinishedHybridPartitionShouldBeUnknown;
@@ -141,6 +130,7 @@ public ExecutionGraph createAndRestoreExecutionGraph(
141130
CompletedCheckpointStore completedCheckpointStore,
142131
CheckpointsCleaner checkpointsCleaner,
143132
CheckpointIDCounter checkpointIdCounter,
133+
CheckpointStatsTracker checkpointStatsTracker,
144134
TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,
145135
long initializationTimestamp,
146136
VertexAttemptNumberStore vertexAttemptNumberStore,
@@ -180,7 +170,7 @@ public ExecutionGraph createAndRestoreExecutionGraph(
180170
initializationTimestamp,
181171
vertexAttemptNumberStore,
182172
vertexParallelismStore,
183-
checkpointStatsTrackerFactory,
173+
checkpointStatsTracker,
184174
isDynamicGraph,
185175
executionJobVertexFactory,
186176
markPartitionFinishedStrategy,

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.runtime.scheduler;
2020

2121
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
22+
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
2223
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
2324
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
2425
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
@@ -40,6 +41,8 @@ public interface ExecutionGraphFactory {
4041
* @param completedCheckpointStore completedCheckpointStore to pass to the CheckpointCoordinator
4142
* @param checkpointsCleaner checkpointsCleaner to pass to the CheckpointCoordinator
4243
* @param checkpointIdCounter checkpointIdCounter to pass to the CheckpointCoordinator
44+
* @param checkpointStatsTracker The {@link CheckpointStatsTracker} that's used for collecting
45+
* the checkpoint-related statistics.
4346
* @param partitionLocationConstraint partitionLocationConstraint for this job
4447
* @param initializationTimestamp initializationTimestamp when the ExecutionGraph was created
4548
* @param vertexAttemptNumberStore vertexAttemptNumberStore keeping information about the vertex
@@ -57,6 +60,7 @@ ExecutionGraph createAndRestoreExecutionGraph(
5760
CompletedCheckpointStore completedCheckpointStore,
5861
CheckpointsCleaner checkpointsCleaner,
5962
CheckpointIDCounter checkpointIdCounter,
63+
CheckpointStatsTracker checkpointStatsTracker,
6064
TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,
6165
long initializationTimestamp,
6266
VertexAttemptNumberStore vertexAttemptNumberStore,

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphHandler.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
3636
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
3737
import org.apache.flink.runtime.jobgraph.JobVertexID;
38+
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
3839
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
3940
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
4041
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
@@ -81,23 +82,19 @@ public void reportCheckpointMetrics(
8182
public void reportInitializationMetrics(
8283
ExecutionAttemptID executionAttemptId,
8384
SubTaskInitializationMetrics initializationMetrics) {
84-
if (executionGraph.getCheckpointStatsTracker() == null) {
85+
final CheckpointCoordinatorConfiguration checkpointConfig =
86+
executionGraph.getCheckpointCoordinatorConfiguration();
87+
if (checkpointConfig == null || !checkpointConfig.isCheckpointingEnabled()) {
8588
// TODO: Consider supporting the reporting of initialization stats without checkpointing
86-
log.debug(
87-
"Ignoring reportInitializationMetrics if checkpoint coordinator is not present");
89+
log.debug("Ignoring reportInitializationMetrics if checkpointing is not present");
8890
return;
8991
}
90-
ioExecutor.execute(
91-
() -> {
92-
try {
93-
executionGraph
94-
.getCheckpointStatsTracker()
95-
.reportInitializationMetrics(
96-
executionAttemptId, initializationMetrics);
97-
} catch (Exception t) {
98-
log.warn("Error while reportInitializationMetrics", t);
99-
}
100-
});
92+
93+
processCheckpointCoordinatorMessage(
94+
"ReportInitializationMetrics",
95+
coordinator ->
96+
coordinator.reportInitializationMetrics(
97+
executionAttemptId, initializationMetrics));
10198
}
10299

103100
public void acknowledgeCheckpoint(

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,11 @@
4343
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
4444
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
4545
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
46+
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
4647
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
4748
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
4849
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
50+
import org.apache.flink.runtime.checkpoint.DefaultCheckpointStatsTracker;
4951
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetrics;
5052
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
5153
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
@@ -221,11 +223,20 @@ public SchedulerBase(
221223
this.deploymentStateTimeMetrics =
222224
new DeploymentStateTimeMetrics(jobGraph.getJobType(), jobStatusMetricsSettings);
223225

226+
final CheckpointStatsTracker checkpointStatsTracker =
227+
SchedulerUtils.createCheckpointStatsTrackerIfCheckpointingIsEnabled(
228+
jobGraph,
229+
() ->
230+
new DefaultCheckpointStatsTracker(
231+
jobMasterConfiguration.get(
232+
WebOptions.CHECKPOINTS_HISTORY_SIZE),
233+
jobManagerJobMetricGroup));
224234
this.executionGraph =
225235
createAndRestoreExecutionGraph(
226236
completedCheckpointStore,
227237
checkpointsCleaner,
228238
checkpointIdCounter,
239+
checkpointStatsTracker,
229240
initializationTimestamp,
230241
mainThreadExecutor,
231242
jobStatusListener,
@@ -372,6 +383,7 @@ private ExecutionGraph createAndRestoreExecutionGraph(
372383
CompletedCheckpointStore completedCheckpointStore,
373384
CheckpointsCleaner checkpointsCleaner,
374385
CheckpointIDCounter checkpointIdCounter,
386+
CheckpointStatsTracker checkpointStatsTracker,
375387
long initializationTimestamp,
376388
ComponentMainThreadExecutor mainThreadExecutor,
377389
JobStatusListener jobStatusListener,
@@ -384,6 +396,7 @@ private ExecutionGraph createAndRestoreExecutionGraph(
384396
completedCheckpointStore,
385397
checkpointsCleaner,
386398
checkpointIdCounter,
399+
checkpointStatsTracker,
387400
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.fromJobType(
388401
jobGraph.getJobType()),
389402
initializationTimestamp,

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@
2424
import org.apache.flink.core.execution.RestoreMode;
2525
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
2626
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
27+
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
2728
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
2829
import org.apache.flink.runtime.checkpoint.DeactivatedCheckpointCompletedCheckpointStore;
2930
import org.apache.flink.runtime.checkpoint.DeactivatedCheckpointIDCounter;
3031
import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils;
32+
import org.apache.flink.runtime.checkpoint.NoOpCheckpointStatsTracker;
3133
import org.apache.flink.runtime.client.JobExecutionException;
3234
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder;
3335
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -36,6 +38,7 @@
3638
import org.slf4j.Logger;
3739

3840
import java.util.concurrent.Executor;
41+
import java.util.function.Supplier;
3942

4043
/** Utils class for Flink's scheduler implementations. */
4144
public final class SchedulerUtils {
@@ -107,6 +110,15 @@ public static CheckpointIDCounter createCheckpointIDCounterIfCheckpointingIsEnab
107110
}
108111
}
109112

113+
public static CheckpointStatsTracker createCheckpointStatsTrackerIfCheckpointingIsEnabled(
114+
JobGraph jobGraph, Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory) {
115+
if (DefaultExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) {
116+
return checkpointStatsTrackerFactory.get();
117+
} else {
118+
return NoOpCheckpointStatsTracker.INSTANCE;
119+
}
120+
}
121+
110122
private static CheckpointIDCounter createCheckpointIdCounter(
111123
CheckpointRecoveryFactory recoveryFactory, JobID jobId) throws Exception {
112124
return recoveryFactory.createCheckpointIDCounter(jobId);

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,11 @@
4343
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
4444
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
4545
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
46+
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
4647
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
4748
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
4849
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
50+
import org.apache.flink.runtime.checkpoint.DefaultCheckpointStatsTracker;
4951
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetrics;
5052
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
5153
import org.apache.flink.runtime.client.JobExecutionException;
@@ -328,6 +330,7 @@ public Duration getMaximumDelayForTriggeringRescale() {
328330
private final CheckpointsCleaner checkpointsCleaner;
329331
private final CompletedCheckpointStore completedCheckpointStore;
330332
private final CheckpointIDCounter checkpointIdCounter;
333+
private final CheckpointStatsTracker checkpointStatsTracker;
331334

332335
private final CompletableFuture<JobStatus> jobTerminationFuture = new CompletableFuture<>();
333336

@@ -419,6 +422,13 @@ public AdaptiveScheduler(
419422
this.checkpointIdCounter =
420423
SchedulerUtils.createCheckpointIDCounterIfCheckpointingIsEnabled(
421424
jobGraph, checkpointRecoveryFactory);
425+
this.checkpointStatsTracker =
426+
SchedulerUtils.createCheckpointStatsTrackerIfCheckpointingIsEnabled(
427+
jobGraph,
428+
() ->
429+
new DefaultCheckpointStatsTracker(
430+
configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE),
431+
jobManagerJobMetricGroup));
422432

423433
this.slotAllocator = slotAllocator;
424434

@@ -1293,6 +1303,7 @@ private ExecutionGraph createExecutionGraphAndRestoreState(
12931303
completedCheckpointStore,
12941304
checkpointsCleaner,
12951305
checkpointIdCounter,
1306+
checkpointStatsTracker,
12961307
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN,
12971308
initializationTimestamp,
12981309
vertexAttemptNumberStore,

0 commit comments

Comments
 (0)