Skip to content

Commit c140c3e

Browse files
committed
fix(sdk,chat): route pipeChat through session.out + chat-agent smoke test
pipeChat (the internal that auto-pipes a chat.agent's returned streamText result to the chat output) was still calling streams.pipe(CHAT_STREAM_KEY, stream) — a run-scoped run-streams path. After the session migration, the module-level facades (chatStream, messagesInput, stopInput) routed correctly, but pipeChat bypassed the facade and went straight to the old run-scoped pipe. Result: the turn-complete control chunk reached the session.out subscriber (written via chatStream.writer in writeTurnCompleteChunk) but every streamed UIMessageChunk from the LLM's turn was written to the dead run-scoped stream and never surfaced on session.out. Swap the pipe target to chatStream.pipe (the session-routed facade). The target / streamKey options still type-check for API parity but are no longer meaningful — sessions are the address, and sub-agents that need to write to a parent chat open the parent's Session explicitly. Smoke now catches all 14 chunks (start / start-step / text-start / 7x text-delta / text-end / finish-step / finish / trigger:turn-complete) with ids 0 through 13 from session.out, match: true. Also adds references/hello-world/src/trigger/chatAgentSmoke.ts — end-to-end validation: - sessions.create with externalId = chatId - trigger test-agent with {chatId, sessionId, messages, …} - handle.out.read(...) SSE subscribe, capture chunks by id+type - sessions.close on completion Triggered from the dashboard or MCP as chat-agent-smoke. Requires OPENAI_API_KEY in the dev env (the test-agent uses openai:gpt-4o-mini).
1 parent 3138f89 commit c140c3e

2 files changed

Lines changed: 126 additions & 7 deletions

File tree

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2687,7 +2687,6 @@ async function pipeChat(
26872687
options?: PipeChatOptions
26882688
): Promise<void> {
26892689
locals.set(chatPipeCountKey, (locals.get(chatPipeCountKey) ?? 0) + 1);
2690-
const streamKey = options?.streamKey ?? CHAT_STREAM_KEY;
26912690

26922691
let stream: AsyncIterable<unknown> | ReadableStream<unknown>;
26932692

@@ -2702,18 +2701,27 @@ async function pipeChat(
27022701
);
27032702
}
27042703

2705-
const pipeOptions: PipeStreamOptions = {};
2704+
const pipeOptions: SessionPipeStreamOptions = {};
27062705
if (options?.signal) {
27072706
pipeOptions.signal = options.signal;
27082707
}
2709-
if (options?.target) {
2710-
pipeOptions.target = options.target;
2711-
}
27122708
if (options?.spanName) {
27132709
pipeOptions.spanName = options.spanName;
27142710
}
2715-
2716-
const { waitUntilComplete } = streams.pipe(streamKey, stream, pipeOptions);
2711+
// `options.target` / `options.streamKey` are accepted for API parity
2712+
// with the pre-migration run-scoped pipe but no longer have meaning —
2713+
// sessions are the address (single stream per session, no sub-run
2714+
// targeting). Sub-agents that need to write into a parent's chat now
2715+
// open that session explicitly via `sessions.open(parentSessionId).out.pipe`.
2716+
2717+
// The generic is typed for `UIMessageChunk`, but `pipeChat` also
2718+
// accepts opaque UIMessageStreamable / raw iterables whose element
2719+
// type we don't know at compile time. Cast — runtime behaviour is
2720+
// identical (bytes go to session.out either way).
2721+
const { waitUntilComplete } = chatStream.pipe(
2722+
stream as ReadableStream<UIMessageChunk> | AsyncIterable<UIMessageChunk>,
2723+
pipeOptions
2724+
);
27172725
await waitUntilComplete();
27182726
}
27192727

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import { logger, sessions, task, tasks } from "@trigger.dev/sdk";
2+
3+
/**
4+
* End-to-end smoke test for the chat.agent -> Sessions migration.
5+
*
6+
* Flow:
7+
* 1. Create a Session with a deterministic externalId so the
8+
* `test-agent` run can `sessions.open(...)` it on startup.
9+
* 2. Trigger `test-agent` with `{chatId, sessionId, messages, trigger,
10+
* metadata}` — mirrors what TriggerChatTransport would send for a
11+
* first message, minus the browser-triggered access token layer.
12+
* 3. `session.out.read({...})` — consume the agent's UIMessageChunks
13+
* as they stream out. Bail after the first text-delta (good
14+
* enough to prove output flow + SSE subscription).
15+
* 4. `sessions.close(...)` — tidy up.
16+
*
17+
* Trigger from the dashboard or MCP:
18+
*
19+
* mcp__trigger__trigger_task(taskId: "chat-agent-smoke", payload: {})
20+
*
21+
* Expects OPENAI_API_KEY set in the env (the test-agent uses
22+
* `openai:gpt-4o-mini`). If the key is missing the smoke reports an
23+
* error payload without crashing.
24+
*/
25+
export const chatAgentSmoke = task({
26+
id: "chat-agent-smoke",
27+
run: async () => {
28+
const stamp = Date.now();
29+
const chatId = `chat-agent-smoke-${stamp}`;
30+
31+
logger.info("creating chat.agent backing session", { externalId: chatId });
32+
const session = await sessions.create({
33+
type: "chat.agent",
34+
externalId: chatId,
35+
tags: ["chat-agent-smoke"],
36+
});
37+
38+
logger.info("triggering test-agent run", {
39+
chatId,
40+
sessionId: session.id,
41+
});
42+
await tasks.trigger("test-agent", {
43+
chatId,
44+
sessionId: session.id,
45+
trigger: "submit-message",
46+
messages: [
47+
{
48+
id: `m-${stamp}`,
49+
role: "user",
50+
parts: [{ type: "text", text: "Say hello in five words." }],
51+
},
52+
],
53+
metadata: { userId: "smoke", model: "openai:gpt-4o-mini" },
54+
});
55+
56+
logger.info("subscribing to session.out, waiting for first chunks");
57+
const handle = sessions.open(session.id);
58+
const controller = new AbortController();
59+
const timeout = setTimeout(() => controller.abort(), 60_000);
60+
61+
const received: Array<{ type?: string; id?: string }> = [];
62+
let firstTextDelta: string | undefined;
63+
let turnCompleteSeen = false;
64+
65+
try {
66+
const stream = await handle.out.read<Record<string, unknown>>({
67+
signal: controller.signal,
68+
timeoutInSeconds: 30,
69+
// Start from seq 0 so we don't race the agent's early writes.
70+
lastEventId: "-1",
71+
onPart: (part) => {
72+
// Record the event id alongside the chunk so we can see the
73+
// full sequence that came down the wire.
74+
received.push({
75+
id: part.id,
76+
type: (part.chunk as { type?: string } | null)?.type,
77+
});
78+
},
79+
});
80+
81+
for await (const chunk of stream) {
82+
if (chunk.type === "text-delta" && typeof chunk.delta === "string") {
83+
firstTextDelta ??= chunk.delta;
84+
}
85+
if (chunk.type === "trigger:turn-complete") {
86+
turnCompleteSeen = true;
87+
break;
88+
}
89+
if (received.length > 500) break;
90+
}
91+
} catch (err) {
92+
if ((err as Error).name !== "AbortError") throw err;
93+
} finally {
94+
clearTimeout(timeout);
95+
}
96+
97+
await sessions.close(session.id, { reason: "chat-agent-smoke-done" });
98+
99+
return {
100+
ok: received.length > 0,
101+
chatId,
102+
sessionId: session.id,
103+
chunkCount: received.length,
104+
firstTextDelta,
105+
turnCompleteSeen,
106+
types: [...new Set(received.map((c) => c.type ?? "<unknown>"))],
107+
firstFiveIds: received.slice(0, 5).map((c) => `${c.id}:${c.type ?? "<u>"}`),
108+
lastFiveIds: received.slice(-5).map((c) => `${c.id}:${c.type ?? "<u>"}`),
109+
};
110+
},
111+
});

0 commit comments

Comments
 (0)