Skip to content

Commit 9dadb61

Browse files
author
Yuepeng Pan
committed
[FLINK-33653][runtime] Introduce a benchmark for balanced tasks scheduling.
1 parent 43d0184 commit 9dadb61

File tree

4 files changed

+457
-0
lines changed

4 files changed

+457
-0
lines changed
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.scheduler.benchmark.slot.matching;
20+
21+
import org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
22+
import org.apache.flink.runtime.clusterframework.types.AllocationID;
23+
import org.apache.flink.runtime.clusterframework.types.ResourceID;
24+
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
25+
import org.apache.flink.runtime.jobgraph.JobVertexID;
26+
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
27+
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
28+
import org.apache.flink.runtime.scheduler.adaptive.allocator.RequestSlotMatchingStrategy;
29+
import org.apache.flink.runtime.scheduler.adaptive.allocator.SimpleRequestSlotMatchingStrategy;
30+
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
31+
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotsBalancedRequestSlotMatchingStrategy;
32+
import org.apache.flink.runtime.scheduler.adaptive.allocator.TasksBalancedRequestSlotMatchingStrategy;
33+
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot;
34+
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
35+
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
36+
import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
37+
38+
import org.openjdk.jmh.annotations.Benchmark;
39+
import org.openjdk.jmh.annotations.BenchmarkMode;
40+
import org.openjdk.jmh.annotations.Level;
41+
import org.openjdk.jmh.annotations.Mode;
42+
import org.openjdk.jmh.annotations.Param;
43+
import org.openjdk.jmh.annotations.Setup;
44+
import org.openjdk.jmh.infra.Blackhole;
45+
import org.openjdk.jmh.runner.RunnerException;
46+
47+
import java.net.InetAddress;
48+
import java.net.UnknownHostException;
49+
import java.util.ArrayList;
50+
import java.util.Collection;
51+
import java.util.HashSet;
52+
import java.util.List;
53+
import java.util.Set;
54+
55+
/** The executor to drive {@link RequestSlotMatchingStrategy} */
56+
public class AdaptiveSchedulerSlotMatchingBenchmarkExecutor extends SchedulerBenchmarkExecutorBase {
57+
58+
/**
59+
* We set the number of slots is very smaller than the number of task managers
60+
* to simulate the production environment to the greatest extent possible.
61+
*/
62+
public static final int SLOTS_PER_TASKS_MANAGER = 8;
63+
public static final int TASK_MANAGERS = 128;
64+
65+
private static final int requestedSlotSharingGroups = 3;
66+
private static final List<SlotSharingGroup> slotSharingGroups = new ArrayList<>();
67+
private static final Collection<ExecutionSlotSharingGroup> requestGroups = new ArrayList<>();
68+
private static final Collection<PhysicalSlot> slots = new ArrayList<>();
69+
70+
static {
71+
// For ResourceProfile.UNKNOWN.
72+
slotSharingGroups.add(new SlotSharingGroup());
73+
// For other resource profiles.
74+
for (int i = 1; i < requestedSlotSharingGroups; i++) {
75+
SlotSharingGroup sharingGroup = new SlotSharingGroup();
76+
sharingGroup.setResourceProfile(newGrainfinedResourceProfile(i));
77+
slotSharingGroups.add(sharingGroup);
78+
}
79+
// For requested groups and slots.
80+
for (int tmIndex = 0; tmIndex < TASK_MANAGERS; tmIndex++) {
81+
82+
TaskManagerLocation tml = getTaskManagerLocation(tmIndex + 1);
83+
84+
for (int slotIndex = 0; slotIndex < SLOTS_PER_TASKS_MANAGER; slotIndex++) {
85+
ResourceProfile profile = newGrainfinedResourceProfile(slotIndex);
86+
87+
slots.add(new TestingSlot(new AllocationID(), profile, tml));
88+
requestGroups.add(getExecutionSlotSharingGroup(slotIndex + 1, slotIndex));
89+
}
90+
}
91+
}
92+
93+
private static ExecutionSlotSharingGroup getExecutionSlotSharingGroup(
94+
int loading, int slotIndex) {
95+
Set<ExecutionVertexID> executionVertexIDSet = new HashSet<>();
96+
JobVertexID jobVertexID = new JobVertexID();
97+
for (int i = 0; i < loading; i++) {
98+
executionVertexIDSet.add(new ExecutionVertexID(jobVertexID, i));
99+
}
100+
return new ExecutionSlotSharingGroup(
101+
slotSharingGroups.get(slotIndex % 3), executionVertexIDSet);
102+
}
103+
104+
public static TaskManagerLocation getTaskManagerLocation(int dataPort) {
105+
try {
106+
InetAddress inetAddress = InetAddress.getByName("1.2.3.4");
107+
return new TaskManagerLocation(ResourceID.generate(), inetAddress, dataPort);
108+
} catch (UnknownHostException e) {
109+
throw new RuntimeException(e);
110+
}
111+
}
112+
113+
public static ResourceProfile newGrainfinedResourceProfile(int slotIndex) {
114+
return ResourceProfile.newBuilder()
115+
.setCpuCores(slotIndex % 2 == 0 ? 1 : 2)
116+
.setTaskHeapMemoryMB(100)
117+
.setTaskOffHeapMemoryMB(100)
118+
.setManagedMemoryMB(100)
119+
.build();
120+
}
121+
122+
@Param({"NONE", "SLOTS", "TASKS"})
123+
private TaskManagerLoadBalanceMode taskManagerLoadBalanceMode;
124+
125+
private RequestSlotMatchingStrategy requestSlotMatchingStrategy;
126+
127+
public static void main(String[] args) throws RunnerException {
128+
runBenchmark(AdaptiveSchedulerSlotMatchingBenchmarkExecutor.class);
129+
}
130+
131+
@Setup(Level.Trial)
132+
public void setup() throws Exception {
133+
requestSlotMatchingStrategy = getRequestSlotMatchingStrategy();
134+
}
135+
136+
@Benchmark
137+
@BenchmarkMode(Mode.SingleShotTime)
138+
public void runSlotsMatching(Blackhole blackhole) {
139+
blackhole.consume(requestSlotMatchingStrategy.matchRequestsWithSlots(requestGroups, slots));
140+
}
141+
142+
private RequestSlotMatchingStrategy getRequestSlotMatchingStrategy() {
143+
switch (taskManagerLoadBalanceMode) {
144+
case NONE:
145+
this.requestSlotMatchingStrategy = SimpleRequestSlotMatchingStrategy.INSTANCE;
146+
break;
147+
case SLOTS:
148+
this.requestSlotMatchingStrategy =
149+
SlotsBalancedRequestSlotMatchingStrategy.INSTANCE;
150+
break;
151+
case TASKS:
152+
this.requestSlotMatchingStrategy =
153+
TasksBalancedRequestSlotMatchingStrategy.INSTANCE;
154+
break;
155+
default:
156+
throw new UnsupportedOperationException(
157+
String.format(
158+
"Unsupported task manager load balance mode '%s' in %s",
159+
taskManagerLoadBalanceMode, getClass().getName()));
160+
}
161+
return requestSlotMatchingStrategy;
162+
}
163+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.scheduler.benchmark.slot.matching;
20+
21+
import org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
22+
import org.apache.flink.runtime.clusterframework.types.AllocationID;
23+
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
24+
import org.apache.flink.runtime.jobmaster.SlotRequestId;
25+
import org.apache.flink.runtime.jobmaster.slotpool.PendingRequest;
26+
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
27+
import org.apache.flink.runtime.jobmaster.slotpool.RequestSlotMatchingStrategy;
28+
import org.apache.flink.runtime.jobmaster.slotpool.SimpleRequestSlotMatchingStrategy;
29+
import org.apache.flink.runtime.jobmaster.slotpool.TasksBalancedRequestSlotMatchingStrategy;
30+
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot;
31+
import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
32+
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
33+
import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
34+
35+
import org.openjdk.jmh.annotations.Benchmark;
36+
import org.openjdk.jmh.annotations.BenchmarkMode;
37+
import org.openjdk.jmh.annotations.Level;
38+
import org.openjdk.jmh.annotations.Mode;
39+
import org.openjdk.jmh.annotations.Param;
40+
import org.openjdk.jmh.annotations.Setup;
41+
import org.openjdk.jmh.infra.Blackhole;
42+
import org.openjdk.jmh.runner.RunnerException;
43+
44+
import java.util.ArrayList;
45+
import java.util.Collection;
46+
import java.util.Collections;
47+
import java.util.HashMap;
48+
49+
import static org.apache.flink.scheduler.benchmark.slot.matching.AdaptiveSchedulerSlotMatchingBenchmarkExecutor.SLOTS_PER_TASKS_MANAGER;
50+
import static org.apache.flink.scheduler.benchmark.slot.matching.AdaptiveSchedulerSlotMatchingBenchmarkExecutor.TASK_MANAGERS;
51+
import static org.apache.flink.scheduler.benchmark.slot.matching.AdaptiveSchedulerSlotMatchingBenchmarkExecutor.getTaskManagerLocation;
52+
import static org.apache.flink.scheduler.benchmark.slot.matching.AdaptiveSchedulerSlotMatchingBenchmarkExecutor.newGrainfinedResourceProfile;
53+
54+
/** The executor to drive {@link RequestSlotMatchingStrategy}. */
55+
public class DefaultSchedulerSlotMatchingBenchmarkExecutor extends SchedulerBenchmarkExecutorBase {
56+
57+
private static final Collection<PhysicalSlot> slots = new ArrayList<>();
58+
private static final Collection<PendingRequest> slotRequests = new ArrayList<>();
59+
60+
static {
61+
// For requested groups and slots.
62+
for (int tmIndex = 0; tmIndex < TASK_MANAGERS; tmIndex++) {
63+
64+
TaskManagerLocation tml = getTaskManagerLocation(tmIndex + 1);
65+
66+
for (int slotIndex = 0; slotIndex < SLOTS_PER_TASKS_MANAGER; slotIndex++) {
67+
ResourceProfile profile = newGrainfinedResourceProfile(slotIndex);
68+
69+
slots.add(new TestingSlot(new AllocationID(), profile, tml));
70+
slotRequests.add(getPendingRequest(slotIndex + 1, slotIndex));
71+
}
72+
}
73+
}
74+
75+
private static PendingRequest getPendingRequest(float loading, int slotIndex) {
76+
return PendingRequest.createNormalRequest(
77+
new SlotRequestId(),
78+
newGrainfinedResourceProfile(slotIndex),
79+
new DefaultLoadingWeight(loading),
80+
Collections.emptyList());
81+
}
82+
83+
@Param({"NONE", "TASKS"})
84+
private TaskManagerLoadBalanceMode taskManagerLoadBalanceMode;
85+
86+
private RequestSlotMatchingStrategy requestSlotMatchingStrategy;
87+
88+
public static void main(String[] args) throws RunnerException {
89+
runBenchmark(DefaultSchedulerSlotMatchingBenchmarkExecutor.class);
90+
}
91+
92+
@Setup(Level.Trial)
93+
public void setup() throws Exception {
94+
requestSlotMatchingStrategy = getRequestSlotMatchingStrategy();
95+
}
96+
97+
@Benchmark
98+
@BenchmarkMode(Mode.SingleShotTime)
99+
public void runSlotsMatching(Blackhole blackhole) {
100+
blackhole.consume(
101+
requestSlotMatchingStrategy.matchRequestsAndSlots(
102+
slots, slotRequests, new HashMap<>()));
103+
}
104+
105+
private RequestSlotMatchingStrategy getRequestSlotMatchingStrategy() {
106+
switch (taskManagerLoadBalanceMode) {
107+
case TASKS:
108+
this.requestSlotMatchingStrategy =
109+
TasksBalancedRequestSlotMatchingStrategy.INSTANCE;
110+
break;
111+
case NONE:
112+
this.requestSlotMatchingStrategy = SimpleRequestSlotMatchingStrategy.INSTANCE;
113+
break;
114+
default:
115+
throw new UnsupportedOperationException(
116+
String.format(
117+
"Unsupported task manager load balance mode '%s' in %s",
118+
taskManagerLoadBalanceMode, getClass().getName()));
119+
}
120+
return requestSlotMatchingStrategy;
121+
}
122+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package org.apache.flink.scheduler.benchmark.slot.sharing;
2+
3+
import org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
4+
import org.apache.flink.runtime.JobException;
5+
import org.apache.flink.runtime.client.JobExecutionException;
6+
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
7+
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
8+
import org.apache.flink.runtime.jobgraph.JobGraph;
9+
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
10+
import org.apache.flink.runtime.jobgraph.JobVertex;
11+
import org.apache.flink.runtime.scheduler.LocalInputPreferredSlotSharingStrategy;
12+
import org.apache.flink.runtime.scheduler.SlotSharingStrategy;
13+
import org.apache.flink.runtime.scheduler.TaskBalancedPreferredSlotSharingStrategy;
14+
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
15+
16+
import java.util.Collection;
17+
18+
/** The benchmark of initializing {@link SlotSharingStrategy}. */
19+
public class InitSlotSharingStrategyBenchmark {
20+
21+
private final JobGraph jobGraph;
22+
private final ExecutionGraph executionGraph;
23+
private final TaskManagerLoadBalanceMode taskManagerLoadBalanceMode;
24+
25+
public InitSlotSharingStrategyBenchmark(
26+
TaskManagerLoadBalanceMode taskManagerLoadBalanceMode, Collection<JobVertex> vertices) {
27+
this.taskManagerLoadBalanceMode = taskManagerLoadBalanceMode;
28+
this.jobGraph =
29+
JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertices(vertices).build();
30+
try {
31+
this.executionGraph =
32+
TestingDefaultExecutionGraphBuilder.newBuilder()
33+
.setJobGraph(jobGraph)
34+
.build(new DirectScheduledExecutorService());
35+
} catch (JobException | JobExecutionException e) {
36+
throw new RuntimeException(e);
37+
}
38+
}
39+
40+
public SlotSharingStrategy createSlotSharingStrategy() {
41+
switch (taskManagerLoadBalanceMode) {
42+
case NONE:
43+
return new LocalInputPreferredSlotSharingStrategy.Factory()
44+
.create(
45+
executionGraph.getSchedulingTopology(),
46+
jobGraph.getSlotSharingGroups(),
47+
jobGraph.getCoLocationGroups());
48+
case TASKS:
49+
return new TaskBalancedPreferredSlotSharingStrategy.Factory()
50+
.create(
51+
executionGraph.getSchedulingTopology(),
52+
jobGraph.getSlotSharingGroups(),
53+
jobGraph.getCoLocationGroups());
54+
default:
55+
throw new UnsupportedOperationException(
56+
String.format(
57+
"Unsupported task manager load balance mode '%s' in %s",
58+
taskManagerLoadBalanceMode, getClass().getName()));
59+
}
60+
}
61+
}

0 commit comments

Comments
 (0)