Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION.
* SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*/

Expand Down Expand Up @@ -28,7 +28,7 @@ auto serialize_roaring_bitmap(roaring64_bitmap_t const* roaring_bitmap)
{
auto const num_bytes = roaring64_bitmap_portable_size_in_bytes(roaring_bitmap);
CUDF_EXPECTS(num_bytes > 0, "Roaring64 bitmap is empty");
auto serialized_bitmap = thrust::host_vector<cuda::std::byte>(num_bytes);
auto serialized_bitmap = std::vector<cuda::std::byte>(num_bytes);
std::ignore = roaring64_bitmap_portable_serialize(
roaring_bitmap, reinterpret_cast<char*>(serialized_bitmap.data()));
return serialized_bitmap;
Expand Down Expand Up @@ -158,15 +158,15 @@ auto setup_table_and_deletion_vector(nvbench::state& state)
}

// Row offsets for each row group - arbitrary, only used to build the index column
auto row_group_offsets = thrust::host_vector<size_t>(num_row_groups);
auto row_group_offsets = std::vector<size_t>(num_row_groups);
row_group_offsets[0] = static_cast<size_t>(std::llround(2e9));
std::for_each(
thrust::counting_iterator<size_t>(1),
thrust::counting_iterator<size_t>(num_row_groups),
[&](auto i) { row_group_offsets[i] = std::llround(row_group_offsets[i - 1] + 0.5e9); });

// Row group splits
auto row_group_splits = thrust::host_vector<cudf::size_type>(num_row_groups - 1);
auto row_group_splits = std::vector<cudf::size_type>(num_row_groups - 1);
{
std::mt19937 engine{0xf00d};
std::uniform_int_distribution<cudf::size_type> dist{1, num_rows};
Expand All @@ -175,7 +175,7 @@ auto setup_table_and_deletion_vector(nvbench::state& state)
}

// Number of rows in each row group
auto row_group_num_rows = thrust::host_vector<cudf::size_type>{};
auto row_group_num_rows = std::vector<cudf::size_type>{};
{
row_group_num_rows.reserve(num_row_groups);
auto previous_split = cudf::size_type{0};
Expand Down Expand Up @@ -216,15 +216,15 @@ void BM_parquet_deletion_vectors(nvbench::state& state)

auto mem_stats_logger = cudf::memory_stats_logger();
state.set_cuda_stream(nvbench::make_cuda_stream_view(cudf::get_default_stream().value()));
state.exec(nvbench::exec_tag::sync | nvbench::exec_tag::timer,
[&](nvbench::launch& launch, auto& timer) {
try_drop_l3_cache();
state.exec(
nvbench::exec_tag::sync | nvbench::exec_tag::timer, [&](nvbench::launch& launch, auto& timer) {
try_drop_l3_cache();

timer.start();
auto const result = cudf::io::parquet::experimental::read_parquet(
read_opts, deletion_vector, row_group_offsets, row_group_num_rows);
timer.stop();
});
timer.start();
std::ignore = cudf::io::parquet::experimental::read_parquet(
read_opts, deletion_vector, std::move(row_group_offsets), std::move(row_group_num_rows));
timer.stop();
});

auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
state.add_element_count(static_cast<double>(num_rows) / time, "rows_per_second");
Expand All @@ -245,6 +245,11 @@ void BM_parquet_chunked_deletion_vectors(nvbench::state& state)
auto [source_sink, row_group_offsets, row_group_num_rows, deletion_vector] =
setup_table_and_deletion_vector(state);

auto rows_per_deletion_vector =
std::vector<cudf::size_type>{std::numeric_limits<cudf::size_type>::max()};
auto deletion_vectors = std::vector<cudf::host_span<cuda::std::byte>>{};
deletion_vectors.emplace_back(deletion_vector);

cudf::io::parquet_reader_options read_opts =
cudf::io::parquet_reader_options::builder(source_sink.make_source_info());

Expand All @@ -256,12 +261,14 @@ void BM_parquet_chunked_deletion_vectors(nvbench::state& state)
try_drop_l3_cache();

timer.start();
auto reader = cudf::io::parquet::experimental::chunked_parquet_reader(chunk_read_limit,
pass_read_limit,
read_opts,
deletion_vector,
row_group_offsets,
row_group_num_rows);
auto reader =
cudf::io::parquet::experimental::chunked_parquet_reader(chunk_read_limit,
pass_read_limit,
read_opts,
std::move(deletion_vectors),
std::move(rows_per_deletion_vector),
std::move(row_group_offsets),
std::move(row_group_num_rows));
do {
auto const result = reader.read_chunk();
num_chunks++;
Expand Down
121 changes: 33 additions & 88 deletions cpp/include/cudf/io/experimental/deletion_vectors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,76 +47,21 @@ class chunked_parquet_reader {
*
* @param chunk_read_limit Byte limit on the returned table chunk size, `0` if there is no limit
* @param options Parquet reader options
* @param serialized_roaring_bitmap Host span of `portable` serialized 64-bit roaring bitmap
* @param row_group_offsets Host span of row offsets of each row group
* @param row_group_num_rows Host span of number of rows in each row group
* @param serialized_roaring_bitmaps Vector of spans of `portable` serialized 64-bit roaring
* bitmaps
* @param deletion_vector_row_counts Vector of number of rows in each deletion vector
* @param row_group_offsets Vector of row offsets of each row group
* @param row_group_num_rows Vector of number of rows in each row group
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
chunked_parquet_reader(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed these confusion causing constructors.

std::size_t chunk_read_limit,
parquet_reader_options const& options,
cudf::host_span<cuda::std::byte const> serialized_roaring_bitmap,
cudf::host_span<size_t const> row_group_offsets,
cudf::host_span<size_type const> row_group_num_rows,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
* @brief Constructor for the chunked reader
*
* Requires the same arguments as `cudf::io::parquet::experimental::read_parquet()`, with
* additional parameters to specify the size byte limit of the output table chunk produced, and a
* byte limit on the amount of temporary memory to use when reading. The `pass_read_limit` affects
* how many row groups we can read at a time by limiting the amount of memory dedicated to
* decompression space. The `pass_read_limit` is a hint, not an absolute limit - if a single row
* group cannot fit within the limit given, it will still be loaded. Also note that the
* `pass_read_limit` does not include the memory to deserialize and construct the roaring64 bitmap
* deletion vector that stays alive throughout the the lifetime of the reader.
*
* @param chunk_read_limit Byte limit on the returned table chunk size, `0` if there is no limit
* @param pass_read_limit Byte limit on the amount of memory used for decompressing and decoding
* data, `0` if there is no limit
* @param options Parquet reader options
* @param serialized_roaring_bitmap Host span of `portable` serialized 64-bit roaring bitmap
* @param row_group_offsets Host span of row offsets of each row group
* @param row_group_num_rows Host span of number of rows in each row group
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
chunked_parquet_reader(
std::size_t chunk_read_limit,
std::size_t pass_read_limit,
parquet_reader_options const& options,
cudf::host_span<cuda::std::byte const> serialized_roaring_bitmap,
cudf::host_span<size_t const> row_group_offsets,
cudf::host_span<size_type const> row_group_num_rows,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
* @brief Constructor for the chunked reader
*
* Requires the same arguments as the `cudf::io::parquet::experimental::read_parquet()`, and an
* additional parameter to specify the size byte limit of the output table chunk produced.
*
* @param chunk_read_limit Byte limit on the returned table chunk size, `0` if there is no limit
* @param options Parquet reader options
* @param serialized_roaring_bitmaps Host span of host spans containing `portable` serialized
* 64-bit roaring bitmaps
* @param deletion_vector_row_counts Host span of number of rows in each deletion vector
* @param row_group_offsets Host span of row offsets of each row group
* @param row_group_num_rows Host span of number of rows in each row group
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
chunked_parquet_reader(
std::size_t chunk_read_limit,
parquet_reader_options const& options,
cudf::host_span<cudf::host_span<cuda::std::byte const> const> serialized_roaring_bitmaps,
cudf::host_span<size_type const> deletion_vector_row_counts,
cudf::host_span<size_t const> row_group_offsets,
cudf::host_span<size_type const> row_group_num_rows,
std::vector<cudf::host_span<cuda::std::byte>>&& serialized_roaring_bitmaps,
std::vector<size_type>&& deletion_vector_row_counts,
std::vector<size_t>&& row_group_offsets,
std::vector<size_type>&& row_group_num_rows,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

Expand All @@ -136,22 +81,22 @@ class chunked_parquet_reader {
* @param pass_read_limit Byte limit on the amount of memory used for decompressing and decoding
* data, `0` if there is no limit
* @param options Parquet reader options
* @param serialized_roaring_bitmaps Host span of host spans containing `portable` serialized
* 64-bit roaring bitmap
* @param deletion_vector_row_counts Host span of number of rows in each deletion vector
* @param row_group_offsets Host span of row offsets of each row group
* @param row_group_num_rows Host span of number of rows in each row group
* @param serialized_roaring_bitmaps Vector of spans of `portable` serialized 64-bit roaring
* bitmaps
* @param deletion_vector_row_counts Vector of number of rows in each deletion vector
* @param row_group_offsets Vector of row offsets of each row group
* @param row_group_num_rows Vector of number of rows in each row group
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
chunked_parquet_reader(
std::size_t chunk_read_limit,
std::size_t pass_read_limit,
parquet_reader_options const& options,
cudf::host_span<cudf::host_span<cuda::std::byte const> const> serialized_roaring_bitmaps,
cudf::host_span<size_type const> deletion_vector_row_counts,
cudf::host_span<size_t const> row_group_offsets,
cudf::host_span<size_type const> row_group_num_rows,
std::vector<cudf::host_span<cuda::std::byte>>&& serialized_roaring_bitmaps,
std::vector<size_type>&& deletion_vector_row_counts,
std::vector<size_t>&& row_group_offsets,
std::vector<size_type>&& row_group_num_rows,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

Expand Down Expand Up @@ -209,9 +154,9 @@ class chunked_parquet_reader {
* @ingroup io_readers
*
* @param options Parquet reader options
* @param serialized_roaring_bitmap Host span of `portable` serialized 64-bit roaring bitmap
* @param row_group_offsets Host span of row index offsets for each row group
* @param row_group_num_rows Host span of number of rows in each row group
* @param serialized_roaring_bitmap Span of `portable` serialized 64-bit roaring bitmap
* @param row_group_offsets Vector of row index offsets for each row group
* @param row_group_num_rows Vector of number of rows in each row group
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate device memory of the returned table
*
Expand All @@ -220,9 +165,9 @@ class chunked_parquet_reader {
*/
table_with_metadata read_parquet(
parquet_reader_options const& options,
cudf::host_span<cuda::std::byte const> serialized_roaring_bitmap,
cudf::host_span<size_t const> row_group_offsets,
cudf::host_span<size_type const> row_group_num_rows,
cudf::host_span<cuda::std::byte> serialized_roaring_bitmap,
std::vector<size_t>&& row_group_offsets,
std::vector<size_type>&& row_group_num_rows,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource_ref());

Expand All @@ -241,11 +186,11 @@ table_with_metadata read_parquet(
* @ingroup io_readers
*
* @param options Parquet reader options
* @param serialized_roaring_bitmaps Host span of host spans containing `portable` serialized 64-bit
* roaring bitmaps
* @param deletion_vector_row_counts Host span of number of rows in each deletion vector
* @param row_group_offsets Host span of row index offsets for each row group
* @param row_group_num_rows Host span of number of rows in each row group
* @param serialized_roaring_bitmaps Vector of spans of `portable` serialized 64-bit roaring
* bitmaps
* @param deletion_vector_row_counts Vector of number of rows in each deletion vector
* @param row_group_offsets Vector of row index offsets for each row group
* @param row_group_num_rows Vector of number of rows in each row group
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate device memory of the returned table
*
Expand All @@ -254,10 +199,10 @@ table_with_metadata read_parquet(
*/
table_with_metadata read_parquet(
parquet_reader_options const& options,
cudf::host_span<cudf::host_span<cuda::std::byte const> const> serialized_roaring_bitmaps,
cudf::host_span<size_type const> deletion_vector_row_counts,
cudf::host_span<size_t const> row_group_offsets,
cudf::host_span<size_type const> row_group_num_rows,
std::vector<cudf::host_span<cuda::std::byte>>&& serialized_roaring_bitmaps,
std::vector<size_type>&& deletion_vector_row_counts,
std::vector<size_t>&& row_group_offsets,
std::vector<size_type>&& row_group_num_rows,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource_ref());

Expand Down
Loading