From fee0262246d20f9f3ac6535972e5a4e4fb324aae Mon Sep 17 00:00:00 2001 From: Yagiz Nizipli Date: Mon, 22 Jun 2026 13:37:44 -0400 Subject: [PATCH] child_process: frame advanced IPC messages natively The `advanced` IPC read path framed messages in JavaScript (parseChannelMessages in lib/internal/child_process/serialization.js): every read scanned the 4-byte big-endian length prefix, accumulated partial frames in an array, and called Buffer.concat to reassemble messages that spanned reads, then crossed into the ipc_serdes binding once per message to deserialize. Move framing into a native per-channel IPCChannelFramer (a BaseObject on the ipc_serdes binding). It owns the cross-read accumulation buffer in C++ and returns an array of complete, deserialized messages per read, so JavaScript no longer reframes the byte stream. Messages that lie entirely within a read are deserialized in place and let host objects borrow the read buffer (zero copy); only frames that span reads are copied, matching the previous Buffer.concat behavior. The wire format (4-byte BE length prefix + V8 payload with custom host-object tags) is preserved byte-for-byte, and handle passing is untouched: the per-read pendingHandle handoff and NODE_HANDLE delivery stay in setupChannel. The `json` read path is deliberately left in JavaScript. Its StringDecoder + split('\n') pipeline avoids copies (V8 rope concatenation and O(1) substrings); reframing it in C++ regressed the json read path by up to ~40% on large messages in benchmarks, so only `advanced` is framed natively. A cctest (test/cctest/test_node_ipc_serdes.cc) exercises the framer directly: single-message round-trips, several messages delivered in one read, and a frame split across two reads (including a split length header). Round-trip throughput (benchmark/child_process/child-process-ipc-roundtrip, advanced; interleaved A/B vs. the pre-change baseline, 8 reps x dur=5s): payload before after change 1 KiB 559,040 626,255 +12.0% (t=5.3) 4 KiB 311,284 330,153 +6.1% (t=2.3) 16 KiB 104,077 125,181 +20.3% 64 KiB 31,921 35,739 +12.0% json round-trip and read throughput are unchanged (within noise). Signed-off-by: Yagiz Nizipli --- lib/internal/child_process/serialization.js | 70 ++----- src/node_ipc_serdes.cc | 208 ++++++++++++++++++-- test/cctest/test_node_ipc_serdes.cc | 144 ++++++++++++++ typings/internalBinding/ipc_serdes.d.ts | 10 + 4 files changed, 364 insertions(+), 68 deletions(-) diff --git a/lib/internal/child_process/serialization.js b/lib/internal/child_process/serialization.js index 989f748734a245..a33ac900d819f1 100644 --- a/lib/internal/child_process/serialization.js +++ b/lib/internal/child_process/serialization.js @@ -1,78 +1,42 @@ 'use strict'; const { - ArrayPrototypePush, JSONParse, JSONStringify, StringPrototypeSplit, Symbol, - TypedArrayPrototypeSubarray, } = primordials; const { Buffer } = require('buffer'); const { StringDecoder } = require('string_decoder'); -const { serialize, deserialize } = internalBinding('ipc_serdes'); +const { serialize, IPCChannelFramer } = internalBinding('ipc_serdes'); const { streamBaseState, kLastWriteWasAsync } = internalBinding('stream_wrap'); -const kMessageBuffer = Symbol('kMessageBuffer'); -const kMessageBufferSize = Symbol('kMessageBufferSize'); +const kFramer = Symbol('kFramer'); const kJSONBuffer = Symbol('kJSONBuffer'); const kStringDecoder = Symbol('kStringDecoder'); // Messages are parsed in either of the following formats: -// - Newline-delimited JSON, or // - V8-serialized buffers, prefixed with their length as a big endian uint32 -// (aka 'advanced') +// (aka 'advanced'), or +// - newline-delimited JSON. +// +// The 'advanced' read path is framed natively: IPCChannelFramer accumulates +// partial reads in C++ and hands back an array of complete, deserialized +// messages, so JavaScript never reframes the byte stream. The 'json' read path +// stays in JavaScript: its StringDecoder + split('\n') pipeline already avoids +// copies via V8 rope concatenation and O(1) substrings, and is measurably +// faster than reframing the bytes in C++. const advanced = { initMessageChannel(channel) { - channel[kMessageBuffer] = []; - channel[kMessageBufferSize] = 0; + channel[kFramer] = new IPCChannelFramer(); channel.buffering = false; }, - *parseChannelMessages(channel, readData) { - if (readData.length === 0) return; - - if (channel[kMessageBufferSize] && channel[kMessageBuffer][0].length < 4) { - // Message length split into two buffers, so let's concatenate it. - channel[kMessageBuffer][0] = Buffer.concat([channel[kMessageBuffer][0], readData]); - } else { - ArrayPrototypePush(channel[kMessageBuffer], readData); - } - channel[kMessageBufferSize] += readData.length; - - // Index 0 should always be present because we just pushed data into it. - let messageBufferHead = channel[kMessageBuffer][0]; - while (messageBufferHead.length >= 4) { - // We call `readUInt32BE` manually here, because this is faster than first converting - // it to a buffer and using `readUInt32BE` on that. - const fullMessageSize = (( - messageBufferHead[0] << 24 | - messageBufferHead[1] << 16 | - messageBufferHead[2] << 8 | - messageBufferHead[3] - ) >>> 0) + 4; - - if (channel[kMessageBufferSize] < fullMessageSize) break; - - const concatenatedBuffer = channel[kMessageBuffer].length === 1 ? - channel[kMessageBuffer][0] : - Buffer.concat( - channel[kMessageBuffer], - channel[kMessageBufferSize], - ); - - const serializedMessage = - TypedArrayPrototypeSubarray(concatenatedBuffer, 4, fullMessageSize); - - messageBufferHead = TypedArrayPrototypeSubarray(concatenatedBuffer, fullMessageSize); - channel[kMessageBufferSize] = messageBufferHead.length; - channel[kMessageBuffer] = - channel[kMessageBufferSize] !== 0 ? [messageBufferHead] : []; - - yield deserialize(serializedMessage); - } - - channel.buffering = channel[kMessageBufferSize] > 0; + parseChannelMessages(channel, readData) { + if (readData.length === 0) return []; + const messages = channel[kFramer].read(readData); + channel.buffering = channel[kFramer].buffering(); + return messages; }, writeChannelMessage(channel, req, message, handle) { diff --git a/src/node_ipc_serdes.cc b/src/node_ipc_serdes.cc index 05d5ae7ccffeb2..8963a7b19a4f1e 100644 --- a/src/node_ipc_serdes.cc +++ b/src/node_ipc_serdes.cc @@ -1,4 +1,6 @@ +#include "base_object-inl.h" #include "env-inl.h" +#include "memory_tracker-inl.h" #include "node_buffer.h" #include "node_errors.h" #include "node_external_reference.h" @@ -7,6 +9,7 @@ #include #include +#include // Native implementation of the `advanced` child_process IPC serialization // codec previously implemented in lib/internal/child_process/serialization.js @@ -19,6 +22,7 @@ namespace node { +using v8::Array; using v8::ArrayBuffer; using v8::ArrayBufferView; using v8::BackingStore; @@ -31,12 +35,14 @@ using v8::Float16Array; using v8::Float32Array; using v8::Float64Array; using v8::FunctionCallbackInfo; +using v8::FunctionTemplate; using v8::Int16Array; using v8::Int32Array; using v8::Int8Array; using v8::Isolate; using v8::Just; using v8::Local; +using v8::LocalVector; using v8::Maybe; using v8::MaybeLocal; using v8::Name; @@ -280,14 +286,17 @@ class IPCDeserializerDelegate : public ValueDeserializer::Delegate { if (!deserializer_->ReadRawBytes(byte_length, &data)) return {}; const size_t bytes_per_element = BytesPerElement(type_index); - const size_t offset_in_ab = static_cast(data) - - static_cast(ab_->Data()); // Reuse the backing ArrayBuffer when the data is suitably aligned, // otherwise copy into a fresh aligned buffer. Mirrors _readHostObject() - // in lib/v8.js. - if (offset_in_ab % bytes_per_element == 0) { - return MakeView(env_, type_index, ab_, offset_in_ab, byte_length); + // in lib/v8.js. A frame reassembled across reads has no standalone + // ArrayBuffer to borrow from (`ab_` is empty), so always copy. + if (!ab_.IsEmpty()) { + const size_t offset_in_ab = static_cast(data) - + static_cast(ab_->Data()); + if (offset_in_ab % bytes_per_element == 0) { + return MakeView(env_, type_index, ab_, offset_in_ab, byte_length); + } } std::shared_ptr store = ArrayBuffer::NewBackingStore(isolate, byte_length); @@ -339,11 +348,37 @@ static void Serialize(const FunctionCallbackInfo& args) { } } -static void Deserialize(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); +// Deserializes a single `advanced` payload: the V8 wire header followed by the +// value, without the 4-byte big-endian length prefix. When `ab` is non-empty +// it is the ArrayBuffer backing `data`, which the delegate may borrow for +// zero-copy ArrayBufferView host objects; when it is empty (a frame reassembled +// across reads) host objects are copied into fresh buffers. +static MaybeLocal DeserializeAdvancedPayload(Environment* env, + Local ab, + const uint8_t* data, + size_t length) { Isolate* isolate = env->isolate(); Local context = env->context(); + IPCDeserializerDelegate delegate(env, ab); + ValueDeserializer deserializer(isolate, data, length, &delegate); + delegate.set_deserializer(&deserializer); + + bool read_header; + if (!deserializer.ReadHeader(context).To(&read_header)) return {}; + return deserializer.ReadValue(context); +} + +static inline uint32_t ReadUInt32BE(const char* p) { + const uint8_t* b = reinterpret_cast(p); + return (static_cast(b[0]) << 24) | + (static_cast(b[1]) << 16) | + (static_cast(b[2]) << 8) | static_cast(b[3]); +} + +static void Deserialize(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + CHECK(args[0]->IsArrayBufferView()); Local view = args[0].As(); Local ab = view->Buffer(); @@ -351,30 +386,173 @@ static void Deserialize(const FunctionCallbackInfo& args) { const size_t byte_length = view->ByteLength(); const uint8_t* data = static_cast(ab->Data()) + byte_offset; - IPCDeserializerDelegate delegate(env, ab); - ValueDeserializer deserializer(isolate, data, byte_length, &delegate); - delegate.set_deserializer(&deserializer); - - bool read_header; - if (!deserializer.ReadHeader(context).To(&read_header)) return; - Local value; - if (deserializer.ReadValue(context).ToLocal(&value)) { + if (DeserializeAdvancedPayload(env, ab, data, byte_length).ToLocal(&value)) { args.GetReturnValue().Set(value); } } +// Per-channel native framer for the `advanced` codec. It turns the raw IPC +// byte stream into complete, deserialized messages, owning the cross-read +// accumulation buffer that used to live in serialization.js (kMessageBuffer), +// so JavaScript receives whole messages and never reframes partial reads or +// concatenates partial frames itself. +// +// The `json` codec is intentionally not framed here: its StringDecoder + +// split('\n') pipeline in serialization.js already avoids copies (V8 rope +// concatenation and O(1) substrings) and is faster than reassembling the bytes +// in C++. +class IPCChannelFramer : public BaseObject { + public: + IPCChannelFramer(Environment* env, Local wrap) + : BaseObject(env, wrap) { + MakeWeak(); + } + + static void New(const FunctionCallbackInfo& args); + static void Read(const FunctionCallbackInfo& args); + static void Buffering(const FunctionCallbackInfo& args); + + void MemoryInfo(MemoryTracker* tracker) const override { + tracker->TrackField("buffered", buffered_); + } + SET_MEMORY_INFO_NAME(IPCChannelFramer) + SET_SELF_SIZE(IPCChannelFramer) + + private: + // Append complete deserialized messages found in `data` to `out`, buffering + // any trailing partial frame for the next read. Return false (with a pending + // exception) if deserialization fails. + bool ReadAdvanced(Local ab, + const uint8_t* data, + size_t length, + LocalVector* out); + + std::vector buffered_; +}; + +void IPCChannelFramer::New(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + if (!args.IsConstructCall()) { + return THROW_ERR_CONSTRUCT_CALL_REQUIRED( + env, + "Class constructor IPCChannelFramer cannot be invoked without 'new'"); + } + new IPCChannelFramer(env, args.This()); +} + +bool IPCChannelFramer::ReadAdvanced(Local ab, + const uint8_t* data, + size_t length, + LocalVector* out) { + Environment* env = this->env(); + size_t pos = 0; + + // Phase 1: finish a frame whose bytes started in a previous read. It is + // reassembled in `buffered_`, so it has no borrowable ArrayBuffer and its + // host objects are copied. + if (!buffered_.empty()) { + while (buffered_.size() < 4 && pos < length) { + buffered_.push_back(static_cast(data[pos++])); + } + if (buffered_.size() < 4) return true; // still buffering the length header + + const size_t frame_total = + static_cast(ReadUInt32BE(buffered_.data())) + 4; + while (buffered_.size() < frame_total && pos < length) { + buffered_.push_back(static_cast(data[pos++])); + } + if (buffered_.size() < frame_total) return true; // still buffering payload + + const uint8_t* payload = + reinterpret_cast(buffered_.data()) + 4; + Local message; + if (!DeserializeAdvancedPayload( + env, Local(), payload, frame_total - 4) + .ToLocal(&message)) { + return false; + } + out->push_back(message); + buffered_.clear(); + } + + // Phase 2: messages contained entirely within this chunk are deserialized in + // place, letting host objects borrow the chunk's ArrayBuffer (zero copy). + while (length - pos >= 4) { + const size_t frame_total = static_cast(ReadUInt32BE( + reinterpret_cast(data + pos))) + + 4; + if (length - pos < frame_total) break; // incomplete trailing frame + + Local message; + if (!DeserializeAdvancedPayload(env, ab, data + pos + 4, frame_total - 4) + .ToLocal(&message)) { + return false; + } + out->push_back(message); + pos += frame_total; + } + + if (pos < length) { + buffered_.assign(reinterpret_cast(data + pos), + reinterpret_cast(data + length)); + } + return true; +} + +void IPCChannelFramer::Read(const FunctionCallbackInfo& args) { + IPCChannelFramer* framer; + ASSIGN_OR_RETURN_UNWRAP(&framer, args.This()); + Isolate* isolate = framer->env()->isolate(); + + CHECK(args[0]->IsUint8Array()); + Local chunk = args[0].As(); + Local ab = chunk->Buffer(); + const size_t offset = chunk->ByteOffset(); + const size_t length = chunk->ByteLength(); + const uint8_t* data = static_cast(ab->Data()) + offset; + + LocalVector messages(isolate); + if (!framer->ReadAdvanced(ab, data, length, &messages)) { + return; // a pending exception propagates to JavaScript + } + + args.GetReturnValue().Set( + Array::New(isolate, messages.data(), messages.size())); +} + +void IPCChannelFramer::Buffering(const FunctionCallbackInfo& args) { + IPCChannelFramer* framer; + ASSIGN_OR_RETURN_UNWRAP(&framer, args.This()); + args.GetReturnValue().Set(!framer->buffered_.empty()); +} + static void Initialize(Local target, Local unused, Local context, void* priv) { + Environment* env = Environment::GetCurrent(context); + Isolate* isolate = env->isolate(); + SetMethod(context, target, "serialize", Serialize); SetMethod(context, target, "deserialize", Deserialize); + + Local framer = + NewFunctionTemplate(isolate, IPCChannelFramer::New); + framer->InstanceTemplate()->SetInternalFieldCount( + IPCChannelFramer::kInternalFieldCount); + SetProtoMethod(isolate, framer, "read", IPCChannelFramer::Read); + SetProtoMethod(isolate, framer, "buffering", IPCChannelFramer::Buffering); + framer->ReadOnlyPrototype(); + SetConstructorFunction(context, target, "IPCChannelFramer", framer); } static void RegisterExternalReferences(ExternalReferenceRegistry* registry) { registry->Register(Serialize); registry->Register(Deserialize); + registry->Register(IPCChannelFramer::New); + registry->Register(IPCChannelFramer::Read); + registry->Register(IPCChannelFramer::Buffering); } } // namespace ipc_serdes diff --git a/test/cctest/test_node_ipc_serdes.cc b/test/cctest/test_node_ipc_serdes.cc index fa86e316e7a5f9..3899e42241c49d 100644 --- a/test/cctest/test_node_ipc_serdes.cc +++ b/test/cctest/test_node_ipc_serdes.cc @@ -3,7 +3,9 @@ #include "node_buffer.h" #include "node_test_fixture.h" +#include #include +#include using node::Environment; @@ -207,4 +209,146 @@ TEST_F(IPCSerdesTest, BufferRoundTripsAsBuffer) { EXPECT_EQ(memcmp(node::Buffer::Data(out), "Hello!", 6), 0); } +// --- IPCChannelFramer (native read-path framing) --- + +// Serializes `input` into a complete `advanced` frame (4-byte length prefix +// included), as written on the wire by writeChannelMessage(). +v8::Local SerializeFrame(v8::Local context, + v8::Local serialize, + v8::Local input) { + v8::Isolate* isolate = v8::Isolate::GetCurrent(); + v8::Local buffer_ctor = + context->Global() + ->Get(context, v8::String::NewFromUtf8Literal(isolate, "Buffer")) + .ToLocalChecked(); + v8::Local args[] = {input, buffer_ctor}; + return serialize->Call(context, v8::Undefined(isolate), 2, args) + .ToLocalChecked(); +} + +v8::Local NewFramer(v8::Local context, + v8::Local binding) { + v8::Local ctor = + GetFunction(context, binding, "IPCChannelFramer"); + return ctor->NewInstance(context, 0, nullptr).ToLocalChecked(); +} + +v8::Local MakeChunk(v8::Local context, + const void* data, + size_t length) { + v8::Isolate* isolate = v8::Isolate::GetCurrent(); + v8::Local ab = v8::ArrayBuffer::New(isolate, length); + if (length > 0) memcpy(ab->Data(), data, length); + return v8::Uint8Array::New(ab, 0, length); +} + +v8::Local FramerRead(v8::Local context, + v8::Local framer, + v8::Local chunk) { + v8::Local read = GetFunction(context, framer, "read"); + return read->Call(context, framer, 1, &chunk) + .ToLocalChecked() + .As(); +} + +bool FramerBuffering(v8::Local context, + v8::Local framer) { + v8::Local buffering = GetFunction(context, framer, "buffering"); + return buffering->Call(context, framer, 0, nullptr) + .ToLocalChecked() + ->BooleanValue(v8::Isolate::GetCurrent()); +} + +TEST_F(IPCSerdesTest, FramerRoundTripsSingleMessage) { + const v8::HandleScope handle_scope(isolate_); + const Argv argv; + Env test_env{handle_scope, argv}; + v8::Local context = isolate_->GetCurrentContext(); + + v8::Local binding = GetIpcSerdesBinding(*test_env, context); + v8::Local serialize = + GetFunction(context, binding, "serialize"); + + v8::Local input = v8::Number::New(isolate_, 42); + v8::Local frame = SerializeFrame(context, serialize, input); + + v8::Local framer = NewFramer(context, binding); + v8::Local messages = FramerRead(context, framer, frame); + + ASSERT_EQ(messages->Length(), 1u); + EXPECT_TRUE(messages->Get(context, 0).ToLocalChecked()->StrictEquals(input)); + EXPECT_FALSE(FramerBuffering(context, framer)); +} + +TEST_F(IPCSerdesTest, FramerSplitsMultipleMessagesInOneRead) { + const v8::HandleScope handle_scope(isolate_); + const Argv argv; + Env test_env{handle_scope, argv}; + v8::Local context = isolate_->GetCurrentContext(); + + v8::Local binding = GetIpcSerdesBinding(*test_env, context); + v8::Local serialize = + GetFunction(context, binding, "serialize"); + + v8::Local frame1 = + SerializeFrame(context, serialize, v8::Number::New(isolate_, 1)); + v8::Local frame2 = SerializeFrame( + context, serialize, v8::String::NewFromUtf8Literal(isolate_, "two")); + + node::ArrayBufferViewContents b1(frame1); + node::ArrayBufferViewContents b2(frame2); + std::vector combined; + combined.insert(combined.end(), b1.data(), b1.data() + b1.length()); + combined.insert(combined.end(), b2.data(), b2.data() + b2.length()); + + v8::Local framer = NewFramer(context, binding); + v8::Local messages = FramerRead( + context, framer, MakeChunk(context, combined.data(), combined.size())); + + ASSERT_EQ(messages->Length(), 2u); + EXPECT_EQ(messages->Get(context, 0) + .ToLocalChecked() + ->Int32Value(context) + .FromJust(), + 1); + v8::String::Utf8Value second(isolate_, + messages->Get(context, 1).ToLocalChecked()); + EXPECT_STREQ(*second, "two"); + EXPECT_FALSE(FramerBuffering(context, framer)); +} + +TEST_F(IPCSerdesTest, FramerBuffersPartialAdvancedFrame) { + const v8::HandleScope handle_scope(isolate_); + const Argv argv; + Env test_env{handle_scope, argv}; + v8::Local context = isolate_->GetCurrentContext(); + + v8::Local binding = GetIpcSerdesBinding(*test_env, context); + v8::Local serialize = + GetFunction(context, binding, "serialize"); + + v8::Local frame = SerializeFrame( + context, serialize, v8::String::NewFromUtf8Literal(isolate_, "spanning")); + node::ArrayBufferViewContents bytes(frame); + const size_t total = bytes.length(); + // Split inside the 4-byte length header to exercise that path too. + const size_t split = 2; + ASSERT_GT(total, split); + + v8::Local framer = NewFramer(context, binding); + + v8::Local first = + FramerRead(context, framer, MakeChunk(context, bytes.data(), split)); + EXPECT_EQ(first->Length(), 0u); + EXPECT_TRUE(FramerBuffering(context, framer)); + + v8::Local second = FramerRead( + context, framer, MakeChunk(context, bytes.data() + split, total - split)); + ASSERT_EQ(second->Length(), 1u); + v8::String::Utf8Value value(isolate_, + second->Get(context, 0).ToLocalChecked()); + EXPECT_STREQ(*value, "spanning"); + EXPECT_FALSE(FramerBuffering(context, framer)); +} + } // namespace diff --git a/typings/internalBinding/ipc_serdes.d.ts b/typings/internalBinding/ipc_serdes.d.ts index dc00f068e4270a..99ef866ce16fdc 100644 --- a/typings/internalBinding/ipc_serdes.d.ts +++ b/typings/internalBinding/ipc_serdes.d.ts @@ -1,8 +1,18 @@ declare namespace InternalIPCSerdesBinding { type Buffer = Uint8Array; + + // Per-channel native framer for the `advanced` codec. Accumulates partial + // reads in C++ and returns the complete, deserialized messages found in each + // chunk. See src/node_ipc_serdes.cc. + class IPCChannelFramer { + constructor(); + read(chunk: Uint8Array): unknown[]; + buffering(): boolean; + } } export interface IPCSerdesBinding { serialize(message: unknown): InternalIPCSerdesBinding.Buffer; deserialize(buffer: ArrayBufferView): unknown; + IPCChannelFramer: typeof InternalIPCSerdesBinding.IPCChannelFramer; }