ARROW-11290: [Rust][DataFusion] Address hash aggregate performance issue with high number of groups #9234
Conversation
Codecov Report
```
@@           Coverage Diff           @@
##           master    #9234   +/-   ##
=======================================
  Coverage   81.61%   81.61%
=======================================
  Files         215      215
  Lines       51891    51897    +6
=======================================
+ Hits        42353    42358    +5
- Misses       9538     9539    +1
```
Continue to review the full report at Codecov.
Cool idea. LGTM. Thanks a lot, @Dandandan
```
@@ -288,6 +288,9 @@ fn group_aggregate_batch(
     // Make sure we can create the accumulators or otherwise return an error
     create_accumulators(aggr_expr).map_err(DataFusionError::into_arrow_external_error)?;
+
+    // Keys received in this batch
+    let mut batch_keys = vec![];
```
Should this be a set rather than a vec since it is intended to track the unique set of keys in the batch?
That's what I thought at first, but this is already checked when `push`ing the keys to the vec.
It checks for either an empty indices vec (which means no rows yet with this key) or being the first row with this key in `or_insert_with`.
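For context, here is a minimal, dependency-free sketch of the pattern being discussed. The map of per-key row indices and the `batch_keys` vec follow the diff above; the `String` keys, the `track_batch_keys` helper, and the `main` driver are illustrative stand-ins for the real DataFusion types, not the actual implementation:

```rust
use std::collections::HashMap;

// Sketch: the map persists across batches and each key's indices vec is
// drained once a batch is processed, so an empty vec means "no rows with
// this key yet in the current batch".
fn track_batch_keys(
    rows: &[(String, usize)],
) -> (HashMap<String, Vec<usize>>, Vec<String>) {
    let mut groups: HashMap<String, Vec<usize>> = HashMap::new();
    // Keys received in this batch; each key is pushed at most once.
    let mut batch_keys: Vec<String> = vec![];

    for (key, row) in rows {
        // `or_insert_with` creates an empty indices vec for a brand-new key.
        let indices = groups.entry(key.clone()).or_insert_with(Vec::new);
        if indices.is_empty() {
            // First row with this key in this batch, so `batch_keys`
            // stays duplicate-free without needing a set.
            batch_keys.push(key.clone());
        }
        indices.push(*row);
    }
    (groups, batch_keys)
}

fn main() {
    let rows = vec![
        ("a".to_string(), 0usize),
        ("b".to_string(), 1),
        ("a".to_string(), 2),
    ];
    let (groups, batch_keys) = track_batch_keys(&rows);
    assert_eq!(batch_keys, vec!["a".to_string(), "b".to_string()]);
    assert_eq!(groups["a"], vec![0, 2]);
}
```

This is why a `Vec` suffices here: the emptiness check plays the role a set membership test would otherwise play.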
Added a ticket for the remaining work, including some profiling information, here: https://issues.apache.org/jira/browse/ARROW-11300
I merged this branch locally into master and re-ran all the tests. Things looked good, so I'm merging it in.
ARROW-11290: [Rust][DataFusion] Address hash aggregate performance issue with high number of groups

Currently, we loop over every key in the hashmap. However, as we receive a batch, if we have a lot of groups in the group by expression (or receive sorted data, etc.), we could create a lot of empty batches and call `update_batch` for each of the keys already in the hashmap. In this PR we keep track of which keys we received in the batch and only update the accumulators with those keys instead of all accumulators (see the sketch after this message).

On the db-benchmark h2oai/db-benchmark#182 this is the difference (mainly q3 and q5; the others seem to be noise). It doesn't completely solve the problem, but it already reduces it quite a bit.

This PR:

```
q1 took 340 ms
q2 took 1768 ms
q3 took 10975 ms
q4 took 337 ms
q5 took 13529 ms
```

Master:

```
q1 took 330 ms
q2 took 1648 ms
q3 took 16408 ms
q4 took 335 ms
q5 took 21074 ms
```

Closes #9234 from Dandandan/hash_agg_speed2

Authored-by: Heres, Daniel <[email protected]>
Signed-off-by: Andrew Lamb <[email protected]>
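To make the change concrete, the following is a hedged sketch of the idea in the commit message: call `update_batch` only for accumulators whose keys occurred in the current batch, rather than for every accumulator in the map. `SumAccumulator`, `update_for_batch`, and the per-key value routing are hypothetical simplifications, not DataFusion's actual API:

```rust
use std::collections::HashMap;

// Hypothetical, simplified accumulator: just sums the values routed to it.
#[derive(Default)]
struct SumAccumulator {
    sum: i64,
}

impl SumAccumulator {
    fn update_batch(&mut self, values: &[i64]) {
        self.sum += values.iter().sum::<i64>();
    }
}

// Only visit the keys that actually occurred in this batch; with many
// groups, this avoids calling `update_batch` with an empty input on
// every accumulator already in the map.
fn update_for_batch(
    accumulators: &mut HashMap<String, SumAccumulator>,
    batch_keys: &[String],
    batch_values: &HashMap<String, Vec<i64>>,
) {
    for key in batch_keys {
        if let Some(values) = batch_values.get(key) {
            accumulators
                .entry(key.clone())
                .or_default()
                .update_batch(values);
        }
    }
}

fn main() {
    let mut accumulators = HashMap::new();
    let batch_keys = vec!["a".to_string()];
    let mut batch_values = HashMap::new();
    batch_values.insert("a".to_string(), vec![1, 2, 3]);
    update_for_batch(&mut accumulators, &batch_keys, &batch_values);
    assert_eq!(accumulators["a"].sum, 6);
}
```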
…ash aggregation with small groups

Based on #9234, this PR improves the situation described in https://issues.apache.org/jira/browse/ARROW-11300. The current situation is that we call `take` on the arrays, which is fine, but it causes a lot of small `Arrays` to be created / allocated when we have only a small number of rows in each group. This improves the results on the group by queries of db-benchmark:

PR:

```
q1 took 32 ms
q2 took 422 ms
q3 took 3468 ms
q4 took 44 ms
q5 took 3166 ms
q7 took 3081 ms
```

#9234 (results differ from that PR's description because this run has partitioning enabled and a custom allocator):

```
q1 took 34 ms
q2 took 389 ms
q3 took 4590 ms
q4 took 47 ms
q5 took 5152 ms
q7 took 3941 ms
```

The PR changes the algorithm to:

* Create indices / offsets of all keys / indices that are new in the batch.
* `take` the arrays based on the indices in one go, so only one bigger allocation is required per array.
* Use `slice` based on the offsets to take values from the arrays and pass them to the accumulators (see the sketch after this message).

Closes #9271 from Dandandan/hash_agg_few_rows

Authored-by: Heres, Daniel <[email protected]>
Signed-off-by: Jorge C. Leitao <[email protected]>
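The gather-then-slice algorithm in the bullet list above can be sketched without Arrow at all. Plain `Vec`s stand in for Arrow arrays, and `gather_then_slice` is an illustrative name; the single gather and the per-group slicing mimic what one big `take` followed by `slice` calls achieves, under the assumption that one contiguous allocation beats one small allocation per group:

```rust
// Dependency-free sketch: flatten every group's row indices into one
// buffer, record per-group offsets, gather once, then slice per group.
fn gather_then_slice(values: &[i64], groups: &[Vec<usize>]) -> Vec<Vec<i64>> {
    // 1. Flatten indices and record (offset, len) for each group.
    let mut all_indices = Vec::new();
    let mut offsets = Vec::with_capacity(groups.len());
    for indices in groups {
        offsets.push((all_indices.len(), indices.len()));
        all_indices.extend_from_slice(indices);
    }

    // 2. One big gather (the role `take` plays on Arrow arrays) instead
    //    of one small allocation per group.
    let taken: Vec<i64> = all_indices.iter().map(|&i| values[i]).collect();

    // 3. Slice the gathered buffer per group (the role `slice` plays)
    //    and hand each slice to its accumulator.
    offsets
        .iter()
        .map(|&(offset, len)| taken[offset..offset + len].to_vec())
        .collect()
}

fn main() {
    let values = vec![10i64, 20, 30, 40];
    let groups = vec![vec![0usize, 2], vec![1, 3]];
    assert_eq!(
        gather_then_slice(&values, &groups),
        vec![vec![10, 30], vec![20, 40]]
    );
}
```

Note that Arrow's `slice` is zero-copy, so the real kernels avoid even the per-group `to_vec` copy made here for simplicity.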