Skip to content

Commit c12c1c2

Browse files
refactor(add_files): reuse _open_manifest + chain.from_iterable
1 parent dfeb2c5 commit c12c1c2

1 file changed

Lines changed: 14 additions & 26 deletions

File tree

pyiceberg/table/__init__.py

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -861,39 +861,27 @@ def upsert(
861861

862862
return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)
863863

864-
def _referenced_data_files(self, file_paths: list[str]) -> list[str]:
865-
"""Find any file_paths already referenced by data files in the current snapshot.
866-
867-
Streams ManifestEntries from each data manifest and short-circuits on
868-
file_path equality against the candidate set. Avoids the full
869-
``inspect.data_files()`` materialization, which builds a pyarrow Table
870-
with every column (including readable_metrics, lower/upper bounds,
871-
partition records) for every DataFile in the snapshot — an expensive
872-
cost dominated by stats decoding that is not needed for path equality.
873-
874-
Returns the file_paths that are already referenced; empty list if none.
875-
"""
864+
def _find_referenced_data_files(self, file_paths: list[str]) -> list[str]:
865+
"""Return file_paths already referenced by data files in the current snapshot."""
876866
snapshot = self.table_metadata.current_snapshot()
877-
if snapshot is None or not file_paths:
867+
if snapshot is None:
878868
return []
879869

880870
candidates = set(file_paths)
881871
io = self._table.io
872+
data_manifests = [m for m in snapshot.manifests(io) if m.content == ManifestContent.DATA]
882873

883-
def _scan(manifest: ManifestFile) -> list[str]:
884-
if manifest.content != ManifestContent.DATA:
885-
return []
886-
return [
887-
entry.data_file.file_path
888-
for entry in manifest.fetch_manifest_entry(io, discard_deleted=True)
889-
if entry.data_file.file_path in candidates
890-
]
874+
path_filter: Callable[[DataFile], bool] = lambda df: df.file_path in candidates
875+
always_true: Callable[[DataFile], bool] = lambda _: True
891876

892877
executor = ExecutorFactory.get_or_create()
893-
matches: list[str] = []
894-
for partial in executor.map(_scan, snapshot.manifests(io)):
895-
matches.extend(partial)
896-
return matches
878+
entries = chain.from_iterable(
879+
executor.map(
880+
lambda args: _open_manifest(*args),
881+
[(io, manifest, path_filter, always_true) for manifest in data_manifests],
882+
)
883+
)
884+
return [entry.data_file.file_path for entry in entries]
897885

898886
def add_files(
899887
self,
@@ -917,7 +905,7 @@ def add_files(
917905
raise ValueError("File paths must be unique")
918906

919907
if check_duplicate_files:
920-
referenced_files = self._referenced_data_files(file_paths)
908+
referenced_files = self._find_referenced_data_files(file_paths)
921909
if referenced_files:
922910
raise ValueError(f"Cannot add files that are already referenced by table, files: {', '.join(referenced_files)}")
923911

0 commit comments

Comments
 (0)