
kafka#389

Closed
bigxin0124 wants to merge 1 commit into a2aproject:main from bigxin0124:feat/kafka

Conversation

@bigxin0124

Description

Thank you for opening a Pull Request!
Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

  • Follow the CONTRIBUTING Guide.
  • Make your Pull Request title follow the https://www.conventionalcommits.org/ specification.
    • Important Prefixes for release-please:
      • fix: which represents bug fixes, and correlates to a SemVer patch.
      • feat: represents a new feature, and correlates to a SemVer minor.
      • feat!:, or fix!:, refactor!:, etc., which represent a breaking change (indicated by the !) and will result in a SemVer major.
  • Ensure the tests and linter pass (Run bash scripts/format.sh from the repository root to format)
  • Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> 🦕

@bigxin0124 bigxin0124 requested a review from a team as a code owner August 8, 2025 09:41
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Summary of Changes

Hello @bigxin0124, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

I've implemented a new Kafka-based transport layer for the A2A protocol, significantly enhancing its communication capabilities. This addition allows for reliable, high-throughput message exchange between agents, supporting various interaction models like request-response, streaming, and push notifications. The integration is comprehensive, covering both client and server components, and is accompanied by detailed design documents, usage examples, and development environment setup tools to facilitate adoption and testing.

Highlights

  • Full Kafka Transport Implementation: I've introduced a complete Kafka transport layer for the A2A protocol, enabling high-throughput, reliable communication. This includes both client-side (KafkaClientTransport) and server-side (KafkaServerApp, KafkaHandler) components, designed to integrate seamlessly with the existing A2A SDK.
  • Support for Diverse Communication Patterns: The new Kafka transport supports various communication patterns: traditional request-response (RPC) using correlation IDs, streaming for continuous data flows, and server-initiated push notifications to clients. A dedicated CorrelationManager handles the asynchronous matching of requests and responses.
  • Extensive Documentation and Examples: I've added comprehensive documentation (docs/kafka_transport.md, A2A on Kafka.md) and several examples (examples/kafka_example.py, examples/kafka_comprehensive_example.py, examples/kafka_handler_example.py, src/kafka_chatopenai_demo.py, src/kafka_currency_demo.py) to guide users on how to set up and utilize the Kafka transport. A scripts/setup_kafka_dev.py script and docker-compose.kafka.yml are also included for easy development environment setup.
  • Bug Fixes and Stability Improvements: Initial integration issues, such as incorrect error class imports and missing abstract method implementations, have been identified and fixed, as detailed in KAFKA_FIX_SUMMARY.md. This ensures the Kafka transport is robust and functional.
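The CorrelationManager mentioned above pairs asynchronous responses with their originating requests. A minimal sketch of that pattern using asyncio futures (class and method names here are assumed for illustration, not taken from the PR):

```python
import asyncio
import uuid


class CorrelationManager:
    """Matches async responses to pending requests by correlation ID (sketch)."""

    def __init__(self) -> None:
        self._pending: dict[str, asyncio.Future] = {}

    def register(self) -> tuple[str, asyncio.Future]:
        """Create a correlation ID and a future that resolves on response."""
        correlation_id = str(uuid.uuid4())
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending[correlation_id] = future
        return correlation_id, future

    def resolve(self, correlation_id: str, payload: object) -> bool:
        """Complete the pending future for this correlation ID, if any."""
        future = self._pending.pop(correlation_id, None)
        if future is not None and not future.done():
            future.set_result(payload)
            return True
        return False  # Unknown or already-resolved ID


async def demo() -> object:
    manager = CorrelationManager()
    correlation_id, future = manager.register()
    # In the real transport, resolve() would be called by the Kafka consumer loop.
    manager.resolve(correlation_id, {"result": "ok"})
    return await future
```

In the actual transport the consumer loop, not the caller, would invoke resolve() when a reply arrives on the reply topic.
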
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request by creating an issue comment (i.e. a comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

  • Code Review (/gemini review): Performs a code review for the current pull request in its current state.
  • Pull Request Summary (/gemini summary): Provides a summary of the current pull request in its current state.
  • Comment (@gemini-code-assist): Responds in comments when explicitly tagged, both in issue comments and review comments.
  • Help (/gemini help): Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder at the root of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments or fill out our survey to provide feedback.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@bigxin0124 bigxin0124 closed this Aug 8, 2025
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces Kafka as a new transport layer, which is a significant and valuable addition. The implementation is comprehensive, covering client and server components, examples, and documentation. However, there are several critical issues that need to be addressed. The streaming implementation lacks proper timeout handling, which could lead to client-side hangs. The client is also unable to process server-side push notifications due to a mismatch in message handling logic. Additionally, some of the new tests are incorrect and will fail, and several examples contain hardcoded IP addresses, making them difficult to run. I've provided specific comments and suggestions to resolve these issues.

logger = logging.getLogger(__name__)

REQUEST_TOPIC = "a2a-requests-dev3"
BOOTSTRAP = "100.95.155.4:9094"  # Change to "localhost:9092" for local testing
Contributor


high

The BOOTSTRAP constant is hardcoded to a specific IP address (100.95.155.4:9094). This prevents the example from being run by most developers without code modification. It's recommended to default to a local address like localhost:9092 and allow overriding it via an environment variable for better portability.

Suggested change
BOOTSTRAP = "100.95.155.4:9094"  # Change to "localhost:9092" for local testing
BOOTSTRAP = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")

Comment on lines +190 to +192
if not correlation_id:
    logger.warning("Received message without correlation_id")
    continue
Contributor


high

The consumer logic requires a correlation_id to process any message. However, push notifications are sent from the server without a correlation_id, which will cause the client to log a warning and drop all push notifications. The consumer logic should be updated to handle messages that lack a correlation_id, treating them as push notifications and dispatching them to a suitable handler.

Suggested change
if not correlation_id:
    logger.warning("Received message without correlation_id")
    continue
if not correlation_id:
    # This could be a push notification.
    logger.info("Received message without correlation_id, treating as push notification.")
    # TODO: Add logic to handle push notifications, e.g., by invoking a callback.
    continue
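One way to act on the TODO above is a client-side callback registry that messages without a correlation_id are dispatched to. A sketch under assumed names (PushDispatcher and its methods are hypothetical, not the PR's actual API):

```python
import asyncio
import json
from typing import Awaitable, Callable

PushHandler = Callable[[dict], Awaitable[None]]


class PushDispatcher:
    """Routes messages lacking a correlation_id to registered push handlers (sketch)."""

    def __init__(self) -> None:
        self._handlers: list[PushHandler] = []

    def on_push(self, handler: PushHandler) -> None:
        """Register a coroutine to receive push notifications."""
        self._handlers.append(handler)

    async def dispatch(self, raw_value: bytes) -> int:
        """Decode a push notification and fan it out; returns handler count."""
        payload = json.loads(raw_value)
        await asyncio.gather(*(handler(payload) for handler in self._handlers))
        return len(self._handlers)


async def demo() -> list[dict]:
    received: list[dict] = []

    async def handler(payload: dict) -> None:
        received.append(payload)

    dispatcher = PushDispatcher()
    dispatcher.on_push(handler)
    # The consumer loop would call dispatch() for messages with no correlation_id.
    await dispatcher.dispatch(b'{"type": "push", "task_id": "t1"}')
    return received
```
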

Comment on lines +319 to +337
try:
    timeout = 30.0
    if context and context.timeout:
        timeout = context.timeout

    # Yield responses as they arrive
    while not streaming_future.is_done():
        try:
            # Wait for next response with timeout
            result = await asyncio.wait_for(streaming_future.get(), timeout=5.0)
            yield result
        except asyncio.TimeoutError:
            # Check if stream is done or if we've exceeded total timeout
            if streaming_future.is_done():
                break
            # Continue waiting for more responses
            continue

except Exception as e:
Contributor


high

The total timeout for the streaming operation is not being enforced. The timeout value from the context is read, but only a fixed 5-second timeout is used for receiving each individual message. This can cause the client to hang indefinitely if the server stops sending messages without closing the stream. The entire streaming operation should be constrained by the overall timeout from the context.
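A deadline-based loop is one way to enforce the overall timeout while still polling per message. A sketch using a plain asyncio.Queue as a stand-in for the excerpt's streaming_future (the queue, sentinel, and function name are illustrative assumptions):

```python
import asyncio
import time


async def consume_stream(queue: asyncio.Queue, total_timeout: float = 30.0):
    """Yield queued results until a sentinel arrives, enforcing a total deadline."""
    deadline = time.monotonic() + total_timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("Streaming operation exceeded total timeout")
        try:
            # Wait at most 5s per message, but never beyond the overall deadline.
            item = await asyncio.wait_for(queue.get(), timeout=min(5.0, remaining))
        except asyncio.TimeoutError:
            continue  # Per-message poll expired; loop re-checks the deadline.
        if item is None:  # Sentinel: server closed the stream.
            return
        yield item


async def demo() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    for item in ("a", "b", None):
        queue.put_nowait(item)
    return [item async for item in consume_stream(queue, total_timeout=1.0)]
```

The key difference from the excerpt above is that the per-message wait is capped by the remaining time to the deadline, so the caller can never hang past the context's timeout.
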

Comment on lines +116 to +122
def test_initialization(self, kafka_transport, agent_card):
    """Test transport initialization."""
    assert kafka_transport.agent_card == agent_card
    assert kafka_transport.bootstrap_servers == "localhost:9092"
    assert kafka_transport.request_topic == "test-requests"
    assert kafka_transport.reply_topic == f"test-reply-{agent_card.id}"
    assert not kafka_transport._running
Contributor


high

The agent_card fixture creates an AgentCard with an id field, which is not present in the model definition. This will cause an instantiation error. Furthermore, the test_initialization test asserts against agent_card.id and checks kafka_transport.reply_topic before it's initialized in the start() method. The test is incorrect and will fail. The fixture and test logic should be updated to use agent_card.name and reflect the correct transport lifecycle.

Suggested change
def test_initialization(self, kafka_transport, agent_card):
    """Test transport initialization."""
    assert kafka_transport.agent_card == agent_card
    assert kafka_transport.bootstrap_servers == "localhost:9092"
    assert kafka_transport.request_topic == "test-requests"
    assert kafka_transport.reply_topic == f"test-reply-{agent_card.id}"
    assert not kafka_transport._running
def test_initialization(self, kafka_transport, agent_card):
    """Test transport initialization."""
    assert kafka_transport.agent_card == agent_card
    assert kafka_transport.bootstrap_servers == "localhost:9092"
    assert kafka_transport.request_topic == "test-requests"
    assert kafka_transport.reply_topic is None  # Not set until start()
    assert not kafka_transport._running

Comment thread A2A on Kafka.md
Comment on lines +109 to +111
"preferred_transport": str | None = Field(
    default='JSONRPC', examples=['JSONRPC', 'GRPC', 'HTTP+JSON',]
),
Contributor


medium

The JSON example for AgentCard is not valid. It appears to contain Python syntax from a Pydantic model definition (str | None = Field(...)) rather than valid JSON. This should be corrected to provide a proper JSON example.

Suggested change
"preferred_transport": str | None = Field(
    default='JSONRPC', examples=['JSONRPC', 'GRPC', 'HTTP+JSON',]
),
"preferred_transport": "JSONRPC",

Comment thread docs/kafka_transport.md
async def main():
    # Create agent card
    agent_card = AgentCard(
        id="my-agent",
Contributor


medium

The AgentCard examples in this document use an id field (e.g., id="my-agent"). The AgentCard model in the codebase does not have an id field; it uses name. The documentation should be updated to use the correct field to avoid confusion.
