Skip to content

Commit 5c8dc67

Browse files
committed
Add table.maintenance.compact() for full-table data file compaction
This introduces a simplified, whole-table compaction strategy via the MaintenanceTable API (`table.maintenance.compact()`). Key implementation details: - Reads the entire table state into memory via `.to_arrow()`. - Uses `table.overwrite()` to rewrite data, leveraging PyIceberg's target file bin-packing (`write.target-file-size-bytes`) natively. - Ensures atomicity by executing within a table transaction. - Explicitly sets `snapshot-type: replace` and `replace-operation: compaction` to ensure correct metadata history for downstream engines. - Includes a guard to safely ignore compaction requests on empty tables. Includes full Pytest coverage in `tests/table/test_maintenance.py`. Closes #1092
1 parent 4173ef7 commit 5c8dc67

2 files changed

Lines changed: 124 additions & 0 deletions

File tree

pyiceberg/table/maintenance.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,26 @@ def expire_snapshots(self) -> ExpireSnapshots:
4343
from pyiceberg.table.update.snapshot import ExpireSnapshots
4444

4545
return ExpireSnapshots(transaction=Transaction(self.tbl, autocommit=True))
46+
47+
def compact(self) -> None:
    """Compact the table's data files by reading and overwriting the entire table.

    Note: This is a full-table compaction that leverages Arrow for binpacking.
    It currently reads the entire table into memory via `.to_arrow()`.

    This reads all existing data into memory and writes it back out using the
    target file size settings (write.target-file-size-bytes), atomically
    dropping the old files and replacing them with fewer, larger files.
    """
    # Materialize the table's current contents as a single Arrow table.
    current_data = self.tbl.scan().to_arrow()

    # Guard clause: an overwrite with zero rows would delete every data file,
    # so an empty table is a no-op rather than a destructive rewrite.
    if current_data.num_rows == 0:
        logger.info("Table contains no rows, skipping compaction.")
        return

    # Mark the snapshot so downstream engines can recognize this REPLACE
    # commit as a compaction rather than a user-driven overwrite.
    compaction_summary = {
        "snapshot-type": "replace",
        "replace-operation": "compaction",
    }

    # The transaction makes the drop-old / write-new swap atomic.
    with self.tbl.transaction() as txn:
        txn.overwrite(current_data, snapshot_properties=compaction_summary)

tests/table/test_maintenance.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
import random
18+
import pyarrow as pa
19+
import pytest
20+
21+
from pyiceberg.catalog import Catalog
22+
from pyiceberg.schema import Schema
23+
from pyiceberg.partitioning import PartitionSpec, PartitionField
24+
from pyiceberg.transforms import IdentityTransform
25+
from pyiceberg.exceptions import NoSuchNamespaceError
26+
27+
28+
def test_maintenance_compact(catalog: Catalog) -> None:
    """End-to-end check of table.maintenance.compact() on a partitioned table.

    Appends 12 small files across 3 partitions, compacts, and verifies the
    row count is preserved, the file count drops to one per partition, and
    the new snapshot carries the compaction summary properties.
    """
    from pyiceberg.exceptions import NamespaceAlreadyExistsError
    from pyiceberg.types import NestedField, StringType, LongType

    # Setup Schema and specs
    schema = Schema(
        NestedField(1, "id", LongType()),
        NestedField(2, "category", StringType()),
        NestedField(3, "value", LongType()),
    )
    spec = PartitionSpec(
        PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="category")
    )

    # Create the namespace and table.
    # BUG FIX: create_namespace raises NamespaceAlreadyExistsError (not
    # NoSuchNamespaceError) when the namespace already exists; the old
    # guard never matched and the test crashed on a warm catalog.
    try:
        catalog.create_namespace("default")
    except NamespaceAlreadyExistsError:
        pass
    table = catalog.create_table(
        "default.test_compaction",
        schema=schema,
        partition_spec=spec,
    )

    # Append many small data files (one manifest/data file per append)
    categories = ["cat1", "cat2", "cat3"]
    for i in range(12):
        table.append(pa.table({
            "id": list(range(i * 10, (i + 1) * 10)),
            "category": [categories[i % 3]] * 10,
            "value": [random.randint(1, 100) for _ in range(10)],
        }))

    # Verify state before compaction
    before_files = list(table.scan().plan_files())
    assert len(before_files) == 12
    assert table.scan().to_arrow().num_rows == 120

    # Execute Compaction
    table.maintenance.compact()

    # Verify state after compaction
    table.refresh()
    after_files = list(table.scan().plan_files())
    assert len(after_files) == 3  # Should be 1 optimized data file per partition
    assert table.scan().to_arrow().num_rows == 120

    # Ensure snapshot properties specify the replace-operation
    new_snapshot = table.current_snapshot()
    assert new_snapshot is not None
    assert new_snapshot.summary.get("snapshot-type") == "replace"
    assert new_snapshot.summary.get("replace-operation") == "compaction"
def test_maintenance_compact_empty_table(catalog: Catalog) -> None:
    """Verify compact() is a safe no-op on a table with no data.

    An empty table must not gain a new snapshot from compaction (an
    overwrite with zero rows would otherwise delete all data files).
    """
    from pyiceberg.exceptions import NamespaceAlreadyExistsError
    from pyiceberg.types import NestedField, StringType, LongType

    schema = Schema(
        NestedField(1, "id", LongType()),
        NestedField(2, "category", StringType()),
    )

    # BUG FIX: create_namespace raises NamespaceAlreadyExistsError (not
    # NoSuchNamespaceError) when the namespace already exists; the old
    # guard never matched and the test crashed on a warm catalog.
    try:
        catalog.create_namespace("default")
    except NamespaceAlreadyExistsError:
        pass

    table = catalog.create_table("default.test_compaction_empty", schema=schema)
    before_snapshots = len(table.history())

    # Should safely return doing nothing
    table.maintenance.compact()

    table.refresh()
    after_snapshots = len(table.history())
    assert before_snapshots == after_snapshots  # No new snapshot should be made

0 commit comments

Comments
 (0)