Skip to content

Commit c1d1472

Browse files
committed
fs: handle early writeFile stream errors
Attach a temporary error listener to readable stream inputs before opening the destination file. This lets writeFile() reject with the stream error instead of allowing an early source error to become an uncaught exception. Remove the listener when the write finishes or when opening the destination fails. Signed-off-by: cookesan <6601329+cookesan@users.noreply.github.com>
1 parent 614050b commit c1d1472

3 files changed

Lines changed: 155 additions & 16 deletions

File tree

lib/internal/fs/promises.js

Lines changed: 68 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,11 @@ const EventEmitter = require('events');
107107
const { StringDecoder } = require('string_decoder');
108108
const { kFSWatchStart, watch } = require('internal/fs/watchers');
109109
const nonNativeWatcher = require('internal/fs/recursive_watch');
110-
const { isIterable } = require('internal/streams/utils');
110+
const {
111+
isIterable,
112+
isReadableErrored,
113+
isReadableNodeStream,
114+
} = require('internal/streams/utils');
111115
const assert = require('internal/assert');
112116

113117
const permission = require('internal/process/permission');
@@ -1116,24 +1120,62 @@ function checkAborted(signal) {
11161120
throw new AbortError(undefined, { cause: signal.reason });
11171121
}
11181122

1119-
async function writeFileHandle(filehandle, data, signal, encoding) {
1120-
checkAborted(signal);
1123+
function makeWriteFileStreamErrorHandler(data) {
1124+
if (!isReadableNodeStream(data) ||
1125+
typeof data.removeListener !== 'function') {
1126+
return undefined;
1127+
}
1128+
1129+
let error;
1130+
let errored = false;
1131+
function onError(err) {
1132+
error = err;
1133+
errored = true;
1134+
}
1135+
const streamError = isReadableErrored(data);
1136+
if (streamError != null)
1137+
onError(streamError);
1138+
data.on('error', onError);
1139+
1140+
return {
1141+
__proto__: null,
1142+
check() {
1143+
if (errored)
1144+
throw error;
1145+
},
1146+
cleanup() {
1147+
data.removeListener('error', onError);
1148+
},
1149+
};
1150+
}
1151+
1152+
async function writeFileHandle(filehandle, data, signal, encoding, streamErrorHandler) {
11211153
if (isCustomIterable(data)) {
1122-
for await (const buf of data) {
1154+
streamErrorHandler ??= makeWriteFileStreamErrorHandler(data);
1155+
try {
11231156
checkAborted(signal);
1124-
const toWrite =
1125-
isArrayBufferView(buf) ? buf : Buffer.from(buf, encoding || 'utf8');
1126-
let remaining = toWrite.byteLength;
1127-
while (remaining > 0) {
1128-
const writeSize = MathMin(kWriteFileMaxChunkSize, remaining);
1129-
const { bytesWritten } = await write(
1130-
filehandle, toWrite, toWrite.byteLength - remaining, writeSize);
1131-
remaining -= bytesWritten;
1157+
streamErrorHandler?.check();
1158+
for await (const buf of data) {
11321159
checkAborted(signal);
1160+
streamErrorHandler?.check();
1161+
const toWrite =
1162+
isArrayBufferView(buf) ? buf : Buffer.from(buf, encoding || 'utf8');
1163+
let remaining = toWrite.byteLength;
1164+
while (remaining > 0) {
1165+
const writeSize = MathMin(kWriteFileMaxChunkSize, remaining);
1166+
const { bytesWritten } = await write(
1167+
filehandle, toWrite, toWrite.byteLength - remaining, writeSize);
1168+
remaining -= bytesWritten;
1169+
checkAborted(signal);
1170+
streamErrorHandler?.check();
1171+
}
11331172
}
1173+
} finally {
1174+
streamErrorHandler?.cleanup();
11341175
}
11351176
return;
11361177
}
1178+
checkAborted(signal);
11371179
data = new Uint8Array(data.buffer, data.byteOffset, data.byteLength);
11381180
let remaining = data.byteLength;
11391181
if (remaining === 0) return;
@@ -1891,13 +1933,23 @@ async function writeFile(path, data, options) {
18911933
}
18921934

18931935
validateAbortSignal(options.signal);
1936+
checkAborted(options.signal);
1937+
const streamErrorHandler = makeWriteFileStreamErrorHandler(data);
1938+
18941939
if (path instanceof FileHandle)
1895-
return writeFileHandle(path, data, options.signal, options.encoding);
1940+
return writeFileHandle(
1941+
path, data, options.signal, options.encoding, streamErrorHandler);
18961942

1897-
checkAborted(options.signal);
1943+
let fd;
1944+
try {
1945+
fd = await open(path, flag, options.mode);
1946+
} catch (err) {
1947+
streamErrorHandler?.cleanup();
1948+
throw err;
1949+
}
18981950

1899-
const fd = await open(path, flag, options.mode);
1900-
let writeOp = writeFileHandle(fd, data, options.signal, options.encoding);
1951+
let writeOp = writeFileHandle(
1952+
fd, data, options.signal, options.encoding, streamErrorHandler);
19011953

19021954
if (flush) {
19031955
writeOp = handleFdSync(writeOp, fd);

test/parallel/test-fs-promises-file-handle-writeFile.js

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ async function doWriteAndCancel() {
4848

4949
const dest = path.resolve(tmpDir, 'tmp.txt');
5050
const otherDest = path.resolve(tmpDir, 'tmp-2.txt');
51+
const errorDest = path.resolve(tmpDir, 'tmp-error.txt');
5152
const stream = Readable.from(['a', 'b', 'c']);
5253
const stream2 = Readable.from(['ümlaut', ' ', 'sechzig']);
5354
const iterable = {
@@ -65,6 +66,15 @@ function iterableWith(value) {
6566
}
6667
};
6768
}
69+
70+
function createEarlyErrorStream(error) {
71+
const stream = new Readable({
72+
read() {}
73+
});
74+
process.nextTick(() => stream.destroy(error));
75+
return stream;
76+
}
77+
6878
const bufferIterable = {
6979
expected: 'abc',
7080
*[Symbol.iterator]() {
@@ -94,6 +104,25 @@ async function doWriteStream() {
94104
}
95105
}
96106

107+
async function doWriteStreamError() {
108+
const fileHandle = await open(errorDest, 'w+');
109+
const error = new Error('early file handle writeFile stream error');
110+
const stream = createEarlyErrorStream(error);
111+
const uncaughtException = common.mustNotCall(
112+
'stream errors should reject FileHandle.writeFile()');
113+
114+
process.once('uncaughtException', uncaughtException);
115+
try {
116+
await assert.rejects(
117+
fileHandle.writeFile(stream),
118+
{ message: error.message }
119+
);
120+
} finally {
121+
process.removeListener('uncaughtException', uncaughtException);
122+
await fileHandle.close();
123+
}
124+
}
125+
97126
async function doWriteStreamWithCancel() {
98127
const controller = new AbortController();
99128
const { signal } = controller;
@@ -190,6 +219,7 @@ async function doWriteInvalidValues() {
190219
await validateWriteFile();
191220
await doWriteAndCancel();
192221
await doWriteStream();
222+
await doWriteStreamError();
193223
await doWriteStreamWithCancel();
194224
await doWriteIterable();
195225
await doWriteInvalidIterable();

test/parallel/test-fs-promises-writefile.js

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ tmpdir.refresh();
1313

1414
const dest = path.resolve(tmpDir, 'tmp.txt');
1515
const otherDest = path.resolve(tmpDir, 'tmp-2.txt');
16+
const errorDest = path.resolve(tmpDir, 'tmp-error.txt');
1617
const buffer = Buffer.from('abc'.repeat(1000));
1718
const buffer2 = Buffer.from('xyz'.repeat(1000));
1819
const stream = Readable.from(['a', 'b', 'c']);
@@ -25,6 +26,16 @@ const iterable = {
2526
yield 'c';
2627
}
2728
};
29+
const streamLikeIterable = {
30+
expected: 'abc',
31+
pipe: common.mustNotCall('pipe should not be called for custom iterables'),
32+
on: common.mustNotCall('on should not be called without removeListener'),
33+
*[Symbol.iterator]() {
34+
yield 'a';
35+
yield 'b';
36+
yield 'c';
37+
}
38+
};
2839

2940
const veryLargeBuffer = {
3041
expected: 'dogs running'.repeat(512 * 1024),
@@ -40,6 +51,15 @@ function iterableWith(value) {
4051
}
4152
};
4253
}
54+
55+
function createEarlyErrorStream(error) {
56+
const stream = new Readable({
57+
read() {}
58+
});
59+
process.nextTick(() => stream.destroy(error));
60+
return stream;
61+
}
62+
4363
const bufferIterable = {
4464
expected: 'abc',
4565
*[Symbol.iterator]() {
@@ -70,6 +90,34 @@ async function doWriteStream() {
7090
assert.deepStrictEqual(data, expected);
7191
}
7292

93+
async function doWriteStreamError() {
94+
const error = new Error('early writeFile stream error');
95+
const stream = createEarlyErrorStream(error);
96+
const uncaughtException = common.mustNotCall(
97+
'stream errors should reject writeFile()');
98+
99+
process.once('uncaughtException', uncaughtException);
100+
try {
101+
await assert.rejects(
102+
fsPromises.writeFile(errorDest, stream),
103+
{ message: error.message }
104+
);
105+
assert.strictEqual(stream.listenerCount('error'), 0);
106+
} finally {
107+
process.removeListener('uncaughtException', uncaughtException);
108+
}
109+
}
110+
111+
async function doWriteStreamOpenError() {
112+
const stream = Readable.from(['a']);
113+
114+
await assert.rejects(
115+
fsPromises.writeFile(path.resolve(tmpDir, 'not-found', 'tmp.txt'), stream),
116+
{ code: 'ENOENT' }
117+
);
118+
assert.strictEqual(stream.listenerCount('error'), 0);
119+
}
120+
73121
async function doWriteStreamWithCancel() {
74122
const controller = new AbortController();
75123
const { signal } = controller;
@@ -86,6 +134,12 @@ async function doWriteIterable() {
86134
assert.deepStrictEqual(data, iterable.expected);
87135
}
88136

137+
async function doWriteStreamLikeIterable() {
138+
await fsPromises.writeFile(dest, streamLikeIterable);
139+
const data = fs.readFileSync(dest, 'utf-8');
140+
assert.deepStrictEqual(data, streamLikeIterable.expected);
141+
}
142+
89143
async function doWriteInvalidIterable() {
90144
await Promise.all(
91145
[42, 42n, {}, Symbol('42'), true, undefined, null, NaN].map((value) =>
@@ -168,8 +222,11 @@ async function doReadWithEncoding() {
168222
await doRead();
169223
await doReadWithEncoding();
170224
await doWriteStream();
225+
await doWriteStreamError();
226+
await doWriteStreamOpenError();
171227
await doWriteStreamWithCancel();
172228
await doWriteIterable();
229+
await doWriteStreamLikeIterable();
173230
await doWriteInvalidIterable();
174231
await doWriteIterableWithEncoding();
175232
await doWriteBufferIterable();

0 commit comments

Comments
 (0)