Skip to content

Commit b3ddf48

Browse files
committed
Add support for commit.retry.total-timeout-ms property
1 parent 5f8c387 commit b3ddf48

3 files changed

Lines changed: 22 additions & 4 deletions

File tree

pyiceberg/table/__init__.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242

4343
from pydantic import Field
4444
from sortedcontainers import SortedList
45-
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_random_exponential
45+
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, stop_after_delay, wait_random_exponential
4646

4747
import pyiceberg.expressions.parser as parser
4848
from pyiceberg.exceptions import CommitFailedException
@@ -91,6 +91,8 @@
9191
COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
9292
COMMIT_NUM_RETRIES,
9393
COMMIT_NUM_RETRIES_DEFAULT,
94+
COMMIT_RETRY_TOTAL_TIMEOUT_MS,
95+
COMMIT_RETRY_TOTAL_TIMEOUT_MS_DEFAULT,
9496
INITIAL_SEQUENCE_NUMBER,
9597
TableMetadata,
9698
)
@@ -785,6 +787,7 @@ def commit_transaction(self) -> Table:
785787
min_wait_ms = int(self.table_metadata.properties.get(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT))
786788
max_wait_ms = int(self.table_metadata.properties.get(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT))
787789
num_retries = int(self.table_metadata.properties.get(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
790+
timeout_ms = int(self.table_metadata.properties.get(COMMIT_RETRY_TOTAL_TIMEOUT_MS, COMMIT_RETRY_TOTAL_TIMEOUT_MS_DEFAULT))
788791

789792
def _before_attempt(state: RetryCallState):
790793
if state.attempt_number > 1:
@@ -814,7 +817,7 @@ def _error_callback(state: RetryCallState):
814817
@wraps(self.commit_transaction)
815818
@retry(
816819
wait=wait_random_exponential(min=min_wait_ms / 1000, max=max_wait_ms / 1000),
817-
stop=stop_after_attempt(num_retries + 1),
820+
stop=(stop_after_attempt(num_retries + 1) | stop_after_delay(timeout_ms / 1000)),
818821
retry=retry_if_exception_type(CommitFailedException),
819822
before=_before_attempt,
820823
retry_error_callback=_error_callback,

pyiceberg/table/metadata.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@
7070
COMMIT_MAX_RETRY_WAIT_MS = "commit.retry.max-wait-ms"
7171
COMMIT_MAX_RETRY_WAIT_MS_DEFAULT = 5000 # 5 seconds
7272

73+
COMMIT_RETRY_TOTAL_TIMEOUT_MS = "commit.retry.total-timeout-ms"
74+
COMMIT_RETRY_TOTAL_TIMEOUT_MS_DEFAULT = 1_800_000 # 30 mins
7375

7476
INITIAL_SEQUENCE_NUMBER = 0
7577
INITIAL_SPEC_ID = 0

pyiceberg/table/update/__init__.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,15 @@
2525
from typing import TYPE_CHECKING, Annotated, Any, Dict, Generic, List, Literal, Optional, Tuple, TypeVar, Union, cast
2626

2727
from pydantic import Field, field_validator, model_validator
28-
from tenacity import RetryCallState, before_sleep_log, retry, retry_if_exception_type, stop_after_attempt, wait_random_exponential
28+
from tenacity import (
29+
RetryCallState,
30+
before_sleep_log,
31+
retry,
32+
retry_if_exception_type,
33+
stop_after_attempt,
34+
stop_after_delay,
35+
wait_random_exponential,
36+
)
2937

3038
from pyiceberg.exceptions import CommitFailedException
3139
from pyiceberg.partitioning import PARTITION_FIELD_ID_START, PartitionSpec
@@ -37,6 +45,8 @@
3745
COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
3846
COMMIT_NUM_RETRIES,
3947
COMMIT_NUM_RETRIES_DEFAULT,
48+
COMMIT_RETRY_TOTAL_TIMEOUT_MS,
49+
COMMIT_RETRY_TOTAL_TIMEOUT_MS_DEFAULT,
4050
SUPPORTED_TABLE_FORMAT_VERSION,
4151
TableMetadata,
4252
TableMetadataUtil,
@@ -86,6 +96,9 @@ def commit(self) -> None:
8696
self._transaction.table_metadata.properties.get(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT)
8797
)
8898
num_retries = int(self._transaction.table_metadata.properties.get(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
99+
timeout_ms = int(
100+
self._transaction.table_metadata.properties.get(COMMIT_RETRY_TOTAL_TIMEOUT_MS, COMMIT_RETRY_TOTAL_TIMEOUT_MS_DEFAULT)
101+
)
89102

90103
def _before_commit_inner(state: RetryCallState) -> None:
91104
if state.attempt_number > 1:
@@ -94,7 +107,7 @@ def _before_commit_inner(state: RetryCallState) -> None:
94107
@wraps(self.commit)
95108
@retry(
96109
wait=wait_random_exponential(min=min_wait_ms / 1000, max=max_wait_ms / 1000),
97-
stop=stop_after_attempt(num_retries),
110+
stop=(stop_after_attempt(num_retries) | stop_after_delay(timeout_ms / 1000)),
98111
retry=retry_if_exception_type(CommitFailedException),
99112
before=_before_commit_inner,
100113
before_sleep=lambda state: logger.debug(

0 commit comments

Comments
 (0)