Skip to content

Commit fee0262

Browse files
committed
child_process: frame advanced IPC messages natively
The `advanced` IPC read path framed messages in JavaScript (parseChannelMessages in lib/internal/child_process/serialization.js): every read scanned the 4-byte big-endian length prefix, accumulated partial frames in an array, and called Buffer.concat to reassemble messages that spanned reads, then crossed into the ipc_serdes binding once per message to deserialize. Move framing into a native per-channel IPCChannelFramer (a BaseObject on the ipc_serdes binding). It owns the cross-read accumulation buffer in C++ and returns an array of complete, deserialized messages per read, so JavaScript no longer reframes the byte stream. Messages that lie entirely within a read are deserialized in place and let host objects borrow the read buffer (zero copy); only frames that span reads are copied, matching the previous Buffer.concat behavior. The wire format (4-byte BE length prefix + V8 payload with custom host-object tags) is preserved byte-for-byte, and handle passing is untouched: the per-read pendingHandle handoff and NODE_HANDLE delivery stay in setupChannel. The `json` read path is deliberately left in JavaScript. Its StringDecoder + split('\n') pipeline avoids copies (V8 rope concatenation and O(1) substrings); reframing it in C++ regressed the json read path by up to ~40% on large messages in benchmarks, so only `advanced` is framed natively. A cctest (test/cctest/test_node_ipc_serdes.cc) exercises the framer directly: single-message round-trips, several messages delivered in one read, and a frame split across two reads (including a split length header). Round-trip throughput (benchmark/child_process/child-process-ipc-roundtrip, advanced; interleaved A/B vs. the pre-change baseline, 8 reps x dur=5s): payload before after change 1 KiB 559,040 626,255 +12.0% (t=5.3) 4 KiB 311,284 330,153 +6.1% (t=2.3) 16 KiB 104,077 125,181 +20.3% 64 KiB 31,921 35,739 +12.0% json round-trip and read throughput are unchanged (within noise). Signed-off-by: Yagiz Nizipli <yagiz@nizipli.com>
1 parent 546b596 commit fee0262

4 files changed

Lines changed: 364 additions & 68 deletions

File tree

lib/internal/child_process/serialization.js

Lines changed: 17 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,78 +1,42 @@
11
'use strict';
22

33
const {
4-
ArrayPrototypePush,
54
JSONParse,
65
JSONStringify,
76
StringPrototypeSplit,
87
Symbol,
9-
TypedArrayPrototypeSubarray,
108
} = primordials;
119
const { Buffer } = require('buffer');
1210
const { StringDecoder } = require('string_decoder');
13-
const { serialize, deserialize } = internalBinding('ipc_serdes');
11+
const { serialize, IPCChannelFramer } = internalBinding('ipc_serdes');
1412
const { streamBaseState, kLastWriteWasAsync } = internalBinding('stream_wrap');
1513

16-
const kMessageBuffer = Symbol('kMessageBuffer');
17-
const kMessageBufferSize = Symbol('kMessageBufferSize');
14+
const kFramer = Symbol('kFramer');
1815
const kJSONBuffer = Symbol('kJSONBuffer');
1916
const kStringDecoder = Symbol('kStringDecoder');
2017

2118
// Messages are parsed in either of the following formats:
22-
// - Newline-delimited JSON, or
2319
// - V8-serialized buffers, prefixed with their length as a big endian uint32
24-
// (aka 'advanced')
20+
// (aka 'advanced'), or
21+
// - newline-delimited JSON.
22+
//
23+
// The 'advanced' read path is framed natively: IPCChannelFramer accumulates
24+
// partial reads in C++ and hands back an array of complete, deserialized
25+
// messages, so JavaScript never reframes the byte stream. The 'json' read path
26+
// stays in JavaScript: its StringDecoder + split('\n') pipeline already avoids
27+
// copies via V8 rope concatenation and O(1) substrings, and is measurably
28+
// faster than reframing the bytes in C++.
2529
const advanced = {
2630
initMessageChannel(channel) {
27-
channel[kMessageBuffer] = [];
28-
channel[kMessageBufferSize] = 0;
31+
channel[kFramer] = new IPCChannelFramer();
2932
channel.buffering = false;
3033
},
3134

32-
*parseChannelMessages(channel, readData) {
33-
if (readData.length === 0) return;
34-
35-
if (channel[kMessageBufferSize] && channel[kMessageBuffer][0].length < 4) {
36-
// Message length split into two buffers, so let's concatenate it.
37-
channel[kMessageBuffer][0] = Buffer.concat([channel[kMessageBuffer][0], readData]);
38-
} else {
39-
ArrayPrototypePush(channel[kMessageBuffer], readData);
40-
}
41-
channel[kMessageBufferSize] += readData.length;
42-
43-
// Index 0 should always be present because we just pushed data into it.
44-
let messageBufferHead = channel[kMessageBuffer][0];
45-
while (messageBufferHead.length >= 4) {
46-
// We call `readUInt32BE` manually here, because this is faster than first converting
47-
// it to a buffer and using `readUInt32BE` on that.
48-
const fullMessageSize = ((
49-
messageBufferHead[0] << 24 |
50-
messageBufferHead[1] << 16 |
51-
messageBufferHead[2] << 8 |
52-
messageBufferHead[3]
53-
) >>> 0) + 4;
54-
55-
if (channel[kMessageBufferSize] < fullMessageSize) break;
56-
57-
const concatenatedBuffer = channel[kMessageBuffer].length === 1 ?
58-
channel[kMessageBuffer][0] :
59-
Buffer.concat(
60-
channel[kMessageBuffer],
61-
channel[kMessageBufferSize],
62-
);
63-
64-
const serializedMessage =
65-
TypedArrayPrototypeSubarray(concatenatedBuffer, 4, fullMessageSize);
66-
67-
messageBufferHead = TypedArrayPrototypeSubarray(concatenatedBuffer, fullMessageSize);
68-
channel[kMessageBufferSize] = messageBufferHead.length;
69-
channel[kMessageBuffer] =
70-
channel[kMessageBufferSize] !== 0 ? [messageBufferHead] : [];
71-
72-
yield deserialize(serializedMessage);
73-
}
74-
75-
channel.buffering = channel[kMessageBufferSize] > 0;
35+
parseChannelMessages(channel, readData) {
36+
if (readData.length === 0) return [];
37+
const messages = channel[kFramer].read(readData);
38+
channel.buffering = channel[kFramer].buffering();
39+
return messages;
7640
},
7741

7842
writeChannelMessage(channel, req, message, handle) {

src/node_ipc_serdes.cc

Lines changed: 193 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
#include "base_object-inl.h"
12
#include "env-inl.h"
3+
#include "memory_tracker-inl.h"
24
#include "node_buffer.h"
35
#include "node_errors.h"
46
#include "node_external_reference.h"
@@ -7,6 +9,7 @@
79

810
#include <cstring>
911
#include <utility>
12+
#include <vector>
1013

1114
// Native implementation of the `advanced` child_process IPC serialization
1215
// codec previously implemented in lib/internal/child_process/serialization.js
@@ -19,6 +22,7 @@
1922

2023
namespace node {
2124

25+
using v8::Array;
2226
using v8::ArrayBuffer;
2327
using v8::ArrayBufferView;
2428
using v8::BackingStore;
@@ -31,12 +35,14 @@ using v8::Float16Array;
3135
using v8::Float32Array;
3236
using v8::Float64Array;
3337
using v8::FunctionCallbackInfo;
38+
using v8::FunctionTemplate;
3439
using v8::Int16Array;
3540
using v8::Int32Array;
3641
using v8::Int8Array;
3742
using v8::Isolate;
3843
using v8::Just;
3944
using v8::Local;
45+
using v8::LocalVector;
4046
using v8::Maybe;
4147
using v8::MaybeLocal;
4248
using v8::Name;
@@ -280,14 +286,17 @@ class IPCDeserializerDelegate : public ValueDeserializer::Delegate {
280286
if (!deserializer_->ReadRawBytes(byte_length, &data)) return {};
281287

282288
const size_t bytes_per_element = BytesPerElement(type_index);
283-
const size_t offset_in_ab = static_cast<const uint8_t*>(data) -
284-
static_cast<const uint8_t*>(ab_->Data());
285289

286290
// Reuse the backing ArrayBuffer when the data is suitably aligned,
287291
// otherwise copy into a fresh aligned buffer. Mirrors _readHostObject()
288-
// in lib/v8.js.
289-
if (offset_in_ab % bytes_per_element == 0) {
290-
return MakeView(env_, type_index, ab_, offset_in_ab, byte_length);
292+
// in lib/v8.js. A frame reassembled across reads has no standalone
293+
// ArrayBuffer to borrow from (`ab_` is empty), so always copy.
294+
if (!ab_.IsEmpty()) {
295+
const size_t offset_in_ab = static_cast<const uint8_t*>(data) -
296+
static_cast<const uint8_t*>(ab_->Data());
297+
if (offset_in_ab % bytes_per_element == 0) {
298+
return MakeView(env_, type_index, ab_, offset_in_ab, byte_length);
299+
}
291300
}
292301
std::shared_ptr<BackingStore> store =
293302
ArrayBuffer::NewBackingStore(isolate, byte_length);
@@ -339,42 +348,211 @@ static void Serialize(const FunctionCallbackInfo<Value>& args) {
339348
}
340349
}
341350

342-
static void Deserialize(const FunctionCallbackInfo<Value>& args) {
343-
Environment* env = Environment::GetCurrent(args);
351+
// Deserializes a single `advanced` payload: the V8 wire header followed by the
352+
// value, without the 4-byte big-endian length prefix. When `ab` is non-empty
353+
// it is the ArrayBuffer backing `data`, which the delegate may borrow for
354+
// zero-copy ArrayBufferView host objects; when it is empty (a frame reassembled
355+
// across reads) host objects are copied into fresh buffers.
356+
static MaybeLocal<Value> DeserializeAdvancedPayload(Environment* env,
357+
Local<ArrayBuffer> ab,
358+
const uint8_t* data,
359+
size_t length) {
344360
Isolate* isolate = env->isolate();
345361
Local<Context> context = env->context();
346362

363+
IPCDeserializerDelegate delegate(env, ab);
364+
ValueDeserializer deserializer(isolate, data, length, &delegate);
365+
delegate.set_deserializer(&deserializer);
366+
367+
bool read_header;
368+
if (!deserializer.ReadHeader(context).To(&read_header)) return {};
369+
return deserializer.ReadValue(context);
370+
}
371+
372+
static inline uint32_t ReadUInt32BE(const char* p) {
373+
const uint8_t* b = reinterpret_cast<const uint8_t*>(p);
374+
return (static_cast<uint32_t>(b[0]) << 24) |
375+
(static_cast<uint32_t>(b[1]) << 16) |
376+
(static_cast<uint32_t>(b[2]) << 8) | static_cast<uint32_t>(b[3]);
377+
}
378+
379+
static void Deserialize(const FunctionCallbackInfo<Value>& args) {
380+
Environment* env = Environment::GetCurrent(args);
381+
347382
CHECK(args[0]->IsArrayBufferView());
348383
Local<ArrayBufferView> view = args[0].As<ArrayBufferView>();
349384
Local<ArrayBuffer> ab = view->Buffer();
350385
const size_t byte_offset = view->ByteOffset();
351386
const size_t byte_length = view->ByteLength();
352387
const uint8_t* data = static_cast<const uint8_t*>(ab->Data()) + byte_offset;
353388

354-
IPCDeserializerDelegate delegate(env, ab);
355-
ValueDeserializer deserializer(isolate, data, byte_length, &delegate);
356-
delegate.set_deserializer(&deserializer);
357-
358-
bool read_header;
359-
if (!deserializer.ReadHeader(context).To(&read_header)) return;
360-
361389
Local<Value> value;
362-
if (deserializer.ReadValue(context).ToLocal(&value)) {
390+
if (DeserializeAdvancedPayload(env, ab, data, byte_length).ToLocal(&value)) {
363391
args.GetReturnValue().Set(value);
364392
}
365393
}
366394

395+
// Per-channel native framer for the `advanced` codec. It turns the raw IPC
396+
// byte stream into complete, deserialized messages, owning the cross-read
397+
// accumulation buffer that used to live in serialization.js (kMessageBuffer),
398+
// so JavaScript receives whole messages and never reframes partial reads or
399+
// concatenates partial frames itself.
400+
//
401+
// The `json` codec is intentionally not framed here: its StringDecoder +
402+
// split('\n') pipeline in serialization.js already avoids copies (V8 rope
403+
// concatenation and O(1) substrings) and is faster than reassembling the bytes
404+
// in C++.
405+
class IPCChannelFramer : public BaseObject {
406+
public:
407+
IPCChannelFramer(Environment* env, Local<Object> wrap)
408+
: BaseObject(env, wrap) {
409+
MakeWeak();
410+
}
411+
412+
static void New(const FunctionCallbackInfo<Value>& args);
413+
static void Read(const FunctionCallbackInfo<Value>& args);
414+
static void Buffering(const FunctionCallbackInfo<Value>& args);
415+
416+
void MemoryInfo(MemoryTracker* tracker) const override {
417+
tracker->TrackField("buffered", buffered_);
418+
}
419+
SET_MEMORY_INFO_NAME(IPCChannelFramer)
420+
SET_SELF_SIZE(IPCChannelFramer)
421+
422+
private:
423+
// Append complete deserialized messages found in `data` to `out`, buffering
424+
// any trailing partial frame for the next read. Return false (with a pending
425+
// exception) if deserialization fails.
426+
bool ReadAdvanced(Local<ArrayBuffer> ab,
427+
const uint8_t* data,
428+
size_t length,
429+
LocalVector<Value>* out);
430+
431+
std::vector<char> buffered_;
432+
};
433+
434+
void IPCChannelFramer::New(const FunctionCallbackInfo<Value>& args) {
435+
Environment* env = Environment::GetCurrent(args);
436+
if (!args.IsConstructCall()) {
437+
return THROW_ERR_CONSTRUCT_CALL_REQUIRED(
438+
env,
439+
"Class constructor IPCChannelFramer cannot be invoked without 'new'");
440+
}
441+
new IPCChannelFramer(env, args.This());
442+
}
443+
444+
bool IPCChannelFramer::ReadAdvanced(Local<ArrayBuffer> ab,
445+
const uint8_t* data,
446+
size_t length,
447+
LocalVector<Value>* out) {
448+
Environment* env = this->env();
449+
size_t pos = 0;
450+
451+
// Phase 1: finish a frame whose bytes started in a previous read. It is
452+
// reassembled in `buffered_`, so it has no borrowable ArrayBuffer and its
453+
// host objects are copied.
454+
if (!buffered_.empty()) {
455+
while (buffered_.size() < 4 && pos < length) {
456+
buffered_.push_back(static_cast<char>(data[pos++]));
457+
}
458+
if (buffered_.size() < 4) return true; // still buffering the length header
459+
460+
const size_t frame_total =
461+
static_cast<size_t>(ReadUInt32BE(buffered_.data())) + 4;
462+
while (buffered_.size() < frame_total && pos < length) {
463+
buffered_.push_back(static_cast<char>(data[pos++]));
464+
}
465+
if (buffered_.size() < frame_total) return true; // still buffering payload
466+
467+
const uint8_t* payload =
468+
reinterpret_cast<const uint8_t*>(buffered_.data()) + 4;
469+
Local<Value> message;
470+
if (!DeserializeAdvancedPayload(
471+
env, Local<ArrayBuffer>(), payload, frame_total - 4)
472+
.ToLocal(&message)) {
473+
return false;
474+
}
475+
out->push_back(message);
476+
buffered_.clear();
477+
}
478+
479+
// Phase 2: messages contained entirely within this chunk are deserialized in
480+
// place, letting host objects borrow the chunk's ArrayBuffer (zero copy).
481+
while (length - pos >= 4) {
482+
const size_t frame_total = static_cast<size_t>(ReadUInt32BE(
483+
reinterpret_cast<const char*>(data + pos))) +
484+
4;
485+
if (length - pos < frame_total) break; // incomplete trailing frame
486+
487+
Local<Value> message;
488+
if (!DeserializeAdvancedPayload(env, ab, data + pos + 4, frame_total - 4)
489+
.ToLocal(&message)) {
490+
return false;
491+
}
492+
out->push_back(message);
493+
pos += frame_total;
494+
}
495+
496+
if (pos < length) {
497+
buffered_.assign(reinterpret_cast<const char*>(data + pos),
498+
reinterpret_cast<const char*>(data + length));
499+
}
500+
return true;
501+
}
502+
503+
void IPCChannelFramer::Read(const FunctionCallbackInfo<Value>& args) {
504+
IPCChannelFramer* framer;
505+
ASSIGN_OR_RETURN_UNWRAP(&framer, args.This());
506+
Isolate* isolate = framer->env()->isolate();
507+
508+
CHECK(args[0]->IsUint8Array());
509+
Local<Uint8Array> chunk = args[0].As<Uint8Array>();
510+
Local<ArrayBuffer> ab = chunk->Buffer();
511+
const size_t offset = chunk->ByteOffset();
512+
const size_t length = chunk->ByteLength();
513+
const uint8_t* data = static_cast<const uint8_t*>(ab->Data()) + offset;
514+
515+
LocalVector<Value> messages(isolate);
516+
if (!framer->ReadAdvanced(ab, data, length, &messages)) {
517+
return; // a pending exception propagates to JavaScript
518+
}
519+
520+
args.GetReturnValue().Set(
521+
Array::New(isolate, messages.data(), messages.size()));
522+
}
523+
524+
void IPCChannelFramer::Buffering(const FunctionCallbackInfo<Value>& args) {
525+
IPCChannelFramer* framer;
526+
ASSIGN_OR_RETURN_UNWRAP(&framer, args.This());
527+
args.GetReturnValue().Set(!framer->buffered_.empty());
528+
}
529+
367530
static void Initialize(Local<Object> target,
368531
Local<Value> unused,
369532
Local<Context> context,
370533
void* priv) {
534+
Environment* env = Environment::GetCurrent(context);
535+
Isolate* isolate = env->isolate();
536+
371537
SetMethod(context, target, "serialize", Serialize);
372538
SetMethod(context, target, "deserialize", Deserialize);
539+
540+
Local<FunctionTemplate> framer =
541+
NewFunctionTemplate(isolate, IPCChannelFramer::New);
542+
framer->InstanceTemplate()->SetInternalFieldCount(
543+
IPCChannelFramer::kInternalFieldCount);
544+
SetProtoMethod(isolate, framer, "read", IPCChannelFramer::Read);
545+
SetProtoMethod(isolate, framer, "buffering", IPCChannelFramer::Buffering);
546+
framer->ReadOnlyPrototype();
547+
SetConstructorFunction(context, target, "IPCChannelFramer", framer);
373548
}
374549

375550
static void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
376551
registry->Register(Serialize);
377552
registry->Register(Deserialize);
553+
registry->Register(IPCChannelFramer::New);
554+
registry->Register(IPCChannelFramer::Read);
555+
registry->Register(IPCChannelFramer::Buffering);
378556
}
379557

380558
} // namespace ipc_serdes

0 commit comments

Comments
 (0)