Skip to content

Commit f7d63b6

Browse files
test(benchmark): add_files dup-check wall + tracemalloc growth
1 parent 4ed64d4 commit f7d63b6

1 file changed

Lines changed: 114 additions & 0 deletions

File tree

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
"""Standalone benchmark for `add_files(check_duplicate_files=True)`.
18+
19+
Measures wall-clock and `tracemalloc` peak for the dup-check phase across
20+
N consecutive `add_files` calls on a growing table. Run before and after
21+
the fix to compare; this script doesn't import unreleased code, so it
22+
works against any pyiceberg checkout.
23+
24+
Usage:
25+
cd /path/to/iceberg-python
26+
uv run python tests/benchmark/bench_add_files_dup_check.py
27+
"""
28+
29+
from __future__ import annotations
30+
31+
import gc
32+
import tempfile
33+
import time
34+
import tracemalloc
35+
from pathlib import Path
36+
37+
import pyarrow as pa
38+
import pyarrow.parquet as pq
39+
40+
from pyiceberg.catalog.memory import InMemoryCatalog
41+
from pyiceberg.schema import Schema
42+
from pyiceberg.types import IntegerType, NestedField, StringType
43+
44+
45+
def _wide_schema(num_columns: int = 30) -> tuple[Schema, pa.Schema]:
    """Return matching (Iceberg, Arrow) schemas: an int ``id`` plus string columns.

    The schema is deliberately wide so that per-column statistics decoding
    has real work to do during the duplicate-file check.
    """
    extra_ids = range(2, num_columns + 1)
    iceberg_schema = Schema(
        NestedField(field_id=1, name="id", field_type=IntegerType(), required=True),
        *(
            NestedField(field_id=i, name=f"col_{i}", field_type=StringType(), required=False)
            for i in extra_ids
        ),
    )
    arrow_fields = [pa.field("id", pa.int32(), nullable=False)]
    arrow_fields.extend(pa.field(f"col_{i}", pa.string(), nullable=True) for i in extra_ids)
    return iceberg_schema, pa.schema(arrow_fields)
57+
58+
59+
def _write_files(work_dir: Path, batch_idx: int, n_files: int, arrow_schema: pa.Schema) -> list[str]:
    """Write `n_files` tiny parquet files; return their absolute file:// paths."""
    # One small 8-row table reused for every file in the batch; only the
    # file name varies, which is all the dup-check cares about.
    column_data: dict[str, list] = {}
    for name in arrow_schema.names:
        if name == "id":
            column_data[name] = list(range(8))
        else:
            column_data[name] = [f"v{batch_idx}-{j}" for j in range(8)]
    rows = pa.Table.from_pydict(column_data, schema=arrow_schema)

    out: list[str] = []
    for idx in range(n_files):
        target = work_dir / f"batch_{batch_idx:03d}_file_{idx:05d}.parquet"
        pq.write_table(rows, target)
        out.append(f"file://{target}")
    return out
74+
75+
76+
def main() -> None:
    """Run the dup-check benchmark: N `add_files` batches on a growing table.

    Prints, per batch, the wall-clock seconds spent inside `add_files` and
    the `tracemalloc` peak observed during that call, so before/after runs
    can be compared for both time and memory growth.
    """
    num_batches = 10
    files_per_batch = 200

    iceberg_schema, arrow_schema = _wide_schema(num_columns=30)

    with tempfile.TemporaryDirectory() as tmp_root:
        warehouse = Path(tmp_root) / "warehouse"
        data_dir = Path(tmp_root) / "data"
        warehouse.mkdir()
        data_dir.mkdir()

        catalog = InMemoryCatalog("bench", warehouse=f"file://{warehouse}")
        catalog.create_namespace("default")
        table = catalog.create_table("default.bench", schema=iceberg_schema)

        # Collect garbage before tracing so setup allocations don't pollute
        # the measured peaks.
        gc.collect()
        tracemalloc.start()

        # Plain string: the header has no placeholders (was an unnecessary
        # f-string, ruff F541). Output is byte-identical.
        print("\nadd_files(check_duplicate_files=True) benchmark")
        print(f" batches={num_batches}, files_per_batch={files_per_batch}, columns={len(arrow_schema.names)}")
        print(f"{'batch':>5} {'wall_s':>8} {'tracemalloc_peak_MB':>22} {'cumulative_files':>17}")

        cumulative = 0
        for b in range(num_batches):
            paths = _write_files(data_dir, b, files_per_batch, arrow_schema)
            # Reset the peak so each row reports only this batch's high-water
            # mark, not the max across the whole run.
            tracemalloc.reset_peak()
            t0 = time.perf_counter()
            table.add_files(file_paths=paths, check_duplicate_files=True)
            wall = time.perf_counter() - t0
            _, peak = tracemalloc.get_traced_memory()
            cumulative += files_per_batch
            print(f"{b:>5d} {wall:>8.2f} {peak / (1024 * 1024):>22.1f} {cumulative:>17d}")

        tracemalloc.stop()


if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)