Skip to content

Commit b0a770c

Browse files
committed
refactor: refactor _existing_manifests into _SnapshotProducer for _OverwriteFiles and _RewriteFiles
1 parent 33aaef0 commit b0a770c

1 file changed

Lines changed: 45 additions & 75 deletions

File tree

pyiceberg/table/update/snapshot.py

Lines changed: 45 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,49 @@ def _calculate_added_rows(self, manifests: list[ManifestFile]) -> int:
165165
added_rows += manifest.added_rows_count
166166
return added_rows
167167

168+
def _get_existing_manifests(self) -> list[ManifestFile]:
169+
"""Filters existing manifests and rewrites those containing deleted data files."""
170+
existing_files = []
171+
# Use manifest pruning if a predicate is set (primarily for Overwrite)
172+
manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
173+
174+
if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._target_branch):
175+
for manifest_file in snapshot.manifests(io=self._io):
176+
# Skip pruning for rewrite operations unless we want to optimize later
177+
if self._operation == Operation.OVERWRITE and not manifest_evaluators[manifest_file.partition_spec_id](
178+
manifest_file
179+
):
180+
existing_files.append(manifest_file)
181+
continue
182+
183+
entries_to_write: list[ManifestEntry] = []
184+
found_deleted_entries = False
185+
186+
for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
187+
if entry.data_file in self._deleted_data_files:
188+
found_deleted_entries = True
189+
else:
190+
entries_to_write.append(entry)
191+
192+
if not found_deleted_entries:
193+
existing_files.append(manifest_file)
194+
continue
195+
196+
if len(entries_to_write) > 0:
197+
with self.new_manifest_writer(self.spec(manifest_file.partition_spec_id)) as writer:
198+
for entry in entries_to_write:
199+
writer.add_entry(
200+
ManifestEntry.from_args(
201+
status=ManifestEntryStatus.EXISTING,
202+
snapshot_id=entry.snapshot_id,
203+
sequence_number=entry.sequence_number,
204+
file_sequence_number=entry.file_sequence_number,
205+
data_file=entry.data_file,
206+
)
207+
)
208+
existing_files.append(writer.to_manifest_file())
209+
return existing_files
210+
168211
@abstractmethod
169212
def _deleted_entries(self) -> list[ManifestEntry]: ...
170213

@@ -585,49 +628,7 @@ class _OverwriteFiles(_SnapshotProducer["_OverwriteFiles"]):
585628

586629
def _existing_manifests(self) -> list[ManifestFile]:
587630
"""Determine if there are any existing manifest files."""
588-
existing_files = []
589-
590-
manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
591-
if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._target_branch):
592-
for manifest_file in snapshot.manifests(io=self._io):
593-
# Manifest does not contain rows that match the files to delete partitions
594-
if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file):
595-
existing_files.append(manifest_file)
596-
continue
597-
598-
entries_to_write: set[ManifestEntry] = set()
599-
found_deleted_entries: set[ManifestEntry] = set()
600-
601-
for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
602-
if entry.data_file in self._deleted_data_files:
603-
found_deleted_entries.add(entry)
604-
else:
605-
entries_to_write.add(entry)
606-
607-
# Is the intercept the empty set?
608-
if len(found_deleted_entries) == 0:
609-
existing_files.append(manifest_file)
610-
continue
611-
612-
# Delete all files from manifest
613-
if len(entries_to_write) == 0:
614-
continue
615-
616-
# We have to rewrite the manifest file without the deleted data files
617-
with self.new_manifest_writer(self.spec(manifest_file.partition_spec_id)) as writer:
618-
for entry in entries_to_write:
619-
writer.add_entry(
620-
ManifestEntry.from_args(
621-
status=ManifestEntryStatus.EXISTING,
622-
snapshot_id=entry.snapshot_id,
623-
sequence_number=entry.sequence_number,
624-
file_sequence_number=entry.file_sequence_number,
625-
data_file=entry.data_file,
626-
)
627-
)
628-
existing_files.append(writer.to_manifest_file())
629-
630-
return existing_files
631+
return self._get_existing_manifests()
631632

632633
def _deleted_entries(self) -> list[ManifestEntry]:
633634
"""To determine if we need to record any deleted entries.
@@ -723,38 +724,7 @@ def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
723724

724725
def _existing_manifests(self) -> list[ManifestFile]:
725726
"""To determine if there are any existing manifests."""
726-
existing_files = []
727-
if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._target_branch):
728-
for manifest_file in snapshot.manifests(io=self._io):
729-
entries_to_write: set[ManifestEntry] = set()
730-
found_deleted_entries: set[ManifestEntry] = set()
731-
732-
for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
733-
if entry.data_file in self._deleted_data_files:
734-
found_deleted_entries.add(entry)
735-
else:
736-
entries_to_write.add(entry)
737-
738-
if len(found_deleted_entries) == 0:
739-
existing_files.append(manifest_file)
740-
continue
741-
742-
if len(entries_to_write) == 0:
743-
continue
744-
745-
with self.new_manifest_writer(self.spec(manifest_file.partition_spec_id)) as writer:
746-
for entry in entries_to_write:
747-
writer.add_entry(
748-
ManifestEntry.from_args(
749-
status=ManifestEntryStatus.EXISTING,
750-
snapshot_id=entry.snapshot_id,
751-
sequence_number=entry.sequence_number,
752-
file_sequence_number=entry.file_sequence_number,
753-
data_file=entry.data_file,
754-
)
755-
)
756-
existing_files.append(writer.to_manifest_file())
757-
return existing_files
727+
return self._get_existing_manifests()
758728

759729

760730
class UpdateSnapshot:

0 commit comments

Comments
 (0)