Single-partition Dask executor for cuDF-Polars #17262
Merged: rapids-bot merged 67 commits into rapidsai:branch-25.02 from rjzamora:cudf-polars-dask-simple, Nov 25, 2024
67 commits
a590076 cleanup (rjzamora)
7f1bec7 rename to parallel (rjzamora)
023e085 Merge branch 'branch-24.12' into cudf-polars-dask-simple (rjzamora)
e7a2fce Merge branch 'branch-24.12' into cudf-polars-dask-simple (rjzamora)
69a3374 simplify solution (rjzamora)
6aa3694 Merge branch 'cudf-polars-dask-simple' of github.com:rjzamora/cudf in… (rjzamora)
ea22a9a Merge branch 'branch-24.12' into cudf-polars-dask-simple (rjzamora)
915a779 deeper dive (rjzamora)
bd9d783 improve simple agg reduction (rjzamora)
7363d91 cleanup fundamental bugs (rjzamora)
58ee5f4 move PartitionInfo (rjzamora)
ecc51ef add Literal (rjzamora)
75eae0c Merge branch 'branch-24.12' into cudf-polars-dask-simple (rjzamora)
fb2d6bf add lower_ir_graph (rjzamora)
c17564c Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (rjzamora)
6e66998 strip out most exploratory logic (rjzamora)
c41723d Merge branch 'branch-24.12' into cudf-polars-dask-simple (rjzamora)
d774f38 Merge branch 'branch-24.12' into cudf-polars-dask-simple (rjzamora)
6886f8d Add basic Dask evaluate test (pentschev)
29b2d7b Replace environment variable with new `"executor"` config (pentschev)
3a68a6d Add kwarg to specify executor in `assert_gpu_result_equal` (pentschev)
8079ac0 Add couple of Dask executor tests (pentschev)
6f7ccee Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (pentschev)
af4c5f5 Merge remote-tracking branch 'rjzamora/cudf-polars-dask-simple' into … (pentschev)
8aed94f Improve `count` code (pentschev)
aadaf10 Pass `executor` to `GPUEngine` in `assert_gpu_result_equal` (pentschev)
c3a6907 Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (pentschev)
4f67819 Merge branch 'branch-24.12' into cudf-polars-dask-simple (rjzamora)
c8ca09e Clarify intent renaming executor to "dask-experimental" (pentschev)
3fd51bb move PartitionInfo out of ir module (rjzamora)
bf182e4 Merge remote-tracking branch 'rjzamora/cudf-polars-dask-simple' into … (pentschev)
453e274 skip coverage on sanity-check errors (rjzamora)
2b74f28 Add `--executor` to pytest (pentschev)
6d3cd55 Merge remote-tracking branch 'rjzamora/cudf-polars-dask-simple' into … (pentschev)
2398a2e Enable dask-experimental tests in CI, remove duplicates (pentschev)
9aa479a Fix wrong protocol name in deserialization test (pentschev)
64ea98e Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (pentschev)
22678a5 Remove `executor` kwarg from `assert_gpu_result_equal` (pentschev)
41441ca Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (rjzamora)
efadb78 Reintroduce `executor` kwarg in `assert_gpu_result_equal` (pentschev)
9b78d8f Add basic tests for all executors to ensure 100% coverage (pentschev)
c54c217 Merge remote-tracking branch 'rjzamora/cudf-polars-dask-simple' into … (pentschev)
70da7a9 Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (pentschev)
3aeb1e4 Fix `executor` in `assert_gpu_result_equal` (pentschev)
485a161 Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (pentschev)
eb41100 Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (rjzamora)
e91fdb9 Merge branch 'branch-25.02' into cudf-polars-dask-simple (pentschev)
cac9e4f Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora)
addae40 Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora)
dbf37d3 address code review - round 1 (rjzamora)
4d21f7c move sort tupling (rjzamora)
e241af3 remove need for stringify (rjzamora)
1064fcb address code review - round 2 (rjzamora)
bbddfb6 Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora)
aeecd4d remove global caching (rjzamora)
09c5217 use general StateInfo (rjzamora)
62f10bc revert (for now) (rjzamora)
77113d6 Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora)
9967cfb skip coverage on singledispatch miss (rjzamora)
075d41e typo (rjzamora)
21e598a Rename `"cudf"` executor to `"pylibcudf"` (pentschev)
7febe21 Update `test_evaluate_dask()` with `collect()` (pentschev)
ac4d2da Remove tests docstrings (pentschev)
2ced4a0 Improve `executor` typing (pentschev)
98b0b36 Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (pentschev)
af5aff8 Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora)
f6f7eda address code review (rjzamora)
236 changes: 236 additions & 0 deletions
python/cudf_polars/cudf_polars/experimental/parallel.py
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,236 @@ | ||
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
"""Partitioned LogicalPlan nodes.""" | ||
|
||
from __future__ import annotations | ||
|
||
import operator | ||
from functools import reduce, singledispatch | ||
from typing import TYPE_CHECKING, Any | ||
|
||
from cudf_polars.dsl.ir import IR | ||
from cudf_polars.dsl.traversal import traversal | ||
|
||
if TYPE_CHECKING: | ||
from collections.abc import MutableMapping | ||
from typing import TypeAlias | ||
|
||
from cudf_polars.containers import DataFrame | ||
from cudf_polars.dsl.nodebase import Node | ||
from cudf_polars.typing import GenericTransformer | ||
|
||
|
||
class PartitionInfo: | ||
""" | ||
Partitioning information. | ||
|
||
This class only tracks the partition count (for now). | ||
""" | ||
|
||
__slots__ = ("count",) | ||
|
||
def __init__(self, count: int): | ||
self.count = count | ||
|
||
|
||
LowerIRTransformer: TypeAlias = ( | ||
"GenericTransformer[IR, MutableMapping[IR, PartitionInfo]]" | ||
) | ||
"""Protocol for Lowering IR nodes.""" | ||
|
||
|
||
def get_key_name(node: Node) -> str: | ||
"""Generate the key name for a Node.""" | ||
return f"{type(node).__name__.lower()}-{hash(node)}" | ||
|
||
|
||
@singledispatch | ||
def lower_ir_node( | ||
ir: IR, rec: LowerIRTransformer | ||
) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: | ||
""" | ||
Rewrite an IR node and extract partitioning information. | ||
|
||
Parameters | ||
---------- | ||
ir | ||
IR node to rewrite. | ||
rec | ||
Recursive LowerIRTransformer callable. | ||
|
||
Returns | ||
------- | ||
new_ir, partition_info | ||
The rewritten node, and a mapping from unique nodes in | ||
the full IR graph to associated partitioning information. | ||
|
||
Notes | ||
----- | ||
This function is used by `lower_ir_graph`. | ||
|
||
See Also | ||
-------- | ||
lower_ir_graph | ||
""" | ||
raise AssertionError(f"Unhandled type {type(ir)}") # pragma: no cover | ||
|
||
|
||
@lower_ir_node.register(IR) | ||
def _(ir: IR, rec: LowerIRTransformer) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: | ||
if len(ir.children) == 0: | ||
# Default leaf node has single partition | ||
return ir, {ir: PartitionInfo(count=1)} | ||
|
||
# Lower children | ||
children, _partition_info = zip(*(rec(c) for c in ir.children), strict=False) | ||
partition_info = reduce(operator.or_, _partition_info) | ||
|
||
# Check that child partitioning is supported | ||
count = max(partition_info[c].count for c in children) | ||
if count > 1: | ||
raise NotImplementedError( | ||
f"Class {type(ir)} does not support multiple partitions." | ||
) # pragma: no cover | ||
|
||
# Return reconstructed node and partition-info dict | ||
partition = PartitionInfo(count=1) | ||
new_node = ir.reconstruct(children) | ||
partition_info[new_node] = partition | ||
return new_node, partition_info | ||
Review comment on lines +95 to +99:
nit: No need to address here, but just to note for a followup. This does "unnecessary" reconstruction if the children are unchanged. We could consider making…
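To make the review note above concrete, here is a minimal sketch of one way a followup could skip reconstruction when lowering leaves every child untouched. It is not taken from the PR, and the truncated suggestion in the comment may have had something different in mind. `ToyNode` and `rebuild_if_changed` are hypothetical names; `ToyNode` only mimics the `children` attribute and `reconstruct` method that the diff relies on.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class ToyNode:
    """Hypothetical stand-in for an IR node (not the cudf_polars class)."""

    name: str
    children: tuple[ToyNode, ...] = ()

    def reconstruct(self, children):
        # Build a new node of the same kind with the given children.
        return ToyNode(self.name, tuple(children))


def rebuild_if_changed(node, new_children):
    """Return `node` itself when every lowered child is the same object."""
    new_children = tuple(new_children)
    unchanged = len(new_children) == len(node.children) and all(
        new is old for new, old in zip(new_children, node.children)
    )
    return node if unchanged else node.reconstruct(new_children)


leaf = ToyNode("scan")
root = ToyNode("select", (leaf,))

same = rebuild_if_changed(root, [leaf])  # children untouched: no new node
changed = rebuild_if_changed(root, [ToyNode("scan2")])  # child replaced: rebuilt
```

The identity check (`is`) is deliberate in this sketch: it avoids a deep equality comparison on every node, at the cost of rebuilding when a child was replaced by an equal but distinct object.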
|
||
|
||
def lower_ir_graph(ir: IR) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: | ||
""" | ||
Rewrite an IR graph and extract partitioning information. | ||
|
||
Parameters | ||
---------- | ||
ir | ||
Root of the graph to rewrite. | ||
|
||
Returns | ||
------- | ||
new_ir, partition_info | ||
The rewritten graph, and a mapping from unique nodes | ||
in the new graph to associated partitioning information. | ||
|
||
Notes | ||
----- | ||
This function traverses the unique nodes of the graph with | ||
root `ir`, and applies :func:`lower_ir_node` to each node. | ||
|
||
See Also | ||
-------- | ||
lower_ir_node | ||
""" | ||
from cudf_polars.dsl.traversal import CachingVisitor | ||
|
||
mapper = CachingVisitor(lower_ir_node) | ||
return mapper(ir) | ||
|
||
|
||
@singledispatch | ||
def generate_ir_tasks( | ||
ir: IR, partition_info: MutableMapping[IR, PartitionInfo] | ||
) -> MutableMapping[Any, Any]: | ||
""" | ||
Generate a task graph for evaluation of an IR node. | ||
|
||
Parameters | ||
---------- | ||
ir | ||
IR node to generate tasks for. | ||
partition_info | ||
Partitioning information, obtained from :func:`lower_ir_graph`. | ||
|
||
Returns | ||
------- | ||
mapping | ||
A (partial) dask task graph for the evaluation of an ir node. | ||
|
||
Notes | ||
----- | ||
Task generation should only produce the tasks for the current node, | ||
referring to child tasks by name. | ||
|
||
See Also | ||
-------- | ||
task_graph | ||
""" | ||
raise AssertionError(f"Unhandled type {type(ir)}") # pragma: no cover | ||
|
||
|
||
@generate_ir_tasks.register(IR) | ||
def _( | ||
ir: IR, partition_info: MutableMapping[IR, PartitionInfo] | ||
) -> MutableMapping[Any, Any]: | ||
# Single-partition default behavior. | ||
# This is used by `generate_ir_tasks` for all unregistered IR sub-types. | ||
if partition_info[ir].count > 1: | ||
raise NotImplementedError( | ||
f"Failed to generate multiple output tasks for {ir}." | ||
) # pragma: no cover | ||
|
||
child_names = [] | ||
for child in ir.children: | ||
child_names.append(get_key_name(child)) | ||
if partition_info[child].count > 1: | ||
raise NotImplementedError( | ||
f"Failed to generate tasks for {ir} with child {child}." | ||
) # pragma: no cover | ||
|
||
key_name = get_key_name(ir) | ||
return { | ||
(key_name, 0): ( | ||
ir.do_evaluate, | ||
*ir._non_child_args, | ||
*((child_name, 0) for child_name in child_names), | ||
) | ||
} | ||
|
||
|
||
def task_graph( | ||
ir: IR, partition_info: MutableMapping[IR, PartitionInfo] | ||
) -> tuple[MutableMapping[Any, Any], str | tuple[str, int]]: | ||
""" | ||
Construct a task graph for evaluation of an IR graph. | ||
|
||
Parameters | ||
---------- | ||
ir | ||
Root of the graph to rewrite. | ||
partition_info | ||
A mapping from all unique IR nodes to the | ||
associated partitioning information. | ||
|
||
Returns | ||
------- | ||
graph | ||
A Dask-compatible task graph for the entire | ||
IR graph with root `ir`. | ||
|
||
Notes | ||
----- | ||
This function traverses the unique nodes of the | ||
graph with root `ir`, and extracts the tasks for | ||
each node with :func:`generate_ir_tasks`. | ||
|
||
See Also | ||
-------- | ||
generate_ir_tasks | ||
""" | ||
graph = reduce( | ||
operator.or_, | ||
(generate_ir_tasks(node, partition_info) for node in traversal(ir)), | ||
) | ||
return graph, (get_key_name(ir), 0) | ||
|
||
|
||
def evaluate_dask(ir: IR) -> DataFrame: | ||
"""Evaluate an IR graph with Dask.""" | ||
from dask import get | ||
|
||
ir, partition_info = lower_ir_graph(ir) | ||
|
||
graph, key = task_graph(ir, partition_info) | ||
return get(graph, key) |
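
For readers unfamiliar with the dict-based task graphs that `generate_ir_tasks` and `task_graph` build, the following is a small standard-library sketch of the same shape. Each key is a `(name, partition_index)` tuple, each value is a tuple whose first element is a callable and whose remaining elements are arguments, and an argument that is itself a key in the graph is replaced by that task's result. `ToyIR`, `node_tasks`, `walk`, and `toy_get` are hypothetical illustrations: `toy_get` is a simplified stand-in for `dask.get`, and `walk` for `cudf_polars.dsl.traversal.traversal`.

```python
from __future__ import annotations

import operator
from functools import reduce


class ToyIR:
    """Hypothetical stand-in for an IR node; not the cudf_polars IR class."""

    def __init__(self, value, *children):
        self.value = value
        self.children = children

    def do_evaluate(self, value, *child_results):
        # Toy evaluation: this node's value plus the results of its children.
        return value + sum(child_results)


def key_name(node):
    # Same naming scheme as get_key_name in the diff: lowercased type name + hash.
    return f"{type(node).__name__.lower()}-{hash(node)}"


def node_tasks(node):
    # One task for partition 0, referring to child outputs by (name, 0) keys.
    return {
        (key_name(node), 0): (
            node.do_evaluate,
            node.value,
            *((key_name(child), 0) for child in node.children),
        )
    }


def walk(node):
    # Simplified pre-order traversal; shared nodes may be yielded more than
    # once, which is harmless because merging dicts deduplicates their keys.
    yield node
    for child in node.children:
        yield from walk(child)


def toy_get(graph, key):
    # Minimal recursive evaluator: arguments that are keys in the graph are
    # computed first, everything else is passed through unchanged.
    func, *args = graph[key]
    resolved = [
        toy_get(graph, arg) if isinstance(arg, tuple) and arg in graph else arg
        for arg in args
    ]
    return func(*resolved)


leaf_a, leaf_b = ToyIR(1), ToyIR(2)
root = ToyIR(10, leaf_a, leaf_b)

graph = reduce(operator.or_, (node_tasks(n) for n in walk(root)))
result = toy_get(graph, (key_name(root), 0))  # 10 + 1 + 2
```

Because every node here has a single partition, every key ends in `0`, which mirrors the single-partition restriction enforced by the `NotImplementedError` checks in the diff.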