feat: validate_deleted_data_files#1938
Conversation
| if entry.snapshot_id not in new_snapshot_ids: | ||
| continue | ||
|
|
||
| if entry.status != ManifestEntryStatus.DELETED: | ||
| continue | ||
|
|
||
| if data_filter is not None and not evaluator(entry.data_file): | ||
| continue | ||
|
|
||
| if partition_set is not None and (entry.data_file.spec_id, entry.data_file.partition) not in partition_set: | ||
| continue |
There was a problem hiding this comment.
probably cleaner to put these each on a line and wrap in an any call
validate_deleted_data_filesvalidate_deleted_data_files
| parent_snapshot: Optional[Snapshot], | ||
| partition_set: Optional[set[Record]], |
There was a problem hiding this comment.
This looks out of order:
| parent_snapshot: Optional[Snapshot], | |
| partition_set: Optional[set[Record]], | |
| partition_set: Optional[set[Record]], | |
| parent_snapshot: Optional[Snapshot], |
It looks like the function call in validate_deleted_data_files is assuming that parent_snapshot is the last argument
conflicting_entries = deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot)
There was a problem hiding this comment.
I think this needs some more work. Next to @sungwy's comment, we in the code below:
(entry.data_file.spec_id, entry.data_file.partition) not in partition_setWhere it checks if a tuple is in a partition_set, but the partition_set only contains the Record according to the signature.
This triggered me, because if you do:
ALTER TABLE prod.db.taxis REPLACE PARTITION FIELD pickup_timestamp WITH day(pickup_timestamp);
-- and then
ALTER TABLE prod.db.taxis REPLACE PARTITION FIELD pickup_timestamp WITH day(dropoff_timestamp);Both of the partitioning strategies will produce a Record[int] because it will contain the number of days since epoch. But the meaning is completely different.
| starting_snapshot, | ||
| parent_snapshot, |
There was a problem hiding this comment.
This looks out of order to, because validation_history has to_snapshot as the second argument, and from_snapshot as the third argument.
| starting_snapshot, | |
| parent_snapshot, | |
| starting_snapshot, | |
| parent_snapshot, |
Do you think it would be better to update validation_history function to use the following function signature instead? I think it's a lot more expected to have from_snapshot then to_snapshot
def validation_history(
table: Table,
from_snapshot: Snapshot,
to_snapshot: Snapshot,
matching_operations: set[Operation],
manifest_content_filter: ManifestContent,
)
There was a problem hiding this comment.
Could you clarify a little more? Are you saying we should move to the terms starting_snapshot (which is from_snapshot) and parent_snapshot (to_snapshot)
| ) | ||
|
|
||
| if data_filter is not None: | ||
| evaluator = _StrictMetricsEvaluator(table.schema(), data_filter).eval |
There was a problem hiding this comment.
I'm not too sure if this is correct, because ManifestGroup.entries seems to be using inclusive projection.
Should we be using inclusive_projection here instead?
Summoning @Fokko for a second review
There was a problem hiding this comment.
Yes, I agree that this should be inclusive projection, since we want to know if there are any matches. Inclusive projection returns rows_might_match and rows_cannot_match. If they cannot be matched, then we can skip it :)
Fokko
left a comment
There was a problem hiding this comment.
Thanks for following up on the previous PR @jayceslesar. This looks like a great start, but there are some missing elements. It would be good to add some more tests as well, since this is pretty critical code since it might affect correctness.
| parent_snapshot: Optional[Snapshot], | ||
| partition_set: Optional[set[Record]], |
There was a problem hiding this comment.
I think this needs some more work. Next to @sungwy's comment, we in the code below:
(entry.data_file.spec_id, entry.data_file.partition) not in partition_setWhere it checks if a tuple is in a partition_set, but the partition_set only contains the Record according to the signature.
This triggered me, because if you do:
ALTER TABLE prod.db.taxis REPLACE PARTITION FIELD pickup_timestamp WITH day(pickup_timestamp);
-- and then
ALTER TABLE prod.db.taxis REPLACE PARTITION FIELD pickup_timestamp WITH day(dropoff_timestamp);Both of the partitioning strategies will produce a Record[int] because it will contain the number of days since epoch. But the meaning is completely different.
| ) | ||
|
|
||
| if data_filter is not None: | ||
| evaluator = _StrictMetricsEvaluator(table.schema(), data_filter).eval |
There was a problem hiding this comment.
Yes, I agree that this should be inclusive projection, since we want to know if there are any matches. Inclusive projection returns rows_might_match and rows_cannot_match. If they cannot be matched, then we can skip it :)
Co-authored-by: Fokko Driesprong <fokko@apache.org>
sungwy
left a comment
There was a problem hiding this comment.
Hi @jayceslesar - thanks for adopting all of the review feedback!
I think this is good to merge, but I'd prefer to make validate_deleted_data_files a private function
|
Waiting for the CI to pass |
| parent_snapshot: Ending snapshot on the branch being validated | ||
|
|
||
| """ | ||
| conflicting_entries = _deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot) |
There was a problem hiding this comment.
nit, I think style wise it is more elegant to switch over to keyword-arguments:
| conflicting_entries = _deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot) | |
| conflicting_entries = _deleted_data_files(table, starting_snapshot, data_filter, parent_snapshot=parent_snapshot) |
Fokko
left a comment
There was a problem hiding this comment.
Looks good, thanks @jayceslesar for working on this, and thanks @sungwy for the review 🙌
Closes apache#1928 # Rationale for this change Add `validate_deleted_data_files` which depends on apache#1935 # Are these changes tested? Added a test! # References Java `deletedDataFiles` impl: https://github.com/apache/iceberg/blob/3a29199e73f2e9ae0f8f92a1a0732a338c66aa0d/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java#L678 Java `ManifestGroup.entries` impl: https://github.com/apache/iceberg/blob/3a29199e73f2e9ae0f8f92a1a0732a338c66aa0d/core/src/main/java/org/apache/iceberg/ManifestGroup.java#L242 --------- Co-authored-by: Fokko Driesprong <fokko@apache.org>
Closes apache#1928 # Rationale for this change Add `validate_deleted_data_files` which depends on apache#1935 # Are these changes tested? Added a test! # References Java `deletedDataFiles` impl: https://github.com/apache/iceberg/blob/3a29199e73f2e9ae0f8f92a1a0732a338c66aa0d/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java#L678 Java `ManifestGroup.entries` impl: https://github.com/apache/iceberg/blob/3a29199e73f2e9ae0f8f92a1a0732a338c66aa0d/core/src/main/java/org/apache/iceberg/ManifestGroup.java#L242 --------- Co-authored-by: Fokko Driesprong <fokko@apache.org>
Closes #1928
Rationale for this change
Add
validate_deleted_data_fileswhich depends on #1935Are these changes tested?
Added a test!
References
Java
deletedDataFilesimpl:https://github.com/apache/iceberg/blob/3a29199e73f2e9ae0f8f92a1a0732a338c66aa0d/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java#L678
Java
ManifestGroup.entriesimpl:https://github.com/apache/iceberg/blob/3a29199e73f2e9ae0f8f92a1a0732a338c66aa0d/core/src/main/java/org/apache/iceberg/ManifestGroup.java#L242