Skip to content

Commit 0dbb5df

Browse files
authored
stream: respect iter consumer abort signals
Delegate broadcast.push() and share.pull() calls with per-consumer AbortSignals through pull(), so pending next() calls reject when the signal aborts. Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 PR-URL: #63997 Fixes: #63302 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Filip Skokan <panva.ip@gmail.com>
1 parent e9a9065 commit 0dbb5df

4 files changed

Lines changed: 36 additions & 2 deletions

File tree

lib/internal/streams/iter/broadcast.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ class BroadcastImpl {
128128
// own internal AbortController that follows the external signal.
129129
// When no transforms, return rawConsumer directly (controller elided
130130
// per PULL-02 optimization -- no transforms means no signal recipient).
131-
if (transforms.length > 0) {
131+
if (transforms.length > 0 || options?.signal) {
132132
const pullArgs = [...transforms];
133133
if (options?.signal) {
134134
ArrayPrototypePush(pullArgs,

lib/internal/streams/iter/share.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ class ShareImpl {
9797
const { transforms, options } = parsePullArgs(args);
9898
const rawConsumer = this.#createRawConsumer();
9999

100-
if (transforms.length > 0) {
100+
if (transforms.length > 0 || options?.signal) {
101101
if (options) {
102102
return pullWithTransforms(rawConsumer, ...transforms, options);
103103
}

test/parallel/test-stream-iter-broadcast-basic.js

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,19 @@ async function testPendingNextSettlesAfterReturn() {
174174
assert.strictEqual(result.value, undefined);
175175
}
176176

177+
async function testPushAbortSignalRejectsPendingNext() {
178+
const ac = new AbortController();
179+
const reason = new Error('push aborted');
180+
const { broadcast: bc } = broadcast();
181+
const iter = bc.push({ signal: ac.signal })[Symbol.asyncIterator]();
182+
183+
const pendingNext = iter.next();
184+
const rejected = assert.rejects(pendingNext, (error) => error === reason);
185+
ac.abort(reason);
186+
187+
await rejected;
188+
}
189+
177190
// =============================================================================
178191
// Writer fail detaches consumers
179192
// =============================================================================
@@ -300,6 +313,7 @@ Promise.all([
300313
testCancelWithReason(),
301314
testCancelWithFalsyReason(),
302315
testPendingNextSettlesAfterReturn(),
316+
testPushAbortSignalRejectsPendingNext(),
303317
testFailDetachesConsumers(),
304318
testWriterFailIdempotent(),
305319
testLateJoinerSeesBufferedData(),

test/parallel/test-stream-iter-share-async.js

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,25 @@ async function testShareAbortSignalWhileSourcePullPending() {
196196
await Promise.all([rejected1, rejected2]);
197197
}
198198

199+
async function testSharePullAbortSignalRejectsPendingNext() {
200+
const ac = new AbortController();
201+
const reason = new Error('pull aborted');
202+
const shared = share(
203+
// eslint-disable-next-line require-yield
204+
(async function* never() {
205+
await new Promise(() => {});
206+
})(),
207+
);
208+
const iter = shared.pull({ signal: ac.signal })[Symbol.asyncIterator]();
209+
210+
const pendingNext = iter.next();
211+
const rejected = assert.rejects(pendingNext, (error) => error === reason);
212+
ac.abort(reason);
213+
214+
await rejected;
215+
shared.cancel();
216+
}
217+
199218
async function testShareAlreadyAborted() {
200219
const shared = share(from('data'), { signal: AbortSignal.abort() });
201220
const consumer = shared.pull();
@@ -340,6 +359,7 @@ Promise.all([
340359
testShareCancelWithReason(),
341360
testShareAbortSignal(),
342361
testShareAbortSignalWhileSourcePullPending(),
362+
testSharePullAbortSignalRejectsPendingNext(),
343363
testShareAlreadyAborted(),
344364
testShareSourceError(),
345365
testShareLateJoiningConsumer(),

0 commit comments

Comments
 (0)