Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion packages/core/src/webAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,51 @@ import {
type FirewallConfig,
} from "./security/actionFirewall.js";

/**
* Iterate an async iterable, bounding each step with an inactivity timeout.
*
* The AI SDK's `streamText` `timeout`/`abortSignal` bound the initial request,
* but consuming `fullStream` with a bare `for await` is not independently
* bounded: a provider that stalls mid-stream (partial data then silence, or a
* half-open TCP connection that never sends a FIN) can leave the iteration
* pending forever. This wrapper races each `next()` against `timeoutMs` so a
* stalled stream rejects with a descriptive error instead of hanging. The error
* is caught by the caller's existing try/catch and surfaced like any other
* generation failure. On timeout the underlying iterator's `return()` is invoked
* (best effort) so the stream's resources are released.
*/
export async function* iterateWithInactivityTimeout<T>(
iterable: AsyncIterable<T>,
timeoutMs: number,
): AsyncGenerator<T> {
const iterator = iterable[Symbol.asyncIterator]();
try {
while (true) {
let timer: ReturnType<typeof setTimeout>;
const timeout = new Promise<never>((_, reject) => {
timer = setTimeout(
() => reject(new Error(`LLM stream stalled: no data received for ${timeoutMs}ms`)),
timeoutMs,
);
});
let result: IteratorResult<T>;
try {
result = await Promise.race([iterator.next(), timeout]);
} finally {
clearTimeout(timer!);
}
if (result.done) {
return;
}
yield result.value;
}
} finally {
// Release the underlying stream when iteration ends early (timeout, a
// downstream throw, or a `break` in the consumer).
await iterator.return?.();
}
}

// === Type Definitions ===

export interface WebAgentOptions {
Expand Down Expand Up @@ -1118,7 +1163,17 @@ export class WebAgent {
let reasoningText = "";
let reasoningEmitted = false;

for await (const part of streamResult.fullStream) {
// The `timeout`/`abortSignal` passed to streamText above bound the
// initial request, but the `for await` over `fullStream` is not
// independently bounded: if the provider stalls mid-stream (partial
// data then silence, or a zombie TCP connection that never sends a
// FIN), the iteration can hang indefinitely and freeze the agent with
// no error and no progress. Bound each chunk wait with an inactivity
// timeout so a stalled stream surfaces as an error instead.
for await (const part of iterateWithInactivityTimeout(
streamResult.fullStream,
this.llmProviderTimeoutMs,
)) {
switch (part.type) {
case "reasoning-start":
// Start accumulating reasoning
Expand Down
76 changes: 75 additions & 1 deletion packages/core/test/webAgent.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import { WebAgent, WebAgentOptions } from "../src/webAgent.js";
import { WebAgent, WebAgentOptions, iterateWithInactivityTimeout } from "../src/webAgent.js";
import { InvalidHostnameError } from "../src/security/actionFirewall.js";
import {
AriaBrowser,
Expand Down Expand Up @@ -5068,3 +5068,77 @@ describe("WebAgent firewall options", () => {
).not.toThrow();
});
});

describe("iterateWithInactivityTimeout", () => {
// Uses real timers: the helper races each next() against a real setTimeout,
// so a tiny timeout lets the stall fire quickly without fake-timer plumbing.
it("forwards every value of a stream that keeps producing", async () => {
async function* stream() {
yield 1;
yield 2;
yield 3;
}

const seen: number[] = [];
for await (const value of iterateWithInactivityTimeout(stream(), 1000)) {
seen.push(value);
}

expect(seen).toEqual([1, 2, 3]);
});

it("rejects when the stream stalls mid-iteration longer than the timeout", async () => {
// Yields once, then never produces another value — models a provider that
// sends partial data and then goes silent.
const stalling: AsyncIterable<number> = {
[Symbol.asyncIterator]() {
let emitted = false;
return {
next() {
if (!emitted) {
emitted = true;
return Promise.resolve({ value: 1, done: false });
}
return new Promise(() => {}); // never resolves
},
};
},
};

const seen: number[] = [];
let error: unknown;
try {
for await (const value of iterateWithInactivityTimeout(stalling, 20)) {
seen.push(value);
}
} catch (e) {
error = e;
}

expect(seen).toEqual([1]);
expect(error).toBeInstanceOf(Error);
expect((error as Error).message).toMatch(/stalled/i);
});

it("calls the underlying iterator's return() when it stalls out", async () => {
const returnSpy = vi.fn(() => Promise.resolve({ value: undefined, done: true as const }));
const stalling: AsyncIterable<number> = {
[Symbol.asyncIterator]() {
return {
next() {
return new Promise(() => {}); // never resolves
},
return: returnSpy,
};
},
};

await expect(async () => {
for await (const _ of iterateWithInactivityTimeout(stalling, 20)) {
// no-op
}
}).rejects.toThrow(/stalled/i);

expect(returnSpy).toHaveBeenCalledTimes(1);
});
});