Skip to content

Commit 9c0e129

Browse files
test(benchmark): convert add_files dup-check bench to pytest style
1 parent f7d63b6 commit 9c0e129

2 files changed

Lines changed: 134 additions & 114 deletions

File tree

tests/benchmark/bench_add_files_dup_check.py

Lines changed: 0 additions & 114 deletions
This file was deleted.
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
"""Memory benchmark for `add_files(check_duplicate_files=True)`.
18+
19+
Reproduces the per-call cost of the duplicate-files check on a growing
20+
table. Before fix: each call materializes every DataFile in the snapshot
21+
into a pyarrow Table (with readable_metrics, partition decode, full stats
22+
dicts) and post-filters on file_path — peak memory grows roughly linearly
23+
with cumulative file count, dominated by per-column stats decoding.
24+
After fix: streaming manifest scan with set containment on file_path,
25+
peak memory stays flat.
26+
27+
Run with: uv run pytest tests/benchmark/bench_add_files_dup_check.py -v -s -m benchmark
28+
"""
29+
30+
from __future__ import annotations
31+
32+
import gc
33+
import tempfile
34+
import tracemalloc
35+
from pathlib import Path
36+
37+
import pyarrow as pa
38+
import pyarrow.parquet as pq
39+
import pytest
40+
41+
from pyiceberg.catalog.memory import InMemoryCatalog
42+
from pyiceberg.schema import Schema
43+
from pyiceberg.types import IntegerType, NestedField, StringType
44+
45+
46+
@pytest.fixture
def memory_catalog(tmp_path_factory: pytest.TempPathFactory) -> InMemoryCatalog:
    """Provide an in-memory catalog whose warehouse lives in a fresh temp dir."""
    warehouse_root = str(tmp_path_factory.mktemp("warehouse"))
    catalog = InMemoryCatalog("memory_test", warehouse=f"file://{warehouse_root}")
    catalog.create_namespace("default")
    return catalog
52+
53+
54+
def _wide_schema(num_columns: int = 30) -> tuple[Schema, pa.Schema]:
    """Build a wide-ish schema so per-column stats decoding has work to do.

    Column 1 is a required int `id`; columns 2..num_columns are optional strings.
    """
    string_ids = range(2, num_columns + 1)
    iceberg_schema = Schema(
        NestedField(field_id=1, name="id", field_type=IntegerType(), required=True),
        *(
            NestedField(field_id=i, name=f"col_{i}", field_type=StringType(), required=False)
            for i in string_ids
        ),
    )
    arrow_fields = [pa.field("id", pa.int32(), nullable=False)]
    arrow_fields.extend(pa.field(f"col_{i}", pa.string(), nullable=True) for i in string_ids)
    return iceberg_schema, pa.schema(arrow_fields)
65+
66+
67+
def _write_files(work_dir: Path, batch_idx: int, n_files: int, arrow_schema: pa.Schema) -> list[str]:
    """Write `n_files` identical 8-row parquet files for one batch; return their file:// URIs."""

    def _column(name: str) -> list:
        # `id` gets 0..7; every string column gets batch-tagged values.
        if name == "id":
            return list(range(8))
        return [f"v{batch_idx}-{j}" for j in range(8)]

    rows = pa.Table.from_pydict(
        {name: _column(name) for name in arrow_schema.names},
        schema=arrow_schema,
    )
    uris: list[str] = []
    for file_idx in range(n_files):
        target = work_dir / f"batch_{batch_idx:03d}_file_{file_idx:05d}.parquet"
        pq.write_table(rows, target)
        uris.append(f"file://{target}")
    return uris
81+
82+
83+
@pytest.mark.benchmark
def test_add_files_dup_check_memory_growth(memory_catalog: InMemoryCatalog) -> None:
    """Peak memory per `add_files(check_duplicate_files=True)` call should stay
    flat across consecutive calls on a growing table.

    With the materialize-then-filter implementation, peak grows roughly linearly
    with cumulative file count (per-column stats decoding into a pyarrow Table).
    With the streaming-scan implementation, peak stays bounded by the per-call
    workload.
    """
    num_batches = 10
    files_per_batch = 200
    iceberg_schema, arrow_schema = _wide_schema(num_columns=30)

    with tempfile.TemporaryDirectory() as tmp_root:
        data_dir = Path(tmp_root) / "data"
        data_dir.mkdir()
        table = memory_catalog.create_table("default.add_files_bench", schema=iceberg_schema)

        # Settle the heap before tracing so the first batch isn't inflated by setup garbage.
        gc.collect()
        tracemalloc.start()

        peaks_mb: list[float] = []
        print(f"\n--- add_files dup-check benchmark ({num_batches} batches × {files_per_batch} files, 30 cols) ---")
        print(f"{'batch':>5} {'tracemalloc_peak_MB':>22} {'cumulative_files':>17}")

        for batch in range(num_batches):
            new_paths = _write_files(data_dir, batch, files_per_batch, arrow_schema)
            # Reset immediately before the call so the recorded peak isolates
            # this single add_files invocation.
            tracemalloc.reset_peak()
            table.add_files(file_paths=new_paths, check_duplicate_files=True)
            _, raw_peak = tracemalloc.get_traced_memory()
            batch_peak_mb = raw_peak / (1024 * 1024)
            peaks_mb.append(batch_peak_mb)
            total_files = (batch + 1) * files_per_batch
            print(f"{batch:>5d} {batch_peak_mb:>22.1f} {total_files:>17d}")

        tracemalloc.stop()

    # Growth ratio: last call peak vs first call peak.
    # Materialize-then-filter (pre-fix): observed ~7× on this workload.
    # Streaming scan (post-fix): observed ~1×–1.5× (mostly noise).
    # Threshold of 3× catches the regression while tolerating variance.
    first_peak, last_peak = peaks_mb[0], peaks_mb[-1]
    ratio = last_peak / first_peak if first_peak > 0 else float("inf")
    print(f"\n Peak ratio (last / first): {ratio:.1f}×")
    max_ratio = 3.0
    assert ratio < max_ratio, (
        f"Peak memory ratio ({ratio:.1f}×) exceeds {max_ratio}×. "
        "Dup-check materializes the full snapshot rather than streaming on file_path."
    )

0 commit comments

Comments
 (0)