Skip to content

Commit 481bdbc

Browse files
ruimgfclaude
andcommitted
fix: return task immediately when blocking=False in on_message_send
When `blocking=False` is set in MessageSendConfiguration, the handler now returns the Task object immediately without waiting for executor events. Event consumption and push notifications are processed entirely in the background. Previously, even with `blocking=False`, the handler waited for the first event from the EventConsumer polling loop (0.5s timeout per iteration). When the event loop was busy with background work, this caused 5-7s delays before the HTTP response was sent, leading to client timeouts. The new non-blocking fast path: 1. Creates the task and starts the executor (via _setup_message_execution) 2. Returns the task in 'submitted' state immediately 3. Consumes events and sends push notifications in a background task The blocking path remains unchanged for backward compatibility. Fixes #951 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent b941eef commit 481bdbc

1 file changed

Lines changed: 49 additions & 1 deletion

File tree

src/a2a/server/request_handlers/default_request_handler.py

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
TaskPushNotificationConfig,
4242
TaskQueryParams,
4343
TaskState,
44+
TaskStatus,
4445
UnsupportedOperationError,
4546
)
4647
from a2a.utils.errors import ServerError
@@ -295,6 +296,13 @@ async def on_message_send(
295296
296297
Starts the agent execution for the message and waits for the final
297298
result (Task or Message).
299+
300+
When ``blocking`` is ``False``, the handler returns the task
301+
immediately without waiting for executor events and processes
302+
everything in the background. Results are delivered via push
303+
notifications. This avoids the latency introduced by the
304+
``EventConsumer`` polling loop which can add seconds of delay
305+
when the event loop is busy with other work.
298306
"""
299307
(
300308
_task_manager,
@@ -311,6 +319,46 @@ async def on_message_send(
311319
if params.configuration and params.configuration.blocking is False:
312320
blocking = False
313321

322+
# Non-blocking fast path: return the task immediately and process
323+
# events entirely in the background via push notifications.
324+
if not blocking:
325+
task = await _task_manager.get_task()
326+
if not task:
327+
task = Task(
328+
id=task_id,
329+
context_id=params.message.context_id,
330+
status=TaskStatus(state=TaskState.submitted),
331+
history=[params.message],
332+
)
333+
334+
async def _background_consume() -> None:
335+
try:
336+
async for _event in result_aggregator.consume_and_emit(
337+
consumer
338+
):
339+
await self._send_push_notification_if_needed(
340+
task_id, result_aggregator
341+
)
342+
except Exception:
343+
logger.exception(
344+
'Background event consumption failed for task %s',
345+
task_id,
346+
)
347+
finally:
348+
await self._cleanup_producer(producer_task, task_id)
349+
350+
bg_task = asyncio.create_task(_background_consume())
351+
bg_task.set_name(f'non_blocking_consume:{task_id}')
352+
self._track_background_task(bg_task)
353+
354+
if params.configuration:
355+
task = apply_history_length(
356+
task, params.configuration.history_length
357+
)
358+
359+
return task
360+
361+
# Blocking path: wait for completion or interruption.
314362
interrupted_or_non_blocking = False
315363
try:
316364
# Create async callback for push notifications
@@ -325,7 +373,7 @@ async def push_notification_callback() -> None:
325373
bg_consume_task,
326374
) = await result_aggregator.consume_and_break_on_interrupt(
327375
consumer,
328-
blocking=blocking,
376+
blocking=True,
329377
event_callback=push_notification_callback,
330378
)
331379

0 commit comments

Comments
 (0)