|
14 | 14 | # KIND, either express or implied. See the License for the |
15 | 15 | # specific language governing permissions and limitations |
16 | 16 | # under the License. |
17 | | -from typing import Iterator, Optional |
| 17 | +from typing import Callable, Iterator, Optional |
18 | 18 |
|
19 | 19 | from pyiceberg.exceptions import ValidationException |
20 | 20 | from pyiceberg.expressions import BooleanExpression |
21 | 21 | from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator |
22 | | -from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile |
| 22 | +from pyiceberg.manifest import DataFile, ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile |
23 | 23 | from pyiceberg.table import Table |
24 | 24 | from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between |
25 | 25 | from pyiceberg.typedef import Record |
26 | 26 |
|
27 | 27 | VALIDATE_DATA_FILES_EXIST_OPERATIONS = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE} |
28 | 28 |
|
29 | 29 |
|
| 30 | +class ManifestEntryEvaluator: |
| 31 | + """Evaluate manifests against given conditions.""" |
| 32 | + |
| 33 | + def __init__( |
| 34 | + self, |
| 35 | + manifests: list[ManifestFile], |
| 36 | + table: Table, |
| 37 | + snapshot_ids: Optional[set[int]] = None, |
| 38 | + status: Optional[ManifestEntryStatus] = None, |
| 39 | + data_filter: Optional[BooleanExpression] = None, |
| 40 | + partition_set: Optional[dict[int, set[Record]]] = None, |
| 41 | + ): |
| 42 | + self.manifests = manifests |
| 43 | + self.table = table |
| 44 | + self.snapshot_ids = snapshot_ids if snapshot_ids is not None else [] |
| 45 | + self.status = status |
| 46 | + self.partition_set = partition_set |
| 47 | + |
| 48 | + if data_filter is not None: |
| 49 | + self.metrics_evaluator: Optional[Callable[[DataFile], bool]] = _InclusiveMetricsEvaluator( |
| 50 | + self.table.schema(), data_filter |
| 51 | + ).eval |
| 52 | + else: |
| 53 | + self.metrics_evaluator = None |
| 54 | + |
| 55 | + def _evaluate_snapshot_ids(self, entry: ManifestEntry) -> bool: |
| 56 | + """Check if the entry's snapshot ID is in the filter set.""" |
| 57 | + if entry.snapshot_id in self.snapshot_ids: |
| 58 | + return True |
| 59 | + else: |
| 60 | + return False |
| 61 | + |
| 62 | + def _evaluate_status(self, entry: ManifestEntry) -> bool: |
| 63 | + """Check if the entry's status matches the filter.""" |
| 64 | + if self.status is None or entry.status == self.status: |
| 65 | + return True |
| 66 | + else: |
| 67 | + return False |
| 68 | + |
| 69 | + def _evaluate_data_filter(self, entry: ManifestEntry) -> bool: |
| 70 | + """Check if the entry's data file matches the data filter.""" |
| 71 | + if self.metrics_evaluator is None: |
| 72 | + return True |
| 73 | + if self.metrics_evaluator(entry.data_file) is ROWS_CANNOT_MATCH: |
| 74 | + return False |
| 75 | + return True |
| 76 | + |
| 77 | + def _evaluate_partition_set(self, entry: ManifestEntry) -> bool: |
| 78 | + """Check if the entry's partition matches the partition set.""" |
| 79 | + if self.partition_set is None: |
| 80 | + return True |
| 81 | + spec_id = entry.data_file.spec_id |
| 82 | + partition = entry.data_file.partition |
| 83 | + if spec_id not in self.partition_set or partition not in self.partition_set[spec_id]: |
| 84 | + return False |
| 85 | + return True |
| 86 | + |
| 87 | + def evaluate(self) -> Iterator[ManifestEntry]: |
| 88 | + """Evaluate the manifests against the given conditions.""" |
| 89 | + for manifest in self.manifests: |
| 90 | + for entry in manifest.fetch_manifest_entry(self.table.io, discard_deleted=False): |
| 91 | + if not self._evaluate_snapshot_ids(entry): |
| 92 | + continue |
| 93 | + |
| 94 | + if not self._evaluate_status(entry): |
| 95 | + continue |
| 96 | + |
| 97 | + if not self._evaluate_data_filter(entry): |
| 98 | + continue |
| 99 | + |
| 100 | + if not self._evaluate_partition_set(entry): |
| 101 | + continue |
| 102 | + |
| 103 | + yield entry |
| 104 | + |
| 105 | + |
30 | 106 | def validation_history( |
31 | 107 | table: Table, |
32 | 108 | from_snapshot: Snapshot, |
@@ -108,27 +184,16 @@ def _deleted_data_files( |
108 | 184 | ManifestContent.DATA, |
109 | 185 | ) |
110 | 186 |
|
111 | | - if data_filter is not None: |
112 | | - evaluator = _InclusiveMetricsEvaluator(table.schema(), data_filter).eval |
113 | | - |
114 | | - for manifest in manifests: |
115 | | - for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False): |
116 | | - if entry.snapshot_id not in snapshot_ids: |
117 | | - continue |
118 | | - |
119 | | - if entry.status != ManifestEntryStatus.DELETED: |
120 | | - continue |
121 | | - |
122 | | - if data_filter is not None and evaluator(entry.data_file) is ROWS_CANNOT_MATCH: |
123 | | - continue |
124 | | - |
125 | | - if partition_set is not None: |
126 | | - spec_id = entry.data_file.spec_id |
127 | | - partition = entry.data_file.partition |
128 | | - if spec_id not in partition_set or partition not in partition_set[spec_id]: |
129 | | - continue |
| 187 | + manifest_evaluator = ManifestEntryEvaluator( |
| 188 | + manifests, |
| 189 | + table, |
| 190 | + snapshot_ids, |
| 191 | + ManifestEntryStatus.DELETED, |
| 192 | + data_filter, |
| 193 | + partition_set, |
| 194 | + ) |
130 | 195 |
|
131 | | - yield entry |
| 196 | + yield from manifest_evaluator.evaluate() |
132 | 197 |
|
133 | 198 |
|
134 | 199 | def _validate_deleted_data_files( |
|
0 commit comments