Skip to content

Commit eedd317

Browse files
committed
stream: narrow pipeTo guard to writer release check
The state.shuttingDown guard introduced in the previous commit is too broad: it skips writes even when the destination is still writable. The WHATWG Streams spec requires that already-read chunks are written to a still-writable destination during shutdown (step 15, shutdown substeps 3.1-3.2). Replace the guard with a check for writer[kState].stream === undefined which is only true after finalize() has released the writer. This precisely targets the crash condition without violating the spec requirement. Restore shuttingDown as a closure-local variable since it no longer needs to be shared with PipeToReadableStreamReadRequest. Add a test case verifying that an already-read chunk is written when an AbortSignal fires after enqueue while the destination is still writable. Signed-off-by: Qingyu Wang <wangqingyu.c0l1n@bytedance.com>
1 parent a546fba commit eedd317

2 files changed

Lines changed: 41 additions & 9 deletions

File tree

lib/internal/webstreams/readablestream.js

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1424,6 +1424,8 @@ function readableStreamPipeTo(
14241424

14251425
source[kState].disturbed = true;
14261426

1427+
let shuttingDown = false;
1428+
14271429
if (signal !== undefined) {
14281430
try {
14291431
validateAbortSignal(signal, 'options.signal');
@@ -1436,7 +1438,6 @@ function readableStreamPipeTo(
14361438

14371439
const state = {
14381440
currentWrite: PromiseResolve(),
1439-
shuttingDown: false,
14401441
};
14411442

14421443
// The error here can be undefined. The rejected arg
@@ -1461,8 +1462,8 @@ function readableStreamPipeTo(
14611462
}
14621463

14631464
function shutdownWithAnAction(action, rejected, originalError) {
1464-
if (state.shuttingDown) return;
1465-
state.shuttingDown = true;
1465+
if (shuttingDown) return;
1466+
shuttingDown = true;
14661467
if (dest[kState].state === 'writable' &&
14671468
!writableStreamCloseQueuedOrInFlight(dest)) {
14681469
PromisePrototypeThen(
@@ -1482,8 +1483,8 @@ function readableStreamPipeTo(
14821483
}
14831484

14841485
function shutdown(rejected, error) {
1485-
if (state.shuttingDown) return;
1486-
state.shuttingDown = true;
1486+
if (shuttingDown) return;
1487+
shuttingDown = true;
14871488
if (dest[kState].state === 'writable' &&
14881489
!writableStreamCloseQueuedOrInFlight(dest)) {
14891490
PromisePrototypeThen(
@@ -1545,11 +1546,11 @@ function readableStreamPipeTo(
15451546
}
15461547

15471548
async function step() {
1548-
if (state.shuttingDown) return true;
1549+
if (shuttingDown) return true;
15491550

15501551
if (dest[kState].backpressure) {
15511552
await writer[kState].ready.promise;
1552-
if (state.shuttingDown) return true;
1553+
if (shuttingDown) return true;
15531554
}
15541555

15551556
const controller = source[kState].controller;
@@ -1562,7 +1563,7 @@ function readableStreamPipeTo(
15621563
controller[kState].queue.length > 0) {
15631564

15641565
while (controller[kState].queue.length > 0) {
1565-
if (state.shuttingDown) return true;
1566+
if (shuttingDown) return true;
15661567

15671568
const chunk = dequeueValue(controller);
15681569

@@ -1677,7 +1678,7 @@ class PipeToReadableStreamReadRequest {
16771678
// synchronous write during enqueue(). See WHATWG Streams spec
16781679
// "ReadableStreamPipeTo" step 15's "chunk steps".
16791680
queueMicrotask(() => {
1680-
if (this.state.shuttingDown) {
1681+
if (this.writer[kState].stream === undefined) {
16811682
this.promise.resolve(false);
16821683
return;
16831684
}

test/parallel/test-webstreams-pipeto-writer-released-race.js

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,34 @@ const { ReadableStream, WritableStream } = require('stream/web');
3333
sourceController.enqueue('chunk');
3434
}));
3535
}
36+
37+
{
38+
const ac = new AbortController();
39+
let sourceController;
40+
const chunks = [];
41+
42+
const source = new ReadableStream({
43+
start(controller) {
44+
sourceController = controller;
45+
},
46+
}, { highWaterMark: 0 });
47+
48+
const dest = new WritableStream({
49+
write: common.mustCall((chunk) => {
50+
chunks.push(chunk);
51+
}),
52+
}, { highWaterMark: 1 });
53+
54+
source.pipeTo(dest, { signal: ac.signal }).then(
55+
common.mustNotCall('pipeTo should not resolve'),
56+
common.mustCall((err) => {
57+
assert.strictEqual(err.name, 'AbortError');
58+
assert.deepStrictEqual(chunks, ['chunk']);
59+
})
60+
);
61+
62+
setImmediate(common.mustCall(() => {
63+
sourceController.enqueue('chunk');
64+
ac.abort();
65+
}));
66+
}

0 commit comments

Comments
 (0)