|
25 | 25 | from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile |
26 | 26 | from pyiceberg.table import Table |
27 | 27 | from pyiceberg.table.snapshots import Operation, Snapshot, Summary |
28 | | -from pyiceberg.table.update.validate import _deleted_data_files, _validate_deleted_data_files, validation_history |
| 28 | +from pyiceberg.table.update.validate import ( |
| 29 | + _added_data_files, |
| 30 | + _deleted_data_files, |
| 31 | + _validate_added_data_files, |
| 32 | + _validate_deleted_data_files, |
| 33 | + validation_history, |
| 34 | +) |
29 | 35 |
|
30 | 36 |
|
31 | 37 | @pytest.fixture |
@@ -217,3 +223,130 @@ class DummyEntry: |
217 | 223 | data_filter=None, |
218 | 224 | parent_snapshot=oldest_snapshot, |
219 | 225 | ) |
| 226 | + |
| 227 | + |
| 228 | +@pytest.mark.parametrize("operation", [Operation.APPEND, Operation.OVERWRITE]) |
| 229 | +def test_validate_added_data_files_conflicting_count( |
| 230 | + table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]], |
| 231 | + operation: Operation, |
| 232 | +) -> None: |
| 233 | + table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests |
| 234 | + |
| 235 | + snapshot_history = 100 |
| 236 | + snapshots = table.snapshots() |
| 237 | + for i in range(1, snapshot_history + 1): |
| 238 | + altered_snapshot = snapshots[-i] |
| 239 | + altered_snapshot = altered_snapshot.model_copy(update={"summary": Summary(operation=operation)}) |
| 240 | + snapshots[-i] = altered_snapshot |
| 241 | + |
| 242 | + table.metadata = table.metadata.model_copy( |
| 243 | + update={"snapshots": snapshots}, |
| 244 | + ) |
| 245 | + |
| 246 | + oldest_snapshot = table.snapshots()[-snapshot_history] |
| 247 | + newest_snapshot = cast(Snapshot, table.current_snapshot()) |
| 248 | + |
| 249 | + def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]: |
| 250 | + """Mock the manifests method to use the snapshot_id for lookup.""" |
| 251 | + snapshot_id = self.snapshot_id |
| 252 | + if snapshot_id in mock_manifests: |
| 253 | + return mock_manifests[snapshot_id] |
| 254 | + return [] |
| 255 | + |
| 256 | + def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]: |
| 257 | + return [ |
| 258 | + ManifestEntry.from_args( |
| 259 | + status=ManifestEntryStatus.ADDED, |
| 260 | + snapshot_id=self.added_snapshot_id, |
| 261 | + ) |
| 262 | + ] |
| 263 | + |
| 264 | + with ( |
| 265 | + patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect), |
| 266 | + patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry), |
| 267 | + ): |
| 268 | + result = list( |
| 269 | + _added_data_files( |
| 270 | + table=table, |
| 271 | + starting_snapshot=newest_snapshot, |
| 272 | + data_filter=None, |
| 273 | + parent_snapshot=oldest_snapshot, |
| 274 | + partition_set=None, |
| 275 | + ) |
| 276 | + ) |
| 277 | + |
| 278 | + # since we only look at the ManifestContent.Data files |
| 279 | + assert len(result) == snapshot_history / 2 |
| 280 | + |
| 281 | + |
| 282 | +@pytest.mark.parametrize("operation", [Operation.DELETE, Operation.REPLACE]) |
| 283 | +def test_validate_added_data_files_non_conflicting_count( |
| 284 | + table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]], |
| 285 | + operation: Operation, |
| 286 | +) -> None: |
| 287 | + table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests |
| 288 | + |
| 289 | + snapshot_history = 100 |
| 290 | + snapshots = table.snapshots() |
| 291 | + for i in range(1, snapshot_history + 1): |
| 292 | + altered_snapshot = snapshots[-i] |
| 293 | + altered_snapshot = altered_snapshot.model_copy(update={"summary": Summary(operation=operation)}) |
| 294 | + snapshots[-i] = altered_snapshot |
| 295 | + |
| 296 | + table.metadata = table.metadata.model_copy( |
| 297 | + update={"snapshots": snapshots}, |
| 298 | + ) |
| 299 | + |
| 300 | + oldest_snapshot = table.snapshots()[-snapshot_history] |
| 301 | + newest_snapshot = cast(Snapshot, table.current_snapshot()) |
| 302 | + |
| 303 | + def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]: |
| 304 | + """Mock the manifests method to use the snapshot_id for lookup.""" |
| 305 | + snapshot_id = self.snapshot_id |
| 306 | + if snapshot_id in mock_manifests: |
| 307 | + return mock_manifests[snapshot_id] |
| 308 | + return [] |
| 309 | + |
| 310 | + def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]: |
| 311 | + return [ |
| 312 | + ManifestEntry.from_args( |
| 313 | + status=ManifestEntryStatus.ADDED, |
| 314 | + snapshot_id=self.added_snapshot_id, |
| 315 | + ) |
| 316 | + ] |
| 317 | + |
| 318 | + with ( |
| 319 | + patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect), |
| 320 | + patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry), |
| 321 | + ): |
| 322 | + result = list( |
| 323 | + _added_data_files( |
| 324 | + table=table, |
| 325 | + starting_snapshot=newest_snapshot, |
| 326 | + data_filter=None, |
| 327 | + parent_snapshot=oldest_snapshot, |
| 328 | + partition_set=None, |
| 329 | + ) |
| 330 | + ) |
| 331 | + |
| 332 | + assert len(result) == 0 |
| 333 | + |
| 334 | + |
| 335 | +def test_validate_added_data_files_raises_on_conflict( |
| 336 | + table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]], |
| 337 | +) -> None: |
| 338 | + table, _ = table_v2_with_extensive_snapshots_and_manifests |
| 339 | + oldest_snapshot = table.snapshots()[0] |
| 340 | + newest_snapshot = cast(Snapshot, table.current_snapshot()) |
| 341 | + |
| 342 | + class DummyEntry: |
| 343 | + snapshot_id = 123 |
| 344 | + |
| 345 | + with patch("pyiceberg.table.update.validate._added_data_files", return_value=[DummyEntry()]): |
| 346 | + with pytest.raises(ValidationException): |
| 347 | + _validate_added_data_files( |
| 348 | + table=table, |
| 349 | + starting_snapshot=newest_snapshot, |
| 350 | + data_filter=None, |
| 351 | + parent_snapshot=oldest_snapshot, |
| 352 | + ) |
0 commit comments