diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/AsyncBoundaryThroughputBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/AsyncBoundaryThroughputBenchmark.scala new file mode 100644 index 0000000000..d508d074c3 --- /dev/null +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/AsyncBoundaryThroughputBenchmark.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +import scala.annotation.nowarn +import scala.concurrent.Await +import scala.concurrent.duration._ + +import org.openjdk.jmh.annotations._ +import org.openjdk.jmh.infra.Blackhole + +import org.apache.pekko +import pekko.NotUsed +import pekko.actor.ActorSystem +import pekko.stream.scaladsl._ +import pekko.stream.stage._ + +import com.typesafe.config.ConfigFactory + +object AsyncBoundaryThroughputBenchmark { + final val ElementCount = 100 * 1000 +} + +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.SECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +class AsyncBoundaryThroughputBenchmark { + + import AsyncBoundaryThroughputBenchmark._ + + val config = ConfigFactory.parseString(s""" + pekko.stream.materializer.sync-processing-limit = ${Int.MaxValue} + """) + + implicit val system: ActorSystem = ActorSystem("AsyncBoundaryThroughputBenchmark", config) + + @Param(Array("1", "3", "10")) + var asyncBoundaries = 0 + + var source: Source[Int, NotUsed] = _ + var flow: Flow[Int, Int, NotUsed] = _ + + @Setup + def setup(): Unit = { + SystemMaterializer(system).materializer + source = Source(1 to ElementCount) + var f: Flow[Int, Int, NotUsed] = Flow[Int] + for (_ <- 1 to asyncBoundaries) { + f = f.map(identity).async + } + flow = f + } + + @Benchmark + @OperationsPerInvocation(ElementCount) + def async_boundary_throughput(blackhole: Blackhole): CountDownLatch = { + FusedGraphsBenchmark.blackhole = blackhole + val latch = source + .via(flow) + .toMat(Sink.fromGraph(new JitSafeCompletionLatchInt))(Keep.right) + .run() + if (!latch.await(30, TimeUnit.SECONDS)) + throw new RuntimeException("Latch timed out") + latch + } + + @TearDown + def shutdown(): Unit = { + Await.result(system.terminate(), 5.seconds) + } +} + +class JitSafeCompletionLatchInt extends GraphStageWithMaterializedValue[SinkShape[Int], CountDownLatch] { + val in = Inlet[Int]("JitSafeCompletionLatchInt.in") + override val shape = SinkShape(in) + + @nowarn("cat=unused-params") + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, CountDownLatch) = { + val latch = new CountDownLatch(1) + val logic = new GraphStageLogic(shape) with InHandler { + private var count = 0 + + override def preStart(): Unit = pull(in) + override def onPush(): Unit = { + grab(in) // consume element + count += 1 + pull(in) + } + + override def onUpstreamFinish(): Unit = { + FusedGraphsBenchmark.blackhole.consume(count) + latch.countDown() + completeStage() + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + latch.countDown() + failStage(ex) + } + + setHandler(in, this) + } + (logic, latch) + } +} diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/MaterializerWiringBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/MaterializerWiringBenchmark.scala new file mode 100644 index 0000000000..00e7d70c8f --- /dev/null +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/MaterializerWiringBenchmark.scala @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream + +import java.util.concurrent.TimeUnit + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import org.openjdk.jmh.annotations._ + +import org.apache.pekko +import pekko.NotUsed +import pekko.actor.ActorSystem +import pekko.stream.scaladsl._ + +object MaterializerWiringBenchmark { + + val linearFlowBuilder: Int => RunnableGraph[NotUsed] = numOfOperators => { + var source = Source.single(()) + for (_ <- 1 to numOfOperators) { + source = source.map(identity) + } + source.to(Sink.ignore) + } + + val broadcastMergeBuilder: Int => RunnableGraph[NotUsed] = numOfJunctions => + RunnableGraph.fromGraph(GraphDSL.create() { implicit b => + import GraphDSL.Implicits._ + + val broadcast = b.add(Broadcast[Unit](numOfJunctions)) + var outlet = broadcast.out(0) + for (i <- 1 until numOfJunctions) { + val merge = b.add(Merge[Unit](2)) + outlet ~> merge + broadcast.out(i) ~> merge + outlet = merge.out + } + + Source.single(()) ~> broadcast + outlet ~> Sink.ignore + ClosedShape + }) + + val broadcastMergeImmediateBuilder: Int => RunnableGraph[NotUsed] = numOfJunctions => + RunnableGraph.fromGraph(GraphDSL.create() { implicit b => + import GraphDSL.Implicits._ + + val broadcast = b.add(Broadcast[Unit](numOfJunctions)) + val merge = b.add(Merge[Unit](numOfJunctions)) + for (_ <- 0 until numOfJunctions) { + broadcast ~> merge + } + + Source.single(()) ~> broadcast + merge ~> Sink.ignore + ClosedShape + }) + + val importedFlowBuilder: Int => RunnableGraph[NotUsed] = numOfFlows => + RunnableGraph.fromGraph(GraphDSL.createGraph(Source.single(())) { implicit b => source => + import GraphDSL.Implicits._ + val flow = Flow[Unit].map(identity) + var out: Outlet[Unit] = source.out + for (_ <- 0 until numOfFlows) { + val flowShape = b.add(flow) + out ~> flowShape + out = flowShape.outlet + } + out ~> Sink.ignore + ClosedShape + }) +} + +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.SECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +class MaterializerWiringBenchmark { + + import MaterializerWiringBenchmark._ + + implicit val system: ActorSystem = ActorSystem("MaterializerWiringBenchmark") + + var linearFlow: RunnableGraph[NotUsed] = _ + var broadcastMerge: RunnableGraph[NotUsed] = _ + var broadcastMergeImmediate: RunnableGraph[NotUsed] = _ + var importedFlow: RunnableGraph[NotUsed] = _ + + @Param(Array("100", "500", "1000")) + var complexity = 0 + + @Setup + def setup(): Unit = { + SystemMaterializer(system).materializer + linearFlow = linearFlowBuilder(complexity) + broadcastMerge = broadcastMergeBuilder(complexity) + broadcastMergeImmediate = broadcastMergeImmediateBuilder(complexity) + importedFlow = importedFlowBuilder(complexity) + } + + @Benchmark + def linear(): NotUsed = linearFlow.run() + + @Benchmark + def broadcast_merge_gradual(): NotUsed = broadcastMerge.run() + + @Benchmark + def broadcast_merge_immediate(): NotUsed = broadcastMergeImmediate.run() + + @Benchmark + def imported_flow(): NotUsed = importedFlow.run() + + @TearDown + def shutdown(): Unit = { + Await.result(system.terminate(), 5.seconds) + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala index 29b6b740c3..0a3861984e 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala @@ -198,7 +198,12 @@ private final case class SavedIslandData( private var segments: java.util.ArrayList[SegmentInfo] = _ private var activePhases: java.util.ArrayList[PhaseIsland[Any]] = _ - private var forwardWires: java.util.ArrayList[ForwardWire] = _ + // Forward wires stored as sorted parallel arrays by key (global offset). + // Avoids java.lang.Integer boxing, per-operation allocation, and provides + // better cache locality than IntMap's Patricia trie nodes. + private var forwardWireKeys: Array[Int] = _ + private var forwardWireValues: Array[ForwardWire] = _ + private var forwardWireCount: Int = 0 private var islandStateStack: java.util.ArrayList[SavedIslandData] = _ private var currentPhase: PhaseIsland[Any] = defaultPhase.apply(settings, attributes, materializer, nextIslandName()) @@ -277,22 +282,26 @@ private final case class SavedIslandData( currentPhase.assignPort(in, localInSlot, logic) // Check if there was any forward wiring that has this offset/slot as its target - // First try to find such wiring - var forwardWire: ForwardWire = null - if ((forwardWires ne null) && !forwardWires.isEmpty) { - var i = 0 - while (i < forwardWires.size()) { - forwardWire = forwardWires.get(i) - if (forwardWire.toGlobalOffset == currentGlobalOffset) { - if (Debug) println(s" there is a forward wire to this slot $forwardWire") - forwardWires.remove(i) - i = Int.MaxValue // Exit the loop - } else { - forwardWire = null // Didn't found it yet - i += 1 - } - } - } + val forwardWire: ForwardWire = + if (forwardWireCount > 0) { + val idx = java.util.Arrays.binarySearch(forwardWireKeys, 0, forwardWireCount, currentGlobalOffset) + if (idx >= 0) { + val fw = forwardWireValues(idx) + forwardWireCount -= 1 + if (idx < forwardWireCount) { + System.arraycopy(forwardWireKeys, idx + 1, forwardWireKeys, idx, forwardWireCount - idx) + System.arraycopy(forwardWireValues, idx + 1, forwardWireValues, idx, forwardWireCount - idx) + } + forwardWireValues(forwardWireCount) = null + if (forwardWireCount == 0) { + forwardWireKeys = null + forwardWireValues = null + } + fw + } else null + } else null + if ((forwardWire ne null) && Debug) + println(s" there is a forward wire to this slot $forwardWire") // If there is a forward wiring we need to resolve it if (forwardWire ne null) { @@ -361,8 +370,9 @@ private final case class SavedIslandData( // The forward wire tracking data structure is only allocated when needed. Many graphs have no forward wires // even though it might have islands. - if (forwardWires eq null) { - forwardWires = new java.util.ArrayList[ForwardWire](8) + if (forwardWireKeys eq null) { + forwardWireKeys = new Array[Int](4) + forwardWireValues = new Array[ForwardWire](4) } val forwardWire = ForwardWire( @@ -373,7 +383,24 @@ private final case class SavedIslandData( currentPhase) if (Debug) println(s" wiring is forward, recording $forwardWire") - forwardWires.add(forwardWire) + if (forwardWireCount == forwardWireKeys.length) { + val newLen = forwardWireKeys.length * 2 + val newKeys = new Array[Int](newLen) + val newValues = new Array[ForwardWire](newLen) + System.arraycopy(forwardWireKeys, 0, newKeys, 0, forwardWireCount) + System.arraycopy(forwardWireValues, 0, newValues, 0, forwardWireCount) + forwardWireKeys = newKeys + forwardWireValues = newValues + } + val insertPos = java.util.Arrays.binarySearch(forwardWireKeys, 0, forwardWireCount, absoluteOffset) + val pos = if (insertPos < 0) -(insertPos + 1) else insertPos + if (pos < forwardWireCount) { + System.arraycopy(forwardWireKeys, pos, forwardWireKeys, pos + 1, forwardWireCount - pos) + System.arraycopy(forwardWireValues, pos, forwardWireValues, pos + 1, forwardWireCount - pos) + } + forwardWireKeys(pos) = absoluteOffset + forwardWireValues(pos) = forwardWire + forwardWireCount += 1 } } @@ -702,7 +729,7 @@ private[pekko] object GraphStageIsland { private var connections = new Array[Connection](16) private var maxConnections = 0 - private var outConnections: List[Connection] = Nil + private var outConnections: java.util.ArrayList[Connection] = _ private var fullIslandName: OptionVal[String] = OptionVal.None val shell = new GraphInterpreterShell(connections = null, logics = null, effectiveAttributes, materializer) @@ -742,7 +769,8 @@ private[pekko] object GraphStageIsland { def outConn(): Connection = { val connection = new Connection(0, null, null, null, null) - outConnections ::= connection + if (outConnections eq null) outConnections = new util.ArrayList[Connection](4) + outConnections.add(connection) connection } @@ -800,20 +828,22 @@ private[pekko] object GraphStageIsland { override def onIslandReady(): Unit = { - val totalConnections = maxConnections + outConnections.size + 1 + val outConnSize = if (outConnections ne null) outConnections.size() else 0 + val totalConnections = maxConnections + outConnSize + 1 val finalConnections = java.util.Arrays.copyOf(connections, totalConnections) var i = maxConnections + 1 - var outConns = outConnections + var j = 0 while (i < totalConnections) { - val conn = outConns.head - outConns = outConns.tail + val conn = outConnections.get(j) + j += 1 if (conn.inHandler eq null) failOnMissingHandler(conn.inOwner) else if (conn.outHandler eq null) failOnMissingHandler(conn.outOwner) finalConnections(i) = conn conn.id = i i += 1 } + outConnections = null shell.connections = finalConnections shell.logics = logics.toArray(GraphStageIsland.emptyLogicArray) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala index 980d56c021..d4a7da2074 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala @@ -213,6 +213,8 @@ import pekko.util.OptionVal private val cachedEmptyCompleted = CompletedTraversalBuilder(PushNotUsed, 0, Map.empty, Attributes.none) + private val emptyOutSlots: Array[Int] = Array.emptyIntArray + /** * INTERNAL API * @@ -269,7 +271,7 @@ import pekko.util.OptionVal val builder = if (module.shape.outlets.isEmpty) { val b = CompletedTraversalBuilder( - traversalSoFar = MaterializeAtomic(module, new Array[Int](module.shape.outlets.size)), + traversalSoFar = MaterializeAtomic(module, emptyOutSlots), inSlots = module.shape.inlets.size, inToOffset = module.shape.inlets.map(in => in -> in.id).toMap, Attributes.none)