Description
Summary
Introduce a new API that returns only the partition row indices from hash_partition.
Motivation
Hash partitioning is frequently used prior to data exchange. Consider the following steps:
- Call hash_partition to reorder rows by partition
- Call contiguous_split to create packed buffers for transmission
- Transmit packed buffers to recipient nodes
- Call unpack and copy into a cudf::table
This involves 3 data copies:
- hash_partition gathers/scatters rows to create the partitioned table
- contiguous_split copies data into contiguous packed buffers
- Receiver copies unpacked data into a new table
The current hash_partition API returns std::pair<std::unique_ptr<table>, std::vector<size_type>>, which bundles the partition computation with the data materialization. Users who need packed output for transmission must pay for an extra copy, because the hash-partitioned table cannot be transmitted partition-by-partition (bitmasks, strings, and nested types would need their storage buffers broken into pieces).
Proposed API
New Function: hash_partition_indices
// cpp/include/cudf/partitioning.hpp
/**
* @brief Computes partition row indices without materializing the partitioned table.
*
* Returns a vector of columns where each column contains the row indices belonging
* to that partition. This enables downstream operations to gather the partitions in
* the most efficient format for their use case.
*
* @param input The table to partition
* @param columns_to_hash Indices of columns to hash for partitioning
* @param num_partitions Number of partitions to create
* @param hash_function Hash function to use (default: HASH_MURMUR3)
* @param seed Hash seed value
* @param stream CUDA stream
* @param mr Memory resource for allocations
* @return Vector of columns, each containing row indices for one partition
*/
std::vector<std::unique_ptr<column>> hash_partition_indices(
  table_view const& input,
  std::vector<size_type> const& columns_to_hash,
  int num_partitions,
  hash_id hash_function = hash_id::HASH_MURMUR3,
  uint32_t seed = DEFAULT_HASH_SEED,
  rmm::cuda_stream_view stream = cudf::get_default_stream(),
  rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
Usage Pattern
// Get partition indices
auto partition_indices = cudf::hash_partition_indices(input, cols_to_hash, num_partitions);
// Gather each partition separately (potentially on separate streams)
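std::vector<std::unique_ptr<cudf::table>> partitions;
partitions.reserve(partition_indices.size());
for (auto const& indices : partition_indices) {
  // Each index column serves as the gather map for one partition
  partitions.push_back(cudf::gather(input, indices->view()));
}
Implementation Notes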
The existing hash_partition implementation already computes partition indices internally:
- Lines 501-547 (partitioning.cu): compute_row_partition_numbers kernel computes which partition each row belongs to
- Lines 549-562: Exclusive scan computes partition offsets
- Lines 572-624: Gather/scatter materializes the output table
The new API would stop after step 2 and return the indices organized by partition, rather than continuing to step 3.
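The flow described above can be sketched on the CPU with the standard library only. This is an illustrative reference under stated assumptions, not the libcudf implementation: the function name partition_indices_reference is hypothetical, std::hash stands in for the library's row hasher (it is not MurmurHash3), and a single integer key column replaces a multi-column table.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical CPU reference: returns, for each partition, the row indices
// assigned to it. std::hash is a stand-in for the real row hasher.
std::vector<std::vector<std::size_t>> partition_indices_reference(
  std::vector<std::int64_t> const& keys, std::size_t num_partitions)
{
  std::vector<std::vector<std::size_t>> result(num_partitions);
  if (num_partitions == 0) { return result; }

  // Step 1: compute the partition number of every row and count rows per partition.
  std::vector<std::size_t> partition_of(keys.size());
  std::vector<std::size_t> counts(num_partitions, 0);
  for (std::size_t i = 0; i < keys.size(); ++i) {
    partition_of[i] = std::hash<std::int64_t>{}(keys[i]) % num_partitions;
    ++counts[partition_of[i]];
  }

  // Step 2: an exclusive scan of the counts gives each partition's start offset.
  std::vector<std::size_t> offsets(num_partitions, 0);
  for (std::size_t p = 1; p < num_partitions; ++p) {
    offsets[p] = offsets[p - 1] + counts[p - 1];
  }

  // Instead of step 3 (moving row data), scatter only the row indices.
  std::vector<std::size_t> flat(keys.size());
  std::vector<std::size_t> cursor = offsets;
  for (std::size_t i = 0; i < keys.size(); ++i) {
    flat[cursor[partition_of[i]]++] = i;
  }

  // Slice the flat index buffer into one vector per partition.
  for (std::size_t p = 0; p < num_partitions; ++p) {
    // Invariant: each cursor has advanced exactly to the end of its partition.
    assert(cursor[p] == offsets[p] + counts[p]);
    for (std::size_t k = offsets[p]; k < offsets[p] + counts[p]; ++k) {
      result[p].push_back(flat[k]);
    }
  }
  return result;
}
```

Because rows are scattered in input order, the indices within each partition come out ascending. Whether the GPU implementation should guarantee the same ordering is a design question this sketch does not settle.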
Key Files
| Component | File |
|---|---|
| Header | cpp/include/cudf/partitioning.hpp |
| Implementation | cpp/src/partitioning/partitioning.cu |
| Tests | cpp/tests/partitioning/hash_partition_test.cpp |
| Python bindings | python/pylibcudf/pylibcudf/partitioning.pyx |
Acceptance Criteria
- New hash_partition_indices C++ API in cpp/include/cudf/partitioning.hpp
- Implementation in cpp/src/partitioning/partitioning.cu
- Unit tests in cpp/tests/partitioning/hash_partition_test.cpp covering:
  - Empty inputs (zero rows, zero columns)
  - Single partition
  - Many partitions (power-of-2 and non-power-of-2)
  - All column types: fixed-width, strings, structs, lists
  - Null handling in key and non-key columns
- Python/Cython bindings in python/pylibcudf/pylibcudf/partitioning.pyx
- Python tests
- API documentation
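One invariant the unit tests above could share across all cases is that the returned index lists cover every input row exactly once. A minimal sketch of such a check on host-side data follows; the helper name covers_each_row_once is hypothetical, and a real test would first copy the device index columns to the host.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical test helper: true when the per-partition index lists contain
// every row in [0, num_rows) exactly once.
bool covers_each_row_once(std::vector<std::vector<std::size_t>> const& partitions,
                          std::size_t num_rows)
{
  std::vector<bool> seen(num_rows, false);
  std::size_t total = 0;
  for (auto const& part : partitions) {
    for (std::size_t row : part) {
      // Reject out-of-range indices and rows assigned to more than one slot.
      if (row >= num_rows || seen[row]) { return false; }
      seen[row] = true;
      ++total;
    }
  }
  // Every row was seen at most once, so matching totals means none was dropped.
  return total == num_rows;
}
```

The check also handles the empty-input case: zero rows with any number of empty partitions passes.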
References
- Related: contiguous_split (cpp/include/cudf/contiguous_split.hpp)