Skip to content

Commit 1b8718f

Browse files
Add remove_expired_refs to ExpireSnapshots
1 parent 721c5aa commit 1b8718f

3 files changed

Lines changed: 253 additions & 42 deletions

File tree

pyiceberg/table/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,8 @@ class TableProperties:
205205
MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep"
206206
MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1
207207

208+
MAX_REF_AGE_MS = "history.expire.max-ref-age-ms"
209+
208210

209211
class Transaction:
210212
_table: Table

pyiceberg/table/update/snapshot.py

Lines changed: 65 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1025,53 +1025,71 @@ def _current_ancestors(self) -> set[int]:
10251025

10261026

10271027
class ExpireSnapshots(UpdateTableMetadata["ExpireSnapshots"]):
1028-
"""Expire snapshots by ID.
1028+
"""Expire snapshots and refs.
10291029
10301030
Use table.expire_snapshots().<operation>().commit() to run a specific operation.
10311031
Use table.expire_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
1032-
Pending changes are applied on commit.
1032+
Pending changes are applied on commit. Call order does not affect the result.
10331033
"""
10341034

10351035
_updates: tuple[TableUpdate, ...]
10361036
_requirements: tuple[TableRequirement, ...]
10371037
_snapshot_ids_to_expire: set[int]
1038+
_ref_names_to_expire: set[str]
1039+
_expire_older_than_ms: int | None
10381040

10391041
def __init__(self, transaction: Transaction) -> None:
10401042
super().__init__(transaction)
10411043
self._updates = ()
10421044
self._requirements = ()
10431045
self._snapshot_ids_to_expire = set()
1046+
self._ref_names_to_expire = set()
1047+
self._expire_older_than_ms = None
10441048

10451049
def _commit(self) -> UpdatesAndRequirements:
10461050
"""
10471051
Commit the staged updates and requirements.
10481052
1049-
This will remove the snapshots with the given IDs, but will always skip protected snapshots (branch/tag heads).
1053+
Applies all pending expirations: explicit snapshot IDs, age-based snapshot expiry,
1054+
and ref removals. Protected snapshots (branch/tag heads not being expired) are always
1055+
excluded. The age threshold from older_than() is evaluated here so that call order
1056+
with remove_expired_refs() does not affect the result.
10501057
10511058
Returns:
10521059
Tuple of updates and requirements to be committed,
10531060
as required by the calling parent apply functions.
10541061
"""
1055-
# Remove any protected snapshot IDs from the set to expire, just in case
10561062
protected_ids = self._get_protected_snapshot_ids()
1057-
self._snapshot_ids_to_expire -= protected_ids
1058-
update = RemoveSnapshotsUpdate(snapshot_ids=self._snapshot_ids_to_expire)
1059-
self._updates += (update,)
1063+
1064+
if self._expire_older_than_ms is not None:
1065+
for snapshot in self._transaction.table_metadata.snapshots:
1066+
if snapshot.timestamp_ms < self._expire_older_than_ms and snapshot.snapshot_id not in protected_ids:
1067+
self._snapshot_ids_to_expire.add(snapshot.snapshot_id)
1068+
1069+
snapshot_ids_to_expire = self._snapshot_ids_to_expire - protected_ids
1070+
1071+
updates: list[TableUpdate] = list(self._updates)
1072+
for ref_name in self._ref_names_to_expire:
1073+
updates.append(RemoveSnapshotRefUpdate(ref_name=ref_name))
1074+
if snapshot_ids_to_expire:
1075+
updates.append(RemoveSnapshotsUpdate(snapshot_ids=snapshot_ids_to_expire))
1076+
self._updates = tuple(updates)
10601077
return self._updates, self._requirements
10611078

10621079
def _get_protected_snapshot_ids(self) -> set[int]:
10631080
"""
1064-
Get the IDs of protected snapshots.
1081+
Get the IDs of snapshots that must not be expired.
10651082
1066-
These are the HEAD snapshots of all branches and all tagged snapshots. These ids are to be excluded from expiration.
1083+
These are the HEAD snapshots of all branches and tags that are not
1084+
already marked for removal via remove_expired_refs().
10671085
10681086
Returns:
10691087
Set of protected snapshot IDs to exclude from expiration.
10701088
"""
10711089
return {
10721090
ref.snapshot_id
1073-
for ref in self._transaction.table_metadata.refs.values()
1074-
if ref.snapshot_ref_type in [SnapshotRefType.TAG, SnapshotRefType.BRANCH]
1091+
for name, ref in self._transaction.table_metadata.refs.items()
1092+
if name not in self._ref_names_to_expire
10751093
}
10761094

10771095
def by_id(self, snapshot_id: int) -> ExpireSnapshots:
@@ -1112,17 +1130,46 @@ def by_ids(self, snapshot_ids: list[int]) -> ExpireSnapshots:
11121130

11131131
def older_than(self, dt: datetime) -> ExpireSnapshots:
11141132
"""
1115-
Expire all unprotected snapshots with a timestamp older than a given value.
1133+
Expire all unprotected snapshots with a timestamp older than the given value.
1134+
1135+
The filter is evaluated at commit time so that snapshots left without a ref
1136+
by remove_expired_refs() are also considered, regardless of call order.
11161137
11171138
Args:
1118-
dt (datetime): Only snapshots with datetime < this value will be expired.
1139+
dt (datetime): Only snapshots with timestamp < this value will be expired.
11191140
11201141
Returns:
11211142
This for method chaining.
11221143
"""
1123-
protected_ids = self._get_protected_snapshot_ids()
1124-
expire_from = datetime_to_millis(dt)
1125-
for snapshot in self._transaction.table_metadata.snapshots:
1126-
if snapshot.timestamp_ms < expire_from and snapshot.snapshot_id not in protected_ids:
1127-
self._snapshot_ids_to_expire.add(snapshot.snapshot_id)
1144+
self._expire_older_than_ms = datetime_to_millis(dt)
1145+
return self
1146+
1147+
def remove_expired_refs(self, default_max_ref_age_ms: int | None = None) -> ExpireSnapshots:
1148+
"""
1149+
Mark stale branches and tags for removal.
1150+
1151+
A ref is expired when the age of its snapshot exceeds its own max_ref_age_ms.
1152+
If a ref has no per-ref max_ref_age_ms set, default_max_ref_age_ms is used as fallback.
1153+
The main branch is never removed.
1154+
1155+
Snapshots left without any live ref after this call are no longer protected,
1156+
so a subsequent older_than() will include them in age-based expiry.
1157+
1158+
Args:
1159+
default_max_ref_age_ms: Fallback max age in milliseconds for refs that have no
1160+
per-ref max_ref_age_ms configured. If None, such refs are skipped.
1161+
1162+
Returns:
1163+
This for method chaining.
1164+
"""
1165+
now_ms = int(datetime.now().timestamp() * 1000)
1166+
for name, ref in self._transaction.table_metadata.refs.items():
1167+
if name == MAIN_BRANCH:
1168+
continue
1169+
effective_max_ref_age_ms = ref.max_ref_age_ms if ref.max_ref_age_ms is not None else default_max_ref_age_ms
1170+
if effective_max_ref_age_ms is None:
1171+
continue
1172+
snapshot = self._transaction.table_metadata.snapshot_by_id(ref.snapshot_id)
1173+
if snapshot is None or (now_ms - snapshot.timestamp_ms) > effective_max_ref_age_ms:
1174+
self._ref_names_to_expire.add(name)
11281175
return self

0 commit comments

Comments
 (0)