forked from a2aproject/a2a-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathredis_event_consumer.py
More file actions
54 lines (40 loc) · 1.75 KB
/
redis_event_consumer.py
File metadata and controls
54 lines (40 loc) · 1.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from __future__ import annotations
import asyncio
import logging
from typing import Protocol, TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import AsyncGenerator
from a2a.utils.telemetry import SpanKind, trace_class
class QueueLike(Protocol):
"""Protocol describing a minimal queue-like object used by consumers.
It must provide an async `dequeue_event(no_wait: bool)` method and an
`is_closed()` method.
"""
async def dequeue_event(self, no_wait: bool = False) -> object:
"""Return the next queued event or raise asyncio.QueueEmpty if none when no_wait is True."""
def is_closed(self) -> bool:
"""Return True if the underlying queue has been closed."""
...
logger = logging.getLogger(__name__)
@trace_class(kind=SpanKind.SERVER)
class RedisEventConsumer:
"""Adapter that provides the same consume semantics for a Redis-backed EventQueue.
It wraps a RedisEventQueue instance and exposes methods compatible with
existing code expecting an EventQueue (not strictly required but helpful).
"""
def __init__(self, queue: QueueLike) -> None:
"""Wrap a queue-like object that exposes dequeue_event and is_closed."""
self._queue = queue
async def consume_one(self) -> object:
"""Consume a single event without waiting; raises asyncio.QueueEmpty if none."""
return await self._queue.dequeue_event(no_wait=True)
async def consume_all(self) -> AsyncGenerator:
"""Yield events until the queue is closed."""
while True:
try:
event = await self._queue.dequeue_event()
yield event
if self._queue.is_closed():
break
except asyncio.QueueEmpty:
continue