From 135043a79e2bc8a384fae2daa1e8b5011a20c3cf Mon Sep 17 00:00:00 2001 From: Domantas Petrauskas Date: Fri, 12 Jun 2026 17:44:52 +0300 Subject: [PATCH 1/2] Runtime plugin configuration for DurableState Motivation: Like EventSourcedBehavior, DurableStateBehavior should allow configuring its persistence plugin at runtime so the same plugin class can back multiple, isolated stores (e.g. the pekko-persistence-r2dbc plugin). Modification: Add DurableStateBehavior.withDurableStateStorePluginConfig (Scala) and an overridable durableStateStorePluginConfig (Java), threaded through DurableStateBehaviorImpl, DurableStateSettings, and BehaviorSetup. Add Config-accepting overloads of DurableStateStoreRegistry.durableStateStoreFor and getDurableStateStoreFor that merge the runtime config ahead of the system config. The recovery-timeout is now resolved from the merged config so a plugin defined only at runtime resolves correctly. Result: DurableStateBehavior can be configured with a runtime plugin Config, and the store registry resolves the plugin using it. Tests: - sbt persistence/mimaReportBinaryIssues persistence-typed/mimaReportBinaryIssues - success - sbt persistence-typed-tests/testOnly (all state.scaladsl and state.javadsl specs) - All tests passed References: Fixes #1798 --- .../RuntimeDurableStateStoreSpec.scala | 92 +++++++++++++++++++ .../durablestate-runtime-config.excludes | 19 ++++ .../typed/state/internal/BehaviorSetup.scala | 6 +- .../internal/DurableStateBehaviorImpl.scala | 12 ++- .../state/internal/DurableStateSettings.scala | 34 +++++-- .../state/javadsl/DurableStateBehavior.scala | 18 +++- .../state/scaladsl/DurableStateBehavior.scala | 10 ++ .../state/DurableStateStoreRegistry.scala | 33 ++++++- 8 files changed, 213 insertions(+), 11 deletions(-) create mode 100644 persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/RuntimeDurableStateStoreSpec.scala create mode 100644 persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/durablestate-runtime-config.excludes diff --git a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/RuntimeDurableStateStoreSpec.scala b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/RuntimeDurableStateStoreSpec.scala new file mode 100644 index 00000000000..f3a08b78abb --- /dev/null +++ b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/RuntimeDurableStateStoreSpec.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +package org.apache.pekko.persistence.typed.state.scaladsl + +import org.apache.pekko +import pekko.Done +import pekko.actor.testkit.typed.scaladsl.LogCapturing +import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import pekko.actor.typed.ActorRef +import pekko.actor.typed.Behavior +import pekko.persistence.state.DurableStateStoreRegistry +import pekko.persistence.state.scaladsl.DurableStateUpdateStore +import pekko.persistence.testkit.state.PersistenceTestKitDurableStateStoreProvider +import pekko.persistence.typed.PersistenceId + +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.wordspec.AnyWordSpecLike + +object RuntimeDurableStateStoreSpec { + + private object Actor { + sealed trait Command + final case class Save(text: String, replyTo: ActorRef[Done]) extends Command + final case class ShowMeWhatYouGot(replyTo: ActorRef[String]) extends Command + case object Stop extends Command + + def apply(persistenceId: String, store: String): Behavior[Command] = + DurableStateBehavior[Command, String]( + PersistenceId.ofUniqueId(persistenceId), + "", + (state, cmd) => + cmd match { + case Save(text, replyTo) => + Effect.persist(Seq(state, text).filter(_.nonEmpty).mkString("|")).thenRun(_ => replyTo ! Done) + case ShowMeWhatYouGot(replyTo) => + replyTo ! state + Effect.none + case Stop => + Effect.stop() + }) + .withDurableStateStorePluginId(s"$store.state") + .withDurableStateStorePluginConfig(Some(config(store))) + } + + private def config(store: String): Config = + ConfigFactory.parseString(s""" + $store { + state.class = "${classOf[PersistenceTestKitDurableStateStoreProvider].getName}" + } + """) +} + +class RuntimeDurableStateStoreSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing { + + import RuntimeDurableStateStoreSpec._ + + "The durable state store plugin" must { + + "be possible to configure at runtime and use in multiple isolated instances" in { + val probe = createTestProbe[Any]() + + { + // one actor in each store with same id + val s1 = spawn(Actor("id1", "store1")) + val s2 = spawn(Actor("id1", "store2")) + s1 ! Actor.Save("s1m1", probe.ref) + probe.receiveMessage() + s2 ! Actor.Save("s2m1", probe.ref) + probe.receiveMessage() + } + + { + def assertStore(store: String, expectedState: String) = { + val durableStateStore = DurableStateStoreRegistry(system) + .durableStateStoreFor[DurableStateUpdateStore[String]](s"$store.state", config(store)) + durableStateStore.getObject("id1").futureValue.value shouldBe Some(expectedState) + } + + assertStore("store1", "s1m1") + assertStore("store2", "s2m1") + } + } + } +} diff --git a/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/durablestate-runtime-config.excludes b/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/durablestate-runtime-config.excludes new file mode 100644 index 00000000000..b86dd3685cd --- /dev/null +++ b/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/durablestate-runtime-config.excludes @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Add DurableStateBehavior.withDurableStateStorePluginConfig +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.state.scaladsl.DurableStateBehavior.withDurableStateStorePluginConfig") diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/BehaviorSetup.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/BehaviorSetup.scala index c74421ab208..332bc62e258 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/BehaviorSetup.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/BehaviorSetup.scala @@ -30,6 +30,8 @@ import pekko.persistence.typed.state.internal.InternalProtocol.RecoveryTimeout import pekko.persistence.typed.state.scaladsl.DurableStateBehavior import pekko.util.OptionVal +import com.typesafe.config.ConfigFactory + import org.slf4j.Logger import org.slf4j.MDC @@ -57,7 +59,9 @@ private[pekko] final class BehaviorSetup[C, S]( // Any instead S because adapter may change the type val durableStateStore: DurableStateUpdateStore[Any] = DurableStateStoreRegistry(context.system.toClassic) - .durableStateStoreFor[DurableStateUpdateStore[Any]](settings.durableStateStorePluginId) + .durableStateStoreFor[DurableStateUpdateStore[Any]]( + settings.durableStateStorePluginId, + settings.durableStateStorePluginConfig.getOrElse(ConfigFactory.empty)) def selfClassic: ClassicActorRef = context.self.toClassic diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala index 65061dffe7e..bb6662d07d1 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala @@ -35,6 +35,8 @@ import pekko.persistence.typed.PersistenceId import pekko.persistence.typed.SnapshotAdapter import pekko.persistence.typed.state.scaladsl._ +import com.typesafe.config.Config + import org.slf4j.LoggerFactory @InternalApi @@ -60,6 +62,7 @@ private[pekko] final case class DurableStateBehaviorImpl[Command, State]( commandHandler: DurableStateBehavior.CommandHandler[Command, State], loggerClass: Class[?], durableStateStorePluginId: Option[String] = None, + durableStateStorePluginConfig: Option[Config] = None, tag: String = "", snapshotAdapter: SnapshotAdapter[State] = NoOpSnapshotAdapter.instance[State], supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop, @@ -80,7 +83,11 @@ private[pekko] final case class DurableStateBehaviorImpl[Command, State]( case _ => false } if (!hasCustomLoggerName) ctx.setLoggerName(loggerClass) - val settings = DurableStateSettings(ctx.system, durableStateStorePluginId.getOrElse(""), customStashCapacity) + val settings = DurableStateSettings( + ctx.system, + durableStateStorePluginId.getOrElse(""), + durableStateStorePluginConfig, + customStashCapacity) // stashState outside supervise because StashState should survive restarts due to persist failures val stashState = new StashState(ctx.asInstanceOf[ActorContext[InternalProtocol]], settings) @@ -170,6 +177,9 @@ private[pekko] final case class DurableStateBehaviorImpl[Command, State]( copy(durableStateStorePluginId = if (id != "") Some(id) else None) } + override def withDurableStateStorePluginConfig(config: Option[Config]): DurableStateBehavior[Command, State] = + copy(durableStateStorePluginConfig = config) + override def withTag(tag: String): DurableStateBehavior[Command, State] = copy(tag = tag) diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala index 33082fcb3df..5071c659a78 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala @@ -34,11 +34,25 @@ import com.typesafe.config.Config system: ActorSystem[?], durableStateStorePluginId: String, customStashCapacity: Option[Int]): DurableStateSettings = - apply(system.settings.config, durableStateStorePluginId, customStashCapacity) + apply(system, durableStateStorePluginId, None, customStashCapacity) + + def apply( + system: ActorSystem[?], + durableStateStorePluginId: String, + durableStateStorePluginConfig: Option[Config], + customStashCapacity: Option[Int]): DurableStateSettings = + apply(system.settings.config, durableStateStorePluginId, durableStateStorePluginConfig, customStashCapacity) + + def apply( + config: Config, + durableStateStorePluginId: String, + customStashCapacity: Option[Int]): DurableStateSettings = + apply(config, durableStateStorePluginId, None, customStashCapacity) def apply( config: Config, durableStateStorePluginId: String, + durableStateStorePluginConfig: Option[Config], customStashCapacity: Option[Int]): DurableStateSettings = { val typedConfig = config.getConfig("pekko.persistence.typed") @@ -54,7 +68,8 @@ import com.typesafe.config.Config val logOnStashing = typedConfig.getBoolean("log-stashing") - val durableStateStoreConfig = durableStateStoreConfigFor(config, durableStateStorePluginId) + val durableStateStoreConfig = + durableStateStoreConfigFor(config, durableStateStorePluginId, durableStateStorePluginConfig) val recoveryTimeout: FiniteDuration = durableStateStoreConfig.getDuration("recovery-timeout", TimeUnit.MILLISECONDS).millis @@ -69,20 +84,26 @@ import com.typesafe.config.Config logOnStashing = logOnStashing, recoveryTimeout, durableStateStorePluginId, + durableStateStorePluginConfig, useContextLoggerForInternalLogging, recurseWhenUnstashingReadOnlyCommands) } - private def durableStateStoreConfigFor(config: Config, pluginId: String): Config = { + private def durableStateStoreConfigFor( + config: Config, + pluginId: String, + pluginConfig: Option[Config]): Config = { + val mergedConfig = pluginConfig.map(_.withFallback(config)).getOrElse(config) + def defaultPluginId = { - val configPath = config.getString("pekko.persistence.state.plugin") + val configPath = mergedConfig.getString("pekko.persistence.state.plugin") Persistence.verifyPluginConfigIsDefined(configPath, "Default DurableStateStore") configPath } val configPath = if (pluginId == "") defaultPluginId else pluginId - Persistence.verifyPluginConfigExists(config, configPath, "DurableStateStore") - config.getConfig(configPath).withFallback(config.getConfig("pekko.persistence.state-plugin-fallback")) + Persistence.verifyPluginConfigExists(mergedConfig, configPath, "DurableStateStore") + mergedConfig.getConfig(configPath).withFallback(mergedConfig.getConfig("pekko.persistence.state-plugin-fallback")) } } @@ -97,6 +118,7 @@ private[pekko] final case class DurableStateSettings( logOnStashing: Boolean, recoveryTimeout: FiniteDuration, durableStateStorePluginId: String, + durableStateStorePluginConfig: Option[Config], useContextLoggerForInternalLogging: Boolean, recurseWhenUnstashingReadOnlyCommands: Boolean) { diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/javadsl/DurableStateBehavior.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/javadsl/DurableStateBehavior.scala index 54b3b6003af..4af86d9582e 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/javadsl/DurableStateBehavior.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/javadsl/DurableStateBehavior.scala @@ -28,6 +28,10 @@ import pekko.persistence.typed.SnapshotAdapter import pekko.persistence.typed.state.internal import pekko.persistence.typed.state.internal._ import pekko.persistence.typed.state.scaladsl + +import scala.jdk.OptionConverters._ + +import com.typesafe.config.Config import org.jspecify.annotations.Nullable /** @@ -114,6 +118,14 @@ abstract class DurableStateBehavior[Command, State] private[pekko] ( */ def durableStateStorePluginId: String = "" + /** + * Override and define the `DurableStateStore` plugin config that this actor should use instead of the default. + * This is useful when the same plugin class is configured for multiple, isolated stores at runtime. + * + * @since 2.0.0 + */ + def durableStateStorePluginConfig: Optional[Config] = Optional.empty() + /** * The tag that can be used in persistence query. */ @@ -140,7 +152,11 @@ abstract class DurableStateBehavior[Command, State] private[pekko] ( persistenceId, emptyState, (state, cmd) => commandHandler()(state, cmd).asInstanceOf[EffectImpl[State]], - getClass).withTag(tag).snapshotAdapter(snapshotAdapter()).withDurableStateStorePluginId(durableStateStorePluginId) + getClass) + .withTag(tag) + .snapshotAdapter(snapshotAdapter()) + .withDurableStateStorePluginId(durableStateStorePluginId) + .withDurableStateStorePluginConfig(durableStateStorePluginConfig.toScala) val handler = signalHandler() val behaviorWithSignalHandler = diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala index ec44f3abfab..3a014744bfd 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala @@ -15,6 +15,8 @@ package org.apache.pekko.persistence.typed.state.scaladsl import scala.annotation.tailrec +import com.typesafe.config.Config + import org.apache.pekko import pekko.actor.typed.BackoffSupervisorStrategy import pekko.actor.typed.Behavior @@ -146,6 +148,14 @@ object DurableStateBehavior { */ def withDurableStateStorePluginId(id: String): DurableStateBehavior[Command, State] + /** + * Change the `DurableStateStore` plugin config that this actor should use. + * This is useful when the same plugin class is configured for multiple, isolated stores at runtime. + * + * @since 2.0.0 + */ + def withDurableStateStorePluginConfig(config: Option[Config]): DurableStateBehavior[Command, State] + /** * The tag that can used in persistence query */ diff --git a/persistence/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreRegistry.scala b/persistence/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreRegistry.scala index 64beff8d510..5ff8014ec78 100644 --- a/persistence/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreRegistry.scala +++ b/persistence/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreRegistry.scala @@ -30,6 +30,7 @@ import pekko.persistence.PluginProvider import pekko.persistence.state.scaladsl.DurableStateStore import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory /** * Persistence extension for queries. @@ -67,9 +68,12 @@ class DurableStateStoreRegistry(system: ExtendedActorSystem) configPath } - private def pluginIdOrDefault(pluginId: String): String = { + private def pluginIdOrDefault(pluginId: String): String = + pluginIdOrDefault(pluginId, ConfigFactory.empty) + + private def pluginIdOrDefault(pluginId: String, pluginConfig: Config): String = { val configPath = if (isEmpty(pluginId)) defaultPluginId else pluginId - Persistence.verifyPluginConfigExists(systemConfig, configPath, "DurableStateStore") + Persistence.verifyPluginConfigExists(pluginConfig.withFallback(systemConfig), configPath, "DurableStateStore") configPath } @@ -91,6 +95,17 @@ class DurableStateStoreRegistry(system: ExtendedActorSystem) pluginFor(pluginIdOrDefault(pluginId), pluginConfig(pluginId)).scaladslPlugin.asInstanceOf[T] } + /** + * Scala API: Returns the [[pekko.persistence.state.scaladsl.DurableStateStore]] specified by the given + * configuration entry. The provided `pluginConfig` is used to configure the plugin at runtime, taking + * precedence over the plugin configuration defined in the actor system configuration. + * + * @since 2.0.0 + */ + final def durableStateStoreFor[T <: scaladsl.DurableStateStore[?]](pluginId: String, pluginConfig: Config): T = { + pluginFor(pluginIdOrDefault(pluginId, pluginConfig), pluginConfig).scaladslPlugin.asInstanceOf[T] + } + /** * Java API: Returns the [[pekko.persistence.state.javadsl.DurableStateStore]] specified by the given * configuration entry. @@ -101,4 +116,18 @@ class DurableStateStoreRegistry(system: ExtendedActorSystem) pluginFor(pluginIdOrDefault(pluginId), pluginConfig(pluginId)).javadslPlugin.asInstanceOf[T] } + /** + * Java API: Returns the [[pekko.persistence.state.javadsl.DurableStateStore]] specified by the given + * configuration entry. The provided `pluginConfig` is used to configure the plugin at runtime, taking + * precedence over the plugin configuration defined in the actor system configuration. + * + * @since 2.0.0 + */ + final def getDurableStateStoreFor[T <: javadsl.DurableStateStore[?]]( + @nowarn("msg=never used") clazz: Class[T], // FIXME generic Class could be problematic in Java + pluginId: String, + pluginConfig: Config): T = { + pluginFor(pluginIdOrDefault(pluginId, pluginConfig), pluginConfig).javadslPlugin.asInstanceOf[T] + } + } From 54babe6ba7c9a182b512bbab73ab4b04eda067b1 Mon Sep 17 00:00:00 2001 From: Domantas Petrauskas Date: Fri, 12 Jun 2026 17:55:03 +0300 Subject: [PATCH 2/2] Use new-file ASF header for RuntimeDurableStateStoreSpec --- .../scaladsl/RuntimeDurableStateStoreSpec.scala | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/RuntimeDurableStateStoreSpec.scala b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/RuntimeDurableStateStoreSpec.scala index f3a08b78abb..4dc4bdb9e48 100644 --- a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/RuntimeDurableStateStoreSpec.scala +++ b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/RuntimeDurableStateStoreSpec.scala @@ -1,10 +1,18 @@ /* * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * This file is part of the Apache Pekko project, which was derived from Akka. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.pekko.persistence.typed.state.scaladsl