Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
edded41
feat(webapp,clickhouse,database,core): Session primitive (server side)
ericallam Apr 20, 2026
f6149c9
code review fixes
ericallam Apr 20, 2026
a724f9c
fix(core): reject externalId starting with 'session_' on Session crea…
ericallam Apr 20, 2026
fceabc1
fix(webapp): allow JWT + CORS on sessions list endpoint
ericallam Apr 20, 2026
2f8903a
fix(webapp): tighten sessions create + list auth
ericallam Apr 20, 2026
aaab958
feat(webapp,core): Session channel waitpoints — server side
ericallam Apr 23, 2026
4f2c0e7
fix(webapp): CORS + allowJWT on public session create + append preflight
ericallam Apr 23, 2026
84d3db1
fix(webapp): address #3417 PR review feedback
ericallam Apr 23, 2026
b6e642f
fix(webapp): correct backward pagination slice on session list
ericallam Apr 23, 2026
4453a45
feat(webapp): session.out wait=0 + X-Session-Settled on settled tail
ericallam Apr 24, 2026
71f02ce
x-peek-settled header
ericallam Apr 25, 2026
427541c
feat(webapp,db,core): Sessions become run manager
ericallam Apr 27, 2026
a349d02
fix(webapp): address #3417 PR review feedback
ericallam Apr 27, 2026
c397160
fix(webapp): address #3417 PR review feedback (round 2)
ericallam Apr 27, 2026
4b15c7d
chore(server-changes): consolidate sessions PR into one entry
ericallam Apr 27, 2026
c0e87bf
fix(webapp): use prisma writer for read-after-write of triggered run …
ericallam Apr 28, 2026
e5f9dd1
fix(webapp): use prisma writer for post-race re-read of session row i…
ericallam Apr 28, 2026
554940d
fix(webapp): parse stringified chunk envelope in peek-settled fast path
ericallam Apr 28, 2026
cf67175
fix(webapp): use prisma writer for cancelLostRaceRun's just-triggered…
ericallam Apr 28, 2026
faf7888
fix(webapp): reject create on closed sessions with 409
ericallam Apr 28, 2026
1a880fd
fix(webapp): hardcode v2 for session-streams wait race-check
ericallam Apr 28, 2026
0caa55a
fix(webapp): mask 404 as 403 when findResource returns null on author…
ericallam Apr 28, 2026
188fa43
fix(webapp): only mask 404 as 403 when authorization fails
ericallam Apr 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/session-primitive.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Add `SessionId` friendly ID generator and schemas for the new durable Session primitive. Exported from `@trigger.dev/core/v3/isomorphic` alongside `RunId`, `BatchId`, etc. Ships the `CreateSessionStreamWaitpoint` request/response schemas alongside the main Session CRUD.
8 changes: 8 additions & 0 deletions .server-changes/session-primitive.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
area: webapp
type: feature
---

Add `Session` primitive — a durable, typed, bidirectional I/O primitive that outlives a single run, intended for agent/chat use cases. Ships the Postgres schema (`Session` table), control-plane CRUD routes (`POST/GET/PATCH /api/v1/sessions`, `POST /api/v1/sessions/:session/close` — polymorphic on friendlyId or externalId), `sessions` JWT scope, ClickHouse `sessions_v1` table, and `SessionsReplicationService` (logical replication from Postgres `Session` → ClickHouse `sessions_v1`). Run-scoped realtime streams (`streams.pipe`/`streams.input`) are unchanged and do **not** create Session rows.

Adds `POST /api/v1/runs/:runFriendlyId/session-streams/wait` (session-stream waitpoint creation) and wires `POST /realtime/v1/sessions/:session/:io/append` to fire any pending waitpoints on the channel. Gives `session.in` run-engine waitpoint semantics matching run-scoped input streams: a task can suspend while idle on a session channel and resume when an external client sends a record. Redis-backed pending-waitpoint set (`ssw:{sessionFriendlyId}:{io}`) is drained atomically on each append so multiple concurrent waiters (e.g. multi-tab chat) all resume together.
11 changes: 11 additions & 0 deletions .server-changes/sessions-public-api-cors.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
area: webapp
type: fix
---

CORS + preflight parity on the public session API so browser-side chat transports can hit the session endpoints without being blocked:

- `POST /api/v1/sessions` (session upsert) gains `allowJWT: true` + `corsStrategy: "all"` so PATs minted by `chat.createTriggerAction` (and other browser-side session flows) pass the route's auth + respond to CORS preflight. Previously this route only accepted secret-key auth, which broke any browser-originated `sessions.create(...)` call — including the transport's direct `accessToken` fallback path.
- `POST /realtime/v1/sessions/:session/:io/append` now exports both `{ action, loader }`. The route builder installs the OPTIONS preflight handler on the `loader` even for write-only routes; without the loader export, the CORS preflight was returning 400 ("No loader for route") and Chrome treated the follow-up `POST` as `net::ERR_FAILED`.

Validated by an end-to-end UI smoke against the `references/ai-chat` app: brand-new chat → send → streamed assistant reply in ~4s → follow-up turn on the same session → `lastEventId` advances from 10 → 21.
28 changes: 28 additions & 0 deletions apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,34 @@ import {
registerRunEngineEventBusHandlers,
setupBatchQueueCallbacks,
} from "./v3/runEngineHandlers.server";
import { sessionsReplicationInstance } from "./services/sessionsReplicationInstance.server";
import { signalsEmitter } from "./services/signals.server";

// Start the sessions replication service (subscribes to the logical replication
// slot, runs leader election, flushes to ClickHouse). Done at entry level so it
// runs deterministically on webapp boot rather than lazily via a singleton
// reference elsewhere in the module graph.
if (sessionsReplicationInstance && env.SESSION_REPLICATION_ENABLED === "1") {
sessionsReplicationInstance
.start()
.then(() => {
console.log("🗃️ Sessions replication service started");
})
.catch((error) => {
console.error("🗃️ Sessions replication service failed to start", {
error,
});
});

signalsEmitter.on(
"SIGTERM",
sessionsReplicationInstance.shutdown.bind(sessionsReplicationInstance)
);
signalsEmitter.on(
"SIGINT",
sessionsReplicationInstance.shutdown.bind(sessionsReplicationInstance)
);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}

const ABORT_DELAY = 30000;

Expand Down
32 changes: 32 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,38 @@ const EnvironmentSchema = z
RUN_REPLICATION_DISABLE_PAYLOAD_INSERT: z.string().default("0"),
RUN_REPLICATION_DISABLE_ERROR_FINGERPRINTING: z.string().default("0"),

// Session replication (Postgres → ClickHouse sessions_v1). Shares Redis
// with the runs replicator for leader locking but has its own slot and
// publication so the two consume independently.
SESSION_REPLICATION_CLICKHOUSE_URL: z.string().optional(),
SESSION_REPLICATION_ENABLED: z.string().default("0"),
SESSION_REPLICATION_SLOT_NAME: z.string().default("sessions_to_clickhouse_v1"),
SESSION_REPLICATION_PUBLICATION_NAME: z
.string()
.default("sessions_to_clickhouse_v1_publication"),
SESSION_REPLICATION_MAX_FLUSH_CONCURRENCY: z.coerce.number().int().default(1),
SESSION_REPLICATION_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
SESSION_REPLICATION_FLUSH_BATCH_SIZE: z.coerce.number().int().default(100),
SESSION_REPLICATION_LEADER_LOCK_TIMEOUT_MS: z.coerce.number().int().default(30_000),
SESSION_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS: z.coerce.number().int().default(10_000),
SESSION_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS: z.coerce.number().int().default(10_000),
SESSION_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500),
SESSION_REPLICATION_ACK_INTERVAL_SECONDS: z.coerce.number().int().default(10),
SESSION_REPLICATION_LOG_LEVEL: z
.enum(["log", "error", "warn", "info", "debug"])
.default("info"),
SESSION_REPLICATION_CLICKHOUSE_LOG_LEVEL: z
.enum(["log", "error", "warn", "info", "debug"])
.default("info"),
SESSION_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"),
SESSION_REPLICATION_KEEP_ALIVE_ENABLED: z.string().default("0"),
SESSION_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
SESSION_REPLICATION_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
SESSION_REPLICATION_INSERT_STRATEGY: z.enum(["insert", "insert_async"]).default("insert"),
SESSION_REPLICATION_INSERT_MAX_RETRIES: z.coerce.number().int().default(3),
SESSION_REPLICATION_INSERT_BASE_DELAY_MS: z.coerce.number().int().default(100),
SESSION_REPLICATION_INSERT_MAX_DELAY_MS: z.coerce.number().int().default(2000),

// Clickhouse
CLICKHOUSE_URL: z.string(),
CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import { json } from "@remix-run/server-runtime";
import {
CreateSessionStreamWaitpointRequestBody,
type CreateSessionStreamWaitpointResponseBody,
} from "@trigger.dev/core/v3";
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
import { z } from "zod";
import { $replica } from "~/db.server";
import { createWaitpointTag, MAX_TAGS_PER_WAITPOINT } from "~/models/waitpointTag.server";
import { resolveSessionByIdOrExternalId } from "~/services/realtime/sessions.server";
import { S2RealtimeStreams } from "~/services/realtime/s2realtimeStreams.server";
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
import {
addSessionStreamWaitpoint,
removeSessionStreamWaitpoint,
} from "~/services/sessionStreamWaitpointCache.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { parseDelay } from "~/utils/delays";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import { engine } from "~/v3/runEngine.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";

const ParamsSchema = z.object({
runFriendlyId: z.string(),
});

const { action, loader } = createActionApiRoute(
{
params: ParamsSchema,
body: CreateSessionStreamWaitpointRequestBody,
maxContentLength: 1024 * 10, // 10KB
method: "POST",
},
async ({ authentication, body, params }) => {
try {
const run = await $replica.taskRun.findFirst({
where: {
friendlyId: params.runFriendlyId,
runtimeEnvironmentId: authentication.environment.id,
},
select: {
id: true,
friendlyId: true,
realtimeStreamsVersion: true,
},
});

if (!run) {
return json({ error: "Run not found" }, { status: 404 });
}

const session = await resolveSessionByIdOrExternalId(
$replica,
authentication.environment.id,
body.session
);

if (!session) {
return json({ error: "Session not found" }, { status: 404 });
}

const idempotencyKeyExpiresAt = body.idempotencyKeyTTL
? resolveIdempotencyKeyTTL(body.idempotencyKeyTTL)
: undefined;

const timeout = await parseDelay(body.timeout);

const bodyTags = typeof body.tags === "string" ? [body.tags] : body.tags;

if (bodyTags && bodyTags.length > MAX_TAGS_PER_WAITPOINT) {
throw new ServiceValidationError(
`Waitpoints can only have ${MAX_TAGS_PER_WAITPOINT} tags, you're trying to set ${bodyTags.length}.`
);
}

if (bodyTags && bodyTags.length > 0) {
for (const tag of bodyTags) {
await createWaitpointTag({
tag,
environmentId: authentication.environment.id,
projectId: authentication.environment.projectId,
});
}
}

// Step 1: Create the waitpoint.
const result = await engine.createManualWaitpoint({
environmentId: authentication.environment.id,
projectId: authentication.environment.projectId,
idempotencyKey: body.idempotencyKey,
idempotencyKeyExpiresAt,
timeout,
tags: bodyTags,
});

// Step 2: Register the waitpoint on the session channel so the next
// append fires it. Keyed by (sessionFriendlyId, io) — both runs on a
// multi-tab session wake on the same record.
const ttlMs = timeout ? timeout.getTime() - Date.now() : undefined;
await addSessionStreamWaitpoint(
session.friendlyId,
body.io,
result.waitpoint.id,
ttlMs && ttlMs > 0 ? ttlMs : undefined
);

// Step 3: Race-check. If a record landed on the channel before this
// .wait() call, complete the waitpoint synchronously with that data
// and remove the pending registration.
if (!result.isCached) {
try {
const realtimeStream = getRealtimeStreamInstance(
authentication.environment,
run.realtimeStreamsVersion
);
Comment thread
ericallam marked this conversation as resolved.
Outdated

if (realtimeStream instanceof S2RealtimeStreams) {
const records = await realtimeStream.readSessionStreamRecords(
session.friendlyId,
body.io,
body.lastSeqNum
);

if (records.length > 0) {
const record = records[0]!;

await engine.completeWaitpoint({
id: result.waitpoint.id,
output: {
value: record.data,
type: "application/json",
isError: false,
},
});

await removeSessionStreamWaitpoint(
session.friendlyId,
body.io,
result.waitpoint.id
);
}
}
} catch {
// Non-fatal: pending registration stays in Redis; the next append
// will complete the waitpoint via the append handler path.
}
}

return json<CreateSessionStreamWaitpointResponseBody>({
waitpointId: WaitpointId.toFriendlyId(result.waitpoint.id),
isCached: result.isCached,
});
} catch (error) {
if (error instanceof ServiceValidationError) {
return json({ error: error.message }, { status: 422 });
} else if (error instanceof Error) {
return json({ error: error.message }, { status: 500 });
}

return json({ error: "Something went wrong" }, { status: 500 });
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}
);

export { action, loader };
61 changes: 61 additions & 0 deletions apps/webapp/app/routes/api.v1.sessions.$session.close.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { json } from "@remix-run/server-runtime";
import {
CloseSessionRequestBody,
type RetrieveSessionResponseBody,
} from "@trigger.dev/core/v3";
import { z } from "zod";
import { prisma } from "~/db.server";
import {
resolveSessionByIdOrExternalId,
serializeSession,
} from "~/services/realtime/sessions.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";

const ParamsSchema = z.object({
session: z.string(),
});

const { action } = createActionApiRoute(
{
params: ParamsSchema,
body: CloseSessionRequestBody,
maxContentLength: 1024,
method: "POST",
allowJWT: true,
corsStrategy: "all",
authorization: {
action: "admin",
resource: (params) => ({ sessions: params.session }),
superScopes: ["admin:sessions", "admin:all", "admin"],
},
},
async ({ authentication, params, body }) => {
const existing = await resolveSessionByIdOrExternalId(
prisma,
authentication.environment.id,
params.session
);

if (!existing) {
return json({ error: "Session not found" }, { status: 404 });
}

// Idempotent: if already closed, return the current row without clobbering
// the original closedAt / closedReason.
if (existing.closedAt) {
return json<RetrieveSessionResponseBody>(serializeSession(existing));
}

const updated = await prisma.session.update({
where: { id: existing.id },
data: {
closedAt: new Date(),
closedReason: body.reason ?? null,
},
});
Comment thread
coderabbitai[bot] marked this conversation as resolved.

return json<RetrieveSessionResponseBody>(serializeSession(updated));
}
);

export { action };
Comment thread
ericallam marked this conversation as resolved.
Outdated
Loading
Loading