Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions .github/workflows/multi-agent-integration.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
name: Multi-Agent Integration

# End-to-end test of the examples/multi-agent-setup stack: build the images,
# send a message to orchestrator-api, and verify the orchestrator → worker →
# callback round-trip.
#
# Runs post-merge on main (and on manual dispatch) — not as a PR gate. It hits
# the real Claude API, so it needs the CLAUDE_API_KEY repo secret; if the secret
# is unset the job soft-skips rather than failing main.

on:
  push:
    branches: [main]
    # Only re-run when something the example stack is built from changes.
    paths:
      - examples/multi-agent-setup/**
      - api/**
      - frontend/**
      - processor-apps/processing/**
      - think/think-consumer/**
      - .github/workflows/multi-agent-integration.yml
  workflow_dispatch:

# Least privilege: the job only checks out the repository and runs a local
# script, so the GITHUB_TOKEN never needs more than read access to contents.
permissions:
  contents: read

jobs:
  e2e:
    name: Orchestrator → Worker round-trip
    runs-on: ubuntu-latest
    timeout-minutes: 30

    steps:
      - uses: actions/checkout@v4

      # Secrets cannot be referenced directly in a step-level `if:`, so the
      # secret is exposed as an env var here and the result is published as a
      # step output that later steps can test.
      - name: Check for CLAUDE_API_KEY secret
        id: guard
        env:
          CLAUDE_API_KEY: ${{ secrets.CLAUDE_API_KEY }}
        run: |
          if [ -z "$CLAUDE_API_KEY" ]; then
            echo "::warning::CLAUDE_API_KEY secret not set — skipping integration test"
            echo "skip=true" >> "$GITHUB_OUTPUT"
          fi

      - name: Run integration test
        if: steps.guard.outputs.skip != 'true'
        working-directory: examples/multi-agent-setup
        env:
          CLAUDE_API_KEY: ${{ secrets.CLAUDE_API_KEY }}
          # NOTE(review): presumably the script's overall wait budget in
          # seconds — confirm against integration-test.sh.
          TIMEOUT: "300"
        run: ./integration-test.sh

      # Runs even when the test step failed or was skipped; `|| true` keeps a
      # teardown problem (e.g. nothing to remove) from failing the job.
      # NOTE(review): assumes integration-test.sh uses the compose project name
      # "multiagent-it" — confirm.
      - name: Tear down (backstop)
        if: always()
        working-directory: examples/multi-agent-setup
        run: docker compose -p multiagent-it down -v || true
4 changes: 3 additions & 1 deletion api/chat-api/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ WORKDIR /app
COPY pom.xml .
RUN mvn dependency:go-offline -q
COPY src ./src
RUN mvn package -q -DskipTests
# Skip test compilation+execution for the image build (tests run in CI via
# `mvn test`); keeps the image build decoupled from test sources.
RUN mvn package -q -Dmaven.test.skip=true

FROM eclipse-temurin:17-jre
WORKDIR /app
Expand Down
19 changes: 19 additions & 0 deletions api/chat-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,29 @@
<artifactId>Java-WebSocket</artifactId>
<version>1.5.7</version>
</dependency>

<!-- Testing -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.10.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.25.3</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.2.5</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
Expand Down
81 changes: 81 additions & 0 deletions api/chat-api/src/main/java/io/flightdeck/api/CallbackRegistry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package io.flightdeck.api;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Resolves a logical callback-service name to the trusted base URL configured for
 * it, then appends the fixed callback path ({@value #CALLBACK_PATH}).
 *
 * <p>The mapping is loaded once from the {@code ALLOWED_HOST_MAPPING} environment
 * variable: a comma-separated list of {@code name:baseUrl} entries. Only the first
 * colon of each entry separates the name from the URL, so the base URL keeps its
 * own {@code scheme://host[:port]} colons:
 *
 * <pre>
 *   ALLOWED_HOST_MAPPING=my-agent-a:https://hosta.local,my-agent-c:http://hostc.local
 * </pre>
 *
 * <p><b>Why this exists.</b> A caller (a peer agent) supplies only the service
 * <em>name</em> in its {@code reply} descriptor; the destination URL is chosen
 * here from operator-controlled config, never from caller input. An untrusted
 * caller therefore cannot steer the server-side callback at an arbitrary host —
 * the SSRF primitive that an attacker-controlled {@code endpoint} would create is
 * structurally removed. Unknown names fail closed.
 *
 * <p>Because the mapping is parsed in a static initializer, a malformed
 * {@code ALLOWED_HOST_MAPPING} value surfaces when this class is first used.
 */
final class CallbackRegistry {

    /** Fixed path appended to every resolved base URL. */
    static final String CALLBACK_PATH = "/api/tools/response";

    /** Unmodifiable service-name → base-URL mapping, parsed once when the class is initialized. */
    private static final Map<String, String> MAPPING =
            parse(ChatApiApp.env("ALLOWED_HOST_MAPPING", ""));

    /** Static utility holder; not instantiable. */
    private CallbackRegistry() {}

    /**
     * Tells whether {@code service} resolves to a configured base URL.
     *
     * @param service logical service name; {@code null} is treated as unknown
     * @return {@code true} only if the name has an entry in the mapping
     */
    static boolean isKnown(String service) {
        return service != null && MAPPING.containsKey(service);
    }

    /**
     * Resolves the full callback URL for a service name.
     *
     * @param service logical service name as supplied by the caller
     * @return the configured base URL followed by {@link #CALLBACK_PATH}
     * @throws IllegalArgumentException if the name is {@code null} or not configured (fail closed)
     */
    static String resolve(String service) {
        String base = service == null ? null : MAPPING.get(service);
        if (base == null) {
            throw new IllegalArgumentException("unknown callbackService: " + service);
        }
        return toCallbackUrl(base);
    }

    /**
     * Strips every trailing slash from the base URL and appends the fixed callback path.
     *
     * <p>All trailing slashes are removed, not just one, so a base configured as
     * {@code https://host//} yields {@code https://host/api/tools/response} rather than a
     * double-slash path. It also means a degenerate base such as {@code https://} can never
     * turn the first segment of {@link #CALLBACK_PATH} into the URL's host.
     *
     * @param base configured base URL; must not be {@code null}
     * @return {@code base} without trailing slashes, followed by {@link #CALLBACK_PATH}
     */
    static String toCallbackUrl(String base) {
        int end = base.length();
        while (end > 0 && base.charAt(end - 1) == '/') {
            end--;
        }
        return base.substring(0, end) + CALLBACK_PATH;
    }

    /**
     * Parses {@code name:baseUrl} entries, splitting each on its FIRST colon only.
     *
     * <p>Entries are comma-separated; surrounding whitespace is trimmed from entries, names
     * and URLs, and empty entries (e.g. from a trailing comma) are skipped. If a name occurs
     * more than once, the last entry wins.
     *
     * @param raw the raw configuration value; {@code null} or blank yields an empty mapping
     * @return an unmodifiable mapping in configuration order
     * @throws IllegalArgumentException if an entry has no colon, an empty name, or an empty URL
     */
    static Map<String, String> parse(String raw) {
        Map<String, String> mapping = new LinkedHashMap<>();
        if (raw == null || raw.isBlank()) {
            return Collections.unmodifiableMap(mapping);
        }
        for (String entry : raw.split(",")) {
            String e = entry.trim();
            if (e.isEmpty()) {
                continue;
            }
            // sep > 0 (not >= 0): a leading colon means the name is missing.
            int sep = e.indexOf(':');
            String name = sep > 0 ? e.substring(0, sep).trim() : "";
            String url = sep > 0 ? e.substring(sep + 1).trim() : "";
            if (name.isEmpty() || url.isEmpty()) {
                throw new IllegalArgumentException(
                        "Malformed ALLOWED_HOST_MAPPING entry (expected name:baseUrl): " + entry);
            }
            mapping.put(name, url);
        }
        return Collections.unmodifiableMap(mapping);
    }
}
23 changes: 19 additions & 4 deletions api/chat-api/src/main/java/io/flightdeck/api/ChatApiApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,24 @@ public class ChatApiApp {
private static final int WS_PORT = Integer.parseInt(env("WS_PORT", "8001"));

public static void main(String[] args) throws Exception {
// 1. Kafka producer (chat → message-input)
// 1. Kafka producers (chat → message-input, async callbacks → tool-use-result,
// multi-agent reply routes → reply-to)
KafkaMessageProducer producer = new KafkaMessageProducer();
ToolResultProducer toolResultProducer = new ToolResultProducer();
ReplyToProducer replyToProducer = new ReplyToProducer();

// Shared secret for verifying async tool callback tokens. Optional —
// if unset, /api/tools/response rejects every callback.
String callbackSecret = env("TOOL_CALLBACK_SECRET", "");
if (callbackSecret.isBlank()) {
log.warn("TOOL_CALLBACK_SECRET is not set — /api/tools/response will reject all callbacks");
}
Comment thread
tsuz marked this conversation as resolved.

// 2. HTTP server for REST API
HttpServer httpServer = HttpServer.create(new InetSocketAddress(PORT), 0);
httpServer.createContext("/api/chat", new ChatHandler(producer));
httpServer.createContext("/api/chat", new ChatHandler(producer, replyToProducer));
httpServer.createContext("/api/tools/response",
new ToolResponseHandler(toolResultProducer, callbackSecret));
httpServer.setExecutor(null);
httpServer.start();
log.info("HTTP server started on port {}", PORT);
Expand All @@ -35,8 +47,9 @@ public static void main(String[] args) throws Exception {
ChatWebSocketServer wsServer = new ChatWebSocketServer(WS_PORT);
wsServer.start();

// 4. Kafka consumer (message-output → WebSocket chat response)
OutputConsumer outputConsumer = new OutputConsumer(wsServer);
// 4. Kafka consumer (message-output → WebSocket chat response, or → HTTP
// callback for sessions that carry a reply-to descriptor)
OutputConsumer outputConsumer = new OutputConsumer(wsServer, replyToProducer);
Thread outputThread = new Thread(outputConsumer, "output-consumer");
outputThread.setDaemon(true);
outputThread.start();
Expand All @@ -55,6 +68,8 @@ public static void main(String[] args) throws Exception {
try { wsServer.stop(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
httpServer.stop(2);
producer.close();
toolResultProducer.close();
replyToProducer.close();
}));

log.info("Chat API ready — HTTP={} WS={}", PORT, WS_PORT);
Expand Down
41 changes: 40 additions & 1 deletion api/chat-api/src/main/java/io/flightdeck/api/ChatHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,33 @@
* { "session_id": "...", "user_id": "user_42", "role": "user",
* "content": "hello", "timestamp": "...",
* "metadata": { "locale": "en-US", "client": "web" } }
*
* <p>For multi-agent calls the request may also carry a transport-level
* {@code reply} descriptor naming where this session's terminal response should
* be delivered, e.g.:
* <pre>
* { "session_id": "...", "content": "...",
* "reply": { "callbackService": "my-agent-a", "bearerToken": "&lt;HMAC&gt;" } }
* </pre>
* {@code callbackService} is a logical name resolved server-side against
* {@code ALLOWED_HOST_MAPPING} ({@link CallbackRegistry}); the caller never
* supplies a URL, so the descriptor cannot steer the callback at an arbitrary
* host. Unknown names are rejected here with a 400.
* The {@code reply} object is NOT placed into the message content/metadata — it
* is written to the reply-to topic (keyed by session_id) so it never reaches the
* LLM. The agent processes the request as an ordinary chat.
*/
public class ChatHandler implements HttpHandler {

private static final Logger log = LoggerFactory.getLogger(ChatHandler.class);
private static final ObjectMapper mapper = new ObjectMapper();

private final KafkaMessageProducer producer;
private final ReplyToProducer replyToProducer;

public ChatHandler(KafkaMessageProducer producer) {
public ChatHandler(KafkaMessageProducer producer, ReplyToProducer replyToProducer) {
this.producer = producer;
this.replyToProducer = replyToProducer;
}

@Override
Expand All @@ -59,6 +76,28 @@ public void handle(HttpExchange exchange) throws IOException {
String sessionId = requireField(body, "session_id");
String content = requireField(body, "content");

// Transport-level reply routing (multi-agent). Written to the reply-to
// topic keyed by session_id; never placed into the message content.
if (body.hasNonNull("reply")) {
JsonNode reply = body.get("reply");
if (!reply.isObject()) {
throw new IllegalArgumentException("'reply' must be an object");
}
String callbackService = reply.path("callbackService").asText("");
if (callbackService.isBlank()) {
throw new IllegalArgumentException("'reply' must include 'callbackService'");
}
// Fail closed: reject a descriptor naming a service this agent is not
// configured to call back, so unroutable routes never reach the
// reply-to topic and the caller gets an immediate 400.
if (!CallbackRegistry.isKnown(callbackService)) {
throw new IllegalArgumentException("unknown callbackService: " + callbackService);
}
replyToProducer.send(sessionId, mapper.writeValueAsString(reply));
log.info("[{}] Stored reply-to descriptor (callbackService={})",
sessionId, callbackService);
}
Comment thread
tsuz marked this conversation as resolved.

// Build the full message-input payload
ObjectNode message = mapper.createObjectNode();
message.put("session_id", sessionId);
Expand Down
Loading
Loading