Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optionally write version 2 page headers in Parquet writer #13751

Merged
merged 36 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
0d24ec9
first pass with hardcoded v2, no compression
etseidl Jul 24, 2023
fe6d34d
compression seems to work now...disable some tests
etseidl Jul 25, 2023
dc56e24
add write_v2_headers to options...still need to fix tests
etseidl Jul 25, 2023
cd28d08
add v2 page header
etseidl Jul 25, 2023
268b39f
start adding v2 tests
etseidl Jul 25, 2023
8916b71
add v2 to options (forgot header)
etseidl Jul 25, 2023
e1b4632
fix v2 header reader and add more tests
etseidl Jul 25, 2023
792077e
move null counting to value encoding
etseidl Jul 25, 2023
f4586d5
add test of struct with nulls
etseidl Jul 25, 2023
4ac47b6
a few cleanups
etseidl Jul 25, 2023
a027d69
Merge branch 'branch-23.10' into feature/write_v2_header
etseidl Jul 25, 2023
9d67c68
a few more cleanups
etseidl Jul 25, 2023
cadcfe6
formatting
etseidl Jul 25, 2023
e638f2d
Merge branch 'branch-23.10' into feature/write_v2_header
etseidl Jul 26, 2023
dc172fc
Merge branch 'branch-23.10' into feature/write_v2_header
etseidl Jul 26, 2023
e618a44
simplify num_valid check
etseidl Jul 26, 2023
b8a119c
Merge branch 'branch-23.10' into feature/write_v2_header
etseidl Jul 26, 2023
92095d7
Merge branch 'branch-23.10' into feature/write_v2_header
etseidl Jul 28, 2023
7ca5524
Merge branch 'branch-23.10' into feature/write_v2_header
etseidl Jul 28, 2023
64cab8d
Merge branch 'rapidsai:branch-23.10' into feature/write_v2_header
etseidl Jul 31, 2023
a8e7da7
Merge branch 'rapidsai:branch-23.10' into feature/write_v2_header
etseidl Aug 1, 2023
345da7c
Merge branch 'branch-23.10' into feature/write_v2_header
etseidl Aug 2, 2023
118ff3e
Merge branch 'branch-23.10' into feature/write_v2_header
etseidl Aug 3, 2023
0341856
Merge branch 'rapidsai:branch-23.10' into feature/write_v2_header
etseidl Aug 4, 2023
7b84849
move builder function implementations to functions.cpp
etseidl Aug 4, 2023
98060ed
Merge branch 'feature/write_v2_header' of github.com:etseidl/cudf int…
etseidl Aug 4, 2023
4a5fe3f
Merge branch 'rapidsai:branch-23.10' into feature/write_v2_header
etseidl Aug 4, 2023
28bce4d
Merge branch 'branch-23.10' into feature/write_v2_header
vuule Aug 7, 2023
2929b21
Merge branch 'rapidsai:branch-23.10' into feature/write_v2_header
etseidl Aug 9, 2023
133ed61
Merge branch 'rapidsai:branch-23.10' into feature/write_v2_header
etseidl Aug 9, 2023
ca86cb9
rename offset per review suggestion
etseidl Aug 11, 2023
49653f9
Merge branch 'branch-23.10' into feature/write_v2_header
etseidl Aug 11, 2023
d19f20f
implement suggestion from review
etseidl Aug 11, 2023
88bfcc8
Merge branch 'rapidsai:branch-23.10' into feature/write_v2_header
etseidl Aug 14, 2023
ba38a9f
Merge branch 'branch-23.10' into feature/write_v2_header
etseidl Aug 14, 2023
a3159d4
Merge branch 'rapidsai:branch-23.10' into feature/write_v2_header
etseidl Aug 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,8 @@ class parquet_writer_options {
std::optional<size_type> _max_page_fragment_size;
// Optional compression statistics
std::shared_ptr<writer_compression_statistics> _compression_stats;
// write V2 page headers?
bool _v2_page_headers = false;

/**
* @brief Constructor from sink and table.
Expand Down Expand Up @@ -712,6 +714,13 @@ class parquet_writer_options {
return _compression_stats;
}

/**
* @brief Returns `true` if V2 page headers should be written.
*
* @return `true` if V2 page headers should be written.
*/
[[nodiscard]] auto is_enabled_write_v2_headers() const { return _v2_page_headers; }

/**
* @brief Sets partitions.
*
Expand Down Expand Up @@ -829,6 +838,13 @@ class parquet_writer_options {
{
_compression_stats = std::move(comp_stats);
}

/**
* @brief Sets preference for V2 page headers. Write V2 page headers if set to `true`.
*
* @param val Boolean value to enable/disable writing of V2 page headers.
*/
void enable_write_v2_headers(bool val) { _v2_page_headers = val; }
};

/**
Expand Down Expand Up @@ -1060,6 +1076,14 @@ class parquet_writer_options_builder {
return *this;
}

/**
* @brief Set to true if V2 page headers are to be written.
*
* @param enabled Boolean value to enable/disable writing of V2 page headers.
* @return this for chaining
*/
parquet_writer_options_builder& write_v2_headers(bool enabled);

/**
* @brief move parquet_writer_options member once it's built.
*/
Expand Down Expand Up @@ -1141,6 +1165,8 @@ class chunked_parquet_writer_options {
std::optional<size_type> _max_page_fragment_size;
// Optional compression statistics
std::shared_ptr<writer_compression_statistics> _compression_stats;
// write V2 page headers?
bool _v2_page_headers = false;

/**
* @brief Constructor from sink.
Expand Down Expand Up @@ -1281,6 +1307,13 @@ class chunked_parquet_writer_options {
return _compression_stats;
}

/**
* @brief Returns `true` if V2 page headers should be written.
*
* @return `true` if V2 page headers should be written.
*/
[[nodiscard]] auto is_enabled_write_v2_headers() const { return _v2_page_headers; }

/**
* @brief Sets metadata.
*
Expand Down Expand Up @@ -1384,6 +1417,13 @@ class chunked_parquet_writer_options {
_compression_stats = std::move(comp_stats);
}

/**
* @brief Sets preference for V2 page headers. Write V2 page headers if set to `true`.
*
* @param val Boolean value to enable/disable writing of V2 page headers.
*/
void enable_write_v2_headers(bool val) { _v2_page_headers = val; }

/**
* @brief creates builder to build chunked_parquet_writer_options.
*
Expand Down Expand Up @@ -1475,6 +1515,14 @@ class chunked_parquet_writer_options_builder {
return *this;
}

/**
* @brief Set to true if V2 page headers are to be written.
*
* @param enabled Boolean value to enable/disable writing of V2 page headers.
* @return this for chaining
*/
chunked_parquet_writer_options_builder& write_v2_headers(bool enabled);

/**
* @brief Sets the maximum row group size, in bytes.
*
Expand Down
21 changes: 21 additions & 0 deletions cpp/include/cudf_test/base_fixture.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,27 @@ class BaseFixture : public ::testing::Test {
rmm::mr::device_memory_resource* mr() { return _mr; }
};

/**
* @brief Base test fixture that takes a parameter.
*
* Example:
* ```
* class MyIntTestFixture : public cudf::test::BaseFixtureWithParam<int> {};
* ```
*/
template <typename T>
class BaseFixtureWithParam : public ::testing::TestWithParam<T> {
rmm::mr::device_memory_resource* _mr{rmm::mr::get_current_device_resource()};

public:
/**
* @brief Returns pointer to `device_memory_resource` that should be used for
* all tests inheriting from this fixture
* @return pointer to memory resource
*/
rmm::mr::device_memory_resource* mr() { return _mr; }
etseidl marked this conversation as resolved.
Show resolved Hide resolved
};

template <typename T, typename Enable = void>
struct uniform_distribution_impl {};
template <typename T>
Expand Down
13 changes: 13 additions & 0 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,12 @@ parquet_writer_options_builder& parquet_writer_options_builder::max_page_fragmen
return *this;
}

parquet_writer_options_builder& parquet_writer_options_builder::write_v2_headers(bool enabled)
{
options.enable_write_v2_headers(enabled);
return *this;
}

void chunked_parquet_writer_options::set_key_value_metadata(
std::vector<std::map<std::string, std::string>> metadata)
{
Expand Down Expand Up @@ -831,6 +837,13 @@ chunked_parquet_writer_options_builder& chunked_parquet_writer_options_builder::
return *this;
}

chunked_parquet_writer_options_builder& chunked_parquet_writer_options_builder::write_v2_headers(
bool enabled)
{
options.enable_write_v2_headers(enabled);
return *this;
}

chunked_parquet_writer_options_builder&
chunked_parquet_writer_options_builder::max_page_fragment_size(size_type val)
{
Expand Down
15 changes: 14 additions & 1 deletion cpp/src/io/parquet/compact_protocol_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ bool CompactProtocolReader::read(PageHeader* p)
ParquetFieldInt32(2, p->uncompressed_page_size),
ParquetFieldInt32(3, p->compressed_page_size),
ParquetFieldStruct(5, p->data_page_header),
ParquetFieldStruct(7, p->dictionary_page_header));
ParquetFieldStruct(7, p->dictionary_page_header),
ParquetFieldStruct(8, p->data_page_header_v2));
Comment on lines 256 to +259
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is number 6?

Copy link
Contributor Author

@etseidl etseidl Aug 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

6 would be the un-implemented IndexPageHeader. It's a placeholder type in the Parquet spec.

And before you ask, 4 is an optional CRC that no one adds either. 😉

return function_builder(this, op);
}

Expand All @@ -275,6 +276,18 @@ bool CompactProtocolReader::read(DictionaryPageHeader* d)
return function_builder(this, op);
}

bool CompactProtocolReader::read(DataPageHeaderV2* d)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come this wasn't needed in #11778?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the header decoder is device code, so there's a duplicate thrift decoder buried in page_hdr.cu We've got little bits of thrift decoding sprinkled all over the place :(

{
auto op = std::make_tuple(ParquetFieldInt32(1, d->num_values),
ParquetFieldInt32(2, d->num_nulls),
ParquetFieldInt32(3, d->num_rows),
ParquetFieldEnum<Encoding>(4, d->encoding),
ParquetFieldInt32(5, d->definition_levels_byte_length),
ParquetFieldInt32(6, d->repetition_levels_byte_length),
ParquetFieldBool(7, d->is_compressed));
return function_builder(this, op);
}

bool CompactProtocolReader::read(KeyValue* k)
{
auto op = std::make_tuple(ParquetFieldString(1, k->key), ParquetFieldString(2, k->value));
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/compact_protocol_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class CompactProtocolReader {
bool read(PageHeader* p);
bool read(DataPageHeader* d);
bool read(DictionaryPageHeader* d);
bool read(DataPageHeaderV2* d);
bool read(KeyValue* k);
bool read(PageLocation* p);
bool read(OffsetIndex* o);
Expand Down
Loading