diff --git a/.changeset/orphan-ui-chunks-on-resume.md b/.changeset/orphan-ui-chunks-on-resume.md new file mode 100644 index 0000000000..35ae5264b1 --- /dev/null +++ b/.changeset/orphan-ui-chunks-on-resume.md @@ -0,0 +1,5 @@ +--- +"@workflow/ai": patch +--- + +`WorkflowChatTransport` now drops orphan UI chunks (deltas/ends with no matching `*-start` in the resumed window) when reconnecting with a negative `initialStartIndex` that lands in the middle of an open UI part, instead of forwarding them to the AI SDK client, which throws on them. diff --git a/docs/content/docs/v4/ai/resumable-streams.mdx b/docs/content/docs/v4/ai/resumable-streams.mdx index 1e11672a00..9a373a7129 100644 --- a/docs/content/docs/v4/ai/resumable-streams.mdx +++ b/docs/content/docs/v4/ai/resumable-streams.mdx @@ -194,6 +194,10 @@ This avoids replaying potentially thousands of chunks and lets the UI render fas When using a negative `initialStartIndex`, the reconnection endpoint **must** return the `x-workflow-stream-tail-index` header (as shown in [Step 2](#add-a-stream-reconnection-endpoint) above). The transport uses this header to compute absolute chunk positions so that retries after a disconnect resume from the correct position. If the header is missing, the transport falls back to `startIndex: 0` (replaying the entire stream) and logs a warning. +### Mid-part resumes + +A workflow stream is a flat sequence of chunks, but the AI SDK's UI protocol groups chunks into logical parts (`text-*`, `reasoning-*`, `tool-input-*`) that must be opened with a `*-start` before any `*-delta` or `*-end`. A non-zero `startIndex` can land in the middle of an open part. See [`WorkflowChatTransport` → Mid-part resumes](/docs/api-reference/workflow-ai/workflow-chat-transport#mid-part-resumes) for how this is handled and an example of rewinding to a step boundary on the server. 
+ ## Related Documentation - [`WorkflowChatTransport` API Reference](/docs/api-reference/workflow-ai/workflow-chat-transport) - Full configuration options diff --git a/docs/content/docs/v4/api-reference/workflow-ai/workflow-chat-transport.mdx b/docs/content/docs/v4/api-reference/workflow-ai/workflow-chat-transport.mdx index f89583e5ec..175f9acad2 100644 --- a/docs/content/docs/v4/api-reference/workflow-ai/workflow-chat-transport.mdx +++ b/docs/content/docs/v4/api-reference/workflow-ai/workflow-chat-transport.mdx @@ -250,6 +250,45 @@ export default function ChatWithCustomConfig() { } ``` +## Mid-part resumes + +A workflow stream is a flat sequence of chunks, but the AI SDK's UI protocol groups chunks into logical parts: a `text-start` opens a text part that subsequent `text-delta`s extend and a `text-end` closes, and the same shape applies to `reasoning-*` and `tool-input-*`. The AI SDK client enforces that grammar — it throws on a `reasoning-delta` whose `reasoning-start` it never saw, which breaks the chat. + +A non-zero `startIndex` (in particular a negative `initialStartIndex`) resolves to a chunk offset with no awareness of those part boundaries, so it can land in the middle of an open part. When that happens on a resume with a negative `initialStartIndex`, `WorkflowChatTransport` will **drop chunks that reference a part it didn't see a start for** and log a one-time warning; a non-negative `initialStartIndex` is treated as the caller's explicit choice and is not filtered. The chat keeps working, but any partial part overlapping the resume cursor is discarded. + +To preserve those partial parts, rewind to a step boundary on the server before returning the readable. `start-step` / `finish-step` chunks are the natural seams — no UI part is ever open across them. Sketch: + +{/*@skip-typecheck: incomplete code sample*/} + +```typescript title="app/api/chat/[id]/stream/route.ts" +const run = getRun(id); +const tailIndex = await run.getReadable().getTailIndex(); + +let resolved = startIndex < 0 + ? 
Math.max(0, tailIndex + 1 + startIndex) + : startIndex; + +if (startIndex !== 0) { + // Walk back from `resolved` to the most recent start-step (or chunk 0), + // capping the lookback so a single huge step can't trigger an unbounded scan. + const LOOKBACK = 200; + const probe = run.getReadable({ startIndex: Math.max(0, resolved - LOOKBACK) }); + let i = Math.max(0, resolved - LOOKBACK); + let lastBoundary = i; + for await (const chunk of probe as unknown as AsyncIterable<{ type: string }>) { + if (i >= resolved) break; + if (chunk.type === "start-step") lastBoundary = i; + i++; + } + resolved = lastBoundary; +} + +return createUIMessageStreamResponse({ + stream: run.getReadable({ startIndex: resolved }), + headers: { "x-workflow-stream-tail-index": String(tailIndex) }, +}); +``` + ## See Also - [DurableAgent](/docs/api-reference/workflow-ai/durable-agent) - Building durable AI agents within workflows diff --git a/docs/content/docs/v5/ai/resumable-streams.mdx b/docs/content/docs/v5/ai/resumable-streams.mdx index 1e11672a00..9a373a7129 100644 --- a/docs/content/docs/v5/ai/resumable-streams.mdx +++ b/docs/content/docs/v5/ai/resumable-streams.mdx @@ -194,6 +194,10 @@ This avoids replaying potentially thousands of chunks and lets the UI render fas When using a negative `initialStartIndex`, the reconnection endpoint **must** return the `x-workflow-stream-tail-index` header (as shown in [Step 2](#add-a-stream-reconnection-endpoint) above). The transport uses this header to compute absolute chunk positions so that retries after a disconnect resume from the correct position. If the header is missing, the transport falls back to `startIndex: 0` (replaying the entire stream) and logs a warning. +### Mid-part resumes + +A workflow stream is a flat sequence of chunks, but the AI SDK's UI protocol groups chunks into logical parts (`text-*`, `reasoning-*`, `tool-input-*`) that must be opened with a `*-start` before any `*-delta` or `*-end`. 
A non-zero `startIndex` can land in the middle of an open part. See [`WorkflowChatTransport` → Mid-part resumes](/docs/api-reference/workflow-ai/workflow-chat-transport#mid-part-resumes) for how this is handled and an example of rewinding to a step boundary on the server. + ## Related Documentation - [`WorkflowChatTransport` API Reference](/docs/api-reference/workflow-ai/workflow-chat-transport) - Full configuration options diff --git a/docs/content/docs/v5/api-reference/workflow-ai/workflow-chat-transport.mdx b/docs/content/docs/v5/api-reference/workflow-ai/workflow-chat-transport.mdx index f89583e5ec..175f9acad2 100644 --- a/docs/content/docs/v5/api-reference/workflow-ai/workflow-chat-transport.mdx +++ b/docs/content/docs/v5/api-reference/workflow-ai/workflow-chat-transport.mdx @@ -250,6 +250,45 @@ export default function ChatWithCustomConfig() { } ``` +## Mid-part resumes + +A workflow stream is a flat sequence of chunks, but the AI SDK's UI protocol groups chunks into logical parts: a `text-start` opens a text part that subsequent `text-delta`s extend and a `text-end` closes, and the same shape applies to `reasoning-*` and `tool-input-*`. The AI SDK client enforces that grammar — it throws on a `reasoning-delta` whose `reasoning-start` it never saw, which breaks the chat. + +A non-zero `startIndex` (in particular a negative `initialStartIndex`) resolves to a chunk offset with no awareness of those part boundaries, so it can land in the middle of an open part. When that happens on a resume with a negative `initialStartIndex`, `WorkflowChatTransport` will **drop chunks that reference a part it didn't see a start for** and log a one-time warning; a non-negative `initialStartIndex` is treated as the caller's explicit choice and is not filtered. The chat keeps working, but any partial part overlapping the resume cursor is discarded. + +To preserve those partial parts, rewind to a step boundary on the server before returning the readable. `start-step` / `finish-step` chunks are the natural seams — no UI part is ever open across them. 
Sketch: + +{/*@skip-typecheck: incomplete code sample*/} + +```typescript title="app/api/chat/[id]/stream/route.ts" +const run = getRun(id); +const tailIndex = await run.getReadable().getTailIndex(); + +let resolved = startIndex < 0 + ? Math.max(0, tailIndex + 1 + startIndex) + : startIndex; + +if (startIndex !== 0) { + // Walk back from `resolved` to the most recent start-step (or chunk 0), + // capping the lookback so a single huge step can't trigger an unbounded scan. + const LOOKBACK = 200; + const probe = run.getReadable({ startIndex: Math.max(0, resolved - LOOKBACK) }); + let i = Math.max(0, resolved - LOOKBACK); + let lastBoundary = i; + for await (const chunk of probe as unknown as AsyncIterable<{ type: string }>) { + if (i >= resolved) break; + if (chunk.type === "start-step") lastBoundary = i; + i++; + } + resolved = lastBoundary; +} + +return createUIMessageStreamResponse({ + stream: run.getReadable({ startIndex: resolved }), + headers: { "x-workflow-stream-tail-index": String(tailIndex) }, +}); +``` + ## See Also - [DurableAgent](/docs/api-reference/workflow-ai/durable-agent) - Building durable AI agents within workflows diff --git a/packages/ai/src/workflow-chat-transport.test.ts b/packages/ai/src/workflow-chat-transport.test.ts index cf969ef92c..1d2e5399c1 100644 --- a/packages/ai/src/workflow-chat-transport.test.ts +++ b/packages/ai/src/workflow-chat-transport.test.ts @@ -469,6 +469,192 @@ describe('WorkflowChatTransport', () => { }); }); + describe('orphan UI chunk filter on negative startIndex resume', () => { + function makeSSEStream(...events: string[]) { + return new ReadableStream({ + start(controller) { + for (const event of events) { + controller.enqueue(new TextEncoder().encode(`data: ${event}\n\n`)); + } + controller.close(); + }, + }); + } + + async function collect(stream: ReadableStream) { + const reader = stream.getReader(); + const out: unknown[] = []; + while (true) { + const { done, value } = await reader.read(); + if (done) break; + 
out.push(value); + } + return out; + } + + it('drops orphan reasoning-delta / reasoning-end when no prior reasoning-start', async () => { + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + + const transport = new WorkflowChatTransport({ + fetch: mockFetch, + initialStartIndex: -50, + }); + + // Reproduces the exact pattern from issue #1835: a resume that lands + // inside an open reasoning-0 part. The deltas + end for it should be + // dropped; everything after the next start-step should flow through. + mockFetch.mockResolvedValueOnce({ + ok: true, + headers: new Headers({ 'x-workflow-stream-tail-index': '99' }), + body: makeSSEStream( + '{"type":"reasoning-delta","id":"reasoning-0","delta":" space"}', + '{"type":"reasoning-delta","id":"reasoning-0","delta":"."}', + '{"type":"reasoning-end","id":"reasoning-0"}', + '{"type":"finish-step"}', + '{"type":"start-step"}', + '{"type":"reasoning-start","id":"reasoning-1"}', + '{"type":"reasoning-delta","id":"reasoning-1","delta":"hello"}', + '{"type":"reasoning-end","id":"reasoning-1"}', + '{"type":"finish"}' + ), + }); + + const stream = await transport.reconnectToStream({ chatId: 'test-chat' }); + const chunks = (await collect(stream!)) as Array<{ type: string }>; + + const types = chunks.map((c) => c.type); + expect(types).toEqual([ + // orphan reasoning-0 deltas+end dropped + 'finish-step', + 'start-step', + 'reasoning-start', + 'reasoning-delta', + 'reasoning-end', + 'finish', + ]); + + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining('Dropping orphan UI chunk') + ); + // Warning is emitted only once per resume even if multiple chunks drop. 
+ expect(warnSpy).toHaveBeenCalledTimes(1); + + warnSpy.mockRestore(); + }); + + it('drops orphan text-delta and text-end without a prior text-start', async () => { + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + + const transport = new WorkflowChatTransport({ + fetch: mockFetch, + initialStartIndex: -10, + }); + + mockFetch.mockResolvedValueOnce({ + ok: true, + headers: new Headers({ 'x-workflow-stream-tail-index': '50' }), + body: makeSSEStream( + '{"type":"text-delta","id":"text-0","delta":"orphan"}', + '{"type":"text-end","id":"text-0"}', + '{"type":"finish"}' + ), + }); + + const stream = await transport.reconnectToStream({ chatId: 'test-chat' }); + const chunks = (await collect(stream!)) as Array<{ type: string }>; + + expect(chunks.map((c) => c.type)).toEqual(['finish']); + expect(warnSpy).toHaveBeenCalledTimes(1); + + warnSpy.mockRestore(); + }); + + it('drops orphan tool-input-delta / tool-output-available without a prior tool-input-start', async () => { + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + + const transport = new WorkflowChatTransport({ + fetch: mockFetch, + initialStartIndex: -10, + }); + + mockFetch.mockResolvedValueOnce({ + ok: true, + headers: new Headers({ 'x-workflow-stream-tail-index': '50' }), + body: makeSSEStream( + '{"type":"tool-input-delta","toolCallId":"call_1","inputTextDelta":"x"}', + '{"type":"tool-output-available","toolCallId":"call_1","output":{}}', + '{"type":"finish"}' + ), + }); + + const stream = await transport.reconnectToStream({ chatId: 'test-chat' }); + const chunks = (await collect(stream!)) as Array<{ type: string }>; + + expect(chunks.map((c) => c.type)).toEqual(['finish']); + expect(warnSpy).toHaveBeenCalledTimes(1); + + warnSpy.mockRestore(); + }); + + it('passes through chunks whose *-start was emitted in the resumed window', async () => { + const transport = new WorkflowChatTransport({ + fetch: mockFetch, + initialStartIndex: -10, + }); + + // 
tool-input-start IS in the resumed window, so its later + // tool-output-available should pass through untouched. + mockFetch.mockResolvedValueOnce({ + ok: true, + headers: new Headers({ 'x-workflow-stream-tail-index': '50' }), + body: makeSSEStream( + '{"type":"tool-input-start","toolCallId":"call_1","toolName":"grep"}', + '{"type":"tool-input-available","toolCallId":"call_1","toolName":"grep","input":{}}', + '{"type":"tool-output-available","toolCallId":"call_1","output":{"ok":true}}', + '{"type":"finish"}' + ), + }); + + const stream = await transport.reconnectToStream({ chatId: 'test-chat' }); + const chunks = (await collect(stream!)) as Array<{ type: string }>; + + expect(chunks.map((c) => c.type)).toEqual([ + 'tool-input-start', + 'tool-input-available', + 'tool-output-available', + 'finish', + ]); + }); + + it('does not activate the filter for non-negative startIndex', async () => { + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + + const transport = new WorkflowChatTransport({ + fetch: mockFetch, + // Default initialStartIndex = 0 → filter inactive. + }); + + // Even orphan-looking deltas should pass through unchanged because the + // caller didn't ask for a tail-only resume. 
+      mockFetch.mockResolvedValueOnce({
+        ok: true,
+        headers: new Headers(),
+        body: makeSSEStream(
+          '{"type":"reasoning-delta","id":"reasoning-0","delta":"x"}',
+          '{"type":"finish"}'
+        ),
+      });
+
+      const stream = await transport.reconnectToStream({ chatId: 'test-chat' });
+      const chunks = (await collect(stream!)) as Array<{ type: string }>;
+
+      expect(chunks.map((c) => c.type)).toEqual(['reasoning-delta', 'finish']);
+      expect(warnSpy).not.toHaveBeenCalled();
+
+      warnSpy.mockRestore();
+    });
+  });
+
   describe('reconnection error formatting', () => {
     it('should format object errors with JSON instead of [object Object]', async () => {
       const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {});
diff --git a/packages/ai/src/workflow-chat-transport.ts b/packages/ai/src/workflow-chat-transport.ts
index a7724c1e8b..4ac8c8480a 100644
--- a/packages/ai/src/workflow-chat-transport.ts
+++ b/packages/ai/src/workflow-chat-transport.ts
@@ -11,6 +11,93 @@ import {
 import { getErrorMessage } from './get-error-message.js';
 import { iteratorToStream, streamToIterator } from './stream-iterator.js';
 
+/**
+ * Tracks which UI parts have been opened inside the resumed window so we can
+ * drop chunks that refer to a part whose opening chunk was emitted before the
+ * resume cursor.
+ *
+ * AI SDK's UI stream processor throws on `text-delta`/`reasoning-delta`/
+ * `tool-input-delta` (and the matching `*-end`) when the start chunk for that
+ * id was never observed. A negative `startIndex` on a flat chunk stream can
+ * easily land mid-part, so without this guard the client crashes on resume.
+ *
+ * Tool calls are tracked by `toolCallId`. `tool-input-available` and
+ * `tool-input-error` carry the tool name and input themselves, and the AI SDK
+ * creates the tool part from them when no `tool-input-start` was streamed
+ * (non-streaming tool calls). They therefore open a tool call here instead of
+ * being treated as orphans; otherwise a resume would discard a complete tool
+ * call together with its result.
+ *
+ * This is a best-effort safety net — it preserves only the parts that the
+ * resumed window includes an opening chunk for. Server-side rewinding to a
+ * step boundary is the proper fix when you want the full message preserved.
+ */
+type OrphanFilter = {
+  /**
+   * Records opening chunks as they pass and returns `true` when `chunk` must
+   * be withheld because its part was never opened in the resumed window.
+   */
+  shouldDrop: (chunk: UIMessageChunk) => boolean;
+};
+
+function createOrphanFilter(): OrphanFilter {
+  // Ids of text/reasoning parts opened inside the resumed window.
+  const seenStartedIds = new Set<string>();
+  // Ids of tool calls opened inside the resumed window.
+  const seenStartedToolCallIds = new Set<string>();
+  // Set after the first warning so each filter instance logs at most once.
+  let warnedOnce = false;
+
+  function warnOnce(orphanKind: string, orphanRef: string) {
+    if (warnedOnce) return;
+    warnedOnce = true;
+    console.warn(
+      '[WorkflowChatTransport] Dropping orphan UI chunk ' +
+        `(${orphanKind} for id "${orphanRef}") on resume — ` +
+        'the resume position landed mid-part. The dropped chunk(s) ' +
+        "reference a part whose start chunk wasn't in the resumed " +
+        'window. To preserve the full message, configure your ' +
+        'stream endpoint to rewind to a step boundary before ' +
+        'returning the readable. See: ' +
+        'https://workflow.dev/docs/ai/resumable-streams#mid-part-resumes'
+    );
+  }
+
+  function shouldDrop(chunk: UIMessageChunk): boolean {
+    switch (chunk.type) {
+      case 'text-start':
+      case 'reasoning-start':
+        seenStartedIds.add(chunk.id);
+        return false;
+      case 'tool-input-start':
+      // Self-contained chunks: the AI SDK builds the tool part from these even
+      // without a `tool-input-start`, so they open the tool call too.
+      case 'tool-input-available':
+      case 'tool-input-error':
+        seenStartedToolCallIds.add(chunk.toolCallId);
+        return false;
+      case 'text-delta':
+      case 'text-end':
+      case 'reasoning-delta':
+      case 'reasoning-end':
+        if (seenStartedIds.has(chunk.id)) return false;
+        warnOnce(chunk.type, chunk.id);
+        return true;
+      case 'tool-input-delta':
+      case 'tool-output-available':
+      case 'tool-output-error':
+        if (seenStartedToolCallIds.has(chunk.toolCallId)) return false;
+        warnOnce(chunk.type, chunk.toolCallId);
+        return true;
+      default:
+        // Every other chunk type is passed through untouched.
+        return false;
+    }
+  }
+
+  return { shouldDrop };
+}
+
 export interface SendMessagesOptions {
   trigger: 'submit-message' | 'regenerate-message';
   chatId: string;
@@ -333,6 +420,17 @@ export class WorkflowChatTransport
     // the incremental chunkIndex which would be wrong.
     let replayFromStart = false;
 
+    // When resuming with a negative startIndex, the resolved chunk can land in
+    // the middle of a `*-start` / `*-delta` / `*-end` sequence, which crashes
+    // the AI SDK UI stream processor. The orphan filter drops chunks whose
+    // start chunk was emitted before the resume window. Only activated for
+    // negative resumes — non-negative startIndex is the caller's explicit
+    // choice and we trust them. See: https://github.com/vercel/workflow/issues/1835
+    const orphanFilter =
+      useExplicitStartIndex && explicitStartIndex < 0
+        ? createOrphanFilter()
+        : null;
+
     while (!gotFinish) {
       const startIndex = useExplicitStartIndex
         ? explicitStartIndex
@@ -391,6 +489,8 @@ export class WorkflowChatTransport
 
           chunkIndex++;
 
+          if (orphanFilter?.shouldDrop(chunk.value)) continue;
+
           yield chunk.value;
 
           if (chunk.value.type === 'finish') {