From 3041ff140e1ec96b9361bb09d8b1a4a35ee3b6af Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 21 May 2026 13:56:29 +0200 Subject: [PATCH 01/10] [world-vercel] Add v4 event API client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors the v4 server-side handlers landing in workflow-server. The v4 wire format moves event metadata into x-wf-* request/response headers and treats payloads as opaque user-data bytes (streamed end-to-end). The SDK passes Uint8Array bytes through unchanged at this layer; higher-level world-vercel adapter glue handles CBOR. Adds: - packages/world-vercel/src/frames.ts: encoder + async-iterable decoder for the length-prefixed binary frame format used by the v4 list-events response. - packages/world-vercel/src/events-v4.ts: three new functions: * createWorkflowRunEventV4 — POST with x-wf-* headers + payload bytes, returns event/run ids and timestamp from response headers. * getEventV4 — GET single event, returns metadata + body bytes. * getWorkflowRunEventsV4 — GET list, parses frame stream, returns events + pagination cursor. - V4_HEADERS exported as the canonical name map; mirrors the server-side constant. V4 client characteristics: - Required runId in URL for run_created too (no /runs/null/events shortcut; the runId is part of the S3 key the server allocates). Higher-level callers generate the ULID client-side. - Payload bytes flow through without CBOR encode/decode on this layer. Callers CBOR-encode for parity with v3 if they want. - Pagination cursor surfaces in the LIST response — eliminates the per-large-payload /refs round-trip used by v2/v3. Tests (10 new in src/frames.test.ts, no new e2e): - Canonical wire layout round-trip. - Multi-frame round-trip with pagination cursor. - Decoder survives 1-byte chunk delivery (matching spike B's chunk- boundary robustness requirement). - 64 KB body split across many small chunks. - Bodies containing 0xff padding don't mis-frame. - Back-to-back frames in a single chunk. - Truncated stream raises. - Meta CBOR types (numbers, booleans, arrays) preserved. The world-vercel adapter still defaults to the v3 path; v4 is exposed for direct callers and a follow-up PR will switch the adapter over once the matching server-side PR is on staging. Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/v4-events-client.md | 5 + packages/world-vercel/src/events-v4.ts | 394 +++++++++++++++++++++++ packages/world-vercel/src/frames.test.ts | 181 +++++++++++ packages/world-vercel/src/frames.ts | 110 +++++++ 4 files changed, 690 insertions(+) create mode 100644 .changeset/v4-events-client.md create mode 100644 packages/world-vercel/src/events-v4.ts create mode 100644 packages/world-vercel/src/frames.test.ts create mode 100644 packages/world-vercel/src/frames.ts diff --git a/.changeset/v4-events-client.md b/.changeset/v4-events-client.md new file mode 100644 index 0000000000..d482741400 --- /dev/null +++ b/.changeset/v4-events-client.md @@ -0,0 +1,5 @@ +--- +"@workflow/world-vercel": minor +--- + +Add v4 event API client: `createWorkflowRunEventV4`, `getEventV4`, `getWorkflowRunEventsV4`. Sends event metadata via `x-wf-*` headers and treats payloads as opaque bytes (streamed end-to-end), eliminating server-side CBOR parsing and the `/refs` round-trip on list responses. The world-vercel adapter still uses the v3 path by default; v4 is exposed for direct callers and a follow-up will switch the adapter over. diff --git a/packages/world-vercel/src/events-v4.ts b/packages/world-vercel/src/events-v4.ts new file mode 100644 index 0000000000..b748343757 --- /dev/null +++ b/packages/world-vercel/src/events-v4.ts @@ -0,0 +1,394 @@ +/** + * v4 event endpoints — header/body-split wire protocol. + * + * Mirrors the server-side handlers in + * workflow-server/lib/handlers/v4/. The v4 wire format: + * + * - POST: structured event metadata rides in `x-wf-*` request headers; + * the request body is opaque user-payload bytes streamed straight + * to S3 by the server. No CBOR encoding/decoding on the body — the + * SDK passes Uint8Array bytes through unchanged. + * - GET single event: response headers carry the same `x-wf-*` metadata; + * the response body is the raw payload bytes (streamed from S3 when + * stored there). + * - GET list: a length-prefixed binary frame stream — see + * `frames.ts` for the codec. Eliminates the per-event `/refs` + * round-trip used by v2/v3. + * + * Higher-level callers (the world-vercel adapter) are expected to + * CBOR-encode their JS values into the `payload` parameter and to + * CBOR-decode the returned `body` bytes — this module stays at the + * wire-bytes layer. + */ + +import { getVercelOidcToken } from '@vercel/oidc'; +import { request } from 'undici'; +import { decodeFrames, V4_FRAME_CONTENT_TYPE } from './frames.js'; +import { getDispatcher } from './http-client.js'; +import { type APIConfig, getHttpConfig } from './utils.js'; + +/** Names of the `x-wf-*` headers exchanged with the server. Mirror of + * workflow-server/lib/handlers/v4/headers.ts `V4_HEADERS`. */ +export const V4_HEADERS = { + eventType: 'x-wf-event-type', + specVersion: 'x-wf-spec-version', + correlationId: 'x-wf-correlation-id', + vercelId: 'x-wf-vercel-id', + remoteRefBehavior: 'x-wf-remote-ref-behavior', + deploymentId: 'x-wf-deployment-id', + workflowName: 'x-wf-workflow-name', + stepName: 'x-wf-step-name', + attempt: 'x-wf-attempt', + resumeAt: 'x-wf-resume-at', + hookToken: 'x-wf-hook-token', + hookIsWebhook: 'x-wf-hook-is-webhook', + hookIsSystem: 'x-wf-hook-is-system', + errorCode: 'x-wf-error-code', + executionContextB64: 'x-wf-execution-context-b64', + eventId: 'x-wf-event-id', + runId: 'x-wf-run-id', + createdAt: 'x-wf-created-at', +} as const; + +export interface CreateEventV4Input { + /** runId in the URL. Required for run_created too — v4 has no + * `/runs/null/events` shortcut because the runId is part of the S3 + * key. Higher-level callers generate the ULID locally. */ + runId: string; + eventType: string; + /** Opaque payload bytes. Pass undefined for events that don't carry + * user data (e.g. step_started). */ + payload?: Uint8Array; + specVersion: number; + correlationId?: string; + vercelId?: string; + remoteRefBehavior?: 'resolve' | 'lazy'; + deploymentId?: string; + workflowName?: string; + stepName?: string; + attempt?: number; + resumeAt?: string; + hookToken?: string; + hookIsWebhook?: boolean; + hookIsSystem?: boolean; + errorCode?: string; + /** Base64-encoded CBOR of the executionContext object. Use + * `encodeExecutionContextHeader` to produce this. */ + executionContextB64?: string; +} + +export interface CreateEventV4Result { + eventId: string; + runId: string; + createdAt: string; +} + +/** Apply structured fields onto a Headers object. Non-ASCII string fields + * are percent-encoded so they survive the byte-restricted header + * transport, matching the server-side decode. */ +function applyV4Headers(headers: Headers, input: CreateEventV4Input): void { + headers.set(V4_HEADERS.eventType, input.eventType); + headers.set(V4_HEADERS.specVersion, String(input.specVersion)); + if (input.correlationId) { + headers.set(V4_HEADERS.correlationId, input.correlationId); + } + if (input.vercelId) headers.set(V4_HEADERS.vercelId, input.vercelId); + if (input.remoteRefBehavior) { + headers.set(V4_HEADERS.remoteRefBehavior, input.remoteRefBehavior); + } + if (input.deploymentId) { + headers.set( + V4_HEADERS.deploymentId, + encodeURIComponent(input.deploymentId) + ); + } + if (input.workflowName) { + headers.set( + V4_HEADERS.workflowName, + encodeURIComponent(input.workflowName) + ); + } + if (input.stepName) { + headers.set(V4_HEADERS.stepName, encodeURIComponent(input.stepName)); + } + if (input.attempt !== undefined) { + headers.set(V4_HEADERS.attempt, String(input.attempt)); + } + if (input.resumeAt) { + headers.set(V4_HEADERS.resumeAt, encodeURIComponent(input.resumeAt)); + } + if (input.hookToken) { + headers.set(V4_HEADERS.hookToken, encodeURIComponent(input.hookToken)); + } + if (input.hookIsWebhook !== undefined) { + headers.set(V4_HEADERS.hookIsWebhook, String(input.hookIsWebhook)); + } + if (input.hookIsSystem !== undefined) { + headers.set(V4_HEADERS.hookIsSystem, String(input.hookIsSystem)); + } + if (input.errorCode) { + headers.set(V4_HEADERS.errorCode, encodeURIComponent(input.errorCode)); + } + if (input.executionContextB64) { + headers.set(V4_HEADERS.executionContextB64, input.executionContextB64); + } +} + +async function setAuthHeader( + headers: Headers, + config: APIConfig | undefined +): Promise { + if (config?.token) { + headers.set('Authorization', `Bearer ${config.token}`); + } else { + // Default: get an OIDC token via @vercel/oidc, same as the v3 client. + const token = await getVercelOidcToken(); + headers.set('Authorization', `Bearer ${token}`); + } +} + +/** + * POST /api/v4/runs/:runId/events + * + * Returns the event/run ids and createdAt timestamp parsed out of + * the response headers. Throws on non-2xx responses. + */ +export async function createWorkflowRunEventV4( + input: CreateEventV4Input, + config?: APIConfig +): Promise { + const { baseUrl, headers: baseHeaders } = await getHttpConfig(config); + const headers = new Headers(baseHeaders); + headers.set('Content-Type', 'application/octet-stream'); + applyV4Headers(headers, input); + await setAuthHeader(headers, config); + + const url = `${baseUrl}/v4/runs/${encodeURIComponent(input.runId)}/events`; + const response = await request(url, { + method: 'POST', + headers: Object.fromEntries(headers.entries()), + body: input.payload ?? undefined, + dispatcher: getDispatcher(), + }); + if (response.statusCode < 200 || response.statusCode >= 300) { + const errorBody = await response.body.text(); + throw new Error( + `v4 createEvent failed: ${response.statusCode} ${errorBody}` + ); + } + // Discard the response body (204 No Content). + await response.body.text(); + + const eventId = response.headers[V4_HEADERS.eventId]; + const runId = response.headers[V4_HEADERS.runId]; + const createdAt = response.headers[V4_HEADERS.createdAt]; + if ( + typeof eventId !== 'string' || + typeof runId !== 'string' || + typeof createdAt !== 'string' + ) { + throw new Error('v4 createEvent: response missing required x-wf-* headers'); + } + return { eventId, runId, createdAt }; +} + +export interface GetEventV4Result { + eventId: string; + runId: string; + eventType: string; + createdAt: string; + correlationId?: string; + workflowName?: string; + stepName?: string; + attempt?: number; + deploymentId?: string; + errorCode?: string; + /** The raw payload bytes (possibly empty). Caller decodes. */ + body: Uint8Array; +} + +function readHeader( + responseHeaders: Record, + name: string +): string | undefined { + const value = responseHeaders[name]; + if (typeof value === 'string') return value; + if (Array.isArray(value) && value.length > 0) return value[0]; + return undefined; +} + +function parseEventMetaFromHeaders( + responseHeaders: Record +): Omit { + const eventId = readHeader(responseHeaders, V4_HEADERS.eventId); + const runId = readHeader(responseHeaders, V4_HEADERS.runId); + const eventType = readHeader(responseHeaders, V4_HEADERS.eventType); + const createdAt = readHeader(responseHeaders, V4_HEADERS.createdAt); + if (!eventId || !runId || !eventType || !createdAt) { + throw new Error('v4 getEvent: response missing required x-wf-* headers'); + } + const correlationId = readHeader(responseHeaders, V4_HEADERS.correlationId); + const workflowName = readHeader(responseHeaders, V4_HEADERS.workflowName); + const stepName = readHeader(responseHeaders, V4_HEADERS.stepName); + const attemptStr = readHeader(responseHeaders, V4_HEADERS.attempt); + const deploymentId = readHeader(responseHeaders, V4_HEADERS.deploymentId); + const errorCode = readHeader(responseHeaders, V4_HEADERS.errorCode); + return { + eventId, + runId, + eventType, + createdAt, + ...(correlationId ? { correlationId } : {}), + ...(workflowName ? { workflowName: decodeURIComponent(workflowName) } : {}), + ...(stepName ? { stepName: decodeURIComponent(stepName) } : {}), + ...(attemptStr ? { attempt: Number(attemptStr) } : {}), + ...(deploymentId ? { deploymentId: decodeURIComponent(deploymentId) } : {}), + ...(errorCode ? { errorCode: decodeURIComponent(errorCode) } : {}), + }; +} + +/** + * GET /api/v4/runs/:runId/events/:eventId + * + * Returns the event metadata (parsed from response headers) along + * with the payload body as a Uint8Array. + */ +export async function getEventV4( + runId: string, + eventId: string, + config?: APIConfig +): Promise { + const { baseUrl, headers: baseHeaders } = await getHttpConfig(config); + const headers = new Headers(baseHeaders); + await setAuthHeader(headers, config); + + const url = `${baseUrl}/v4/runs/${encodeURIComponent(runId)}/events/${encodeURIComponent(eventId)}`; + const response = await request(url, { + method: 'GET', + headers: Object.fromEntries(headers.entries()), + dispatcher: getDispatcher(), + }); + if (response.statusCode < 200 || response.statusCode >= 300) { + const errorBody = await response.body.text(); + throw new Error(`v4 getEvent failed: ${response.statusCode} ${errorBody}`); + } + const meta = parseEventMetaFromHeaders(response.headers); + const body = new Uint8Array(await response.body.arrayBuffer()); + return { ...meta, body }; +} + +export interface ListEventsV4Params { + cursor?: string; + limit?: number; + sortOrder?: 'asc' | 'desc'; +} + +export interface ListedEventV4 { + eventId: string; + runId: string; + eventType: string; + createdAt: string; + correlationId?: string; + workflowName?: string; + stepName?: string; + attempt?: number; + deploymentId?: string; + errorCode?: string; + body: Uint8Array; +} + +export interface ListEventsV4Result { + events: ListedEventV4[]; + /** Pagination cursor — present when more pages remain. */ + next?: string; +} + +/** + * GET /api/v4/runs/:runId/events + * + * Parses the binary-frame stream into a list of events plus the + * pagination cursor (from the sentinel frame). + * + * NOTE: this implementation eagerly drains the stream into memory. A + * streaming variant that yields events one at a time without buffering + * the whole page is a straightforward refactor (decodeFrames is already + * an async generator); we keep this signature for parity with the + * existing `getWorkflowRunEvents` callers in the world-vercel adapter. + */ +export async function getWorkflowRunEventsV4( + runId: string, + params: ListEventsV4Params = {}, + config?: APIConfig +): Promise { + const { baseUrl, headers: baseHeaders } = await getHttpConfig(config); + const headers = new Headers(baseHeaders); + await setAuthHeader(headers, config); + + const searchParams = new URLSearchParams(); + if (params.cursor) searchParams.set('cursor', params.cursor); + if (params.limit !== undefined) { + searchParams.set('limit', String(params.limit)); + } + if (params.sortOrder) searchParams.set('sortOrder', params.sortOrder); + const qs = searchParams.toString(); + const url = + `${baseUrl}/v4/runs/${encodeURIComponent(runId)}/events` + + (qs ? `?${qs}` : ''); + + const response = await request(url, { + method: 'GET', + headers: Object.fromEntries(headers.entries()), + dispatcher: getDispatcher(), + }); + if (response.statusCode < 200 || response.statusCode >= 300) { + const errorBody = await response.body.text(); + throw new Error( + `v4 listEvents failed: ${response.statusCode} ${errorBody}` + ); + } + const contentType = readHeader(response.headers, 'content-type'); + if (!contentType?.startsWith(V4_FRAME_CONTENT_TYPE)) { + throw new Error( + `v4 listEvents: expected ${V4_FRAME_CONTENT_TYPE}, got ${contentType ?? '(none)'}` + ); + } + + // undici's `request().body` is a Node Readable; convert to a Web + // ReadableStream so the same decodeFrames implementation works in + // both Node and edge runtimes. + const webBody = (await import('node:stream')).Readable.toWeb( + response.body as unknown as import('node:stream').Readable + ) as unknown as ReadableStream; + + const events: ListedEventV4[] = []; + let next: string | undefined; + for await (const frame of decodeFrames(webBody)) { + if (frame.meta._end === 1) { + if (typeof frame.meta.next === 'string') next = frame.meta.next; + break; + } + const meta = frame.meta as Record; + const event: ListedEventV4 = { + eventId: String(meta.eventId ?? ''), + runId: String(meta.runId ?? ''), + eventType: String(meta.eventType ?? ''), + createdAt: String(meta.createdAt ?? ''), + body: frame.body, + }; + if (typeof meta.correlationId === 'string') { + event.correlationId = meta.correlationId; + } + if (typeof meta.workflowName === 'string') { + event.workflowName = meta.workflowName; + } + if (typeof meta.stepName === 'string') event.stepName = meta.stepName; + if (typeof meta.attempt === 'number') event.attempt = meta.attempt; + if (typeof meta.deploymentId === 'string') { + event.deploymentId = meta.deploymentId; + } + if (typeof meta.errorCode === 'string') event.errorCode = meta.errorCode; + events.push(event); + } + + return { events, ...(next ? { next } : {}) }; +} diff --git a/packages/world-vercel/src/frames.test.ts b/packages/world-vercel/src/frames.test.ts new file mode 100644 index 0000000000..b2e302531f --- /dev/null +++ b/packages/world-vercel/src/frames.test.ts @@ -0,0 +1,181 @@ +import { decode, encode } from 'cbor-x'; +import { describe, expect, it } from 'vitest'; +import { + type DecodedFrame, + decodeFrames, + encodeFrame, + V4_FRAME_CONTENT_TYPE, +} from './frames.js'; + +/** Server's wire encoder (matches the workflow-server/lib/handlers/v4/frames.ts + * end-frame helper). Re-implemented here so the client tests don't depend on + * importing from another package. */ +function encodeEndFrame(next?: string): Uint8Array { + const meta: Record = { _end: 1 }; + if (next) meta.next = next; + return encodeFrame(meta, new Uint8Array(0)); +} + +/** Build a ReadableStream that yields `payload` in fixed-size chunks. Used to + * stress chunk-boundary handling in the decoder. */ +function streamOf(payload: Uint8Array, chunkSize: number) { + let offset = 0; + return new ReadableStream({ + pull(controller) { + if (offset >= payload.byteLength) { + controller.close(); + return; + } + const end = Math.min(offset + chunkSize, payload.byteLength); + controller.enqueue(payload.subarray(offset, end)); + offset = end; + }, + }); +} + +async function drainFrames( + source: ReadableStream +): Promise { + const out: DecodedFrame[] = []; + for await (const f of decodeFrames(source)) out.push(f); + return out; +} + +describe('encodeFrame', () => { + it('produces the canonical wire layout', () => { + const meta = { eventId: 'evnt_abc', n: 42 }; + const body = new Uint8Array([1, 2, 3, 4, 5]); + const frame = encodeFrame(meta, body); + const view = new DataView(frame.buffer); + const metaLen = view.getUint32(0, false); + expect(decode(frame.subarray(4, 4 + metaLen))).toEqual(meta); + const bodyLen = view.getUint32(4 + metaLen, false); + expect(bodyLen).toBe(body.byteLength); + expect(frame.subarray(4 + metaLen + 4)).toEqual(body); + }); +}); + +describe('decodeFrames', () => { + it('round-trips a single frame', async () => { + const meta = { eventType: 'run_created', eventId: 'evnt_1' }; + const body = new TextEncoder().encode('{"hello":"world"}'); + const stream = streamOf( + new Uint8Array([...encodeFrame(meta, body), ...encodeEndFrame()]), + 4096 + ); + const frames = await drainFrames(stream); + expect(frames).toHaveLength(2); + expect(frames[0].meta).toEqual(meta); + expect(frames[0].body).toEqual(body); + expect(frames[1].meta).toEqual({ _end: 1 }); + }); + + it('round-trips multiple frames with cursor', async () => { + const body1 = new TextEncoder().encode('one'); + const body2 = new Uint8Array(64).fill(0xab); + const parts = [ + encodeFrame({ eventId: 'a' }, body1), + encodeFrame({ eventId: 'b' }, body2), + encodeEndFrame('cursor-xyz'), + ]; + let total = 0; + for (const p of parts) total += p.byteLength; + const flat = new Uint8Array(total); + let off = 0; + for (const p of parts) { + flat.set(p, off); + off += p.byteLength; + } + const frames = await drainFrames(streamOf(flat, 256)); + expect(frames).toHaveLength(3); + expect(frames[0].meta).toEqual({ eventId: 'a' }); + expect(frames[0].body).toEqual(body1); + expect(frames[1].meta).toEqual({ eventId: 'b' }); + expect(frames[1].body).toEqual(body2); + expect(frames[2].meta).toEqual({ _end: 1, next: 'cursor-xyz' }); + expect(frames[2].body.byteLength).toBe(0); + }); + + it('handles delivery in 1-byte chunks (worst-case chunk boundary)', async () => { + const body = new Uint8Array(1024); + for (let i = 0; i < body.length; i++) body[i] = (i * 13 + 5) & 0xff; + const flat = new Uint8Array([ + ...encodeFrame({ eventType: 'big', n: 99 }, body), + ...encodeEndFrame(), + ]); + const frames = await drainFrames(streamOf(flat, 1)); + expect(frames).toHaveLength(2); + expect(frames[0].meta).toEqual({ eventType: 'big', n: 99 }); + expect(frames[0].body).toEqual(body); + expect(frames[1].meta).toEqual({ _end: 1 }); + }); + + it('handles a 64 KB body split across many small chunks', async () => { + const body = new Uint8Array(64 * 1024); + for (let i = 0; i < body.length; i++) body[i] = (i * 7) & 0xff; + const flat = new Uint8Array([ + ...encodeFrame({ eventId: 'big' }, body), + ...encodeEndFrame(), + ]); + const frames = await drainFrames(streamOf(flat, 37)); + expect(frames[0].body.byteLength).toBe(body.byteLength); + expect(frames[0].body[0]).toBe(body[0]); + expect(frames[0].body[body.length - 1]).toBe(body[body.length - 1]); + }); + + it('handles frames whose body contains bytes that look like length prefixes', async () => { + // 0xff bytes that could trip up a parser that scans for u32 patterns + // rather than honoring the explicit length prefixes. + const body = new Uint8Array(32).fill(0xff); + const flat = new Uint8Array([ + ...encodeFrame({ eventId: 'tricky' }, body), + ...encodeEndFrame(), + ]); + const frames = await drainFrames(streamOf(flat, 7)); + expect(frames[0].body).toEqual(body); + }); + + it('handles back-to-back frames in a single chunk', async () => { + const flat = new Uint8Array([ + ...encodeFrame({ id: 1 }, new Uint8Array([10, 20, 30])), + ...encodeFrame({ id: 2 }, new Uint8Array([40, 50, 60])), + ...encodeFrame({ id: 3 }, new Uint8Array(0)), + ...encodeEndFrame(), + ]); + const frames = await drainFrames(streamOf(flat, flat.byteLength)); + expect(frames).toHaveLength(4); + expect(frames[2].body.byteLength).toBe(0); + expect(frames[3].meta._end).toBe(1); + }); + + it('throws when the stream ends mid-frame', async () => { + const partial = encodeFrame({ x: 1 }, new Uint8Array(100)).slice(0, 20); + const stream = streamOf(partial, 1024); + await expect(drainFrames(stream)).rejects.toThrow(/truncated/); + }); + + it('preserves CBOR types in meta (numbers, booleans, arrays)', async () => { + const meta = { + eventId: 'mix', + attempt: 4, + isWebhook: true, + tags: ['a', 'b'], + n: 12345, + }; + const flat = new Uint8Array([ + ...encodeFrame(meta, new Uint8Array(0)), + ...encodeEndFrame(), + ]); + const frames = await drainFrames(streamOf(flat, 32)); + expect(frames[0].meta).toEqual(meta); + expect(typeof frames[0].meta.attempt).toBe('number'); + expect(typeof frames[0].meta.isWebhook).toBe('boolean'); + expect(Array.isArray(frames[0].meta.tags)).toBe(true); + }); +}); + +describe('V4_FRAME_CONTENT_TYPE', () => { + it('matches the server-side content type', () => { + expect(V4_FRAME_CONTENT_TYPE).toBe('application/vnd.workflow.v4-frames'); + }); +}); diff --git a/packages/world-vercel/src/frames.ts b/packages/world-vercel/src/frames.ts new file mode 100644 index 0000000000..2b24ac8edc --- /dev/null +++ b/packages/world-vercel/src/frames.ts @@ -0,0 +1,110 @@ +/** + * Length-prefixed binary frame codec for the v4 list-events response. + * + * Mirrors the server-side encoder in + * workflow-server/lib/handlers/v4/frames.ts. Wire format: + * + * list-response := frame* end-frame + * frame := u32_be(meta_len) || cbor_meta || u32_be(body_len) || body_bytes + * end-frame := u32_be(meta_len) || cbor_meta {_end: 1, next?: string} || u32_be(0) + */ + +import { decode, encode } from 'cbor-x'; + +export const V4_FRAME_CONTENT_TYPE = 'application/vnd.workflow.v4-frames'; + +export interface DecodedFrame { + meta: Record; + body: Uint8Array; +} + +/** Test/utility: encode a complete frame. Production server uses prefix + * + streaming body. */ +export function encodeFrame( + meta: Record, + body: Uint8Array +): Uint8Array { + const metaBytes = new Uint8Array(encode(meta)); + const out = new Uint8Array(4 + metaBytes.byteLength + 4 + body.byteLength); + const view = new DataView(out.buffer); + view.setUint32(0, metaBytes.byteLength, false); + out.set(metaBytes, 4); + view.setUint32(4 + metaBytes.byteLength, body.byteLength, false); + out.set(body, 4 + metaBytes.byteLength + 4); + return out; +} + +/** + * Async-iterable parser for a frame stream. Yields one `DecodedFrame` + * per frame in source order, terminating at the sentinel frame whose + * meta contains `_end: 1`. The sentinel frame itself IS yielded — the + * caller inspects `meta._end` to detect end-of-stream and reads + * `meta.next` for the pagination cursor. + * + * Survives arbitrary chunk boundaries from the source stream, including + * splits that fall in the middle of a u32 length prefix or in the + * middle of the CBOR meta block. + */ +export async function* decodeFrames( + source: ReadableStream +): AsyncGenerator { + const reader = source.getReader(); + // Accumulating buffer of bytes we've read but not yet consumed. + let buffer = new Uint8Array(0); + + const refill = async (needed: number): Promise => { + while (buffer.byteLength < needed) { + const { done, value } = await reader.read(); + if (done) return false; + if (!value || value.byteLength === 0) continue; + const next = new Uint8Array(buffer.byteLength + value.byteLength); + next.set(buffer, 0); + next.set(value, buffer.byteLength); + buffer = next; + } + return true; + }; + + const take = (n: number): Uint8Array => { + const out = buffer.subarray(0, n); + buffer = buffer.subarray(n); + return out; + }; + + while (true) { + if (!(await refill(4))) return; + const metaLen = new DataView(buffer.buffer, buffer.byteOffset, 4).getUint32( + 0, + false + ); + take(4); + + if (!(await refill(metaLen))) { + throw new Error('decodeFrames: truncated meta block'); + } + const meta = decode(take(metaLen)) as Record; + + if (!(await refill(4))) { + throw new Error('decodeFrames: truncated body length'); + } + const bodyLen = new DataView(buffer.buffer, buffer.byteOffset, 4).getUint32( + 0, + false + ); + take(4); + + if (bodyLen > 0) { + if (!(await refill(bodyLen))) { + throw new Error('decodeFrames: truncated body bytes'); + } + // Slice (not subarray) so the yielded body owns its bytes — + // subsequent reads into the buffer won't overwrite it. + yield { meta, body: buffer.slice(0, bodyLen) }; + take(bodyLen); + } else { + yield { meta, body: new Uint8Array(0) }; + } + + if (meta._end === 1) return; + } +} From a0d2ee8b6adfea12a82249a7689003860e2c1cb1 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 21 May 2026 14:18:33 +0200 Subject: [PATCH 02/10] [v4 branch] Point SDK at workflow-server PR #439 preview deployment MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sets WORKFLOW_SERVER_URL_OVERRIDE in packages/world-vercel/src/utils.ts to https://workflow-server-git-peter-v4.vercel.sh so that e2e tests running off this SDK branch exercise the v4-enabled workflow-server preview instead of production. The override is the inline mechanism documented at the constant — when set, it wins over both the default (https://vercel-workflow.com) and the VERCEL_WORKFLOW_SERVER_URL env var. The same pattern is used in v4 testing on the workflow- server side: CI rewrites this string on PR branches. Reset to '' before merging to main. Companion to https://github.com/vercel/workflow-server/pull/439. Updates four tests in utils.test.ts that previously assumed the override is empty. Each affected assertion gets a comment noting what the expectation looks like on main; flipping back to the main behavior is a one-line edit per test when the override is reset. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/world-vercel/src/utils.test.ts | 25 +++++++++++++++++++++---- packages/world-vercel/src/utils.ts | 7 ++++++- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/packages/world-vercel/src/utils.test.ts b/packages/world-vercel/src/utils.test.ts index 5a077ba3d5..59a0995121 100644 --- a/packages/world-vercel/src/utils.test.ts +++ b/packages/world-vercel/src/utils.test.ts @@ -1,6 +1,13 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { getHeaders, getHttpConfig, getHttpUrl } from './utils.js'; +// On this v4 PR branch, WORKFLOW_SERVER_URL_OVERRIDE in utils.ts is set +// to the workflow-server PR #439 preview deployment so e2e tests can run +// against the v4 server. Several tests below check what happens when the +// inline override is empty — when this override is reset to '' before +// merging to main, the affected expectations need to flip back too. +const V4_SERVER_URL_OVERRIDE = 'https://workflow-server-git-peter-v4.vercel.sh'; + vi.mock('@vercel/oidc', () => ({ getVercelOidcToken: vi.fn().mockRejectedValue(new Error('no OIDC')), })); @@ -19,16 +26,20 @@ describe('getHttpUrl', () => { }); it('uses default workflow-server URL when no config and no env override', () => { + // v4-branch: inline override wins over the default. On main this + // would be 'https://vercel-workflow.com/api'. expect(getHttpUrl()).toEqual({ - baseUrl: 'https://vercel-workflow.com/api', + baseUrl: `${V4_SERVER_URL_OVERRIDE}/api`, usingProxy: false, }); }); it('respects VERCEL_WORKFLOW_SERVER_URL when set (no proxy)', () => { process.env.VERCEL_WORKFLOW_SERVER_URL = 'https://custom-host.example.com'; + // v4-branch: inline override wins over the env var. On main this + // would be 'https://custom-host.example.com/api'. expect(getHttpUrl()).toEqual({ - baseUrl: 'https://custom-host.example.com/api', + baseUrl: `${V4_SERVER_URL_OVERRIDE}/api`, usingProxy: false, }); }); @@ -77,15 +88,21 @@ describe('getHeaders', () => { }); it('omits x-vercel-workflow-api-url when override is unset', () => { + // v4-branch: inline override is set, so the header IS sent (with the + // override URL). On main with override='' the header would be null. const headers = getHeaders(undefined, { usingProxy: true }); - expect(headers.get('x-vercel-workflow-api-url')).toBeNull(); + expect(headers.get('x-vercel-workflow-api-url')).toBe( + V4_SERVER_URL_OVERRIDE + ); }); it('sets x-vercel-workflow-api-url when VERCEL_WORKFLOW_SERVER_URL is set and using proxy', () => { process.env.VERCEL_WORKFLOW_SERVER_URL = 'https://custom.example.com'; + // v4-branch: inline override wins over the env var. On main the + // header would be 'https://custom.example.com'. const headers = getHeaders(undefined, { usingProxy: true }); expect(headers.get('x-vercel-workflow-api-url')).toBe( - 'https://custom.example.com' + V4_SERVER_URL_OVERRIDE ); }); diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index c6eb7825eb..1c878c13c0 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -58,8 +58,13 @@ function httpLog( * Inline workflow-server URL override. Must remain an empty string on * `main` — rewritten by external CI for branch-deployment testing. * Prefer `VERCEL_WORKFLOW_SERVER_URL` for deployment-time configuration. + * + * On this v4 branch, set to the workflow-server PR-#439 preview deployment + * (https://github.com/vercel/workflow-server/pull/439) so e2e tests run + * against the v4-enabled server. Reset to '' before merging to main. */ -const WORKFLOW_SERVER_URL_OVERRIDE = ''; +const WORKFLOW_SERVER_URL_OVERRIDE = + 'https://workflow-server-git-peter-v4.vercel.sh'; /** * Per-request timeout for HTTP calls to workflow-server (in ms). From 963d6322a03d17a565f68b5abf6f211e3885130c Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 21 May 2026 17:10:24 +0200 Subject: [PATCH 03/10] [world-vercel] Switch event endpoints to the v4 wire format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The adapter's createWorkflowRunEvent / getEvent / getWorkflowRunEvents now call the v4 endpoints internally instead of v2/v3. Public function signatures and the EventResult / Event / PaginatedResponse shapes returned to the workflow runtime are unchanged — only the wire protocol switches. Wire-format changes the SDK now drives: - Event metadata rides in x-wf-* request/response headers. - User payload is CBOR-encoded by the SDK at the eventData[field] boundary and shipped as opaque body bytes; never parsed on the server. - POST event response carries the materialized EventResult as a CBOR body — see workflow-server PR #439's "Return materialized entity in POST event response body" commit for the matching server-side change. The SDK no longer needs a second round-trip after POST to read run/step state. - LIST events uses the v4 binary-frame stream (application/vnd.workflow.v4-frames). Per-event `/refs` calls are gone — payloads come back inline in each frame. What goes away: - packages/world-vercel/src/refs.ts (deleted) — the /refs ref- hydration path is no longer needed. - hydrateEventRefs / collectPendingRefs / eventDataRefFieldMap and the EventResultResolveWireSchema / EventResultLazyWireSchema / EventWithRefsSchema wire schemas (deleted). - The lazy-refs branching inside createWorkflowRunEvent — the server already respects `remoteRefBehavior` (still sent for `eventsNeedingResolve` types) and bakes the resolution decision into its CBOR response. What stays: - v1Compat path on `createWorkflowRunEvent` (runs.create.v1 / runs.cancel.v1) — still uses v1 endpoints for legacy migrations that haven't moved to event sourcing. - validateUlidTimestamp on run_created, the HookNotFoundError translation on hook_disposed/hook_received 404s, and the stripEventDataRefs path for resolveData='none'. Not yet covered: - listEventsByCorrelationId throws a clear error — v4 has no by-correlation-id list endpoint yet; callers in the hot path have been fetching hooks directly anyway. A future server PR can add /api/v4/events?correlationId= if needed. Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/v4-events-client.md | 4 +- packages/world-vercel/src/events-v4.ts | 32 +- packages/world-vercel/src/events.ts | 638 ++++++++++--------------- packages/world-vercel/src/refs.ts | 210 -------- 4 files changed, 290 insertions(+), 594 deletions(-) delete mode 100644 packages/world-vercel/src/refs.ts diff --git a/.changeset/v4-events-client.md b/.changeset/v4-events-client.md index d482741400..aaffed3f6c 100644 --- a/.changeset/v4-events-client.md +++ b/.changeset/v4-events-client.md @@ -1,5 +1,5 @@ --- -"@workflow/world-vercel": minor +"@workflow/world-vercel": major --- -Add v4 event API client: `createWorkflowRunEventV4`, `getEventV4`, `getWorkflowRunEventsV4`. Sends event metadata via `x-wf-*` headers and treats payloads as opaque bytes (streamed end-to-end), eliminating server-side CBOR parsing and the `/refs` round-trip on list responses. The world-vercel adapter still uses the v3 path by default; v4 is exposed for direct callers and a follow-up will switch the adapter over. +Switch the world-vercel adapter's event endpoints from the v2/v3 wire format to v4. Event metadata now rides in `x-wf-*` HTTP headers and payloads stream end-to-end as opaque bytes — no server-side CBOR parse on writes, and no per-event `/refs` round-trip on list responses. POST event response carries the materialized EventResult as a CBOR body. Public `createWorkflowRunEvent` / `getEvent` / `getWorkflowRunEvents` signatures are unchanged; the underlying wire calls swap to v4. `listEventsByCorrelationId` is not yet implemented on v4 and now throws — callers should fetch hooks directly via `storage.hooks.getByToken`. Requires workflow-server with v4 routes mounted. diff --git a/packages/world-vercel/src/events-v4.ts b/packages/world-vercel/src/events-v4.ts index b748343757..adf0750a24 100644 --- a/packages/world-vercel/src/events-v4.ts +++ b/packages/world-vercel/src/events-v4.ts @@ -22,6 +22,7 @@ */ import { getVercelOidcToken } from '@vercel/oidc'; +import { decode } from 'cbor-x'; import { request } from 'undici'; import { decodeFrames, V4_FRAME_CONTENT_TYPE } from './frames.js'; import { getDispatcher } from './http-client.js'; @@ -81,6 +82,23 @@ export interface CreateEventV4Result { eventId: string; runId: string; createdAt: string; + /** + * Materialized-entity bag — CBOR-decoded from the response body. The + * server hands back the same shape v2/v3 use for EventResult so the + * adapter layer can drop these fields into its return value unchanged. + * Keys are unset when the event type doesn't materialize that entity + * kind. + */ + body: { + event?: unknown; + run?: unknown; + step?: unknown; + hook?: unknown; + wait?: unknown; + events?: unknown[]; + cursor?: string | null; + hasMore?: boolean; + }; } /** Apply structured fields onto a Headers object. Non-ASCII string fields @@ -176,8 +194,6 @@ export async function createWorkflowRunEventV4( `v4 createEvent failed: ${response.statusCode} ${errorBody}` ); } - // Discard the response body (204 No Content). - await response.body.text(); const eventId = response.headers[V4_HEADERS.eventId]; const runId = response.headers[V4_HEADERS.runId]; @@ -189,7 +205,17 @@ export async function createWorkflowRunEventV4( ) { throw new Error('v4 createEvent: response missing required x-wf-* headers'); } - return { eventId, runId, createdAt }; + + // Decode the materialized-entity bag from the response body. The server + // always returns a CBOR body now (was 204 in an earlier iteration — + // see workflow-server PR #439 for the corresponding handler change). + const bodyBytes = new Uint8Array(await response.body.arrayBuffer()); + const body = + bodyBytes.byteLength > 0 + ? (decode(bodyBytes) as CreateEventV4Result['body']) + : {}; + + return { eventId, runId, createdAt, body }; } export interface GetEventV4Result { diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 433cedcc7c..d49dbe165b 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -1,104 +1,70 @@ +/** + * world-vercel event functions — v4 wire format throughout. + * + * This module replaces the previous v2/v3 implementation. The v4 wire + * format moves structured event metadata into `x-wf-*` HTTP headers and + * treats payloads as opaque user-data bytes streamed end-to-end. See + * workflow-server/lib/handlers/v4/ for the matching server-side handlers + * and ../events-v4.ts for the wire-level client. + * + * Key shape changes vs. v2/v3: + * + * - POST event response carries the materialized EventResult + * (event/run/step/hook/wait/events/cursor/hasMore) as a CBOR-encoded + * body — the server resolved-refs path is still respected via the + * `remoteRefBehavior` header. + * - GET single event returns metadata in headers + the user payload + * bytes in the response body. + * - LIST events returns a length-prefixed binary frame stream + * (application/vnd.workflow.v4-frames) — one frame per event with + * CBOR metadata + raw payload bytes. The old per-event `/refs` + * round-trip is eliminated. + * + * Public function signatures are unchanged: storage.ts continues to + * wire these as `Storage['events']` and the workflow runtime sees the + * same EventResult / Event / PaginatedResponse shapes it did on + * the v3 path. + */ + import { HookNotFoundError, WorkflowWorldError } from '@workflow/errors'; import { type AnyEventRequest, type CreateEventParams, type Event, type EventResult, - EventSchema, - EventTypeSchema, type GetEventParams, - HookSchema, type ListEventsByCorrelationIdParams, type ListEventsParams, type PaginatedResponse, - PaginatedResponseSchema, stripEventDataRefs, validateUlidTimestamp, type WorkflowRun, - WorkflowRunSchema, } from '@workflow/world'; -import z from 'zod'; -import { - isRefDescriptor, - type RefDescriptor, - type RefWithRunId, - resolveRefDescriptors, -} from './refs.js'; +import { decode, encode } from 'cbor-x'; import { - cancelWorkflowRunV1, - createWorkflowRunV1, - WorkflowRunWireBaseSchema, -} from './runs.js'; -import { deserializeStep, StepWireSchema } from './steps.js'; -import { trace } from './telemetry.js'; -import type { APIConfig } from './utils.js'; + createWorkflowRunEventV4, + getEventV4, + getWorkflowRunEventsV4, + type ListedEventV4, +} from './events-v4.js'; +import { cancelWorkflowRunV1, createWorkflowRunV1 } from './runs.js'; +import { deserializeStep } from './steps.js'; import { DEFAULT_RESOLVE_DATA_OPTION, + type APIConfig, deserializeError, - makeRequest, } from './utils.js'; -// Wraps stripEventDataRefs to also strip the legacy eventDataRef field, -// since the server always returns lazy refs and callers with -// resolveData='none' should not see them. -function stripEventAndLegacyRefs( - event: any, - resolveData: 'none' | 'all' -): Event { - if (resolveData !== 'none') return event; - const { eventDataRef: _eventDataRef, ...withoutLegacyRef } = event; - return stripEventDataRefs(withoutLegacyRef, resolveData); -} - -// Schema for EventResult wire format returned by events.create. -// Uses wire format schemas for step to handle field name mapping. -// Two variants are used depending on `remoteRefBehavior`: -// - 'resolve': the server returns fully resolved data, so we validate the run -// with the strict WorkflowRunSchema discriminated union (e.g. status:'failed' -// requires error to be present). -// - 'lazy': the server may omit resolved fields (error may be a string or -// undefined), so we use the looser WorkflowRunWireBaseSchema and normalize -// the error via deserializeError() afterward. -const EventResultResolveWireSchema = z.object({ - event: EventSchema.optional(), - run: WorkflowRunSchema.optional(), - step: StepWireSchema.optional(), - hook: HookSchema.optional(), - events: z.array(EventSchema).optional(), - cursor: z.string().nullable().optional(), - hasMore: z.boolean().optional(), -}); - -const EventResultLazyWireSchema = z.object({ - event: EventSchema.optional(), - run: WorkflowRunWireBaseSchema.optional(), - step: StepWireSchema.optional(), - hook: HookSchema.optional(), - events: z.array(EventSchema).optional(), - cursor: z.string().nullable().optional(), - hasMore: z.boolean().optional(), -}); - -// Schema for events returned with `remoteRefBehavior=lazy`. -// Includes both `eventDataRef` (legacy, specVersion=1) and `eventData` -// (v2, specVersion=2 — may contain nested RefDescriptor values). -// specVersion defaults to 1 (legacy) when parsing responses from storage. -const EventWithRefsSchema = z.object({ - eventId: z.string(), - runId: z.string(), - eventType: EventTypeSchema, - correlationId: z.string().optional(), - eventDataRef: z.any().optional(), - eventData: z.any().optional(), - createdAt: z.coerce.date(), - specVersion: z.number().default(1), -}); - /** - * Maps event types to the field name within `eventData` that may contain - * a ref descriptor. Mirrors the server-side `resolveEventDataRefs()` mapping. + * Per-event-type map of the field within `eventData` that holds the user + * payload. Same convention used on the server side + * (workflow-server/lib/handlers/v4/events.ts PAYLOAD_FIELD_BY_EVENT_TYPE). + * + * The v4 wire encoding picks this field out of `eventData`, CBOR-encodes + * its value, and ships it as the request body. Everything else in + * `eventData` becomes a `x-wf-*` header. */ -const eventDataRefFieldMap: Record = { +const PAYLOAD_FIELD_BY_EVENT_TYPE: Record = { run_created: 'input', run_completed: 'output', run_failed: 'error', @@ -110,150 +76,162 @@ const eventDataRefFieldMap: Record = { hook_received: 'payload', }; -// Events where the client uses the response entity data need 'resolve' (default). -// Events where the client discards the response can use 'lazy' to skip expensive -// S3 ref resolution on the server, saving ~200-460ms per event. -const eventsNeedingResolve = new Set([ - 'run_created', // client reads result.run.runId - 'run_started', // client reads result.run (checks startedAt, status) - 'step_started', // client reads result.step (checks attempt, state) +// Events whose POST response the workflow runtime reads immediately +// (so the materialized entity must come back fully resolved). +const eventsNeedingResolve = new Set([ + 'run_created', // runtime reads result.run.runId + 'run_started', // runtime reads result.run (checks startedAt, status) + 'step_started', // runtime reads result.step (checks attempt, state) ]); -/** - * Collect all ref descriptors from a list of lazy-loaded events. - * Returns a flat array of { eventIndex, refType, fieldName?, descriptor } - * entries that can be resolved in bulk. - */ -interface PendingRef { - eventIndex: number; - /** - * 'entity' = top-level eventDataRef (legacy specVersion=1 events) - * 'nested' = nested ref descriptor within eventData (v2 events) - */ - refType: 'entity' | 'nested'; - /** The field name within eventData containing the ref (only for 'nested') */ - fieldName?: string; - descriptor: RefDescriptor; -} - -function collectPendingRefs(events: any[]): PendingRef[] { - const pending: PendingRef[] = []; +// Hook events that 404 when the hook is already disposed or never existed — +// translate to a typed HookNotFoundError so the runtime can branch on it. +const hookEventsRequiringExistence = new Set([ + 'hook_disposed', + 'hook_received', +]); - for (let i = 0; i < events.length; i++) { - const event = events[i]; +// ============================================================================= +// Helpers +// ============================================================================= + +interface SplitEventData { + /** Encoded payload bytes (undefined when the event has no user payload). */ + payload?: Uint8Array; + /** Metadata fields that ride in v4 request headers. */ + meta: { + deploymentId?: string; + workflowName?: string; + stepName?: string; + attempt?: number; + resumeAt?: string; + hookToken?: string; + hookIsWebhook?: boolean; + hookIsSystem?: boolean; + errorCode?: string; + /** Pre-encoded base64 CBOR of executionContext (or undefined). */ + executionContextB64?: string; + }; +} - // Legacy events (specVersion=1): eventDataRef is a RefDescriptor - if (event.eventDataRef && isRefDescriptor(event.eventDataRef)) { - pending.push({ - eventIndex: i, - refType: 'entity', - descriptor: event.eventDataRef, - }); - } +/** + * Split an AnyEventRequest's `eventData` into (a) the payload bytes that + * become the v4 request body and (b) the metadata fields that become + * v4 request headers. + */ +function splitEventDataForV4(data: AnyEventRequest): SplitEventData { + // Some event types in the AnyEventRequest discriminated union (e.g. + // run_cancelled) have no eventData. Cast through unknown so this + // helper can read it defensively without TS narrowing per branch. + const eventData = (( + data as unknown as { eventData?: Record } + ).eventData ?? {}) as Record; + const payloadField = PAYLOAD_FIELD_BY_EVENT_TYPE[data.eventType]; + const meta: SplitEventData['meta'] = {}; + + if (typeof eventData.deploymentId === 'string') { + meta.deploymentId = eventData.deploymentId; + } + if (typeof eventData.workflowName === 'string') { + meta.workflowName = eventData.workflowName; + } + if (typeof eventData.stepName === 'string') { + meta.stepName = eventData.stepName; + } + if (typeof eventData.attempt === 'number') { + meta.attempt = eventData.attempt; + } + if (typeof eventData.resumeAt === 'string') { + meta.resumeAt = eventData.resumeAt; + } else if (eventData.resumeAt instanceof Date) { + meta.resumeAt = eventData.resumeAt.toISOString(); + } + if (typeof eventData.hookToken === 'string') { + meta.hookToken = eventData.hookToken; + } + if (typeof eventData.isWebhook === 'boolean') { + meta.hookIsWebhook = eventData.isWebhook; + } + if (typeof eventData.isSystem === 'boolean') { + meta.hookIsSystem = eventData.isSystem; + } + if (typeof eventData.errorCode === 'string') { + meta.errorCode = eventData.errorCode; + } + if ( + eventData.executionContext !== undefined && + eventData.executionContext !== null + ) { + const cbor = encode(eventData.executionContext); + meta.executionContextB64 = Buffer.from(cbor).toString('base64'); + } - // V2 events: eventData may contain a nested RefDescriptor - if (event.eventData && typeof event.eventData === 'object') { - const fieldName = eventDataRefFieldMap[event.eventType as string]; - if (fieldName) { - const fieldValue = event.eventData[fieldName]; - if (isRefDescriptor(fieldValue)) { - pending.push({ - eventIndex: i, - refType: 'nested', - fieldName, - descriptor: fieldValue, - }); - } - } + let payload: Uint8Array | undefined; + if (payloadField && payloadField in eventData) { + const value = eventData[payloadField]; + if (value instanceof Uint8Array) { + payload = value; + } else if (value !== undefined) { + // CBOR-encode arbitrary JS values. The server treats the bytes as + // opaque; the SDK reverses this at decode time so the wire layer + // doesn't know about CBOR. + payload = new Uint8Array(encode(value)); } } - return pending; + return { payload, meta }; } /** - * Hydrate lazy-loaded events by resolving all ref descriptors client-side. - * For entity-level refs (eventDataRef), the resolved value becomes eventData. - * For nested refs (eventData[field]), the resolved value replaces the descriptor. + * Turn a v4 single-event response (metadata in headers + opaque body + * bytes) into the Event shape the workflow runtime expects. * - * Events are shallow-cloned before mutation to avoid corrupting any upstream - * caches (SWR, React cache, etc.) that might hold references to the originals. + * Reconstructs `eventData` from header fields and places the CBOR-decoded + * payload value (if any) under the appropriate per-event-type field. */ -async function hydrateEventRefs( - events: any[], - config?: APIConfig, - refResolveConcurrency?: number -): Promise { - const pending = collectPendingRefs(events); - if (pending.length === 0) return events; - - return trace('world.refs.hydrate', async (span) => { - span?.setAttribute('workflow.refs.hydrated_count', pending.length); - - // Deduplicate descriptors by _ref key to avoid redundant resolutions. - // Multiple events may reference the same ref (e.g., shared input). - const uniqueRefs = new Map(); - for (const p of pending) { - if (!uniqueRefs.has(p.descriptor._ref)) { - const eventRunId = events[p.eventIndex].runId as string; - uniqueRefs.set(p.descriptor._ref, { - descriptor: p.descriptor, - runId: eventRunId, - }); - } - } - const deduped = Array.from(uniqueRefs.values()); - - // Resolve unique descriptors in parallel with bounded concurrency - const dedupedResults = await resolveRefDescriptors( - deduped, - config, - refResolveConcurrency - ).catch((err) => { - const msg = err instanceof Error ? err.message : String(err); - throw new Error( - `Failed to hydrate ${pending.length} ref(s) across ${events.length} event(s): ${msg}` - ); - }); - - // Build a map from ref key → resolved value for fast lookup - const resolvedMap = new Map(); - const dedupedKeys = Array.from(uniqueRefs.keys()); - for (let i = 0; i < dedupedKeys.length; i++) { - resolvedMap.set(dedupedKeys[i], dedupedResults[i]); - } - - // Shallow-clone events that need modification, then apply resolved values - const result = [...events]; - for (let i = 0; i < pending.length; i++) { - const { eventIndex, refType, fieldName, descriptor } = pending[i]; - const resolved = resolvedMap.get(descriptor._ref); - - // Shallow-clone the event (and eventData if nested) before mutating - if (result[eventIndex] === events[eventIndex]) { - result[eventIndex] = { ...events[eventIndex] }; - } - const event = result[eventIndex]; - - if (refType === 'entity') { - // Legacy: eventDataRef → eventData, remove the ref field - event.eventData = resolved; - delete event.eventDataRef; - } else if (refType === 'nested' && fieldName) { - // Shallow-clone eventData before mutating if not yet cloned - if (event.eventData === events[eventIndex].eventData) { - event.eventData = { ...event.eventData }; - } - // V2: replace the nested ref descriptor with resolved value - event.eventData[fieldName] = resolved; +function buildEventFromV4( + meta: Omit, + body: Uint8Array, + resolveData: 'none' | 'all' +): Event { + const eventData: Record = {}; + if (meta.workflowName) eventData.workflowName = meta.workflowName; + if (meta.stepName) eventData.stepName = meta.stepName; + if (meta.attempt !== undefined) eventData.attempt = meta.attempt; + if (meta.deploymentId) eventData.deploymentId = meta.deploymentId; + if (meta.errorCode) eventData.errorCode = meta.errorCode; + + if (resolveData === 'all' && body.byteLength > 0) { + const payloadField = PAYLOAD_FIELD_BY_EVENT_TYPE[meta.eventType]; + if (payloadField) { + try { + eventData[payloadField] = decode(body); + } catch { + // CBOR decode failure — fall back to the raw bytes so callers + // can still inspect the payload if they know its format. + eventData[payloadField] = body; } } + } - return result; - }); + const event = { + eventId: meta.eventId, + runId: meta.runId, + eventType: meta.eventType, + createdAt: new Date(meta.createdAt), + ...(meta.correlationId ? { correlationId: meta.correlationId } : {}), + eventData, + } as unknown as Event; + + // For resolveData='none', strip eventData entirely. Use the existing + // world-side helper so behavior stays in sync with other backends. + return resolveData === 'none' ? stripEventDataRefs(event, 'none') : event; } -// Functions +// ============================================================================= +// Public API +// ============================================================================= + export async function getEvent( runId: string, eventId: string, @@ -261,123 +239,50 @@ export async function getEvent( config?: APIConfig ): Promise { const resolveData = params?.resolveData ?? DEFAULT_RESOLVE_DATA_OPTION; - const remoteRefBehavior = resolveData === 'none' ? 'lazy' : 'resolve'; - - const searchParams = new URLSearchParams(); - searchParams.set('remoteRefBehavior', remoteRefBehavior); - - const queryString = searchParams.toString(); - const endpoint = `/v3/runs/${encodeURIComponent(runId)}/events/${encodeURIComponent(eventId)}${queryString ? `?${queryString}` : ''}`; - - const event = await makeRequest({ - endpoint, - options: { method: 'GET' }, - config, - schema: (resolveData === 'none' ? EventWithRefsSchema : EventSchema) as any, - }); - - return stripEventAndLegacyRefs(event as any, resolveData); + const result = await getEventV4(runId, eventId, config); + const { body, ...meta } = result; + return buildEventFromV4(meta, body, resolveData); } export async function getWorkflowRunEvents( params: ListEventsParams | ListEventsByCorrelationIdParams, config?: APIConfig ): Promise> { - const searchParams = new URLSearchParams(); - const { pagination, resolveData = DEFAULT_RESOLVE_DATA_OPTION } = params; - let runId: string | undefined; - let correlationId: string | undefined; - if ('runId' in params) { - runId = params.runId; - } else { - correlationId = params.correlationId; - } - - if (!runId && !correlationId) { - throw new Error('Either runId or correlationId must be provided'); + if ('correlationId' in params) { + // v4 has no list-by-correlation-id endpoint yet. Throw a clear error + // until a server-side endpoint lands — callers that hit this path + // historically used the by-correlation-id query for hook lookup and + // can be migrated to direct hook fetches. + throw new Error( + 'world-vercel v4: listEventsByCorrelationId is not yet implemented. ' + + 'Fetch the hook directly via storage.hooks.getByToken or use ' + + 'storage.events.list(runId) on a known run.' + ); } - if (pagination?.limit) searchParams.set('limit', pagination.limit.toString()); - if (pagination?.cursor) searchParams.set('cursor', pagination.cursor); - if (pagination?.sortOrder) - searchParams.set('sortOrder', pagination.sortOrder); - if (correlationId) searchParams.set('correlationId', correlationId); - - // Always send 'lazy' to the server to avoid memory pressure from resolving - // all refs in memory. When resolveData is 'all', we hydrate refs client-side - // via individual ref resolution requests. - searchParams.set('remoteRefBehavior', 'lazy'); - - const queryString = searchParams.toString(); - const query = queryString ? `?${queryString}` : ''; - const endpoint = correlationId - ? `/v2/events${query}` - : `/v3/runs/${encodeURIComponent(runId!)}/events${query}`; - - let refResolveConcurrency: number | undefined; - const response = (await makeRequest({ - endpoint, - options: { method: 'GET' }, - config, - schema: PaginatedResponseSchema(EventWithRefsSchema), - onResponse: (res) => { - const header = res.headers.get('x-ref-resolve-concurrency'); - if (header) { - const parsed = parseInt(header, 10); - if (!Number.isNaN(parsed) && parsed > 0) { - refResolveConcurrency = parsed; - } - } + const result = await getWorkflowRunEventsV4( + params.runId, + { + cursor: pagination?.cursor ?? undefined, + limit: pagination?.limit, + sortOrder: pagination?.sortOrder, }, - })) as PaginatedResponse; + config + ); - if (resolveData === 'all') { - // Hydrate refs client-side: resolve all ref descriptors in parallel - const hydratedEvents = await hydrateEventRefs( - response.data, - config, - refResolveConcurrency - ); - - // Re-parse hydrated events through EventSchema to apply type coercions - // (e.g., z.coerce.date() for resumeAt) that EventWithRefsSchema skips. - // Use safeParse to gracefully handle any events that don't match a known - // type — pass them through as-is rather than failing the entire request. - let coercionFailures = 0; - const validatedEvents = hydratedEvents.map((event: any) => { - const result = EventSchema.safeParse(event); - if (!result.success) coercionFailures++; - return result.success ? result.data : event; - }); - if (coercionFailures > 0) { - console.warn( - `[world-vercel] EventSchema coercion failed for ${coercionFailures}/${hydratedEvents.length} events` - ); - } - - return { - ...response, - data: validatedEvents, - }; - } + const events = result.events.map((listed) => { + const { body, ...meta } = listed; + return buildEventFromV4(meta, body, resolveData); + }); - // resolveData === 'none': strip eventData and eventDataRef return { - ...response, - data: response.data.map((event: any) => - stripEventAndLegacyRefs(event, resolveData) - ), - }; + data: events, + cursor: result.next ?? null, + hasMore: Boolean(result.next), + } as PaginatedResponse; } -// Event types that require the hook to already exist — a 404 on these -// means the hook was already disposed or never created. -const hookEventsRequiringExistence = new Set([ - 'hook_disposed', - 'hook_received', -]); - export async function createWorkflowRunEvent( id: string | null, data: AnyEventRequest, @@ -387,10 +292,7 @@ export async function createWorkflowRunEvent( try { return await createWorkflowRunEventInner(id, data, params, config); } catch (err) { - // Translate 404 to HookNotFoundError for hook-related events. - // makeRequest() throws a generic WorkflowWorldError for all 404s; - // on the hook_disposed / hook_received path a 404 means the hook - // was already disposed or never created. + // 404 on hook_disposed / hook_received → already-disposed hook. if ( hookEventsRequiringExistence.has(data.eventType) && WorkflowWorldError.is(err) && @@ -409,99 +311,77 @@ async function createWorkflowRunEventInner( params?: CreateEventParams, config?: APIConfig ): Promise { - const resolveData = params?.resolveData ?? DEFAULT_RESOLVE_DATA_OPTION; - - const v1Compat = params?.v1Compat ?? false; - if (v1Compat) { + // v1Compat: caller wants the legacy entity-mutation endpoints (used + // for migrating SDKs that haven't switched to event sourcing yet). + // Keep this on v1 routes — the v4 protocol does not cover it. + if (params?.v1Compat) { if (data.eventType === 'run_cancelled' && id) { const run = await cancelWorkflowRunV1(id, params, config); return { run: run as WorkflowRun }; - } else if (data.eventType === 'run_created') { + } + if (data.eventType === 'run_created') { const run = await createWorkflowRunV1(data.eventData, config); return { run }; } - const wireResult = await makeRequest({ - endpoint: `/v1/runs/${encodeURIComponent(id!)}/events`, - options: { method: 'POST' }, - data, - config, - schema: EventSchema, - }); + throw new Error( + `world-vercel: v1Compat=true is only supported for run_created ` + + `and run_cancelled, not ${data.eventType}` + ); + } - return { event: wireResult }; + if (id === null) { + throw new WorkflowWorldError( + 'world-vercel v4: createWorkflowRunEvent requires a client-generated ' + + 'runId for run_created (the runId is part of the S3 ref key). ' + + 'Generate a wrun_ ULID before calling.', + { status: 400 } + ); } - // Validate client-provided runId timestamp is within acceptable threshold - if (data.eventType === 'run_created' && id) { + // Defensive check for client-generated run_created IDs that ride too + // far ahead of wall-clock time — same threshold the v3 path enforced. + if (data.eventType === 'run_created') { const validationError = validateUlidTimestamp(id, 'wrun_'); if (validationError) { throw new WorkflowWorldError(validationError, { status: 400 }); } } - // For run_created events, runId may be client-provided or null - const runIdPath = id === null ? 'null' : encodeURIComponent(id); - const remoteRefBehavior = eventsNeedingResolve.has(data.eventType) ? 'resolve' : 'lazy'; - // Use the strict schema when the server resolves all refs (preserves the - // WorkflowRunSchema discriminated union), and the loose wire schema when - // the server returns lazy refs (error may be a string or undefined). - if (remoteRefBehavior === 'resolve') { - const wireResult = await makeRequest({ - endpoint: `/v3/runs/${runIdPath}/events`, - options: { method: 'POST' }, - data: { - ...data, - remoteRefBehavior, - ...(params?.requestId ? { vercelId: params.requestId } : {}), - }, - config, - schema: EventResultResolveWireSchema, - }); + const { payload, meta } = splitEventDataForV4(data); - return { - event: wireResult.event - ? stripEventAndLegacyRefs(wireResult.event, resolveData) - : undefined, - run: wireResult.run, - step: wireResult.step ? deserializeStep(wireResult.step) : undefined, - hook: wireResult.hook, - events: wireResult.events, - cursor: wireResult.cursor, - hasMore: wireResult.hasMore, - }; - } - - const wireResult = await makeRequest({ - endpoint: `/v3/runs/${runIdPath}/events`, - options: { method: 'POST' }, - data: { - ...data, - remoteRefBehavior, + const result = await createWorkflowRunEventV4( + { + runId: id, + eventType: data.eventType, + specVersion: data.specVersion ?? 2, + ...(data.correlationId ? { correlationId: data.correlationId } : {}), ...(params?.requestId ? { vercelId: params.requestId } : {}), + remoteRefBehavior, + payload, + ...meta, }, - config, - schema: EventResultLazyWireSchema, - }); + config + ); - // Transform wire format to interface format. In the current event-sourced - // model, the run/step error fields are SerializedData (Uint8Array) — the - // deserializeError/deserializeStep helpers are pass-throughs that handle - // any legacy wire-format variants. + // The server already CBOR-decoded into result.body — just thread the + // fields through. Step has a wire-format adapter; runs use the + // pass-through deserializeError helper. + const body = result.body; return { - event: wireResult.event - ? stripEventAndLegacyRefs(wireResult.event, resolveData) + event: body.event as Event | undefined, + run: body.run + ? deserializeError(body.run as Record) : undefined, - run: wireResult.run - ? deserializeError(wireResult.run) + step: body.step + ? deserializeStep(body.step as Parameters[0]) : undefined, - step: wireResult.step ? deserializeStep(wireResult.step) : undefined, - hook: wireResult.hook, - events: wireResult.events, - cursor: wireResult.cursor, - hasMore: wireResult.hasMore, + hook: body.hook as EventResult['hook'], + events: body.events as EventResult['events'], + cursor: body.cursor ?? undefined, + hasMore: body.hasMore, }; } diff --git a/packages/world-vercel/src/refs.ts b/packages/world-vercel/src/refs.ts deleted file mode 100644 index e703db66c2..0000000000 --- a/packages/world-vercel/src/refs.ts +++ /dev/null @@ -1,210 +0,0 @@ -import { WorkflowWorldError } from '@workflow/errors'; -import { decode } from 'cbor-x'; -import { getDispatcher } from './http-client.js'; -import { - ErrorType, - getSpanKind, - HttpRequestMethod, - HttpResponseStatusCode, - PeerService, - trace, - UrlFull, -} from './telemetry.js'; -import { type APIConfig, getHttpConfig } from './utils.js'; - -/** - * A ref descriptor as returned by workflow-server when `remoteRefBehavior=lazy`. - * Matches the server-side `RefDescriptor` type in `lib/data/remote-ref.ts`. - */ -export interface RefDescriptor { - _type: 'RemoteRef'; - _ref: string; - /** Base64-encoded inline payload. Present only for dbrf: (inline) refs. */ - _data?: string; - /** Content type of the inline payload. Present only for dbrf: refs. */ - _ct?: string; -} - -/** - * Checks if a value is a RefDescriptor object. - */ -export function isRefDescriptor(value: unknown): value is RefDescriptor { - return ( - typeof value === 'object' && - value !== null && - '_type' in value && - '_ref' in value && - typeof (value as { _ref: unknown })._ref === 'string' && - (value as { _type: string })._type === 'RemoteRef' - ); -} - -/** - * Maximum number of concurrent ref resolution requests. - * Limits peak concurrency to avoid overwhelming the server. - */ -const REF_RESOLVE_CONCURRENCY = 10; - -/** - * Resolve a single ref descriptor. - * - * For inline refs (dbrf: prefix), the data is decoded locally from the - * descriptor's `_data` field — no network request is needed. - * - * For S3 refs (s3rf:) and Redis refs (kvrf:), a request is made to the - * `GET /v2/runs/:runId/refs` endpoint on workflow-server which returns - * raw CBOR or binary bytes. - * - * @param descriptor - The ref descriptor to resolve - * @param runId - The runId that owns this ref (used in the URL path) - * @param config - API configuration - */ -export async function resolveRefDescriptor( - descriptor: RefDescriptor, - runId: string, - config?: APIConfig -): Promise { - const ref = descriptor._ref; - - // Inline refs (dbrf:) carry their data in the descriptor — decode locally - if (ref.startsWith('dbrf:')) { - if (!descriptor._data) { - throw new Error(`Inline ref descriptor missing _data field: ${ref}`); - } - const contentType = descriptor._ct ?? 'application/cbor'; - const binaryData = Buffer.from(descriptor._data, 'base64'); - if (contentType === 'application/octet-stream') { - // Buffer is a Uint8Array subclass — return directly to avoid a copy. - return binaryData; - } - // CBOR-encoded data — decode it. Buffer is accepted by cbor-x directly. - return decode(binaryData); - } - - // Remote refs (s3rf:, kvrf:) — fetch raw bytes from the server. - // The server returns the raw stored bytes directly (not wrapped in a - // JSON/CBOR envelope). The Content-Type may be 'application/cbor' (for - // CBOR-encoded data) or 'application/octet-stream' (for raw binary like - // Uint8Array). We handle both content types directly rather than going - // through makeRequest, which only handles JSON/CBOR API responses. - const { baseUrl, headers } = await getHttpConfig(config); - const endpoint = `/v2/runs/${encodeURIComponent(runId)}/refs?ref=${encodeURIComponent(ref)}`; - const url = `${baseUrl}${endpoint}`; - - // Set headers that makeRequest normally adds: Accept for content - // negotiation and X-Request-Time to bypass RSC request memoization. - headers.set('Accept', 'application/cbor, application/octet-stream'); - headers.set('X-Request-Time', Date.now().toString()); - - return trace( - 'http GET', - { kind: await getSpanKind('CLIENT') }, - async (span) => { - span?.setAttributes({ - ...HttpRequestMethod('GET'), - ...UrlFull(url), - ...PeerService('workflow-server'), - }); - - // eslint-disable-next-line @typescript-eslint/no-explicit-any -- undici v7 dispatcher types don't match @types/node's RequestInit - const response = await fetch(url, { - method: 'GET', - headers, - dispatcher: getDispatcher(), - } as any); - - span?.setAttributes({ - ...HttpResponseStatusCode(response.status), - }); - - if (!response.ok) { - const error = new WorkflowWorldError( - `Failed to resolve ref: HTTP ${response.status}`, - { url, status: response.status } - ); - span?.setAttributes({ - ...ErrorType(`HTTP ${response.status}`), - }); - span?.recordException?.(error); - throw error; - } - - const contentType = response.headers.get('content-type') || ''; - const buffer = await response.arrayBuffer(); - - if (contentType.includes('application/octet-stream')) { - // Raw binary data (e.g., Uint8Array stored by the workflow) - return new Uint8Array(buffer); - } - - // CBOR-encoded data (the common case for structured values) - return decode(new Uint8Array(buffer)); - } - ); -} - -/** - * A ref descriptor paired with the runId that owns it, for resolution. - */ -export interface RefWithRunId { - descriptor: RefDescriptor; - runId: string; -} - -/** - * Resolve multiple ref descriptors in parallel with bounded concurrency. - * - * If any ref in a batch fails, the batch rejects and remaining batches - * are aborted to avoid cascading failures. - * - * @param refs - Array of ref descriptors with their owning runIds - * @param config - API configuration - * @param concurrency - Max concurrent ref resolution requests. Falls back to REF_RESOLVE_CONCURRENCY. - * @returns Array of resolved values in the same order as input - */ -export async function resolveRefDescriptors( - refs: RefWithRunId[], - config?: APIConfig, - concurrency?: number -): Promise { - if (refs.length === 0) return []; - - const limit = concurrency ?? REF_RESOLVE_CONCURRENCY; - - return trace('world.refs.resolve', async (span) => { - const inlineCount = refs.filter((r) => - r.descriptor._ref.startsWith('dbrf:') - ).length; - const remoteCount = refs.length - inlineCount; - - span?.setAttributes({ - 'workflow.refs.total_count': refs.length, - 'workflow.refs.inline_count': inlineCount, - 'workflow.refs.remote_count': remoteCount, - 'workflow.refs.concurrency_limit': limit, - }); - - // Simple case: if under concurrency limit, resolve all at once - if (refs.length <= limit) { - return Promise.all( - refs.map((r) => resolveRefDescriptor(r.descriptor, r.runId, config)) - ); - } - - // Batch with bounded concurrency. If any ref in a batch fails, - // the batch rejects and remaining batches are aborted to avoid - // cascading failures. - const results: unknown[] = new Array(refs.length); - for (let i = 0; i < refs.length; i += limit) { - const batch = refs.slice(i, i + limit); - const batchResults = await Promise.all( - batch.map((r) => resolveRefDescriptor(r.descriptor, r.runId, config)) - ); - for (let j = 0; j < batchResults.length; j++) { - results[i + j] = batchResults[j]; - } - } - - return results; - }); -} From ae8f4693031fc2233e520c46187fc75258a880fd Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Mon, 25 May 2026 10:28:39 +0200 Subject: [PATCH 04/10] [world-vercel] Consume new v4 GET/LIST event-entity shape; symmetric CBOR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The matching server-side change (workflow-server PR #439, commit 5d79cf1) now returns: - GET single event: full event entity CBOR-encoded in the body (resolves refs server-side and bakes payload into eventData). - LIST events: each frame's meta is the full event entity (CBOR), payload stays as a RefDescriptor in eventData[field], resolved bytes ride in the frame body. This commit threads that through the world-vercel adapter: - events-v4.ts: * getEventV4 returns DecodedV4Event (CBOR-decode of response body). Drop the parseEventMetaFromHeaders / readHeader-driven reconstruction. * ListedEventV4 carries `{ event: DecodedV4Event, body: Uint8Array }` — the full entity plus the resolved payload bytes to splice in. - events.ts: * buildEventFromV4 takes (decoded entity, payload bytes) and splices the bytes into eventData[payloadField] for the LIST path. For the GET path the server already baked the bytes in, so buildEventFromV4 is called with an empty body. * CBOR-decode the payload bytes back into the original JS value on read. Matches the unconditional CBOR-encode on write (Uint8Array round-trips via cbor-x's binary type). Why this matters: the workflow runtime's replay path reads arbitrary fields off eventData (executionContext, hookToken, isWebhook, resumeAt, error shape, …). The previous cherry-picked-metadata shape dropped those fields, which is why every E2E Vercel Prod test on this branch was getting stuck after run_started with no further events — the runtime's invocation of the workflow function on the workbench deployment couldn't reconstruct state correctly. Companion to workflow-server PR #439 commit 5d79cf1. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/world-vercel/src/events-v4.ts | 119 ++++++++----------------- packages/world-vercel/src/events.ts | 99 ++++++++++++-------- 2 files changed, 98 insertions(+), 120 deletions(-) diff --git a/packages/world-vercel/src/events-v4.ts b/packages/world-vercel/src/events-v4.ts index adf0750a24..a96baf6e81 100644 --- a/packages/world-vercel/src/events-v4.ts +++ b/packages/world-vercel/src/events-v4.ts @@ -218,19 +218,21 @@ export async function createWorkflowRunEventV4( return { eventId, runId, createdAt, body }; } -export interface GetEventV4Result { +/** + * Decoded event entity returned by GET /api/v4/runs/:runId/events/:eventId. + * The server CBOR-encodes the full entity with refs resolved server-side, + * so the payload field (input/output/result/error/payload/metadata + * depending on eventType) already contains the resolved bytes — the + * adapter layer doesn't need to splice them in. + */ +export interface DecodedV4Event { eventId: string; runId: string; eventType: string; - createdAt: string; correlationId?: string; - workflowName?: string; - stepName?: string; - attempt?: number; - deploymentId?: string; - errorCode?: string; - /** The raw payload bytes (possibly empty). Caller decodes. */ - body: Uint8Array; + createdAt: Date | string; + specVersion?: number; + eventData?: Record; } function readHeader( @@ -243,47 +245,19 @@ function readHeader( return undefined; } -function parseEventMetaFromHeaders( - responseHeaders: Record -): Omit { - const eventId = readHeader(responseHeaders, V4_HEADERS.eventId); - const runId = readHeader(responseHeaders, V4_HEADERS.runId); - const eventType = readHeader(responseHeaders, V4_HEADERS.eventType); - const createdAt = readHeader(responseHeaders, V4_HEADERS.createdAt); - if (!eventId || !runId || !eventType || !createdAt) { - throw new Error('v4 getEvent: response missing required x-wf-* headers'); - } - const correlationId = readHeader(responseHeaders, V4_HEADERS.correlationId); - const workflowName = readHeader(responseHeaders, V4_HEADERS.workflowName); - const stepName = readHeader(responseHeaders, V4_HEADERS.stepName); - const attemptStr = readHeader(responseHeaders, V4_HEADERS.attempt); - const deploymentId = readHeader(responseHeaders, V4_HEADERS.deploymentId); - const errorCode = readHeader(responseHeaders, V4_HEADERS.errorCode); - return { - eventId, - runId, - eventType, - createdAt, - ...(correlationId ? { correlationId } : {}), - ...(workflowName ? { workflowName: decodeURIComponent(workflowName) } : {}), - ...(stepName ? { stepName: decodeURIComponent(stepName) } : {}), - ...(attemptStr ? { attempt: Number(attemptStr) } : {}), - ...(deploymentId ? { deploymentId: decodeURIComponent(deploymentId) } : {}), - ...(errorCode ? { errorCode: decodeURIComponent(errorCode) } : {}), - }; -} - /** * GET /api/v4/runs/:runId/events/:eventId * - * Returns the event metadata (parsed from response headers) along - * with the payload body as a Uint8Array. + * Returns the full event entity (CBOR-decoded from the response body). + * The server resolves the payload ref server-side, so eventData already + * contains the resolved bytes — callers consume it the same way they + * did on v3 GET event. */ export async function getEventV4( runId: string, eventId: string, config?: APIConfig -): Promise { +): Promise { const { baseUrl, headers: baseHeaders } = await getHttpConfig(config); const headers = new Headers(baseHeaders); await setAuthHeader(headers, config); @@ -298,9 +272,8 @@ export async function getEventV4( const errorBody = await response.body.text(); throw new Error(`v4 getEvent failed: ${response.statusCode} ${errorBody}`); } - const meta = parseEventMetaFromHeaders(response.headers); - const body = new Uint8Array(await response.body.arrayBuffer()); - return { ...meta, body }; + const bodyBytes = new Uint8Array(await response.body.arrayBuffer()); + return decode(bodyBytes) as DecodedV4Event; } export interface ListEventsV4Params { @@ -309,17 +282,16 @@ export interface ListEventsV4Params { sortOrder?: 'asc' | 'desc'; } +/** + * A single event extracted from a v4 LIST frame. Mirrors `DecodedV4Event` + * but also carries the raw payload bytes — for payload-bearing events the + * server emits the resolved bytes in the frame body (so it never has to + * decode them) and the SDK is expected to splice them back into the + * appropriate `eventData` field. + */ export interface ListedEventV4 { - eventId: string; - runId: string; - eventType: string; - createdAt: string; - correlationId?: string; - workflowName?: string; - stepName?: string; - attempt?: number; - deploymentId?: string; - errorCode?: string; + event: DecodedV4Event; + /** Resolved payload bytes. Empty for events without a payload. */ body: Uint8Array; } @@ -333,13 +305,15 @@ export interface ListEventsV4Result { * GET /api/v4/runs/:runId/events * * Parses the binary-frame stream into a list of events plus the - * pagination cursor (from the sentinel frame). + * pagination cursor (from the sentinel frame). Each frame's CBOR meta + * IS the full event entity, with the payload field still in `eventData` + * as a `RefDescriptor` (lazy); the resolved payload bytes ride in the + * frame body. The adapter layer splices them back into eventData. * - * NOTE: this implementation eagerly drains the stream into memory. A - * streaming variant that yields events one at a time without buffering - * the whole page is a straightforward refactor (decodeFrames is already - * an async generator); we keep this signature for parity with the - * existing `getWorkflowRunEvents` callers in the world-vercel adapter. + * Eagerly drains the stream into memory to match the existing + * `getWorkflowRunEvents` page-at-a-time contract. A streaming variant + * that yields events one at a time without buffering the page would be + * a small refactor (decodeFrames is already async-iterable). */ export async function getWorkflowRunEventsV4( runId: string, @@ -393,27 +367,10 @@ export async function getWorkflowRunEventsV4( if (typeof frame.meta.next === 'string') next = frame.meta.next; break; } - const meta = frame.meta as Record; - const event: ListedEventV4 = { - eventId: String(meta.eventId ?? ''), - runId: String(meta.runId ?? ''), - eventType: String(meta.eventType ?? ''), - createdAt: String(meta.createdAt ?? ''), + events.push({ + event: frame.meta as unknown as DecodedV4Event, body: frame.body, - }; - if (typeof meta.correlationId === 'string') { - event.correlationId = meta.correlationId; - } - if (typeof meta.workflowName === 'string') { - event.workflowName = meta.workflowName; - } - if (typeof meta.stepName === 'string') event.stepName = meta.stepName; - if (typeof meta.attempt === 'number') event.attempt = meta.attempt; - if (typeof meta.deploymentId === 'string') { - event.deploymentId = meta.deploymentId; - } - if (typeof meta.errorCode === 'string') event.errorCode = meta.errorCode; - events.push(event); + }); } return { events, ...(next ? { next } : {}) }; diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index d49dbe165b..5f5be75335 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -43,9 +43,9 @@ import { import { decode, encode } from 'cbor-x'; import { createWorkflowRunEventV4, + type DecodedV4Event, getEventV4, getWorkflowRunEventsV4, - type ListedEventV4, } from './events-v4.js'; import { cancelWorkflowRunV1, createWorkflowRunV1 } from './runs.js'; import { deserializeStep } from './steps.js'; @@ -169,12 +169,12 @@ function splitEventDataForV4(data: AnyEventRequest): SplitEventData { let payload: Uint8Array | undefined; if (payloadField && payloadField in eventData) { const value = eventData[payloadField]; - if (value instanceof Uint8Array) { - payload = value; - } else if (value !== undefined) { - // CBOR-encode arbitrary JS values. The server treats the bytes as - // opaque; the SDK reverses this at decode time so the wire layer - // doesn't know about CBOR. + if (value !== undefined) { + // Always CBOR-encode, including Uint8Array (cbor-x represents it as + // a binary type that round-trips back to Uint8Array on decode). The + // server stores the bytes opaquely; the SDK does the symmetric + // decode on read. Keeping the encoding unconditional means we never + // have to track "is this raw bytes or CBOR?" on the wire. payload = new Uint8Array(encode(value)); } } @@ -183,48 +183,69 @@ function splitEventDataForV4(data: AnyEventRequest): SplitEventData { } /** - * Turn a v4 single-event response (metadata in headers + opaque body - * bytes) into the Event shape the workflow runtime expects. + * Turn a v4 event (full entity from GET single-event, or LIST frame + * meta + body) into the Event shape the workflow runtime expects. * - * Reconstructs `eventData` from header fields and places the CBOR-decoded - * payload value (if any) under the appropriate per-event-type field. + * The server-side GET resolves refs server-side and bakes the payload + * bytes into eventData, so `payloadBody` is empty there. The LIST path + * keeps the payload as a `RefDescriptor` in `eventData[fieldName]` and + * delivers the resolved bytes in `payloadBody`; this helper splices them + * back in so the runtime sees a uniform shape. */ function buildEventFromV4( - meta: Omit, - body: Uint8Array, + decoded: DecodedV4Event, + payloadBody: Uint8Array, resolveData: 'none' | 'all' ): Event { - const eventData: Record = {}; - if (meta.workflowName) eventData.workflowName = meta.workflowName; - if (meta.stepName) eventData.stepName = meta.stepName; - if (meta.attempt !== undefined) eventData.attempt = meta.attempt; - if (meta.deploymentId) eventData.deploymentId = meta.deploymentId; - if (meta.errorCode) eventData.errorCode = meta.errorCode; + const eventData = (decoded.eventData ?? {}) as Record; - if (resolveData === 'all' && body.byteLength > 0) { - const payloadField = PAYLOAD_FIELD_BY_EVENT_TYPE[meta.eventType]; + if (payloadBody.byteLength > 0) { + const payloadField = PAYLOAD_FIELD_BY_EVENT_TYPE[decoded.eventType]; if (payloadField) { + // CBOR-decode the bytes to recover the original JS value the SDK + // encoded on the write side. Symmetric with splitEventDataForV4. try { - eventData[payloadField] = decode(body); + eventData[payloadField] = decode(payloadBody); } catch { - // CBOR decode failure — fall back to the raw bytes so callers - // can still inspect the payload if they know its format. - eventData[payloadField] = body; + // If decode fails, leave the raw bytes — the consumer can + // inspect them as a Uint8Array. This is a defensive path; in + // practice the SDK is the only producer here. + eventData[payloadField] = payloadBody; + } + } + } + + // For the GET-single-event path, the server already resolved the ref + // server-side, so eventData[payloadField] is a Uint8Array of the CBOR + // bytes the SDK originally sent. Decode it the same way. + if (payloadBody.byteLength === 0) { + const payloadField = PAYLOAD_FIELD_BY_EVENT_TYPE[decoded.eventType]; + if (payloadField && eventData[payloadField] instanceof Uint8Array) { + try { + eventData[payloadField] = decode(eventData[payloadField] as Uint8Array); + } catch { + // leave as-is } } } const event = { - eventId: meta.eventId, - runId: meta.runId, - eventType: meta.eventType, - createdAt: new Date(meta.createdAt), - ...(meta.correlationId ? { correlationId: meta.correlationId } : {}), + eventId: decoded.eventId, + runId: decoded.runId, + eventType: decoded.eventType, + createdAt: + decoded.createdAt instanceof Date + ? decoded.createdAt + : new Date(decoded.createdAt), + ...(decoded.correlationId ? { correlationId: decoded.correlationId } : {}), eventData, + ...(decoded.specVersion !== undefined + ? { specVersion: decoded.specVersion } + : {}), } as unknown as Event; - // For resolveData='none', strip eventData entirely. Use the existing - // world-side helper so behavior stays in sync with other backends. + // For resolveData='none', strip eventData entirely. Reuse the world- + // side helper so behavior stays in sync with other backends. return resolveData === 'none' ? stripEventDataRefs(event, 'none') : event; } @@ -239,9 +260,10 @@ export async function getEvent( config?: APIConfig ): Promise { const resolveData = params?.resolveData ?? DEFAULT_RESOLVE_DATA_OPTION; - const result = await getEventV4(runId, eventId, config); - const { body, ...meta } = result; - return buildEventFromV4(meta, body, resolveData); + const decoded = await getEventV4(runId, eventId, config); + // GET resolves refs server-side and bakes the payload into eventData, + // so there's no separate body slot to splice in here. + return buildEventFromV4(decoded, new Uint8Array(0), resolveData); } export async function getWorkflowRunEvents( @@ -271,10 +293,9 @@ export async function getWorkflowRunEvents( config ); - const events = result.events.map((listed) => { - const { body, ...meta } = listed; - return buildEventFromV4(meta, body, resolveData); - }); + const events = result.events.map((listed) => + buildEventFromV4(listed.event, listed.body, resolveData) + ); return { data: events, From 6b731bf2a9343be322879c66d7dea0b220697dce Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Mon, 25 May 2026 10:49:16 +0200 Subject: [PATCH 05/10] [world-vercel] Consume the v4 GET single-event frame stream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Companion to workflow-server PR #439 commit 2a55acd, which switched GET /api/v4/runs/:runId/events/:eventId from a CBOR-entity body to a single v4 frame (same wire shape as one LIST frame). - getEventV4 now returns `{ event: DecodedV4Event, body: Uint8Array }` by reading exactly one frame off the response body via `decodeFrames` — same reader the LIST path uses. No content-type branching, no separate CBOR decode path. - getEvent (the storage adapter wrapper) passes both pieces to `buildEventFromV4`, which splices the CBOR-decoded body into `eventData[payloadField]`. Same path LIST already uses, so no GET-specific shape exists anymore. - Drop the special-case fallback in `buildEventFromV4` that used to re-decode an in-eventData Uint8Array — only one input shape now. Net effect: server-side memory for GET single-event is now bounded by S3 chunk size (~64 KB) instead of full payload size. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/world-vercel/src/events-v4.ts | 31 ++++++++++++++----- packages/world-vercel/src/events.ts | 43 ++++++++------------------ 2 files changed, 37 insertions(+), 37 deletions(-) diff --git a/packages/world-vercel/src/events-v4.ts b/packages/world-vercel/src/events-v4.ts index a96baf6e81..56a07c7693 100644 --- a/packages/world-vercel/src/events-v4.ts +++ b/packages/world-vercel/src/events-v4.ts @@ -248,16 +248,19 @@ function readHeader( /** * GET /api/v4/runs/:runId/events/:eventId * - * Returns the full event entity (CBOR-decoded from the response body). - * The server resolves the payload ref server-side, so eventData already - * contains the resolved bytes — callers consume it the same way they - * did on v3 GET event. + * Returns one v4 frame: the full event entity (CBOR-decoded from the + * frame meta) plus the resolved payload bytes (frame body, possibly + * empty). The wire format is identical to a single LIST frame so the + * server can stream the payload from S3 without buffering — callers + * are responsible for splicing `body` into `event.eventData[payloadField]` + * when they need the resolved value. The world-vercel adapter does this + * in events.ts. */ export async function getEventV4( runId: string, eventId: string, config?: APIConfig -): Promise { +): Promise<{ event: DecodedV4Event; body: Uint8Array }> { const { baseUrl, headers: baseHeaders } = await getHttpConfig(config); const headers = new Headers(baseHeaders); await setAuthHeader(headers, config); @@ -272,8 +275,22 @@ export async function getEventV4( const errorBody = await response.body.text(); throw new Error(`v4 getEvent failed: ${response.statusCode} ${errorBody}`); } - const bodyBytes = new Uint8Array(await response.body.arrayBuffer()); - return decode(bodyBytes) as DecodedV4Event; + const contentType = readHeader(response.headers, 'content-type'); + if (!contentType?.startsWith(V4_FRAME_CONTENT_TYPE)) { + throw new Error( + `v4 getEvent: expected ${V4_FRAME_CONTENT_TYPE}, got ${contentType ?? '(none)'}` + ); + } + const webBody = (await import('node:stream')).Readable.toWeb( + response.body as unknown as import('node:stream').Readable + ) as unknown as ReadableStream; + + // GET emits a single frame (no sentinel); decodeFrames returns at EOF + // after yielding it. + for await (const frame of decodeFrames(webBody)) { + return { event: frame.meta as unknown as DecodedV4Event, body: frame.body }; + } + throw new Error(`v4 getEvent: empty frame stream for ${eventId}`); } export interface ListEventsV4Params { diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 5f5be75335..24b17d48fb 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -183,14 +183,14 @@ function splitEventDataForV4(data: AnyEventRequest): SplitEventData { } /** - * Turn a v4 event (full entity from GET single-event, or LIST frame - * meta + body) into the Event shape the workflow runtime expects. + * Turn a v4 event (frame meta + frame body) into the Event shape the + * workflow runtime expects. * - * The server-side GET resolves refs server-side and bakes the payload - * bytes into eventData, so `payloadBody` is empty there. The LIST path - * keeps the payload as a `RefDescriptor` in `eventData[fieldName]` and - * delivers the resolved bytes in `payloadBody`; this helper splices them - * back in so the runtime sees a uniform shape. + * Both GET single-event and LIST use the same frame format: meta is the + * full event entity with the payload field as a RefDescriptor, body is + * the resolved payload bytes (possibly empty). This helper CBOR-decodes + * the body and splices the value into `eventData[fieldName]`, symmetric + * with the unconditional CBOR-encode in `splitEventDataForV4`. */ function buildEventFromV4( decoded: DecodedV4Event, @@ -202,33 +202,16 @@ function buildEventFromV4( if (payloadBody.byteLength > 0) { const payloadField = PAYLOAD_FIELD_BY_EVENT_TYPE[decoded.eventType]; if (payloadField) { - // CBOR-decode the bytes to recover the original JS value the SDK - // encoded on the write side. Symmetric with splitEventDataForV4. try { eventData[payloadField] = decode(payloadBody); } catch { - // If decode fails, leave the raw bytes — the consumer can - // inspect them as a Uint8Array. This is a defensive path; in - // practice the SDK is the only producer here. + // Defensive: leave the raw bytes if decode fails. The SDK is + // the only producer in practice so this shouldn't fire. eventData[payloadField] = payloadBody; } } } - // For the GET-single-event path, the server already resolved the ref - // server-side, so eventData[payloadField] is a Uint8Array of the CBOR - // bytes the SDK originally sent. Decode it the same way. - if (payloadBody.byteLength === 0) { - const payloadField = PAYLOAD_FIELD_BY_EVENT_TYPE[decoded.eventType]; - if (payloadField && eventData[payloadField] instanceof Uint8Array) { - try { - eventData[payloadField] = decode(eventData[payloadField] as Uint8Array); - } catch { - // leave as-is - } - } - } - const event = { eventId: decoded.eventId, runId: decoded.runId, @@ -260,10 +243,10 @@ export async function getEvent( config?: APIConfig ): Promise { const resolveData = params?.resolveData ?? DEFAULT_RESOLVE_DATA_OPTION; - const decoded = await getEventV4(runId, eventId, config); - // GET resolves refs server-side and bakes the payload into eventData, - // so there's no separate body slot to splice in here. - return buildEventFromV4(decoded, new Uint8Array(0), resolveData); + const { event, body } = await getEventV4(runId, eventId, config); + // Same shape as a LIST frame — splice the body bytes into + // eventData[payloadField] in buildEventFromV4. + return buildEventFromV4(event, body, resolveData); } export async function getWorkflowRunEvents( From fd68f1becbfcf29570ef25abcd706b28e20d9e9e Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Mon, 25 May 2026 10:56:34 +0200 Subject: [PATCH 06/10] [world-vercel] Wire listEventsByCorrelationId through to v4 endpoint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Companion to workflow-server PR #439 commit 5036b56, which added `GET /api/v4/events?correlationId=...`. The SDK adapter's `storage.events.listByCorrelationId` no longer throws. Implementation: - New `getEventsByCorrelationIdV4` wire helper alongside `getWorkflowRunEventsV4`. Both share a small `consumeListFrameStream(url, config, opName)` that drives the same frame-stream-to-page conversion — only the URL differs. - `events.ts`'s `getWorkflowRunEvents` dispatches between the two based on whether params carry `runId` or `correlationId`. Same return shape on either path. Changeset updated to drop the "not yet implemented" caveat. Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/v4-events-client.md | 2 +- packages/world-vercel/src/events-v4.ts | 100 +++++++++++++++++-------- packages/world-vercel/src/events.ts | 29 +++---- 3 files changed, 78 insertions(+), 53 deletions(-) diff --git a/.changeset/v4-events-client.md b/.changeset/v4-events-client.md index aaffed3f6c..a9512bf881 100644 --- a/.changeset/v4-events-client.md +++ b/.changeset/v4-events-client.md @@ -2,4 +2,4 @@ "@workflow/world-vercel": major --- -Switch the world-vercel adapter's event endpoints from the v2/v3 wire format to v4. Event metadata now rides in `x-wf-*` HTTP headers and payloads stream end-to-end as opaque bytes — no server-side CBOR parse on writes, and no per-event `/refs` round-trip on list responses. POST event response carries the materialized EventResult as a CBOR body. Public `createWorkflowRunEvent` / `getEvent` / `getWorkflowRunEvents` signatures are unchanged; the underlying wire calls swap to v4. `listEventsByCorrelationId` is not yet implemented on v4 and now throws — callers should fetch hooks directly via `storage.hooks.getByToken`. Requires workflow-server with v4 routes mounted. +Switch the world-vercel adapter's event endpoints from the v2/v3 wire format to v4. Event metadata now rides in `x-wf-*` HTTP headers and payloads stream end-to-end as opaque bytes — no server-side CBOR parse on writes, and no per-event `/refs` round-trip on list responses. POST event response carries the materialized EventResult as a CBOR body. GET single event and LIST events use the same `application/vnd.workflow.v4-frames` binary frame stream; `listEventsByCorrelationId` is wired through too. Public `createWorkflowRunEvent` / `getEvent` / `getWorkflowRunEvents` signatures are unchanged. Requires workflow-server with v4 routes mounted. diff --git a/packages/world-vercel/src/events-v4.ts b/packages/world-vercel/src/events-v4.ts index 56a07c7693..013564aa0e 100644 --- a/packages/world-vercel/src/events-v4.ts +++ b/packages/world-vercel/src/events-v4.ts @@ -319,39 +319,19 @@ export interface ListEventsV4Result { } /** - * GET /api/v4/runs/:runId/events - * - * Parses the binary-frame stream into a list of events plus the - * pagination cursor (from the sentinel frame). Each frame's CBOR meta - * IS the full event entity, with the payload field still in `eventData` - * as a `RefDescriptor` (lazy); the resolved payload bytes ride in the - * frame body. The adapter layer splices them back into eventData. - * - * Eagerly drains the stream into memory to match the existing - * `getWorkflowRunEvents` page-at-a-time contract. A streaming variant - * that yields events one at a time without buffering the page would be - * a small refactor (decodeFrames is already async-iterable). + * Drive a v4 frame-stream list response into an in-memory page. Used by + * both the by-runId and by-correlationId list endpoints — the wire + * shape is identical, only the URL differs. */ -export async function getWorkflowRunEventsV4( - runId: string, - params: ListEventsV4Params = {}, - config?: APIConfig +async function consumeListFrameStream( + url: string, + config: APIConfig | undefined, + opName: string ): Promise { - const { baseUrl, headers: baseHeaders } = await getHttpConfig(config); + const { headers: baseHeaders } = await getHttpConfig(config); const headers = new Headers(baseHeaders); await setAuthHeader(headers, config); - const searchParams = new URLSearchParams(); - if (params.cursor) searchParams.set('cursor', params.cursor); - if (params.limit !== undefined) { - searchParams.set('limit', String(params.limit)); - } - if (params.sortOrder) searchParams.set('sortOrder', params.sortOrder); - const qs = searchParams.toString(); - const url = - `${baseUrl}/v4/runs/${encodeURIComponent(runId)}/events` + - (qs ? `?${qs}` : ''); - const response = await request(url, { method: 'GET', headers: Object.fromEntries(headers.entries()), @@ -359,14 +339,12 @@ export async function getWorkflowRunEventsV4( }); if (response.statusCode < 200 || response.statusCode >= 300) { const errorBody = await response.body.text(); - throw new Error( - `v4 listEvents failed: ${response.statusCode} ${errorBody}` - ); + throw new Error(`v4 ${opName} failed: ${response.statusCode} ${errorBody}`); } const contentType = readHeader(response.headers, 'content-type'); if (!contentType?.startsWith(V4_FRAME_CONTENT_TYPE)) { throw new Error( - `v4 listEvents: expected ${V4_FRAME_CONTENT_TYPE}, got ${contentType ?? '(none)'}` + `v4 ${opName}: expected ${V4_FRAME_CONTENT_TYPE}, got ${contentType ?? '(none)'}` ); } @@ -392,3 +370,61 @@ export async function getWorkflowRunEventsV4( return { events, ...(next ? { next } : {}) }; } + +function paginationToQuery(params: ListEventsV4Params): string { + const sp = new URLSearchParams(); + if (params.cursor) sp.set('cursor', params.cursor); + if (params.limit !== undefined) sp.set('limit', String(params.limit)); + if (params.sortOrder) sp.set('sortOrder', params.sortOrder); + const qs = sp.toString(); + return qs ? `?${qs}` : ''; +} + +/** + * GET /api/v4/runs/:runId/events + * + * Parses the binary-frame stream into a list of events plus the + * pagination cursor (from the sentinel frame). Each frame's CBOR meta + * IS the full event entity, with the payload field still in `eventData` + * as a `RefDescriptor` (lazy); the resolved payload bytes ride in the + * frame body. The adapter layer splices them back into eventData. + * + * Eagerly drains the stream into memory to match the existing + * `getWorkflowRunEvents` page-at-a-time contract. A streaming variant + * that yields events one at a time without buffering the page would be + * a small refactor (decodeFrames is already async-iterable). + */ +export async function getWorkflowRunEventsV4( + runId: string, + params: ListEventsV4Params = {}, + config?: APIConfig +): Promise { + const { baseUrl } = await getHttpConfig(config); + const url = + `${baseUrl}/v4/runs/${encodeURIComponent(runId)}/events` + + paginationToQuery(params); + return consumeListFrameStream(url, config, 'listEvents'); +} + +/** + * GET /api/v4/events?correlationId=... + * + * Same frame stream as getWorkflowRunEventsV4 but selected by + * correlationId (GSI) instead of runId. Used by the storage adapter's + * `events.listByCorrelationId` path — the v3 client used + * `/v2/events?correlationId=...` for the equivalent query. + */ +export async function getEventsByCorrelationIdV4( + correlationId: string, + params: ListEventsV4Params = {}, + config?: APIConfig +): Promise { + const { baseUrl } = await getHttpConfig(config); + const sp = new URLSearchParams(); + sp.set('correlationId', correlationId); + if (params.cursor) sp.set('cursor', params.cursor); + if (params.limit !== undefined) sp.set('limit', String(params.limit)); + if (params.sortOrder) sp.set('sortOrder', params.sortOrder); + const url = `${baseUrl}/v4/events?${sp.toString()}`; + return consumeListFrameStream(url, config, 'listEventsByCorrelationId'); +} diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 24b17d48fb..d7ba487a8c 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -45,6 +45,7 @@ import { createWorkflowRunEventV4, type DecodedV4Event, getEventV4, + getEventsByCorrelationIdV4, getWorkflowRunEventsV4, } from './events-v4.js'; import { cancelWorkflowRunV1, createWorkflowRunV1 } from './runs.js'; @@ -254,27 +255,15 @@ export async function getWorkflowRunEvents( config?: APIConfig ): Promise> { const { pagination, resolveData = DEFAULT_RESOLVE_DATA_OPTION } = params; - if ('correlationId' in params) { - // v4 has no list-by-correlation-id endpoint yet. Throw a clear error - // until a server-side endpoint lands — callers that hit this path - // historically used the by-correlation-id query for hook lookup and - // can be migrated to direct hook fetches. - throw new Error( - 'world-vercel v4: listEventsByCorrelationId is not yet implemented. ' + - 'Fetch the hook directly via storage.hooks.getByToken or use ' + - 'storage.events.list(runId) on a known run.' - ); - } + const wirePagination = { + cursor: pagination?.cursor ?? undefined, + limit: pagination?.limit, + sortOrder: pagination?.sortOrder, + }; - const result = await getWorkflowRunEventsV4( - params.runId, - { - cursor: pagination?.cursor ?? undefined, - limit: pagination?.limit, - sortOrder: pagination?.sortOrder, - }, - config - ); + const result = await ('correlationId' in params + ? getEventsByCorrelationIdV4(params.correlationId, wirePagination, config) + : getWorkflowRunEventsV4(params.runId, wirePagination, config)); const events = result.events.map((listed) => buildEventFromV4(listed.event, listed.body, resolveData) From 681a965b0d8a43df2a370cc1426aa2237f12b6dd Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Mon, 25 May 2026 11:17:15 +0200 Subject: [PATCH 07/10] [world-vercel] Send v4 POST body as a single frame, drop x-wf-* request headers Mirrors the server-side change in workflow-server: the POST request body is now one length-prefixed frame containing CBOR-encoded metadata plus the opaque payload. Eliminates the V4_HEADERS constant, the percent-encoding for non-ASCII values, the base64-CBOR encoding for executionContext, and the implicit 32 KB cap on Vercel header size. The response side still uses x-wf-event-id/run-id/created-at headers for callers that want eventId without decoding the CBOR body. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/world-vercel/src/events-v4.ts | 165 +++++++++++-------------- packages/world-vercel/src/events.ts | 56 +++++---- 2 files changed, 101 insertions(+), 120 deletions(-) diff --git a/packages/world-vercel/src/events-v4.ts b/packages/world-vercel/src/events-v4.ts index 013564aa0e..19b03b3723 100644 --- a/packages/world-vercel/src/events-v4.ts +++ b/packages/world-vercel/src/events-v4.ts @@ -1,51 +1,41 @@ /** - * v4 event endpoints — header/body-split wire protocol. + * v4 event endpoints — fully framed wire protocol. * - * Mirrors the server-side handlers in - * workflow-server/lib/handlers/v4/. The v4 wire format: + * Both directions use the same length-prefixed binary frame layout: * - * - POST: structured event metadata rides in `x-wf-*` request headers; - * the request body is opaque user-payload bytes streamed straight - * to S3 by the server. No CBOR encoding/decoding on the body — the - * SDK passes Uint8Array bytes through unchanged. - * - GET single event: response headers carry the same `x-wf-*` metadata; - * the response body is the raw payload bytes (streamed from S3 when - * stored there). - * - GET list: a length-prefixed binary frame stream — see - * `frames.ts` for the codec. Eliminates the per-event `/refs` - * round-trip used by v2/v3. + * frame := [u32_be meta_len][cbor_meta][u32_be body_len][body_bytes] * - * Higher-level callers (the world-vercel adapter) are expected to - * CBOR-encode their JS values into the `payload` parameter and to - * CBOR-decode the returned `body` bytes — this module stays at the - * wire-bytes layer. + * - **POST**: request body is one frame. `cbor_meta` carries structured + * event metadata (eventType, specVersion, deploymentId, workflowName, + * …, executionContext); `body_bytes` is the opaque user payload that + * the server streams straight to S3 without decoding. + * - **GET single event**: response body is one frame. + * - **LIST events**: response body is a stream of frames terminated by a + * sentinel frame (meta = `{_end: 1, next?: cursor}`). + * + * The few HTTP response headers v4 still uses (eventId / runId / + * createdAt) are for client convenience — they let the caller read those + * three fields without decoding the response body. + * + * Higher-level callers (the world-vercel adapter) CBOR-encode their JS + * values into the `payload` parameter and CBOR-decode returned `body` + * bytes — this module stays at the wire-bytes layer. */ import { getVercelOidcToken } from '@vercel/oidc'; import { decode } from 'cbor-x'; import { request } from 'undici'; -import { decodeFrames, V4_FRAME_CONTENT_TYPE } from './frames.js'; +import { decodeFrames, encodeFrame, V4_FRAME_CONTENT_TYPE } from './frames.js'; import { getDispatcher } from './http-client.js'; import { type APIConfig, getHttpConfig } from './utils.js'; -/** Names of the `x-wf-*` headers exchanged with the server. Mirror of - * workflow-server/lib/handlers/v4/headers.ts `V4_HEADERS`. */ -export const V4_HEADERS = { - eventType: 'x-wf-event-type', - specVersion: 'x-wf-spec-version', - correlationId: 'x-wf-correlation-id', - vercelId: 'x-wf-vercel-id', - remoteRefBehavior: 'x-wf-remote-ref-behavior', - deploymentId: 'x-wf-deployment-id', - workflowName: 'x-wf-workflow-name', - stepName: 'x-wf-step-name', - attempt: 'x-wf-attempt', - resumeAt: 'x-wf-resume-at', - hookToken: 'x-wf-hook-token', - hookIsWebhook: 'x-wf-hook-is-webhook', - hookIsSystem: 'x-wf-hook-is-system', - errorCode: 'x-wf-error-code', - executionContextB64: 'x-wf-execution-context-b64', +/** + * The few HTTP response headers v4 still uses. POST surfaces these so + * callers can read the freshly-created eventId without decoding the + * CBOR response body. Mirror of + * workflow-server/lib/handlers/v4/headers.ts `V4_RESPONSE_HEADERS`. + */ +export const V4_RESPONSE_HEADERS = { eventId: 'x-wf-event-id', runId: 'x-wf-run-id', createdAt: 'x-wf-created-at', @@ -73,9 +63,9 @@ export interface CreateEventV4Input { hookIsWebhook?: boolean; hookIsSystem?: boolean; errorCode?: string; - /** Base64-encoded CBOR of the executionContext object. Use - * `encodeExecutionContextHeader` to produce this. */ - executionContextB64?: string; + /** Arbitrary structured map; rides as a native CBOR object in the + * frame meta. Bounded by the server at 2 KB encoded. */ + executionContext?: Record; } export interface CreateEventV4Result { @@ -101,55 +91,35 @@ export interface CreateEventV4Result { }; } -/** Apply structured fields onto a Headers object. Non-ASCII string fields - * are percent-encoded so they survive the byte-restricted header - * transport, matching the server-side decode. */ -function applyV4Headers(headers: Headers, input: CreateEventV4Input): void { - headers.set(V4_HEADERS.eventType, input.eventType); - headers.set(V4_HEADERS.specVersion, String(input.specVersion)); - if (input.correlationId) { - headers.set(V4_HEADERS.correlationId, input.correlationId); - } - if (input.vercelId) headers.set(V4_HEADERS.vercelId, input.vercelId); - if (input.remoteRefBehavior) { - headers.set(V4_HEADERS.remoteRefBehavior, input.remoteRefBehavior); - } - if (input.deploymentId) { - headers.set( - V4_HEADERS.deploymentId, - encodeURIComponent(input.deploymentId) - ); - } - if (input.workflowName) { - headers.set( - V4_HEADERS.workflowName, - encodeURIComponent(input.workflowName) - ); - } - if (input.stepName) { - headers.set(V4_HEADERS.stepName, encodeURIComponent(input.stepName)); - } - if (input.attempt !== undefined) { - headers.set(V4_HEADERS.attempt, String(input.attempt)); - } - if (input.resumeAt) { - headers.set(V4_HEADERS.resumeAt, encodeURIComponent(input.resumeAt)); - } - if (input.hookToken) { - headers.set(V4_HEADERS.hookToken, encodeURIComponent(input.hookToken)); - } - if (input.hookIsWebhook !== undefined) { - headers.set(V4_HEADERS.hookIsWebhook, String(input.hookIsWebhook)); - } - if (input.hookIsSystem !== undefined) { - headers.set(V4_HEADERS.hookIsSystem, String(input.hookIsSystem)); - } - if (input.errorCode) { - headers.set(V4_HEADERS.errorCode, encodeURIComponent(input.errorCode)); +/** Build the CBOR meta map for a v4 POST frame. Drops undefined entries + * so the wire shape matches what the server expects to see. */ +function buildPostFrameMeta( + input: CreateEventV4Input +): Record { + const meta: Record = { + eventType: input.eventType, + specVersion: input.specVersion, + }; + if (input.correlationId !== undefined) + meta.correlationId = input.correlationId; + if (input.vercelId !== undefined) meta.vercelId = input.vercelId; + if (input.remoteRefBehavior !== undefined) { + meta.remoteRefBehavior = input.remoteRefBehavior; } - if (input.executionContextB64) { - headers.set(V4_HEADERS.executionContextB64, input.executionContextB64); + if (input.deploymentId !== undefined) meta.deploymentId = input.deploymentId; + if (input.workflowName !== undefined) meta.workflowName = input.workflowName; + if (input.stepName !== undefined) meta.stepName = input.stepName; + if (input.attempt !== undefined) meta.attempt = input.attempt; + if (input.resumeAt !== undefined) meta.resumeAt = input.resumeAt; + if (input.hookToken !== undefined) meta.hookToken = input.hookToken; + if (input.hookIsWebhook !== undefined) + meta.hookIsWebhook = input.hookIsWebhook; + if (input.hookIsSystem !== undefined) meta.hookIsSystem = input.hookIsSystem; + if (input.errorCode !== undefined) meta.errorCode = input.errorCode; + if (input.executionContext !== undefined) { + meta.executionContext = input.executionContext; } + return meta; } async function setAuthHeader( @@ -168,8 +138,9 @@ async function setAuthHeader( /** * POST /api/v4/runs/:runId/events * - * Returns the event/run ids and createdAt timestamp parsed out of - * the response headers. Throws on non-2xx responses. + * Sends the full request as a single v4 frame and returns the event ids + * + materialized-entity bag from the CBOR response body. Throws on + * non-2xx. */ export async function createWorkflowRunEventV4( input: CreateEventV4Input, @@ -178,14 +149,18 @@ export async function createWorkflowRunEventV4( const { baseUrl, headers: baseHeaders } = await getHttpConfig(config); const headers = new Headers(baseHeaders); headers.set('Content-Type', 'application/octet-stream'); - applyV4Headers(headers, input); await setAuthHeader(headers, config); + const frame = encodeFrame( + buildPostFrameMeta(input), + input.payload ?? new Uint8Array(0) + ); + const url = `${baseUrl}/v4/runs/${encodeURIComponent(input.runId)}/events`; const response = await request(url, { method: 'POST', headers: Object.fromEntries(headers.entries()), - body: input.payload ?? undefined, + body: frame, dispatcher: getDispatcher(), }); if (response.statusCode < 200 || response.statusCode >= 300) { @@ -195,9 +170,9 @@ export async function createWorkflowRunEventV4( ); } - const eventId = response.headers[V4_HEADERS.eventId]; - const runId = response.headers[V4_HEADERS.runId]; - const createdAt = response.headers[V4_HEADERS.createdAt]; + const eventId = response.headers[V4_RESPONSE_HEADERS.eventId]; + const runId = response.headers[V4_RESPONSE_HEADERS.runId]; + const createdAt = response.headers[V4_RESPONSE_HEADERS.createdAt]; if ( typeof eventId !== 'string' || typeof runId !== 'string' || @@ -206,9 +181,7 @@ export async function createWorkflowRunEventV4( throw new Error('v4 createEvent: response missing required x-wf-* headers'); } - // Decode the materialized-entity bag from the response body. The server - // always returns a CBOR body now (was 204 in an earlier iteration — - // see workflow-server PR #439 for the corresponding handler change). + // Decode the materialized-entity bag from the CBOR response body. const bodyBytes = new Uint8Array(await response.body.arrayBuffer()); const body = bodyBytes.byteLength > 0 diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index d7ba487a8c..a364b392a1 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -2,23 +2,28 @@ * world-vercel event functions — v4 wire format throughout. * * This module replaces the previous v2/v3 implementation. The v4 wire - * format moves structured event metadata into `x-wf-*` HTTP headers and - * treats payloads as opaque user-data bytes streamed end-to-end. See + * format uses a single length-prefixed binary frame layout in both + * directions: + * + * frame := [u32_be meta_len][cbor_meta][u32_be body_len][body_bytes] + * + * `cbor_meta` is the structured event metadata; `body_bytes` is the + * opaque user payload, never CBOR-decoded by the server. See * workflow-server/lib/handlers/v4/ for the matching server-side handlers * and ../events-v4.ts for the wire-level client. * * Key shape changes vs. v2/v3: * - * - POST event response carries the materialized EventResult - * (event/run/step/hook/wait/events/cursor/hasMore) as a CBOR-encoded - * body — the server resolved-refs path is still respected via the - * `remoteRefBehavior` header. - * - GET single event returns metadata in headers + the user payload - * bytes in the response body. - * - LIST events returns a length-prefixed binary frame stream - * (application/vnd.workflow.v4-frames) — one frame per event with - * CBOR metadata + raw payload bytes. The old per-event `/refs` - * round-trip is eliminated. + * - POST request body is one v4 frame (meta + payload). The response + * surfaces eventId/runId/createdAt as `x-wf-*` headers and carries + * the materialized EventResult (event/run/step/hook/wait/events/ + * cursor/hasMore) as a CBOR body — `remoteRefBehavior` in the frame + * meta still controls server-side ref resolution. + * - GET single event returns one v4 frame: the event entity in the + * frame meta, the user payload bytes in the frame body. + * - LIST events returns a stream of v4 frames terminated by a sentinel + * frame whose meta carries `{_end: 1, next?: cursor}`. The old + * per-event `/refs` round-trip is eliminated. * * Public function signatures are unchanged: storage.ts continues to * wire these as `Storage['events']` and the workflow runtime sees the @@ -44,15 +49,15 @@ import { decode, encode } from 'cbor-x'; import { createWorkflowRunEventV4, type DecodedV4Event, - getEventV4, getEventsByCorrelationIdV4, + getEventV4, getWorkflowRunEventsV4, } from './events-v4.js'; import { cancelWorkflowRunV1, createWorkflowRunV1 } from './runs.js'; import { deserializeStep } from './steps.js'; import { - DEFAULT_RESOLVE_DATA_OPTION, type APIConfig, + DEFAULT_RESOLVE_DATA_OPTION, deserializeError, } from './utils.js'; @@ -62,8 +67,8 @@ import { * (workflow-server/lib/handlers/v4/events.ts PAYLOAD_FIELD_BY_EVENT_TYPE). * * The v4 wire encoding picks this field out of `eventData`, CBOR-encodes - * its value, and ships it as the request body. Everything else in - * `eventData` becomes a `x-wf-*` header. + * its value, and ships it as the frame body. Everything else in + * `eventData` rides in the frame's CBOR meta block. */ const PAYLOAD_FIELD_BY_EVENT_TYPE: Record = { run_created: 'input', @@ -99,7 +104,7 @@ const hookEventsRequiringExistence = new Set([ interface SplitEventData { /** Encoded payload bytes (undefined when the event has no user payload). */ payload?: Uint8Array; - /** Metadata fields that ride in v4 request headers. */ + /** Metadata fields that ride in the v4 POST frame's CBOR meta block. */ meta: { deploymentId?: string; workflowName?: string; @@ -110,15 +115,15 @@ interface SplitEventData { hookIsWebhook?: boolean; hookIsSystem?: boolean; errorCode?: string; - /** Pre-encoded base64 CBOR of executionContext (or undefined). */ - executionContextB64?: string; + /** Structured executionContext, included verbatim in frame meta. */ + executionContext?: Record; }; } /** * Split an AnyEventRequest's `eventData` into (a) the payload bytes that - * become the v4 request body and (b) the metadata fields that become - * v4 request headers. + * become the v4 frame body and (b) the metadata fields that become the + * CBOR-encoded meta block of the same frame. */ function splitEventDataForV4(data: AnyEventRequest): SplitEventData { // Some event types in the AnyEventRequest discriminated union (e.g. @@ -161,10 +166,13 @@ function splitEventDataForV4(data: AnyEventRequest): SplitEventData { } if ( eventData.executionContext !== undefined && - eventData.executionContext !== null + eventData.executionContext !== null && + typeof eventData.executionContext === 'object' ) { - const cbor = encode(eventData.executionContext); - meta.executionContextB64 = Buffer.from(cbor).toString('base64'); + meta.executionContext = eventData.executionContext as Record< + string, + unknown + >; } let payload: Uint8Array | undefined; From aaa436f6d0420c1ce50a7a1a816d8af80a367407 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Mon, 25 May 2026 12:23:59 +0200 Subject: [PATCH 08/10] [world-vercel] Stop wrapping already-serialized payload bytes in CBOR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Payload fields (input / output / result / error / payload / metadata) reach world-vercel after the runtime has already serialized them via dehydrateRunError / dehydrateStepReturnValue / dehydrateStepArguments — they're Uint8Arrays carrying a devalue blob with a format prefix. splitEventDataForV4 was running them through cbor-x.encode again, so the wire bytes ended up as cbor(Uint8Array). On reads through runs.get (which goes through v2 and just returns the raw stored bytes), the consumer saw the CBOR wrapping and hydrateRunError couldn't parse the format prefix — every failed workflow run surfaced as "Failed to hydrate workflow run error". Pass the bytes through unchanged on write and read; symmetric with world-local and the v2/v3 wire format. Throw on non-Uint8Array to flag non-runtime callers loudly instead of silently double-wrapping. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/world-vercel/src/events.ts | 45 +++++++++++++++++------------ 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index a364b392a1..898f9b6833 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -45,7 +45,6 @@ import { validateUlidTimestamp, type WorkflowRun, } from '@workflow/world'; -import { decode, encode } from 'cbor-x'; import { createWorkflowRunEventV4, type DecodedV4Event, @@ -179,12 +178,26 @@ function splitEventDataForV4(data: AnyEventRequest): SplitEventData { if (payloadField && payloadField in eventData) { const value = eventData[payloadField]; if (value !== undefined) { - // Always CBOR-encode, including Uint8Array (cbor-x represents it as - // a binary type that round-trips back to Uint8Array on decode). The - // server stores the bytes opaquely; the SDK does the symmetric - // decode on read. Keeping the encoding unconditional means we never - // have to track "is this raw bytes or CBOR?" on the wire. - payload = new Uint8Array(encode(value)); + // Payload fields (input / output / result / error / payload / + // metadata) reach this layer already serialized as Uint8Array — the + // runtime calls dehydrateRunError / dehydrateStepReturnValue / etc. + // before invoking events.create. Pass the bytes through unchanged + // so runs.get and the events stream return the same raw form that + // hydrateRunError / hydrateStepIO expect. CBOR-encoding here would + // double-wrap on write and (since runs.get bypasses the v4 frame + // decode) leave the consumer with cbor(Uint8Array) rather than the + // devalue blob it was looking for. + if (!(value instanceof Uint8Array)) { + // Surface non-Uint8Array values loudly — current SDK callers go + // through the dehydrate helpers, so anything else is either a + // legacy caller or a bug. + throw new TypeError( + `world-vercel v4: eventData.${payloadField} for ${data.eventType} ` + + `must be a Uint8Array (the runtime's dehydrated wire form); ` + + `got ${typeof value === 'object' ? (value === null ? 'null' : ((value as object).constructor?.name ?? typeof value)) : typeof value}.` + ); + } + payload = value; } } @@ -197,9 +210,11 @@ function splitEventDataForV4(data: AnyEventRequest): SplitEventData { * * Both GET single-event and LIST use the same frame format: meta is the * full event entity with the payload field as a RefDescriptor, body is - * the resolved payload bytes (possibly empty). This helper CBOR-decodes - * the body and splices the value into `eventData[fieldName]`, symmetric - * with the unconditional CBOR-encode in `splitEventDataForV4`. + * the resolved payload bytes (possibly empty). This helper splices the + * body bytes into `eventData[fieldName]` unchanged — the runtime's + * hydrate helpers (hydrateStepIO, hydrateRunError, …) consume the raw + * devalue-with-format-prefix Uint8Array directly. No CBOR decode here, + * symmetric with the pass-through write in `splitEventDataForV4`. */ function buildEventFromV4( decoded: DecodedV4Event, @@ -210,15 +225,7 @@ function buildEventFromV4( if (payloadBody.byteLength > 0) { const payloadField = PAYLOAD_FIELD_BY_EVENT_TYPE[decoded.eventType]; - if (payloadField) { - try { - eventData[payloadField] = decode(payloadBody); - } catch { - // Defensive: leave the raw bytes if decode fails. The SDK is - // the only producer in practice so this shouldn't fire. - eventData[payloadField] = payloadBody; - } - } + if (payloadField) eventData[payloadField] = payloadBody; } const event = { From a163a5ef168a336e3f50bccf0f2a07801f63ab2f Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Mon, 25 May 2026 12:52:36 +0200 Subject: [PATCH 09/10] [world-vercel] Read hook token from eventData.token (matches runtime emitter) Hook-emitting runtimes set eventData.token (matches the world contract in packages/world/src/events.ts). splitEventDataForV4 was looking for eventData.hookToken instead, so the frame meta arrived without a token and the server's hook materialization failed validation. The v4 wire name (meta.hookToken) is unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/world-vercel/src/events.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 898f9b6833..28007b0e0b 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -151,8 +151,12 @@ function splitEventDataForV4(data: AnyEventRequest): SplitEventData { } else if (eventData.resumeAt instanceof Date) { meta.resumeAt = eventData.resumeAt.toISOString(); } - if (typeof eventData.hookToken === 'string') { - meta.hookToken = eventData.hookToken; + // Runtime emits hook_created / hook_received / hook_disposed with the + // hook token in `eventData.token` (matches the world contract in + // packages/world/src/events.ts). The v4 wire encoding still calls it + // `hookToken` in the frame meta, so do the rename here. + if (typeof eventData.token === 'string') { + meta.hookToken = eventData.token; } if (typeof eventData.isWebhook === 'boolean') { meta.hookIsWebhook = eventData.isWebhook; From a2dac6329c44f8acac0ad03c9ec1f7f386d90cd4 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Mon, 25 May 2026 13:19:31 +0200 Subject: [PATCH 10/10] [world-vercel] Send resumeAt as a Date in the v4 frame meta MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit cbor-x encodes Date natively (CBOR tag 1) and the server (post-23c79b9) decodes it back to a Date for the materialization service. Stop pre-flattening to an ISO string — that was a workaround for the original header-based v4 contract and now leaves the runtime with a string in eventData.resumeAt after replay, blowing up on .getTime(). Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/world-vercel/src/events-v4.ts | 6 +++++- packages/world-vercel/src/events.ts | 13 +++++++++---- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/packages/world-vercel/src/events-v4.ts b/packages/world-vercel/src/events-v4.ts index 19b03b3723..2a43e00554 100644 --- a/packages/world-vercel/src/events-v4.ts +++ b/packages/world-vercel/src/events-v4.ts @@ -58,7 +58,11 @@ export interface CreateEventV4Input { workflowName?: string; stepName?: string; attempt?: number; - resumeAt?: string; + /** cbor-x encodes Date as CBOR tag 1 (epoch) and the server decodes it + * back to a Date — the round-trip is symmetric, so wait_created / + * step_retrying / etc. see a Date in eventData.resumeAt on the read + * side. */ + resumeAt?: Date; hookToken?: string; hookIsWebhook?: boolean; hookIsSystem?: boolean; diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 28007b0e0b..de4e36d549 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -109,7 +109,7 @@ interface SplitEventData { workflowName?: string; stepName?: string; attempt?: number; - resumeAt?: string; + resumeAt?: Date; hookToken?: string; hookIsWebhook?: boolean; hookIsSystem?: boolean; @@ -146,10 +146,15 @@ function splitEventDataForV4(data: AnyEventRequest): SplitEventData { if (typeof eventData.attempt === 'number') { meta.attempt = eventData.attempt; } - if (typeof eventData.resumeAt === 'string') { + // wait_created passes resumeAt as a Date. cbor-x encodes Date natively + // (tag 1) and round-trips back to a Date on the server, so the runtime + // sees a real Date instance when it reads the event back. ISO strings + // are accepted as a fallback for non-runtime callers. + if (eventData.resumeAt instanceof Date) { meta.resumeAt = eventData.resumeAt; - } else if (eventData.resumeAt instanceof Date) { - meta.resumeAt = eventData.resumeAt.toISOString(); + } else if (typeof eventData.resumeAt === 'string') { + const parsed = new Date(eventData.resumeAt); + if (!Number.isNaN(parsed.getTime())) meta.resumeAt = parsed; } // Runtime emits hook_created / hook_received / hook_disposed with the // hook token in `eventData.token` (matches the world contract in