Skip to content

Commit 694cb0b

Browse files
committed
EventQueue: enqueue items in child queues without blocking
1 parent fd0a1bd commit 694cb0b

2 files changed

Lines changed: 72 additions & 5 deletions

File tree

src/a2a/server/events/event_queue.py

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@ def __init__(self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE) -> None:
4141
self._children: list[EventQueue] = []
4242
self._is_closed = False
4343
self._lock = asyncio.Lock()
44+
self._bg_tasks: set[asyncio.Task[None]] = set()
4445
logger.debug('EventQueue initialized.')
4546

4647
async def enqueue_event(self, event: Event) -> None:
47-
"""Enqueues an event to this queue and all its children.
48+
"""Enqueues an event to this queue and propagates it to all child queues.
4849
4950
Args:
5051
event: The event object to enqueue.
@@ -59,7 +60,12 @@ async def enqueue_event(self, event: Event) -> None:
5960
# Make sure to use put instead of put_nowait to avoid blocking the event loop.
6061
await self.queue.put(event)
6162
for child in self._children:
62-
await child.enqueue_event(event)
63+
# We use a background task to enqueue to children to avoid blocking
64+
# the parent queue if a child queue is full (e.g. slow consumer).
65+
# This prevents deadlocks where a slow consumer blocks the producer.
66+
task = asyncio.create_task(child.enqueue_event(event))
67+
self._bg_tasks.add(task)
68+
task.add_done_callback(self._bg_tasks.discard)
6369

6470
async def dequeue_event(self, no_wait: bool = False) -> Event:
6571
"""Dequeues an event from the queue.
@@ -132,6 +138,17 @@ def tap(self) -> 'EventQueue':
132138
self._children.append(queue)
133139
return queue
134140

141+
async def flush(self) -> None:
142+
"""Waits for all pending background propagation tasks to complete recursively."""
143+
while self._bg_tasks:
144+
# Copy the set to avoid "Set changed size during iteration"
145+
tasks = list(self._bg_tasks)
146+
if tasks:
147+
await asyncio.gather(*tasks, return_exceptions=True)
148+
149+
if self._children:
150+
await asyncio.gather(*(child.flush() for child in self._children))
151+
135152
async def close(self, immediate: bool = False) -> None:
136153
"""Closes the queue for future push events and also closes all child queues.
137154
@@ -161,6 +178,12 @@ async def close(self, immediate: bool = False) -> None:
161178
return
162179
if not self._is_closed:
163180
self._is_closed = True
181+
182+
if immediate:
183+
# Cancel all pending background propagation tasks
184+
for task in self._bg_tasks:
185+
task.cancel()
186+
164187
# If using python 3.13 or higher, use shutdown but match <3.13 semantics
165188
if sys.version_info >= (3, 13):
166189
if immediate:
@@ -170,10 +193,12 @@ async def close(self, immediate: bool = False) -> None:
170193
for child in self._children:
171194
await child.close(True)
172195
return
173-
# Graceful: prevent further gets/puts via shutdown, then wait for drain and children
196+
# Graceful: prevent further gets/puts via shutdown, then wait for drain, propagation and children
174197
self.queue.shutdown(False)
175198
await asyncio.gather(
176-
self.queue.join(), *(child.close() for child in self._children)
199+
self.queue.join(),
200+
self.flush(),
201+
*(child.close() for child in self._children),
177202
)
178203
# Otherwise, join the queue
179204
else:
@@ -182,8 +207,11 @@ async def close(self, immediate: bool = False) -> None:
182207
for child in self._children:
183208
await child.close(immediate)
184209
return
210+
# Graceful: wait for drain, propagation and children
185211
await asyncio.gather(
186-
self.queue.join(), *(child.close() for child in self._children)
212+
self.queue.join(),
213+
self.flush(),
214+
*(child.close() for child in self._children),
187215
)
188216

189217
def is_closed(self) -> bool:

tests/server/events/test_event_queue.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,9 @@ async def test_enqueue_event_propagates_to_children(
160160
await event_queue.enqueue_event(event1)
161161
await event_queue.enqueue_event(event2)
162162

163+
# Wait for all background tasks to complete
164+
await event_queue.flush()
165+
163166
# Check parent queue
164167
assert await event_queue.dequeue_event(no_wait=True) == event1
165168
assert await event_queue.dequeue_event(no_wait=True) == event2
@@ -203,6 +206,36 @@ async def test_enqueue_event_when_closed(
203206
await child_queue.dequeue_event(no_wait=True)
204207

205208

209+
@pytest.mark.asyncio
210+
async def test_event_queue_slow_consumer_does_not_block_parent(
211+
event_queue: EventQueue,
212+
) -> None:
213+
"""Test that a slow or blocked consumer on a tapped queue doesn't block the parent queue."""
214+
child_queue = event_queue.tap()
215+
216+
# Artificially limit the child queue to a size of 1 so it fills up instantly
217+
child_queue.queue = asyncio.Queue(maxsize=1)
218+
219+
# Enqueue first event. It should fit in the child queue.
220+
event1 = create_sample_message('1')
221+
await event_queue.enqueue_event(event1)
222+
223+
# Enqueue second event. The child queue is now full.
224+
# If the parent blocks on `await child_queue.enqueue_event()`, this will hang.
225+
event2 = create_sample_message('2')
226+
try:
227+
# Give it a short timeout. If it hangs, it means the parent is blocked.
228+
await asyncio.wait_for(event_queue.enqueue_event(event2), timeout=0.1)
229+
except asyncio.TimeoutError:
230+
pytest.fail(
231+
'Parent EventQueue was blocked by a full child queue (slow consumer)!'
232+
)
233+
234+
# Clean up to prevent background tasks from leaking or complaining
235+
await child_queue.dequeue_event()
236+
await child_queue.dequeue_event()
237+
238+
206239
@pytest.fixture
207240
def expected_queue_closed_exception() -> type[Exception]:
208241
if sys.version_info < (3, 13):
@@ -420,6 +453,9 @@ async def test_close_immediate_propagates_to_children(
420453
event = create_sample_message()
421454
await event_queue.enqueue_event(event)
422455

456+
# Wait for background propagation to finish
457+
await event_queue.flush()
458+
423459
assert child_queue.is_closed() is False
424460
assert child_queue.queue.empty() is False
425461

@@ -440,6 +476,9 @@ async def test_clear_events_current_queue_only(event_queue: EventQueue) -> None:
440476
await event_queue.enqueue_event(event1)
441477
await event_queue.enqueue_event(event2)
442478

479+
# Wait for all background tasks to complete
480+
await event_queue.flush()
481+
443482
# Clear only parent queue
444483
await event_queue.clear_events(clear_child_queues=False)
445484

0 commit comments

Comments
 (0)