
Commit 38738d9

dmvk authored and XComp committed
[FLINK-35553][runtime] Wire-up RescaleManager with CheckpointLifecycleListener in Executing state
1 parent 4eab93e commit 38738d9

16 files changed: +864 -65 lines changed


docs/layouts/shortcodes/generated/all_jobmanager_section.html

Lines changed: 7 additions & 1 deletion
@@ -12,7 +12,7 @@
 <td><h5>jobmanager.adaptive-scheduler.max-delay-for-scale-trigger</h5></td>
 <td style="word-wrap: break-word;">(none)</td>
 <td>Duration</td>
-<td>The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and %dx of the checkpointing interval if checkpointing is enabled).</td>
+<td>The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and the checkpointing interval multiplied by the by-1-incremented parameter value of jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count if checkpointing is enabled).</td>
 </tr>
 <tr>
 <td><h5>jobmanager.adaptive-scheduler.min-parallelism-increase</h5></td>
@@ -32,6 +32,12 @@
 <td>Duration</td>
 <td>The maximum time the JobManager will wait to acquire all required resources after a job submission or restart. Once elapsed it will try to run the job with a lower parallelism, or fail if the minimum amount of resources could not be acquired.<br />Increasing this value will make the cluster more resilient against temporary resources shortages (e.g., there is more time for a failed TaskManager to be restarted).<br />Setting a negative duration will disable the resource timeout: The JobManager will wait indefinitely for resources to appear.<br />If <code class="highlighter-rouge">scheduler-mode</code> is configured to <code class="highlighter-rouge">REACTIVE</code>, this configuration value will default to a negative value to disable the resource timeout.</td>
 </tr>
+<tr>
+<td><h5>jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count</h5></td>
+<td style="word-wrap: break-word;">2</td>
+<td>Integer</td>
+<td>The number of consecutive failed checkpoints that will trigger rescaling even in the absence of a completed checkpoint.</td>
+</tr>
 <tr>
 <td><h5>jobmanager.adaptive-scheduler.scaling-interval.max</h5></td>
 <td style="word-wrap: break-word;">(none)</td>
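The new scale-on-failed-checkpoints-count option and the existing max-delay-for-scale-trigger option interact: when checkpointing is enabled, the default delay is derived from the checkpointing interval and the failed-checkpoint count. A minimal sketch of setting both options programmatically, assuming the Flink Configuration API; the duration value is illustrative only, and the max-delay option is set by its key string because its ConfigOption constant is not shown in this diff:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;

class RescaleTriggerConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Typed option introduced by this commit (defined in JobManagerOptions below).
        conf.set(JobManagerOptions.SCHEDULER_SCALE_ON_FAILED_CHECKPOINTS_COUNT, 3);
        // Illustrative value; set via key string since the ConfigOption constant is not part of this diff.
        conf.setString("jobmanager.adaptive-scheduler.max-delay-for-scale-trigger", "5 min");
    }
}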

docs/layouts/shortcodes/generated/expert_scheduling_section.html

Lines changed: 7 additions & 1 deletion
@@ -90,7 +90,7 @@
 <td><h5>jobmanager.adaptive-scheduler.max-delay-for-scale-trigger</h5></td>
 <td style="word-wrap: break-word;">(none)</td>
 <td>Duration</td>
-<td>The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and %dx of the checkpointing interval if checkpointing is enabled).</td>
+<td>The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and the checkpointing interval multiplied by the by-1-incremented parameter value of jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count if checkpointing is enabled).</td>
 </tr>
 <tr>
 <td><h5>jobmanager.adaptive-scheduler.min-parallelism-increase</h5></td>
@@ -110,6 +110,12 @@
 <td>Duration</td>
 <td>The maximum time the JobManager will wait to acquire all required resources after a job submission or restart. Once elapsed it will try to run the job with a lower parallelism, or fail if the minimum amount of resources could not be acquired.<br />Increasing this value will make the cluster more resilient against temporary resources shortages (e.g., there is more time for a failed TaskManager to be restarted).<br />Setting a negative duration will disable the resource timeout: The JobManager will wait indefinitely for resources to appear.<br />If <code class="highlighter-rouge">scheduler-mode</code> is configured to <code class="highlighter-rouge">REACTIVE</code>, this configuration value will default to a negative value to disable the resource timeout.</td>
 </tr>
+<tr>
+<td><h5>jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count</h5></td>
+<td style="word-wrap: break-word;">2</td>
+<td>Integer</td>
+<td>The number of consecutive failed checkpoints that will trigger rescaling even in the absence of a completed checkpoint.</td>
+</tr>
 <tr>
 <td><h5>jobmanager.adaptive-scheduler.scaling-interval.max</h5></td>
 <td style="word-wrap: break-word;">(none)</td>

docs/layouts/shortcodes/generated/job_manager_configuration.html

Lines changed: 7 additions & 1 deletion
@@ -12,7 +12,7 @@
 <td><h5>jobmanager.adaptive-scheduler.max-delay-for-scale-trigger</h5></td>
 <td style="word-wrap: break-word;">(none)</td>
 <td>Duration</td>
-<td>The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and %dx of the checkpointing interval if checkpointing is enabled).</td>
+<td>The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and the checkpointing interval multiplied by the by-1-incremented parameter value of jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count if checkpointing is enabled).</td>
 </tr>
 <tr>
 <td><h5>jobmanager.adaptive-scheduler.min-parallelism-increase</h5></td>
@@ -32,6 +32,12 @@
 <td>Duration</td>
 <td>The maximum time the JobManager will wait to acquire all required resources after a job submission or restart. Once elapsed it will try to run the job with a lower parallelism, or fail if the minimum amount of resources could not be acquired.<br />Increasing this value will make the cluster more resilient against temporary resources shortages (e.g., there is more time for a failed TaskManager to be restarted).<br />Setting a negative duration will disable the resource timeout: The JobManager will wait indefinitely for resources to appear.<br />If <code class="highlighter-rouge">scheduler-mode</code> is configured to <code class="highlighter-rouge">REACTIVE</code>, this configuration value will default to a negative value to disable the resource timeout.</td>
 </tr>
+<tr>
+<td><h5>jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count</h5></td>
+<td style="word-wrap: break-word;">2</td>
+<td>Integer</td>
+<td>The number of consecutive failed checkpoints that will trigger rescaling even in the absence of a completed checkpoint.</td>
+</tr>
 <tr>
 <td><h5>jobmanager.adaptive-scheduler.scaling-interval.max</h5></td>
 <td style="word-wrap: break-word;">(none)</td>

flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java

Lines changed: 16 additions & 5 deletions
@@ -39,7 +39,6 @@
 public class JobManagerOptions {
 
     public static final MemorySize MIN_JVM_HEAP_SIZE = MemorySize.ofMebiBytes(128);
-    public static final int FACTOR_FOR_DEFAULT_MAXIMUM_DELAY_FOR_RESCALE_TRIGGER = 3;
 
     /**
      * The config parameter defining the network address to connect to for communication with the
@@ -574,6 +573,20 @@ public InlineElement getDescription() {
                                     code(SchedulerExecutionMode.REACTIVE.name()))
                             .build());
 
+    @Documentation.Section({
+        Documentation.Sections.EXPERT_SCHEDULING,
+        Documentation.Sections.ALL_JOB_MANAGER
+    })
+    public static final ConfigOption<Integer> SCHEDULER_SCALE_ON_FAILED_CHECKPOINTS_COUNT =
+            key("jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count")
+                    .intType()
+                    .defaultValue(2)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The number of consecutive failed checkpoints that will trigger rescaling even in the absence of a completed checkpoint.")
+                                    .build());
+
     @Documentation.Section({
         Documentation.Sections.EXPERT_SCHEDULING,
         Documentation.Sections.ALL_JOB_MANAGER
@@ -586,10 +599,8 @@ public InlineElement getDescription() {
                             Description.builder()
                                     .text(
                                             "The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled "
-                                                    + "and %dx of the checkpointing interval if checkpointing is enabled).",
-                                            text(
-                                                    String.valueOf(
-                                                            FACTOR_FOR_DEFAULT_MAXIMUM_DELAY_FOR_RESCALE_TRIGGER)))
+                                                    + "and the checkpointing interval multiplied by the by-1-incremented parameter value of %s if checkpointing is enabled).",
+                                            text(SCHEDULER_SCALE_ON_FAILED_CHECKPOINTS_COUNT.key()))
                             .build());
 
     @Documentation.Section({
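Per the updated description, the default for max-delay-for-scale-trigger works out to the checkpointing interval times (scale-on-failed-checkpoints-count + 1) when checkpointing is enabled. A minimal sketch of that computation, assuming plain java.time types; the scheduler code that actually derives the default is not part of this diff:

import java.time.Duration;

class DefaultScaleTriggerDelaySketch {

    // Derives the documented default for jobmanager.adaptive-scheduler.max-delay-for-scale-trigger.
    static Duration defaultMaxDelayForScaleTrigger(
            boolean checkpointingEnabled,
            Duration checkpointingInterval,
            int scaleOnFailedCheckpointsCount) {
        if (!checkpointingEnabled) {
            // Documented default without checkpointing: 0 ms.
            return Duration.ZERO;
        }
        // "The checkpointing interval multiplied by the by-1-incremented parameter value."
        return checkpointingInterval.multipliedBy(scaleOnFailedCheckpointsCount + 1L);
    }
}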
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsListener.java

Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+/** An interface that allows listening on the checkpoint lifecycle. */
+public interface CheckpointStatsListener {
+
+    /** Called when a checkpoint was completed successfully. */
+    default void onCompletedCheckpoint() {
+        // No-op.
+    }
+
+    /** Called when a checkpoint failed. */
+    default void onFailedCheckpoint() {
+        // No-op.
+    }
+}
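A minimal sketch of how the new listener interface could be implemented, e.g. to count consecutive failed checkpoints before allowing a rescale to go ahead; the class name, threshold handling, and callback are hypothetical and not part of this commit:

import org.apache.flink.runtime.checkpoint.CheckpointStatsListener;

/** Hypothetical listener that tracks consecutive failed checkpoints. */
class ConsecutiveCheckpointFailureListener implements CheckpointStatsListener {

    private final int failureThreshold;
    private final Runnable rescaleTrigger;
    private int consecutiveFailures;

    ConsecutiveCheckpointFailureListener(int failureThreshold, Runnable rescaleTrigger) {
        this.failureThreshold = failureThreshold;
        this.rescaleTrigger = rescaleTrigger;
    }

    @Override
    public void onCompletedCheckpoint() {
        // A completed checkpoint resets the failure streak.
        consecutiveFailures = 0;
    }

    @Override
    public void onFailedCheckpoint() {
        if (++consecutiveFailures >= failureThreshold) {
            // Signal the rescale logic that it may proceed without a fresh checkpoint.
            rescaleTrigger.run();
        }
    }
}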

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java

Lines changed: 29 additions & 0 deletions
@@ -73,6 +73,7 @@ public class DefaultCheckpointStatsTracker implements CheckpointStatsTracker {
 
     private Optional<JobInitializationMetricsBuilder> jobInitializationMetricsBuilder =
             Optional.empty();
+    @Nullable private final CheckpointStatsListener checkpointStatsListener;
 
     /** Latest created snapshot. */
     private volatile CheckpointStatsSnapshot latestSnapshot;
@@ -95,9 +96,25 @@ public class DefaultCheckpointStatsTracker implements CheckpointStatsTracker {
      */
     public DefaultCheckpointStatsTracker(
             int numRememberedCheckpoints, JobManagerJobMetricGroup metricGroup) {
+        this(numRememberedCheckpoints, metricGroup, null);
+    }
+
+    /**
+     * Creates a new checkpoint stats tracker.
+     *
+     * @param numRememberedCheckpoints Maximum number of checkpoints to remember, including in
+     *     progress ones.
+     * @param metricGroup Metric group for exposed metrics.
+     * @param checkpointStatsListener Listener for monitoring checkpoint-related events.
+     */
+    public DefaultCheckpointStatsTracker(
+            int numRememberedCheckpoints,
+            JobManagerJobMetricGroup metricGroup,
+            @Nullable CheckpointStatsListener checkpointStatsListener) {
         checkArgument(numRememberedCheckpoints >= 0, "Negative number of remembered checkpoints");
         this.history = new CheckpointStatsHistory(numRememberedCheckpoints);
         this.metricGroup = metricGroup;
+        this.checkpointStatsListener = checkpointStatsListener;
 
         // Latest snapshot is empty
         latestSnapshot =
@@ -203,6 +220,10 @@ public void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
 
             dirty = true;
             logCheckpointStatistics(completed);
+
+            if (checkpointStatsListener != null) {
+                checkpointStatsListener.onCompletedCheckpoint();
+            }
         } finally {
             statsReadWriteLock.unlock();
         }
@@ -217,6 +238,10 @@ public void reportFailedCheckpoint(FailedCheckpointStats failed) {
 
             dirty = true;
             logCheckpointStatistics(failed);
+
+            if (checkpointStatsListener != null) {
+                checkpointStatsListener.onFailedCheckpoint();
+            }
         } finally {
             statsReadWriteLock.unlock();
         }
@@ -256,6 +281,10 @@ public void reportFailedCheckpointsWithoutInProgress() {
             counts.incrementFailedCheckpointsWithoutInProgress();
 
             dirty = true;
+
+            if (checkpointStatsListener != null) {
+                checkpointStatsListener.onFailedCheckpoint();
+            }
         } finally {
             statsReadWriteLock.unlock();
         }
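A minimal usage sketch of the two constructors above, forwarding an optional listener into the tracker; the factory method is hypothetical and only mirrors the constructor signatures shown in this diff:

import javax.annotation.Nullable;

import org.apache.flink.runtime.checkpoint.CheckpointStatsListener;
import org.apache.flink.runtime.checkpoint.DefaultCheckpointStatsTracker;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;

class CheckpointStatsTrackerWiringSketch {

    // Creates the tracker with or without a listener, matching the constructors in this diff.
    static DefaultCheckpointStatsTracker createTracker(
            int numRememberedCheckpoints,
            JobManagerJobMetricGroup metricGroup,
            @Nullable CheckpointStatsListener listener) {
        return listener == null
                ? new DefaultCheckpointStatsTracker(numRememberedCheckpoints, metricGroup)
                : new DefaultCheckpointStatsTracker(numRememberedCheckpoints, metricGroup, listener);
    }
}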
