[DNM][WIP] Single-partition Dask executor for cuDF-Polars #17262

Draft: wants to merge 4 commits into base branch-24.12 (changes from 3 commits)
6 changes: 6 additions & 0 deletions python/cudf_polars/cudf_polars/callback.py
@@ -145,6 +145,12 @@ def _callback(
set_device(device),
set_memory_resource(memory_resource),
):
if os.environ.get("CUDF_POLARS_DASK", "OFF").upper() == "ON":
# Use experimental Dask executor
from cudf_polars.experimental.parallel import evaluate_dask

return evaluate_dask(ir).to_polars()
Contributor commented on lines +148 to +152:

note: You can put arbitrary keyword arguments into the pl.GPUEngine object and handle them here
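
As a sketch of that suggestion (the "executor" keyword below is hypothetical and not part of this PR): extra keyword arguments given to pl.GPUEngine are forwarded to the cudf-polars callback, so the Dask path could be selected per query instead of through an environment variable.

import polars as pl

q = pl.LazyFrame({"a": [1, 2, 3]}).select(pl.col("a").sum())

# Hypothetical: an "executor" keyword carried through pl.GPUEngine's arbitrary
# kwargs and handled inside _callback, instead of reading CUDF_POLARS_DASK.
engine = pl.GPUEngine(executor="dask")
result = q.collect(engine=engine)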


return ir.evaluate(cache={}).to_polars()


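With the gate above, the experimental executor is opt-in via an environment variable; a minimal usage sketch, assuming dask is installed and a GPU engine is available:

import os
import polars as pl

os.environ["CUDF_POLARS_DASK"] = "ON"  # route evaluation through evaluate_dask

q = (
    pl.LazyFrame({"a": [1, 1, 2], "b": [3.0, 4.0, 5.0]})
    .group_by("a")
    .agg(pl.col("b").sum())
)
result = q.collect(engine=pl.GPUEngine())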
@@ -69,7 +69,7 @@ def __init__(
*by: Expr,
) -> None:
self.dtype = dtype
self.options = options
self.options = (options[0], tuple(options[1]), tuple(options[2]))
self.children = (column, *by)

def do_evaluate(
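The list-to-tuple change presumably makes the expression's options hashable, since nested lists defeat hashing of the node. A quick plain-Python illustration (not cudf-polars API):

# A tuple of tuples hashes fine; a tuple containing lists does not.
print(hash((False, ("a",), ())))
try:
    hash((False, ["a"], []))
except TypeError as err:
    print(err)  # unhashable type: 'list'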
26 changes: 18 additions & 8 deletions python/cudf_polars/cudf_polars/dsl/ir.py
@@ -1468,13 +1468,20 @@ def __init__(self, schema: Schema, name: str, options: Any, df: IR):
self.options = (
tuple(indices),
tuple(pivotees),
(variable_name, schema[variable_name]),
(value_name, schema[value_name]),
variable_name,
value_name,
)
self._non_child_args = (name, self.options)
self._non_child_args = (schema, name, self.options)

def get_hashable(self) -> Hashable:
"""Hashable representation of the node."""
schema_hash = tuple(self.schema.items())
return (type(self), schema_hash, self.name, str(self.options))
Contributor commented on lines +1475 to +1479:

No, this is wrong, because it doesn't consider the children. What's wrong with the default?
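
The concern can be shown with a minimal, hypothetical node class (not the cudf-polars base class): if children are left out of the hashable representation, two plans that differ only in their inputs collide.

class Node:
    def __init__(self, name, options, children):
        self.name = name
        self.options = options
        self.children = children

    def get_hashable(self):
        # children deliberately omitted, mirroring the override above
        return (type(self), self.name, str(self.options))

a = Node("unpivot", (), children=("scan_of_df1",))
b = Node("unpivot", (), children=("scan_of_df2",))
assert a.get_hashable() == b.get_hashable()  # distinct plans look identical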


@classmethod
def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame:
def do_evaluate(
cls, schema: Schema, name: str, options: Any, df: DataFrame
) -> DataFrame:
"""Evaluate and return a dataframe."""
if name == "rechunk":
# No-op in our data model
@@ -1496,8 +1503,8 @@ def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame:
(
indices,
pivotees,
(variable_name, variable_dtype),
(value_name, value_dtype),
variable_name,
value_name,
) = options
npiv = len(pivotees)
index_columns = [
@@ -1514,15 +1521,18 @@ def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame:
plc.interop.from_arrow(
pa.array(
pivotees,
type=plc.interop.to_arrow(variable_dtype),
type=plc.interop.to_arrow(schema[variable_name]),
Contributor commented:
Why? Is it that you need the schema elsewhere?

),
)
]
),
df.num_rows,
).columns()
value_column = plc.concatenate.concatenate(
[df.column_map[pivotee].astype(value_dtype).obj for pivotee in pivotees]
[
df.column_map[pivotee].astype(schema[value_name]).obj
for pivotee in pivotees
]
)
return DataFrame(
[
8 changes: 8 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/__init__.py
@@ -0,0 +1,8 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

"""Experimental features, which can change without any depreciation period."""

from __future__ import annotations

__all__: list[str] = []
73 changes: 73 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/parallel.py
@@ -0,0 +1,73 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
"""Partitioned LogicalPlan nodes."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable

if TYPE_CHECKING:
from collections.abc import MutableMapping

from cudf_polars.containers import DataFrame
from cudf_polars.dsl.ir import IR


class PartitionInfo:
"""
Partitioning information.
This class only tracks the partition count (for now).
"""

__slots__ = ("npartitions",)

def __init__(self, npartitions: int):
self.npartitions = npartitions


@runtime_checkable
class PartitionedIR(Protocol):
"""
Partitioned IR Protocol.
IR nodes must satisfy this protocol to generate a valid task graph.
"""

_key: str
_parts: PartitionInfo

def _tasks(self) -> MutableMapping:
raise NotImplementedError()


def task_graph(_ir: IR) -> tuple[MutableMapping[str, Any], str]:
"""Construct a Dask-compatible task graph."""
from cudf_polars.dsl.traversal import traversal
from cudf_polars.experimental.single import lower_ir_graph

# Rewrite IR graph into a ParIR graph
ir: PartitionedIR = lower_ir_graph(_ir)

dsk = {
k: v for layer in [n._tasks() for n in traversal(ir)] for k, v in layer.items()
}

# Add task to reduce output partitions
npartitions = ir._parts.npartitions
key_name = ir._key
if npartitions == 1:
dsk[key_name] = (key_name, 0)
else:
# Need DataFrame.concat support
raise NotImplementedError()

return dsk, key_name


def evaluate_dask(ir: IR) -> DataFrame:
"""Evaluate an IR graph with Dask."""
from dask import get

dsk, key = task_graph(ir)
return get(dsk, key)
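evaluate_dask leans on Dask's plain dict graph protocol: each key maps either to a literal value or to a task tuple whose first element is a callable, and dask.get (the synchronous scheduler) resolves key references while executing. A minimal, self-contained illustration:

import operator

from dask import get

dsk = {
    ("x", 0): 1,                             # a literal value
    ("y", 0): (operator.add, ("x", 0), 10),  # task: add(value at ("x", 0), 10)
}
assert get(dsk, ("y", 0)) == 11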
216 changes: 216 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/single.py
@@ -0,0 +1,216 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
"""Single-partition LogicalPlan nodes."""

from __future__ import annotations

from functools import cached_property, singledispatch
from typing import TYPE_CHECKING

from cudf_polars.dsl.ir import (
IR,
Cache,
DataFrameScan,
Distinct,
Filter,
GroupBy,
HConcat,
HStack,
Join,
MapFunction,
Projection,
PythonScan,
Reduce,
Scan,
Select,
Slice,
Sort,
Union,
)
from cudf_polars.dsl.traversal import CachingVisitor
from cudf_polars.experimental.parallel import PartitionInfo

if TYPE_CHECKING:
from cudf_polars.dsl.ir import IR


class SPartitionwise:
"""Single partition-wise PartitionedIR."""

@cached_property
def _key(self):
return f"{type(self).__name__.lower()}-{hash(self)}"

def _tasks(self):
return {
(self._key, 0): (
self.do_evaluate,
*self._non_child_args,
*((child._key, 0) for child in self.children),
)
}

@cached_property
def _parts(self) -> PartitionInfo:
return PartitionInfo(npartitions=1)


class SParPythonScan(PythonScan, SPartitionwise):
"""Single-partition demo class."""


class SParScan(Scan, SPartitionwise):
"""Single-partition demo class."""


class SParCache(Cache, SPartitionwise):
"""Single-partition demo class."""


class SParDataFrameScan(DataFrameScan, SPartitionwise):
"""Single-partition demo class."""


class SParSelect(Select, SPartitionwise):
"""Single-partition demo class."""


class SParReduce(Reduce, SPartitionwise):
"""Single-partition demo class."""


class SParGroupBy(GroupBy, SPartitionwise):
"""Single-partition demo class."""


class SParJoin(Join, SPartitionwise):
"""Single-partition demo class."""


class SParHStack(HStack, SPartitionwise):
"""Single-partition demo class."""


class SParDistinct(Distinct, SPartitionwise):
"""Single-partition demo class."""


class SParSort(Sort, SPartitionwise):
"""Single-partition demo class."""


class SParSlice(Slice, SPartitionwise):
"""Single-partition demo class."""


class SParFilter(Filter, SPartitionwise):
"""Single-partition demo class."""


class SParProjection(Projection, SPartitionwise):
"""Single-partition demo class."""


class SParMapFunction(MapFunction, SPartitionwise):
"""Single-partition demo class."""


class SParUnion(Union, SPartitionwise):
"""Single-partition demo class."""


class SParHConcat(HConcat, SPartitionwise):
"""Single-partition demo class."""


@singledispatch
def _single_partition_node(node: IR, rec) -> SPartitionwise:
raise NotImplementedError(f"Cannot convert {type(node)} to PartitionedIR.")


@_single_partition_node.register
def _(node: PythonScan, rec) -> SParPythonScan:
return SParPythonScan(*node._ctor_arguments(map(rec, node.children)))
Member Author commented:

@wence- This PR is still rough, so it may not make sense to debug type-hinting problems yet. That said, I'm getting a ton of errors that I don't know how to address, like: error: Argument 1 to "SParPythonScan" has incompatible type "*Sequence[Any | IR]"; expected "Mapping[str, Any]" [arg-type]

Contributor replied:

Ah, yeah, can you apply:

diff --git a/python/cudf_polars/cudf_polars/dsl/nodebase.py b/python/cudf_polars/cudf_polars/dsl/nodebase.py
index 228d300f46..c96414ad3c 100644
--- a/python/cudf_polars/cudf_polars/dsl/nodebase.py
+++ b/python/cudf_polars/cudf_polars/dsl/nodebase.py
@@ -40,7 +40,7 @@ class Node(Generic[T]):
     children: tuple[T, ...]
     _non_child: ClassVar[tuple[str, ...]] = ()
 
-    def _ctor_arguments(self, children: Sequence[T]) -> Sequence[Any | T]:
+    def _ctor_arguments(self, children: Sequence[T]) -> Sequence[Any]:
         return (*(getattr(self, attr) for attr in self._non_child), *children)
 
     def reconstruct(

But you should use the reconstruct method:

Then you can do X.reconstruct(map(rec, node.children)), I suspect.

Contributor added:

I probably need to fiddle with the types a bit, perhaps this:

diff --git a/python/cudf_polars/cudf_polars/dsl/nodebase.py b/python/cudf_polars/cudf_polars/dsl/nodebase.py
index 228d300f46..9fc855a764 100644
--- a/python/cudf_polars/cudf_polars/dsl/nodebase.py
+++ b/python/cudf_polars/cudf_polars/dsl/nodebase.py
@@ -8,7 +8,7 @@ from __future__ import annotations
 from typing import TYPE_CHECKING, Any, ClassVar, Generic, TypeVar
 
 if TYPE_CHECKING:
-    from collections.abc import Hashable, Sequence
+    from collections.abc import Hashable, Iterable, Sequence
 
     from typing_extensions import Self
 
@@ -40,12 +40,12 @@ class Node(Generic[T]):
     children: tuple[T, ...]
     _non_child: ClassVar[tuple[str, ...]] = ()
 
-    def _ctor_arguments(self, children: Sequence[T]) -> Sequence[Any | T]:
+    def _ctor_arguments(self, children: Iterable[T]) -> Sequence[Any]:
         return (*(getattr(self, attr) for attr in self._non_child), *children)
 
     def reconstruct(
-        self, children: Sequence[T]
-    ) -> Self:  # pragma: no cover; not yet used
+        self, children: Iterable[T]
+    ) -> Self:
         """
         Rebuild this node with new children.
 



@_single_partition_node.register
def _(node: Scan, rec) -> SParScan:
return SParScan(*node._ctor_arguments(map(rec, node.children)))


@_single_partition_node.register
def _(node: DataFrameScan, rec) -> SParDataFrameScan:
return SParDataFrameScan(*node._ctor_arguments(map(rec, node.children)))


@_single_partition_node.register
def _(node: Cache, rec) -> SParCache:
return SParCache(*node._ctor_arguments(map(rec, node.children)))


@_single_partition_node.register
def _(node: Reduce, rec) -> SParReduce:
return SParReduce(*node._ctor_arguments(map(rec, node.children)))


@_single_partition_node.register
def _(node: Select, rec) -> SParSelect:
return SParSelect(*node._ctor_arguments(map(rec, node.children)))


@_single_partition_node.register
def _(node: GroupBy, rec) -> SParGroupBy:
return SParGroupBy(*node._ctor_arguments(map(rec, node.children)))


@_single_partition_node.register
def _(node: Join, rec) -> SParJoin:
return SParJoin(*node._ctor_arguments(map(rec, node.children)))


@_single_partition_node.register
def _(node: HStack, rec) -> SParHStack:
return SParHStack(*node._ctor_arguments(map(rec, node.children)))


@_single_partition_node.register
def _(node: Distinct, rec) -> SParDistinct:
return SParDistinct(*node._ctor_arguments(map(rec, node.children)))


@_single_partition_node.register
def _(node: Sort, rec) -> SParSort:
return SParSort(*node._ctor_arguments(map(rec, node.children)))


@_single_partition_node.register
def _(node: Slice, rec) -> SParSlice:
return SParSlice(*node._ctor_arguments(map(rec, node.children)))


@_single_partition_node.register
def _(node: Filter, rec) -> SParFilter:
return SParFilter(*node._ctor_arguments(map(rec, node.children)))


@_single_partition_node.register
def _(node: Projection, rec) -> SParProjection:
return SParProjection(*node._ctor_arguments(map(rec, node.children)))


@_single_partition_node.register
def _(node: MapFunction, rec) -> SParMapFunction:
return SParMapFunction(*node._ctor_arguments(map(rec, node.children)))


@_single_partition_node.register
def _(node: Union, rec) -> SParUnion:
return SParUnion(*node._ctor_arguments(map(rec, node.children)))


@_single_partition_node.register
def _(node: HConcat, rec) -> SParHConcat:
return SParHConcat(*node._ctor_arguments(map(rec, node.children)))


lower_ir_graph = CachingVisitor(_single_partition_node)
Member Author commented:

Not sure if I'm using CachingVisitor correctly (or how to deal with the type hinting).

Contributor replied:

You want:

def lower_ir_graph(node: ir.IR) -> ...:
    mapper = CachingVisitor(_single_partition_node)
    return mapper(node)

The cache lives for the lifetime of the mapper, so if you make this module-scope, then you'll hold on to everything forever.
