Skip to content

Commit bea41b1

Browse files
committed
stream: fixup error handling in stream/iter adapters
Signed-off-by: James M Snell <jasnell@gmail.com>
1 parent 3afcc51 commit bea41b1

3 files changed

Lines changed: 51 additions & 21 deletions

File tree

lib/internal/streams/iter/classic.js

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -779,8 +779,10 @@ function toWritable(writer) {
779779
queueMicrotask(cb);
780780
return;
781781
}
782-
} catch {
783-
// Sync path threw -- fall through to async.
782+
// WriteSync returned false: not accepted, fall through to async.
783+
} catch (err) {
784+
cb(err);
785+
return;
784786
}
785787
}
786788
try {
@@ -803,8 +805,10 @@ function toWritable(writer) {
803805
queueMicrotask(cb);
804806
return;
805807
}
806-
} catch {
807-
// Sync path threw -- fall through to async.
808+
// WritevSync returned false: not accepted, fall through to async.
809+
} catch (err) {
810+
cb(err);
811+
return;
808812
}
809813
}
810814
try {
@@ -826,8 +830,10 @@ function toWritable(writer) {
826830
queueMicrotask(cb);
827831
return;
828832
}
829-
} catch {
830-
// Sync path threw -- fall through to async.
833+
// Result < 0: can't end synchronously, fall through to async.
834+
} catch (err) {
835+
cb(err);
836+
return;
831837
}
832838
}
833839
try {

lib/internal/streams/iter/consumers.js

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,7 @@ function merge(...args) {
480480
);
481481
}
482482

483+
let primaryError;
483484
try {
484485
while (activeCount > 0 || ready.length > 0) {
485486
signal?.throwIfAborted();
@@ -500,17 +501,42 @@ function merge(...args) {
500501
});
501502
}
502503
}
504+
} catch (err) {
505+
primaryError = err;
503506
} finally {
504-
// Clean up: return all iterators
507+
// Clean up: return all iterators. Cleanup errors are not
508+
// swallowed - a broken iterator.return() (e.g., failing to
509+
// release a resource) should be visible to the caller.
510+
let cleanupError;
505511
await SafePromiseAllReturnVoid(iterators, async (iterator) => {
506512
if (iterator.return) {
507513
try {
508514
await iterator.return();
509-
} catch {
510-
// Ignore return errors
515+
} catch (err) {
516+
// Keep the first cleanup error encountered.
517+
cleanupError ??= err;
511518
}
512519
}
513520
});
521+
// Throws in finally are intentional: in an async generator,
522+
// cleanup must run on consumer break (generator return), and
523+
// code after the try/finally block would not execute.
524+
if (cleanupError !== undefined) {
525+
if (primaryError !== undefined) {
526+
// Both a primary error and a cleanup error occurred.
527+
// Wrap in SuppressedError so neither is lost:
528+
// .error = primaryError, .suppressed = cleanupError.
529+
// eslint-disable-next-line no-restricted-syntax, no-unsafe-finally
530+
throw new SuppressedError(cleanupError, primaryError);
531+
}
532+
// No primary error - the cleanup error is the only error.
533+
// eslint-disable-next-line no-unsafe-finally
534+
throw cleanupError;
535+
}
536+
if (primaryError !== undefined) {
537+
// eslint-disable-next-line no-unsafe-finally
538+
throw primaryError;
539+
}
514540
}
515541
},
516542
};

test/parallel/test-stream-iter-writable-from.js

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -537,18 +537,15 @@ function testHighWaterMarkIsMaxSafeInt() {
537537
}
538538

539539
// =============================================================================
540-
// writeSync throws -- falls back to async
540+
// writeSync throws -- error propagates, does NOT fall back to async
541541
// =============================================================================
542542

543-
async function testWriteSyncThrowsFallback() {
544-
let asyncCalled = false;
545-
543+
async function testWriteSyncThrowsPropagation() {
546544
const writer = {
547545
writeSync() {
548546
throw new Error('sync broken');
549547
},
550-
write(chunk) {
551-
asyncCalled = true;
548+
write() {
552549
return Promise.resolve();
553550
},
554551
end() { return Promise.resolve(0); },
@@ -557,11 +554,12 @@ async function testWriteSyncThrowsFallback() {
557554

558555
const writable = toWritable(writer);
559556

560-
await new Promise((resolve) => {
561-
writable.write('test', resolve);
562-
});
563-
564-
assert.ok(asyncCalled, 'async write should be called as fallback');
557+
await assert.rejects(new Promise((resolve, reject) => {
558+
writable.write('test', (err) => {
559+
if (err) reject(err);
560+
else resolve();
561+
});
562+
}), { message: 'sync broken' });
565563
}
566564

567565
// =============================================================================
@@ -625,7 +623,7 @@ Promise.all([
625623
testWritevDelegation(),
626624
testWriteSyncFirst(),
627625
testWriteSyncFallback(),
628-
testWriteSyncThrowsFallback(),
626+
testWriteSyncThrowsPropagation(),
629627
testEndSyncFirst(),
630628
testEndSyncFallback(),
631629
testFinalDelegatesToEnd(),

0 commit comments

Comments
 (0)