Skip to content

Commit 6bc0cf8

Browse files
committed
Expose S3 retry strategy
1 parent f4da19e commit 6bc0cf8

4 files changed

Lines changed: 61 additions & 17 deletions

File tree

mkdocs/docs/configuration.md

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -109,23 +109,24 @@ For the FileIO there are several configuration options available:
109109

110110
<!-- markdown-link-check-disable -->
111111

112-
| Key | Example | Description |
113-
|-----------------------------|----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
114-
| s3.endpoint | <https://10.0.19.25/> | Configure an alternative endpoint of the S3 service for the FileIO to access. This could be used to use S3FileIO with any s3-compatible object storage service that has a different endpoint, or access a private S3 endpoint in a virtual private cloud. |
115-
| s3.access-key-id | admin | Configure the static access key id used to access the FileIO. |
116-
| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. |
117-
| s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. |
118-
| s3.role-session-name | session | An optional identifier for the assumed role session. |
119-
| s3.role-arn | arn:aws:... | AWS Role ARN. If provided instead of access_key and secret_key, temporary credentials will be fetched by assuming this role. |
120-
| s3.signer | bearer | Configure the signature version of the FileIO. |
121-
| s3.signer.uri | <http://my.signer:8080/s3> | Configure the remote signing uri if it differs from the catalog uri. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `<s3.signer.uri>/<s3.signer.endpoint>`. |
122-
| s3.signer.endpoint | v1/main/s3-sign | Configure the remote signing endpoint. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `<s3.signer.uri>/<s3.signer.endpoint>`. (default : v1/aws/s3/sign). |
123-
| s3.region | us-west-2 | Configure the default region used to initialize an `S3FileSystem`. `PyArrowFileIO` attempts to automatically tries to resolve the region if this isn't set (only supported for AWS S3 Buckets). |
124-
| s3.resolve-region | False | Only supported for `PyArrowFileIO`, when enabled, it will always try to resolve the location of the bucket (only supported for AWS S3 Buckets). |
125-
| s3.proxy-uri | <http://my.proxy.com:8080> | Configure the proxy server to be used by the FileIO. |
126-
| s3.connect-timeout | 60.0 | Configure socket connection timeout, in seconds. |
127-
| s3.request-timeout | 60.0 | Configure socket read timeouts on Windows and macOS, in seconds. |
112+
| Key | Example | Description |
113+
|-----------------------------|----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
114+
| s3.endpoint | <https://10.0.19.25/> | Configure an alternative endpoint of the S3 service for the FileIO to access. This could be used to use S3FileIO with any s3-compatible object storage service that has a different endpoint, or access a private S3 endpoint in a virtual private cloud. |
115+
| s3.access-key-id | admin | Configure the static access key id used to access the FileIO. |
116+
| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. |
117+
| s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. |
118+
| s3.role-session-name | session | An optional identifier for the assumed role session. |
119+
| s3.role-arn | arn:aws:... | AWS Role ARN. If provided instead of access_key and secret_key, temporary credentials will be fetched by assuming this role. |
120+
| s3.signer | bearer | Configure the signature version of the FileIO. |
121+
| s3.signer.uri | <http://my.signer:8080/s3> | Configure the remote signing uri if it differs from the catalog uri. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `<s3.signer.uri>/<s3.signer.endpoint>`. |
122+
| s3.signer.endpoint | v1/main/s3-sign | Configure the remote signing endpoint. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `<s3.signer.uri>/<s3.signer.endpoint>`. (default : v1/aws/s3/sign). |
123+
| s3.region | us-west-2 | Configure the default region used to initialize an `S3FileSystem`. `PyArrowFileIO` attempts to automatically tries to resolve the region if this isn't set (only supported for AWS S3 Buckets). |
124+
| s3.resolve-region | False | Only supported for `PyArrowFileIO`, when enabled, it will always try to resolve the location of the bucket (only supported for AWS S3 Buckets). |
125+
| s3.proxy-uri | <http://my.proxy.com:8080> | Configure the proxy server to be used by the FileIO. |
126+
| s3.connect-timeout | 60.0 | Configure socket connection timeout, in seconds. |
127+
| s3.request-timeout | 60.0 | Configure socket read timeouts on Windows and macOS, in seconds. |
128128
| s3.force-virtual-addressing | False | Whether to use virtual addressing of buckets. If true, then virtual addressing is always enabled. If false, then virtual addressing is only enabled if endpoint_override is empty. This can be used for non-AWS backends that only support virtual hosted-style access. |
129+
| s3.retry-strategy-impl | None | Ability to set a custom S3 retry strategy. A full path to a class needs to be given that extends the [S3RetryStrategy](https://github.com/apache/arrow/blob/639201bfa412db26ce45e73851432018af6c945e/python/pyarrow/_s3fs.pyx#L110) base class. |
129130

130131
<!-- markdown-link-check-enable-->
131132

pyiceberg/io/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
S3_ROLE_ARN = "s3.role-arn"
7171
S3_ROLE_SESSION_NAME = "s3.role-session-name"
7272
S3_FORCE_VIRTUAL_ADDRESSING = "s3.force-virtual-addressing"
73+
S3_RETRY_STRATEGY_IMPL = "s3.retry-strategy-impl"
7374
HDFS_HOST = "hdfs.host"
7475
HDFS_PORT = "hdfs.port"
7576
HDFS_USER = "hdfs.user"

pyiceberg/io/pyarrow.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import concurrent.futures
2929
import fnmatch
3030
import functools
31+
import importlib
3132
import itertools
3233
import logging
3334
import operator
@@ -65,6 +66,7 @@
6566
import pyarrow.lib
6667
import pyarrow.parquet as pq
6768
from pyarrow import ChunkedArray
69+
from pyarrow._s3fs import S3RetryStrategy
6870
from pyarrow.fs import (
6971
FileInfo,
7072
FileSystem,
@@ -108,6 +110,7 @@
108110
S3_REGION,
109111
S3_REQUEST_TIMEOUT,
110112
S3_RESOLVE_REGION,
113+
S3_RETRY_STRATEGY_IMPL,
111114
S3_ROLE_ARN,
112115
S3_ROLE_SESSION_NAME,
113116
S3_SECRET_ACCESS_KEY,
@@ -212,6 +215,20 @@ def _cached_resolve_s3_region(bucket: str) -> Optional[str]:
212215
return None
213216

214217

218+
def _import_retry_strategy(impl: str) -> Optional[S3RetryStrategy]:
219+
try:
220+
path_parts = impl.split(".")
221+
if len(path_parts) < 2:
222+
raise ValueError(f"retry-strategy-impl should be full path (module.CustomS3RetryStrategy), got: {impl}")
223+
module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1]
224+
module = importlib.import_module(module_name)
225+
class_ = getattr(module, class_name)
226+
return class_()
227+
except (ModuleNotFoundError, AttributeError):
228+
warnings.warn(f"Could not initialize S3 retry strategy: {impl}")
229+
return None
230+
231+
215232
class UnsupportedPyArrowTypeException(Exception):
216233
"""Cannot convert PyArrow type to corresponding Iceberg type."""
217234

@@ -473,6 +490,11 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
473490
if self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING) is not None:
474491
client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, False)
475492

493+
if (retry_strategy_impl := self.properties.get(S3_RETRY_STRATEGY_IMPL)) and (
494+
retry_instance := _import_retry_strategy(retry_strategy_impl)
495+
):
496+
client_kwargs["retry_strategy"] = retry_instance
497+
476498
return S3FileSystem(**client_kwargs)
477499

478500
def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:

tests/io/test_pyarrow.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import os
2020
import tempfile
2121
import uuid
22+
import warnings
2223
from datetime import date
2324
from typing import Any, List, Optional
2425
from unittest.mock import MagicMock, patch
@@ -27,6 +28,7 @@
2728
import pyarrow as pa
2829
import pyarrow.parquet as pq
2930
import pytest
31+
from pyarrow._s3fs import AwsDefaultS3RetryStrategy
3032
from pyarrow.fs import FileType, LocalFileSystem, S3FileSystem
3133

3234
from pyiceberg.exceptions import ResolveError
@@ -55,7 +57,7 @@
5557
Or,
5658
)
5759
from pyiceberg.expressions.literals import literal
58-
from pyiceberg.io import InputStream, OutputStream, load_file_io
60+
from pyiceberg.io import S3_RETRY_STRATEGY_IMPL, InputStream, OutputStream, load_file_io
5961
from pyiceberg.io.pyarrow import (
6062
ICEBERG_SCHEMA,
6163
ArrowScan,
@@ -2319,3 +2321,21 @@ def test_pyarrow_io_multi_fs() -> None:
23192321

23202322
# Same PyArrowFileIO instance resolves local file input to LocalFileSystem
23212323
assert isinstance(pyarrow_file_io.new_input("file:///path/to/file")._filesystem, LocalFileSystem)
2324+
2325+
2326+
class SomeRetryStrategy(AwsDefaultS3RetryStrategy):
2327+
def __init__(self) -> None:
2328+
super().__init__()
2329+
warnings.warn("Initialized SomeRetryStrategy 👍")
2330+
2331+
2332+
def test_retry_strategy() -> None:
2333+
io = PyArrowFileIO(properties={S3_RETRY_STRATEGY_IMPL: "tests.io.test_pyarrow.SomeRetryStrategy"})
2334+
with pytest.warns(UserWarning, match="Initialized SomeRetryStrategy.*"):
2335+
io.new_input("s3://bucket/path/to/file")
2336+
2337+
2338+
def test_retry_strategy_not_found() -> None:
2339+
io = PyArrowFileIO(properties={S3_RETRY_STRATEGY_IMPL: "pyiceberg.DoesNotExist"})
2340+
with pytest.warns(UserWarning, match="Could not initialize S3 retry strategy: pyiceberg.DoesNotExist"):
2341+
io.new_input("s3://bucket/path/to/file")

0 commit comments

Comments
 (0)