Skip to content

Commit 61e05ec

Browse files
committed
Roll added manifests at commit.manifest.target-size-bytes in fast-append
_FastAppendFiles previously wrote every added data file into a single manifest, regardless of commit.manifest.target-size-bytes. Java's RollingManifestWriter honours this property on the fast-append path too; this change brings pyiceberg to parity. The first manifest is written inline until it reaches the target, which yields an exact entries-per-manifest count; remaining chunks are then submitted to the existing ExecutorFactory pool so the GIL-bound encode of chunk K overlaps with the compress/upload (both GIL-releasing) of earlier chunks. _write_delete_manifest and _existing_manifests are submitted first so they run concurrently with the inline first chunk. Small appends (under the target in one manifest) take the same code path and produce a single manifest as before. Multiple small manifests let query planners read them in parallel, prune whole manifests via the manifest-list partition bounds without opening them, and keep delete/overwrite rewrites bounded to the affected manifest instead of the full set.
1 parent e6d5129 commit 61e05ec

2 files changed

Lines changed: 147 additions & 20 deletions

File tree

pyiceberg/table/update/snapshot.py

Lines changed: 48 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from abc import abstractmethod
2222
from collections import defaultdict
2323
from collections.abc import Callable
24+
from concurrent.futures import Future
2425
from datetime import datetime
2526
from functools import cached_property
2627
from typing import TYPE_CHECKING, Generic
@@ -176,24 +177,30 @@ def _process_manifests(self, manifests: list[ManifestFile]) -> list[ManifestFile
176177
return manifests
177178

178179
def _manifests(self) -> list[ManifestFile]:
179-
def _write_added_manifest() -> list[ManifestFile]:
180-
if self._added_data_files:
181-
with self.new_manifest_writer(
182-
spec=self._transaction.table_metadata.spec(),
183-
) as writer:
184-
for data_file in self._added_data_files:
185-
writer.add(
186-
ManifestEntry.from_args(
187-
status=ManifestEntryStatus.ADDED,
188-
snapshot_id=self._snapshot_id,
189-
sequence_number=None,
190-
file_sequence_number=None,
191-
data_file=data_file,
192-
)
193-
)
194-
return [writer.to_manifest_file()]
195-
else:
196-
return []
180+
from pyiceberg.table import TableProperties
181+
182+
table_metadata = self._transaction.table_metadata
183+
spec = table_metadata.spec()
184+
target_size_bytes: int = property_as_int(
185+
table_metadata.properties,
186+
TableProperties.MANIFEST_TARGET_SIZE_BYTES,
187+
TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
188+
) # type: ignore
189+
190+
def _added_entry(data_file: DataFile) -> ManifestEntry:
191+
return ManifestEntry.from_args(
192+
status=ManifestEntryStatus.ADDED,
193+
snapshot_id=self._snapshot_id,
194+
sequence_number=None,
195+
file_sequence_number=None,
196+
data_file=data_file,
197+
)
198+
199+
def _write_added_chunk(files: list[DataFile]) -> ManifestFile:
200+
with self.new_manifest_writer(spec=spec) as writer:
201+
for data_file in files:
202+
writer.add(_added_entry(data_file))
203+
return writer.to_manifest_file()
197204

198205
def _write_delete_manifest() -> list[ManifestFile]:
199206
# Check if we need to mark the files as deleted
@@ -217,11 +224,32 @@ def _write_delete_manifest() -> list[ManifestFile]:
217224

218225
executor = ExecutorFactory.get_or_create()
219226

220-
added_manifests = executor.submit(_write_added_manifest)
221227
delete_manifests = executor.submit(_write_delete_manifest)
222228
existing_manifests = executor.submit(self._existing_manifests)
223229

224-
return self._process_manifests(added_manifests.result() + delete_manifests.result() + existing_manifests.result())
230+
# Roll added data files into multiple manifests sized to
231+
# commit.manifest.target-size-bytes. The first manifest is written
232+
# inline until it reaches the target, which yields an exact
233+
# entries-per-manifest count; remaining chunks are then fanned out
234+
# across the executor so one chunk's GIL-bound encode overlaps with
235+
# the compress/upload of earlier chunks.
236+
added_manifests: list[ManifestFile] = []
237+
added_futures: list[Future[ManifestFile]] = []
238+
if self._added_data_files:
239+
added = self._added_data_files
240+
with self.new_manifest_writer(spec=spec) as first:
241+
i = 0
242+
while i < len(added) and (first.tell() < target_size_bytes or i == 0):
243+
first.add(_added_entry(added[i]))
244+
i += 1
245+
added_manifests.append(first.to_manifest_file())
246+
chunk_size = i
247+
added_futures = [
248+
executor.submit(_write_added_chunk, added[j : j + chunk_size]) for j in range(i, len(added), chunk_size)
249+
]
250+
251+
added_manifests += [f.result() for f in added_futures]
252+
return self._process_manifests(added_manifests + delete_manifests.result() + existing_manifests.result())
225253

226254
def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary:
227255
from pyiceberg.table import TableProperties
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from __future__ import annotations
18+
19+
import pytest
20+
21+
from pyiceberg.catalog.memory import InMemoryCatalog
22+
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
23+
from pyiceberg.schema import Schema
24+
from pyiceberg.table import TableProperties
25+
from pyiceberg.table.snapshots import Operation
26+
from pyiceberg.table.update.snapshot import _FastAppendFiles
27+
from pyiceberg.typedef import Record
28+
from pyiceberg.types import IntegerType, NestedField, StringType
29+
30+
31+
@pytest.fixture
def catalog(tmp_path: str) -> InMemoryCatalog:
    """Provide an in-memory catalog, warehoused under pytest's tmp_path, with a `default` namespace."""
    in_memory = InMemoryCatalog("test", warehouse=str(tmp_path))
    in_memory.create_namespace("default")
    return in_memory
36+
37+
38+
def _make_data_file(i: int) -> DataFile:
    """Build a minimal Parquet DataFile for index *i* with fixed, small column stats."""
    stats_per_column = {1: 100, 2: 100}
    no_nulls = {1: 0, 2: 0}
    return DataFile.from_args(
        content=DataFileContent.DATA,
        file_path=f"file:///tmp/part-{i:08d}.parquet",
        file_format=FileFormat.PARQUET,
        partition=Record(),
        record_count=100,
        file_size_in_bytes=12345,
        column_sizes=stats_per_column,
        value_counts=stats_per_column,
        null_value_counts=no_nulls,
        nan_value_counts={},
        lower_bounds={},
        upper_bounds={},
        key_metadata=None,
        split_offsets=None,
        equality_ids=None,
        sort_order_id=None,
        spec_id=0,
    )
58+
59+
60+
def test_fast_append_rolls_added_manifests_at_target_size(catalog: InMemoryCatalog) -> None:
    """Appending many files with a small commit.manifest.target-size-bytes must roll into several manifests."""
    two_column_schema = Schema(
        NestedField(1, "id", IntegerType(), required=True),
        NestedField(2, "name", StringType(), required=False),
    )
    tbl = catalog.create_table(
        "default.roll",
        schema=two_column_schema,
        properties={TableProperties.MANIFEST_TARGET_SIZE_BYTES: "4096"},
    )

    fast_append = _FastAppendFiles(operation=Operation.APPEND, transaction=tbl.transaction(), io=tbl.io)
    n_files = 200
    for index in range(n_files):
        fast_append.append_data_file(_make_data_file(index))

    manifests = fast_append._manifests()

    assert len(manifests) > 1, f"expected added files to roll into multiple manifests, got {len(manifests)}"
    # Every appended file must be accounted for exactly once across the rolled manifests.
    assert sum(manifest.added_files_count or 0 for manifest in manifests) == n_files
    # The first manifest is written until it reaches the target, so it may slightly
    # overshoot; remaining manifests are chunked by the same entry count and should
    # be in the same ballpark.
    for m in manifests:
        assert m.manifest_length < 4 * 4096, f"manifest {m.manifest_path} is {m.manifest_length} bytes"
86+
87+
88+
def test_fast_append_single_manifest_when_under_target(catalog: InMemoryCatalog) -> None:
    """A small append (below the default target size) must still produce exactly one manifest."""
    tbl = catalog.create_table(
        "default.small",
        schema=Schema(NestedField(1, "id", IntegerType(), required=True)),
    )

    fast_append = _FastAppendFiles(operation=Operation.APPEND, transaction=tbl.transaction(), io=tbl.io)
    for index in range(3):
        fast_append.append_data_file(_make_data_file(index))

    manifests = fast_append._manifests()
    assert len(manifests) == 1
    assert manifests[0].added_files_count == 3

0 commit comments

Comments (0)