@@ -287,59 +287,26 @@ async def _send_push_notification_if_needed(
287287 if isinstance (latest_task , Task ):
288288 await self ._push_sender .send_notification (latest_task )
289289
290- async def _on_message_send_non_blocking (
290+ async def _consume_and_notify_in_background ( # noqa: PLR0913
291291 self ,
292- params : MessageSendParams ,
293- task_manager : TaskManager ,
294292 task_id : str ,
295- queue : EventQueue ,
296293 result_aggregator : ResultAggregator ,
297- producer_task : asyncio .Task ,
298294 consumer : EventConsumer ,
299- ) -> Task :
300- """Non-blocking fast path for ``on_message_send``.
301-
302- Returns the task immediately without waiting for executor events.
303- Event consumption and push notifications are handled in a background
304- task. This avoids the latency introduced by the ``EventConsumer``
305- polling loop which can add seconds of delay when the event loop is
306- busy with other work.
307- """
308- task = await task_manager .get_task ()
309- if not task :
310- task = Task (
311- id = task_id ,
312- context_id = params .message .context_id ,
313- status = TaskStatus (state = TaskState .submitted ),
314- history = [params .message ],
315- )
316-
317- async def _background_consume () -> None :
318- try :
319- async for _event in result_aggregator .consume_and_emit (
320- consumer
321- ):
322- await self ._send_push_notification_if_needed (
323- task_id , result_aggregator
324- )
325- except Exception :
326- logger .exception (
327- 'Background event consumption failed for task %s' ,
328- task_id ,
295+ producer_task : asyncio .Task ,
296+ ) -> None :
297+ """Consume executor events and send push notifications in background."""
298+ try :
299+ async for _event in result_aggregator .consume_and_emit (consumer ):
300+ await self ._send_push_notification_if_needed (
301+ task_id , result_aggregator
329302 )
330- finally :
331- await self ._cleanup_producer (producer_task , task_id )
332-
333- bg_task = asyncio .create_task (_background_consume ())
334- bg_task .set_name (f'non_blocking_consume:{ task_id } ' )
335- self ._track_background_task (bg_task )
336-
337- if params .configuration :
338- task = apply_history_length (
339- task , params .configuration .history_length
303+ except Exception :
304+ logger .exception (
305+ 'Background event consumption failed for task %s' ,
306+ task_id ,
340307 )
341-
342- return task
308+ finally :
309+ await self . _cleanup_producer ( producer_task , task_id )
343310
344311 async def on_message_send (
345312 self ,
@@ -371,16 +338,32 @@ async def on_message_send(
371338 if params .configuration and params .configuration .blocking is False :
372339 blocking = False
373340
341+ # Non-blocking fast path: return the task immediately and process
342+ # events entirely in the background via push notifications.
374343 if not blocking :
375- return await self ._on_message_send_non_blocking (
376- params ,
377- _task_manager ,
378- task_id ,
379- queue ,
380- result_aggregator ,
381- producer_task ,
382- consumer ,
344+ task = await _task_manager .get_task ()
345+ if not task :
346+ task = Task (
347+ id = task_id ,
348+ context_id = params .message .context_id ,
349+ status = TaskStatus (state = TaskState .submitted ),
350+ history = [params .message ],
351+ )
352+
353+ bg_task = asyncio .create_task (
354+ self ._consume_and_notify_in_background (
355+ task_id , result_aggregator , consumer , producer_task
356+ )
383357 )
358+ bg_task .set_name (f'non_blocking_consume:{ task_id } ' )
359+ self ._track_background_task (bg_task )
360+
361+ if params .configuration :
362+ task = apply_history_length (
363+ task , params .configuration .history_length
364+ )
365+
366+ return task
384367
385368 # Blocking path: wait for completion or interruption.
386369 interrupted_or_non_blocking = False
0 commit comments