Skip to content

Added ExpireSnapshots Feature #1880

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
0a94d96
Added initial units tests and Class for Removing a Snapshot
ForeverAngry Mar 29, 2025
5f0b62b
Added methods needed to expire snapshots by id, and optionally cleanu…
ForeverAngry Mar 31, 2025
f995daa
Update test_expire_snapshots.py
ForeverAngry Mar 31, 2025
65365e1
Added the builder method to __init__.py, updated the snapshot api wit…
ForeverAngry Apr 1, 2025
e28815f
Snapshots are not being transacted on, but need to re-assign refs
ForeverAngry Apr 1, 2025
4628ede
Fixed the test case.
ForeverAngry Apr 3, 2025
e80c41c
adding print statements to help with debugging
ForeverAngry Apr 3, 2025
cb9f0c9
Draft ready
ForeverAngry Apr 3, 2025
ebcff2b
Applied suggestions to Fix CICD
ForeverAngry Apr 3, 2025
97399bf
Merge branch 'main' into main
ForeverAngry Apr 3, 2025
95e5af2
Rebuild the poetry lock file.
ForeverAngry Apr 3, 2025
5ab5890
Merge branch 'main' into main
ForeverAngry Apr 4, 2025
5acd690
Refactor implementation of `ExpireSnapshots`
ForeverAngry Apr 13, 2025
d30a08c
Fixed format and linting issues
ForeverAngry Apr 13, 2025
e62ab58
Merge branch 'main' into main
ForeverAngry Apr 13, 2025
1af3258
Fixed format and linting issues
ForeverAngry Apr 13, 2025
352b48f
Merge branch 'main' of https://github.com/ForeverAngry/iceberg-python
ForeverAngry Apr 13, 2025
382e0ea
Merge branch 'main' into main
ForeverAngry Apr 18, 2025
549c183
rebased: from main
ForeverAngry Apr 19, 2025
386cb15
fixed: typo
ForeverAngry Apr 19, 2025
12729fa
removed errant files
ForeverAngry Apr 22, 2025
ce3515c
Added: public method signature to the init table file.
ForeverAngry Apr 22, 2025
28fce4b
Removed: `expire_snapshots_older_than` method, in favor of implementi…
ForeverAngry Apr 24, 2025
2c3153e
Update tests/table/test_expire_snapshots.py
ForeverAngry Apr 26, 2025
27c3ece
Removed: unrelated changes, Added: logic to expire snapshot method.
ForeverAngry Apr 26, 2025
05793c0
Merge branch 'main' into 1880-add-expire-snapshots
ForeverAngry Apr 26, 2025
8ec1889
Update test_partition_evolution.py
ForeverAngry Apr 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
ManageSnapshots,
UpdateSnapshot,
_FastAppendFiles,
ExpireSnapshots
)
from pyiceberg.table.update.spec import UpdateSpec
from pyiceberg.table.update.statistics import UpdateStatistics
Expand Down Expand Up @@ -1078,6 +1079,15 @@ def manage_snapshots(self) -> ManageSnapshots:
ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")
"""
return ManageSnapshots(transaction=Transaction(self, autocommit=True))

def expire_snapshots(self) -> ExpireSnapshots:
    """
    Start an expire-snapshots operation (by id or by a timestamp) on this table.

    The returned builder is bound to a new Transaction created with autocommit=True.

    Use table.expire_snapshots().<operation>().commit() to run a specific operation.
    Use table.expire_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.

    Returns:
        An ExpireSnapshots builder for this table.
    """
    autocommit_txn = Transaction(self, autocommit=True)
    return ExpireSnapshots(transaction=autocommit_txn)

def update_statistics(self) -> UpdateStatistics:
"""
Expand Down
73 changes: 70 additions & 3 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@
AddSnapshotUpdate,
AssertRefSnapshotId,
RemoveSnapshotRefUpdate,
RemoveSnapshotsUpdate,
SetSnapshotRefUpdate,
TableMetadata,
TableRequirement,
TableUpdate,
U,
Expand All @@ -82,7 +84,11 @@
from pyiceberg.utils.properties import property_as_bool, property_as_int

if TYPE_CHECKING:
from pyiceberg.table import Transaction
pass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if is there to break circular dependencies. If it is not needed anymore, we can remove it 👍



from pyiceberg.table.metadata import Snapshot, TableMetadata
from pyiceberg.table.snapshots import Snapshot


def _new_manifest_file_name(num: int, commit_uuid: uuid.UUID) -> str:
Expand Down Expand Up @@ -238,7 +244,7 @@ def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
previous_summary=previous_snapshot.summary if previous_snapshot is not None else None,
)

def _commit(self) -> UpdatesAndRequirements:
def _commit(self, base_metadata: TableMetadata) -> UpdatesAndRequirements:
new_manifests = self._manifests()
next_sequence_number = self._transaction.table_metadata.next_sequence_number()

Expand Down Expand Up @@ -739,11 +745,11 @@ class ManageSnapshots(UpdateTableMetadata["ManageSnapshots"]):
ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")
"""

_snapshot_ids_to_expire = set()
_updates: Tuple[TableUpdate, ...] = ()
_requirements: Tuple[TableRequirement, ...] = ()

def _commit(self) -> UpdatesAndRequirements:
    """Return the staged table updates together with the staged requirements."""
    staged_updates, staged_requirements = self._updates, self._requirements
    return staged_updates, staged_requirements

def _remove_ref_snapshot(self, ref_name: str) -> ManageSnapshots:
Expand Down Expand Up @@ -843,3 +849,64 @@ def remove_branch(self, branch_name: str) -> ManageSnapshots:
This for method chaining
"""
return self._remove_ref_snapshot(ref_name=branch_name)

class ExpireSnapshots(UpdateTableMetadata["ExpireSnapshots"]):
"""
Expire snapshots by ID or by timestamp.
Use table.expire_snapshots().<operation>().commit() to run a specific operation.
Use table.expire_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
Pending changes are applied on commit.
"""

_snapshot_ids_to_expire = set()
_updates: Tuple[TableUpdate, ...] = ()
_requirements: Tuple[TableRequirement, ...] = ()

def _commit(self) -> UpdatesAndRequirements:
    """
    Build the updates and requirements that expire the staged snapshots.

    When at least one snapshot ID has been staged, a single
    RemoveSnapshotsUpdate carrying those IDs is appended to the updates
    already held by this instance. When nothing has been staged, the held
    updates are returned unchanged.

    Returns:
        Tuple of updates and requirements to be committed,
        as required by the calling parent apply functions.
    """
    staged_ids = self._snapshot_ids_to_expire
    if not staged_ids:
        # Nothing to expire: do not emit an empty remove-snapshots update.
        return self._updates, self._requirements

    # A sorted list gives the update a deterministic ordering instead of
    # whatever iteration order the set happens to have.
    removal = RemoveSnapshotsUpdate(snapshot_ids=sorted(staged_ids))

    # Build a new tuple instead of `self._updates += ...` so that calling
    # _commit() more than once does not accumulate duplicate updates.
    return (*self._updates, removal), self._requirements

def expire_snapshot_by_id(self, snapshot_id: int) -> ExpireSnapshots:
    """
    Stage a single snapshot for expiration by its ID.

    Args:
        snapshot_id (int): The ID of the snapshot to expire.

    Returns:
        This for method chaining.

    Raises:
        ValueError: If the table metadata has no snapshot with the given ID.
    """
    if self._transaction.table_metadata.snapshot_by_id(snapshot_id) is None:
        raise ValueError(f"Snapshot with ID {snapshot_id} does not exist.")
    # `_snapshot_ids_to_expire` is declared as a class-level set, so an
    # in-place `.add()` would leak this ID into every other instance that
    # still uses the class-level object. Rebinding a new set on the instance
    # keeps the staged IDs local to this builder.
    self._snapshot_ids_to_expire = self._snapshot_ids_to_expire | {snapshot_id}
    return self

def expire_snapshots_older_than(self, timestamp_ms: int) -> ExpireSnapshots:
"""
Expire snapshots older than the given timestamp.

Args:
timestamp_ms (int): The timestamp in milliseconds. Snapshots older than this will be expired.

Returns:
This for method chaining.
"""
# Collect IDs of snapshots to be expired
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, it is not as simple as looking at the timestamp alone. Instead, there are some rules, for example:

The easiest way of going through the logic is following this method: https://github.com/apache/iceberg/blob/3f661d5c6657542538a1e944db57405efdefea29/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java#L179

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might just pull this out into another issue, separate from this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm hesitant to do that, because when folks run it, it might break their tables 😱

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant that, for now, I'll remove the expire_snapshots_older_than method and contribute it in another PR.

snapshots_to_remove = [
snapshot.snapshot_id
for snapshot in self._transaction.table_metadata.snapshots
if snapshot.timestamp_ms < timestamp_ms
]
if snapshots_to_remove:
for snapshot_id in snapshots_to_remove:
self._snapshot_ids_to_expire.add(snapshot_id)
return self
2 changes: 1 addition & 1 deletion tests/expressions/test_literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ def test_invalid_decimal_conversions() -> None:
def test_invalid_string_conversions() -> None:
assert_invalid_conversions(
literal("abc"),
[FloatType(), DoubleType(), FixedType(1), BinaryType()],
[FixedType(1), BinaryType()],
)


Expand Down
8 changes: 8 additions & 0 deletions tests/integration/test_partition_evolution.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,14 @@ def test_add_hour(catalog: Catalog) -> None:
_validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, HourTransform(), "hour_transform"))


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_add_hour_string_transform(catalog: Catalog) -> None:
    """Adding a field with the transform given as the string "hour" yields an HourTransform partition field."""
    table = _table(catalog)
    spec_update = table.update_spec().add_field("event_ts", "hour", "str_hour_transform")
    spec_update.commit()
    expected_field = PartitionField(2, 1000, HourTransform(), "str_hour_transform")
    _validate_new_partition_fields(table, 1000, 1, 1000, expected_field)


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_add_hour_generates_default_name(catalog: Catalog) -> None:
Expand Down
34 changes: 34 additions & 0 deletions tests/integration/test_writes/test_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1776,3 +1776,37 @@ def test_write_optional_list(session_catalog: Catalog) -> None:
session_catalog.load_table(identifier).append(df_2)

assert len(session_catalog.load_table(identifier).scan().to_arrow()) == 4


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_evolve_and_write(
    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
    """Append through a second table handle after the schema was evolved through the first one."""
    identifier = "default.test_evolve_and_write"
    tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}, schema=Schema())
    other_table = session_catalog.load_table(identifier)

    numbers = pa.array([1, 2, 3, 4], type=pa.int32())
    id_schema = pa.schema([pa.field("id", pa.int32(), nullable=True)])
    payload = pa.Table.from_arrays([numbers], schema=id_schema)

    # other_table was loaded before this change, so it does not know the new column yet
    with tbl.update_schema() as upd:
        upd.add_column("id", IntegerType())

    with other_table.transaction() as tx:
        # Refreshes the underlying metadata, and the schema
        other_table.refresh()
        tx.append(payload)

    written = session_catalog.load_table(identifier).scan().to_arrow()
    assert written.column(0).combine_chunks() == numbers
43 changes: 43 additions & 0 deletions tests/table/test_expire_snapshots.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from unittest.mock import MagicMock
from uuid import uuid4

from pyiceberg.table import CommitTableResponse, Table


def test_expire_snapshot(table_v2: Table) -> None:
    """Expiring a snapshot by ID commits exactly once and leaves only the kept snapshot in the metadata."""
    EXPIRE_SNAPSHOT = 3051729675574597004
    KEEP_SNAPSHOT = 3055729675574597004

    # Assert fixture data to validate test assumptions
    assert len(table_v2.metadata.snapshots) == 2
    assert len(table_v2.metadata.snapshot_log) == 2
    assert len(table_v2.metadata.refs) == 2

    # The mocked response carries the current metadata minus the expired
    # snapshot. Keep the surviving snapshot *objects* (not bare IDs) so the
    # metadata handed back to the table has the same element type as before.
    surviving_snapshots = [
        snapshot for snapshot in table_v2.metadata.snapshots if snapshot.snapshot_id != EXPIRE_SNAPSHOT
    ]
    mock_response = CommitTableResponse(
        metadata=table_v2.metadata.model_copy(update={"snapshots": surviving_snapshots}),
        metadata_location="mock://metadata/location",
        uuid=uuid4(),
    )

    # Mock the commit_table method to return the mock response
    table_v2.catalog.commit_table = MagicMock(return_value=mock_response)

    # Expire the snapshot directly without using a transaction. No try/except
    # wrapper: an unexpected exception should fail the test with its traceback.
    table_v2.expire_snapshots().expire_snapshot_by_id(EXPIRE_SNAPSHOT).commit()

    # Assert that commit_table was called once
    table_v2.catalog.commit_table.assert_called_once()

    # Compare snapshot IDs, not objects, so the membership check is meaningful
    remaining_ids = [snapshot.snapshot_id for snapshot in table_v2.metadata.snapshots]
    assert EXPIRE_SNAPSHOT not in remaining_ids
    assert remaining_ids == [KEEP_SNAPSHOT]
Loading