-
Notifications
You must be signed in to change notification settings - Fork 481
feat: validate_deleted_data_files
#1938
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 18 commits
beff92b
c369720
41bb8a4
763e9f4
f200beb
7f6bf9d
c63cc55
f2f3a88
74d5569
167f9e4
efe50b4
0793713
89120aa
a39abd2
cf5b061
645e8df
caf7e36
a52c422
d3df4a7
9eca29f
fc83d42
ee3959a
a51c2da
ec86695
0a6b781
a96da01
03b2913
aae22e6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -14,11 +14,17 @@ | |||||||||
| # KIND, either express or implied. See the License for the | ||||||||||
| # specific language governing permissions and limitations | ||||||||||
| # under the License. | ||||||||||
| from typing import Iterator, Optional | ||||||||||
|
|
||||||||||
| from pyiceberg.exceptions import ValidationException | ||||||||||
| from pyiceberg.manifest import ManifestContent, ManifestFile | ||||||||||
| from pyiceberg.expressions import BooleanExpression | ||||||||||
| from pyiceberg.expressions.visitors import _StrictMetricsEvaluator | ||||||||||
| from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile | ||||||||||
| from pyiceberg.table import Table | ||||||||||
| from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between | ||||||||||
| from pyiceberg.typedef import Record | ||||||||||
|
|
||||||||||
| VALIDATE_DATA_FILES_EXIST_OPERATIONS = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE} | ||||||||||
|
|
||||||||||
|
|
||||||||||
| def validation_history( | ||||||||||
|
|
@@ -69,3 +75,74 @@ def validation_history( | |||||||||
| raise ValidationException("No matching snapshot found.") | ||||||||||
|
|
||||||||||
| return manifests_files, snapshots | ||||||||||
|
|
||||||||||
|
|
||||||||||
| def deleted_data_files( | ||||||||||
| table: Table, | ||||||||||
| starting_snapshot: Snapshot, | ||||||||||
| data_filter: Optional[BooleanExpression], | ||||||||||
| parent_snapshot: Optional[Snapshot], | ||||||||||
| partition_set: Optional[set[Record]], | ||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks out of order:
Suggested change
It looks like the function call in
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this needs some more work. Next to @sungwy's comment, we in the code below: (entry.data_file.spec_id, entry.data_file.partition) not in partition_setWhere it checks if a tuple is in a This triggered me, because if you do: ALTER TABLE prod.db.taxis REPLACE PARTITION FIELD pickup_timestamp WITH day(pickup_timestamp);
-- and then
ALTER TABLE prod.db.taxis REPLACE PARTITION FIELD pickup_timestamp WITH day(dropoff_timestamp);Both of the partitioning strategies will produce a |
||||||||||
| ) -> Iterator[ManifestEntry]: | ||||||||||
| """Find deleted data files matching a filter since a starting snapshot. | ||||||||||
|
|
||||||||||
| Args: | ||||||||||
| table: Table to validate | ||||||||||
| starting_snapshot: Snapshot current at the start of the operation | ||||||||||
| data_filter: Expression used to find deleted data files | ||||||||||
| partition_set: a set of partitions to find deleted data files | ||||||||||
| parent_snapshot: Ending snapshot on the branch being validated | ||||||||||
|
|
||||||||||
| Returns: | ||||||||||
| List of deleted data files matching the filter | ||||||||||
|
jayceslesar marked this conversation as resolved.
Outdated
|
||||||||||
| """ | ||||||||||
| # if there is no current table state, no files have been deleted | ||||||||||
| if parent_snapshot is None: | ||||||||||
| return | ||||||||||
|
|
||||||||||
| manifests, snapshot_ids = validation_history( | ||||||||||
| table, | ||||||||||
| starting_snapshot, | ||||||||||
| parent_snapshot, | ||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks out of order to, because
Suggested change
Do you think it would be better to update
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you clarify a little more? Are you saying we should move to the terms |
||||||||||
| VALIDATE_DATA_FILES_EXIST_OPERATIONS, | ||||||||||
| ManifestContent.DATA, | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| if data_filter is not None: | ||||||||||
| evaluator = _StrictMetricsEvaluator(table.schema(), data_filter).eval | ||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not too sure if this is correct, because Should we be using Summoning @Fokko for a second review
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I agree that this should be inclusive projection, since we want to know if there are any matches. Inclusive projection returns |
||||||||||
|
|
||||||||||
| for manifest in manifests: | ||||||||||
| for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False): | ||||||||||
| if entry.snapshot_id not in snapshot_ids: | ||||||||||
| continue | ||||||||||
|
|
||||||||||
| if entry.status != ManifestEntryStatus.DELETED: | ||||||||||
| continue | ||||||||||
|
|
||||||||||
| if data_filter is not None and not evaluator(entry.data_file): | ||||||||||
|
Fokko marked this conversation as resolved.
Outdated
|
||||||||||
| continue | ||||||||||
|
|
||||||||||
| if partition_set is not None and (entry.data_file.spec_id, entry.data_file.partition) not in partition_set: | ||||||||||
| continue | ||||||||||
|
|
||||||||||
| yield entry | ||||||||||
|
|
||||||||||
|
|
||||||||||
| def validate_deleted_data_files( | ||||||||||
|
Fokko marked this conversation as resolved.
Outdated
|
||||||||||
| table: Table, | ||||||||||
| starting_snapshot: Snapshot, | ||||||||||
| data_filter: Optional[BooleanExpression], | ||||||||||
| parent_snapshot: Snapshot, | ||||||||||
| ) -> None: | ||||||||||
| """Validate that no files matching a filter have been deleted from the table since a starting snapshot. | ||||||||||
|
|
||||||||||
| Args: | ||||||||||
| table: Table to validate | ||||||||||
| starting_snapshot: Snapshot current at the start of the operation | ||||||||||
| data_filter: Expression used to find deleted data files | ||||||||||
| parent_snapshot: Ending snapshot on the branch being validated | ||||||||||
|
|
||||||||||
| """ | ||||||||||
| conflicting_entries = deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot) | ||||||||||
| if any(conflicting_entries): | ||||||||||
| raise ValidationException("Deleted data files were found matching the filter.") | ||||||||||
|
jayceslesar marked this conversation as resolved.
Outdated
|
||||||||||
Uh oh!
There was an error while loading. Please reload this page.