Skip to content

Commit 28cd091

Browse files
committed
add anonymous support to pyarrow/oss, update docs
1 parent 98ec0c6 commit 28cd091

5 files changed

Lines changed: 50 additions & 8 deletions

File tree

mkdocs/docs/configuration.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ For the FileIO there are several configuration options available:
127127
| s3.request-timeout | 60.0 | Configure socket read timeouts on Windows and macOS, in seconds. |
128128
| s3.force-virtual-addressing | False | Whether to use virtual addressing of buckets. If true, then virtual addressing is always enabled. If false, then virtual addressing is only enabled if endpoint_override is empty. This can be used for non-AWS backends that only support virtual hosted-style access. |
129129
| s3.retry-strategy-impl | None | Ability to set a custom S3 retry strategy. A full path to a class needs to be given that extends the [S3RetryStrategy](https://github.com/apache/arrow/blob/639201bfa412db26ce45e73851432018af6c945e/python/pyarrow/_s3fs.pyx#L110) base class. |
130+
| s3.anonymous | False | Configure whether to use an anonymous connection. If False (default), uses key/secret if configured or boto's credential resolver. |
130131

131132
<!-- markdown-link-check-enable-->
132133

@@ -197,6 +198,8 @@ PyIceberg uses [S3FileSystem](https://arrow.apache.org/docs/python/generated/pya
197198
| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. |
198199
| s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. |
199200
| s3.force-virtual-addressing | True | Whether to use virtual addressing of buckets. This is set to `True` by default as OSS can only be accessed with virtual hosted style address. |
201+
| s3.anonymous | True | Configure whether to use an anonymous connection. If False (default), uses key/secret if configured or standard AWS configuration methods. |
202+
200203

201204
<!-- markdown-link-check-enable-->
202205

pyiceberg/io/fsspec.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@
8484
OutputStream,
8585
)
8686
from pyiceberg.typedef import Properties
87-
from pyiceberg.utils.properties import get_first_property_value, get_header_properties, property_as_bool
87+
from pyiceberg.utils.properties import get_first_property_value, get_header_properties, property_as_bool, strtobool
8888

8989
logger = logging.getLogger(__name__)
9090

@@ -164,9 +164,9 @@ def _s3(properties: Properties) -> AbstractFileSystem:
164164

165165
if request_timeout := properties.get(S3_REQUEST_TIMEOUT):
166166
config_kwargs["read_timeout"] = float(request_timeout)
167-
167+
168168
if s3_anonymous := properties.get(S3_ANONYMOUS):
169-
config_kwargs["anon"] = bool(s3_anonymous)
169+
config_kwargs["anon"] = strtobool(s3_anonymous)
170170

171171
fs = S3FileSystem(client_kwargs=client_kwargs, config_kwargs=config_kwargs)
172172

pyiceberg/io/pyarrow.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@
109109
HDFS_USER,
110110
PYARROW_USE_LARGE_TYPES_ON_READ,
111111
S3_ACCESS_KEY_ID,
112+
S3_ANONYMOUS,
112113
S3_CONNECT_TIMEOUT,
113114
S3_ENDPOINT,
114115
S3_FORCE_VIRTUAL_ADDRESSING,
@@ -185,7 +186,7 @@
185186
from pyiceberg.utils.datetime import millis_to_datetime
186187
from pyiceberg.utils.decimal import unscaled_to_decimal
187188
from pyiceberg.utils.deprecated import deprecation_message
188-
from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int
189+
from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int, strtobool
189190
from pyiceberg.utils.singleton import Singleton
190191
from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string
191192

@@ -450,6 +451,9 @@ def _initialize_oss_fs(self) -> FileSystem:
450451
if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
451452
client_kwargs["session_name"] = session_name
452453

454+
if s3_anonymous := self.properties.get(S3_ANONYMOUS):
455+
client_kwargs["anonymous"] = strtobool(s3_anonymous)
456+
453457
return S3FileSystem(**client_kwargs)
454458

455459
def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
@@ -501,6 +505,9 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
501505
):
502506
client_kwargs["retry_strategy"] = retry_instance
503507

508+
if s3_anonymous := self.properties.get(S3_ANONYMOUS):
509+
client_kwargs["anonymous"] = strtobool(s3_anonymous)
510+
504511
return S3FileSystem(**client_kwargs)
505512

506513
def _initialize_azure_fs(self) -> FileSystem:
@@ -2793,9 +2800,11 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T
27932800
functools.reduce(
27942801
operator.and_,
27952802
[
2796-
pc.field(partition_field_name) == unique_partition[partition_field_name]
2797-
if unique_partition[partition_field_name] is not None
2798-
else pc.field(partition_field_name).is_null()
2803+
(
2804+
pc.field(partition_field_name) == unique_partition[partition_field_name]
2805+
if unique_partition[partition_field_name] is not None
2806+
else pc.field(partition_field_name).is_null()
2807+
)
27992808
for field, partition_field_name in zip(spec.fields, partition_fields)
28002809
],
28012810
)

tests/io/test_fsspec.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,8 @@ def test_fsspec_s3_session_properties() -> None:
265265
config_kwargs={},
266266
)
267267

268-
def test_fsspec_s3_session_properties_anon_config() -> None:
268+
269+
def test_fsspec_s3_session_properties_with_anonymous() -> None:
269270
session_properties: Properties = {
270271
"s3.anonymous": "true",
271272
"s3.endpoint": "http://localhost:9000",

tests/io/test_pyarrow.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,35 @@ def test_pyarrow_s3_session_properties() -> None:
390390
)
391391

392392

393+
def test_pyarrow_s3_session_properties_with_anonymous() -> None:
394+
session_properties: Properties = {
395+
"s3.anonymous": "true",
396+
"s3.endpoint": "http://localhost:9000",
397+
"s3.access-key-id": "admin",
398+
"s3.secret-access-key": "password",
399+
"s3.region": "us-east-1",
400+
"s3.session-token": "s3.session-token",
401+
**UNIFIED_AWS_SESSION_PROPERTIES,
402+
}
403+
404+
with patch("pyarrow.fs.S3FileSystem") as mock_s3fs, patch("pyarrow.fs.resolve_s3_region") as mock_s3_region_resolver:
405+
s3_fileio = PyArrowFileIO(properties=session_properties)
406+
filename = str(uuid.uuid4())
407+
408+
# Mock `resolve_s3_region` to prevent from the location used resolving to a different s3 region
409+
mock_s3_region_resolver.side_effect = OSError("S3 bucket is not found")
410+
s3_fileio.new_input(location=f"s3://warehouse/{filename}")
411+
412+
mock_s3fs.assert_called_with(
413+
anonymous=True,
414+
endpoint_override="http://localhost:9000",
415+
access_key="admin",
416+
secret_key="password",
417+
region="us-east-1",
418+
session_token="s3.session-token",
419+
)
420+
421+
393422
def test_pyarrow_unified_session_properties() -> None:
394423
session_properties: Properties = {
395424
"s3.endpoint": "http://localhost:9000",

0 commit comments

Comments
 (0)