Skip to content

Commit

Permalink
apacheGH-37242: [Python][Parquet] Parquet Support write and validate …
Browse files Browse the repository at this point in the history
…Page CRC (apache#38360)

### Rationale for this change

The C++ Parquet API already supports enabling CRC checksum for read and write operations.

CRC checksum are optional and can detect data corruption due to, for example, file storage issues or [cosmic rays](https://en.wikipedia.org/wiki/Soft_error).

It would then be beneficial to expose this optional functionality to the Python API too.

This PR is based on a previous PR which became stale: apache#37439

### What changes are included in this PR?

The PyArrow interface is expanded to include a `page_checksum_enabled` flag.

### Are these changes tested?

[ ] NOT YET!

### Are there any user-facing changes?

The change is backward compatible. An additional, optional keyword argument is added to some interfaces.

Closes apache#37242
Supersedes apache#37439
* Closes: apache#37242

Lead-authored-by: Francesco Zardi <[email protected]>
Co-authored-by: Joris Van den Bossche <[email protected]>
Co-authored-by: mwish <[email protected]>
Co-authored-by: Alenka Frim <[email protected]>
Signed-off-by: AlenkaF <[email protected]>
  • Loading branch information
4 people authored Nov 20, 2023
1 parent 96e62d8 commit 68ba49d
Show file tree
Hide file tree
Showing 8 changed files with 295 additions and 14 deletions.
4 changes: 4 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ parquet::ReaderProperties MakeReaderProperties(
parquet_scan_options->reader_properties->thrift_string_size_limit());
properties.set_thrift_container_size_limit(
parquet_scan_options->reader_properties->thrift_container_size_limit());

properties.set_page_checksum_verification(
parquet_scan_options->reader_properties->page_checksum_verification());

return properties;
}

Expand Down
22 changes: 19 additions & 3 deletions python/pyarrow/_dataset_parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,7 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
write_batch_size=self._properties["write_batch_size"],
dictionary_pagesize_limit=self._properties["dictionary_pagesize_limit"],
write_page_index=self._properties["write_page_index"],
write_page_checksum=self._properties["write_page_checksum"],
)

def _set_arrow_properties(self):
Expand Down Expand Up @@ -655,6 +656,7 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
dictionary_pagesize_limit=None,
write_page_index=False,
encryption_config=None,
write_page_checksum=False,
)

self._set_properties()
Expand Down Expand Up @@ -701,6 +703,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
decryption_config : pyarrow.dataset.ParquetDecryptionConfig, default None
If not None, use the provided ParquetDecryptionConfig to decrypt the
Parquet file.
page_checksum_verification : bool, default False
If True, verify the page checksum for each page read from the file.
"""

# Avoid mistakingly creating attributes
Expand All @@ -711,7 +715,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
bint pre_buffer=True,
thrift_string_size_limit=None,
thrift_container_size_limit=None,
decryption_config=None):
decryption_config=None,
bint page_checksum_verification=False):
self.init(shared_ptr[CFragmentScanOptions](
new CParquetFragmentScanOptions()))
self.use_buffered_stream = use_buffered_stream
Expand All @@ -723,6 +728,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
self.thrift_container_size_limit = thrift_container_size_limit
if decryption_config is not None:
self.parquet_decryption_config = decryption_config
self.page_checksum_verification = page_checksum_verification

cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
FragmentScanOptions.init(self, sp)
Expand Down Expand Up @@ -802,6 +808,14 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
set_decryption_config(self, config)
self._parquet_decryption_config = config

@property
def page_checksum_verification(self):
return self.reader_properties().page_checksum_verification()

@page_checksum_verification.setter
def page_checksum_verification(self, bint page_checksum_verification):
self.reader_properties().set_page_checksum_verification(page_checksum_verification)

def equals(self, ParquetFragmentScanOptions other):
"""
Parameters
Expand All @@ -814,11 +828,12 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
"""
attrs = (
self.use_buffered_stream, self.buffer_size, self.pre_buffer,
self.thrift_string_size_limit, self.thrift_container_size_limit)
self.thrift_string_size_limit, self.thrift_container_size_limit,
self.page_checksum_verification)
other_attrs = (
other.use_buffered_stream, other.buffer_size, other.pre_buffer,
other.thrift_string_size_limit,
other.thrift_container_size_limit)
other.thrift_container_size_limit, other.page_checksum_verification)
return attrs == other_attrs

@staticmethod
Expand All @@ -835,6 +850,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
pre_buffer=self.pre_buffer,
thrift_string_size_limit=self.thrift_string_size_limit,
thrift_container_size_limit=self.thrift_container_size_limit,
page_checksum_verification=self.page_checksum_verification
)
return ParquetFragmentScanOptions._reconstruct, (kwargs,)

Expand Down
8 changes: 7 additions & 1 deletion python/pyarrow/_parquet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,9 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
shared_ptr[CFileDecryptionProperties] file_decryption_properties() \
const

c_bool page_checksum_verification() const
void set_page_checksum_verification(c_bool check_crc)

CReaderProperties default_reader_properties()

cdef cppclass ArrowReaderProperties:
Expand Down Expand Up @@ -428,6 +431,8 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
Builder* dictionary_pagesize_limit(int64_t dictionary_pagesize_limit)
Builder* enable_write_page_index()
Builder* disable_write_page_index()
Builder* enable_page_checksum()
Builder* disable_page_checksum()
shared_ptr[WriterProperties] build()

cdef cppclass ArrowWriterProperties:
Expand Down Expand Up @@ -576,7 +581,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
FileEncryptionProperties encryption_properties=*,
write_batch_size=*,
dictionary_pagesize_limit=*,
write_page_index=*) except *
write_page_index=*,
write_page_checksum=*) except *


cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
Expand Down
22 changes: 18 additions & 4 deletions python/pyarrow/_parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1183,7 +1183,8 @@ cdef class ParquetReader(_Weakrefable):
coerce_int96_timestamp_unit=None,
FileDecryptionProperties decryption_properties=None,
thrift_string_size_limit=None,
thrift_container_size_limit=None):
thrift_container_size_limit=None,
page_checksum_verification=False):
"""
Open a parquet file for reading.
Expand All @@ -1199,6 +1200,7 @@ cdef class ParquetReader(_Weakrefable):
decryption_properties : FileDecryptionProperties, optional
thrift_string_size_limit : int, optional
thrift_container_size_limit : int, optional
page_checksum_verification : bool, default False
"""
cdef:
shared_ptr[CFileMetaData] c_metadata
Expand Down Expand Up @@ -1236,6 +1238,8 @@ cdef class ParquetReader(_Weakrefable):

arrow_props.set_pre_buffer(pre_buffer)

properties.set_page_checksum_verification(page_checksum_verification)

if coerce_int96_timestamp_unit is None:
# use the default defined in default_arrow_reader_properties()
pass
Expand Down Expand Up @@ -1559,7 +1563,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
FileEncryptionProperties encryption_properties=None,
write_batch_size=None,
dictionary_pagesize_limit=None,
write_page_index=False) except *:
write_page_index=False,
write_page_checksum=False) except *:
"""General writer properties"""
cdef:
shared_ptr[WriterProperties] properties
Expand Down Expand Up @@ -1703,6 +1708,13 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
# a size larger than this then it will be latched to this value.
props.max_row_group_length(_MAX_ROW_GROUP_SIZE)

# checksum

if write_page_checksum:
props.enable_page_checksum()
else:
props.disable_page_checksum()

# page index

if write_page_index:
Expand Down Expand Up @@ -1822,7 +1834,8 @@ cdef class ParquetWriter(_Weakrefable):
write_batch_size=None,
dictionary_pagesize_limit=None,
store_schema=True,
write_page_index=False):
write_page_index=False,
write_page_checksum=False):
cdef:
shared_ptr[WriterProperties] properties
shared_ptr[ArrowWriterProperties] arrow_properties
Expand Down Expand Up @@ -1853,7 +1866,8 @@ cdef class ParquetWriter(_Weakrefable):
encryption_properties=encryption_properties,
write_batch_size=write_batch_size,
dictionary_pagesize_limit=dictionary_pagesize_limit,
write_page_index=write_page_index
write_page_index=write_page_index,
write_page_checksum=write_page_checksum
)
arrow_properties = _create_arrow_writer_properties(
use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
Expand Down
37 changes: 33 additions & 4 deletions python/pyarrow/parquet/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ class ParquetFile:
If nothing passed, will be inferred based on path.
Path will try to be found in the local on-disk filesystem otherwise
it will be parsed as an URI to determine the filesystem.
page_checksum_verification : bool, default False
If True, verify the checksum for each page read from the file.
Examples
--------
Expand Down Expand Up @@ -327,7 +329,8 @@ def __init__(self, source, *, metadata=None, common_metadata=None,
read_dictionary=None, memory_map=False, buffer_size=0,
pre_buffer=False, coerce_int96_timestamp_unit=None,
decryption_properties=None, thrift_string_size_limit=None,
thrift_container_size_limit=None, filesystem=None):
thrift_container_size_limit=None, filesystem=None,
page_checksum_verification=False):

self._close_source = getattr(source, 'closed', True)

Expand All @@ -346,6 +349,7 @@ def __init__(self, source, *, metadata=None, common_metadata=None,
decryption_properties=decryption_properties,
thrift_string_size_limit=thrift_string_size_limit,
thrift_container_size_limit=thrift_container_size_limit,
page_checksum_verification=page_checksum_verification,
)
self.common_metadata = common_metadata
self._nested_paths_by_prefix = self._build_nested_paths()
Expand Down Expand Up @@ -887,6 +891,10 @@ def _sanitize_table(table, new_schema, flavor):
filtering more efficient than the page header, as it gathers all the
statistics for a Parquet file in a single place, avoiding scattered I/O.
Note that the page index is not yet used on the read size by PyArrow.
write_page_checksum : bool, default False
Whether to write page checksums in general for all columns.
Page checksums enable detection of data corruption, which might occur during
transmission or in the storage.
"""

_parquet_writer_example_doc = """\
Expand Down Expand Up @@ -980,6 +988,7 @@ def __init__(self, where, schema, filesystem=None,
dictionary_pagesize_limit=None,
store_schema=True,
write_page_index=False,
write_page_checksum=False,
**options):
if use_deprecated_int96_timestamps is None:
# Use int96 timestamps for Spark
Expand Down Expand Up @@ -1037,6 +1046,7 @@ def __init__(self, where, schema, filesystem=None,
dictionary_pagesize_limit=dictionary_pagesize_limit,
store_schema=store_schema,
write_page_index=write_page_index,
write_page_checksum=write_page_checksum,
**options)
self.is_open = True

Expand Down Expand Up @@ -1766,6 +1776,8 @@ class ParquetDataset:
If not None, override the maximum total size of containers allocated
when decoding Thrift structures. The default limit should be
sufficient for most Parquet files.
page_checksum_verification : bool, default False
If True, verify the page checksum for each page read from the file.
Examples
--------
Expand All @@ -1779,7 +1791,8 @@ def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
use_legacy_dataset=None, pre_buffer=True,
coerce_int96_timestamp_unit=None,
thrift_string_size_limit=None,
thrift_container_size_limit=None):
thrift_container_size_limit=None,
page_checksum_verification=False):

extra_msg = ""
if use_legacy_dataset is None:
Expand Down Expand Up @@ -1812,6 +1825,7 @@ def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
metadata_nthreads=metadata_nthreads,
thrift_string_size_limit=thrift_string_size_limit,
thrift_container_size_limit=thrift_container_size_limit,
page_checksum_verification=page_checksum_verification,
)
warnings.warn(
"Passing 'use_legacy_dataset=True' to get the legacy behaviour is "
Expand All @@ -1828,7 +1842,8 @@ def __init__(self, path_or_paths, filesystem=None, schema=None,
use_legacy_dataset=None, pre_buffer=True,
coerce_int96_timestamp_unit=None,
thrift_string_size_limit=None,
thrift_container_size_limit=None):
thrift_container_size_limit=None,
page_checksum_verification=False):
if partitioning != "hive":
raise ValueError(
'Only "hive" for hive-like partitioning is supported when '
Expand Down Expand Up @@ -2419,6 +2434,7 @@ def __init__(self, path_or_paths, filesystem=None, *, filters=None,
coerce_int96_timestamp_unit=None, schema=None,
decryption_properties=None, thrift_string_size_limit=None,
thrift_container_size_limit=None,
page_checksum_verification=False,
**kwargs):
import pyarrow.dataset as ds

Expand All @@ -2437,6 +2453,7 @@ def __init__(self, path_or_paths, filesystem=None, *, filters=None,
"coerce_int96_timestamp_unit": coerce_int96_timestamp_unit,
"thrift_string_size_limit": thrift_string_size_limit,
"thrift_container_size_limit": thrift_container_size_limit,
"page_checksum_verification": page_checksum_verification,
}
if buffer_size:
read_options.update(use_buffered_stream=True,
Expand Down Expand Up @@ -2855,6 +2872,8 @@ def partitioning(self):
If not None, override the maximum total size of containers allocated
when decoding Thrift structures. The default limit should be
sufficient for most Parquet files.
page_checksum_verification : bool, default False
If True, verify the checksum for each page read from the file.
Returns
-------
Expand Down Expand Up @@ -2949,7 +2968,8 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
ignore_prefixes=None, pre_buffer=True,
coerce_int96_timestamp_unit=None,
decryption_properties=None, thrift_string_size_limit=None,
thrift_container_size_limit=None):
thrift_container_size_limit=None,
page_checksum_verification=False):
if not use_legacy_dataset:
if metadata is not None:
raise ValueError(
Expand All @@ -2973,6 +2993,7 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
thrift_string_size_limit=thrift_string_size_limit,
thrift_container_size_limit=thrift_container_size_limit,
page_checksum_verification=page_checksum_verification,
)
except ImportError:
# fall back on ParquetFile for simple cases when pyarrow.dataset
Expand Down Expand Up @@ -3004,6 +3025,7 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
decryption_properties=decryption_properties,
thrift_string_size_limit=thrift_string_size_limit,
thrift_container_size_limit=thrift_container_size_limit,
page_checksum_verification=page_checksum_verification,
)

return dataset.read(columns=columns, use_threads=use_threads,
Expand All @@ -3020,6 +3042,11 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
"The 'ignore_prefixes' keyword is only supported when "
"use_legacy_dataset=False")

if page_checksum_verification:
raise ValueError(
"The 'page_checksum_verification' keyword is only supported when "
"use_legacy_dataset=False")

if schema is not None:
raise ValueError(
"The 'schema' argument is only supported when "
Expand Down Expand Up @@ -3101,6 +3128,7 @@ def write_table(table, where, row_group_size=None, version='2.6',
dictionary_pagesize_limit=None,
store_schema=True,
write_page_index=False,
write_page_checksum=False,
**kwargs):
# Implementor's note: when adding keywords here / updating defaults, also
# update it in write_to_dataset and _dataset_parquet.pyx ParquetFileWriteOptions
Expand Down Expand Up @@ -3129,6 +3157,7 @@ def write_table(table, where, row_group_size=None, version='2.6',
dictionary_pagesize_limit=dictionary_pagesize_limit,
store_schema=store_schema,
write_page_index=write_page_index,
write_page_checksum=write_page_checksum,
**kwargs) as writer:
writer.write_table(table, row_group_size=row_group_size)
except Exception:
Expand Down
Loading

0 comments on commit 68ba49d

Please sign in to comment.