perf(chat-api): share one KafkaProducer across topics#46
Merged
Conversation
Reduce connection count for cost control. The three producer wrappers (message-input, tool-use-result, reply-to) each opened their own KafkaProducer, so the process held one set of broker connections per topic. KafkaProducer is thread-safe and routes per record to the topic on the ProducerRecord, so they now share a single instance built by KafkaProducerFactory and closed once at shutdown. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Contributor
There was a problem hiding this comment.
Pull request overview
This PR reduces Kafka broker connection usage in chat-api by consolidating three per-topic KafkaProducer instances into a single shared, process-wide producer created once and closed once on shutdown.
Changes:
- Introduces
KafkaProducerFactoryto centralize construction/configuration of the shared producer. - Updates
KafkaMessageProducer,ToolResultProducer, andReplyToProducerto accept an injected shared producer and stop owning/closing it. - Updates
ChatApiAppto create, wire, and close the shared producer during shutdown.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| api/chat-api/src/main/java/io/flightdeck/api/ToolResultProducer.java | Switches to injected shared KafkaProducer and removes per-wrapper producer lifecycle management. |
| api/chat-api/src/main/java/io/flightdeck/api/ReplyToProducer.java | Switches to injected shared KafkaProducer and removes per-wrapper producer lifecycle management. |
| api/chat-api/src/main/java/io/flightdeck/api/KafkaProducerFactory.java | Adds a factory to build the single shared KafkaProducer instance/config. |
| api/chat-api/src/main/java/io/flightdeck/api/KafkaMessageProducer.java | Switches to injected shared KafkaProducer and removes per-wrapper producer lifecycle management. |
| api/chat-api/src/main/java/io/flightdeck/api/ChatApiApp.java | Wires a single shared producer into the wrappers and closes it once during shutdown. |
Comments suppressed due to low confidence (1)
api/chat-api/src/main/java/io/flightdeck/api/KafkaMessageProducer.java:33
- This method Javadoc mentions the unprefixed
message-inputtopic, but the actual topic is{AGENT_NAME}-message-inputviaTOPIC. Aligning the comment with the implementation helps operators trace records correctly.
/**
* Sends a message to the message-input topic, keyed by sessionId.
*/
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
9
to
11
| * Produces chat messages to the Kafka {@code message-input} topic. | ||
| * The record key is the session_id so that all messages for a session | ||
| * land on the same partition. |
Comment on lines
9
to
13
| * Produces async tool results to the Kafka {@code tool-use-result} topic — the | ||
| * same topic the synchronous tool consumers write to. The record key is the | ||
| * session_id so results co-partition with the aggregator's per-session store. | ||
| * <p>Fed by {@link ToolResponseHandler} when an external system calls back to | ||
| * {@code POST /api/tools/response} with the result of an async tool. |
| log.info("Tool result producer wired to shared producer — topic={}", TOPIC); | ||
| } | ||
|
|
||
| /** Sends a tool-use-result record to {@code tool-use-result}, keyed by sessionId. */ |
Drive the real /api/chat and /api/tools/response endpoints against a MockProducer and assert that message-input, reply-to and tool-use-result records all land on the single shared producer, keyed by session_id. To make the shared producer injectable, widen the three wrapper constructors from KafkaProducer to the Producer interface (KafkaProducer still satisfies them at runtime). Surefire now sets AGENT_NAME and a single ALLOWED_HOST_MAPPING entry so the wrappers load and the reply-to path is reachable. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Reduce connection count for cost control.
The chat-api process held three separate
KafkaProducerinstances — one each formessage-input,tool-use-result, andreply-to. Each producer opens its own set of broker connections, so a single agent multiplied its billed connection count by the number of producer wrappers.A
KafkaProduceris thread-safe and routes every record to the topic named on itsProducerRecord, so one instance serves all three topics. This PR introducesKafkaProducerFactoryto build a single shared producer, injects it into the three wrappers (whosesend/tombstoneAPIs are unchanged), and closes it once at shutdown.🤖 Generated with Claude Code