Skip to content

Commit 760931c

Browse files
committed
stream: refine the stream/iter backpressure
In `Writer`, there are two queues that matter: the slot queue and the pending queue. The sync `writeSync`/`writevSync` only use the slot queue. If writes cannot be accepted directly into slots, they return `false`. The sync methods *never* enqueue into the pending queue. The async `write`/`writev` will first attempt to add to the slot queue; if it is full, then it will attempt to add to the pending queue; if that is also full, the backpressure policy kicks in. Signed-off-by: James M Snell <jasnell@gmail.com>
1 parent faa413e commit 760931c

5 files changed

Lines changed: 18 additions & 147 deletions

File tree

lib/internal/streams/iter/classic.js

Lines changed: 4 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ const {
5757
const {
5858
toAsyncStreamable: kToAsyncStreamable,
5959
kValidatedSource,
60-
kSyncWriteAccepted,
6160
drainableProtocol,
6261
} = require('internal/streams/iter/types');
6362

@@ -765,41 +764,11 @@ function toWritable(writer) {
765764
const hasEndSync = hasEnd &&
766765
typeof writer.endSync === 'function';
767766
const hasFail = typeof writer.fail === 'function';
768-
const hasSyncWriteAccepted =
769-
typeof writer[kSyncWriteAccepted] === 'function';
770-
771-
function syncWriteAccepted() {
772-
return hasSyncWriteAccepted && writer[kSyncWriteAccepted]();
773-
}
774-
775-
function finishAfterSyncBackpressure(cb) {
776-
let ondrain;
777-
try {
778-
if (typeof writer[drainableProtocol] === 'function') {
779-
ondrain = writer[drainableProtocol]();
780-
}
781-
} catch (err) {
782-
cb(err);
783-
return;
784-
}
785-
if (ondrain !== null && ondrain !== undefined) {
786-
PromisePrototypeThen(ondrain, (drained) => {
787-
if (drained === false) {
788-
cb(new ERR_INVALID_STATE.TypeError('Stream closed by consumer'));
789-
return;
790-
}
791-
cb();
792-
}, cb);
793-
return;
794-
}
795-
queueMicrotask(cb);
796-
}
797-
798767
// Try-sync-first pattern: attempt the synchronous method and fall back to the
799-
// async method if it returns false without accepting the data, or if it
800-
// throws. When the sync path succeeds, the callback is deferred via
801-
// queueMicrotask to preserve the async resolution contract that Writable
802-
// internals expect from _write/_writev/_final callbacks.
768+
// async method if it returns false (data not accepted synchronously).
769+
// When the sync path succeeds, the callback is deferred via queueMicrotask
770+
// to preserve the async resolution contract that Writable internals expect
771+
// from _write/_writev/_final callbacks.
803772

804773
function _write(chunk, encoding, cb) {
805774
const bytes = typeof chunk === 'string' ?
@@ -810,11 +779,6 @@ function toWritable(writer) {
810779
queueMicrotask(cb);
811780
return;
812781
}
813-
if (syncWriteAccepted()) {
814-
// The chunk was accepted; false only signaled backpressure.
815-
finishAfterSyncBackpressure(cb);
816-
return;
817-
}
818782
} catch {
819783
// Sync path threw -- fall through to async.
820784
}
@@ -839,11 +803,6 @@ function toWritable(writer) {
839803
queueMicrotask(cb);
840804
return;
841805
}
842-
if (syncWriteAccepted()) {
843-
// The chunks were accepted; false only signaled backpressure.
844-
finishAfterSyncBackpressure(cb);
845-
return;
846-
}
847806
} catch {
848807
// Sync path threw -- fall through to async.
849808
}

lib/internal/streams/iter/pull.js

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,6 @@ const {
5656
} = require('internal/streams/iter/utils');
5757

5858
const {
59-
drainableProtocol,
60-
kSyncWriteAcceptedOnFalse,
6159
kValidatedSource,
6260
kValidatedTransform,
6361
toAsyncStreamable,
@@ -872,21 +870,6 @@ async function pipeTo(source, ...args) {
872870
const hasWritev = typeof writer.writev === 'function';
873871
const hasWritevSync = typeof writer.writevSync === 'function';
874872
const hasEndSync = typeof writer.endSync === 'function';
875-
const syncFalseCanBeAccepted = writer[kSyncWriteAcceptedOnFalse] === true;
876-
877-
function syncFalseWasAccepted() {
878-
return syncFalseCanBeAccepted && writer.desiredSize === 0;
879-
}
880-
881-
function waitForSyncBackpressure() {
882-
const ondrain = writer[drainableProtocol];
883-
return ondrain?.call(writer);
884-
}
885-
886-
async function writeBatchAfterAcceptedBackpressure(batch, startIndex) {
887-
await waitForSyncBackpressure();
888-
await writeBatchAsyncFallback(batch, startIndex);
889-
}
890873

891874
// Async fallback for writeBatch when sync write fails partway through.
892875
// Continues writing from batch[startIndex] using async write().
@@ -895,10 +878,6 @@ async function pipeTo(source, ...args) {
895878
const chunk = batch[i];
896879
if (hasWriteSync && writer.writeSync(chunk)) {
897880
// Sync retry succeeded
898-
} else if (syncFalseWasAccepted()) {
899-
totalBytes += TypedArrayPrototypeGetByteLength(chunk);
900-
await waitForSyncBackpressure();
901-
continue;
902881
} else {
903882
const result = writer.write(
904883
chunk, signal ? { __proto__: null, signal } : undefined);
@@ -916,12 +895,6 @@ async function pipeTo(source, ...args) {
916895
function writeBatch(batch) {
917896
if (hasWritev && batch.length > 1) {
918897
if (!hasWritevSync || !writer.writevSync(batch)) {
919-
if (hasWritevSync && syncFalseWasAccepted()) {
920-
for (let i = 0; i < batch.length; i++) {
921-
totalBytes += TypedArrayPrototypeGetByteLength(batch[i]);
922-
}
923-
return waitForSyncBackpressure();
924-
}
925898
const opts = signal ? { __proto__: null, signal } : undefined;
926899
const result = writer.writev(batch, opts);
927900
if (result === undefined) {
@@ -944,12 +917,7 @@ async function pipeTo(source, ...args) {
944917
for (let i = 0; i < batch.length; i++) {
945918
const chunk = batch[i];
946919
if (!hasWriteSync || !writer.writeSync(chunk)) {
947-
if (hasWriteSync && syncFalseWasAccepted()) {
948-
totalBytes += TypedArrayPrototypeGetByteLength(chunk);
949-
return writeBatchAfterAcceptedBackpressure(batch, i + 1);
950-
}
951920
// Sync path failed at index i - fall back to async for the rest.
952-
// Count bytes for chunks already written synchronously (0..i-1).
953921
return writeBatchAsyncFallback(batch, i);
954922
}
955923
totalBytes += TypedArrayPrototypeGetByteLength(chunk);

lib/internal/streams/iter/push.js

Lines changed: 2 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ const {
3333

3434
const {
3535
drainableProtocol,
36-
kSyncWriteAccepted,
37-
kSyncWriteAcceptedOnFalse,
3836
} = require('internal/streams/iter/types');
3937

4038
const {
@@ -369,19 +367,6 @@ class PushQueue {
369367
this.#pendingEnd = pending;
370368
}
371369

372-
/**
373-
* Force-enqueue chunks into the slots buffer, bypassing capacity checks.
374-
* Used by PushWriter.writeSync() for 'block' policy where the data is
375-
* accepted but false is returned as a backpressure signal.
376-
*/
377-
forceEnqueue(chunks) {
378-
this.#slots.push(chunks);
379-
for (let i = 0; i < chunks.length; i++) {
380-
this.#bytesWritten += TypedArrayPrototypeGetByteLength(chunks[i]);
381-
}
382-
this.#resolvePendingReads();
383-
}
384-
385370
/**
386371
* Wait for backpressure to clear (desiredSize > 0).
387372
* @returns {Promise<void>}
@@ -563,16 +548,11 @@ class PushQueue {
563548

564549
class PushWriter {
565550
#queue;
566-
#syncWriteAccepted = false;
567551

568552
constructor(queue) {
569553
this.#queue = queue;
570554
}
571555

572-
[kSyncWriteAccepted]() {
573-
return this.#syncWriteAccepted;
574-
}
575-
576556
[drainableProtocol]() {
577557
const desired = this.desiredSize;
578558
if (desired === null) return null;
@@ -584,10 +564,6 @@ class PushWriter {
584564
return this.#queue.desiredSize;
585565
}
586566

587-
get [kSyncWriteAcceptedOnFalse]() {
588-
return this.#queue.backpressurePolicy === 'block';
589-
}
590-
591567
write(chunk, options) {
592568
if (!options?.signal && this.#queue.canWriteSync()) {
593569
const bytes = toUint8Array(chunk);
@@ -612,36 +588,16 @@ class PushWriter {
612588
}
613589

614590
writeSync(chunk) {
615-
this.#syncWriteAccepted = false;
616591
const bytes = toUint8Array(chunk);
617-
const result = this.#queue.writeSync([bytes]);
618-
if (!result && this.#queue.backpressurePolicy === 'block' &&
619-
this.#queue.desiredSize === 0) {
620-
// Block policy: force-enqueue and return false as backpressure signal.
621-
// Data IS accepted; false tells caller to slow down.
622-
this.#queue.forceEnqueue([bytes]);
623-
this.#syncWriteAccepted = true;
624-
return false;
625-
}
626-
this.#syncWriteAccepted = result;
627-
return result;
592+
return this.#queue.writeSync([bytes]);
628593
}
629594

630595
writevSync(chunks) {
631-
this.#syncWriteAccepted = false;
632596
if (!ArrayIsArray(chunks)) {
633597
throw new ERR_INVALID_ARG_TYPE('chunks', 'Array', chunks);
634598
}
635599
const bytes = convertChunks(chunks);
636-
const result = this.#queue.writeSync(bytes);
637-
if (!result && this.#queue.backpressurePolicy === 'block' &&
638-
this.#queue.desiredSize === 0) {
639-
this.#queue.forceEnqueue(bytes);
640-
this.#syncWriteAccepted = true;
641-
return false;
642-
}
643-
this.#syncWriteAccepted = result;
644-
return result;
600+
return this.#queue.writeSync(bytes);
645601
}
646602

647603
end(options) {

lib/internal/streams/iter/types.js

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -64,25 +64,9 @@ const kValidatedTransform = Symbol('kValidatedTransform');
6464
*/
6565
const kValidatedSource = Symbol('kValidatedSource');
6666

67-
/**
68-
* Internal sentinel for writers whose sync write methods can return false
69-
* after accepting data as a backpressure signal.
70-
*/
71-
const kSyncWriteAccepted = Symbol('kSyncWriteAccepted');
72-
73-
/**
74-
* Internal sentinel for writers whose sync write methods may return false
75-
* after accepting data when backpressure is applied. Such writers must expose
76-
* desiredSize so callers can distinguish accepted backpressure from a sync
77-
* write that was not performed.
78-
*/
79-
const kSyncWriteAcceptedOnFalse = Symbol('kSyncWriteAcceptedOnFalse');
80-
8167
module.exports = {
8268
broadcastProtocol,
8369
drainableProtocol,
84-
kSyncWriteAccepted,
85-
kSyncWriteAcceptedOnFalse,
8670
kValidatedSource,
8771
kValidatedTransform,
8872
shareProtocol,

test/parallel/test-stream-iter-push-backpressure.js

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,21 +95,25 @@ async function testBlockBackpressure() {
9595
await writePromise;
9696
}
9797

98-
async function testBlockWriteSyncEnqueues() {
99-
// With block policy, writeSync should enqueue the data even when the buffer
100-
// is full, returning false as a backpressure signal. The data IS accepted.
98+
async function testBlockWriteSyncDoesNotEnqueue() {
99+
// With block policy, writeSync returns false when the buffer is full.
100+
// The data is NOT accepted — writeSync only operates on the slots buffer.
101+
// The caller should fall back to write() which uses the pending queue.
101102
const { writer, readable } = push({ highWaterMark: 1, backpressure: 'block' });
102103

103104
// Fill the buffer
104105
assert.strictEqual(writer.writeSync('a'), true);
105106

106-
// Buffer full: writeSync should enqueue and return false (data accepted)
107+
// Buffer full: writeSync returns false, data NOT enqueued
107108
assert.strictEqual(writer.writeSync('b'), false);
108109

109-
writer.endSync();
110+
// Use the async write (try-fallback pattern) — this goes to pending queue
111+
const consuming = text(readable);
112+
await writer.write('b');
113+
await writer.end();
110114

111-
// Both chunks should be delivered (drain flushes all slots into one batch)
112-
const result = await text(readable);
115+
// Both chunks should be delivered
116+
const result = await consuming;
113117
assert.strictEqual(result, 'ab');
114118
}
115119

@@ -151,6 +155,6 @@ Promise.all([
151155
testDropOldest(),
152156
testDropNewest(),
153157
testBlockBackpressure(),
154-
testBlockWriteSyncEnqueues(),
158+
testBlockWriteSyncDoesNotEnqueue(),
155159
testStrictPendingQueueOverflow(),
156160
]).then(common.mustCall());

0 commit comments

Comments
 (0)