Skip to content

Commit c9019ec

Browse files
authored
Add transaction operations to retry in UpdateTableMetadata
1 parent 07efc93 commit c9019ec

4 files changed

Lines changed: 97 additions & 16 deletions

File tree

pyiceberg/table/__init__.py

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@
106106
from pyiceberg.table.update import (
107107
AddPartitionSpecUpdate,
108108
AddSchemaUpdate,
109-
AddSnapshotUpdate,
110109
AddSortOrderUpdate,
111110
AssertCreate,
112111
AssertRefSnapshotId,
@@ -127,7 +126,6 @@
127126
)
128127
from pyiceberg.table.update.schema import UpdateSchema
129128
from pyiceberg.table.update.snapshot import (
130-
_SnapshotProducer,
131129
ManageSnapshots,
132130
UpdateSnapshot,
133131
_FastAppendFiles,
@@ -161,6 +159,7 @@
161159
from duckdb import DuckDBPyConnection
162160

163161
from pyiceberg.catalog import Catalog
162+
from pyiceberg.table.update import UpdateTableMetadata
164163

165164
ALWAYS_TRUE = AlwaysTrue()
166165
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"
@@ -262,7 +261,7 @@ class Transaction:
262261
_autocommit: bool
263262
_updates: Tuple[TableUpdate, ...]
264263
_requirements: Tuple[TableRequirement, ...]
265-
_snapshot_operations: Tuple[_SnapshotProducer, ...]
264+
_snapshot_operations: Tuple[UpdateTableMetadata, ...]
266265

267266
def __init__(self, table: Table, autocommit: bool = False):
268267
"""Open a transaction to stage and commit changes to a table.
@@ -504,8 +503,6 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
504503
for data_file in data_files:
505504
append_files.append_data_file(data_file)
506505

507-
self._snapshot_operations += (append_files,)
508-
509506
def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
510507
"""
511508
Shorthand for overwriting existing partitions with a PyArrow table.
@@ -561,8 +558,6 @@ def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[st
561558
for data_file in data_files:
562559
append_files.append_data_file(data_file)
563560

564-
self._snapshot_operations += (append_files,)
565-
566561
def overwrite(
567562
self,
568563
df: pa.Table,
@@ -620,8 +615,6 @@ def overwrite(
620615
for data_file in data_files:
621616
append_files.append_data_file(data_file)
622617

623-
self._snapshot_operations += (append_files,)
624-
625618
def delete(
626619
self,
627620
delete_filter: Union[str, BooleanExpression],
@@ -716,8 +709,6 @@ def delete(
716709
if not delete_snapshot.files_affected and not delete_snapshot.rewrites_needed:
717710
warnings.warn("Delete operation did not match any records")
718711

719-
self._snapshot_operations += (delete_snapshot,)
720-
721712
def add_files(
722713
self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
723714
) -> None:
@@ -754,8 +745,6 @@ def add_files(
754745
for data_file in data_files:
755746
append_snapshot.append_data_file(data_file)
756747

757-
self._snapshot_operations += (append_snapshot,)
758-
759748
def update_spec(self) -> UpdateSpec:
760749
"""Create a new UpdateSpec to update the partitioning of the table.
761750

pyiceberg/table/metadata.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,10 @@
6565
COMMIT_NUM_RETRIES_DEFAULT = 4
6666

6767
COMMIT_MIN_RETRY_WAIT_MS = "commit.retry.min-wait-ms"
68-
COMMIT_MIN_RETRY_WAIT_MS_DEFAULT = 100
68+
COMMIT_MIN_RETRY_WAIT_MS_DEFAULT = 1000 # 1 second
6969

7070
COMMIT_MAX_RETRY_WAIT_MS = "commit.retry.max-wait-ms"
71-
COMMIT_MAX_RETRY_WAIT_MS_DEFAULT = 60 * 1000 # 1 minute
71+
COMMIT_MAX_RETRY_WAIT_MS_DEFAULT = 5000 # 5 seconds
7272

7373

7474
INITIAL_SEQUENCE_NUMBER = 0

pyiceberg/table/update/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ def _before_commit_inner(state: RetryCallState) -> None:
104104
def commit_inner() -> None:
105105
self._transaction._apply(*self._commit())
106106

107-
return commit_inner()
107+
commit_inner()
108+
self._transaction._snapshot_operations += (self,)
108109

109110
def _cleanup_commit_failure(self) -> None:
110111
"""Prepare the snapshot producer to commit against the latest version of the table after it has been updated."""

tests/table/test_init.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
# under the License.
1717
# pylint:disable=redefined-outer-name
1818
import json
19+
from unittest.mock import Mock
1920
import uuid
2021
from copy import copy
2122
from typing import Any, Dict
@@ -43,6 +44,7 @@
4344
from pyiceberg.partitioning import PartitionField, PartitionSpec
4445
from pyiceberg.schema import Schema
4546
from pyiceberg.table import (
47+
ALWAYS_TRUE,
4648
CommitTableRequest,
4749
StaticTable,
4850
Table,
@@ -94,6 +96,7 @@
9496
BucketTransform,
9597
IdentityTransform,
9698
)
99+
from pyiceberg.typedef import Record
97100
from pyiceberg.types import (
98101
BinaryType,
99102
BooleanType,
@@ -1378,3 +1381,91 @@ def test_remove_statistics_update(table_v2_with_statistics: Table) -> None:
13781381
table_v2_with_statistics.metadata,
13791382
(RemoveStatisticsUpdate(snapshot_id=123456789),),
13801383
)
1384+
1385+
1386+
def test_transaction_commit_retry(table_v1: Table, mocker: Mock) -> None:
1387+
import pyarrow as pa
1388+
1389+
mock_data_file = DataFile(
1390+
content=DataFileContent.DATA,
1391+
file_path="s3://some-path/some-file.parquet",
1392+
file_format=FileFormat.PARQUET,
1393+
partition=Record(),
1394+
record_count=131327,
1395+
file_size_in_bytes=220669226,
1396+
column_sizes={1: 220661854},
1397+
value_counts={1: 131327},
1398+
null_value_counts={1: 0},
1399+
nan_value_counts={},
1400+
lower_bounds={1: b"aaaaaaaaaaaaaaaa"},
1401+
upper_bounds={1: b"zzzzzzzzzzzzzzzz"},
1402+
key_metadata=b"\xde\xad\xbe\xef",
1403+
split_offsets=[4, 133697593],
1404+
equality_ids=[],
1405+
sort_order_id=4,
1406+
)
1407+
1408+
call_count = 0
1409+
captured_args = []
1410+
1411+
def mock_do_commit(*args, **kwargs):
1412+
"""Capture arguments to `Transaction._do_commit` and invoke an initial retry."""
1413+
1414+
nonlocal call_count
1415+
captured_args.append((args, kwargs))
1416+
call_count += 1
1417+
if call_count == 1:
1418+
raise CommitFailedException("Test")
1419+
return None
1420+
1421+
# Patch out IO of data, manifests, and metadata
1422+
mocker.patch("pyiceberg.io.pyarrow.write_file", return_value=[mock_data_file])
1423+
mocker.patch("pyiceberg.table.update.snapshot.write_manifest")
1424+
mocker.patch("pyiceberg.table.update.snapshot.write_manifest_list")
1425+
mocker.patch("pyiceberg.catalog.noop.NoopCatalog.load_table", return_value=table_v1)
1426+
mocker.patch("pyiceberg.table.Table._do_commit", side_effect=mock_do_commit)
1427+
1428+
schema = pa.schema(
1429+
[
1430+
pa.field("x", pa.int64(), nullable=False),
1431+
pa.field("y", pa.int64(), nullable=False),
1432+
pa.field("z", pa.int64(), nullable=False),
1433+
]
1434+
)
1435+
1436+
trx = table_v1.transaction()
1437+
with pytest.warns(UserWarning):
1438+
trx.delete(ALWAYS_TRUE)
1439+
trx.append(pa.Table.from_pydict({"x": [1, 2, 3], "y": [4, 5, 6], "z": [7, 8, 9]}, schema=schema))
1440+
trx.commit_transaction()
1441+
1442+
# Verify that _do_commit was called twice (first failed, second succeeded)
1443+
assert call_count == 2, f"Expected 2 calls to _do_commit, got {call_count}"
1444+
1445+
# Inspect the arguments passed to both commit attempts
1446+
_, first_call_kwargs = captured_args[0]
1447+
_, second_call_kwargs = captured_args[1]
1448+
1449+
# Extract updates and requirements from both calls
1450+
first_updates = first_call_kwargs.get("updates", ())
1451+
first_requirements = first_call_kwargs.get("requirements", ())
1452+
second_updates = second_call_kwargs.get("updates", ())
1453+
second_requirements = second_call_kwargs.get("requirements", ())
1454+
1455+
# Assert retry has same number of updates and requirements as first attempt
1456+
assert len(first_updates) == len(second_updates), f"Updates count mismatch: {len(first_updates)} vs {len(second_updates)}"
1457+
assert len(first_requirements) == len(second_requirements), (
1458+
f"Requirements count mismatch: {len(first_requirements)} vs {len(second_requirements)}"
1459+
)
1460+
1461+
# Assert retry has same types of updates as first attempt
1462+
first_update_types = [type(update).__name__ for update in first_updates]
1463+
second_update_types = [type(update).__name__ for update in second_updates]
1464+
assert first_update_types == second_update_types, f"Update types mismatch: {first_update_types} vs {second_update_types}"
1465+
1466+
# Assert retry has same types of requirements as first attempt
1467+
first_requirement_types = [type(req).__name__ for req in first_requirements]
1468+
second_requirement_types = [type(req).__name__ for req in second_requirements]
1469+
assert first_requirement_types == second_requirement_types, (
1470+
f"Requirement types mismatch: {first_requirement_types} vs {second_requirement_types}"
1471+
)

0 commit comments

Comments (0)