diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/matching/resolver/SlotMatchingResolverBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/matching/resolver/SlotMatchingResolverBenchmarkExecutor.java new file mode 100644 index 00000000..a8c3aa99 --- /dev/null +++ b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/matching/resolver/SlotMatchingResolverBenchmarkExecutor.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.scheduler.benchmark.scheduling.slot.matching.resolver; + +import org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SimpleSlotMatchingResolver; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotMatchingResolver; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotsBalancedSlotMatchingResolver; +import org.apache.flink.runtime.scheduler.adaptive.allocator.TasksBalancedSlotMatchingResolver; +import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.RunnerException; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** The executor to drive {@link SlotMatchingResolver}. */ +public class SlotMatchingResolverBenchmarkExecutor extends SchedulerBenchmarkExecutorBase { + + /** + * We set the number of slots is very smaller than the number of task managers to simulate the + * production environment to the greatest extent possible. + */ + public static final int SLOTS_PER_TASKS_MANAGER = 8; + + public static final int TASK_MANAGERS = 128; + + private static final int requestedSlotSharingGroups = 3; + private static final List slotSharingGroups = new ArrayList<>(); + private static final Collection requestGroups = new ArrayList<>(); + private static final Collection slots = new ArrayList<>(); + + static { + // For ResourceProfile.UNKNOWN. + slotSharingGroups.add(new SlotSharingGroup()); + // For other resource profiles. + for (int i = 1; i < requestedSlotSharingGroups; i++) { + SlotSharingGroup sharingGroup = new SlotSharingGroup(); + sharingGroup.setResourceProfile(newGrainfinedResourceProfile(i)); + slotSharingGroups.add(sharingGroup); + } + // For requested groups and slots. + for (int tmIndex = 0; tmIndex < TASK_MANAGERS; tmIndex++) { + + TaskManagerLocation tml = getTaskManagerLocation(tmIndex + 1); + + for (int slotIndex = 0; slotIndex < SLOTS_PER_TASKS_MANAGER; slotIndex++) { + ResourceProfile profile = newGrainfinedResourceProfile(slotIndex); + + slots.add(new TestingSlot(new AllocationID(), profile, tml)); + requestGroups.add(getExecutionSlotSharingGroup(slotIndex + 1, slotIndex)); + } + } + } + + private static ExecutionSlotSharingGroup getExecutionSlotSharingGroup( + int loading, int slotIndex) { + Set executionVertexIDSet = new HashSet<>(); + JobVertexID jobVertexID = new JobVertexID(); + for (int i = 0; i < loading; i++) { + executionVertexIDSet.add(new ExecutionVertexID(jobVertexID, i)); + } + return new ExecutionSlotSharingGroup( + slotSharingGroups.get(slotIndex % 3), executionVertexIDSet); + } + + public static TaskManagerLocation getTaskManagerLocation(int dataPort) { + try { + InetAddress inetAddress = InetAddress.getByName("1.2.3.4"); + return new TaskManagerLocation(ResourceID.generate(), inetAddress, dataPort); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + + public static ResourceProfile newGrainfinedResourceProfile(int slotIndex) { + return ResourceProfile.newBuilder() + .setCpuCores(slotIndex % 2 == 0 ? 1 : 2) + .setTaskHeapMemoryMB(100) + .setTaskOffHeapMemoryMB(100) + .setManagedMemoryMB(100) + .build(); + } + + @Param({"NONE", "SLOTS", "TASKS"}) + private TaskManagerLoadBalanceMode taskManagerLoadBalanceMode; + + private SlotMatchingResolver slotMatchingResolver; + + public static void main(String[] args) throws RunnerException { + runBenchmark(SlotMatchingResolverBenchmarkExecutor.class); + } + + @Setup(Level.Trial) + public void setup() throws Exception { + slotMatchingResolver = getSlotMatchingResolver(); + } + + @Benchmark + @BenchmarkMode(Mode.SingleShotTime) + public void runSlotsMatching(Blackhole blackhole) { + blackhole.consume( + slotMatchingResolver.matchSlotSharingGroupWithSlots(requestGroups, slots)); + } + + private SlotMatchingResolver getSlotMatchingResolver() { + switch (taskManagerLoadBalanceMode) { + case NONE: + this.slotMatchingResolver = SimpleSlotMatchingResolver.INSTANCE; + break; + case SLOTS: + this.slotMatchingResolver = SlotsBalancedSlotMatchingResolver.INSTANCE; + break; + case TASKS: + this.slotMatchingResolver = TasksBalancedSlotMatchingResolver.INSTANCE; + break; + default: + throw new UnsupportedOperationException( + String.format( + "Unsupported task manager load balance mode '%s' in %s", + taskManagerLoadBalanceMode, getClass().getName())); + } + return slotMatchingResolver; + } +} diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/matching/strategy/RequestSlotMatchingStrategyBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/matching/strategy/RequestSlotMatchingStrategyBenchmarkExecutor.java new file mode 100644 index 00000000..70234593 --- /dev/null +++ b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/matching/strategy/RequestSlotMatchingStrategyBenchmarkExecutor.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.scheduler.benchmark.scheduling.slot.matching.strategy; + +import org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.slotpool.PendingRequest; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.RequestSlotMatchingStrategy; +import org.apache.flink.runtime.jobmaster.slotpool.SimpleRequestSlotMatchingStrategy; +import org.apache.flink.runtime.jobmaster.slotpool.TasksBalancedRequestSlotMatchingStrategy; +import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot; +import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.RunnerException; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; + +import static org.apache.flink.scheduler.benchmark.scheduling.slot.matching.resolver.SlotMatchingResolverBenchmarkExecutor.SLOTS_PER_TASKS_MANAGER; +import static org.apache.flink.scheduler.benchmark.scheduling.slot.matching.resolver.SlotMatchingResolverBenchmarkExecutor.TASK_MANAGERS; +import static org.apache.flink.scheduler.benchmark.scheduling.slot.matching.resolver.SlotMatchingResolverBenchmarkExecutor.getTaskManagerLocation; +import static org.apache.flink.scheduler.benchmark.scheduling.slot.matching.resolver.SlotMatchingResolverBenchmarkExecutor.newGrainfinedResourceProfile; + +/** The executor to drive {@link RequestSlotMatchingStrategy}. */ +public class RequestSlotMatchingStrategyBenchmarkExecutor extends SchedulerBenchmarkExecutorBase { + + private static final Collection slots = new ArrayList<>(); + private static final Collection slotRequests = new ArrayList<>(); + + static { + // For requested groups and slots. + for (int tmIndex = 0; tmIndex < TASK_MANAGERS; tmIndex++) { + + TaskManagerLocation tml = getTaskManagerLocation(tmIndex + 1); + + for (int slotIndex = 0; slotIndex < SLOTS_PER_TASKS_MANAGER; slotIndex++) { + ResourceProfile profile = newGrainfinedResourceProfile(slotIndex); + + slots.add(new TestingSlot(new AllocationID(), profile, tml)); + slotRequests.add(getPendingRequest(slotIndex + 1, slotIndex)); + } + } + } + + private static PendingRequest getPendingRequest(float loading, int slotIndex) { + return PendingRequest.createNormalRequest( + new SlotRequestId(), + newGrainfinedResourceProfile(slotIndex), + new DefaultLoadingWeight(loading), + Collections.emptyList()); + } + + @Param({"NONE", "TASKS"}) + private TaskManagerLoadBalanceMode taskManagerLoadBalanceMode; + + private RequestSlotMatchingStrategy requestSlotMatchingStrategy; + + public static void main(String[] args) throws RunnerException { + runBenchmark(RequestSlotMatchingStrategyBenchmarkExecutor.class); + } + + @Setup(Level.Trial) + public void setup() throws Exception { + requestSlotMatchingStrategy = getRequestSlotMatchingStrategy(); + } + + @Benchmark + @BenchmarkMode(Mode.SingleShotTime) + public void runSlotsMatching(Blackhole blackhole) { + blackhole.consume( + requestSlotMatchingStrategy.matchRequestsAndSlots( + slots, slotRequests, new HashMap<>())); + } + + private RequestSlotMatchingStrategy getRequestSlotMatchingStrategy() { + switch (taskManagerLoadBalanceMode) { + case TASKS: + this.requestSlotMatchingStrategy = + TasksBalancedRequestSlotMatchingStrategy.INSTANCE; + break; + case NONE: + this.requestSlotMatchingStrategy = SimpleRequestSlotMatchingStrategy.INSTANCE; + break; + default: + throw new UnsupportedOperationException( + String.format( + "Unsupported task manager load balance mode '%s' in %s", + taskManagerLoadBalanceMode, getClass().getName())); + } + return requestSlotMatchingStrategy; + } +} diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/sharing/resolver/SlotSharingResolverBenchmark.java b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/sharing/resolver/SlotSharingResolverBenchmark.java new file mode 100644 index 00000000..ebb609b9 --- /dev/null +++ b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/sharing/resolver/SlotSharingResolverBenchmark.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.scheduler.benchmark.scheduling.slot.sharing.resolver; + +import org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.VertexParallelismStore; +import org.apache.flink.runtime.scheduler.adaptive.JobGraphJobInformation; +import org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotSharingResolver; +import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingResolver; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator; +import org.apache.flink.runtime.scheduler.adaptive.allocator.TaskBalancedSlotSharingResolver; +import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; + +import java.util.Collection; +import java.util.stream.Collectors; + +import static org.apache.flink.runtime.scheduler.SchedulerBase.computeVertexParallelismStore; + +/** + * The benchmark of initializing {@link + * org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingResolver}. + */ +public class SlotSharingResolverBenchmark { + + private final JobInformation jobInformation; + private final VertexParallelism vertexParallelism; + private final TaskManagerLoadBalanceMode taskManagerLoadBalanceMode; + + public SlotSharingResolverBenchmark( + TaskManagerLoadBalanceMode taskManagerLoadBalanceMode, Collection vertices) { + this.taskManagerLoadBalanceMode = taskManagerLoadBalanceMode; + final JobGraph jobGraph = + JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertices(vertices).build(); + try { + ExecutionGraph executionGraph = + TestingDefaultExecutionGraphBuilder.newBuilder() + .setJobGraph(jobGraph) + .build(new DirectScheduledExecutorService()); + VertexParallelismStore vertexParallelismStore = computeVertexParallelismStore(jobGraph); + this.jobInformation = new JobGraphJobInformation(jobGraph, vertexParallelismStore); + this.vertexParallelism = + new VertexParallelism( + executionGraph.getAllVertices().values().stream() + .collect( + Collectors.toMap( + AccessExecutionJobVertex::getJobVertexId, + AccessExecutionJobVertex::getParallelism))); + } catch (JobException | JobExecutionException e) { + throw new RuntimeException(e); + } + } + + public Collection + invokeSlotSharingResolver() { + SlotSharingResolver slotSharingResolver = createSlotSharingResolver(); + return slotSharingResolver.getExecutionSlotSharingGroups(jobInformation, vertexParallelism); + } + + private SlotSharingResolver createSlotSharingResolver() { + switch (taskManagerLoadBalanceMode) { + case NONE: + return DefaultSlotSharingResolver.INSTANCE; + case TASKS: + return TaskBalancedSlotSharingResolver.INSTANCE; + default: + throw new UnsupportedOperationException( + String.format( + "Unsupported task manager load balance mode '%s' in %s", + taskManagerLoadBalanceMode, getClass().getName())); + } + } +} diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/sharing/resolver/SlotSharingResolverBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/sharing/resolver/SlotSharingResolverBenchmarkExecutor.java new file mode 100644 index 00000000..a6a9c194 --- /dev/null +++ b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/sharing/resolver/SlotSharingResolverBenchmarkExecutor.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.scheduler.benchmark.scheduling.slot.sharing.resolver; + +import org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode; +import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.RunnerException; + +import static org.apache.flink.scheduler.benchmark.scheduling.slot.sharing.strategy.InitSlotSharingStrategyBenchmarkExecutor.JOB_VERTICES; + +/** + * The executor to run {@link SlotSharingResolverBenchmark} for using {@link + * org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingResolver}. + */ +public class SlotSharingResolverBenchmarkExecutor extends SchedulerBenchmarkExecutorBase { + + @Param({"NONE", "TASKS"}) + private TaskManagerLoadBalanceMode taskManagerLoadBalanceMode; + + private SlotSharingResolverBenchmark benchmark; + + public static void main(String[] args) throws RunnerException { + runBenchmark(SlotSharingResolverBenchmark.class); + } + + @Setup(Level.Trial) + public void setup() throws Exception { + benchmark = new SlotSharingResolverBenchmark(taskManagerLoadBalanceMode, JOB_VERTICES); + } + + @Benchmark + @BenchmarkMode(Mode.SingleShotTime) + public void invokeSlotSharingResolver(Blackhole blackhole) { + blackhole.consume(benchmark.invokeSlotSharingResolver()); + } +} diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/sharing/strategy/InitSlotSharingStrategyBenchmark.java b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/sharing/strategy/InitSlotSharingStrategyBenchmark.java new file mode 100644 index 00000000..9f0f4df1 --- /dev/null +++ b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/sharing/strategy/InitSlotSharingStrategyBenchmark.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.scheduler.benchmark.scheduling.slot.sharing.strategy; + +import org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.LocalInputPreferredSlotSharingStrategy; +import org.apache.flink.runtime.scheduler.SlotSharingStrategy; +import org.apache.flink.runtime.scheduler.TaskBalancedPreferredSlotSharingStrategy; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; + +import java.util.Collection; + +/** The benchmark of initializing {@link SlotSharingStrategy}. */ +public class InitSlotSharingStrategyBenchmark { + + private final JobGraph jobGraph; + private final ExecutionGraph executionGraph; + private final TaskManagerLoadBalanceMode taskManagerLoadBalanceMode; + + public InitSlotSharingStrategyBenchmark( + TaskManagerLoadBalanceMode taskManagerLoadBalanceMode, Collection vertices) { + this.taskManagerLoadBalanceMode = taskManagerLoadBalanceMode; + this.jobGraph = + JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertices(vertices).build(); + try { + this.executionGraph = + TestingDefaultExecutionGraphBuilder.newBuilder() + .setJobGraph(jobGraph) + .build(new DirectScheduledExecutorService()); + } catch (JobException | JobExecutionException e) { + throw new RuntimeException(e); + } + } + + public SlotSharingStrategy createSlotSharingStrategy() { + switch (taskManagerLoadBalanceMode) { + case NONE: + return new LocalInputPreferredSlotSharingStrategy.Factory() + .create( + executionGraph.getSchedulingTopology(), + jobGraph.getSlotSharingGroups(), + jobGraph.getCoLocationGroups()); + case TASKS: + return new TaskBalancedPreferredSlotSharingStrategy.Factory() + .create( + executionGraph.getSchedulingTopology(), + jobGraph.getSlotSharingGroups(), + jobGraph.getCoLocationGroups()); + default: + throw new UnsupportedOperationException( + String.format( + "Unsupported task manager load balance mode '%s' in %s", + taskManagerLoadBalanceMode, getClass().getName())); + } + } +} diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/sharing/strategy/InitSlotSharingStrategyBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/sharing/strategy/InitSlotSharingStrategyBenchmarkExecutor.java new file mode 100644 index 00000000..750539b7 --- /dev/null +++ b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/sharing/strategy/InitSlotSharingStrategyBenchmarkExecutor.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.scheduler.benchmark.scheduling.slot.sharing.strategy; + +import org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.scheduler.SlotSharingStrategy; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.runtime.util.JobVertexConnectionUtils; +import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.RunnerException; + +import java.util.Arrays; +import java.util.Collection; + +/** + * The executor to run {@link InitSlotSharingStrategyBenchmark} for initializing {@link + * SlotSharingStrategy}. + */ +public class InitSlotSharingStrategyBenchmarkExecutor extends SchedulerBenchmarkExecutorBase { + + public static final Collection JOB_VERTICES; + + static JobVertex newJobVertex(int parallelism) { + JobVertex jobVertex = new JobVertex("", new JobVertexID()); + jobVertex.setParallelism(parallelism); + jobVertex.setInvokableClass(NoOpInvokable.class); + return jobVertex; + } + + static { + JobVertex jobVertexA = newJobVertex(1000); + JobVertex jobVertexB = newJobVertex(4000); + JobVertex jobVertexC = newJobVertex(4000); + JobVertex jobVertexD = newJobVertex(2000); + JobVertex jobVertexE = newJobVertex(3000); + JobVertex jobVertexF = newJobVertex(1000); + + JobVertexConnectionUtils.connectNewDataSetAsInput( + jobVertexB, + jobVertexA, + DistributionPattern.ALL_TO_ALL, + ResultPartitionType.BLOCKING); + JobVertexConnectionUtils.connectNewDataSetAsInput( + jobVertexC, + jobVertexB, + DistributionPattern.POINTWISE, + ResultPartitionType.BLOCKING); + JobVertexConnectionUtils.connectNewDataSetAsInput( + jobVertexD, + jobVertexC, + DistributionPattern.ALL_TO_ALL, + ResultPartitionType.BLOCKING); + JobVertexConnectionUtils.connectNewDataSetAsInput( + jobVertexE, + jobVertexD, + DistributionPattern.ALL_TO_ALL, + ResultPartitionType.BLOCKING); + + JOB_VERTICES = + Arrays.asList( + jobVertexA, jobVertexB, jobVertexC, jobVertexD, jobVertexE, jobVertexF); + } + + @Param({"NONE", "TASKS"}) + private TaskManagerLoadBalanceMode taskManagerLoadBalanceMode; + + private InitSlotSharingStrategyBenchmark benchmark; + + public static void main(String[] args) throws RunnerException { + runBenchmark(InitSlotSharingStrategyBenchmark.class); + } + + @Setup(Level.Trial) + public void setup() throws Exception { + benchmark = new InitSlotSharingStrategyBenchmark(taskManagerLoadBalanceMode, JOB_VERTICES); + } + + @Benchmark + @BenchmarkMode(Mode.SingleShotTime) + public void createSlotSharingStrategy(Blackhole blackhole) { + blackhole.consume(benchmark.createSlotSharingStrategy()); + } +} diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/throughput/ThroughputOfTaskManagerLoadBalanceModeForStreamingBenchmark.java b/src/main/java/org/apache/flink/scheduler/benchmark/throughput/ThroughputOfTaskManagerLoadBalanceModeForStreamingBenchmark.java new file mode 100644 index 00000000..9ac798ce --- /dev/null +++ b/src/main/java/org/apache/flink/scheduler/benchmark/throughput/ThroughputOfTaskManagerLoadBalanceModeForStreamingBenchmark.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.scheduler.benchmark.throughput; + +import org.apache.flink.benchmark.BenchmarkBase; +import org.apache.flink.benchmark.FlinkEnvironmentContext; +import org.apache.flink.benchmark.functions.LongSourceType; +import org.apache.flink.benchmark.functions.MultiplyByTwo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.VerboseMode; + +@OperationsPerInvocation( + value = ThroughputOfTaskManagerLoadBalanceModeForStreamingBenchmark.RECORDS_PER_INVOCATION) +public class ThroughputOfTaskManagerLoadBalanceModeForStreamingBenchmark extends BenchmarkBase { + public static final int RECORDS_PER_INVOCATION = 150_000; + private static final long CHECKPOINT_INTERVAL_MS = 100; + + @Param({"F27_UNBOUNDED"}) + public LongSourceType sourceType; + + @Param({"NONE", "TASKS"}) + public TaskManagerLoadBalanceMode taskManagerLoadBalanceMode; + + @Param({"true", "false"}) + public boolean allParallelismsOfVerticesSame; + + public static void main(String[] args) throws RunnerException { + Options options = + new OptionsBuilder() + .verbosity(VerboseMode.NORMAL) + .include( + ".*" + + ThroughputOfTaskManagerLoadBalanceModeForStreamingBenchmark + .class + .getCanonicalName() + + ".*") + .build(); + + new Runner(options).run(); + } + + @Benchmark + public void throughput(InputBenchmarkFlinkEnvironmentContext context) throws Exception { + + int[] parallelisms = new int[] {1, 2, 3, 4, 5, 6}; + StreamExecutionEnvironment env = context.env; + env.enableCheckpointing(CHECKPOINT_INTERVAL_MS); + env.disableOperatorChaining(); + configAdaptivePartitioner(env); + + DataStreamSource source = sourceType.source(env, RECORDS_PER_INVOCATION); + if (allParallelismsOfVerticesSame) { + source.setParallelism(parallelisms[parallelisms.length - 1]); + } else { + source.setParallelism(parallelisms[0]); + } + + SingleOutputStreamOperator index = source; + for (int i = 1; i < parallelisms.length; i++) { + index = + index.map(new MultiplyByTwo()) + .setParallelism( + allParallelismsOfVerticesSame + ? parallelisms[parallelisms.length - 1] + : parallelisms[i]); + } + + index.sinkTo(new DiscardingSink<>()).setParallelism(parallelisms[parallelisms.length - 1]); + + env.execute(); + } + + private void configAdaptivePartitioner(StreamExecutionEnvironment env) { + Configuration config = new Configuration(); + config.set(TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE, taskManagerLoadBalanceMode); + config.setString("restart-strategy", "fixed-delay"); + config.setString("restart-strategy.fixed-delay.attempts", "15000000"); + config.setString("restart-strategy.fixed-delay.delay", "3s"); + env.configure(config); + } + + public static class InputBenchmarkFlinkEnvironmentContext extends FlinkEnvironmentContext { + + @Override + protected int getNumberOfTaskManagers() { + return 2; + } + + @Override + protected int getNumberOfSlotsPerTaskManager() { + return 6; + } + } +}