Skip to content

Commit 3da1528

Browse files
committed
feat: add snapshot expiration methods with retention strategies
1 parent 5d6e1e2 commit 3da1528

2 files changed

Lines changed: 366 additions & 1 deletion

File tree

pyiceberg/table/update/snapshot.py

Lines changed: 195 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -953,7 +953,7 @@ def _get_protected_snapshot_ids(self) -> Set[int]:
953953
if ref.snapshot_ref_type in [SnapshotRefType.TAG, SnapshotRefType.BRANCH]
954954
}
955955

956-
def by_id(self, snapshot_id: int) -> ExpireSnapshots:
956+
def by_id(self, snapshot_id: int) -> "ExpireSnapshots":
957957
"""
958958
Expire a snapshot by its ID.
959959
@@ -1005,3 +1005,197 @@ def older_than(self, dt: datetime) -> "ExpireSnapshots":
10051005
if snapshot.timestamp_ms < expire_from and snapshot.snapshot_id not in protected_ids:
10061006
self._snapshot_ids_to_expire.add(snapshot.snapshot_id)
10071007
return self
1008+
1009+
def older_than_with_retention(
1010+
self, timestamp_ms: int, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None
1011+
) -> "ExpireSnapshots":
1012+
"""Expire all unprotected snapshots with a timestamp older than a given value, with retention strategies.
1013+
1014+
Args:
1015+
timestamp_ms: Only snapshots with timestamp_ms < this value will be expired.
1016+
retain_last_n: Always keep the last N snapshots regardless of age.
1017+
min_snapshots_to_keep: Minimum number of snapshots to keep in total.
1018+
1019+
Returns:
1020+
This for method chaining.
1021+
"""
1022+
snapshots_to_expire = self._get_snapshots_to_expire_with_retention(
1023+
timestamp_ms=timestamp_ms, retain_last_n=retain_last_n, min_snapshots_to_keep=min_snapshots_to_keep
1024+
)
1025+
self._snapshot_ids_to_expire.update(snapshots_to_expire)
1026+
return self
1027+
1028+
def with_retention_policy(
1029+
self, timestamp_ms: Optional[int] = None, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None
1030+
) -> "ExpireSnapshots":
1031+
"""Comprehensive snapshot expiration with multiple retention strategies.
1032+
1033+
This method provides a unified interface for snapshot expiration with various
1034+
retention policies to ensure operational resilience while allowing space reclamation.
1035+
1036+
The method will use table properties as defaults if they are set:
1037+
- history.expire.max-snapshot-age-ms: Default for timestamp_ms if not provided
1038+
- history.expire.min-snapshots-to-keep: Default for min_snapshots_to_keep if not provided
1039+
- history.expire.max-ref-age-ms: Used for ref expiration (branches/tags)
1040+
1041+
Args:
1042+
timestamp_ms: Only snapshots with timestamp_ms < this value will be considered for expiration.
1043+
If None, will use history.expire.max-snapshot-age-ms table property if set.
1044+
retain_last_n: Always keep the last N snapshots regardless of age.
1045+
Useful when regular snapshot creation occurs and users want to keep
1046+
the last few for rollback purposes.
1047+
min_snapshots_to_keep: Minimum number of snapshots to keep in total.
1048+
Acts as a guardrail to prevent aggressive expiration logic.
1049+
If None, will use history.expire.min-snapshots-to-keep table property if set.
1050+
1051+
Returns:
1052+
This for method chaining.
1053+
1054+
Raises:
1055+
ValueError: If retain_last_n or min_snapshots_to_keep is less than 1.
1056+
1057+
Examples:
1058+
# Use table property defaults
1059+
table.expire_snapshots().with_retention_policy().commit()
1060+
1061+
# Override defaults with explicit values
1062+
table.expire_snapshots().with_retention_policy(
1063+
timestamp_ms=1234567890000,
1064+
retain_last_n=10,
1065+
min_snapshots_to_keep=5
1066+
).commit()
1067+
"""
1068+
# Get default values from table properties
1069+
default_max_age, default_min_snapshots, _ = self._get_expiration_properties()
1070+
1071+
# Use defaults from table properties if not explicitly provided
1072+
if timestamp_ms is None:
1073+
timestamp_ms = default_max_age
1074+
1075+
if min_snapshots_to_keep is None:
1076+
min_snapshots_to_keep = default_min_snapshots
1077+
1078+
# If no expiration criteria are provided, don't expire anything
1079+
if timestamp_ms is None and retain_last_n is None and min_snapshots_to_keep is None:
1080+
return self
1081+
1082+
if retain_last_n is not None and retain_last_n < 1:
1083+
raise ValueError("retain_last_n must be at least 1")
1084+
1085+
if min_snapshots_to_keep is not None and min_snapshots_to_keep < 1:
1086+
raise ValueError("min_snapshots_to_keep must be at least 1")
1087+
1088+
snapshots_to_expire = self._get_snapshots_to_expire_with_retention(
1089+
timestamp_ms=timestamp_ms, retain_last_n=retain_last_n, min_snapshots_to_keep=min_snapshots_to_keep
1090+
)
1091+
self._snapshot_ids_to_expire.update(snapshots_to_expire)
1092+
return self
1093+
1094+
def retain_last_n(self, n: int) -> "ExpireSnapshots":
1095+
"""Keep only the last N snapshots, expiring all others.
1096+
1097+
Args:
1098+
n: Number of most recent snapshots to keep.
1099+
1100+
Returns:
1101+
This for method chaining.
1102+
1103+
Raises:
1104+
ValueError: If n is less than 1.
1105+
"""
1106+
if n < 1:
1107+
raise ValueError("Number of snapshots to retain must be at least 1")
1108+
1109+
protected_ids = self._get_protected_snapshot_ids()
1110+
1111+
# Sort snapshots by timestamp (most recent first)
1112+
sorted_snapshots = sorted(self._transaction.table_metadata.snapshots, key=lambda s: s.timestamp_ms, reverse=True)
1113+
1114+
# Keep the last N snapshots and all protected ones
1115+
snapshots_to_keep = set()
1116+
snapshots_to_keep.update(protected_ids)
1117+
1118+
# Add the N most recent snapshots
1119+
for i, snapshot in enumerate(sorted_snapshots):
1120+
if i < n:
1121+
snapshots_to_keep.add(snapshot.snapshot_id)
1122+
1123+
# Find snapshots to expire
1124+
snapshots_to_expire = []
1125+
for snapshot in self._transaction.table_metadata.snapshots:
1126+
if snapshot.snapshot_id not in snapshots_to_keep:
1127+
snapshots_to_expire.append(snapshot.snapshot_id)
1128+
1129+
self._snapshot_ids_to_expire.update(snapshots_to_expire)
1130+
return self
1131+
1132+
def _get_snapshots_to_expire_with_retention(
1133+
self, timestamp_ms: Optional[int] = None, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None
1134+
) -> List[int]:
1135+
"""Get snapshots to expire considering retention strategies.
1136+
1137+
Args:
1138+
timestamp_ms: Only snapshots with timestamp_ms < this value will be considered for expiration.
1139+
retain_last_n: Always keep the last N snapshots regardless of age.
1140+
min_snapshots_to_keep: Minimum number of snapshots to keep in total.
1141+
1142+
Returns:
1143+
List of snapshot IDs to expire.
1144+
"""
1145+
protected_ids = self._get_protected_snapshot_ids()
1146+
1147+
# Sort snapshots by timestamp (most recent first)
1148+
sorted_snapshots = sorted(self._transaction.table_metadata.snapshots, key=lambda s: s.timestamp_ms, reverse=True)
1149+
1150+
# Start with all snapshots that could be expired
1151+
candidates_for_expiration = []
1152+
snapshots_to_keep = set(protected_ids)
1153+
1154+
# Apply retain_last_n constraint
1155+
if retain_last_n is not None:
1156+
for i, snapshot in enumerate(sorted_snapshots):
1157+
if i < retain_last_n:
1158+
snapshots_to_keep.add(snapshot.snapshot_id)
1159+
1160+
# Apply timestamp constraint
1161+
for snapshot in self._transaction.table_metadata.snapshots:
1162+
if snapshot.snapshot_id not in snapshots_to_keep and (timestamp_ms is None or snapshot.timestamp_ms < timestamp_ms):
1163+
candidates_for_expiration.append(snapshot)
1164+
1165+
# Sort candidates by timestamp (oldest first) for potential expiration
1166+
candidates_for_expiration.sort(key=lambda s: s.timestamp_ms)
1167+
1168+
# Apply min_snapshots_to_keep constraint
1169+
total_snapshots = len(self._transaction.table_metadata.snapshots)
1170+
snapshots_to_expire: List[int] = []
1171+
1172+
for candidate in candidates_for_expiration:
1173+
# Check if expiring this snapshot would violate min_snapshots_to_keep
1174+
remaining_after_expiration = total_snapshots - len(snapshots_to_expire) - 1
1175+
1176+
if min_snapshots_to_keep is None or remaining_after_expiration >= min_snapshots_to_keep:
1177+
snapshots_to_expire.append(candidate.snapshot_id)
1178+
else:
1179+
# Stop expiring to maintain minimum count
1180+
break
1181+
1182+
return snapshots_to_expire
1183+
1184+
def _get_expiration_properties(self) -> tuple[Optional[int], Optional[int], Optional[int]]:
1185+
"""Get the default expiration properties from table properties.
1186+
1187+
Returns:
1188+
Tuple of (max_snapshot_age_ms, min_snapshots_to_keep, max_ref_age_ms)
1189+
"""
1190+
properties = self._transaction.table_metadata.properties
1191+
1192+
max_snapshot_age_ms = properties.get("history.expire.max-snapshot-age-ms")
1193+
max_snapshot_age = int(max_snapshot_age_ms) if max_snapshot_age_ms is not None else None
1194+
1195+
min_snapshots = properties.get("history.expire.min-snapshots-to-keep")
1196+
min_snapshots_to_keep = int(min_snapshots) if min_snapshots is not None else None
1197+
1198+
max_ref_age = properties.get("history.expire.max-ref-age-ms")
1199+
max_ref_age_ms = int(max_ref_age) if max_ref_age is not None else None
1200+
1201+
return max_snapshot_age, min_snapshots_to_keep, max_ref_age_ms

tests/table/test_expire_snapshots.py

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import pytest
2222

2323
from pyiceberg.table import CommitTableResponse, Table
24+
from pyiceberg.table.update.snapshot import ExpireSnapshots
2425

2526

2627
def test_cannot_expire_protected_head_snapshot(table_v2: Table) -> None:
@@ -223,3 +224,173 @@ def test_expire_snapshots_by_ids(table_v2: Table) -> None:
223224
assert EXPIRE_SNAPSHOT_1 not in remaining_snapshots
224225
assert EXPIRE_SNAPSHOT_2 not in remaining_snapshots
225226
assert len(table_v2.metadata.snapshots) == 1
227+
228+
229+
def test_retain_last_n_with_protection(table_v2: Table) -> None:
230+
"""Test retain_last_n keeps most recent snapshots plus protected ones."""
231+
from types import SimpleNamespace
232+
233+
# Clear shared state set on the class between tests
234+
ExpireSnapshots._snapshot_ids_to_expire.clear()
235+
236+
S1 = 101 # oldest (also protected)
237+
S2 = 102
238+
S3 = 103
239+
S4 = 104 # newest
240+
241+
# Protected S1 as branch head
242+
table_v2.metadata = table_v2.metadata.model_copy(
243+
update={
244+
"refs": {
245+
"main": MagicMock(snapshot_id=S1, snapshot_ref_type="branch"),
246+
},
247+
"snapshots": [
248+
SimpleNamespace(snapshot_id=S1, timestamp_ms=1, parent_snapshot_id=None),
249+
SimpleNamespace(snapshot_id=S2, timestamp_ms=2, parent_snapshot_id=None),
250+
SimpleNamespace(snapshot_id=S3, timestamp_ms=3, parent_snapshot_id=None),
251+
SimpleNamespace(snapshot_id=S4, timestamp_ms=4, parent_snapshot_id=None),
252+
],
253+
}
254+
)
255+
256+
table_v2.catalog = MagicMock()
257+
kept_ids = {S1, S3, S4} # retain_last_n=2 keeps S4,S3 plus protected S1
258+
mock_response = CommitTableResponse(
259+
metadata=table_v2.metadata.model_copy(update={"snapshots": list(kept_ids)}),
260+
metadata_location="mock://metadata/location",
261+
uuid=uuid4(),
262+
)
263+
table_v2.catalog.commit_table.return_value = mock_response
264+
265+
table_v2.maintenance.expire_snapshots().retain_last_n(2).commit()
266+
table_v2.metadata = mock_response.metadata
267+
268+
args, kwargs = table_v2.catalog.commit_table.call_args
269+
updates = args[2] if len(args) > 2 else ()
270+
remove_update = next((u for u in updates if getattr(u, "action", None) == "remove-snapshots"), None)
271+
assert remove_update is not None
272+
# Only S2 should be expired
273+
assert set(remove_update.snapshot_ids) == {S2}
274+
assert S2 not in table_v2.metadata.snapshots
275+
276+
277+
def test_older_than_with_retention_combination(table_v2: Table) -> None:
278+
"""Test older_than_with_retention combining timestamp, retain_last_n and min_snapshots_to_keep."""
279+
from types import SimpleNamespace
280+
281+
ExpireSnapshots._snapshot_ids_to_expire.clear()
282+
283+
# Create 5 snapshots with increasing timestamps
284+
S1, S2, S3, S4, S5 = 201, 202, 203, 204, 205
285+
snapshots = [
286+
SimpleNamespace(snapshot_id=S1, timestamp_ms=100, parent_snapshot_id=None),
287+
SimpleNamespace(snapshot_id=S2, timestamp_ms=200, parent_snapshot_id=None),
288+
SimpleNamespace(snapshot_id=S3, timestamp_ms=300, parent_snapshot_id=None),
289+
SimpleNamespace(snapshot_id=S4, timestamp_ms=400, parent_snapshot_id=None),
290+
SimpleNamespace(snapshot_id=S5, timestamp_ms=500, parent_snapshot_id=None),
291+
]
292+
table_v2.metadata = table_v2.metadata.model_copy(update={"refs": {}, "snapshots": snapshots})
293+
table_v2.catalog = MagicMock()
294+
295+
# Expect to expire S1,S2,S3 ; keep S4 (due to min snapshots) and S5 (retain_last_n=1)
296+
mock_response = CommitTableResponse(
297+
metadata=table_v2.metadata.model_copy(update={"snapshots": [S4, S5]}),
298+
metadata_location="mock://metadata/location",
299+
uuid=uuid4(),
300+
)
301+
table_v2.catalog.commit_table.return_value = mock_response
302+
303+
table_v2.maintenance.expire_snapshots().older_than_with_retention(
304+
timestamp_ms=450, retain_last_n=1, min_snapshots_to_keep=2
305+
).commit()
306+
table_v2.metadata = mock_response.metadata
307+
308+
args, kwargs = table_v2.catalog.commit_table.call_args
309+
updates = args[2] if len(args) > 2 else ()
310+
remove_update = next((u for u in updates if getattr(u, "action", None) == "remove-snapshots"), None)
311+
assert remove_update is not None
312+
assert set(remove_update.snapshot_ids) == {S1, S2, S3}
313+
assert set(table_v2.metadata.snapshots) == {S4, S5}
314+
315+
316+
def test_with_retention_policy_defaults(table_v2: Table) -> None:
317+
"""Test with_retention_policy uses table property defaults when arguments omitted."""
318+
from types import SimpleNamespace
319+
320+
ExpireSnapshots._snapshot_ids_to_expire.clear()
321+
322+
# Properties: expire snapshots older than 350ms, keep at least 3 snapshots
323+
properties = {
324+
"history.expire.max-snapshot-age-ms": "350",
325+
"history.expire.min-snapshots-to-keep": "3",
326+
}
327+
S1, S2, S3, S4, S5 = 301, 302, 303, 304, 305
328+
snapshots = [
329+
SimpleNamespace(snapshot_id=S1, timestamp_ms=100, parent_snapshot_id=None),
330+
SimpleNamespace(snapshot_id=S2, timestamp_ms=200, parent_snapshot_id=None),
331+
SimpleNamespace(snapshot_id=S3, timestamp_ms=300, parent_snapshot_id=None),
332+
SimpleNamespace(snapshot_id=S4, timestamp_ms=400, parent_snapshot_id=None),
333+
SimpleNamespace(snapshot_id=S5, timestamp_ms=500, parent_snapshot_id=None),
334+
]
335+
table_v2.metadata = table_v2.metadata.model_copy(update={"refs": {}, "snapshots": snapshots, "properties": properties})
336+
table_v2.catalog = MagicMock()
337+
338+
# Expect S1,S2 expired; S3 kept due to min_snapshots_to_keep
339+
mock_response = CommitTableResponse(
340+
metadata=table_v2.metadata.model_copy(update={"snapshots": [S3, S4, S5]}),
341+
metadata_location="mock://metadata/location",
342+
uuid=uuid4(),
343+
)
344+
table_v2.catalog.commit_table.return_value = mock_response
345+
346+
table_v2.maintenance.expire_snapshots().with_retention_policy().commit()
347+
table_v2.metadata = mock_response.metadata
348+
349+
args, kwargs = table_v2.catalog.commit_table.call_args
350+
updates = args[2] if len(args) > 2 else ()
351+
remove_update = next((u for u in updates if getattr(u, "action", None) == "remove-snapshots"), None)
352+
assert remove_update is not None
353+
assert set(remove_update.snapshot_ids) == {S1, S2}
354+
assert set(table_v2.metadata.snapshots) == {S3, S4, S5}
355+
356+
357+
def test_get_expiration_properties(table_v2: Table) -> None:
358+
"""Test retrieval of expiration properties from table metadata."""
359+
ExpireSnapshots._snapshot_ids_to_expire.clear()
360+
properties = {
361+
"history.expire.max-snapshot-age-ms": "60000",
362+
"history.expire.min-snapshots-to-keep": "5",
363+
"history.expire.max-ref-age-ms": "120000",
364+
}
365+
table_v2.metadata = table_v2.metadata.model_copy(update={"properties": properties})
366+
expire = table_v2.maintenance.expire_snapshots()
367+
max_age, min_snaps, max_ref_age = expire._get_expiration_properties()
368+
assert max_age == 60000
369+
assert min_snaps == 5
370+
assert max_ref_age == 120000
371+
372+
373+
def test_get_snapshots_to_expire_with_retention_respects_protection(table_v2: Table) -> None:
374+
"""Internal helper should not select protected snapshots for expiration."""
375+
from types import SimpleNamespace
376+
377+
ExpireSnapshots._snapshot_ids_to_expire.clear()
378+
379+
P = 401 # protected
380+
A = 402
381+
B = 403
382+
table_v2.metadata = table_v2.metadata.model_copy(
383+
update={
384+
"refs": {"main": MagicMock(snapshot_id=P, snapshot_ref_type="branch")},
385+
"snapshots": [
386+
SimpleNamespace(snapshot_id=P, timestamp_ms=10, parent_snapshot_id=None),
387+
SimpleNamespace(snapshot_id=A, timestamp_ms=20, parent_snapshot_id=None),
388+
SimpleNamespace(snapshot_id=B, timestamp_ms=30, parent_snapshot_id=None),
389+
],
390+
}
391+
)
392+
expire = table_v2.maintenance.expire_snapshots()
393+
to_expire = expire._get_snapshots_to_expire_with_retention(timestamp_ms=100, retain_last_n=None, min_snapshots_to_keep=1)
394+
# Protected snapshot P should not be in list; both A and B can expire respecting min keep
395+
assert P not in to_expire
396+
assert set(to_expire) == {A, B}

0 commit comments

Comments
 (0)