Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
edded41
feat(webapp,clickhouse,database,core): Session primitive (server side)
ericallam Apr 20, 2026
f6149c9
code review fixes
ericallam Apr 20, 2026
a724f9c
fix(core): reject externalId starting with 'session_' on Session crea…
ericallam Apr 20, 2026
fceabc1
fix(webapp): allow JWT + CORS on sessions list endpoint
ericallam Apr 20, 2026
2f8903a
fix(webapp): tighten sessions create + list auth
ericallam Apr 20, 2026
aaab958
feat(webapp,core): Session channel waitpoints — server side
ericallam Apr 23, 2026
4f2c0e7
fix(webapp): CORS + allowJWT on public session create + append preflight
ericallam Apr 23, 2026
84d3db1
fix(webapp): address #3417 PR review feedback
ericallam Apr 23, 2026
b6e642f
fix(webapp): correct backward pagination slice on session list
ericallam Apr 23, 2026
4453a45
feat(webapp): session.out wait=0 + X-Session-Settled on settled tail
ericallam Apr 24, 2026
71f02ce
x-peek-settled header
ericallam Apr 25, 2026
427541c
feat(webapp,db,core): Sessions become run manager
ericallam Apr 27, 2026
a349d02
fix(webapp): address #3417 PR review feedback
ericallam Apr 27, 2026
c397160
fix(webapp): address #3417 PR review feedback (round 2)
ericallam Apr 27, 2026
4b15c7d
chore(server-changes): consolidate sessions PR into one entry
ericallam Apr 27, 2026
c0e87bf
fix(webapp): use prisma writer for read-after-write of triggered run …
ericallam Apr 28, 2026
e5f9dd1
fix(webapp): use prisma writer for post-race re-read of session row i…
ericallam Apr 28, 2026
554940d
fix(webapp): parse stringified chunk envelope in peek-settled fast path
ericallam Apr 28, 2026
cf67175
fix(webapp): use prisma writer for cancelLostRaceRun's just-triggered…
ericallam Apr 28, 2026
faf7888
fix(webapp): reject create on closed sessions with 409
ericallam Apr 28, 2026
1a880fd
fix(webapp): hardcode v2 for session-streams wait race-check
ericallam Apr 28, 2026
0caa55a
fix(webapp): mask 404 as 403 when findResource returns null on author…
ericallam Apr 28, 2026
188fa43
fix(webapp): only mask 404 as 403 when authorization fails
ericallam Apr 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/session-primitive.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Add `SessionId` friendly ID generator and schemas for the new durable Session primitive. Exported from `@trigger.dev/core/v3/isomorphic` alongside `RunId`, `BatchId`, etc. Ships the `CreateSessionStreamWaitpoint` request/response schemas alongside the main Session CRUD.
6 changes: 6 additions & 0 deletions .server-changes/session-primitive.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Add the `Session` primitive — a durable, task-bound, bidirectional I/O channel that outlives a single run and acts as the run manager for `chat.agent`. Ships the Postgres `Session` + `SessionRun` tables, ClickHouse `sessions_v1` + replication service, the `sessions` JWT scope, and the public CRUD + realtime routes (`/api/v1/sessions`, `/realtime/v1/sessions/:session/:io`) including `end-and-continue` for server-orchestrated run handoffs and session-stream waitpoints.
38 changes: 38 additions & 0 deletions apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,44 @@ import {
registerRunEngineEventBusHandlers,
setupBatchQueueCallbacks,
} from "./v3/runEngineHandlers.server";
import { sessionsReplicationInstance } from "./services/sessionsReplicationInstance.server";
import { signalsEmitter } from "./services/signals.server";

// Start the sessions replication service (subscribes to the logical replication
// slot, runs leader election, flushes to ClickHouse). Done at entry level so it
// runs deterministically on webapp boot rather than lazily via a singleton
// reference elsewhere in the module graph.
if (sessionsReplicationInstance && env.SESSION_REPLICATION_ENABLED === "1") {
// Capture a non-nullable reference so the shutdown closure below
// doesn't need to re-null-check (TS narrowing doesn't follow through
// an inner function scope).
const replicator = sessionsReplicationInstance;
replicator
.start()
.then(() => {
console.log("🗃️ Sessions replication service started");
})
.catch((error) => {
console.error("🗃️ Sessions replication service failed to start", {
error,
});
});

// Wrap the async shutdown in a sync handler that catches rejections —
// SIGTERM/SIGINT fire during process teardown, and an unhandled
// promise rejection from `_replicationClient.stop()` there would
// bubble up past the process exit. Matches the pattern in
// dynamicFlushScheduler.server.ts.
const shutdownSessionsReplication = () => {
replicator.shutdown().catch((error) => {
console.error("🗃️ Sessions replication service shutdown error", {
error,
});
});
};
signalsEmitter.on("SIGTERM", shutdownSessionsReplication);
signalsEmitter.on("SIGINT", shutdownSessionsReplication);
}

const ABORT_DELAY = 30000;

Expand Down
32 changes: 32 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,38 @@ const EnvironmentSchema = z
RUN_REPLICATION_DISABLE_PAYLOAD_INSERT: z.string().default("0"),
RUN_REPLICATION_DISABLE_ERROR_FINGERPRINTING: z.string().default("0"),

// Session replication (Postgres → ClickHouse sessions_v1). Shares Redis
// with the runs replicator for leader locking but has its own slot and
// publication so the two consume independently.
SESSION_REPLICATION_CLICKHOUSE_URL: z.string().optional(),
SESSION_REPLICATION_ENABLED: z.string().default("0"),
SESSION_REPLICATION_SLOT_NAME: z.string().default("sessions_to_clickhouse_v1"),
SESSION_REPLICATION_PUBLICATION_NAME: z
.string()
.default("sessions_to_clickhouse_v1_publication"),
SESSION_REPLICATION_MAX_FLUSH_CONCURRENCY: z.coerce.number().int().default(1),
SESSION_REPLICATION_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
SESSION_REPLICATION_FLUSH_BATCH_SIZE: z.coerce.number().int().default(100),
SESSION_REPLICATION_LEADER_LOCK_TIMEOUT_MS: z.coerce.number().int().default(30_000),
SESSION_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS: z.coerce.number().int().default(10_000),
SESSION_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS: z.coerce.number().int().default(10_000),
SESSION_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500),
SESSION_REPLICATION_ACK_INTERVAL_SECONDS: z.coerce.number().int().default(10),
SESSION_REPLICATION_LOG_LEVEL: z
.enum(["log", "error", "warn", "info", "debug"])
.default("info"),
SESSION_REPLICATION_CLICKHOUSE_LOG_LEVEL: z
.enum(["log", "error", "warn", "info", "debug"])
.default("info"),
SESSION_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"),
SESSION_REPLICATION_KEEP_ALIVE_ENABLED: z.string().default("0"),
SESSION_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
SESSION_REPLICATION_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
SESSION_REPLICATION_INSERT_STRATEGY: z.enum(["insert", "insert_async"]).default("insert"),
SESSION_REPLICATION_INSERT_MAX_RETRIES: z.coerce.number().int().default(3),
SESSION_REPLICATION_INSERT_BASE_DELAY_MS: z.coerce.number().int().default(100),
SESSION_REPLICATION_INSERT_MAX_DELAY_MS: z.coerce.number().int().default(2000),

// Clickhouse
CLICKHOUSE_URL: z.string(),
CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
import { json } from "@remix-run/server-runtime";
import {
CreateSessionStreamWaitpointRequestBody,
type CreateSessionStreamWaitpointResponseBody,
} from "@trigger.dev/core/v3";
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
import { z } from "zod";
import { $replica } from "~/db.server";
import { createWaitpointTag, MAX_TAGS_PER_WAITPOINT } from "~/models/waitpointTag.server";
import {
canonicalSessionAddressingKey,
isSessionFriendlyIdForm,
resolveSessionByIdOrExternalId,
} from "~/services/realtime/sessions.server";
import { S2RealtimeStreams } from "~/services/realtime/s2realtimeStreams.server";
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
import {
addSessionStreamWaitpoint,
removeSessionStreamWaitpoint,
} from "~/services/sessionStreamWaitpointCache.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { logger } from "~/services/logger.server";
import { parseDelay } from "~/utils/delays";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import { engine } from "~/v3/runEngine.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";

const ParamsSchema = z.object({
runFriendlyId: z.string(),
});

const { action, loader } = createActionApiRoute(
{
params: ParamsSchema,
body: CreateSessionStreamWaitpointRequestBody,
maxContentLength: 1024 * 10, // 10KB
method: "POST",
},
async ({ authentication, body, params }) => {
try {
const run = await $replica.taskRun.findFirst({
where: {
friendlyId: params.runFriendlyId,
runtimeEnvironmentId: authentication.environment.id,
},
select: {
id: true,
friendlyId: true,
realtimeStreamsVersion: true,
},
});

if (!run) {
return json({ error: "Run not found" }, { status: 404 });
}

// Row-optional addressing — see the .out / .in.append handlers.
// The waitpoint cache + S2 stream key derive from the row's
// canonical identity (externalId if set, else friendlyId), so
// the agent's wait registration and the append-side drain
// converge regardless of which URL form each side used.
const maybeSession = await resolveSessionByIdOrExternalId(
$replica,
authentication.environment.id,
body.session
);

if (!maybeSession && isSessionFriendlyIdForm(body.session)) {
return json({ error: "Session not found" }, { status: 404 });
}

const addressingKey = canonicalSessionAddressingKey(maybeSession, body.session);

const idempotencyKeyExpiresAt = body.idempotencyKeyTTL
? resolveIdempotencyKeyTTL(body.idempotencyKeyTTL)
: undefined;

const timeout = await parseDelay(body.timeout);

const bodyTags = typeof body.tags === "string" ? [body.tags] : body.tags;

if (bodyTags && bodyTags.length > MAX_TAGS_PER_WAITPOINT) {
throw new ServiceValidationError(
`Waitpoints can only have ${MAX_TAGS_PER_WAITPOINT} tags, you're trying to set ${bodyTags.length}.`
);
}

if (bodyTags && bodyTags.length > 0) {
for (const tag of bodyTags) {
await createWaitpointTag({
tag,
environmentId: authentication.environment.id,
projectId: authentication.environment.projectId,
});
}
}

// Step 1: Create the waitpoint.
const result = await engine.createManualWaitpoint({
environmentId: authentication.environment.id,
projectId: authentication.environment.projectId,
idempotencyKey: body.idempotencyKey,
idempotencyKeyExpiresAt,
timeout,
tags: bodyTags,
});

// Step 2: Register the waitpoint on the session channel so the next
// append fires it. Keyed by (addressingKey, io) — the canonical
// string for the row. The append handler drains by the same
// canonical key, so writers and readers converge regardless of
// which URL form the agent vs. the appending caller used.
const ttlMs = timeout ? timeout.getTime() - Date.now() : undefined;
await addSessionStreamWaitpoint(
addressingKey,
body.io,
result.waitpoint.id,
ttlMs && ttlMs > 0 ? ttlMs : undefined
);

// Step 3: Race-check. If a record landed on the channel before this
// .wait() call, complete the waitpoint synchronously with that data
// and remove the pending registration.
if (!result.isCached) {
try {
// Session streams are always v2 (S2) — the writer in
// `appendPartToSessionStream` and the SSE subscribe both
// hardcode "v2", so the race-check reader has to match.
// Don't fall through to the run's own `realtimeStreamsVersion`,
// which only describes the run's run-scoped streams.
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2");

if (realtimeStream instanceof S2RealtimeStreams) {
const records = await realtimeStream.readSessionStreamRecords(
addressingKey,
body.io,
body.lastSeqNum
);

if (records.length > 0) {
const record = records[0]!;

await engine.completeWaitpoint({
id: result.waitpoint.id,
output: {
value: record.data,
type: "application/json",
isError: false,
},
});

await removeSessionStreamWaitpoint(
addressingKey,
body.io,
result.waitpoint.id
);
}
}
} catch (error) {
// Non-fatal: pending registration stays in Redis; the next append
// will complete the waitpoint via the append handler path. Log so
// a broken race-check doesn't silently degrade to timeout-only.
logger.warn("session-stream wait race-check failed", {
addressingKey,
io: body.io,
waitpointId: WaitpointId.toFriendlyId(result.waitpoint.id),
error,
});
}
}

return json<CreateSessionStreamWaitpointResponseBody>({
waitpointId: WaitpointId.toFriendlyId(result.waitpoint.id),
isCached: result.isCached,
});
} catch (error) {
if (error instanceof ServiceValidationError) {
return json({ error: error.message }, { status: 422 });
}
// Don't forward raw internal error messages (could leak Prisma/engine
// details). Log server-side and return a generic 500.
logger.error("Failed to create session-stream waitpoint", { error });
return json({ error: "Something went wrong" }, { status: 500 });
}
}
);

export { action, loader };
79 changes: 79 additions & 0 deletions apps/webapp/app/routes/api.v1.sessions.$session.close.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { json } from "@remix-run/server-runtime";
import {
CloseSessionRequestBody,
type RetrieveSessionResponseBody,
} from "@trigger.dev/core/v3";
import { z } from "zod";
import { $replica, prisma } from "~/db.server";
import {
resolveSessionByIdOrExternalId,
serializeSessionWithFriendlyRunId,
} from "~/services/realtime/sessions.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";

const ParamsSchema = z.object({
session: z.string(),
});

const { action, loader } = createActionApiRoute(
{
params: ParamsSchema,
body: CloseSessionRequestBody,
maxContentLength: 1024,
method: "POST",
allowJWT: true,
corsStrategy: "all",
authorization: {
action: "admin",
resource: (params) => ({ sessions: params.session }),
superScopes: ["admin:sessions", "admin:all", "admin"],
},
},
async ({ authentication, params, body }) => {
const existing = await resolveSessionByIdOrExternalId(
$replica,
authentication.environment.id,
params.session
);

if (!existing) {
return json({ error: "Session not found" }, { status: 404 });
}

// Idempotent: if already closed, return the current row without clobbering
// the original closedAt / closedReason.
if (existing.closedAt) {
return json<RetrieveSessionResponseBody>(
await serializeSessionWithFriendlyRunId(existing)
);
}

// `closedAt: null` on the where clause makes the update conditional at
// the DB level. Two concurrent closes race through the earlier read,
// but only one can win this update — the loser hits `count === 0` and
// falls back to reading the winning row. Closedness is write-once.
const { count } = await prisma.session.updateMany({
where: { id: existing.id, closedAt: null },
data: {
closedAt: new Date(),
closedReason: body.reason ?? null,
},
});
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if (count === 0) {
const final = await prisma.session.findFirst({ where: { id: existing.id } });
if (!final) return json({ error: "Session not found" }, { status: 404 });
return json<RetrieveSessionResponseBody>(
await serializeSessionWithFriendlyRunId(final)
);
}

const updated = await prisma.session.findFirst({ where: { id: existing.id } });
if (!updated) return json({ error: "Session not found" }, { status: 404 });
return json<RetrieveSessionResponseBody>(
await serializeSessionWithFriendlyRunId(updated)
);
}
);

export { action, loader };
Loading
Loading