Skip to content

Commit ae150e5

Browse files
committed
[review][hotfix] Adds JavaDoc after checking that the ExecutionAttemptID is actually correct
I double-checked that the JobInitializationMetricsBuilder constructor is called for every local restart of tasks and the JobInitializationMetricsBuilder#reportInitializationMetrics is called by the deployed Subtask after restoring its state. ExecutionAttemptID is correct because the builder creation happens again when a new Execution attempt is triggered (through the local restart). The totalNumberOfSubtasks was a bit misleading here.
1 parent c2782ed commit ae150e5

File tree

1 file changed

+16
-0
lines changed

1 file changed

+16
-0
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/JobInitializationMetricsBuilder.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.runtime.checkpoint;
2020

2121
import org.apache.flink.runtime.checkpoint.JobInitializationMetrics.SumMaxDuration;
22+
import org.apache.flink.runtime.executiongraph.Execution;
2223
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
2324

2425
import org.slf4j.Logger;
@@ -46,6 +47,17 @@ class JobInitializationMetricsBuilder {
4647
private Optional<CheckpointProperties> checkpointProperties = Optional.empty();
4748
private Optional<String> externalPath = Optional.empty();
4849

50+
/**
51+
* The {@code JobInitializationMetricsBuilder} handles the initialization metrics for deployed
52+
* {@link Execution Executions}. Building the final {@link JobInitializationMetrics} will be
53+
* only possible if all the subtasks reported their {@link SubTaskInitializationMetrics} back.
54+
*
55+
* @param toInitialize The {@link ExecutionAttemptID} of the subtasks that are about to be
56+
* deployed.
57+
* @param startTs The time the initialization was started.
58+
* @see #isComplete()
59+
* @see #reportInitializationMetrics(ExecutionAttemptID, SubTaskInitializationMetrics)
60+
*/
4961
JobInitializationMetricsBuilder(Set<ExecutionAttemptID> toInitialize, long startTs) {
5062
this.toInitialize = new HashSet<>(toInitialize);
5163
this.startTs = startTs;
@@ -119,6 +131,10 @@ private static void aggregateMetrics(
119131
}
120132
}
121133

134+
/**
135+
* Reports the {@link SubTaskInitializationMetrics} for a currently deployed subtask based on
136+
* the provided {@link ExecutionAttemptID}.
137+
*/
122138
public void reportInitializationMetrics(
123139
ExecutionAttemptID executionAttemptId,
124140
SubTaskInitializationMetrics initializationMetrics) {

0 commit comments

Comments
 (0)