
perf: optimize stream materializer wiring with HashMap and ArrayList replacements#3062

Open. He-Pin wants to merge 8 commits into main from optimize/stream-materializer-wiring



@He-Pin He-Pin commented Jun 13, 2026

Motivation

The PhasedFusingActorMaterializer uses an ArrayList with O(n) linear scan to resolve forward wires during graph materialization, and GraphStageIsland uses a Scala List for outConnections which allocates cons cells on every addition. Both create unnecessary overhead for complex stream graphs.

Modification

  1. forwardWires: ArrayList → sorted primitive arrays — Replace ArrayList[ForwardWire] with sorted parallel arrays (Array[Int] keys + Array[ForwardWire] values). Forward wire lookup uses binary search (O(log n)), insert maintains sorted order via arraycopy, removal compacts via arraycopy. Zero java.lang.Integer boxing, zero per-operation allocation.

  2. outConnections: Scala List → ArrayList — Replace List[Connection] (::= prepending) with java.util.ArrayList[Connection] (lazy-init). Eliminates cons cell allocation and enables indexed iteration in onIslandReady().

  3. emptyOutSlots shared constant — In TraversalBuilder.atomic(), stages with no outlets now reuse a shared Array.emptyIntArray instead of allocating new Array[Int](0) each time.

  4. New JMH benchmarks:

    • MaterializerWiringBenchmark: measures materialization throughput for linear pipelines, broadcast-merge (gradual and immediate), and imported flow topologies at complexity levels 200/500/1000.
    • AsyncBoundaryThroughputBenchmark: measures cross-island element throughput with 1/3/10 async boundaries.
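
As a rough illustration of item 1, the sketch below shows one way an Int-keyed table can be kept in two sorted parallel arrays with binary-search lookup. It is not the code from this PR: `Wire` and `SortedWireTable` are hypothetical names, and the growth policy (doubling from a capacity of 4) is an assumption made for the example.

```scala
import java.util.Arrays

// Hypothetical stand-in for ForwardWire; the field is illustrative only.
final case class Wire(toGlobalOffset: Int)

// Sketch of an Int-keyed table backed by two parallel arrays sorted by key.
final class SortedWireTable(initialCapacity: Int = 4) {
  private var keys: Array[Int] = new Array[Int](initialCapacity)
  private var values: Array[Wire] = new Array[Wire](initialCapacity)
  private var count: Int = 0

  def size: Int = count

  // Copy of the live keys, in ascending order (for inspection only).
  def sortedKeys: Array[Int] = keys.take(count)

  // Insert or overwrite. The tail is shifted right by one slot so the
  // key array stays sorted; keys are primitive ints, so nothing is boxed.
  def put(key: Int, value: Wire): Unit = {
    val idx = Arrays.binarySearch(keys, 0, count, key)
    if (idx >= 0) values(idx) = value
    else {
      val insertAt = -(idx + 1)
      if (count == keys.length) {
        // Assumed growth policy: double the capacity, minimum 4.
        val newCap = math.max(4, count * 2)
        val nk = new Array[Int](newCap)
        val nv = new Array[Wire](newCap)
        System.arraycopy(keys, 0, nk, 0, count)
        System.arraycopy(values, 0, nv, 0, count)
        keys = nk
        values = nv
      }
      System.arraycopy(keys, insertAt, keys, insertAt + 1, count - insertAt)
      System.arraycopy(values, insertAt, values, insertAt + 1, count - insertAt)
      keys(insertAt) = key
      values(insertAt) = value
      count += 1
    }
  }

  // Binary search over the live prefix [0, count); null when the key is absent.
  def get(key: Int): Wire = {
    val idx = Arrays.binarySearch(keys, 0, count, key)
    if (idx >= 0) values(idx) else null
  }
}
```

Returning `null` rather than `Option` is a deliberate choice in this sketch so that a lookup allocates nothing; growing the arrays is the only step that allocates.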

Result

Forward wire resolution drops from O(n) to O(log n) per lookup using sorted primitive arrays with binary search. At higher graph complexity, this produces significant speedups:

JMH Benchmark: MaterializerWiringBenchmark.broadcast_merge_gradual

| Complexity | Baseline (main) | Optimized | Speedup |
|---|---|---|---|
| 200 | 1.211 ± 0.429 ms | 1.277 ± 0.704 ms | ~0% (noise) |
| 500 | 4.027 ± 1.153 ms | 1.707 ± 0.906 ms | 2.36x |
| 1000 | 5.441 ± 4.872 ms | 2.554 ± 1.326 ms | 2.13x |

Mode: single-shot (ss), 10-20 iterations, 1 fork, JMH 1.37, JDK 21, Apple M-series.

At complexity ≥500 (where O(n²) linear scan cost becomes dominant), materialization is 2.1-2.4x faster.

Control benchmarks (no forward wires)

| Benchmark | Complexity | Baseline | Optimized |
|---|---|---|---|
| linear | 200 | 0.652 ± 0.065 ms | 0.686 ± 0.090 ms |
| imported_flow | 200 | 0.749 ± 0.091 ms | 0.753 ± 0.085 ms |

No regression on graphs without forward wires.

Tests

  • sbt "stream-tests / Test / testOnly *MaterializerSpec *PhasedFusingSpec *TraversalBuilderSpec" — 134 tests passed
  • sbt "stream-tests / Test / testOnly *GraphInterpreterSpec *ActorGraphInterpreterSpec" — 22 tests passed
  • sbt "bench-jmh / compile" — passed

References

None - performance optimization identified through code review

…replacements

Motivation:
The PhasedFusingActorMaterializer uses an ArrayList with O(n) linear scan
to resolve forward wires during graph materialization, and GraphStageIsland
uses a Scala List for outConnections which allocates cons cells on every
addition. Both create unnecessary overhead for complex graphs.

Modification:
- Replace forwardWires ArrayList with HashMap<Integer, ForwardWire> keyed
  by toGlobalOffset, giving O(1) lookup and removal in wireIn/wireOut
- Replace outConnections Scala List with ArrayList, eliminating cons cell
  allocation and enabling indexed iteration in onIslandReady
- Add shared emptyOutSlots constant in TraversalBuilder to avoid allocating
  zero-length arrays for stages with no outlets
- Add MaterializerWiringBenchmark (JMH) to measure materialization throughput
  for linear, broadcast-merge, and imported flow graph topologies
- Add AsyncBoundaryThroughputBenchmark (JMH) to measure cross-island element
  throughput with varying numbers of async boundaries

Result:
Forward wire resolution drops from O(n) to O(1) per lookup, outConnections
avoids Scala List overhead, and empty outlet stages reuse a shared array.
New benchmarks provide regression coverage for these code paths.
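
The shared-array idea can be sketched as follows. This is an illustrative fragment, not the TraversalBuilder code: `OutSlotsSketch` and `allocate` are hypothetical names, and only `Array.emptyIntArray` comes from the Scala standard library.

```scala
object OutSlotsSketch {
  // One shared zero-length array. Sharing is safe because an empty
  // array has no elements that a caller could mutate.
  private val emptyOutSlots: Array[Int] = Array.emptyIntArray

  // Hypothetical helper: slot array for a stage with `outletCount` outlets.
  def allocate(outletCount: Int): Array[Int] =
    if (outletCount == 0) emptyOutSlots else new Array[Int](outletCount)
}
```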

Tests:
- sbt "stream-tests / Test / testOnly *MaterializerSpec *PhasedFusingSpec *TraversalBuilderSpec" - 134 tests passed
- sbt "stream-tests / Test / testOnly *GraphInterpreterSpec *ActorGraphInterpreterSpec" - 22 tests passed
- sbt "bench-jmh / compile" - passed

References:
None - performance optimization identified through code review
@He-Pin He-Pin marked this pull request as draft June 13, 2026 20:02
Motivation:
java.util.HashMap[java.lang.Integer, ForwardWire] introduces autoboxing
on every put/remove. Benchmark files had incorrect license headers
(Akka-derived header on new code).

Modification:
- Replace forwardWires HashMap with sorted parallel arrays
  (Array[Int] keys + Array[ForwardWire] values). Lookup uses
  binary search, insert maintains sorted order, removal compacts
  via arraycopy. Zero boxing, zero per-op allocation.
- Fix benchmark file headers to use standard Apache 2.0 license
  (new files should not have "derived from Akka" line).
- Update MaterializerWiringBenchmark @Param to include higher
  complexity values (500, 1000) where forward wire optimization
  shows measurable improvement.

Result:
At complexity=500, broadcast_merge_gradual materialization is
2.36x faster (4.027ms → 1.707ms). At complexity=1000, 2.13x faster
(5.441ms → 2.554ms). O(n) per-lookup becomes O(log n).

Tests:
- sbt "stream-tests / Test / testOnly *MaterializerSpec *PhasedFusingSpec *TraversalBuilderSpec" - 134 passed
- sbt "stream-tests / Test / testOnly *GraphInterpreterSpec *ActorGraphInterpreterSpec" - 22 passed
- JMH MaterializerWiringBenchmark: 2.36x at N=500, 2.13x at N=1000

References:
PR #3062 review feedback
@He-Pin He-Pin marked this pull request as ready for review June 13, 2026 20:51
He-Pin added 2 commits June 14, 2026 04:58
Motivation:
After removing a forward wire from the sorted arrays, the trailing
slot in forwardWireValues still held a reference to the consumed
ForwardWire object, preventing garbage collection.

Modification:
Set forwardWireValues(forwardWireCount) = null after removal and
array compaction.

Result:
Consumed ForwardWire objects become eligible for GC immediately
instead of lingering until the array is reused or discarded.
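
A minimal sketch of the removal step described above, assuming a layout of two parallel arrays plus a live-element count. `WireSlots` is a hypothetical holder, and `AnyRef` stands in for ForwardWire.

```scala
import java.util.Arrays

// Hypothetical holder for the parallel arrays; `count` is the number of live entries.
final class WireSlots(val keys: Array[Int], val values: Array[AnyRef], var count: Int) {

  // Removes `key` if present and returns its value, or null when absent.
  def remove(key: Int): AnyRef = {
    val idx = Arrays.binarySearch(keys, 0, count, key)
    if (idx < 0) null
    else {
      val removed = values(idx)
      val tail = count - idx - 1
      // Compact: shift everything after idx one slot to the left.
      System.arraycopy(keys, idx + 1, keys, idx, tail)
      System.arraycopy(values, idx + 1, values, idx, tail)
      count -= 1
      // Slot `count` now holds either the removed element (if it was last)
      // or a duplicate of the old last element; clear it so the array
      // does not keep a stale reference alive.
      values(count) = null
      removed
    }
  }
}
```

The key array needs no such clearing because it stores primitive ints rather than references.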

Tests:
- sbt "stream-tests / Test / testOnly *MaterializerSpec *PhasedFusingSpec *TraversalBuilderSpec *GraphInterpreterSpec *ActorGraphInterpreterSpec" - 156 passed

References:
PR #3062 code review
Motivation:
After forward wires are consumed, the arrays and their entries
retained references preventing GC. Similarly, outConnections
ArrayList held references after being copied to finalConnections.

Modification:
- Null out forwardWireValues entries after removal to release
  consumed ForwardWire objects immediately
- Release forwardWireKeys/Values arrays when forwardWireCount
  reaches 0, allowing both arrays to be GC'd
- Set outConnections = null after onIslandReady() copies all
  connections to the shell's finalConnections array

Result:
Reduced memory retention during materialization. ForwardWire
objects and their associated arrays are eligible for GC as
soon as they are consumed, rather than lingering until the
IslandTracking/GraphStageIsland instance is collected.
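
The outConnections lifecycle (lazy allocation, indexed copy, release) might look roughly like this. `IslandSketch`, `Conn`, `onReady`, and the initial capacity of 4 are all hypothetical; the real GraphStageIsland is more involved.

```scala
import java.util.ArrayList

// Hypothetical stand-in for the materializer's Connection type.
final case class Conn(id: Int)

// Sketch of an island that accumulates connections and hands them off once.
final class IslandSketch {
  // Lazily allocated: an island that never registers a connection
  // never creates the list.
  private var outConnections: ArrayList[Conn] = null

  def addConnection(c: Conn): Unit = {
    if (outConnections eq null) outConnections = new ArrayList[Conn](4)
    outConnections.add(c)
  }

  // Copies the connections into a fixed-size array by index, then drops
  // the list so it can be collected independently of the island.
  def onReady(): Array[Conn] = {
    if (outConnections eq null) Array.empty[Conn]
    else {
      val n = outConnections.size()
      val result = new Array[Conn](n)
      var i = 0
      while (i < n) {
        result(i) = outConnections.get(i)
        i += 1
      }
      outConnections = null
      result
    }
  }

  // True while the island still holds its accumulation list.
  def holdsList: Boolean = outConnections ne null
}
```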

Tests:
- 156 tests passed (MaterializerSpec, PhasedFusingSpec,
  TraversalBuilderSpec, GraphInterpreterSpec, ActorGraphInterpreterSpec)

References:
PR #3062 code review
@He-Pin He-Pin marked this pull request as draft June 13, 2026 21:04
IntMap's Patricia trie allocates new nodes on every `updated`/`removed` call.
Benchmarked at N=500: sorted arrays 1.707ms vs IntMap 2.537ms (1.49x).
At N=1000: sorted arrays 2.554ms vs IntMap 4.260ms (1.67x).
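
For context, the snippet below illustrates the persistent-map behaviour behind that allocation cost, assuming Scala 2.13 or later (where `IntMap` exposes `updated` and `removed`). `IntMapAllocationSketch` is a hypothetical name, and the snippet only demonstrates the API semantics, not the benchmark.

```scala
import scala.collection.immutable.IntMap

object IntMapAllocationSketch {
  // Each update on a persistent map yields a new map value and leaves
  // the previous one unchanged, so every step builds new structure.
  def demo(): (IntMap[String], IntMap[String], IntMap[String]) = {
    val m0 = IntMap.empty[String]
    val m1 = m0.updated(7, "a")
    val m2 = m1.updated(3, "b")
    (m0, m1, m2)
  }
}
```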

References:
PR #3062 review discussion
@He-Pin He-Pin marked this pull request as ready for review June 13, 2026 21:16
@He-Pin He-Pin added this to the 2.0.0-M4 milestone Jun 13, 2026
@He-Pin He-Pin requested a review from Copilot June 13, 2026 21:17

Copilot AI left a comment


Pull request overview

This PR optimizes Pekko Streams graph materialization hot paths by reducing allocation/boxing and improving lookup performance during wiring, and adds JMH benchmarks to measure the impact.

Changes:

  • Replace forward-wire tracking in IslandTracking with sorted parallel arrays + binary search to avoid linear scans and boxing.
  • Replace GraphStageIsland’s outConnections from Scala List to lazily initialized java.util.ArrayList.
  • Reuse shared empty Array[Int] for atomic modules with no outlets; add new JMH benchmarks for wiring and async-boundary throughput.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

| File | Description |
|---|---|
| stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala | Reuse a shared empty Array[Int] for outlet-slot arrays to avoid repeated empty allocations. |
| stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala | Optimize forward-wire resolution and out-connection accumulation to reduce allocations and improve lookup/iteration. |
| bench-jmh/src/main/scala/org/apache/pekko/stream/MaterializerWiringBenchmark.scala | Add JMH benchmark covering materialization throughput across representative graph topologies. |
| bench-jmh/src/main/scala/org/apache/pekko/stream/AsyncBoundaryThroughputBenchmark.scala | Add JMH benchmark for cross-island throughput; includes a latch sink stage. |


He-Pin and others added 3 commits June 14, 2026 05:23
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
@He-Pin He-Pin added the t:stream Pekko Streams label Jun 13, 2026