Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions benchmark/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# endReadableNT teardown-tick benchmarks

These two scripts measure the `process.nextTick()` teardown cascade of a
short-lived, flowing `PassThrough -> Transform` pipeline — the shape produced
by e.g. a per-request or per-file streaming stage. They exist to demonstrate
the `endReadableNT` tick-dedup change to `lib/internal/streams/readable.js`.

They are **not** published to npm (the package `files` field only ships
`lib`, `LICENSE`, `README.md`).

## `endReadable-ticks.js`

Counts the exact number of `nextTick` hops one unit incurs, broken down by
call site, so the redundant `endReadableNT()` scheduling is visible.

```
node benchmark/endReadable-ticks.js [chunksPerUnit=4]
```

Compare base vs this change (the `lib/` change is the only tracked edit, so it
stashes cleanly while the untracked `benchmark/` dir stays put):

```
git stash push -- lib/internal/streams/readable.js
node benchmark/endReadable-ticks.js # BASE
git stash pop
node benchmark/endReadable-ticks.js # PATCHED
```

Expected (objectMode, 4 chunks):

| | total nextTick hops | `endReadableNT()` scheduled |
|--|--|--|
| base | 15 | 5x |
| patched | 12 | 2x |

The 3 removed hops are duplicate `endReadableNT()` ticks (base schedules it 3x
from `flow()` + 2x from `resume_()`; only one ever emits `'end'`). `'end'`
still fires exactly once, on the same tick — behaviour is unchanged.

## `endReadable-cpu.js`

Measures CPU time (from `process.cpuUsage()`, an OS counter that is more
stable than wall time on a loaded machine) for `N` short-lived units, started
in waves of 32 at a time.

```
node --expose-gc benchmark/endReadable-cpu.js [units=8000] [chunksPerUnit=4] [reps=15]
```

Run it on both branches as above. A paired A/B run (base and patched loaded
side-by-side in one process, alternating rep-by-rep so shared-box drift
cancels) measured a **median ~4.5–7.7% CPU reduction with the patched build
faster in the large majority of paired reps** on a 2-core machine shared with
other load. The magnitude tracks the tick reduction (~20% fewer teardown
ticks on this shape).
78 changes: 78 additions & 0 deletions benchmark/endReadable-cpu.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
'use strict'

// CPU throughput for a wave of short-lived, flowing PassThrough -> Transform
// units (the endReadableNT teardown-tick shape). Reports CPU time from
// process.cpuUsage() (an OS counter), which is more stable than wall time on a
// loaded/shared machine. Run on the base branch and on this branch to compare:
//
// git stash && node --expose-gc benchmark/endReadable-cpu.js
// git stash pop && node --expose-gc benchmark/endReadable-cpu.js
//
// Args: [units] [chunksPerUnit] [reps]

const { PassThrough, Transform } = require('..')

/**
 * Parses a positional CLI argument as a base-10 integer and validates it.
 *
 * @param {string|undefined} raw - Raw value from process.argv (may be absent).
 * @param {string} fallback - Default used when the argument is missing or empty.
 * @param {string} label - Argument name used in the error message.
 * @param {number} min - Smallest accepted value.
 * @returns {number} The parsed integer.
 * @throws {RangeError} If the value does not parse to an integer >= min.
 */
function parseIntArg(raw, fallback, label, min) {
  const value = parseInt(raw || fallback, 10)
  if (Number.isNaN(value) || value < min) {
    throw new RangeError(`${label} must be an integer >= ${min}, got ${JSON.stringify(raw)}`)
  }
  return value
}

// Units per repetition; must be positive because the per-unit figure divides by it.
const N = parseIntArg(process.argv[2], '8000', 'units', 1)
// Chunks written into each unit; 0 is accepted (a unit that is only ended).
const K = parseIntArg(process.argv[3], '4', 'chunksPerUnit', 0)
// Measured repetitions; at least one sample is needed to report a median.
const reps = parseIntArg(process.argv[4], '15', 'reps', 1)

/**
 * Builds, drives and tears down one flowing PassThrough -> Transform pipeline.
 *
 * Writes K object chunks into the source, ends it, and sums the `n` field of
 * every chunk emitted by the transform.
 *
 * @returns {Promise<number>} Resolves with the summed `n` values once the
 *   transform emits 'end'; rejects if the transform emits 'error'.
 */
function oneUnit() {
  return new Promise((resolve, reject) => {
    const source = new PassThrough({ objectMode: true })
    const identity = new Transform({
      objectMode: true,
      // Forward every chunk unchanged.
      transform(chunk, _encoding, done) {
        done(null, chunk)
      }
    })

    let total = 0
    const accumulate = (item) => {
      // `| 0` coerces a missing/non-numeric `n` to 0 and keeps the sum an int32.
      total += (item && item.n) | 0
    }
    const finish = () => resolve(total)

    source.pipe(identity)
    identity.on('data', accumulate)
    identity.on('end', finish)
    identity.on('error', reject)

    let index = 0
    while (index < K) {
      source.write({ n: index, s: 'term' + index })
      index++
    }
    source.end()
  })
}

/**
 * Runs N units in consecutive waves of at most 32: every unit of a wave is
 * started before the wave is awaited, and the next wave starts only after the
 * previous one has fully settled.
 *
 * @returns {Promise<void>}
 */
async function workload() {
  const waveSize = 32
  let start = 0
  while (start < N) {
    const stop = Math.min(start + waveSize, N)
    const pending = Array.from({ length: stop - start }, () => oneUnit())
    await Promise.all(pending)
    start = stop
  }
}

/**
 * Times one full workload() pass using process.cpuUsage().
 *
 * Triggers a garbage collection first when `global.gc` is available (i.e. the
 * process was started with --expose-gc).
 *
 * @returns {Promise<number>} User + system CPU time consumed by the pass, in
 *   milliseconds.
 */
async function measure() {
  if (global.gc) {
    global.gc()
  }
  const before = process.cpuUsage()
  await workload()
  // Passing the earlier reading makes cpuUsage() return the delta.
  const { user, system } = process.cpuUsage(before)
  // cpuUsage() reports microseconds; convert to milliseconds.
  return (user + system) / 1000
}

/**
 * Returns the median of an ascending-sorted, non-empty array of numbers.
 * For an even count the two middle values are averaged.
 */
function medianOfSorted(sorted) {
  const mid = sorted.length >> 1
  return sorted.length % 2 === 1 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2
}

/**
 * Benchmark driver: warms up, collects `reps` CPU-time samples and prints one
 * JSON line with the run parameters, the median CPU time in milliseconds
 * (`cpuMedianMs`) and the median per-unit cost in microseconds (`usPerUnit`).
 *
 * @returns {Promise<void>}
 * @throws {RangeError} If `reps` is not at least 1 (no sample to take a median of).
 */
async function main() {
  // `!(reps >= 1)` also rejects NaN, which a plain `reps < 1` would let through.
  if (!(reps >= 1)) {
    throw new RangeError(`reps must be a positive integer, got ${reps}`)
  }

  // Two untimed passes to warm up before measuring.
  await workload()
  await workload()

  const samples = []
  for (let r = 0; r < reps; r++) samples.push(await measure())
  samples.sort((a, b) => a - b)
  const med = medianOfSorted(samples)

  console.log(
    JSON.stringify({
      units: N,
      chunksPerUnit: K,
      reps,
      cpuMedianMs: +med.toFixed(1),
      usPerUnit: +((med * 1000) / N).toFixed(2)
    })
  )
}

// Entry point: run the benchmark; on any failure print the error and exit
// with a non-zero status.
const exitWithError = (error) => {
  console.error(error)
  process.exit(1)
}
main().catch(exitWithError)
72 changes: 72 additions & 0 deletions benchmark/endReadable-ticks.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
'use strict'

// Counts the process.nextTick() hops incurred by ONE short-lived, flowing
// PassThrough -> Transform unit (the shape produced e.g. by a per-request or
// per-file streaming pipeline). It also breaks the hops down by call site so
// the redundant endReadableNT() scheduling is visible.
//
// Run it on the base branch and on this branch:
//
// git stash && node benchmark/endReadable-ticks.js # base
// git stash pop && node benchmark/endReadable-ticks.js # patched
//
// Expected (objectMode, K=4 chunks):
// base : 15 hops total, endReadable scheduled 5x
// patched : 12 hops total, endReadable scheduled 2x
//
// The 3 removed hops are duplicate endReadableNT() ticks; 'end' still fires
// exactly once, on the same tick, so behaviour is unchanged.

const { PassThrough, Transform } = require('..')

// Call-site label -> number of process.nextTick() calls made from that site.
const sites = new Map()
// Total number of process.nextTick() calls observed.
let count = 0
// The genuine implementation, bound so it keeps working once replaced below.
const realNextTick = process.nextTick.bind(process)

// Shortens one stack line: trims it, drops everything from "(" up to and
// including "streams/", and strips a trailing ":<digits>)" suffix.
const compactFrame = (frame) =>
  frame.trim().replace(/\(.*streams\//, '(').replace(/:\d+\)$/, ')')

// Counting wrapper: records the call and where it came from, then defers to
// the real nextTick with the same callback and arguments.
process.nextTick = function (callback, ...rest) {
  count += 1
  // Skip the first two stack lines (the error header and, in V8, this
  // wrapper's own frame) and keep up to three caller frames as the label.
  const frames = new Error().stack.split('\n').slice(2, 5)
  const site = frames.map(compactFrame).join(' <- ')
  const seen = sites.get(site) || 0
  sites.set(site, seen + 1)
  return realNextTick(callback, ...rest)
}

// Chunks written into the unit: first CLI argument, defaulting to 4.
const chunksArg = process.argv[2] || '4'
const K = parseInt(chunksArg, 10)

/**
 * Builds, drives and tears down one flowing PassThrough -> Transform pipeline:
 * writes K object chunks into the source, ends it, and discards the output.
 *
 * @returns {Promise<undefined>} Resolves when the transform emits 'end';
 *   rejects if the transform emits 'error'.
 */
function oneUnit() {
  return new Promise((resolve, reject) => {
    const source = new PassThrough({ objectMode: true })
    const identity = new Transform({
      objectMode: true,
      // Forward every chunk unchanged.
      transform(chunk, _encoding, done) {
        done(null, chunk)
      }
    })
    const discard = () => {}

    source.pipe(identity)
    // A 'data' listener (even a no-op) is attached so the transform's output is consumed.
    identity.on('data', discard)
    identity.on('end', resolve)
    identity.on('error', reject)

    let index = 0
    while (index < K) {
      source.write({ n: index, s: 'term' + index })
      index++
    }
    source.end()
  })
}

// Run a single unit, then print the totals gathered by the nextTick wrapper.
// The real process.nextTick is restored whether or not the unit succeeds, and
// a failure is printed and turned into a non-zero exit status instead of being
// left as an unhandled rejection.
oneUnit()
  .finally(() => {
    // Stop counting before anything below (including console output) runs.
    process.nextTick = realNextTick
  })
  .then(() => {
    // Sum the hops whose innermost recorded frame is endReadable().
    let endReadableHops = 0
    for (const [site, n] of sites) {
      if (site.startsWith('at endReadable (')) endReadableHops += n
    }
    console.log(`nextTick hops for ONE unit (K=${K} chunks): ${count}`)
    console.log(` of which endReadableNT() scheduled by endReadable(): ${endReadableHops}`)
    console.log('per call site:')
    // Most frequent call sites first.
    const rows = [...sites.entries()].sort((a, b) => b[1] - a[1])
    for (const [site, n] of rows) console.log(` ${n}x ${site}`)
  })
  .catch((err) => {
    console.error(err)
    process.exit(1)
  })
18 changes: 16 additions & 2 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ const kCloseEmitted = 1 << 15
const kMultiAwaitDrain = 1 << 16
const kReadingMore = 1 << 17
const kDataEmitted = 1 << 18
// Set while an endReadableNT() nextTick is already pending, so repeated
// read()/resume() calls after end don't schedule duplicate ticks.
const kEndScheduled = 1 << 19

// TODO(benjamingr) it is likely slower to do it this way than with free functions
function makeBitMapDescriptor(bit) {
Expand Down Expand Up @@ -141,7 +144,9 @@ ObjectDefineProperties(ReadableState.prototype, {
multiAwaitDrain: makeBitMapDescriptor(kMultiAwaitDrain),
// If true, a maybeReadMore has been scheduled.
readingMore: makeBitMapDescriptor(kReadingMore),
dataEmitted: makeBitMapDescriptor(kDataEmitted)
dataEmitted: makeBitMapDescriptor(kDataEmitted),
// If true, an endReadableNT tick is already pending.
endScheduled: makeBitMapDescriptor(kEndScheduled)
})
function ReadableState(options, stream, isDuplex) {
// Duplex streams are both readable and writable, but share
Expand Down Expand Up @@ -1219,14 +1224,23 @@ function fromList(n, state) {
function endReadable(stream) {
const state = stream._readableState
debug('endReadable', state.endEmitted)
if (!state.endEmitted) {
// On the flowing short-lived path read()/resume() can call endReadable()
// several times after end; endReadableNT only ever emits 'end' once, so
// skip scheduling a duplicate tick when one is already pending.
if (!state.endEmitted && !state.endScheduled) {
state.ended = true
state.endScheduled = true
process.nextTick(endReadableNT, state, stream)
}
}
function endReadableNT(state, stream) {
debug('endReadableNT', state.endEmitted, state.length)

// Clear the scheduled flag now that the tick is running, so that if this
// tick bails out below (e.g. a last-moment unshift raised state.length),
// a later endReadable() can schedule a fresh tick.
state.endScheduled = false

// Check that we didn't get one last unshift.
if (!state.errored && !state.closeEmitted && !state.endEmitted && state.length === 0) {
state.endEmitted = true
Expand Down
Loading