From c7a29a9d6340b136e70b8ecbfdc5d612099d2b6e Mon Sep 17 00:00:00 2001 From: Ivan Shymko Date: Mon, 20 Apr 2026 07:52:27 +0000 Subject: [PATCH 1/3] fix: rely on agent executor implementation for stream termination --- .../request_handlers/default_request_handler_v2.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/a2a/server/request_handlers/default_request_handler_v2.py b/src/a2a/server/request_handlers/default_request_handler_v2.py index c0c6b5445..4d199f197 100644 --- a/src/a2a/server/request_handlers/default_request_handler_v2.py +++ b/src/a2a/server/request_handlers/default_request_handler_v2.py @@ -271,11 +271,15 @@ async def on_message_send( # noqa: D102 ): self._validate_task_id_match(task_id, event.id) result = event + # DO break here as it's "return_immediately". + # AgentExecutor will continue to run in the background. break if isinstance(event, Message): result = event - break + # Do NOT break here as Message is supposed to be the only + # event in "Message-only" interaction. + # We rely on AgentExecutor here to simplify wrong implementations detection. if result is None: logger.debug('Missing result for task %s', request_context.task_id) @@ -311,15 +315,14 @@ async def on_message_send_stream( # noqa: D102 request=request_context, include_initial_task=False, ): + # Do NOT break here as we rely on AgentExecutor to yield control + # to simplify wrong implementations detection. if isinstance(event, Task): self._validate_task_id_match(task_id, event.id) yield apply_history_length(event, params.configuration) else: yield event - if isinstance(event, Message): - break - @validate_request_params @validate( lambda self: self._agent_card.capabilities.push_notifications, From e3236ce89ac5e58e7bd1d29fcd4603567213229e Mon Sep 17 00:00:00 2001 From: Ivan Shymko Date: Mon, 20 Apr 2026 08:15:38 +0000 Subject: [PATCH 2/3] Updates --- .../default_request_handler_v2.py | 12 +- .../test_default_request_handler_v2.py | 163 ++++++++++++++++++ 2 files changed, 172 insertions(+), 3 deletions(-) diff --git a/src/a2a/server/request_handlers/default_request_handler_v2.py b/src/a2a/server/request_handlers/default_request_handler_v2.py index 4d199f197..ecdc0cfef 100644 --- a/src/a2a/server/request_handlers/default_request_handler_v2.py +++ b/src/a2a/server/request_handlers/default_request_handler_v2.py @@ -279,7 +279,9 @@ async def on_message_send( # noqa: D102 result = event # Do NOT break here as Message is supposed to be the only # event in "Message-only" interaction. - # We rely on AgentExecutor here to simplify wrong implementations detection. + # ActiveTask consumer (see active_task.py) validates the event + # stream and raises InvalidAgentResponseError if more events are + # pushed after a Message. if result is None: logger.debug('Missing result for task %s', request_context.task_id) @@ -315,8 +317,12 @@ async def on_message_send_stream( # noqa: D102 request=request_context, include_initial_task=False, ): - # Do NOT break here as we rely on AgentExecutor to yield control - # to simplify wrong implementations detection. + # Do NOT break here as we rely on AgentExecutor to yield control. + # ActiveTask consumer (see active_task.py) validates the event + # stream and raises InvalidAgentResponseError on misbehaving agents: + # - an event after a Message + # - Message after entering task mode + # - an event after a terminal state if isinstance(event, Task): self._validate_task_id_match(task_id, event.id) yield apply_history_length(event, params.configuration) diff --git a/tests/server/request_handlers/test_default_request_handler_v2.py b/tests/server/request_handlers/test_default_request_handler_v2.py index fda1ab960..6843e1471 100644 --- a/tests/server/request_handlers/test_default_request_handler_v2.py +++ b/tests/server/request_handlers/test_default_request_handler_v2.py @@ -28,6 +28,7 @@ ) from a2a.types import ( InternalError, + InvalidAgentResponseError, InvalidParamsError, TaskNotFoundError, PushNotificationNotSupportedError, @@ -1244,3 +1245,165 @@ async def test_on_message_send_with_push_notification(): push_store.set_info.assert_awaited_once_with( result.id, push_config, context ) + + +class MultipleMessagesAgentExecutor(AgentExecutor): + """Misbehaving agent that yields more than one Message.""" + + async def execute(self, context: RequestContext, event_queue: EventQueue): + await event_queue.enqueue_event( + new_text_message('first', role=Role.ROLE_AGENT) + ) + await event_queue.enqueue_event( + new_text_message('second', role=Role.ROLE_AGENT) + ) + + async def cancel(self, context: RequestContext, event_queue: EventQueue): + pass + + +class MessageAfterTaskEventAgentExecutor(AgentExecutor): + """Misbehaving agent that yields a task-mode event then a Message.""" + + async def execute(self, context: RequestContext, event_queue: EventQueue): + task = new_task_from_user_message(context.message) + await event_queue.enqueue_event(task) + updater = TaskUpdater(event_queue, task.id, task.context_id) + await updater.update_status(TaskState.TASK_STATE_WORKING) + await event_queue.enqueue_event( + new_text_message('stray message', role=Role.ROLE_AGENT) + ) + + async def cancel(self, context: RequestContext, event_queue: EventQueue): + pass + + +class TaskEventAfterMessageAgentExecutor(AgentExecutor): + """Misbehaving agent that yields a Message and then a task-mode event.""" + + async def execute(self, context: RequestContext, event_queue: EventQueue): + await event_queue.enqueue_event( + new_text_message('only message', role=Role.ROLE_AGENT) + ) + await event_queue.enqueue_event( + TaskStatusUpdateEvent( + task_id=str(context.task_id or ''), + context_id=str(context.context_id or ''), + status=TaskStatus(state=TaskState.TASK_STATE_WORKING), + ) + ) + + async def cancel(self, context: RequestContext, event_queue: EventQueue): + pass + + +class EventAfterTerminalStateAgentExecutor(AgentExecutor): + """Misbehaving agent that yields an event after reaching a terminal state.""" + + async def execute(self, context: RequestContext, event_queue: EventQueue): + task = new_task_from_user_message(context.message) + await event_queue.enqueue_event(task) + updater = TaskUpdater(event_queue, task.id, task.context_id) + await updater.complete() + await event_queue.enqueue_event( + new_text_message('after terminal', role=Role.ROLE_AGENT) + ) + + async def cancel(self, context: RequestContext, event_queue: EventQueue): + pass + + +@pytest.mark.asyncio +@pytest.mark.timeout(1) +async def test_on_message_send_stream_rejects_multiple_messages(): + """Stream surfaces InvalidAgentResponseError when the agent yields a + second Message after the first one (see comment in on_message_send_stream).""" + request_handler = DefaultRequestHandlerV2( + agent_executor=MultipleMessagesAgentExecutor(), + task_store=InMemoryTaskStore(), + agent_card=create_default_agent_card(), + ) + params = SendMessageRequest( + message=Message( + role=Role.ROLE_USER, + message_id='msg_multi_stream', + parts=[Part(text='Hi')], + ) + ) + with pytest.raises(InvalidAgentResponseError): + async for _ in request_handler.on_message_send_stream( + params, create_server_call_context() + ): + pass + + +@pytest.mark.asyncio +@pytest.mark.timeout(1) +async def test_on_message_send_stream_rejects_message_after_task_event(): + """Stream surfaces InvalidAgentResponseError when the agent yields a + Message after entering task mode (see comment in on_message_send_stream).""" + request_handler = DefaultRequestHandlerV2( + agent_executor=MessageAfterTaskEventAgentExecutor(), + task_store=InMemoryTaskStore(), + agent_card=create_default_agent_card(), + ) + params = SendMessageRequest( + message=Message( + role=Role.ROLE_USER, + message_id='msg_after_task_stream', + parts=[Part(text='Hi')], + ) + ) + with pytest.raises(InvalidAgentResponseError): + async for _ in request_handler.on_message_send_stream( + params, create_server_call_context() + ): + pass + + +@pytest.mark.asyncio +@pytest.mark.timeout(1) +async def test_on_message_send_stream_rejects_task_event_after_message(): + """Stream surfaces InvalidAgentResponseError when the agent yields a + task-mode event after a Message (see comment in on_message_send_stream).""" + request_handler = DefaultRequestHandlerV2( + agent_executor=TaskEventAfterMessageAgentExecutor(), + task_store=InMemoryTaskStore(), + agent_card=create_default_agent_card(), + ) + params = SendMessageRequest( + message=Message( + role=Role.ROLE_USER, + message_id='msg_then_task_stream', + parts=[Part(text='Hi')], + ) + ) + with pytest.raises(InvalidAgentResponseError): + async for _ in request_handler.on_message_send_stream( + params, create_server_call_context() + ): + pass + + +@pytest.mark.asyncio +@pytest.mark.timeout(1) +async def test_on_message_send_stream_rejects_event_after_terminal_state(): + """Stream surfaces InvalidAgentResponseError when the agent yields an event + after reaching a terminal state (see comment in on_message_send_stream).""" + request_handler = DefaultRequestHandlerV2( + agent_executor=EventAfterTerminalStateAgentExecutor(), + task_store=InMemoryTaskStore(), + agent_card=create_default_agent_card(), + ) + params = SendMessageRequest( + message=Message( + role=Role.ROLE_USER, + message_id='msg_after_terminal_stream', + parts=[Part(text='Hi')], + ) + ) + with pytest.raises(InvalidAgentResponseError): + async for _ in request_handler.on_message_send_stream( + params, create_server_call_context() + ): + pass From f5123895072ca446d51ac40b63190f5f892993d2 Mon Sep 17 00:00:00 2001 From: Ivan Shymko Date: Mon, 20 Apr 2026 08:28:02 +0000 Subject: [PATCH 3/3] Assert error messages --- .../test_default_request_handler_v2.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/server/request_handlers/test_default_request_handler_v2.py b/tests/server/request_handlers/test_default_request_handler_v2.py index 6843e1471..e35b8f720 100644 --- a/tests/server/request_handlers/test_default_request_handler_v2.py +++ b/tests/server/request_handlers/test_default_request_handler_v2.py @@ -1330,7 +1330,7 @@ async def test_on_message_send_stream_rejects_multiple_messages(): parts=[Part(text='Hi')], ) ) - with pytest.raises(InvalidAgentResponseError): + with pytest.raises(InvalidAgentResponseError, match='Multiple Message'): async for _ in request_handler.on_message_send_stream( params, create_server_call_context() ): @@ -1354,7 +1354,9 @@ async def test_on_message_send_stream_rejects_message_after_task_event(): parts=[Part(text='Hi')], ) ) - with pytest.raises(InvalidAgentResponseError): + with pytest.raises( + InvalidAgentResponseError, match='Message object in task mode' + ): async for _ in request_handler.on_message_send_stream( params, create_server_call_context() ): @@ -1378,7 +1380,7 @@ async def test_on_message_send_stream_rejects_task_event_after_message(): parts=[Part(text='Hi')], ) ) - with pytest.raises(InvalidAgentResponseError): + with pytest.raises(InvalidAgentResponseError, match='in message mode'): async for _ in request_handler.on_message_send_stream( params, create_server_call_context() ): @@ -1402,7 +1404,9 @@ async def test_on_message_send_stream_rejects_event_after_terminal_state(): parts=[Part(text='Hi')], ) ) - with pytest.raises(InvalidAgentResponseError): + with pytest.raises( + InvalidAgentResponseError, match='Message object in task mode' + ): async for _ in request_handler.on_message_send_stream( params, create_server_call_context() ):