Skip to content

Commit 034d19c

Browse files
vahmed-hamdyXComp
authored andcommitted
[FLINK-37412] Add MIN_RESOURCES TaskManagerLoadBalanceMode to maximize TM usage
1 parent edc3d68 commit 034d19c

File tree

6 files changed

+170
-29
lines changed

6 files changed

+170
-29
lines changed

docs/layouts/shortcodes/generated/all_taskmanager_section.html

+1-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@
9090
<td><h5>taskmanager.load-balance.mode</h5></td>
9191
<td style="word-wrap: break-word;">NONE</td>
9292
<td><p>Enum</p></td>
93-
<td>Mode for the load-balance allocation strategy across all available <code class="highlighter-rouge">TaskManagers</code>.<ul><li>The <code class="highlighter-rouge">SLOTS</code> mode tries to spread out the slots evenly across all available <code class="highlighter-rouge">TaskManagers</code>.</li><li>The <code class="highlighter-rouge">NONE</code> mode is the default mode without any specified strategy.</li></ul><br /><br />Possible values:<ul><li>"NONE"</li><li>"SLOTS"</li></ul></td>
93+
<td>Mode for the load-balance allocation strategy across all available <code class="highlighter-rouge">TaskManagers</code>.<ul><li>The <code class="highlighter-rouge">SLOTS</code> mode tries to spread out the slots evenly across all available <code class="highlighter-rouge">TaskManagers</code>.</li><li>The <code class="highlighter-rouge">MIN_RESOURCES</code> mode tries to allocate slots on minimum number of available <code class="highlighter-rouge">TaskManagers</code>.</li><li>The <code class="highlighter-rouge">NONE</code> mode is the default mode without any specified strategy.</li></ul><br /><br />Possible values:<ul><li>"NONE"</li><li>"SLOTS"</li><li>"MIN_RESOURCES"</li></ul></td>
9494
</tr>
9595
<tr>
9696
<td><h5>taskmanager.log.path</h5></td>

docs/layouts/shortcodes/generated/expert_scheduling_section.html

+1-1
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@
222222
<td><h5>taskmanager.load-balance.mode</h5></td>
223223
<td style="word-wrap: break-word;">NONE</td>
224224
<td><p>Enum</p></td>
225-
<td>Mode for the load-balance allocation strategy across all available <code class="highlighter-rouge">TaskManagers</code>.<ul><li>The <code class="highlighter-rouge">SLOTS</code> mode tries to spread out the slots evenly across all available <code class="highlighter-rouge">TaskManagers</code>.</li><li>The <code class="highlighter-rouge">NONE</code> mode is the default mode without any specified strategy.</li></ul><br /><br />Possible values:<ul><li>"NONE"</li><li>"SLOTS"</li></ul></td>
225+
<td>Mode for the load-balance allocation strategy across all available <code class="highlighter-rouge">TaskManagers</code>.<ul><li>The <code class="highlighter-rouge">SLOTS</code> mode tries to spread out the slots evenly across all available <code class="highlighter-rouge">TaskManagers</code>.</li><li>The <code class="highlighter-rouge">MIN_RESOURCES</code> mode tries to allocate slots on minimum number of available <code class="highlighter-rouge">TaskManagers</code>.</li><li>The <code class="highlighter-rouge">NONE</code> mode is the default mode without any specified strategy.</li></ul><br /><br />Possible values:<ul><li>"NONE"</li><li>"SLOTS"</li><li>"MIN_RESOURCES"</li></ul></td>
226226
</tr>
227227
</tbody>
228228
</table>

docs/layouts/shortcodes/generated/task_manager_configuration.html

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
<td><h5>taskmanager.load-balance.mode</h5></td>
7373
<td style="word-wrap: break-word;">NONE</td>
7474
<td><p>Enum</p></td>
75-
<td>Mode for the load-balance allocation strategy across all available <code class="highlighter-rouge">TaskManagers</code>.<ul><li>The <code class="highlighter-rouge">SLOTS</code> mode tries to spread out the slots evenly across all available <code class="highlighter-rouge">TaskManagers</code>.</li><li>The <code class="highlighter-rouge">NONE</code> mode is the default mode without any specified strategy.</li></ul><br /><br />Possible values:<ul><li>"NONE"</li><li>"SLOTS"</li></ul></td>
75+
<td>Mode for the load-balance allocation strategy across all available <code class="highlighter-rouge">TaskManagers</code>.<ul><li>The <code class="highlighter-rouge">SLOTS</code> mode tries to spread out the slots evenly across all available <code class="highlighter-rouge">TaskManagers</code>.</li><li>The <code class="highlighter-rouge">MIN_RESOURCES</code> mode tries to allocate slots on minimum number of available <code class="highlighter-rouge">TaskManagers</code>.</li><li>The <code class="highlighter-rouge">NONE</code> mode is the default mode without any specified strategy.</li></ul><br /><br />Possible values:<ul><li>"NONE"</li><li>"SLOTS"</li><li>"MIN_RESOURCES"</li></ul></td>
7676
</tr>
7777
<tr>
7878
<td><h5>taskmanager.log.path</h5></td>

flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,12 @@ public class TaskManagerOptions {
708708
"The %s mode tries to spread out the slots evenly across all available %s.",
709709
code(TaskManagerLoadBalanceMode.SLOTS.name()),
710710
code("TaskManagers")),
711+
text(
712+
"The %s mode tries to allocate slots on minimum number of available %s.",
713+
code(
714+
TaskManagerLoadBalanceMode.MIN_RESOURCES
715+
.name()),
716+
code("TaskManagers")),
711717
text(
712718
"The %s mode is the default mode without any specified strategy.",
713719
code(TaskManagerLoadBalanceMode.NONE.name())))
@@ -754,7 +760,8 @@ public enum SystemOutMode {
754760
/** Type of {@link TaskManagerOptions#TASK_MANAGER_LOAD_BALANCE_MODE}. */
755761
public enum TaskManagerLoadBalanceMode {
756762
NONE,
757-
SLOTS
763+
SLOTS,
764+
MIN_RESOURCES
758765
}
759766

760767
// ------------------------------------------------------------------------

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java

+52-17
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,20 @@ public DefaultResourceAllocationStrategy(
9595
this.defaultSlotResourceProfile =
9696
SlotManagerUtils.generateDefaultSlotResourceProfile(
9797
totalResourceProfile, numSlotsPerWorker);
98-
this.availableResourceMatchingStrategy =
99-
taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS
100-
? LeastUtilizationResourceMatchingStrategy.INSTANCE
101-
: AnyMatchingResourceMatchingStrategy.INSTANCE;
98+
switch (taskManagerLoadBalanceMode) {
99+
case SLOTS:
100+
this.availableResourceMatchingStrategy =
101+
PrioritizedResourceMatchingStrategy.leastUtilization();
102+
break;
103+
case MIN_RESOURCES:
104+
this.availableResourceMatchingStrategy =
105+
PrioritizedResourceMatchingStrategy.mostUtilization();
106+
break;
107+
default:
108+
this.availableResourceMatchingStrategy =
109+
AnyMatchingResourceMatchingStrategy.INSTANCE;
110+
}
111+
102112
this.taskManagerTimeout = taskManagerTimeout;
103113
this.redundantTaskManagerNum = redundantTaskManagerNum;
104114
this.minTotalCPU = minTotalCPU;
@@ -526,8 +536,35 @@ public int tryFulfilledRequirementWithResource(
526536
}
527537
}
528538

529-
private enum LeastUtilizationResourceMatchingStrategy implements ResourceMatchingStrategy {
530-
INSTANCE;
539+
private static class PrioritizedResourceMatchingStrategy implements ResourceMatchingStrategy {
540+
private final Comparator<InternalResourceInfo> resourceInfoComparator;
541+
542+
/**
543+
* Returns a {@link PrioritizedResourceMatchingStrategy} that prioritizes the resource with
544+
* the least utilization, used to evenly distribute slots to workers.
545+
*
546+
* @return least utilization prioritized resource matching strategy.
547+
*/
548+
public static PrioritizedResourceMatchingStrategy leastUtilization() {
549+
return new PrioritizedResourceMatchingStrategy(
550+
Comparator.comparingDouble(i -> i.utilization));
551+
}
552+
553+
/**
554+
* Returns a {@link PrioritizedResourceMatchingStrategy} that prioritizes the resource with
555+
* the highest utilization, used to minimize number of workers assigned.
556+
*
557+
* @return most utilization prioritized resource matching strategy.
558+
*/
559+
public static PrioritizedResourceMatchingStrategy mostUtilization() {
560+
return new PrioritizedResourceMatchingStrategy(
561+
Comparator.comparingDouble(i -> -i.utilization));
562+
}
563+
564+
private PrioritizedResourceMatchingStrategy(
565+
final Comparator<InternalResourceInfo> resourceInfoComparator) {
566+
this.resourceInfoComparator = resourceInfoComparator;
567+
}
531568

532569
@Override
533570
public int tryFulfilledRequirementWithResource(
@@ -539,25 +576,23 @@ public int tryFulfilledRequirementWithResource(
539576
return numUnfulfilled;
540577
}
541578

542-
Queue<InternalResourceInfo> resourceInfoInUtilizationOrder =
543-
new PriorityQueue<>(
544-
internalResources.size(),
545-
Comparator.comparingDouble(i -> i.utilization));
546-
resourceInfoInUtilizationOrder.addAll(internalResources);
579+
Queue<InternalResourceInfo> prioritizedResources =
580+
new PriorityQueue<>(internalResources.size(), resourceInfoComparator);
581+
prioritizedResources.addAll(internalResources);
547582

548-
while (numUnfulfilled > 0 && !resourceInfoInUtilizationOrder.isEmpty()) {
549-
final InternalResourceInfo currentTaskManager =
550-
resourceInfoInUtilizationOrder.poll();
583+
while (numUnfulfilled > 0 && !prioritizedResources.isEmpty()) {
584+
final InternalResourceInfo currentTaskManager = prioritizedResources.poll();
551585

552586
if (currentTaskManager.tryAllocateSlotForJob(jobId, requiredResource)) {
553587
numUnfulfilled--;
554588

555-
// ignore non resource task managers to reduce the overhead of insert.
556-
if (!currentTaskManager.availableProfile.equals(ResourceProfile.ZERO)) {
557-
resourceInfoInUtilizationOrder.add(currentTaskManager);
589+
// ignore non-fitting task managers to reduce the overhead of insert and check.
590+
if (currentTaskManager.availableProfile.allFieldsNoLessThan(requiredResource)) {
591+
prioritizedResources.add(currentTaskManager);
558592
}
559593
}
560594
}
595+
561596
return numUnfulfilled;
562597
}
563598
}

flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java

+107-8
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ class DefaultResourceAllocationStrategyTest {
5050
private static final DefaultResourceAllocationStrategy EVENLY_STRATEGY =
5151
createStrategy(TaskManagerLoadBalanceMode.SLOTS);
5252

53+
private static final DefaultResourceAllocationStrategy MIN_RESOURCES_STRATEGY =
54+
createStrategy(TaskManagerLoadBalanceMode.MIN_RESOURCES);
55+
5356
@Test
5457
void testFulfillRequirementWithRegisteredResources() {
5558
final TaskManagerInfo taskManager =
@@ -72,9 +75,7 @@ void testFulfillRequirementWithRegisteredResources() {
7275
Collections.singletonMap(jobId, requirements),
7376
taskManagerResourceInfoProvider,
7477
resourceID -> false);
75-
assertThat(result.getUnfulfillableJobs()).isEmpty();
76-
assertThat(result.getAllocationsOnPendingResources()).isEmpty();
77-
assertThat(result.getPendingTaskManagersToAllocate()).isEmpty();
78+
assertAllSlotsAllocatedToRegisteredTaskManagersOnly(result);
7879
assertThat(
7980
result.getAllocationsOnRegisteredResources()
8081
.get(jobId)
@@ -123,9 +124,7 @@ void testFulfillRequirementWithRegisteredResourcesEvenly() {
123124
Collections.singletonMap(jobId, requirements),
124125
taskManagerResourceInfoProvider,
125126
resourceID -> false);
126-
assertThat(result.getUnfulfillableJobs()).isEmpty();
127-
assertThat(result.getAllocationsOnPendingResources()).isEmpty();
128-
assertThat(result.getPendingTaskManagersToAllocate()).isEmpty();
127+
assertAllSlotsAllocatedToRegisteredTaskManagersOnly(result);
129128

130129
assertThat(result.getAllocationsOnRegisteredResources().get(jobId).values())
131130
.allSatisfy(
@@ -138,6 +137,100 @@ void testFulfillRequirementWithRegisteredResourcesEvenly() {
138137
.isTrue());
139138
}
140139

140+
@Test
141+
void testFullFillRequirementWithMinimumPossibleRegisteredResources() {
142+
final TaskManagerInfo taskManager1 =
143+
new TestingTaskManagerInfo(
144+
DEFAULT_SLOT_RESOURCE.multiply(10),
145+
DEFAULT_SLOT_RESOURCE.multiply(10),
146+
DEFAULT_SLOT_RESOURCE);
147+
final TaskManagerInfo taskManager2 =
148+
new TestingTaskManagerInfo(
149+
DEFAULT_SLOT_RESOURCE.multiply(10),
150+
DEFAULT_SLOT_RESOURCE.multiply(10),
151+
DEFAULT_SLOT_RESOURCE);
152+
final JobID jobId = new JobID();
153+
final List<ResourceRequirement> requirements = new ArrayList<>();
154+
final ResourceProfile largeResource = DEFAULT_SLOT_RESOURCE.multiply(2);
155+
final TaskManagerResourceInfoProvider taskManagerResourceInfoProvider =
156+
TestingTaskManagerResourceInfoProvider.newBuilder()
157+
.setRegisteredTaskManagersSupplier(
158+
() -> Arrays.asList(taskManager1, taskManager2))
159+
.build();
160+
requirements.add(ResourceRequirement.create(largeResource, 4));
161+
requirements.add(ResourceRequirement.create(ResourceProfile.UNKNOWN, 2));
162+
163+
final ResourceAllocationResult result =
164+
MIN_RESOURCES_STRATEGY.tryFulfillRequirements(
165+
Collections.singletonMap(jobId, requirements),
166+
taskManagerResourceInfoProvider,
167+
resourceID -> false);
168+
169+
assertAllSlotsAllocatedToRegisteredTaskManagersOnly(result);
170+
171+
assertThat(result.getAllocationsOnRegisteredResources().get(jobId).keySet())
172+
.overridingErrorMessage(
173+
"Expected only one task manager to be used to fulfill the requirements.")
174+
.hasSize(1);
175+
176+
assertThat(result.getAllocationsOnRegisteredResources().get(jobId).values())
177+
.overridingErrorMessage("Expected all 6 allocations on registered TM.")
178+
.allSatisfy(
179+
resourceCounter ->
180+
assertThat(resourceCounter.getTotalResourceCount()).isEqualTo(6));
181+
182+
assertThat(result.getAllocationsOnRegisteredResources().get(jobId).values())
183+
.allSatisfy(
184+
resourceCounter ->
185+
assertThat(resourceCounter.containsResource(largeResource))
186+
.isTrue());
187+
}
188+
189+
@Test
190+
void testFullFillRequirementWithUtilizedRegisteredResourceIsPicked() {
191+
final TaskManagerInfo utilizedTaskManager =
192+
new TestingTaskManagerInfo(
193+
DEFAULT_SLOT_RESOURCE.multiply(10),
194+
DEFAULT_SLOT_RESOURCE.multiply(5),
195+
DEFAULT_SLOT_RESOURCE);
196+
197+
final TaskManagerInfo freeTaskManager =
198+
new TestingTaskManagerInfo(
199+
DEFAULT_SLOT_RESOURCE.multiply(10),
200+
DEFAULT_SLOT_RESOURCE.multiply(10),
201+
DEFAULT_SLOT_RESOURCE);
202+
final JobID jobId = new JobID();
203+
final List<ResourceRequirement> requirements = new ArrayList<>();
204+
final ResourceProfile largeResource = DEFAULT_SLOT_RESOURCE.multiply(2);
205+
final TaskManagerResourceInfoProvider taskManagerResourceInfoProvider =
206+
TestingTaskManagerResourceInfoProvider.newBuilder()
207+
.setRegisteredTaskManagersSupplier(
208+
() -> Arrays.asList(freeTaskManager, utilizedTaskManager))
209+
.build();
210+
requirements.add(ResourceRequirement.create(largeResource, 2));
211+
requirements.add(ResourceRequirement.create(ResourceProfile.UNKNOWN, 1));
212+
213+
final ResourceAllocationResult result =
214+
MIN_RESOURCES_STRATEGY.tryFulfillRequirements(
215+
Collections.singletonMap(jobId, requirements),
216+
taskManagerResourceInfoProvider,
217+
resourceID -> false);
218+
219+
assertAllSlotsAllocatedToRegisteredTaskManagersOnly(result);
220+
221+
assertThat(result.getAllocationsOnRegisteredResources().get(jobId).keySet())
222+
.overridingErrorMessage(
223+
"Expected requirements to be fulfilled by the utilized task manager.")
224+
.containsExactly(utilizedTaskManager.getInstanceId());
225+
226+
assertThat(result.getAllocationsOnRegisteredResources().get(jobId).values())
227+
.allSatisfy(
228+
resourceCounter -> {
229+
assertThat(resourceCounter.containsResource(largeResource)).isTrue();
230+
assertThat(resourceCounter.getTotalResourceCount()).isEqualTo(3);
231+
});
232+
}
233+
141234
@Test
142235
void testExcessPendingResourcesCouldReleaseEvenly() {
143236
final JobID jobId = new JobID();
@@ -209,8 +302,7 @@ void testSpecialResourcesRequirementCouldFulfilled(DefaultResourceAllocationStra
209302
taskManagerResourceInfoProvider,
210303
resourceID -> false);
211304

212-
assertThat(result.getUnfulfillableJobs()).isEmpty();
213-
assertThat(result.getPendingTaskManagersToAllocate()).isEmpty();
305+
assertAllSlotsAllocatedToRegisteredTaskManagersOnly(result);
214306
assertThat(result.getAllocationsOnRegisteredResources()).hasSize(1);
215307
assertThat(result.getAllocationsOnRegisteredResources().get(jobId).keySet())
216308
.satisfiesExactly(
@@ -728,4 +820,11 @@ private static DefaultResourceAllocationStrategy createStrategy(
728820
minRequiredCPU,
729821
minRequiredMemory);
730822
}
823+
824+
private static void assertAllSlotsAllocatedToRegisteredTaskManagersOnly(
825+
ResourceAllocationResult result) {
826+
assertThat(result.getUnfulfillableJobs()).isEmpty();
827+
assertThat(result.getAllocationsOnPendingResources()).isEmpty();
828+
assertThat(result.getPendingTaskManagersToAllocate()).isEmpty();
829+
}
731830
}

0 commit comments

Comments
 (0)