|
20 | 20 | from pyiceberg.expressions import BooleanExpression |
21 | 21 | from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator |
22 | 22 | from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile |
| 23 | +from pyiceberg.schema import Schema |
23 | 24 | from pyiceberg.table import Table |
24 | 25 | from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between |
25 | 26 | from pyiceberg.typedef import Record |
@@ -78,6 +79,47 @@ def validation_history( |
78 | 79 | return manifests_files, snapshots |
79 | 80 |
|
80 | 81 |
|
| 82 | +def _filter_manifest_entries( |
| 83 | + entry: ManifestEntry, |
| 84 | + snapshot_ids: set[int], |
| 85 | + data_filter: Optional[BooleanExpression], |
| 86 | + partition_set: Optional[dict[int, set[Record]]], |
| 87 | + entry_status: Optional[ManifestEntryStatus], |
| 88 | + schema: Schema, |
| 89 | +) -> bool: |
| 90 | + """Check whether a manifest entry matches the snapshot ids, status, data filter and partition set. |
| 91 | +
|
| 92 | + Args: |
| 93 | + entry: Manifest entry to filter |
| 94 | + snapshot_ids: set of snapshot ids to match data files |
| 95 | + data_filter: Optional filter to match data files |
| 96 | + partition_set: Optional set of partitions to match data files |
| 97 | + entry_status: Optional status to match data files |
| 98 | + schema: Schema used to evaluate the data filter |
| 99 | +
|
| 100 | + Returns: |
| 101 | + True if the entry should be included, False otherwise |
| 102 | + """ |
| 103 | + if entry.snapshot_id not in snapshot_ids: |
| 104 | + return False |
| 105 | + |
| 106 | + if entry_status is not None and entry.status != entry_status: |
| 107 | + return False |
| 108 | + |
| 109 | + if data_filter is not None: |
| 110 | + evaluator = _InclusiveMetricsEvaluator(schema, data_filter) |
| 111 | + if evaluator.eval(entry.data_file) is ROWS_CANNOT_MATCH: |
| 112 | + return False |
| 113 | + |
| 114 | + if partition_set is not None: |
| 115 | + partition = entry.data_file.partition |
| 116 | + spec_id = entry.data_file.spec_id |
| 117 | + if spec_id not in partition_set or partition not in partition_set[spec_id]: |
| 118 | + return False |
| 119 | + |
| 120 | + return True |
| 121 | + |
| 122 | + |
81 | 123 | def _deleted_data_files( |
82 | 124 | table: Table, |
83 | 125 | starting_snapshot: Snapshot, |
@@ -109,27 +151,12 @@ def _deleted_data_files( |
109 | 151 | ManifestContent.DATA, |
110 | 152 | ) |
111 | 153 |
|
112 | | - if data_filter is not None: |
113 | | - evaluator = _InclusiveMetricsEvaluator(table.schema(), data_filter).eval |
114 | | - |
115 | 154 | for manifest in manifests: |
116 | 155 | for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False): |
117 | | - if entry.snapshot_id not in snapshot_ids: |
118 | | - continue |
119 | | - |
120 | | - if entry.status != ManifestEntryStatus.DELETED: |
121 | | - continue |
122 | | - |
123 | | - if data_filter is not None and evaluator(entry.data_file) is ROWS_CANNOT_MATCH: |
124 | | - continue |
125 | | - |
126 | | - if partition_set is not None: |
127 | | - spec_id = entry.data_file.spec_id |
128 | | - partition = entry.data_file.partition |
129 | | - if spec_id not in partition_set or partition not in partition_set[spec_id]: |
130 | | - continue |
131 | | - |
132 | | - yield entry |
| 156 | + if _filter_manifest_entries( |
| 157 | + entry, snapshot_ids, data_filter, partition_set, ManifestEntryStatus.DELETED, table.schema() |
| 158 | + ): |
| 159 | + yield entry |
133 | 160 |
|
134 | 161 |
|
135 | 162 | def _validate_deleted_data_files( |
@@ -183,24 +210,10 @@ def _added_data_files( |
183 | 210 | ManifestContent.DATA, |
184 | 211 | ) |
185 | 212 |
|
186 | | - if data_filter is not None: |
187 | | - evaluator = _InclusiveMetricsEvaluator(table.schema(), data_filter) |
188 | | - |
189 | 213 | for manifest in manifests: |
190 | 214 | for entry in manifest.fetch_manifest_entry(table.io): |
191 | | - if entry.snapshot_id not in snapshot_ids: |
192 | | - continue |
193 | | - |
194 | | - if data_filter and evaluator.eval(entry.data_file) is ROWS_CANNOT_MATCH: |
195 | | - continue |
196 | | - |
197 | | - if partition_set is not None: |
198 | | - partition = entry.data_file.partition |
199 | | - spec_id = entry.data_file.spec_id |
200 | | - if spec_id not in partition_set or partition not in partition_set[spec_id]: |
201 | | - continue |
202 | | - |
203 | | - yield entry |
| 215 | + if _filter_manifest_entries(entry, snapshot_ids, data_filter, partition_set, None, table.schema()): |
| 216 | + yield entry |
204 | 217 |
|
205 | 218 |
|
206 | 219 | def _validate_added_data_files( |
|
0 commit comments