Skip to content

Commit a0263fe

Browse files
authored
stream: observe abort while awaiting pipeTo source
Use the abort-aware iterator wrapper in the no-transform pipeTo() path so a pending source read does not block AbortSignal handling. Fixes: #64014 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 PR-URL: #64015 Fixes: #64014 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Filip Skokan <panva.ip@gmail.com>
1 parent 0dbb5df commit a0263fe

2 files changed

Lines changed: 35 additions & 1 deletion

File tree

lib/internal/streams/iter/pull.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1090,7 +1090,7 @@ async function pipeTo(source, ...args) {
10901090
} else if (transforms.length === 0) {
10911091
// Fast path: no transforms - iterate normalized source directly
10921092
if (signal) {
1093-
for await (const batch of normalized) {
1093+
for await (const batch of yieldAbortable(normalized, signal)) {
10941094
signal.throwIfAborted();
10951095
const p = writeBatch(batch);
10961096
if (p) await p;

test/parallel/test-stream-iter-pipeto-signal.js

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
const common = require('../common');
88
const assert = require('assert');
9+
const { setTimeout } = require('timers/promises');
910
const { pipeTo, from } = require('stream/iter');
1011

1112
// pipeTo with live signal, no transforms — abort mid-stream
@@ -30,6 +31,38 @@ async function testPipeToLiveSignalNoTransforms() {
3031
assert.ok(written.length >= 1);
3132
}
3233

34+
// pipeTo with live signal, no transforms — abort while waiting for next chunk
35+
async function testPipeToLiveSignalNoTransformsPendingNext() {
36+
const ac = new AbortController();
37+
const reason = new Error('abort reason');
38+
const writer = {
39+
write: common.mustNotCall(),
40+
};
41+
const source = {
42+
[Symbol.asyncIterator]() {
43+
return {
44+
next() {
45+
return new Promise(() => {});
46+
},
47+
};
48+
},
49+
};
50+
51+
setTimeout(10)
52+
.then(() => ac.abort(reason))
53+
.then(common.mustCall());
54+
55+
const result = await Promise.race([
56+
assert.rejects(
57+
() => pipeTo(source, writer, { signal: ac.signal }),
58+
reason,
59+
).then(() => 'aborted'),
60+
setTimeout(1000, 'timed out'),
61+
]);
62+
63+
assert.strictEqual(result, 'aborted');
64+
}
65+
3366
// pipeTo with live signal + transforms — abort mid-stream
3467
async function testPipeToLiveSignalWithTransforms() {
3568
const ac = new AbortController();
@@ -84,6 +117,7 @@ async function testPipeToLiveSignalWithTransformsCompletes() {
84117

85118
Promise.all([
86119
testPipeToLiveSignalNoTransforms(),
120+
testPipeToLiveSignalNoTransformsPendingNext(),
87121
testPipeToLiveSignalWithTransforms(),
88122
testPipeToLiveSignalCompletes(),
89123
testPipeToLiveSignalWithTransformsCompletes(),

0 commit comments

Comments
 (0)