Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ abstract class BehaviorTestKit[T] {
def expectEffect(expectedEffect: Effect): Unit

/**
* Asserts that the oldest effect is an instance of of class T. Consumes and returns the concrete effect for
* Asserts that the oldest effect is an instance of class T. Consumes and returns the concrete effect for
* further direct assertions.
*/
def expectEffectClass[U <: Effect](effectClass: Class[U]): U
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ abstract class TestInbox[T] {
def receiveMessage(): T

/**
* Assert and remove the the oldest message.
* Assert and remove the oldest message.
*/
def expectMessage(expectedMessage: T): TestInbox[T]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ trait TestInbox[T] {
def receiveMessage(): T

/**
* Assert and remove the the oldest message.
* Assert and remove the oldest message.
*/
def expectMessage(expectedMessage: T): TestInbox[T]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class UdpConnectedIntegrationSpec extends PekkoSpec("""
clientCommander ! UdpConnected.Send(ByteString("data to trigger fail"), 2)
expectMsg(2)

// when a new server appears at the same port it it should be able to receive
// when a new server appears at the same port it should be able to receive
val serverIncarnation2Handler = TestProbe()
val serverIncarnation2 = bindUdp(serverAddress, serverIncarnation2Handler.ref)
val dataToNewIncarnation = ByteString("Data to new incarnation")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ class SerializeSpec extends PekkoSpec(SerializationTests.serializeConf) {
ser.serializerFor(classOf[PlainMessage]).getClass should ===(classOf[NoopSerializer])
}

"resolve serializer for message extending class with with binding" in {
"resolve serializer for message extending class with binding" in {
ser.serializerFor(classOf[ExtendedPlainMessage]).getClass should ===(classOf[NoopSerializer])
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ object SpawnProtocol {
* If `name` is an empty string an anonymous actor (with automatically generated name) will be created.
*
* If the `name` is already taken of an existing actor a unique name will be used by appending a suffix
* to the the `name`. The exact format or value of the suffix is an implementation detail that is
* to the `name`. The exact format or value of the suffix is an implementation detail that is
* undefined. This means that reusing the same name for several actors will not result in
* `InvalidActorNameException`, but it's better to use unique names to begin with.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ private class WorkPullingProducerControllerImpl[A: ClassTag](

case AskTimeout(outKey, outSeqNr) =>
context.log.debug(
"Message seqNr [{}] sent to worker [{}] timed out. It will be be redelivered.",
"Message seqNr [{}] sent to worker [{}] timed out. It will be redelivered.",
outSeqNr,
outKey)
Behaviors.same
Expand Down
2 changes: 1 addition & 1 deletion actor/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ pekko {
# If the current runtime does not support virtual thread,
# then the executor configured in "fallback" will be used.
virtual-thread-executor {
#Please set the the underlying pool with system properties below:
#Please set the underlying pool with system properties below:
#jdk.virtualThreadScheduler.parallelism
#jdk.virtualThreadScheduler.maxPoolSize
#jdk.virtualThreadScheduler.minRunnable
Expand Down
2 changes: 1 addition & 1 deletion actor/src/main/scala/org/apache/pekko/actor/Stash.scala
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ private[pekko] trait StashSupport {
/**
* INTERNAL API.
*
* Clears the stash and and returns all envelopes that have not been unstashed.
* Clears the stash and returns all envelopes that have not been unstashed.
*/
@InternalStableApi
private[pekko] def clearStash(): Vector[Envelope] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ import pekko.util.OptionVal
} else {
// it was from an old timer that was enqueued in mailbox before canceled
log.debug(
"Received timer [{}] from from old generation [{}], expected generation [{}], discarding",
"Received timer [{}] from old generation [{}], expected generation [{}], discarding",
timerMsg.key,
timerMsg.generation,
t.generation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ final class ShardedDaemonProcessSettings @InternalApi private[pekko] (
/**
* Specifies that the ShardedDaemonProcess should run on nodes with a specific role.
* If the role is not specified all nodes in the cluster are used. If the given role does
* not match the role of the current node the the ShardedDaemonProcess will not be started.
* not match the role of the current node the ShardedDaemonProcess will not be started.
*/
def withRole(role: String): ShardedDaemonProcessSettings =
copy(role = Option(role))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ private class ShardingProducerControllerImpl[A: ClassTag](

case AskTimeout(outKey, outSeqNr) =>
context.log.debug(
"Message seqNr [{}] sent to entity [{}] timed out. It will be be redelivered.",
"Message seqNr [{}] sent to entity [{}] timed out. It will be redelivered.",
outSeqNr,
outKey)
Behaviors.same
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ class ReliableDeliveryShardingSpec
next8.entitiesWithDemand should ===(Set("entity-2"))
next8.bufferedForEntitiesWithoutDemand should ===(Map("entity-1" -> 1))

// when new demand the buffered messages will be be sent
// when new demand the buffered messages will be sent
seq5.producerController ! ProducerControllerImpl.Request(confirmedSeqNr = 5L, requestUpToSeqNr = 10, true, false)
val seq6 = shardingProbe
.fishForMessage(testKit.testKitSettings.dilated(3.seconds), "waiting for buffered msg-6 after renewed demand") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ object ShardCoordinator {
/**
* Shard allocation strategy where start is called by the shard coordinator before any calls to
* rebalance or allocate shard. This can be used if there is any expensive initialization to be done
* that you do not want to to in the constructor as it will happen on every node rather than just
* that you do not want to do in the constructor as it will happen on every node rather than just
* the node that hosts the ShardCoordinator
*/
trait StartableAllocationStrategy extends ShardAllocationStrategy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class ConcurrentStartupShardingSpec extends PekkoSpec(ConcurrentStartupShardingS
if (!log.isDebugEnabled)
system.eventStream.publish(Mute(DeadLettersFilter[Any]))

// The intended usage is to start sharding in one (or a few) places when the the ActorSystem
// The intended usage is to start sharding in one (or a few) places when the ActorSystem
// is started and not to do it concurrently from many threads. However, we can do our best and when using
// FJP the Await will create additional threads when needed.
"Concurrent Sharding startup" must {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ object PersistentShardingMigrationSpec {
remember-entities-store = "eventsourced"

# this forces the remembered entity store to use persistence
# is is deprecated
# is deprecated
state-store-mode = "persistence"

# make sure we test snapshots
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ object ClusterSingletonManager {
case object HandOverDone extends ClusterSingletonMessage

/**
* Sent from from previous oldest to new oldest to
* Sent from previous oldest to new oldest to
* initiate the normal hand-over process.
* Especially useful when new node joins and becomes
* oldest immediately, without knowing who was previous
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class ReplicatorMessageAdapter[A, B <: ReplicatedData](
* `responseAdapter` function.
*
* Note that `createRequest` is a function that creates the `Update` message from the provided
* `ActorRef[UpdateResponse]` that the the replicator will send the response message back through.
* `ActorRef[UpdateResponse]` that the replicator will send the response message back through.
* Use that `ActorRef[UpdateResponse]` as the `replyTo` parameter in the `Update` message.
*/
def askUpdate(
Expand All @@ -120,7 +120,7 @@ class ReplicatorMessageAdapter[A, B <: ReplicatedData](
* `responseAdapter` function.
*
* Note that `createRequest` is a function that creates the `Get` message from the provided
* `ActorRef[GetResponse]` that the the replicator will send the response message back through.
* `ActorRef[GetResponse]` that the replicator will send the response message back through.
* Use that `ActorRef[GetResponse]` as the `replyTo` parameter in the `Get` message.
*/
@nowarn
Expand All @@ -140,7 +140,7 @@ class ReplicatorMessageAdapter[A, B <: ReplicatedData](
* `responseAdapter` function.
*
* Note that `createRequest` is a function that creates the `Delete` message from the provided
* `ActorRef[DeleteResponse]` that the the replicator will send the response message back through.
* `ActorRef[DeleteResponse]` that the replicator will send the response message back through.
* Use that `ActorRef[DeleteResponse]` as the `replyTo` parameter in the `Delete` message.
*/
def askDelete(
Expand All @@ -159,7 +159,7 @@ class ReplicatorMessageAdapter[A, B <: ReplicatedData](
* `responseAdapter` function.
*
* Note that `createRequest` is a function that creates the `GetReplicaCount` message from the provided
* `ActorRef[ReplicaCount]` that the the replicator will send the response message back through.
* `ActorRef[ReplicaCount]` that the replicator will send the response message back through.
* Use that `ActorRef[ReplicaCount]` as the `replyTo` parameter in the `GetReplicaCount` message.
*/
def askReplicaCount(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class ReplicatorMessageAdapter[A, B <: ReplicatedData](
* `responseAdapter` function.
*
* Note that `createRequest` is a function that creates the `Update` message from the provided
* `ActorRef[UpdateResponse]` that the the replicator will send the response message back through.
* `ActorRef[UpdateResponse]` that the replicator will send the response message back through.
* Use that `ActorRef[UpdateResponse]` as the `replyTo` parameter in the `Update` message.
*/
def askUpdate(
Expand All @@ -123,7 +123,7 @@ class ReplicatorMessageAdapter[A, B <: ReplicatedData](
* `responseAdapter` function.
*
* Note that `createRequest` is a function that creates the `Get` message from the provided
* `ActorRef[GetResponse]` that the the replicator will send the response message back through.
* `ActorRef[GetResponse]` that the replicator will send the response message back through.
* Use that `ActorRef[GetResponse]` as the `replyTo` parameter in the `Get` message.
*/
def askGet(
Expand All @@ -141,7 +141,7 @@ class ReplicatorMessageAdapter[A, B <: ReplicatedData](
* `responseAdapter` function.
*
* Note that `createRequest` is a function that creates the `Delete` message from the provided
* `ActorRef[DeleteResponse]` that the the replicator will send the response message back through.
* `ActorRef[DeleteResponse]` that the replicator will send the response message back through.
* Use that `ActorRef[DeleteResponse]` as the `replyTo` parameter in the `Delete` message.
*/
def askDelete(
Expand All @@ -160,7 +160,7 @@ class ReplicatorMessageAdapter[A, B <: ReplicatedData](
* `responseAdapter` function.
*
* Note that `createRequest` is a function that creates the `GetReplicaCount` message from the provided
* `ActorRef[ReplicaCount]` that the the replicator will send the response message back through.
* `ActorRef[ReplicaCount]` that the replicator will send the response message back through.
* Use that `ActorRef[ReplicaCount]` as the `replyTo` parameter in the `GetReplicaCount` message.
*/
def askReplicaCount(
Expand Down
4 changes: 2 additions & 2 deletions cluster/src/main/scala/org/apache/pekko/cluster/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
/**
* The supplied thunk will be run, once, when current cluster member is `Removed`.
* If the cluster has already been shutdown the thunk will run on the caller thread immediately.
* If this is called "at the same time" as `shutdown()` there is a possibility that the the thunk
* If this is called "at the same time" as `shutdown()` there is a possibility that the thunk
* is not invoked. It's often better to use [[pekko.actor.CoordinatedShutdown]] for this purpose.
*/
def registerOnMemberRemoved[T](code: => T): Unit =
Expand All @@ -461,7 +461,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
/**
* Java API: The supplied thunk will be run, once, when current cluster member is `Removed`.
* If the cluster has already been shutdown the thunk will run on the caller thread immediately.
* If this is called "at the same time" as `shutdown()` there is a possibility that the the thunk
* If this is called "at the same time" as `shutdown()` there is a possibility that the thunk
* is not invoked. It's often better to use [[pekko.actor.CoordinatedShutdown]] for this purpose.
*/
def registerOnMemberRemoved(callback: Runnable): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ import pekko.coordination.lease.scaladsl.Lease
* INTERNAL API
*
* Down the unreachable nodes if the current node is in the majority part based the last known
* membership information. Otherwise down the reachable nodes, i.e. the own part. If the the
* membership information. Otherwise down the reachable nodes, i.e. the own part. If the
* parts are of equal size the part containing the node with the lowest address is kept.
*
* If the `role` is defined the decision is based only on members with that `role`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ class JoinConfigCompatCheckerSpec extends PekkoSpec with ClusterTestKit {
"be allowed to join a cluster when its configuration is incompatible but it's configured to NOT enforce it" taggedAs
LongRunningTest in {
// this config is NOT compatible with the cluster config,
// but node will ignore the the config check and join anyway
// but node will ignore the config check and join anyway
val joinNodeConfig =
ConfigFactory.parseString("""
pekko.cluster {
Expand Down Expand Up @@ -523,7 +523,7 @@ class JoinConfigCompatCheckerSpec extends PekkoSpec with ClusterTestKit {
"be allowed to re-join a cluster when its configuration is incompatible but it's configured to NOT enforce it" taggedAs
LongRunningTest in {
// this config is NOT compatible with the cluster config,
// but node will ignore the the config check and join anyway
// but node will ignore the config check and join anyway
val joinNodeConfig =
ConfigFactory.parseString("""
pekko.cluster {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class ReachabilityPerfSpec extends AnyWordSpec with Matchers {

private def recordsFrom(r1: Reachability): Unit = {
r1.allObservers.foreach { o =>
r1.recordsFrom(o) should not be be(null)
r1.recordsFrom(o) should not be null
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class DnsDiscoverySpec extends DockerBindDnsService(DnsDiscoverySpec.config) {
}

"be using its own resolver" in {
// future will fail if it it doesn't exist
// future will fail if it doesn't exist
system.actorSelection("/system/SD-DNS/async-dns").resolveOne(2.seconds).futureValue
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,7 @@ object Replicator {
*
* For good introduction to the CRDT subject watch the
* <a href="https://www.infoq.com/presentations/CRDT/">Eventually Consistent Data Structures</a>
* talk by Sean Cribbs and and the
* talk by Sean Cribbs and the
* <a href="https://www.microsoft.com/en-us/research/video/strong-eventual-consistency-and-conflict-free-replicated-data-types/">talk by Mark Shapiro</a>
* and read the excellent paper <a href="https://hal.inria.fr/file/index/docid/555588/filename/techreport.pdf">
* A comprehensive study of Convergent and Commutative Replicated Data Types</a>
Expand Down
2 changes: 1 addition & 1 deletion docs/src/main/paradox/discovery/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ Pekko Discovery with DNS does always use the @ref[Pekko-native "async-dns" imple
DNS discovery maps `Lookup` queries as follows:

* `serviceName`, `portName` and `protocol` set: SRV query in the form: `_port._protocol.name` Where the `_`s are added.
* Any query missing any of the fields is mapped to a A/AAAA query for the `serviceName`
* Any query missing any of the fields is mapped to a A/AAAA query for the `serviceName`

The mapping between Pekko service discovery terminology and SRV terminology:

Expand Down
2 changes: 1 addition & 1 deletion docs/src/main/paradox/serialization-jackson.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ To use Jackson Serialization, you must add the following dependency in your proj

## Introduction

You find general concepts for for Pekko serialization in the @ref:[Serialization](serialization.md) section.
You find general concepts for Pekko serialization in the @ref:[Serialization](serialization.md) section.
This section describes how to use the Jackson serializer for application specific messages and persistent
events and snapshots.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ will also be sent to all those @apidoc[Sink]s.

**completes** when upstream completes

**cancels** when downstream or or any of the @apidoc[Sink]s cancels
**cancels** when downstream or any of the @apidoc[Sink]s cancels

@@@

Expand Down
2 changes: 1 addition & 1 deletion docs/src/main/paradox/stream/stream-refs.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ Stream refs utilise normal actor messaging for their transport, and therefore pr

- messages are sent over actor remoting
- which relies on TCP (classic remoting or Artery TCP) or Aeron UDP for basic redelivery mechanisms
- messages are guaranteed to to be in-order
- messages are guaranteed to be in-order
- messages can be lost, however:
- a *dropped demand signal* will be re-delivered automatically (similar to system messages)
- a *dropped element signal* will cause the stream to *fail*
Expand Down
2 changes: 1 addition & 1 deletion docs/src/main/paradox/typed/cluster-sharding.md
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ Reasons for how this can happen:
A lease can be a final backup that means that each shard won't create child entity actors unless it has the lease.

To use a lease for sharding set `pekko.cluster.sharding.use-lease` to the configuration location
of the lease to use. Each shard will try and acquire a lease with with the name `<actor system name>-shard-<type name>-<shard id>` and
of the lease to use. Each shard will try and acquire a lease with the name `<actor system name>-shard-<type name>-<shard id>` and
the owner is set to the `Cluster(system).selfAddress.hostPort`.

If a shard can't acquire a lease it will remain uninitialized so messages for entities it owns will
Expand Down
6 changes: 3 additions & 3 deletions docs/src/main/paradox/typed/cluster-singleton.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ instance will eventually be started.
@@@ warning

Make sure to not use a Cluster downing strategy that may split the cluster into several separate clusters in
case of network problems or system overload (long GC pauses), since that will result in in *multiple Singletons*
case of network problems or system overload (long GC pauses), since that will result in *multiple Singletons*
being started, one in each separate cluster!
See @ref:[Downing](cluster.md#downing).

Expand Down Expand Up @@ -106,7 +106,7 @@ This pattern may seem to be very tempting to use at first, but it has several dr
@@@ warning

Make sure to not use a Cluster downing strategy that may split the cluster into several separate clusters in
case of network problems or system overload (long GC pauses), since that will result in in *multiple Singletons*
case of network problems or system overload (long GC pauses), since that will result in *multiple Singletons*
being started, one in each separate cluster!
See @ref:[Downing](cluster.md#downing).

Expand Down Expand Up @@ -175,7 +175,7 @@ A lease can be a final backup that means that the singleton actor won't be creat
the lease can be acquired.

To use a lease for singleton set `pekko.cluster.singleton.use-lease` to the configuration location
of the lease to use. A lease with with the name `<actor system name>-singleton-<singleton actor path>` is used and
of the lease to use. A lease with the name `<actor system name>-singleton-<singleton actor path>` is used and
the owner is set to the @scala[`Cluster(system).selfAddress.hostPort`]@java[`Cluster.get(system).selfAddress().hostPort()`].

If the cluster singleton manager can't acquire the lease it will keep retrying while it is the oldest node in the cluster.
Expand Down
Loading