Skip to content

Commit 89120aa

Browse files
committed
[wip] feat: validate_deleted_data_files
1 parent 0793713 commit 89120aa

1 file changed

Lines changed: 80 additions & 2 deletions

File tree

pyiceberg/table/update/validate.py

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,16 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17-
from typing import Optional
17+
from typing import Iterator, Optional
1818

19-
from pyiceberg.manifest import ManifestContent, ManifestFile
19+
from pyiceberg.expressions import BooleanExpression
20+
from pyiceberg.expressions.visitors import _StrictMetricsEvaluator
21+
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
2022
from pyiceberg.table import Table
2123
from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
24+
from pyiceberg.typedef import Record
25+
26+
# Snapshot operations that deleted_data_files passes to validation_history
# when collecting the data manifests it scans for deleted entries.
VALIDATE_DATA_FILES_EXIST_OPERATIONS = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE}
2227

2328

2429
class ValidationException(Exception):
@@ -70,3 +75,76 @@ def validation_history(
7075
raise ValidationException("No matching snapshot found.")
7176

7277
return manifests_files, snapshots
78+
79+
80+
def deleted_data_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: Optional[BooleanExpression],
    parent_snapshot: Optional[Snapshot],
    partition_set: Optional[set[Record]],
) -> Iterator[ManifestEntry]:
    """Yield manifest entries for data files deleted since a starting snapshot.

    The data manifests and snapshots returned by ``validation_history`` for the
    range between ``starting_snapshot`` and ``parent_snapshot`` are scanned, and
    every entry that was written by one of those snapshots with status
    ``DELETED`` is yielded, subject to the optional filters below.

    Args:
        table: Table to validate
        starting_snapshot: Snapshot current at the start of the operation
        data_filter: Expression used to find deleted data files; when None,
            no expression filtering is applied
        parent_snapshot: Ending snapshot on the branch being validated; when
            None, nothing is yielded
        partition_set: Optional collection restricting which deleted files are
            reported; when None, no partition filtering is applied.
            NOTE(review): the annotation says ``set[Record]`` but membership is
            tested with a ``(spec_id, partition)`` tuple - confirm the intended
            element type.

    Yields:
        Manifest entries of deleted data files matching the filters
    """
    # Without an ending snapshot there is no table state to inspect.
    if parent_snapshot is None:
        return

    history = validation_history(
        table,
        starting_snapshot,
        parent_snapshot,
        VALIDATE_DATA_FILES_EXIST_OPERATIONS,
        ManifestContent.DATA,
    )
    data_manifests, snapshots_in_range = history

    # Build the metrics-based predicate only when a filter expression was supplied.
    matches_filter = None
    if data_filter is not None:
        matches_filter = _StrictMetricsEvaluator(table.schema(), data_filter).eval

    snapshot_ids_in_range = {snap.snapshot_id for snap in snapshots_in_range}

    for manifest_file in data_manifests:
        # discard_deleted=False keeps DELETED entries, which are the ones of interest here.
        for manifest_entry in manifest_file.fetch_manifest_entry(table.io, discard_deleted=False):
            # Only deletions recorded by a snapshot inside the validated range count.
            if manifest_entry.snapshot_id not in snapshot_ids_in_range or manifest_entry.status != ManifestEntryStatus.DELETED:
                continue

            if matches_filter is not None and not matches_filter(manifest_entry.data_file):
                continue

            if partition_set is not None:
                partition_key = (manifest_entry.data_file.spec_id, manifest_entry.data_file.partition)
                if partition_key not in partition_set:
                    continue

            yield manifest_entry
131+
132+
133+
def validate_deleted_data_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: Optional[BooleanExpression],
    parent_snapshot: Snapshot,
) -> None:
    """Validate that no files matching a filter have been deleted from the table since a starting snapshot.

    Args:
        table: Table to validate
        starting_snapshot: Snapshot current at the start of the operation
        data_filter: Expression used to find deleted data files
        parent_snapshot: Ending snapshot on the branch being validated

    Raises:
        ValidationException: If at least one deleted data file matching the filter is found.
    """
    # Pass the trailing arguments by keyword: deleted_data_files declares
    # parent_snapshot *before* partition_set, so passing them positionally as
    # (None, parent_snapshot) bound parent_snapshot to None and made the
    # generator return immediately, silently disabling this validation.
    conflicting_entries = deleted_data_files(
        table,
        starting_snapshot,
        data_filter,
        parent_snapshot=parent_snapshot,
        partition_set=None,
    )

    # Test for the presence of any entry rather than the truthiness of the
    # entries themselves, so the result cannot depend on how an entry
    # evaluates in a boolean context.
    if any(True for _ in conflicting_entries):
        raise ValidationException("Deleted data files were found matching the filter.")

0 commit comments

Comments
 (0)