Skip to content

Commit beec233

Browse files
committed
refactor into all_known_files
1 parent b09641b commit beec233

1 file changed

Lines changed: 35 additions & 15 deletions

File tree

pyiceberg/table/inspect.py

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,39 @@ def all_manifests(self, snapshots: Optional[Union[list[Snapshot], list[int]]] =
666666
)
667667
return pa.concat_tables(manifests_by_snapshots)
668668

669+
def all_known_files(self) -> dict[str, set[str]]:
670+
"""Get all the known files in the table.
671+
672+
Returns:
673+
dict of {file_type: list of file paths} for each file type.
674+
"""
675+
snapshots = self.tbl.snapshots()
676+
677+
_all_known_files = {}
678+
_all_known_files["manifests"] = set(self.all_manifests(snapshots)["path"].to_pylist())
679+
_all_known_files["manifest_lists"] = {snapshot.manifest_list for snapshot in snapshots}
680+
_all_known_files["statistics"] = {statistic.statistics_path for statistic in self.tbl.metadata.statistics}
681+
682+
snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots]
683+
executor = ExecutorFactory.get_or_create()
684+
files_by_snapshots: Iterator[Set[str]] = executor.map(
685+
lambda snapshot_id: set(self.files(snapshot_id)["file_path"].to_pylist()), snapshot_ids
686+
)
687+
_all_known_files["datafiles"] = reduce(set.union, files_by_snapshots, set())
688+
689+
return _all_known_files
690+
669691
def orphaned_files(self, location: str, older_than: Optional[timedelta] = timedelta(days=3)) -> Set[str]:
692+
"""Get all the orphaned files in the table.
693+
694+
Args:
695+
location: The location to check for orphaned files.
696+
older_than: The time period to check for orphaned files. Defaults to 3 days.
697+
698+
Returns:
699+
A set of orphaned file paths.
700+
701+
"""
670702
try:
671703
import pyarrow as pa # noqa: F401
672704
except ModuleNotFoundError as e:
@@ -676,20 +708,8 @@ def orphaned_files(self, location: str, older_than: Optional[timedelta] = timede
676708

677709
from pyiceberg.io.pyarrow import _fs_from_file_path
678710

679-
all_known_files = set()
680-
snapshots = self.tbl.snapshots()
681-
snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots]
682-
683-
all_known_files.update(self.all_manifests(snapshots)["path"].to_pylist())
684-
all_known_files.update([snapshot.manifest_list for snapshot in snapshots])
685-
all_known_files.update([statistic.statistics_path for statistic in self.tbl.metadata.statistics])
686-
687-
executor = ExecutorFactory.get_or_create()
688-
files_by_snapshots: Iterator[Set[str]] = executor.map(
689-
lambda snapshot_id: set(self.files(snapshot_id)["file_path"].to_pylist()), snapshot_ids
690-
)
691-
datafile_paths: set[str] = reduce(set.union, files_by_snapshots, set())
692-
all_known_files.update(datafile_paths)
711+
all_known_files = self.all_known_files()
712+
flat_known_files: set[str] = reduce(set.union, all_known_files.values(), set())
693713

694714
fs = _fs_from_file_path(self.tbl.io, location)
695715

@@ -701,6 +721,6 @@ def orphaned_files(self, location: str, older_than: Optional[timedelta] = timede
701721
f.path for f in fs.get_file_info(selector) if f.type == FileType.File and (as_of is None or (f.mtime < as_of))
702722
]
703723

704-
orphaned_files = set(all_files).difference(all_known_files)
724+
orphaned_files = set(all_files).difference(flat_known_files)
705725

706726
return orphaned_files

0 commit comments

Comments
 (0)