diff --git a/python/cudf_polars/cudf_polars/callback.py b/python/cudf_polars/cudf_polars/callback.py
index 3536c9345dc..1eb17d20806 100644
--- a/python/cudf_polars/cudf_polars/callback.py
+++ b/python/cudf_polars/cudf_polars/callback.py
@@ -202,46 +202,6 @@ def _callback(
     raise ValueError(f"Unknown executor '{executor}'")
 
 
-def validate_config_options(config: dict) -> None:
-    """
-    Validate the configuration options for the GPU engine.
-
-    Parameters
-    ----------
-    config
-        Configuration options to validate.
-
-    Raises
-    ------
-    ValueError
-        If the configuration contains unsupported options.
-    """
-    if unsupported := (
-        config.keys()
-        - {"raise_on_fail", "parquet_options", "executor", "executor_options"}
-    ):
-        raise ValueError(
-            f"Engine configuration contains unsupported settings: {unsupported}"
-        )
-    assert {"chunked", "chunk_read_limit", "pass_read_limit"}.issuperset(
-        config.get("parquet_options", {})
-    )
-
-    # Validate executor_options
-    executor = config.get("executor", "pylibcudf")
-    if executor == "dask-experimental":
-        unsupported = config.get("executor_options", {}).keys() - {
-            "max_rows_per_partition",
-            "parquet_blocksize",
-            "cardinality_factor",
-            "groupby_n_ary",
-        }
-    else:
-        unsupported = config.get("executor_options", {}).keys()
-    if unsupported:
-        raise ValueError(f"Unsupported executor_options for {executor}: {unsupported}")
-
-
 def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None:
     """
     A post optimization callback that attempts to execute the plan with cudf.
@@ -269,7 +229,6 @@ def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None:
     memory_resource = config.memory_resource
     raise_on_fail = config.config.get("raise_on_fail", False)
     executor = config.config.get("executor", None)
-    validate_config_options(config.config)
 
     with nvtx.annotate(message="ConvertIR", domain="cudf_polars"):
         translator = Translator(nt, config)
diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py
index 34e855735a8..f777d4c0df8 100644
--- a/python/cudf_polars/cudf_polars/dsl/ir.py
+++ b/python/cudf_polars/cudf_polars/dsl/ir.py
@@ -39,6 +39,7 @@
     from polars.polars import _expr_nodes as pl_expr
 
     from cudf_polars.typing import Schema, Slice as Zlice
+    from cudf_polars.utils.config import ConfigOptions
 
 
 __all__ = [
@@ -284,7 +285,7 @@ class Scan(IR):
     """Reader-specific options, as dictionary."""
     cloud_options: dict[str, Any] | None
     """Cloud-related authentication options, currently ignored."""
-    config_options: dict[str, Any]
+    config_options: ConfigOptions
     """GPU-specific configuration options"""
     paths: list[str]
     """List of paths to read from."""
@@ -308,7 +309,7 @@ def __init__(
         typ: str,
         reader_options: dict[str, Any],
         cloud_options: dict[str, Any] | None,
-        config_options: dict[str, Any],
+        config_options: ConfigOptions,
         paths: list[str],
         with_columns: list[str] | None,
         skip_rows: int,
@@ -413,7 +414,7 @@ def get_hashable(self) -> Hashable:
             self.typ,
             json.dumps(self.reader_options),
             json.dumps(self.cloud_options),
-            json.dumps(self.config_options),
+            self.config_options,
             tuple(self.paths),
             tuple(self.with_columns) if self.with_columns is not None else None,
             self.skip_rows,
@@ -428,7 +429,7 @@ def do_evaluate(
         schema: Schema,
         typ: str,
         reader_options: dict[str, Any],
-        config_options: dict[str, Any],
+        config_options: ConfigOptions,
         paths: list[str],
         with_columns: list[str] | None,
         skip_rows: int,
@@ -516,8 +517,7 @@ def do_evaluate(
                 colnames[0],
             )
         elif typ == "parquet":
-            parquet_options = config_options.get("parquet_options", {})
-            if parquet_options.get("chunked", True):
+            if config_options.get("parquet_options.chunked", default=True):
                 options = plc.io.parquet.ParquetReaderOptions.builder(
                     plc.io.SourceInfo(paths)
                 ).build()
@@ -534,11 +534,13 @@ def do_evaluate(
                     options.set_columns(with_columns)
                 reader = plc.io.parquet.ChunkedParquetReader(
                     options,
-                    chunk_read_limit=parquet_options.get(
-                        "chunk_read_limit", cls.PARQUET_DEFAULT_CHUNK_SIZE
+                    chunk_read_limit=config_options.get(
+                        "parquet_options.chunk_read_limit",
+                        default=cls.PARQUET_DEFAULT_CHUNK_SIZE,
                     ),
-                    pass_read_limit=parquet_options.get(
-                        "pass_read_limit", cls.PARQUET_DEFAULT_PASS_LIMIT
+                    pass_read_limit=config_options.get(
+                        "parquet_options.pass_read_limit",
+                        default=cls.PARQUET_DEFAULT_PASS_LIMIT,
                     ),
                 )
                 chk = reader.read_chunk()
@@ -702,7 +704,7 @@ class DataFrameScan(IR):
     """Polars LazyFrame object."""
    projection: tuple[str, ...] | None
     """List of columns to project out."""
-    config_options: dict[str, Any]
+    config_options: ConfigOptions
     """GPU-specific configuration options"""
 
     def __init__(
@@ -710,7 +712,7 @@ def __init__(
         schema: Schema,
         df: Any,
         projection: Sequence[str] | None,
-        config_options: dict[str, Any],
+        config_options: ConfigOptions,
     ):
         self.schema = schema
         self.df = df
@@ -736,7 +738,7 @@ def get_hashable(self) -> Hashable:
             schema_hash,
             id(self.df),
             self.projection,
-            json.dumps(self.config_options),
+            self.config_options,
         )
 
     @classmethod
@@ -876,7 +878,7 @@ def __init__(self, polars_groupby_options: Any):
     """Preserve order in groupby."""
     options: GroupbyOptions
     """Arbitrary options."""
-    config_options: dict[str, Any]
+    config_options: ConfigOptions
     """GPU-specific configuration options"""
 
     def __init__(
@@ -886,7 +888,7 @@ def __init__(
         agg_requests: Sequence[expr.NamedExpr],
         maintain_order: bool,  # noqa: FBT001
         options: Any,
-        config_options: dict[str, Any],
+        config_options: ConfigOptions,
         df: IR,
     ):
         self.schema = schema
@@ -912,18 +914,6 @@ def __init__(
             self.AggInfos(self.agg_requests),
         )
 
-    def get_hashable(self) -> Hashable:
-        """Hashable representation of the node."""
-        return (
-            type(self),
-            tuple(self.schema.items()),
-            self.keys,
-            self.maintain_order,
-            self.options,
-            json.dumps(self.config_options),
-            self.children,
-        )
-
     @staticmethod
     def check_agg(agg: expr.Expr) -> int:
         """
diff --git a/python/cudf_polars/cudf_polars/dsl/translate.py b/python/cudf_polars/cudf_polars/dsl/translate.py
index 1c8ae32973f..3c4c3bb3e0c 100644
--- a/python/cudf_polars/cudf_polars/dsl/translate.py
+++ b/python/cudf_polars/cudf_polars/dsl/translate.py
@@ -5,6 +5,7 @@
 
 from __future__ import annotations
 
+import copy
 import functools
 import json
 from contextlib import AbstractContextManager, nullcontext
@@ -23,7 +24,7 @@
 from cudf_polars.dsl import expr, ir
 from cudf_polars.dsl.to_ast import insert_colrefs
 from cudf_polars.typing import NodeTraverser
-from cudf_polars.utils import dtypes, sorting
+from cudf_polars.utils import config, dtypes, sorting
 
 if TYPE_CHECKING:
     from polars import GPUEngine
@@ -41,13 +42,13 @@ class Translator:
     ----------
     visitor
         Polars NodeTraverser object
-    config
+    engine
         GPU engine configuration.
""" - def __init__(self, visitor: NodeTraverser, config: GPUEngine): + def __init__(self, visitor: NodeTraverser, engine: GPUEngine): self.visitor = visitor - self.config = config + self.config_options = config.ConfigOptions(copy.deepcopy(engine.config)) self.errors: list[Exception] = [] def translate_ir(self, *, n: int | None = None) -> ir.IR: @@ -233,7 +234,7 @@ def _( typ, reader_options, cloud_options, - translator.config.config.copy(), + translator.config_options, node.paths, with_columns, skip_rows, @@ -260,7 +261,7 @@ def _( schema, node.df, node.projection, - translator.config.config.copy(), + translator.config_options, ) @@ -288,7 +289,7 @@ def _( aggs, node.maintain_order, node.options, - translator.config.config.copy(), + translator.config_options, inp, ) diff --git a/python/cudf_polars/cudf_polars/experimental/groupby.py b/python/cudf_polars/cudf_polars/experimental/groupby.py index bc17fb2c86c..0d98eec98ef 100644 --- a/python/cudf_polars/cudf_polars/experimental/groupby.py +++ b/python/cudf_polars/cudf_polars/experimental/groupby.py @@ -166,9 +166,9 @@ def _( groupby_key_columns = [ne.name for ne in ir.keys] cardinality_factor = { c: min(f, 1.0) - for c, f in ir.config_options.get("executor_options", {}) - .get("cardinality_factor", {}) - .items() + for c, f in ir.config_options.get( + "executor_options.cardinality_factor", default={} + ).items() if c in groupby_key_columns } if cardinality_factor: @@ -278,7 +278,7 @@ def _( # Simple N-ary tree reduction j = 0 - n_ary = ir.config_options.get("executor_options", {}).get("groupby_n_ary", 32) + n_ary = ir.config_options.get("executor_options.groupby_n_ary", default=32) graph: MutableMapping[Any, Any] = {} name = get_key_name(ir) keys: list[Any] = [(child_name, i) for i in range(child_count)] diff --git a/python/cudf_polars/cudf_polars/experimental/io.py b/python/cudf_polars/cudf_polars/experimental/io.py index ba4432ecdea..a63817fbde0 100644 --- a/python/cudf_polars/cudf_polars/experimental/io.py +++ b/python/cudf_polars/cudf_polars/experimental/io.py @@ -22,14 +22,16 @@ from cudf_polars.dsl.expr import NamedExpr from cudf_polars.experimental.dispatch import LowerIRTransformer from cudf_polars.typing import Schema + from cudf_polars.utils.config import ConfigOptions @lower_ir_node.register(DataFrameScan) def _( ir: DataFrameScan, rec: LowerIRTransformer ) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: - rows_per_partition = ir.config_options.get("executor_options", {}).get( - "max_rows_per_partition", 1_000_000 + rows_per_partition = ir.config_options.get( + "executor_options.max_rows_per_partition", + default=1_000_000, ) nrows = max(ir.df.shape()[0], 1) @@ -91,8 +93,10 @@ def from_scan(ir: Scan) -> ScanPartitionPlan: """Extract the partitioning plan of a Scan operation.""" if ir.typ == "parquet": # TODO: Use system info to set default blocksize - parallel_options = ir.config_options.get("executor_options", {}) - blocksize: int = parallel_options.get("parquet_blocksize", 1024**3) + blocksize: int = ir.config_options.get( + "executor_options.parquet_blocksize", + default=1024**3, + ) stats = _sample_pq_statistics(ir) file_size = sum(float(stats[column]) for column in ir.schema) if file_size > 0: @@ -168,7 +172,7 @@ def do_evaluate( schema: Schema, typ: str, reader_options: dict[str, Any], - config_options: dict[str, Any], + config_options: ConfigOptions, paths: list[str], with_columns: list[str] | None, skip_rows: int, @@ -270,11 +274,9 @@ def _( paths = list(ir.paths) if plan.flavor == ScanPartitionFlavor.SPLIT_FILES: # 
Disable chunked reader when splitting files - config_options = ir.config_options.copy() - config_options["parquet_options"] = config_options.get( - "parquet_options", {} - ).copy() - config_options["parquet_options"]["chunked"] = False + config_options = ir.config_options.set( + name="parquet_options.chunked", value=False + ) slices: list[SplitScan] = [] for path in paths: diff --git a/python/cudf_polars/cudf_polars/utils/config.py b/python/cudf_polars/cudf_polars/utils/config.py new file mode 100644 index 00000000000..e445d7e1caf --- /dev/null +++ b/python/cudf_polars/cudf_polars/utils/config.py @@ -0,0 +1,137 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 + +"""Config utilities.""" + +from __future__ import annotations + +import copy +import json +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from typing_extensions import Self + +__all__ = ["ConfigOptions"] + + +class ConfigOptions: + """ + GPUEngine configuration-option manager. + + This is a convenience class to help manage the nested + dictionary of user-accessible `GPUEngine` options. + """ + + __slots__ = ("_hash_value", "config_options") + _hash_value: int + config_options: dict[str, Any] + """The underlying (nested) config-option dictionary.""" + + def __init__(self, options: dict[str, Any]): + self.validate(options) + self.config_options = options + + def set(self, name: str, value: Any) -> Self: + """ + Set a user config option. + + Nested dictionary keys should be separated by periods. + For example:: + + >>> options = options.set("parquet_options.chunked", False) + + Parameters + ---------- + name + Period-separated config name. + value + New config value. + """ + options = config_options = copy.deepcopy(self.config_options) + keys = name.split(".") + for k in keys[:-1]: + assert isinstance(options, dict) + if k not in options: + options[k] = {} + options = options[k] + options[keys[-1]] = value + return type(self)(config_options) + + def get(self, name: str, *, default: Any = None) -> Any: + """ + Get a user config option. + + Nested dictionary keys should be separated by periods. + For example:: + + >>> chunked = config_options.get("parquet_options.chunked") + + Parameters + ---------- + name + Period-separated config name. + default + Default return value. + + Returns + ------- + The user-specified config value, or `default` + if the config is not found. + """ + options = self.config_options + keys = name.split(".") + for k in keys[:-1]: + assert isinstance(options, dict) + options = options.get(k, {}) + return options.get(keys[-1], default) + + def __hash__(self) -> int: + """Hash a ConfigOptions object.""" + try: + return self._hash_value + except AttributeError: + self._hash_value = hash(json.dumps(self.config_options)) + return self._hash_value + + @staticmethod + def validate(config: dict) -> None: + """ + Validate a configuration-option dictionary. + + Parameters + ---------- + config + GPUEngine configuration options to validate. + + Raises + ------ + ValueError + If the configuration contains unsupported options. 
+ """ + if unsupported := ( + config.keys() + - {"raise_on_fail", "parquet_options", "executor", "executor_options"} + ): + raise ValueError( + f"Engine configuration contains unsupported settings: {unsupported}" + ) + assert {"chunked", "chunk_read_limit", "pass_read_limit"}.issuperset( + config.get("parquet_options", {}) + ) + + # Validate executor_options + executor = config.get("executor", "pylibcudf") + if executor == "dask-experimental": + unsupported = config.get("executor_options", {}).keys() - { + "max_rows_per_partition", + "parquet_blocksize", + "cardinality_factor", + "groupby_n_ary", + } + else: + unsupported = config.get("executor_options", {}).keys() + if unsupported: + raise ValueError( + f"Unsupported executor_options for {executor}: {unsupported}" + ) diff --git a/python/cudf_polars/tests/experimental/test_groupby.py b/python/cudf_polars/tests/experimental/test_groupby.py index ddd2f762d7d..c28584aaefc 100644 --- a/python/cudf_polars/tests/experimental/test_groupby.py +++ b/python/cudf_polars/tests/experimental/test_groupby.py @@ -34,14 +34,7 @@ def df(): @pytest.mark.parametrize("keys", [("y",), ("y", "z")]) def test_groupby(df, engine, op, keys): q = getattr(df.group_by(*keys), op)() - - from cudf_polars import Translator - from cudf_polars.experimental.parallel import evaluate_dask - - ir = Translator(q._ldf.visit(), engine).translate_ir() - evaluate_dask(ir) - - # assert_gpu_result_equal(q, engine=engine, check_row_order=False) + assert_gpu_result_equal(q, engine=engine, check_row_order=False) @pytest.mark.parametrize("op", ["sum", "mean", "len"])