diff --git a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/RuntimeDurableStateStoreSpec.scala b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/RuntimeDurableStateStoreSpec.scala new file mode 100644 index 00000000000..4dc4bdb9e48 --- /dev/null +++ b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/RuntimeDurableStateStoreSpec.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.persistence.typed.state.scaladsl + +import org.apache.pekko +import pekko.Done +import pekko.actor.testkit.typed.scaladsl.LogCapturing +import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import pekko.actor.typed.ActorRef +import pekko.actor.typed.Behavior +import pekko.persistence.state.DurableStateStoreRegistry +import pekko.persistence.state.scaladsl.DurableStateUpdateStore +import pekko.persistence.testkit.state.PersistenceTestKitDurableStateStoreProvider +import pekko.persistence.typed.PersistenceId + +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.wordspec.AnyWordSpecLike + +object RuntimeDurableStateStoreSpec { + + private object Actor { + sealed trait Command + final case class Save(text: String, replyTo: ActorRef[Done]) extends Command + final case class ShowMeWhatYouGot(replyTo: ActorRef[String]) extends Command + case object Stop extends Command + + def apply(persistenceId: String, store: String): Behavior[Command] = + DurableStateBehavior[Command, String]( + PersistenceId.ofUniqueId(persistenceId), + "", + (state, cmd) => + cmd match { + case Save(text, replyTo) => + Effect.persist(Seq(state, text).filter(_.nonEmpty).mkString("|")).thenRun(_ => replyTo ! Done) + case ShowMeWhatYouGot(replyTo) => + replyTo ! state + Effect.none + case Stop => + Effect.stop() + }) + .withDurableStateStorePluginId(s"$store.state") + .withDurableStateStorePluginConfig(Some(config(store))) + } + + private def config(store: String): Config = + ConfigFactory.parseString(s""" + $store { + state.class = "${classOf[PersistenceTestKitDurableStateStoreProvider].getName}" + } + """) +} + +class RuntimeDurableStateStoreSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing { + + import RuntimeDurableStateStoreSpec._ + + "The durable state store plugin" must { + + "be possible to configure at runtime and use in multiple isolated instances" in { + val probe = createTestProbe[Any]() + + { + // one actor in each store with same id + val s1 = spawn(Actor("id1", "store1")) + val s2 = spawn(Actor("id1", "store2")) + s1 ! Actor.Save("s1m1", probe.ref) + probe.receiveMessage() + s2 ! Actor.Save("s2m1", probe.ref) + probe.receiveMessage() + } + + { + def assertStore(store: String, expectedState: String) = { + val durableStateStore = DurableStateStoreRegistry(system) + .durableStateStoreFor[DurableStateUpdateStore[String]](s"$store.state", config(store)) + durableStateStore.getObject("id1").futureValue.value shouldBe Some(expectedState) + } + + assertStore("store1", "s1m1") + assertStore("store2", "s2m1") + } + } + } +} diff --git a/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/durablestate-runtime-config.excludes b/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/durablestate-runtime-config.excludes new file mode 100644 index 00000000000..b86dd3685cd --- /dev/null +++ b/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/durablestate-runtime-config.excludes @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Add DurableStateBehavior.withDurableStateStorePluginConfig +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.state.scaladsl.DurableStateBehavior.withDurableStateStorePluginConfig") diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/BehaviorSetup.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/BehaviorSetup.scala index c74421ab208..332bc62e258 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/BehaviorSetup.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/BehaviorSetup.scala @@ -30,6 +30,8 @@ import pekko.persistence.typed.state.internal.InternalProtocol.RecoveryTimeout import pekko.persistence.typed.state.scaladsl.DurableStateBehavior import pekko.util.OptionVal +import com.typesafe.config.ConfigFactory + import org.slf4j.Logger import org.slf4j.MDC @@ -57,7 +59,9 @@ private[pekko] final class BehaviorSetup[C, S]( // Any instead S because adapter may change the type val durableStateStore: DurableStateUpdateStore[Any] = DurableStateStoreRegistry(context.system.toClassic) - .durableStateStoreFor[DurableStateUpdateStore[Any]](settings.durableStateStorePluginId) + .durableStateStoreFor[DurableStateUpdateStore[Any]]( + settings.durableStateStorePluginId, + settings.durableStateStorePluginConfig.getOrElse(ConfigFactory.empty)) def selfClassic: ClassicActorRef = context.self.toClassic diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala index 65061dffe7e..bb6662d07d1 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala @@ -35,6 +35,8 @@ import pekko.persistence.typed.PersistenceId import pekko.persistence.typed.SnapshotAdapter import pekko.persistence.typed.state.scaladsl._ +import com.typesafe.config.Config + import org.slf4j.LoggerFactory @InternalApi @@ -60,6 +62,7 @@ private[pekko] final case class DurableStateBehaviorImpl[Command, State]( commandHandler: DurableStateBehavior.CommandHandler[Command, State], loggerClass: Class[?], durableStateStorePluginId: Option[String] = None, + durableStateStorePluginConfig: Option[Config] = None, tag: String = "", snapshotAdapter: SnapshotAdapter[State] = NoOpSnapshotAdapter.instance[State], supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop, @@ -80,7 +83,11 @@ private[pekko] final case class DurableStateBehaviorImpl[Command, State]( case _ => false } if (!hasCustomLoggerName) ctx.setLoggerName(loggerClass) - val settings = DurableStateSettings(ctx.system, durableStateStorePluginId.getOrElse(""), customStashCapacity) + val settings = DurableStateSettings( + ctx.system, + durableStateStorePluginId.getOrElse(""), + durableStateStorePluginConfig, + customStashCapacity) // stashState outside supervise because StashState should survive restarts due to persist failures val stashState = new StashState(ctx.asInstanceOf[ActorContext[InternalProtocol]], settings) @@ -170,6 +177,9 @@ private[pekko] final case class DurableStateBehaviorImpl[Command, State]( copy(durableStateStorePluginId = if (id != "") Some(id) else None) } + override def withDurableStateStorePluginConfig(config: Option[Config]): DurableStateBehavior[Command, State] = + copy(durableStateStorePluginConfig = config) + override def withTag(tag: String): DurableStateBehavior[Command, State] = copy(tag = tag) diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala index 33082fcb3df..5071c659a78 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala @@ -34,11 +34,25 @@ import com.typesafe.config.Config system: ActorSystem[?], durableStateStorePluginId: String, customStashCapacity: Option[Int]): DurableStateSettings = - apply(system.settings.config, durableStateStorePluginId, customStashCapacity) + apply(system, durableStateStorePluginId, None, customStashCapacity) + + def apply( + system: ActorSystem[?], + durableStateStorePluginId: String, + durableStateStorePluginConfig: Option[Config], + customStashCapacity: Option[Int]): DurableStateSettings = + apply(system.settings.config, durableStateStorePluginId, durableStateStorePluginConfig, customStashCapacity) + + def apply( + config: Config, + durableStateStorePluginId: String, + customStashCapacity: Option[Int]): DurableStateSettings = + apply(config, durableStateStorePluginId, None, customStashCapacity) def apply( config: Config, durableStateStorePluginId: String, + durableStateStorePluginConfig: Option[Config], customStashCapacity: Option[Int]): DurableStateSettings = { val typedConfig = config.getConfig("pekko.persistence.typed") @@ -54,7 +68,8 @@ import com.typesafe.config.Config val logOnStashing = typedConfig.getBoolean("log-stashing") - val durableStateStoreConfig = durableStateStoreConfigFor(config, durableStateStorePluginId) + val durableStateStoreConfig = + durableStateStoreConfigFor(config, durableStateStorePluginId, durableStateStorePluginConfig) val recoveryTimeout: FiniteDuration = durableStateStoreConfig.getDuration("recovery-timeout", TimeUnit.MILLISECONDS).millis @@ -69,20 +84,26 @@ import com.typesafe.config.Config logOnStashing = logOnStashing, recoveryTimeout, durableStateStorePluginId, + durableStateStorePluginConfig, useContextLoggerForInternalLogging, recurseWhenUnstashingReadOnlyCommands) } - private def durableStateStoreConfigFor(config: Config, pluginId: String): Config = { + private def durableStateStoreConfigFor( + config: Config, + pluginId: String, + pluginConfig: Option[Config]): Config = { + val mergedConfig = pluginConfig.map(_.withFallback(config)).getOrElse(config) + def defaultPluginId = { - val configPath = config.getString("pekko.persistence.state.plugin") + val configPath = mergedConfig.getString("pekko.persistence.state.plugin") Persistence.verifyPluginConfigIsDefined(configPath, "Default DurableStateStore") configPath } val configPath = if (pluginId == "") defaultPluginId else pluginId - Persistence.verifyPluginConfigExists(config, configPath, "DurableStateStore") - config.getConfig(configPath).withFallback(config.getConfig("pekko.persistence.state-plugin-fallback")) + Persistence.verifyPluginConfigExists(mergedConfig, configPath, "DurableStateStore") + mergedConfig.getConfig(configPath).withFallback(mergedConfig.getConfig("pekko.persistence.state-plugin-fallback")) } } @@ -97,6 +118,7 @@ private[pekko] final case class DurableStateSettings( logOnStashing: Boolean, recoveryTimeout: FiniteDuration, durableStateStorePluginId: String, + durableStateStorePluginConfig: Option[Config], useContextLoggerForInternalLogging: Boolean, recurseWhenUnstashingReadOnlyCommands: Boolean) { diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/javadsl/DurableStateBehavior.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/javadsl/DurableStateBehavior.scala index 54b3b6003af..4af86d9582e 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/javadsl/DurableStateBehavior.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/javadsl/DurableStateBehavior.scala @@ -28,6 +28,10 @@ import pekko.persistence.typed.SnapshotAdapter import pekko.persistence.typed.state.internal import pekko.persistence.typed.state.internal._ import pekko.persistence.typed.state.scaladsl + +import scala.jdk.OptionConverters._ + +import com.typesafe.config.Config import org.jspecify.annotations.Nullable /** @@ -114,6 +118,14 @@ abstract class DurableStateBehavior[Command, State] private[pekko] ( */ def durableStateStorePluginId: String = "" + /** + * Override and define the `DurableStateStore` plugin config that this actor should use instead of the default. + * This is useful when the same plugin class is configured for multiple, isolated stores at runtime. + * + * @since 2.0.0 + */ + def durableStateStorePluginConfig: Optional[Config] = Optional.empty() + /** * The tag that can be used in persistence query. */ @@ -140,7 +152,11 @@ abstract class DurableStateBehavior[Command, State] private[pekko] ( persistenceId, emptyState, (state, cmd) => commandHandler()(state, cmd).asInstanceOf[EffectImpl[State]], - getClass).withTag(tag).snapshotAdapter(snapshotAdapter()).withDurableStateStorePluginId(durableStateStorePluginId) + getClass) + .withTag(tag) + .snapshotAdapter(snapshotAdapter()) + .withDurableStateStorePluginId(durableStateStorePluginId) + .withDurableStateStorePluginConfig(durableStateStorePluginConfig.toScala) val handler = signalHandler() val behaviorWithSignalHandler = diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala index ec44f3abfab..3a014744bfd 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala @@ -15,6 +15,8 @@ package org.apache.pekko.persistence.typed.state.scaladsl import scala.annotation.tailrec +import com.typesafe.config.Config + import org.apache.pekko import pekko.actor.typed.BackoffSupervisorStrategy import pekko.actor.typed.Behavior @@ -146,6 +148,14 @@ object DurableStateBehavior { */ def withDurableStateStorePluginId(id: String): DurableStateBehavior[Command, State] + /** + * Change the `DurableStateStore` plugin config that this actor should use. + * This is useful when the same plugin class is configured for multiple, isolated stores at runtime. + * + * @since 2.0.0 + */ + def withDurableStateStorePluginConfig(config: Option[Config]): DurableStateBehavior[Command, State] + /** * The tag that can used in persistence query */ diff --git a/persistence/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreRegistry.scala b/persistence/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreRegistry.scala index 64beff8d510..5ff8014ec78 100644 --- a/persistence/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreRegistry.scala +++ b/persistence/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreRegistry.scala @@ -30,6 +30,7 @@ import pekko.persistence.PluginProvider import pekko.persistence.state.scaladsl.DurableStateStore import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory /** * Persistence extension for queries. @@ -67,9 +68,12 @@ class DurableStateStoreRegistry(system: ExtendedActorSystem) configPath } - private def pluginIdOrDefault(pluginId: String): String = { + private def pluginIdOrDefault(pluginId: String): String = + pluginIdOrDefault(pluginId, ConfigFactory.empty) + + private def pluginIdOrDefault(pluginId: String, pluginConfig: Config): String = { val configPath = if (isEmpty(pluginId)) defaultPluginId else pluginId - Persistence.verifyPluginConfigExists(systemConfig, configPath, "DurableStateStore") + Persistence.verifyPluginConfigExists(pluginConfig.withFallback(systemConfig), configPath, "DurableStateStore") configPath } @@ -91,6 +95,17 @@ class DurableStateStoreRegistry(system: ExtendedActorSystem) pluginFor(pluginIdOrDefault(pluginId), pluginConfig(pluginId)).scaladslPlugin.asInstanceOf[T] } + /** + * Scala API: Returns the [[pekko.persistence.state.scaladsl.DurableStateStore]] specified by the given + * configuration entry. The provided `pluginConfig` is used to configure the plugin at runtime, taking + * precedence over the plugin configuration defined in the actor system configuration. + * + * @since 2.0.0 + */ + final def durableStateStoreFor[T <: scaladsl.DurableStateStore[?]](pluginId: String, pluginConfig: Config): T = { + pluginFor(pluginIdOrDefault(pluginId, pluginConfig), pluginConfig).scaladslPlugin.asInstanceOf[T] + } + /** * Java API: Returns the [[pekko.persistence.state.javadsl.DurableStateStore]] specified by the given * configuration entry. @@ -101,4 +116,18 @@ class DurableStateStoreRegistry(system: ExtendedActorSystem) pluginFor(pluginIdOrDefault(pluginId), pluginConfig(pluginId)).javadslPlugin.asInstanceOf[T] } + /** + * Java API: Returns the [[pekko.persistence.state.javadsl.DurableStateStore]] specified by the given + * configuration entry. The provided `pluginConfig` is used to configure the plugin at runtime, taking + * precedence over the plugin configuration defined in the actor system configuration. + * + * @since 2.0.0 + */ + final def getDurableStateStoreFor[T <: javadsl.DurableStateStore[?]]( + @nowarn("msg=never used") clazz: Class[T], // FIXME generic Class could be problematic in Java + pluginId: String, + pluginConfig: Config): T = { + pluginFor(pluginIdOrDefault(pluginId, pluginConfig), pluginConfig).javadslPlugin.asInstanceOf[T] + } + }