Skip to content

Commit 2b1fb0e

Browse files
committed
perf: Cache Transaction.table_metadata between staged updates
Transaction.table_metadata calls update_table_metadata on every access, which replays every staged update through model_copy(deep=True). Code that reads the property repeatedly within a single logical operation (e.g. the snapshot producer's schema()/spec()/new_manifest_writer() helpers inside per-manifest loops) pays that cost each time. This caches the result keyed on the identity of the base metadata and the staged-updates tuple. Since _updates is an immutable tuple, every self._updates += (...) rebinds it to a new object, so the identity check self-invalidates without any explicit cache-clearing at mutation sites. The cache also invalidates when the underlying Table.metadata is refreshed after a commit. The only observable difference is that last_updated_ms on the returned metadata is now stable across repeated reads of the same logical state, instead of being re-stamped with now() on each access. The timestamp written at commit time is unaffected. Adds a test that asserts repeated reads compute once, and that staging an update invalidates and recomputes.
1 parent e6d5129 commit 2b1fb0e

2 files changed

Lines changed: 44 additions & 8 deletions

File tree

pyiceberg/table/__init__.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ class Transaction:
211211
_autocommit: bool
212212
_updates: tuple[TableUpdate, ...]
213213
_requirements: tuple[TableRequirement, ...]
214+
_table_metadata_cache: tuple[TableMetadata, tuple[TableUpdate, ...], TableMetadata] | None
214215

215216
def __init__(self, table: Table, autocommit: bool = False):
216217
"""Open a transaction to stage and commit changes to a table.
@@ -223,10 +224,21 @@ def __init__(self, table: Table, autocommit: bool = False):
223224
self._autocommit = autocommit
224225
self._updates = ()
225226
self._requirements = ()
227+
self._table_metadata_cache = None
226228

227229
@property
228230
def table_metadata(self) -> TableMetadata:
229-
return update_table_metadata(self._table.metadata, self._updates)
231+
base, updates = self._table.metadata, self._updates
232+
# update_table_metadata replays every staged update via model_copy(deep=True);
233+
# the cache is keyed on the identity of its inputs so it self-invalidates
234+
# whenever _updates is reassigned (tuple += creates a new object) or the
235+
# underlying table metadata is refreshed.
236+
cached = self._table_metadata_cache
237+
if cached is not None and cached[0] is base and cached[1] is updates:
238+
return cached[2]
239+
result = update_table_metadata(base, updates)
240+
self._table_metadata_cache = (base, updates, result)
241+
return result
230242

231243
def __enter__(self) -> Transaction:
232244
"""Start a transaction to update the table."""
@@ -1961,13 +1973,11 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], Residu
19611973
# The lambda created here is run in multiple threads.
19621974
# So we avoid creating _EvaluatorExpression methods bound to a single
19631975
# shared instance across multiple threads.
1964-
return lambda datafile: (
1965-
residual_evaluator_of(
1966-
spec=spec,
1967-
expr=self.row_filter,
1968-
case_sensitive=self.case_sensitive,
1969-
schema=self.table_metadata.schema(),
1970-
)
1976+
return lambda datafile: residual_evaluator_of(
1977+
spec=spec,
1978+
expr=self.row_filter,
1979+
case_sensitive=self.case_sensitive,
1980+
schema=self.table_metadata.schema(),
19711981
)
19721982

19731983
@staticmethod

tests/table/test_init.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1766,3 +1766,29 @@ def test_build_large_partition_predicate(table_v2: Table) -> None:
17661766
)
17671767

17681768
bind(table_v2.metadata.schema(), expr, case_sensitive=True)
1769+
1770+
1771+
def test_transaction_table_metadata_cached(table_v2: Table) -> None:
1772+
"""Transaction.table_metadata should not recompute (replay updates via model_copy)
1773+
on every access while the underlying inputs are unchanged, and must recompute once
1774+
new updates are staged.
1775+
"""
1776+
from unittest import mock
1777+
1778+
from pyiceberg.table.update import SetPropertiesUpdate, update_table_metadata
1779+
1780+
with mock.patch("pyiceberg.table.update_table_metadata", wraps=update_table_metadata) as spy:
1781+
txn = table_v2.transaction()
1782+
1783+
first = txn.table_metadata
1784+
for _ in range(10):
1785+
assert txn.table_metadata is first
1786+
assert spy.call_count == 1, f"expected 1 recompute for repeated reads, got {spy.call_count}"
1787+
1788+
txn._stage((SetPropertiesUpdate(updates={"k": "v"}),))
1789+
second = txn.table_metadata
1790+
assert second is not first
1791+
assert second.properties["k"] == "v"
1792+
for _ in range(10):
1793+
assert txn.table_metadata is second
1794+
assert spy.call_count == 2, f"expected 2 recomputes after one staged update, got {spy.call_count}"

0 commit comments

Comments
 (0)