
Commit 7839569

perf: Hoist table_metadata at remaining repeat-access sites in snapshot update
Follow-up to #2674. Transaction.table_metadata replays all staged updates via model_copy on every access; this applies the #2674 hoist pattern to three more sites in snapshot.py that still read the property more than once per invocation:

- _SnapshotProducer._summary: hoist spec()/schema() out of the per-data-file loop
- _DeleteFiles._compute_deletes: hoist table_metadata/schema once (was 3 accesses at method entry)
- _MergeAppendFiles.__init__: 3 consecutive .properties reads -> 1

Adds a regression test asserting that _summary()'s access count is independent of file count and that _MergeAppendFiles.__init__ adds exactly one access over its superclass.
1 parent e6d5129 commit 7839569
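For context, the hoist pattern named above is sketched below. This is a minimal illustration, not pyiceberg code: Txn.metadata stands in for Transaction.table_metadata, a property that rebuilds its value on every read by replaying staged updates (pyiceberg does this via model_copy; the dict-based replay here is an assumption made for brevity).

    # Minimal sketch of the hoist pattern; Txn and its fields are
    # illustrative stand-ins, not pyiceberg's actual classes.
    import copy

    class Txn:
        def __init__(self, base: dict) -> None:
            self._base = base
            self._updates: list[dict] = []

        @property
        def metadata(self) -> dict:
            # Replays every staged update on each access, so each
            # read costs O(staged updates) -- like table_metadata.
            result = copy.deepcopy(self._base)
            for update in self._updates:
                result.update(update)
            return result

    def summarize_slow(txn: Txn, files: list[str]) -> None:
        for f in files:
            schema = txn.metadata["schema"]  # anti-pattern: one rebuild per file
            print(f, schema)

    def summarize_fast(txn: Txn, files: list[str]) -> None:
        schema = txn.metadata["schema"]  # hoisted: a single rebuild
        for f in files:
            print(f, schema)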

2 files changed

Lines changed: 67 additions & 11 deletions

pyiceberg/table/update/snapshot.py

Lines changed: 16 additions & 11 deletions
@@ -228,6 +228,8 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary:
 
         # avoid copying metadata for each data file
         table_metadata = self._transaction.table_metadata
+        schema = table_metadata.schema()
+        default_spec = table_metadata.spec()
 
         partition_summary_limit = int(
             table_metadata.properties.get(
@@ -239,8 +241,8 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary:
         for data_file in self._added_data_files:
             ssc.add_file(
                 data_file=data_file,
-                partition_spec=table_metadata.spec(),
-                schema=table_metadata.schema(),
+                partition_spec=default_spec,
+                schema=schema,
             )
 
         if len(self._deleted_data_files) > 0:
@@ -249,7 +251,7 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary:
                 ssc.remove_file(
                     data_file=data_file,
                     partition_spec=specs[data_file.spec_id],
-                    schema=table_metadata.schema(),
+                    schema=schema,
                 )
 
         previous_snapshot = (
@@ -424,12 +426,14 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
                 data_file=entry.data_file,
             )
 
+        # avoid copying metadata for each evaluator
+        table_metadata = self._transaction.table_metadata
+        schema = table_metadata.schema()
+
         manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
-        strict_metrics_evaluator = _StrictMetricsEvaluator(
-            self.schema(), self._predicate, case_sensitive=self._case_sensitive
-        ).eval
+        strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval
         inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(
-            self.schema(), self._predicate, case_sensitive=self._case_sensitive
+            schema, self._predicate, case_sensitive=self._case_sensitive
         ).eval
 
         existing_manifests = []
@@ -441,7 +445,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
             # Should be the current tip of the _target_branch
             parent_snapshot_id_for_delete_source = self._parent_snapshot_id
             if parent_snapshot_id_for_delete_source is not None:
-                snapshot = self._transaction.table_metadata.snapshot_by_id(parent_snapshot_id_for_delete_source)
+                snapshot = table_metadata.snapshot_by_id(parent_snapshot_id_for_delete_source)
                 if snapshot:  # Ensure snapshot is found
                     for manifest_file in snapshot.manifests(io=self._io):
                         if manifest_file.content == ManifestContent.DATA:
@@ -542,18 +546,19 @@ def __init__(
         from pyiceberg.table import TableProperties
 
         super().__init__(operation, transaction, io, commit_uuid, snapshot_properties, branch)
+        table_properties = self._transaction.table_metadata.properties
         self._target_size_bytes = property_as_int(
-            self._transaction.table_metadata.properties,
+            table_properties,
             TableProperties.MANIFEST_TARGET_SIZE_BYTES,
             TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
         )  # type: ignore
         self._min_count_to_merge = property_as_int(
-            self._transaction.table_metadata.properties,
+            table_properties,
             TableProperties.MANIFEST_MIN_MERGE_COUNT,
             TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT,
         )  # type: ignore
         self._merge_enabled = property_as_bool(
-            self._transaction.table_metadata.properties,
+            table_properties,
             TableProperties.MANIFEST_MERGE_ENABLED,
             TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
         )

tests/table/test_snapshots.py

Lines changed: 51 additions & 0 deletions
@@ -551,3 +551,54 @@ def test_latest_ancestor_before_timestamp() -> None:
 
     result = latest_ancestor_before_timestamp(metadata, 1000)
     assert result is None
+
+
+def test_snapshot_producer_bounded_metadata_access(table_v2: Table) -> None:
+    """Transaction.table_metadata recomputes via model_copy on every access, so the
+    snapshot producer must not read it once per item. Guards the hoisting introduced
+    in #2674 and extended here.
+    """
+    from unittest.mock import patch
+
+    from pyiceberg.table import Transaction
+    from pyiceberg.table.metadata import TableMetadata
+    from pyiceberg.table.update.snapshot import _FastAppendFiles, _MergeAppendFiles
+
+    original_fget = Transaction.table_metadata.fget
+    call_count = 0
+
+    def counting(self: Transaction) -> TableMetadata:
+        nonlocal call_count
+        call_count += 1
+        return original_fget(self)
+
+    def make_file() -> DataFile:
+        return DataFile.from_args(content=DataFileContent.DATA, record_count=1, file_size_in_bytes=1, partition=Record())
+
+    txn = table_v2.transaction()
+
+    with patch.object(Transaction, "table_metadata", property(counting)):
+        # _summary() access count must not scale with the number of data files
+        def summary_accesses(n_files: int) -> int:
+            append = _FastAppendFiles(operation=Operation.APPEND, transaction=txn, io=table_v2.io)
+            for _ in range(n_files):
+                append.append_data_file(make_file())
+            before = call_count
+            append._summary()
+            return call_count - before
+
+        few, many = summary_accesses(10), summary_accesses(100)
+        assert few == many, f"_summary() table_metadata accesses scale with file count ({few} vs {many})"
+        assert many <= 2, f"_summary() accessed table_metadata {many} times; expected O(1)"
+
+        # _MergeAppendFiles.__init__ should add exactly one access over _FastAppendFiles.__init__
+        before = call_count
+        _FastAppendFiles(operation=Operation.APPEND, transaction=txn, io=table_v2.io)
+        fast_init = call_count - before
+        before = call_count
+        _MergeAppendFiles(operation=Operation.APPEND, transaction=txn, io=table_v2.io)
+        merge_init = call_count - before
+        assert merge_init - fast_init == 1, (
+            f"_MergeAppendFiles.__init__ made {merge_init - fast_init} extra table_metadata "
+            "accesses over its superclass; expected 1 (hoisted)"
+        )
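To run just this regression test locally, a standard pytest node ID should work (the path and test name are taken from this commit; the exact invocation is an assumption about the repo's usual test setup):

    pytest tests/table/test_snapshots.py::test_snapshot_producer_bounded_metadata_access -q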
