From 2b1fb0e1ff119884e439e3e23b0ab9b957c49295 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Thu, 30 Apr 2026 14:13:47 -0700 Subject: [PATCH 1/2] perf: Cache Transaction.table_metadata between staged updates Transaction.table_metadata calls update_table_metadata on every access, which replays every staged update through model_copy(deep=True). Code that reads the property repeatedly within a single logical operation (e.g. the snapshot producer's schema()/spec()/new_manifest_writer() helpers inside per-manifest loops) pays that cost each time. This caches the result keyed on the identity of the base metadata and the staged-updates tuple. Since _updates is an immutable tuple, every self._updates += (...) rebinds it to a new object, so the identity check self-invalidates without any explicit cache-clearing at mutation sites. The cache also invalidates when the underlying Table.metadata is refreshed after a commit. The only observable difference is that last_updated_ms on the returned metadata is now stable across repeated reads of the same logical state, instead of being re-stamped with now() on each access. The timestamp written at commit time is unaffected. Adds a test that asserts repeated reads compute once, and that staging an update invalidates and recomputes. --- pyiceberg/table/__init__.py | 26 ++++++++++++++++++-------- tests/table/test_init.py | 26 ++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 7f1524642b..0c4573a0c6 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -211,6 +211,7 @@ class Transaction: _autocommit: bool _updates: tuple[TableUpdate, ...] _requirements: tuple[TableRequirement, ...] + _table_metadata_cache: tuple[TableMetadata, tuple[TableUpdate, ...], TableMetadata] | None def __init__(self, table: Table, autocommit: bool = False): """Open a transaction to stage and commit changes to a table. 
@@ -223,10 +224,21 @@ def __init__(self, table: Table, autocommit: bool = False): self._autocommit = autocommit self._updates = () self._requirements = () + self._table_metadata_cache = None @property def table_metadata(self) -> TableMetadata: - return update_table_metadata(self._table.metadata, self._updates) + base, updates = self._table.metadata, self._updates + # update_table_metadata replays every staged update via model_copy(deep=True); + # the cache is keyed on the identity of its inputs so it self-invalidates + # whenever _updates is reassigned (tuple += creates a new object) or the + # underlying table metadata is refreshed. + cached = self._table_metadata_cache + if cached is not None and cached[0] is base and cached[1] is updates: + return cached[2] + result = update_table_metadata(base, updates) + self._table_metadata_cache = (base, updates, result) + return result def __enter__(self) -> Transaction: """Start a transaction to update the table.""" @@ -1961,13 +1973,11 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], Residu # The lambda created here is run in multiple threads. # So we avoid creating _EvaluatorExpression methods bound to a single # shared instance across multiple threads. 
- return lambda datafile: ( - residual_evaluator_of( - spec=spec, - expr=self.row_filter, - case_sensitive=self.case_sensitive, - schema=self.table_metadata.schema(), - ) + return lambda datafile: residual_evaluator_of( + spec=spec, + expr=self.row_filter, + case_sensitive=self.case_sensitive, + schema=self.table_metadata.schema(), ) @staticmethod diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 0c4ea258f3..630d491a20 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -1766,3 +1766,29 @@ def test_build_large_partition_predicate(table_v2: Table) -> None: ) bind(table_v2.metadata.schema(), expr, case_sensitive=True) + + +def test_transaction_table_metadata_cached(table_v2: Table) -> None: + """Transaction.table_metadata should not recompute (replay updates via model_copy) + on every access while the underlying inputs are unchanged, and must recompute once + new updates are staged. + """ + from unittest import mock + + from pyiceberg.table.update import SetPropertiesUpdate, update_table_metadata + + with mock.patch("pyiceberg.table.update_table_metadata", wraps=update_table_metadata) as spy: + txn = table_v2.transaction() + + first = txn.table_metadata + for _ in range(10): + assert txn.table_metadata is first + assert spy.call_count == 1, f"expected 1 recompute for repeated reads, got {spy.call_count}" + + txn._apply((SetPropertiesUpdate(updates={"k": "v"}),)) + second = txn.table_metadata + assert second is not first + assert second.properties["k"] == "v" + for _ in range(10): + assert txn.table_metadata is second + assert spy.call_count == 2, f"expected 2 recomputes after one staged update, got {spy.call_count}" From c87f74f3f6443b1834c6df80fb450c6100d25334 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Thu, 30 Apr 2026 14:14:32 -0700 Subject: [PATCH 2/2] Revert unrelated ruff-format drift in DataScan --- pyiceberg/table/__init__.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git 
a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 0c4573a0c6..83d7612a10 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1973,11 +1973,13 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], Residu # The lambda created here is run in multiple threads. # So we avoid creating _EvaluatorExpression methods bound to a single # shared instance across multiple threads. - return lambda datafile: residual_evaluator_of( - spec=spec, - expr=self.row_filter, - case_sensitive=self.case_sensitive, - schema=self.table_metadata.schema(), + return lambda datafile: ( + residual_evaluator_of( + spec=spec, + expr=self.row_filter, + case_sensitive=self.case_sensitive, + schema=self.table_metadata.schema(), + ) ) @staticmethod