-
Notifications
You must be signed in to change notification settings - Fork 932
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add multi-partition Join
support to cuDF-Polars
#17518
Add multi-partition Join
support to cuDF-Polars
#17518
Conversation
Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually. Contributors can view more details about this message here. |
/ok to test |
Join
support to cuDF-PolarsJoin
support to cuDF-Polars
This PR pulls out the `Shuffle` logic from #17518 to simplify the review process. The goal is to establish the shuffle groundwork for multi-partition `Join` and `Sort` operations. Authors: - Richard (Rick) Zamora (https://github.com/rjzamora) Approvers: - Lawrence Mitchell (https://github.com/wence-) URL: #17744
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for all your work! I have some small questions/suggestions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this looks pretty good. It would be good to have a few comments describing the algorithm for the join cases that are not inner joins (why the shuffle of both sides is needed, etc...)
shuffle_options, | ||
output_count, | ||
) | ||
new_node = ir.reconstruct([left, right]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think we should implement short-circuiting for equal children in reconstruct
(so that ir.reconstruct(ir.children)
just returns ir
)
# Avoid the broadcast if the "large" table is already shuffled | ||
other_shuffled = ( | ||
partition_info[other].partitioned_on == other_on | ||
and partition_info[other].count == output_count | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: Is this overly restrictive? For example, suppose I have an already shuffled (large) left frame and a right frame that is 100 rows. It seems like in that scenario I would probably not want to shuffle the right frame, but rather broadcast it. Even though the left frame is already shuffled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question - I'm skeptical that this is actually restrictive.
If our large table is already shuffled, then we almost certainly want to hash-partition the smaller table and send off the necessary pieces to each partition of the large table. I don't think we gain much of a performance advantage by skipping the part where we hash-partition the small table. We do, however, loose our ability to avoid shuffling for follow-up joins on the same columns.
I suppose we may still want to perform a broadcast join if the small table is a single partition, and we are not doing any additional joins on the same columns? Even in that case, the performance is probably comparable.
) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: | ||
how = ir.options[0].lower() | ||
if how != "inner": | ||
shuffle_options: dict[str, Any] = {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this future-proofing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, exactly. I suppose I'm expecting that we will eventually extract shuffle options during Join
/Sort
/GroupBy
translation enable/disable use of the shuffle service. That said, I think a lot will change when we introduce a shuffle-service Join.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Rick, looks good!
/merge |
Description
Adds multi-partition
Join
support following the same design as #17441In order to support parallel joins, this PR also introduces a special
Shuffle
node.Checklist