Merged
60 commits
f0964a6 basic groupby-aggregation support (rjzamora, Dec 4, 2024)
1329cf1 Merge branch 'branch-25.02' into cudf-polars-multi-groupby (rjzamora, Dec 4, 2024)
11a03f8 Merge branch 'branch-25.02' into cudf-polars-multi-groupby (rjzamora, Dec 4, 2024)
a9fa486 Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora, Dec 4, 2024)
b1224a0 remove GroupbyTree (rjzamora, Dec 4, 2024)
385f03a simplify lower (rjzamora, Dec 6, 2024)
8956215 Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora, Dec 6, 2024)
70b29b2 Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora, Dec 19, 2024)
3f04eca cleanup (rjzamora, Dec 19, 2024)
e090de5 no cover (rjzamora, Dec 19, 2024)
24b88f2 tweak error message (rjzamora, Dec 19, 2024)
161a53b Merge branch 'branch-25.02' into cudf-polars-multi-groupby (rjzamora, Jan 9, 2025)
69f6336 update copyright dates (rjzamora, Jan 9, 2025)
22cebeb add test coverage for single-partition (rjzamora, Jan 11, 2025)
45ac8ec Merge branch 'branch-25.02' into cudf-polars-multi-groupby (rjzamora, Jan 11, 2025)
f5205bd Merge remote-tracking branch 'upstream/branch-25.04' into cudf-polars… (rjzamora, Jan 27, 2025)
a7cd29f Merge branch 'branch-25.04' into cudf-polars-multi-groupby (rjzamora, Jan 29, 2025)
ef79e90 Merge remote-tracking branch 'upstream/branch-25.04' into cudf-polars… (rjzamora, Feb 25, 2025)
b8a20e6 formatting (rjzamora, Feb 25, 2025)
fde4231 Merge remote-tracking branch 'upstream/branch-25.04' into cudf-polars… (rjzamora, Feb 27, 2025)
7d18e7b add shuffle-based groupby (rjzamora, Feb 27, 2025)
f21e1cd Merge remote-tracking branch 'upstream/branch-25.04' into cudf-polars… (rjzamora, Feb 28, 2025)
ee47bd9 Add `pylibcudf.gpumemoryview` support for `len()`/`nbytes` (pentschev, Feb 28, 2025)
ccd1029 improve test coverage (rjzamora, Feb 28, 2025)
33bf65c Add gpumemoryview tests for `len()`/`nbytes` (pentschev, Feb 28, 2025)
ac73bab Add `gpumemoryview.__cuda_array_interface__` tests (pentschev, Feb 28, 2025)
41844b7 Update stubs (pentschev, Feb 28, 2025)
df902aa Merge branch 'branch-25.04' into pylibcudf-gpumemoryview-len (pentschev, Feb 28, 2025)
6dfb397 add ConfigOptions class (rjzamora, Feb 28, 2025)
3f00203 Fix typo in `__cuda_array_interface__` name (pentschev, Feb 28, 2025)
6a13323 Merge remote-tracking branch 'origin/pylibcudf-gpumemoryview-len' int… (pentschev, Feb 28, 2025)
6e088f0 Merge remote-tracking branch 'pentschev/pylibcudf-gpumemoryview-len' … (rjzamora, Feb 28, 2025)
af82051 Merge remote-tracking branch 'upstream/branch-25.04' into cudf-polars… (rjzamora, Mar 3, 2025)
0a13145 check for periods (rjzamora, Mar 3, 2025)
a977df6 roll back unnecessary change (rjzamora, Mar 3, 2025)
309757c Merge branch 'branch-25.04' into cudf-polars-multi-groupby (rjzamora, Mar 3, 2025)
0bb7211 Merge remote-tracking branch 'upstream/branch-25.04' into cudf-polars… (rjzamora, Mar 4, 2025)
e445e37 remove copy API and make ConfigOptions immutable (rjzamora, Mar 4, 2025)
320d6fb Merge branch 'branch-25.04' into cudf-polars-config-options (rjzamora, Mar 4, 2025)
75a7257 Merge remote-tracking branch 'upstream/branch-25.04' into cudf-polars… (rjzamora, Mar 4, 2025)
e9abe33 use typing_extensions for older python versions (rjzamora, Mar 4, 2025)
eb7a79a formatting (rjzamora, Mar 4, 2025)
385c68a break out the decomposition of a single groupby request into a stand-… (rjzamora, Mar 4, 2025)
7c96482 Merge remote-tracking branch 'upstream/branch-25.04' into cudf-polars… (rjzamora, Mar 4, 2025)
fe3ca7c break out the decomposition of a single groupby request into a stand-… (rjzamora, Mar 4, 2025)
16cf883 Merge branch 'branch-25.04' into cudf-polars-multi-groupby (rjzamora, Mar 7, 2025)
e04a7ed Merge branch 'branch-25.04' into cudf-polars-config-options (rjzamora, Mar 7, 2025)
9f9c097 Merge remote-tracking branch 'upstream/branch-25.04' into cudf-polars… (rjzamora, Mar 11, 2025)
f959988 address schema and maintain_order issues (rjzamora, Mar 11, 2025)
bca71d6 use lawrences suggestions (rjzamora, Mar 11, 2025)
568a3f0 Merge branch 'branch-25.04' into cudf-polars-multi-groupby (rjzamora, Mar 11, 2025)
ef10e25 Merge remote-tracking branch 'upstream/branch-25.04' into cudf-polars… (rjzamora, Mar 11, 2025)
7f90ded address small code-review comments (rjzamora, Mar 11, 2025)
23f5b63 Merge remote-tracking branch 'upstream/branch-25.04' into cudf-polars… (rjzamora, Mar 11, 2025)
ffb69c7 align with 17503 (rjzamora, Mar 11, 2025)
cb13d63 Merge remote-tracking branch 'upstream/branch-25.04' into cudf-polars… (rjzamora, Mar 11, 2025)
83f9790 Merge remote-tracking branch 'upstream/branch-25.04' into cudf-polars… (rjzamora, Mar 13, 2025)
e6f32d7 add back copy on init (rjzamora, Mar 13, 2025)
b3e77fe fix (rjzamora, Mar 13, 2025)
fb6f1da add back missing test (rjzamora, Mar 13, 2025)
41 changes: 0 additions & 41 deletions python/cudf_polars/cudf_polars/callback.py
@@ -202,46 +202,6 @@ def _callback(
raise ValueError(f"Unknown executor '{executor}'")


def validate_config_options(config: dict) -> None:
Member Author: Moved this to ConfigOptions.validate

"""
Validate the configuration options for the GPU engine.

Parameters
----------
config
Configuration options to validate.

Raises
------
ValueError
If the configuration contains unsupported options.
"""
if unsupported := (
config.keys()
- {"raise_on_fail", "parquet_options", "executor", "executor_options"}
):
raise ValueError(
f"Engine configuration contains unsupported settings: {unsupported}"
)
assert {"chunked", "chunk_read_limit", "pass_read_limit"}.issuperset(
config.get("parquet_options", {})
)

# Validate executor_options
executor = config.get("executor", "pylibcudf")
if executor == "dask-experimental":
unsupported = config.get("executor_options", {}).keys() - {
"max_rows_per_partition",
"parquet_blocksize",
"cardinality_factor",
"groupby_n_ary",
}
else:
unsupported = config.get("executor_options", {}).keys()
if unsupported:
raise ValueError(f"Unsupported executor_options for {executor}: {unsupported}")


def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None:
"""
A post optimization callback that attempts to execute the plan with cudf.
@@ -269,7 +229,6 @@ def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None:
memory_resource = config.memory_resource
raise_on_fail = config.config.get("raise_on_fail", False)
executor = config.config.get("executor", None)
validate_config_options(config.config)

with nvtx.annotate(message="ConvertIR", domain="cudf_polars"):
translator = Translator(nt, config)
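Per the review comment above, the deleted `validate_config_options` logic now lives on `ConfigOptions.validate`. A minimal sketch of how that relocated method might look, reusing the exact option names from the deleted function; the class scaffolding is an assumption, not the actual `cudf_polars.utils.config` implementation:

```python
from __future__ import annotations

from typing import Any


class ConfigOptions:
    """Hypothetical wrapper around the raw GPUEngine config dict."""

    def __init__(self, config: dict[str, Any]) -> None:
        self.validate(config)
        self._config = config

    @staticmethod
    def validate(config: dict) -> None:
        """Raise ValueError if the config contains unsupported options."""
        if unsupported := (
            config.keys()
            - {"raise_on_fail", "parquet_options", "executor", "executor_options"}
        ):
            raise ValueError(
                f"Engine configuration contains unsupported settings: {unsupported}"
            )
        # Same executor-specific checks as the deleted function above.
        executor = config.get("executor", "pylibcudf")
        if executor == "dask-experimental":
            unsupported = config.get("executor_options", {}).keys() - {
                "max_rows_per_partition",
                "parquet_blocksize",
                "cardinality_factor",
                "groupby_n_ary",
            }
        else:
            unsupported = config.get("executor_options", {}).keys()
        if unsupported:
            raise ValueError(
                f"Unsupported executor_options for {executor}: {unsupported}"
            )
```

Running validation in the constructor would also explain why the explicit `validate_config_options(config.config)` call in `execute_with_cudf` is no longer needed.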
44 changes: 17 additions & 27 deletions python/cudf_polars/cudf_polars/dsl/ir.py
@@ -39,6 +39,7 @@
from polars.polars import _expr_nodes as pl_expr

from cudf_polars.typing import Schema, Slice as Zlice
from cudf_polars.utils.config import ConfigOptions


__all__ = [
@@ -284,7 +285,7 @@ class Scan(IR):
"""Reader-specific options, as dictionary."""
cloud_options: dict[str, Any] | None
"""Cloud-related authentication options, currently ignored."""
config_options: dict[str, Any]
config_options: ConfigOptions
"""GPU-specific configuration options"""
paths: list[str]
"""List of paths to read from."""
@@ -308,7 +309,7 @@ def __init__(
typ: str,
reader_options: dict[str, Any],
cloud_options: dict[str, Any] | None,
config_options: dict[str, Any],
config_options: ConfigOptions,
paths: list[str],
with_columns: list[str] | None,
skip_rows: int,
@@ -413,7 +414,7 @@ def get_hashable(self) -> Hashable:
self.typ,
json.dumps(self.reader_options),
json.dumps(self.cloud_options),
json.dumps(self.config_options),
self.config_options,
tuple(self.paths),
tuple(self.with_columns) if self.with_columns is not None else None,
self.skip_rows,
@@ -428,7 +429,7 @@ def do_evaluate(
schema: Schema,
typ: str,
reader_options: dict[str, Any],
config_options: dict[str, Any],
config_options: ConfigOptions,
paths: list[str],
with_columns: list[str] | None,
skip_rows: int,
@@ -516,8 +517,7 @@ def do_evaluate(
colnames[0],
)
elif typ == "parquet":
parquet_options = config_options.get("parquet_options", {})
if parquet_options.get("chunked", True):
if config_options.get("parquet_options.chunked", default=True):
options = plc.io.parquet.ParquetReaderOptions.builder(
plc.io.SourceInfo(paths)
).build()
@@ -534,11 +534,13 @@
options.set_columns(with_columns)
reader = plc.io.parquet.ChunkedParquetReader(
options,
chunk_read_limit=parquet_options.get(
"chunk_read_limit", cls.PARQUET_DEFAULT_CHUNK_SIZE
chunk_read_limit=config_options.get(
"parquet_options.chunk_read_limit",
default=cls.PARQUET_DEFAULT_CHUNK_SIZE,
),
pass_read_limit=parquet_options.get(
"pass_read_limit", cls.PARQUET_DEFAULT_PASS_LIMIT
pass_read_limit=config_options.get(
"parquet_options.pass_read_limit",
default=cls.PARQUET_DEFAULT_PASS_LIMIT,
),
)
chk = reader.read_chunk()
@@ -702,15 +704,15 @@ class DataFrameScan(IR):
"""Polars LazyFrame object."""
projection: tuple[str, ...] | None
"""List of columns to project out."""
config_options: dict[str, Any]
config_options: ConfigOptions
"""GPU-specific configuration options"""

def __init__(
self,
schema: Schema,
df: Any,
projection: Sequence[str] | None,
config_options: dict[str, Any],
config_options: ConfigOptions,
):
self.schema = schema
self.df = df
@@ -736,7 +738,7 @@ def get_hashable(self) -> Hashable:
schema_hash,
id(self.df),
self.projection,
json.dumps(self.config_options),
self.config_options,
)

@classmethod
@@ -876,7 +878,7 @@ def __init__(self, polars_groupby_options: Any):
"""Preserve order in groupby."""
options: GroupbyOptions
"""Arbitrary options."""
config_options: dict[str, Any]
config_options: ConfigOptions
"""GPU-specific configuration options"""

def __init__(
Expand All @@ -886,7 +888,7 @@ def __init__(
agg_requests: Sequence[expr.NamedExpr],
maintain_order: bool, # noqa: FBT001
options: Any,
config_options: dict[str, Any],
config_options: ConfigOptions,
df: IR,
):
self.schema = schema
@@ -912,18 +914,6 @@ def __init__(
self.AggInfos(self.agg_requests),
)

def get_hashable(self) -> Hashable:
"""Hashable representation of the node."""
return (
type(self),
tuple(self.schema.items()),
self.keys,
self.maintain_order,
self.options,
json.dumps(self.config_options),
self.children,
)

@staticmethod
def check_agg(agg: expr.Expr) -> int:
"""
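Two of the ir.py changes above assume new `ConfigOptions` behavior: `get()` accepts a dot-separated path with a keyword-only default (e.g. `"parquet_options.chunked"`), and `get_hashable()` now puts the `ConfigOptions` object in the hash tuple instead of `json.dumps`-ing a dict, so the object itself must hash stably. A sketch consistent with those call sites, assuming semantics rather than the real implementation:

```python
from __future__ import annotations

import json
from typing import Any


class ConfigOptions:
    """Sketch: hashable config wrapper with dotted-path lookups."""

    def __init__(self, config: dict[str, Any]) -> None:
        self._config = config

    def get(self, name: str, *, default: Any = None) -> Any:
        """Walk nested dicts along a dot-separated key path."""
        node: Any = self._config
        for part in name.split("."):
            if not isinstance(node, dict) or part not in node:
                return default
            node = node[part]
        return node

    def __hash__(self) -> int:
        # get_hashable() tuples now contain the ConfigOptions object itself,
        # so it needs a stable hash; canonical JSON is one way to get one.
        return hash(json.dumps(self._config, sort_keys=True))


# Lookups matching the call sites in this diff:
opts = ConfigOptions({"parquet_options": {"chunked": False}})
assert opts.get("parquet_options.chunked", default=True) is False
assert opts.get("parquet_options.pass_read_limit", default=1024) == 1024
```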
15 changes: 8 additions & 7 deletions python/cudf_polars/cudf_polars/dsl/translate.py
@@ -5,6 +5,7 @@

from __future__ import annotations

import copy
import functools
import json
from contextlib import AbstractContextManager, nullcontext
@@ -23,7 +24,7 @@
from cudf_polars.dsl import expr, ir
from cudf_polars.dsl.to_ast import insert_colrefs
from cudf_polars.typing import NodeTraverser
from cudf_polars.utils import dtypes, sorting
from cudf_polars.utils import config, dtypes, sorting

if TYPE_CHECKING:
from polars import GPUEngine
@@ -41,13 +42,13 @@ class Translator:
----------
visitor
Polars NodeTraverser object
config
engine
GPU engine configuration.
"""

def __init__(self, visitor: NodeTraverser, config: GPUEngine):
def __init__(self, visitor: NodeTraverser, engine: GPUEngine):
self.visitor = visitor
self.config = config
self.config_options = config.ConfigOptions(copy.deepcopy(engine.config))
self.errors: list[Exception] = []

def translate_ir(self, *, n: int | None = None) -> ir.IR:
@@ -233,7 +234,7 @@ def _(
typ,
reader_options,
cloud_options,
translator.config.config.copy(),
translator.config_options,
node.paths,
with_columns,
skip_rows,
@@ -260,7 +261,7 @@ def _(
schema,
node.df,
node.projection,
translator.config.config.copy(),
translator.config_options,
)


@@ -288,7 +289,7 @@ def _(
aggs,
node.maintain_order,
node.options,
translator.config.config.copy(),
translator.config_options,
inp,
)

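For context, the engine config that `Translator` now snapshots originates from polars' `GPUEngine`. A plausible user-side invocation, with the option names taken from the validation logic in this PR and the values chosen purely for illustration:

```python
import polars as pl

# Extra keyword arguments to GPUEngine are collected into engine.config,
# which Translator deep-copies into a ConfigOptions instance.
engine = pl.GPUEngine(
    raise_on_fail=True,
    executor="dask-experimental",
    executor_options={
        "max_rows_per_partition": 500_000,  # illustrative values
        "groupby_n_ary": 8,
    },
)

q = (
    pl.LazyFrame({"key": [1, 1, 2], "x": [1.0, 2.0, 3.0]})
    .group_by("key")
    .agg(pl.col("x").sum())
)
result = q.collect(engine=engine)
```

Because the constructor uses `copy.deepcopy(engine.config)`, mutating the engine object after translation cannot change an already-translated plan.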
8 changes: 4 additions & 4 deletions python/cudf_polars/cudf_polars/experimental/groupby.py
@@ -166,9 +166,9 @@ def _(
groupby_key_columns = [ne.name for ne in ir.keys]
cardinality_factor = {
c: min(f, 1.0)
for c, f in ir.config_options.get("executor_options", {})
.get("cardinality_factor", {})
.items()
for c, f in ir.config_options.get(
"executor_options.cardinality_factor", default={}
).items()
if c in groupby_key_columns
}
if cardinality_factor:
@@ -278,7 +278,7 @@ def _(

# Simple N-ary tree reduction
j = 0
n_ary = ir.config_options.get("executor_options", {}).get("groupby_n_ary", 32)
n_ary = ir.config_options.get("executor_options.groupby_n_ary", default=32)
graph: MutableMapping[Any, Any] = {}
name = get_key_name(ir)
keys: list[Any] = [(child_name, i) for i in range(child_count)]
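The `groupby_n_ary` option above (default 32) bounds the fan-in of the tree reduction that combines per-partition groupby results. A standalone sketch of the graph shape this produces, using dask-style tuple keys and a placeholder "combine" task; the helper and task names are illustrative, not the actual cudf-polars code:

```python
from collections.abc import MutableMapping
from typing import Any


def tree_reduce(
    keys: list[Any], name: str, n_ary: int = 32
) -> MutableMapping[Any, Any]:
    """Combine `keys` in groups of at most `n_ary` until one task remains.

    Assumes at least one input key (one partition per child task).
    """
    graph: MutableMapping[Any, Any] = {}
    j = 0
    while len(keys) > 1:
        # Split the current level into chunks of size <= n_ary.
        groups = [keys[i : i + n_ary] for i in range(0, len(keys), n_ary)]
        keys = []
        for i, group in enumerate(groups):
            key = (f"{name}-combine-{j}", i)
            graph[key] = ("combine", *group)  # placeholder combine task
            keys.append(key)
        j += 1
    graph[(name, 0)] = keys[0]  # alias the root of the reduction tree
    return graph


# 100 input partitions with n_ary=32 reduce in two levels: 100 -> 4 -> 1.
graph = tree_reduce([("child", i) for i in range(100)], "groupby-agg")
```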
22 changes: 12 additions & 10 deletions python/cudf_polars/cudf_polars/experimental/io.py
@@ -22,14 +22,16 @@
from cudf_polars.dsl.expr import NamedExpr
from cudf_polars.experimental.dispatch import LowerIRTransformer
from cudf_polars.typing import Schema
from cudf_polars.utils.config import ConfigOptions


@lower_ir_node.register(DataFrameScan)
def _(
ir: DataFrameScan, rec: LowerIRTransformer
) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
rows_per_partition = ir.config_options.get("executor_options", {}).get(
"max_rows_per_partition", 1_000_000
rows_per_partition = ir.config_options.get(
"executor_options.max_rows_per_partition",
default=1_000_000,
)

nrows = max(ir.df.shape()[0], 1)
@@ -91,8 +93,10 @@ def from_scan(ir: Scan) -> ScanPartitionPlan:
"""Extract the partitioning plan of a Scan operation."""
if ir.typ == "parquet":
# TODO: Use system info to set default blocksize
parallel_options = ir.config_options.get("executor_options", {})
blocksize: int = parallel_options.get("parquet_blocksize", 1024**3)
blocksize: int = ir.config_options.get(
"executor_options.parquet_blocksize",
default=1024**3,
)
stats = _sample_pq_statistics(ir)
file_size = sum(float(stats[column]) for column in ir.schema)
if file_size > 0:
@@ -168,7 +172,7 @@ def do_evaluate(
schema: Schema,
typ: str,
reader_options: dict[str, Any],
config_options: dict[str, Any],
config_options: ConfigOptions,
paths: list[str],
with_columns: list[str] | None,
skip_rows: int,
@@ -270,11 +274,9 @@ def _(
paths = list(ir.paths)
if plan.flavor == ScanPartitionFlavor.SPLIT_FILES:
# Disable chunked reader when splitting files
config_options = ir.config_options.copy()
config_options["parquet_options"] = config_options.get(
"parquet_options", {}
).copy()
config_options["parquet_options"]["chunked"] = False
config_options = ir.config_options.set(
name="parquet_options.chunked", value=False
)

slices: list[SplitScan] = []
for path in paths:
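The SPLIT_FILES branch above no longer mutates a copied dict; it relies on `ConfigOptions` being immutable, with `set()` returning a fresh instance. A sketch of `set()` semantics consistent with that call site (again an assumption about the real class, not its actual source):

```python
from __future__ import annotations

import copy
from typing import Any


class ConfigOptions:
    """Sketch: immutable config wrapper; updates return new instances."""

    def __init__(self, config: dict[str, Any]) -> None:
        self._config = config

    def set(self, name: str, value: Any) -> ConfigOptions:
        """Return a new ConfigOptions with the dotted path `name` set."""
        new = copy.deepcopy(self._config)
        node = new
        *parents, leaf = name.split(".")
        for part in parents:
            node = node.setdefault(part, {})
        node[leaf] = value
        return ConfigOptions(new)


base = ConfigOptions({"parquet_options": {"chunked": True}})
patched = base.set(name="parquet_options.chunked", value=False)
assert base._config["parquet_options"]["chunked"] is True  # original untouched
assert patched._config["parquet_options"]["chunked"] is False
```

Returning a new object keeps the `Scan` node's own options (and any cached hashes of them) stable while the `SplitScan` slices get the chunked reader disabled.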