Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream

import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

import scala.annotation.nowarn
import scala.concurrent.Await
import scala.concurrent.duration._

import org.openjdk.jmh.annotations._
import org.openjdk.jmh.infra.Blackhole

import org.apache.pekko
import pekko.NotUsed
import pekko.actor.ActorSystem
import pekko.stream.scaladsl._
import pekko.stream.stage._

import com.typesafe.config.ConfigFactory

object AsyncBoundaryThroughputBenchmark {
final val ElementCount = 100 * 1000
}

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class AsyncBoundaryThroughputBenchmark {

import AsyncBoundaryThroughputBenchmark._

val config = ConfigFactory.parseString(s"""
pekko.stream.materializer.sync-processing-limit = ${Int.MaxValue}
""")

implicit val system: ActorSystem = ActorSystem("AsyncBoundaryThroughputBenchmark", config)

@Param(Array("1", "3", "10"))
var asyncBoundaries = 0

var source: Source[Int, NotUsed] = _
var flow: Flow[Int, Int, NotUsed] = _

@Setup
def setup(): Unit = {
SystemMaterializer(system).materializer
source = Source(1 to ElementCount)
var f: Flow[Int, Int, NotUsed] = Flow[Int]
for (_ <- 1 to asyncBoundaries) {
f = f.map(identity).async
}
flow = f
}

@Benchmark
@OperationsPerInvocation(ElementCount)
def async_boundary_throughput(blackhole: Blackhole): CountDownLatch = {
FusedGraphsBenchmark.blackhole = blackhole
val latch = source
.via(flow)
.toMat(Sink.fromGraph(new JitSafeCompletionLatchInt))(Keep.right)
.run()
if (!latch.await(30, TimeUnit.SECONDS))
throw new RuntimeException("Latch timed out")
latch
}

@TearDown
def shutdown(): Unit = {
Await.result(system.terminate(), 5.seconds)
}
}

class JitSafeCompletionLatchInt extends GraphStageWithMaterializedValue[SinkShape[Int], CountDownLatch] {
val in = Inlet[Int]("JitSafeCompletionLatchInt.in")
override val shape = SinkShape(in)

@nowarn("cat=unused-params")
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, CountDownLatch) = {
val latch = new CountDownLatch(1)
val logic = new GraphStageLogic(shape) with InHandler {
private var count = 0

override def preStart(): Unit = pull(in)
override def onPush(): Unit = {
grab(in) // consume element
count += 1
pull(in)
}
Comment thread
Copilot marked this conversation as resolved.

override def onUpstreamFinish(): Unit = {
FusedGraphsBenchmark.blackhole.consume(count)
latch.countDown()
completeStage()
}
Comment thread
Copilot marked this conversation as resolved.

override def onUpstreamFailure(ex: Throwable): Unit = {
latch.countDown()
failStage(ex)
}
Comment thread
Copilot marked this conversation as resolved.

setHandler(in, this)
}
(logic, latch)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream

import java.util.concurrent.TimeUnit

import scala.concurrent.Await
import scala.concurrent.duration._

import org.openjdk.jmh.annotations._

import org.apache.pekko
import pekko.NotUsed
import pekko.actor.ActorSystem
import pekko.stream.scaladsl._

object MaterializerWiringBenchmark {

val linearFlowBuilder: Int => RunnableGraph[NotUsed] = numOfOperators => {
var source = Source.single(())
for (_ <- 1 to numOfOperators) {
source = source.map(identity)
}
source.to(Sink.ignore)
}

val broadcastMergeBuilder: Int => RunnableGraph[NotUsed] = numOfJunctions =>
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._

val broadcast = b.add(Broadcast[Unit](numOfJunctions))
var outlet = broadcast.out(0)
for (i <- 1 until numOfJunctions) {
val merge = b.add(Merge[Unit](2))
outlet ~> merge
broadcast.out(i) ~> merge
outlet = merge.out
}

Source.single(()) ~> broadcast
outlet ~> Sink.ignore
ClosedShape
})

val broadcastMergeImmediateBuilder: Int => RunnableGraph[NotUsed] = numOfJunctions =>
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._

val broadcast = b.add(Broadcast[Unit](numOfJunctions))
val merge = b.add(Merge[Unit](numOfJunctions))
for (_ <- 0 until numOfJunctions) {
broadcast ~> merge
}

Source.single(()) ~> broadcast
merge ~> Sink.ignore
ClosedShape
})

val importedFlowBuilder: Int => RunnableGraph[NotUsed] = numOfFlows =>
RunnableGraph.fromGraph(GraphDSL.createGraph(Source.single(())) { implicit b => source =>
import GraphDSL.Implicits._
val flow = Flow[Unit].map(identity)
var out: Outlet[Unit] = source.out
for (_ <- 0 until numOfFlows) {
val flowShape = b.add(flow)
out ~> flowShape
out = flowShape.outlet
}
out ~> Sink.ignore
ClosedShape
})
}

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class MaterializerWiringBenchmark {

import MaterializerWiringBenchmark._

implicit val system: ActorSystem = ActorSystem("MaterializerWiringBenchmark")

var linearFlow: RunnableGraph[NotUsed] = _
var broadcastMerge: RunnableGraph[NotUsed] = _
var broadcastMergeImmediate: RunnableGraph[NotUsed] = _
var importedFlow: RunnableGraph[NotUsed] = _

@Param(Array("100", "500", "1000"))
var complexity = 0

@Setup
def setup(): Unit = {
SystemMaterializer(system).materializer
linearFlow = linearFlowBuilder(complexity)
broadcastMerge = broadcastMergeBuilder(complexity)
broadcastMergeImmediate = broadcastMergeImmediateBuilder(complexity)
importedFlow = importedFlowBuilder(complexity)
}

@Benchmark
def linear(): NotUsed = linearFlow.run()

@Benchmark
def broadcast_merge_gradual(): NotUsed = broadcastMerge.run()

@Benchmark
def broadcast_merge_immediate(): NotUsed = broadcastMergeImmediate.run()

@Benchmark
def imported_flow(): NotUsed = importedFlow.run()

@TearDown
def shutdown(): Unit = {
Await.result(system.terminate(), 5.seconds)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,12 @@ private final case class SavedIslandData(

private var segments: java.util.ArrayList[SegmentInfo] = _
private var activePhases: java.util.ArrayList[PhaseIsland[Any]] = _
private var forwardWires: java.util.ArrayList[ForwardWire] = _
// Forward wires stored as sorted parallel arrays by key (global offset).
// Avoids java.lang.Integer boxing, per-operation allocation, and provides
// better cache locality than IntMap's Patricia trie nodes.
private var forwardWireKeys: Array[Int] = _
private var forwardWireValues: Array[ForwardWire] = _
private var forwardWireCount: Int = 0
private var islandStateStack: java.util.ArrayList[SavedIslandData] = _

private var currentPhase: PhaseIsland[Any] = defaultPhase.apply(settings, attributes, materializer, nextIslandName())
Expand Down Expand Up @@ -277,22 +282,26 @@ private final case class SavedIslandData(
currentPhase.assignPort(in, localInSlot, logic)

// Check if there was any forward wiring that has this offset/slot as its target
// First try to find such wiring
var forwardWire: ForwardWire = null
if ((forwardWires ne null) && !forwardWires.isEmpty) {
var i = 0
while (i < forwardWires.size()) {
forwardWire = forwardWires.get(i)
if (forwardWire.toGlobalOffset == currentGlobalOffset) {
if (Debug) println(s" there is a forward wire to this slot $forwardWire")
forwardWires.remove(i)
i = Int.MaxValue // Exit the loop
} else {
forwardWire = null // Didn't found it yet
i += 1
}
}
}
val forwardWire: ForwardWire =
if (forwardWireCount > 0) {
val idx = java.util.Arrays.binarySearch(forwardWireKeys, 0, forwardWireCount, currentGlobalOffset)
if (idx >= 0) {
val fw = forwardWireValues(idx)
forwardWireCount -= 1
if (idx < forwardWireCount) {
System.arraycopy(forwardWireKeys, idx + 1, forwardWireKeys, idx, forwardWireCount - idx)
System.arraycopy(forwardWireValues, idx + 1, forwardWireValues, idx, forwardWireCount - idx)
}
forwardWireValues(forwardWireCount) = null
if (forwardWireCount == 0) {
forwardWireKeys = null
forwardWireValues = null
}
fw
} else null
} else null
if ((forwardWire ne null) && Debug)
println(s" there is a forward wire to this slot $forwardWire")

// If there is a forward wiring we need to resolve it
if (forwardWire ne null) {
Expand Down Expand Up @@ -361,8 +370,9 @@ private final case class SavedIslandData(

// The forward wire tracking data structure is only allocated when needed. Many graphs have no forward wires
// even though it might have islands.
if (forwardWires eq null) {
forwardWires = new java.util.ArrayList[ForwardWire](8)
if (forwardWireKeys eq null) {
forwardWireKeys = new Array[Int](4)
forwardWireValues = new Array[ForwardWire](4)
}

val forwardWire = ForwardWire(
Expand All @@ -373,7 +383,24 @@ private final case class SavedIslandData(
currentPhase)

if (Debug) println(s" wiring is forward, recording $forwardWire")
forwardWires.add(forwardWire)
if (forwardWireCount == forwardWireKeys.length) {
val newLen = forwardWireKeys.length * 2
val newKeys = new Array[Int](newLen)
val newValues = new Array[ForwardWire](newLen)
System.arraycopy(forwardWireKeys, 0, newKeys, 0, forwardWireCount)
System.arraycopy(forwardWireValues, 0, newValues, 0, forwardWireCount)
forwardWireKeys = newKeys
forwardWireValues = newValues
}
val insertPos = java.util.Arrays.binarySearch(forwardWireKeys, 0, forwardWireCount, absoluteOffset)
val pos = if (insertPos < 0) -(insertPos + 1) else insertPos
if (pos < forwardWireCount) {
System.arraycopy(forwardWireKeys, pos, forwardWireKeys, pos + 1, forwardWireCount - pos)
System.arraycopy(forwardWireValues, pos, forwardWireValues, pos + 1, forwardWireCount - pos)
}
forwardWireKeys(pos) = absoluteOffset
forwardWireValues(pos) = forwardWire
forwardWireCount += 1
}

}
Expand Down Expand Up @@ -702,7 +729,7 @@ private[pekko] object GraphStageIsland {

private var connections = new Array[Connection](16)
private var maxConnections = 0
private var outConnections: List[Connection] = Nil
private var outConnections: java.util.ArrayList[Connection] = _
private var fullIslandName: OptionVal[String] = OptionVal.None

val shell = new GraphInterpreterShell(connections = null, logics = null, effectiveAttributes, materializer)
Expand Down Expand Up @@ -742,7 +769,8 @@ private[pekko] object GraphStageIsland {

def outConn(): Connection = {
val connection = new Connection(0, null, null, null, null)
outConnections ::= connection
if (outConnections eq null) outConnections = new util.ArrayList[Connection](4)
outConnections.add(connection)
connection
}

Expand Down Expand Up @@ -800,20 +828,22 @@ private[pekko] object GraphStageIsland {

override def onIslandReady(): Unit = {

val totalConnections = maxConnections + outConnections.size + 1
val outConnSize = if (outConnections ne null) outConnections.size() else 0
val totalConnections = maxConnections + outConnSize + 1
val finalConnections = java.util.Arrays.copyOf(connections, totalConnections)

var i = maxConnections + 1
var outConns = outConnections
var j = 0
while (i < totalConnections) {
val conn = outConns.head
outConns = outConns.tail
val conn = outConnections.get(j)
j += 1
if (conn.inHandler eq null) failOnMissingHandler(conn.inOwner)
else if (conn.outHandler eq null) failOnMissingHandler(conn.outOwner)
finalConnections(i) = conn
conn.id = i
i += 1
}
outConnections = null

shell.connections = finalConnections
shell.logics = logics.toArray(GraphStageIsland.emptyLogicArray)
Expand Down
Loading