-
Notifications
You must be signed in to change notification settings - Fork 429
Expand file tree
/
Copy pathresponse_helpers.py
More file actions
172 lines (145 loc) · 5.53 KB
/
response_helpers.py
File metadata and controls
172 lines (145 loc) · 5.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
"""Helper functions for building A2A JSON-RPC responses."""
from typing import Any
from google.protobuf.json_format import MessageToDict
from google.protobuf.message import Message as ProtoMessage
from jsonrpc.jsonrpc2 import JSONRPC20Response
from a2a.compat.v0_3.conversions import to_compat_agent_card
from a2a.server.jsonrpc_models import (
InternalError as JSONRPCInternalError,
)
from a2a.server.jsonrpc_models import (
JSONRPCError,
)
from a2a.types.a2a_pb2 import (
AgentCard,
ListTasksResponse,
Message,
StreamResponse,
Task,
TaskArtifactUpdateEvent,
TaskPushNotificationConfig,
TaskStatusUpdateEvent,
)
from a2a.types.a2a_pb2 import (
SendMessageResponse as SendMessageResponseProto,
)
from a2a.utils.errors import (
JSON_RPC_ERROR_CODE_MAP,
A2AError,
AuthenticatedExtendedCardNotConfiguredError,
ContentTypeNotSupportedError,
InternalError,
InvalidAgentResponseError,
InvalidParamsError,
InvalidRequestError,
MethodNotFoundError,
PushNotificationNotSupportedError,
TaskNotCancelableError,
TaskNotFoundError,
UnsupportedOperationError,
)
# Maps each A2AError class to the JSON-RPC error model used to serialise it.
# Every listed error is rendered with the generic JSONRPCError model; only
# InternalError has a dedicated model (JSONRPCInternalError).
EXCEPTION_MAP: dict[type[A2AError], type[JSONRPCError]] = {
    **dict.fromkeys(
        (
            TaskNotFoundError,
            TaskNotCancelableError,
            PushNotificationNotSupportedError,
            UnsupportedOperationError,
            ContentTypeNotSupportedError,
            InvalidAgentResponseError,
            AuthenticatedExtendedCardNotConfiguredError,
            InvalidParamsError,
            InvalidRequestError,
            MethodNotFoundError,
        ),
        JSONRPCError,
    ),
    InternalError: JSONRPCInternalError,
}
# Tuple form of the A2AError base class, ready to be passed as the second
# argument of isinstance(). It holds only the base class, so such a check
# accepts an instance of any A2AError subclass.
# NOTE(review): not referenced in the visible part of this file - confirm it
# is still used elsewhere.
_A2A_ERROR_TYPES: tuple[type, ...] = (A2AError,)
# Result types for handler responses.
# Union of everything a request handler may hand to the response builders in
# this module: the message types imported from a2a.types.a2a_pb2, a plain list
# of TaskPushNotificationConfig, and the two error families (A2AError and
# JSONRPCError).
EventTypes = (
    Task
    | Message
    | TaskArtifactUpdateEvent
    | TaskStatusUpdateEvent
    | TaskPushNotificationConfig
    | StreamResponse
    | SendMessageResponseProto
    | A2AError
    | JSONRPCError
    | list[TaskPushNotificationConfig]
    | ListTasksResponse
)
"""Type alias for possible event types produced by handlers."""
def agent_card_to_dict(card: AgentCard) -> dict[str, Any]:
    """Serialise an AgentCard to a dict, adding backward-compatible fields.

    The card is converted with ``MessageToDict`` and then overlaid with the
    dict produced by ``to_compat_agent_card(card).model_dump(...)``. Values
    already present in the ``MessageToDict`` output always win; the compat
    dict only contributes keys that are missing.

    Args:
        card: The agent card to serialise.

    Returns:
        The ``MessageToDict`` output, updated in place with any compat-only
        fields.
    """

    def fill_missing(
        target: dict[str, Any], extra: dict[str, Any]
    ) -> dict[str, Any]:
        # Recursively copy into `target` the keys it lacks; never overwrite.
        for key, extra_value in extra.items():
            if key not in target:
                target[key] = extra_value
                continue
            current = target[key]
            if isinstance(extra_value, dict) and isinstance(current, dict):
                fill_missing(current, extra_value)
            elif isinstance(extra_value, list) and isinstance(current, list):
                # Lists are aligned by index up to the shorter length; only
                # dict/dict pairs are merged, other elements are left alone.
                for ours, theirs in zip(current, extra_value):
                    if isinstance(ours, dict) and isinstance(theirs, dict):
                        fill_missing(ours, theirs)
        return target

    card_dict = MessageToDict(card)
    legacy_fields = to_compat_agent_card(card).model_dump(exclude_none=True)

    # Do not include supportsAuthenticatedExtendedCard if false
    if not legacy_fields.get('supportsAuthenticatedExtendedCard'):
        legacy_fields.pop('supportsAuthenticatedExtendedCard', None)

    return fill_missing(card_dict, legacy_fields)
def build_error_response(
    request_id: str | int | None,
    error: A2AError | JSONRPCError,
) -> dict[str, Any]:
    """Build a JSON-RPC error response dict.

    A ``JSONRPCError`` is used as-is. An ``A2AError`` is translated by
    looking up its exact class (``type(error)``) in ``EXCEPTION_MAP`` and
    ``JSON_RPC_ERROR_CODE_MAP``; a class missing from either map falls back
    to ``JSONRPCInternalError`` / code ``-32603`` respectively. Anything
    else is wrapped in a ``JSONRPCInternalError`` carrying ``str(error)``.

    Args:
        request_id: The ID of the request that caused the error.
        error: The A2AError or JSONRPCError object.

    Returns:
        A dict representing the JSON-RPC error response.
    """
    wire_error: JSONRPCError
    if isinstance(error, JSONRPCError):
        wire_error = error
    elif not isinstance(error, A2AError):
        # Defensive path for values outside the annotated types.
        wire_error = JSONRPCInternalError(message=str(error))
    else:
        exc_class = type(error)
        model_factory = EXCEPTION_MAP.get(exc_class, JSONRPCInternalError)
        wire_error = model_factory(
            code=JSON_RPC_ERROR_CODE_MAP.get(exc_class, -32603),
            message=str(error),
        )

    return JSONRPC20Response(
        error=wire_error.model_dump(exclude_none=True),
        _id=request_id,
    ).data
def prepare_response_object(
    request_id: str | int | None,
    response: EventTypes,
    success_response_types: tuple[type, ...],
) -> dict[str, Any]:
    """Build a JSON-RPC response dict from handler output.

    Based on the type of the `response` object received from the handler,
    it constructs either a success response or an error response.

    Args:
        request_id: The ID of the request.
        response: The object received from the request handler.
        success_response_types: A tuple of expected types for a successful
            result.

    Returns:
        A dict representing the JSON-RPC response (success or error). A
        response that is neither an expected success type nor an error is
        reported as an ``InvalidAgentResponseError``.
    """
    if isinstance(response, success_response_types):
        # Convert proto messages to plain dicts for JSON serialization.
        result: Any = response
        if isinstance(response, ProtoMessage):
            result = MessageToDict(response, preserving_proto_field_name=False)
        elif isinstance(response, list):
            # EventTypes admits list[TaskPushNotificationConfig]; without
            # this, the proto elements would be embedded unconverted and the
            # result could not be JSON-serialized. Non-proto items pass
            # through unchanged.
            result = [
                MessageToDict(item, preserving_proto_field_name=False)
                if isinstance(item, ProtoMessage)
                else item
                for item in response
            ]
        return JSONRPC20Response(result=result, _id=request_id).data

    if isinstance(response, A2AError | JSONRPCError):
        return build_error_response(request_id, response)

    # If response is not an expected success type and not an error,
    # it's an invalid type of response from the agent for this method.
    error = InvalidAgentResponseError(
        message='Agent returned invalid type response for this method'
    )
    return build_error_response(request_id, error)