Skip to content

Commit 6462801

Browse files
committed
Address PR review feedback: rename methods, update types, clean up aliases
- Rename on_resubscribe_to_task to on_subscribe_to_task across all handlers - Update METHOD_TO_MODEL with gRPC-style method names (SendMessage, GetTask, etc.) - Update JSON-RPC client to use new method names - Fix ListTaskPushNotificationConfigResponse to use 'configs' field - Remove TaskResubscriptionRequest alias from extras.py - Update TransportProtocol imports to use a2a.utils.constants - Fix on_get_task_push_notification_config params type - Update all tests for new method names and response types
1 parent 7405dc7 commit 6462801

20 files changed

Lines changed: 140 additions & 162 deletions

src/a2a/client/transports/jsonrpc.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ async def send_message(
102102
) -> SendMessageResponse:
103103
"""Sends a non-streaming message request to the agent."""
104104
rpc_request = JSONRPC20Request(
105-
method='message/send',
105+
method='SendMessage',
106106
params=json_format.MessageToDict(request),
107107
_id=str(uuid4()),
108108
)
@@ -111,7 +111,7 @@ async def send_message(
111111
extensions if extensions is not None else self.extensions,
112112
)
113113
payload, modified_kwargs = await self._apply_interceptors(
114-
'message/send',
114+
'SendMessage',
115115
cast('dict[str, Any]', rpc_request.data),
116116
modified_kwargs,
117117
context,
@@ -134,7 +134,7 @@ async def send_message_streaming(
134134
) -> AsyncGenerator[StreamResponse]:
135135
"""Sends a streaming message request to the agent and yields responses as they arrive."""
136136
rpc_request = JSONRPC20Request(
137-
method='message/stream',
137+
method='SendStreamingMessage',
138138
params=json_format.MessageToDict(request),
139139
_id=str(uuid4()),
140140
)
@@ -143,7 +143,7 @@ async def send_message_streaming(
143143
extensions if extensions is not None else self.extensions,
144144
)
145145
payload, modified_kwargs = await self._apply_interceptors(
146-
'message/stream',
146+
'SendStreamingMessage',
147147
cast('dict[str, Any]', rpc_request.data),
148148
modified_kwargs,
149149
context,
@@ -213,7 +213,7 @@ async def get_task(
213213
) -> Task:
214214
"""Retrieves the current state and history of a specific task."""
215215
rpc_request = JSONRPC20Request(
216-
method='tasks/get',
216+
method='GetTask',
217217
params=json_format.MessageToDict(request),
218218
_id=str(uuid4()),
219219
)
@@ -222,7 +222,7 @@ async def get_task(
222222
extensions if extensions is not None else self.extensions,
223223
)
224224
payload, modified_kwargs = await self._apply_interceptors(
225-
'tasks/get',
225+
'GetTask',
226226
cast('dict[str, Any]', rpc_request.data),
227227
modified_kwargs,
228228
context,
@@ -243,7 +243,7 @@ async def cancel_task(
243243
) -> Task:
244244
"""Requests the agent to cancel a specific task."""
245245
rpc_request = JSONRPC20Request(
246-
method='tasks/cancel',
246+
method='CancelTask',
247247
params=json_format.MessageToDict(request),
248248
_id=str(uuid4()),
249249
)
@@ -252,7 +252,7 @@ async def cancel_task(
252252
extensions if extensions is not None else self.extensions,
253253
)
254254
payload, modified_kwargs = await self._apply_interceptors(
255-
'tasks/cancel',
255+
'CancelTask',
256256
cast('dict[str, Any]', rpc_request.data),
257257
modified_kwargs,
258258
context,
@@ -273,7 +273,7 @@ async def set_task_callback(
273273
) -> TaskPushNotificationConfig:
274274
"""Sets or updates the push notification configuration for a specific task."""
275275
rpc_request = JSONRPC20Request(
276-
method='tasks/pushNotificationConfig/set',
276+
method='SetTaskPushNotificationConfig',
277277
params=json_format.MessageToDict(request),
278278
_id=str(uuid4()),
279279
)
@@ -282,7 +282,7 @@ async def set_task_callback(
282282
extensions if extensions is not None else self.extensions,
283283
)
284284
payload, modified_kwargs = await self._apply_interceptors(
285-
'tasks/pushNotificationConfig/set',
285+
'SetTaskPushNotificationConfig',
286286
cast('dict[str, Any]', rpc_request.data),
287287
modified_kwargs,
288288
context,
@@ -305,7 +305,7 @@ async def get_task_callback(
305305
) -> TaskPushNotificationConfig:
306306
"""Retrieves the push notification configuration for a specific task."""
307307
rpc_request = JSONRPC20Request(
308-
method='tasks/pushNotificationConfig/get',
308+
method='GetTaskPushNotificationConfig',
309309
params=json_format.MessageToDict(request),
310310
_id=str(uuid4()),
311311
)
@@ -314,7 +314,7 @@ async def get_task_callback(
314314
extensions if extensions is not None else self.extensions,
315315
)
316316
payload, modified_kwargs = await self._apply_interceptors(
317-
'tasks/pushNotificationConfig/get',
317+
'GetTaskPushNotificationConfig',
318318
cast('dict[str, Any]', rpc_request.data),
319319
modified_kwargs,
320320
context,
@@ -337,7 +337,7 @@ async def subscribe(
337337
) -> AsyncGenerator[StreamResponse]:
338338
"""Reconnects to get task updates."""
339339
rpc_request = JSONRPC20Request(
340-
method='tasks/resubscribe',
340+
method='SubscribeToTask',
341341
params=json_format.MessageToDict(request),
342342
_id=str(uuid4()),
343343
)
@@ -346,7 +346,7 @@ async def subscribe(
346346
extensions if extensions is not None else self.extensions,
347347
)
348348
payload, modified_kwargs = await self._apply_interceptors(
349-
'tasks/resubscribe',
349+
'SubscribeToTask',
350350
cast('dict[str, Any]', rpc_request.data),
351351
modified_kwargs,
352352
context,

src/a2a/server/apps/jsonrpc/jsonrpc_app.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
ListTaskPushNotificationConfigRequest,
3232
SendMessageRequest,
3333
SetTaskPushNotificationConfigRequest,
34+
SubscribeToTaskRequest,
3435
)
3536
from a2a.types.extras import (
3637
A2AError,
@@ -40,7 +41,6 @@
4041
InvalidRequestError,
4142
JSONParseError,
4243
MethodNotFoundError,
43-
TaskResubscriptionRequest,
4444
UnsupportedOperationError,
4545
)
4646
from a2a.utils.constants import (
@@ -154,17 +154,18 @@ class JSONRPCApplication(ABC):
154154

155155
# Method-to-model mapping for centralized routing
156156
# Proto types don't have model_fields, so we define the mapping explicitly
157+
# Method names match gRPC service method names
157158
METHOD_TO_MODEL: dict[str, type] = {
158-
'message/send': SendMessageRequest,
159-
'message/stream': SendMessageRequest, # Same proto type as message/send
160-
'tasks/get': GetTaskRequest,
161-
'tasks/cancel': CancelTaskRequest,
162-
'tasks/pushNotificationConfig/set': SetTaskPushNotificationConfigRequest,
163-
'tasks/pushNotificationConfig/get': GetTaskPushNotificationConfigRequest,
164-
'tasks/pushNotificationConfig/list': ListTaskPushNotificationConfigRequest,
165-
'tasks/pushNotificationConfig/delete': DeleteTaskPushNotificationConfigRequest,
166-
'tasks/resubscribe': TaskResubscriptionRequest,
167-
'agent/authenticatedExtendedCard': GetExtendedAgentCardRequest,
159+
'SendMessage': SendMessageRequest,
160+
'SendStreamingMessage': SendMessageRequest, # Same proto type as SendMessage
161+
'GetTask': GetTaskRequest,
162+
'CancelTask': CancelTaskRequest,
163+
'SetTaskPushNotificationConfig': SetTaskPushNotificationConfigRequest,
164+
'GetTaskPushNotificationConfig': GetTaskPushNotificationConfigRequest,
165+
'ListTaskPushNotificationConfig': ListTaskPushNotificationConfigRequest,
166+
'DeleteTaskPushNotificationConfig': DeleteTaskPushNotificationConfigRequest,
167+
'SubscribeToTask': SubscribeToTaskRequest,
168+
'GetExtendedAgentCard': GetExtendedAgentCardRequest,
168169
}
169170

170171
def __init__( # noqa: PLR0913
@@ -358,8 +359,7 @@ async def _handle_requests(self, request: Request) -> Response: # noqa: PLR0911
358359
call_context.state['request_id'] = request_id
359360

360361
# Route streaming requests by method name
361-
# (message/send and message/stream both use SendMessageRequest)
362-
if method in ('message/stream', 'tasks/resubscribe'):
362+
if method in ('SendStreamingMessage', 'SubscribeToTask'):
363363
return await self._process_streaming_request(
364364
request_id, specific_request, call_context
365365
)
@@ -396,7 +396,7 @@ async def _process_streaming_request(
396396
request_obj: A2ARequest,
397397
context: ServerCallContext,
398398
) -> Response:
399-
"""Processes streaming requests (message/stream or tasks/resubscribe).
399+
"""Processes streaming requests (SendStreamingMessage or SubscribeToTask).
400400
401401
Args:
402402
request_id: The ID of the request.
@@ -407,16 +407,16 @@ async def _process_streaming_request(
407407
An `EventSourceResponse` object to stream results to the client.
408408
"""
409409
handler_result: Any = None
410-
# Check for streaming message request (same type as send, but handled differently)
410+
# Check for streaming message request (same type as SendMessage, but handled differently)
411411
if isinstance(
412412
request_obj,
413413
SendMessageRequest,
414414
):
415415
handler_result = self.handler.on_message_send_stream(
416416
request_obj, context
417417
)
418-
elif isinstance(request_obj, TaskResubscriptionRequest):
419-
handler_result = self.handler.on_resubscribe_to_task(
418+
elif isinstance(request_obj, SubscribeToTaskRequest):
419+
handler_result = self.handler.on_subscribe_to_task(
420420
request_obj, context
421421
)
422422

src/a2a/server/apps/rest/rest_adapter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ def routes(self) -> dict[tuple[str, str], Callable[[Request], Any]]:
215215
),
216216
('/v1/tasks/{id}:subscribe', 'GET'): functools.partial(
217217
self._handle_streaming_request,
218-
self.handler.on_resubscribe_to_task,
218+
self.handler.on_subscribe_to_task,
219219
),
220220
('/v1/tasks/{id}', 'GET'): functools.partial(
221221
self._handle_request, self.handler.on_get_task

src/a2a/server/request_handlers/default_request_handler.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
GetTaskPushNotificationConfigRequest,
3434
GetTaskRequest,
3535
ListTaskPushNotificationConfigRequest,
36+
ListTaskPushNotificationConfigResponse,
3637
Message,
3738
SendMessageRequest,
3839
SetTaskPushNotificationConfigRequest,
@@ -501,7 +502,7 @@ async def on_set_task_push_notification_config(
501502

502503
async def on_get_task_push_notification_config(
503504
self,
504-
params: CancelTaskRequest | GetTaskPushNotificationConfigRequest,
505+
params: GetTaskPushNotificationConfigRequest,
505506
context: ServerCallContext | None = None,
506507
) -> TaskPushNotificationConfig:
507508
"""Default handler for 'tasks/pushNotificationConfig/get'.
@@ -531,12 +532,12 @@ async def on_get_task_push_notification_config(
531532
push_notification_config=push_notification_config[0],
532533
)
533534

534-
async def on_resubscribe_to_task(
535+
async def on_subscribe_to_task(
535536
self,
536537
params: SubscribeToTaskRequest,
537538
context: ServerCallContext | None = None,
538539
) -> AsyncGenerator[Event]:
539-
"""Default handler for 'tasks/resubscribe'.
540+
"""Default handler for 'SubscribeToTask'.
540541
541542
Allows a client to re-attach to a running streaming task's event stream.
542543
Requires the task and its queue to still be active.
@@ -575,8 +576,8 @@ async def on_list_task_push_notification_config(
575576
self,
576577
params: ListTaskPushNotificationConfigRequest,
577578
context: ServerCallContext | None = None,
578-
) -> list[TaskPushNotificationConfig]:
579-
"""Default handler for 'tasks/pushNotificationConfig/list'.
579+
) -> ListTaskPushNotificationConfigResponse:
580+
"""Default handler for 'ListTaskPushNotificationConfig'.
580581
581582
Requires a `PushConfigStore` to be configured.
582583
"""
@@ -592,13 +593,15 @@ async def on_list_task_push_notification_config(
592593
task_id
593594
)
594595

595-
return [
596-
TaskPushNotificationConfig(
597-
name=f'tasks/{task_id}/pushNotificationConfigs/{config.id}',
598-
push_notification_config=config,
599-
)
600-
for config in push_notification_config_list
601-
]
596+
return ListTaskPushNotificationConfigResponse(
597+
configs=[
598+
TaskPushNotificationConfig(
599+
name=f'tasks/{task_id}/pushNotificationConfigs/{config.id}',
600+
push_notification_config=config,
601+
)
602+
for config in push_notification_config_list
603+
]
604+
)
602605

603606
async def on_delete_task_push_notification_config(
604607
self,

src/a2a/server/request_handlers/grpc_handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ async def SubscribeToTask(
224224
"""
225225
try:
226226
server_context = self.context_builder.build(context)
227-
async for event in self.request_handler.on_resubscribe_to_task(
227+
async for event in self.request_handler.on_subscribe_to_task(
228228
request,
229229
server_context,
230230
):

src/a2a/server/request_handlers/jsonrpc_handler.py

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121
Message,
2222
SendMessageRequest,
2323
SetTaskPushNotificationConfigRequest,
24+
SubscribeToTaskRequest,
2425
Task,
2526
)
2627
from a2a.types.extras import (
2728
AuthenticatedExtendedCardNotConfiguredError,
2829
InternalError,
2930
TaskNotFoundError,
30-
TaskResubscriptionRequest,
3131
)
3232
from a2a.utils import proto_utils
3333
from a2a.utils.errors import ServerError
@@ -202,24 +202,24 @@ async def on_cancel_task(
202202

203203
return _build_error_response(request_id, TaskNotFoundError())
204204

205-
async def on_resubscribe_to_task(
205+
async def on_subscribe_to_task(
206206
self,
207-
request: TaskResubscriptionRequest,
207+
request: SubscribeToTaskRequest,
208208
context: ServerCallContext | None = None,
209209
) -> AsyncIterable[dict[str, Any]]:
210-
"""Handles the 'tasks/resubscribe' JSON-RPC method.
210+
"""Handles the 'SubscribeToTask' JSON-RPC method.
211211
212212
Yields response objects as they are produced by the underlying handler's stream.
213213
214214
Args:
215-
request: The incoming `TaskResubscriptionRequest` object.
215+
request: The incoming `SubscribeToTaskRequest` object.
216216
context: Context provided by the server.
217217
218218
Yields:
219219
Dict representations of JSON-RPC responses containing streaming events.
220220
"""
221221
try:
222-
async for event in self.request_handler.on_resubscribe_to_task(
222+
async for event in self.request_handler.on_subscribe_to_task(
223223
request, context
224224
):
225225
# Wrap the event in StreamResponse for consistent client parsing
@@ -338,7 +338,7 @@ async def list_push_notification_config(
338338
request: ListTaskPushNotificationConfigRequest,
339339
context: ServerCallContext | None = None,
340340
) -> dict[str, Any]:
341-
"""Handles the 'tasks/pushNotificationConfig/list' JSON-RPC method.
341+
"""Handles the 'ListTaskPushNotificationConfig' JSON-RPC method.
342342
343343
Args:
344344
request: The incoming `ListTaskPushNotificationConfigRequest` object.
@@ -349,14 +349,11 @@ async def list_push_notification_config(
349349
"""
350350
request_id = self._get_request_id(context)
351351
try:
352-
configs = await self.request_handler.on_list_task_push_notification_config(
352+
response = await self.request_handler.on_list_task_push_notification_config(
353353
request, context
354354
)
355-
# configs is a list of TaskPushNotificationConfig protos
356-
result = [
357-
MessageToDict(c, preserving_proto_field_name=False)
358-
for c in configs
359-
]
355+
# response is a ListTaskPushNotificationConfigResponse proto
356+
result = MessageToDict(response, preserving_proto_field_name=False)
360357
return _build_success_response(request_id, result)
361358
except ServerError as e:
362359
return _build_error_response(

0 commit comments

Comments
 (0)