Can we enable adaptive clustering? #1790

Open
myz540 opened this issue Mar 12, 2025 · 8 comments

myz540 commented Mar 12, 2025

Question

Does pyiceberg allow us to enable adaptive clustering when creating a table or enable it on an existing table?

The relevant SQL would be something like:

ALTER TABLE <table_identifier> CLUSTER BY (<column_name> [, ...]);
Fokko commented Mar 12, 2025

@myz540 That's not in there today. However, if you pre-cluster the table before writing, it should maintain order.
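
For illustration, a rough sketch of that pre-clustering idea, assuming pyarrow's Table.sort_by and using sid and gene as hypothetical clustering columns: sort the Arrow table on those columns before appending, so the data files are written in that order.

import pyarrow as pa

# Sketch: sort ("pre-cluster") the Arrow table on the clustering columns
# before appending, so rows land in the written files in that order.
def append_preclustered(table, arrow_table: pa.Table, cluster_cols: list[str]) -> None:
    sorted_tbl = arrow_table.sort_by([(col, "ascending") for col in cluster_cols])
    table.append(sorted_tbl)

# e.g. append_preclustered(table, smol_table, ["sid", "gene"])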

myz540 commented Mar 12, 2025

@myz540 That's not in there today. However, if you pre-cluster the table before writing, it should maintain order.

Thanks for your reply. On another matter, I am trying to write to a table that has TruncateTransforms on two columns as part of the partition spec. I mostly followed the documentation.

from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.transforms import TruncateTransform

def create_partitions_truncate():
    return PartitionSpec(
        PartitionField(
            source_id=1, field_id=1000, transform=TruncateTransform(width=7), name="sid_truncate"
        ),
        PartitionField(
            source_id=2, field_id=2000, transform=TruncateTransform(width=1), name="gene_truncate"
        ),
    )

However, when I try writing to it

table = catalog.load_table((DATABASE, table_name))
smol_table = pa.Table.from_pandas(_df, schema=create_pa_schema())
with table.transaction() as transaction:
    transaction.append(smol_table)

I am hit with the following error

Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: [PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=7), name='sid_truncate'), PartitionField(source_id=2, field_id=1001, transform=TruncateTransform(width=1), name='gene_truncate')].

Is this simply not supported by pyarrow at the moment or am I doing something wrong?

Fokko commented Mar 12, 2025

@myz540 can you double check if you're using the latest version?

myz540 commented Mar 13, 2025

@myz540 can you double check if you're using the latest version?

I have pyarrow==16.0.0 and pyiceberg==0.8.1.

I upgraded them to their latest versions and now hit a new, weird error:

Traceback (most recent call last):
  File "/Users/mikezhong/dev/pbmc/scripts/upload_lems.py", line 137, in <module>
    transaction.append(smol_table)
  File "/Users/mikezhong/.venv/main/lib/python3.11/site-packages/pyiceberg/table/__init__.py", line 473, in append
    data_files = list(
                 ^^^^^
  File "/Users/mikezhong/.venv/main/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py", line 2583, in _dataframe_to_data_files
    partitions = _determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mikezhong/.venv/main/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py", line 2630, in _determine_partitions
    name, partition.transform.pyarrow_transform(source_field.field_type)(arrow_table[source_field.name])
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mikezhong/.venv/main/lib/python3.11/site-packages/pyiceberg/transforms.py", line 905, in pyarrow_transform
    from pyiceberg_core import transform as pyiceberg_core_transform
ModuleNotFoundError: No module named 'pyiceberg_core'

Rolling back to 0.8.1 resolves that error, but I am still left with the original write error. I don't think pyarrow implements the partitioned write for the truncate transform. I could look at using AWS Glue to run Spark scripts to do the work, but I would have loved to use pyiceberg for all our AWS S3 Tables interactions.

kevinjqliu (Contributor) commented

Truncate transform with pyarrow was added in 0.9.0
50c33aa#diff-59b88b08481bfa59240342994e8dc16b34f4e9b28fb05540beb7dd22af8c036fR864-R865

ModuleNotFoundError: No module named 'pyiceberg_core'

you'd need to install the extra pyiceberg-core

pyiceberg-core = ["pyiceberg-core"]
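
With pip, installing that extra would presumably look something like:

pip install -U "pyiceberg[pyiceberg-core]"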

myz540 commented Mar 18, 2025

Truncate transform with pyarrow was added in 0.9.0 50c33aa#diff-59b88b08481bfa59240342994e8dc16b34f4e9b28fb05540beb7dd22af8c036fR864-R865

ModuleNotFoundError: No module named 'pyiceberg_core'

you'd need to install the extra pyiceberg-core

iceberg-python/pyproject.toml

Line 307 in 1c0e2b0

pyiceberg-core = ["pyiceberg-core"]

Thanks, Kevin. I wasn't sure whether the error message I was receiving was because pyarrow doesn't implement it or because pyiceberg doesn't implement the wrapper. I'll give it a go and let you know.

myz540 commented Mar 18, 2025

@kevinjqliu Would you be able to provide an example for the BucketTransform?

Here is the schema

from pyiceberg.schema import Schema
from pyiceberg.types import FloatType, NestedField, StringType

def create_lems_schema() -> Schema:
    """
    Create and return the Iceberg schema for the lems table.
    """
    return Schema(
        NestedField(field_id=1, name="sid", field_type=StringType(), required=True),
        NestedField(field_id=2, name="gene", field_type=StringType(), required=True),
        NestedField(
            field_id=3, name="prediction", field_type=FloatType(), required=True
        ),
    )

I have tried two things and gotten errors for both:

def create_lems_partitions_bucket():
    return PartitionSpec(
        PartitionField(
            source_id=1,
            field_id=1,
            transform=BucketTransform(num_buckets=1000),
            name="sid",
        ),
        PartitionField(
            source_id=2,
            field_id=2,
            transform=BucketTransform(num_buckets=100),
            name="gene",
        ),
    )

yields error: ValueError: Could not find in old schema: 1: sid: bucket[1000](1)

def create_lems_partitions_bucket():
    return PartitionSpec(
        PartitionField(
            source_id=1,
            field_id=1000,
            transform=BucketTransform(num_buckets=1000),
            name="sid_bucket",
        ),
        PartitionField(
            source_id=2,
            field_id=2000,
            transform=BucketTransform(num_buckets=100),
            name="gene_bucket",
        ),
    )

yields error: ValueError: Could not find in old schema: 1000: sid_bucket: bucket[1000](1)
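
For what it's worth, a minimal sketch of an alternative route that sidesteps the manual ID bookkeeping, assuming the table already exists and using pyiceberg's partition-evolution API so source and field IDs are resolved from column names (catalog, DATABASE and table_name as in the earlier snippets):

from pyiceberg.transforms import BucketTransform

table = catalog.load_table((DATABASE, table_name))
# Let pyiceberg resolve source_id/field_id from the column names.
with table.update_spec() as update:
    update.add_field("sid", BucketTransform(num_buckets=1000), "sid_bucket")
    update.add_field("gene", BucketTransform(num_buckets=100), "gene_bucket")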

myz540 commented Mar 18, 2025

I am able to create the partition spec with TruncateTransform and am able to write. However, when looping over chunks of 10k records and writing, like so:

for i, _df in tqdm(enumerate(chunk_dataframe(df)), desc="Processing chunk"):
    catalog = get_rest_catalog()
    table = catalog.load_table((DATABASE, table_name))
    smol_table = pa.Table.from_pandas(_df, schema=create_lems_pa_schema())
    with table.transaction() as transaction:
        transaction.append(smol_table)
        print(f"✅ Successfully appended data for {i}")
    print(f"✅ Successfully committed data for {i}")

print("✅ Successfully committed all data")

I eventually encounter the error below. I've hit it any time I need to write a large number of chunks, and it usually happens about an hour and a half in. I am refreshing my catalog connection on each iteration, so I am not sure what the problem is. Any help would be appreciated.

OSError: When initiating multiple part upload for key 'data/sid=HCC1008/gene=R/00000-0-b4aef6a1-d6c0-4c09-9c08-8cd2e91957e8.parquet' in bucket 'a5fc81c2-ccf3-4f36-wbi7imrt75cabpxguuo6i8f1a7n9quse1b--table-s3': AWS Error NETWORK_CONNECTION during CreateMultipartUpload operation: curlCode: 28, Timeout was reached
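
Not a diagnosis of the root cause, but one possible workaround for transient upload timeouts like this is to retry the failing chunk with a short backoff. A sketch, reusing the same helper functions as in the loop above:

import time

MAX_RETRIES = 3

for i, _df in enumerate(chunk_dataframe(df)):
    smol_table = pa.Table.from_pandas(_df, schema=create_lems_pa_schema())
    for attempt in range(MAX_RETRIES):
        try:
            catalog = get_rest_catalog()  # fresh catalog connection per attempt
            table = catalog.load_table((DATABASE, table_name))
            table.append(smol_table)
            print(f"✅ Successfully committed chunk {i}")
            break
        except OSError:  # e.g. the curl timeout during CreateMultipartUpload above
            if attempt == MAX_RETRIES - 1:
                raise
            time.sleep(30 * (attempt + 1))  # simple linear backoff before retrying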
