Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ sql = ["sqlalchemy[asyncio,postgresql-asyncpg,aiomysql,aiosqlite]>=2.0.0"]
encryption = ["cryptography>=43.0.0"]
grpc = ["grpcio>=1.60", "grpcio-tools>=1.60", "grpcio_reflection>=1.7.0"]
telemetry = ["opentelemetry-api>=1.33.0", "opentelemetry-sdk>=1.33.0"]
redis = ["redis>=6.4.0"]
Comment thread
holtskinner marked this conversation as resolved.

[project.urls]
homepage = "https://a2a-protocol.org/"
Expand Down
54 changes: 54 additions & 0 deletions src/a2a/server/events/redis_event_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from __future__ import annotations

import asyncio
import logging

from typing import Protocol, TYPE_CHECKING

Check failure on line 6 in src/a2a/server/events/redis_event_consumer.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Ruff (I001)

src/a2a/server/events/redis_event_consumer.py:1:1: I001 Import block is un-sorted or un-formatted
if TYPE_CHECKING:
from collections.abc import AsyncGenerator

from a2a.utils.telemetry import SpanKind, trace_class


class QueueLike(Protocol):
"""Protocol describing a minimal queue-like object used by consumers.

It must provide an async `dequeue_event(no_wait: bool)` method and an
`is_closed()` method.
"""

async def dequeue_event(self, no_wait: bool = False) -> object:
"""Return the next queued event or raise asyncio.QueueEmpty if none when no_wait is True."""

def is_closed(self) -> bool:
"""Return True if the underlying queue has been closed."""
...

logger = logging.getLogger(__name__)


@trace_class(kind=SpanKind.SERVER)
class RedisEventConsumer:
"""Adapter that provides the same consume semantics for a Redis-backed EventQueue.

It wraps a RedisEventQueue instance and exposes methods compatible with
existing code expecting an EventQueue (not strictly required but helpful).
"""

def __init__(self, queue: QueueLike) -> None:
"""Wrap a queue-like object that exposes dequeue_event and is_closed."""
self._queue = queue
async def consume_one(self) -> object:
"""Consume a single event without waiting; raises asyncio.QueueEmpty if none."""
return await self._queue.dequeue_event(no_wait=True)

async def consume_all(self) -> AsyncGenerator:
"""Yield events until the queue is closed."""
while True:
try:
event = await self._queue.dequeue_event()
yield event
if self._queue.is_closed():
break
except asyncio.QueueEmpty:
continue
Comment thread
mjunaidca marked this conversation as resolved.
221 changes: 221 additions & 0 deletions src/a2a/server/events/redis_event_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
"""Redis-backed EventQueue implementation using Redis Streams."""

from __future__ import annotations

import asyncio
import json
import logging
from typing import Any

Check failure on line 8 in src/a2a/server/events/redis_event_queue.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Ruff (I001)

src/a2a/server/events/redis_event_queue.py:3:1: I001 Import block is un-sorted or un-formatted

try:
import redis.asyncio as aioredis # type: ignore
from redis.exceptions import RedisError # type: ignore

Check failure on line 12 in src/a2a/server/events/redis_event_queue.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Ruff (I001)

src/a2a/server/events/redis_event_queue.py:11:5: I001 Import block is un-sorted or un-formatted
except ImportError: # pragma: no cover - optional dependency
aioredis = None # type: ignore
RedisError = Exception # type: ignore

from a2a.server.events.event_queue import EventQueue
from typing import TYPE_CHECKING

Check failure on line 18 in src/a2a/server/events/redis_event_queue.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Ruff (I001)

src/a2a/server/events/redis_event_queue.py:17:1: I001 Import block is un-sorted or un-formatted
if TYPE_CHECKING:
from a2a.server.events.event_queue import Event
from pydantic import ValidationError
from a2a.types import (
Message,
Task,
TaskStatusUpdateEvent,
TaskArtifactUpdateEvent,
)
from a2a.utils.telemetry import SpanKind, trace_class

Check failure on line 28 in src/a2a/server/events/redis_event_queue.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Ruff (I001)

src/a2a/server/events/redis_event_queue.py:21:1: I001 Import block is un-sorted or un-formatted

logger = logging.getLogger(__name__)


class RedisNotAvailableError(RuntimeError):
"""Raised when the redis.asyncio package is not installed."""


_TYPE_MAP = {
'Message': Message,
'Task': Task,
'TaskStatusUpdateEvent': TaskStatusUpdateEvent,
'TaskArtifactUpdateEvent': TaskArtifactUpdateEvent,
}


@trace_class(kind=SpanKind.SERVER)
class RedisEventQueue(EventQueue):
"""Redis-native EventQueue backed by a Redis Stream.

This implementation does not rely on in-memory queue structures. Each
instance manages its own read cursor (last_id). `tap()` returns a new
RedisEventQueue pointing to the same stream but starting at '$' so it
receives only future events.
"""

def __init__(
self,
task_id: str,
redis_client: Any,
stream_prefix: str = 'a2a:task',
maxlen: int | None = None,
read_block_ms: int = 500,
) -> None:
# Allow passing a custom redis client (e.g. a fake in tests).
if aioredis is None and redis_client is None:
raise RedisNotAvailableError('redis.asyncio is not available')

self._task_id = task_id
self._redis = redis_client
self._stream_key = f'{stream_prefix}:{task_id}'
self._maxlen = maxlen
self._read_block_ms = read_block_ms

# By default a normal queue should start at the beginning so it can
# consume existing entries. Taps will explicitly start at '$'.
self._last_id = '0-0'
self._is_closed = False

# No in-memory queue initialization — this class is Redis-native.

async def enqueue_event(self, event: Event) -> None:
"""Serialize and append an event to the Redis stream."""
if self._is_closed:
logger.warning('Attempt to enqueue to closed RedisEventQueue')
return
# Store payload as a JSON string to avoid client-specific mapping
# behaviour when reading back from the stream.
payload = {
'type': type(event).__name__,
'payload': event.json(),
}
kwargs: dict[str, Any] = {}
if self._maxlen:
kwargs['maxlen'] = self._maxlen
try:
await self._redis.xadd(self._stream_key, payload, **kwargs)
except RedisError:
logger.exception('Failed to XADD event to redis stream')

async def dequeue_event(self, no_wait: bool = False) -> Event | Any:

Check failure on line 99 in src/a2a/server/events/redis_event_queue.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Ruff (PLR0915)

src/a2a/server/events/redis_event_queue.py:99:15: PLR0915 Too many statements (51 > 50)

Check failure on line 99 in src/a2a/server/events/redis_event_queue.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Ruff (PLR0912)

src/a2a/server/events/redis_event_queue.py:99:15: PLR0912 Too many branches (16 > 12)
"""Read one event from the Redis stream respecting no_wait semantics.

Returns a parsed pydantic model matching the event type.
"""
if self._is_closed:
raise asyncio.QueueEmpty('Queue is closed')

block = 0 if no_wait else self._read_block_ms
# Keep reading until we find a parseable payload or a CLOSE tombstone.

Check failure on line 108 in src/a2a/server/events/redis_event_queue.py

View workflow job for this annotation

GitHub Actions / Check Spelling

`parseable` is not a recognized word. (unrecognized-spelling)
while True:
try:
result = await self._redis.xread(

Check failure on line 111 in src/a2a/server/events/redis_event_queue.py

View workflow job for this annotation

GitHub Actions / Check Spelling

`xread` is not a recognized word. (unrecognized-spelling)
{self._stream_key: self._last_id}, block=block, count=1
)
except RedisError:
logger.exception('Failed to XREAD from redis stream')

Check failure on line 115 in src/a2a/server/events/redis_event_queue.py

View workflow job for this annotation

GitHub Actions / Check Spelling

`XREAD` is not a recognized word. (unrecognized-spelling)
raise

if not result:
raise asyncio.QueueEmpty

_, entries = result[0]
entry_id, fields = entries[0]
self._last_id = entry_id

# Normalize keys/values: redis may return bytes for both keys and values
norm: dict[str, object] = {}
try:
for k, v in fields.items():
key = k.decode('utf-8') if isinstance(k, (bytes, bytearray)) else k

Check failure on line 129 in src/a2a/server/events/redis_event_queue.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Ruff (UP038)

src/a2a/server/events/redis_event_queue.py:129:48: UP038 Use `X | Y` in `isinstance` call instead of `(X, Y)`
if isinstance(v, (bytes, bytearray)):

Check failure on line 130 in src/a2a/server/events/redis_event_queue.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Ruff (UP038)

src/a2a/server/events/redis_event_queue.py:130:24: UP038 Use `X | Y` in `isinstance` call instead of `(X, Y)`
try:
val: object = v.decode('utf-8')
except Exception:

Check failure on line 133 in src/a2a/server/events/redis_event_queue.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Ruff (BLE001)

src/a2a/server/events/redis_event_queue.py:133:32: BLE001 Do not catch blind exception: `Exception`
val = v
else:
val = v
norm[str(key)] = val
except Exception:
# Defensive: if normalization fails, skip this entry and continue
logger.debug('RedisEventQueue.dequeue_event: failed to normalize entry fields, skipping %s', entry_id)
continue

evt_type = norm.get('type')

Check failure on line 143 in src/a2a/server/events/redis_event_queue.py

View workflow job for this annotation

GitHub Actions / Check Spelling

`evt` is not a recognized word. (unrecognized-spelling)

# Handle tombstone/close message
if evt_type == 'CLOSE':

Check failure on line 146 in src/a2a/server/events/redis_event_queue.py

View workflow job for this annotation

GitHub Actions / Check Spelling

`evt` is not a recognized word. (unrecognized-spelling)

Check warning on line 146 in src/a2a/server/events/redis_event_queue.py

View workflow job for this annotation

GitHub Actions / Check Spelling

`evt` is not a recognized word. (unrecognized-spelling)
self._is_closed = True
raise asyncio.QueueEmpty('Queue closed')

raw_payload = norm.get('payload')
if raw_payload is None:
# Missing payload — likely due to key mismatch or malformed entry.
# Skip and continue to next entry instead of returning None to callers.
logger.debug('RedisEventQueue.dequeue_event: skipping entry %s with missing payload', entry_id)
# continue loop to read next entry
continue

# If payload is a JSON string, parse it; otherwise, use as-is.
if isinstance(raw_payload, str):
try:
data = json.loads(raw_payload)
except json.JSONDecodeError:
data = raw_payload
else:
data = raw_payload

model = _TYPE_MAP.get(evt_type)

Check failure on line 167 in src/a2a/server/events/redis_event_queue.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Argument of type "object | None" cannot be assigned to parameter "key" of type "str" in function "get"   Type "object | None" is not assignable to type "str"     "object" is not assignable to "str" (reportArgumentType)

Check failure on line 167 in src/a2a/server/events/redis_event_queue.py

View workflow job for this annotation

GitHub Actions / Check Spelling

`evt` is not a recognized word. (unrecognized-spelling)

Check warning on line 167 in src/a2a/server/events/redis_event_queue.py

View workflow job for this annotation

GitHub Actions / Check Spelling

`evt` is not a recognized word. (unrecognized-spelling)
if model:
try:
return model.parse_obj(data)
except ValidationError as exc:
logger.exception('Failed to parse event payload into model')
raise ValueError(f'Failed to parse event of type {evt_type}') from exc

Check failure on line 173 in src/a2a/server/events/redis_event_queue.py

View workflow job for this annotation

GitHub Actions / Check Spelling

`evt` is not a recognized word. (unrecognized-spelling)

Check warning on line 173 in src/a2a/server/events/redis_event_queue.py

View workflow job for this annotation

GitHub Actions / Check Spelling

`evt` is not a recognized word. (unrecognized-spelling)

# Unknown type — return raw data for flexibility
logger.debug('Unknown event type: %s, returning raw payload', evt_type)

Check failure on line 176 in src/a2a/server/events/redis_event_queue.py

View workflow job for this annotation

GitHub Actions / Check Spelling

`evt` is not a recognized word. (unrecognized-spelling)

Check warning on line 176 in src/a2a/server/events/redis_event_queue.py

View workflow job for this annotation

GitHub Actions / Check Spelling

`evt` is not a recognized word. (unrecognized-spelling)
return data

def task_done(self) -> None: # streams do not require task_done semantics
"""No-op for Redis streams (kept for API compatibility)."""

def tap(self) -> EventQueue:
"""Return a new RedisEventQueue that starts at the stream tail ('$')."""
q = RedisEventQueue(
task_id=self._task_id,
redis_client=self._redis,
stream_prefix=self._stream_key.rsplit(':', 1)[0],
maxlen=self._maxlen,
read_block_ms=self._read_block_ms,
)
# Set tap's cursor to the current last entry id so it receives only
# events appended after this point.
try:
lst = getattr(self._redis, 'streams', {}).get(self._stream_key, [])
if lst:
q._last_id = lst[-1][0]
else:
q._last_id = '0-0'
except (AttributeError, KeyError, IndexError, TypeError):
# Fallback: start at stream tail if we can't determine the last ID
q._last_id = '$'
Comment thread
mjunaidca marked this conversation as resolved.
Outdated
Comment on lines +219 to +228
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The tap method includes a special code path for FakeRedis by checking hasattr(self._redis, 'streams'). This mixes production logic with testing concerns, which can make the code harder to maintain. It's generally better to have test fakes that fully conform to the real object's interface or to use dependency injection for testing, rather than embedding test-specific logic in the production code. Consider refactoring this to keep the implementation clean.

return q

async def close(self, immediate: bool = False) -> None:
"""Mark the stream closed and publish a tombstone entry for readers."""
try:
await self._redis.set(f'{self._stream_key}:closed', '1')
await self._redis.xadd(self._stream_key, {'type': 'CLOSE'})
Comment thread
mjunaidca marked this conversation as resolved.
except RedisError:
logger.exception('Failed to write close marker to redis')

def is_closed(self) -> bool:
"""Return True if this queue has been closed (close() called)."""
return self._is_closed

async def clear_events(self, clear_child_queues: bool = True) -> None:
"""Attempt to remove the underlying redis stream (best-effort)."""
try:
await self._redis.delete(self._stream_key)
except RedisError:
logger.exception('Failed to delete redis stream during clear_events')
Loading
Loading