|
14 | 14 | # KIND, either express or implied. See the License for the |
15 | 15 | # specific language governing permissions and limitations |
16 | 16 | # under the License. |
17 | | -from typing import Iterator, Optional |
| 17 | +from typing import Iterator, Optional, Set |
18 | 18 |
|
19 | 19 | from pyiceberg.exceptions import ValidationException |
20 | 20 | from pyiceberg.expressions import BooleanExpression |
|
24 | 24 | from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between |
25 | 25 | from pyiceberg.typedef import Record |
26 | 26 |
|
27 | | -VALIDATE_DATA_FILES_EXIST_OPERATIONS = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE} |
| 27 | +VALIDATE_DATA_FILES_EXIST_OPERATIONS: Set[Operation] = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE} |
| 28 | +VALIDATE_ADDED_FILES_OPERATIONS: Set[Operation] = {Operation.APPEND, Operation.OVERWRITE} |
28 | 29 |
|
29 | 30 |
|
30 | 31 | def validation_history( |
@@ -150,3 +151,53 @@ def _validate_deleted_data_files( |
150 | 151 | if any(conflicting_entries): |
151 | 152 | conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries} |
152 | 153 | raise ValidationException(f"Deleted data files were found matching the filter for snapshots {conflicting_snapshots}!") |
| 154 | + |
| 155 | +def _added_data_files( |
| 156 | + table: Table, |
| 157 | + starting_snapshot: Snapshot, |
| 158 | + data_filter: Optional[BooleanExpression], |
| 159 | + partition_set: Optional[dict[int, set[Record]]], |
| 160 | + parent_snapshot: Optional[Snapshot], |
| 161 | +) -> Iterator[ManifestEntry]: |
| 162 | + if parent_snapshot is None: |
| 163 | + return |
| 164 | + |
| 165 | + manifests, snapshot_ids = validation_history( |
| 166 | + table, |
| 167 | + parent_snapshot, |
| 168 | + starting_snapshot, |
| 169 | + VALIDATE_ADDED_FILES_OPERATIONS, |
| 170 | + ManifestContent.DATA, |
| 171 | + ) |
| 172 | + |
| 173 | + if data_filter is not None: |
| 174 | + evaluator = _InclusiveMetricsEvaluator(table.schema(), data_filter) |
| 175 | + |
| 176 | + for manifest in manifests: |
| 177 | + for entry in manifest.fetch_manifest_entry(table.io): |
| 178 | + if entry.snapshot_id not in snapshot_ids: |
| 179 | + continue |
| 180 | + |
| 181 | + if data_filter and evaluator.eval(entry.data_file) is ROWS_CANNOT_MATCH: |
| 182 | + continue |
| 183 | + |
| 184 | + if partition_set is not None: |
| 185 | + partition = entry.data_file.partition |
| 186 | + spec_id = entry.data_file.spec_id |
| 187 | + if spec_id not in partition_set or partition not in partition_set[spec_id]: |
| 188 | + continue |
| 189 | + |
| 190 | + yield entry |
| 191 | + |
| 192 | + |
| 193 | +def validate_added_data_files( |
| 194 | + table: Table, |
| 195 | + starting_snapshot: Snapshot, |
| 196 | + data_filter: Optional[BooleanExpression], |
| 197 | + parent_snapshot: Optional[Snapshot], |
| 198 | +) -> None: |
| 199 | + conflicting_entries = _added_data_files(table, starting_snapshot, data_filter, None, parent_snapshot) |
| 200 | + |
| 201 | + if any(conflicting_entries): |
| 202 | + conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries if entry.snapshot_id is not None} |
| 203 | + raise ValidationException(f"Added data files were found matching the filter for snapshots {conflicting_snapshots}!") |
0 commit comments