Skip to content

[FLINK-37412] Add MIN_RESOURCES TaskManagerLoadBalanceMode to maximize TM usage #26308

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 19, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -90,7 +90,7 @@
<td><h5>taskmanager.load-balance.mode</h5></td>
<td style="word-wrap: break-word;">NONE</td>
<td><p>Enum</p></td>
<td>Mode for the load-balance allocation strategy across all available <code class="highlighter-rouge">TaskManagers</code>.<ul><li>The <code class="highlighter-rouge">SLOTS</code> mode tries to spread out the slots evenly across all available <code class="highlighter-rouge">TaskManagers</code>.</li><li>The <code class="highlighter-rouge">NONE</code> mode is the default mode without any specified strategy.</li></ul><br /><br />Possible values:<ul><li>"NONE"</li><li>"SLOTS"</li></ul></td>
<td>Mode for the load-balance allocation strategy across all available <code class="highlighter-rouge">TaskManagers</code>.<ul><li>The <code class="highlighter-rouge">SLOTS</code> mode tries to spread out the slots evenly across all available <code class="highlighter-rouge">TaskManagers</code>.</li><li>The <code class="highlighter-rouge">MIN_RESOURCES</code> mode tries to allocate slots on minimum number of available <code class="highlighter-rouge">TaskManagers</code>.</li><li>The <code class="highlighter-rouge">NONE</code> mode is the default mode without any specified strategy.</li></ul><br /><br />Possible values:<ul><li>"NONE"</li><li>"SLOTS"</li><li>"MIN_RESOURCES"</li></ul></td>
</tr>
<tr>
<td><h5>taskmanager.log.path</h5></td>
Original file line number Diff line number Diff line change
@@ -222,7 +222,7 @@
<td><h5>taskmanager.load-balance.mode</h5></td>
<td style="word-wrap: break-word;">NONE</td>
<td><p>Enum</p></td>
<td>Mode for the load-balance allocation strategy across all available <code class="highlighter-rouge">TaskManagers</code>.<ul><li>The <code class="highlighter-rouge">SLOTS</code> mode tries to spread out the slots evenly across all available <code class="highlighter-rouge">TaskManagers</code>.</li><li>The <code class="highlighter-rouge">NONE</code> mode is the default mode without any specified strategy.</li></ul><br /><br />Possible values:<ul><li>"NONE"</li><li>"SLOTS"</li></ul></td>
<td>Mode for the load-balance allocation strategy across all available <code class="highlighter-rouge">TaskManagers</code>.<ul><li>The <code class="highlighter-rouge">SLOTS</code> mode tries to spread out the slots evenly across all available <code class="highlighter-rouge">TaskManagers</code>.</li><li>The <code class="highlighter-rouge">MIN_RESOURCES</code> mode tries to allocate slots on minimum number of available <code class="highlighter-rouge">TaskManagers</code>.</li><li>The <code class="highlighter-rouge">NONE</code> mode is the default mode without any specified strategy.</li></ul><br /><br />Possible values:<ul><li>"NONE"</li><li>"SLOTS"</li><li>"MIN_RESOURCES"</li></ul></td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
@@ -72,7 +72,7 @@
<td><h5>taskmanager.load-balance.mode</h5></td>
<td style="word-wrap: break-word;">NONE</td>
<td><p>Enum</p></td>
<td>Mode for the load-balance allocation strategy across all available <code class="highlighter-rouge">TaskManagers</code>.<ul><li>The <code class="highlighter-rouge">SLOTS</code> mode tries to spread out the slots evenly across all available <code class="highlighter-rouge">TaskManagers</code>.</li><li>The <code class="highlighter-rouge">NONE</code> mode is the default mode without any specified strategy.</li></ul><br /><br />Possible values:<ul><li>"NONE"</li><li>"SLOTS"</li></ul></td>
<td>Mode for the load-balance allocation strategy across all available <code class="highlighter-rouge">TaskManagers</code>.<ul><li>The <code class="highlighter-rouge">SLOTS</code> mode tries to spread out the slots evenly across all available <code class="highlighter-rouge">TaskManagers</code>.</li><li>The <code class="highlighter-rouge">MIN_RESOURCES</code> mode tries to allocate slots on minimum number of available <code class="highlighter-rouge">TaskManagers</code>.</li><li>The <code class="highlighter-rouge">NONE</code> mode is the default mode without any specified strategy.</li></ul><br /><br />Possible values:<ul><li>"NONE"</li><li>"SLOTS"</li><li>"MIN_RESOURCES"</li></ul></td>
</tr>
<tr>
<td><h5>taskmanager.log.path</h5></td>
Original file line number Diff line number Diff line change
@@ -708,6 +708,12 @@ public class TaskManagerOptions {
"The %s mode tries to spread out the slots evenly across all available %s.",
code(TaskManagerLoadBalanceMode.SLOTS.name()),
code("TaskManagers")),
text(
"The %s mode tries to allocate slots on minimum number of available %s.",
code(
TaskManagerLoadBalanceMode.MIN_RESOURCES
.name()),
code("TaskManagers")),
text(
"The %s mode is the default mode without any specified strategy.",
code(TaskManagerLoadBalanceMode.NONE.name())))
@@ -754,7 +760,8 @@ public enum SystemOutMode {
/** Type of {@link TaskManagerOptions#TASK_MANAGER_LOAD_BALANCE_MODE}. */
public enum TaskManagerLoadBalanceMode {
NONE,
SLOTS
SLOTS,
MIN_RESOURCES
}

// ------------------------------------------------------------------------
Original file line number Diff line number Diff line change
@@ -95,10 +95,20 @@ public DefaultResourceAllocationStrategy(
this.defaultSlotResourceProfile =
SlotManagerUtils.generateDefaultSlotResourceProfile(
totalResourceProfile, numSlotsPerWorker);
this.availableResourceMatchingStrategy =
taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS
? LeastUtilizationResourceMatchingStrategy.INSTANCE
: AnyMatchingResourceMatchingStrategy.INSTANCE;
switch (taskManagerLoadBalanceMode) {
case SLOTS:
this.availableResourceMatchingStrategy =
PrioritizedResourceMatchingStrategy.leastUtilization();
break;
case MIN_RESOURCES:
this.availableResourceMatchingStrategy =
PrioritizedResourceMatchingStrategy.mostUtilization();
break;
default:
this.availableResourceMatchingStrategy =
AnyMatchingResourceMatchingStrategy.INSTANCE;
}

this.taskManagerTimeout = taskManagerTimeout;
this.redundantTaskManagerNum = redundantTaskManagerNum;
this.minTotalCPU = minTotalCPU;
@@ -526,8 +536,35 @@ public int tryFulfilledRequirementWithResource(
}
}

private enum LeastUtilizationResourceMatchingStrategy implements ResourceMatchingStrategy {
INSTANCE;
private static class PrioritizedResourceMatchingStrategy implements ResourceMatchingStrategy {
private final Comparator<InternalResourceInfo> resourceInfoComparator;

/**
* Returns a {@link PrioritizedResourceMatchingStrategy} that prioritizes the resource with
* the least utilization, used to evenly distribute slots to workers.
*
* @return least utilization prioritized resource matching strategy.
*/
public static PrioritizedResourceMatchingStrategy leastUtilization() {
return new PrioritizedResourceMatchingStrategy(
Comparator.comparingDouble(i -> i.utilization));
}

/**
* Returns a {@link PrioritizedResourceMatchingStrategy} that prioritizes the resource with
* the highest utilization, used to minimize number of workers assigned.
*
* @return most utilization prioritized resource matching strategy.
*/
public static PrioritizedResourceMatchingStrategy mostUtilization() {
return new PrioritizedResourceMatchingStrategy(
Comparator.comparingDouble(i -> -i.utilization));
}

private PrioritizedResourceMatchingStrategy(
final Comparator<InternalResourceInfo> resourceInfoComparator) {
this.resourceInfoComparator = resourceInfoComparator;
}

@Override
public int tryFulfilledRequirementWithResource(
@@ -539,25 +576,23 @@ public int tryFulfilledRequirementWithResource(
return numUnfulfilled;
}

Queue<InternalResourceInfo> resourceInfoInUtilizationOrder =
new PriorityQueue<>(
internalResources.size(),
Comparator.comparingDouble(i -> i.utilization));
resourceInfoInUtilizationOrder.addAll(internalResources);
Queue<InternalResourceInfo> prioritizedResources =
new PriorityQueue<>(internalResources.size(), resourceInfoComparator);
prioritizedResources.addAll(internalResources);

while (numUnfulfilled > 0 && !resourceInfoInUtilizationOrder.isEmpty()) {
final InternalResourceInfo currentTaskManager =
resourceInfoInUtilizationOrder.poll();
while (numUnfulfilled > 0 && !prioritizedResources.isEmpty()) {
final InternalResourceInfo currentTaskManager = prioritizedResources.poll();

if (currentTaskManager.tryAllocateSlotForJob(jobId, requiredResource)) {
numUnfulfilled--;

// ignore non resource task managers to reduce the overhead of insert.
if (!currentTaskManager.availableProfile.equals(ResourceProfile.ZERO)) {
resourceInfoInUtilizationOrder.add(currentTaskManager);
// ignore non-fitting task managers to reduce the overhead of insert and check.
if (currentTaskManager.availableProfile.allFieldsNoLessThan(requiredResource)) {
prioritizedResources.add(currentTaskManager);
}
}
}

return numUnfulfilled;
}
}
Original file line number Diff line number Diff line change
@@ -50,6 +50,9 @@ class DefaultResourceAllocationStrategyTest {
private static final DefaultResourceAllocationStrategy EVENLY_STRATEGY =
createStrategy(TaskManagerLoadBalanceMode.SLOTS);

private static final DefaultResourceAllocationStrategy MIN_RESOURCES_STRATEGY =
createStrategy(TaskManagerLoadBalanceMode.MIN_RESOURCES);

@Test
void testFulfillRequirementWithRegisteredResources() {
final TaskManagerInfo taskManager =
@@ -72,9 +75,7 @@ void testFulfillRequirementWithRegisteredResources() {
Collections.singletonMap(jobId, requirements),
taskManagerResourceInfoProvider,
resourceID -> false);
assertThat(result.getUnfulfillableJobs()).isEmpty();
assertThat(result.getAllocationsOnPendingResources()).isEmpty();
assertThat(result.getPendingTaskManagersToAllocate()).isEmpty();
assertAllSlotsAllocatedToRegisteredTaskManagersOnly(result);
assertThat(
result.getAllocationsOnRegisteredResources()
.get(jobId)
@@ -123,9 +124,7 @@ void testFulfillRequirementWithRegisteredResourcesEvenly() {
Collections.singletonMap(jobId, requirements),
taskManagerResourceInfoProvider,
resourceID -> false);
assertThat(result.getUnfulfillableJobs()).isEmpty();
assertThat(result.getAllocationsOnPendingResources()).isEmpty();
assertThat(result.getPendingTaskManagersToAllocate()).isEmpty();
assertAllSlotsAllocatedToRegisteredTaskManagersOnly(result);

assertThat(result.getAllocationsOnRegisteredResources().get(jobId).values())
.allSatisfy(
@@ -138,6 +137,100 @@ void testFulfillRequirementWithRegisteredResourcesEvenly() {
.isTrue());
}

@Test
void testFullFillRequirementWithMinimumPossibleRegisteredResources() {
final TaskManagerInfo taskManager1 =
new TestingTaskManagerInfo(
DEFAULT_SLOT_RESOURCE.multiply(10),
DEFAULT_SLOT_RESOURCE.multiply(10),
DEFAULT_SLOT_RESOURCE);
final TaskManagerInfo taskManager2 =
new TestingTaskManagerInfo(
DEFAULT_SLOT_RESOURCE.multiply(10),
DEFAULT_SLOT_RESOURCE.multiply(10),
DEFAULT_SLOT_RESOURCE);
final JobID jobId = new JobID();
final List<ResourceRequirement> requirements = new ArrayList<>();
final ResourceProfile largeResource = DEFAULT_SLOT_RESOURCE.multiply(2);
final TaskManagerResourceInfoProvider taskManagerResourceInfoProvider =
TestingTaskManagerResourceInfoProvider.newBuilder()
.setRegisteredTaskManagersSupplier(
() -> Arrays.asList(taskManager1, taskManager2))
.build();
requirements.add(ResourceRequirement.create(largeResource, 4));
requirements.add(ResourceRequirement.create(ResourceProfile.UNKNOWN, 2));

final ResourceAllocationResult result =
MIN_RESOURCES_STRATEGY.tryFulfillRequirements(
Collections.singletonMap(jobId, requirements),
taskManagerResourceInfoProvider,
resourceID -> false);

assertAllSlotsAllocatedToRegisteredTaskManagersOnly(result);

assertThat(result.getAllocationsOnRegisteredResources().get(jobId).keySet())
.overridingErrorMessage(
"Expected only one task manager to be used to fulfill the requirements.")
.hasSize(1);

assertThat(result.getAllocationsOnRegisteredResources().get(jobId).values())
.overridingErrorMessage("Expected all 6 allocations on registered TM.")
.allSatisfy(
resourceCounter ->
assertThat(resourceCounter.getTotalResourceCount()).isEqualTo(6));

assertThat(result.getAllocationsOnRegisteredResources().get(jobId).values())
.allSatisfy(
resourceCounter ->
assertThat(resourceCounter.containsResource(largeResource))
.isTrue());
}

@Test
void testFullFillRequirementWithUtilizedRegisteredResourceIsPicked() {
final TaskManagerInfo utilizedTaskManager =
new TestingTaskManagerInfo(
DEFAULT_SLOT_RESOURCE.multiply(10),
DEFAULT_SLOT_RESOURCE.multiply(5),
DEFAULT_SLOT_RESOURCE);

final TaskManagerInfo freeTaskManager =
new TestingTaskManagerInfo(
DEFAULT_SLOT_RESOURCE.multiply(10),
DEFAULT_SLOT_RESOURCE.multiply(10),
DEFAULT_SLOT_RESOURCE);
final JobID jobId = new JobID();
final List<ResourceRequirement> requirements = new ArrayList<>();
final ResourceProfile largeResource = DEFAULT_SLOT_RESOURCE.multiply(2);
final TaskManagerResourceInfoProvider taskManagerResourceInfoProvider =
TestingTaskManagerResourceInfoProvider.newBuilder()
.setRegisteredTaskManagersSupplier(
() -> Arrays.asList(freeTaskManager, utilizedTaskManager))
.build();
requirements.add(ResourceRequirement.create(largeResource, 2));
requirements.add(ResourceRequirement.create(ResourceProfile.UNKNOWN, 1));

final ResourceAllocationResult result =
MIN_RESOURCES_STRATEGY.tryFulfillRequirements(
Collections.singletonMap(jobId, requirements),
taskManagerResourceInfoProvider,
resourceID -> false);

assertAllSlotsAllocatedToRegisteredTaskManagersOnly(result);

assertThat(result.getAllocationsOnRegisteredResources().get(jobId).keySet())
.overridingErrorMessage(
"Expected requirements to be fulfilled by the utilized task manager.")
.containsExactly(utilizedTaskManager.getInstanceId());

assertThat(result.getAllocationsOnRegisteredResources().get(jobId).values())
.allSatisfy(
resourceCounter -> {
assertThat(resourceCounter.containsResource(largeResource)).isTrue();
assertThat(resourceCounter.getTotalResourceCount()).isEqualTo(3);
});
}

@Test
void testExcessPendingResourcesCouldReleaseEvenly() {
final JobID jobId = new JobID();
@@ -209,8 +302,7 @@ void testSpecialResourcesRequirementCouldFulfilled(DefaultResourceAllocationStra
taskManagerResourceInfoProvider,
resourceID -> false);

assertThat(result.getUnfulfillableJobs()).isEmpty();
assertThat(result.getPendingTaskManagersToAllocate()).isEmpty();
assertAllSlotsAllocatedToRegisteredTaskManagersOnly(result);
assertThat(result.getAllocationsOnRegisteredResources()).hasSize(1);
assertThat(result.getAllocationsOnRegisteredResources().get(jobId).keySet())
.satisfiesExactly(
@@ -728,4 +820,11 @@ private static DefaultResourceAllocationStrategy createStrategy(
minRequiredCPU,
minRequiredMemory);
}

private static void assertAllSlotsAllocatedToRegisteredTaskManagersOnly(
ResourceAllocationResult result) {
assertThat(result.getUnfulfillableJobs()).isEmpty();
assertThat(result.getAllocationsOnPendingResources()).isEmpty();
assertThat(result.getPendingTaskManagersToAllocate()).isEmpty();
}
}