Skip to content

Commit 2fc68fd

Browse files
committed
fix(core): buffer session-stream chunks even when handlers exist
Persistent listeners registered via `session.in.on(...)` (e.g. chat.agent's `stopInput.on` for the stop signal) must not 'consume' chunks. They filter by `kind` and ignore non-matching chunks, so previously `#dispatch` was silently dropping any chunk that arrived before a once-waiter had registered. This race surfaced on test cloud (network round-trip > sync subscribe-time) but not locally (zero-latency). Symptom: chat.agent's first user message landed in S2 before `messagesInput.waitWithIdleTimeout` registered its waiter, the tail received it, `#dispatch` saw the `stopInput` handler and returned without buffering, the message was gone, the waitWithIdleTimeout fell through to a durable waitpoint, and the race-check skipped seq 0 (since the tail's onPart had advanced `lastSeqNum` to 0). Fix: when no once-waiter exists, invoke handlers AND buffer the chunk. Handlers observe; they don't consume.
1 parent a47b0ec commit 2fc68fd

1 file changed

Lines changed: 7 additions & 5 deletions

File tree

packages/core/src/v3/sessionStreams/manager.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -297,11 +297,13 @@ export class StandardSessionStreamManager implements SessionStreamManager {
297297
return;
298298
}
299299

300-
const handlers = this.handlers.get(key);
301-
if (handlers && handlers.size > 0) {
302-
this.#invokeHandlers(key, data);
303-
return;
304-
}
300+
// Persistent handlers (e.g. `stopInput.on(...)`) get a copy of the chunk,
301+
// but they don't "consume" it — handlers usually filter by `kind` and
302+
// ignore chunks they don't care about. Buffer the chunk regardless so a
303+
// subsequent `once()` (e.g. `messagesInput.waitWithIdleTimeout` in
304+
// chat.agent's preload) can still pick up the same chunk that arrived
305+
// before its waiter was registered.
306+
this.#invokeHandlers(key, data);
305307

306308
let buffered = this.buffer.get(key);
307309
if (!buffered) {

0 commit comments

Comments
 (0)