diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/InterpreterMapChainBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/InterpreterMapChainBenchmark.scala new file mode 100644 index 00000000000..39bf031aec8 --- /dev/null +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/InterpreterMapChainBenchmark.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +package org.apache.pekko.stream + +import java.util.concurrent.TimeUnit + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import org.openjdk.jmh.annotations._ + +import org.apache.pekko +import pekko.stream.impl.fusing.GraphInterpreterSpecKit +import pekko.stream.stage._ +import InterpreterBenchmark.{ GraphDataSink, GraphDataSource } + +/** + * Companion to [[InterpreterBenchmark]]. That benchmark drives a chain of pure identity stages, so + * the only work per element is the interpreter's own dispatch — no user code runs. Real pipelines are + * dominated by `map`/`filter`-style stages whose handlers invoke a user function; in the post-#2986 + * flamegraph that "user code" frame (`push(out, f(grab(in)))`) is ~32% of inclusive samples. + * + * This benchmark fills that gap: each stage applies a user function, so the hot path exercises the + * megamorphic `InHandler.onPush` / `OutHandler.onPull` dispatch plus a real user-code frame. It is the + * representative surface on which any future interpreter optimisation must be measured — the identity + * bench alone cannot tell whether a change helps realistic workloads or only the degenerate case. 
+ */ +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +class InterpreterMapChainBenchmark extends GraphInterpreterSpecKit { + import InterpreterMapChainBenchmark._ + + // manual, and not via @Param, because we want @OperationsPerInvocation on our tests + final val data100k: Vector[Int] = (1 to 100000).toVector + + @Param(Array("1", "5", "10")) + var numberOfStages: Int = 0 + + @TearDown(Level.Trial) + def shutdown(): Unit = { + Await.result(system.terminate(), 10.seconds) + } + + /** + * Chain of `map` stages, each applying `_ + 1`. Exercises the chase loop together with virtual + * handler dispatch and a user-code frame on every element — the realistic counterpart to + * [[InterpreterBenchmark.graph_interpreter_100k_elements]]. + */ + @Benchmark + @OperationsPerInvocation(100000) + def map_chain_100k_elements(): Unit = { + new TestSetup { + val maps = Vector.fill(numberOfStages)(new MapStage[Int](_ + 1)) + val source = new GraphDataSource("source", data100k) + val sink = new GraphDataSink[Int]("sink", data100k.size) + + val b = builder(maps: _*).connect(source, maps.head.in).connect(maps.last.out, sink) + + // FIXME: This should not be here, this is pure setup overhead + for (i <- 0 until maps.size - 1) { + b.connect(maps(i).out, maps(i + 1).in) + } + + b.init() + sink.requestOne() + interpreter.execute(Int.MaxValue) + } + } +} + +object InterpreterMapChainBenchmark { + + /** + * Per-instance map stage applying a user function. Per-instance (not a shared singleton) for the + * same reason as [[InterpreterBenchmark.IdentityStage]]: a shared Inlet/Outlet shape would collapse + * the chain and mis-wire the assembly (`Cannot pull port twice`). 
+ */ + final class MapStage[T](f: T => T) extends GraphStage[FlowShape[T, T]] { + val in = Inlet[T]("Map.in") + val out = Outlet[T]("Map.out") + override val shape: FlowShape[T, T] = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + override def onPush(): Unit = push(out, f(grab(in))) + override def onPull(): Unit = pull(in) + setHandler(in, this) + setHandler(out, this) + } + } +} diff --git a/remote/src/main/resources/reference.conf b/remote/src/main/resources/reference.conf index 2741292ad9e..d5a3e1118cf 100644 --- a/remote/src/main/resources/reference.conf +++ b/remote/src/main/resources/reference.conf @@ -1046,6 +1046,12 @@ pekko { # compression of common strings in remoting messages, like actor destinations, serializers etc compression { + # Frequency sketch implementation used for heavy hitter detection. + # Options: + # - "count-min-sketch": Original CountMinSketch implementation (legacy, ~128KB per connection) + # - "fast-frequency-sketch": FastFrequencySketch with TinyLFU aging (default, ~4KB per connection) + frequency-sketch-implementation = "fast-frequency-sketch" + actor-refs { # Max number of compressed actor-refs # Note that compression tables are "rolling" (i.e. 
a new table replaces the old diff --git a/remote/src/main/scala/org/apache/pekko/remote/artery/ArterySettings.scala b/remote/src/main/scala/org/apache/pekko/remote/artery/ArterySettings.scala index cb3ef30a9db..870f4057813 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/artery/ArterySettings.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/artery/ArterySettings.scala @@ -278,6 +278,12 @@ private[pekko] object ArterySettings { private[pekko] final val Enabled = ActorRefs.Enabled || Manifests.Enabled + val FrequencySketchImplementation: String = toRootLowerCase(getString("frequency-sketch-implementation")) match { + case "count-min-sketch" => "count-min-sketch" + case "fast-frequency-sketch" => "fast-frequency-sketch" + case other => throw new IllegalArgumentException(s"Unknown frequency-sketch-implementation: $other") + } + object ActorRefs { val config: Config = getConfig("actor-refs") import config._ diff --git a/remote/src/main/scala/org/apache/pekko/remote/artery/compress/InboundCompressions.scala b/remote/src/main/scala/org/apache/pekko/remote/artery/compress/InboundCompressions.scala index 4238af24906..6bfe5b84d02 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/artery/compress/InboundCompressions.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/artery/compress/InboundCompressions.scala @@ -28,8 +28,54 @@ import pekko.actor.InternalActorRef import pekko.event.Logging import pekko.event.LoggingAdapter import pekko.remote.artery._ +import pekko.util.FastFrequencySketch import pekko.util.OptionVal +/** + * INTERNAL API + * + * Abstraction over frequency sketch implementations used for heavy hitter detection in Artery compression. + */ +private[remote] trait FrequencySketch[T] { + def increment(value: T): Unit + def frequency(value: T): Long +} + +/** + * INTERNAL API + * + * Wrapper around CountMinSketch that implements the FrequencySketch interface. 
+ */ +private[remote] final class CountMinSketchFrequencySketch[T](depth: Int, width: Int, seed: Int) + extends FrequencySketch[T] { + private val cms = new CountMinSketch(depth, width, seed) + + override def increment(value: T): Unit = { + cms.addObjectAndEstimateCount(value, 1) + } + + override def frequency(value: T): Long = { + cms.estimateCount(value) + } +} + +/** + * INTERNAL API + * + * Wrapper around FastFrequencySketch that implements the FrequencySketch interface. + */ +private[remote] final class FastFrequencySketchWrapper[T](capacity: Int) extends FrequencySketch[T] { + private val sketch = FastFrequencySketch[T](capacity) + + override def increment(value: T): Unit = { + sketch.increment(value) + } + + override def frequency(value: T): Long = { + sketch.frequency(value).toLong + } +} + /** * INTERNAL API * Decompress and cause compression advertisements. @@ -352,7 +398,14 @@ private[remote] abstract class InboundCompression[T >: Null]( private[this] var resendCount = 0 private[this] val maxResendCount = 3 - private[this] val cms = new CountMinSketch(16, 1024, System.currentTimeMillis().toInt) + private[this] val frequencySketch: FrequencySketch[T] = settings.FrequencySketchImplementation match { + case "count-min-sketch" => + new CountMinSketchFrequencySketch[T](depth = 16, width = 1024, seed = System.currentTimeMillis().toInt) + case "fast-frequency-sketch" => + new FastFrequencySketchWrapper[T](capacity = settings.ActorRefs.Max) + case other => + throw new IllegalStateException(s"Unknown frequency-sketch-implementation: $other") + } log.debug("Initializing {} for originUid [{}]", Logging.simpleName(getClass), originUid) @@ -442,8 +495,13 @@ private[remote] abstract class InboundCompression[T >: Null]( * Empty keys are omitted. 
*/ def increment(@nowarn("msg=never used") remoteAddress: Address, value: T, n: Long): Unit = { - val count = cms.addObjectAndEstimateCount(value, n) - addAndCheckIfheavyHitterDetected(value, count) + var i = 0 + while (i < n) { + frequencySketch.increment(value) + i += 1 + } + val frequency = frequencySketch.frequency(value) + addAndCheckIfheavyHitterDetected(value, frequency) alive = true } @@ -537,7 +595,7 @@ private[remote] abstract class InboundCompression[T >: Null]( } override def toString = - s"""${Logging.simpleName(getClass)}(countMinSketch: $cms, heavyHitters: $heavyHitters)""" + s"""${Logging.simpleName(getClass)}(frequencySketch: $frequencySketch, heavyHitters: $heavyHitters)""" } diff --git a/remote/src/main/scala/org/apache/pekko/remote/artery/compress/TopHeavyHitters.scala b/remote/src/main/scala/org/apache/pekko/remote/artery/compress/TopHeavyHitters.scala index be6e27f5327..192880c4af0 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/artery/compress/TopHeavyHitters.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/artery/compress/TopHeavyHitters.scala @@ -155,45 +155,46 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl * @return `true` if the added item has become a heavy hitter. */ // TODO possibly can be optimised further? (there is a benchmark) - def update(item: T, count: Long): Boolean = - isHeavy(count) && { // O(1) terminate execution ASAP if known to not be a heavy hitter anyway - val hashCode = new HashCodeVal(item.hashCode()) // avoid re-calculating hashCode - val startIndex = hashCode.get & mask - - // We first try to find the slot where an element with an equal hash value is. This is a possible candidate - // for an actual matching entry (unless it is an entry with a colliding hash value). - // worst case O(n), common O(1 + alpha), can't really bin search here since indexes are kept in synch with other arrays hmm... 
- val candidateIndex = findHashIdx(startIndex, hashCode) - - if (candidateIndex == -1) { - // No matching hash value entry is found, so we are sure we don't have this entry yet. - insertKnownNewHeavy(hashCode, item, count) // O(log n + alpha) - true + def update(item: T, count: Long): Boolean = { + val hashCode = new HashCodeVal(item.hashCode()) // avoid re-calculating hashCode + val startIndex = hashCode.get & mask + + // We first try to find the slot where an element with an equal hash value is. This is a possible candidate + // for an actual matching entry (unless it is an entry with a colliding hash value). + // worst case O(n), common O(1 + alpha), can't really bin search here since indexes are kept in synch with other arrays hmm... + val candidateIndex = findHashIdx(startIndex, hashCode) + + if (candidateIndex == -1) { + // No matching hash value entry is found, so we are sure we don't have this entry yet. + // Only insert new entries if the weight qualifies as heavy. + isHeavy(count) && { insertKnownNewHeavy(hashCode, item, count); true } + } else { + // We now found, relatively cheaply, the first index where our searched entry *might* be (hashes are equal). + // This is not guaranteed to be the one we are searching for, yet (hash values may collide). + // From this position we can invoke the more costly search which checks actual object equalities. + // With this two step search we avoid equality checks completely for many non-colliding entries. + val actualIdx = findItemIdx(candidateIndex, hashCode, item) + + // usually O(1), worst case O(n) if we need to scan due to hash conflicts + if (actualIdx == -1) { + // So we don't have this entry so far (only a colliding one, it was a false positive from findHashIdx). + // Only insert new entries if the weight qualifies as heavy. 
+ isHeavy(count) && { insertKnownNewHeavy(hashCode, item, count); true } } else { - // We now found, relatively cheaply, the first index where our searched entry *might* be (hashes are equal). - // This is not guaranteed to be the one we are searching for, yet (hash values may collide). - // From this position we can invoke the more costly search which checks actual object equalities. - // With this two step search we avoid equality checks completely for many non-colliding entries. - val actualIdx = findItemIdx(candidateIndex, hashCode, item) - - // usually O(1), worst case O(n) if we need to scan due to hash conflicts - if (actualIdx == -1) { - // So we don't have this entry so far (only a colliding one, it was a false positive from findHashIdx). - insertKnownNewHeavy(hashCode, item, count) // O(1 + log n), we simply replace the current lowest heavy hitter - true - } else { - // The entry exists, let's update it. - updateExistingHeavyHitter(actualIdx, count) - // not a "new" heavy hitter, since we only replaced it (so it was signaled as new once before) - false - } + // The entry exists, let's update it. + // Existing heavy hitters always get their weight updated (even if decreased), + // which is needed when using a frequency sketch with periodic reset (aging). + updateExistingHeavyHitter(actualIdx, count) + // not a "new" heavy hitter, since we only replaced it (so it was signaled as new once before) + false } - } + } /** * Checks the lowest weight entry in this structure and returns true if the given count is larger than that. In * other words this checks if a new entry can be added as it is larger than the known least weight. + * Note: this only gates insertion of new entries; existing entries always get their weight updated. 
*/ private def isHeavy(count: Long): Boolean = count > lowestHitterWeight @@ -233,15 +234,18 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl /** * Replace existing heavy hitter – give it a new `count` value. This will also restore the heap property, so this * might make a previously lowest hitter no longer be one. + * + * Supports both weight increase and decrease. When using a frequency sketch with periodic reset (aging), + * the estimated frequency can decrease, so the weight must be allowed to go down as well. */ private def updateExistingHeavyHitter(foundHashIndex: Int, count: Long): Unit = { - if (weights(foundHashIndex) > count) - throw new IllegalArgumentException( - s"Weights can be only incremented or kept the same, not decremented. " + - s"Previous weight was [${weights(foundHashIndex)}], attempted to modify it to [$count].") + val oldCount = weights(foundHashIndex) weights(foundHashIndex) = count // we don't need to change `hashCode`, `heapIndex` or `item`, those remain the same - // Position in the heap might have changed as count was incremented - fixHeap(heapIndex(foundHashIndex)) + // Position in the heap might have changed as count was updated + if (count > oldCount) + fixHeap(heapIndex(foundHashIndex)) // weight increased: push down towards children + else if (count < oldCount) + fixHeapUp(heapIndex(foundHashIndex)) // weight decreased: bubble up towards parent } /** @@ -311,6 +315,23 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl } } + /** + * Call this if the weight of an entry at heap node `index` was decremented. Since the weight decreased, + * the element may now be smaller than its parent, so we need to restore the heap property "upwards". 
+ */ + @tailrec + private def fixHeapUp(index: Int): Unit = { + if (index > 0) { + val parentIndex = (index - 1) / 2 + val currentWeight: Long = weights(heap(index)) + val parentWeight: Long = weights(heap(parentIndex)) + if (currentWeight < parentWeight) { + swapHeapNode(index, parentIndex) + fixHeapUp(parentIndex) + } + } + } + /** * Swaps two elements in `heap` array and maintain correct index in `heapIndex`. * diff --git a/remote/src/test/scala/org/apache/pekko/remote/artery/compress/HeavyHittersSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/artery/compress/HeavyHittersSpec.scala index 5c61ebe9e4b..c1128e7f41b 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/artery/compress/HeavyHittersSpec.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/artery/compress/HeavyHittersSpec.scala @@ -170,6 +170,40 @@ class HeavyHittersSpec extends AnyWordSpecLike with Matchers { hitters.lowestHitterWeight should ===(3) } + "support weight decrease (e.g. from frequency sketch reset)" in { + val hitters = new TopHeavyHitters[String](2) + hitters.update("A", 10) should ===(true) + hitters.update("B", 20) should ===(true) + hitters.lowestHitterWeight should ===(10) + + // Simulate frequency sketch reset: weight decreases + hitters.update("A", 5) should ===(false) + // A is now the lowest hitter + hitters.lowestHitterWeight should ===(5) + + // A new item with weight > 5 should replace A + hitters.update("C", 8) should ===(true) + hitters.iterator.toSet should ===(Set("B", "C")) + } + + "restore heap property upward when weight decreases" in { + val hitters = new TopHeavyHitters[String](4) + hitters.update("A", 10) should ===(true) + hitters.update("B", 20) should ===(true) + hitters.update("C", 30) should ===(true) + hitters.update("D", 40) should ===(true) + hitters.lowestHitterWeight should ===(10) + + // Decrease D's weight significantly - it should bubble up to become the lowest + hitters.update("D", 1) should ===(false) + hitters.lowestHitterWeight 
should ===(1) + hitters.iterator.toSet should ===(Set("A", "B", "C", "D")) + + // A new item should replace D (the new lowest) + hitters.update("E", 15) should ===(true) + hitters.iterator.toSet should ===(Set("A", "B", "C", "E")) + } + "be disabled with max=0" in { val hitters = new TopHeavyHitters[String](0) hitters.update("A", 10) shouldBe true diff --git a/task_plan.md b/task_plan.md new file mode 100644 index 00000000000..68193a142b1 --- /dev/null +++ b/task_plan.md @@ -0,0 +1,62 @@ +# Task Plan: Fix sbt load warnings and create PR + +## Goal +Fix all sbt load warnings in the Pekko project and create a PR to the main repository. + +## Acceptance Criteria +- [ ] All sbt load warnings are identified +- [ ] All warnings are fixed +- [ ] sbt loads without any warnings +- [ ] PR is created with proper description + +## Phases + +### Phase 1: Identify Warnings +- [x] Run sbt load to capture all warnings +- [x] Document each warning with file location and description +- [x] Categorize warnings by type + +### Phase 2: Fix Warnings +- [x] Fix each warning systematically +- [x] Verify fixes don't break functionality +- [x] Run sbt load again to confirm warnings are gone + +### Phase 3: Create PR +- [x] Create new branch +- [x] Commit changes with proper message +- [x] Push branch and create PR +- [x] Follow PR template from AGENTS.md + +## Current Status +- Phase: 3 (Create PR) - Completed +- Started: 2026-06-01 +- Last Updated: 2026-06-01 +- PR Created: https://github.com/apache/pekko/pull/3029 +- Branch: fix/sbt-load-warnings-clean + +## Warnings Identified +Total: 165 unused keys + +### Warning Categories: +1. `projectInfoVersion` - defined in PekkoBuild.scala:48 and :118 +2. `Pr-validation / fork` - defined in ValidatePullRequest.scala:70 +3. `logManager` - defined in AddLogTimestamps.scala:32 +4. `test / javacOptions` - defined in PekkoBuild.scala:305 +5. 
`Javaunidoc / unidoc / unidocProjectFilter` - defined in Doc.scala:153 + +### Files to Examine: +- /Users/hepin/IdeaProjects/pekko/project/PekkoBuild.scala +- /Users/hepin/IdeaProjects/pekko/project/ValidatePullRequest.scala +- /Users/hepin/IdeaProjects/pekko/project/AddLogTimestamps.scala +- /Users/hepin/IdeaProjects/pekko/project/Doc.scala + +### Analysis: +These warnings are from sbt's `lintUnused` check, which reports settings that are defined but not used by other settings/tasks. The flagged settings and their purposes are: + +1. `projectInfoVersion` - Used for linking to API docs (overwrites `project-info.version`) +2. `Pr-validation / fork` - Used to make PR validation fork the same way regular test runs do +3. `logManager` - Used to add timestamps to log output when enabled +4. `test / javacOptions` - Used to disable doclint for tests +5. `Javaunidoc / unidoc / unidocProjectFilter` - Used to filter projects for unidoc generation + +These settings are actually useful, but sbt's lint check does not recognize their usage. The recommended solution is to add them to `Global / excludeLintKeys`. \ No newline at end of file