Skip to content

Commit a04e503

Browse files
geruh and chinmay-bhat committed
feat: Add support for rolling back to timestamp
Co-authored-by: Chinmay Bhat <12948588+chinmay-bhat@users.noreply.github.com>
1 parent 4b193f9 commit a04e503

5 files changed

Lines changed: 230 additions & 0 deletions

File tree

pyiceberg/table/snapshots.py

Lines changed: 21 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -472,3 +472,24 @@ def ancestors_between(from_snapshot: Snapshot | None, to_snapshot: Snapshot, tab
472472
break
473473
else:
474474
yield from ancestors_of(to_snapshot, table_metadata)
475+
476+
477+
def latest_ancestor_before_timestamp(table_metadata: TableMetadata, timestamp_ms: int) -> Snapshot | None:
    """Return the newest snapshot in the current lineage committed strictly before a cutoff.

    Every snapshot yielded by ``ancestors_of`` for the table's current snapshot is
    examined, so the result does not depend on the order in which they are yielded.

    Args:
        table_metadata: The table metadata for a table.
        timestamp_ms: Exclusive upper bound in milliseconds; a snapshot stamped exactly
            at this value is not eligible.

    Returns:
        The eligible snapshot with the greatest timestamp, or None if there is none.
        Snapshots whose timestamp is not strictly positive are never selected.
    """
    eligible = (
        snapshot
        for snapshot in ancestors_of(table_metadata.current_snapshot(), table_metadata)
        if 0 < snapshot.timestamp_ms < timestamp_ms
    )
    # max() returns the first of several equally-late candidates, which is the same
    # winner a strict "greater than the best so far" scan would keep.
    return max(eligible, key=lambda snapshot: snapshot.timestamp_ms, default=None)

pyiceberg/table/update/snapshot.py

Lines changed: 28 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -65,6 +65,7 @@
6565
SnapshotSummaryCollector,
6666
Summary,
6767
ancestors_of,
68+
latest_ancestor_before_timestamp,
6869
update_snapshot_summaries,
6970
)
7071
from pyiceberg.table.update import (
@@ -1007,6 +1008,33 @@ def rollback_to_snapshot(self, snapshot_id: int) -> ManageSnapshots:
10071008

10081009
return self.set_current_snapshot(snapshot_id=snapshot_id)
10091010

1011+
def rollback_to_timestamp(self, timestamp_ms: int) -> ManageSnapshots:
    """Point the main branch at the latest ancestor snapshot older than a timestamp.

    ``_commit_if_ref_updates_exist`` is invoked first; the target snapshot is then
    resolved with ``latest_ancestor_before_timestamp`` against the transaction's
    table metadata, and a ref change for the main branch is queued on this instance.

    Args:
        timestamp_ms: Cutoff in milliseconds. The target is the latest ancestor
            snapshot whose timestamp is strictly before this value.

    Returns:
        This instance, for method chaining.

    Raises:
        ValueError: If no valid snapshot exists older than the given timestamp.
    """
    self._commit_if_ref_updates_exist()

    target = latest_ancestor_before_timestamp(self._transaction.table_metadata, timestamp_ms)
    if target is None:
        raise ValueError(f"Cannot roll back, no valid snapshot older than: {timestamp_ms}")

    # Queue the ref change together with the requirement produced alongside it.
    ref_update, ref_requirement = self._transaction._set_ref_snapshot(
        snapshot_id=target.snapshot_id,
        ref_name=MAIN_BRANCH,
        type=str(SnapshotRefType.BRANCH),
    )
    self._updates += ref_update
    self._requirements += ref_requirement
    return self
1037+
10101038
def _is_current_ancestor(self, snapshot_id: int) -> bool:
    """Return whether ``snapshot_id`` is contained in ``self._current_ancestors()``."""
    current_ancestors = self._current_ancestors()
    return snapshot_id in current_ancestors
10121040

tests/integration/test_snapshot_operations.py

Lines changed: 29 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -186,3 +186,32 @@ def test_rollback_to_snapshot(catalog: Catalog) -> None:
186186
tbl = catalog.load_table(identifier)
187187
restored_snapshot = tbl.current_snapshot()
188188
assert restored_snapshot and restored_snapshot.snapshot_id == current_snapshot_id
189+
190+
191+
@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_rollback_to_timestamp(catalog: Catalog) -> None:
    """Rolling back to the current snapshot's own timestamp lands on its parent.

    The table is shared with other tests and with the second parametrized catalog,
    so the original snapshot is restored in a ``finally`` block: a failing assertion
    must not leave the table rolled back for whatever runs next.
    """
    identifier = "default.test_table_snapshot_operations"
    tbl = catalog.load_table(identifier)

    current_snapshot = tbl.current_snapshot()
    assert current_snapshot is not None
    assert current_snapshot.parent_snapshot_id is not None

    parent_snapshot = tbl.metadata.snapshot_by_id(current_snapshot.parent_snapshot_id)
    assert parent_snapshot is not None

    try:
        # rollback_to_timestamp finds the latest ancestor with timestamp less than given timestamp,
        # so passing the current snapshot's own timestamp excludes the current snapshot itself.
        tbl.manage_snapshots().rollback_to_timestamp(timestamp_ms=current_snapshot.timestamp_ms).commit()

        tbl = catalog.load_table(identifier)
        updated_snapshot = tbl.current_snapshot()
        assert updated_snapshot is not None
        assert updated_snapshot.snapshot_id == parent_snapshot.snapshot_id
    finally:
        # restore table, reloading first so the restore is applied on top of whatever state was committed
        tbl = catalog.load_table(identifier)
        tbl.manage_snapshots().set_current_snapshot(snapshot_id=current_snapshot.snapshot_id).commit()

    tbl = catalog.load_table(identifier)
    restored_snapshot = tbl.current_snapshot()
    assert restored_snapshot is not None
    assert restored_snapshot.snapshot_id == current_snapshot.snapshot_id

tests/table/test_manage_snapshots.py

Lines changed: 75 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -303,3 +303,78 @@ def test_rollback_to_current_snapshot(table_v2: Table) -> None:
303303

304304
table_v2.manage_snapshots().rollback_to_snapshot(snapshot_id=current_snapshot.snapshot_id).commit()
305305
table_v2.catalog.commit_table.assert_called_once()
306+
307+
308+
def test_rollback_to_timestamp() -> None:
    """A cutoff between snapshots 2 and 3 queues exactly one ref update, moving ``main`` to snapshot 2."""
    from pyiceberg.table.metadata import TableMetadataV2

    # Linear history 1 <- 2 <- 3 <- 4 stamped 1000 ms apart; snapshot 1 is the root and has no parent.
    snapshots = [{"snapshot-id": 1, "timestamp-ms": 1000, "sequence-number": 1, "manifest-list": "s3://a/1.avro"}]
    snapshots.extend(
        {
            "snapshot-id": snapshot_id,
            "parent-snapshot-id": snapshot_id - 1,
            "timestamp-ms": snapshot_id * 1000,
            "sequence-number": snapshot_id,
            "manifest-list": f"s3://a/{snapshot_id}.avro",
        }
        for snapshot_id in (2, 3, 4)
    )

    metadata_dict = {
        "format-version": 2,
        "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
        "location": "s3://bucket/test/location",
        "last-sequence-number": 4,
        "last-updated-ms": 1602638573590,
        "last-column-id": 1,
        "current-schema-id": 0,
        "schemas": [{"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": True, "type": "long"}]}],
        "default-spec-id": 0,
        "partition-specs": [{"spec-id": 0, "fields": []}],
        "last-partition-id": 999,
        "default-sort-order-id": 0,
        "sort-orders": [{"order-id": 0, "fields": []}],
        "current-snapshot-id": 4,
        "snapshots": snapshots,
    }

    catalog = MagicMock()
    table = Table(
        identifier=("db", "table"),
        metadata=TableMetadataV2(**metadata_dict),
        metadata_location="s3://bucket/test/metadata.json",
        io=load_file_io(),
        catalog=catalog,
    )
    catalog.commit_table.return_value = _mock_commit_response(table)

    # 2500 falls between snapshot 2 (2000) and snapshot 3 (3000), so snapshot 2 is the target.
    table.manage_snapshots().rollback_to_timestamp(timestamp_ms=2500).commit()

    ref_updates = [update for update in _get_updates(catalog) if isinstance(update, SetSnapshotRefUpdate)]

    assert len(ref_updates) == 1
    ref_update = ref_updates[0]
    assert ref_update.snapshot_id == 2
    assert ref_update.ref_name == "main"
373+
def test_rollback_to_timestamp_no_valid_snapshot(table_v2: Table) -> None:
    """A cutoff equal to the oldest snapshot's timestamp raises and never reaches the catalog."""
    # The oldest snapshot is at timestamp 1515100955770; the cutoff is exclusive, so nothing qualifies.
    oldest_timestamp_ms = 1515100955770
    catalog = MagicMock()
    table_v2.catalog = catalog

    with pytest.raises(ValueError, match="Cannot roll back, no valid snapshot older than"):
        table_v2.manage_snapshots().rollback_to_timestamp(timestamp_ms=oldest_timestamp_ms).commit()

    catalog.commit_table.assert_not_called()

tests/table/test_snapshots.py

Lines changed: 77 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@
3030
Summary,
3131
ancestors_between,
3232
ancestors_of,
33+
latest_ancestor_before_timestamp,
3334
update_snapshot_summaries,
3435
)
3536
from pyiceberg.transforms import IdentityTransform
@@ -456,3 +457,79 @@ def test_ancestors_between(table_v2_with_extensive_snapshots: Table) -> None:
456457
)
457458
== 2000
458459
)
460+
461+
462+
def test_latest_ancestor_before_timestamp() -> None:
    """Check the lookup for cutoffs between, exactly on, after, and at the start of the snapshot timestamps."""
    from pyiceberg.table.metadata import TableMetadataV2

    # Four snapshots in a single chain (1 <- 2 <- 3 <- 4) at 1000, 2000, 3000 and 4000 ms.
    # Only snapshots after the first carry a parent pointer.
    snapshots = [
        {
            "snapshot-id": snapshot_id,
            **({"parent-snapshot-id": snapshot_id - 1} if snapshot_id > 1 else {}),
            "timestamp-ms": snapshot_id * 1000,
            "sequence-number": snapshot_id,
            "summary": {"operation": "append"},
            "manifest-list": f"s3://a/{snapshot_id}.avro",
        }
        for snapshot_id in range(1, 5)
    ]

    metadata = TableMetadataV2(
        **{
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 4,
            "last-updated-ms": 1602638573590,
            "last-column-id": 1,
            "current-schema-id": 0,
            "schemas": [{"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": True, "type": "long"}]}],
            "default-spec-id": 0,
            "partition-specs": [{"spec-id": 0, "fields": []}],
            "last-partition-id": 999,
            "default-sort-order-id": 0,
            "sort-orders": [{"order-id": 0, "fields": []}],
            "current-snapshot-id": 4,
            "snapshots": snapshots,
        }
    )

    # (cutoff in ms, expected snapshot id). 5000 is past every snapshot, so the current
    # snapshot itself is returned; 3000 sits exactly on snapshot 3, which the strict
    # comparison excludes.
    expectations = [(3500, 3), (2500, 2), (5000, 4), (3000, 2)]
    for cutoff_ms, expected_snapshot_id in expectations:
        found = latest_ancestor_before_timestamp(metadata, cutoff_ms)
        assert found is not None
        assert found.snapshot_id == expected_snapshot_id

    # Nothing is strictly older than the very first snapshot's timestamp.
    assert latest_ancestor_before_timestamp(metadata, 1000) is None

0 commit comments

Comments
 (0)