Skip to content

Commit d536571

Browse files
committed
[FLINK-35550][runtime] Adds new component RescaleManager that handles the rescaling logic to improve code testability and extensibility
Rescaling is a state-specific functionality. Moving all the logic into Executing state allows us to align the resource controlling in Executing state and WaitingForResources state in a future effort.
1 parent 0305111 commit d536571

File tree

10 files changed

+983
-351
lines changed

10 files changed

+983
-351
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java

Lines changed: 16 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@
5757
import org.apache.flink.runtime.executiongraph.DefaultVertexAttemptNumberStore;
5858
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
5959
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
60-
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
6160
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
6261
import org.apache.flink.runtime.executiongraph.JobStatusListener;
6362
import org.apache.flink.runtime.executiongraph.MutableVertexAttemptNumberStore;
@@ -110,9 +109,6 @@
110109
import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots;
111110
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator;
112111
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
113-
import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.EnforceMinimalIncreaseRescalingController;
114-
import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.EnforceParallelismChangeRescalingController;
115-
import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.RescalingController;
116112
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
117113
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
118114
import org.apache.flink.runtime.scheduler.metrics.DeploymentStateTimeMetrics;
@@ -147,8 +143,8 @@
147143
import java.util.concurrent.ScheduledFuture;
148144
import java.util.concurrent.TimeUnit;
149145
import java.util.function.Function;
150-
import java.util.stream.Collectors;
151146

147+
import static org.apache.flink.configuration.JobManagerOptions.MIN_PARALLELISM_INCREASE;
152148
import static org.apache.flink.runtime.executiongraph.ExecutionGraphUtils.isAnyOutputBlocking;
153149

154150
/**
@@ -228,7 +224,8 @@ public static Settings of(Configuration configuration) {
228224
.orElse(stabilizationTimeoutDefault),
229225
configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT),
230226
scalingIntervalMin,
231-
scalingIntervalMax);
227+
scalingIntervalMax,
228+
configuration.get(MIN_PARALLELISM_INCREASE));
232229
}
233230

234231
private final SchedulerExecutionMode executionMode;
@@ -237,20 +234,23 @@ public static Settings of(Configuration configuration) {
237234
private final Duration slotIdleTimeout;
238235
private final Duration scalingIntervalMin;
239236
private final Duration scalingIntervalMax;
237+
private final int minParallelismChangeForDesiredRescale;
240238

241239
private Settings(
242240
SchedulerExecutionMode executionMode,
243241
Duration initialResourceAllocationTimeout,
244242
Duration resourceStabilizationTimeout,
245243
Duration slotIdleTimeout,
246244
Duration scalingIntervalMin,
247-
Duration scalingIntervalMax) {
245+
Duration scalingIntervalMax,
246+
int minParallelismChangeForDesiredRescale) {
248247
this.executionMode = executionMode;
249248
this.initialResourceAllocationTimeout = initialResourceAllocationTimeout;
250249
this.resourceStabilizationTimeout = resourceStabilizationTimeout;
251250
this.slotIdleTimeout = slotIdleTimeout;
252251
this.scalingIntervalMin = scalingIntervalMin;
253252
this.scalingIntervalMax = scalingIntervalMax;
253+
this.minParallelismChangeForDesiredRescale = minParallelismChangeForDesiredRescale;
254254
}
255255

256256
public SchedulerExecutionMode getExecutionMode() {
@@ -276,6 +276,10 @@ public Duration getScalingIntervalMin() {
276276
public Duration getScalingIntervalMax() {
277277
return scalingIntervalMax;
278278
}
279+
280+
public int getMinParallelismChangeForDesiredRescale() {
281+
return minParallelismChangeForDesiredRescale;
282+
}
279283
}
280284

281285
private final Settings settings;
@@ -308,10 +312,6 @@ public Duration getScalingIntervalMax() {
308312

309313
private final SlotAllocator slotAllocator;
310314

311-
private final RescalingController rescalingController;
312-
313-
private final RescalingController forceRescalingController;
314-
315315
private final ExecutionGraphFactory executionGraphFactory;
316316

317317
private State state = new Created(this, LOG);
@@ -395,10 +395,6 @@ public AdaptiveScheduler(
395395

396396
this.componentMainThreadExecutor = mainThreadExecutor;
397397

398-
this.rescalingController = new EnforceMinimalIncreaseRescalingController(configuration);
399-
400-
this.forceRescalingController = new EnforceParallelismChangeRescalingController();
401-
402398
this.executionGraphFactory = executionGraphFactory;
403399

404400
final JobStatusStore jobStatusStore = new JobStatusStore(initializationTimestamp);
@@ -1048,8 +1044,8 @@ public void goToExecuting(
10481044
this,
10491045
userCodeClassLoader,
10501046
failureCollection,
1051-
settings.getScalingIntervalMin(),
1052-
settings.getScalingIntervalMax()));
1047+
DefaultRescaleManager.Factory.fromSettings(settings),
1048+
settings.getMinParallelismChangeForDesiredRescale()));
10531049
}
10541050

10551051
@Override
@@ -1275,34 +1271,10 @@ private ExecutionGraph createExecutionGraphAndRestoreState(
12751271
LOG);
12761272
}
12771273

1278-
/**
1279-
* In regular mode, rescale the job if added resource meets {@link
1280-
* JobManagerOptions#MIN_PARALLELISM_INCREASE}. In force mode rescale if the parallelism has
1281-
* changed.
1282-
*/
12831274
@Override
1284-
public boolean shouldRescale(ExecutionGraph executionGraph, boolean forceRescale) {
1285-
final Optional<VertexParallelism> maybeNewParallelism =
1286-
slotAllocator.determineParallelism(
1287-
jobInformation, declarativeSlotPool.getAllSlotsInformation());
1288-
return maybeNewParallelism
1289-
.filter(
1290-
vertexParallelism -> {
1291-
RescalingController rescalingControllerToUse =
1292-
forceRescale ? forceRescalingController : rescalingController;
1293-
return rescalingControllerToUse.shouldRescale(
1294-
getCurrentParallelism(executionGraph), vertexParallelism);
1295-
})
1296-
.isPresent();
1297-
}
1298-
1299-
private static VertexParallelism getCurrentParallelism(ExecutionGraph executionGraph) {
1300-
return new VertexParallelism(
1301-
executionGraph.getAllVertices().values().stream()
1302-
.collect(
1303-
Collectors.toMap(
1304-
ExecutionJobVertex::getJobVertexId,
1305-
ExecutionJobVertex::getParallelism)));
1275+
public Optional<VertexParallelism> getAvailableVertexParallelism() {
1276+
return slotAllocator.determineParallelism(
1277+
jobInformation, declarativeSlotPool.getAllSlotsInformation());
13061278
}
13071279

13081280
@Override
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.scheduler.adaptive;
20+
21+
import org.apache.flink.annotation.VisibleForTesting;
22+
import org.apache.flink.util.Preconditions;
23+
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
import javax.annotation.Nullable;
28+
29+
import java.time.Duration;
30+
import java.time.Instant;
31+
import java.time.temporal.Temporal;
32+
import java.util.function.Supplier;
33+
34+
/**
35+
* {@code DefaultRescaleManager} manages triggering the next rescaling based on when the previous
36+
* rescale operation happened and the available resources. It handles the event based on the
37+
* following phases (in that order):
38+
*
39+
* <ol>
40+
* <li>Cooldown phase: No rescaling takes place (its upper threshold is defined by {@code
41+
* scalingIntervalMin}.
42+
* <li>Soft-rescaling phase: Rescaling is triggered if the desired amount of resources is
43+
* available.
44+
* <li>Hard-rescaling phase: Rescaling is triggered if a sufficient amount of resources is
45+
* available (its lower threshold is defined by (@code scalingIntervalMax}).
46+
* </ol>
47+
*
48+
* @see Executing
49+
*/
50+
public class DefaultRescaleManager implements RescaleManager {
51+
52+
private static final Logger LOG = LoggerFactory.getLogger(DefaultRescaleManager.class);
53+
54+
private final Temporal initializationTime;
55+
private final Supplier<Temporal> clock;
56+
57+
@VisibleForTesting final Duration scalingIntervalMin;
58+
@VisibleForTesting @Nullable final Duration scalingIntervalMax;
59+
60+
private final RescaleManager.Context rescaleContext;
61+
62+
private boolean rescaleScheduled = false;
63+
64+
DefaultRescaleManager(
65+
Temporal initializationTime,
66+
RescaleManager.Context rescaleContext,
67+
Duration scalingIntervalMin,
68+
@Nullable Duration scalingIntervalMax) {
69+
this(
70+
initializationTime,
71+
Instant::now,
72+
rescaleContext,
73+
scalingIntervalMin,
74+
scalingIntervalMax);
75+
}
76+
77+
@VisibleForTesting
78+
DefaultRescaleManager(
79+
Temporal initializationTime,
80+
Supplier<Temporal> clock,
81+
RescaleManager.Context rescaleContext,
82+
Duration scalingIntervalMin,
83+
@Nullable Duration scalingIntervalMax) {
84+
this.initializationTime = initializationTime;
85+
this.clock = clock;
86+
87+
Preconditions.checkArgument(
88+
scalingIntervalMax == null || scalingIntervalMin.compareTo(scalingIntervalMax) <= 0,
89+
"scalingIntervalMax should at least match or be longer than scalingIntervalMin.");
90+
this.scalingIntervalMin = scalingIntervalMin;
91+
this.scalingIntervalMax = scalingIntervalMax;
92+
93+
this.rescaleContext = rescaleContext;
94+
}
95+
96+
@Override
97+
public void onChange() {
98+
if (timeSinceLastRescale().compareTo(scalingIntervalMin) > 0) {
99+
maybeRescale();
100+
} else if (!rescaleScheduled) {
101+
rescaleScheduled = true;
102+
rescaleContext.scheduleOperation(this::maybeRescale, scalingIntervalMin);
103+
}
104+
}
105+
106+
private Duration timeSinceLastRescale() {
107+
return Duration.between(this.initializationTime, clock.get());
108+
}
109+
110+
private void maybeRescale() {
111+
rescaleScheduled = false;
112+
if (rescaleContext.hasDesiredResources()) {
113+
LOG.info("Desired parallelism for job was reached: Rescaling will be triggered.");
114+
rescaleContext.rescale();
115+
} else if (scalingIntervalMax != null) {
116+
LOG.info(
117+
"The longer the pipeline runs, the more the (small) resource gain is worth the restarting time. "
118+
+ "Last resource added does not meet the configured minimal parallelism change. Forced rescaling will be triggered after {} if the resource is still there.",
119+
scalingIntervalMax);
120+
121+
// reasoning for inconsistent scheduling:
122+
// https://lists.apache.org/thread/m2w2xzfjpxlw63j0k7tfxfgs0rshhwwr
123+
if (timeSinceLastRescale().compareTo(scalingIntervalMax) > 0) {
124+
rescaleWithSufficientResources();
125+
} else {
126+
rescaleContext.scheduleOperation(
127+
this::rescaleWithSufficientResources, scalingIntervalMax);
128+
}
129+
}
130+
}
131+
132+
private void rescaleWithSufficientResources() {
133+
if (rescaleContext.hasSufficientResources()) {
134+
LOG.info(
135+
"Resources for desired job parallelism couldn't be collected after {}: Rescaling will be enforced.",
136+
scalingIntervalMax);
137+
rescaleContext.rescale();
138+
}
139+
}
140+
141+
public static class Factory implements RescaleManager.Factory {
142+
143+
private final Duration scalingIntervalMin;
144+
@Nullable private final Duration scalingIntervalMax;
145+
146+
/**
147+
* Creates a {@code Factory} instance based on the {@link AdaptiveScheduler}'s {@code
148+
* Settings} for rescaling.
149+
*/
150+
public static Factory fromSettings(AdaptiveScheduler.Settings settings) {
151+
// it's not ideal that we use a AdaptiveScheduler internal class here. We might want to
152+
// change that as part of a more general alignment of the rescaling configuration.
153+
return new Factory(settings.getScalingIntervalMin(), settings.getScalingIntervalMax());
154+
}
155+
156+
private Factory(Duration scalingIntervalMin, @Nullable Duration scalingIntervalMax) {
157+
this.scalingIntervalMin = scalingIntervalMin;
158+
this.scalingIntervalMax = scalingIntervalMax;
159+
}
160+
161+
@Override
162+
public DefaultRescaleManager create(Context rescaleContext, Instant lastRescale) {
163+
return new DefaultRescaleManager(
164+
lastRescale, rescaleContext, scalingIntervalMin, scalingIntervalMax);
165+
}
166+
}
167+
}

0 commit comments

Comments
 (0)