Skip to content

Commit dfeb2c5

Browse files
perf(add_files): stream manifest entries for duplicate-files check
1 parent cdb625f commit dfeb2c5

1 file changed

Lines changed: 35 additions & 5 deletions

File tree

pyiceberg/table/__init__.py

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -861,6 +861,40 @@ def upsert(
861861

862862
return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)
863863

864+
def _referenced_data_files(self, file_paths: list[str]) -> list[str]:
865+
"""Find any file_paths already referenced by data files in the current snapshot.
866+
867+
Streams ManifestEntries from each data manifest and short-circuits on
868+
file_path equality against the candidate set. Avoids the full
869+
``inspect.data_files()`` materialization, which builds a pyarrow Table
870+
with every column (including readable_metrics, lower/upper bounds,
871+
partition records) for every DataFile in the snapshot — an expensive
872+
cost dominated by stats decoding that is not needed for path equality.
873+
874+
Returns the file_paths that are already referenced; empty list if none.
875+
"""
876+
snapshot = self.table_metadata.current_snapshot()
877+
if snapshot is None or not file_paths:
878+
return []
879+
880+
candidates = set(file_paths)
881+
io = self._table.io
882+
883+
def _scan(manifest: ManifestFile) -> list[str]:
884+
if manifest.content != ManifestContent.DATA:
885+
return []
886+
return [
887+
entry.data_file.file_path
888+
for entry in manifest.fetch_manifest_entry(io, discard_deleted=True)
889+
if entry.data_file.file_path in candidates
890+
]
891+
892+
executor = ExecutorFactory.get_or_create()
893+
matches: list[str] = []
894+
for partial in executor.map(_scan, snapshot.manifests(io)):
895+
matches.extend(partial)
896+
return matches
897+
864898
def add_files(
865899
self,
866900
file_paths: list[str],
@@ -883,11 +917,7 @@ def add_files(
883917
raise ValueError("File paths must be unique")
884918

885919
if check_duplicate_files:
886-
import pyarrow.compute as pc
887-
888-
expr = pc.field("file_path").isin(file_paths)
889-
referenced_files = [file["file_path"] for file in self._table.inspect.data_files().filter(expr).to_pylist()]
890-
920+
referenced_files = self._referenced_data_files(file_paths)
891921
if referenced_files:
892922
raise ValueError(f"Cannot add files that are already referenced by table, files: {', '.join(referenced_files)}")
893923

0 commit comments

Comments
 (0)