Skip to content

Commit 65fe2b8

Browse files
committed
feat: support pagination in list_* methods in rest catalog
1 parent 131dd15 commit 65fe2b8

2 files changed

Lines changed: 156 additions & 27 deletions

File tree

pyiceberg/catalog/rest/__init__.py

Lines changed: 83 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
TYPE_CHECKING,
2020
Any,
2121
Dict,
22+
Iterator,
2223
List,
2324
Optional,
2425
Set,
@@ -184,6 +185,7 @@ class ConfigResponse(IcebergBaseModel):
184185

class ListNamespaceResponse(IcebergBaseModel):
    # Namespaces contained in this page of the listing.
    namespaces: List[Identifier] = Field()
    # Continuation token read from the "next-page-token" JSON key;
    # defaults to None when the key is absent from the payload.
    next_page_token: Optional[str] = Field(default=None, alias="next-page-token")
187189

188190

189191
class NamespaceResponse(IcebergBaseModel):
@@ -209,10 +211,12 @@ class ListViewResponseEntry(IcebergBaseModel):
209211

class ListTablesResponse(IcebergBaseModel):
    # Table entries contained in this page of the listing.
    identifiers: List[ListTableResponseEntry] = Field()
    # Continuation token read from the "next-page-token" JSON key;
    # defaults to None when the key is absent from the payload.
    next_page_token: Optional[str] = Field(default=None, alias="next-page-token")
212215

213216

class ListViewsResponse(IcebergBaseModel):
    # View entries contained in this page of the listing.
    identifiers: List[ListViewResponseEntry] = Field()
    # Continuation token read from the "next-page-token" JSON key;
    # defaults to None when the key is absent from the payload.
    next_page_token: Optional[str] = Field(default=None, alias="next-page-token")
216220

217221

218222
class RestCatalog(Catalog):
@@ -583,16 +587,33 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
583587
table_response = TableResponse.model_validate_json(response.text)
584588
return self._response_to_table(self.identifier_to_tuple(identifier), table_response)
585589

def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
    """List every table in a namespace, following pagination to the end.

    Args:
        namespace: Namespace to list, as a string or an identifier tuple.

    Returns:
        All table identifiers, each shaped as ``(*namespace, name)``.

    Raises:
        NoSuchNamespaceError: Mapped from an HTTP 404 response by
            ``_handle_non_200_response``.
    """
    return list(self.list_tables_lazy(namespace))

def list_tables_lazy(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]:
    """Yield the tables of a namespace, requesting one page at a time.

    The next page is only requested once the previous one has been consumed.
    This method is deliberately not decorated with ``@retry``: calling a
    generator function merely creates the generator, so a call-wrapping
    retry decorator would never observe errors raised during iteration.
    The retry policy is applied to each page request instead.

    Args:
        namespace: Namespace to list, as a string or an identifier tuple.

    Yields:
        Table identifiers, each shaped as ``(*namespace, name)``.

    Raises:
        NoSuchNamespaceError: Mapped from an HTTP 404 response by
            ``_handle_non_200_response``.
    """
    namespace_tuple = self._check_valid_namespace_identifier(namespace)
    namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple)

    page_token: Optional[str] = None
    while True:
        page = self._fetch_tables_page(namespace_concat, page_token)
        for table in page.identifiers:
            yield (*table.namespace, table.name)

        page_token = page.next_page_token
        # A missing token marks the last page. An empty token is treated the
        # same way so that a server echoing "" cannot cause an endless loop.
        if not page_token:
            break

@retry(**_RETRY_ARGS)
def _fetch_tables_page(self, namespace_concat: str, page_token: Optional[str] = None) -> ListTablesResponse:
    """Request and parse a single page of the table listing.

    Args:
        namespace_concat: Namespace levels already joined with NAMESPACE_SEPARATOR.
        page_token: Continuation token from the previous page, or None for the first page.

    Returns:
        The parsed response for this page.
    """
    params: Dict[str, Any] = {}
    if page_token is not None:
        params["pageToken"] = page_token

    response = self._session.get(self.url(Endpoints.list_tables, namespace=namespace_concat), params=params)
    try:
        response.raise_for_status()
    except HTTPError as exc:
        _handle_non_200_response(exc, {404: NoSuchNamespaceError})

    return ListTablesResponse.model_validate_json(response.text)
596617

597618
@retry(**_RETRY_ARGS)
598619
def load_table(self, identifier: Union[str, Identifier]) -> Table:
@@ -654,16 +675,34 @@ def _remove_catalog_name_from_table_request_identifier(self, table_request: Comm
654675
)
655676
return table_request
656677

def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
    """List every view in a namespace, following pagination to the end.

    Args:
        namespace: Namespace to list, as a string or an identifier tuple.

    Returns:
        All view identifiers, each shaped as ``(*namespace, name)``.

    Raises:
        NoSuchNamespaceError: Mapped from an HTTP 404 response by
            ``_handle_non_200_response``.
    """
    return list(self.list_views_lazy(namespace))

def list_views_lazy(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]:
    """Yield the views of a namespace, requesting one page at a time.

    The next page is only requested once the previous one has been consumed.
    This method is deliberately not decorated with ``@retry``: calling a
    generator function merely creates the generator, so a call-wrapping
    retry decorator would never observe errors raised during iteration.
    The retry policy is applied to each page request instead.

    Args:
        namespace: Namespace to list, as a string or an identifier tuple.

    Yields:
        View identifiers, each shaped as ``(*namespace, name)``.

    Raises:
        NoSuchNamespaceError: Mapped from an HTTP 404 response by
            ``_handle_non_200_response``.
    """
    namespace_tuple = self._check_valid_namespace_identifier(namespace)
    namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple)

    page_token: Optional[str] = None
    while True:
        page = self._fetch_views_page(namespace_concat, page_token)
        for view in page.identifiers:
            yield (*view.namespace, view.name)

        page_token = page.next_page_token
        # A missing token marks the last page. An empty token is treated the
        # same way so that a server echoing "" cannot cause an endless loop.
        if not page_token:
            break

@retry(**_RETRY_ARGS)
def _fetch_views_page(self, namespace_concat: str, page_token: Optional[str] = None) -> ListViewsResponse:
    """Request and parse a single page of the view listing.

    Args:
        namespace_concat: Namespace levels already joined with NAMESPACE_SEPARATOR.
        page_token: Continuation token from the previous page, or None for the first page.

    Returns:
        The parsed response for this page.
    """
    params: Dict[str, Any] = {}
    if page_token is not None:
        params["pageToken"] = page_token

    response = self._session.get(self.url(Endpoints.list_views, namespace=namespace_concat), params=params)
    try:
        response.raise_for_status()
    except HTTPError as exc:
        _handle_non_200_response(exc, {404: NoSuchNamespaceError})

    return ListViewsResponse.model_validate_json(response.text)
667706

668707
@retry(**_RETRY_ARGS)
669708
def commit_table(
@@ -731,22 +770,39 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
731770
except HTTPError as exc:
732771
_handle_non_200_response(exc, {404: NoSuchNamespaceError, 409: NamespaceNotEmptyError})
733772

def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
    """List every namespace under a parent, following pagination to the end.

    Args:
        namespace: Parent namespace, as a string or an identifier tuple.
            The empty default sends no ``parent`` filter.

    Returns:
        All namespace identifiers reported by the catalog.

    Raises:
        NoSuchNamespaceError: Mapped from an HTTP 404 response by
            ``_handle_non_200_response``.
    """
    return list(self.list_namespaces_lazy(namespace))

def list_namespaces_lazy(self, namespace: Union[str, Identifier] = ()) -> Iterator[Identifier]:
    """Yield the namespaces under a parent, requesting one page at a time.

    The next page is only requested once the previous one has been consumed.
    This method is deliberately not decorated with ``@retry``: calling a
    generator function merely creates the generator, so a call-wrapping
    retry decorator would never observe errors raised during iteration.
    The retry policy is applied to each page request instead.

    Args:
        namespace: Parent namespace, as a string or an identifier tuple.
            The empty default sends no ``parent`` filter.

    Yields:
        Namespace identifiers in the order the catalog returns them.

    Raises:
        NoSuchNamespaceError: Mapped from an HTTP 404 response by
            ``_handle_non_200_response``.
    """
    namespace_tuple = self.identifier_to_tuple(namespace)
    parent = NAMESPACE_SEPARATOR.join(namespace_tuple) if namespace_tuple else None

    page_token: Optional[str] = None
    while True:
        page = self._fetch_namespaces_page(parent, page_token)
        yield from page.namespaces

        page_token = page.next_page_token
        # A missing token marks the last page. An empty token is treated the
        # same way so that a server echoing "" cannot cause an endless loop.
        if not page_token:
            break

@retry(**_RETRY_ARGS)
def _fetch_namespaces_page(self, parent: Optional[str], page_token: Optional[str] = None) -> ListNamespaceResponse:
    """Request and parse a single page of the namespace listing.

    Args:
        parent: Parent namespace joined with NAMESPACE_SEPARATOR, or None for no filter.
        page_token: Continuation token from the previous page, or None for the first page.

    Returns:
        The parsed response for this page.
    """
    # Both query parameters go through ``params`` rather than being spliced
    # into the URL by hand, so they are built and encoded the same way.
    params: Dict[str, Any] = {}
    if parent is not None:
        params["parent"] = parent
    if page_token is not None:
        params["pageToken"] = page_token

    response = self._session.get(self.url(Endpoints.list_namespaces), params=params)
    try:
        response.raise_for_status()
    except HTTPError as exc:
        _handle_non_200_response(exc, {404: NoSuchNamespaceError})

    return ListNamespaceResponse.model_validate_json(response.text)
750806

751807
@retry(**_RETRY_ARGS)
752808
def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:

tests/catalog/test_rest.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,28 @@ def test_list_tables_200(rest_mock: Mocker) -> None:
412412
assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace) == [("examples", "fooshare")]
413413

414414

def test_list_tables_paginated_200(rest_mock: Mocker) -> None:
    """list_tables follows the continuation token and concatenates both pages."""
    namespace = "examples"
    tables_url = f"{TEST_URI}v1/namespaces/{namespace}/tables"

    # The first page advertises a continuation token; the second omits it.
    first_page = {"identifiers": [{"namespace": ["examples"], "name": "fooshare"}], "next-page-token": "page2"}
    last_page = {"identifiers": [{"namespace": ["examples"], "name": "fooshare2"}]}
    rest_mock.get(tables_url, json=first_page, status_code=200, request_headers=TEST_HEADERS)
    rest_mock.get(f"{tables_url}?pageToken=page2", json=last_page, status_code=200, request_headers=TEST_HEADERS)

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    listed = catalog.list_tables(namespace)

    assert listed == [
        ("examples", "fooshare"),
        ("examples", "fooshare2"),
    ]
    # NOTE(review): two list requests plus, presumably, one request issued
    # while the catalog is constructed - confirm against the rest_mock fixture.
    assert rest_mock.call_count == 3
435+
436+
415437
def test_list_tables_200_sigv4(rest_mock: Mocker) -> None:
416438
namespace = "examples"
417439
rest_mock.get(
@@ -458,6 +480,30 @@ def test_list_views_200(rest_mock: Mocker) -> None:
458480
assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace) == [("examples", "fooshare")]
459481

460482

def test_list_views_paginated_200(rest_mock: Mocker) -> None:
    """list_views follows the continuation token and concatenates both pages."""
    namespace = "examples"
    views_url = f"{TEST_URI}v1/namespaces/{namespace}/views"

    # The first page advertises a continuation token; the second omits it.
    first_page = {"identifiers": [{"namespace": ["examples"], "name": "fooshare"}], "next-page-token": "page2"}
    last_page = {"identifiers": [{"namespace": ["examples"], "name": "fooshare2"}]}
    rest_mock.get(views_url, json=first_page, status_code=200, request_headers=TEST_HEADERS)
    rest_mock.get(f"{views_url}?pageToken=page2", json=last_page, status_code=200, request_headers=TEST_HEADERS)

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    listed = catalog.list_views(namespace)

    assert listed == [
        ("examples", "fooshare"),
        ("examples", "fooshare2"),
    ]
    # NOTE(review): two list requests plus, presumably, one request issued
    # while the catalog is constructed - confirm against the rest_mock fixture.
    assert rest_mock.call_count == 3
505+
506+
461507
def test_list_views_200_sigv4(rest_mock: Mocker) -> None:
462508
namespace = "examples"
463509
rest_mock.get(
@@ -543,6 +589,33 @@ def test_list_namespaces_200(rest_mock: Mocker) -> None:
543589
]
544590

545591

def test_list_namespaces_paginated_200(rest_mock: Mocker) -> None:
    """list_namespaces follows the continuation token and concatenates both pages."""
    namespaces_url = f"{TEST_URI}v1/namespaces"

    # The first page advertises a continuation token; the second omits it.
    first_page = {"namespaces": [["default"], ["examples"], ["fokko"], ["system"]], "next-page-token": "page2"}
    last_page = {"namespaces": [["default2"], ["examples2"], ["fokko2"], ["system2"]]}
    rest_mock.get(namespaces_url, json=first_page, status_code=200, request_headers=TEST_HEADERS)
    rest_mock.get(f"{namespaces_url}?pageToken=page2", json=last_page, status_code=200, request_headers=TEST_HEADERS)

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    listed = catalog.list_namespaces()

    assert listed == [
        ("default",),
        ("examples",),
        ("fokko",),
        ("system",),
        ("default2",),
        ("examples2",),
        ("fokko2",),
        ("system2",),
    ]
    # NOTE(review): two list requests plus, presumably, one request issued
    # while the catalog is constructed - confirm against the rest_mock fixture.
    assert rest_mock.call_count == 3
617+
618+
546619
def test_list_namespace_with_parent_200(rest_mock: Mocker) -> None:
547620
rest_mock.get(
548621
f"{TEST_URI}v1/namespaces?parent=accounting",

0 commit comments

Comments
 (0)