Commit 63459ca

Add ConfigOptions convenience class to cudf-polars (#18137)
I propose that we add a simple management utility for `GPUEngine` configuration options in cudf-polars. In order to support a variety of configurable groupby/shuffle/sort/join options for multi-GPU execution, we will want to make it easy to pass these options through to `IR` nodes at translation time. We will also want it to be easy to set and get specific options at traversal/evaluation time.

Authors:
- Richard (Rick) Zamora (https://github.com/rjzamora)
- Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
- Lawrence Mitchell (https://github.com/wence-)

URL: #18137
1 parent c2c6d1f commit 63459ca

7 files changed: +179 -97 lines
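
The `ConfigOptions` class itself lives in `cudf_polars/utils/config.py`, presumably one of the seven changed files, but its diff is not expanded on this page. As a rough, hypothetical sketch of the API implied by the call sites in the diffs below (a dotted-key `get` with a `default` keyword, a copy-on-set `set` that returns a new instance, and hashability, since the diff drops `json.dumps` from the `get_hashable` tuples), it might look like this; every detail is inferred rather than taken from the actual implementation:

# Hypothetical sketch only: inferred from call sites in this diff,
# not the real cudf_polars.utils.config implementation.
from __future__ import annotations

import copy
import json
from typing import Any


class ConfigOptions:
    """Wrap a GPUEngine config dict with dotted-key get/set access."""

    def __init__(self, options: dict[str, Any] | None = None) -> None:
        self._options = options or {}

    def get(self, name: str, *, default: Any = None) -> Any:
        # Walk nested dicts along dot-separated keys, e.g.
        # "executor_options.parquet_blocksize" reads
        # options["executor_options"]["parquet_blocksize"].
        node: Any = self._options
        for key in name.split("."):
            if not isinstance(node, dict) or key not in node:
                return default
            node = node[key]
        return node

    def set(self, name: str, value: Any) -> ConfigOptions:
        # Return a new ConfigOptions with one option replaced, leaving the
        # original untouched (see the SplitScan change in experimental/io.py).
        options = copy.deepcopy(self._options)
        node = options
        *parents, leaf = name.split(".")
        for key in parents:
            node = node.setdefault(key, {})
        node[leaf] = value
        return ConfigOptions(options)

    def __hash__(self) -> int:
        # The diff removes json.dumps(...) around config_options in
        # get_hashable tuples, so the real class must be hashable; a JSON
        # digest of the options is one plausible implementation.
        return hash(json.dumps(self._options, sort_keys=True))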

python/cudf_polars/cudf_polars/callback.py (-41 lines)

@@ -202,46 +202,6 @@ def _callback(
         raise ValueError(f"Unknown executor '{executor}'")
 
 
-def validate_config_options(config: dict) -> None:
-    """
-    Validate the configuration options for the GPU engine.
-
-    Parameters
-    ----------
-    config
-        Configuration options to validate.
-
-    Raises
-    ------
-    ValueError
-        If the configuration contains unsupported options.
-    """
-    if unsupported := (
-        config.keys()
-        - {"raise_on_fail", "parquet_options", "executor", "executor_options"}
-    ):
-        raise ValueError(
-            f"Engine configuration contains unsupported settings: {unsupported}"
-        )
-    assert {"chunked", "chunk_read_limit", "pass_read_limit"}.issuperset(
-        config.get("parquet_options", {})
-    )
-
-    # Validate executor_options
-    executor = config.get("executor", "pylibcudf")
-    if executor == "dask-experimental":
-        unsupported = config.get("executor_options", {}).keys() - {
-            "max_rows_per_partition",
-            "parquet_blocksize",
-            "cardinality_factor",
-            "groupby_n_ary",
-        }
-    else:
-        unsupported = config.get("executor_options", {}).keys()
-    if unsupported:
-        raise ValueError(f"Unsupported executor_options for {executor}: {unsupported}")
-
-
 def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None:
     """
     A post optimization callback that attempts to execute the plan with cudf.
@@ -269,7 +229,6 @@ def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None:
     memory_resource = config.memory_resource
     raise_on_fail = config.config.get("raise_on_fail", False)
     executor = config.config.get("executor", None)
-    validate_config_options(config.config)
 
     with nvtx.annotate(message="ConvertIR", domain="cudf_polars"):
         translator = Translator(nt, config)

python/cudf_polars/cudf_polars/dsl/ir.py (+17 -27 lines)

@@ -39,6 +39,7 @@
     from polars.polars import _expr_nodes as pl_expr
 
     from cudf_polars.typing import Schema, Slice as Zlice
+    from cudf_polars.utils.config import ConfigOptions
 
 
 __all__ = [
@@ -284,7 +285,7 @@ class Scan(IR):
     """Reader-specific options, as dictionary."""
     cloud_options: dict[str, Any] | None
     """Cloud-related authentication options, currently ignored."""
-    config_options: dict[str, Any]
+    config_options: ConfigOptions
     """GPU-specific configuration options"""
     paths: list[str]
     """List of paths to read from."""
@@ -308,7 +309,7 @@ def __init__(
         typ: str,
         reader_options: dict[str, Any],
         cloud_options: dict[str, Any] | None,
-        config_options: dict[str, Any],
+        config_options: ConfigOptions,
         paths: list[str],
         with_columns: list[str] | None,
         skip_rows: int,
@@ -413,7 +414,7 @@ def get_hashable(self) -> Hashable:
             self.typ,
             json.dumps(self.reader_options),
             json.dumps(self.cloud_options),
-            json.dumps(self.config_options),
+            self.config_options,
             tuple(self.paths),
             tuple(self.with_columns) if self.with_columns is not None else None,
             self.skip_rows,
@@ -428,7 +429,7 @@ def do_evaluate(
         schema: Schema,
         typ: str,
         reader_options: dict[str, Any],
-        config_options: dict[str, Any],
+        config_options: ConfigOptions,
         paths: list[str],
         with_columns: list[str] | None,
         skip_rows: int,
@@ -516,8 +517,7 @@ def do_evaluate(
                 colnames[0],
             )
         elif typ == "parquet":
-            parquet_options = config_options.get("parquet_options", {})
-            if parquet_options.get("chunked", True):
+            if config_options.get("parquet_options.chunked", default=True):
                 options = plc.io.parquet.ParquetReaderOptions.builder(
                     plc.io.SourceInfo(paths)
                 ).build()
@@ -534,11 +534,13 @@ def do_evaluate(
                     options.set_columns(with_columns)
                 reader = plc.io.parquet.ChunkedParquetReader(
                     options,
-                    chunk_read_limit=parquet_options.get(
-                        "chunk_read_limit", cls.PARQUET_DEFAULT_CHUNK_SIZE
+                    chunk_read_limit=config_options.get(
+                        "parquet_options.chunk_read_limit",
+                        default=cls.PARQUET_DEFAULT_CHUNK_SIZE,
                     ),
-                    pass_read_limit=parquet_options.get(
-                        "pass_read_limit", cls.PARQUET_DEFAULT_PASS_LIMIT
+                    pass_read_limit=config_options.get(
+                        "parquet_options.pass_read_limit",
+                        default=cls.PARQUET_DEFAULT_PASS_LIMIT,
                     ),
                 )
                 chk = reader.read_chunk()
@@ -702,15 +704,15 @@ class DataFrameScan(IR):
     """Polars LazyFrame object."""
     projection: tuple[str, ...] | None
     """List of columns to project out."""
-    config_options: dict[str, Any]
+    config_options: ConfigOptions
    """GPU-specific configuration options"""
 
     def __init__(
         self,
         schema: Schema,
         df: Any,
         projection: Sequence[str] | None,
-        config_options: dict[str, Any],
+        config_options: ConfigOptions,
     ):
         self.schema = schema
         self.df = df
@@ -736,7 +738,7 @@ def get_hashable(self) -> Hashable:
             schema_hash,
             id(self.df),
             self.projection,
-            json.dumps(self.config_options),
+            self.config_options,
         )
 
     @classmethod
@@ -876,7 +878,7 @@ def __init__(self, polars_groupby_options: Any):
     """Preserve order in groupby."""
     options: GroupbyOptions
     """Arbitrary options."""
-    config_options: dict[str, Any]
+    config_options: ConfigOptions
     """GPU-specific configuration options"""
 
     def __init__(
@@ -886,7 +888,7 @@ def __init__(
         agg_requests: Sequence[expr.NamedExpr],
         maintain_order: bool,  # noqa: FBT001
         options: Any,
-        config_options: dict[str, Any],
+        config_options: ConfigOptions,
         df: IR,
     ):
         self.schema = schema
@@ -912,18 +914,6 @@ def __init__(
             self.AggInfos(self.agg_requests),
         )
 
-    def get_hashable(self) -> Hashable:
-        """Hashable representation of the node."""
-        return (
-            type(self),
-            tuple(self.schema.items()),
-            self.keys,
-            self.maintain_order,
-            self.options,
-            json.dumps(self.config_options),
-            self.children,
-        )
-
     @staticmethod
     def check_agg(agg: expr.Expr) -> int:
         """

python/cudf_polars/cudf_polars/dsl/translate.py (+8 -7 lines)

@@ -5,6 +5,7 @@
 
 from __future__ import annotations
 
+import copy
 import functools
 import json
 from contextlib import AbstractContextManager, nullcontext
@@ -23,7 +24,7 @@
 from cudf_polars.dsl import expr, ir
 from cudf_polars.dsl.to_ast import insert_colrefs
 from cudf_polars.typing import NodeTraverser
-from cudf_polars.utils import dtypes, sorting
+from cudf_polars.utils import config, dtypes, sorting
 
 if TYPE_CHECKING:
     from polars import GPUEngine
@@ -41,13 +42,13 @@ class Translator:
     ----------
     visitor
         Polars NodeTraverser object
-    config
+    engine
         GPU engine configuration.
     """
 
-    def __init__(self, visitor: NodeTraverser, config: GPUEngine):
+    def __init__(self, visitor: NodeTraverser, engine: GPUEngine):
         self.visitor = visitor
-        self.config = config
+        self.config_options = config.ConfigOptions(copy.deepcopy(engine.config))
         self.errors: list[Exception] = []
 
     def translate_ir(self, *, n: int | None = None) -> ir.IR:
@@ -233,7 +234,7 @@ def _(
         typ,
         reader_options,
         cloud_options,
-        translator.config.config.copy(),
+        translator.config_options,
         node.paths,
         with_columns,
         skip_rows,
@@ -260,7 +261,7 @@ def _(
         schema,
         node.df,
         node.projection,
-        translator.config.config.copy(),
+        translator.config_options,
     )
 
 
@@ -288,7 +289,7 @@ def _(
         aggs,
         node.maintain_order,
         node.options,
-        translator.config.config.copy(),
+        translator.config_options,
         inp,
     )
 

python/cudf_polars/cudf_polars/experimental/groupby.py (+4 -4 lines)

@@ -166,9 +166,9 @@ def _(
     groupby_key_columns = [ne.name for ne in ir.keys]
     cardinality_factor = {
         c: min(f, 1.0)
-        for c, f in ir.config_options.get("executor_options", {})
-        .get("cardinality_factor", {})
-        .items()
+        for c, f in ir.config_options.get(
+            "executor_options.cardinality_factor", default={}
+        ).items()
         if c in groupby_key_columns
     }
     if cardinality_factor:
@@ -278,7 +278,7 @@ def _(
 
     # Simple N-ary tree reduction
     j = 0
-    n_ary = ir.config_options.get("executor_options", {}).get("groupby_n_ary", 32)
+    n_ary = ir.config_options.get("executor_options.groupby_n_ary", default=32)
     graph: MutableMapping[Any, Any] = {}
     name = get_key_name(ir)
     keys: list[Any] = [(child_name, i) for i in range(child_count)]
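
Both options above are supplied through `GPUEngine` as `executor_options`. A hedged configuration sketch for the dask-experimental executor follows; the option names come from the allow-list in the removed `validate_config_options`, and the values simply mirror the defaults read by the code in this diff:

# Usage sketch: configuring the dask-experimental executor.
import polars as pl

engine = pl.GPUEngine(
    executor="dask-experimental",
    executor_options={
        "max_rows_per_partition": 1_000_000,  # rows per DataFrameScan partition
        "parquet_blocksize": 1024**3,         # target bytes per parquet Scan partition
        "cardinality_factor": {"id": 0.01},   # assumed distinct fraction per key column
        "groupby_n_ary": 32,                  # fan-in of the groupby tree reduction
    },
)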

python/cudf_polars/cudf_polars/experimental/io.py (+12 -10 lines)

@@ -22,14 +22,16 @@
     from cudf_polars.dsl.expr import NamedExpr
     from cudf_polars.experimental.dispatch import LowerIRTransformer
     from cudf_polars.typing import Schema
+    from cudf_polars.utils.config import ConfigOptions
 
 
 @lower_ir_node.register(DataFrameScan)
 def _(
     ir: DataFrameScan, rec: LowerIRTransformer
 ) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
-    rows_per_partition = ir.config_options.get("executor_options", {}).get(
-        "max_rows_per_partition", 1_000_000
+    rows_per_partition = ir.config_options.get(
+        "executor_options.max_rows_per_partition",
+        default=1_000_000,
     )
 
     nrows = max(ir.df.shape()[0], 1)
@@ -91,8 +93,10 @@ def from_scan(ir: Scan) -> ScanPartitionPlan:
     """Extract the partitioning plan of a Scan operation."""
     if ir.typ == "parquet":
         # TODO: Use system info to set default blocksize
-        parallel_options = ir.config_options.get("executor_options", {})
-        blocksize: int = parallel_options.get("parquet_blocksize", 1024**3)
+        blocksize: int = ir.config_options.get(
+            "executor_options.parquet_blocksize",
+            default=1024**3,
+        )
         stats = _sample_pq_statistics(ir)
         file_size = sum(float(stats[column]) for column in ir.schema)
         if file_size > 0:
@@ -168,7 +172,7 @@ def do_evaluate(
         schema: Schema,
         typ: str,
         reader_options: dict[str, Any],
-        config_options: dict[str, Any],
+        config_options: ConfigOptions,
         paths: list[str],
         with_columns: list[str] | None,
         skip_rows: int,
@@ -270,11 +274,9 @@ def _(
     paths = list(ir.paths)
     if plan.flavor == ScanPartitionFlavor.SPLIT_FILES:
         # Disable chunked reader when splitting files
-        config_options = ir.config_options.copy()
-        config_options["parquet_options"] = config_options.get(
-            "parquet_options", {}
-        ).copy()
-        config_options["parquet_options"]["chunked"] = False
+        config_options = ir.config_options.set(
+            name="parquet_options.chunked", value=False
+        )
 
     slices: list[SplitScan] = []
     for path in paths:
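
The `set` call replaces the old copy-the-dict-then-mutate dance: it returns a new options object and leaves the `IR` node's own options untouched. A small demonstration of the assumed copy-on-set semantics, reusing the hypothetical `ConfigOptions` sketch from the top of this page:

# Demo of the assumed copy-on-set semantics (hypothetical sketch class above).
opts = ConfigOptions({"parquet_options": {"chunked": True}})
split_opts = opts.set(name="parquet_options.chunked", value=False)

assert opts.get("parquet_options.chunked", default=True) is True
assert split_opts.get("parquet_options.chunked", default=True) is False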
