Skip to content

Commit 6034982

Browse files
committed
feat(sdk,core): Session client SDK + hello-world smoke test
Client-side pair to the Session primitive server PR (TRI-8627). Run-scoped streams.pipe / streams.input are untouched. @trigger.dev/core ApiClient - createSession / retrieveSession / updateSession / closeSession — zodfetch against /api/v1/sessions control plane - listSessions — CursorPagePromise<SessionItem>, follows the runs/waitpoints convention (page[size], page[after], page[before] + filter[*]) - initializeSessionStream — PUT /realtime/v1/sessions/:session/:io, returns S2 creds in headers (feeds StreamsWriterV2 directly) - appendToSessionStream — POST …/append - subscribeToSessionStream — reuses SSEStreamSubscription for SSE subscribes (auto-retry, Last-Event-ID resume, abort propagation), so session subscribers get the exact same semantics as runs.fetchStream. Returns AsyncIterableStream<T>. @trigger.dev/sdk sessions namespace - sessions.create / retrieve / update / close / list — wraps the ApiClient with the standard tracer + accessoryAttributes + mergeRequestOptions. Returns ApiPromise / CursorPagePromise. - sessions.open(id) returns a SessionHandle with .out and .in SessionChannels. Each channel exposes append / send / subscribe / initialize. The handle is polymorphic on friendlyId or externalId. - auth.ts adds the `sessions` permission on PublicTokenPermissionProperties so auth.createPublicToken({ read: { sessions: ["session_abc"] } }) works. Reference - references/hello-world/src/trigger/sessionsSmoke.ts — idempotent Trigger.dev task that exercises every code path (control-plane CRUD, polymorphic lookup, list with tag/type/status/externalId filters, cursor pagination, out.initialize + append + subscribe SSE round-trip, in.send, close + idempotent re-close). Trigger via mcp__trigger__trigger_task(taskId: "sessions-smoke"). Verified live against the local webapp (project hello-world): 10/10 steps pass end-to-end, S2 round-trip returns appended chunks through the shared SSEStreamSubscription pipeline.
1 parent 8e7d068 commit 6034982

5 files changed

Lines changed: 714 additions & 0 deletions

File tree

packages/core/src/v3/apiClient/index.ts

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,16 @@ import {
1313
BatchTriggerTaskV3RequestBody,
1414
BatchTriggerTaskV3Response,
1515
CanceledRunResponse,
16+
CloseSessionRequestBody,
1617
CompleteWaitpointTokenRequestBody,
1718
CompleteWaitpointTokenResponseBody,
19+
CreatedSessionResponseBody,
20+
CreateSessionRequestBody,
21+
ListSessionsOptions,
22+
ListSessionsResponseBody,
23+
ListedSessionItem,
24+
RetrieveSessionResponseBody,
25+
UpdateSessionRequestBody,
1826
CreateBatchRequestBody,
1927
CreateBatchResponse,
2028
CreateEnvironmentVariableRequestBody,
@@ -1095,6 +1103,182 @@ export class ApiClient {
10951103
);
10961104
}
10971105

1106+
// ========================================================================
1107+
// Sessions
1108+
// ========================================================================
1109+
1110+
createSession(body: CreateSessionRequestBody, requestOptions?: ZodFetchOptions) {
1111+
return zodfetch(
1112+
CreatedSessionResponseBody,
1113+
`${this.baseUrl}/api/v1/sessions`,
1114+
{
1115+
method: "POST",
1116+
headers: this.#getHeaders(false),
1117+
body: JSON.stringify(body),
1118+
},
1119+
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
1120+
);
1121+
}
1122+
1123+
retrieveSession(sessionIdOrExternalId: string, requestOptions?: ZodFetchOptions) {
1124+
return zodfetch(
1125+
RetrieveSessionResponseBody,
1126+
`${this.baseUrl}/api/v1/sessions/${encodeURIComponent(sessionIdOrExternalId)}`,
1127+
{
1128+
method: "GET",
1129+
headers: this.#getHeaders(false),
1130+
},
1131+
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
1132+
);
1133+
}
1134+
1135+
updateSession(
1136+
sessionIdOrExternalId: string,
1137+
body: UpdateSessionRequestBody,
1138+
requestOptions?: ZodFetchOptions
1139+
) {
1140+
return zodfetch(
1141+
RetrieveSessionResponseBody,
1142+
`${this.baseUrl}/api/v1/sessions/${encodeURIComponent(sessionIdOrExternalId)}`,
1143+
{
1144+
method: "PATCH",
1145+
headers: this.#getHeaders(false),
1146+
body: JSON.stringify(body),
1147+
},
1148+
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
1149+
);
1150+
}
1151+
1152+
closeSession(
1153+
sessionIdOrExternalId: string,
1154+
body?: CloseSessionRequestBody,
1155+
requestOptions?: ZodFetchOptions
1156+
) {
1157+
return zodfetch(
1158+
RetrieveSessionResponseBody,
1159+
`${this.baseUrl}/api/v1/sessions/${encodeURIComponent(sessionIdOrExternalId)}/close`,
1160+
{
1161+
method: "POST",
1162+
headers: this.#getHeaders(false),
1163+
body: JSON.stringify(body ?? {}),
1164+
},
1165+
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
1166+
);
1167+
}
1168+
1169+
listSessions(
1170+
options?: ListSessionsOptions,
1171+
requestOptions?: ZodFetchOptions
1172+
): CursorPagePromise<typeof ListedSessionItem> {
1173+
const searchParams = createSearchQueryForListSessions(options);
1174+
1175+
return zodfetchCursorPage(
1176+
ListedSessionItem,
1177+
`${this.baseUrl}/api/v1/sessions`,
1178+
{
1179+
query: searchParams,
1180+
limit: options?.limit,
1181+
after: options?.after,
1182+
before: options?.before,
1183+
},
1184+
{
1185+
method: "GET",
1186+
headers: this.#getHeaders(false),
1187+
},
1188+
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
1189+
);
1190+
}
1191+
1192+
// ========================================================================
1193+
// Session realtime channels
1194+
// ========================================================================
1195+
1196+
async initializeSessionStream(
1197+
sessionIdOrExternalId: string,
1198+
io: "out" | "in",
1199+
requestOptions?: ZodFetchOptions
1200+
) {
1201+
// The server returns S2 credentials in response headers alongside a tiny
1202+
// JSON body with the realtime version. Follow the same shape as
1203+
// `createStream` so downstream clients can feed them into
1204+
// `StreamsWriterV2`.
1205+
return zodfetch(
1206+
CreateStreamResponseBody,
1207+
`${this.baseUrl}/realtime/v1/sessions/${encodeURIComponent(sessionIdOrExternalId)}/${io}`,
1208+
{
1209+
method: "PUT",
1210+
headers: this.#getHeaders(false),
1211+
},
1212+
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
1213+
)
1214+
.withResponse()
1215+
.then(({ data, response }) => ({
1216+
...data,
1217+
headers: Object.fromEntries(response.headers.entries()),
1218+
}));
1219+
}
1220+
1221+
async appendToSessionStream<TBody extends BodyInit>(
1222+
sessionIdOrExternalId: string,
1223+
io: "out" | "in",
1224+
part: TBody,
1225+
requestOptions?: ZodFetchOptions
1226+
) {
1227+
return zodfetch(
1228+
AppendToStreamResponseBody,
1229+
`${this.baseUrl}/realtime/v1/sessions/${encodeURIComponent(sessionIdOrExternalId)}/${io}/append`,
1230+
{
1231+
method: "POST",
1232+
headers: this.#getHeaders(false),
1233+
body: part,
1234+
},
1235+
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
1236+
);
1237+
}
1238+
1239+
/**
1240+
* Subscribe to SSE records on a Session channel. Reuses the same
1241+
* {@link SSEStreamSubscription} plumbing as `readStream` for run-scoped
1242+
* realtime streams — auto-retry, Last-Event-ID resume, abort-on-cancel.
1243+
*/
1244+
async subscribeToSessionStream<T = unknown>(
1245+
sessionIdOrExternalId: string,
1246+
io: "out" | "in",
1247+
options?: {
1248+
signal?: AbortSignal;
1249+
baseUrl?: string;
1250+
timeoutInSeconds?: number;
1251+
onComplete?: () => void;
1252+
onError?: (error: Error) => void;
1253+
lastEventId?: string;
1254+
onPart?: (part: SSEStreamPart<T>) => void;
1255+
}
1256+
): Promise<AsyncIterableStream<T>> {
1257+
const url = `${options?.baseUrl ?? this.baseUrl}/realtime/v1/sessions/${encodeURIComponent(sessionIdOrExternalId)}/${io}`;
1258+
1259+
const subscription = new SSEStreamSubscription(url, {
1260+
headers: this.getHeaders(),
1261+
signal: options?.signal,
1262+
onComplete: options?.onComplete,
1263+
onError: options?.onError,
1264+
timeoutInSeconds: options?.timeoutInSeconds,
1265+
lastEventId: options?.lastEventId,
1266+
});
1267+
1268+
const stream = await subscription.subscribe();
1269+
const onPart = options?.onPart;
1270+
1271+
return stream.pipeThrough(
1272+
new TransformStream<SSEStreamPart, T>({
1273+
transform(chunk, controller) {
1274+
const data = chunk.chunk as T;
1275+
onPart?.(chunk as SSEStreamPart<T>);
1276+
controller.enqueue(data);
1277+
},
1278+
})
1279+
);
1280+
}
1281+
10981282
async waitForDuration(
10991283
runId: string,
11001284
body: WaitForDurationRequestBody,
@@ -1836,6 +2020,47 @@ function queueNameFromQueueTypeName(queue: QueueTypeName): string {
18362020
return queue.name;
18372021
}
18382022

2023+
function createSearchQueryForListSessions(options?: ListSessionsOptions): URLSearchParams {
2024+
const searchParams = new URLSearchParams();
2025+
2026+
if (!options) return searchParams;
2027+
2028+
const appendMany = (name: string, value: string | string[] | undefined) => {
2029+
if (value === undefined) return;
2030+
searchParams.append(name, Array.isArray(value) ? value.join(",") : value);
2031+
};
2032+
2033+
appendMany("filter[type]", options.type);
2034+
appendMany("filter[tags]", options.tag);
2035+
appendMany("filter[taskIdentifier]", options.taskIdentifier);
2036+
2037+
if (options.externalId) {
2038+
searchParams.append("filter[externalId]", options.externalId);
2039+
}
2040+
2041+
appendMany("filter[status]", options.status as string | string[] | undefined);
2042+
2043+
if (options.period) {
2044+
searchParams.append("filter[createdAt][period]", options.period);
2045+
}
2046+
2047+
if (options.from !== undefined) {
2048+
searchParams.append(
2049+
"filter[createdAt][from]",
2050+
options.from instanceof Date ? options.from.getTime().toString() : options.from.toString()
2051+
);
2052+
}
2053+
2054+
if (options.to !== undefined) {
2055+
searchParams.append(
2056+
"filter[createdAt][to]",
2057+
options.to instanceof Date ? options.to.getTime().toString() : options.to.toString()
2058+
);
2059+
}
2060+
2061+
return searchParams;
2062+
}
2063+
18392064
function createSearchQueryForListWaitpointTokens(
18402065
query?: ListWaitpointTokensQueryParams
18412066
): URLSearchParams {

packages/trigger-sdk/src/v3/auth.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,17 @@ type PublicTokenPermissionProperties = {
6767
* Grant access to send data to input streams on specific runs
6868
*/
6969
inputStreams?: string | string[];
70+
71+
/**
72+
* Grant access to specific Sessions (the durable, typed I/O primitive that
73+
* outlives a single run). Use the session's friendlyId (e.g. `session_abc`).
74+
*
75+
* `read:sessions:{id}` lets the bearer read both the `.out` and `.in`
76+
* channels and list runs on the session. `write:sessions:{id}` lets the
77+
* bearer append to the session's channels. `trigger:sessions:{id}` permits
78+
* triggering new runs on the session.
79+
*/
80+
sessions?: string | string[];
7081
};
7182

7283
export type PublicTokenPermissions = {

packages/trigger-sdk/src/v3/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ export * from "./otel.js";
1717
export * from "./schemas.js";
1818
export * from "./heartbeats.js";
1919
export * from "./streams.js";
20+
export * from "./sessions.js";
2021
export * from "./query.js";
2122
export type { Context };
2223

0 commit comments

Comments
 (0)