Implement distributed sorting for cudf_polars
#18912
base: branch-25.08
Conversation
Moved to 25.08
    ir.by,
    ir.order,
    ir.null_order,
    ir.config_options,
Just a note that you can pass in `rec.state["config_options"]` here. This way you don't need to add the `config_options` attribute to `Sort`. We only need to add that attribute if the IR object needs to access the configs within `generate_ir_tasks`.
    null_order + [plc.types.NullOrder.AFTER] * 2,
)
global_split_points = plc.Column.from_arrow(
    pa.array(
I think there's an ongoing effort to remove the hard requirement on pyarrow. I'm not sure what the alternative is, but perhaps `plc.Column.from_iterable_of_py`?
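For reference, a minimal sketch of the substitution being suggested; the exact `from_iterable_of_py` signature is an assumption here and may differ in pylibcudf:

```python
import pylibcudf as plc

values = [0, 10, 20]  # e.g. global split-point values

# Current approach from the diff above, which requires pyarrow:
# import pyarrow as pa
# col = plc.Column.from_arrow(pa.array(values))

# Suggested pyarrow-free alternative (assuming it accepts a plain Python iterable):
col = plc.Column.from_iterable_of_py(values)
```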
# the partition id and the local row number of the final split values
*split_values, split_part_id, split_local_row = split_values.columns()
split_values = plc.Table(split_values)
# Now we find the first and last row in the local table corresponding to the split value
Do you think this is common and expensive enough to merit a function that computes the lower and upper bounds in a single pass?
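For context, a pure-Python toy of the lower/upper-bound pair this comment refers to, with `bisect` standing in for the device-side searches; a fused helper would return both cuts in one pass over the sorted data:

```python
import bisect

local_sorted = [1, 3, 3, 3, 7, 9]  # locally sorted key column
split_value = 3

lo = bisect.bisect_left(local_sorted, split_value)   # first row >= split value -> 1
hi = bisect.bisect_right(local_sorted, split_value)  # first row >  split value -> 4
print(lo, hi)
```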
Took a first pass. Thanks for working on this @seberg!
shuffle_method = ir.config_options.executor.shuffle_method

by = [ne.value.name for ne in ir.by if isinstance(ne.value, Col)]
if len(by) != len(ir.by):
- if len(by) != len(ir.by):
+ if len(by) != len(ir.by):  # pragma: no cover
    raise NotImplementedError("Sorting columns must be column names.")

sort_boundaries_name, graph = _sort_boundaries_graph(
    get_key_name(ir.children[0]),
Nit: Minor preference to do `child, = ir.children` earlier in this function, and then refer to `child` instead of `ir.children[0]` throughout.
except (ImportError, ValueError) as err:
    # ImportError: rapidsmpf is not installed
    # ValueError: rapidsmpf couldn't find a distributed client
    if shuffle_method == "rapidsmpf":
Probably need a `# pragma: no cover` here?
    row id columns). The columns are already in the order of `by`.
    """
    df = df.select(by)
    candidates = [i * df.num_rows // num_partitions for i in range(num_partitions)]
Maybe `candidates = range(0, df.num_rows, df.num_rows // num_partitions)`?
I suppose I did that because it gets fewer rounding errors towards the end. But the worst case for that is probably the last partition being too big by `num_partitions / 2`.
(Kept it for now, but happy to change.)
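To make the rounding argument concrete, a small illustration (not from the PR) with numbers that don't divide evenly:

```python
num_rows, num_partitions = 103, 10

# PR version: exactly num_partitions candidates, with the remainder rows spread out.
pr_candidates = [i * num_rows // num_partitions for i in range(num_partitions)]
assert pr_candidates == [0, 10, 20, 30, 41, 51, 61, 72, 82, 92]

# Suggested range() version: the truncating step does not spread the remainder,
# and here it even yields one candidate too many (11 instead of 10).
alt_candidates = list(range(0, num_rows, num_rows // num_partitions))
assert len(alt_candidates) == 11
```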
    Shuffling is performed by extracting sort boundary candidates from all partitions,
    sharing them all-to-all and then exchanging data accordingly.
    The sorting information is required to be passed in identically
    to the initial sort.
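A single-process, pure-Python toy of the scheme this docstring describes (boundary candidates taken from each locally sorted partition, pooled into global split points, then rows routed by partition); this is only an illustration, not the PR's implementation:

```python
import bisect

# Each input partition is locally sorted.
partitions = [[1, 4, 9, 12], [2, 2, 7, 30], [0, 5, 6, 8]]
num_out = len(partitions)

# 1) Every partition contributes evenly spaced boundary candidates.
candidates = sorted(
    part[i * len(part) // num_out] for part in partitions for i in range(num_out)
)
# 2) Global split points are picked from the pooled candidates (the all-to-all step).
splits = [candidates[i * len(candidates) // num_out] for i in range(1, num_out)]

# 3) Each partition routes its rows to output partitions based on the split points.
out = [[] for _ in range(num_out)]
for part in partitions:
    cuts = [0, *(bisect.bisect_left(part, s) for s in splits), len(part)]
    for dest in range(num_out):
        out[dest].extend(part[cuts[dest] : cuts[dest + 1]])

# Sorting each output partition locally now yields the globally sorted data.
assert sorted(v for p in partitions for v in p) == [v for p in out for v in sorted(p)]
```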
Can we somehow establish as clearly as possible that the child of this IR node must have locally sorted partitions? I think you have stated this both implicitly and explicitly in other places. I suspect it would be good information to have here as well.
    The reason for much of the complexity is to get the result sizes as
    precise as possible even when e.g. all values are equal.
    In other words, this goes through extra effort to split the data at the
    precise boundaries (which includes part_id and local_row_number).
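A tiny pure-Python illustration of why the split values carry the partition id and local row number: with duplicate keys, searching on the tuple still gives an exact cut (the data here is made up):

```python
import bisect

part_id = 1
local = [(5, part_id, i) for i in range(8)]  # all key values equal to 5
split_point = (5, part_id, 3)                # split chosen at partition 1, local row 3

cut = bisect.bisect_left(local, split_point)
assert cut == 3  # exact split despite every key value being equal
```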
Okay - I was initially confused by the fact that this function wasn't already applied to the concatenated result in `_sort_boundaries_graph` (before being broadcast to the shuffler insertion).
If I understand correctly, this is the motivation. We may be doing redundant work for every input partition, but this allows us to handle pathological data distributions. How much of an overhead do you think this is for uniformly-distributed data (where local adjustments are unnecessary)?
The results of `_sort_boundaries_graph` (I'll rename it to `_split_boundary_candidates_graph`, I think) are the values at which we wish to split.
To find the indices at which to split, we also need the locally sorted data, so that is a clear second step (and the one that adds the complexity, so it is not avoidable).
Let me pull out the sort+extract part and move it into `_sort_boundaries_graph`. I suppose right now dask is likely pulling it all into one worker anyway and then distributing, so we might as well sort+extract there.
Plus, I thought a bit more about the case where we have one partition per worker/GPU: repeating that work locally seems fine there, but if we have many partitions per worker/GPU it may be nice not to.
> How much of an overhead do you think this is for uniformly-distributed data (where local adjustments are unnecessary)?
Ok, the above comment was about something else. Doing this double work to find the right split points seemed to add maybe 10% overhead in the worst case (data already sorted, so no exchange happens and the sort is fast) -- not an exact science right now.
If that seems undesirable, maybe the solution is to rewrite it by extracting the points and doing an equality check instead, since that avoids the second full binary search.
This implements distributed sorting for cudf-polars. It should work, but it is missing new sorting tests and I am also not quite sure that the local tests exercised the `rapidsmpf` paths properly.

Right now, it is structured around introducing a `ShuffleSorted` IR, but maybe it would be nicer to merge the steps further.

It does some smaller refactors to the shuffling to re-use some code there (but does not actually move too much sorting-related logic into that file).
Main missing things:

- We use `concat` in both paths (`rapidsmpf` and not) for merging the exchanged chunks. If the sort isn't stable, then this should use `plc...merge`; this will require changing `rapidsmpf` a bit more (see the toy sketch after this list).
- `zlice` handling (top/bottom limits). For small values there is probably little point, but for large result slices this needs to be threaded in.
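As a toy sketch of the concat-vs-merge point in the first item above (pure Python, with `heapq.merge` standing in for whatever device-side merge would be used; this is not the PR's code):

```python
import heapq

# Each exchanged chunk arrives locally sorted.
chunks = [[1, 4, 9], [2, 2, 7], [0, 5, 6]]

merged = list(heapq.merge(*chunks))            # one pass, result stays sorted
concatenated = [v for c in chunks for v in c]  # plain concat is not globally sorted

assert merged == sorted(concatenated)
```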