feat: Allow cancelling of grouping operations which are CPU bound #16196
Conversation
…io MPSC (RecordBatchReceiverStream)
This PR is limited to solving the no-grouping aggregate streaming case; we can extend it to more cases if it does not affect performance?
Thanks @zhuqi-lucas -- I'll try running the cancellation benchmark from @carols10cents
🤖: Benchmark completed
Thank you @alamb for the review and benchmark. I am wondering if it will hurt DataFusion's own running performance, because we add branch (if) logic in the aggregate and other core execs. If we only want to support datafusion-cli cancellation logic, maybe we can add the wrapper logic to datafusion-cli. But from other related issues, it seems some customers use gRPC to drop streams, not only datafusion-cli. Maybe the perfect solution is:
I polished the code so it only affects the no-grouping aggregate. Maybe we can compare ClickBench results, so we can be confident to merge if it does not affect aggregate performance.
Updated the performance for the current PR:

SET datafusion.execution.target_partitions = 1;
SELECT SUM(value)
FROM range(1,50000000000) AS t;

+----------------------+
| sum(t.value)         |
+----------------------+
| -4378597037249509888 |
+----------------------+
1 row(s) fetched.
Elapsed 22.315 seconds.

The main branch:

SET datafusion.execution.target_partitions = 1;
SELECT SUM(value)
FROM range(1,50000000000) AS t;

+----------------------+
| sum(t.value)         |
+----------------------+
| -4378597037249509888 |
+----------------------+
1 row(s) fetched.
Elapsed 22.567 seconds.

No performance regression from the above testing.
@@ -77,6 +77,11 @@ impl AggregateStream {
        let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
        let input = agg.input.execute(partition, Arc::clone(&context))?;

        // Only wrap no-grouping aggregates in our YieldStream
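For context, a minimal sketch of what such a yield wrapper can look like: a `Stream` adapter that returns `Poll::Pending` (and immediately re-wakes itself) after every `period` batches, giving the runtime a chance to observe cancellation. The names here (`YieldStream`, `period`) are illustrative; the PR's actual implementation may differ.

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::Stream;

struct YieldStream<S> {
    inner: S,
    period: usize,
    batches_since_yield: usize,
}

impl<S: Stream + Unpin> Stream for YieldStream<S> {
    type Item = S::Item;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // After `period` consecutive ready batches, return Pending once and
        // wake ourselves so the executor can check for cancellation (e.g. a
        // dropped stream or Ctrl-C) before we resume.
        if self.batches_since_yield >= self.period {
            self.batches_since_yield = 0;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        match Pin::new(&mut self.inner).poll_next(cx) {
            Poll::Ready(Some(item)) => {
                self.batches_since_yield += 1;
                Poll::Ready(Some(item))
            }
            other => other,
        }
    }
}
```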
In my own testing with partition_count = 1, group by aggregates suffer from the same problem.
Thank you @pepijnve for the review, I will try to reproduce it for group by aggregates.
You are right, I can reproduce it now:
SELECT
(value % 10) AS group_key,
COUNT(*) AS cnt,
SUM(value) AS sum_val
FROM range(1, 5000000000) AS t
GROUP BY (value % 10)
ORDER BY group_key;
Thank you @pepijnve, I also added grouping support in the latest PR.
From the testing results, it seems the group by cases have some performance regression. Another solution is using CoalescePartitionsExec as a wrapper:

diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs
index 114f83068..ffb24463e 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -154,10 +154,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
0 => internal_err!(
"CoalescePartitionsExec requires at least one input partition"
),
- 1 => {
- // bypass any threading / metrics if there is a single partition
- self.input.execute(0, context)
- }
+ // 1 => {
+ // // bypass any threading / metrics if there is a single partition
+ // self.input.execute(0, context)
+ // }
_ => {
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
// record the (very) minimal work done so that
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index b81b3c8be..8bb8b2145 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -963,8 +963,7 @@ pub fn execute_stream(
) -> Result<SendableRecordBatchStream> {
match plan.output_partitioning().partition_count() {
0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
- 1 => plan.execute(0, context),
- 2.. => {
+ 1.. => {
// merge into a single partition
let plan = CoalescePartitionsExec::new(Arc::clone(&plan));
// CoalescePartitionsExec must produce a single partition
diff --git a/parquet-testing b/parquet-testing
index 6e851ddd7..107b36603 160000
--- a/parquet-testing
+++ b/parquet-testing
@@ -1 +1 @@
-Subproject commit 6e851ddd768d6af741c7b15dc594874399fc3cff
+Subproject commit 107b36603e051aee26bd93e04b871034f6c756c0
Hi @alamb, I believe we can also run the ClickBench benchmark for this PR. But I am not confident about the result, since it seems we will always add some overhead to the aggregate. Thanks!
🤖: Benchmark completed
Running the benchmarks again to gather more details |
Thank you @alamb, it's surprising that performance shows no regression, and is even faster for clickbench_partitioned. It may be because we yield within each partition's execution, and that makes the partitions run more efficiently.
One performance aspect I've been looking at is the cost of yielding. There's no magic as far as I can tell: returning a Pending simply leads to a full unwind of the call stack, by virtue of the return bubbling all the way up to the tokio executor, followed by a full descent back to the point where you left off, using the same function calls that got you there in the first place.

Running with target_partitions = 1 shows that for queries like the deeply nested window/sort query you linked to, @ozankabak, the call stack can get pretty deep. It's essentially proportional to the depth of the plan tree. To mitigate this, would it make sense for pipeline-breaking operators to run their pipeline-breaking portion in a SpawnedTask instead of as a child? I'm thinking of the sort phase of sort, the build phase of join, etc. Regardless of where you inject Pending, it seems beneficial to keep the call stack that needs to be unwound shallow.

Note that this same argument suggests it could be more interesting to do the cooperative yield where the looping is happening rather than where the data is produced. The loop is the shallowest point (definitely if you spawn a task, since that gets you a new root), while the producer is the deepest point. Cutting the call stack using spawned tasks may also mitigate the deeply-nested-query concern regarding checking for yielding at multiple levels: the yield is never going to go beyond the scope of a single task.

pepijnve@75f8800 illustrates the change I'm suggesting. If you think it's useful I can turn this into a separate PR.
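To sketch the suggestion (plain tokio with hypothetical types; DataFusion's own task-spawning utilities would be similar in spirit): run the pipeline-breaking phase as the root of a spawned task, so a Pending unwinds only that task's shallow stack rather than the whole plan tree's poll stack.

```rust
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

// The pipeline-breaking portion (here, a sort's build phase) runs inside
// its own task and drains its input to completion.
async fn sort_phase(mut input: mpsc::Receiver<Vec<i64>>) -> Vec<i64> {
    let mut all = Vec::new();
    // Each recv().await is a yield point; because this future is the root
    // of its task, returning Pending unwinds only this frame.
    while let Some(batch) = input.recv().await {
        all.extend(batch);
    }
    all.sort_unstable();
    all
}

fn spawn_sort_phase(input: mpsc::Receiver<Vec<i64>>) -> JoinHandle<Vec<i64>> {
    // A spawned task gets a fresh call-stack root on the executor.
    tokio::spawn(sort_phase(input))
}
```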
I found some time to work on this tonight and it looks good to me now.
To summarize where we are:
- We add yields to all leaf nodes, but no yields to any intermediate node.
- We added a bunch of tests to cover some corner cases and all of them pass.
- There is a single new `with_cooperative_yields` API, which returns a cooperatively yielding version of a plan object (if it exists). If it doesn't exist for a leaf node, we add an auxiliary operator to handle yielding. (A simplified sketch of this API shape follows this list.)
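As a rough illustration of the API shape described above (a simplified stand-in trait; the actual `ExecutionPlan` signature in the PR may differ):

```rust
use std::sync::Arc;

// Simplified stand-in for the ExecutionPlan trait, for illustration only.
trait Plan {
    /// Return a cooperatively yielding version of this plan if the operator
    /// supports one natively; `None` tells the optimizer pass to wrap this
    /// leaf in an auxiliary yielding operator instead.
    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn Plan>>;
}

struct PlainSource;

impl Plan for PlainSource {
    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn Plan>> {
        // This hypothetical source has no built-in yield points, so the
        // optimizer will insert the auxiliary yield operator around it.
        None
    }
}
```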
Future work:
- We will study input-side pipelining behaviors and improve the pipelining API, so that we only trigger explicit yielding when it is necessary. Given the small number of leaf nodes, we are not that far off from optimality even as is, which is great. We have some ideas on what to try here, but the current state seems quite good -- so we can merge it to fix downstream issues as we make further progress.
- We will think about supporting cases involving non-volcano (e.g. spill) data flow.
@zhuqi-lucas and @alamb, PTAL
Thank you, I agree that we are in a good state, because:

I will also help investigate the following case, maybe as a follow-up ticket, thanks!

Additional benefit from this PR?
Good idea, I'm curious to see if you can.
I merged the latest from main; this is good to go.
…nstead of frequency for config parameter terminology
@zhuqi-lucas, I wanted to make a few final finishing touches while we give @alamb a chance to take a final look. I changed the config terminology from "frequency" to "period" because the former was kind of a misnomer. I also did some refactoring to remove some code repetition. Can you please double-check to make sure all looks good on your end? Thanks.
Thank you @ozankabak, the final refactor and name change look good to me. Yeah, let's wait for @alamb to take the final look. Thanks!
Thank you, it makes sense, I will experiment with it as a follow-up.
@alamb FYI I plan to merge this soon. It is OK if you don't have the bandwidth to take a look; it is the first step towards the design we discussed before.
I can't help but feel that this is needlessly being rushed. Committing to a new public API on an extension point before it's clearly proven feels like a bad idea to me. If it were purely an implementation detail it would be less of an issue. What's the hurry?

The more I've been digging into the code over the past few days, the clearer it is that getting yielding just right while avoiding wasteful work is something you need to be careful about. See for instance #16196. Wouldn't it be prudent to give this some more time to mature and maybe see if there are better strategies? It's not a universal solution, but just as an example, what I found in #16319 is that restructuring the operator code a little bit makes it behave much nicer from the caller's perspective. Unfortunately you do need to do this kind of thing at the operator implementation level.

I do think there are implementation patterns here that could serve as building blocks for operators. 'Build a stream async and then emit from it', for instance, seems to be pretty common. Rather than having a bespoke implementation in each operator, it would be useful to have a combinator that operators can use. Perhaps there's a similar zero-cost solution to the 'drains input before first emit' pattern as well?
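To illustrate the 'build a stream async and then emit from it' pattern mentioned above, such a combinator can be assembled from existing `futures` primitives (hypothetical helper, not part of this PR):

```rust
use std::future::Future;

use futures::stream::{self, Stream, StreamExt};

/// Run an async build phase to completion, then emit from the stream it
/// produces. Every `.await` inside `build` is a natural yield point that
/// the caller observes before the first item is emitted.
fn build_then_emit<F, S>(build: F) -> impl Stream<Item = S::Item>
where
    F: Future<Output = S>,
    S: Stream,
{
    // `once` wraps the build future as a one-item stream of streams;
    // `flatten` then forwards the inner stream's items.
    stream::once(build).flatten()
}
```

An operator's pipeline-breaking phase (sorting buffered batches, building a hash table, etc.) would be the `build` future here, giving cancellation a chance to be observed during the build instead of only between output batches.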
Thank you @zhuqi-lucas and @ozankabak and @pepijnve -- I think this PR is really nicely commented and structured. It is easy to read and review.
However, I am sorry but I am a bit confused by the implications of this PR now.
From what I can tell, it doesn't insert YieldExec or add Yielding for AggregateExec, which is the operator we have real evidence doesn't yield. Instead it seems to add yielding for DataSource exec, which already will yield when reading Parquet from a remote store, for example 🤔
fn wrap_leaves_of_pipeline_breakers(
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    let is_pipeline_breaker = plan.properties().emission_type == EmissionType::Final;
I missed this code in the initial PR
/// with the runtime by yielding control back to the runtime every `frequency`
/// batches. This is useful for operators that do not natively support yielding
/// control, allowing them to be used in a runtime that requires yielding for
/// cancellation or other purposes.
I think it would also help to explain here how to avoid this Exec in your plan:

/// cancellation or other purposes.
///
/// # Note
/// If your ExecutionPlan periodically yields control back to the scheduler,
/// implement [`ExecutionPlan::with_cooperative_yields`] to avoid the need for this
/// node.
@@ -137,6 +138,7 @@ impl PhysicalOptimizer {
        // are not present, the load of executors such as join or union will be
        // reduced by narrowing their input tables.
        Arc::new(ProjectionPushdown::new()),
        Arc::new(WrapLeaves::new()),
Can we please call this pass something related to Cancel or Yield, like InsertYieldExec, to make it clearer what it is doing?
}

#[derive(Debug)]
struct InfiniteExec {
Instead of a new exec, perhaps we could use MemoryExec (with like 1000 `batch.clone()`s, for example) to show it is configured correctly.
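A sketch of that suggestion, assuming arrow's `RecordBatch` and the `MemoryExec::try_new(&[batches], schema, projection)` constructor available in DataFusion around the time of this PR (exact module paths and signatures may differ):

```rust
use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::memory::MemoryExec;

fn repeated_batch_source() -> Result<MemoryExec> {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Int64,
        false,
    )]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int64Array::from(vec![1i64; 8192]))],
    )?;
    // ~1000 clones of one batch give the yield logic plenty of batches to
    // count, without needing a bespoke InfiniteExec.
    MemoryExec::try_new(&[vec![batch; 1000]], schema, None)
}
```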
@@ -242,6 +242,7 @@ physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[W
physical_plan after LimitAggregation SAME TEXT AS ABOVE
I am confused about why YieldStreamExec does not appear in more of the explain slt plans.
I am happy to wait for a bit more testing on this PR -- we now have about a month before the next release, so there is no pressure from there. However, I do like a bias toward action, and if this PR fixes a real problem, I don't think we should bikeshed it indefinitely.
I was thinking of
@ozankabak -- what are the next steps? I may have lost track -- if this PR needs some follow-on work, I think we should file tickets to explain what it is before merging (I can help to file such tickets).
🤖: Benchmark completed
🤖: Benchmark completed
Which issue does this PR close?
Rationale for this change
Some AggregateExecs can always make progress and thus they may never notice that the plan was canceled.
🧵 Yield-based Cooperative Scheduling in AggregateExec

This PR introduces a lightweight yielding mechanism to AggregateExec to improve responsiveness to external signals (like Ctrl-C) without adding any measurable performance overhead.

🧭 Motivation
For aggregation queries without any GROUP BY

Similarly for queries like

where the computational work for each input is substantial
Are these changes tested?
Yes
Before this PR, the query would always run until done; we couldn't Ctrl-C to stop it.
Are there any user-facing changes?
Some CPU-heavy aggregation plans now cancel much sooner.