Skip to content

perf(chat-api): share one KafkaProducer across topics#46

Merged
tsuz merged 2 commits into
mainfrom
perf/chat-api-shared-producer
Jun 14, 2026
Merged

perf(chat-api): share one KafkaProducer across topics#46
tsuz merged 2 commits into
mainfrom
perf/chat-api-shared-producer

Conversation

@tsuz

@tsuz tsuz commented Jun 14, 2026

Copy link
Copy Markdown
Owner

Reduce connection count for cost control.

The chat-api process held three separate KafkaProducer instances — one each for message-input, tool-use-result, and reply-to. Each producer opens its own set of broker connections, so a single agent multiplied its billed connection count by the number of producer wrappers.

A KafkaProducer is thread-safe and routes every record to the topic named on its ProducerRecord, so one instance serves all three topics. This PR introduces KafkaProducerFactory to build a single shared producer, injects it into the three wrappers (whose send/tombstone APIs are unchanged), and closes it once at shutdown.

🤖 Generated with Claude Code

Reduce connection count for cost control. The three producer wrappers
(message-input, tool-use-result, reply-to) each opened their own
KafkaProducer, so the process held one set of broker connections per
topic. KafkaProducer is thread-safe and routes per record to the topic
on the ProducerRecord, so they now share a single instance built by
KafkaProducerFactory and closed once at shutdown.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR reduces Kafka broker connection usage in chat-api by consolidating three per-topic KafkaProducer instances into a single shared, process-wide producer created once and closed once on shutdown.

Changes:

  • Introduces KafkaProducerFactory to centralize construction/configuration of the shared producer.
  • Updates KafkaMessageProducer, ToolResultProducer, and ReplyToProducer to accept an injected shared producer and stop owning/closing it.
  • Updates ChatApiApp to create, wire, and close the shared producer during shutdown.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
api/chat-api/src/main/java/io/flightdeck/api/ToolResultProducer.java Switches to injected shared KafkaProducer and removes per-wrapper producer lifecycle management.
api/chat-api/src/main/java/io/flightdeck/api/ReplyToProducer.java Switches to injected shared KafkaProducer and removes per-wrapper producer lifecycle management.
api/chat-api/src/main/java/io/flightdeck/api/KafkaProducerFactory.java Adds a factory to build the single shared KafkaProducer instance/config.
api/chat-api/src/main/java/io/flightdeck/api/KafkaMessageProducer.java Switches to injected shared KafkaProducer and removes per-wrapper producer lifecycle management.
api/chat-api/src/main/java/io/flightdeck/api/ChatApiApp.java Wires a single shared producer into the wrappers and closes it once during shutdown.
Comments suppressed due to low confidence (1)

api/chat-api/src/main/java/io/flightdeck/api/KafkaMessageProducer.java:33

  • This method Javadoc mentions the unprefixed message-input topic, but the actual topic is {AGENT_NAME}-message-input via TOPIC. Aligning the comment with the implementation helps operators trace records correctly.
    /**
     * Sends a message to the message-input topic, keyed by sessionId.
     */

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 9 to 11
* Produces chat messages to the Kafka {@code message-input} topic.
* The record key is the session_id so that all messages for a session
* land on the same partition.
Comment on lines 9 to 13
* Produces async tool results to the Kafka {@code tool-use-result} topic — the
* same topic the synchronous tool consumers write to. The record key is the
* session_id so results co-partition with the aggregator's per-session store.
* <p>Fed by {@link ToolResponseHandler} when an external system calls back to
* {@code POST /api/tools/response} with the result of an async tool.
log.info("Tool result producer wired to shared producer — topic={}", TOPIC);
}

/** Sends a tool-use-result record to {@code tool-use-result}, keyed by sessionId. */
Drive the real /api/chat and /api/tools/response endpoints against a
MockProducer and assert that message-input, reply-to and tool-use-result
records all land on the single shared producer, keyed by session_id.

To make the shared producer injectable, widen the three wrapper
constructors from KafkaProducer to the Producer interface (KafkaProducer
still satisfies them at runtime). Surefire now sets AGENT_NAME and a
single ALLOWED_HOST_MAPPING entry so the wrappers load and the reply-to
path is reachable.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@tsuz tsuz merged commit 0838dc1 into main Jun 14, 2026
2 checks passed
@tsuz tsuz deleted the perf/chat-api-shared-producer branch June 14, 2026 12:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants