Skip to content

[Query Resource Isolation] Workload Scheduler #16018

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from

Conversation

praveenc7
Copy link
Contributor

@praveenc7 praveenc7 commented Jun 5, 2025

Summary

This Scheduler is an improvement over Binary Workload Scheduler. It will verify CPU / memory budgets at admission time.

It uses the WorkloadBudgetManager.canAdmitQuery() before scheduling to ensure if we can any budget to admit the query. This function can be enhanced to also allow for borrowing from other unused workloads, but will take that up as a future enhancement.

Back-compat & Upgrade Notes

No behavioural change for clusters that don’t set the new configs for enabling the workload scheduler

Performance

Budget checks are O(1) atomic-counter operations and run only once per query admission; empirical tests show < 100 ns overhead per call, negligible compared with planner cost.

Testing done

Unit test added

@praveenc7 praveenc7 force-pushed the scheduler branch 2 times, most recently from cb18c42 to 3869c48 Compare June 5, 2025 23:11
@praveenc7 praveenc7 force-pushed the scheduler branch 3 times, most recently from b811200 to fc1b41c Compare July 14, 2025 17:54
@praveenc7 praveenc7 marked this pull request as ready for review July 14, 2025 17:57
@codecov-commenter
Copy link

codecov-commenter commented Jul 14, 2025

Codecov Report

Attention: Patch coverage is 43.93939% with 37 lines in your changes missing coverage. Please review.

Project coverage is 63.16%. Comparing base (1a476de) to head (e4f25c7).
Report is 445 commits behind head on master.

Files with missing lines Patch % Lines
.../pinot/core/query/scheduler/WorkloadScheduler.java 22.22% 21 Missing ⚠️
...roker/requesthandler/BaseBrokerRequestHandler.java 40.00% 5 Missing and 1 partial ⚠️
...y/scheduler/resources/WorkloadResourceManager.java 37.50% 5 Missing ⚠️
...e/pinot/core/accounting/WorkloadBudgetManager.java 68.75% 2 Missing and 3 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #16018      +/-   ##
============================================
+ Coverage     62.90%   63.16%   +0.26%     
+ Complexity     1386     1363      -23     
============================================
  Files          2867     2976     +109     
  Lines        163354   172645    +9291     
  Branches      24952    26435    +1483     
============================================
+ Hits         102755   109053    +6298     
- Misses        52847    55264    +2417     
- Partials       7752     8328     +576     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 63.12% <43.93%> (+0.25%) ⬆️
java-21 63.15% <43.93%> (+0.32%) ⬆️
skip-bytebuffers-false ?
skip-bytebuffers-true ?
temurin 63.16% <43.93%> (+0.26%) ⬆️
unittests 63.16% <43.93%> (+0.26%) ⬆️
unittests1 56.40% <44.64%> (+0.58%) ⬆️
unittests2 33.17% <10.60%> (-0.40%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@@ -115,4 +115,31 @@ void testConcurrentTryChargeSingleWorkload() throws InterruptedException {
assertEquals(initialMemBudget - totalMemCharged, remaining._memoryRemaining,
"Memory budget mismatch after concurrent updates");
}

@Test
void testCanAdmitQuery() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these tests enough to get good coverage of all scenarios / combinations ? How are we validating that ?

Do we have a theoretical version / model of different permutations and combinations that we are using to test the functionality ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we written a unit or integration test to mimic the steady state pattern in PROD ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the WorkloadBudget admission logic, I’ve tested all possible scenarios to ensure queries are either admitted or rejected correctly. Additional test cases for WorkloadBudgetManager were also contributed by @vvivekiyer in his earlier PR.

We further validated this on a production host by deploying a binary and confirmed the behavior aligns with expectations.

This implementation uses most of the scheduling logic handled by other schedulers in this class. Further we plan to include it in our comprehensive end-to-end validation as part of the QRI testing plan.

Since the scheduler usage is gated by a config flag, it will remain disabled until validation is complete.

+ workloadName;
LOGGER.info(errorMessage);
requestContext.setErrorCode(QueryErrorCode.WORKLOAD_QUOTA_EXCEEDED);
return new BrokerResponseNative(QueryErrorCode.WORKLOAD_QUOTA_EXCEEDED, errorMessage);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this admission control on the broker even needed? Can't we just rely on the Accountant to catch these kill them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can but, isn't it good to fast exit and not waste resources if the query is eventually getting killed?

MUTABLE_JSON_INDEX_MEMORY_USAGE("bytes", false);
MUTABLE_JSON_INDEX_MEMORY_USAGE("bytes", false),
// Workload Budget exceeded counter
WORKLOAD_BUDGET_EXCEEDED("workloadBudgetExceeded", false, "Number of times workload budget exceeded");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense to have this metric on the broker too. That way we can get a count of the number of queries that failed because of exceeding workload budget. Would be good to see both broker-host level and <broker, workload> level metrics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, let me add it there and make it both workload level and global level. Do you think table-level make sense here? I see an value in cases where multiple table share the same workload, providing insights into table-level exhausation

@@ -52,6 +52,7 @@ public enum QueryErrorCode {
BROKER_REQUEST_SEND(425, "BrokerRequestSend"),
SERVER_NOT_RESPONDING(427, "ServerNotResponding"),
TOO_MANY_REQUESTS(429, "TooManyRequests"),
WORKLOAD_QUOTA_EXCEEDED(429, "WorkloadQuotaExceeded"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also change all other places to propagate this error code when terminating queries because of Workload exhaustion - in the accountant, etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the other place would be the WorkloadAggregator, let me see if there is way to add this

* This is fixed budget allocated during host startup and used across all secondary queries.
*/
private void initSecondaryWorkloadBudget(PinotConfiguration config) {
_secondaryWorkloadName = config.getProperty(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us make this optional as not all users would be migrating from BinaryWorkloadScheduler?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the DEFAULT_SECONDARY_WORKLOAD_CPU_PERCENTAGE to be 0, so the secondaryWorkload won't be turned on by default

// The Secondary CPU budget is based on the CPU percentage allocated for secondary workload.
// The memory budget is set to Long.MAX_VALUE for now, since we do not have a specific memory budget for
// secondary queries.
long secondaryCpuBudget = (long) (secondaryCpuPercentage * _enforcementWindowMs * 100_000L);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This computation seems off.
Considering that there are 50 threads. If secondaryCpuPercentage=10, you'll be allocating 20% of CPU to secondary queries?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, missed to add availableProcessors

* @param workload the workload identifier to check budget for
* @return true if the query may be accepted; false if budget is insufficient
*/
public boolean canAdmitQuery(String workload, boolean isSecondary) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

canAdmitQuery should only take workload as the parameter. The workload name will be equal to "defaultSecondary" if the query belongs to secondary workload. And as a part of the init function, we would have already allocated budget for it. This will also eliminate the need for a member variable _secondaryWorkloadName.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this

import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;

public class WorkloadResourceManager extends ResourceManager {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this class is necessary. We can just use the UnboundedResourceManager for now. The only thing this is doing differently is setting thread limits based on the policy. But why is that needed here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know, currently it is not different than UnboundedResourceManager. The taught was to avoid any warranted change made toUnboundedResourceManager doesn't effect our scheduler and also allow for this resource to evolve differently, if needed in Future. However I get your point that we can create ResourceManager when we have some new changes or diverge from UnboundedResourceManager

return shuttingDown(queryRequest);
}

boolean isSecondary = QueryOptionsUtils.isSecondaryWorkload(queryRequest.getQueryContext().getQueryOptions());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if isSecondary is true, use the configs here to populate workload name. Add a comment mentioning that this is needed for backward compatibility when migrating from BinaryWorkloadScheduler to WorkloadScheduler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Supporting secondary workloads breaks down into two concerns:

  1. Defining the secondary workload name
  2. Assigning a cost to that workload.

I moved all cost initialization into the WorkloadBudgetManager to keep the scheduler focused on admission logic. However, passing the isSecondary flag into the canAdmit API pollutes its intent—it shouldn’t need to know about secondary workloads. To clean this up, I’ll separate how and where we look up the secondary workload:

  • In the scheduler, read the secondary workload name only for backward‐compatibility routing.
  • In WorkloadBudgetManager, read it again at startup to establish its budget cost.

It does repeat some code, but this keeps each component responsible for its own concern without overloading canAdmit with extra context.

boolean isSecondary = QueryOptionsUtils.isSecondaryWorkload(queryRequest.getQueryContext().getQueryOptions());
String workloadName = QueryOptionsUtils.getWorkloadName(queryRequest.getQueryContext().getQueryOptions());
if (!_workloadBudgetManager.canAdmitQuery(workloadName, isSecondary)) {
String tableName = TableNameBuilder.extractRawTableName(queryRequest.getTableNameWithType());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explore queueing the requests here (until a certain threshold is reached) and admit the queries. This can be a TODO for later but do add a comment.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants