Skip to content

Commit ec72574

Browse files
committed
buffer: fix Blob.stream() leaking source buffer
Blob.prototype.stream() registered a wakeup callback in the underlying source's start() and never released it. The strong Reader::wakeup_ handle kept the reader -- and through it the blob's DataQueue and backing store -- reachable as a GC root, so the source buffer leaked on every stream() call. On Node 26+, streaming a 1 MiB blob 300 times retained ~300 MiB in process.memoryUsage().arrayBuffers while the V8 heap stayed small.

Register the wakeup lazily in pull() and clear it on every terminal or idle path (EOS, error, cancel, backpressure), mirroring the cleanup already done by the async iterator path. The strong handle now only lives while a pull is in flight, so the reader and its backing store become collectable once the stream finishes, errors, is cancelled, or goes idle under backpressure.

Fixes: #63574
Signed-off-by: semimikoh <ejffjeosms@gmail.com>
1 parent 73c592b commit ec72574

2 files changed

Lines changed: 73 additions & 4 deletions

File tree

lib/internal/blob.js

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -453,30 +453,40 @@ function createBlobReaderStream(reader) {
453453
// There really should only be one read at a time so using an
454454
// array here is purely defensive.
455455
this.pendingPulls = [];
456-
// Register a wakeup callback that the C++ side can invoke
456+
// Prepare the wakeup callback (registered lazily in pull()) that the C++ side can invoke
457457
// when new data is available after a STATUS_BLOCK.
458-
reader.setWakeup(() => {
458+
this.wakeup = () => {
459459
if (this.pendingPulls.length > 0) {
460460
this.readNext(c);
461461
}
462-
});
462+
};
463463
},
464464
pull(c) {
465465
const { promise, resolve, reject } = PromiseWithResolvers();
466+
if (this.pendingPulls.length === 0) {
467+
reader.setWakeup(this.wakeup);
468+
}
466469
this.pendingPulls.push({ resolve, reject });
467470
this.readNext(c);
468471
return promise;
469472
},
473+
clearWakeupIfIdle() {
474+
if (this.pendingPulls.length === 0) {
475+
reader.setWakeup(undefined);
476+
}
477+
},
470478
readNext(c) {
471479
reader.pull((status, buffer) => {
472480
// If pendingPulls is empty here, the stream had to have
473481
// been canceled, and we don't really care about the result.
474482
// We can simply exit.
475483
if (this.pendingPulls.length === 0) {
484+
reader.setWakeup(undefined);
476485
return;
477486
}
478487
if (status === 0) {
479488
// EOS
489+
reader.setWakeup(undefined);
480490
c.close();
481491
// This is to signal the end for byob readers
482492
// see https://streams.spec.whatwg.org/#example-rbs-pull
@@ -488,6 +498,7 @@ function createBlobReaderStream(reader) {
488498
// The read could fail for many different reasons when reading
489499
// from a non-memory resident blob part (e.g. file-backed blob).
490500
// The error details the system error code.
501+
reader.setWakeup(undefined);
491502
const error =
492503
lazyDOMException('The blob could not be read',
493504
'NotReadableError');
@@ -497,7 +508,7 @@ function createBlobReaderStream(reader) {
497508
return;
498509
} else if (status === 2) {
499510
// STATUS_BLOCK: No data available yet. The wakeup callback
500-
// registered in start() will re-invoke readNext when data
511+
// registered in pull() will re-invoke readNext when data
501512
// arrives.
502513
return;
503514
}
@@ -517,6 +528,7 @@ function createBlobReaderStream(reader) {
517528
if (this.pendingPulls.length !== 0) {
518529
const pending = this.pendingPulls.shift();
519530
pending.resolve();
531+
this.clearWakeupIfIdle();
520532
}
521533
return;
522534
}
@@ -525,6 +537,7 @@ function createBlobReaderStream(reader) {
525537
});
526538
},
527539
cancel(reason) {
540+
reader.setWakeup(undefined);
528541
// Reject any currently pending pulls here.
529542
for (const pending of this.pendingPulls) {
530543
pending.reject(reason);
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Flags: --expose-gc --no-concurrent-array-buffer-sweeping
2+
'use strict';
3+
4+
const common = require('../common');
5+
const assert = require('assert');
6+
const { setImmediate: setImmediatePromise } = require('timers/promises');
7+
8+
const MiB = 1024 * 1024;
9+
const iterations = 64;
10+
const maxRetained = 16 * MiB;
11+
12+
async function collectArrayBuffers() {
13+
for (let i = 0; i < 3; i++) {
14+
global.gc();
15+
await setImmediatePromise();
16+
}
17+
}
18+
19+
async function assertNoBlobStreamRetention(name, fn) {
20+
const buffer = Buffer.alloc(MiB);
21+
22+
await collectArrayBuffers();
23+
const before = process.memoryUsage().arrayBuffers;
24+
25+
for (let i = 0; i < iterations; i++) {
26+
await fn(buffer);
27+
}
28+
29+
await collectArrayBuffers();
30+
const retained = process.memoryUsage().arrayBuffers - before;
31+
32+
assert(
33+
retained < maxRetained,
34+
`${name} retained ${retained} bytes in arrayBuffers`,
35+
);
36+
}
37+
38+
(async () => {
39+
await assertNoBlobStreamRetention('unused Blob streams',
40+
common.mustCall(async (buffer) => {
41+
new Blob([buffer]).stream();
42+
}, iterations));
43+
44+
await assertNoBlobStreamRetention('cancelled Blob streams',
45+
common.mustCall(async (buffer) => {
46+
await new Blob([buffer]).stream()
47+
.cancel();
48+
}, iterations));
49+
50+
await assertNoBlobStreamRetention('drained Blob streams',
51+
common.mustCall(async (buffer) => {
52+
await new Response(
53+
new Blob([buffer]).stream(),
54+
).arrayBuffer();
55+
}, iterations));
56+
})().then(common.mustCall());

0 commit comments

Comments
 (0)