Skip to content

Commit c74ec07

Browse files
ruimgfclaude
andcommitted
fix: extract non-blocking path to method, update tests
- Extract non-blocking logic to _on_message_send_non_blocking to fix PLR0915 (too many statements) lint error - Update test_on_message_send_with_push_notification_in_non_blocking_request to test the new immediate-return behavior instead of mocking consume_and_break_on_interrupt - Update test_on_message_send_non_blocking history assertion to account for the immediate return having only the initial user message Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 481bdbc commit c74ec07

2 files changed

Lines changed: 99 additions & 154 deletions

File tree

src/a2a/server/request_handlers/default_request_handler.py

Lines changed: 64 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,60 @@ async def _send_push_notification_if_needed(
287287
if isinstance(latest_task, Task):
288288
await self._push_sender.send_notification(latest_task)
289289

290+
async def _on_message_send_non_blocking(
291+
self,
292+
params: MessageSendParams,
293+
task_manager: TaskManager,
294+
task_id: str,
295+
queue: EventQueue,
296+
result_aggregator: ResultAggregator,
297+
producer_task: asyncio.Task,
298+
consumer: EventConsumer,
299+
) -> Task:
300+
"""Non-blocking fast path for ``on_message_send``.
301+
302+
Returns the task immediately without waiting for executor events.
303+
Event consumption and push notifications are handled in a background
304+
task. This avoids the latency introduced by the ``EventConsumer``
305+
polling loop which can add seconds of delay when the event loop is
306+
busy with other work.
307+
"""
308+
task = await task_manager.get_task()
309+
if not task:
310+
task = Task(
311+
id=task_id,
312+
context_id=params.message.context_id,
313+
status=TaskStatus(state=TaskState.submitted),
314+
history=[params.message],
315+
)
316+
317+
async def _background_consume() -> None:
318+
try:
319+
async for _event in result_aggregator.consume_and_emit(
320+
consumer
321+
):
322+
await self._send_push_notification_if_needed(
323+
task_id, result_aggregator
324+
)
325+
except Exception:
326+
logger.exception(
327+
'Background event consumption failed for task %s',
328+
task_id,
329+
)
330+
finally:
331+
await self._cleanup_producer(producer_task, task_id)
332+
333+
bg_task = asyncio.create_task(_background_consume())
334+
bg_task.set_name(f'non_blocking_consume:{task_id}')
335+
self._track_background_task(bg_task)
336+
337+
if params.configuration:
338+
task = apply_history_length(
339+
task, params.configuration.history_length
340+
)
341+
342+
return task
343+
290344
async def on_message_send(
291345
self,
292346
params: MessageSendParams,
@@ -300,9 +354,7 @@ async def on_message_send(
300354
When ``blocking`` is ``False``, the handler returns the task
301355
immediately without waiting for executor events and processes
302356
everything in the background. Results are delivered via push
303-
notifications. This avoids the latency introduced by the
304-
``EventConsumer`` polling loop which can add seconds of delay
305-
when the event loop is busy with other work.
357+
notifications.
306358
"""
307359
(
308360
_task_manager,
@@ -319,44 +371,16 @@ async def on_message_send(
319371
if params.configuration and params.configuration.blocking is False:
320372
blocking = False
321373

322-
# Non-blocking fast path: return the task immediately and process
323-
# events entirely in the background via push notifications.
324374
if not blocking:
325-
task = await _task_manager.get_task()
326-
if not task:
327-
task = Task(
328-
id=task_id,
329-
context_id=params.message.context_id,
330-
status=TaskStatus(state=TaskState.submitted),
331-
history=[params.message],
332-
)
333-
334-
async def _background_consume() -> None:
335-
try:
336-
async for _event in result_aggregator.consume_and_emit(
337-
consumer
338-
):
339-
await self._send_push_notification_if_needed(
340-
task_id, result_aggregator
341-
)
342-
except Exception:
343-
logger.exception(
344-
'Background event consumption failed for task %s',
345-
task_id,
346-
)
347-
finally:
348-
await self._cleanup_producer(producer_task, task_id)
349-
350-
bg_task = asyncio.create_task(_background_consume())
351-
bg_task.set_name(f'non_blocking_consume:{task_id}')
352-
self._track_background_task(bg_task)
353-
354-
if params.configuration:
355-
task = apply_history_length(
356-
task, params.configuration.history_length
357-
)
358-
359-
return task
375+
return await self._on_message_send_non_blocking(
376+
params,
377+
_task_manager,
378+
task_id,
379+
queue,
380+
result_aggregator,
381+
producer_task,
382+
consumer,
383+
)
360384

361385
# Blocking path: wait for completion or interruption.
362386
interrupted_or_non_blocking = False

tests/server/request_handlers/test_default_request_handler.py

Lines changed: 35 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -460,135 +460,56 @@ async def get_current_result():
460460

461461
@pytest.mark.asyncio
462462
async def test_on_message_send_with_push_notification_in_non_blocking_request():
463-
"""Test that push notification callback is called during background event processing for non-blocking requests."""
464-
mock_task_store = AsyncMock(spec=TaskStore)
465-
mock_push_notification_store = AsyncMock(spec=PushNotificationConfigStore)
466-
mock_agent_executor = AsyncMock(spec=AgentExecutor)
467-
mock_request_context_builder = AsyncMock(spec=RequestContextBuilder)
463+
"""Test that non-blocking requests return immediately and process push notifications in background."""
464+
task_store = InMemoryTaskStore()
465+
push_store = InMemoryPushNotificationConfigStore()
468466
mock_push_sender = AsyncMock()
469467

470-
task_id = 'non_blocking_task_1'
471-
context_id = 'non_blocking_ctx_1'
472-
473-
# Create a task that will be returned after the first event
474-
initial_task = create_sample_task(
475-
task_id=task_id, context_id=context_id, status_state=TaskState.working
476-
)
477-
478-
# Create a final task that will be available during background processing
479-
final_task = create_sample_task(
480-
task_id=task_id, context_id=context_id, status_state=TaskState.completed
481-
)
482-
483-
mock_task_store.get.return_value = None
484-
485-
# Mock request context
486-
mock_request_context = MagicMock(spec=RequestContext)
487-
mock_request_context.task_id = task_id
488-
mock_request_context.context_id = context_id
489-
mock_request_context_builder.build.return_value = mock_request_context
490-
491468
request_handler = DefaultRequestHandler(
492-
agent_executor=mock_agent_executor,
493-
task_store=mock_task_store,
494-
push_config_store=mock_push_notification_store,
495-
request_context_builder=mock_request_context_builder,
469+
agent_executor=HelloAgentExecutor(),
470+
task_store=task_store,
471+
push_config_store=push_store,
496472
push_sender=mock_push_sender,
497473
)
498474

499-
# Configure push notification
500475
push_config = PushNotificationConfig(url='http://callback.com/push')
501-
message_config = MessageSendConfiguration(
502-
push_notification_config=push_config,
503-
accepted_output_modes=['text/plain'],
504-
blocking=False, # Non-blocking request
505-
)
506476
params = MessageSendParams(
507477
message=Message(
508478
role=Role.user,
509479
message_id='msg_non_blocking',
510-
parts=[],
511-
task_id=task_id,
512-
context_id=context_id,
480+
parts=[Part(root=TextPart(text='Hi'))],
481+
),
482+
configuration=MessageSendConfiguration(
483+
push_notification_config=push_config,
484+
accepted_output_modes=['text/plain'],
485+
blocking=False,
513486
),
514-
configuration=message_config,
515-
)
516-
517-
# Mock ResultAggregator with custom behavior
518-
mock_result_aggregator_instance = AsyncMock(spec=ResultAggregator)
519-
520-
# First call returns the initial task and indicates interruption (non-blocking)
521-
mock_result_aggregator_instance.consume_and_break_on_interrupt.return_value = (
522-
initial_task,
523-
True, # interrupted = True for non-blocking
524-
MagicMock(spec=asyncio.Task), # background task
525-
)
526-
527-
# Mock the current_result property to return the final task
528-
async def get_current_result():
529-
return final_task
530-
531-
type(mock_result_aggregator_instance).current_result = PropertyMock(
532-
return_value=get_current_result()
533487
)
534488

535-
# Track if the event_callback was passed to consume_and_break_on_interrupt
536-
event_callback_passed = False
537-
event_callback_received = None
538-
539-
async def mock_consume_and_break_on_interrupt(
540-
consumer, blocking=True, event_callback=None
541-
):
542-
nonlocal event_callback_passed, event_callback_received
543-
event_callback_passed = event_callback is not None
544-
event_callback_received = event_callback
545-
return (
546-
initial_task,
547-
True,
548-
MagicMock(spec=asyncio.Task),
549-
) # interrupted = True for non-blocking
550-
551-
mock_result_aggregator_instance.consume_and_break_on_interrupt = (
552-
mock_consume_and_break_on_interrupt
489+
result = await request_handler.on_message_send(
490+
params, create_server_call_context()
553491
)
554492

555-
with (
556-
patch(
557-
'a2a.server.request_handlers.default_request_handler.ResultAggregator',
558-
return_value=mock_result_aggregator_instance,
559-
),
560-
patch(
561-
'a2a.server.request_handlers.default_request_handler.TaskManager.get_task',
562-
return_value=initial_task,
563-
),
564-
patch(
565-
'a2a.server.request_handlers.default_request_handler.TaskManager.update_with_message',
566-
return_value=initial_task,
567-
),
568-
):
569-
# Execute the non-blocking request
570-
result = await request_handler.on_message_send(
571-
params, create_server_call_context()
572-
)
493+
# Non-blocking: should return immediately with submitted state
494+
assert result is not None
495+
assert isinstance(result, Task)
496+
assert result.status.state == TaskState.submitted
573497

574-
# Verify the result is the initial task (non-blocking behavior)
575-
assert result == initial_task
498+
# Wait for background processing to complete
499+
for _ in range(10):
500+
await asyncio.sleep(0.1)
501+
task = await task_store.get(result.id)
502+
if task and task.status.state == TaskState.completed:
503+
break
576504

577-
# Verify that the event_callback was passed to consume_and_break_on_interrupt
578-
assert event_callback_passed, (
579-
'event_callback should have been passed to consume_and_break_on_interrupt'
580-
)
581-
assert event_callback_received is not None, (
582-
'event_callback should not be None'
583-
)
505+
assert task is not None
506+
assert task.status.state == TaskState.completed
584507

585-
# Verify that the push notification was sent with the final task
586-
mock_push_sender.send_notification.assert_called_with(final_task)
508+
# Verify push notification config was stored
509+
push_store.set_info.assert_awaited_once_with(result.id, push_config)
587510

588-
# Verify that the push notification config was stored
589-
mock_push_notification_store.set_info.assert_awaited_once_with(
590-
task_id, push_config
591-
)
511+
# Verify push notifications were sent during background processing
512+
assert mock_push_sender.send_notification.call_count >= 1
592513

593514

594515
@pytest.mark.asyncio
@@ -843,11 +764,11 @@ async def test_on_message_send_non_blocking():
843764

844765
assert task is not None
845766
assert task.status.state == TaskState.completed
846-
assert (
847-
result.history
848-
and task.history
849-
and len(result.history) == len(task.history)
850-
)
767+
# The immediately returned result has the initial history (user message),
768+
# while the completed task may have additional history entries from the
769+
# executor. The initial result should have at least the user message.
770+
assert result.history and len(result.history) >= 1
771+
assert task.history and len(task.history) >= 1
851772

852773

853774
@pytest.mark.asyncio

0 commit comments

Comments
 (0)