
feat: Add Redis-backed QueueManager for production deployments #447

Closed
mjunaidca wants to merge 11 commits into a2aproject:main from mjunaidca:feat/redis-queue-manager

Conversation

@mjunaidca

@mjunaidca mjunaidca commented Aug 30, 2025

Description

This PR adds Redis QueueManager to the A2A Python SDK, enabling production deployments in distributed environments like Kubernetes.

Changes Included:

Core Redis Components:

  • RedisEventQueue: Redis Streams-based event queue implementation
  • RedisQueueManager: Distributed queue manager for production use
  • RedisEventConsumer: Consumer for Redis stream events
  • RedisRequestHandler: Request handler with Redis integration

Key Features:

  • Distributed Architecture: Works seamlessly across multiple pods/containers
  • Redis Streams: Leverages Redis Streams for reliable event persistence
  • Production Ready: Handles pod restarts, scaling, and network partitions
  • Backward Compatible: Maintains existing API with deprecation warnings
  • Comprehensive Tests: Full test coverage for all Redis components
  • Environment Controls: Configurable strict mode for production deployments
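
The stream-based queueing described above can be illustrated without a Redis server. The following is a minimal sketch, assuming an append-only log with increasing IDs in which each reader keeps its own cursor; `InMemoryStream`, `append`, and `read_after` are hypothetical names chosen for illustration and are not part of this PR or of redis-py. They only mirror the general shape of XADD and XREAD.

```python
import json
from dataclasses import dataclass, field


@dataclass
class InMemoryStream:
    """Hypothetical stand-in for one Redis stream (an append-only log)."""

    entries: list[tuple[int, str]] = field(default_factory=list)

    def append(self, event: dict) -> int:
        """Store the event as JSON and return its sequence ID (similar to XADD)."""
        entry_id = len(self.entries) + 1
        # ensure_ascii=False keeps Unicode payloads readable in the stored text.
        self.entries.append((entry_id, json.dumps(event, ensure_ascii=False)))
        return entry_id

    def read_after(self, last_id: int) -> list[tuple[int, dict]]:
        """Return entries whose ID is greater than last_id (similar to XREAD)."""
        return [(i, json.loads(p)) for i, p in self.entries if i > last_id]


stream = InMemoryStream()
stream.append({"kind": "status", "text": "working"})
stream.append({"kind": "status", "text": "完了"})

# Two independent readers, each starting from its own cursor.
reader_a = stream.read_after(0)  # sees every entry, in insertion order
reader_b = stream.read_after(1)  # sees only entries added after ID 1
```

Because entries are never removed on read, several consumers can follow the same task stream, which is the property the "tap" feature relies on.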

Installation

# Install with Redis support (quotes keep shells such as zsh from expanding the brackets)
pip install "a2a-sdk[redis]"
# or
uv add "a2a-sdk[redis]"

Infrastructure Requirements:

  • Redis server (widely available in cloud platforms)
  • redis Python package (optional dependency)

Usage Example:

import os
import redis.asyncio as aioredis
from a2a.server.apps import A2AFastAPIApplication
from a2a.server.events.redis_queue_manager import RedisQueueManager
from a2a.server.request_handlers.redis_request_handler import create_redis_request_handler
from a2a.server.tasks import InMemoryTaskStore

# Connect to Redis
redis_client = aioredis.from_url(
    os.getenv("REDIS_URL", "redis://localhost:6379"),
    decode_responses=True,
    max_connections=20
)

# Create Redis-backed queue manager
queue_manager = RedisQueueManager(
    redis_client=redis_client, 
    stream_prefix="a2a:task"
)

# Create request handler with Redis integration
request_handler = create_redis_request_handler(
    agent_executor=your_agent_executor,
    task_store=InMemoryTaskStore(),
    redis_client=redis_client,
    stream_prefix="a2a:task"
)

# Use with A2A server
app = A2AFastAPIApplication(
    agent_card=your_agent_card,
    http_handler=request_handler
).build()

Testing

All components include comprehensive unit tests covering:

  • Event enqueue/dequeue operations
  • Complex data serialization (Unicode, nested objects)
  • FIFO ordering guarantees
  • Tap functionality for multiple consumers
  • Error handling and edge cases
  • Distributed operation scenarios

Fixes #446
Fixes #86

Breaking Changes

None. This is a pure addition with backward compatibility maintained through deprecation warnings.

- Add RedisEventQueue for Redis Streams-based event queuing
- Add RedisQueueManager for distributed queue management
- Add RedisEventConsumer for consuming Redis stream events
- Add RedisRequestHandler for Redis-backed request handling
- Add comprehensive test coverage for all Redis components
- Update DefaultRequestHandler with backward compatibility
- Add environment variable controls for strict deployment modes

This implementation enables production deployments in distributed environments like Kubernetes. It addresses the limitation that the only available implementation, InMemoryQueueManager, cannot be used in multi-pod setups.

Redis is widely used in agentic AI platforms like LangGraph and provides reliable, scalable event streaming for serverless and distributed architectures.
@mjunaidca mjunaidca requested a review from a team as a code owner August 30, 2025 10:58
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Summary of Changes

Hello @mjunaidca, I'm Gemini Code Assist [1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the A2A Python SDK by integrating Redis-backed queue management. The primary goal is to facilitate seamless and reliable operation in distributed production environments, ensuring event persistence and robust handling of various deployment challenges. This change provides a scalable and resilient foundation for managing asynchronous tasks and events within the SDK.

Highlights

  • New Redis-backed Queue Management: Introduces RedisQueueManager and RedisEventQueue to enable robust, distributed event queuing using Redis Streams, specifically designed for production deployments in environments like Kubernetes.
  • Core Redis Components: Adds RedisEventQueue (Redis Streams-based event queue), RedisQueueManager (distributed queue manager), RedisEventConsumer (consumer for Redis stream events), and RedisRequestHandler (request handler with Redis integration).
  • Production Readiness & Scalability: The new Redis integration supports distributed architectures, leverages Redis Streams for reliable event persistence, handles pod restarts, scaling, and network partitions, making the SDK production-ready.
  • Backward Compatibility & Deprecation: Maintains backward compatibility with existing APIs, providing deprecation warnings for the InMemoryQueueManager fallback. An environment variable (A2A_DISABLE_QUEUE_MANAGER_FALLBACK) is introduced to enforce explicit QueueManager instance passing in strict production setups.
  • Comprehensive Testing: Includes full test coverage for all new Redis components, covering event operations, data serialization, FIFO ordering, tap functionality, error handling, and distributed scenarios.
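
The fallback behaviour summarised above can be sketched as follows. This is only an illustration under stated assumptions: the variable name `A2A_DISABLE_QUEUE_MANAGER_FALLBACK` comes from the summary, but the accepted truthy values, the `resolve_queue_manager` helper, the exception type, and the placeholder `InMemoryQueueManager` class are hypothetical and may differ from the PR's actual code.

```python
import os
import warnings


class InMemoryQueueManager:
    """Placeholder for the SDK's in-process queue manager (illustration only)."""


def resolve_queue_manager(queue_manager=None, env=None):
    """Return the given manager, or fall back unless strict mode is enabled."""
    env = os.environ if env is None else env
    if queue_manager is not None:
        return queue_manager

    # Assumption: "1", "true", and "yes" enable strict mode.
    flag = env.get("A2A_DISABLE_QUEUE_MANAGER_FALLBACK", "")
    if flag.strip().lower() in {"1", "true", "yes"}:
        raise ValueError(
            "No QueueManager was provided and the in-memory fallback is disabled."
        )

    warnings.warn(
        "Falling back to InMemoryQueueManager; pass a QueueManager explicitly.",
        DeprecationWarning,
        stacklevel=2,
    )
    return InMemoryQueueManager()
```

Passing the environment as a parameter keeps the helper easy to test without mutating `os.environ`.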
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page. Here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request by creating an issue comment (i.e. a comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

| Feature | Command | Description |
| --- | --- | --- |
| Code Review | /gemini review | Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary | Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | /gemini help | Displays a list of available commands. |

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder at the root of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a Redis-backed QueueManager, a significant feature for enabling production deployments in distributed environments. The implementation is comprehensive, covering the core Redis components, extensive tests, and backward compatibility. My review focuses on improving robustness, efficiency, and maintainability. Key feedback includes fixing a potential infinite loop in the RedisEventConsumer, simplifying the RedisEventQueue implementation by removing test-specific logic, improving the efficiency and consistency of the RedisStreamInjector, and enhancing type safety. Overall, this is a strong addition, and addressing these points will further solidify the production-readiness of the new components.

Comment thread src/a2a/server/events/redis_event_consumer.py
Comment thread src/a2a/utils/stream_write/redis_stream_writer.py
Comment thread src/a2a/server/events/redis_event_queue.py Outdated
Comment thread src/a2a/server/events/redis_event_queue.py
Comment thread src/a2a/server/events/redis_queue_manager.py
Comment thread src/a2a/utils/stream_write/redis_stream_writer.py
Comment thread src/a2a/utils/stream_write/redis_stream_writer.py Outdated
Comment thread src/a2a/utils/stream_write/redis_stream_writer.py Outdated
Comment thread tests/server/events/test_redis_event_queue.py Outdated
Comment thread tests/server/events/test_redis_queue_manager.py Outdated
pilartomas and others added 5 commits August 30, 2025 18:46
# Description

The A2A client is receiving `unknown` state over REST transport while it
should receive `auth_required`.

- [x] Follow the [`CONTRIBUTING`
Guide](https://github.com/a2aproject/a2a-python/blob/main/CONTRIBUTING.md).
- [x] Make your Pull Request title in the
<https://www.conventionalcommits.org/> specification.
- Important Prefixes for
[release-please](https://github.com/googleapis/release-please):
- `fix:` which represents bug fixes, and correlates to a
[SemVer](https://semver.org/) patch.
- `feat:` represents a new feature, and correlates to a SemVer minor.
- `feat!:`, or `fix!:`, `refactor!:`, etc., which represent a breaking
change (indicated by the `!`) and will result in a SemVer major.
- [ ] Ensure the tests and linter pass (Run `bash scripts/format.sh`
from the repository root to format)
- [ ] Appropriate docs were updated (if necessary)

---------

Signed-off-by: Tomas Pilar <thomas7pilar@gmail.com>
…a2aproject#441)

@mjunaidca mjunaidca force-pushed the feat/redis-queue-manager branch from da9a390 to 002e049 Compare August 30, 2025 15:08
- Updated test cases in `test_redis_queue_manager.py` to improve structure and readability.
- Added tests for handling None redis_client and multiple taps on the same task.
- Introduced logging tests to verify logging behavior during queue operations.
- Added error handling tests for closing queues and creating/tapping RedisEventQueue.
- Created new test suite `test_redis_stream_writer.py` to cover RedisStreamInjector functionality.
- Enhanced existing tests in `test_default_request_handler.py` and `test_redis_request_handler.py` for consistency and clarity.
@holtskinner holtskinner changed the title feat: Add Redis-backed QueueManager for production deployments feat: Add Redis-backed QueueManager for production deployments Sep 9, 2025
Member

@holtskinner holtskinner left a comment


@mjunaidca I fixed the test errors by adding an `all` meta-extra to the install step and to the Unit Test GitHub Action.

Please address the linter errors and we can review further.

@holtskinner
Member

/gemini review

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a significant and well-implemented feature: a Redis-backed QueueManager to support production deployments in distributed environments. The new components like RedisEventQueue, RedisQueueManager, and RedisEventConsumer are well-structured and come with comprehensive tests. The changes are backward compatible, which is excellent. My review includes a few suggestions to enhance maintainability, improve type safety, and refine logging, along with a minor correction in a test case.

Comment thread pyproject.toml
Comment on lines +219 to +228
if hasattr(self._redis, 'streams'):
    lst = self._redis.streams.get(self._stream_key, [])
    if lst:
        max_id = max(int(eid.split('-')[0]) for eid, _ in lst)
        q._last_id = f'{max_id}-0'
    else:
        q._last_id = '0'
else:
    # For real Redis, use '$' as approximation
    q._last_id = '$'
Contributor


medium

The tap method includes a special code path for FakeRedis by checking hasattr(self._redis, 'streams'). This mixes production logic with testing concerns, which can make the code harder to maintain. It's generally better to have test fakes that fully conform to the real object's interface or to use dependency injection for testing, rather than embedding test-specific logic in the production code. Consider refactoring this to keep the implementation clean.

Comment on lines +122 to +123
logger.info('RedisEventQueue value: %s', RedisEventQueue)
logger.info('RedisEventQueue type: %s', type(RedisEventQueue))
Contributor


low

These logger.info calls appear to be for debugging purposes. Logging the type and value of RedisEventQueue at the INFO level is likely too verbose for production environments. Consider changing them to DEBUG level or removing them if they are no longer needed.
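
As a small illustration of the effect of this change, the sketch below uses only the standard `logging` module. The logger name and the `ListHandler` class are hypothetical and exist only to capture output for the example.

```python
import logging


class ListHandler(logging.Handler):
    """Collects formatted messages so the example can inspect them."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


logger = logging.getLogger("redis_queue_manager_sketch")
logger.setLevel(logging.INFO)  # a typical production threshold
logger.propagate = False
handler = ListHandler()
logger.addHandler(handler)

# Diagnostic detail goes to DEBUG, so it is dropped at the INFO threshold.
logger.debug("RedisEventQueue type: %s", type(object))
logger.info("Queue manager initialised")
```

Lowering the logger to `logging.DEBUG` during troubleshooting brings the diagnostic line back without a code change.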

Comment on lines +12 to +16
agent_executor: Any,
task_store: Any,
redis_client: Any,
stream_prefix: str = 'a2a:task',
**kwargs: Any,
Contributor


low

The type hints for agent_executor, task_store, and redis_client are Any. For better type safety and code clarity, consider using more specific types. You can use forward references (strings) and a TYPE_CHECKING block to import the types without causing circular dependencies. For example:

from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from a2a.server.agent_execution import AgentExecutor
    from a2a.server.tasks import TaskStore
    try:
        from redis.asyncio import Redis as RedisClient
    except ImportError:
        RedisClient = Any
Suggested change
- agent_executor: Any,
- task_store: Any,
- redis_client: Any,
- stream_prefix: str = 'a2a:task',
- **kwargs: Any,
+ agent_executor: "AgentExecutor",
+ task_store: "TaskStore",
+ redis_client: "RedisClient",
+ stream_prefix: str = 'a2a:task',
+ **kwargs: Any,

Comment on lines +160 to +163
# Next iteration should stop
with pytest.raises(StopAsyncIteration):
    await anext(it)
assert results == [1, 2]
Contributor


low

This test's logic seems slightly off. The consume_all implementation will yield one more item after q._closed is set to True before the loop breaks. The test should account for this by consuming one more item before asserting StopAsyncIteration.

Suggested change
- # Next iteration should stop
- with pytest.raises(StopAsyncIteration):
-     await anext(it)
- assert results == [1, 2]
+ # The next iteration will yield one more item before stopping
+ results.append(await anext(it))
+ with pytest.raises(StopAsyncIteration):
+     await anext(it)
+ assert results == [1, 2, 3]

@holtskinner
Member

Thanks for the contribution, however the A2A SDK is designed to not be tied to specific vendors/proprietary systems. See this response from #269 (comment)

Sorry was OOO for the long weekend. To clarify a few things:

  • The DatabaseTaskStore is not tied to a specific vendor. It uses sqlalchemy library which allows for many DB implementations to be directly used.
  • I agree we should not have a vendor specific implementation in the SDK. The pattern we are following is to define the right interfaces/logic in the SDK and show a proof of concept with a specific vendor solution in the samples repo. This way you can verify your abstractions are valid for at least one vendor, and hopefully others can implement different versions for different vendors until the SDK abstractions are correct.

To that end, I think it makes sense to commit a ProducerConsumerQueueManager or similar, which provides the right logic and abstractions around using a distributed producer/consumer paradigm with the QueueManager interface. Then you can implement the Redis-based producer/consumer instance (in the samples repo) to prove the end-to-end behavior.

If you would like, it would be great to create a separate a2a-redis Python Library which includes your extra modules.
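
To make the vendor-neutral idea in the quoted response concrete, here is a minimal sketch. All names (`EventChannel`, `AsyncioChannel`, `create_or_get`) are hypothetical, the class does not subclass the SDK's real `QueueManager`, and the in-process `asyncio.Queue` channel stands in for whatever backend (Redis or otherwise) a separate package would supply.

```python
import asyncio
from typing import Any, Callable, Protocol


class EventChannel(Protocol):
    """Hypothetical vendor-neutral transport for one task's events."""

    async def publish(self, event: Any) -> None: ...
    async def receive(self) -> Any: ...
    async def close(self) -> None: ...


class AsyncioChannel:
    """In-process channel; a Redis-backed one would live outside the SDK."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def publish(self, event: Any) -> None:
        await self._queue.put(event)

    async def receive(self) -> Any:
        return await self._queue.get()

    async def close(self) -> None:
        pass  # nothing to release for an in-process queue


class ProducerConsumerQueueManager:
    """Owns per-task channels; the backend is injected through a factory."""

    def __init__(self, channel_factory: Callable[[str], EventChannel]) -> None:
        self._factory = channel_factory
        self._channels: dict[str, EventChannel] = {}

    def create_or_get(self, task_id: str) -> EventChannel:
        if task_id not in self._channels:
            self._channels[task_id] = self._factory(task_id)
        return self._channels[task_id]

    async def close(self, task_id: str) -> None:
        channel = self._channels.pop(task_id, None)
        if channel is not None:
            await channel.close()


async def demo() -> list[str]:
    manager = ProducerConsumerQueueManager(lambda task_id: AsyncioChannel())
    channel = manager.create_or_get("task-1")
    await channel.publish("submitted")
    await channel.publish("completed")
    received = [await channel.receive(), await channel.receive()]
    await manager.close("task-1")
    return received
```

Swapping the factory is the only change needed to try a different backend, which is the separation between SDK abstractions and vendor samples that the response describes.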

@holtskinner holtskinner closed this Sep 9, 2025


Development

Successfully merging this pull request may close these issues.

[Feat]: Add Redis-backed QueueManager for Production Deployments
[Feat]: Regarding the implementation method of the abstract method in queue_manager

5 participants