
Commit 1a5e32a

redpheonixx authored
Fix decimal physical type mapping (#1839)
This pull request addresses how decimal values are mapped to Parquet physical types. It implements the following rules: for precision ≤ 9, values are stored as `int32`; for precision ≤ 18, values are stored as `int64`; for higher precision, values are stored as `FIXED_LEN_BYTE_ARRAY`. Closes #1789

Co-authored-by: redpheonixx <[email protected]>
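As a rough illustration of the rule above (not the code path added by this commit), the precision-to-physical-type mapping can be sketched as a small standalone helper; the function name is hypothetical, while `DecimalType.precision` is the same property the real `visit_decimal` consults:

from pyiceberg.types import DecimalType

def decimal_physical_type(decimal_type: DecimalType) -> str:
    # Precision <= 9 fits in a 4-byte integer, precision <= 18 in an 8-byte integer;
    # anything wider falls back to a fixed-length byte array.
    if decimal_type.precision <= 9:
        return "INT32"
    if decimal_type.precision <= 18:
        return "INT64"
    return "FIXED_LEN_BYTE_ARRAY"

print(decimal_physical_type(DecimalType(5, 2)))   # INT32
print(decimal_physical_type(DecimalType(16, 6)))  # INT64
print(decimal_physical_type(DecimalType(19, 6)))  # FIXED_LEN_BYTE_ARRAY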
1 parent bae62df commit 1a5e32a

2 files changed: +30 −8 lines changed

pyiceberg/io/pyarrow.py (+9 −3)

@@ -175,6 +175,7 @@
 from pyiceberg.utils.concurrent import ExecutorFactory
 from pyiceberg.utils.config import Config
 from pyiceberg.utils.datetime import millis_to_datetime
+from pyiceberg.utils.decimal import unscaled_to_decimal
 from pyiceberg.utils.deprecated import deprecation_message
 from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int
 from pyiceberg.utils.singleton import Singleton

@@ -1888,7 +1889,7 @@ def visit_fixed(self, fixed_type: FixedType) -> str:
         return "FIXED_LEN_BYTE_ARRAY"

     def visit_decimal(self, decimal_type: DecimalType) -> str:
-        return "FIXED_LEN_BYTE_ARRAY"
+        return "INT32" if decimal_type.precision <= 9 else "INT64" if decimal_type.precision <= 18 else "FIXED_LEN_BYTE_ARRAY"

     def visit_boolean(self, boolean_type: BooleanType) -> str:
         return "BOOLEAN"

@@ -2362,8 +2363,13 @@ def data_file_statistics_from_parquet_metadata(
                         stats_col.iceberg_type, statistics.physical_type, stats_col.mode.length
                     )

-                    col_aggs[field_id].update_min(statistics.min)
-                    col_aggs[field_id].update_max(statistics.max)
+                    if isinstance(stats_col.iceberg_type, DecimalType) and statistics.physical_type != "FIXED_LEN_BYTE_ARRAY":
+                        scale = stats_col.iceberg_type.scale
+                        col_aggs[field_id].update_min(unscaled_to_decimal(statistics.min_raw, scale))
+                        col_aggs[field_id].update_max(unscaled_to_decimal(statistics.max_raw, scale))
+                    else:
+                        col_aggs[field_id].update_min(statistics.min)
+                        col_aggs[field_id].update_max(statistics.max)

             except pyarrow.lib.ArrowNotImplementedError as e:
                 invalidate_col.add(field_id)
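For INT32/INT64-backed decimals, the Parquet column statistics expose the unscaled integer through `statistics.min_raw` / `statistics.max_raw`, and `unscaled_to_decimal` reattaches the Iceberg scale before the value reaches the aggregator. A minimal sketch of that conversion, assuming `unscaled_to_decimal(unscaled, scale)` behaves like `Decimal(unscaled).scaleb(-scale)` (the library's exact implementation may differ):

from decimal import Decimal

def unscaled_to_decimal_sketch(unscaled: int, scale: int) -> Decimal:
    # Rebuild the logical value from the raw unscaled integer, e.g. (12345, 2) -> Decimal("123.45")
    return Decimal(unscaled).scaleb(-scale)

# For a decimal(5, 2) column whose row-group minimum is 123.45, the Parquet
# statistics report min_raw == 12345, so the StatsAggregator receives a true Decimal:
assert unscaled_to_decimal_sketch(12345, 2) == Decimal("123.45")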

tests/io/test_pyarrow_stats.py (+21 −5)

@@ -27,6 +27,7 @@
     timedelta,
     timezone,
 )
+from decimal import Decimal
 from typing import (
     Any,
     Dict,
@@ -446,6 +447,9 @@ def construct_test_table_primitive_types() -> Tuple[pq.FileMetaData, Union[TableMetadataV1, TableMetadataV2]]:
                 {"id": 10, "name": "strings", "required": False, "type": "string"},
                 {"id": 11, "name": "uuids", "required": False, "type": "uuid"},
                 {"id": 12, "name": "binaries", "required": False, "type": "binary"},
+                {"id": 13, "name": "decimal8", "required": False, "type": "decimal(5, 2)"},
+                {"id": 14, "name": "decimal16", "required": False, "type": "decimal(16, 6)"},
+                {"id": 15, "name": "decimal32", "required": False, "type": "decimal(19, 6)"},
             ],
         },
     ],

@@ -470,6 +474,9 @@ def construct_test_table_primitive_types() -> Tuple[pq.FileMetaData, Union[TableMetadataV1, TableMetadataV2]]:
     strings = ["hello", "world"]
     uuids = [uuid.uuid3(uuid.NAMESPACE_DNS, "foo").bytes, uuid.uuid3(uuid.NAMESPACE_DNS, "bar").bytes]
     binaries = [b"hello", b"world"]
+    decimal8 = pa.array([Decimal("123.45"), Decimal("678.91")], pa.decimal128(8, 2))
+    decimal16 = pa.array([Decimal("12345679.123456"), Decimal("67891234.678912")], pa.decimal128(16, 6))
+    decimal32 = pa.array([Decimal("1234567890123.123456"), Decimal("9876543210703.654321")], pa.decimal128(19, 6))

     table = pa.Table.from_pydict(
         {

@@ -485,14 +492,17 @@ def construct_test_table_primitive_types() -> Tuple[pq.FileMetaData, Union[TableMetadataV1, TableMetadataV2]]:
             "strings": strings,
             "uuids": uuids,
             "binaries": binaries,
+            "decimal8": decimal8,
+            "decimal16": decimal16,
+            "decimal32": decimal32,
         },
         schema=arrow_schema,
     )

     metadata_collector: List[Any] = []

     with pa.BufferOutputStream() as f:
-        with pq.ParquetWriter(f, table.schema, metadata_collector=metadata_collector) as writer:
+        with pq.ParquetWriter(f, table.schema, metadata_collector=metadata_collector, store_decimal_as_integer=True) as writer:
             writer.write_table(table)

     return metadata_collector[0], table_metadata
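The `store_decimal_as_integer=True` writer option is what makes this fixture exercise the integer-backed path: it asks PyArrow (in recent releases that support the option) to encode low-precision decimal columns with the INT32/INT64 physical types instead of FIXED_LEN_BYTE_ARRAY. A small standalone illustration, with an arbitrary file name:

from decimal import Decimal

import pyarrow as pa
import pyarrow.parquet as pq

# decimal(5, 2) has precision <= 9, so with store_decimal_as_integer=True the column
# should come out with the INT32 physical type rather than FIXED_LEN_BYTE_ARRAY.
table = pa.table({"amount": pa.array([Decimal("123.45")], pa.decimal128(5, 2))})
pq.write_table(table, "decimals.parquet", store_decimal_as_integer=True)
print(pq.ParquetFile("decimals.parquet").metadata.row_group(0).column(0).physical_type)  # expected: INT32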
@@ -510,13 +520,13 @@ def test_metrics_primitive_types() -> None:
     )
     datafile = DataFile(**statistics.to_serialized_dict())

-    assert len(datafile.value_counts) == 12
-    assert len(datafile.null_value_counts) == 12
+    assert len(datafile.value_counts) == 15
+    assert len(datafile.null_value_counts) == 15
     assert len(datafile.nan_value_counts) == 0

     tz = timezone(timedelta(seconds=19800))

-    assert len(datafile.lower_bounds) == 12
+    assert len(datafile.lower_bounds) == 15
     assert datafile.lower_bounds[1] == STRUCT_BOOL.pack(False)
     assert datafile.lower_bounds[2] == STRUCT_INT32.pack(23)
     assert datafile.lower_bounds[3] == STRUCT_INT64.pack(2)

@@ -529,8 +539,11 @@ def test_metrics_primitive_types() -> None:
     assert datafile.lower_bounds[10] == b"he"
     assert datafile.lower_bounds[11] == uuid.uuid3(uuid.NAMESPACE_DNS, "foo").bytes
     assert datafile.lower_bounds[12] == b"he"
+    assert datafile.lower_bounds[13][::-1].ljust(4, b"\x00") == STRUCT_INT32.pack(12345)
+    assert datafile.lower_bounds[14][::-1].ljust(8, b"\x00") == STRUCT_INT64.pack(12345679123456)
+    assert str(int.from_bytes(datafile.lower_bounds[15], byteorder="big", signed=True)).encode("utf-8") == b"1234567890123123456"

-    assert len(datafile.upper_bounds) == 12
+    assert len(datafile.upper_bounds) == 15
     assert datafile.upper_bounds[1] == STRUCT_BOOL.pack(True)
     assert datafile.upper_bounds[2] == STRUCT_INT32.pack(89)
     assert datafile.upper_bounds[3] == STRUCT_INT64.pack(54)

@@ -543,6 +556,9 @@ def test_metrics_primitive_types() -> None:
     assert datafile.upper_bounds[10] == b"wp"
     assert datafile.upper_bounds[11] == uuid.uuid3(uuid.NAMESPACE_DNS, "bar").bytes
     assert datafile.upper_bounds[12] == b"wp"
+    assert datafile.upper_bounds[13][::-1].ljust(4, b"\x00") == STRUCT_INT32.pack(67891)
+    assert datafile.upper_bounds[14][::-1].ljust(8, b"\x00") == STRUCT_INT64.pack(67891234678912)
+    assert str(int.from_bytes(datafile.upper_bounds[15], byteorder="big", signed=True)).encode("utf-8") == b"9876543210703654321"


 def construct_test_table_invalid_upper_bound() -> Tuple[pq.FileMetaData, Union[TableMetadataV1, TableMetadataV2]]:
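The assertions for the three new decimal columns bridge two serialization conventions: Iceberg stores decimal bounds as the unscaled value in big-endian two's-complement using a minimal number of bytes, whereas the test's `STRUCT_INT32` / `STRUCT_INT64` helpers pack little-endian integers, so the bound is reversed and zero-padded before comparison. A short sketch of that equivalence, under the assumption that `STRUCT_INT32` is `struct.Struct("<i")`:

import struct

STRUCT_INT32 = struct.Struct("<i")  # assumed to match the helper used in the test module

# Iceberg serializes the decimal(5, 2) lower bound 123.45 as the unscaled value 12345
# in big-endian two's-complement, trimmed to the minimal number of bytes:
bound = (12345).to_bytes(2, byteorder="big", signed=True)  # b"\x30\x39"

# Reversing yields little-endian; zero-padding restores a full 4-byte int32:
assert bound[::-1].ljust(4, b"\x00") == STRUCT_INT32.pack(12345)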
