feat: multi-agent communication via async tool replies #42

Merged
tsuz merged 15 commits into main from feat/multi-agent
May 31, 2026

Conversation

@tsuz tsuz commented May 31, 2026

Summary

Enables one FlightDeck agent to call another as an asynchronous tool: Agent A delegates a task, Agent B runs as an ordinary agent, and B's free-form answer flows back as A's tool result — without A holding a thread open. Closes the design discussed in #38.

Guiding principle: A treats B as a generic async tool; B treats the request as a generic chat. Neither knows the other is an agent.

What's included (Java side)

Agent A — callback side

  • POST /api/tools/response now authenticates via Authorization: Bearer <HMAC>; body is {"result": <…>}. The handler extracts session_id, tool_use_id, tool_id, name, and total_tools from the token and writes a canonical tool-use-result. CORS removed (the endpoint is server-to-server).
  • tool_id added to ToolCallbackToken.Payload and ToolUseResult (nullable).
  • The seeded aggregator keeps the expected tool_use_id set and a deadline; a hung sub-agent is timed out into a synthesized error result, so a turn never hangs.
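
A minimal sketch of the bearer-token check described above, assuming a token of the form `base64url(payload) + "." + base64url(HMAC-SHA256(payload))`. The real ToolCallbackToken wire format is not shown on this page, so the class name `CallbackTokenSketch`, the token layout, and the error messages are all hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical helper; the token layout here is an assumption, not the PR's format. */
final class CallbackTokenSketch {
    private static final String ALG = "HmacSHA256";

    /** Returns "<payload>.<signature>", both parts base64url-encoded without padding. */
    static String mint(String payloadJson, byte[] secret) {
        Base64.Encoder enc = Base64.getUrlEncoder().withoutPadding();
        String payload = enc.encodeToString(payloadJson.getBytes(StandardCharsets.UTF_8));
        return payload + "." + enc.encodeToString(hmac(payload, secret));
    }

    /** Returns the payload JSON when the signature matches; throws otherwise. */
    static String verify(String authorizationHeader, byte[] secret) {
        if (authorizationHeader == null || !authorizationHeader.startsWith("Bearer ")) {
            throw new IllegalArgumentException("missing bearer token");
        }
        String token = authorizationHeader.substring("Bearer ".length());
        int dot = token.indexOf('.');
        if (dot <= 0 || dot == token.length() - 1) {
            throw new IllegalArgumentException("malformed token");
        }
        String payload = token.substring(0, dot);
        byte[] presented = Base64.getUrlDecoder().decode(token.substring(dot + 1));
        // MessageDigest.isEqual compares in constant time, which avoids a timing side channel.
        if (!MessageDigest.isEqual(hmac(payload, secret), presented)) {
            throw new IllegalArgumentException("bad signature");
        }
        return new String(Base64.getUrlDecoder().decode(payload), StandardCharsets.UTF_8);
    }

    private static byte[] hmac(String data, byte[] secret) {
        try {
            Mac mac = Mac.getInstance(ALG);
            mac.init(new SecretKeySpec(secret, ALG));
            return mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Because session_id, tool_use_id and the other routing fields travel inside the signed payload, the callback body only needs to carry the result itself.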

Agent B — reply routing

  • New {agent}-reply-to topic (cleanup policy compact,delete; retention set from REPLY_TO_STATE_TTL_MS).
  • ChatHandler accepts an optional transport-level reply descriptor and writes it to reply-to keyed by session_id — never into message content.
  • EndTurnProcessor left-joins the reply-to KTable into UserResponse.reply_to (omitted for ordinary chats).
  • OutputConsumer delivers via HTTP callback when reply_to is present (bearer token, body {responseAsField: content}), with bounded retry on retriable failures and a tombstone-on-success so one-shot calls can't double-deliver; otherwise WebSocket as before.
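
The delivery decision in the last bullet can be sketched as below. This is an illustration only: `ReplyRoutingSketch` and its `Response` record are hypothetical stand-ins for OutputConsumer and UserResponse, and it reads `responseAsField` as the name of the JSON field that carries the content, which is an interpretation of the `{responseAsField: content}` shorthand. The fallback field name `result` is likewise an assumption.

```java
import java.util.Map;

/** Hypothetical stand-in for the OutputConsumer routing decision (requires Java 16+ for records). */
final class ReplyRoutingSketch {
    enum Route { HTTP_CALLBACK, WEBSOCKET }

    /** replyTo is null for ordinary chats, mirroring the nullable UserResponse.reply_to. */
    record Response(String sessionId, String content, Map<String, String> replyTo) {}

    /** A present reply descriptor selects the HTTP callback; otherwise WebSocket as before. */
    static Route route(Response r) {
        return r.replyTo() != null ? Route.HTTP_CALLBACK : Route.WEBSOCKET;
    }

    /** Builds the callback body; the field name comes from the descriptor (assumed key). */
    static Map<String, String> callbackBody(Response r) {
        if (r.replyTo() == null) {
            throw new IllegalStateException("no reply descriptor for session " + r.sessionId());
        }
        String field = r.replyTo().getOrDefault("responseAsField", "result");
        return Map.of(field, r.content());
    }
}
```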

Example

examples/multi-agent-setup/ — a runnable orchestrator → worker setup with the async dispatcher, the reply descriptor, and the callback wiring shown end to end. Builds chat-api/processing from source (functionality not yet in published images).

Docs

Companion page added in flightdeck-docs (concepts/multi-agent.md) on a matching feat/multi-agent branch — illustrates the design choices, the /api/tools/response contract, the reply descriptor schema, and configuration.

Tests

94/94 processing tests pass; both Java modules compile.

Not included / follow-ups

  • SDK token minting (pending() / asyncToken) is deferred; the example's dispatcher mints the token directly.
  • Unrelated in-flight KafkaEnvPropsTest files (a separate KafkaEnvProps refactor) are intentionally not part of this branch.

🤖 Generated with Claude Code

tsuz and others added 8 commits May 31, 2026 08:20
…uting

Enable an agent to call another agent as an asynchronous tool and have the
sub-agent's free-form response routed back as the tool result.

Agent A (callback side):
- ToolResponseHandler now authenticates via `Authorization: Bearer <HMAC>` at
  POST /api/tools/response; body is `{"result": <...>}`. Extracts session_id,
  tool_use_id, tool_id, name, total_tools from the token and writes a canonical
  tool-use-result. CORS removed (server-to-server).
- tool_id added to ToolCallbackToken.Payload and ToolUseResult (nullable).
- Seeded aggregator keeps the expected tool_use_id set + deadline so a hung
  sub-agent is timed out into a synthesized error result.

Agent B (reply routing):
- New {agent}-reply-to topic (compact,delete + retention=REPLY_TO_STATE_TTL_MS).
- ChatHandler accepts an optional transport-level `reply` descriptor and writes
  it to reply-to keyed by session_id — never into message content.
- EndTurnProcessor left-joins the reply-to KTable and embeds the descriptor into
  UserResponse.reply_to (omitted for ordinary chats).
- OutputConsumer delivers via HTTP callback when reply_to is present (bearer
  token, body {responseAsField: content}), with bounded retry on retriable
  failures and a tombstone-on-success so one-shot calls can't double-deliver;
  otherwise WebSocket delivery as before.

Tests: 94/94 processing tests pass; both Java modules compile.
SDK token minting deferred.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

Two-agent example demonstrating the async callback + reply-routing design:
- orchestrator (Agent A): delegate_task tool whose dispatcher mints an HMAC
  callback token, POSTs the task to the worker's /api/chat with a reply
  descriptor, and acks "pending" (commits the tool-use offset, no result).
- worker (Agent B): an ordinary FlightDeck agent; its chat-api OutputConsumer
  POSTs the free-form answer back to the orchestrator's /api/tools/response.

chat-api and processing build from source (branch functionality not yet in the
published images). README walks through the flow and the design choices.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

- chat-api and processing Dockerfiles now build with -Dmaven.test.skip=true so
  the runtime image build is decoupled from test sources (tests run in CI via
  `mvn test`). Fixes `docker compose build` failing to compile unrelated
  work-in-progress test files copied via `COPY src ./src`.
- TODO(multi-agent): reply-to is keyed by session_id, so only one outstanding
  reply route per session is supported. Key by sessionId:requestId to allow
  multiple concurrent callbacks within a session. Deferred.
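
The composite key proposed in the TODO could look roughly like this. It is a sketch of the deferred idea, not code from the branch: `ReplyRouteKeySketch` is a hypothetical name, and splitting on the last colon assumes a requestId never contains `:`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical composite key for reply-to records (requires Java 16+ for records). */
final class ReplyRouteKeySketch {
    record Key(String sessionId, String requestId) {
        /** Encodes as "sessionId:requestId", the shape named in the TODO. */
        String encode() {
            return sessionId + ":" + requestId;
        }

        /** Splits on the last colon; assumes requestId itself contains no colon. */
        static Key decode(String raw) {
            int i = raw.lastIndexOf(':');
            if (i <= 0 || i == raw.length() - 1) {
                throw new IllegalArgumentException("not a sessionId:requestId key: " + raw);
            }
            return new Key(raw.substring(0, i), raw.substring(i + 1));
        }
    }

    /** Two outstanding routes for one session no longer overwrite each other. */
    static Map<String, String> twoRoutesForOneSession() {
        Map<String, String> routes = new HashMap<>();
        routes.put(new Key("s1", "r1").encode(), "descriptor-1");
        routes.put(new Key("s1", "r2").encode(), "descriptor-2");
        return routes;
    }
}
```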

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

… schema

The multi-agent example mixed a source-built processing/chat-api with a
published think-consumer:latest. The older think-consumer emits last_input_*
fields the newer processing reads as empty, which caused (a) the worker's answer
to come back empty (EndTurnProcessor.assembleContent saw an empty
last_input_response) and (b) the orchestrator to lose its history on the
tool-result turn (enrich rebuilt history from empty last_input_message /
last_input_response), sending Claude an orphaned tool_result → HTTP 400.

Build think-consumer from source too so all three core services share the
branch's schema. Also apply -Dmaven.test.skip=true to think-consumer's
Dockerfile (consistent with chat-api/processing) so the image build is
decoupled from test sources.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

EndTurnProcessor (worker-think-request-response → worker-message-output) and the
enrich/history join only read last_input_response / think_*_tokens. Published
think-consumer images predate the previous/last-input split and emit a single
"messages" list plus input_tokens/output_tokens/cost. Deserialized against this
model those fields were null, so the worker's answer came through as empty
content (and the orchestrator lost the assistant tool_use turn → Anthropic 400
on the tool result).

Add @JsonAlias so the model also accepts the older schema:
messages→last_input_response, input_tokens→think_input_tokens,
output_tokens→think_output_tokens, cost→total_session_cost,
prev_session_cost→previous_session_cost. assembleContent()/history filter by
role, so a combined messages list still yields correct content. Add an
EndTurnProcessor unit test that deserializes the real messages-schema wire JSON
and asserts content + token counts (fails without the aliases).
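
The field mapping listed above can be illustrated without any dependencies. The branch itself uses Jackson's @JsonAlias on ThinkResponse, which is not reproduced here; `ThinkResponseAliasSketch` is a hypothetical helper that only shows which older key lands on which current field.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Dependency-free illustration of the alias table; not the Jackson-based implementation. */
final class ThinkResponseAliasSketch {
    /** Older field name -> current field name, as listed in the commit message. */
    static final Map<String, String> ALIASES = Map.of(
            "messages", "last_input_response",
            "input_tokens", "think_input_tokens",
            "output_tokens", "think_output_tokens",
            "cost", "total_session_cost",
            "prev_session_cost", "previous_session_cost");

    /** Renames older keys to their current names and leaves every other key untouched. */
    static Map<String, Object> normalize(Map<String, Object> wire) {
        Map<String, Object> out = new LinkedHashMap<>();
        wire.forEach((key, value) -> out.putIfAbsent(ALIASES.getOrDefault(key, key), value));
        return out;
    }
}
```

With this mapping, a record carrying `messages` and `input_tokens` is read as `last_input_response` and `think_input_tokens`, which is the case the new EndTurnProcessor regression test covers.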

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

integration-test.sh builds and starts the stack, POSTs a message to
orchestrator-api, and verifies the async round-trip: the worker handled a
delegated sub-session ({session}--{tool_use_id}) and the orchestrator produced a
non-empty final answer for the session. Tears the stack down on exit. Takes
CLAUDE_API_KEY from the environment; the default prompt derives ∫₀^∞ x²/(eˣ−1) dx
(= 2ζ(3) ≈ 2.404114) for an easily-checked result.
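
For reference, the expected value of the default prompt can be checked by expanding the integrand as a geometric series, using $1/(e^{x}-1)=\sum_{n\ge 1}e^{-nx}$ for $x>0$ and $\int_0^\infty x^{2}e^{-nx}\,dx = 2/n^{3}$:

```latex
\int_0^\infty \frac{x^{2}}{e^{x}-1}\,dx
  = \sum_{n=1}^{\infty} \int_0^\infty x^{2} e^{-nx}\,dx
  = \sum_{n=1}^{\infty} \frac{2}{n^{3}}
  = 2\,\zeta(3) \approx 2.404114
```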

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

New GitHub Actions workflow that runs examples/multi-agent-setup/integration-test.sh
end to end: builds the stack, posts to orchestrator-api, and verifies the
orchestrator → worker → callback round-trip.

It hits the real Claude API, so it runs only on manual dispatch and on pushes to
main / feat/multi-agent (never fork PRs, which lack the secret), and soft-skips
when CLAUDE_API_KEY is unset. Requires the CLAUDE_API_KEY repo secret.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

Run the integration test in PR context but never automatically: bind the e2e
job to the `integration-tests` environment so it waits for a human approval
("Review deployments" → Approve) before building images or calling Claude. Drop
the automatic push trigger. Documented setup: create the environment with
required reviewers (the tap) and mark "Orchestrator → Worker round-trip" as a
required status check so a PR can't merge until the test is approved and passes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

@tsuz had a problem deploying to integration-tests with GitHub Actions (Failure), May 31, 2026 09:52

Switch from a manual PR gate to a post-merge trigger: run the integration test
on push to main (when the example or core services change) and on manual
dispatch. Drop the environment/required-reviewer gate. Soft-skip when
CLAUDE_API_KEY is unset so a missing secret never turns main red.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

Copilot AI left a comment

Pull request overview

Adds an asynchronous “agent-to-agent as tool” communication path by introducing a reply-routing topic and HTTP callback delivery, plus a seeded tool-result aggregator with timeout synthesis so turns don’t hang when async callbacks never arrive.

Changes:

  • Add multi-agent reply routing: /api/chat can persist a per-session reply descriptor to {agent}-reply-to, EndTurnProcessor joins it into message-output, and OutputConsumer delivers via HTTP callback + tombstones the route.
  • Add async tool callback endpoint: POST /api/tools/response verifies an HMAC token and produces canonical tool-use-result records (including optional tool_id).
  • Update processing tool aggregation: seed expected tool set from think-request-response, synthesize timeout errors for missing async tools, and retain tombstones to ignore late/duplicate callbacks; plus ThinkResponse schema compatibility via @JsonAlias.
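
The seeded aggregation in the last bullet can be sketched as follows. This is a simplified in-memory model under stated assumptions, not the Kafka Streams processor: `SeededAccumulatorSketch`, its `Result` record, and the "tool timed out" text are hypothetical, and a `done` flag stands in for the retained tombstone.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory model of a seeded tool-result accumulator (Java 16+). */
final class SeededAccumulatorSketch {
    record Result(String toolUseId, String content, boolean isError) {}

    private final Set<String> expected;
    private final long deadlineMs;
    private final Map<String, Result> received = new LinkedHashMap<>();
    private boolean done; // plays the role of the retained tombstone

    SeededAccumulatorSketch(List<String> expectedToolUseIds, long deadlineMs) {
        this.expected = new LinkedHashSet<>(expectedToolUseIds);
        this.deadlineMs = deadlineMs;
    }

    /** Records one callback; returns the full result list once every expected id has arrived. */
    List<Result> onResult(Result r) {
        // Late, unknown, and duplicate callbacks are ignored.
        if (done || !expected.contains(r.toolUseId()) || received.containsKey(r.toolUseId())) {
            return List.of();
        }
        received.put(r.toolUseId(), r);
        return received.size() == expected.size() ? finish() : List.of();
    }

    /** Periodic sweep: past the deadline, missing tools get a synthesized error result. */
    List<Result> sweep(long nowMs) {
        if (done || nowMs < deadlineMs) {
            return List.of();
        }
        for (String id : expected) {
            received.putIfAbsent(id, new Result(id, "tool timed out", true));
        }
        return finish();
    }

    private List<Result> finish() {
        done = true;
        return new ArrayList<>(received.values());
    }
}
```

Seeding with the expected ids, rather than only a count, is what lets the sweep name exactly which tool calls to fail when the deadline passes.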

Reviewed changes

Copilot reviewed 32 out of 32 changed files in this pull request and generated 8 comments.

| File | Description |
| --- | --- |
| think/think-consumer/Dockerfile | Skip tests during image build with -Dmaven.test.skip=true. |
| processor-apps/processing/src/test/java/io/flightdeck/streams/processors/TransformToolUseDoneProcessorTest.java | Update tests for new accumulator/result model fields. |
| processor-apps/processing/src/test/java/io/flightdeck/streams/processors/EndTurnProcessorTest.java | Add reply-to join setup and regression coverage for older wire schema. |
| processor-apps/processing/src/test/java/io/flightdeck/streams/processors/AggregateToolExecutionResultProcessorTest.java | Expand tests for seeding, async timeouts, and tombstone behavior. |
| processor-apps/processing/src/main/java/io/flightdeck/streams/processors/EndTurnProcessor.java | Left-join reply-to descriptor into UserResponse.reply_to. |
| processor-apps/processing/src/main/java/io/flightdeck/streams/processors/AggregateToolExecutionResultProcessor.java | Merge seed+result streams, add timeout sweep, synthesize missing error results, tombstone handling. |
| processor-apps/processing/src/main/java/io/flightdeck/streams/model/UserResponse.java | Add nullable reply_to field (omitted when null). |
| processor-apps/processing/src/main/java/io/flightdeck/streams/model/ToolUseResult.java | Add nullable tool_id field. |
| processor-apps/processing/src/main/java/io/flightdeck/streams/model/ToolResultAccumulator.java | Add expected-set + deadline support; missing-tools computation. |
| processor-apps/processing/src/main/java/io/flightdeck/streams/model/ThinkResponse.java | Add @JsonAlias to accept older think-consumer schema. |
| processor-apps/processing/src/main/java/io/flightdeck/streams/FlightDeckStreamsApp.java | Materialize reply-to KTable, ensure REPLY_TO topic exists with retention/cleanup config. |
| processor-apps/processing/src/main/java/io/flightdeck/streams/config/Topics.java | Add REPLY_TO topic constant + documentation. |
| processor-apps/processing/Dockerfile | Skip tests during image build with -Dmaven.test.skip=true. |
| examples/multi-agent-setup/worker/tools.json | Add empty tool config for worker example. |
| examples/multi-agent-setup/worker/system-prompt.txt | Add worker agent prompt for example stack. |
| examples/multi-agent-setup/README.md | Document the multi-agent async tool delegation design and how to run it. |
| examples/multi-agent-setup/orchestrator/tools.json | Add orchestrator tool definition for delegate_task. |
| examples/multi-agent-setup/orchestrator/system-prompt.txt | Add orchestrator agent prompt for example stack. |
| examples/multi-agent-setup/orchestrator/dispatcher/requirements.txt | Add Python dispatcher dependencies. |
| examples/multi-agent-setup/orchestrator/dispatcher/Dockerfile | Add Docker image for Python dispatcher. |
| examples/multi-agent-setup/orchestrator/dispatcher/app.py | Implement async dispatcher: mint token, call worker with reply descriptor, ack pending. |
| examples/multi-agent-setup/integration-test.sh | Add end-to-end integration test for orchestrator→worker→callback round-trip. |
| examples/multi-agent-setup/docker-compose.yml | Add runnable compose stack for orchestrator+worker agents and dispatcher. |
| api/chat-api/src/main/java/io/flightdeck/api/ToolResultProducer.java | New Kafka producer for async tool results. |
| api/chat-api/src/main/java/io/flightdeck/api/ToolResponseHandler.java | New /api/tools/response handler that verifies token and produces tool results. |
| api/chat-api/src/main/java/io/flightdeck/api/ToolCallbackToken.java | New HMAC token verification utility. |
| api/chat-api/src/main/java/io/flightdeck/api/ReplyToProducer.java | New producer for {agent}-reply-to (write + tombstone). |
| api/chat-api/src/main/java/io/flightdeck/api/OutputConsumer.java | Route message-output to WebSocket or HTTP callback with retries and tombstone-on-success. |
| api/chat-api/src/main/java/io/flightdeck/api/ChatHandler.java | Accept optional reply descriptor and persist to reply-to topic. |
| api/chat-api/src/main/java/io/flightdeck/api/ChatApiApp.java | Wire new producers/handlers and updated OutputConsumer constructor. |
| api/chat-api/Dockerfile | Skip tests during image build with -Dmaven.test.skip=true. |
| .github/workflows/multi-agent-integration.yml | Add post-merge integration workflow for example stack. |

Comment on lines +239 to +245
if (current == null) {
    // First result of a turn arriving before its seed: start a
    // count-based accumulator (the seed merges in the expected set later).
    current = new ToolResultAccumulator(
            sessionId, result.sessionId(), result.totalTools(),
            List.of(), List.of(), now + ASYNC_TOOL_TIMEOUT_MS, false, null);
}
Comment on lines +51 to +53
/** Time-based expiry for reply-to routing state (default 24h). */
static final long REPLY_TO_STATE_TTL_MS = Long.parseLong(
        System.getenv().getOrDefault("REPLY_TO_STATE_TTL_MS", "86400000"));
Comment thread api/chat-api/src/main/java/io/flightdeck/api/ChatApiApp.java
Comment thread api/chat-api/src/main/java/io/flightdeck/api/ToolResultProducer.java Outdated
Comment on lines +75 to +78
} catch (ToolCallbackToken.InvalidTokenException e) {
    log.warn("Rejected tool callback: {}", e.getMessage());
    sendJson(exchange, 401, "{\"error\":\"" + e.getMessage() + "\"}");
    return;
Comment thread api/chat-api/src/main/java/io/flightdeck/api/ChatHandler.java
Comment thread api/chat-api/src/main/java/io/flightdeck/api/OutputConsumer.java
Comment on lines +130 to +179
/** POSTs the response back to the calling agent, with bounded retries. */
private void deliverHttp(String sessionId, String value, JsonNode replyTo) {
    final HttpRequest request;
    try {
        request = buildRequest(value, replyTo);
    } catch (Exception e) {
        log.error("[{}] Invalid reply-to descriptor — skipping delivery: {}", sessionId, e.getMessage());
        return;
    }

    long deadline = System.currentTimeMillis() + REPLY_RETRY_MAX_MS;
    long backoff = RETRY_INITIAL_BACKOFF_MS;
    int attempt = 0;

    while (running) {
        attempt++;
        try {
            HttpResponse<String> resp =
                    httpClient.send(request, HttpResponse.BodyHandlers.ofString());
            int code = resp.statusCode();
            if (code >= 200 && code < 300) {
                log.info("[{}] Reply delivered (HTTP {}, attempt {}) — tombstoning route",
                        sessionId, code, attempt);
                replyToProducer.tombstone(sessionId);
                return;
            }
            if (!isRetriable(code)) {
                log.error("[{}] Reply delivery failed with non-retriable HTTP {} — skipping. Body: {}",
                        sessionId, code, truncate(resp.body()));
                return;
            }
            log.warn("[{}] Reply delivery got retriable HTTP {} (attempt {})", sessionId, code, attempt);
        } catch (Exception e) {
            // Connection/timeout errors are retriable.
            log.warn("[{}] Reply delivery error (attempt {}): {}", sessionId, attempt, e.getMessage());
        }

        if (System.currentTimeMillis() + backoff >= deadline) {
            log.error("[{}] Reply delivery exhausted retry budget ({}ms) after {} attempts — skipping",
                    sessionId, REPLY_RETRY_MAX_MS, attempt);
            return;
        }
        try {
            Thread.sleep(backoff);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return;
        }
        backoff = Math.min(backoff * 2, RETRY_MAX_BACKOFF_MS);
    }
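
To make the retry budget in the hunk above concrete, the sketch below replays the same guard (`now + backoff >= deadline`) and doubling rule against a simulated clock. It assumes each HTTP attempt takes no time, and the numbers in the usage note are illustrative; the actual values of RETRY_INITIAL_BACKOFF_MS, RETRY_MAX_BACKOFF_MS and REPLY_RETRY_MAX_MS are not shown on this page. `BackoffScheduleSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that lists the sleeps the retry loop would perform. */
final class BackoffScheduleSketch {
    /** Returns the sleep durations in order, assuming every attempt fails instantly. */
    static List<Long> sleeps(long initialMs, long maxBackoffMs, long budgetMs) {
        if (initialMs <= 0 || maxBackoffMs <= 0) {
            throw new IllegalArgumentException("backoff values must be positive");
        }
        List<Long> out = new ArrayList<>();
        long now = 0;            // simulated clock, starts when the deadline is computed
        long backoff = initialMs;
        // Same guard as the loop above: stop once the next sleep would reach the deadline.
        while (now + backoff < budgetMs) {
            out.add(backoff);
            now += backoff;
            backoff = Math.min(backoff * 2, maxBackoffMs);
        }
        return out;
    }
}
```

With an initial backoff of 500 ms, a cap of 4000 ms and a 10000 ms budget, the sleeps are 500, 1000, 2000 and 4000 ms, so five attempts are made before the loop gives up.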
tsuz and others added 3 commits May 31, 2026 19:08
Broaden api/chat-api/** to api/** and add frontend/** to the path filter so a
push to main touching the API or frontend also triggers the integration test.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
tsuz and others added 2 commits May 31, 2026 22:03
…-seed fallback

The count-based fallback (a tool result arriving before its seed) initialized
the accumulator with userId = result.sessionId(), because ToolUseResult carries
no user id. If that path completed before the seed merged (e.g. a fast sync tool,
or topic reordering around a restart), the emitted tool message carried
user_id = session_id — mis-keying memoir and any user-scoped logic until the next
user message corrected it.

Initialize userId as null instead; the seed (think-request-response) fills it in
when it merges, and downstream already tolerates a null user_id. Also set a real
timestamp rather than null. (Reported by Copilot review.)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

The reply descriptor let an untrusted caller supply the callback URL
(endpoint/path/method), which the worker's OutputConsumer fetched verbatim
with a bearer token attached — a direct SSRF primitive reachable from the
unauthenticated /api/chat endpoint.

Callers now supply only a logical `callbackService` name; the destination
URL is resolved server-side from operator-controlled ALLOWED_HOST_MAPPING
(name:baseUrl,...) and the path is fixed to /api/tools/response. Unknown
names fail closed at ingress (ChatHandler -> 400) and at egress
(OutputConsumer, before send). The HMAC token is still echoed as
Authorization: Bearer to preserve /api/tools/response correlation.

New CallbackRegistry resolves and validates the mapping. Example wiring
updated: dispatcher sends CALLBACK_SERVICE instead of a URL; worker-api
configures ALLOWED_HOST_MAPPING. Adds JUnit/AssertJ + surefire and tests
for the registry, ingress validation, and routing predicate.
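
A minimal sketch of the name-to-URL resolution described in this commit, assuming the `name:baseUrl,...` format is split on the first colon of each entry (the base URL contains colons of its own). `CallbackRegistrySketch` is a hypothetical stand-in; the real CallbackRegistry is not shown on this page and may validate differently.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for CallbackRegistry; parsing rules here are assumptions. */
final class CallbackRegistrySketch {
    private static final String FIXED_PATH = "/api/tools/response";
    private final Map<String, URI> baseUrls = new LinkedHashMap<>();

    /** Parses an operator-controlled mapping such as "orchestrator:http://host:8080". */
    CallbackRegistrySketch(String allowedHostMapping) {
        if (allowedHostMapping == null || allowedHostMapping.isBlank()) {
            return; // empty registry: every lookup fails closed
        }
        for (String entry : allowedHostMapping.split(",")) {
            int colon = entry.indexOf(':'); // first colon separates name from base URL
            if (colon <= 0) {
                throw new IllegalArgumentException("bad mapping entry: " + entry);
            }
            String name = entry.substring(0, colon).trim();
            URI base = URI.create(entry.substring(colon + 1).trim());
            boolean httpScheme = "http".equals(base.getScheme()) || "https".equals(base.getScheme());
            if (!httpScheme || base.getHost() == null) {
                throw new IllegalArgumentException("bad base URL for " + name);
            }
            baseUrls.put(name, base);
        }
    }

    /** Resolves a logical service name; unknown names fail closed. */
    URI resolve(String callbackService) {
        URI base = baseUrls.get(callbackService);
        if (base == null) {
            throw new IllegalArgumentException("unknown callbackService: " + callbackService);
        }
        String prefix = base.toString();
        if (prefix.endsWith("/")) {
            prefix = prefix.substring(0, prefix.length() - 1);
        }
        return URI.create(prefix + FIXED_PATH); // the caller never controls host or path
    }
}
```

Because the caller supplies only a name, the set of reachable destinations is exactly what the operator configured, which is the property that removes the SSRF primitive.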

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

Copilot AI left a comment

Pull request overview

Copilot reviewed 37 out of 37 changed files in this pull request and generated 3 comments.

Comment on lines +181 to +185
Set<String> seedIds = expected.stream()
        .map(ExpectedTool::toolUseId).collect(Collectors.toSet());
if (knownIds(current).containsAll(seedIds)) {
    return;
}
Comment thread api/chat-api/src/main/java/io/flightdeck/api/OutputConsumer.java
Comment on lines +239 to +243
if (current == null) {
    // First result of a turn arriving before its seed: start a
    // count-based accumulator (the seed merges in the expected set later).
    // user_id is unknown here — ToolUseResult carries none. Leave it
    // null; the seed (think-request-response) fills it in when it
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
@tsuz tsuz merged commit b5a295f into main May 31, 2026
2 checks passed
@tsuz tsuz deleted the feat/multi-agent branch May 31, 2026 21:20