Move implementation of upsert from Table to Transaction #1817
Conversation
I think since the transaction wrapper has been moved out, there should be a unit test added that does a partial upsert, then throws an error, and ensures the rollback occurs so we are not left in a state where a partial upsert succeeded. Example:
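A minimal sketch of such a test (my own illustration, not from the thread), assuming pytest, a `catalog` fixture, the `arrow_schema` from the example further down, and that the transaction context manager discards staged changes when an exception escapes:

```python
import pyarrow as pa
import pytest

def test_upsert_rolls_back_on_error(catalog, arrow_schema):
    tbl = catalog.create_table("default.upsert_rollback", schema=arrow_schema)
    tbl.append(pa.Table.from_pylist([{"id": 1, "name": "Alice"}], schema=arrow_schema))

    df = pa.Table.from_pylist(
        [{"id": 1, "name": "Alicia"}, {"id": 2, "name": "Bob"}], schema=arrow_schema
    )

    # Fail after the upsert has staged its changes but before the transaction
    # commits; nothing staged should survive the rollback.
    with pytest.raises(RuntimeError):
        with tbl.transaction() as txn:
            txn.upsert(df, join_cols=["id"])
            raise RuntimeError("simulated failure before commit")

    # The table must still contain only the original row.
    result = tbl.scan().to_arrow()
    assert result.num_rows == 1
    assert result["name"].to_pylist() == ["Alice"]
```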
Just my thoughts 😃. Thanks!
Agree! I will work on the test. By "update" you mean "delete", right?
Hey, sorry, just saw this. By "update" I mean it invokes an "overwrite" operation, which I believe is what deletes also trigger under the covers. 😀
There is a nice edge case here:

```python
tbl = catalog.create_table(identifier, schema=schema)

# Define exact schema: required int32 and required string
arrow_schema = pa.schema([
    pa.field("id", pa.int32(), nullable=False),
    pa.field("name", pa.string(), nullable=False),
])

tbl.append(pa.Table.from_pylist([{"id": 1, "name": "Alice"}], schema=arrow_schema))

df = pa.Table.from_pylist([{"id": 2, "name": "Bob"}, {"id": 1, "name": "Alicia"}], schema=arrow_schema)

with tbl.transaction() as txn:
    txn.upsert(df, join_cols=["id"])

    # This will re-insert Bob, instead of reading the uncommitted changes and ignoring Bob
    txn.upsert(df, join_cols=["id"])
```

@Fokko should it be possible to read uncommitted changes?
Now that #1903 is merged, could you rebase this PR?
Force-pushed from 56888d3 to f336c0b.
```python
matched_predicate = upsert_util.create_match_filter(df, join_cols)

# We must use Transaction.table_metadata for the scan. This includes all uncommitted - but relevant - changes.
matched_iceberg_table = DataScan(
```
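The excerpt cuts off at the constructor call. A hedged reconstruction of the scan it builds — the argument names follow my reading of pyiceberg's `DataScan`, not the diff itself:

```python
# Sketch: build the scan from the transaction's own metadata so that staged
# (uncommitted) deletes/appends are visible to the upsert's match query.
matched_iceberg_table = DataScan(
    table_metadata=self.table_metadata,  # Transaction.table_metadata, not the committed snapshot
    io=self._table.io,
    row_filter=matched_predicate,
    case_sensitive=case_sensitive,
).to_arrow()
```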
Most important change. Requires #1903.
Nice, thanks for pointing out, and also covering this in the tests 👍
```python
txn.delete(delete_filter="id = 1")
txn.append(df)

# This should read the uncommitted changes
```
Test uncommitted changes are read
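The shape of that test as I read the excerpt — the upsert call and the final assertion are assumed, not shown in the diff:

```python
with tbl.transaction() as txn:
    txn.delete(delete_filter="id = 1")
    txn.append(df)

    # The upsert must see the staged delete/append above, so it should not
    # treat id = 1 as both an existing row and a new row.
    txn.upsert(df, join_cols=["id"])

# Assumed assertion: exactly one row for id = 1 after the commit.
assert tbl.scan(row_filter="id = 1").to_arrow().num_rows == 1
```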
Looks great @koenvo
### Summary

This PR updates the upsert logic to use batch processing. The main goal is to prevent out-of-memory (OOM) issues when updating large tables by avoiding loading all data at once.

**Note:** This has only been tested against the unit tests—no real-world datasets have been evaluated yet.

This PR partially depends on functionality introduced in [#1817](apache/iceberg#1817).

---

### Notes

- Duplicate detection across multiple batches is **not** possible with this approach.
- ~All data is read sequentially, which may be slower than the parallel read used by `to_arrow`.~ Fixed using the `concurrent_tasks` parameter.

---

### Performance Comparison

In setups with many small files, network and metadata overhead become the dominant factor. This impacts batch reading performance, as each file contributes relatively more overhead than payload. In the test setup used here, metadata access was the largest cost.

#### Using `to_arrow_batch_reader` (sequential)

- **Scan:** 9993.50 ms
- **To list:** 19811.09 ms

#### Using `to_arrow` (parallel)

- **Scan:** 10607.88 ms

Co-authored-by: Fokko Driesprong <[email protected]>
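For illustration, a minimal sketch of the batch-reading idea using pyiceberg's `DataScan.to_arrow_batch_reader()`; the per-batch comparison logic and the new `concurrent_tasks` knob from that PR are out of scope here:

```python
import pyarrow as pa

def iter_matched_batches(tbl, matched_predicate):
    """Stream matched rows batch by batch instead of materializing the whole
    table with to_arrow(), keeping peak memory bounded by the batch size."""
    reader = tbl.scan(row_filter=matched_predicate).to_arrow_batch_reader()
    for batch in reader:  # each item is a pyarrow.RecordBatch
        # Convert to a Table so it can be joined/filtered against the incoming
        # dataframe; only this batch is held in memory at a time.
        yield pa.Table.from_batches([batch])
```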
Rationale for this change

Previously, the upsert functionality was implemented at the table level, which meant it always initiated a new Transaction. This change moves the upsert implementation to the Transaction level while keeping `table.upsert(...)` as an entry point.

With this refactor, end users now have the flexibility to call upsert in two ways:

- `table.upsert(...)` – which still starts a new transaction.
- `transaction.upsert(...)` – allowing upserts within an existing transaction.

Are these changes tested?
Using existing tests.
Are there any user-facing changes?
Yes. This change enables users to perform upserts within an existing transaction using `transaction.upsert(...)`, in addition to the existing `table.upsert(...)` method.
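A minimal usage sketch of the two entry points, reusing `tbl`, `df`, and `arrow_schema` from the edge-case example above (the staged delete is just an illustrative second change):

```python
import pyarrow as pa

df = pa.Table.from_pylist([{"id": 1, "name": "Alicia"}], schema=arrow_schema)

# 1. Table-level entry point: starts and commits its own transaction.
tbl.upsert(df, join_cols=["id"])

# 2. Transaction-level entry point: the upsert is staged alongside other
#    changes, and everything commits atomically when the block exits.
with tbl.transaction() as txn:
    txn.delete(delete_filter="id = 99")
    txn.upsert(df, join_cols=["id"])
```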