You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
fix: make event_consumer tolerant to closed queues on py3.13 (#407)
# Issue
On Python 3.13, the closed-queue signal is `asyncio.QueueShutDown`.
`event_consumer.py` aliased `asyncio.QueueShutDown` to `ClosedQueue`,
but `asyncio.QueueEmpty` can still arise on py3.13 and it's not handled
properly in `consume_all()`'s except blocks.
# How it's reproduced
In any `AgentExecutor` class:
```
# (...)
async def execute(
self,
request: RequestContext,
event_queue: EventQueue,
) -> None:
await event_queue.enqueue_event(
new_task(request.message)
)
```
In python < 3.13:
- Sending a `message/send` request works, a task is added to the
`InMemoryTaskStore`.
In python >= 3.13:
- Sending a `message/send` request crashes with exception:
```
File "venv/lib/python3.13/site-packages/a2a/server/apps/jsonrpc/jsonrpc_app.py", line 219, in _handle_requests
return await self._process_non_streaming_request(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
request_id, a2a_request, call_context
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
)
^
File "venv/lib/python3.13/site-packages/a2a/server/apps/jsonrpc/jsonrpc_app.py", line 306, in _process_non_streaming_request
handler_result = await self.handler.on_message_send(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
request_obj, context
^^^^^^^^^^^^^^^^^^^^
)
^
File "venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 162, in async_wrapper
result = await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "venv/lib/python3.13/site-packages/a2a/server/request_handlers/jsonrpc_handler.py", line 87, in on_message_send
task_or_message = await self.request_handler.on_message_send(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
request.params, context
^^^^^^^^^^^^^^^^^^^^^^^
)
^
File "venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 162, in async_wrapper
result = await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "venv/lib/python3.13/site-packages/a2a/server/request_handlers/default_request_handler.py", line 282, in on_message_send
) = await result_aggregator.consume_and_break_on_interrupt(consumer)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "venv/lib/python3.13/site-packages/a2a/server/tasks/result_aggregator.py", line 115, in consume_and_break_on_interrupt
async for event in event_stream:
...<20 lines>...
break
File "venv/lib/python3.13/site-packages/a2a/server/events/event_consumer.py", line 87, in consume_all
raise self._exception
File "venv/lib/python3.13/site-packages/a2a/server/events/event_consumer.py", line 94, in consume_all
event = await asyncio.wait_for(
^^^^^^^^^^^^^^^^^^^^^^^
self.queue.dequeue_event(), timeout=self._timeout
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
)
^
File "/opt/homebrew/Cellar/python@3.13/3.13.5/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/tasks.py", line 507, in wait_for
return await fut
^^^^^^^^^
File "venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 162, in async_wrapper
result = await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "venv/lib/python3.13/site-packages/a2a/server/events/event_queue.py", line 95, in dequeue_event
raise asyncio.QueueEmpty('Queue is closed.')
asyncio.queues.QueueEmpty: Queue is closed.
```
# Fix
## Code
- `event_consumer.consume_all`:
- Catch `(QueueClosed, asyncio.QueueEmpty)` and break only when
`queue.is_closed()` is True; otherwise continue polling.
- `event_queue.dequeue_event`:
- Version-guard the early-raise: on <3.13 keep raising `QueueEmpty` when
closed+empty; on ≥3.13 skip the early raise and rely on
`queue.shutdown()` to surface `QueueShutDown` exceptions.
## Tests
Added 2 tests which fail on current implementation, but pass after the
fix.
---------
Co-authored-by: taralesc <taralesc@adobe.com>
0 commit comments