Skip to content

Commit 4b053cb

Browse files
committed
stream: fix merge() idle source draining
Defer pulling a merged source again until the consumer requests the next merged item. This prevents fast sources from being drained while the merged iterator is idle. Fixes: #63566 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5
1 parent 2adaeee commit 4b053cb

2 files changed

Lines changed: 129 additions & 47 deletions

File tree

lib/internal/streams/iter/consumers.js

Lines changed: 103 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ const {
1717
ArrayPrototypeSlice,
1818
Promise,
1919
PromisePrototypeThen,
20+
PromiseResolve,
2021
SafePromiseAllReturnVoid,
2122
SymbolAsyncIterator,
2223
TypedArrayPrototypeGetBuffer,
@@ -412,19 +413,22 @@ function merge(...args) {
412413

413414
return {
414415
__proto__: null,
415-
async *[SymbolAsyncIterator]() {
416+
[SymbolAsyncIterator]() {
416417
const signal = options?.signal;
417418

418419
signal?.throwIfAborted();
419420

420-
if (normalized.length === 0) return;
421+
if (normalized.length === 0) {
422+
return (async function*() {})();
423+
}
421424

422425
if (normalized.length === 1) {
423-
for await (const batch of normalized[0]) {
424-
signal?.throwIfAborted();
425-
yield batch;
426-
}
427-
return;
426+
return (async function*() {
427+
for await (const batch of normalized[0]) {
428+
signal?.throwIfAborted();
429+
yield batch;
430+
}
431+
})();
428432
}
429433

430434
// Multiple sources - use a ready queue so that batches that settle
@@ -434,74 +438,65 @@ function merge(...args) {
434438
const ready = [];
435439
let activeCount = normalized.length;
436440
let waitResolve = null;
441+
let lastIterator = null;
442+
let started = false;
443+
let closed = false;
437444

438445
// Called when a source's .next() settles. Pushes the result into
439446
// the ready queue and wakes the consumer if it's waiting.
440447
const onSettled = (iterator, result) => {
448+
if (closed) return;
441449
if (result.done) {
442450
activeCount--;
443451
} else {
444-
ArrayPrototypePush(ready, result.value);
445-
// Immediately request the next value from this source
446-
// (at most one pending .next() per source)
447-
PromisePrototypeThen(
448-
iterator.next(),
449-
(r) => onSettled(iterator, r),
450-
(err) => {
451-
ArrayPrototypePush(ready, { __proto__: null, error: err });
452-
if (waitResolve) {
453-
waitResolve();
454-
waitResolve = null;
455-
}
456-
},
457-
);
452+
ArrayPrototypePush(ready, {
453+
__proto__: null,
454+
iterator,
455+
value: result.value,
456+
});
458457
}
459458
if (waitResolve) {
460459
waitResolve();
461460
waitResolve = null;
462461
}
463462
};
464463

465-
// Start one .next() per source
466-
const iterators = [];
467-
for (let i = 0; i < normalized.length; i++) {
468-
const iterator = normalized[i][SymbolAsyncIterator]();
469-
ArrayPrototypePush(iterators, iterator);
464+
const requestNext = (iterator) => {
470465
PromisePrototypeThen(
471466
iterator.next(),
472467
(r) => onSettled(iterator, r),
473468
(err) => {
469+
if (closed) return;
474470
ArrayPrototypePush(ready, { __proto__: null, error: err });
475471
if (waitResolve) {
476472
waitResolve();
477473
waitResolve = null;
478474
}
479475
},
480476
);
481-
}
477+
};
482478

483-
try {
484-
while (activeCount > 0 || ready.length > 0) {
485-
signal?.throwIfAborted();
479+
const iterators = [];
486480

487-
// Drain ready queue synchronously
488-
while (ready.length > 0) {
489-
const item = ArrayPrototypeShift(ready);
490-
if (item?.error) {
491-
throw item.error;
492-
}
493-
yield item;
494-
}
481+
const start = () => {
482+
if (started) return;
483+
started = true;
484+
for (let i = 0; i < normalized.length; i++) {
485+
const iterator = normalized[i][SymbolAsyncIterator]();
486+
ArrayPrototypePush(iterators, iterator);
487+
requestNext(iterator);
488+
}
489+
};
495490

496-
// If sources are still active, wait for the next settlement
497-
if (activeCount > 0) {
498-
await new Promise((resolve) => {
499-
waitResolve = resolve;
500-
});
501-
}
491+
const cleanup = async () => {
492+
if (closed) {
493+
return { __proto__: null, done: true, value: undefined };
494+
}
495+
closed = true;
496+
if (waitResolve) {
497+
waitResolve();
498+
waitResolve = null;
502499
}
503-
} finally {
504-
// Clean up: return all iterators
505500
await SafePromiseAllReturnVoid(iterators, async (iterator) => {
506501
if (iterator.return) {
507502
try {
@@ -511,7 +506,68 @@ function merge(...args) {
511506
}
512507
}
513508
});
514-
}
509+
return { __proto__: null, done: true, value: undefined };
510+
};
511+
512+
const nextImpl = async () => {
513+
try {
514+
if (closed) {
515+
return cleanup();
516+
}
517+
518+
signal?.throwIfAborted();
519+
520+
start();
521+
522+
if (lastIterator !== null) {
523+
requestNext(lastIterator);
524+
lastIterator = null;
525+
}
526+
527+
while (activeCount > 0 || ready.length > 0) {
528+
signal?.throwIfAborted();
529+
530+
if (ready.length > 0) {
531+
const item = ArrayPrototypeShift(ready);
532+
if (item?.error) {
533+
await cleanup();
534+
throw item.error;
535+
}
536+
lastIterator = item.iterator;
537+
return { __proto__: null, done: false, value: item.value };
538+
}
539+
540+
await new Promise((resolve) => { waitResolve = resolve; });
541+
}
542+
543+
return cleanup();
544+
} catch (err) {
545+
await cleanup();
546+
throw err;
547+
}
548+
};
549+
550+
let nextQueue = PromiseResolve();
551+
const enqueue = (fn) => {
552+
const result = PromisePrototypeThen(nextQueue, fn, fn);
553+
nextQueue = PromisePrototypeThen(result, () => {}, () => {});
554+
return result;
555+
};
556+
557+
return {
558+
__proto__: null,
559+
[SymbolAsyncIterator]() {
560+
return this;
561+
},
562+
563+
next() {
564+
return enqueue(nextImpl);
565+
},
566+
567+
return() {
568+
return enqueue(cleanup);
569+
},
570+
};
515571
},
516572
};
517573
}

test/parallel/test-stream-iter-consumers-merge.js

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,31 @@ async function testMergeConsumerBreak() {
131131
assert.strictEqual(source1Return && source2Return, true);
132132
}
133133

134+
async function testMergeDoesNotDrainIdleSources() {
135+
function source(n) {
136+
return {
137+
pulls: 0,
138+
async *[Symbol.asyncIterator]() {
139+
while (this.pulls < n) {
140+
yield [Buffer.from(`${++this.pulls}`)];
141+
}
142+
},
143+
};
144+
}
145+
146+
const source1 = source(5);
147+
const source2 = source(5);
148+
const iterator = merge(source1, source2)[Symbol.asyncIterator]();
149+
150+
await iterator.next();
151+
await new Promise((resolve) => setTimeout(resolve, 20));
152+
153+
assert.ok(source1.pulls <= 1);
154+
assert.ok(source2.pulls <= 1);
155+
156+
await iterator.return?.();
157+
}
158+
134159
async function testMergeSignalMidIteration() {
135160
const ac = new AbortController();
136161
async function* slowSource() {
@@ -190,6 +215,7 @@ Promise.all([
190215
testMergeSyncSources(),
191216
testMergeSourceError(),
192217
testMergeConsumerBreak(),
218+
testMergeDoesNotDrainIdleSources(),
193219
testMergeSignalMidIteration(),
194220
testMergeStringSources(),
195221
testMergeObjectLikeSources(),

0 commit comments

Comments
 (0)