Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
Comment thread
pjfanning marked this conversation as resolved.
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.persistence.typed.state.scaladsl

import org.apache.pekko
import pekko.Done
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorRef
import pekko.actor.typed.Behavior
import pekko.persistence.state.DurableStateStoreRegistry
import pekko.persistence.state.scaladsl.DurableStateUpdateStore
import pekko.persistence.testkit.state.PersistenceTestKitDurableStateStoreProvider
import pekko.persistence.typed.PersistenceId

import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike

object RuntimeDurableStateStoreSpec {

private object Actor {
sealed trait Command
final case class Save(text: String, replyTo: ActorRef[Done]) extends Command
final case class ShowMeWhatYouGot(replyTo: ActorRef[String]) extends Command
case object Stop extends Command

def apply(persistenceId: String, store: String): Behavior[Command] =
DurableStateBehavior[Command, String](
PersistenceId.ofUniqueId(persistenceId),
"",
(state, cmd) =>
cmd match {
case Save(text, replyTo) =>
Effect.persist(Seq(state, text).filter(_.nonEmpty).mkString("|")).thenRun(_ => replyTo ! Done)
case ShowMeWhatYouGot(replyTo) =>
replyTo ! state
Effect.none
case Stop =>
Effect.stop()
})
.withDurableStateStorePluginId(s"$store.state")
.withDurableStateStorePluginConfig(Some(config(store)))
}

private def config(store: String): Config =
ConfigFactory.parseString(s"""
$store {
state.class = "${classOf[PersistenceTestKitDurableStateStoreProvider].getName}"
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Test only covers Scala API

This spec exercises DurableStateBehavior.withDurableStateStorePluginConfig (Scala DSL) and DurableStateStoreRegistry.durableStateStoreFor(pluginId, config) (Scala registry API).

The Java DSL counterpart (DurableStateBehavior.durableStateStorePluginConfig() returning Optional[Config]) and the Java registry API (getDurableStateStoreFor(clazz, pluginId, config)) are untested.

Per CONTRIBUTING.md: "Scala and Java DSL changes must keep API, docs, and tests in parity."

Suggested fix — add a Java DSL spec (e.g., RuntimeDurableStateStoreJavaSpec) that defines a DurableStateBehavior subclass overriding durableStateStorePluginConfig(), and verifies store isolation via getDurableStateStoreFor.

""")
}

class RuntimeDurableStateStoreSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing {

import RuntimeDurableStateStoreSpec._

"The durable state store plugin" must {

"be possible to configure at runtime and use in multiple isolated instances" in {
val probe = createTestProbe[Any]()

{
// one actor in each store with same id
val s1 = spawn(Actor("id1", "store1"))
val s2 = spawn(Actor("id1", "store2"))
s1 ! Actor.Save("s1m1", probe.ref)
probe.receiveMessage()
s2 ! Actor.Save("s2m1", probe.ref)
probe.receiveMessage()
}

{
def assertStore(store: String, expectedState: String) = {
val durableStateStore = DurableStateStoreRegistry(system)
.durableStateStoreFor[DurableStateUpdateStore[String]](s"$store.state", config(store))
durableStateStore.getObject("id1").futureValue.value shouldBe Some(expectedState)
}

assertStore("store1", "s1m1")
assertStore("store2", "s2m1")
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Add DurableStateBehavior.withDurableStateStorePluginConfig
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.state.scaladsl.DurableStateBehavior.withDurableStateStorePluginConfig")
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import pekko.persistence.typed.state.internal.InternalProtocol.RecoveryTimeout
import pekko.persistence.typed.state.scaladsl.DurableStateBehavior
import pekko.util.OptionVal

import com.typesafe.config.ConfigFactory

import org.slf4j.Logger
import org.slf4j.MDC

Expand Down Expand Up @@ -57,7 +59,9 @@ private[pekko] final class BehaviorSetup[C, S](
// Any instead S because adapter may change the type
val durableStateStore: DurableStateUpdateStore[Any] =
DurableStateStoreRegistry(context.system.toClassic)
.durableStateStoreFor[DurableStateUpdateStore[Any]](settings.durableStateStorePluginId)
.durableStateStoreFor[DurableStateUpdateStore[Any]](
settings.durableStateStorePluginId,
settings.durableStateStorePluginConfig.getOrElse(ConfigFactory.empty))

def selfClassic: ClassicActorRef = context.self.toClassic

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.SnapshotAdapter
import pekko.persistence.typed.state.scaladsl._

import com.typesafe.config.Config

import org.slf4j.LoggerFactory

@InternalApi
Expand All @@ -60,6 +62,7 @@ private[pekko] final case class DurableStateBehaviorImpl[Command, State](
commandHandler: DurableStateBehavior.CommandHandler[Command, State],
loggerClass: Class[?],
durableStateStorePluginId: Option[String] = None,
durableStateStorePluginConfig: Option[Config] = None,
tag: String = "",
snapshotAdapter: SnapshotAdapter[State] = NoOpSnapshotAdapter.instance[State],
supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop,
Expand All @@ -80,7 +83,11 @@ private[pekko] final case class DurableStateBehaviorImpl[Command, State](
case _ => false
}
if (!hasCustomLoggerName) ctx.setLoggerName(loggerClass)
val settings = DurableStateSettings(ctx.system, durableStateStorePluginId.getOrElse(""), customStashCapacity)
val settings = DurableStateSettings(
ctx.system,
durableStateStorePluginId.getOrElse(""),
durableStateStorePluginConfig,
customStashCapacity)

// stashState outside supervise because StashState should survive restarts due to persist failures
val stashState = new StashState(ctx.asInstanceOf[ActorContext[InternalProtocol]], settings)
Expand Down Expand Up @@ -170,6 +177,9 @@ private[pekko] final case class DurableStateBehaviorImpl[Command, State](
copy(durableStateStorePluginId = if (id != "") Some(id) else None)
}

override def withDurableStateStorePluginConfig(config: Option[Config]): DurableStateBehavior[Command, State] =
copy(durableStateStorePluginConfig = config)

override def withTag(tag: String): DurableStateBehavior[Command, State] =
copy(tag = tag)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,25 @@ import com.typesafe.config.Config
system: ActorSystem[?],
durableStateStorePluginId: String,
customStashCapacity: Option[Int]): DurableStateSettings =
apply(system.settings.config, durableStateStorePluginId, customStashCapacity)
apply(system, durableStateStorePluginId, None, customStashCapacity)

def apply(
system: ActorSystem[?],
durableStateStorePluginId: String,
durableStateStorePluginConfig: Option[Config],
customStashCapacity: Option[Int]): DurableStateSettings =
apply(system.settings.config, durableStateStorePluginId, durableStateStorePluginConfig, customStashCapacity)

def apply(
config: Config,
durableStateStorePluginId: String,
customStashCapacity: Option[Int]): DurableStateSettings =
apply(config, durableStateStorePluginId, None, customStashCapacity)

def apply(
config: Config,
durableStateStorePluginId: String,
durableStateStorePluginConfig: Option[Config],
customStashCapacity: Option[Int]): DurableStateSettings = {
val typedConfig = config.getConfig("pekko.persistence.typed")

Expand All @@ -54,7 +68,8 @@ import com.typesafe.config.Config

val logOnStashing = typedConfig.getBoolean("log-stashing")

val durableStateStoreConfig = durableStateStoreConfigFor(config, durableStateStorePluginId)
val durableStateStoreConfig =
durableStateStoreConfigFor(config, durableStateStorePluginId, durableStateStorePluginConfig)
val recoveryTimeout: FiniteDuration =
durableStateStoreConfig.getDuration("recovery-timeout", TimeUnit.MILLISECONDS).millis

Expand All @@ -69,20 +84,26 @@ import com.typesafe.config.Config
logOnStashing = logOnStashing,
recoveryTimeout,
durableStateStorePluginId,
durableStateStorePluginConfig,
useContextLoggerForInternalLogging,
recurseWhenUnstashingReadOnlyCommands)
}

private def durableStateStoreConfigFor(config: Config, pluginId: String): Config = {
private def durableStateStoreConfigFor(
config: Config,
pluginId: String,
pluginConfig: Option[Config]): Config = {
val mergedConfig = pluginConfig.map(_.withFallback(config)).getOrElse(config)

def defaultPluginId = {
val configPath = config.getString("pekko.persistence.state.plugin")
val configPath = mergedConfig.getString("pekko.persistence.state.plugin")
Persistence.verifyPluginConfigIsDefined(configPath, "Default DurableStateStore")
configPath
}

val configPath = if (pluginId == "") defaultPluginId else pluginId
Persistence.verifyPluginConfigExists(config, configPath, "DurableStateStore")
config.getConfig(configPath).withFallback(config.getConfig("pekko.persistence.state-plugin-fallback"))
Persistence.verifyPluginConfigExists(mergedConfig, configPath, "DurableStateStore")
mergedConfig.getConfig(configPath).withFallback(mergedConfig.getConfig("pekko.persistence.state-plugin-fallback"))
}

}
Expand All @@ -97,6 +118,7 @@ private[pekko] final case class DurableStateSettings(
logOnStashing: Boolean,
recoveryTimeout: FiniteDuration,
durableStateStorePluginId: String,
durableStateStorePluginConfig: Option[Config],
useContextLoggerForInternalLogging: Boolean,
recurseWhenUnstashingReadOnlyCommands: Boolean) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ import pekko.persistence.typed.SnapshotAdapter
import pekko.persistence.typed.state.internal
import pekko.persistence.typed.state.internal._
import pekko.persistence.typed.state.scaladsl

import scala.jdk.OptionConverters._

import com.typesafe.config.Config
import org.jspecify.annotations.Nullable

/**
Expand Down Expand Up @@ -114,6 +118,14 @@ abstract class DurableStateBehavior[Command, State] private[pekko] (
*/
def durableStateStorePluginId: String = ""

/**
* Override and define the `DurableStateStore` plugin config that this actor should use instead of the default.
* This is useful when the same plugin class is configured for multiple, isolated stores at runtime.
*
* @since 2.0.0
*/
def durableStateStorePluginConfig: Optional[Config] = Optional.empty()

/**
* The tag that can be used in persistence query.
*/
Expand All @@ -140,7 +152,11 @@ abstract class DurableStateBehavior[Command, State] private[pekko] (
persistenceId,
emptyState,
(state, cmd) => commandHandler()(state, cmd).asInstanceOf[EffectImpl[State]],
getClass).withTag(tag).snapshotAdapter(snapshotAdapter()).withDurableStateStorePluginId(durableStateStorePluginId)
getClass)
.withTag(tag)
.snapshotAdapter(snapshotAdapter())
.withDurableStateStorePluginId(durableStateStorePluginId)
.withDurableStateStorePluginConfig(durableStateStorePluginConfig.toScala)

val handler = signalHandler()
val behaviorWithSignalHandler =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package org.apache.pekko.persistence.typed.state.scaladsl

import scala.annotation.tailrec

import com.typesafe.config.Config

import org.apache.pekko
import pekko.actor.typed.BackoffSupervisorStrategy
import pekko.actor.typed.Behavior
Expand Down Expand Up @@ -146,6 +148,14 @@ object DurableStateBehavior {
*/
def withDurableStateStorePluginId(id: String): DurableStateBehavior[Command, State]

/**
* Change the `DurableStateStore` plugin config that this actor should use.
* This is useful when the same plugin class is configured for multiple, isolated stores at runtime.
*
* @since 2.0.0
*/
def withDurableStateStorePluginConfig(config: Option[Config]): DurableStateBehavior[Command, State]

/**
* The tag that can used in persistence query
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import pekko.persistence.PluginProvider
import pekko.persistence.state.scaladsl.DurableStateStore

import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory

/**
* Persistence extension for queries.
Expand Down Expand Up @@ -67,9 +68,12 @@ class DurableStateStoreRegistry(system: ExtendedActorSystem)
configPath
}

private def pluginIdOrDefault(pluginId: String): String = {
private def pluginIdOrDefault(pluginId: String): String =
pluginIdOrDefault(pluginId, ConfigFactory.empty)

private def pluginIdOrDefault(pluginId: String, pluginConfig: Config): String = {
val configPath = if (isEmpty(pluginId)) defaultPluginId else pluginId
Persistence.verifyPluginConfigExists(systemConfig, configPath, "DurableStateStore")
Persistence.verifyPluginConfigExists(pluginConfig.withFallback(systemConfig), configPath, "DurableStateStore")
configPath
}

Expand All @@ -91,6 +95,17 @@ class DurableStateStoreRegistry(system: ExtendedActorSystem)
pluginFor(pluginIdOrDefault(pluginId), pluginConfig(pluginId)).scaladslPlugin.asInstanceOf[T]
}

/**
* Scala API: Returns the [[pekko.persistence.state.scaladsl.DurableStateStore]] specified by the given
* configuration entry. The provided `pluginConfig` is used to configure the plugin at runtime, taking
* precedence over the plugin configuration defined in the actor system configuration.
*
* @since 2.0.0

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Parameter name shadows private method

The parameter pluginConfig: Config shadows the private method pluginConfig(pluginId: String) defined at line 76. While the current method body doesn't call the private method (so no compile error), this creates a maintenance trap: a future edit adding pluginConfig(someId) here would get a confusing "Config is not callable" error instead of invoking the private method.

Same concern applies to getDurableStateStoreFor at line 130.

Suggested fix — rename the parameter to avoid the collision:

final def durableStateStoreFor[T <: scaladsl.DurableStateStore[?]](pluginId: String, runtimeConfig: Config): T = {
  pluginFor(pluginIdOrDefault(pluginId, runtimeConfig), runtimeConfig).scaladslPlugin.asInstanceOf[T]
}

*/
final def durableStateStoreFor[T <: scaladsl.DurableStateStore[?]](pluginId: String, pluginConfig: Config): T = {
pluginFor(pluginIdOrDefault(pluginId, pluginConfig), pluginConfig).scaladslPlugin.asInstanceOf[T]
}

/**
* Java API: Returns the [[pekko.persistence.state.javadsl.DurableStateStore]] specified by the given
* configuration entry.
Expand All @@ -101,4 +116,18 @@ class DurableStateStoreRegistry(system: ExtendedActorSystem)
pluginFor(pluginIdOrDefault(pluginId), pluginConfig(pluginId)).javadslPlugin.asInstanceOf[T]
}

/**
* Java API: Returns the [[pekko.persistence.state.javadsl.DurableStateStore]] specified by the given
* configuration entry. The provided `pluginConfig` is used to configure the plugin at runtime, taking
* precedence over the plugin configuration defined in the actor system configuration.
*
* @since 2.0.0
*/
final def getDurableStateStoreFor[T <: javadsl.DurableStateStore[?]](
@nowarn("msg=never used") clazz: Class[T], // FIXME generic Class could be problematic in Java
pluginId: String,
pluginConfig: Config): T = {
pluginFor(pluginIdOrDefault(pluginId, pluginConfig), pluginConfig).javadslPlugin.asInstanceOf[T]
}

}