Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion src/a2a/server/request_handlers/default_request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ async def on_message_send(
blocking = False

interrupted_or_non_blocking = False
success = False
try:
# Create async callback for push notifications
async def push_notification_callback() -> None:
Expand All @@ -327,6 +328,7 @@ async def push_notification_callback() -> None:
blocking=blocking,
event_callback=push_notification_callback,
)
success = True

except Exception:
logger.exception('Agent execution failed')
Expand All @@ -339,7 +341,14 @@ async def push_notification_callback() -> None:
cleanup_task.set_name(f'cleanup_producer:{task_id}')
self._track_background_task(cleanup_task)
else:
await self._cleanup_producer(producer_task, task_id)
# If we are blocking and not interrupted, but the result is not set
# (meaning exception or other failure), we should cancel the producer.
# 'result' (local var) is bound before this block if success.
# However, to be safe, we can check if successful using a flag.
cancel_producer = not success
await self._cleanup_producer(
producer_task, task_id, cancel=cancel_producer
)
Comment on lines +345 to +352
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

The logic here is correct, but the comment is quite verbose and an intermediate variable is used. For improved readability and conciseness, you could combine the variable assignment with the function call and shorten the comment.

                # In blocking mode, if an exception occurred (i.e., `success` is False),
                # the producer task might be running indefinitely. We must cancel it
                # to prevent the cleanup from hanging while waiting for it.
                await self._cleanup_producer(
                    producer_task, task_id, cancel=not success
                )


if not result:
raise ServerError(error=InternalError())
Expand Down Expand Up @@ -433,8 +442,11 @@ async def _cleanup_producer(
self,
producer_task: asyncio.Task,
task_id: str,
cancel: bool = False,
) -> None:
"""Cleans up the agent execution task and queue manager entry."""
if cancel:
producer_task.cancel()
await producer_task
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

When cancel=True, producer_task.cancel() is called. Subsequently, await producer_task will raise an asyncio.CancelledError. This exception is currently unhandled. If _cleanup_producer is called from a finally block (as it is in on_message_send), this unhandled CancelledError can mask the original exception that led to the finally block executing. To prevent this and handle the expected cancellation gracefully, it's best to wrap the await in a try...except block.

Suggested change
await producer_task
try:
await producer_task
except asyncio.CancelledError:
# This is expected if the task was cancelled, so we can ignore.
pass

await self._queue_manager.close(task_id)
async with self._running_agents_lock:
Expand Down
Loading