5 changes: 5 additions & 0 deletions .changeset/orphan-ui-chunks-on-resume.md
@@ -0,0 +1,5 @@
---
"@workflow/ai": patch
---

`WorkflowChatTransport` now drops orphan UI chunks (deltas/ends with no matching `*-start` in the resumed window) when reconnecting with an `initialStartIndex` that lands mid-part rather than on a UI part boundary, instead of throwing.
4 changes: 4 additions & 0 deletions docs/content/docs/v4/ai/resumable-streams.mdx
@@ -194,6 +194,10 @@ This avoids replaying potentially thousands of chunks and lets the UI render fas
When using a negative `initialStartIndex`, the reconnection endpoint **must** return the `x-workflow-stream-tail-index` header (as shown in [Step 2](#add-a-stream-reconnection-endpoint) above). The transport uses this header to compute absolute chunk positions so that retries after a disconnect resume from the correct position. If the header is missing, the transport falls back to `startIndex: 0` (replaying the entire stream) and logs a warning.
</Callout>
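
As a rough illustration of the arithmetic the callout describes, the sketch below turns a negative start index into an absolute chunk position. `resolveStartIndex` is a hypothetical helper, not part of `@workflow/ai`, and it assumes the tail index is the zero-based index of the last chunk written so far, which is what the `tailIndex + 1 + startIndex` expression in the API reference sketch implies.

```typescript
// Hypothetical helper for illustration only; not an @workflow/ai export.
// Assumption: tailIndex is the zero-based index of the last chunk in the stream,
// so the stream currently holds tailIndex + 1 chunks.
function resolveStartIndex(startIndex: number, tailIndex: number): number {
  // Non-negative indexes are already absolute positions.
  if (startIndex >= 0) return startIndex;
  // Negative indexes count back from the end; clamp so we never go before chunk 0.
  return Math.max(0, tailIndex + 1 + startIndex);
}
```

With a tail index of 99 and `initialStartIndex: -50`, this resolves to chunk 50; asking for more chunks than exist clamps to 0.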

### Mid-part resumes

A workflow stream is a flat sequence of chunks, but the AI SDK's UI protocol groups chunks into logical parts (`text-*`, `reasoning-*`, `tool-input-*`) that must be opened with a `*-start` before any `*-delta` or `*-end`. A non-zero `startIndex` can land in the middle of an open part. See [`WorkflowChatTransport` → Mid-part resumes](/docs/api-reference/workflow-ai/workflow-chat-transport#mid-part-resumes) for how this is handled and an example of rewinding to a step boundary on the server.

## Related Documentation

- [`WorkflowChatTransport` API Reference](/docs/api-reference/workflow-ai/workflow-chat-transport) - Full configuration options
@@ -250,6 +250,45 @@ export default function ChatWithCustomConfig() {
}
```

## Mid-part resumes

A workflow stream is a flat sequence of chunks, but the AI SDK's UI protocol groups chunks into logical parts: a `text-start` opens a text part, subsequent `text-delta`s extend it, and a `text-end` closes it. The same shape applies to `reasoning-*` and `tool-input-*`. The AI SDK client enforces that grammar: a `reasoning-delta` whose `reasoning-start` was never seen throws and breaks the chat.

A non-zero `startIndex` (in particular a negative `initialStartIndex`) resolves to a chunk offset with no awareness of those part boundaries, so it can land in the middle of an open part. When that happens, `WorkflowChatTransport` **drops chunks that reference a part whose start it never saw** and logs a single warning per resume. The chat keeps working, but any partial part overlapping the resume cursor is discarded.
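
To make the dropping rule concrete, here is a minimal sketch of the idea, not the transport's actual implementation. `dropOrphanChunks` and `UIChunk` are hypothetical names, and the rule is deliberately simplified: any `*-start` registers its id, and `*-delta`, `*-end`, and `tool-output-*` chunks are kept only if their id was registered inside the resumed window.

```typescript
// Hypothetical types and helper for illustration; not @workflow/ai exports.
type UIChunk = { type: string; id?: string; toolCallId?: string };

function dropOrphanChunks<T extends UIChunk>(chunks: T[]): T[] {
  const started = new Set<string>(); // ids whose *-start appeared in this window
  const kept: T[] = [];

  for (const chunk of chunks) {
    const key = chunk.id ?? chunk.toolCallId;
    const opensPart = chunk.type.endsWith("-start");
    // Simplifying assumption: these chunk types need a previously opened part.
    const needsOpenPart =
      chunk.type.endsWith("-delta") ||
      chunk.type.endsWith("-end") ||
      chunk.type.startsWith("tool-output-");

    if (key !== undefined && opensPart) started.add(key);
    // Drop only chunks that depend on a part this window never opened.
    if (key !== undefined && needsOpenPart && !started.has(key)) continue;
    kept.push(chunk);
  }
  return kept;
}
```

Step markers such as `start-step` and `finish-step` carry no part id, so they always pass through in this sketch.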

To preserve those partial parts, rewind to a step boundary on the server before returning the readable. `start-step` / `finish-step` chunks are the natural seams, because no UI part is ever open across them. A sketch:

{/*@skip-typecheck: incomplete code sample*/}

```typescript title="app/api/chat/[id]/stream/route.ts"
const run = getRun(id);
const tailIndex = await run.getReadable().getTailIndex();

let resolved = startIndex < 0
  ? Math.max(0, tailIndex + 1 + startIndex)
  : startIndex;

if (startIndex !== 0) {
  // Walk back from `resolved` to the most recent start-step (or chunk 0),
  // capping the lookback so a single huge step can't trigger an unbounded scan.
  const LOOKBACK = 200;
  const probe = run.getReadable({ startIndex: Math.max(0, resolved - LOOKBACK) });
  let i = Math.max(0, resolved - LOOKBACK);
  let lastBoundary = i;
  for await (const chunk of probe as unknown as AsyncIterable<{ type: string }>) {
    if (i >= resolved) break;
    if (chunk.type === "start-step") lastBoundary = i;
    i++;
  }
  resolved = lastBoundary;
}

return createUIMessageStreamResponse({
  stream: run.getReadable({ startIndex: resolved }),
  headers: { "x-workflow-stream-tail-index": String(tailIndex) },
});
```
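
The boundary scan in the route above can also be isolated as a pure function over an already-read window of chunks, which makes the rewind logic easy to unit test without a live run. `rewindToStepBoundary` is a hypothetical helper name; it mirrors the loop above, including its fallback to the start of the window when no `start-step` is found within the lookback.

```typescript
// Hypothetical helper for illustration; mirrors the scan loop in the route sketch.
type StepChunk = { type: string };

function rewindToStepBoundary(
  window: StepChunk[], // chunks read starting at absolute index `windowStart`
  windowStart: number, // absolute index of window[0]
  resolved: number, // absolute index the client asked to resume from
): number {
  // Fallback when no start-step is found: the start of the lookback window.
  let lastBoundary = windowStart;
  for (let offset = 0; offset < window.length; offset++) {
    const index = windowStart + offset;
    if (index >= resolved) break; // only consider chunks before the cursor
    if (window[offset].type === "start-step") lastBoundary = index;
  }
  return lastBoundary;
}
```

Note that the fallback position is not guaranteed to sit on a part boundary, so the transport's orphan filter still matters when a single step is longer than the lookback.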

## See Also

- [DurableAgent](/docs/api-reference/workflow-ai/durable-agent) - Building durable AI agents within workflows
4 changes: 4 additions & 0 deletions docs/content/docs/v5/ai/resumable-streams.mdx
@@ -194,6 +194,10 @@ This avoids replaying potentially thousands of chunks and lets the UI render fas
When using a negative `initialStartIndex`, the reconnection endpoint **must** return the `x-workflow-stream-tail-index` header (as shown in [Step 2](#add-a-stream-reconnection-endpoint) above). The transport uses this header to compute absolute chunk positions so that retries after a disconnect resume from the correct position. If the header is missing, the transport falls back to `startIndex: 0` (replaying the entire stream) and logs a warning.
</Callout>

### Mid-part resumes

A workflow stream is a flat sequence of chunks, but the AI SDK's UI protocol groups chunks into logical parts (`text-*`, `reasoning-*`, `tool-input-*`) that must be opened with a `*-start` before any `*-delta` or `*-end`. A non-zero `startIndex` can land in the middle of an open part. See [`WorkflowChatTransport` → Mid-part resumes](/docs/api-reference/workflow-ai/workflow-chat-transport#mid-part-resumes) for how this is handled and an example of rewinding to a step boundary on the server.

## Related Documentation

- [`WorkflowChatTransport` API Reference](/docs/api-reference/workflow-ai/workflow-chat-transport) - Full configuration options
@@ -250,6 +250,45 @@ export default function ChatWithCustomConfig() {
}
```

## Mid-part resumes

A workflow stream is a flat sequence of chunks, but the AI SDK's UI protocol groups chunks into logical parts: a `text-start` opens a text part, subsequent `text-delta`s extend it, and a `text-end` closes it. The same shape applies to `reasoning-*` and `tool-input-*`. The AI SDK client enforces that grammar: a `reasoning-delta` whose `reasoning-start` was never seen throws and breaks the chat.
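
One way to see the grammar in action is a small checker that reports which part ids are referenced before they are opened. This is a hedged sketch for debugging a captured chunk sequence, not something the AI SDK or `@workflow/ai` exposes; `findOrphanIds` and `PartChunk` are hypothetical names, and only chunks that carry an `id` are considered.

```typescript
// Hypothetical debugging helper; not an AI SDK or @workflow/ai API.
type PartChunk = { type: string; id?: string };

function findOrphanIds(chunks: PartChunk[]): string[] {
  const open = new Set<string>(); // parts that are currently open
  const orphans = new Set<string>(); // ids used without a preceding *-start

  for (const { type, id } of chunks) {
    if (id === undefined) continue; // step markers and similar carry no part id
    if (type.endsWith("-start")) {
      open.add(id);
    } else if (type.endsWith("-delta") || type.endsWith("-end")) {
      if (!open.has(id)) orphans.add(id);
      if (type.endsWith("-end")) open.delete(id); // the part is closed from here on
    }
  }
  return Array.from(orphans);
}
```

Running it over the first chunks a reconnect returns shows whether the resume cursor landed inside an open part.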

A non-zero `startIndex` (in particular a negative `initialStartIndex`) resolves to a chunk offset with no awareness of those part boundaries, so it can land in the middle of an open part. When that happens, `WorkflowChatTransport` **drops chunks that reference a part whose start it never saw** and logs a single warning per resume. The chat keeps working, but any partial part overlapping the resume cursor is discarded.

To preserve those partial parts, rewind to a step boundary on the server before returning the readable. `start-step` / `finish-step` chunks are the natural seams, because no UI part is ever open across them. A sketch:

{/*@skip-typecheck: incomplete code sample*/}

```typescript title="app/api/chat/[id]/stream/route.ts"
const run = getRun(id);
const tailIndex = await run.getReadable().getTailIndex();

let resolved = startIndex < 0
  ? Math.max(0, tailIndex + 1 + startIndex)
  : startIndex;

if (startIndex !== 0) {
  // Walk back from `resolved` to the most recent start-step (or chunk 0),
  // capping the lookback so a single huge step can't trigger an unbounded scan.
  const LOOKBACK = 200;
  const probe = run.getReadable({ startIndex: Math.max(0, resolved - LOOKBACK) });
  let i = Math.max(0, resolved - LOOKBACK);
  let lastBoundary = i;
  for await (const chunk of probe as unknown as AsyncIterable<{ type: string }>) {
    if (i >= resolved) break;
    if (chunk.type === "start-step") lastBoundary = i;
    i++;
  }
  resolved = lastBoundary;
}

return createUIMessageStreamResponse({
  stream: run.getReadable({ startIndex: resolved }),
  headers: { "x-workflow-stream-tail-index": String(tailIndex) },
});
```

## See Also

- [DurableAgent](/docs/api-reference/workflow-ai/durable-agent) - Building durable AI agents within workflows
186 changes: 186 additions & 0 deletions packages/ai/src/workflow-chat-transport.test.ts
@@ -469,6 +469,192 @@ describe('WorkflowChatTransport', () => {
});
});

  describe('orphan UI chunk filter on negative startIndex resume', () => {
    function makeSSEStream(...events: string[]) {
      return new ReadableStream({
        start(controller) {
          for (const event of events) {
            controller.enqueue(new TextEncoder().encode(`data: ${event}\n\n`));
          }
          controller.close();
        },
      });
    }

    async function collect(stream: ReadableStream<unknown>) {
      const reader = stream.getReader();
      const out: unknown[] = [];
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        out.push(value);
      }
      return out;
    }

    it('drops orphan reasoning-delta / reasoning-end when no prior reasoning-start', async () => {
      const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});

      const transport = new WorkflowChatTransport({
        fetch: mockFetch,
        initialStartIndex: -50,
      });

      // Reproduces the exact pattern from issue #1835: a resume that lands
      // inside an open reasoning-0 part. The deltas + end for it should be
      // dropped; everything after the next start-step should flow through.
      mockFetch.mockResolvedValueOnce({
        ok: true,
        headers: new Headers({ 'x-workflow-stream-tail-index': '99' }),
        body: makeSSEStream(
          '{"type":"reasoning-delta","id":"reasoning-0","delta":" space"}',
          '{"type":"reasoning-delta","id":"reasoning-0","delta":"."}',
          '{"type":"reasoning-end","id":"reasoning-0"}',
          '{"type":"finish-step"}',
          '{"type":"start-step"}',
          '{"type":"reasoning-start","id":"reasoning-1"}',
          '{"type":"reasoning-delta","id":"reasoning-1","delta":"hello"}',
          '{"type":"reasoning-end","id":"reasoning-1"}',
          '{"type":"finish"}'
        ),
      });

      const stream = await transport.reconnectToStream({ chatId: 'test-chat' });
      const chunks = (await collect(stream!)) as Array<{ type: string }>;

      const types = chunks.map((c) => c.type);
      expect(types).toEqual([
        // orphan reasoning-0 deltas+end dropped
        'finish-step',
        'start-step',
        'reasoning-start',
        'reasoning-delta',
        'reasoning-end',
        'finish',
      ]);

      expect(warnSpy).toHaveBeenCalledWith(
        expect.stringContaining('Dropping orphan UI chunk')
      );
      // Warning is emitted only once per resume even if multiple chunks drop.
      expect(warnSpy).toHaveBeenCalledTimes(1);

      warnSpy.mockRestore();
    });

    it('drops orphan text-delta and text-end without a prior text-start', async () => {
      const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});

      const transport = new WorkflowChatTransport({
        fetch: mockFetch,
        initialStartIndex: -10,
      });

      mockFetch.mockResolvedValueOnce({
        ok: true,
        headers: new Headers({ 'x-workflow-stream-tail-index': '50' }),
        body: makeSSEStream(
          '{"type":"text-delta","id":"text-0","delta":"orphan"}',
          '{"type":"text-end","id":"text-0"}',
          '{"type":"finish"}'
        ),
      });

      const stream = await transport.reconnectToStream({ chatId: 'test-chat' });
      const chunks = (await collect(stream!)) as Array<{ type: string }>;

      expect(chunks.map((c) => c.type)).toEqual(['finish']);
      expect(warnSpy).toHaveBeenCalledTimes(1);

      warnSpy.mockRestore();
    });

    it('drops orphan tool-input-delta / tool-output-available without a prior tool-input-start', async () => {
      const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});

      const transport = new WorkflowChatTransport({
        fetch: mockFetch,
        initialStartIndex: -10,
      });

      mockFetch.mockResolvedValueOnce({
        ok: true,
        headers: new Headers({ 'x-workflow-stream-tail-index': '50' }),
        body: makeSSEStream(
          '{"type":"tool-input-delta","toolCallId":"call_1","inputTextDelta":"x"}',
          '{"type":"tool-output-available","toolCallId":"call_1","output":{}}',
          '{"type":"finish"}'
        ),
      });

      const stream = await transport.reconnectToStream({ chatId: 'test-chat' });
      const chunks = (await collect(stream!)) as Array<{ type: string }>;

      expect(chunks.map((c) => c.type)).toEqual(['finish']);
      expect(warnSpy).toHaveBeenCalledTimes(1);

      warnSpy.mockRestore();
    });

    it('passes through chunks whose *-start was emitted in the resumed window', async () => {
      const transport = new WorkflowChatTransport({
        fetch: mockFetch,
        initialStartIndex: -10,
      });

      // tool-input-start IS in the resumed window, so its later
      // tool-output-available should pass through untouched.
      mockFetch.mockResolvedValueOnce({
        ok: true,
        headers: new Headers({ 'x-workflow-stream-tail-index': '50' }),
        body: makeSSEStream(
          '{"type":"tool-input-start","toolCallId":"call_1","toolName":"grep"}',
          '{"type":"tool-input-available","toolCallId":"call_1","toolName":"grep","input":{}}',
          '{"type":"tool-output-available","toolCallId":"call_1","output":{"ok":true}}',
          '{"type":"finish"}'
        ),
      });

      const stream = await transport.reconnectToStream({ chatId: 'test-chat' });
      const chunks = (await collect(stream!)) as Array<{ type: string }>;

      expect(chunks.map((c) => c.type)).toEqual([
        'tool-input-start',
        'tool-input-available',
        'tool-output-available',
        'finish',
      ]);
    });

    it('does not activate the filter for non-negative startIndex', async () => {
      const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});

      const transport = new WorkflowChatTransport({
        fetch: mockFetch,
        // Default initialStartIndex = 0 → filter inactive.
      });

      // Even orphan-looking deltas should pass through unchanged because the
      // caller didn't ask for a tail-only resume.
      mockFetch.mockResolvedValueOnce({
        ok: true,
        headers: new Headers(),
        body: makeSSEStream(
          '{"type":"reasoning-delta","id":"reasoning-0","delta":"x"}',
          '{"type":"finish"}'
        ),
      });

      const stream = await transport.reconnectToStream({ chatId: 'test-chat' });
      const chunks = (await collect(stream!)) as Array<{ type: string }>;

      expect(chunks.map((c) => c.type)).toEqual(['reasoning-delta', 'finish']);
      expect(warnSpy).not.toHaveBeenCalled();

      warnSpy.mockRestore();
    });
  });

describe('reconnection error formatting', () => {
it('should format object errors with JSON instead of [object Object]', async () => {
const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {});