|
1 | | -# pylint:disable=redefined-outer-name |
2 | | -# pylint:disable=redefined-outer-name |
3 | | -from unittest.mock import Mock |
| 1 | +from datetime import datetime, timezone |
| 2 | +from pathlib import PosixPath |
| 3 | +from random import randint |
| 4 | +import time |
| 5 | +from typing import Any, Dict, Optional |
4 | 6 | import pytest |
5 | | - |
| 7 | +from pyiceberg.catalog.memory import InMemoryCatalog |
| 8 | +from pyiceberg.catalog.noop import NoopCatalog |
| 9 | +from pyiceberg.io import load_file_io |
| 10 | +from pyiceberg.table import Table |
| 11 | +from pyiceberg.table.sorting import NullOrder, SortDirection, SortField, SortOrder |
| 12 | +from pyiceberg.table.update.snapshot import ExpireSnapshots |
| 13 | +from pyiceberg.transforms import IdentityTransform |
| 14 | +from pyiceberg.types import BooleanType, FloatType, IntegerType, ListType, LongType, MapType, StructType |
| 15 | +from tests.catalog.test_base import InMemoryCatalog, Table |
6 | 16 | from pyiceberg.table import Table |
7 | | -from pyiceberg.table.metadata import new_table_metadata |
8 | | -from pyiceberg.table.snapshots import Snapshot, SnapshotLogEntry |
9 | | -from pyiceberg.table.update.snapshot import ManageSnapshots |
10 | | - |
11 | 17 | from pyiceberg.schema import Schema |
12 | | -from pyiceberg.partitioning import PartitionSpec |
13 | | -from pyiceberg.table.sorting import SortOrder |
| 18 | +from pyiceberg.types import NestedField, LongType, StringType |
| 19 | +from pyiceberg.table.snapshots import Snapshot |
| 20 | +from pyiceberg.table.metadata import TableMetadata, TableMetadataV2, new_table_metadata |
14 | 21 |
|
15 | 22 |
|
16 | 23 |
|
@pytest.fixture
def generate_test_table() -> Table:
    """Build an in-memory ``Table`` with a 2000-snapshot lineage for expiration tests.

    The chain starts at a fixed snapshot id; snapshot ``i`` has snapshot
    ``i - 1`` as its parent (the first has none), and the last snapshot in
    the chain is the table's current snapshot.  A matching ``snapshot-log``
    entry is recorded for every snapshot.

    Returns:
        Table: a table backed by V2 metadata, a no-op catalog, and no real I/O.
    """

    def generate_snapshot(
        snapshot_id: int,
        parent_snapshot_id: Optional[int] = None,
        timestamp_ms: Optional[int] = None,
        sequence_number: int = 0,
    ) -> Dict[str, Any]:
        # Raw (dict-form) snapshot entry as it would appear in a metadata file.
        return {
            "snapshot-id": snapshot_id,
            "parent-snapshot-id": parent_snapshot_id,
            "timestamp-ms": timestamp_ms or int(time.time() * 1000),
            "sequence-number": sequence_number,
            "summary": {"operation": "append"},
            "manifest-list": f"s3://a/b/{snapshot_id}.avro",
        }

    snapshots = []
    snapshot_log = []
    initial_snapshot_id = 3051729675574597004

    # Deterministic, strictly increasing timestamps.  The previous version
    # subtracted randint(0, 1000000) per snapshot, which made the fixture
    # non-deterministic and allowed a child snapshot to carry an *earlier*
    # timestamp than its parent — an inconsistent snapshot log.
    base_timestamp_ms = int(time.time() * 1000) - 2000 * 1000
    for i in range(2000):
        snapshot_id = initial_snapshot_id + i
        parent_snapshot_id = snapshot_id - 1 if i > 0 else None
        timestamp_ms = base_timestamp_ms + i * 1000
        snapshots.append(generate_snapshot(snapshot_id, parent_snapshot_id, timestamp_ms, i))
        snapshot_log.append({"snapshot-id": snapshot_id, "timestamp-ms": timestamp_ms})

    # V2 metadata blob in raw dict form; the generated snapshots/log are
    # spliced in and the newest snapshot is marked current.
    # NOTE(review): "last-updated-ms" predates the generated snapshot
    # timestamps — confirm pyiceberg's metadata validation tolerates this.
    metadata = {
        "format-version": 2,
        "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
        "location": "s3://bucket/test/location",
        "last-sequence-number": 34,
        "last-updated-ms": 1602638573590,
        "last-column-id": 3,
        "current-schema-id": 1,
        "schemas": [
            {"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": True, "type": "long"}]},
            {
                "type": "struct",
                "schema-id": 1,
                "identifier-field-ids": [1, 2],
                "fields": [
                    {"id": 1, "name": "x", "required": True, "type": "long"},
                    {"id": 2, "name": "y", "required": True, "type": "long", "doc": "comment"},
                    {"id": 3, "name": "z", "required": True, "type": "long"},
                ],
            },
        ],
        "default-spec-id": 0,
        "partition-specs": [{"spec-id": 0, "fields": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}]}],
        "last-partition-id": 1000,
        "default-sort-order-id": 3,
        "sort-orders": [
            {
                "order-id": 3,
                "fields": [
                    {"transform": "identity", "source-id": 2, "direction": "asc", "null-order": "nulls-first"},
                    {"transform": "bucket[4]", "source-id": 3, "direction": "desc", "null-order": "nulls-last"},
                ],
            }
        ],
        "properties": {"read.split.target.size": "134217728"},
        "current-snapshot-id": initial_snapshot_id + 1999,
        "snapshots": snapshots,
        "snapshot-log": snapshot_log,
        "metadata-log": [{"metadata-file": "s3://bucket/.../v1.json", "timestamp-ms": 1515100}],
        # A tag ref pinned to the very first snapshot in the chain.
        "refs": {"test": {"snapshot-id": initial_snapshot_id, "type": "tag", "max-ref-age-ms": 10000000}},
    }

    return Table(
        identifier=("database", "table"),
        metadata=metadata,
        metadata_location=f"{metadata['location']}/uuid.metadata.json",
        io=load_file_io(),
        catalog=NoopCatalog("NoopCatalog"),
    )
|
def test_expire_snapshots_removes_correct_snapshots(generate_test_table) -> None:
    """Expire one snapshot by id and verify it is gone from the metadata.

    Regression note: the previous assertion,
    ``assert not remaining_snapshot_ids.issubset({expired_id})``, is
    vacuously true whenever *any* other snapshot remains (2000 exist here),
    so it never actually verified the expiration.
    """
    expired_snapshot_id = 3051729675574597004

    # NOTE(review): this snapshot is also pinned by the "test" tag ref in the
    # fixture metadata — confirm ExpireSnapshots is expected to drop it anyway.
    with ExpireSnapshots(generate_test_table.transaction()) as manage_snapshots:
        manage_snapshots.expire_snapshot_id(expired_snapshot_id)

    remaining_snapshot_ids = {snapshot.snapshot_id for snapshot in generate_test_table.metadata.snapshots}
    # The targeted snapshot must no longer be present.
    assert expired_snapshot_id not in remaining_snapshot_ids
0 commit comments