Draft: Use upstream arrow coalesce kernel in DataFusion #16249
base: main
Conversation
@@ -98,197 +49,93 @@ impl BatchCoalescer {
        fetch: Option<usize>,
    ) -> Self {
        Self {
            schema,
            target_batch_size,
            inner: BatchCoalescer::new(schema, target_batch_size),
this is the key change here -- move all the buffer management upstream into arrow
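For context, a rough sketch of what the wrapper looks like after this change (a simplified illustration, not the exact code in the diff; the `total_rows` field and the `ArrowBatchCoalescer` alias are mine):

```rust
use arrow::compute::BatchCoalescer as ArrowBatchCoalescer; // re-exported from arrow-select (arrow-rs#7597); exact path may vary by version
use arrow::datatypes::SchemaRef;

/// DataFusion-side coalescer after the change: buffer management is
/// delegated to the upstream arrow kernel, and DataFusion only layers
/// its own concerns (here, the optional `fetch` limit) on top of it.
struct BatchCoalescer {
    /// Upstream kernel that owns all buffered rows.
    inner: ArrowBatchCoalescer,
    /// Optional row limit (LIMIT/fetch) still tracked on the DataFusion side.
    fetch: Option<usize>,
    /// Rows pushed so far, used to enforce `fetch`.
    total_rows: usize,
}

impl BatchCoalescer {
    fn new(schema: SchemaRef, target_batch_size: usize, fetch: Option<usize>) -> Self {
        Self {
            inner: ArrowBatchCoalescer::new(schema, target_batch_size),
            fetch,
            total_rows: 0,
        }
    }
}
```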
/// [`BatchCoalescer::push_batch()`] operation.
///
/// The caller should take different actions, depending on the variant returned.
pub enum CoalescerState
the buffering is all managed upstream now, so there is no need to expose buffering details to the user of the coalescer
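To illustrate what that means for callers, here is a minimal sketch of the push/drain loop, assuming the `push_batch` / `next_completed_batch` / `finish_buffered_batch` API from arrow-rs#7597 (the actual DataFusion call sites differ):

```rust
use arrow::compute::BatchCoalescer;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Coalesce a sequence of input batches into target-sized output batches.
/// The caller never inspects partial buffers; it only pulls batches that
/// have already reached the target size.
fn coalesce_all(
    mut coalescer: BatchCoalescer,
    inputs: Vec<RecordBatch>,
) -> Result<Vec<RecordBatch>, ArrowError> {
    let mut output = vec![];
    for batch in inputs {
        // Buffering happens inside the kernel; there is no state enum to match on.
        coalescer.push_batch(batch)?;
        while let Some(completed) = coalescer.next_completed_batch() {
            output.push(completed);
        }
    }
    // Flush whatever is still buffered once the input is exhausted.
    coalescer.finish_buffered_batch()?;
    while let Some(completed) = coalescer.next_completed_batch() {
        output.push(completed);
    }
    Ok(output)
}
```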
@@ -488,110 +329,6 @@ mod tests {
        .unwrap()
    }

    #[test]
all moved upstream
    /// Execution metrics
    baseline_metrics: BaselineMetrics,
    /// The current inner state of the stream. This state dictates the current
    /// action or operation to be performed in the streaming process.
    inner_state: CoalesceBatchesStreamState,
Since the Coalescer can now buffer batches internally, this enum can be reduced to "complete" or not
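In other words, something along these lines (a hypothetical sketch of the reduced state machine, not the exact enum in this PR):

```rust
/// With buffering handled inside the arrow coalescer, the stream only needs
/// to know whether its input is exhausted, not which batch is currently
/// being buffered or returned.
enum CoalesceBatchesStreamState {
    /// Still pulling batches from the input stream.
    Pulling,
    /// Input exhausted and remaining buffered rows flushed; the stream is done.
    Completed,
}
```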
🤖
🤖: Benchmark completed Details
TL;DR is that the performance looks good. I'll fix up the tests shortly.
🤖
🤖: Benchmark completed Details
@@ -2107,9 +2107,9 @@ RIGHT JOIN (select t2_id from join_t2 where join_t2.t2_id > 11) as join_t2
ON join_t1.t1_id < join_t2.t2_id
----
33 44
this query has no ORDER BY so it is ok if the output emerges in a different order
…lected batches: (#7597)

# Which issue does this PR close?
- Part of #6692
- Part of #7589

# Rationale for this change
The pattern of combining multiple small RecordBatches to form one larger one for subsequent processing is common in query engines like DataFusion that filter or partition incoming Arrays. Current best practice is to use the `filter` or `take` kernels and then the `concat` kernel, as explained in
- #6692

This pattern also appears in my attempt to improve parquet filter performance (to cache the result of applying a filter rather than re-decoding the results). See
- apache/datafusion#3463
- #7513

The current pattern is not optimal as it requires:
1. At least 2x peak memory (holding the input and output of `concat`)
2. 2 copies of the data (to create the output of `filter` and then create the output of `concat`)

The theory is that with sufficient optimization we can reduce the peak memory requirements and (possibly) make it faster as well. However, to add a benchmark for this filter+concat pattern, I basically had nothing to benchmark. Specifically, there needed to be an API to call.
- Note I also made a PR to DataFusion showing this API can be used and it is not slower: apache/datafusion#16249

# What changes are included in this PR?
I ported the code from DataFusion (downstream) into arrow-rs (upstream) so that:
1. We can use it in the parquet reader
2. We can benchmark and optimize it appropriately

Specifically:
1. Add `BatchCoalescer` to `arrow-select`, and tests
2. Update documentation
3. Add examples
4. Add a `pub` export in `arrow`
5. Add a benchmark

# Are there any user-facing changes?
This is a new API. I next plan to make a benchmark for this particular …
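For illustration, a sketch of the filter-then-concat pattern described above (the function name and parameters are mine, not from the PR); the new `BatchCoalescer` avoids the second copy by appending filtered rows directly into an in-progress output batch instead of calling `concat` at the end:

```rust
use arrow::array::BooleanArray;
use arrow::compute::{concat_batches, filter_record_batch};
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// The "current best practice" pattern: filter each input batch, then concat
/// the small results into one large batch. Both the filtered pieces and the
/// concatenated output are alive at the same time (~2x peak memory), and the
/// data is copied twice.
fn filter_then_concat(
    schema: &SchemaRef,
    batches: &[RecordBatch],
    predicates: &[BooleanArray],
) -> Result<RecordBatch, ArrowError> {
    let filtered: Vec<RecordBatch> = batches
        .iter()
        .zip(predicates)
        .map(|(batch, predicate)| filter_record_batch(batch, predicate))
        .collect::<Result<_, _>>()?;
    concat_batches(schema, &filtered)
}
```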
🤖
https://github.com/apache/datafusion/actions/runs/15506836203/job/43662848611?pr=16249
🤔 that isn't good
I added a PR for reverting the changes in arrow-rs apache/arrow-rs#7623 - probably something subtle with one of the fast paths that isn't tested in arrow-rs.
Thank you. I have a theory: apache/arrow-rs#7623 (comment)
🤖
#7623) This reverts commit 7739a83.

# Which issue does this PR close?

# Rationale for this change
I found these errors in DataFusion (see apache/datafusion#16249 (comment)), so let's revert it and find the error.

# What changes are included in this PR?

# Are there any user-facing changes?
Force-pushed from 7d6471b to 49cb62e
@alamb benchmark runs ok now
🤖
Awesome -- thanks -- I restarted it now
🤖: Benchmark completed Details
This is similar to my results: some larger gains on TPC-H, smaller gains (spread out over different queries) for ClickBench.
SWEET! Just wait until we get rid of the intermediate copy ;) it is going to be amazing
Which issue does this PR close?

Rationale for this change

I am trying to move the coalesce operation upstream into arrow-rs so that it can be reused in parquet filter pushdown and more highly optimized. See:
- `coalesce` kernel and `BatchCoalescer` for statefully combining selected batches: arrow-rs#7597

The proposed upstream coalescer API is slightly different (guaranteed batch size, and it doesn't have a limit), so we must adapt the DataFusion code to handle this.
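One way the limit adaptation can be pictured (a hypothetical sketch under the assumption that the upstream API exposes only `push_batch`; the helper below is not the code in this PR):

```rust
use arrow::compute::BatchCoalescer;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Push `batch` into the upstream coalescer while enforcing an optional row
/// limit on the DataFusion side. Returns `true` once the limit is reached.
fn push_with_fetch(
    coalescer: &mut BatchCoalescer,
    batch: RecordBatch,
    fetch: Option<usize>,
    total_rows: &mut usize,
) -> Result<bool, ArrowError> {
    let batch = match fetch {
        // Truncate the batch so no more than `limit` rows are ever pushed.
        Some(limit) if *total_rows + batch.num_rows() > limit => {
            batch.slice(0, limit.saturating_sub(*total_rows))
        }
        _ => batch,
    };
    *total_rows += batch.num_rows();
    coalescer.push_batch(batch)?;
    Ok(fetch.is_some_and(|limit| *total_rows >= limit))
}
```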
What changes are included in this PR?

This PR refactors the `BatchCoalescer` in DataFusion to use the proposed upstream API to show that it can be used.

Are these changes tested?

By CI tests

Are there any user-facing changes?

No