diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index ac7f378a25d..788ff15f3c1 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -528,6 +528,8 @@ class parquet_writer_options { std::optional _max_page_fragment_size; // Optional compression statistics std::shared_ptr _compression_stats; + // write V2 page headers? + bool _v2_page_headers = false; /** * @brief Constructor from sink and table. @@ -712,6 +714,13 @@ class parquet_writer_options { return _compression_stats; } + /** + * @brief Returns `true` if V2 page headers should be written. + * + * @return `true` if V2 page headers should be written. + */ + [[nodiscard]] auto is_enabled_write_v2_headers() const { return _v2_page_headers; } + /** * @brief Sets partitions. * @@ -829,6 +838,13 @@ class parquet_writer_options { { _compression_stats = std::move(comp_stats); } + + /** + * @brief Sets preference for V2 page headers. Write V2 page headers if set to `true`. + * + * @param val Boolean value to enable/disable writing of V2 page headers. + */ + void enable_write_v2_headers(bool val) { _v2_page_headers = val; } }; /** @@ -1060,6 +1076,14 @@ class parquet_writer_options_builder { return *this; } + /** + * @brief Set to true if V2 page headers are to be written. + * + * @param enabled Boolean value to enable/disable writing of V2 page headers. + * @return this for chaining + */ + parquet_writer_options_builder& write_v2_headers(bool enabled); + /** * @brief move parquet_writer_options member once it's built. */ @@ -1141,6 +1165,8 @@ class chunked_parquet_writer_options { std::optional _max_page_fragment_size; // Optional compression statistics std::shared_ptr _compression_stats; + // write V2 page headers? + bool _v2_page_headers = false; /** * @brief Constructor from sink. @@ -1281,6 +1307,13 @@ class chunked_parquet_writer_options { return _compression_stats; } + /** + * @brief Returns `true` if V2 page headers should be written. 
+ * + * @return `true` if V2 page headers should be written. + */ + [[nodiscard]] auto is_enabled_write_v2_headers() const { return _v2_page_headers; } + /** * @brief Sets metadata. * @@ -1384,6 +1417,13 @@ class chunked_parquet_writer_options { _compression_stats = std::move(comp_stats); } + /** + * @brief Sets preference for V2 page headers. Write V2 page headers if set to `true`. + * + * @param val Boolean value to enable/disable writing of V2 page headers. + */ + void enable_write_v2_headers(bool val) { _v2_page_headers = val; } + /** * @brief creates builder to build chunked_parquet_writer_options. * @@ -1475,6 +1515,14 @@ class chunked_parquet_writer_options_builder { return *this; } + /** + * @brief Set to true if V2 page headers are to be written. + * + * @param enabled Boolean value to enable/disable writing of V2 page headers. + * @return this for chaining + */ + chunked_parquet_writer_options_builder& write_v2_headers(bool enabled); + /** * @brief Sets the maximum row group size, in bytes. * diff --git a/cpp/include/cudf_test/base_fixture.hpp b/cpp/include/cudf_test/base_fixture.hpp index 364355438fd..05319e03003 100644 --- a/cpp/include/cudf_test/base_fixture.hpp +++ b/cpp/include/cudf_test/base_fixture.hpp @@ -59,6 +59,27 @@ class BaseFixture : public ::testing::Test { rmm::mr::device_memory_resource* mr() { return _mr; } }; +/** + * @brief Base test fixture that takes a parameter. 
+ * + * Example: + * ``` + * class MyIntTestFixture : public cudf::test::BaseFixtureWithParam {}; + * ``` + */ +template +class BaseFixtureWithParam : public ::testing::TestWithParam { + rmm::mr::device_memory_resource* _mr{rmm::mr::get_current_device_resource()}; + + public: + /** + * @brief Returns pointer to `device_memory_resource` that should be used for + * all tests inheriting from this fixture + * @return pointer to memory resource + */ + rmm::mr::device_memory_resource* mr() const { return _mr; } +}; + template struct uniform_distribution_impl {}; template diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index f0df650c79e..5adb2046dbd 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -750,6 +750,12 @@ parquet_writer_options_builder& parquet_writer_options_builder::max_page_fragmen return *this; } +parquet_writer_options_builder& parquet_writer_options_builder::write_v2_headers(bool enabled) +{ + options.enable_write_v2_headers(enabled); + return *this; +} + void chunked_parquet_writer_options::set_key_value_metadata( std::vector> metadata) { @@ -831,6 +837,13 @@ chunked_parquet_writer_options_builder& chunked_parquet_writer_options_builder:: return *this; } +chunked_parquet_writer_options_builder& chunked_parquet_writer_options_builder::write_v2_headers( + bool enabled) +{ + options.enable_write_v2_headers(enabled); + return *this; +} + chunked_parquet_writer_options_builder& chunked_parquet_writer_options_builder::max_page_fragment_size(size_type val) { diff --git a/cpp/src/io/parquet/compact_protocol_reader.cpp b/cpp/src/io/parquet/compact_protocol_reader.cpp index 789c76a860c..92fcd151925 100644 --- a/cpp/src/io/parquet/compact_protocol_reader.cpp +++ b/cpp/src/io/parquet/compact_protocol_reader.cpp @@ -255,7 +255,8 @@ bool CompactProtocolReader::read(PageHeader* p) ParquetFieldInt32(2, p->uncompressed_page_size), ParquetFieldInt32(3, p->compressed_page_size), ParquetFieldStruct(5, p->data_page_header), - 
ParquetFieldStruct(7, p->dictionary_page_header)); + ParquetFieldStruct(7, p->dictionary_page_header), + ParquetFieldStruct(8, p->data_page_header_v2)); return function_builder(this, op); } @@ -275,6 +276,18 @@ bool CompactProtocolReader::read(DictionaryPageHeader* d) return function_builder(this, op); } +bool CompactProtocolReader::read(DataPageHeaderV2* d) +{ + auto op = std::make_tuple(ParquetFieldInt32(1, d->num_values), + ParquetFieldInt32(2, d->num_nulls), + ParquetFieldInt32(3, d->num_rows), + ParquetFieldEnum(4, d->encoding), + ParquetFieldInt32(5, d->definition_levels_byte_length), + ParquetFieldInt32(6, d->repetition_levels_byte_length), + ParquetFieldBool(7, d->is_compressed)); + return function_builder(this, op); +} + bool CompactProtocolReader::read(KeyValue* k) { auto op = std::make_tuple(ParquetFieldString(1, k->key), ParquetFieldString(2, k->value)); diff --git a/cpp/src/io/parquet/compact_protocol_reader.hpp b/cpp/src/io/parquet/compact_protocol_reader.hpp index 453a4be9b83..62ccacaac37 100644 --- a/cpp/src/io/parquet/compact_protocol_reader.hpp +++ b/cpp/src/io/parquet/compact_protocol_reader.hpp @@ -114,6 +114,7 @@ class CompactProtocolReader { bool read(PageHeader* p); bool read(DataPageHeader* d); bool read(DictionaryPageHeader* d); + bool read(DataPageHeaderV2* d); bool read(KeyValue* k); bool read(PageLocation* p); bool read(OffsetIndex* o); diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 190f70d0747..9f4c0ba943a 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -309,7 +309,8 @@ __global__ void __launch_bounds__(128) int32_t num_columns, size_t max_page_size_bytes, size_type max_page_size_rows, - uint32_t page_align) + uint32_t page_align, + bool write_v2_headers) { // TODO: All writing seems to be done by thread 0. 
Could be replaced by thrust foreach __shared__ __align__(8) parquet_column_device_view col_g; @@ -318,7 +319,8 @@ __global__ void __launch_bounds__(128) __shared__ __align__(8) EncPage page_g; __shared__ __align__(8) statistics_merge_group pagestats_g; - uint32_t t = threadIdx.x; + uint32_t const t = threadIdx.x; + auto const data_page_type = write_v2_headers ? PageType::DATA_PAGE_V2 : PageType::DATA_PAGE; if (t == 0) { col_g = col_desc[blockIdx.x]; @@ -449,7 +451,7 @@ __global__ void __launch_bounds__(128) page_g.num_fragments = fragments_in_chunk - page_start; page_g.chunk = &chunks[blockIdx.y][blockIdx.x]; page_g.chunk_id = blockIdx.y * num_columns + blockIdx.x; - page_g.page_type = PageType::DATA_PAGE; + page_g.page_type = data_page_type; page_g.hdr_size = 0; page_g.max_hdr_size = 32; // Max size excluding statistics if (ck_g.stats) { @@ -968,8 +970,12 @@ __global__ void __launch_bounds__(128, 8) device_span comp_results) { __shared__ __align__(8) page_enc_state_s state_g; - using block_scan = cub::BlockScan; - __shared__ typename block_scan::TempStorage temp_storage; + using block_reduce = cub::BlockReduce; + using block_scan = cub::BlockScan; + __shared__ union { + typename block_reduce::TempStorage reduce_storage; + typename block_scan::TempStorage scan_storage; + } temp_storage; page_enc_state_s* const s = &state_g; uint32_t t = threadIdx.x; @@ -980,9 +986,15 @@ __global__ void __launch_bounds__(128, 8) s->ck = *s->page.chunk; s->col = *s->ck.col_desc; s->cur = s->page.page_data + s->page.max_hdr_size; + // init V2 info + s->page.def_lvl_bytes = 0; + s->page.rep_lvl_bytes = 0; + s->page.num_nulls = 0; } __syncthreads(); + auto const is_v2 = s->page.page_type == PageType::DATA_PAGE_V2; + // Encode Repetition and Definition levels if (s->page.page_type != PageType::DICTIONARY_PAGE && (s->col.num_def_level_bits()) != 0 && // This means max definition level is not 0 (nullable) @@ -995,7 +1007,10 @@ __global__ void __launch_bounds__(128, 8) s->rle_run = 0; 
s->rle_pos = 0; s->rle_numvals = 0; - s->rle_out = s->cur + 4; + s->rle_out = s->cur; + if (not is_v2) { + s->rle_out += 4; // save space for length + } } __syncthreads(); while (s->rle_numvals < s->page.num_rows) { @@ -1037,11 +1052,13 @@ __global__ void __launch_bounds__(128, 8) __syncthreads(); } if (t < 32) { - uint8_t* cur = s->cur; - uint8_t* rle_out = s->rle_out; - if (t < 4) { - uint32_t rle_bytes = (uint32_t)(rle_out - cur) - 4; - cur[t] = rle_bytes >> (t * 8); + uint8_t* const cur = s->cur; + uint8_t* const rle_out = s->rle_out; + uint32_t const rle_bytes = static_cast(rle_out - cur) - (is_v2 ? 0 : 4); + if (is_v2 && t == 0) { + s->page.def_lvl_bytes = rle_bytes; + } else if (not is_v2 && t < 4) { + cur[t] = rle_bytes >> (t * 8); } __syncwarp(); if (t == 0) { s->cur = rle_out; } @@ -1050,14 +1067,17 @@ __global__ void __launch_bounds__(128, 8) } else if (s->page.page_type != PageType::DICTIONARY_PAGE && s->col.num_rep_level_bits() != 0 // This means there ARE repetition levels (has list) ) { - auto encode_levels = [&](uint8_t const* lvl_val_data, uint32_t nbits) { + auto encode_levels = [&](uint8_t const* lvl_val_data, uint32_t nbits, uint32_t& lvl_bytes) { // For list types, the repetition and definition levels are pre-calculated. We just need to // encode and write them now. if (!t) { s->rle_run = 0; s->rle_pos = 0; s->rle_numvals = 0; - s->rle_out = s->cur + 4; + s->rle_out = s->cur; + if (not is_v2) { + s->rle_out += 4; // save space for length + } } __syncthreads(); size_type page_first_val_idx = s->col.level_offsets[s->page.start_row]; @@ -1075,19 +1095,21 @@ __global__ void __launch_bounds__(128, 8) __syncthreads(); } if (t < 32) { - uint8_t* cur = s->cur; - uint8_t* rle_out = s->rle_out; - if (t < 4) { - uint32_t rle_bytes = (uint32_t)(rle_out - cur) - 4; - cur[t] = rle_bytes >> (t * 8); + uint8_t* const cur = s->cur; + uint8_t* const rle_out = s->rle_out; + uint32_t const rle_bytes = static_cast(rle_out - cur) - (is_v2 ? 
0 : 4); + if (is_v2 && t == 0) { + lvl_bytes = rle_bytes; + } else if (not is_v2 && t < 4) { + cur[t] = rle_bytes >> (t * 8); } __syncwarp(); if (t == 0) { s->cur = rle_out; } } }; - encode_levels(s->col.rep_values, s->col.num_rep_level_bits()); + encode_levels(s->col.rep_values, s->col.num_rep_level_bits(), s->page.rep_lvl_bytes); __syncthreads(); - encode_levels(s->col.def_values, s->col.num_def_level_bits()); + encode_levels(s->col.def_values, s->col.num_def_level_bits(), s->page.def_lvl_bytes); } // Encode data values __syncthreads(); @@ -1118,6 +1140,7 @@ __global__ void __launch_bounds__(128, 8) s->chunk_start_val = row_to_value_idx(s->ck.start_row, s->col); } __syncthreads(); + uint32_t num_valid = 0; for (uint32_t cur_val_idx = 0; cur_val_idx < s->page.num_leaf_values;) { uint32_t nvals = min(s->page.num_leaf_values - cur_val_idx, 128); uint32_t len, pos; @@ -1144,13 +1167,15 @@ __global__ void __launch_bounds__(128, 8) return std::make_tuple(is_valid, val_idx); }(); + if (is_valid) num_valid++; + cur_val_idx += nvals; if (dict_bits >= 0) { // Dictionary encoding if (dict_bits > 0) { uint32_t rle_numvals; uint32_t rle_numvals_in_block; - block_scan(temp_storage).ExclusiveSum(is_valid, pos, rle_numvals_in_block); + block_scan(temp_storage.scan_storage).ExclusiveSum(is_valid, pos, rle_numvals_in_block); rle_numvals = s->rle_numvals; if (is_valid) { uint32_t v; @@ -1190,7 +1215,7 @@ __global__ void __launch_bounds__(128, 8) len = 0; } uint32_t total_len = 0; - block_scan(temp_storage).ExclusiveSum(len, pos, total_len); + block_scan(temp_storage.scan_storage).ExclusiveSum(len, pos, total_len); __syncthreads(); if (t == 0) { s->cur = dst + total_len; } if (is_valid) { @@ -1317,7 +1342,11 @@ __global__ void __launch_bounds__(128, 8) __syncthreads(); } } + + uint32_t const valid_count = block_reduce(temp_storage.reduce_storage).Sum(num_valid); + if (t == 0) { + s->page.num_nulls = s->page.num_values - valid_count; uint8_t* base = s->page.page_data + 
s->page.max_hdr_size; auto actual_data_size = static_cast(s->cur - base); if (actual_data_size > s->page.max_data_size) { @@ -1325,8 +1354,13 @@ __global__ void __launch_bounds__(128, 8) } s->page.max_data_size = actual_data_size; if (not comp_in.empty()) { - comp_in[blockIdx.x] = {base, actual_data_size}; - comp_out[blockIdx.x] = {s->page.compressed_data + s->page.max_hdr_size, 0}; // size is unused + // V2 does not compress rep and def level data + size_t const skip_comp_size = s->page.def_lvl_bytes + s->page.rep_lvl_bytes; + comp_in[blockIdx.x] = {base + skip_comp_size, actual_data_size - skip_comp_size}; + comp_out[blockIdx.x] = {s->page.compressed_data + s->page.max_hdr_size + skip_comp_size, + 0}; // size is unused + // copy uncompressed bytes over + memcpy(s->page.compressed_data + s->page.max_hdr_size, base, skip_comp_size); } pages[blockIdx.x] = s->page; if (not comp_results.empty()) { @@ -1367,9 +1401,10 @@ __global__ void __launch_bounds__(decide_compression_block_size) for (auto page_id = lane_id; page_id < num_pages; page_id += cudf::detail::warp_size) { auto const& curr_page = ck_g[warp_id].pages[page_id]; auto const page_data_size = curr_page.max_data_size; + auto const lvl_bytes = curr_page.def_lvl_bytes + curr_page.rep_lvl_bytes; uncompressed_data_size += page_data_size; if (auto comp_res = curr_page.comp_res; comp_res != nullptr) { - compressed_data_size += comp_res->bytes_written; + compressed_data_size += comp_res->bytes_written + lvl_bytes; if (comp_res->status != compression_status::SUCCESS) { atomicOr(&compression_error[warp_id], 1); } @@ -1493,6 +1528,13 @@ class header_encoder { current_header_ptr = cpw_put_int64(current_header_ptr, static_cast(value)); } + inline __device__ void field_bool(int field, bool value) + { + current_header_ptr = cpw_put_fldh( + current_header_ptr, field, current_field_index, value ? 
ST_FLD_TRUE : ST_FLD_FALSE); + current_field_index = field; + } + template inline __device__ void field_int32(int field, T value) { @@ -1812,8 +1854,10 @@ __global__ void __launch_bounds__(128) } uncompressed_page_size = page_g.max_data_size; if (ck_g.is_compressed) { + auto const lvl_bytes = page_g.def_lvl_bytes + page_g.rep_lvl_bytes; hdr_start = page_g.compressed_data; - compressed_page_size = (uint32_t)comp_results[blockIdx.x].bytes_written; + compressed_page_size = + static_cast(comp_results[blockIdx.x].bytes_written) + lvl_bytes; page_g.max_data_size = compressed_page_size; } else { hdr_start = page_g.page_data; @@ -1853,6 +1897,23 @@ __global__ void __launch_bounds__(128) encoder.field_struct_end(5); } encoder.field_struct_end(5); + } else if (page_type == PageType::DATA_PAGE_V2) { + encoder.field_struct_begin(8); + encoder.field_int32(1, page_g.num_values); + encoder.field_int32(2, page_g.num_nulls); + encoder.field_int32(3, page_g.num_rows); + encoder.field_int32(4, encoding); + encoder.field_int32(5, page_g.def_lvl_bytes); + encoder.field_int32(6, page_g.rep_lvl_bytes); + encoder.field_bool(7, ck_g.is_compressed); // TODO can compress at page level now + // Optionally encode page-level statistics + if (not page_stats.empty()) { + encoder.field_struct_begin(8); + encoder.set_ptr( + EncodeStatistics(encoder.get_ptr(), &page_stats[blockIdx.x], col_g.stats_dtype, scratch)); + encoder.field_struct_end(8); + } + encoder.field_struct_end(8); } else { // DictionaryPageHeader encoder.field_struct_begin(7); @@ -2154,6 +2215,7 @@ void InitEncoderPages(device_2dspan chunks, size_t max_page_size_bytes, size_type max_page_size_rows, uint32_t page_align, + bool write_v2_headers, statistics_merge_group* page_grstats, statistics_merge_group* chunk_grstats, rmm::cuda_stream_view stream) @@ -2170,7 +2232,8 @@ void InitEncoderPages(device_2dspan chunks, num_columns, max_page_size_bytes, max_page_size_rows, - page_align); + page_align, + write_v2_headers); } void 
EncodePages(device_span pages, diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index a25c7fab712..a729f28d672 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -298,6 +298,20 @@ struct DataPageHeader { Encoding repetition_level_encoding = Encoding::PLAIN; // Encoding used for repetition levels }; +/** + * @brief Thrift-derived struct describing the header for a V2 data page + */ +struct DataPageHeaderV2 { + int32_t num_values = 0; // Number of values, including NULLs, in this data page. + int32_t num_nulls = 0; // Number of NULL values, in this data page. + int32_t num_rows = 0; // Number of rows in this data page. which means + // pages change on record boundaries (r = 0) + Encoding encoding = Encoding::PLAIN; // Encoding used for this data page + int32_t definition_levels_byte_length = 0; // length of the definition levels + int32_t repetition_levels_byte_length = 0; // length of the repetition levels + bool is_compressed = true; // whether the values are compressed. +}; + /** * @brief Thrift-derived struct describing the header for a dictionary page */ @@ -322,6 +336,7 @@ struct PageHeader { int32_t compressed_page_size = 0; // Compressed page size in bytes (not including the header) DataPageHeader data_page_header; DictionaryPageHeader dictionary_page_header; + DataPageHeaderV2 data_page_header_v2; }; /** diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 97c71de9a9b..51d2f952a33 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -422,7 +422,10 @@ struct EncPage { uint32_t num_leaf_values; //!< Values in page. Different from num_rows in case of nested types uint32_t num_values; //!< Number of def/rep level values in page. 
Includes null/empty elements in //!< non-leaf levels + uint32_t def_lvl_bytes; //!< Number of bytes of encoded definition level data (V2 only) + uint32_t rep_lvl_bytes; //!< Number of bytes of encoded repetition level data (V2 only) compression_result* comp_res; //!< Ptr to compression result + uint32_t num_nulls; //!< Number of null values (V2 only) (down here for alignment) }; /** @@ -648,6 +651,7 @@ void get_dictionary_indices(cudf::detail::device_2dspan * @param[in] num_columns Number of columns * @param[in] page_grstats Setup for page-level stats * @param[in] page_align Required alignment for uncompressed pages + * @param[in] write_v2_headers True if V2 page headers should be written * @param[in] chunk_grstats Setup for chunk-level stats * @param[in] max_page_comp_data_size Calculated maximum compressed data size of pages * @param[in] stream CUDA stream to use, default 0 @@ -661,6 +665,7 @@ void InitEncoderPages(cudf::detail::device_2dspan chunks, size_t max_page_size_bytes, size_type max_page_size_rows, uint32_t page_align, + bool write_v2_headers, statistics_merge_group* page_grstats, statistics_merge_group* chunk_grstats, rmm::cuda_stream_view stream); diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 17a0a903a47..06e7b6bfc8a 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -996,6 +996,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, uint32_t num_columns, size_t max_page_size_bytes, size_type max_page_size_rows, + bool write_v2_headers, Compression compression_codec, rmm::cuda_stream_view stream) { @@ -1012,6 +1013,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, max_page_size_bytes, max_page_size_rows, page_alignment(compression_codec), + write_v2_headers, nullptr, nullptr, stream); @@ -1036,6 +1038,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, max_page_size_bytes, max_page_size_rows, page_alignment(compression_codec), + write_v2_headers, nullptr, nullptr, 
stream); @@ -1061,6 +1064,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, max_page_size_bytes, max_page_size_rows, page_alignment(compression_codec), + write_v2_headers, nullptr, nullptr, stream); @@ -1197,6 +1201,7 @@ build_chunk_dictionaries(hostdevice_2dvector& chunks, * @param compression Compression format * @param max_page_size_bytes Maximum uncompressed page size, in bytes * @param max_page_size_rows Maximum page size, in rows + * @param write_v2_headers True if version 2 page headers are to be written * @param stream CUDA stream used for device memory operations and kernel launches */ void init_encoder_pages(hostdevice_2dvector& chunks, @@ -1211,6 +1216,7 @@ void init_encoder_pages(hostdevice_2dvector& chunks, Compression compression, size_t max_page_size_bytes, size_type max_page_size_rows, + bool write_v2_headers, rmm::cuda_stream_view stream) { rmm::device_uvector page_stats_mrg(num_stats_bfr, stream); @@ -1224,6 +1230,7 @@ void init_encoder_pages(hostdevice_2dvector& chunks, max_page_size_bytes, max_page_size_rows, page_alignment(compression), + write_v2_headers, (num_stats_bfr) ? page_stats_mrg.data() : nullptr, (num_stats_bfr > num_pages) ? 
page_stats_mrg.data() + num_pages : nullptr, stream); @@ -1424,6 +1431,7 @@ void fill_table_meta(std::unique_ptr const& table_meta) * @param max_dictionary_size Maximum dictionary size, in bytes * @param single_write_mode Flag to indicate that we are guaranteeing a single table write * @param int96_timestamps Flag to indicate if timestamps will be written as INT96 + * @param write_v2_headers True if V2 page headers are to be written * @param out_sink Sink for checking if device write is supported, should not be used to write any * data in this function * @param stream CUDA stream used for device memory operations and kernel launches @@ -1447,6 +1455,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, size_t max_dictionary_size, single_write_mode write_mode, bool int96_timestamps, + bool write_v2_headers, host_span const> out_sink, rmm::cuda_stream_view stream) { @@ -1764,8 +1773,14 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, } // Build chunk dictionaries and count pages. Sends chunks to device. 
- cudf::detail::hostdevice_vector comp_page_sizes = init_page_sizes( - chunks, col_desc, num_columns, max_page_size_bytes, max_page_size_rows, compression, stream); + cudf::detail::hostdevice_vector comp_page_sizes = init_page_sizes(chunks, + col_desc, + num_columns, + max_page_size_bytes, + max_page_size_rows, + write_v2_headers, + compression, + stream); // Find which partition a rg belongs to std::vector rg_to_part; @@ -1878,6 +1893,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, compression, max_page_size_bytes, max_page_size_rows, + write_v2_headers, stream); } @@ -1982,6 +1998,7 @@ writer::impl::impl(std::vector> sinks, _max_dictionary_size(options.get_max_dictionary_size()), _max_page_fragment_size(options.get_max_page_fragment_size()), _int96_timestamps(options.is_enabled_int96_timestamps()), + _write_v2_headers(options.is_enabled_write_v2_headers()), _column_index_truncate_length(options.get_column_index_truncate_length()), _kv_meta(options.get_key_value_metadata()), _single_write_mode(mode), @@ -2009,6 +2026,7 @@ writer::impl::impl(std::vector> sinks, _max_dictionary_size(options.get_max_dictionary_size()), _max_page_fragment_size(options.get_max_page_fragment_size()), _int96_timestamps(options.is_enabled_int96_timestamps()), + _write_v2_headers(options.is_enabled_write_v2_headers()), _column_index_truncate_length(options.get_column_index_truncate_length()), _kv_meta(options.get_key_value_metadata()), _single_write_mode(mode), @@ -2085,6 +2103,7 @@ void writer::impl::write(table_view const& input, std::vector co _max_dictionary_size, _single_write_mode, _int96_timestamps, + _write_v2_headers, _out_sink, _stream); } catch (...) 
{ // catch any exception type @@ -2199,7 +2218,7 @@ void writer::impl::write_parquet_data_to_sink( auto const& enc_page = h_pages[curr_page_idx++]; // skip dict pages - if (enc_page.page_type != PageType::DATA_PAGE) { continue; } + if (enc_page.page_type == PageType::DICTIONARY_PAGE) { continue; } int32_t this_page_size = enc_page.hdr_size + enc_page.max_data_size; // first_row_idx is relative to start of row group diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index a6c55e04b96..89ef85ba2bd 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -161,6 +161,7 @@ class writer::impl { size_t const _max_dictionary_size; std::optional const _max_page_fragment_size; bool const _int96_timestamps; + bool const _write_v2_headers; int32_t const _column_index_truncate_length; std::vector> const _kv_meta; // Optional user metadata. single_write_mode const _single_write_mode; // Special parameter only used by `write()` to diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index a86190239fe..8c7d598d33f 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -376,7 +376,7 @@ struct ParquetChunkedWriterNumericTypeTest : public ParquetChunkedWriterTest { TYPED_TEST_SUITE(ParquetChunkedWriterNumericTypeTest, SupportedTypes); // Base test fixture for size-parameterized tests -class ParquetSizedTest : public ::testing::TestWithParam {}; +class ParquetSizedTest : public ::cudf::test::BaseFixtureWithParam {}; // test the allowed bit widths for dictionary encoding // values chosen to trigger 1, 2, 3, 4, 5, 6, 8, 10, 12, 16, 20, and 24 bit dictionaries @@ -385,6 +385,13 @@ INSTANTIATE_TEST_SUITE_P(ParquetDictionaryTest, testing::Range(1, 25), testing::PrintToStringParamName()); +// Base test fixture for V2 header tests +class ParquetV2Test : public ::cudf::test::BaseFixtureWithParam {}; +INSTANTIATE_TEST_SUITE_P(ParquetV2ReadWriteTest, + ParquetV2Test, + 
testing::Bool(), + testing::PrintToStringParamName()); + namespace { // Generates a vector of uniform random values of type T template @@ -594,9 +601,10 @@ TYPED_TEST(ParquetWriterTimestampTypeTest, TimestampOverflow) CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); } -TEST_F(ParquetWriterTest, MultiColumn) +TEST_P(ParquetV2Test, MultiColumn) { constexpr auto num_rows = 100000; + auto const is_v2 = GetParam(); // auto col0_data = random_values(num_rows); auto col1_data = random_values(num_rows); @@ -645,6 +653,7 @@ TEST_F(ParquetWriterTest, MultiColumn) auto filepath = temp_env->get_temp_filepath("MultiColumn.parquet"); cudf::io::parquet_writer_options out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .write_v2_headers(is_v2) .metadata(expected_metadata); cudf::io::write_parquet(out_opts); @@ -656,9 +665,10 @@ TEST_F(ParquetWriterTest, MultiColumn) cudf::test::expect_metadata_equal(expected_metadata, result.metadata); } -TEST_F(ParquetWriterTest, MultiColumnWithNulls) +TEST_P(ParquetV2Test, MultiColumnWithNulls) { constexpr auto num_rows = 100; + auto const is_v2 = GetParam(); // auto col0_data = random_values(num_rows); auto col1_data = random_values(num_rows); @@ -715,6 +725,7 @@ TEST_F(ParquetWriterTest, MultiColumnWithNulls) auto filepath = temp_env->get_temp_filepath("MultiColumnWithNulls.parquet"); cudf::io::parquet_writer_options out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .write_v2_headers(is_v2) .metadata(expected_metadata); cudf::io::write_parquet(out_opts); @@ -730,8 +741,10 @@ TEST_F(ParquetWriterTest, MultiColumnWithNulls) cudf::test::expect_metadata_equal(expected_metadata, result.metadata); } -TEST_F(ParquetWriterTest, Strings) +TEST_P(ParquetV2Test, Strings) { + auto const is_v2 = GetParam(); + std::vector strings{ "Monday", "Wȅdnȅsday", "Friday", "Monday", "Friday", "Friday", "Friday", "Funday"}; auto const num_rows = strings.size(); @@ 
-754,6 +767,7 @@ TEST_F(ParquetWriterTest, Strings) auto filepath = temp_env->get_temp_filepath("Strings.parquet"); cudf::io::parquet_writer_options out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .write_v2_headers(is_v2) .metadata(expected_metadata); cudf::io::write_parquet(out_opts); @@ -823,13 +837,14 @@ TEST_F(ParquetWriterTest, StringsAsBinary) cudf::test::expect_metadata_equal(expected_metadata, result.metadata); } -TEST_F(ParquetWriterTest, SlicedTable) +TEST_P(ParquetV2Test, SlicedTable) { // This test checks for writing zero copy, offsetted views into existing cudf tables std::vector strings{ "Monday", "Wȅdnȅsday", "Friday", "Monday", "Friday", "Friday", "Friday", "Funday"}; auto const num_rows = strings.size(); + auto const is_v2 = GetParam(); auto seq_col0 = random_values(num_rows); auto seq_col2 = random_values(num_rows); @@ -926,6 +941,7 @@ TEST_F(ParquetWriterTest, SlicedTable) auto filepath = temp_env->get_temp_filepath("SlicedTable.parquet"); cudf::io::parquet_writer_options out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected_slice) + .write_v2_headers(is_v2) .metadata(expected_metadata); cudf::io::write_parquet(out_opts); @@ -937,8 +953,10 @@ TEST_F(ParquetWriterTest, SlicedTable) cudf::test::expect_metadata_equal(expected_metadata, result.metadata); } -TEST_F(ParquetWriterTest, ListColumn) +TEST_P(ParquetV2Test, ListColumn) { + auto const is_v2 = GetParam(); + auto valids = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i % 2; }); auto valids2 = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i != 3; }); @@ -1023,6 +1041,7 @@ TEST_F(ParquetWriterTest, ListColumn) auto filepath = temp_env->get_temp_filepath("ListColumn.parquet"); auto out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .write_v2_headers(is_v2) .metadata(expected_metadata) 
.compression(cudf::io::compression_type::NONE); @@ -1198,8 +1217,10 @@ TEST_F(ParquetWriterTest, Struct) cudf::io::read_parquet(read_args); } -TEST_F(ParquetWriterTest, StructOfList) +TEST_P(ParquetV2Test, StructOfList) { + auto const is_v2 = GetParam(); + // Structget_temp_filepath("StructOfList.parquet"); cudf::io::parquet_writer_options args = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .write_v2_headers(is_v2) .metadata(expected_metadata); cudf::io::write_parquet(args); @@ -1273,8 +1295,10 @@ TEST_F(ParquetWriterTest, StructOfList) cudf::test::expect_metadata_equal(expected_metadata, result.metadata); } -TEST_F(ParquetWriterTest, ListOfStruct) +TEST_P(ParquetV2Test, ListOfStruct) { + auto const is_v2 = GetParam(); + // Listget_temp_filepath("ListOfStruct.parquet"); cudf::io::parquet_writer_options args = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .write_v2_headers(is_v2) .metadata(expected_metadata); cudf::io::write_parquet(args); @@ -1457,8 +1482,10 @@ TEST_F(ParquetWriterTest, PartitionedWrite) CUDF_TEST_EXPECT_TABLES_EQUAL(expected2, result2.tbl->view()); } -TEST_F(ParquetWriterTest, PartitionedWriteEmptyPartitions) +TEST_P(ParquetV2Test, PartitionedWriteEmptyPartitions) { + auto const is_v2 = GetParam(); + auto source = create_random_fixed_table(4, 4, false); auto filepath1 = temp_env->get_temp_filepath("PartitionedWrite1.parquet"); @@ -1476,6 +1503,7 @@ TEST_F(ParquetWriterTest, PartitionedWriteEmptyPartitions) cudf::io::parquet_writer_options::builder( cudf::io::sink_info(std::vector{filepath1, filepath2}), *source) .partitions({partition1, partition2}) + .write_v2_headers(is_v2) .compression(cudf::io::compression_type::NONE); cudf::io::write_parquet(args); @@ -1488,8 +1516,10 @@ TEST_F(ParquetWriterTest, PartitionedWriteEmptyPartitions) CUDF_TEST_EXPECT_TABLES_EQUAL(expected2, result2.tbl->view()); } -TEST_F(ParquetWriterTest, PartitionedWriteEmptyColumns) 
+TEST_P(ParquetV2Test, PartitionedWriteEmptyColumns) { + auto const is_v2 = GetParam(); + auto source = create_random_fixed_table(0, 4, false); auto filepath1 = temp_env->get_temp_filepath("PartitionedWrite1.parquet"); @@ -1507,6 +1537,7 @@ TEST_F(ParquetWriterTest, PartitionedWriteEmptyColumns) cudf::io::parquet_writer_options::builder( cudf::io::sink_info(std::vector{filepath1, filepath2}), *source) .partitions({partition1, partition2}) + .write_v2_headers(is_v2) .compression(cudf::io::compression_type::NONE); cudf::io::write_parquet(args); @@ -4082,7 +4113,7 @@ int32_t compare_binary(std::vector const& v1, return 0; } -TEST_F(ParquetWriterTest, LargeColumnIndex) +TEST_P(ParquetV2Test, LargeColumnIndex) { // create a file large enough to be written in 2 batches (currently 1GB per batch) // pick fragment size that num_rows is divisible by, so we'll get equal sized row groups @@ -4090,6 +4121,7 @@ TEST_F(ParquetWriterTest, LargeColumnIndex) const std::string s2(1000, 'b'); constexpr auto num_rows = 512 * 1024; constexpr auto frag_size = num_rows / 128; + auto const is_v2 = GetParam(); auto col0_elements = cudf::detail::make_counting_transform_iterator( 0, [&](auto i) { return (i < num_rows) ? s1 : s2; }); @@ -4103,6 +4135,7 @@ TEST_F(ParquetWriterTest, LargeColumnIndex) .stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN) .compression(cudf::io::compression_type::NONE) .dictionary_policy(cudf::io::dictionary_policy::NEVER) + .write_v2_headers(is_v2) .max_page_fragment_size(frag_size) .row_group_size_bytes(1024 * 1024 * 1024) .row_group_size_rows(num_rows); @@ -4129,9 +4162,12 @@ TEST_F(ParquetWriterTest, LargeColumnIndex) } } -TEST_F(ParquetWriterTest, CheckColumnOffsetIndex) +TEST_P(ParquetV2Test, CheckColumnOffsetIndex) { constexpr auto num_rows = 100000; + auto const is_v2 = GetParam(); + auto const expected_hdr_type = + is_v2 ? 
cudf::io::parquet::PageType::DATA_PAGE_V2 : cudf::io::parquet::PageType::DATA_PAGE; // fixed length strings auto str1_elements = cudf::detail::make_counting_transform_iterator(0, [](auto i) { @@ -4169,6 +4205,7 @@ TEST_F(ParquetWriterTest, CheckColumnOffsetIndex) const cudf::io::parquet_writer_options out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) .stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN) + .write_v2_headers(is_v2) .max_page_size_rows(20000); cudf::io::write_parquet(out_opts); @@ -4190,9 +4227,9 @@ TEST_F(ParquetWriterTest, CheckColumnOffsetIndex) for (size_t o = 0; o < oi.page_locations.size(); o++) { auto const& page_loc = oi.page_locations[o]; auto const ph = read_page_header(source, page_loc); - EXPECT_EQ(ph.type, cudf::io::parquet::PageType::DATA_PAGE); + EXPECT_EQ(ph.type, expected_hdr_type); EXPECT_EQ(page_loc.first_row_index, num_vals); - num_vals += ph.data_page_header.num_values; + num_vals += is_v2 ? ph.data_page_header_v2.num_rows : ph.data_page_header.num_values; } // loop over page stats from the column index. check that stats.min <= page.min @@ -4216,9 +4253,12 @@ TEST_F(ParquetWriterTest, CheckColumnOffsetIndex) } } -TEST_F(ParquetWriterTest, CheckColumnOffsetIndexNulls) +TEST_P(ParquetV2Test, CheckColumnOffsetIndexNulls) { constexpr auto num_rows = 100000; + auto const is_v2 = GetParam(); + auto const expected_hdr_type = + is_v2 ? 
cudf::io::parquet::PageType::DATA_PAGE_V2 : cudf::io::parquet::PageType::DATA_PAGE; // fixed length strings auto str1_elements = cudf::detail::make_counting_transform_iterator(0, [](auto i) { @@ -4266,6 +4306,7 @@ TEST_F(ParquetWriterTest, CheckColumnOffsetIndexNulls) const cudf::io::parquet_writer_options out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) .stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN) + .write_v2_headers(is_v2) .max_page_size_rows(20000); cudf::io::write_parquet(out_opts); @@ -4287,9 +4328,9 @@ TEST_F(ParquetWriterTest, CheckColumnOffsetIndexNulls) for (size_t o = 0; o < oi.page_locations.size(); o++) { auto const& page_loc = oi.page_locations[o]; auto const ph = read_page_header(source, page_loc); - EXPECT_EQ(ph.type, cudf::io::parquet::PageType::DATA_PAGE); + EXPECT_EQ(ph.type, expected_hdr_type); EXPECT_EQ(page_loc.first_row_index, num_vals); - num_vals += ph.data_page_header.num_values; + num_vals += is_v2 ? ph.data_page_header_v2.num_rows : ph.data_page_header.num_values; } // loop over page stats from the column index. check that stats.min <= page.min @@ -4319,9 +4360,12 @@ TEST_F(ParquetWriterTest, CheckColumnOffsetIndexNulls) } } -TEST_F(ParquetWriterTest, CheckColumnOffsetIndexNullColumn) +TEST_P(ParquetV2Test, CheckColumnOffsetIndexNullColumn) { constexpr auto num_rows = 100000; + auto const is_v2 = GetParam(); + auto const expected_hdr_type = + is_v2 ? 
cudf::io::parquet::PageType::DATA_PAGE_V2 : cudf::io::parquet::PageType::DATA_PAGE; // fixed length strings auto str1_elements = cudf::detail::make_counting_transform_iterator(0, [](auto i) { @@ -4354,6 +4398,7 @@ TEST_F(ParquetWriterTest, CheckColumnOffsetIndexNullColumn) const cudf::io::parquet_writer_options out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) .stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN) + .write_v2_headers(is_v2) .max_page_size_rows(20000); cudf::io::write_parquet(out_opts); @@ -4375,9 +4420,9 @@ TEST_F(ParquetWriterTest, CheckColumnOffsetIndexNullColumn) for (size_t o = 0; o < oi.page_locations.size(); o++) { auto const& page_loc = oi.page_locations[o]; auto const ph = read_page_header(source, page_loc); - EXPECT_EQ(ph.type, cudf::io::parquet::PageType::DATA_PAGE); + EXPECT_EQ(ph.type, expected_hdr_type); EXPECT_EQ(page_loc.first_row_index, num_vals); - num_vals += ph.data_page_header.num_values; + num_vals += is_v2 ? ph.data_page_header_v2.num_rows : ph.data_page_header.num_values; } // loop over page stats from the column index. check that stats.min <= page.min @@ -4411,8 +4456,12 @@ TEST_F(ParquetWriterTest, CheckColumnOffsetIndexNullColumn) } } -TEST_F(ParquetWriterTest, CheckColumnOffsetIndexStruct) +TEST_P(ParquetV2Test, CheckColumnOffsetIndexStruct) { + auto const is_v2 = GetParam(); + auto const expected_hdr_type = + is_v2 ? 
cudf::io::parquet::PageType::DATA_PAGE_V2 : cudf::io::parquet::PageType::DATA_PAGE; + auto c0 = testdata::ascending(); auto sc0 = testdata::ascending(); @@ -4441,6 +4490,7 @@ TEST_F(ParquetWriterTest, CheckColumnOffsetIndexStruct) const cudf::io::parquet_writer_options out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) .stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN) + .write_v2_headers(is_v2) .max_page_size_rows(page_size_for_ordered_tests); cudf::io::write_parquet(out_opts); @@ -4466,10 +4516,11 @@ TEST_F(ParquetWriterTest, CheckColumnOffsetIndexStruct) for (size_t o = 0; o < oi.page_locations.size(); o++) { auto const& page_loc = oi.page_locations[o]; auto const ph = read_page_header(source, page_loc); - EXPECT_EQ(ph.type, cudf::io::parquet::PageType::DATA_PAGE); + EXPECT_EQ(ph.type, expected_hdr_type); + EXPECT_EQ(page_loc.first_row_index, num_vals); // last column has 2 values per row - EXPECT_EQ(page_loc.first_row_index * (c == rg.columns.size() - 1 ? 2 : 1), num_vals); - num_vals += ph.data_page_header.num_values; + num_vals += is_v2 ? ph.data_page_header_v2.num_rows + : ph.data_page_header.num_values / (c == rg.columns.size() - 1 ? 2 : 1); } // loop over page stats from the column index. check that stats.min <= page.min @@ -4489,8 +4540,86 @@ TEST_F(ParquetWriterTest, CheckColumnOffsetIndexStruct) } } -TEST_F(ParquetWriterTest, CheckColumnIndexListWithNulls) +TEST_P(ParquetV2Test, CheckColumnOffsetIndexStructNulls) +{ + auto const is_v2 = GetParam(); + auto const expected_hdr_type = + is_v2 ? 
cudf::io::parquet::PageType::DATA_PAGE_V2 : cudf::io::parquet::PageType::DATA_PAGE; + + auto validity2 = + cudf::detail::make_counting_transform_iterator(0, [](cudf::size_type i) { return i % 2; }); + auto validity3 = cudf::detail::make_counting_transform_iterator( + 0, [](cudf::size_type i) { return (i % 3) != 0; }); + auto validity4 = cudf::detail::make_counting_transform_iterator( + 0, [](cudf::size_type i) { return (i % 4) != 0; }); + auto validity5 = cudf::detail::make_counting_transform_iterator( + 0, [](cudf::size_type i) { return (i % 5) != 0; }); + + auto c0 = testdata::ascending(); + + auto col1_data = random_values(num_ordered_rows); + auto col2_data = random_values(num_ordered_rows); + auto col3_data = random_values(num_ordered_rows); + + // col1 is null at even row indices (validity2 is i % 2); col2/col3 are null every 3rd/4th row + auto col1 = + cudf::test::fixed_width_column_wrapper(col1_data.begin(), col1_data.end(), validity2); + auto col2 = + cudf::test::fixed_width_column_wrapper(col2_data.begin(), col2_data.end(), validity3); + auto col3 = + cudf::test::fixed_width_column_wrapper(col3_data.begin(), col3_data.end(), validity4); + + std::vector> struct_children; + struct_children.push_back(col1.release()); + struct_children.push_back(col2.release()); + struct_children.push_back(col3.release()); + auto struct_validity = std::vector(validity5, validity5 + num_ordered_rows); + cudf::test::structs_column_wrapper c1(std::move(struct_children), struct_validity); + table_view expected({c0, c1}); + + auto const filepath = temp_env->get_temp_filepath("CheckColumnOffsetIndexStructNulls.parquet"); + const cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN) + .write_v2_headers(is_v2) + .max_page_size_rows(page_size_for_ordered_tests); + cudf::io::write_parquet(out_opts); + + auto const source = cudf::io::datasource::create(filepath); + cudf::io::parquet::FileMetaData fmd; + + read_footer(source, 
&fmd); + + for (size_t r = 0; r < fmd.row_groups.size(); r++) { + auto const& rg = fmd.row_groups[r]; + for (size_t c = 0; c < rg.columns.size(); c++) { + auto const& chunk = rg.columns[c]; + + // loop over offsets, read each page header, make sure it's a data page and that + // the first row index is correct + auto const oi = read_offset_index(source, chunk); + auto const ci = read_column_index(source, chunk); + + int64_t num_vals = 0; + for (size_t o = 0; o < oi.page_locations.size(); o++) { + auto const& page_loc = oi.page_locations[o]; + auto const ph = read_page_header(source, page_loc); + EXPECT_EQ(ph.type, expected_hdr_type); + EXPECT_EQ(page_loc.first_row_index, num_vals); + num_vals += is_v2 ? ph.data_page_header_v2.num_rows : ph.data_page_header.num_values; + // check that null counts match + if (is_v2) { EXPECT_EQ(ci.null_counts[o], ph.data_page_header_v2.num_nulls); } + } + } + } +} + +TEST_P(ParquetV2Test, CheckColumnIndexListWithNulls) { + auto const is_v2 = GetParam(); + auto const expected_hdr_type = + is_v2 ? 
cudf::io::parquet::PageType::DATA_PAGE_V2 : cudf::io::parquet::PageType::DATA_PAGE; + using cudf::test::iterators::null_at; using cudf::test::iterators::nulls_at; using lcw = cudf::test::lists_column_wrapper; @@ -4576,6 +4705,7 @@ TEST_F(ParquetWriterTest, CheckColumnIndexListWithNulls) auto const filepath = temp_env->get_temp_filepath("ColumnIndexListWithNulls.parquet"); auto out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) .stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN) + .write_v2_headers(is_v2) .compression(cudf::io::compression_type::NONE); cudf::io::write_parquet(out_opts); @@ -4594,14 +4724,12 @@ TEST_F(ParquetWriterTest, CheckColumnIndexListWithNulls) // the first row index is correct auto const oi = read_offset_index(source, chunk); - int64_t num_vals = 0; for (size_t o = 0; o < oi.page_locations.size(); o++) { auto const& page_loc = oi.page_locations[o]; auto const ph = read_page_header(source, page_loc); - EXPECT_EQ(ph.type, cudf::io::parquet::PageType::DATA_PAGE); - // last column has 2 values per row - EXPECT_EQ(page_loc.first_row_index * (c == rg.columns.size() - 1 ? 2 : 1), num_vals); - num_vals += ph.data_page_header.num_values; + EXPECT_EQ(ph.type, expected_hdr_type); + // check null counts in V2 header + if (is_v2) { EXPECT_EQ(ph.data_page_header_v2.num_nulls, expected_null_counts[c]); } } // check null counts in column chunk stats and page indexes