diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 9e9de52dee..b65ba4b44a 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -87,6 +87,7 @@ from pyiceberg.table.name_mapping import (
     NameMapping,
 )
+from pyiceberg.table.optimize import OptimizeTable
 from pyiceberg.table.refs import SnapshotRef
 from pyiceberg.table.snapshots import (
     Snapshot,
@@ -907,6 +908,15 @@ def inspect(self) -> InspectTable:
         """
         return InspectTable(self)
 
+    @property
+    def optimize(self) -> OptimizeTable:
+        """Return the OptimizeTable object to run maintenance operations on this table.
+
+        Returns:
+            OptimizeTable object based on this Table.
+        """
+        return OptimizeTable(self)
+
     def refresh(self) -> Table:
         """Refresh the current table metadata.
 
diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py
index 878ae71c81..ccdbd9cd25 100644
--- a/pyiceberg/table/inspect.py
+++ b/pyiceberg/table/inspect.py
@@ -17,7 +17,8 @@
 from __future__ import annotations
 
 from datetime import datetime, timezone
-from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple
+from functools import reduce
+from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple, Union, cast
 
 from pyiceberg.conversions import from_bytes
 from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
@@ -645,10 +646,16 @@ def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
     def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
         return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})
 
-    def all_manifests(self) -> "pa.Table":
+    def all_manifests(self, snapshots: Optional[Union[list[Snapshot], list[int]]] = None) -> "pa.Table":
         import pyarrow as pa
 
-        snapshots = self.tbl.snapshots()
+        # coerce into snapshot objects if the user passes in snapshot ids;
+        # guard against an empty list before peeking at the first element
+        if snapshots is not None:
+            if snapshots and isinstance(snapshots[0], int):
+                snapshots = cast(list[Snapshot], [self.tbl.metadata.snapshot_by_id(snapshot_id) for snapshot_id in snapshots])
+        else:
+            snapshots = self.tbl.snapshots()
+
         if not snapshots:
             return pa.Table.from_pylist([], schema=self._get_all_manifests_schema())
 
@@ -657,3 +664,25 @@ def all_manifests(self) -> "pa.Table":
             lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
         )
         return pa.concat_tables(manifests_by_snapshots)
+
+    def all_known_files(self) -> dict[str, set[str]]:
+        """Get all the known files in the table.
+
+        Returns:
+            dict of {file_type: set of file paths} for each file type.
+ """ + snapshots = self.tbl.snapshots() + + _all_known_files = {} + _all_known_files["manifests"] = set(self.all_manifests(snapshots)["path"].to_pylist()) + _all_known_files["manifest_lists"] = {snapshot.manifest_list for snapshot in snapshots} + _all_known_files["statistics"] = {statistic.statistics_path for statistic in self.tbl.metadata.statistics} + + executor = ExecutorFactory.get_or_create() + snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots] + files_by_snapshots: Iterator[Set[str]] = executor.map( + lambda snapshot_id: set(self.files(snapshot_id)["file_path"].to_pylist()), snapshot_ids + ) + _all_known_files["datafiles"] = reduce(set.union, files_by_snapshots, set()) + + return _all_known_files diff --git a/pyiceberg/table/optimize.py b/pyiceberg/table/optimize.py new file mode 100644 index 0000000000..dbd7ec081c --- /dev/null +++ b/pyiceberg/table/optimize.py @@ -0,0 +1,117 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging +from datetime import datetime, timedelta, timezone +from functools import reduce +from typing import TYPE_CHECKING, Set + +from pyiceberg.io import _parse_location +from pyiceberg.utils.concurrent import ExecutorFactory + +logger = logging.getLogger(__name__) + + +if TYPE_CHECKING: + from pyiceberg.table import Table + + +class OptimizeTable: + tbl: Table + + def __init__(self, tbl: Table) -> None: + self.tbl = tbl + + try: + import pyarrow as pa # noqa + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For metadata operations PyArrow needs to be installed") from e + + def orphaned_files(self, location: str, older_than: timedelta = timedelta(days=3)) -> Set[str]: + """Get all files which are not referenced in any metadata files of an Iceberg table and can thus be considered "orphaned". + + Args: + location: The location to check for orphaned files. + older_than: The time period to check for orphaned files. Defaults to 3 days. + + Returns: + A set of orphaned file paths. 
+ """ + try: + import pyarrow as pa # noqa: F401 + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e + + from pyarrow.fs import FileSelector, FileType + + from pyiceberg.io.pyarrow import _fs_from_file_path + + all_known_files = self.tbl.inspect.all_known_files() + flat_known_files: set[str] = reduce(set.union, all_known_files.values(), set()) + + fs = _fs_from_file_path(self.tbl.io, location) + + _, _, path = _parse_location(location) + selector = FileSelector(path, recursive=True) + # filter to just files as it may return directories, and filter on time + as_of = datetime.now(timezone.utc) - older_than + all_files = [ + f.path for f in fs.get_file_info(selector) if f.type == FileType.File and (as_of is None or (f.mtime < as_of)) + ] + + orphaned_files = set(all_files).difference(flat_known_files) + + return orphaned_files + + def remove_orphaned_files(self, older_than: timedelta = timedelta(days=3), dry_run: bool = False) -> None: + """Remove files which are not referenced in any metadata files of an Iceberg table and can thus be considered "orphaned". + + Args: + older_than: The time period to check for orphaned files. Defaults to 3 days. + dry_run: If True, only log the files that would be deleted. Defaults to False. + """ + location = self.tbl.location() + orphaned_files = self.orphaned_files(location, older_than) + logger.info(f"Found {len(orphaned_files)} orphaned files at {location}!") + deleted_files = set() + failed_to_delete_files = set() + + def _delete(file: str) -> None: + # don't error if the file doesn't exist + # still catch ctrl-c, etc. + try: + self.tbl.io.delete(file) + deleted_files.add(file) + except Exception: + failed_to_delete_files.add(file) + + if orphaned_files: + if dry_run: + logger.info(f"(Dry Run) Deleted {len(orphaned_files)} orphaned files at {location}!") + else: + executor = ExecutorFactory.get_or_create() + deletes = executor.map(_delete, orphaned_files) + # exhaust + list(deletes) + logger.info(f"Deleted {len(deleted_files)} orphaned files at {location}!") + logger.info(f"Files:\n{deleted_files}") + if failed_to_delete_files: + logger.warning(f"Failed to delete {len(failed_to_delete_files)} orphaned files at {location}!") + logger.warning(f"Files:\n{failed_to_delete_files}") + else: + logger.info(f"No orphaned files found at {location}!") diff --git a/tests/table/test_remove_orphans.py b/tests/table/test_remove_orphans.py new file mode 100644 index 0000000000..dc09717340 --- /dev/null +++ b/tests/table/test_remove_orphans.py @@ -0,0 +1,124 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+import os
+from datetime import datetime, timedelta
+from pathlib import Path, PosixPath
+from unittest.mock import patch
+
+import pyarrow as pa
+import pytest
+
+from pyiceberg.catalog import Catalog
+from pyiceberg.schema import Schema
+from pyiceberg.types import IntegerType, NestedField, StringType
+from tests.catalog.test_base import InMemoryCatalog
+
+
+@pytest.fixture
+def catalog(tmp_path: PosixPath) -> InMemoryCatalog:
+    catalog = InMemoryCatalog("test.in_memory.catalog", warehouse=tmp_path.absolute().as_posix())
+    catalog.create_namespace("default")
+    return catalog
+
+
+def test_remove_orphaned_files(catalog: Catalog) -> None:
+    identifier = "default.test_remove_orphaned_files"
+
+    schema = Schema(
+        NestedField(1, "city", StringType(), required=True),
+        NestedField(2, "inhabitants", IntegerType(), required=True),
+        # Mark city as the identifier field, also known as the primary key
+        identifier_field_ids=[1],
+    )
+
+    tbl = catalog.create_table(identifier, schema=schema)
+
+    arrow_schema = pa.schema(
+        [
+            pa.field("city", pa.string(), nullable=False),
+            pa.field("inhabitants", pa.int32(), nullable=False),
+        ]
+    )
+
+    df = pa.Table.from_pylist(
+        [
+            {"city": "Drachten", "inhabitants": 45019},
+            {"city": "Drachten", "inhabitants": 45019},
+        ],
+        schema=arrow_schema,
+    )
+    tbl.append(df)
+
+    orphaned_file = Path(tbl.location()) / "orphan.txt"
+
+    orphaned_file.touch()
+    assert orphaned_file.exists()
+
+    # a dry run must not delete anything
+    tbl.optimize.remove_orphaned_files(dry_run=True)
+    assert orphaned_file.exists()
+
+    # the file was just created, so it is younger than the default cutoff
+    tbl.optimize.remove_orphaned_files()
+    assert orphaned_file.exists()
+
+    # backdate the file's modification time past the 3-day default cutoff
+    five_days_ago = (datetime.now() - timedelta(days=5)).timestamp()
+    os.utime(orphaned_file, (five_days_ago, five_days_ago))
+    tbl.optimize.remove_orphaned_files()
+    assert not orphaned_file.exists()
+
+    # all files referenced by the table metadata must still exist
+    all_known_files = tbl.inspect.all_known_files()
+    for files in all_known_files.values():
+        for file in files:
+            assert Path(file).exists()
+
+
+def test_remove_orphaned_files_with_invalid_file_doesnt_error(catalog: Catalog) -> None:
+    identifier = "default.test_remove_orphaned_files"
+
+    schema = Schema(
+        NestedField(1, "city", StringType(), required=True),
+        NestedField(2, "inhabitants", IntegerType(), required=True),
+        # Mark city as the identifier field, also known as the primary key
+        identifier_field_ids=[1],
+    )
+
+    tbl = catalog.create_table(identifier, schema=schema)
+
+    arrow_schema = pa.schema(
+        [
+            pa.field("city", pa.string(), nullable=False),
+            pa.field("inhabitants", pa.int32(), nullable=False),
+        ]
+    )
+
+    df = pa.Table.from_pylist(
+        [
+            {"city": "Drachten", "inhabitants": 45019},
+            {"city": "Drachten", "inhabitants": 45019},
+        ],
+        schema=arrow_schema,
+    )
+    tbl.append(df)
+
+    file_that_does_not_exist = "foo/bar.baz"
+    with patch.object(type(tbl.optimize), "orphaned_files", return_value={file_that_does_not_exist}):
+        with patch.object(tbl.io, "delete", wraps=tbl.io.delete) as mock_delete:
+            tbl.optimize.remove_orphaned_files(timedelta(days=3))
+            mock_delete.assert_called_with(file_that_does_not_exist)
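
Usage sketch (illustrative, not part of the patch): how the API added by this change is
meant to be called, assuming PyArrow is installed; the catalog and table names below are
hypothetical.

    from datetime import timedelta

    from pyiceberg.catalog import load_catalog

    catalog = load_catalog("default")  # hypothetical catalog name
    tbl = catalog.load_table("default.cities")  # hypothetical table name

    # Every file still referenced by table metadata, grouped by type:
    # {"manifests": {...}, "manifest_lists": {...}, "statistics": {...}, "datafiles": {...}}
    known = tbl.inspect.all_known_files()
    print({file_type: len(paths) for file_type, paths in known.items()})

    # Dry run first: only logs the files that would be removed.
    tbl.optimize.remove_orphaned_files(older_than=timedelta(days=7), dry_run=True)

    # Then delete files under the table location unreferenced for more than seven days.
    tbl.optimize.remove_orphaned_files(older_than=timedelta(days=7))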