Mark column chunks in a PQ reader pass as large strings when the cumulative `offsets` exceeds the large strings threshold. (#17207)

This PR implements a method to correctly set the large-string property for column chunks in a Chunked Parquet Reader subpass when the cumulative string offsets have exceeded the large strings threshold.
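
The gist of the change, condensed into a standalone sketch (the struct and function names below are illustrative stand-ins, not the actual libcudf internals): each subpass's per-column string sizes are added to a per-pass running total, and a chunk is marked as a large-string column when that cumulative total, rather than the current subpass's size alone, crosses the 64-bit offsets threshold.

#include <algorithm>
#include <cstddef>
#include <functional>
#include <vector>

// Illustrative stand-ins for the reader's internal state; not libcudf types.
struct chunk_info {
  std::size_t src_col_index{};
  bool is_large_string_col{false};
};

// Accumulate this subpass's column string sizes into the pass-level totals and
// mark any chunk whose cumulative size exceeds the 64-bit offsets threshold.
void mark_large_string_chunks(std::vector<std::size_t> const& col_string_sizes,
                              std::vector<std::size_t>& cumulative_col_string_sizes,
                              std::vector<chunk_info>& chunks,
                              std::size_t threshold)
{
  if (cumulative_col_string_sizes.empty()) {
    cumulative_col_string_sizes.resize(col_string_sizes.size(), 0);
  }
  std::transform(cumulative_col_string_sizes.begin(),
                 cumulative_col_string_sizes.end(),
                 col_string_sizes.begin(),
                 cumulative_col_string_sizes.begin(),
                 std::plus<>{});
  for (auto& chunk : chunks) {
    if (cumulative_col_string_sizes[chunk.src_col_index] > threshold) {
      chunk.is_large_string_col = true;
    }
  }
}

When a chunk is marked this way, the reader allocates its output string buffer as a large-strings column, which the column_buffer changes below carry through to 64-bit offsets.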

Fixes #17158

Authors:
  - Muhammad Haseeb (https://github.com/mhaseeb123)

Approvers:
  - Nghia Truong (https://github.com/ttnghia)
  - Vukasin Milovanovic (https://github.com/vuule)
  - David Wendt (https://github.com/davidwendt)

URL: #17207
mhaseeb123 authored Nov 7, 2024
1 parent c73defd commit e52ce85
Showing 6 changed files with 111 additions and 10 deletions.
31 changes: 25 additions & 6 deletions cpp/src/io/parquet/reader_impl.cpp
@@ -97,22 +97,37 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num
_stream);
}

// Compute column string sizes (using page string offsets) for this subpass
col_string_sizes = calculate_page_string_offsets();

// check for overflow
// ensure cumulative column string sizes have been initialized
if (pass.cumulative_col_string_sizes.empty()) {
pass.cumulative_col_string_sizes.resize(_input_columns.size(), 0);
}

// Add to the cumulative column string sizes of this pass
std::transform(pass.cumulative_col_string_sizes.begin(),
pass.cumulative_col_string_sizes.end(),
col_string_sizes.begin(),
pass.cumulative_col_string_sizes.begin(),
std::plus<>{});

// Check for overflow in cumulative column string sizes of this pass so that the page string
// offsets of overflowing (large) string columns are treated as 64-bit.
auto const threshold = static_cast<size_t>(strings::detail::get_offset64_threshold());
auto const has_large_strings = std::any_of(col_string_sizes.cbegin(),
col_string_sizes.cend(),
auto const has_large_strings = std::any_of(pass.cumulative_col_string_sizes.cbegin(),
pass.cumulative_col_string_sizes.cend(),
[=](std::size_t sz) { return sz > threshold; });
if (has_large_strings and not strings::detail::is_large_strings_enabled()) {
CUDF_FAIL("String column exceeds the column size limit", std::overflow_error);
}

// mark any chunks that are large string columns
// Mark any chunks for which the cumulative column string size has exceeded the
// large strings threshold
if (has_large_strings) {
for (auto& chunk : pass.chunks) {
auto const idx = chunk.src_col_index;
if (col_string_sizes[idx] > threshold) { chunk.is_large_string_col = true; }
if (pass.cumulative_col_string_sizes[idx] > threshold) { chunk.is_large_string_col = true; }
}
}
}
@@ -195,7 +210,11 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num
// only do string buffer for leaf
if (idx == max_depth - 1 and out_buf.string_size() == 0 and
col_string_sizes[pass.chunks[c].src_col_index] > 0) {
out_buf.create_string_data(col_string_sizes[pass.chunks[c].src_col_index], _stream);
out_buf.create_string_data(
col_string_sizes[pass.chunks[c].src_col_index],
pass.cumulative_col_string_sizes[pass.chunks[c].src_col_index] >
static_cast<size_t>(strings::detail::get_offset64_threshold()),
_stream);
}
if (has_strings) { str_data[idx] = out_buf.string_data(); }
out_buf.user_data |=
3 changes: 3 additions & 0 deletions cpp/src/io/parquet/reader_impl_chunking.hpp
@@ -130,6 +130,9 @@ struct pass_intermediate_data {
rmm::device_buffer decomp_dict_data{0, cudf::get_default_stream()};
rmm::device_uvector<string_index_pair> str_dict_index{0, cudf::get_default_stream()};

// cumulative strings column sizes.
std::vector<size_t> cumulative_col_string_sizes{};

int level_type_size{0};

// skip_rows / num_rows for this pass.
4 changes: 3 additions & 1 deletion cpp/src/io/utilities/column_buffer.cpp
@@ -63,9 +63,11 @@ void cudf::io::detail::inline_column_buffer::allocate_strings_data(bool memset_d
}

void cudf::io::detail::inline_column_buffer::create_string_data(size_t num_bytes,
bool is_large_strings_col,
rmm::cuda_stream_view stream)
{
_string_data = rmm::device_buffer(num_bytes, stream, _mr);
_is_large_strings_col = is_large_strings_col;
_string_data = rmm::device_buffer(num_bytes, stream, _mr);
}

namespace {
6 changes: 5 additions & 1 deletion cpp/src/io/utilities/column_buffer.hpp
@@ -246,13 +246,17 @@ class inline_column_buffer : public column_buffer_base<inline_column_buffer> {
[[nodiscard]] size_t data_size_impl() const { return _data.size(); }
std::unique_ptr<column> make_string_column_impl(rmm::cuda_stream_view stream);

void create_string_data(size_t num_bytes, rmm::cuda_stream_view stream);
void create_string_data(size_t num_bytes,
bool is_large_strings_col,
rmm::cuda_stream_view stream);
void* string_data() { return _string_data.data(); }
[[nodiscard]] void const* string_data() const { return _string_data.data(); }
[[nodiscard]] size_t string_size() const { return _string_data.size(); }
[[nodiscard]] bool is_large_strings_column() const { return _is_large_strings_col; }

private:
rmm::device_buffer _string_data{};
bool _is_large_strings_col{};
};

using column_buffer = gather_column_buffer;
3 changes: 1 addition & 2 deletions cpp/src/io/utilities/column_buffer_strings.cu
@@ -27,8 +27,7 @@ std::unique_ptr<column> cudf::io::detail::inline_column_buffer::make_string_colu
{
// if the size of _string_data is over the threshold for 64bit size_type, _data will contain
// sizes rather than offsets. need special handling for that case.
auto const threshold = static_cast<size_t>(strings::detail::get_offset64_threshold());
if (_string_data.size() > threshold) {
if (is_large_strings_column()) {
if (not strings::detail::is_large_strings_enabled()) {
CUDF_FAIL("String column exceeds the column size limit", std::overflow_error);
}
74 changes: 74 additions & 0 deletions cpp/tests/large_strings/parquet_tests.cpp
@@ -18,6 +18,7 @@

#include <cudf_test/table_utilities.hpp>

#include <cudf/concatenate.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/io/types.hpp>
#include <cudf/table/table_view.hpp>
@@ -69,3 +70,76 @@ TEST_F(ParquetStringsTest, ReadLargeStrings)
// go back to normal threshold
unsetenv("LIBCUDF_LARGE_STRINGS_THRESHOLD");
}

// Disabled as the test is too brittle and depends on empirically set `pass_read_limit`,
// encoding type, and the currently used `ZSTD` scratch space size.
TEST_F(ParquetStringsTest, DISABLED_ChunkedReadLargeStrings)
{
// Construct a table with one large strings column > 2GB
auto const wide = this->wide_column();
auto input = cudf::concatenate(std::vector<cudf::column_view>(120000, wide)); ///< 230MB

int constexpr multiplier = 12;
std::vector<cudf::column_view> input_cols(multiplier, input->view());
auto col0 = cudf::concatenate(input_cols); ///< 2.70GB

// Expected table
auto const expected = cudf::table_view{{col0->view()}};
auto expected_metadata = cudf::io::table_input_metadata{expected};

// Needed to get exactly 2 Parquet subpasses: first with large-strings and the second with
// regular ones. This may change in the future and lead to false failures.
expected_metadata.column_metadata[0].set_encoding(
cudf::io::column_encoding::DELTA_LENGTH_BYTE_ARRAY);

// Host buffer to write Parquet
std::vector<char> buffer;

// Writer options
cudf::io::parquet_writer_options out_opts =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{&buffer}, expected)
.metadata(expected_metadata);

// Needed to get exactly 2 Parquet subpasses: first with large-strings and the second with
// regular ones. This may change in the future and lead to false failures.
out_opts.set_compression(cudf::io::compression_type::ZSTD);

// Write to Parquet
cudf::io::write_parquet(out_opts);

// Empirically set pass_read_limit of 8GB so we read almost entire table (>2GB strings) in the
// first subpass and only a small amount in the second subpass. This may change in the future
// and lead to false failures.
size_t constexpr pass_read_limit = size_t{8} * 1024 * 1024 * 1024;

// Reader options
cudf::io::parquet_reader_options default_in_opts =
cudf::io::parquet_reader_options::builder(cudf::io::source_info(buffer.data(), buffer.size()));

// Chunked parquet reader
auto reader = cudf::io::chunked_parquet_reader(0, pass_read_limit, default_in_opts);

// Read chunked
auto tables = std::vector<std::unique_ptr<cudf::table>>{};
while (reader.has_next()) {
tables.emplace_back(reader.read_chunk().tbl);
}
auto table_views = std::vector<cudf::table_view>{};
std::transform(tables.begin(), tables.end(), std::back_inserter(table_views), [](auto& tbl) {
return tbl->view();
});
auto result = cudf::concatenate(table_views);
auto const result_view = result->view();

// Verify offsets
for (auto const& cv : result_view) {
auto const offsets = cudf::strings_column_view(cv).offsets();
EXPECT_EQ(offsets.type(), cudf::data_type{cudf::type_id::INT64});
}

// Verify tables to be equal
CUDF_TEST_EXPECT_TABLES_EQUAL(result_view, expected);

// Verify that we read exactly two table chunks
EXPECT_EQ(tables.size(), 2);
}
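
For context, the behavior the test above exercises can be condensed into a short usage sketch; the file path, threshold value, and read limit below are placeholders rather than values taken from the test.

#include <cudf/io/parquet.hpp>
#include <cudf/io/types.hpp>
#include <cudf/strings/strings_column_view.hpp>
#include <cudf/types.hpp>

#include <cassert>
#include <cstddef>
#include <cstdlib>

// Sketch only: read a Parquet file in chunks with a lowered large-strings
// threshold and confirm that qualifying string columns come back with
// 64-bit (INT64) offsets. "input.parquet" and the numbers are placeholders.
void read_large_strings_in_chunks()
{
  setenv("LIBCUDF_LARGE_STRINGS_THRESHOLD", "1000000", 1);

  auto const opts =
    cudf::io::parquet_reader_options::builder(cudf::io::source_info("input.parquet")).build();

  // No output chunk limit; pass_read_limit of 8GB as in the test above.
  auto reader =
    cudf::io::chunked_parquet_reader(0, std::size_t{8} * 1024 * 1024 * 1024, opts);

  while (reader.has_next()) {
    auto const chunk = reader.read_chunk();
    for (auto const& cv : chunk.tbl->view()) {
      auto const offsets = cudf::strings_column_view(cv).offsets();
      assert(offsets.type() == cudf::data_type{cudf::type_id::INT64});
    }
  }

  unsetenv("LIBCUDF_LARGE_STRINGS_THRESHOLD");
}

The actual test instead builds an over-2GB strings column in memory and pins the encoding, compression, and pass_read_limit shown above so the read splits into exactly two subpasses, the first carrying the large-string data.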
