feat: multi-agent communication via async tool replies #42
Merged
…uting
Enable an agent to call another agent as an asynchronous tool and have the
sub-agent's free-form response routed back as the tool result.
Agent A (callback side):
- ToolResponseHandler now authenticates via `Authorization: Bearer <HMAC>` at
POST /api/tools/response; body is `{"result": <...>}`. Extracts session_id,
tool_use_id, tool_id, name, total_tools from the token and writes a canonical
tool-use-result. CORS removed (server-to-server).
- tool_id added to ToolCallbackToken.Payload and ToolUseResult (nullable).
- Seeded aggregator keeps the expected tool_use_id set + deadline so a hung
sub-agent is timed out into a synthesized error result.
Agent B (reply routing):
- New {agent}-reply-to topic (compact,delete + retention=REPLY_TO_STATE_TTL_MS).
- ChatHandler accepts an optional transport-level `reply` descriptor and writes
it to reply-to keyed by session_id — never into message content.
- EndTurnProcessor left-joins the reply-to KTable and embeds the descriptor into
UserResponse.reply_to (omitted for ordinary chats).
- OutputConsumer delivers via HTTP callback when reply_to is present (bearer
token, body {responseAsField: content}), with bounded retry on retriable
failures and a tombstone-on-success so one-shot calls can't double-deliver;
otherwise WebSocket delivery as before.
Tests: 94/94 processing tests pass; both Java modules compile.
SDK token minting deferred.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
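The callback authentication described in the commit message above can be pictured with a minimal sketch. The PR text does not show the actual `ToolCallbackToken` wire format, so the layout used here (`base64url(payload).base64url(HMAC-SHA256 signature)`), the class name `CallbackTokenSketch`, and both method names are assumptions for illustration only.

```java
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;

/** Hypothetical sketch; the real ToolCallbackToken format is not shown in this PR. */
final class CallbackTokenSketch {
    private static final String ALG = "HmacSHA256";

    /** Returns "<base64url(payload)>.<base64url(signature)>" (assumed layout). */
    static String mint(String payloadJson, byte[] secret) {
        byte[] payload = payloadJson.getBytes(StandardCharsets.UTF_8);
        Base64.Encoder enc = Base64.getUrlEncoder().withoutPadding();
        return enc.encodeToString(payload) + "." + enc.encodeToString(hmac(payload, secret));
    }

    /** Returns the payload JSON when the signature matches; throws otherwise. */
    static String verify(String token, byte[] secret) {
        int dot = token.indexOf('.');
        if (dot <= 0 || dot == token.length() - 1) {
            throw new IllegalArgumentException("malformed token");
        }
        Base64.Decoder dec = Base64.getUrlDecoder();
        byte[] payload = dec.decode(token.substring(0, dot));
        byte[] signature = dec.decode(token.substring(dot + 1));
        // Compare the recomputed MAC with MessageDigest.isEqual rather than Arrays.equals.
        if (!MessageDigest.isEqual(hmac(payload, secret), signature)) {
            throw new SecurityException("bad signature");
        }
        return new String(payload, StandardCharsets.UTF_8);
    }

    private static byte[] hmac(byte[] data, byte[] secret) {
        try {
            Mac mac = Mac.getInstance(ALG);
            mac.init(new SecretKeySpec(secret, ALG));
            return mac.doFinal(data);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HMAC unavailable", e);
        }
    }
}
```

Under this assumed layout the session_id, tool_use_id, tool_id, name and total_tools fields would travel inside the signed payload, so the request body only needs to carry `{"result": <...>}`.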
Two-agent example demonstrating the async callback + reply-routing design:
- orchestrator (Agent A): delegate_task tool whose dispatcher mints an HMAC callback token, POSTs the task to the worker's /api/chat with a reply descriptor, and acks "pending" (commits the tool-use offset, no result).
- worker (Agent B): an ordinary FlightDeck agent; its chat-api OutputConsumer POSTs the free-form answer back to the orchestrator's /api/tools/response.
chat-api and processing build from source (branch functionality not yet in the published images). README walks through the flow and the design choices.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- chat-api and processing Dockerfiles now build with -Dmaven.test.skip=true so the runtime image build is decoupled from test sources (tests run in CI via `mvn test`). Fixes `docker compose build` failing to compile unrelated work-in-progress test files copied via `COPY src ./src`.
- TODO(multi-agent): reply-to is keyed by session_id, so only one outstanding reply route per session is supported. Key by sessionId:requestId to allow multiple concurrent callbacks within a session. Deferred.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
… schema The multi-agent example mixed a source-built processing/chat-api with a published think-consumer:latest. The older think-consumer emits last_input_* fields the newer processing reads as empty, which caused (a) the worker's answer to come back empty (EndTurnProcessor.assembleContent saw an empty last_input_response) and (b) the orchestrator to lose its history on the tool-result turn (enrich rebuilt history from empty last_input_message / last_input_response), sending Claude an orphaned tool_result → HTTP 400. Build think-consumer from source too so all three core services share the branch's schema. Also apply -Dmaven.test.skip=true to think-consumer's Dockerfile (consistent with chat-api/processing) so the image build is decoupled from test sources. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
EndTurnProcessor (worker-think-request-response → worker-message-output) and the enrich/history join only read last_input_response / think_*_tokens. Published think-consumer images predate the previous/last-input split and emit a single "messages" list plus input_tokens/output_tokens/cost. Deserialized against this model those fields were null, so the worker's answer came through as empty content (and the orchestrator lost the assistant tool_use turn → Anthropic 400 on the tool result).
Add @JsonAlias so the model also accepts the older schema:
- messages → last_input_response
- input_tokens → think_input_tokens
- output_tokens → think_output_tokens
- cost → total_session_cost
- prev_session_cost → previous_session_cost
assembleContent()/history filter by role, so a combined messages list still yields correct content.
Add an EndTurnProcessor unit test that deserializes the real messages-schema wire JSON and asserts content + token counts (fails without the aliases).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
integration-test.sh builds and starts the stack, POSTs a message to
orchestrator-api, and verifies the async round-trip: the worker handled a
delegated sub-session ({session}--{tool_use_id}) and the orchestrator produced a
non-empty final answer for the session. Tears the stack down on exit. Takes
CLAUDE_API_KEY from the environment; the default prompt derives ∫₀^∞ x²/(eˣ−1) dx
(= 2ζ(3) ≈ 2.404114) for an easily-checked result.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
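For reference, the expected value of the default prompt's integral follows from a standard geometric-series expansion of the integrand (valid for x > 0); this derivation is included only as a check on the quoted number and is not taken from the test script itself.

```latex
\int_0^\infty \frac{x^2}{e^x - 1}\,dx
  = \int_0^\infty x^2 \sum_{n=1}^{\infty} e^{-nx}\,dx
  = \sum_{n=1}^{\infty} \int_0^\infty x^2 e^{-nx}\,dx
  = \sum_{n=1}^{\infty} \frac{2}{n^3}
  = 2\,\zeta(3) \approx 2.404114
```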
New GitHub Actions workflow that runs examples/multi-agent-setup/integration-test.sh end to end: builds the stack, posts to orchestrator-api, and verifies the orchestrator → worker → callback round-trip. It hits the real Claude API, so it runs only on manual dispatch and on pushes to main / feat/multi-agent (never fork PRs, which lack the secret), and soft-skips when CLAUDE_API_KEY is unset. Requires the CLAUDE_API_KEY repo secret. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Run the integration test in PR context but never automatically: bind the e2e
job to the `integration-tests` environment so it waits for a human approval
("Review deployments" → Approve) before building images or calling Claude. Drop
the automatic push trigger. Documented setup: create the environment with
required reviewers (the tap) and mark "Orchestrator → Worker round-trip" as a
required status check so a PR can't merge until the test is approved and passes.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Switch from a manual PR gate to a post-merge trigger: run the integration test on push to main (when the example or core services change) and on manual dispatch. Drop the environment/required-reviewer gate. Soft-skip when CLAUDE_API_KEY is unset so it never reds-out main. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Pull request overview
Adds an asynchronous “agent-to-agent as tool” communication path by introducing a reply-routing topic and HTTP callback delivery, plus a seeded tool-result aggregator with timeout synthesis so turns don’t hang when async callbacks never arrive.
Changes:
- Add multi-agent reply routing: `/api/chat` can persist a per-session `reply` descriptor to `{agent}-reply-to`, `EndTurnProcessor` joins it into `message-output`, and `OutputConsumer` delivers via HTTP callback + tombstones the route.
- Add async tool callback endpoint: `POST /api/tools/response` verifies an HMAC token and produces canonical `tool-use-result` records (including optional `tool_id`).
- Update processing tool aggregation: seed expected tool set from `think-request-response`, synthesize timeout errors for missing async tools, and retain tombstones to ignore late/duplicate callbacks; plus ThinkResponse schema compatibility via `@JsonAlias`.
Reviewed changes
Copilot reviewed 32 out of 32 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| think/think-consumer/Dockerfile | Skip tests during image build with -Dmaven.test.skip=true. |
| processor-apps/processing/src/test/java/io/flightdeck/streams/processors/TransformToolUseDoneProcessorTest.java | Update tests for new accumulator/result model fields. |
| processor-apps/processing/src/test/java/io/flightdeck/streams/processors/EndTurnProcessorTest.java | Add reply-to join setup and regression coverage for older wire schema. |
| processor-apps/processing/src/test/java/io/flightdeck/streams/processors/AggregateToolExecutionResultProcessorTest.java | Expand tests for seeding, async timeouts, and tombstone behavior. |
| processor-apps/processing/src/main/java/io/flightdeck/streams/processors/EndTurnProcessor.java | Left-join reply-to descriptor into UserResponse.reply_to. |
| processor-apps/processing/src/main/java/io/flightdeck/streams/processors/AggregateToolExecutionResultProcessor.java | Merge seed+result streams, add timeout sweep, synthesize missing error results, tombstone handling. |
| processor-apps/processing/src/main/java/io/flightdeck/streams/model/UserResponse.java | Add nullable reply_to field (omitted when null). |
| processor-apps/processing/src/main/java/io/flightdeck/streams/model/ToolUseResult.java | Add nullable tool_id field. |
| processor-apps/processing/src/main/java/io/flightdeck/streams/model/ToolResultAccumulator.java | Add expected-set + deadline support; missing-tools computation. |
| processor-apps/processing/src/main/java/io/flightdeck/streams/model/ThinkResponse.java | Add @JsonAlias to accept older think-consumer schema. |
| processor-apps/processing/src/main/java/io/flightdeck/streams/FlightDeckStreamsApp.java | Materialize reply-to KTable, ensure REPLY_TO topic exists with retention/cleanup config. |
| processor-apps/processing/src/main/java/io/flightdeck/streams/config/Topics.java | Add REPLY_TO topic constant + documentation. |
| processor-apps/processing/Dockerfile | Skip tests during image build with -Dmaven.test.skip=true. |
| examples/multi-agent-setup/worker/tools.json | Add empty tool config for worker example. |
| examples/multi-agent-setup/worker/system-prompt.txt | Add worker agent prompt for example stack. |
| examples/multi-agent-setup/README.md | Document the multi-agent async tool delegation design and how to run it. |
| examples/multi-agent-setup/orchestrator/tools.json | Add orchestrator tool definition for delegate_task. |
| examples/multi-agent-setup/orchestrator/system-prompt.txt | Add orchestrator agent prompt for example stack. |
| examples/multi-agent-setup/orchestrator/dispatcher/requirements.txt | Add Python dispatcher dependencies. |
| examples/multi-agent-setup/orchestrator/dispatcher/Dockerfile | Add Docker image for Python dispatcher. |
| examples/multi-agent-setup/orchestrator/dispatcher/app.py | Implement async dispatcher: mint token, call worker with reply descriptor, ack pending. |
| examples/multi-agent-setup/integration-test.sh | Add end-to-end integration test for orchestrator→worker→callback round-trip. |
| examples/multi-agent-setup/docker-compose.yml | Add runnable compose stack for orchestrator+worker agents and dispatcher. |
| api/chat-api/src/main/java/io/flightdeck/api/ToolResultProducer.java | New Kafka producer for async tool results. |
| api/chat-api/src/main/java/io/flightdeck/api/ToolResponseHandler.java | New /api/tools/response handler that verifies token and produces tool results. |
| api/chat-api/src/main/java/io/flightdeck/api/ToolCallbackToken.java | New HMAC token verification utility. |
| api/chat-api/src/main/java/io/flightdeck/api/ReplyToProducer.java | New producer for {agent}-reply-to (write + tombstone). |
| api/chat-api/src/main/java/io/flightdeck/api/OutputConsumer.java | Route message-output to WebSocket or HTTP callback with retries and tombstone-on-success. |
| api/chat-api/src/main/java/io/flightdeck/api/ChatHandler.java | Accept optional reply descriptor and persist to reply-to topic. |
| api/chat-api/src/main/java/io/flightdeck/api/ChatApiApp.java | Wire new producers/handlers and updated OutputConsumer constructor. |
| api/chat-api/Dockerfile | Skip tests during image build with -Dmaven.test.skip=true. |
| .github/workflows/multi-agent-integration.yml | Add post-merge integration workflow for example stack. |
Comment on lines +239 to +245

    if (current == null) {
        // First result of a turn arriving before its seed: start a
        // count-based accumulator (the seed merges in the expected set later).
        current = new ToolResultAccumulator(
                sessionId, result.sessionId(), result.totalTools(),
                List.of(), List.of(), now + ASYNC_TOOL_TIMEOUT_MS, false, null);
    }

Comment on lines +51 to +53

    /** Time-based expiry for reply-to routing state (default 24h). */
    static final long REPLY_TO_STATE_TTL_MS = Long.parseLong(
            System.getenv().getOrDefault("REPLY_TO_STATE_TTL_MS", "86400000"));

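As a rough illustration of how this TTL could feed the reply-to topic settings mentioned earlier (compact,delete plus retention), the sketch below builds the topic-level config map. `cleanup.policy` and `retention.ms` are standard Kafka topic configuration keys; the class and method names are hypothetical, and how `FlightDeckStreamsApp` actually creates the topic is not shown in this PR text.

```java
import java.util.Map;

/** Hypothetical sketch of deriving reply-to topic settings from the environment. */
final class ReplyToTopicConfigSketch {

    /** Reads the TTL from an env-like map, defaulting to 24h as in the hunk above. */
    static long ttlMs(Map<String, String> env) {
        return Long.parseLong(env.getOrDefault("REPLY_TO_STATE_TTL_MS", "86400000"));
    }

    /** Topic-level configs: compaction keeps the latest route per key, delete expires old ones. */
    static Map<String, String> topicConfig(long ttlMs) {
        return Map.of(
                "cleanup.policy", "compact,delete",
                "retention.ms", Long.toString(ttlMs));
    }
}
```

Passing the environment in as a map keeps the parsing testable; a caller would typically hand in `System.getenv()`.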
Comment on lines +75 to +78

    } catch (ToolCallbackToken.InvalidTokenException e) {
        log.warn("Rejected tool callback: {}", e.getMessage());
        sendJson(exchange, 401, "{\"error\":\"" + e.getMessage() + "\"}");
        return;

Comment on lines +130 to +179

    /** POSTs the response back to the calling agent, with bounded retries. */
    private void deliverHttp(String sessionId, String value, JsonNode replyTo) {
        final HttpRequest request;
        try {
            request = buildRequest(value, replyTo);
        } catch (Exception e) {
            log.error("[{}] Invalid reply-to descriptor — skipping delivery: {}", sessionId, e.getMessage());
            return;
        }

        long deadline = System.currentTimeMillis() + REPLY_RETRY_MAX_MS;
        long backoff = RETRY_INITIAL_BACKOFF_MS;
        int attempt = 0;

        while (running) {
            attempt++;
            try {
                HttpResponse<String> resp =
                        httpClient.send(request, HttpResponse.BodyHandlers.ofString());
                int code = resp.statusCode();
                if (code >= 200 && code < 300) {
                    log.info("[{}] Reply delivered (HTTP {}, attempt {}) — tombstoning route",
                            sessionId, code, attempt);
                    replyToProducer.tombstone(sessionId);
                    return;
                }
                if (!isRetriable(code)) {
                    log.error("[{}] Reply delivery failed with non-retriable HTTP {} — skipping. Body: {}",
                            sessionId, code, truncate(resp.body()));
                    return;
                }
                log.warn("[{}] Reply delivery got retriable HTTP {} (attempt {})", sessionId, code, attempt);
            } catch (Exception e) {
                // Connection/timeout errors are retriable.
                log.warn("[{}] Reply delivery error (attempt {}): {}", sessionId, attempt, e.getMessage());
            }

            if (System.currentTimeMillis() + backoff >= deadline) {
                log.error("[{}] Reply delivery exhausted retry budget ({}ms) after {} attempts — skipping",
                        sessionId, REPLY_RETRY_MAX_MS, attempt);
                return;
            }
            try {
                Thread.sleep(backoff);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return;
            }
            backoff = Math.min(backoff * 2, RETRY_MAX_BACKOFF_MS);
        }

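To make the retry budget in the hunk above easier to reason about, here is a small sketch that computes the sleep schedule the loop would follow if every attempt failed immediately. It mirrors the `now + backoff >= deadline` check and the doubling-with-cap step, but the class name and the numeric values in the usage note are hypothetical, since the values of `RETRY_INITIAL_BACKOFF_MS`, `RETRY_MAX_BACKOFF_MS` and `REPLY_RETRY_MAX_MS` are not shown in this PR text.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: the sleeps a capped exponential backoff takes within a time budget. */
final class RetryScheduleSketch {

    /**
     * Returns the sleep durations taken before the budget check gives up,
     * assuming each delivery attempt itself takes no time.
     */
    static List<Long> sleeps(long initialBackoffMs, long maxBackoffMs, long budgetMs) {
        if (initialBackoffMs <= 0 || maxBackoffMs <= 0) {
            throw new IllegalArgumentException("backoff values must be positive");
        }
        List<Long> schedule = new ArrayList<>();
        long elapsed = 0;
        long backoff = initialBackoffMs;
        // Same condition as the loop above: stop once the next sleep would reach the deadline.
        while (elapsed + backoff < budgetMs) {
            schedule.add(backoff);
            elapsed += backoff;
            backoff = Math.min(backoff * 2, maxBackoffMs);
        }
        return schedule;
    }
}
```

With assumed values of 500 ms initial backoff, a 4000 ms cap and a 10000 ms budget, the schedule is 500, 1000, 2000, 4000, which corresponds to five delivery attempts before the loop gives up.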
Broaden api/chat-api/** to api/** and add frontend/** to the path filter so a push to main touching the API or frontend also triggers the integration test. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
…-seed fallback The count-based fallback (a tool result arriving before its seed) initialized the accumulator with userId = result.sessionId(), because ToolUseResult carries no user id. If that path completed before the seed merged (e.g. a fast sync tool, or topic reordering around a restart), the emitted tool message carried user_id = session_id — mis-keying memoir and any user-scoped logic until the next user message corrected it. Initialize userId as null instead; the seed (think-request-response) fills it in when it merges, and downstream already tolerates a null user_id. Also set a real timestamp rather than null. (Reported by Copilot review.) Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The reply descriptor let an untrusted caller supply the callback URL (endpoint/path/method), which the worker's OutputConsumer fetched verbatim with a bearer token attached — a direct SSRF primitive reachable from the unauthenticated /api/chat endpoint. Callers now supply only a logical `callbackService` name; the destination URL is resolved server-side from operator-controlled ALLOWED_HOST_MAPPING (name:baseUrl,...) and the path is fixed to /api/tools/response. Unknown names fail closed at ingress (ChatHandler -> 400) and at egress (OutputConsumer, before send). The HMAC token is still echoed as Authorization: Bearer to preserve /api/tools/response correlation. New CallbackRegistry resolves and validates the mapping. Example wiring updated: dispatcher sends CALLBACK_SERVICE instead of a URL; worker-api configures ALLOWED_HOST_MAPPING. Adds JUnit/AssertJ + surefire and tests for the registry, ingress validation, and routing predicate. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
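The name-to-URL resolution described in this commit can be sketched as follows. This is an illustration under stated assumptions, not the PR's `CallbackRegistry`: the class name, method names and error handling are hypothetical. Only the `name:baseUrl,...` mapping format, the fixed `/api/tools/response` path and the fail-closed behaviour for unknown names come from the commit message.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of an operator-controlled callback allow-list. */
final class CallbackRegistrySketch {
    private static final String FIXED_PATH = "/api/tools/response";
    private final Map<String, URI> baseUrls;

    private CallbackRegistrySketch(Map<String, URI> baseUrls) {
        this.baseUrls = baseUrls;
    }

    /** Parses "name:baseUrl,name:baseUrl"; the first colon separates the name from the URL. */
    static CallbackRegistrySketch parse(String mapping) {
        Map<String, URI> parsed = new LinkedHashMap<>();
        if (mapping != null) {
            for (String entry : mapping.split(",")) {
                String trimmed = entry.trim();
                if (trimmed.isEmpty()) {
                    continue;
                }
                int colon = trimmed.indexOf(':');
                if (colon <= 0) {
                    throw new IllegalArgumentException("bad mapping entry: " + trimmed);
                }
                String name = trimmed.substring(0, colon).trim();
                URI base = URI.create(trimmed.substring(colon + 1).trim());
                String scheme = base.getScheme();
                boolean httpLike = "http".equals(scheme) || "https".equals(scheme);
                if (!httpLike || base.getHost() == null) {
                    throw new IllegalArgumentException("bad base URL for " + name);
                }
                parsed.put(name, base);
            }
        }
        return new CallbackRegistrySketch(parsed);
    }

    /** Unknown names resolve to empty, so callers can fail closed. */
    Optional<URI> resolve(String callbackService) {
        URI base = baseUrls.get(callbackService);
        if (base == null) {
            return Optional.empty();
        }
        String prefix = base.toString().replaceAll("/+$", "");
        return Optional.of(URI.create(prefix + FIXED_PATH));
    }
}
```

Because the caller only ever supplies a key into this map, a URL passed in place of a service name simply misses the lookup, which is the property the SSRF fix relies on.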
Comment on lines +181 to +185

    Set<String> seedIds = expected.stream()
            .map(ExpectedTool::toolUseId).collect(Collectors.toSet());
    if (knownIds(current).containsAll(seedIds)) {
        return;
    }

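The seeded aggregation this hunk belongs to (an expected tool_use_id set plus a deadline, with missing tools timed out into synthesized errors) can be sketched in isolation. The record and method names below are hypothetical and much simpler than the PR's `ToolResultAccumulator`; the sketch only shows the set difference and the deadline check.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of timing out missing async tool results. */
final class ToolTimeoutSketch {

    /** Simplified stand-in for a tool result; field names are assumptions. */
    record Result(String toolUseId, String content, boolean isError) {}

    /** Expected ids that have no result yet, in seed order. */
    static List<String> missing(Set<String> expected, List<Result> received) {
        Set<String> remaining = new LinkedHashSet<>(expected);
        for (Result r : received) {
            remaining.remove(r.toolUseId());
        }
        return new ArrayList<>(remaining);
    }

    /** Before the deadline returns what arrived; at or after it, fills gaps with error results. */
    static List<Result> completeOnDeadline(
            Set<String> expected, List<Result> received, long nowMs, long deadlineMs) {
        List<Result> out = new ArrayList<>(received);
        if (nowMs < deadlineMs) {
            return out;
        }
        for (String id : missing(expected, received)) {
            out.add(new Result(id, "tool call timed out", true));
        }
        return out;
    }
}
```

Synthesizing an error result per missing id lets the turn complete with one result for every expected tool call, so a hung sub-agent does not stall the session.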
Comment on lines +239 to +243

    if (current == null) {
        // First result of a turn arriving before its seed: start a
        // count-based accumulator (the seed merges in the expected set later).
        // user_id is unknown here — ToolUseResult carries none. Leave it
        // null; the seed (think-request-response) fills it in when it

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Summary
Enables one FlightDeck agent to call another as an asynchronous tool: Agent A delegates a task, Agent B runs as an ordinary agent, and B's free-form answer flows back as A's tool result — without A holding a thread open. Closes the design discussed in #38.
Guiding principle: A treats B as a generic async tool; B treats the request as a generic chat. Neither knows the other is an agent.
What's included (Java side)
Agent A (callback side)
- `POST /api/tools/response` now authenticates via `Authorization: Bearer <HMAC>`; body is `{"result": <…>}`. Extracts session_id, tool_use_id, tool_id, name, total_tools from the token and writes a canonical `tool-use-result`. CORS removed.
- `tool_id` added to `ToolCallbackToken.Payload` and `ToolUseResult` (nullable).
Agent B (reply routing)
- `{agent}-reply-to` topic (compact,delete + `REPLY_TO_STATE_TTL_MS`).
- `ChatHandler` accepts an optional transport-level `reply` descriptor and writes it to reply-to keyed by session_id, never into message content.
- `EndTurnProcessor` left-joins the reply-to KTable into `UserResponse.reply_to` (omitted for ordinary chats).
- `OutputConsumer` delivers via HTTP callback when reply_to is present (bearer token, body `{responseAsField: content}`), with bounded retry on retriable failures and a tombstone-on-success so one-shot calls can't double-deliver; otherwise WebSocket as before.
Example
- `examples/multi-agent-setup/`: a runnable orchestrator → worker setup with the async dispatcher, the reply descriptor, and the callback wiring shown end to end. Builds `chat-api`/`processing` from source (functionality not yet in published images).
Docs
- Companion page added in `flightdeck-docs` (`concepts/multi-agent.md`) on a matching `feat/multi-agent` branch; it illustrates the design choices, the `/api/tools/response` contract, the reply descriptor schema, and configuration.
Tests
- 94/94 processing tests pass; both Java modules compile.
Not included / follow-ups
- SDK token minting (`pending()`/`asyncToken`) is deferred; the example's dispatcher mints the token directly.
- `KafkaEnvPropsTest` files (a separate `KafkaEnvProps` refactor) are intentionally not part of this branch.
🤖 Generated with Claude Code