
[DNM][WIP] Single-partition Dask executor for cuDF-Polars #17262

Draft · wants to merge 4 commits into base: branch-24.12

Conversation

@rjzamora (Member) commented Nov 7, 2024

WARNING: This is still very rough.

Description

The goal here is to lay down the initial foundation for Dask-based evaluation of IR graphs in cudf-polars. The first pass will only support single-partition workloads. This functionality could be achieved with much less complicated changes to cudf-polars; however, we do want to build multi-partition support on top of this.

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.

@rjzamora rjzamora added 5 - DO NOT MERGE Hold off on merging; see PR for details improvement Improvement / enhancement to an existing function non-breaking Non-breaking change cudf.polars Issues specific to cudf.polars labels Nov 7, 2024
@rjzamora rjzamora self-assigned this Nov 7, 2024
@github-actions github-actions bot added the Python Affects Python cuDF API. label Nov 7, 2024

@_single_partition_node.register
def _(node: PythonScan, rec) -> SParPythonScan:
    return SParPythonScan(*node._ctor_arguments(map(rec, node.children)))
rjzamora (Member, Author):

@wence- - This PR is still rough, so it may not make sense to debug type-hinting problems yet. That said, I'm getting a ton of errors that I don't know how to address like: error: Argument 1 to "SParPythonScan" has incompatible type "*Sequence[Any | IR]"; expected "Mapping[str, Any]" [arg-type]

wence- (Contributor):

Ah, yeah, can you apply:

diff --git a/python/cudf_polars/cudf_polars/dsl/nodebase.py b/python/cudf_polars/cudf_polars/dsl/nodebase.py
index 228d300f46..c96414ad3c 100644
--- a/python/cudf_polars/cudf_polars/dsl/nodebase.py
+++ b/python/cudf_polars/cudf_polars/dsl/nodebase.py
@@ -40,7 +40,7 @@ class Node(Generic[T]):
     children: tuple[T, ...]
     _non_child: ClassVar[tuple[str, ...]] = ()
 
-    def _ctor_arguments(self, children: Sequence[T]) -> Sequence[Any | T]:
+    def _ctor_arguments(self, children: Sequence[T]) -> Sequence[Any]:
         return (*(getattr(self, attr) for attr in self._non_child), *children)
 
     def reconstruct(

But you should use the reconstruct method:

Then you can do X.reconstruct(map(rec, node.children)) I suspect.
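For context, a minimal self-contained sketch of the pattern being suggested (`Node`, `_non_child`, and `Expr` here are simplified stand-ins, not the actual cudf-polars classes): `reconstruct` forwards the non-child attributes plus the new children to the constructor, so callers never need `_ctor_arguments` directly and can pass any iterable, including a `map(...)` over the old children.

```python
from typing import ClassVar, Generic, TypeVar

T = TypeVar("T", bound="Node")


class Node(Generic[T]):
    """Simplified stand-in for cudf_polars.dsl.nodebase.Node."""

    children: tuple["Node", ...]
    _non_child: ClassVar[tuple[str, ...]] = ()

    def _ctor_arguments(self, children):
        # Non-child attributes first, then the (possibly new) children.
        return (*(getattr(self, attr) for attr in self._non_child), *children)

    def reconstruct(self, children):
        # Rebuild this node with new children, reusing non-child attributes.
        return type(self)(*self._ctor_arguments(children))


class Expr(Node):
    _non_child = ("name",)

    def __init__(self, name, *children):
        self.name = name
        self.children = tuple(children)


root = Expr("concat", Expr("x"), Expr("y"))
# The suggested usage shape: node.reconstruct(map(rec, node.children)).
rebuilt = root.reconstruct(map(lambda c: Expr(c.name.upper()), root.children))
```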

wence- (Contributor):

I probably need to fiddle with the types a bit, perhaps this:

diff --git a/python/cudf_polars/cudf_polars/dsl/nodebase.py b/python/cudf_polars/cudf_polars/dsl/nodebase.py
index 228d300f46..9fc855a764 100644
--- a/python/cudf_polars/cudf_polars/dsl/nodebase.py
+++ b/python/cudf_polars/cudf_polars/dsl/nodebase.py
@@ -8,7 +8,7 @@ from __future__ import annotations
 from typing import TYPE_CHECKING, Any, ClassVar, Generic, TypeVar
 
 if TYPE_CHECKING:
-    from collections.abc import Hashable, Sequence
+    from collections.abc import Hashable, Iterable, Sequence
 
     from typing_extensions import Self
 
@@ -40,12 +40,12 @@ class Node(Generic[T]):
     children: tuple[T, ...]
     _non_child: ClassVar[tuple[str, ...]] = ()
 
-    def _ctor_arguments(self, children: Sequence[T]) -> Sequence[Any | T]:
+    def _ctor_arguments(self, children: Iterable[T]) -> Sequence[Any]:
         return (*(getattr(self, attr) for attr in self._non_child), *children)
 
     def reconstruct(
-        self, children: Sequence[T]
-    ) -> Self:  # pragma: no cover; not yet used
+        self, children: Iterable[T]
+    ) -> Self:
         """
         Rebuild this node with new children.
 

    return SParHConcat(*node._ctor_arguments(map(rec, node.children)))


lower_ir_graph = CachingVisitor(_single_partition_node)
rjzamora (Member, Author):

Not sure if I'm using CachingVisitor correctly (or how to deal with the type hinting).

wence- (Contributor):

You want:

def lower_ir_graph(node: ir.IR) -> ...:
    mapper = CachingVisitor(_single_partition_node)
    return mapper(node)

The cache lives for the lifetime of the mapper, so if you make this module-scope, then you'll hold on to everything forever.

@rjzamora rjzamora added 2 - In Progress Currently a work in progress and removed 5 - DO NOT MERGE Hold off on merging; see PR for details labels Nov 7, 2024
Comment on lines +148 to +152

    if os.environ.get("CUDF_POLARS_DASK", "OFF").upper() == "ON":
        # Use experimental Dask executor
        from cudf_polars.experimental.parallel import evaluate_dask

        return evaluate_dask(ir).to_polars()
wence- (Contributor):

note: You can put arbitrary keyword arguments into the pl.GPUEngine object and handle them here
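A hedged sketch of what that could look like; `GPUEngine` and `execute_with_cudf` below are minimal hypothetical stand-ins (not the real polars or cudf-polars APIs) showing executor selection moving from an environment variable to an engine keyword argument:

```python
class GPUEngine:
    """Minimal hypothetical stand-in for pl.GPUEngine: it simply records
    extra keyword arguments for the cudf-polars callback to inspect."""

    def __init__(self, **kwargs):
        self.config = kwargs


def execute_with_cudf(ir, *, engine):
    # Hypothetical dispatch: read the executor choice from the engine
    # config instead of the CUDF_POLARS_DASK environment variable.
    executor = engine.config.get("executor", "synchronous")
    if executor == "dask":
        return f"dask-evaluated({ir})"
    return f"evaluated({ir})"
```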


Comment on lines +1475 to +1479

    def get_hashable(self) -> Hashable:
        """Hashable representation of the node."""
        schema_hash = tuple(self.schema.items())
        return (type(self), schema_hash, self.name, str(self.options))
wence- (Contributor):

No, this is wrong, because it doesn't consider the children. What's wrong with the default?
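To make the objection concrete, here is an illustrative example (not the actual cudf-polars implementation): a child-aware hashable distinguishes nodes whose local attributes match but whose subtrees differ, which a schema/name/options-only key would conflate.

```python
class Node:
    """Illustrative node, not the cudf-polars implementation."""

    def __init__(self, name, *children):
        self.name = name
        self.children = tuple(children)

    def get_hashable(self):
        # Recurse into the children: two nodes with identical local
        # attributes but different subtrees must not collide, or any
        # cache keyed on get_hashable() would conflate them.
        return (
            type(self),
            self.name,
            tuple(child.get_hashable() for child in self.children),
        )


a = Node("hconcat", Node("scan_x"))
b = Node("hconcat", Node("scan_y"))
# Same local attributes, different children: distinct hashables.
```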

@@ -1514,15 +1521,18 @@ def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame:
             plc.interop.from_arrow(
                 pa.array(
                     pivotees,
-                    type=plc.interop.to_arrow(variable_dtype),
+                    type=plc.interop.to_arrow(schema[variable_name]),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? Is it because you need the schema elsewhere?






Labels

  • 2 - In Progress: Currently a work in progress
  • cudf.polars: Issues specific to cudf.polars
  • improvement: Improvement / enhancement to an existing function
  • non-breaking: Non-breaking change
  • Python: Affects Python cuDF API.

Projects

Status: In Progress