Skip to content

Commit 1e16248

Browse files
committed
feat(sdk,core): Session channel SDK toolkits + waitpoints — client side
Build the client-side half of the Session channel extensions that the sessions PR shipped on the server. Pairs with POST /api/v1/runs/:runFriendlyId/session-streams/wait and the append-fires-waitpoints wiring on the session append handler. Extend SessionHandle with two asymmetric channels mirroring the run-scoped streams primitives: - .in (SessionInputChannel) mirrors streams.input. on / once / peek / wait / waitWithIdleTimeout for the task to consume; send for external clients to produce. .wait / .waitWithIdleTimeout suspend the run on a session-stream waitpoint; it resumes when a record lands on .in, same semantics as streams.input.wait on a run-scoped input stream. - .out (SessionOutputChannel) mirrors streams.define. append / pipe / writer for the task to produce records — all three route through SessionStreamInstance -> StreamsWriterV2 for uniform parsed-object serialization on the subscribe side. read returns an SSE subscription for external consumers. The two channels are disjoint classes with zero overlapping methods. SessionHandle is { id, in, out } so directional tags stay at every call site. No public initialize() — S2 credentials are an internal detail of pipe / writer. Core - StandardSessionStreamManager + sessionStreams global: SSE-backed tail with once/on/peek buffering, auto-reconnect, lastSeqNum resume. Keyed on {sessionId, io}. Registered in dev- and managed- run workers; taskExecutor clears handlers at run end alongside input streams. - SessionStreamInstance: S2-only parallel of StreamInstance. Fetches session S2 creds via initializeSessionStream and pipes through StreamsWriterV2. - ApiClient.createSessionStreamWaitpoint — calls the new server route. Reference - references/hello-world/src/trigger/sessionsSmoke.ts now exercises .out.writer alongside .out.append. - references/hello-world/src/trigger/sessionsWaitSmoke.ts (new) — end-to-end waitpoint validation. Orchestrator suspends on .in.waitWithIdleTimeout; a delayed sender task fires the waitpoint via .in.send; orchestrator resumes with the payload. match: true.
1 parent 6034982 commit 1e16248

19 files changed

Lines changed: 1266 additions & 75 deletions

File tree

.changeset/session-sdk-toolkit.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
---
2+
"@trigger.dev/core": patch
3+
"@trigger.dev/sdk": patch
4+
---
5+
6+
Extend `SessionHandle` with two asymmetric channels mirroring the run-scoped streams primitives:
7+
8+
- `.in` (`SessionInputChannel`) mirrors `streams.input``on` / `once` / `peek` / `wait` / `waitWithIdleTimeout` for the task to consume, `send` for external clients to produce. `.wait` / `.waitWithIdleTimeout` suspend the run on a session-stream waitpoint; the run resumes when a record lands on `.in`.
9+
- `.out` (`SessionOutputChannel`) mirrors `streams.define``append` / `pipe` / `writer` for the task to produce records (all route through direct-to-S2 for uniform parsed-object serialization), plus `read` for external SSE subscribers.
10+
11+
Adds the `sessionStreams` global + `StandardSessionStreamManager` (SSE-backed tail + buffer keyed on `{sessionId, io}`, registered in dev/managed run workers), `SessionStreamInstance` for direct-to-S2 piping, and `ApiClient.createSessionStreamWaitpoint` wiring.

packages/cli-v3/src/entryPoints/dev-run-worker.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import {
3434
heartbeats,
3535
realtimeStreams,
3636
inputStreams,
37+
sessionStreams,
3738
} from "@trigger.dev/core/v3";
3839
import { TriggerTracer } from "@trigger.dev/core/v3/tracer";
3940
import {
@@ -61,6 +62,7 @@ import {
6162
StandardHeartbeatsManager,
6263
StandardRealtimeStreamsManager,
6364
StandardInputStreamManager,
65+
StandardSessionStreamManager,
6466
} from "@trigger.dev/core/v3/workers";
6567
import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc";
6668
import { readFile } from "node:fs/promises";
@@ -170,6 +172,14 @@ const standardInputStreamManager = new StandardInputStreamManager(
170172
);
171173
inputStreams.setGlobalManager(standardInputStreamManager);
172174

175+
const standardSessionStreamManager = new StandardSessionStreamManager(
176+
apiClientManager.clientOrThrow(),
177+
getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev",
178+
(getEnvVar("TRIGGER_STREAMS_DEBUG") === "1" || getEnvVar("TRIGGER_STREAMS_DEBUG") === "true") ??
179+
false
180+
);
181+
sessionStreams.setGlobalManager(standardSessionStreamManager);
182+
173183
const waitUntilTimeoutInMs = getNumberEnvVar("TRIGGER_WAIT_UNTIL_TIMEOUT_MS", 60_000);
174184
const waitUntilManager = new StandardWaitUntilManager(waitUntilTimeoutInMs);
175185
waitUntil.setGlobalManager(waitUntilManager);

packages/cli-v3/src/entryPoints/managed-run-worker.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import {
3333
heartbeats,
3434
realtimeStreams,
3535
inputStreams,
36+
sessionStreams,
3637
} from "@trigger.dev/core/v3";
3738
import { TriggerTracer } from "@trigger.dev/core/v3/tracer";
3839
import {
@@ -61,6 +62,7 @@ import {
6162
StandardHeartbeatsManager,
6263
StandardRealtimeStreamsManager,
6364
StandardInputStreamManager,
65+
StandardSessionStreamManager,
6466
} from "@trigger.dev/core/v3/workers";
6567
import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc";
6668
import { readFile } from "node:fs/promises";
@@ -150,6 +152,14 @@ const standardInputStreamManager = new StandardInputStreamManager(
150152
);
151153
inputStreams.setGlobalManager(standardInputStreamManager);
152154

155+
const standardSessionStreamManager = new StandardSessionStreamManager(
156+
apiClientManager.clientOrThrow(),
157+
getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev",
158+
(getEnvVar("TRIGGER_STREAMS_DEBUG") === "1" || getEnvVar("TRIGGER_STREAMS_DEBUG") === "true") ??
159+
false
160+
);
161+
sessionStreams.setGlobalManager(standardSessionStreamManager);
162+
153163
const waitUntilTimeoutInMs = getNumberEnvVar("TRIGGER_WAIT_UNTIL_TIMEOUT_MS", 60_000);
154164
const waitUntilManager = new StandardWaitUntilManager(waitUntilTimeoutInMs);
155165
waitUntil.setGlobalManager(waitUntilManager);

packages/core/src/v3/apiClient/index.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import {
2828
CreateEnvironmentVariableRequestBody,
2929
CreateInputStreamWaitpointRequestBody,
3030
CreateInputStreamWaitpointResponseBody,
31+
CreateSessionStreamWaitpointRequestBody,
32+
CreateSessionStreamWaitpointResponseBody,
3133
CreateScheduleOptions,
3234
CreateStreamResponseBody,
3335
CreateUploadPayloadUrlResponseBody,
@@ -1656,6 +1658,23 @@ export class ApiClient {
16561658
);
16571659
}
16581660

1661+
async createSessionStreamWaitpoint(
1662+
runFriendlyId: string,
1663+
body: CreateSessionStreamWaitpointRequestBody,
1664+
requestOptions?: ZodFetchOptions
1665+
) {
1666+
return zodfetch(
1667+
CreateSessionStreamWaitpointResponseBody,
1668+
`${this.baseUrl}/api/v1/runs/${runFriendlyId}/session-streams/wait`,
1669+
{
1670+
method: "POST",
1671+
headers: this.#getHeaders(false),
1672+
body: JSON.stringify(body),
1673+
},
1674+
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
1675+
);
1676+
}
1677+
16591678
async generateJWTClaims(requestOptions?: ZodFetchOptions): Promise<Record<string, any>> {
16601679
return zodfetch(
16611680
z.record(z.any()),

packages/core/src/v3/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ export * from "./locals-api.js";
2121
export * from "./heartbeats-api.js";
2222
export * from "./realtime-streams-api.js";
2323
export * from "./input-streams-api.js";
24+
export * from "./session-streams-api.js";
2425
export * from "./waitpoints/index.js";
2526
export * from "./schemas/index.js";
2627
export { SemanticInternalAttributes } from "./semanticInternalAttributes.js";

packages/core/src/v3/realtime-streams-api.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,5 @@ import { RealtimeStreamsAPI } from "./realtimeStreams/index.js";
55
export const realtimeStreams = RealtimeStreamsAPI.getInstance();
66

77
export * from "./realtimeStreams/types.js";
8+
export { SessionStreamInstance } from "./realtimeStreams/sessionStreamInstance.js";
9+
export type { SessionStreamInstanceOptions } from "./realtimeStreams/sessionStreamInstance.js";

packages/core/src/v3/realtimeStreams/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@ import {
66
RealtimeStreamsManager,
77
} from "./types.js";
88

9+
// Re-export the session-scoped stream instance so the SDK's
10+
// `SessionOutputChannel.pipe` / `.writer` can construct it without reaching
11+
// into the core package's internals.
12+
export { SessionStreamInstance } from "./sessionStreamInstance.js";
13+
export type { SessionStreamInstanceOptions } from "./sessionStreamInstance.js";
14+
915
const API_NAME = "realtime-streams";
1016

1117
const NOOP_MANAGER = new NoopRealtimeStreamsManager();
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import { ApiClient } from "../apiClient/index.js";
2+
import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
3+
import { AnyZodFetchOptions } from "../zodfetch.js";
4+
import { StreamsWriterV2 } from "./streamsWriterV2.js";
5+
import { StreamsWriter, StreamWriteResult } from "./types.js";
6+
7+
export type SessionStreamInstanceOptions<T> = {
8+
apiClient: ApiClient;
9+
baseUrl: string;
10+
sessionId: string;
11+
io: "out" | "in";
12+
source: ReadableStream<T>;
13+
signal?: AbortSignal;
14+
requestOptions?: AnyZodFetchOptions;
15+
debug?: boolean;
16+
};
17+
18+
/**
19+
* Session-scoped parallel to {@link StreamInstance}. Calls
20+
* `initializeSessionStream` to fetch S2 credentials for the session's
21+
* channel, then pipes `source` directly to S2 via {@link StreamsWriterV2}.
22+
*
23+
* Sessions are S2-only — there's no v1 (Redis) fallback — so this
24+
* skips the version-detection dance `StreamInstance` does.
25+
*/
26+
export class SessionStreamInstance<T> implements StreamsWriter {
27+
private streamPromise: Promise<StreamsWriterV2<T>>;
28+
29+
constructor(private options: SessionStreamInstanceOptions<T>) {
30+
this.streamPromise = this.initializeWriter();
31+
}
32+
33+
private async initializeWriter(): Promise<StreamsWriterV2<T>> {
34+
const response = await this.options.apiClient.initializeSessionStream(
35+
this.options.sessionId,
36+
this.options.io,
37+
this.options?.requestOptions
38+
);
39+
40+
const headers = response.headers ?? {};
41+
const accessToken = headers["x-s2-access-token"];
42+
const basin = headers["x-s2-basin"];
43+
const streamName = headers["x-s2-stream-name"];
44+
const endpoint = headers["x-s2-endpoint"];
45+
const flushIntervalMs = headers["x-s2-flush-interval-ms"]
46+
? parseInt(headers["x-s2-flush-interval-ms"])
47+
: undefined;
48+
const maxRetries = headers["x-s2-max-retries"]
49+
? parseInt(headers["x-s2-max-retries"])
50+
: undefined;
51+
52+
if (!accessToken || !basin || !streamName) {
53+
throw new Error(
54+
"Session stream initialize did not return S2 credentials — server may be configured for v1 realtime streams, which sessions do not support."
55+
);
56+
}
57+
58+
return new StreamsWriterV2({
59+
basin,
60+
stream: streamName,
61+
accessToken,
62+
endpoint,
63+
source: this.options.source,
64+
signal: this.options.signal,
65+
debug: this.options.debug,
66+
flushIntervalMs,
67+
maxRetries,
68+
});
69+
}
70+
71+
public async wait(): Promise<StreamWriteResult> {
72+
const writer = await this.streamPromise;
73+
return writer.wait();
74+
}
75+
76+
public get stream(): AsyncIterableStream<T> {
77+
const self = this;
78+
79+
return new ReadableStream<T>({
80+
async start(controller) {
81+
const streamWriter = await self.streamPromise;
82+
83+
const iterator = streamWriter[Symbol.asyncIterator]();
84+
85+
while (true) {
86+
if (self.options.signal?.aborted) {
87+
controller.close();
88+
break;
89+
}
90+
91+
const { done, value } = await iterator.next();
92+
93+
if (done) {
94+
controller.close();
95+
break;
96+
}
97+
98+
controller.enqueue(value);
99+
}
100+
},
101+
});
102+
}
103+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
// Split module-level variable definition into separate files to allow
2+
// tree-shaking on each api instance.
3+
import { SessionStreamsAPI } from "./sessionStreams/index.js";
4+
5+
export const sessionStreams = SessionStreamsAPI.getInstance();
6+
7+
export * from "./sessionStreams/types.js";
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import { getGlobal, registerGlobal } from "../utils/globals.js";
2+
import { NoopSessionStreamManager } from "./noopManager.js";
3+
import {
4+
InputStreamOncePromise,
5+
SessionChannelIO,
6+
SessionStreamManager,
7+
} from "./types.js";
8+
import { InputStreamOnceOptions } from "../realtimeStreams/types.js";
9+
10+
const API_NAME = "session-streams";
11+
12+
const NOOP_MANAGER = new NoopSessionStreamManager();
13+
14+
export class SessionStreamsAPI implements SessionStreamManager {
15+
private static _instance?: SessionStreamsAPI;
16+
17+
private constructor() {}
18+
19+
public static getInstance(): SessionStreamsAPI {
20+
if (!this._instance) {
21+
this._instance = new SessionStreamsAPI();
22+
}
23+
return this._instance;
24+
}
25+
26+
setGlobalManager(manager: SessionStreamManager): boolean {
27+
return registerGlobal(API_NAME, manager);
28+
}
29+
30+
#getManager(): SessionStreamManager {
31+
return getGlobal(API_NAME) ?? NOOP_MANAGER;
32+
}
33+
34+
public on(
35+
sessionId: string,
36+
io: SessionChannelIO,
37+
handler: (data: unknown) => void | Promise<void>
38+
): { off: () => void } {
39+
return this.#getManager().on(sessionId, io, handler);
40+
}
41+
42+
public once(
43+
sessionId: string,
44+
io: SessionChannelIO,
45+
options?: InputStreamOnceOptions
46+
): InputStreamOncePromise<unknown> {
47+
return this.#getManager().once(sessionId, io, options);
48+
}
49+
50+
public peek(sessionId: string, io: SessionChannelIO): unknown | undefined {
51+
return this.#getManager().peek(sessionId, io);
52+
}
53+
54+
public lastSeqNum(sessionId: string, io: SessionChannelIO): number | undefined {
55+
return this.#getManager().lastSeqNum(sessionId, io);
56+
}
57+
58+
public setLastSeqNum(sessionId: string, io: SessionChannelIO, seqNum: number): void {
59+
this.#getManager().setLastSeqNum(sessionId, io, seqNum);
60+
}
61+
62+
public shiftBuffer(sessionId: string, io: SessionChannelIO): boolean {
63+
return this.#getManager().shiftBuffer(sessionId, io);
64+
}
65+
66+
public disconnectStream(sessionId: string, io: SessionChannelIO): void {
67+
this.#getManager().disconnectStream(sessionId, io);
68+
}
69+
70+
public clearHandlers(): void {
71+
this.#getManager().clearHandlers();
72+
}
73+
74+
public reset(): void {
75+
this.#getManager().reset();
76+
}
77+
78+
public disconnect(): void {
79+
this.#getManager().disconnect();
80+
}
81+
}

0 commit comments

Comments
 (0)