Skip to content

feat: delete orphaned files #1958

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
10 changes: 10 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
from pyiceberg.table.name_mapping import (
NameMapping,
)
from pyiceberg.table.optimize import OptimizeTable
from pyiceberg.table.refs import SnapshotRef
from pyiceberg.table.snapshots import (
Snapshot,
Expand Down Expand Up @@ -907,6 +908,15 @@ def inspect(self) -> InspectTable:
"""
return InspectTable(self)

@property
def optimize(self) -> OptimizeTable:
    """Entry point for table-maintenance actions.

    Returns:
        An OptimizeTable helper bound to this Table.
    """
    return OptimizeTable(tbl=self)

def refresh(self) -> Table:
"""Refresh the current table metadata.

Expand Down
35 changes: 32 additions & 3 deletions pyiceberg/table/inspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
from __future__ import annotations

from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple
from functools import reduce
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple, Union, cast

from pyiceberg.conversions import from_bytes
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
Expand Down Expand Up @@ -645,10 +646,16 @@ def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})

def all_manifests(self) -> "pa.Table":
def all_manifests(self, snapshots: Optional[Union[list[Snapshot], list[int]]] = None) -> "pa.Table":
    """Return a PyArrow table of all manifests for the given snapshots.

    Args:
        snapshots: Snapshots (or their ids) to collect manifests for.
            Defaults to all snapshots of the table.

    Returns:
        A ``pa.Table`` with one row per manifest across the snapshots.
    """
    import pyarrow as pa

    # Coerce into Snapshot objects if the user passes in snapshot ids.
    # Guard on truthiness first: an empty list must not raise IndexError.
    if snapshots is not None:
        if snapshots and isinstance(snapshots[0], int):
            snapshots = cast(list[Snapshot], [self.tbl.metadata.snapshot_by_id(snapshot_id) for snapshot_id in snapshots])
    else:
        snapshots = self.tbl.snapshots()

    if not snapshots:
        return pa.Table.from_pylist([], schema=self._get_all_manifests_schema())

    executor = ExecutorFactory.get_or_create()
    manifests_by_snapshots: Iterator["pa.Table"] = executor.map(
        lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
    )
    return pa.concat_tables(manifests_by_snapshots)

def all_known_files(self) -> dict[str, set[str]]:
    """Get all files referenced by the table's metadata.

    Returns:
        dict of {file_type: set of file paths} for each file type
        ("manifests", "manifest_lists", "statistics", "metadata", "datafiles").
    """
    snapshots = self.tbl.snapshots()

    _all_known_files: dict[str, set[str]] = {}
    _all_known_files["manifests"] = set(self.all_manifests(snapshots)["path"].to_pylist())
    _all_known_files["manifest_lists"] = {snapshot.manifest_list for snapshot in snapshots}
    _all_known_files["statistics"] = {statistic.statistics_path for statistic in self.tbl.metadata.statistics}
    # Include the current metadata file and the metadata log so that
    # consumers (e.g. orphan-file removal) never treat table metadata
    # itself as unreferenced and delete it.
    metadata_files = {entry.metadata_file for entry in self.tbl.metadata.metadata_log}
    metadata_files.add(self.tbl.metadata_location)
    _all_known_files["metadata"] = metadata_files

    executor = ExecutorFactory.get_or_create()
    snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots]
    # One files() scan per snapshot, unioned into a single set.
    files_by_snapshots: Iterator[Set[str]] = executor.map(
        lambda snapshot_id: set(self.files(snapshot_id)["file_path"].to_pylist()), snapshot_ids
    )
    _all_known_files["datafiles"] = reduce(set.union, files_by_snapshots, set())

    return _all_known_files
117 changes: 117 additions & 0 deletions pyiceberg/table/optimize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
from datetime import datetime, timedelta, timezone
from functools import reduce
from typing import TYPE_CHECKING, Set

from pyiceberg.io import _parse_location
from pyiceberg.utils.concurrent import ExecutorFactory

logger = logging.getLogger(__name__)


if TYPE_CHECKING:
from pyiceberg.table import Table


class OptimizeTable:
    """Table-maintenance actions (exposed via ``Table.optimize``)."""

    tbl: Table

    def __init__(self, tbl: Table) -> None:
        self.tbl = tbl

        try:
            import pyarrow as pa  # noqa
        except ModuleNotFoundError as e:
            raise ModuleNotFoundError("For metadata operations PyArrow needs to be installed") from e

    def orphaned_files(self, location: str, older_than: timedelta = timedelta(days=3)) -> Set[str]:
        """Get all files which are not referenced in any metadata files of an Iceberg table and can thus be considered "orphaned".

        Args:
            location: The location to check for orphaned files.
            older_than: Only consider files last modified more than this long ago. Defaults to 3 days.

        Returns:
            A set of orphaned file paths.
        """
        try:
            import pyarrow as pa  # noqa: F401
        except ModuleNotFoundError as e:
            raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e

        from pyarrow.fs import FileSelector, FileType

        from pyiceberg.io.pyarrow import _fs_from_file_path

        all_known_files = self.tbl.inspect.all_known_files()
        flat_known_files: Set[str] = reduce(set.union, all_known_files.values(), set())

        fs = _fs_from_file_path(self.tbl.io, location)

        _, _, path = _parse_location(location)
        selector = FileSelector(path, recursive=True)
        as_of = datetime.now(timezone.utc) - older_than
        # get_file_info may also return directories; keep only plain files that
        # are old enough. Some filesystems report no modification time
        # (mtime is None) — never consider such files orphaned rather than
        # raising a TypeError on the comparison.
        all_files = [
            info.path
            for info in fs.get_file_info(selector)
            if info.type == FileType.File and info.mtime is not None and info.mtime < as_of
        ]

        return set(all_files).difference(flat_known_files)

    def remove_orphaned_files(self, older_than: timedelta = timedelta(days=3), dry_run: bool = False) -> None:
        """Remove files which are not referenced in any metadata files of an Iceberg table and can thus be considered "orphaned".

        Args:
            older_than: Only delete files last modified more than this long ago. Defaults to 3 days.
            dry_run: If True, only log the files that would be deleted. Defaults to False.
        """
        location = self.tbl.location()
        orphaned_files = self.orphaned_files(location, older_than)
        logger.info("Found %d orphaned files at %s!", len(orphaned_files), location)
        deleted_files: Set[str] = set()
        failed_to_delete_files: Set[str] = set()

        def _delete(file: str) -> None:
            # Best-effort: a missing or undeletable file must not abort the
            # whole cleanup run. Exception (not BaseException) still lets
            # ctrl-c / SystemExit propagate.
            try:
                self.tbl.io.delete(file)
                deleted_files.add(file)
            except Exception:
                failed_to_delete_files.add(file)

        if orphaned_files:
            if dry_run:
                logger.info("(Dry Run) Deleted %d orphaned files at %s!", len(orphaned_files), location)
            else:
                executor = ExecutorFactory.get_or_create()
                deletes = executor.map(_delete, orphaned_files)
                # exhaust the lazy map so the deletions actually execute
                list(deletes)
                logger.info("Deleted %d orphaned files at %s!", len(deleted_files), location)
                logger.info("Files:\n%s", deleted_files)
                if failed_to_delete_files:
                    logger.warning("Failed to delete %d orphaned files at %s!", len(failed_to_delete_files), location)
                    logger.warning("Files:\n%s", failed_to_delete_files)
        else:
            logger.info("No orphaned files found at %s!", location)
124 changes: 124 additions & 0 deletions tests/table/test_remove_orphans.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
from datetime import datetime, timedelta
from pathlib import Path, PosixPath
from unittest.mock import patch

import pyarrow as pa
import pytest

from pyiceberg.catalog import Catalog
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, NestedField, StringType
from tests.catalog.test_base import InMemoryCatalog


@pytest.fixture
def catalog(tmp_path: PosixPath) -> InMemoryCatalog:
    """Provide a fresh in-memory catalog rooted at a temporary warehouse."""
    in_memory = InMemoryCatalog("test.in_memory.catalog", warehouse=tmp_path.absolute().as_posix())
    in_memory.create_namespace("default")
    return in_memory


def test_remove_orphaned_files(catalog: Catalog) -> None:
    """End-to-end orphan removal: dry run, age threshold, real delete."""
    identifier = "default.test_remove_orphaned_files"

    schema = Schema(
        NestedField(1, "city", StringType(), required=True),
        NestedField(2, "inhabitants", IntegerType(), required=True),
        # Mark City as the identifier field, also known as the primary-key
        identifier_field_ids=[1],
    )
    tbl = catalog.create_table(identifier, schema=schema)

    arrow_schema = pa.schema(
        [
            pa.field("city", pa.string(), nullable=False),
            pa.field("inhabitants", pa.int32(), nullable=False),
        ]
    )
    rows = [
        {"city": "Drachten", "inhabitants": 45019},
        {"city": "Drachten", "inhabitants": 45019},
    ]
    tbl.append(pa.Table.from_pylist(rows, schema=arrow_schema))

    orphan = Path(tbl.location()) / "orphan.txt"
    orphan.touch()
    assert orphan.exists()

    # A dry run must not delete anything.
    tbl.optimize.remove_orphaned_files(dry_run=True)
    assert orphan.exists()

    # A freshly created file is younger than the default 3-day threshold.
    tbl.optimize.remove_orphaned_files()
    assert orphan.exists()

    # Backdate the file past the threshold; now it should be removed.
    five_days_ago = (datetime.now() - timedelta(days=5)).timestamp()
    os.utime(orphan, (five_days_ago, five_days_ago))
    tbl.optimize.remove_orphaned_files()
    assert not orphan.exists()

    # Every file the table still references must have survived the cleanup.
    for files in tbl.inspect.all_known_files().values():
        for file in files:
            assert Path(file).exists()


def test_remove_orphaned_files_with_invalid_file_doesnt_error(catalog: Catalog) -> None:
    """A non-existent "orphan" must be swallowed by the delete, not raised."""
    # Fix: identifier was copy-pasted from test_remove_orphaned_files; give
    # this test its own table name so the two cannot collide or confuse
    # debugging output.
    identifier = "default.test_remove_orphaned_files_with_invalid_file_doesnt_error"

    schema = Schema(
        NestedField(1, "city", StringType(), required=True),
        NestedField(2, "inhabitants", IntegerType(), required=True),
        # Mark City as the identifier field, also known as the primary-key
        identifier_field_ids=[1],
    )
    tbl = catalog.create_table(identifier, schema=schema)

    arrow_schema = pa.schema(
        [
            pa.field("city", pa.string(), nullable=False),
            pa.field("inhabitants", pa.int32(), nullable=False),
        ]
    )
    df = pa.Table.from_pylist(
        [
            {"city": "Drachten", "inhabitants": 45019},
            {"city": "Drachten", "inhabitants": 45019},
        ],
        schema=arrow_schema,
    )
    tbl.append(df)

    file_that_does_not_exist = "foo/bar.baz"
    # Force orphaned_files() to report a path that is not on disk; the delete
    # must be attempted and its failure silently recorded.
    with patch.object(type(tbl.optimize), "orphaned_files", return_value={file_that_does_not_exist}):
        with patch.object(tbl.io, "delete", wraps=tbl.io.delete) as mock_delete:
            tbl.optimize.remove_orphaned_files(timedelta(days=3))
            mock_delete.assert_called_with(file_that_does_not_exist)