Skip to content

Commit 4ed0607

Browse files
committed
Fixed bugs and added more tests
1 parent 49f75b4 commit 4ed0607

3 files changed

Lines changed: 90 additions & 57 deletions

File tree

pyiceberg/table/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ def dynamic_partition_overwrite(
541541

542542
partitions_to_overwrite = {data_file.partition for data_file in data_files}
543543
delete_filter = self._build_partition_predicate(partition_records=partitions_to_overwrite)
544-
self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties)
544+
self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch)
545545

546546
with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
547547
append_files.commit_uuid = append_snapshot_commit_uuid

pyiceberg/table/update/snapshot.py

Lines changed: 60 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -131,12 +131,12 @@ def __init__(
131131

132132
def _set_target_branch(self, branch: str) -> None:
133133
# Default is already set to MAIN_BRANCH. So branch name can't be None.
134-
assert branch is not None, ValueError("Invalid branch name: null")
134+
assert branch is not None, "Invalid branch name: null"
135135
if branch in self._transaction.table_metadata.refs:
136136
ref = self._transaction.table_metadata.refs[branch]
137-
assert ref.snapshot_ref_type == SnapshotRefType.BRANCH, ValueError(
138-
f"{branch} is a tag, not a branch. Tags cannot be targets for producing snapshots"
139-
)
137+
assert (
138+
ref.snapshot_ref_type == SnapshotRefType.BRANCH
139+
), f"{branch} is a tag, not a branch. Tags cannot be targets for producing snapshots"
140140
self._target_branch = branch
141141

142142
def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]:
@@ -293,7 +293,7 @@ def _commit(self) -> UpdatesAndRequirements:
293293
AssertRefSnapshotId(
294294
snapshot_id=self._transaction.table_metadata.refs[self._target_branch].snapshot_id
295295
if self._target_branch in self._transaction.table_metadata.refs
296-
else self._transaction.table_metadata.current_snapshot_id,
296+
else None,
297297
ref=self._target_branch,
298298
),
299299
),
@@ -407,46 +407,52 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
407407
total_deleted_entries = []
408408
partial_rewrites_needed = False
409409
self._deleted_data_files = set()
410-
if snapshot := self._transaction.table_metadata.current_snapshot():
411-
for manifest_file in snapshot.manifests(io=self._io):
412-
if manifest_file.content == ManifestContent.DATA:
413-
if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file):
414-
# If the manifest isn't relevant, we can just keep it in the manifest-list
415-
existing_manifests.append(manifest_file)
416-
else:
417-
# It is relevant, let's check out the content
418-
deleted_entries = []
419-
existing_entries = []
420-
for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
421-
if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH:
422-
# Based on the metadata, it can be dropped right away
423-
deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED))
424-
self._deleted_data_files.add(entry.data_file)
425-
else:
426-
# Based on the metadata, we cannot determine if it can be deleted
427-
existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING))
428-
if inclusive_metrics_evaluator(entry.data_file) != ROWS_MIGHT_NOT_MATCH:
429-
partial_rewrites_needed = True
430-
431-
if len(deleted_entries) > 0:
432-
total_deleted_entries += deleted_entries
433-
434-
# Rewrite the manifest
435-
if len(existing_entries) > 0:
436-
with write_manifest(
437-
format_version=self._transaction.table_metadata.format_version,
438-
spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
439-
schema=self._transaction.table_metadata.schema(),
440-
output_file=self.new_manifest_output(),
441-
snapshot_id=self._snapshot_id,
442-
) as writer:
443-
for existing_entry in existing_entries:
444-
writer.add_entry(existing_entry)
445-
existing_manifests.append(writer.to_manifest_file())
446-
else:
410+
411+
# Determine the snapshot to read manifests from for deletion
412+
# Should be the current tip of the _target_branch
413+
parent_snapshot_id_for_delete_source = self._parent_snapshot_id
414+
if parent_snapshot_id_for_delete_source is not None:
415+
snapshot = self._transaction.table_metadata.snapshot_by_id(parent_snapshot_id_for_delete_source)
416+
if snapshot: # Ensure snapshot is found
417+
for manifest_file in snapshot.manifests(io=self._io):
418+
if manifest_file.content == ManifestContent.DATA:
419+
if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file):
420+
# If the manifest isn't relevant, we can just keep it in the manifest-list
447421
existing_manifests.append(manifest_file)
448-
else:
449-
existing_manifests.append(manifest_file)
422+
else:
423+
# It is relevant, let's check out the content
424+
deleted_entries = []
425+
existing_entries = []
426+
for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
427+
if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH:
428+
# Based on the metadata, it can be dropped right away
429+
deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED))
430+
self._deleted_data_files.add(entry.data_file)
431+
else:
432+
# Based on the metadata, we cannot determine if it can be deleted
433+
existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING))
434+
if inclusive_metrics_evaluator(entry.data_file) != ROWS_MIGHT_NOT_MATCH:
435+
partial_rewrites_needed = True
436+
437+
if len(deleted_entries) > 0:
438+
total_deleted_entries += deleted_entries
439+
440+
# Rewrite the manifest
441+
if len(existing_entries) > 0:
442+
with write_manifest(
443+
format_version=self._transaction.table_metadata.format_version,
444+
spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
445+
schema=self._transaction.table_metadata.schema(),
446+
output_file=self.new_manifest_output(),
447+
snapshot_id=self._snapshot_id,
448+
) as writer:
449+
for existing_entry in existing_entries:
450+
writer.add_entry(existing_entry)
451+
existing_manifests.append(writer.to_manifest_file())
452+
else:
453+
existing_manifests.append(manifest_file)
454+
else:
455+
existing_manifests.append(manifest_file)
450456

451457
return existing_manifests, total_deleted_entries, partial_rewrites_needed
452458

@@ -575,19 +581,17 @@ def _existing_manifests(self) -> List[ManifestFile]:
575581
output_file=self.new_manifest_output(),
576582
snapshot_id=self._snapshot_id,
577583
) as writer:
578-
[
579-
writer.add_entry(
580-
ManifestEntry.from_args(
581-
status=ManifestEntryStatus.EXISTING,
582-
snapshot_id=entry.snapshot_id,
583-
sequence_number=entry.sequence_number,
584-
file_sequence_number=entry.file_sequence_number,
585-
data_file=entry.data_file,
584+
for entry in entries:
585+
if entry.data_file not in found_deleted_data_files:
586+
writer.add_entry(
587+
ManifestEntry.from_args(
588+
status=ManifestEntryStatus.EXISTING,
589+
snapshot_id=entry.snapshot_id,
590+
sequence_number=entry.sequence_number,
591+
file_sequence_number=entry.file_sequence_number,
592+
data_file=entry.data_file,
593+
)
586594
)
587-
)
588-
for entry in entries
589-
if entry.data_file not in found_deleted_data_files
590-
]
591595
existing_files.append(writer.to_manifest_file())
592596
return existing_files
593597

tests/integration/test_deletes.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -894,3 +894,32 @@ def test_overwrite_with_filter_case_insensitive(test_table: Table) -> None:
894894
test_table.overwrite(df=new_table, overwrite_filter=f"Idx == {record_to_overwrite['idx']}", case_sensitive=False)
895895
assert record_to_overwrite not in test_table.scan().to_arrow().to_pylist()
896896
assert new_record_to_insert in test_table.scan().to_arrow().to_pylist()
897+
898+
899+
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
@pytest.mark.filterwarnings("ignore:Delete operation did not match any records")
def test_delete_on_empty_table(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
    """Deleting from a table with no data must be a no-op: no snapshot is produced.

    The table is created empty via Spark (one run per format version), then a
    blanket delete is issued through PyIceberg. Since there are no rows to
    remove, no delete snapshot should be committed.
    """
    identifier = f"default.test_delete_on_empty_table_{format_version}"

    # Recreate the target table from scratch so each parametrized run starts empty.
    run_spark_commands(
        spark,
        [
            f"DROP TABLE IF EXISTS {identifier}",
            f"""
            CREATE TABLE {identifier} (
                volume int
            )
            USING iceberg
            TBLPROPERTIES('format-version' = {format_version})
            """,
        ],
    )

    tbl = session_catalog.load_table(identifier)

    # Issue an unconditional delete against the empty table.
    tbl.delete(AlwaysTrue())

    # A delete that matches nothing must not commit a new snapshot.
    assert len(tbl.snapshots()) == 0

0 commit comments

Comments (0)