perf: optimize stream materializer wiring with HashMap and ArrayList replacements#3062
Open
He-Pin wants to merge 8 commits into
Open
perf: optimize stream materializer wiring with HashMap and ArrayList replacements#3062He-Pin wants to merge 8 commits into
He-Pin wants to merge 8 commits into
Conversation
…replacements Motivation: The PhasedFusingActorMaterializer uses an ArrayList with O(n) linear scan to resolve forward wires during graph materialization, and GraphStageIsland uses a Scala List for outConnections which allocates cons cells on every addition. Both create unnecessary overhead for complex graphs. Modification: - Replace forwardWires ArrayList with HashMap<Integer, ForwardWire> keyed by toGlobalOffset, giving O(1) lookup and removal in wireIn/wireOut - Replace outConnections Scala List with ArrayList, eliminating cons cell allocation and enabling indexed iteration in onIslandReady - Add shared emptyOutSlots constant in TraversalBuilder to avoid allocating zero-length arrays for stages with no outlets - Add MaterializerWiringBenchmark (JMH) to measure materialization throughput for linear, broadcast-merge, and imported flow graph topologies - Add AsyncBoundaryThroughputBenchmark (JMH) to measure cross-island element throughput with varying numbers of async boundaries Result: Forward wire resolution drops from O(n) to O(1) per lookup, outConnections avoids Scala List overhead, and empty outlet stages reuse a shared array. New benchmarks provide regression coverage for these code paths. Tests: - sbt "stream-tests / Test / testOnly *MaterializerSpec *PhasedFusingSpec *TraversalBuilderSpec" - 30 tests passed - sbt "stream-tests / Test / testOnly *GraphInterpreterSpec *ActorGraphInterpreterSpec" - 112 tests passed - sbt "bench-jmh / compile" - passed References: None - performance optimization identified through code review
Motivation: java.util.HashMap[java.lang.Integer, ForwardWire] introduces autoboxing on every put/remove. Benchmark files had incorrect license headers (Akka-derived header on new code). Modification: - Replace forwardWires HashMap with sorted parallel arrays (Array[Int] keys + Array[ForwardWire] values). Lookup uses binary search, insert maintains sorted order, removal compacts via arraycopy. Zero boxing, zero per-op allocation. - Fix benchmark file headers to use standard Apache 2.0 license (new files should not have "derived from Akka" line). - Update MaterializerWiringBenchmark @Param to include higher complexity values (500, 1000) where forward wire optimization shows measurable improvement. Result: At complexity=500, broadcast_merge_gradual materialization is 2.36x faster (4.027ms → 1.707ms). At complexity=1000, 2.13x faster (5.441ms → 2.554ms). O(n) per-lookup becomes O(log n). Tests: - sbt "stream-tests / Test / testOnly *MaterializerSpec *PhasedFusingSpec *TraversalBuilderSpec" - 134 passed - sbt "stream-tests / Test / testOnly *GraphInterpreterSpec *ActorGraphInterpreterSpec" - 22 passed - JMH MaterializerWiringBenchmark: 2.36x at N=500, 2.13x at N=1000 References: PR #3062 review feedback
Motivation: After removing a forward wire from the sorted arrays, the trailing slot in forwardWireValues still held a reference to the consumed ForwardWire object, preventing garbage collection. Modification: Set forwardWireValues(forwardWireCount) = null after removal and array compaction. Result: Consumed ForwardWire objects become eligible for GC immediately instead of lingering until the array is reused or discarded. Tests: - sbt "stream-tests / Test / testOnly *MaterializerSpec *PhasedFusingSpec *TraversalBuilderSpec *GraphInterpreterSpec *ActorGraphInterpreterSpec" - 156 passed References: PR #3062 code review
Motivation: After forward wires are consumed, the arrays and their entries retained references preventing GC. Similarly, outConnections ArrayList held references after being copied to finalConnections. Modification: - Null out forwardWireValues entries after removal to release consumed ForwardWire objects immediately - Release forwardWireKeys/Values arrays when forwardWireCount reaches 0, allowing both arrays to be GC'd - Set outConnections = null after onIslandReady() copies all connections to the shell's finalConnections array Result: Reduced memory retention during materialization. ForwardWire objects and their associated arrays are eligible for GC as soon as they are consumed, rather than lingering until the IslandTracking/GraphStageIsland instance is collected. Tests: - 156 tests passed (MaterializerSpec, PhasedFusingSpec, TraversalBuilderSpec, GraphInterpreterSpec, ActorGraphInterpreterSpec) References: PR #3062 code review
IntMap's Patricia trie allocates nodes on every updated/removed. Benchmarked at N=500: sorted arrays 1.707ms vs IntMap 2.537ms (1.49x). At N=1000: sorted arrays 2.554ms vs IntMap 4.260ms (1.67x). References: PR #3062 review discussion
Contributor
There was a problem hiding this comment.
Pull request overview
This PR optimizes Pekko Streams graph materialization hot paths by reducing allocation/boxing and improving lookup performance during wiring, and adds JMH benchmarks to measure the impact.
Changes:
- Replace forward-wire tracking in
IslandTrackingwith sorted parallel arrays + binary search to avoid linear scans and boxing. - Replace
GraphStageIsland’soutConnectionsfrom ScalaListto lazily initializedjava.util.ArrayList. - Reuse shared empty
Array[Int]for atomic modules with no outlets; add new JMH benchmarks for wiring and async-boundary throughput.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala | Reuse a shared empty Array[Int] for outlet-slot arrays to avoid repeated empty allocations. |
| stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala | Optimize forward-wire resolution and out-connection accumulation to reduce allocations and improve lookup/iteration. |
| bench-jmh/src/main/scala/org/apache/pekko/stream/MaterializerWiringBenchmark.scala | Add JMH benchmark covering materialization throughput across representative graph topologies. |
| bench-jmh/src/main/scala/org/apache/pekko/stream/AsyncBoundaryThroughputBenchmark.scala | Add JMH benchmark for cross-island throughput; includes a latch sink stage. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
The
PhasedFusingActorMaterializeruses anArrayListwith O(n) linear scan to resolve forward wires during graph materialization, andGraphStageIslanduses a ScalaListforoutConnectionswhich allocates cons cells on every addition. Both create unnecessary overhead for complex stream graphs.Modification
forwardWires: ArrayList → sorted primitive arrays — Replace
ArrayList[ForwardWire]with sorted parallel arrays (Array[Int]keys +Array[ForwardWire]values). Forward wire lookup uses binary search (O(log n)), insert maintains sorted order via arraycopy, removal compacts via arraycopy. Zerojava.lang.Integerboxing, zero per-operation allocation.outConnections: Scala List → ArrayList — Replace
List[Connection](::=prepending) withjava.util.ArrayList[Connection](lazy-init). Eliminates cons cell allocation and enables indexed iteration inonIslandReady().emptyOutSlots shared constant — In
TraversalBuilder.atomic(), stages with no outlets now reuse a sharedArray.emptyIntArrayinstead of allocatingnew Array[Int](0)each time.New JMH benchmarks:
MaterializerWiringBenchmark— Measures materialization throughput for linear pipelines, broadcast-merge (gradual and immediate), and imported flow topologies at complexity levels 100/500/1000.AsyncBoundaryThroughputBenchmark— Measures cross-island element throughput with 1/3/10 async boundaries.Result
Forward wire resolution drops from O(n) to O(log n) per lookup using sorted primitive arrays with binary search. At higher graph complexity, this produces significant speedups:
JMH Benchmark: MaterializerWiringBenchmark.broadcast_merge_gradual
Mode: single-shot (ss), 10-20 iterations, 1 fork, JMH 1.37, JDK 21, Apple M-series.
At complexity ≥500 (where O(n²) linear scan cost becomes dominant), materialization is 2.1-2.4x faster.
Control benchmarks (no forward wires)
No regression on graphs without forward wires.
Tests
sbt "stream-tests / Test / testOnly *MaterializerSpec *PhasedFusingSpec *TraversalBuilderSpec"— 134 tests passedsbt "stream-tests / Test / testOnly *GraphInterpreterSpec *ActorGraphInterpreterSpec"— 22 tests passedsbt "bench-jmh / compile"— passedReferences
None - performance optimization identified through code review