Skip to content

Commit f114f3d

Browse files
committed
Extract common code for HTTP from JSON-RPC and REST
1 parent 690bb5d commit f114f3d

8 files changed

Lines changed: 156 additions & 129 deletions

File tree

src/a2a/client/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from a2a.client.client_factory import ClientFactory, minimal_agent_card
1414
from a2a.client.errors import (
1515
A2AClientError,
16+
A2AClientTimeoutError,
1617
AgentCardResolutionError,
1718
)
1819
from a2a.client.helpers import create_text_message_object
@@ -25,6 +26,7 @@
2526
__all__ = [
2627
'A2ACardResolver',
2728
'A2AClientError',
29+
'A2AClientTimeoutError',
2830
'AgentCardResolutionError',
2931
'AuthInterceptor',
3032
'BaseClient',

src/a2a/client/errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,7 @@ class AgentCardResolutionError(A2AClientError):
1313
def __init__(self, message: str, status_code: int | None = None) -> None:
1414
super().__init__(message)
1515
self.status_code = status_code
16+
17+
18+
class A2AClientTimeoutError(A2AClientError):
19+
"""Exception for timeout errors during a request."""

src/a2a/client/transports/grpc.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from functools import wraps
55
from typing import Any, NoReturn
66

7-
from a2a.client.errors import A2AClientError
7+
from a2a.client.errors import A2AClientError, A2AClientTimeoutError
88
from a2a.utils.errors import JSON_RPC_ERROR_CODE_MAP
99

1010

@@ -53,6 +53,9 @@
5353

5454

5555
def _map_grpc_error(e: grpc.aio.AioRpcError) -> NoReturn:
56+
if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
57+
raise A2AClientTimeoutError('Client Request timed out') from e
58+
5659
details = e.details()
5760
if isinstance(details, str) and ': ' in details:
5861
error_type_name, error_message = details.split(': ', 1)
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import json
2+
3+
from collections.abc import AsyncGenerator, Callable, Iterator
4+
from contextlib import contextmanager
5+
from typing import Any, NoReturn
6+
7+
import httpx
8+
9+
from httpx_sse import SSEError, aconnect_sse
10+
11+
from a2a.client.errors import A2AClientError, A2AClientTimeoutError
12+
13+
14+
@contextmanager
15+
def handle_http_exceptions(
16+
status_error_handler: Callable[[httpx.HTTPStatusError], NoReturn]
17+
| None = None,
18+
) -> Iterator[None]:
19+
"""Handles common HTTP exceptions for REST and JSON-RPC transports.
20+
21+
Args:
22+
status_error_handler: Optional handler for `httpx.HTTPStatusError`.
23+
If provided, this handler should raise an appropriate domain-specific exception.
24+
If not provided, a default `A2AClientError` will be raised.
25+
"""
26+
try:
27+
yield
28+
except httpx.TimeoutException as e:
29+
raise A2AClientTimeoutError('Client Request timed out') from e
30+
except httpx.HTTPStatusError as e:
31+
if status_error_handler:
32+
status_error_handler(e)
33+
raise A2AClientError(f'HTTP Error {e.response.status_code}: {e}') from e
34+
except SSEError as e:
35+
raise A2AClientError(
36+
f'Invalid SSE response or protocol error: {e}'
37+
) from e
38+
except httpx.RequestError as e:
39+
raise A2AClientError(f'Network communication error: {e}') from e
40+
except json.JSONDecodeError as e:
41+
raise A2AClientError(f'JSON Decode Error: {e}') from e
42+
43+
44+
async def send_http_request(
45+
httpx_client: httpx.AsyncClient,
46+
request: httpx.Request,
47+
status_error_handler: Callable[[httpx.HTTPStatusError], NoReturn]
48+
| None = None,
49+
) -> dict[str, Any]:
50+
"""Sends an HTTP request and parses the JSON response, handling common exceptions."""
51+
with handle_http_exceptions(status_error_handler):
52+
response = await httpx_client.send(request)
53+
response.raise_for_status()
54+
return response.json()
55+
56+
57+
async def send_http_stream_request(
58+
httpx_client: httpx.AsyncClient,
59+
method: str,
60+
url: str,
61+
status_error_handler: Callable[[httpx.HTTPStatusError], NoReturn]
62+
| None = None,
63+
**kwargs: Any,
64+
) -> AsyncGenerator[str]:
65+
"""Sends a streaming HTTP request, yielding SSE data strings and handling exceptions."""
66+
with handle_http_exceptions(status_error_handler):
67+
async with aconnect_sse(
68+
httpx_client, method, url, **kwargs
69+
) as event_source:
70+
event_source.response.raise_for_status()
71+
async for sse in event_source.aiter_sse():
72+
if not sse.data:
73+
continue
74+
yield sse.data

src/a2a/client/transports/jsonrpc.py

Lines changed: 23 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import json
21
import logging
32

43
from collections.abc import AsyncGenerator, Callable
@@ -8,12 +7,15 @@
87
import httpx
98

109
from google.protobuf import json_format
11-
from httpx_sse import SSEError, aconnect_sse
1210
from jsonrpc.jsonrpc2 import JSONRPC20Request, JSONRPC20Response
1311

1412
from a2a.client.errors import A2AClientError
1513
from a2a.client.middleware import ClientCallContext, ClientCallInterceptor
1614
from a2a.client.transports.base import ClientTransport
15+
from a2a.client.transports.http_helpers import (
16+
send_http_request,
17+
send_http_stream_request,
18+
)
1719
from a2a.extensions.common import update_extension_header
1820
from a2a.types.a2a_pb2 import (
1921
AgentCard,
@@ -470,22 +472,10 @@ async def _send_request(
470472
rpc_request_payload: dict[str, Any],
471473
http_kwargs: dict[str, Any] | None = None,
472474
) -> dict[str, Any]:
473-
try:
474-
response = await self.httpx_client.post(
475-
self.url, json=rpc_request_payload, **(http_kwargs or {})
476-
)
477-
response.raise_for_status()
478-
return response.json()
479-
except httpx.TimeoutException as e:
480-
raise A2AClientError('Client Request timed out') from e
481-
except httpx.HTTPStatusError as e:
482-
raise A2AClientError(
483-
f'HTTP Error {e.response.status_code}: {e}'
484-
) from e
485-
except json.JSONDecodeError as e:
486-
raise A2AClientError(str(e)) from e
487-
except httpx.RequestError as e:
488-
raise A2AClientError(f'Network communication error: {e}') from e
475+
request = self.httpx_client.build_request(
476+
'POST', self.url, json=rpc_request_payload, **(http_kwargs or {})
477+
)
478+
return await send_http_request(self.httpx_client, request)
489479

490480
async def _send_stream_request(
491481
self,
@@ -500,39 +490,18 @@ async def _send_stream_request(
500490
headers.update(final_kwargs.get('headers', {}))
501491
final_kwargs['headers'] = headers
502492

503-
try:
504-
async with aconnect_sse(
505-
self.httpx_client,
506-
'POST',
507-
self.url,
508-
json=rpc_request_payload,
509-
**final_kwargs,
510-
) as event_source:
511-
try:
512-
event_source.response.raise_for_status()
513-
async for sse in event_source.aiter_sse():
514-
if not sse.data:
515-
continue
516-
json_rpc_response = JSONRPC20Response.from_json(
517-
sse.data
518-
)
519-
if json_rpc_response.error:
520-
self._handle_jsonrpc_error(json_rpc_response.error)
521-
response: StreamResponse = json_format.ParseDict(
522-
json_rpc_response.result, StreamResponse()
523-
)
524-
yield response
525-
except httpx.HTTPStatusError as e:
526-
raise A2AClientError(
527-
f'HTTP Error {e.response.status_code}: {e}'
528-
) from e
529-
except SSEError as e:
530-
raise A2AClientError(
531-
f'Invalid SSE response or protocol error: {e}'
532-
) from e
533-
except httpx.TimeoutException as e:
534-
raise A2AClientError('Client Request timed out') from e
535-
except httpx.RequestError as e:
536-
raise A2AClientError(f'Network communication error: {e}') from e
537-
except json.JSONDecodeError as e:
538-
raise A2AClientError(str(e)) from e
493+
async for sse_data in send_http_stream_request(
494+
self.httpx_client,
495+
'POST',
496+
self.url,
497+
None,
498+
json=rpc_request_payload,
499+
**final_kwargs,
500+
):
501+
json_rpc_response = JSONRPC20Response.from_json(sse_data)
502+
if json_rpc_response.error:
503+
self._handle_jsonrpc_error(json_rpc_response.error)
504+
response: StreamResponse = json_format.ParseDict(
505+
json_rpc_response.result, StreamResponse()
506+
)
507+
yield response

src/a2a/client/transports/rest.py

Lines changed: 16 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,14 @@
88

99
from google.protobuf.json_format import MessageToDict, Parse, ParseDict
1010
from google.protobuf.message import Message
11-
from httpx_sse import SSEError, aconnect_sse
1211

1312
from a2a.client.errors import A2AClientError
1413
from a2a.client.middleware import ClientCallContext, ClientCallInterceptor
1514
from a2a.client.transports.base import ClientTransport
15+
from a2a.client.transports.http_helpers import (
16+
send_http_request,
17+
send_http_stream_request,
18+
)
1619
from a2a.extensions.common import update_extension_header
1720
from a2a.types.a2a_pb2 import (
1821
AgentCard,
@@ -427,48 +430,20 @@ async def _send_stream_request(
427430
final_kwargs.update(kwargs)
428431
final_kwargs.setdefault('timeout', None)
429432

430-
try:
431-
async with aconnect_sse(
432-
self.httpx_client,
433-
method,
434-
f'{self.url}{target}',
435-
**final_kwargs,
436-
) as event_source:
437-
try:
438-
event_source.response.raise_for_status()
439-
async for sse in event_source.aiter_sse():
440-
if not sse.data:
441-
continue
442-
event: StreamResponse = Parse(
443-
sse.data, StreamResponse()
444-
)
445-
yield event
446-
except httpx.HTTPStatusError as e:
447-
self._handle_http_error(e)
448-
except SSEError as e:
449-
raise A2AClientError(
450-
f'Invalid SSE response or protocol error: {e}'
451-
) from e
452-
except httpx.TimeoutException as e:
453-
raise A2AClientError('Client Request timed out') from e
454-
except httpx.RequestError as e:
455-
raise A2AClientError(f'Network communication error: {e}') from e
456-
except json.JSONDecodeError as e:
457-
raise A2AClientError(f'JSON Decode Error: {e}') from e
433+
async for sse_data in send_http_stream_request(
434+
self.httpx_client,
435+
method,
436+
f'{self.url}{target}',
437+
self._handle_http_error,
438+
**final_kwargs,
439+
):
440+
event: StreamResponse = Parse(sse_data, StreamResponse())
441+
yield event
458442

459443
async def _send_request(self, request: httpx.Request) -> dict[str, Any]:
460-
try:
461-
response = await self.httpx_client.send(request)
462-
response.raise_for_status()
463-
return response.json()
464-
except httpx.TimeoutException as e:
465-
raise A2AClientError('Client Request timed out') from e
466-
except httpx.HTTPStatusError as e:
467-
self._handle_http_error(e)
468-
except json.JSONDecodeError as e:
469-
raise A2AClientError(f'JSON Decode Error: {e}') from e
470-
except httpx.RequestError as e:
471-
raise A2AClientError(f'Network communication error: {e}') from e
444+
return await send_http_request(
445+
self.httpx_client, request, self._handle_http_error
446+
)
472447

473448
async def _send_post_request(
474449
self,

0 commit comments

Comments
 (0)