Skip to content

Commit 5c574cc

Browse files
committed
fix
1 parent ebaea97 commit 5c574cc

1 file changed

Lines changed: 122 additions & 65 deletions

File tree

src/a2a/server/routes/jsonrpc_dispatcher.py

Lines changed: 122 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
UnsupportedOperationError,
5757
)
5858
from a2a.utils.helpers import maybe_await, validate, validate_version
59+
from a2a.utils.telemetry import SpanKind, trace_class
5960

6061

6162
INTERNAL_ERROR_CODE = -32603
@@ -166,6 +167,7 @@ def build(self, request: Request) -> ServerCallContext:
166167
)
167168

168169

170+
@trace_class(kind=SpanKind.SERVER)
169171
class JsonRpcDispatcher:
170172
"""Base class for A2A JSONRPC applications.
171173
@@ -483,8 +485,117 @@ async def _wrap_stream(
483485

484486
return _wrap_stream(stream)
485487

488+
async def _handle_send_message(
489+
self, request_obj: SendMessageRequest, context: ServerCallContext
490+
) -> dict[str, Any]:
491+
task_or_message = await self.request_handler.on_message_send(
492+
request_obj, context
493+
)
494+
if isinstance(task_or_message, Task):
495+
return MessageToDict(SendMessageResponse(task=task_or_message))
496+
return MessageToDict(SendMessageResponse(message=task_or_message))
497+
498+
async def _handle_cancel_task(
499+
self, request_obj: CancelTaskRequest, context: ServerCallContext
500+
) -> dict[str, Any]:
501+
task = await self.request_handler.on_cancel_task(request_obj, context)
502+
if task:
503+
return MessageToDict(task, preserving_proto_field_name=False)
504+
raise TaskNotFoundError
505+
506+
async def _handle_get_task(
507+
self, request_obj: GetTaskRequest, context: ServerCallContext
508+
) -> dict[str, Any]:
509+
task = await self.request_handler.on_get_task(request_obj, context)
510+
if task:
511+
return MessageToDict(task, preserving_proto_field_name=False)
512+
raise TaskNotFoundError
513+
514+
async def _handle_list_tasks(
515+
self, request_obj: ListTasksRequest, context: ServerCallContext
516+
) -> dict[str, Any]:
517+
tasks_response = await self.request_handler.on_list_tasks(
518+
request_obj, context
519+
)
520+
return MessageToDict(
521+
tasks_response,
522+
preserving_proto_field_name=False,
523+
always_print_fields_with_no_presence=True,
524+
)
525+
526+
@validate(
527+
lambda self: self.agent_card.capabilities.push_notifications,
528+
'Push notifications are not supported by the agent',
529+
)
530+
async def _handle_create_task_push_notification_config(
531+
self,
532+
request_obj: TaskPushNotificationConfig,
533+
context: ServerCallContext,
534+
) -> dict[str, Any]:
535+
result_config = (
536+
await self.request_handler.on_create_task_push_notification_config(
537+
request_obj, context
538+
)
539+
)
540+
return MessageToDict(result_config, preserving_proto_field_name=False)
541+
542+
async def _handle_get_task_push_notification_config(
543+
self,
544+
request_obj: GetTaskPushNotificationConfigRequest,
545+
context: ServerCallContext,
546+
) -> dict[str, Any]:
547+
config = (
548+
await self.request_handler.on_get_task_push_notification_config(
549+
request_obj, context
550+
)
551+
)
552+
return MessageToDict(config, preserving_proto_field_name=False)
553+
554+
async def _handle_list_task_push_notification_configs(
555+
self,
556+
request_obj: ListTaskPushNotificationConfigsRequest,
557+
context: ServerCallContext,
558+
) -> dict[str, Any]:
559+
configs_response = (
560+
await self.request_handler.on_list_task_push_notification_configs(
561+
request_obj, context
562+
)
563+
)
564+
return MessageToDict(
565+
configs_response, preserving_proto_field_name=False
566+
)
567+
568+
async def _handle_delete_task_push_notification_config(
569+
self,
570+
request_obj: DeleteTaskPushNotificationConfigRequest,
571+
context: ServerCallContext,
572+
) -> None:
573+
await self.request_handler.on_delete_task_push_notification_config(
574+
request_obj, context
575+
)
576+
577+
async def _handle_get_extended_agent_card(
578+
self,
579+
request_obj: GetExtendedAgentCardRequest,
580+
context: ServerCallContext,
581+
) -> dict[str, Any]:
582+
if not self.agent_card.capabilities.extended_agent_card:
583+
raise ExtendedAgentCardNotConfiguredError(
584+
message='The agent does not have an extended agent card configured'
585+
)
586+
base_card = self.extended_agent_card or self.agent_card
587+
card_to_serve = base_card
588+
if self.extended_card_modifier and context:
589+
card_to_serve = await maybe_await(
590+
self.extended_card_modifier(base_card, context)
591+
)
592+
elif self.card_modifier:
593+
card_to_serve = await maybe_await(self.card_modifier(base_card))
594+
595+
return MessageToDict(card_to_serve, preserving_proto_field_name=False)
596+
486597
@validate_version(constants.PROTOCOL_VERSION_1_0)
487-
async def _process_non_streaming_request( # noqa: PLR0911, PLR0912
598+
async def _process_non_streaming_request( # noqa: PLR0911
488599
self,
489600
request_id: str | int | None,
490601
request_obj: A2ARequest,
@@ -502,86 +613,32 @@ async def _process_non_streaming_request( # noqa: PLR0911, PLR0912
502613
"""
503614
match request_obj:
504615
case SendMessageRequest():
505-
task_or_message = await self.request_handler.on_message_send(
506-
request_obj, context
507-
)
508-
if isinstance(task_or_message, Task):
509-
return MessageToDict(
510-
SendMessageResponse(task=task_or_message)
511-
)
512-
return MessageToDict(
513-
SendMessageResponse(message=task_or_message)
514-
)
616+
return await self._handle_send_message(request_obj, context)
515617
case CancelTaskRequest():
516-
task = await self.request_handler.on_cancel_task(
517-
request_obj, context
518-
)
519-
if task:
520-
return MessageToDict(
521-
task, preserving_proto_field_name=False
522-
)
523-
raise TaskNotFoundError
618+
return await self._handle_cancel_task(request_obj, context)
524619
case GetTaskRequest():
525-
task = await self.request_handler.on_get_task(
526-
request_obj, context
527-
)
528-
if task:
529-
return MessageToDict(
530-
task, preserving_proto_field_name=False
531-
)
532-
raise TaskNotFoundError
620+
return await self._handle_get_task(request_obj, context)
533621
case ListTasksRequest():
534-
tasks_response = await self.request_handler.on_list_tasks(
535-
request_obj, context
536-
)
537-
return MessageToDict(
538-
tasks_response,
539-
preserving_proto_field_name=False,
540-
always_print_fields_with_no_presence=True,
541-
)
622+
return await self._handle_list_tasks(request_obj, context)
542623
case TaskPushNotificationConfig():
543-
await self._require_push_notifications()
544-
result_config = await self.request_handler.on_create_task_push_notification_config(
624+
return await self._handle_create_task_push_notification_config(
545625
request_obj, context
546626
)
547-
return MessageToDict(
548-
result_config, preserving_proto_field_name=False
549-
)
550627
case GetTaskPushNotificationConfigRequest():
551-
config = await self.request_handler.on_get_task_push_notification_config(
628+
return await self._handle_get_task_push_notification_config(
552629
request_obj, context
553630
)
554-
return MessageToDict(config, preserving_proto_field_name=False)
555631
case ListTaskPushNotificationConfigsRequest():
556-
configs_response = await self.request_handler.on_list_task_push_notification_configs(
632+
return await self._handle_list_task_push_notification_configs(
557633
request_obj, context
558634
)
559-
return MessageToDict(
560-
configs_response, preserving_proto_field_name=False
561-
)
562635
case DeleteTaskPushNotificationConfigRequest():
563-
await self.request_handler.on_delete_task_push_notification_config(
636+
return await self._handle_delete_task_push_notification_config(
564637
request_obj, context
565638
)
566-
return None
567639
case GetExtendedAgentCardRequest():
568-
if not self.agent_card.capabilities.extended_agent_card:
569-
raise ExtendedAgentCardNotConfiguredError(
570-
message='The agent does not have an extended agent card configured'
571-
)
572-
base_card = self.extended_agent_card or self.agent_card
573-
card_to_serve = base_card
574-
if self.extended_card_modifier and context:
575-
card_to_serve = await maybe_await(
576-
self.extended_card_modifier(base_card, context)
577-
)
578-
elif self.card_modifier:
579-
card_to_serve = await maybe_await(
580-
self.card_modifier(base_card)
581-
)
582-
583-
return MessageToDict(
584-
card_to_serve, preserving_proto_field_name=False
640+
return await self._handle_get_extended_agent_card(
641+
request_obj, context
585642
)
586643
case _:
587644
logger.error(

0 commit comments

Comments
 (0)