-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
feat: Sessions - bidirectional durable agent streams #3417
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
ericallam
merged 23 commits into
main
from
feature/tri-8627-session-primitive-server-side-schema-routes-clickhouse
Apr 28, 2026
Merged
Changes from all commits
Commits
Show all changes
23 commits
Select commit
Hold shift + click to select a range
edded41
feat(webapp,clickhouse,database,core): Session primitive (server side)
ericallam f6149c9
code review fixes
ericallam a724f9c
fix(core): reject externalId starting with 'session_' on Session crea…
ericallam fceabc1
fix(webapp): allow JWT + CORS on sessions list endpoint
ericallam 2f8903a
fix(webapp): tighten sessions create + list auth
ericallam aaab958
feat(webapp,core): Session channel waitpoints — server side
ericallam 4f2c0e7
fix(webapp): CORS + allowJWT on public session create + append preflight
ericallam 84d3db1
fix(webapp): address #3417 PR review feedback
ericallam b6e642f
fix(webapp): correct backward pagination slice on session list
ericallam 4453a45
feat(webapp): session.out wait=0 + X-Session-Settled on settled tail
ericallam 71f02ce
x-peek-settled header
ericallam 427541c
feat(webapp,db,core): Sessions become run manager
ericallam a349d02
fix(webapp): address #3417 PR review feedback
ericallam c397160
fix(webapp): address #3417 PR review feedback (round 2)
ericallam 4b15c7d
chore(server-changes): consolidate sessions PR into one entry
ericallam c0e87bf
fix(webapp): use prisma writer for read-after-write of triggered run …
ericallam e5f9dd1
fix(webapp): use prisma writer for post-race re-read of session row i…
ericallam 554940d
fix(webapp): parse stringified chunk envelope in peek-settled fast path
ericallam cf67175
fix(webapp): use prisma writer for cancelLostRaceRun's just-triggered…
ericallam faf7888
fix(webapp): reject create on closed sessions with 409
ericallam 1a880fd
fix(webapp): hardcode v2 for session-streams wait race-check
ericallam 0caa55a
fix(webapp): mask 404 as 403 when findResource returns null on author…
ericallam 188fa43
fix(webapp): only mask 404 as 403 when authorization fails
ericallam File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| --- | ||
| "@trigger.dev/core": patch | ||
| --- | ||
|
|
||
| Add `SessionId` friendly ID generator and schemas for the new durable Session primitive. Exported from `@trigger.dev/core/v3/isomorphic` alongside `RunId`, `BatchId`, etc. Ships the `CreateSessionStreamWaitpoint` request/response schemas alongside the main Session CRUD. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| --- | ||
| area: webapp | ||
| type: feature | ||
| --- | ||
|
|
||
| Add the `Session` primitive — a durable, task-bound, bidirectional I/O channel that outlives a single run and acts as the run manager for `chat.agent`. Ships the Postgres `Session` + `SessionRun` tables, ClickHouse `sessions_v1` + replication service, the `sessions` JWT scope, and the public CRUD + realtime routes (`/api/v1/sessions`, `/realtime/v1/sessions/:session/:io`) including `end-and-continue` for server-orchestrated run handoffs and session-stream waitpoints. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
188 changes: 188 additions & 0 deletions
188
apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,188 @@ | ||
| import { json } from "@remix-run/server-runtime"; | ||
| import { | ||
| CreateSessionStreamWaitpointRequestBody, | ||
| type CreateSessionStreamWaitpointResponseBody, | ||
| } from "@trigger.dev/core/v3"; | ||
| import { WaitpointId } from "@trigger.dev/core/v3/isomorphic"; | ||
| import { z } from "zod"; | ||
| import { $replica } from "~/db.server"; | ||
| import { createWaitpointTag, MAX_TAGS_PER_WAITPOINT } from "~/models/waitpointTag.server"; | ||
| import { | ||
| canonicalSessionAddressingKey, | ||
| isSessionFriendlyIdForm, | ||
| resolveSessionByIdOrExternalId, | ||
| } from "~/services/realtime/sessions.server"; | ||
| import { S2RealtimeStreams } from "~/services/realtime/s2realtimeStreams.server"; | ||
| import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; | ||
| import { | ||
| addSessionStreamWaitpoint, | ||
| removeSessionStreamWaitpoint, | ||
| } from "~/services/sessionStreamWaitpointCache.server"; | ||
| import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; | ||
| import { logger } from "~/services/logger.server"; | ||
| import { parseDelay } from "~/utils/delays"; | ||
| import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; | ||
| import { engine } from "~/v3/runEngine.server"; | ||
| import { ServiceValidationError } from "~/v3/services/baseService.server"; | ||
|
|
||
| const ParamsSchema = z.object({ | ||
| runFriendlyId: z.string(), | ||
| }); | ||
|
|
||
| const { action, loader } = createActionApiRoute( | ||
| { | ||
| params: ParamsSchema, | ||
| body: CreateSessionStreamWaitpointRequestBody, | ||
| maxContentLength: 1024 * 10, // 10KB | ||
| method: "POST", | ||
| }, | ||
| async ({ authentication, body, params }) => { | ||
| try { | ||
| const run = await $replica.taskRun.findFirst({ | ||
| where: { | ||
| friendlyId: params.runFriendlyId, | ||
| runtimeEnvironmentId: authentication.environment.id, | ||
| }, | ||
| select: { | ||
| id: true, | ||
| friendlyId: true, | ||
| realtimeStreamsVersion: true, | ||
| }, | ||
| }); | ||
|
|
||
| if (!run) { | ||
| return json({ error: "Run not found" }, { status: 404 }); | ||
| } | ||
|
|
||
| // Row-optional addressing — see the .out / .in.append handlers. | ||
| // The waitpoint cache + S2 stream key derive from the row's | ||
| // canonical identity (externalId if set, else friendlyId), so | ||
| // the agent's wait registration and the append-side drain | ||
| // converge regardless of which URL form each side used. | ||
| const maybeSession = await resolveSessionByIdOrExternalId( | ||
| $replica, | ||
| authentication.environment.id, | ||
| body.session | ||
| ); | ||
|
|
||
| if (!maybeSession && isSessionFriendlyIdForm(body.session)) { | ||
| return json({ error: "Session not found" }, { status: 404 }); | ||
| } | ||
|
|
||
| const addressingKey = canonicalSessionAddressingKey(maybeSession, body.session); | ||
|
|
||
| const idempotencyKeyExpiresAt = body.idempotencyKeyTTL | ||
| ? resolveIdempotencyKeyTTL(body.idempotencyKeyTTL) | ||
| : undefined; | ||
|
|
||
| const timeout = await parseDelay(body.timeout); | ||
|
|
||
| const bodyTags = typeof body.tags === "string" ? [body.tags] : body.tags; | ||
|
|
||
| if (bodyTags && bodyTags.length > MAX_TAGS_PER_WAITPOINT) { | ||
| throw new ServiceValidationError( | ||
| `Waitpoints can only have ${MAX_TAGS_PER_WAITPOINT} tags, you're trying to set ${bodyTags.length}.` | ||
| ); | ||
| } | ||
|
|
||
| if (bodyTags && bodyTags.length > 0) { | ||
| for (const tag of bodyTags) { | ||
| await createWaitpointTag({ | ||
| tag, | ||
| environmentId: authentication.environment.id, | ||
| projectId: authentication.environment.projectId, | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| // Step 1: Create the waitpoint. | ||
| const result = await engine.createManualWaitpoint({ | ||
| environmentId: authentication.environment.id, | ||
| projectId: authentication.environment.projectId, | ||
| idempotencyKey: body.idempotencyKey, | ||
| idempotencyKeyExpiresAt, | ||
| timeout, | ||
| tags: bodyTags, | ||
| }); | ||
|
|
||
| // Step 2: Register the waitpoint on the session channel so the next | ||
| // append fires it. Keyed by (addressingKey, io) — the canonical | ||
| // string for the row. The append handler drains by the same | ||
| // canonical key, so writers and readers converge regardless of | ||
| // which URL form the agent vs. the appending caller used. | ||
| const ttlMs = timeout ? timeout.getTime() - Date.now() : undefined; | ||
| await addSessionStreamWaitpoint( | ||
| addressingKey, | ||
| body.io, | ||
| result.waitpoint.id, | ||
| ttlMs && ttlMs > 0 ? ttlMs : undefined | ||
| ); | ||
|
|
||
| // Step 3: Race-check. If a record landed on the channel before this | ||
| // .wait() call, complete the waitpoint synchronously with that data | ||
| // and remove the pending registration. | ||
| if (!result.isCached) { | ||
| try { | ||
| // Session streams are always v2 (S2) — the writer in | ||
| // `appendPartToSessionStream` and the SSE subscribe both | ||
| // hardcode "v2", so the race-check reader has to match. | ||
| // Don't fall through to the run's own `realtimeStreamsVersion`, | ||
| // which only describes the run's run-scoped streams. | ||
| const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2"); | ||
|
|
||
| if (realtimeStream instanceof S2RealtimeStreams) { | ||
| const records = await realtimeStream.readSessionStreamRecords( | ||
| addressingKey, | ||
| body.io, | ||
| body.lastSeqNum | ||
| ); | ||
|
|
||
| if (records.length > 0) { | ||
| const record = records[0]!; | ||
|
|
||
| await engine.completeWaitpoint({ | ||
| id: result.waitpoint.id, | ||
| output: { | ||
| value: record.data, | ||
| type: "application/json", | ||
| isError: false, | ||
| }, | ||
| }); | ||
|
|
||
| await removeSessionStreamWaitpoint( | ||
| addressingKey, | ||
| body.io, | ||
| result.waitpoint.id | ||
| ); | ||
| } | ||
| } | ||
| } catch (error) { | ||
| // Non-fatal: pending registration stays in Redis; the next append | ||
| // will complete the waitpoint via the append handler path. Log so | ||
| // a broken race-check doesn't silently degrade to timeout-only. | ||
| logger.warn("session-stream wait race-check failed", { | ||
| addressingKey, | ||
| io: body.io, | ||
| waitpointId: WaitpointId.toFriendlyId(result.waitpoint.id), | ||
| error, | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| return json<CreateSessionStreamWaitpointResponseBody>({ | ||
| waitpointId: WaitpointId.toFriendlyId(result.waitpoint.id), | ||
| isCached: result.isCached, | ||
| }); | ||
| } catch (error) { | ||
| if (error instanceof ServiceValidationError) { | ||
| return json({ error: error.message }, { status: 422 }); | ||
| } | ||
| // Don't forward raw internal error messages (could leak Prisma/engine | ||
| // details). Log server-side and return a generic 500. | ||
| logger.error("Failed to create session-stream waitpoint", { error }); | ||
| return json({ error: "Something went wrong" }, { status: 500 }); | ||
| } | ||
| } | ||
| ); | ||
|
|
||
| export { action, loader }; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,79 @@ | ||
| import { json } from "@remix-run/server-runtime"; | ||
| import { | ||
| CloseSessionRequestBody, | ||
| type RetrieveSessionResponseBody, | ||
| } from "@trigger.dev/core/v3"; | ||
| import { z } from "zod"; | ||
| import { $replica, prisma } from "~/db.server"; | ||
| import { | ||
| resolveSessionByIdOrExternalId, | ||
| serializeSessionWithFriendlyRunId, | ||
| } from "~/services/realtime/sessions.server"; | ||
| import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; | ||
|
|
||
| const ParamsSchema = z.object({ | ||
| session: z.string(), | ||
| }); | ||
|
|
||
| const { action, loader } = createActionApiRoute( | ||
| { | ||
| params: ParamsSchema, | ||
| body: CloseSessionRequestBody, | ||
| maxContentLength: 1024, | ||
| method: "POST", | ||
| allowJWT: true, | ||
| corsStrategy: "all", | ||
| authorization: { | ||
| action: "admin", | ||
| resource: (params) => ({ sessions: params.session }), | ||
| superScopes: ["admin:sessions", "admin:all", "admin"], | ||
| }, | ||
| }, | ||
| async ({ authentication, params, body }) => { | ||
| const existing = await resolveSessionByIdOrExternalId( | ||
| $replica, | ||
| authentication.environment.id, | ||
| params.session | ||
| ); | ||
|
|
||
| if (!existing) { | ||
| return json({ error: "Session not found" }, { status: 404 }); | ||
| } | ||
|
|
||
| // Idempotent: if already closed, return the current row without clobbering | ||
| // the original closedAt / closedReason. | ||
| if (existing.closedAt) { | ||
| return json<RetrieveSessionResponseBody>( | ||
| await serializeSessionWithFriendlyRunId(existing) | ||
| ); | ||
| } | ||
|
|
||
| // `closedAt: null` on the where clause makes the update conditional at | ||
| // the DB level. Two concurrent closes race through the earlier read, | ||
| // but only one can win this update — the loser hits `count === 0` and | ||
| // falls back to reading the winning row. Closedness is write-once. | ||
| const { count } = await prisma.session.updateMany({ | ||
| where: { id: existing.id, closedAt: null }, | ||
| data: { | ||
| closedAt: new Date(), | ||
| closedReason: body.reason ?? null, | ||
| }, | ||
| }); | ||
|
|
||
| if (count === 0) { | ||
| const final = await prisma.session.findFirst({ where: { id: existing.id } }); | ||
| if (!final) return json({ error: "Session not found" }, { status: 404 }); | ||
| return json<RetrieveSessionResponseBody>( | ||
| await serializeSessionWithFriendlyRunId(final) | ||
| ); | ||
| } | ||
|
|
||
| const updated = await prisma.session.findFirst({ where: { id: existing.id } }); | ||
| if (!updated) return json({ error: "Session not found" }, { status: 404 }); | ||
| return json<RetrieveSessionResponseBody>( | ||
| await serializeSessionWithFriendlyRunId(updated) | ||
| ); | ||
| } | ||
| ); | ||
|
|
||
| export { action, loader }; | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.