Skip to content

Commit 890329e

Browse files
committed
stream: propagate abort reason in share and broadcast
Pass signal.reason to the multi-consumer cancel paths so signal abort is reported as AbortError instead of clean iterator completion. Also make detached share consumers rethrow a stored source error when they resume after cancellation, preserving the abort reason for pending pulls. Fixes: #63357 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5
1 parent c0327d3 commit 890329e

4 files changed

Lines changed: 64 additions & 25 deletions

File tree

lib/internal/streams/iter/broadcast.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -688,7 +688,7 @@ function broadcast(options = { __proto__: null }) {
688688
broadcastImpl.setWriter(writer);
689689

690690
if (signal) {
691-
onSignalAbort(signal, () => broadcastImpl.cancel());
691+
onSignalAbort(signal, () => broadcastImpl.cancel(signal.reason));
692692
}
693693

694694
return { __proto__: null, writer, broadcast: broadcastImpl };

lib/internal/streams/iter/share.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ class ShareImpl {
144144
// cursor must re-pull rather than terminating prematurely.
145145
for (;;) {
146146
if (state.detached) {
147+
if (self.#sourceError) throw self.#sourceError;
147148
return { __proto__: null, done: true, value: undefined };
148149
}
149150

@@ -657,7 +658,7 @@ function share(source, options = { __proto__: null }) {
657658
const shareImpl = new ShareImpl(normalized, opts);
658659

659660
if (signal) {
660-
onSignalAbort(signal, () => shareImpl.cancel());
661+
onSignalAbort(signal, () => shareImpl.cancel(signal.reason));
661662
}
662663

663664
return shareImpl;

test/parallel/test-stream-iter-broadcast-from.js

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,13 @@ async function testBroadcastFromMultipleConsumers() {
6666
async function testAbortSignal() {
6767
const ac = new AbortController();
6868
const { broadcast: bc } = broadcast({ signal: ac.signal });
69-
const consumer = bc.push();
69+
const iter = bc.push()[Symbol.asyncIterator]();
70+
const read = iter.next();
71+
const rejected = assert.rejects(read, { name: 'AbortError' });
7072

7173
ac.abort();
7274

73-
const batches = [];
74-
for await (const batch of consumer) {
75-
batches.push(batch);
76-
}
77-
assert.strictEqual(batches.length, 0);
75+
await rejected;
7876
}
7977

8078
async function testAlreadyAbortedSignal() {
@@ -84,11 +82,12 @@ async function testAlreadyAbortedSignal() {
8482
const { broadcast: bc } = broadcast({ signal: ac.signal });
8583
const consumer = bc.push();
8684

87-
const batches = [];
88-
for await (const batch of consumer) {
89-
batches.push(batch);
90-
}
91-
assert.strictEqual(batches.length, 0);
85+
await assert.rejects(async () => {
86+
// eslint-disable-next-line no-unused-vars
87+
for await (const _ of consumer) {
88+
assert.fail('Should not reach here');
89+
}
90+
}, { name: 'AbortError' });
9291
}
9392

9493
// =============================================================================

test/parallel/test-stream-iter-share-async.js

Lines changed: 51 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -134,16 +134,53 @@ async function testShareCancelWithReason() {
134134

135135
async function testShareAbortSignal() {
136136
const ac = new AbortController();
137-
const shared = share(from('data'), { signal: ac.signal });
138-
const consumer = shared.pull();
139-
137+
const enc = new TextEncoder();
138+
async function* source() {
139+
yield [enc.encode('a')];
140+
yield [enc.encode('b')];
141+
}
142+
const shared = share(source(), {
143+
highWaterMark: 1,
144+
backpressure: 'block',
145+
signal: ac.signal,
146+
});
147+
const fast = shared.pull()[Symbol.asyncIterator]();
148+
shared.pull();
149+
150+
await fast.next();
151+
const read = fast.next();
152+
const rejected = assert.rejects(read, { name: 'AbortError' });
140153
ac.abort();
141154

142-
const batches = [];
143-
for await (const batch of consumer) {
144-
batches.push(batch);
155+
await rejected;
156+
}
157+
158+
async function testShareAbortSignalWhileSourcePullPending() {
159+
const ac = new AbortController();
160+
let resume;
161+
let sourceStarted;
162+
const sourceStartedPromise = new Promise((resolve) => {
163+
sourceStarted = resolve;
164+
});
165+
async function* source() {
166+
await new Promise((resolve) => {
167+
resume = resolve;
168+
sourceStarted();
169+
});
145170
}
146-
assert.strictEqual(batches.length, 0);
171+
const shared = share(source(), { signal: ac.signal });
172+
const iter1 = shared.pull()[Symbol.asyncIterator]();
173+
const iter2 = shared.pull()[Symbol.asyncIterator]();
174+
const read1 = iter1.next();
175+
const read2 = iter2.next();
176+
const rejected1 = assert.rejects(read1, { name: 'AbortError' });
177+
const rejected2 = assert.rejects(read2, { name: 'AbortError' });
178+
179+
await sourceStartedPromise;
180+
ac.abort();
181+
resume();
182+
183+
await Promise.all([rejected1, rejected2]);
147184
}
148185

149186
async function testShareAlreadyAborted() {
@@ -153,11 +190,12 @@ async function testShareAlreadyAborted() {
153190
const shared = share(from('data'), { signal: ac.signal });
154191
const consumer = shared.pull();
155192

156-
const batches = [];
157-
for await (const batch of consumer) {
158-
batches.push(batch);
159-
}
160-
assert.strictEqual(batches.length, 0);
193+
await assert.rejects(async () => {
194+
// eslint-disable-next-line no-unused-vars
195+
for await (const _ of consumer) {
196+
assert.fail('Should not reach here');
197+
}
198+
}, { name: 'AbortError' });
161199
}
162200

163201
// =============================================================================
@@ -273,6 +311,7 @@ Promise.all([
273311
testShareCancelMidIteration(),
274312
testShareCancelWithReason(),
275313
testShareAbortSignal(),
314+
testShareAbortSignalWhileSourcePullPending(),
276315
testShareAlreadyAborted(),
277316
testShareSourceError(),
278317
testShareLateJoiningConsumer(),

0 commit comments

Comments
 (0)