feat: Add Redis-backed QueueManager for production deployments#447
feat: Add Redis-backed QueueManager for production deployments#447mjunaidca wants to merge 11 commits intoa2aproject:mainfrom
QueueManager for production deployments#447Conversation
- Add RedisEventQueue for Redis Streams-based event queuing - Add RedisQueueManager for distributed queue management - Add RedisEventConsumer for consuming Redis stream events - Add RedisRequestHandler for Redis-backed request handling - Add comprehensive test coverage for all Redis components - Update DefaultRequestHandler with backward compatibility - Add environment variable controls for strict deployment modes This implementation enables production deployments in distributed environments like Kubernetes, addressing the limitation of only having InMemoryQueueManager which cannot be used in multi-pod setups. Redis is widely used in agentic AI platforms like LangGraph and provides reliable, scalable event streaming for serverless and distributed architectures.
There was a problem hiding this comment.
Summary of Changes
Hello @mjunaidca, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request significantly enhances the A2A Python SDK by integrating Redis-backed queue management. The primary goal is to facilitate seamless and reliable operation in distributed production environments, ensuring event persistence and robust handling of various deployment challenges. This change provides a scalable and resilient foundation for managing asynchronous tasks and events within the SDK.
Highlights
- New Redis-backed Queue Management: Introduces
RedisQueueManagerandRedisEventQueueto enable robust, distributed event queuing using Redis Streams, specifically designed for production deployments in environments like Kubernetes. - Core Redis Components: Adds
RedisEventQueue(Redis Streams-based event queue),RedisQueueManager(distributed queue manager),RedisEventConsumer(consumer for Redis stream events), andRedisRequestHandler(request handler with Redis integration). - Production Readiness & Scalability: The new Redis integration supports distributed architectures, leverages Redis Streams for reliable event persistence, handles pod restarts, scaling, and network partitions, making the SDK production-ready.
- Backward Compatibility & Deprecation: Maintains backward compatibility with existing APIs, providing deprecation warnings for the
InMemoryQueueManagerfallback. An environment variable (A2A_DISABLE_QUEUE_MANAGER_FALLBACK) is introduced to enforce explicitQueueManagerinstance passing in strict production setups. - Comprehensive Testing: Includes full test coverage for all new Redis components, covering event operations, data serialization, FIFO ordering, tap functionality, error handling, and distributed scenarios.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.
| Feature | Command | Description |
|---|---|---|
| Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
There was a problem hiding this comment.
Code Review
This pull request introduces a Redis-backed QueueManager, a significant feature for enabling production deployments in distributed environments. The implementation is comprehensive, covering the core Redis components, extensive tests, and backward compatibility. My review focuses on improving robustness, efficiency, and maintainability. Key feedback includes fixing a potential infinite loop in the RedisEventConsumer, simplifying the RedisEventQueue implementation by removing test-specific logic, improving the efficiency and consistency of the RedisStreamInjector, and enhancing type safety. Overall, this is a strong addition, and addressing these points will further solidify the production-readiness of the new components.
# Description The A2A client is receiving `unknown` state over REST transport while it should receive `auth_required`. - [x] Follow the [`CONTRIBUTING` Guide](https://github.com/a2aproject/a2a-python/blob/main/CONTRIBUTING.md). - [x] Make your Pull Request title in the <https://www.conventionalcommits.org/> specification. - Important Prefixes for [release-please](https://github.com/googleapis/release-please): - `fix:` which represents bug fixes, and correlates to a [SemVer](https://semver.org/) patch. - `feat:` represents a new feature, and correlates to a SemVer minor. - `feat!:`, or `fix!:`, `refactor!:`, etc., which represent a breaking change (indicated by the `!`) and will result in a SemVer major. - [ ] Ensure the tests and linter pass (Run `bash scripts/format.sh` from the repository root to format) - [ ] Appropriate docs were updated (if necessary) --------- Signed-off-by: Tomas Pilar <thomas7pilar@gmail.com>
…a2aproject#441) # Description Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Follow the [`CONTRIBUTING` Guide](https://github.com/a2aproject/a2a-python/blob/main/CONTRIBUTING.md). - [ ] Make your Pull Request title in the <https://www.conventionalcommits.org/> specification. - Important Prefixes for [release-please](https://github.com/googleapis/release-please): - `fix:` which represents bug fixes, and correlates to a [SemVer](https://semver.org/) patch. - `feat:` represents a new feature, and correlates to a SemVer minor. - `feat!:`, or `fix!:`, `refactor!:`, etc., which represent a breaking change (indicated by the `!`) and will result in a SemVer major. - [ ] Ensure the tests and linter pass (Run `bash scripts/format.sh` from the repository root to format) - [ ] Appropriate docs were updated (if necessary) Fixes #<issue_number_goes_here> 🦕
da9a390 to
002e049
Compare
- Updated test cases in `test_redis_queue_manager.py` to improve structure and readability. - Added tests for handling None redis_client and multiple taps on the same task. - Introduced logging tests to verify logging behavior during queue operations. - Added error handling tests for closing queues and creating/tapping RedisEventQueue. - Created new test suite `test_redis_stream_writer.py` to cover RedisStreamInjector functionality. - Enhanced existing tests in `test_default_request_handler.py` and `test_redis_request_handler.py` for consistency and clarity.
…mprove error handling
QueueManager for production deployments
holtskinner
left a comment
There was a problem hiding this comment.
@mjunaidca I fixed the Test errors by adding a meta-extra all to the install and the Unit Test GitHub Action
Please address the linter errors and we can review further.
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a significant and well-implemented feature: a Redis-backed QueueManager to support production deployments in distributed environments. The new components like RedisEventQueue, RedisQueueManager, and RedisEventConsumer are well-structured and come with comprehensive tests. The changes are backward compatible, which is excellent. My review includes a few suggestions to enhance maintainability, improve type safety, and refine logging, along with a minor correction in a test case.
| if hasattr(self._redis, 'streams'): | ||
| lst = self._redis.streams.get(self._stream_key, []) | ||
| if lst: | ||
| max_id = max(int(eid.split('-')[0]) for eid, _ in lst) | ||
| q._last_id = f'{max_id}-0' | ||
| else: | ||
| q._last_id = '0' | ||
| else: | ||
| # For real Redis, use '$' as approximation | ||
| q._last_id = '$' |
There was a problem hiding this comment.
The tap method includes a special code path for FakeRedis by checking hasattr(self._redis, 'streams'). This mixes production logic with testing concerns, which can make the code harder to maintain. It's generally better to have test fakes that fully conform to the real object's interface or to use dependency injection for testing, rather than embedding test-specific logic in the production code. Consider refactoring this to keep the implementation clean.
| logger.info('RedisEventQueue value: %s', RedisEventQueue) | ||
| logger.info('RedisEventQueue type: %s', type(RedisEventQueue)) |
There was a problem hiding this comment.
| agent_executor: Any, | ||
| task_store: Any, | ||
| redis_client: Any, | ||
| stream_prefix: str = 'a2a:task', | ||
| **kwargs: Any, |
There was a problem hiding this comment.
The type hints for agent_executor, task_store, and redis_client are Any. For better type safety and code clarity, consider using more specific types. You can use forward references (strings) and a TYPE_CHECKING block to import the types without causing circular dependencies. For example:
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from a2a.server.agent_execution import AgentExecutor
from a2a.server.tasks import TaskStore
try:
from redis.asyncio import Redis as RedisClient
except ImportError:
RedisClient = Any| agent_executor: Any, | |
| task_store: Any, | |
| redis_client: Any, | |
| stream_prefix: str = 'a2a:task', | |
| **kwargs: Any, | |
| agent_executor: "AgentExecutor", | |
| task_store: "TaskStore", | |
| redis_client: "RedisClient", | |
| stream_prefix: str = 'a2a:task', | |
| **kwargs: Any, |
| # Next iteration should stop | ||
| with pytest.raises(StopAsyncIteration): | ||
| await anext(it) | ||
| assert results == [1, 2] |
There was a problem hiding this comment.
This test's logic seems slightly off. The consume_all implementation will yield one more item after q._closed is set to True before the loop breaks. The test should account for this by consuming one more item before asserting StopAsyncIteration.
| # Next iteration should stop | |
| with pytest.raises(StopAsyncIteration): | |
| await anext(it) | |
| assert results == [1, 2] | |
| # The next iteration will yield one more item before stopping | |
| results.append(await anext(it)) | |
| with pytest.raises(StopAsyncIteration): | |
| await anext(it) | |
| assert results == [1, 2, 3] |
|
Thanks for the contribution, however the A2A SDK is designed to not be tied to specific vendors/proprietary systems. See this response from #269 (comment)
If you would like, it would be great to create a separate a2a-redis Python Library which includes your extra modules. |
Description
This PR adds Redis
QueueManagerto the A2A Python SDK, enabling production deployments in distributed environments like Kubernetes.Changes Included:
Core Redis Components:
RedisEventQueue: Redis Streams-based event queue implementationRedisQueueManager: Distributed queue manager for production useRedisEventConsumer: Consumer for Redis stream eventsRedisRequestHandler: Request handler with Redis integrationKey Features:
Installation
Infrastructure Requirements:
redisPython package (optional dependency)Usage Example:
Testing
All components include comprehensive unit tests covering:
Fixes #446
Fixes #86
Breaking Changes
None. This is a pure addition with backward compatibility maintained through deprecation warnings.