Add python tests for Parquet DELTA_BINARY_PACKED encoder #14316

Merged

merged 23 commits into branch-23.12 from delta_encode_python on Nov 8, 2023

Changes from 17 commits (23 commits total)

Commits
98b0f79  v2 (vuule, Oct 20, 2023)
a8427f7  dictionary policy (vuule, Oct 20, 2023)
09983f0  get delta writing to work (etseidl, Oct 20, 2023)
41f827f  almost works (etseidl, Oct 20, 2023)
772a275  forgot to pass another param (etseidl, Oct 23, 2023)
0379b4d  fix undercount on page sizes for delta binary (etseidl, Oct 23, 2023)
b6a97e8  fix up delta test (etseidl, Oct 23, 2023)
5d5268e  fix comment (etseidl, Oct 23, 2023)
7a2ee66  Merge branch 'branch-23.12' into delta_encode_python (vuule, Oct 23, 2023)
13330e7  Merge branch 'rapidsai:branch-23.12' into delta_encode_python (etseidl, Oct 24, 2023)
6592442  Merge branch 'branch-23.12' into delta_encode_python (etseidl, Oct 25, 2023)
f1d3c88  Merge branch 'branch-23.12' into delta_encode_python (vuule, Oct 26, 2023)
eeea1b9  address review comments (etseidl, Oct 26, 2023)
e7b3694  Merge branch 'branch-23.12' into delta_encode_python (etseidl, Oct 26, 2023)
11e6c5e  fix typo (etseidl, Oct 26, 2023)
eeac17a  Merge branch 'branch-23.12' into delta_encode_python (etseidl, Oct 27, 2023)
a49f606  Merge branch 'branch-23.12' into delta_encode_python (etseidl, Oct 30, 2023)
0776811  Merge remote-tracking branch 'origin/branch-23.12' into delta_encode_… (etseidl, Nov 1, 2023)
3965125  Merge branch 'branch-23.12' into delta_encode_python (etseidl, Nov 7, 2023)
5f85be0  implement suggestion from review (etseidl, Nov 7, 2023)
7a770d6  add documentation for new arguments to to_parquet() (etseidl, Nov 7, 2023)
1e0cc58  Merge branch 'rapidsai:branch-23.12' into delta_encode_python (etseidl, Nov 7, 2023)
6dabbcb  Merge branch 'branch-23.12' into delta_encode_python (vuule, Nov 8, 2023)
6 changes: 3 additions & 3 deletions cpp/include/cudf/io/types.hpp
@@ -195,9 +195,9 @@ class writer_compression_statistics {
* @brief Control use of dictionary encoding for parquet writer
*/
enum dictionary_policy {
NEVER, ///< Never use dictionary encoding
ADAPTIVE, ///< Use dictionary when it will not impact compression
ALWAYS ///< Use dictionary reqardless of impact on compression
NEVER = 0, ///< Never use dictionary encoding
ADAPTIVE = 1, ///< Use dictionary when it will not impact compression
ALWAYS = 2 ///< Use dictionary regardless of impact on compression
};

/**
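For readers following the Python side of this PR: the enum above is what the new use_dictionary keyword ultimately selects between. Below is a minimal, illustrative Python sketch of that mapping; the IntEnum and helper are not cudf APIs, just a mirror of the enum values and of the boolean-to-policy logic added in parquet.pyx further down.

from enum import IntEnum

class DictionaryPolicy(IntEnum):
    NEVER = 0     # never use dictionary encoding
    ADAPTIVE = 1  # use dictionary only when it will not hurt compression
    ALWAYS = 2    # use dictionary regardless of impact on compression

def policy_from_use_dictionary(use_dictionary: bool) -> DictionaryPolicy:
    # The Python writer exposes only a boolean, so ADAPTIVE is not reachable from it.
    return DictionaryPolicy.ALWAYS if use_dictionary else DictionaryPolicy.NEVER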
2 changes: 2 additions & 0 deletions cpp/src/io/parquet/delta_enc.cuh
@@ -46,6 +46,8 @@ inline __device__ void put_zz128(uint8_t*& p, zigzag128_t v)
// too much shared memory.
// The parquet spec requires block_size to be a multiple of 128, and values_per_mini_block
// to be a multiple of 32.
// TODO: if these are ever made configurable, be sure to fix the page size calculation in
// delta_data_len() (page_enc.cu).
constexpr int block_size = 128;
constexpr int num_mini_blocks = 4;
constexpr int values_per_mini_block = block_size / num_mini_blocks;
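As a quick illustration of the constraints called out in the comment above (not part of the diff), the block geometry the encoder assumes works out as follows:

# Illustrative check of the DELTA_BINARY_PACKED block geometry used by the encoder.
block_size = 128                                       # Parquet spec: multiple of 128
num_mini_blocks = 4
values_per_mini_block = block_size // num_mini_blocks  # 32; Parquet spec: multiple of 32

assert block_size % 128 == 0
assert values_per_mini_block % 32 == 0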
14 changes: 11 additions & 3 deletions cpp/src/io/parquet/page_enc.cu
@@ -393,13 +393,20 @@ __device__ size_t delta_data_len(Type physical_type, cudf::type_id type_id, uint

auto const vals_per_block = delta::block_size;
size_t const num_blocks = util::div_rounding_up_unsafe(num_values, vals_per_block);
// need max dtype_len + 1 bytes for min_delta
// need max dtype_len + 1 bytes for min_delta (because we only encode 7 bits per byte)
// one byte per mini block for the bitwidth
// and block_size * dtype_len bytes for the actual encoded data
auto const block_size = dtype_len + 1 + delta::num_mini_blocks + vals_per_block * dtype_len;
auto const mini_block_header_size = dtype_len + 1 + delta::num_mini_blocks;
// each encoded value can be at most sizeof(type) * 8 + 1 bits
auto const max_bits = dtype_len * 8 + 1;
// each data block will then be max_bits * values per block. vals_per_block is guaranteed to be
// divisible by 128 (via static assert on delta::block_size), but do safe division anyway.
auto const bytes_per_block = cudf::util::div_rounding_up_unsafe(max_bits * vals_per_block, 8);
auto const block_size = mini_block_header_size + bytes_per_block;

// delta header is 2 bytes for the block_size, 1 byte for number of mini-blocks,
// max 5 bytes for number of values, and max dtype_len + 1 for first value.
// TODO: if we ever allow configurable block sizes then this calculation will need to be
// modified.
auto const header_size = 2 + 1 + 5 + dtype_len + 1;

return header_size + num_blocks * block_size;
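To make the revised worst-case bound concrete, here is a small Python sketch that mirrors the arithmetic above (illustration only; div_rounding_up_unsafe is modeled with math.ceil):

import math

def delta_data_len_bound(dtype_len, num_values, block_size=128, num_mini_blocks=4):
    # Mirrors the device-side calculation above.
    num_blocks = math.ceil(num_values / block_size)
    mini_block_header_size = dtype_len + 1 + num_mini_blocks
    max_bits = dtype_len * 8 + 1                        # a zigzag delta can need one extra bit
    bytes_per_block = math.ceil(max_bits * block_size / 8)
    per_block = mini_block_header_size + bytes_per_block
    header_size = 2 + 1 + 5 + dtype_len + 1             # delta header plus first value
    return header_size + num_blocks * per_block

# For 100,000 INT32 values (dtype_len = 4): ceil(100000 / 128) = 782 blocks,
# per-block bound = (4 + 1 + 4) + ceil(33 * 128 / 8) = 9 + 528 = 537 bytes,
# so the bound is 13 + 782 * 537 = 419,947 bytes.
print(delta_data_len_bound(4, 100_000))  # 419947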
@@ -1279,6 +1286,7 @@ __device__ void finish_page_encode(state_buf* s,
uint8_t const* const base = s->page.page_data + s->page.max_hdr_size;
auto const actual_data_size = static_cast<uint32_t>(end_ptr - base);
if (actual_data_size > s->page.max_data_size) {
// FIXME(ets): this needs to do error propagation back to the host
CUDF_UNREACHABLE("detected possible page data corruption");
}
s->page.max_data_size = actual_data_size;
16 changes: 16 additions & 0 deletions python/cudf/cudf/_lib/cpp/io/parquet.pxd
@@ -94,6 +94,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
void set_row_group_size_rows(size_type val) except +
void set_max_page_size_bytes(size_t val) except +
void set_max_page_size_rows(size_type val) except +
void enable_write_v2_headers(bool val) except +
void set_dictionary_policy(cudf_io_types.dictionary_policy policy)except +

@staticmethod
parquet_writer_options_builder builder(
@@ -141,6 +143,12 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
parquet_writer_options_builder& max_page_size_rows(
size_type val
) except +
parquet_writer_options_builder& write_v2_headers(
bool val
) except +
parquet_writer_options_builder& dictionary_policy(
cudf_io_types.dictionary_policy val
) except +

parquet_writer_options build() except +

@@ -176,6 +184,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
void set_row_group_size_rows(size_type val) except +
void set_max_page_size_bytes(size_t val) except +
void set_max_page_size_rows(size_type val) except +
void enable_write_v2_headers(bool val) except +
Contributor: I realize it isn't the fault of this current PR, but one does wish enable_write_v2_headers were named set_write_v2_headers.

Contributor: We use enable_ for bool options, so this should be consistent (for better or for worse, apparently).

void set_dictionary_policy(cudf_io_types.dictionary_policy policy)except +

@staticmethod
chunked_parquet_writer_options_builder builder(
@@ -211,6 +221,12 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
chunked_parquet_writer_options_builder& max_page_size_rows(
size_type val
) except +
parquet_writer_options_builder& write_v2_headers(
bool val
) except +
parquet_writer_options_builder& dictionary_policy(
cudf_io_types.dictionary_policy val
) except +

chunked_parquet_writer_options build() except +

5 changes: 5 additions & 0 deletions python/cudf/cudf/_lib/cpp/io/types.pxd
@@ -52,6 +52,11 @@ cdef extern from "cudf/io/types.hpp" \
STATISTICS_PAGE = 2,
STATISTICS_COLUMN = 3,

ctypedef enum dictionary_policy:
NEVER = 0,
ADAPTIVE = 1,
ALWAYS = 2,

cdef cppclass column_name_info:
string name
vector[column_name_info] children
16 changes: 16 additions & 0 deletions python/cudf/cudf/_lib/parquet.pyx
@@ -321,6 +321,8 @@ def write_parquet(
object max_page_size_rows=None,
object partitions_info=None,
object force_nullable_schema=False,
header_version="1.0",
use_dictionary=True,
):
"""
Cython function to call into libcudf API, see `write_parquet`.
@@ -383,6 +385,18 @@ def write_parquet(
tmp_user_data[str.encode("pandas")] = str.encode(pandas_metadata)
user_data.push_back(tmp_user_data)

if header_version not in ("1.0", "2.0"):
raise ValueError(
f"Invalid parquet header version: {header_version}. "
"Valid values are '1.0' and '2.0'"
)

dict_policy = (
cudf_io_types.dictionary_policy.ALWAYS
if use_dictionary
else cudf_io_types.dictionary_policy.NEVER
)

cdef cudf_io_types.compression_type comp_type = _get_comp_type(compression)
cdef cudf_io_types.statistics_freq stat_freq = _get_stat_freq(statistics)

@@ -399,6 +413,8 @@
.compression(comp_type)
.stats_level(stat_freq)
.int96_timestamps(_int96_timestamps)
.write_v2_headers(header_version == "2.0")
.dictionary_policy(dict_policy)
.build()
)
if partitions_info is not None:
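At the user level, the validation above surfaces as a ValueError raised from to_parquet. A small usage sketch (illustrative only; any cudf DataFrame will do):

import cudf

df = cudf.DataFrame({"a": [1, 2, 3]})
try:
    # Only "1.0" and "2.0" are accepted for header_version.
    df.to_parquet("bad.parquet", header_version="3.0")
except ValueError as err:
    print(err)  # Invalid parquet header version: 3.0. Valid values are '1.0' and '2.0'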
4 changes: 4 additions & 0 deletions python/cudf/cudf/core/dataframe.py
@@ -6370,6 +6370,8 @@ def to_parquet(
max_page_size_rows=None,
storage_options=None,
return_metadata=False,
use_dictionary=True,
Contributor: Can we document these two parameters here:
_docstring_to_parquet = """

Contributor Author: done

Contributor Author: Thanks @galipremsagar. I would have never thought to look there for the docstring 😅

Contributor: First time hearing about it as well 🤷‍♂️ (just don't git blame ioutils.py :P )

header_version="1.0",
*args,
**kwargs,
):
@@ -6394,6 +6396,8 @@ def to_parquet(
max_page_size_rows=max_page_size_rows,
storage_options=storage_options,
return_metadata=return_metadata,
use_dictionary=use_dictionary,
header_version=header_version,
*args,
**kwargs,
)
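A minimal usage sketch of the two new DataFrame.to_parquet keywords (assuming, as the test later in this PR exercises, that disabling dictionary encoding together with V2 page headers makes integer columns eligible for DELTA_BINARY_PACKED):

import cudf

df = cudf.DataFrame({"x": range(1000)})
# V2 data page headers plus no dictionary: integer columns can then be
# written with DELTA_BINARY_PACKED encoding.
df.to_parquet(
    "delta.parquet",
    compression=None,
    header_version="2.0",
    use_dictionary=False,
)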
13 changes: 9 additions & 4 deletions python/cudf/cudf/io/parquet.py
@@ -66,6 +66,8 @@ def _write_parquet(
partitions_info=None,
storage_options=None,
force_nullable_schema=False,
header_version="1.0",
use_dictionary=True,
):
if is_list_like(paths) and len(paths) > 1:
if partitions_info is None:
@@ -96,6 +98,8 @@ def _write_parquet(
"max_page_size_rows": max_page_size_rows,
"partitions_info": partitions_info,
"force_nullable_schema": force_nullable_schema,
"header_version": header_version,
"use_dictionary": use_dictionary,
}
if all(ioutils.is_fsspec_open_file(buf) for buf in paths_or_bufs):
with ExitStack() as stack:
@@ -204,7 +208,6 @@ def write_to_dataset(
fs.mkdirs(root_path, exist_ok=True)

if partition_cols is not None and len(partition_cols) > 0:

(
full_paths,
metadata_file_paths,
@@ -709,7 +712,6 @@ def _parquet_to_frame(
dataset_kwargs=None,
**kwargs,
):

# If this is not a partitioned read, only need
# one call to `_read_parquet`
if not partition_keys:
@@ -753,7 +755,7 @@ def _parquet_to_frame(
)
)
# Add partition columns to the last DataFrame
for (name, value) in part_key:
for name, value in part_key:
_len = len(dfs[-1])
if partition_categories and name in partition_categories:
# Build the categorical column from `codes`
@@ -858,6 +860,8 @@ def to_parquet(
storage_options=None,
return_metadata=False,
force_nullable_schema=False,
header_version="1.0",
use_dictionary=True,
*args,
**kwargs,
):
@@ -932,6 +936,8 @@
partitions_info=partition_info,
storage_options=storage_options,
force_nullable_schema=force_nullable_schema,
header_version=header_version,
use_dictionary=use_dictionary,
)

else:
@@ -1034,7 +1040,6 @@ def _get_groups_and_offsets(
preserve_index=False,
**kwargs,
):

if not (set(df._data) - set(partition_cols)):
warnings.warn("No data left to save outside partition columns")

52 changes: 34 additions & 18 deletions python/cudf/cudf/tests/test_parquet.py
@@ -1285,32 +1285,29 @@ def test_parquet_reader_v2(tmpdir, simple_pdf):
simple_pdf.to_parquet(pdf_fname, data_page_version="2.0")
assert_eq(cudf.read_parquet(pdf_fname), simple_pdf)

cudf.from_pandas(simple_pdf).to_parquet(pdf_fname, header_version="2.0")
assert_eq(cudf.read_parquet(pdf_fname), simple_pdf)


@pytest.mark.parametrize("nrows", [1, 100000])
@pytest.mark.parametrize("add_nulls", [True, False])
def test_delta_binary(nrows, add_nulls, tmpdir):
@pytest.mark.parametrize(
"dtype",
[
"int8",
"int16",
"int32",
"int64",
],
)
def test_delta_binary(nrows, add_nulls, dtype, tmpdir):
null_frequency = 0.25 if add_nulls else 0

# Create a pandas dataframe with random data of mixed types
arrow_table = dg.rand_dataframe(
dtypes_meta=[
{
"dtype": "int8",
"null_frequency": null_frequency,
"cardinality": nrows,
},
{
"dtype": "int16",
"null_frequency": null_frequency,
"cardinality": nrows,
},
{
"dtype": "int32",
"null_frequency": null_frequency,
"cardinality": nrows,
},
{
"dtype": "int64",
"dtype": dtype,
"null_frequency": null_frequency,
"cardinality": nrows,
},
@@ -1335,6 +1332,26 @@ def test_delta_binary(nrows, add_nulls, tmpdir):
pcdf = cudf.from_pandas(test_pdf)
assert_eq(cdf, pcdf)

# Write back out with cudf and make sure pyarrow can read it
cudf_fname = tmpdir.join("cudfv2.parquet")
pcdf.to_parquet(
cudf_fname,
compression=None,
header_version="2.0",
use_dictionary=False,
)

# FIXME(ets): should probably not use more bits than the data type
try:
cdf2 = cudf.from_pandas(pd.read_parquet(cudf_fname))
except OSError:
if dtype == "int32" and nrows == 100000:
pytest.mark.xfail(
reason="arrow does not support 33-bit delta encoding"
)
else:
assert_eq(cdf2, cdf)
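As a follow-up to the round-trip above, one way to confirm which encoding the writer actually chose (not part of the test; assumes pyarrow exposes column-chunk encodings via ColumnChunkMetaData.encodings, and reuses the hypothetical "delta.parquet" path from the earlier sketch) is:

import pyarrow.parquet as pq

meta = pq.ParquetFile("delta.parquet").metadata
for rg in range(meta.num_row_groups):
    for col in range(meta.num_columns):
        cc = meta.row_group(rg).column(col)
        # Expect something like ('DELTA_BINARY_PACKED', 'RLE') for the data pages
        # when delta encoding was selected.
        print(cc.path_in_schema, cc.encodings)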


@pytest.mark.parametrize(
"data",
Expand Down Expand Up @@ -1469,7 +1486,6 @@ def test_parquet_writer_int96_timestamps(tmpdir, pdf, gdf):


def test_multifile_parquet_folder(tmpdir):

test_pdf1 = make_pdf(nrows=10, nvalids=10 // 2)
test_pdf2 = make_pdf(nrows=20)
expect = pd.concat([test_pdf1, test_pdf2])