@@ -312,6 +312,7 @@ async def on_message_send(
312312 blocking = False
313313
314314 interrupted_or_non_blocking = False
315+ success = False
315316 try :
316317 # Create async callback for push notifications
317318 async def push_notification_callback () -> None :
@@ -327,6 +328,7 @@ async def push_notification_callback() -> None:
327328 blocking = blocking ,
328329 event_callback = push_notification_callback ,
329330 )
331+ success = True
330332
331333 except Exception :
332334 logger .exception ('Agent execution failed' )
@@ -339,7 +341,14 @@ async def push_notification_callback() -> None:
339341 cleanup_task .set_name (f'cleanup_producer:{ task_id } ' )
340342 self ._track_background_task (cleanup_task )
341343 else :
342- await self ._cleanup_producer (producer_task , task_id )
344+ # If we are blocking and not interrupted, but the result is not set
345+ # (meaning exception or other failure), we should cancel the producer.
346+ # 'result' (local var) is bound before this block if success.
347+ # However, to be safe, we can check if successful using a flag.
348+ cancel_producer = not success
349+ await self ._cleanup_producer (
350+ producer_task , task_id , cancel = cancel_producer
351+ )
343352
344353 if not result :
345354 raise ServerError (error = InternalError ())
@@ -433,8 +442,11 @@ async def _cleanup_producer(
433442 self ,
434443 producer_task : asyncio .Task ,
435444 task_id : str ,
445+ cancel : bool = False ,
436446 ) -> None :
437447 """Cleans up the agent execution task and queue manager entry."""
448+ if cancel :
449+ producer_task .cancel ()
438450 await producer_task
439451 await self ._queue_manager .close (task_id )
440452 async with self ._running_agents_lock :
0 commit comments