From 37fa317b72f7616a3082efe7064b430d020ab10b Mon Sep 17 00:00:00 2001 From: MohammadYusif Date: Thu, 4 Jun 2026 22:18:20 +0300 Subject: [PATCH] fix(core): bound streamText fullStream iteration with an inactivity timeout (#515) The `timeout`/`abortSignal` passed to `streamText` in `WebAgent.generateAndProcessAction` only bound the initial request. The `for await (...streamResult.fullStream)` that consumes the stream was not independently bounded, so a provider that stalled mid-stream (partial data then silence, or a half-open TCP connection that never sends a FIN) could leave the iteration pending forever and freeze the agent with no error and no progress. Wrap the stream consumption in `iterateWithInactivityTimeout`, which races each `next()` against the existing `llmProviderTimeoutMs` as an inactivity window and rejects with a descriptive error if no chunk arrives in time. The rejection flows into the existing try/catch and is surfaced like any other generation failure; the underlying iterator's `return()` is invoked on exit so the stream's resources are released. Reuses the configured provider timeout rather than adding a new knob, mirroring the request-level bound. Same indefinite-hang class as the aria-tree frame.evaluate fix in #511. --- packages/core/src/webAgent.ts | 57 +++++++++++++++++++++- packages/core/test/webAgent.test.ts | 76 ++++++++++++++++++++++++++++- 2 files changed, 131 insertions(+), 2 deletions(-) diff --git a/packages/core/src/webAgent.ts b/packages/core/src/webAgent.ts index aae5f813..da859c14 100644 --- a/packages/core/src/webAgent.ts +++ b/packages/core/src/webAgent.ts @@ -66,6 +66,51 @@ import { type FirewallConfig, } from "./security/actionFirewall.js"; +/** + * Iterate an async iterable, bounding each step with an inactivity timeout. + * + * The AI SDK's `streamText` `timeout`/`abortSignal` bound the initial request, + * but consuming `fullStream` with a bare `for await` is not independently + * bounded: a provider that stalls mid-stream (partial data then silence, or a + * half-open TCP connection that never sends a FIN) can leave the iteration + * pending forever. This wrapper races each `next()` against `timeoutMs` so a + * stalled stream rejects with a descriptive error instead of hanging. The error + * is caught by the caller's existing try/catch and surfaced like any other + * generation failure. On timeout the underlying iterator's `return()` is invoked + * (best effort) so the stream's resources are released. + */ +export async function* iterateWithInactivityTimeout( + iterable: AsyncIterable, + timeoutMs: number, +): AsyncGenerator { + const iterator = iterable[Symbol.asyncIterator](); + try { + while (true) { + let timer: ReturnType; + const timeout = new Promise((_, reject) => { + timer = setTimeout( + () => reject(new Error(`LLM stream stalled: no data received for ${timeoutMs}ms`)), + timeoutMs, + ); + }); + let result: IteratorResult; + try { + result = await Promise.race([iterator.next(), timeout]); + } finally { + clearTimeout(timer!); + } + if (result.done) { + return; + } + yield result.value; + } + } finally { + // Release the underlying stream when iteration ends early (timeout, a + // downstream throw, or a `break` in the consumer). + await iterator.return?.(); + } +} + // === Type Definitions === export interface WebAgentOptions { @@ -1014,7 +1059,17 @@ export class WebAgent { let reasoningText = ""; let reasoningEmitted = false; - for await (const part of streamResult.fullStream) { + // The `timeout`/`abortSignal` passed to streamText above bound the + // initial request, but the `for await` over `fullStream` is not + // independently bounded: if the provider stalls mid-stream (partial + // data then silence, or a zombie TCP connection that never sends a + // FIN), the iteration can hang indefinitely and freeze the agent with + // no error and no progress. Bound each chunk wait with an inactivity + // timeout so a stalled stream surfaces as an error instead. + for await (const part of iterateWithInactivityTimeout( + streamResult.fullStream, + this.llmProviderTimeoutMs, + )) { switch (part.type) { case "reasoning-start": // Start accumulating reasoning diff --git a/packages/core/test/webAgent.test.ts b/packages/core/test/webAgent.test.ts index cb475a8b..e15e9547 100644 --- a/packages/core/test/webAgent.test.ts +++ b/packages/core/test/webAgent.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; -import { WebAgent, WebAgentOptions } from "../src/webAgent.js"; +import { WebAgent, WebAgentOptions, iterateWithInactivityTimeout } from "../src/webAgent.js"; import { InvalidHostnameError } from "../src/security/actionFirewall.js"; import { AriaBrowser, @@ -4999,3 +4999,77 @@ describe("WebAgent firewall options", () => { ).not.toThrow(); }); }); + +describe("iterateWithInactivityTimeout", () => { + // Uses real timers: the helper races each next() against a real setTimeout, + // so a tiny timeout lets the stall fire quickly without fake-timer plumbing. + it("forwards every value of a stream that keeps producing", async () => { + async function* stream() { + yield 1; + yield 2; + yield 3; + } + + const seen: number[] = []; + for await (const value of iterateWithInactivityTimeout(stream(), 1000)) { + seen.push(value); + } + + expect(seen).toEqual([1, 2, 3]); + }); + + it("rejects when the stream stalls mid-iteration longer than the timeout", async () => { + // Yields once, then never produces another value — models a provider that + // sends partial data and then goes silent. + const stalling: AsyncIterable = { + [Symbol.asyncIterator]() { + let emitted = false; + return { + next() { + if (!emitted) { + emitted = true; + return Promise.resolve({ value: 1, done: false }); + } + return new Promise(() => {}); // never resolves + }, + }; + }, + }; + + const seen: number[] = []; + let error: unknown; + try { + for await (const value of iterateWithInactivityTimeout(stalling, 20)) { + seen.push(value); + } + } catch (e) { + error = e; + } + + expect(seen).toEqual([1]); + expect(error).toBeInstanceOf(Error); + expect((error as Error).message).toMatch(/stalled/i); + }); + + it("calls the underlying iterator's return() when it stalls out", async () => { + const returnSpy = vi.fn(() => Promise.resolve({ value: undefined, done: true as const })); + const stalling: AsyncIterable = { + [Symbol.asyncIterator]() { + return { + next() { + return new Promise(() => {}); // never resolves + }, + return: returnSpy, + }; + }, + }; + + await expect(async () => { + for await (const _ of iterateWithInactivityTimeout(stalling, 20)) { + // no-op + } + }).rejects.toThrow(/stalled/i); + + expect(returnSpy).toHaveBeenCalledTimes(1); + }); +});