Skip to content
14 changes: 14 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ class RunConfig:
stats_planning: bool
max_io_threads: int
native_parquet: bool
spill_to_pinned_memory: bool

def __post_init__(self) -> None: # noqa: D105
if self.gather_shuffle_stats and self.shuffle != "rapidsmpf":
Expand Down Expand Up @@ -375,6 +376,7 @@ def from_args(cls, args: argparse.Namespace) -> RunConfig:
stats_planning=args.stats_planning,
max_io_threads=args.max_io_threads,
native_parquet=args.native_parquet,
spill_to_pinned_memory=args.spill_to_pinned_memory,
)

def serialize(self, engine: pl.GPUEngine | None) -> dict:
Expand Down Expand Up @@ -466,6 +468,7 @@ def get_executor_options(
executor_options["client_device_threshold"] = run_config.spill_device
executor_options["runtime"] = run_config.runtime
executor_options["max_io_threads"] = run_config.max_io_threads
executor_options["spill_to_pinned_memory"] = run_config.spill_to_pinned_memory

if (
benchmark
Expand Down Expand Up @@ -585,6 +588,9 @@ def initialize_dask_cluster(run_config: RunConfig, args: argparse.Namespace): #
client,
options=Options(
{
"dask_spill_to_pinned_memory": str(
run_config.spill_to_pinned_memory
),
"dask_spill_device": str(run_config.spill_device),
"dask_statistics": str(args.rapidsmpf_dask_statistics),
"dask_print_statistics": str(args.rapidsmpf_print_statistics),
Expand Down Expand Up @@ -971,6 +977,14 @@ def parse_args(
default=None,
help="Optional directory to write query results as parquet files.",
)
parser.add_argument(
"--spill-to-pinned-memory",
action=argparse.BooleanOptionalAction,
default=False,
help=textwrap.dedent("""\
Whether RapidsMPF should spill to pinned host memory when available,
or use regular pageable host memory."""),
)

parsed_args = parser.parse_args(args)

Expand Down
11 changes: 10 additions & 1 deletion python/cudf_polars/cudf_polars/experimental/rapidsmpf/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from rapidsmpf.config import Options, get_environment_variables
from rapidsmpf.memory.buffer import MemoryType
from rapidsmpf.memory.buffer_resource import BufferResource, LimitAvailableMemory
from rapidsmpf.memory.pinned_memory_resource import PinnedMemoryResource
from rapidsmpf.rmm_resource_adaptor import RmmResourceAdaptor
from rapidsmpf.streaming.core.context import Context
from rapidsmpf.streaming.core.leaf_node import pull_from_channel
Expand Down Expand Up @@ -201,11 +202,19 @@ def evaluate_pipeline(
}
| get_environment_variables()
)
pinned_mr = (
PinnedMemoryResource.make_if_available()
if config_options.executor.spill_to_pinned_memory
else None
)
if isinstance(config_options.cuda_stream_policy, CUDAStreamPoolConfig):
stream_pool = config_options.cuda_stream_policy.build()
local_comm = new_communicator(options)
br = BufferResource(
mr, memory_available=memory_available, stream_pool=stream_pool
mr,
pinned_mr=pinned_mr,
memory_available=memory_available,
stream_pool=stream_pool,
)
rmpf_context_manager = Context(local_comm, br, options)

Expand Down
14 changes: 13 additions & 1 deletion python/cudf_polars/cudf_polars/utils/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

"""
Expand Down Expand Up @@ -649,6 +649,11 @@ class StreamingExecutor:
max_io_threads
Maximum number of IO threads for the rapidsmpf runtime. Default is 2.
This controls the parallelism of IO operations when reading data.
spill_to_pinned_memory
Whether RapidsMPF should spill to pinned host memory when available,
or use regular pageable host memory. Pinned host memory offers higher
bandwidth and lower latency for device to host transfers compared to
regular pageable host memory.

Notes
-----
Expand Down Expand Up @@ -751,6 +756,11 @@ class StreamingExecutor:
f"{_env_prefix}__MAX_IO_THREADS", int, default=2
)
)
spill_to_pinned_memory: bool = dataclasses.field(
default_factory=_make_default_factory(
f"{_env_prefix}__SPILL_TO_PINNED_MEMORY", bool, default=False
)
)

def __post_init__(self) -> None: # noqa: D105
# Check for rapidsmpf runtime
Expand Down Expand Up @@ -887,6 +897,8 @@ def __post_init__(self) -> None: # noqa: D105
raise TypeError("client_device_threshold must be a float")
if not isinstance(self.max_io_threads, int):
raise TypeError("max_io_threads must be an int")
if not isinstance(self.spill_to_pinned_memory, bool):
raise TypeError("spill_to_pinned_memory must be bool")

# RapidsMPF spill is only supported for distributed clusters for now.
# This is because the spilling API is still within the RMPF-Dask integration.
Expand Down
1 change: 1 addition & 0 deletions python/cudf_polars/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ def test_validate_shuffle_insertion_method() -> None:
"sink_to_directory",
"client_device_threshold",
"max_io_threads",
"spill_to_pinned_memory",
],
)
def test_validate_streaming_executor_options(option: str) -> None:
Expand Down