Skip to content

Commit 8cca600

Browse files
committed
updates from review!
1 parent eed5ea8 commit 8cca600

3 files changed

Lines changed: 92 additions & 34 deletions

File tree

pyiceberg/table/__init__.py

Lines changed: 11 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19+
import contextlib
1920
import itertools
2021
import logging
2122
import os
@@ -32,7 +33,6 @@
3233
Callable,
3334
Dict,
3435
Iterable,
35-
Iterator,
3636
List,
3737
Optional,
3838
Set,
@@ -64,7 +64,7 @@
6464
inclusive_projection,
6565
manifest_evaluator,
6666
)
67-
from pyiceberg.io import FileIO, _parse_location, load_file_io
67+
from pyiceberg.io import FileIO, load_file_io
6868
from pyiceberg.manifest import (
6969
POSITIONAL_DELETE_SCHEMA,
7070
DataFile,
@@ -1377,39 +1377,19 @@ def to_polars(self) -> pl.LazyFrame:
13771377

13781378
def delete_orphaned_files(self) -> None:
    """Delete orphaned files in the table.

    The set of orphaned files is obtained from ``self.inspect.orphaned_files``
    for the table location, and each file is removed through ``self.io.delete``
    on the shared executor.

    Deletion is best-effort: a failure to delete one file (for example because
    it no longer exists) is logged as a warning and does not abort the
    remaining deletes. The final log line reports how many deletes actually
    succeeded rather than how many were attempted.
    """
    location = self.location()
    orphaned_files = self.inspect.orphaned_files(location)
    logger.info(f"Found {len(orphaned_files)} orphaned files at {location}!")

    def _delete(file: str) -> bool:
        """Delete a single file and report whether the delete succeeded."""
        # Only ``Exception`` subclasses are handled here, so KeyboardInterrupt
        # (ctrl-c) and SystemExit still propagate to the caller.
        try:
            self.io.delete(file)
        except Exception as exc:
            logger.warning(f"Failed to delete orphaned file {file}: {exc}")
            return False
        return True

    if orphaned_files:
        executor = ExecutorFactory.get_or_create()
        # ``sum`` exhausts the lazy ``map`` iterator, which forces every delete
        # to run, and counts the successful ones at the same time.
        deleted = sum(executor.map(_delete, orphaned_files))
        logger.info(f"Deleted {deleted} orphaned files at {location}!")

pyiceberg/table/inspect.py

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717
from __future__ import annotations
1818

1919
from datetime import datetime, timezone
20-
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple
20+
from functools import reduce
21+
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple, Union, cast
2122

2223
from pyiceberg.conversions import from_bytes
24+
from pyiceberg.io import _parse_location
2325
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
2426
from pyiceberg.partitioning import PartitionSpec
2527
from pyiceberg.table.snapshots import Snapshot, ancestors_of
@@ -645,10 +647,16 @@ def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
645647
def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
646648
return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})
647649

648-
def all_manifests(self, snapshots: Optional[list[Snapshot]] = None) -> "pa.Table":
650+
def all_manifests(self, snapshots: Optional[Union[list[Snapshot], list[int]]] = None) -> "pa.Table":
649651
import pyarrow as pa
650652

651-
snapshots = snapshots or self.tbl.snapshots()
653+
# coerce into snapshot objects if users passes in snapshot ids
654+
if snapshots is not None:
655+
if isinstance(snapshots[0], int):
656+
snapshots = cast(list[Snapshot], [self.tbl.metadata.snapshot_by_id(snapshot_id) for snapshot_id in snapshots])
657+
else:
658+
snapshots = self.tbl.snapshots()
659+
652660
if not snapshots:
653661
return pa.Table.from_pylist([], schema=self._get_all_manifests_schema())
654662

@@ -657,3 +665,36 @@ def all_manifests(self, snapshots: Optional[list[Snapshot]] = None) -> "pa.Table
657665
lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
658666
)
659667
return pa.concat_tables(manifests_by_snapshots)
668+
669+
def orphaned_files(self, location: str) -> Set[str]:
    """Return the files under ``location`` that are not referenced by the table.

    A file counts as "known" when it is a manifest listed by ``all_manifests``
    or a path in the ``file_path`` column of ``files`` for any snapshot of the
    table. Everything else found by a recursive listing of ``location`` is
    returned as orphaned.

    Args:
        location: Root location to scan recursively for files.

    Returns:
        The set of file paths found under ``location`` that are not known files.

    Raises:
        ModuleNotFoundError: If PyArrow is not installed.
    """
    try:
        import pyarrow as pa  # noqa: F401
    except ModuleNotFoundError as e:
        raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e

    from pyarrow.fs import FileSelector, FileType

    from pyiceberg.io.pyarrow import _fs_from_file_path

    all_known_files: Set[str] = set()
    snapshots = self.tbl.snapshots()

    # A table without snapshots references no manifests or data files; skipping
    # the lookups also avoids indexing into an empty snapshot list downstream.
    if snapshots:
        all_known_files.update(self.all_manifests(snapshots)["path"].to_pylist())

        executor = ExecutorFactory.get_or_create()
        # The snapshot ids must be passed as the iterable: ``map`` called with
        # only a function yields nothing, which would mark every data file as
        # orphaned.
        files_by_snapshots: Iterator[Set[str]] = executor.map(
            lambda snapshot_id: set(self.files(snapshot_id)["file_path"].to_pylist()),
            [snapshot.snapshot_id for snapshot in snapshots],
        )
        all_known_files.update(*files_by_snapshots)

    # NOTE(review): only manifests and the paths reported by ``files`` are
    # treated as known here. Confirm that other files living under ``location``
    # (e.g. table metadata, manifest lists) and any scheme prefix differences
    # between listed and recorded paths are accounted for before deleting.
    fs = _fs_from_file_path(self.tbl.io, location)

    _, _, path = _parse_location(location)
    selector = FileSelector(path, recursive=True)
    # filter to just files as it may return directories
    all_files = {f.path for f in fs.get_file_info(selector) if f.type == FileType.File}

    return all_files - all_known_files

tests/table/test_delete_orphans.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717
from pathlib import Path, PosixPath
18+
from unittest.mock import PropertyMock, patch
1819

1920
import pyarrow as pa
2021
import pytest
@@ -67,3 +68,39 @@ def test_delete_orphaned_files(catalog: Catalog) -> None:
6768

6869
tbl.delete_orphaned_files()
6970
assert not orphaned_file.exists()
71+
72+
73+
def test_delete_orphaned_files_with_invalid_file_doesnt_error(catalog: Catalog) -> None:
    """Deleting an orphan that does not exist must not raise.

    ``inspect.orphaned_files`` is stubbed to report a single non-existent path,
    and the test checks that ``io.delete`` was still invoked with that path
    while ``delete_orphaned_files`` completed without error.
    """
    identifier = "default.test_delete_orphaned_files"

    # "city" (field id 1) is the identifier field, also known as the primary-key
    city_field = NestedField(1, "city", StringType(), required=True)
    inhabitants_field = NestedField(2, "inhabitants", IntegerType(), required=True)
    schema = Schema(city_field, inhabitants_field, identifier_field_ids=[1])

    tbl = catalog.create_table(identifier, schema=schema)

    arrow_schema = pa.schema(
        [
            pa.field("city", pa.string(), nullable=False),
            pa.field("inhabitants", pa.int32(), nullable=False),
        ]
    )
    rows = [{"city": "Drachten", "inhabitants": 45019} for _ in range(2)]
    tbl.append(pa.Table.from_pylist(rows, schema=arrow_schema))

    missing_path = "foo/bar.baz"

    def _fake_orphaned_files(_location):  # type: ignore[no-untyped-def]
        # Stand-in for ``inspect.orphaned_files``: always report the missing path.
        return {missing_path}

    with patch.object(type(tbl), "inspect", new_callable=PropertyMock) as mock_inspect:
        mock_inspect.return_value.orphaned_files = _fake_orphaned_files
        # ``wraps`` keeps the real delete behaviour while recording the calls.
        with patch.object(tbl.io, "delete", wraps=tbl.io.delete) as mock_delete:
            tbl.delete_orphaned_files()
            mock_delete.assert_called_with(missing_path)

0 commit comments

Comments
 (0)