22import logging
33import sys
44
5+ from abc import ABC , abstractmethod
56from types import TracebackType
6- from typing import Any
7+ from typing import Any , cast
78
89from typing_extensions import Self
910
@@ -46,8 +47,129 @@ def _create_async_queue(maxsize: int = 0) -> AsyncQueue[Any]:
4647DEFAULT_MAX_QUEUE_SIZE = 1024
4748
4849
50+ class EventQueue (ABC ):
51+ """Base class and factory for EventQueueSource.
52+
53+ EventQueue provides an abstraction for a queue of events that can be tapped
54+ by multiple consumers.
55+ EventQueue maintain main queue and source and maintain child queues in sync.
56+ GUARANTEE: All sinks (including the default one) will receive events in the exact same order.
57+
58+ WARNING (Concurrency): All events from all sinks (both the default queue and any
59+ tapped child queues) must be regularly consumed and marked as done. If any single
60+ consumer stops processing and its queue reaches capacity, it can block the event
61+ dispatcher and stall the entire system, causing a widespread deadlock.
62+
63+ WARNING (Memory Leak): Event queues spawn background tasks. To prevent memory
64+ and task leaks, all queue objects (both source and sinks) MUST be explicitly
65+ closed via `await queue.close()` or by using the async context manager (`async with queue:`).
66+ Child queues are automatically closed when parent queue is closed, but you
67+ should still close them explicitly to prevent queues from reaching capacity by
68+ unconsumed events.
69+
70+ Typical usage:
71+ queue = EventQueue()
72+ child_queue1 = await queue.tap()
73+ child_queue2 = await queue.tap()
74+
75+ async for event in child_queue1:
76+ do_some_work(event)
77+ child_queue1.task_done()
78+ """
79+
80+ def __new__ (cls , * args : Any , ** kwargs : Any ) -> Self :
81+ """Redirects instantiation to EventQueueLegacy for backwards compatibility."""
82+ if cls is EventQueue :
83+ instance = EventQueueLegacy .__new__ (EventQueueLegacy )
84+ EventQueueLegacy .__init__ (instance , * args , ** kwargs )
85+ return cast ('Self' , instance )
86+ return super ().__new__ (cls )
87+
88+ @abstractmethod
89+ async def enqueue_event (self , event : Event ) -> None :
90+ """Pushes an event into the queue.
91+
92+ Only main queue can enqueue events. Child queues can only dequeue events.
93+ """
94+ raise NotImplementedError
95+
96+ @abstractmethod
97+ async def dequeue_event (self ) -> Event :
98+ """Pulls an event from the queue."""
99+ raise NotImplementedError
100+
101+ @abstractmethod
102+ def task_done (self ) -> None :
103+ """Signals that a work on dequeued event is complete."""
104+ raise NotImplementedError
105+
106+ @abstractmethod
107+ async def tap (
108+ self , max_queue_size : int = DEFAULT_MAX_QUEUE_SIZE
109+ ) -> 'EventQueue' :
110+ """Creates a child queue that receives future events.
111+
112+ Note: The tapped queue may receive some old events if the incoming event
113+ queue is lagging behind and hasn't dispatched them yet.
114+ """
115+ raise NotImplementedError
116+
117+ @abstractmethod
118+ async def close (self , immediate : bool = False ) -> None :
119+ """Closes the queue.
120+
121+ For parent queue: it closes the main queue and all its child queues.
122+ For child queue: it closes only child queue.
123+
124+ It is safe to call it multiple times.
125+ If immediate is True, the queue will be closed without waiting for all events to be processed.
126+ If immediate is False, the queue will be closed after all events are processed (and confirmed with task_done() calls).
127+
128+ WARNING: Closing the parent queue with immediate=False is a deadlock risk if there are unconsumed events
129+ in any of the child sinks and the consumer has crashed without draining its queue.
130+ It is highly recommended to wrap graceful shutdowns with a timeout, e.g.,
131+ `asyncio.wait_for(queue.close(immediate=False), timeout=...)`.
132+ """
133+ raise NotImplementedError
134+
135+ @abstractmethod
136+ def is_closed (self ) -> bool :
137+ """[DEPRECATED] Checks if the queue is closed.
138+
139+ NOTE: Relying on this for enqueue logic introduces race conditions.
140+ It is maintained primarily for backwards compatibility, workarounds for
141+ Python 3.10/3.12 async queues in consumers, and for the test suite.
142+ """
143+ raise NotImplementedError
144+
145+ @abstractmethod
146+ async def __aenter__ (self ) -> Self :
147+ """Enters the async context manager, returning the queue itself.
148+
149+ WARNING: See `__aexit__` for important deadlock risks associated with
150+ exiting this context manager if unconsumed events remain.
151+ """
152+ raise NotImplementedError
153+
154+ @abstractmethod
155+ async def __aexit__ (
156+ self ,
157+ exc_type : type [BaseException ] | None ,
158+ exc_val : BaseException | None ,
159+ exc_tb : TracebackType | None ,
160+ ) -> None :
161+ """Exits the async context manager, ensuring close() is called.
162+
163+ WARNING: The context manager calls `close(immediate=False)` by default.
164+ If a consumer exits the `async with` block early (e.g., due to an exception
165+ or an explicit `break`) while unconsumed events remain in the queue,
166+ `__aexit__` will deadlock waiting for `task_done()` to be called on those events.
167+ """
168+ raise NotImplementedError
169+
170+
49171@trace_class (kind = SpanKind .SERVER )
50- class EventQueue :
172+ class EventQueueLegacy ( EventQueue ) :
51173 """Event queue for A2A responses from agent.
52174
53175 Acts as a buffer between the agent's asynchronous execution and the
@@ -63,14 +185,19 @@ def __init__(self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE) -> None:
63185 if max_queue_size <= 0 :
64186 raise ValueError ('max_queue_size must be greater than 0' )
65187
66- self .queue : AsyncQueue [Event ] = _create_async_queue (
188+ self ._queue : AsyncQueue [Event ] = _create_async_queue (
67189 maxsize = max_queue_size
68190 )
69191 self ._children : list [EventQueue ] = []
70192 self ._is_closed = False
71193 self ._lock = asyncio .Lock ()
72194 logger .debug ('EventQueue initialized.' )
73195
196+ @property
197+ def queue (self ) -> AsyncQueue [Event ]:
198+ """[DEPRECATED] Returns the underlying asyncio.Queue."""
199+ return self ._queue
200+
74201 async def __aenter__ (self ) -> Self :
75202 """Enters the async context manager, returning the queue itself."""
76203 return self
@@ -106,7 +233,7 @@ async def enqueue_event(self, event: Event) -> None:
106233 for child in self ._children :
107234 await child .enqueue_event (event )
108235
109- async def dequeue_event (self , no_wait : bool = False ) -> Event :
236+ async def dequeue_event (self ) -> Event :
110237 """Dequeues an event from the queue.
111238
112239 This implementation expects that dequeue to raise an exception when
@@ -115,38 +242,23 @@ async def dequeue_event(self, no_wait: bool = False) -> Event:
115242 the user is awaiting the queue.get method. Python<=3.12 this needs to
116243 manage this lifecycle itself. The current implementation can lead to
117244 blocking if the dequeue_event is called before the EventQueue has been
118- closed but when there are no events on the queue. Two ways to avoid this
119- are to call this with no_wait = True which won't block, but is the
120- callers responsibility to retry as appropriate. Alternatively, one can
121- use an async Task management solution to cancel the get task if the queue
245+ closed but when there are no events on the queue. One way to avoid this
246+ is to use an async Task management solution to cancel the get task if the queue
122247 has closed or some other condition is met. The implementation of the
123248 EventConsumer uses an async.wait with a timeout to abort the
124249 dequeue_event call and retry, when it will return with a closed error.
125250
126- Args:
127- no_wait: If True, retrieve an event immediately or raise `asyncio.QueueEmpty`.
128- If False (default), wait until an event is available.
129-
130251 Returns:
131252 The next event from the queue.
132253
133254 Raises:
134- asyncio.QueueEmpty: If `no_wait` is True and the queue is empty.
135255 asyncio.QueueShutDown: If the queue has been closed and is empty.
136256 """
137257 async with self ._lock :
138258 if self ._is_closed and self .queue .empty ():
139259 logger .warning ('Queue is closed. Event will not be dequeued.' )
140260 raise QueueShutDown ('Queue is closed.' )
141261
142- if no_wait :
143- logger .debug ('Attempting to dequeue event (no_wait=True).' )
144- event = self .queue .get_nowait ()
145- logger .debug (
146- 'Dequeued event (no_wait=True) of type: %s' , type (event )
147- )
148- return event
149-
150262 logger .debug ('Attempting to dequeue event (waiting).' )
151263 event = await self .queue .get ()
152264 logger .debug ('Dequeued event (waited) of type: %s' , type (event ))
@@ -160,15 +272,17 @@ def task_done(self) -> None:
160272 logger .debug ('Marking task as done in EventQueue.' )
161273 self .queue .task_done ()
162274
163- def tap (self ) -> 'EventQueue' :
164- """Taps the event queue to create a new child queue that receives all future events.
275+ async def tap (
276+ self , max_queue_size : int = DEFAULT_MAX_QUEUE_SIZE
277+ ) -> 'EventQueueLegacy' :
278+ """Taps the event queue to create a new child queue that receives future events.
165279
166280 Returns:
167281 A new `EventQueue` instance that will receive all events enqueued
168282 to this parent queue from this point forward.
169283 """
170284 logger .debug ('Tapping EventQueue to create a child queue.' )
171- queue = EventQueue ( )
285+ queue = EventQueueLegacy ( max_queue_size = max_queue_size )
172286 self ._children .append (queue )
173287 return queue
174288
@@ -199,48 +313,3 @@ async def close(self, immediate: bool = False) -> None:
199313 def is_closed (self ) -> bool :
200314 """Checks if the queue is closed."""
201315 return self ._is_closed
202-
203- async def clear_events (self , clear_child_queues : bool = True ) -> None :
204- """Clears all events from the current queue and optionally all child queues.
205-
206- This method removes all pending events from the queue without processing them.
207- Child queues can be optionally cleared based on the clear_child_queues parameter.
208-
209- Args:
210- clear_child_queues: If True (default), clear all child queues as well.
211- If False, only clear the current queue, leaving child queues untouched.
212- """
213- logger .debug ('Clearing all events from EventQueue and child queues.' )
214-
215- # Clear all events from the queue, even if closed
216- cleared_count = 0
217- async with self ._lock :
218- try :
219- while True :
220- event = self .queue .get_nowait ()
221- logger .debug (
222- 'Discarding unprocessed event of type: %s, content: %s' ,
223- type (event ),
224- event ,
225- )
226- self .queue .task_done ()
227- cleared_count += 1
228- except asyncio .QueueEmpty :
229- pass
230- except QueueShutDown :
231- pass
232-
233- if cleared_count > 0 :
234- logger .debug (
235- 'Cleared %d unprocessed events from EventQueue.' ,
236- cleared_count ,
237- )
238-
239- # Clear all child queues (lock released before awaiting child tasks)
240- if clear_child_queues and self ._children :
241- child_tasks = [
242- asyncio .create_task (child .clear_events ())
243- for child in self ._children
244- ]
245-
246- await asyncio .gather (* child_tasks , return_exceptions = True )
0 commit comments