Skip to content

Commit c55eecc

Browse files
author
Alexandra Popova
committed
Check storage-credentials before config in table response
1 parent be91f8a commit c55eecc

2 files changed

Lines changed: 66 additions & 58 deletions

File tree

pyiceberg/catalog/rest/__init__.py

Lines changed: 33 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
Any,
2121
Dict,
2222
List,
23-
Optional,
2423
Set,
2524
Tuple,
2625
Union,
@@ -153,18 +152,18 @@ def _retry_hook(retry_state: RetryCallState) -> None:
153152

154153

155154
class TableResponse(IcebergBaseModel):
156-
metadata_location: Optional[str] = Field(alias="metadata-location", default=None)
155+
metadata_location: str | None = Field(alias="metadata-location", default=None)
157156
metadata: TableMetadata
158157
config: Properties = Field(default_factory=dict)
159-
storage_credentials: Optional[Properties] = Field(alias="storage-credentials", default=None)
158+
storage_credentials: Properties | None = Field(alias="storage-credentials", default=None)
160159

161160

162161
class CreateTableRequest(IcebergBaseModel):
163162
name: str = Field()
164-
location: Optional[str] = Field()
163+
location: str | None = Field()
165164
table_schema: Schema = Field(alias="schema")
166-
partition_spec: Optional[PartitionSpec] = Field(alias="partition-spec")
167-
write_order: Optional[SortOrder] = Field(alias="write-order")
165+
partition_spec: PartitionSpec | None = Field(alias="partition-spec")
166+
write_order: SortOrder | None = Field(alias="write-order")
168167
stage_create: bool = Field(alias="stage-create", default=False)
169168
properties: Dict[str, str] = Field(default_factory=dict)
170169

@@ -180,8 +179,8 @@ class RegisterTableRequest(IcebergBaseModel):
180179

181180

182181
class ConfigResponse(IcebergBaseModel):
183-
defaults: Optional[Properties] = Field(default_factory=dict)
184-
overrides: Optional[Properties] = Field(default_factory=dict)
182+
defaults: Properties | None = Field(default_factory=dict)
183+
overrides: Properties | None = Field(default_factory=dict)
185184

186185

187186
class ListNamespaceResponse(IcebergBaseModel):
@@ -295,7 +294,7 @@ def _create_legacy_oauth2_auth_manager(self, session: Session) -> AuthManager:
295294

296295
return AuthManagerFactory.create("legacyoauth2", auth_config)
297296

298-
def _check_valid_namespace_identifier(self, identifier: Union[str, Identifier]) -> Identifier:
297+
def _check_valid_namespace_identifier(self, identifier: str | Identifier) -> Identifier:
299298
"""Check if the identifier has at least one element."""
300299
identifier_tuple = Catalog.identifier_to_tuple(identifier)
301300
if len(identifier_tuple) < 1:
@@ -378,22 +377,22 @@ def _fetch_config(self) -> None:
378377
# Update URI based on overrides
379378
self.uri = config[URI]
380379

381-
def _identifier_to_validated_tuple(self, identifier: Union[str, Identifier]) -> Identifier:
380+
def _identifier_to_validated_tuple(self, identifier: str | Identifier) -> Identifier:
382381
identifier_tuple = self.identifier_to_tuple(identifier)
383382
if len(identifier_tuple) <= 1:
384383
raise NoSuchIdentifierError(f"Missing namespace or invalid identifier: {'.'.join(identifier_tuple)}")
385384
return identifier_tuple
386385

387386
def _split_identifier_for_path(
388-
self, identifier: Union[str, Identifier, TableIdentifier], kind: IdentifierKind = IdentifierKind.TABLE
387+
self, identifier: str | Identifier | TableIdentifier, kind: IdentifierKind = IdentifierKind.TABLE
389388
) -> Properties:
390389
if isinstance(identifier, TableIdentifier):
391390
return {"namespace": NAMESPACE_SEPARATOR.join(identifier.namespace.root), kind.value: identifier.name}
392391
identifier_tuple = self._identifier_to_validated_tuple(identifier)
393392

394393
return {"namespace": NAMESPACE_SEPARATOR.join(identifier_tuple[:-1]), kind.value: identifier_tuple[-1]}
395394

396-
def _split_identifier_for_json(self, identifier: Union[str, Identifier]) -> Dict[str, Union[Identifier, str]]:
395+
def _split_identifier_for_json(self, identifier: str | Identifier) -> Dict[str, Identifier | str]:
397396
identifier_tuple = self._identifier_to_validated_tuple(identifier)
398397
return {"namespace": identifier_tuple[:-1], "name": identifier_tuple[-1]}
399398

@@ -497,9 +496,9 @@ def _config_headers(self, session: Session) -> None:
497496

498497
def _create_table(
499498
self,
500-
identifier: Union[str, Identifier],
499+
identifier: str | Identifier,
501500
schema: Union[Schema, "pa.Schema"],
502-
location: Optional[str] = None,
501+
location: str | None = None,
503502
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
504503
sort_order: SortOrder = UNSORTED_SORT_ORDER,
505504
properties: Properties = EMPTY_DICT,
@@ -539,9 +538,9 @@ def _create_table(
539538
@retry(**_RETRY_ARGS)
540539
def create_table(
541540
self,
542-
identifier: Union[str, Identifier],
541+
identifier: str | Identifier,
543542
schema: Union[Schema, "pa.Schema"],
544-
location: Optional[str] = None,
543+
location: str | None = None,
545544
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
546545
sort_order: SortOrder = UNSORTED_SORT_ORDER,
547546
properties: Properties = EMPTY_DICT,
@@ -560,9 +559,9 @@ def create_table(
560559
@retry(**_RETRY_ARGS)
561560
def create_table_transaction(
562561
self,
563-
identifier: Union[str, Identifier],
562+
identifier: str | Identifier,
564563
schema: Union[Schema, "pa.Schema"],
565-
location: Optional[str] = None,
564+
location: str | None = None,
566565
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
567566
sort_order: SortOrder = UNSORTED_SORT_ORDER,
568567
properties: Properties = EMPTY_DICT,
@@ -580,7 +579,7 @@ def create_table_transaction(
580579
return CreateTableTransaction(staged_table)
581580

582581
@retry(**_RETRY_ARGS)
583-
def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
582+
def register_table(self, identifier: str | Identifier, metadata_location: str) -> Table:
584583
"""Register a new table using existing metadata.
585584
586585
Args:
@@ -612,7 +611,7 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
612611
return self._response_to_table(self.identifier_to_tuple(identifier), table_response)
613612

614613
@retry(**_RETRY_ARGS)
615-
def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
614+
def list_tables(self, namespace: str | Identifier) -> List[Identifier]:
616615
namespace_tuple = self._check_valid_namespace_identifier(namespace)
617616
namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple)
618617
response = self._session.get(self.url(Endpoints.list_tables, namespace=namespace_concat))
@@ -623,7 +622,7 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
623622
return [(*table.namespace, table.name) for table in ListTablesResponse.model_validate_json(response.text).identifiers]
624623

625624
@retry(**_RETRY_ARGS)
626-
def load_table(self, identifier: Union[str, Identifier]) -> Table:
625+
def load_table(self, identifier: str | Identifier) -> Table:
627626
params = {}
628627
if mode := self.properties.get(SNAPSHOT_LOADING_MODE):
629628
if mode in {"all", "refs"}:
@@ -643,7 +642,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
643642
return self._response_to_table(self.identifier_to_tuple(identifier), table_response)
644643

645644
@retry(**_RETRY_ARGS)
646-
def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool = False) -> None:
645+
def drop_table(self, identifier: str | Identifier, purge_requested: bool = False) -> None:
647646
response = self._session.delete(
648647
self.url(Endpoints.drop_table, prefixed=True, **self._split_identifier_for_path(identifier)),
649648
params={"purgeRequested": purge_requested},
@@ -654,11 +653,11 @@ def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool =
654653
_handle_non_200_response(exc, {404: NoSuchTableError})
655654

656655
@retry(**_RETRY_ARGS)
657-
def purge_table(self, identifier: Union[str, Identifier]) -> None:
656+
def purge_table(self, identifier: str | Identifier) -> None:
658657
self.drop_table(identifier=identifier, purge_requested=True)
659658

660659
@retry(**_RETRY_ARGS)
661-
def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
660+
def rename_table(self, from_identifier: str | Identifier, to_identifier: str | Identifier) -> Table:
662661
payload = {
663662
"source": self._split_identifier_for_json(from_identifier),
664663
"destination": self._split_identifier_for_json(to_identifier),
@@ -683,7 +682,7 @@ def _remove_catalog_name_from_table_request_identifier(self, table_request: Comm
683682
return table_request
684683

685684
@retry(**_RETRY_ARGS)
686-
def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
685+
def list_views(self, namespace: str | Identifier) -> List[Identifier]:
687686
namespace_tuple = self._check_valid_namespace_identifier(namespace)
688687
namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple)
689688
response = self._session.get(self.url(Endpoints.list_views, namespace=namespace_concat))
@@ -740,7 +739,7 @@ def commit_table(
740739
return CommitTableResponse.model_validate_json(response.text)
741740

742741
@retry(**_RETRY_ARGS)
743-
def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
742+
def create_namespace(self, namespace: str | Identifier, properties: Properties = EMPTY_DICT) -> None:
744743
namespace_tuple = self._check_valid_namespace_identifier(namespace)
745744
payload = {"namespace": namespace_tuple, "properties": properties}
746745
response = self._session.post(self.url(Endpoints.create_namespace), json=payload)
@@ -750,7 +749,7 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Proper
750749
_handle_non_200_response(exc, {409: NamespaceAlreadyExistsError})
751750

752751
@retry(**_RETRY_ARGS)
753-
def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
752+
def drop_namespace(self, namespace: str | Identifier) -> None:
754753
namespace_tuple = self._check_valid_namespace_identifier(namespace)
755754
namespace = NAMESPACE_SEPARATOR.join(namespace_tuple)
756755
response = self._session.delete(self.url(Endpoints.drop_namespace, namespace=namespace))
@@ -760,7 +759,7 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
760759
_handle_non_200_response(exc, {404: NoSuchNamespaceError, 409: NamespaceNotEmptyError})
761760

762761
@retry(**_RETRY_ARGS)
763-
def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
762+
def list_namespaces(self, namespace: str | Identifier = ()) -> List[Identifier]:
764763
namespace_tuple = self.identifier_to_tuple(namespace)
765764
response = self._session.get(
766765
self.url(
@@ -777,7 +776,7 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi
777776
return ListNamespaceResponse.model_validate_json(response.text).namespaces
778777

779778
@retry(**_RETRY_ARGS)
780-
def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
779+
def load_namespace_properties(self, namespace: str | Identifier) -> Properties:
781780
namespace_tuple = self._check_valid_namespace_identifier(namespace)
782781
namespace = NAMESPACE_SEPARATOR.join(namespace_tuple)
783782
response = self._session.get(self.url(Endpoints.load_namespace_metadata, namespace=namespace))
@@ -790,7 +789,7 @@ def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Proper
790789

791790
@retry(**_RETRY_ARGS)
792791
def update_namespace_properties(
793-
self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
792+
self, namespace: str | Identifier, removals: Set[str] | None = None, updates: Properties = EMPTY_DICT
794793
) -> PropertiesUpdateSummary:
795794
namespace_tuple = self._check_valid_namespace_identifier(namespace)
796795
namespace = NAMESPACE_SEPARATOR.join(namespace_tuple)
@@ -808,7 +807,7 @@ def update_namespace_properties(
808807
)
809808

810809
@retry(**_RETRY_ARGS)
811-
def namespace_exists(self, namespace: Union[str, Identifier]) -> bool:
810+
def namespace_exists(self, namespace: str | Identifier) -> bool:
812811
namespace_tuple = self._check_valid_namespace_identifier(namespace)
813812
namespace = NAMESPACE_SEPARATOR.join(namespace_tuple)
814813
response = self._session.head(self.url(Endpoints.namespace_exists, namespace=namespace))
@@ -826,7 +825,7 @@ def namespace_exists(self, namespace: Union[str, Identifier]) -> bool:
826825
return False
827826

828827
@retry(**_RETRY_ARGS)
829-
def table_exists(self, identifier: Union[str, Identifier]) -> bool:
828+
def table_exists(self, identifier: str | Identifier) -> bool:
830829
"""Check if a table exists.
831830
832831
Args:
@@ -852,7 +851,7 @@ def table_exists(self, identifier: Union[str, Identifier]) -> bool:
852851
return False
853852

854853
@retry(**_RETRY_ARGS)
855-
def view_exists(self, identifier: Union[str, Identifier]) -> bool:
854+
def view_exists(self, identifier: str | Identifier) -> bool:
856855
"""Check if a view exists.
857856
858857
Args:
@@ -877,7 +876,7 @@ def view_exists(self, identifier: Union[str, Identifier]) -> bool:
877876
return False
878877

879878
@retry(**_RETRY_ARGS)
880-
def drop_view(self, identifier: Union[str]) -> None:
879+
def drop_view(self, identifier: str) -> None:
881880
response = self._session.delete(
882881
self.url(Endpoints.drop_view, prefixed=True, **self._split_identifier_for_path(identifier, IdentifierKind.VIEW)),
883882
)

tests/catalog/test_rest.py

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import base64
1919
import os
2020
from typing import Any, Callable, Dict, cast
21-
from copy import deepcopy
2221
from unittest import mock
2322

2423
import pytest
@@ -40,7 +39,7 @@
4039
ServerError,
4140
TableAlreadyExistsError,
4241
)
43-
from pyiceberg.io import load_file_io
42+
from pyiceberg.io import AWS_ACCESS_KEY_ID, load_file_io
4443
from pyiceberg.partitioning import PartitionField, PartitionSpec
4544
from pyiceberg.schema import Schema
4645
from pyiceberg.table import Table
@@ -859,41 +858,51 @@ def test_load_table_200(rest_mock: Mocker, example_table_metadata_with_snapshot_
859858
assert actual == expected
860859

861860

862-
def test_load_table_prefers_storage_credentials_over_config(
863-
rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any], monkeypatch: pytest.MonkeyPatch
864-
) -> None:
865-
table_resp = deepcopy(example_table_metadata_with_snapshot_v1_rest_json)
866-
table_resp["config"] = {"some.key": "from-config", "only.config": "only-config"}
867-
table_resp["storage-credentials"] = {"some.key": "from-cred", "only.creds": "only-creds"}
861+
def test_storage_credentials_over_config(rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]) -> None:
862+
response = {
863+
"metadata-location": example_table_metadata_with_snapshot_v1_rest_json["metadata-location"],
864+
"metadata": example_table_metadata_with_snapshot_v1_rest_json["metadata"],
865+
"config": {
866+
AWS_ACCESS_KEY_ID: "from_config",
867+
},
868+
"storage-credentials": {
869+
AWS_ACCESS_KEY_ID: "from_storage_credentials",
870+
},
871+
}
868872

869-
table_resp["metadata"].setdefault("properties", {})["meta.key"] = "from-metadata"
873+
rest_mock.get(
874+
f"{TEST_URI}v1/namespaces/fokko/tables/table",
875+
json=response,
876+
status_code=200,
877+
request_headers=TEST_HEADERS,
878+
)
879+
880+
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
881+
table = catalog.load_table(("fokko", "table"))
870882

871-
captured: Dict[str, Any] = {}
883+
assert table.io.properties[AWS_ACCESS_KEY_ID] == "from_storage_credentials"
872884

873-
def _capture_io(_self: Any, properties: Dict[str, Any], location: Any) -> Any: # type: ignore
874-
captured["properties"] = dict(properties)
875-
captured["location"] = location
876-
class _DummyIO:
877-
pass
878-
return _DummyIO()
879885

880-
monkeypatch.setattr(RestCatalog, "_load_file_io", _capture_io, raising=True)
886+
def test_config_when_no_storage_credentials(rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]) -> None:
887+
response = {
888+
"metadata-location": example_table_metadata_with_snapshot_v1_rest_json["metadata-location"],
889+
"metadata": example_table_metadata_with_snapshot_v1_rest_json["metadata"],
890+
"config": {
891+
AWS_ACCESS_KEY_ID: "from_config",
892+
},
893+
}
881894

882895
rest_mock.get(
883896
f"{TEST_URI}v1/namespaces/fokko/tables/table",
884-
json=table_resp,
897+
json=response,
885898
status_code=200,
886899
request_headers=TEST_HEADERS,
887900
)
888901

889902
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
890-
_ = catalog.load_table(("fokko", "table"))
891-
892-
props = cast(Dict[str, Any], captured["properties"])
893-
assert props["meta.key"] == "from-metadata"
894-
assert props["some.key"] == "from-cred"
895-
assert props["only.creds"] == "only-creds"
903+
table = catalog.load_table(("fokko", "table"))
896904

905+
assert table.io.properties[AWS_ACCESS_KEY_ID] == "from_config"
897906

898907
def test_load_table_200_loading_mode(
899908
rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]

0 commit comments

Comments
 (0)