Skip to content

Commit 75b1240

Browse files
committed
include dry run and older than
1 parent 8cca600 commit 75b1240

3 files changed

Lines changed: 26 additions & 14 deletions

File tree

pyiceberg/table/__init__.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import warnings
2525
from abc import ABC, abstractmethod
2626
from dataclasses import dataclass
27+
from datetime import timedelta
2728
from functools import cached_property
2829
from itertools import chain
2930
from types import TracebackType
@@ -1375,10 +1376,10 @@ def to_polars(self) -> pl.LazyFrame:
13751376

13761377
return pl.scan_iceberg(self)
13771378

1378-
def delete_orphaned_files(self) -> None:
1379+
def delete_orphaned_files(self, older_than: Optional[timedelta] = timedelta(days=3), dry_run: bool = False) -> None:
13791380
"""Delete orphaned files in the table."""
13801381
location = self.location()
1381-
orphaned_files = self.inspect.orphaned_files(location)
1382+
orphaned_files = self.inspect.orphaned_files(location, older_than)
13821383
logger.info(f"Found {len(orphaned_files)} orphaned files at {location}!")
13831384

13841385
def _delete(file: str) -> None:
@@ -1388,11 +1389,14 @@ def _delete(file: str) -> None:
13881389
self.io.delete(file)
13891390

13901391
if orphaned_files:
1391-
executor = ExecutorFactory.get_or_create()
1392-
deletes = executor.map(_delete, orphaned_files)
1393-
# exhaust
1394-
list(deletes)
1395-
logger.info(f"Deleted {len(orphaned_files)} orphaned files at {location}!")
1392+
if dry_run:
1393+
logger.info(f"(Dry Run) Deleted {len(orphaned_files)} orphaned files at {location}!")
1394+
else:
1395+
executor = ExecutorFactory.get_or_create()
1396+
deletes = executor.map(_delete, orphaned_files)
1397+
# exhaust
1398+
list(deletes)
1399+
logger.info(f"Deleted {len(orphaned_files)} orphaned files at {location}!")
13961400

13971401

13981402
class StaticTable(Table):

pyiceberg/table/inspect.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19-
from datetime import datetime, timezone
19+
from datetime import datetime, timedelta, timezone
2020
from functools import reduce
2121
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple, Union, cast
2222

@@ -666,7 +666,7 @@ def all_manifests(self, snapshots: Optional[Union[list[Snapshot], list[int]]] =
666666
)
667667
return pa.concat_tables(manifests_by_snapshots)
668668

669-
def orphaned_files(self, location: str) -> Set[str]:
669+
def orphaned_files(self, location: str, older_than: Optional[timedelta] = timedelta(days=3)) -> Set[str]:
670670
try:
671671
import pyarrow as pa # noqa: F401
672672
except ModuleNotFoundError as e:
@@ -692,9 +692,10 @@ def orphaned_files(self, location: str) -> Set[str]:
692692

693693
_, _, path = _parse_location(location)
694694
selector = FileSelector(path, recursive=True)
695-
# filter to just files as it may return directories
696-
all_files = [f.path for f in fs.get_file_info(selector) if f.type == FileType.File]
695+
# filter to just files as it may return directories, and filter on time
696+
as_of = datetime.now(timezone.utc) - older_than if older_than else None
697+
all_files = [f for f in fs.get_file_info(selector) if f.type == FileType.File and (as_of is None or (f.mtime < as_of))]
697698

698-
orphaned_files = set(all_files).difference(set(all_known_files))
699+
orphaned_files = set(all_files).difference(all_known_files)
699700

700701
return orphaned_files

tests/table/test_delete_orphans.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17+
import os
18+
from datetime import datetime, timedelta
1719
from pathlib import Path, PosixPath
1820
from unittest.mock import PropertyMock, patch
1921

@@ -66,8 +68,13 @@ def test_delete_orphaned_files(catalog: Catalog) -> None:
6668
orphaned_file.touch()
6769
assert orphaned_file.exists()
6870

71+
# should not delete because it was just created...
6972
tbl.delete_orphaned_files()
70-
assert not orphaned_file.exists()
73+
assert orphaned_file.exists()
74+
75+
# modify creation date to be older than 3 days
76+
five_days_ago = (datetime.now() - timedelta(days=5)).timestamp()
77+
os.utime(orphaned_file, (five_days_ago, five_days_ago))
7178

7279

7380
def test_delete_orphaned_files_with_invalid_file_doesnt_error(catalog: Catalog) -> None:
@@ -100,7 +107,7 @@ def test_delete_orphaned_files_with_invalid_file_doesnt_error(catalog: Catalog)
100107

101108
file_that_does_not_exist = "foo/bar.baz"
102109
with patch.object(type(tbl), "inspect", new_callable=PropertyMock) as mock_inspect:
103-
mock_inspect.return_value.orphaned_files = lambda x: {file_that_does_not_exist}
110+
mock_inspect.return_value.orphaned_files = lambda location, older_than: {file_that_does_not_exist}
104111
with patch.object(tbl.io, "delete", wraps=tbl.io.delete) as mock_delete:
105112
tbl.delete_orphaned_files()
106113
mock_delete.assert_called_with(file_that_does_not_exist)

0 commit comments

Comments
 (0)