From d74fe291b595292fde9ab42c6faada9d1c02fd94 Mon Sep 17 00:00:00 2001 From: sharkdtu Date: Tue, 25 Feb 2025 19:58:20 +0800 Subject: [PATCH 1/2] fix: make file name of write task consistent with java api --- pyiceberg/io/pyarrow.py | 12 ++++++++++-- pyiceberg/table/__init__.py | 5 +++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index f7e3c7c082..8a68d7faf1 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -2547,6 +2547,7 @@ def _dataframe_to_data_files( table_metadata: TableMetadata, df: pa.Table, io: FileIO, + task_id: int = 0, write_uuid: Optional[uuid.UUID] = None, counter: Optional[itertools.count[int]] = None, ) -> Iterable[DataFile]: @@ -2574,7 +2575,13 @@ def _dataframe_to_data_files( table_metadata=table_metadata, tasks=iter( [ - WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=task_schema) + WriteTask( + task_id=task_id, + write_uuid=write_uuid, + counter_id=next(counter), + record_batches=batches, + schema=task_schema + ) for batches in bin_pack_arrow_table(df, target_file_size) ] ), @@ -2587,8 +2594,9 @@ def _dataframe_to_data_files( tasks=iter( [ WriteTask( + task_id=task_id, write_uuid=write_uuid, - task_id=next(counter), + counter_id=next(counter), record_batches=batches, partition_key=partition.partition_key, schema=task_schema, diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index e625b848b2..960bc9bf20 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1864,8 +1864,9 @@ def count(self) -> int: class WriteTask: """Task with the parameters for writing a DataFile.""" - write_uuid: uuid.UUID task_id: int + write_uuid: uuid.UUID + counter_id: int schema: Schema record_batches: List[pa.RecordBatch] sort_order_id: Optional[int] = None @@ -1874,7 +1875,7 @@ class WriteTask: def generate_data_file_filename(self, extension: str) -> str: # Mimics the behavior in the Java API: # https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 - return f"00000-{self.task_id}-{self.write_uuid}.{extension}" + return f"00000-{self.task_id}-{self.write_uuid}-{self.counter_id:05d}.{extension}" @dataclass(frozen=True) From 781f887f2649dff64bd24fa2bbe48a461f28c335 Mon Sep 17 00:00:00 2001 From: sharkdtu Date: Wed, 26 Feb 2025 01:41:25 +0800 Subject: [PATCH 2/2] fix style --- pyiceberg/io/pyarrow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 8a68d7faf1..6d9433f2f7 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -2580,7 +2580,7 @@ def _dataframe_to_data_files( write_uuid=write_uuid, counter_id=next(counter), record_batches=batches, - schema=task_schema + schema=task_schema, ) for batches in bin_pack_arrow_table(df, target_file_size) ]