diff --git a/.changeset/v4-events-client.md b/.changeset/v4-events-client.md new file mode 100644 index 0000000000..a9512bf881 --- /dev/null +++ b/.changeset/v4-events-client.md @@ -0,0 +1,5 @@ +--- +"@workflow/world-vercel": major +--- + +Switch the world-vercel adapter's event endpoints from the v2/v3 wire format to v4. Event metadata now rides in the CBOR meta block of a length-prefixed binary frame and payloads stream end-to-end as opaque bytes — no server-side decode of the payload on writes, and no per-event `/refs` round-trip on list responses. POST event response surfaces eventId / runId / createdAt as `x-wf-*` headers and carries the materialized EventResult as a CBOR body. GET single event and LIST events use the same `application/vnd.workflow.v4-frames` binary frame stream; `listEventsByCorrelationId` is wired through too. Public `createWorkflowRunEvent` / `getEvent` / `getWorkflowRunEvents` signatures are unchanged. Requires workflow-server with v4 routes mounted. diff --git a/packages/world-vercel/src/events-v4.ts b/packages/world-vercel/src/events-v4.ts new file mode 100644 index 0000000000..2a43e00554 --- /dev/null +++ b/packages/world-vercel/src/events-v4.ts @@ -0,0 +1,407 @@ +/** + * v4 event endpoints — fully framed wire protocol. + * + * Both directions use the same length-prefixed binary frame layout: + * + * frame := [u32_be meta_len][cbor_meta][u32_be body_len][body_bytes] + * + * - **POST**: request body is one frame. `cbor_meta` carries structured + * event metadata (eventType, specVersion, deploymentId, workflowName, + * …, executionContext); `body_bytes` is the opaque user payload that + * the server streams straight to S3 without decoding. + * - **GET single event**: response body is one frame. + * - **LIST events**: response body is a stream of frames terminated by a + * sentinel frame (meta = `{_end: 1, next?: cursor}`). + * + * The few HTTP response headers v4 still uses (eventId / runId / + * createdAt) are for client convenience — they let the caller read those + * three fields without decoding the response body. 
+ * + * Higher-level callers (the world-vercel adapter) pass already-serialized + * payload bytes in the `payload` parameter and splice the returned `body` + * bytes back unchanged — this module stays at the wire-bytes layer. + */ + +import { getVercelOidcToken } from '@vercel/oidc'; +import { decode } from 'cbor-x'; +import { request } from 'undici'; +import { decodeFrames, encodeFrame, V4_FRAME_CONTENT_TYPE } from './frames.js'; +import { getDispatcher } from './http-client.js'; +import { type APIConfig, getHttpConfig } from './utils.js'; + +/** + * The few HTTP response headers v4 still uses. POST surfaces these so + * callers can read the freshly-created eventId without decoding the + * CBOR response body. Mirror of + * workflow-server/lib/handlers/v4/headers.ts `V4_RESPONSE_HEADERS`. + */ +export const V4_RESPONSE_HEADERS = { + eventId: 'x-wf-event-id', + runId: 'x-wf-run-id', + createdAt: 'x-wf-created-at', +} as const; + +export interface CreateEventV4Input { + /** runId in the URL. Required for run_created too — v4 has no + * `/runs/null/events` shortcut because the runId is part of the S3 + * key. Higher-level callers generate the ULID locally. */ + runId: string; + eventType: string; + /** Opaque payload bytes. Pass undefined for events that don't carry + * user data (e.g. step_started). */ + payload?: Uint8Array; + specVersion: number; + correlationId?: string; + vercelId?: string; + remoteRefBehavior?: 'resolve' | 'lazy'; + deploymentId?: string; + workflowName?: string; + stepName?: string; + attempt?: number; + /** cbor-x encodes Date as CBOR tag 1 (epoch) and the server decodes it + * back to a Date — the round-trip is symmetric, so wait_created / + * step_retrying / etc. see a Date in eventData.resumeAt on the read + * side. */ + resumeAt?: Date; + hookToken?: string; + hookIsWebhook?: boolean; + hookIsSystem?: boolean; + errorCode?: string; + /** Arbitrary structured map; rides as a native CBOR object in the + * frame meta. Bounded by the server at 2 KB encoded. 
*/ + executionContext?: Record; +} + +export interface CreateEventV4Result { + eventId: string; + runId: string; + createdAt: string; + /** + * Materialized-entity bag — CBOR-decoded from the response body. The + * server hands back the same shape v2/v3 use for EventResult so the + * adapter layer can drop these fields into its return value unchanged. + * Keys are unset when the event type doesn't materialize that entity + * kind. + */ + body: { + event?: unknown; + run?: unknown; + step?: unknown; + hook?: unknown; + wait?: unknown; + events?: unknown[]; + cursor?: string | null; + hasMore?: boolean; + }; +} + +/** Build the CBOR meta map for a v4 POST frame. Drops undefined entries + * so the wire shape matches what the server expects to see. */ +function buildPostFrameMeta( + input: CreateEventV4Input +): Record { + const meta: Record = { + eventType: input.eventType, + specVersion: input.specVersion, + }; + if (input.correlationId !== undefined) + meta.correlationId = input.correlationId; + if (input.vercelId !== undefined) meta.vercelId = input.vercelId; + if (input.remoteRefBehavior !== undefined) { + meta.remoteRefBehavior = input.remoteRefBehavior; + } + if (input.deploymentId !== undefined) meta.deploymentId = input.deploymentId; + if (input.workflowName !== undefined) meta.workflowName = input.workflowName; + if (input.stepName !== undefined) meta.stepName = input.stepName; + if (input.attempt !== undefined) meta.attempt = input.attempt; + if (input.resumeAt !== undefined) meta.resumeAt = input.resumeAt; + if (input.hookToken !== undefined) meta.hookToken = input.hookToken; + if (input.hookIsWebhook !== undefined) + meta.hookIsWebhook = input.hookIsWebhook; + if (input.hookIsSystem !== undefined) meta.hookIsSystem = input.hookIsSystem; + if (input.errorCode !== undefined) meta.errorCode = input.errorCode; + if (input.executionContext !== undefined) { + meta.executionContext = input.executionContext; + } + return meta; +} + +async function setAuthHeader( + 
headers: Headers, + config: APIConfig | undefined +): Promise { + if (config?.token) { + headers.set('Authorization', `Bearer ${config.token}`); + } else { + // Default: get an OIDC token via @vercel/oidc, same as the v3 client. + const token = await getVercelOidcToken(); + headers.set('Authorization', `Bearer ${token}`); + } +} + +/** + * POST /api/v4/runs/:runId/events + * + * Sends the full request as a single v4 frame and returns the event ids + * + materialized-entity bag from the CBOR response body. Throws on + * non-2xx. + */ +export async function createWorkflowRunEventV4( + input: CreateEventV4Input, + config?: APIConfig +): Promise { + const { baseUrl, headers: baseHeaders } = await getHttpConfig(config); + const headers = new Headers(baseHeaders); + headers.set('Content-Type', 'application/octet-stream'); + await setAuthHeader(headers, config); + + const frame = encodeFrame( + buildPostFrameMeta(input), + input.payload ?? new Uint8Array(0) + ); + + const url = `${baseUrl}/v4/runs/${encodeURIComponent(input.runId)}/events`; + const response = await request(url, { + method: 'POST', + headers: Object.fromEntries(headers.entries()), + body: frame, + dispatcher: getDispatcher(), + }); + if (response.statusCode < 200 || response.statusCode >= 300) { + const errorBody = await response.body.text(); + throw new Error( + `v4 createEvent failed: ${response.statusCode} ${errorBody}` + ); + } + + const eventId = response.headers[V4_RESPONSE_HEADERS.eventId]; + const runId = response.headers[V4_RESPONSE_HEADERS.runId]; + const createdAt = response.headers[V4_RESPONSE_HEADERS.createdAt]; + if ( + typeof eventId !== 'string' || + typeof runId !== 'string' || + typeof createdAt !== 'string' + ) { + throw new Error('v4 createEvent: response missing required x-wf-* headers'); + } + + // Decode the materialized-entity bag from the CBOR response body. + const bodyBytes = new Uint8Array(await response.body.arrayBuffer()); + const body = + bodyBytes.byteLength > 0 + ? 
(decode(bodyBytes) as CreateEventV4Result['body']) + : {}; + + return { eventId, runId, createdAt, body }; +} + +/** + * Decoded event entity carried in the CBOR meta of a v4 frame: returned by + * GET /api/v4/runs/:runId/events/:eventId and by the LIST endpoints. The + * payload field (input/output/result/error/payload/metadata depending on + * eventType) is not resolved inside this entity — the resolved bytes arrive + * as the frame body and the adapter layer splices them into `eventData`. + */ +export interface DecodedV4Event { + eventId: string; + runId: string; + eventType: string; + correlationId?: string; + createdAt: Date | string; + specVersion?: number; + eventData?: Record; +} + +function readHeader( + responseHeaders: Record, + name: string +): string | undefined { + const value = responseHeaders[name]; + if (typeof value === 'string') return value; + if (Array.isArray(value) && value.length > 0) return value[0]; + return undefined; +} + +/** + * GET /api/v4/runs/:runId/events/:eventId + * + * Returns one v4 frame: the full event entity (CBOR-decoded from the + * frame meta) plus the resolved payload bytes (frame body, possibly + * empty). The wire format is identical to a single LIST frame so the + * server can stream the payload from S3 without buffering — callers + * are responsible for splicing `body` into `event.eventData[payloadField]` + * when they need the resolved value. The world-vercel adapter does this + * in events.ts. 
+ */ +export async function getEventV4( + runId: string, + eventId: string, + config?: APIConfig +): Promise<{ event: DecodedV4Event; body: Uint8Array }> { + const { baseUrl, headers: baseHeaders } = await getHttpConfig(config); + const headers = new Headers(baseHeaders); + await setAuthHeader(headers, config); + + const url = `${baseUrl}/v4/runs/${encodeURIComponent(runId)}/events/${encodeURIComponent(eventId)}`; + const response = await request(url, { + method: 'GET', + headers: Object.fromEntries(headers.entries()), + dispatcher: getDispatcher(), + }); + if (response.statusCode < 200 || response.statusCode >= 300) { + const errorBody = await response.body.text(); + throw new Error(`v4 getEvent failed: ${response.statusCode} ${errorBody}`); + } + const contentType = readHeader(response.headers, 'content-type'); + if (!contentType?.startsWith(V4_FRAME_CONTENT_TYPE)) { + throw new Error( + `v4 getEvent: expected ${V4_FRAME_CONTENT_TYPE}, got ${contentType ?? '(none)'}` + ); + } + const webBody = (await import('node:stream')).Readable.toWeb( + response.body as unknown as import('node:stream').Readable + ) as unknown as ReadableStream; + + // GET emits a single frame (no sentinel); decodeFrames returns at EOF + // after yielding it. + for await (const frame of decodeFrames(webBody)) { + return { event: frame.meta as unknown as DecodedV4Event, body: frame.body }; + } + throw new Error(`v4 getEvent: empty frame stream for ${eventId}`); +} + +export interface ListEventsV4Params { + cursor?: string; + limit?: number; + sortOrder?: 'asc' | 'desc'; +} + +/** + * A single event extracted from a v4 LIST frame. Mirrors `DecodedV4Event` + * but also carries the raw payload bytes — for payload-bearing events the + * server emits the resolved bytes in the frame body (so it never has to + * decode them) and the SDK is expected to splice them back into the + * appropriate `eventData` field. 
+ */ +export interface ListedEventV4 { + event: DecodedV4Event; + /** Resolved payload bytes. Empty for events without a payload. */ + body: Uint8Array; +} + +export interface ListEventsV4Result { + events: ListedEventV4[]; + /** Pagination cursor — present when more pages remain. */ + next?: string; +} + +/** + * Drive a v4 frame-stream list response into an in-memory page. Used by + * both the by-runId and by-correlationId list endpoints — the wire + * shape is identical, only the URL differs. + */ +async function consumeListFrameStream( + url: string, + config: APIConfig | undefined, + opName: string +): Promise { + const { headers: baseHeaders } = await getHttpConfig(config); + const headers = new Headers(baseHeaders); + await setAuthHeader(headers, config); + + const response = await request(url, { + method: 'GET', + headers: Object.fromEntries(headers.entries()), + dispatcher: getDispatcher(), + }); + if (response.statusCode < 200 || response.statusCode >= 300) { + const errorBody = await response.body.text(); + throw new Error(`v4 ${opName} failed: ${response.statusCode} ${errorBody}`); + } + const contentType = readHeader(response.headers, 'content-type'); + if (!contentType?.startsWith(V4_FRAME_CONTENT_TYPE)) { + throw new Error( + `v4 ${opName}: expected ${V4_FRAME_CONTENT_TYPE}, got ${contentType ?? '(none)'}` + ); + } + + // undici's `request().body` is a Node Readable; convert to a Web + // ReadableStream so the same decodeFrames implementation works in + // both Node and edge runtimes. 
+ const webBody = (await import('node:stream')).Readable.toWeb( + response.body as unknown as import('node:stream').Readable + ) as unknown as ReadableStream; + + const events: ListedEventV4[] = []; + let next: string | undefined; + for await (const frame of decodeFrames(webBody)) { + if (frame.meta._end === 1) { + if (typeof frame.meta.next === 'string') next = frame.meta.next; + break; + } + events.push({ + event: frame.meta as unknown as DecodedV4Event, + body: frame.body, + }); + } + + return { events, ...(next ? { next } : {}) }; +} + +function paginationToQuery(params: ListEventsV4Params): string { + const sp = new URLSearchParams(); + if (params.cursor) sp.set('cursor', params.cursor); + if (params.limit !== undefined) sp.set('limit', String(params.limit)); + if (params.sortOrder) sp.set('sortOrder', params.sortOrder); + const qs = sp.toString(); + return qs ? `?${qs}` : ''; +} + +/** + * GET /api/v4/runs/:runId/events + * + * Parses the binary-frame stream into a list of events plus the + * pagination cursor (from the sentinel frame). Each frame's CBOR meta + * IS the full event entity, with the payload field still in `eventData` + * as a `RefDescriptor` (lazy); the resolved payload bytes ride in the + * frame body. The adapter layer splices them back into eventData. + * + * Eagerly drains the stream into memory to match the existing + * `getWorkflowRunEvents` page-at-a-time contract. A streaming variant + * that yields events one at a time without buffering the page would be + * a small refactor (decodeFrames is already async-iterable). + */ +export async function getWorkflowRunEventsV4( + runId: string, + params: ListEventsV4Params = {}, + config?: APIConfig +): Promise { + const { baseUrl } = await getHttpConfig(config); + const url = + `${baseUrl}/v4/runs/${encodeURIComponent(runId)}/events` + + paginationToQuery(params); + return consumeListFrameStream(url, config, 'listEvents'); +} + +/** + * GET /api/v4/events?correlationId=... 
+ * + * Same frame stream as getWorkflowRunEventsV4 but selected by + * correlationId (GSI) instead of runId. Used by the storage adapter's + * `events.listByCorrelationId` path — the v3 client used + * `/v2/events?correlationId=...` for the equivalent query. + */ +export async function getEventsByCorrelationIdV4( + correlationId: string, + params: ListEventsV4Params = {}, + config?: APIConfig +): Promise { + const { baseUrl } = await getHttpConfig(config); + const sp = new URLSearchParams(); + sp.set('correlationId', correlationId); + if (params.cursor) sp.set('cursor', params.cursor); + if (params.limit !== undefined) sp.set('limit', String(params.limit)); + if (params.sortOrder) sp.set('sortOrder', params.sortOrder); + const url = `${baseUrl}/v4/events?${sp.toString()}`; + return consumeListFrameStream(url, config, 'listEventsByCorrelationId'); +} diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 433cedcc7c..de4e36d549 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -1,104 +1,75 @@ +/** + * world-vercel event functions — v4 wire format throughout. + * + * This module replaces the previous v2/v3 implementation. The v4 wire + * format uses a single length-prefixed binary frame layout in both + * directions: + * + * frame := [u32_be meta_len][cbor_meta][u32_be body_len][body_bytes] + * + * `cbor_meta` is the structured event metadata; `body_bytes` is the + * opaque user payload, never CBOR-decoded by the server. See + * workflow-server/lib/handlers/v4/ for the matching server-side handlers + * and ../events-v4.ts for the wire-level client. + * + * Key shape changes vs. v2/v3: + * + * - POST request body is one v4 frame (meta + payload). 
The response + * surfaces eventId/runId/createdAt as `x-wf-*` headers and carries + * the materialized EventResult (event/run/step/hook/wait/events/ + * cursor/hasMore) as a CBOR body — `remoteRefBehavior` in the frame + * meta still controls server-side ref resolution. + * - GET single event returns one v4 frame: the event entity in the + * frame meta, the user payload bytes in the frame body. + * - LIST events returns a stream of v4 frames terminated by a sentinel + * frame whose meta carries `{_end: 1, next?: cursor}`. The old + * per-event `/refs` round-trip is eliminated. + * + * Public function signatures are unchanged: storage.ts continues to + * wire these as `Storage['events']` and the workflow runtime sees the + * same EventResult / Event / PaginatedResponse shapes it did on + * the v3 path. + */ + import { HookNotFoundError, WorkflowWorldError } from '@workflow/errors'; import { type AnyEventRequest, type CreateEventParams, type Event, type EventResult, - EventSchema, - EventTypeSchema, type GetEventParams, - HookSchema, type ListEventsByCorrelationIdParams, type ListEventsParams, type PaginatedResponse, - PaginatedResponseSchema, stripEventDataRefs, validateUlidTimestamp, type WorkflowRun, - WorkflowRunSchema, } from '@workflow/world'; -import z from 'zod'; import { - isRefDescriptor, - type RefDescriptor, - type RefWithRunId, - resolveRefDescriptors, -} from './refs.js'; -import { - cancelWorkflowRunV1, - createWorkflowRunV1, - WorkflowRunWireBaseSchema, -} from './runs.js'; -import { deserializeStep, StepWireSchema } from './steps.js'; -import { trace } from './telemetry.js'; -import type { APIConfig } from './utils.js'; + createWorkflowRunEventV4, + type DecodedV4Event, + getEventsByCorrelationIdV4, + getEventV4, + getWorkflowRunEventsV4, +} from './events-v4.js'; +import { cancelWorkflowRunV1, createWorkflowRunV1 } from './runs.js'; +import { deserializeStep } from './steps.js'; import { + type APIConfig, DEFAULT_RESOLVE_DATA_OPTION, 
deserializeError, - makeRequest, } from './utils.js'; -// Wraps stripEventDataRefs to also strip the legacy eventDataRef field, -// since the server always returns lazy refs and callers with -// resolveData='none' should not see them. -function stripEventAndLegacyRefs( - event: any, - resolveData: 'none' | 'all' -): Event { - if (resolveData !== 'none') return event; - const { eventDataRef: _eventDataRef, ...withoutLegacyRef } = event; - return stripEventDataRefs(withoutLegacyRef, resolveData); -} - -// Schema for EventResult wire format returned by events.create. -// Uses wire format schemas for step to handle field name mapping. -// Two variants are used depending on `remoteRefBehavior`: -// - 'resolve': the server returns fully resolved data, so we validate the run -// with the strict WorkflowRunSchema discriminated union (e.g. status:'failed' -// requires error to be present). -// - 'lazy': the server may omit resolved fields (error may be a string or -// undefined), so we use the looser WorkflowRunWireBaseSchema and normalize -// the error via deserializeError() afterward. -const EventResultResolveWireSchema = z.object({ - event: EventSchema.optional(), - run: WorkflowRunSchema.optional(), - step: StepWireSchema.optional(), - hook: HookSchema.optional(), - events: z.array(EventSchema).optional(), - cursor: z.string().nullable().optional(), - hasMore: z.boolean().optional(), -}); - -const EventResultLazyWireSchema = z.object({ - event: EventSchema.optional(), - run: WorkflowRunWireBaseSchema.optional(), - step: StepWireSchema.optional(), - hook: HookSchema.optional(), - events: z.array(EventSchema).optional(), - cursor: z.string().nullable().optional(), - hasMore: z.boolean().optional(), -}); - -// Schema for events returned with `remoteRefBehavior=lazy`. -// Includes both `eventDataRef` (legacy, specVersion=1) and `eventData` -// (v2, specVersion=2 — may contain nested RefDescriptor values). 
-// specVersion defaults to 1 (legacy) when parsing responses from storage. -const EventWithRefsSchema = z.object({ - eventId: z.string(), - runId: z.string(), - eventType: EventTypeSchema, - correlationId: z.string().optional(), - eventDataRef: z.any().optional(), - eventData: z.any().optional(), - createdAt: z.coerce.date(), - specVersion: z.number().default(1), -}); - /** - * Maps event types to the field name within `eventData` that may contain - * a ref descriptor. Mirrors the server-side `resolveEventDataRefs()` mapping. + * Per-event-type map of the field within `eventData` that holds the user + * payload. Same convention used on the server side + * (workflow-server/lib/handlers/v4/events.ts PAYLOAD_FIELD_BY_EVENT_TYPE). + * + * The v4 wire encoding picks this field out of `eventData` and ships its + * already-serialized bytes unchanged as the frame body. The known metadata + * fields of `eventData` ride in the frame's CBOR meta block. */ -const eventDataRefFieldMap: Record = { +const PAYLOAD_FIELD_BY_EVENT_TYPE: Record = { run_created: 'input', run_completed: 'output', run_failed: 'error', @@ -110,150 +81,186 @@ const eventDataRefFieldMap: Record = { hook_received: 'payload', }; -// Events where the client uses the response entity data need 'resolve' (default). -// Events where the client discards the response can use 'lazy' to skip expensive -// S3 ref resolution on the server, saving ~200-460ms per event. -const eventsNeedingResolve = new Set([ - 'run_created', // client reads result.run.runId - 'run_started', // client reads result.run (checks startedAt, status) - 'step_started', // client reads result.step (checks attempt, state) +// Events whose POST response the workflow runtime reads immediately +// (so the materialized entity must come back fully resolved). 
+const eventsNeedingResolve = new Set([ + 'run_created', // runtime reads result.run.runId + 'run_started', // runtime reads result.run (checks startedAt, status) + 'step_started', // runtime reads result.step (checks attempt, state) ]); -/** - * Collect all ref descriptors from a list of lazy-loaded events. - * Returns a flat array of { eventIndex, refType, fieldName?, descriptor } - * entries that can be resolved in bulk. - */ -interface PendingRef { - eventIndex: number; - /** - * 'entity' = top-level eventDataRef (legacy specVersion=1 events) - * 'nested' = nested ref descriptor within eventData (v2 events) - */ - refType: 'entity' | 'nested'; - /** The field name within eventData containing the ref (only for 'nested') */ - fieldName?: string; - descriptor: RefDescriptor; -} - -function collectPendingRefs(events: any[]): PendingRef[] { - const pending: PendingRef[] = []; +// Hook events that 404 when the hook is already disposed or never existed — +// translate to a typed HookNotFoundError so the runtime can branch on it. +const hookEventsRequiringExistence = new Set([ + 'hook_disposed', + 'hook_received', +]); - for (let i = 0; i < events.length; i++) { - const event = events[i]; +// ============================================================================= +// Helpers +// ============================================================================= + +interface SplitEventData { + /** Encoded payload bytes (undefined when the event has no user payload). */ + payload?: Uint8Array; + /** Metadata fields that ride in the v4 POST frame's CBOR meta block. */ + meta: { + deploymentId?: string; + workflowName?: string; + stepName?: string; + attempt?: number; + resumeAt?: Date; + hookToken?: string; + hookIsWebhook?: boolean; + hookIsSystem?: boolean; + errorCode?: string; + /** Structured executionContext, included verbatim in frame meta. 
*/ + executionContext?: Record; + }; +} - // Legacy events (specVersion=1): eventDataRef is a RefDescriptor - if (event.eventDataRef && isRefDescriptor(event.eventDataRef)) { - pending.push({ - eventIndex: i, - refType: 'entity', - descriptor: event.eventDataRef, - }); - } +/** + * Split an AnyEventRequest's `eventData` into (a) the payload bytes that + * become the v4 frame body and (b) the metadata fields that become the + * CBOR-encoded meta block of the same frame. + */ +function splitEventDataForV4(data: AnyEventRequest): SplitEventData { + // Some event types in the AnyEventRequest discriminated union (e.g. + // run_cancelled) have no eventData. Cast through unknown so this + // helper can read it defensively without TS narrowing per branch. + const eventData = (( + data as unknown as { eventData?: Record } + ).eventData ?? {}) as Record; + const payloadField = PAYLOAD_FIELD_BY_EVENT_TYPE[data.eventType]; + const meta: SplitEventData['meta'] = {}; + + if (typeof eventData.deploymentId === 'string') { + meta.deploymentId = eventData.deploymentId; + } + if (typeof eventData.workflowName === 'string') { + meta.workflowName = eventData.workflowName; + } + if (typeof eventData.stepName === 'string') { + meta.stepName = eventData.stepName; + } + if (typeof eventData.attempt === 'number') { + meta.attempt = eventData.attempt; + } + // wait_created passes resumeAt as a Date. cbor-x encodes Date natively + // (tag 1) and round-trips back to a Date on the server, so the runtime + // sees a real Date instance when it reads the event back. ISO strings + // are accepted as a fallback for non-runtime callers. 
+ if (eventData.resumeAt instanceof Date) { + meta.resumeAt = eventData.resumeAt; + } else if (typeof eventData.resumeAt === 'string') { + const parsed = new Date(eventData.resumeAt); + if (!Number.isNaN(parsed.getTime())) meta.resumeAt = parsed; + } + // Runtime emits hook_created / hook_received / hook_disposed with the + // hook token in `eventData.token` (matches the world contract in + // packages/world/src/events.ts). The v4 wire encoding still calls it + // `hookToken` in the frame meta, so do the rename here. + if (typeof eventData.token === 'string') { + meta.hookToken = eventData.token; + } + if (typeof eventData.isWebhook === 'boolean') { + meta.hookIsWebhook = eventData.isWebhook; + } + if (typeof eventData.isSystem === 'boolean') { + meta.hookIsSystem = eventData.isSystem; + } + if (typeof eventData.errorCode === 'string') { + meta.errorCode = eventData.errorCode; + } + if ( + eventData.executionContext !== undefined && + eventData.executionContext !== null && + typeof eventData.executionContext === 'object' + ) { + meta.executionContext = eventData.executionContext as Record< + string, + unknown + >; + } - // V2 events: eventData may contain a nested RefDescriptor - if (event.eventData && typeof event.eventData === 'object') { - const fieldName = eventDataRefFieldMap[event.eventType as string]; - if (fieldName) { - const fieldValue = event.eventData[fieldName]; - if (isRefDescriptor(fieldValue)) { - pending.push({ - eventIndex: i, - refType: 'nested', - fieldName, - descriptor: fieldValue, - }); - } + let payload: Uint8Array | undefined; + if (payloadField && payloadField in eventData) { + const value = eventData[payloadField]; + if (value !== undefined) { + // Payload fields (input / output / result / error / payload / + // metadata) reach this layer already serialized as Uint8Array — the + // runtime calls dehydrateRunError / dehydrateStepReturnValue / etc. + // before invoking events.create. 
Pass the bytes through unchanged + // so runs.get and the events stream return the same raw form that + // hydrateRunError / hydrateStepIO expect. CBOR-encoding here would + // double-wrap on write and (since runs.get bypasses the v4 frame + // decode) leave the consumer with cbor(Uint8Array) rather than the + // devalue blob it was looking for. + if (!(value instanceof Uint8Array)) { + // Surface non-Uint8Array values loudly — current SDK callers go + // through the dehydrate helpers, so anything else is either a + // legacy caller or a bug. + throw new TypeError( + `world-vercel v4: eventData.${payloadField} for ${data.eventType} ` + + `must be a Uint8Array (the runtime's dehydrated wire form); ` + + `got ${typeof value === 'object' ? (value === null ? 'null' : ((value as object).constructor?.name ?? typeof value)) : typeof value}.` + ); } + payload = value; } } - return pending; + return { payload, meta }; } /** - * Hydrate lazy-loaded events by resolving all ref descriptors client-side. - * For entity-level refs (eventDataRef), the resolved value becomes eventData. - * For nested refs (eventData[field]), the resolved value replaces the descriptor. + * Turn a v4 event (frame meta + frame body) into the Event shape the + * workflow runtime expects. * - * Events are shallow-cloned before mutation to avoid corrupting any upstream - * caches (SWR, React cache, etc.) that might hold references to the originals. + * Both GET single-event and LIST use the same frame format: meta is the + * full event entity with the payload field as a RefDescriptor, body is + * the resolved payload bytes (possibly empty). This helper splices the + * body bytes into `eventData[fieldName]` unchanged — the runtime's + * hydrate helpers (hydrateStepIO, hydrateRunError, …) consume the raw + * devalue-with-format-prefix Uint8Array directly. No CBOR decode here, + * symmetric with the pass-through write in `splitEventDataForV4`. 
*/ -async function hydrateEventRefs( - events: any[], - config?: APIConfig, - refResolveConcurrency?: number -): Promise { - const pending = collectPendingRefs(events); - if (pending.length === 0) return events; - - return trace('world.refs.hydrate', async (span) => { - span?.setAttribute('workflow.refs.hydrated_count', pending.length); - - // Deduplicate descriptors by _ref key to avoid redundant resolutions. - // Multiple events may reference the same ref (e.g., shared input). - const uniqueRefs = new Map(); - for (const p of pending) { - if (!uniqueRefs.has(p.descriptor._ref)) { - const eventRunId = events[p.eventIndex].runId as string; - uniqueRefs.set(p.descriptor._ref, { - descriptor: p.descriptor, - runId: eventRunId, - }); - } - } - const deduped = Array.from(uniqueRefs.values()); - - // Resolve unique descriptors in parallel with bounded concurrency - const dedupedResults = await resolveRefDescriptors( - deduped, - config, - refResolveConcurrency - ).catch((err) => { - const msg = err instanceof Error ? 
err.message : String(err); - throw new Error( - `Failed to hydrate ${pending.length} ref(s) across ${events.length} event(s): ${msg}` - ); - }); - - // Build a map from ref key → resolved value for fast lookup - const resolvedMap = new Map(); - const dedupedKeys = Array.from(uniqueRefs.keys()); - for (let i = 0; i < dedupedKeys.length; i++) { - resolvedMap.set(dedupedKeys[i], dedupedResults[i]); - } - - // Shallow-clone events that need modification, then apply resolved values - const result = [...events]; - for (let i = 0; i < pending.length; i++) { - const { eventIndex, refType, fieldName, descriptor } = pending[i]; - const resolved = resolvedMap.get(descriptor._ref); - - // Shallow-clone the event (and eventData if nested) before mutating - if (result[eventIndex] === events[eventIndex]) { - result[eventIndex] = { ...events[eventIndex] }; - } - const event = result[eventIndex]; +function buildEventFromV4( + decoded: DecodedV4Event, + payloadBody: Uint8Array, + resolveData: 'none' | 'all' +): Event { + const eventData = (decoded.eventData ?? {}) as Record; - if (refType === 'entity') { - // Legacy: eventDataRef → eventData, remove the ref field - event.eventData = resolved; - delete event.eventDataRef; - } else if (refType === 'nested' && fieldName) { - // Shallow-clone eventData before mutating if not yet cloned - if (event.eventData === events[eventIndex].eventData) { - event.eventData = { ...event.eventData }; - } - // V2: replace the nested ref descriptor with resolved value - event.eventData[fieldName] = resolved; - } - } + if (payloadBody.byteLength > 0) { + const payloadField = PAYLOAD_FIELD_BY_EVENT_TYPE[decoded.eventType]; + if (payloadField) eventData[payloadField] = payloadBody; + } - return result; - }); + const event = { + eventId: decoded.eventId, + runId: decoded.runId, + eventType: decoded.eventType, + createdAt: + decoded.createdAt instanceof Date + ? decoded.createdAt + : new Date(decoded.createdAt), + ...(decoded.correlationId ? 
{ correlationId: decoded.correlationId } : {}), + eventData, + ...(decoded.specVersion !== undefined + ? { specVersion: decoded.specVersion } + : {}), + } as unknown as Event; + + // For resolveData='none', strip eventData entirely. Reuse the world- + // side helper so behavior stays in sync with other backends. + return resolveData === 'none' ? stripEventDataRefs(event, 'none') : event; } -// Functions +// ============================================================================= +// Public API +// ============================================================================= + export async function getEvent( runId: string, eventId: string, @@ -261,123 +268,38 @@ export async function getEvent( config?: APIConfig ): Promise { const resolveData = params?.resolveData ?? DEFAULT_RESOLVE_DATA_OPTION; - const remoteRefBehavior = resolveData === 'none' ? 'lazy' : 'resolve'; - - const searchParams = new URLSearchParams(); - searchParams.set('remoteRefBehavior', remoteRefBehavior); - - const queryString = searchParams.toString(); - const endpoint = `/v3/runs/${encodeURIComponent(runId)}/events/${encodeURIComponent(eventId)}${queryString ? `?${queryString}` : ''}`; - - const event = await makeRequest({ - endpoint, - options: { method: 'GET' }, - config, - schema: (resolveData === 'none' ? EventWithRefsSchema : EventSchema) as any, - }); - - return stripEventAndLegacyRefs(event as any, resolveData); + const { event, body } = await getEventV4(runId, eventId, config); + // Same shape as a LIST frame — splice the body bytes into + // eventData[payloadField] in buildEventFromV4. 
+ return buildEventFromV4(event, body, resolveData); } export async function getWorkflowRunEvents( params: ListEventsParams | ListEventsByCorrelationIdParams, config?: APIConfig ): Promise> { - const searchParams = new URLSearchParams(); - const { pagination, resolveData = DEFAULT_RESOLVE_DATA_OPTION } = params; - let runId: string | undefined; - let correlationId: string | undefined; - if ('runId' in params) { - runId = params.runId; - } else { - correlationId = params.correlationId; - } - - if (!runId && !correlationId) { - throw new Error('Either runId or correlationId must be provided'); - } - - if (pagination?.limit) searchParams.set('limit', pagination.limit.toString()); - if (pagination?.cursor) searchParams.set('cursor', pagination.cursor); - if (pagination?.sortOrder) - searchParams.set('sortOrder', pagination.sortOrder); - if (correlationId) searchParams.set('correlationId', correlationId); - - // Always send 'lazy' to the server to avoid memory pressure from resolving - // all refs in memory. When resolveData is 'all', we hydrate refs client-side - // via individual ref resolution requests. - searchParams.set('remoteRefBehavior', 'lazy'); - - const queryString = searchParams.toString(); - const query = queryString ? `?${queryString}` : ''; - const endpoint = correlationId - ? 
`/v2/events${query}` - : `/v3/runs/${encodeURIComponent(runId!)}/events${query}`; - - let refResolveConcurrency: number | undefined; - const response = (await makeRequest({ - endpoint, - options: { method: 'GET' }, - config, - schema: PaginatedResponseSchema(EventWithRefsSchema), - onResponse: (res) => { - const header = res.headers.get('x-ref-resolve-concurrency'); - if (header) { - const parsed = parseInt(header, 10); - if (!Number.isNaN(parsed) && parsed > 0) { - refResolveConcurrency = parsed; - } - } - }, - })) as PaginatedResponse; - - if (resolveData === 'all') { - // Hydrate refs client-side: resolve all ref descriptors in parallel - const hydratedEvents = await hydrateEventRefs( - response.data, - config, - refResolveConcurrency - ); + const wirePagination = { + cursor: pagination?.cursor ?? undefined, + limit: pagination?.limit, + sortOrder: pagination?.sortOrder, + }; - // Re-parse hydrated events through EventSchema to apply type coercions - // (e.g., z.coerce.date() for resumeAt) that EventWithRefsSchema skips. - // Use safeParse to gracefully handle any events that don't match a known - // type — pass them through as-is rather than failing the entire request. - let coercionFailures = 0; - const validatedEvents = hydratedEvents.map((event: any) => { - const result = EventSchema.safeParse(event); - if (!result.success) coercionFailures++; - return result.success ? result.data : event; - }); - if (coercionFailures > 0) { - console.warn( - `[world-vercel] EventSchema coercion failed for ${coercionFailures}/${hydratedEvents.length} events` - ); - } + const result = await ('correlationId' in params + ? 
getEventsByCorrelationIdV4(params.correlationId, wirePagination, config) + : getWorkflowRunEventsV4(params.runId, wirePagination, config)); - return { - ...response, - data: validatedEvents, - }; - } + const events = result.events.map((listed) => + buildEventFromV4(listed.event, listed.body, resolveData) + ); - // resolveData === 'none': strip eventData and eventDataRef return { - ...response, - data: response.data.map((event: any) => - stripEventAndLegacyRefs(event, resolveData) - ), - }; + data: events, + cursor: result.next ?? null, + hasMore: Boolean(result.next), + } as PaginatedResponse; } -// Event types that require the hook to already exist — a 404 on these -// means the hook was already disposed or never created. -const hookEventsRequiringExistence = new Set([ - 'hook_disposed', - 'hook_received', -]); - export async function createWorkflowRunEvent( id: string | null, data: AnyEventRequest, @@ -387,10 +309,7 @@ export async function createWorkflowRunEvent( try { return await createWorkflowRunEventInner(id, data, params, config); } catch (err) { - // Translate 404 to HookNotFoundError for hook-related events. - // makeRequest() throws a generic WorkflowWorldError for all 404s; - // on the hook_disposed / hook_received path a 404 means the hook - // was already disposed or never created. + // 404 on hook_disposed / hook_received → already-disposed hook. if ( hookEventsRequiringExistence.has(data.eventType) && WorkflowWorldError.is(err) && @@ -409,99 +328,77 @@ async function createWorkflowRunEventInner( params?: CreateEventParams, config?: APIConfig ): Promise { - const resolveData = params?.resolveData ?? DEFAULT_RESOLVE_DATA_OPTION; - - const v1Compat = params?.v1Compat ?? false; - if (v1Compat) { + // v1Compat: caller wants the legacy entity-mutation endpoints (used + // for migrating SDKs that haven't switched to event sourcing yet). + // Keep this on v1 routes — the v4 protocol does not cover it. 
+ if (params?.v1Compat) { if (data.eventType === 'run_cancelled' && id) { const run = await cancelWorkflowRunV1(id, params, config); return { run: run as WorkflowRun }; - } else if (data.eventType === 'run_created') { + } + if (data.eventType === 'run_created') { const run = await createWorkflowRunV1(data.eventData, config); return { run }; } - const wireResult = await makeRequest({ - endpoint: `/v1/runs/${encodeURIComponent(id!)}/events`, - options: { method: 'POST' }, - data, - config, - schema: EventSchema, - }); + throw new Error( + `world-vercel: v1Compat=true is only supported for run_created ` + + `and run_cancelled, not ${data.eventType}` + ); + } - return { event: wireResult }; + if (id === null) { + throw new WorkflowWorldError( + 'world-vercel v4: createWorkflowRunEvent requires a client-generated ' + + 'runId for run_created (the runId is part of the S3 ref key). ' + + 'Generate a wrun_ ULID before calling.', + { status: 400 } + ); } - // Validate client-provided runId timestamp is within acceptable threshold - if (data.eventType === 'run_created' && id) { + // Defensive check for client-generated run_created IDs that ride too + // far ahead of wall-clock time — same threshold the v3 path enforced. + if (data.eventType === 'run_created') { const validationError = validateUlidTimestamp(id, 'wrun_'); if (validationError) { throw new WorkflowWorldError(validationError, { status: 400 }); } } - // For run_created events, runId may be client-provided or null - const runIdPath = id === null ? 'null' : encodeURIComponent(id); - const remoteRefBehavior = eventsNeedingResolve.has(data.eventType) ? 'resolve' : 'lazy'; - // Use the strict schema when the server resolves all refs (preserves the - // WorkflowRunSchema discriminated union), and the loose wire schema when - // the server returns lazy refs (error may be a string or undefined). 
- if (remoteRefBehavior === 'resolve') { - const wireResult = await makeRequest({ - endpoint: `/v3/runs/${runIdPath}/events`, - options: { method: 'POST' }, - data: { - ...data, - remoteRefBehavior, - ...(params?.requestId ? { vercelId: params.requestId } : {}), - }, - config, - schema: EventResultResolveWireSchema, - }); - - return { - event: wireResult.event - ? stripEventAndLegacyRefs(wireResult.event, resolveData) - : undefined, - run: wireResult.run, - step: wireResult.step ? deserializeStep(wireResult.step) : undefined, - hook: wireResult.hook, - events: wireResult.events, - cursor: wireResult.cursor, - hasMore: wireResult.hasMore, - }; - } + const { payload, meta } = splitEventDataForV4(data); - const wireResult = await makeRequest({ - endpoint: `/v3/runs/${runIdPath}/events`, - options: { method: 'POST' }, - data: { - ...data, - remoteRefBehavior, + const result = await createWorkflowRunEventV4( + { + runId: id, + eventType: data.eventType, + specVersion: data.specVersion ?? 2, + ...(data.correlationId ? { correlationId: data.correlationId } : {}), ...(params?.requestId ? { vercelId: params.requestId } : {}), + remoteRefBehavior, + payload, + ...meta, }, - config, - schema: EventResultLazyWireSchema, - }); + config + ); - // Transform wire format to interface format. In the current event-sourced - // model, the run/step error fields are SerializedData (Uint8Array) — the - // deserializeError/deserializeStep helpers are pass-throughs that handle - // any legacy wire-format variants. + // The server already CBOR-decoded into result.body — just thread the + // fields through. Step has a wire-format adapter; runs use the + // pass-through deserializeError helper. + const body = result.body; return { - event: wireResult.event - ? stripEventAndLegacyRefs(wireResult.event, resolveData) + event: body.event as Event | undefined, + run: body.run + ? deserializeError(body.run as Record) : undefined, - run: wireResult.run - ? 
deserializeError(wireResult.run) + step: body.step + ? deserializeStep(body.step as Parameters[0]) : undefined, - step: wireResult.step ? deserializeStep(wireResult.step) : undefined, - hook: wireResult.hook, - events: wireResult.events, - cursor: wireResult.cursor, - hasMore: wireResult.hasMore, + hook: body.hook as EventResult['hook'], + events: body.events as EventResult['events'], + cursor: body.cursor ?? undefined, + hasMore: body.hasMore, }; } diff --git a/packages/world-vercel/src/frames.test.ts b/packages/world-vercel/src/frames.test.ts new file mode 100644 index 0000000000..b2e302531f --- /dev/null +++ b/packages/world-vercel/src/frames.test.ts @@ -0,0 +1,181 @@ +import { decode, encode } from 'cbor-x'; +import { describe, expect, it } from 'vitest'; +import { + type DecodedFrame, + decodeFrames, + encodeFrame, + V4_FRAME_CONTENT_TYPE, +} from './frames.js'; + +/** Server's wire encoder (matches the workflow-server/lib/handlers/v4/frames.ts + * end-frame helper). Re-implemented here so the client tests don't depend on + * importing from another package. */ +function encodeEndFrame(next?: string): Uint8Array { + const meta: Record = { _end: 1 }; + if (next) meta.next = next; + return encodeFrame(meta, new Uint8Array(0)); +} + +/** Build a ReadableStream that yields `payload` in fixed-size chunks. Used to + * stress chunk-boundary handling in the decoder. 
*/ +function streamOf(payload: Uint8Array, chunkSize: number) { + let offset = 0; + return new ReadableStream({ + pull(controller) { + if (offset >= payload.byteLength) { + controller.close(); + return; + } + const end = Math.min(offset + chunkSize, payload.byteLength); + controller.enqueue(payload.subarray(offset, end)); + offset = end; + }, + }); +} + +async function drainFrames( + source: ReadableStream +): Promise { + const out: DecodedFrame[] = []; + for await (const f of decodeFrames(source)) out.push(f); + return out; +} + +describe('encodeFrame', () => { + it('produces the canonical wire layout', () => { + const meta = { eventId: 'evnt_abc', n: 42 }; + const body = new Uint8Array([1, 2, 3, 4, 5]); + const frame = encodeFrame(meta, body); + const view = new DataView(frame.buffer); + const metaLen = view.getUint32(0, false); + expect(decode(frame.subarray(4, 4 + metaLen))).toEqual(meta); + const bodyLen = view.getUint32(4 + metaLen, false); + expect(bodyLen).toBe(body.byteLength); + expect(frame.subarray(4 + metaLen + 4)).toEqual(body); + }); +}); + +describe('decodeFrames', () => { + it('round-trips a single frame', async () => { + const meta = { eventType: 'run_created', eventId: 'evnt_1' }; + const body = new TextEncoder().encode('{"hello":"world"}'); + const stream = streamOf( + new Uint8Array([...encodeFrame(meta, body), ...encodeEndFrame()]), + 4096 + ); + const frames = await drainFrames(stream); + expect(frames).toHaveLength(2); + expect(frames[0].meta).toEqual(meta); + expect(frames[0].body).toEqual(body); + expect(frames[1].meta).toEqual({ _end: 1 }); + }); + + it('round-trips multiple frames with cursor', async () => { + const body1 = new TextEncoder().encode('one'); + const body2 = new Uint8Array(64).fill(0xab); + const parts = [ + encodeFrame({ eventId: 'a' }, body1), + encodeFrame({ eventId: 'b' }, body2), + encodeEndFrame('cursor-xyz'), + ]; + let total = 0; + for (const p of parts) total += p.byteLength; + const flat = new Uint8Array(total); + 
let off = 0; + for (const p of parts) { + flat.set(p, off); + off += p.byteLength; + } + const frames = await drainFrames(streamOf(flat, 256)); + expect(frames).toHaveLength(3); + expect(frames[0].meta).toEqual({ eventId: 'a' }); + expect(frames[0].body).toEqual(body1); + expect(frames[1].meta).toEqual({ eventId: 'b' }); + expect(frames[1].body).toEqual(body2); + expect(frames[2].meta).toEqual({ _end: 1, next: 'cursor-xyz' }); + expect(frames[2].body.byteLength).toBe(0); + }); + + it('handles delivery in 1-byte chunks (worst-case chunk boundary)', async () => { + const body = new Uint8Array(1024); + for (let i = 0; i < body.length; i++) body[i] = (i * 13 + 5) & 0xff; + const flat = new Uint8Array([ + ...encodeFrame({ eventType: 'big', n: 99 }, body), + ...encodeEndFrame(), + ]); + const frames = await drainFrames(streamOf(flat, 1)); + expect(frames).toHaveLength(2); + expect(frames[0].meta).toEqual({ eventType: 'big', n: 99 }); + expect(frames[0].body).toEqual(body); + expect(frames[1].meta).toEqual({ _end: 1 }); + }); + + it('handles a 64 KB body split across many small chunks', async () => { + const body = new Uint8Array(64 * 1024); + for (let i = 0; i < body.length; i++) body[i] = (i * 7) & 0xff; + const flat = new Uint8Array([ + ...encodeFrame({ eventId: 'big' }, body), + ...encodeEndFrame(), + ]); + const frames = await drainFrames(streamOf(flat, 37)); + expect(frames[0].body.byteLength).toBe(body.byteLength); + expect(frames[0].body[0]).toBe(body[0]); + expect(frames[0].body[body.length - 1]).toBe(body[body.length - 1]); + }); + + it('handles frames whose body contains bytes that look like length prefixes', async () => { + // 0xff bytes that could trip up a parser that scans for u32 patterns + // rather than honoring the explicit length prefixes. 
+ const body = new Uint8Array(32).fill(0xff); + const flat = new Uint8Array([ + ...encodeFrame({ eventId: 'tricky' }, body), + ...encodeEndFrame(), + ]); + const frames = await drainFrames(streamOf(flat, 7)); + expect(frames[0].body).toEqual(body); + }); + + it('handles back-to-back frames in a single chunk', async () => { + const flat = new Uint8Array([ + ...encodeFrame({ id: 1 }, new Uint8Array([10, 20, 30])), + ...encodeFrame({ id: 2 }, new Uint8Array([40, 50, 60])), + ...encodeFrame({ id: 3 }, new Uint8Array(0)), + ...encodeEndFrame(), + ]); + const frames = await drainFrames(streamOf(flat, flat.byteLength)); + expect(frames).toHaveLength(4); + expect(frames[2].body.byteLength).toBe(0); + expect(frames[3].meta._end).toBe(1); + }); + + it('throws when the stream ends mid-frame', async () => { + const partial = encodeFrame({ x: 1 }, new Uint8Array(100)).slice(0, 20); + const stream = streamOf(partial, 1024); + await expect(drainFrames(stream)).rejects.toThrow(/truncated/); + }); + + it('preserves CBOR types in meta (numbers, booleans, arrays)', async () => { + const meta = { + eventId: 'mix', + attempt: 4, + isWebhook: true, + tags: ['a', 'b'], + n: 12345, + }; + const flat = new Uint8Array([ + ...encodeFrame(meta, new Uint8Array(0)), + ...encodeEndFrame(), + ]); + const frames = await drainFrames(streamOf(flat, 32)); + expect(frames[0].meta).toEqual(meta); + expect(typeof frames[0].meta.attempt).toBe('number'); + expect(typeof frames[0].meta.isWebhook).toBe('boolean'); + expect(Array.isArray(frames[0].meta.tags)).toBe(true); + }); +}); + +describe('V4_FRAME_CONTENT_TYPE', () => { + it('matches the server-side content type', () => { + expect(V4_FRAME_CONTENT_TYPE).toBe('application/vnd.workflow.v4-frames'); + }); +}); diff --git a/packages/world-vercel/src/frames.ts b/packages/world-vercel/src/frames.ts new file mode 100644 index 0000000000..2b24ac8edc --- /dev/null +++ b/packages/world-vercel/src/frames.ts @@ -0,0 +1,110 @@ +/** + * Length-prefixed binary frame 
codec for the v4 list-events response. + * + * Mirrors the server-side encoder in + * workflow-server/lib/handlers/v4/frames.ts. Wire format: + * + * list-response := frame* end-frame + * frame := u32_be(meta_len) || cbor_meta || u32_be(body_len) || body_bytes + * end-frame := u32_be(meta_len) || cbor_meta {_end: 1, next?: string} || u32_be(0) + */ + +import { decode, encode } from 'cbor-x'; + +export const V4_FRAME_CONTENT_TYPE = 'application/vnd.workflow.v4-frames'; + +export interface DecodedFrame { + meta: Record; + body: Uint8Array; +} + +/** Test/utility: encode a complete frame. Production server uses prefix + * + streaming body. */ +export function encodeFrame( + meta: Record, + body: Uint8Array +): Uint8Array { + const metaBytes = new Uint8Array(encode(meta)); + const out = new Uint8Array(4 + metaBytes.byteLength + 4 + body.byteLength); + const view = new DataView(out.buffer); + view.setUint32(0, metaBytes.byteLength, false); + out.set(metaBytes, 4); + view.setUint32(4 + metaBytes.byteLength, body.byteLength, false); + out.set(body, 4 + metaBytes.byteLength + 4); + return out; +} + +/** + * Async-iterable parser for a frame stream. Yields one `DecodedFrame` + * per frame in source order, terminating at the sentinel frame whose + * meta contains `_end: 1`. The sentinel frame itself IS yielded — the + * caller inspects `meta._end` to detect end-of-stream and reads + * `meta.next` for the pagination cursor. + * + * Survives arbitrary chunk boundaries from the source stream, including + * splits that fall in the middle of a u32 length prefix or in the + * middle of the CBOR meta block. + */ +export async function* decodeFrames( + source: ReadableStream +): AsyncGenerator { + const reader = source.getReader(); + // Accumulating buffer of bytes we've read but not yet consumed. 
+ let buffer = new Uint8Array(0); + + const refill = async (needed: number): Promise => { + while (buffer.byteLength < needed) { + const { done, value } = await reader.read(); + if (done) return false; + if (!value || value.byteLength === 0) continue; + const next = new Uint8Array(buffer.byteLength + value.byteLength); + next.set(buffer, 0); + next.set(value, buffer.byteLength); + buffer = next; + } + return true; + }; + + const take = (n: number): Uint8Array => { + const out = buffer.subarray(0, n); + buffer = buffer.subarray(n); + return out; + }; + + while (true) { + if (!(await refill(4))) return; + const metaLen = new DataView(buffer.buffer, buffer.byteOffset, 4).getUint32( + 0, + false + ); + take(4); + + if (!(await refill(metaLen))) { + throw new Error('decodeFrames: truncated meta block'); + } + const meta = decode(take(metaLen)) as Record; + + if (!(await refill(4))) { + throw new Error('decodeFrames: truncated body length'); + } + const bodyLen = new DataView(buffer.buffer, buffer.byteOffset, 4).getUint32( + 0, + false + ); + take(4); + + if (bodyLen > 0) { + if (!(await refill(bodyLen))) { + throw new Error('decodeFrames: truncated body bytes'); + } + // Slice (not subarray) so the yielded body owns its bytes — + // subsequent reads into the buffer won't overwrite it. 
+ yield { meta, body: buffer.slice(0, bodyLen) }; + take(bodyLen); + } else { + yield { meta, body: new Uint8Array(0) }; + } + + if (meta._end === 1) return; + } +} diff --git a/packages/world-vercel/src/refs.ts b/packages/world-vercel/src/refs.ts deleted file mode 100644 index e703db66c2..0000000000 --- a/packages/world-vercel/src/refs.ts +++ /dev/null @@ -1,210 +0,0 @@ -import { WorkflowWorldError } from '@workflow/errors'; -import { decode } from 'cbor-x'; -import { getDispatcher } from './http-client.js'; -import { - ErrorType, - getSpanKind, - HttpRequestMethod, - HttpResponseStatusCode, - PeerService, - trace, - UrlFull, -} from './telemetry.js'; -import { type APIConfig, getHttpConfig } from './utils.js'; - -/** - * A ref descriptor as returned by workflow-server when `remoteRefBehavior=lazy`. - * Matches the server-side `RefDescriptor` type in `lib/data/remote-ref.ts`. - */ -export interface RefDescriptor { - _type: 'RemoteRef'; - _ref: string; - /** Base64-encoded inline payload. Present only for dbrf: (inline) refs. */ - _data?: string; - /** Content type of the inline payload. Present only for dbrf: refs. */ - _ct?: string; -} - -/** - * Checks if a value is a RefDescriptor object. - */ -export function isRefDescriptor(value: unknown): value is RefDescriptor { - return ( - typeof value === 'object' && - value !== null && - '_type' in value && - '_ref' in value && - typeof (value as { _ref: unknown })._ref === 'string' && - (value as { _type: string })._type === 'RemoteRef' - ); -} - -/** - * Maximum number of concurrent ref resolution requests. - * Limits peak concurrency to avoid overwhelming the server. - */ -const REF_RESOLVE_CONCURRENCY = 10; - -/** - * Resolve a single ref descriptor. - * - * For inline refs (dbrf: prefix), the data is decoded locally from the - * descriptor's `_data` field — no network request is needed. 
- * - * For S3 refs (s3rf:) and Redis refs (kvrf:), a request is made to the - * `GET /v2/runs/:runId/refs` endpoint on workflow-server which returns - * raw CBOR or binary bytes. - * - * @param descriptor - The ref descriptor to resolve - * @param runId - The runId that owns this ref (used in the URL path) - * @param config - API configuration - */ -export async function resolveRefDescriptor( - descriptor: RefDescriptor, - runId: string, - config?: APIConfig -): Promise { - const ref = descriptor._ref; - - // Inline refs (dbrf:) carry their data in the descriptor — decode locally - if (ref.startsWith('dbrf:')) { - if (!descriptor._data) { - throw new Error(`Inline ref descriptor missing _data field: ${ref}`); - } - const contentType = descriptor._ct ?? 'application/cbor'; - const binaryData = Buffer.from(descriptor._data, 'base64'); - if (contentType === 'application/octet-stream') { - // Buffer is a Uint8Array subclass — return directly to avoid a copy. - return binaryData; - } - // CBOR-encoded data — decode it. Buffer is accepted by cbor-x directly. - return decode(binaryData); - } - - // Remote refs (s3rf:, kvrf:) — fetch raw bytes from the server. - // The server returns the raw stored bytes directly (not wrapped in a - // JSON/CBOR envelope). The Content-Type may be 'application/cbor' (for - // CBOR-encoded data) or 'application/octet-stream' (for raw binary like - // Uint8Array). We handle both content types directly rather than going - // through makeRequest, which only handles JSON/CBOR API responses. - const { baseUrl, headers } = await getHttpConfig(config); - const endpoint = `/v2/runs/${encodeURIComponent(runId)}/refs?ref=${encodeURIComponent(ref)}`; - const url = `${baseUrl}${endpoint}`; - - // Set headers that makeRequest normally adds: Accept for content - // negotiation and X-Request-Time to bypass RSC request memoization. 
- headers.set('Accept', 'application/cbor, application/octet-stream'); - headers.set('X-Request-Time', Date.now().toString()); - - return trace( - 'http GET', - { kind: await getSpanKind('CLIENT') }, - async (span) => { - span?.setAttributes({ - ...HttpRequestMethod('GET'), - ...UrlFull(url), - ...PeerService('workflow-server'), - }); - - // eslint-disable-next-line @typescript-eslint/no-explicit-any -- undici v7 dispatcher types don't match @types/node's RequestInit - const response = await fetch(url, { - method: 'GET', - headers, - dispatcher: getDispatcher(), - } as any); - - span?.setAttributes({ - ...HttpResponseStatusCode(response.status), - }); - - if (!response.ok) { - const error = new WorkflowWorldError( - `Failed to resolve ref: HTTP ${response.status}`, - { url, status: response.status } - ); - span?.setAttributes({ - ...ErrorType(`HTTP ${response.status}`), - }); - span?.recordException?.(error); - throw error; - } - - const contentType = response.headers.get('content-type') || ''; - const buffer = await response.arrayBuffer(); - - if (contentType.includes('application/octet-stream')) { - // Raw binary data (e.g., Uint8Array stored by the workflow) - return new Uint8Array(buffer); - } - - // CBOR-encoded data (the common case for structured values) - return decode(new Uint8Array(buffer)); - } - ); -} - -/** - * A ref descriptor paired with the runId that owns it, for resolution. - */ -export interface RefWithRunId { - descriptor: RefDescriptor; - runId: string; -} - -/** - * Resolve multiple ref descriptors in parallel with bounded concurrency. - * - * If any ref in a batch fails, the batch rejects and remaining batches - * are aborted to avoid cascading failures. - * - * @param refs - Array of ref descriptors with their owning runIds - * @param config - API configuration - * @param concurrency - Max concurrent ref resolution requests. Falls back to REF_RESOLVE_CONCURRENCY. 
- * @returns Array of resolved values in the same order as input - */ -export async function resolveRefDescriptors( - refs: RefWithRunId[], - config?: APIConfig, - concurrency?: number -): Promise { - if (refs.length === 0) return []; - - const limit = concurrency ?? REF_RESOLVE_CONCURRENCY; - - return trace('world.refs.resolve', async (span) => { - const inlineCount = refs.filter((r) => - r.descriptor._ref.startsWith('dbrf:') - ).length; - const remoteCount = refs.length - inlineCount; - - span?.setAttributes({ - 'workflow.refs.total_count': refs.length, - 'workflow.refs.inline_count': inlineCount, - 'workflow.refs.remote_count': remoteCount, - 'workflow.refs.concurrency_limit': limit, - }); - - // Simple case: if under concurrency limit, resolve all at once - if (refs.length <= limit) { - return Promise.all( - refs.map((r) => resolveRefDescriptor(r.descriptor, r.runId, config)) - ); - } - - // Batch with bounded concurrency. If any ref in a batch fails, - // the batch rejects and remaining batches are aborted to avoid - // cascading failures. 
- const results: unknown[] = new Array(refs.length); - for (let i = 0; i < refs.length; i += limit) { - const batch = refs.slice(i, i + limit); - const batchResults = await Promise.all( - batch.map((r) => resolveRefDescriptor(r.descriptor, r.runId, config)) - ); - for (let j = 0; j < batchResults.length; j++) { - results[i + j] = batchResults[j]; - } - } - - return results; - }); -} diff --git a/packages/world-vercel/src/utils.test.ts b/packages/world-vercel/src/utils.test.ts index 5a077ba3d5..59a0995121 100644 --- a/packages/world-vercel/src/utils.test.ts +++ b/packages/world-vercel/src/utils.test.ts @@ -1,6 +1,13 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { getHeaders, getHttpConfig, getHttpUrl } from './utils.js'; +// On this v4 PR branch, WORKFLOW_SERVER_URL_OVERRIDE in utils.ts is set +// to the workflow-server PR #439 preview deployment so e2e tests can run +// against the v4 server. Several tests below check what happens when the +// inline override is empty — when this override is reset to '' before +// merging to main, the affected expectations need to flip back too. +const V4_SERVER_URL_OVERRIDE = 'https://workflow-server-git-peter-v4.vercel.sh'; + vi.mock('@vercel/oidc', () => ({ getVercelOidcToken: vi.fn().mockRejectedValue(new Error('no OIDC')), })); @@ -19,16 +26,20 @@ describe('getHttpUrl', () => { }); it('uses default workflow-server URL when no config and no env override', () => { + // v4-branch: inline override wins over the default. On main this + // would be 'https://vercel-workflow.com/api'. expect(getHttpUrl()).toEqual({ - baseUrl: 'https://vercel-workflow.com/api', + baseUrl: `${V4_SERVER_URL_OVERRIDE}/api`, usingProxy: false, }); }); it('respects VERCEL_WORKFLOW_SERVER_URL when set (no proxy)', () => { process.env.VERCEL_WORKFLOW_SERVER_URL = 'https://custom-host.example.com'; + // v4-branch: inline override wins over the env var. On main this + // would be 'https://custom-host.example.com/api'. 
expect(getHttpUrl()).toEqual({ - baseUrl: 'https://custom-host.example.com/api', + baseUrl: `${V4_SERVER_URL_OVERRIDE}/api`, usingProxy: false, }); }); @@ -77,15 +88,21 @@ describe('getHeaders', () => { }); it('omits x-vercel-workflow-api-url when override is unset', () => { + // v4-branch: inline override is set, so the header IS sent (with the + // override URL). On main with override='' the header would be null. const headers = getHeaders(undefined, { usingProxy: true }); - expect(headers.get('x-vercel-workflow-api-url')).toBeNull(); + expect(headers.get('x-vercel-workflow-api-url')).toBe( + V4_SERVER_URL_OVERRIDE + ); }); it('sets x-vercel-workflow-api-url when VERCEL_WORKFLOW_SERVER_URL is set and using proxy', () => { process.env.VERCEL_WORKFLOW_SERVER_URL = 'https://custom.example.com'; + // v4-branch: inline override wins over the env var. On main the + // header would be 'https://custom.example.com'. const headers = getHeaders(undefined, { usingProxy: true }); expect(headers.get('x-vercel-workflow-api-url')).toBe( - 'https://custom.example.com' + V4_SERVER_URL_OVERRIDE ); }); diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index 8acefbaa69..4c229fc467 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -58,8 +58,13 @@ function httpLog( * Inline workflow-server URL override. Must remain an empty string on * `main` — rewritten by external CI for branch-deployment testing. * Prefer `VERCEL_WORKFLOW_SERVER_URL` for deployment-time configuration. + * + * On this v4 branch, set to the workflow-server PR-#439 preview deployment + * (https://github.com/vercel/workflow-server/pull/439) so e2e tests run + * against the v4-enabled server. Reset to '' before merging to main. */ -const WORKFLOW_SERVER_URL_OVERRIDE = ''; +const WORKFLOW_SERVER_URL_OVERRIDE = + 'https://workflow-server-git-peter-v4.vercel.sh'; /** * Per-request timeout for HTTP calls to workflow-server (in ms).