From 13397882acea68b18eb1c74aab2da6cf2ae5798c Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 3 Jan 2025 04:03:56 +0700 Subject: [PATCH 1/8] Add `IShardingMessageAdapter` --- .../IShardingMessageAdapter.cs | 28 +++++++++++++++++++ .../cluster/Akka.Cluster.Sharding/Shard.cs | 9 ++++-- .../Akka.Cluster.Sharding/ShardRegion.cs | 8 ++++-- .../Akka.Cluster.Sharding/ShardingSetup.cs | 26 +++++++++++++++++ 4 files changed, 67 insertions(+), 4 deletions(-) create mode 100644 src/contrib/cluster/Akka.Cluster.Sharding/IShardingMessageAdapter.cs create mode 100644 src/contrib/cluster/Akka.Cluster.Sharding/ShardingSetup.cs diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/IShardingMessageAdapter.cs b/src/contrib/cluster/Akka.Cluster.Sharding/IShardingMessageAdapter.cs new file mode 100644 index 00000000000..9c9da61a2ad --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Sharding/IShardingMessageAdapter.cs @@ -0,0 +1,28 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2025 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +// ----------------------------------------------------------------------- + +using Akka.Annotations; + +namespace Akka.Cluster.Sharding; + +[InternalApi] +public interface IShardingMessageAdapter +{ + public object Adapt(object message); +} + +[InternalApi] +internal class EmptyMessageAdapter: IShardingMessageAdapter +{ + public static EmptyMessageAdapter Instance { get; } = new (); + + private EmptyMessageAdapter() + { + } + + public object Adapt(object message) => message; +} diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs index 42297294512..5c3f3dfef9a 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs @@ -963,6 +963,8 @@ public override string ToString() private readonly Lease? _lease; private readonly TimeSpan _leaseRetryInterval = TimeSpan.FromSeconds(5); // won't be used + private readonly IShardingMessageAdapter _messageAdapter; + public ILoggingAdapter Log { get; } = Context.GetLogger(); public IStash Stash { get; set; } = null!; public ITimerScheduler Timers { get; set; } = null!; @@ -1017,6 +1019,9 @@ public Shard( _leaseRetryInterval = settings.LeaseSettings.LeaseRetryInterval; } + + var setup = Context.System.Settings.Setup.Get(); + _messageAdapter = setup.HasValue ? setup.Value.MessageAdapter : EmptyMessageAdapter.Instance; } protected override SupervisorStrategy SupervisorStrategy() @@ -1971,7 +1976,7 @@ private void AppendToMessageBuffer(EntityId id, object msg, IActorRef snd) if (Log.IsDebugEnabled) Log.Debug("{0}: Message of type [{1}] for entity [{2}] buffered", _typeName, msg.GetType().Name, id); - _messageBuffers.Append(id, msg, snd); + _messageBuffers.Append(id, _messageAdapter.Adapt(msg), snd); } } @@ -1994,7 +1999,7 @@ private void SendMsgBuffer(EntityId entityId) // and as the child exists, the message will be directly forwarded foreach (var (message, @ref) in messages) { - if (message is ShardRegion.StartEntity se) + if (WrappedMessage.Unwrap(message) is ShardRegion.StartEntity se) StartEntity(se.EntityId, @ref); else DeliverMessage(entityId, message, @ref); diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs index 27d96ef0504..68a3ea296d1 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs @@ -430,6 +430,7 @@ internal static Props ProxyProps( private readonly CoordinatedShutdown _coordShutdown = CoordinatedShutdown.Get(Context.System); private readonly TaskCompletionSource _gracefulShutdownProgress = new(); + private readonly IShardingMessageAdapter _messageAdapter; /// /// TBD @@ -464,6 +465,9 @@ public ShardRegion( _initRegistrationDelay = TimeSpan.FromMilliseconds(100).Max(new TimeSpan(_retryInterval.Ticks / 2 / 2 / 2)); _nextRegistrationDelay = _initRegistrationDelay; + var setup = Context.System.Settings.Setup.Get(); + _messageAdapter = setup.HasValue ? setup.Value.MessageAdapter : EmptyMessageAdapter.Instance; + SetupCoordinatedShutdown(); } @@ -812,7 +816,7 @@ private void BufferMessage(ShardId shardId, Msg message, IActorRef sender) } else { - _shardBuffers.Append(shardId, message, sender); + _shardBuffers.Append(shardId, _messageAdapter.Adapt(message), sender); // log some insight to how buffers are filled up every 10% of the buffer capacity var total = totalBufferSize + 1; @@ -1267,7 +1271,7 @@ private void DeliverBufferedMessages(ShardId shardId, IActorRef receiver) foreach (var (Message, Ref) in buffer) { - if (Message is RestartShard && !receiver.Equals(Self)) + if (WrappedMessage.Unwrap(Message) is RestartShard && !receiver.Equals(Self)) { _log.Debug( "{0}: Dropping buffered message {1}, these are only processed by a local ShardRegion.", diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardingSetup.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardingSetup.cs new file mode 100644 index 00000000000..1267d578a1e --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardingSetup.cs @@ -0,0 +1,26 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2025 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +// ----------------------------------------------------------------------- + +using Akka.Actor.Setup; +using Akka.Annotations; + +#nullable enable +namespace Akka.Cluster.Sharding; + +[InternalApi] +public class ShardingSetup: Setup +{ + public static ShardingSetup Create(IShardingMessageAdapter messageAdapter) + => new (messageAdapter); + + internal ShardingSetup(IShardingMessageAdapter messageAdapter) + { + MessageAdapter = messageAdapter; + } + + public IShardingMessageAdapter MessageAdapter { get; } +} From 73f2db3914e1de311ddbe4cfcc230b7e3fb70681 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 3 Jan 2025 04:12:19 +0700 Subject: [PATCH 2/8] Use clearer name, update API approval list --- ...Adapter.cs => IShardingBufferedMessageAdapter.cs} | 8 ++++---- src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs | 6 +++--- .../cluster/Akka.Cluster.Sharding/ShardRegion.cs | 6 +++--- .../cluster/Akka.Cluster.Sharding/ShardingSetup.cs | 10 +++++----- ...PISpec.ApproveClusterSharding.DotNet.verified.txt | 12 ++++++++++++ ...reAPISpec.ApproveClusterSharding.Net.verified.txt | 12 ++++++++++++ 6 files changed, 39 insertions(+), 15 deletions(-) rename src/contrib/cluster/Akka.Cluster.Sharding/{IShardingMessageAdapter.cs => IShardingBufferedMessageAdapter.cs} (72%) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/IShardingMessageAdapter.cs b/src/contrib/cluster/Akka.Cluster.Sharding/IShardingBufferedMessageAdapter.cs similarity index 72% rename from src/contrib/cluster/Akka.Cluster.Sharding/IShardingMessageAdapter.cs rename to src/contrib/cluster/Akka.Cluster.Sharding/IShardingBufferedMessageAdapter.cs index 9c9da61a2ad..39e209aeb55 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/IShardingMessageAdapter.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/IShardingBufferedMessageAdapter.cs @@ -10,17 +10,17 @@ namespace Akka.Cluster.Sharding; [InternalApi] -public interface IShardingMessageAdapter +public interface IShardingBufferedMessageAdapter { public object Adapt(object message); } [InternalApi] -internal class EmptyMessageAdapter: IShardingMessageAdapter +internal class EmptyBufferedMessageAdapter: IShardingBufferedMessageAdapter { - public static EmptyMessageAdapter Instance { get; } = new (); + public static EmptyBufferedMessageAdapter Instance { get; } = new (); - private EmptyMessageAdapter() + private EmptyBufferedMessageAdapter() { } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs index 5c3f3dfef9a..a5b1f62a98e 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs @@ -963,7 +963,7 @@ public override string ToString() private readonly Lease? _lease; private readonly TimeSpan _leaseRetryInterval = TimeSpan.FromSeconds(5); // won't be used - private readonly IShardingMessageAdapter _messageAdapter; + private readonly IShardingBufferedMessageAdapter _bufferedMessageAdapter; public ILoggingAdapter Log { get; } = Context.GetLogger(); public IStash Stash { get; set; } = null!; @@ -1021,7 +1021,7 @@ public Shard( } var setup = Context.System.Settings.Setup.Get(); - _messageAdapter = setup.HasValue ? setup.Value.MessageAdapter : EmptyMessageAdapter.Instance; + _bufferedMessageAdapter = setup.HasValue ? setup.Value.BufferedMessageAdapter : EmptyBufferedMessageAdapter.Instance; } protected override SupervisorStrategy SupervisorStrategy() @@ -1976,7 +1976,7 @@ private void AppendToMessageBuffer(EntityId id, object msg, IActorRef snd) if (Log.IsDebugEnabled) Log.Debug("{0}: Message of type [{1}] for entity [{2}] buffered", _typeName, msg.GetType().Name, id); - _messageBuffers.Append(id, _messageAdapter.Adapt(msg), snd); + _messageBuffers.Append(id, _bufferedMessageAdapter.Adapt(msg), snd); } } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs index 68a3ea296d1..6116b3993b7 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs @@ -430,7 +430,7 @@ internal static Props ProxyProps( private readonly CoordinatedShutdown _coordShutdown = CoordinatedShutdown.Get(Context.System); private readonly TaskCompletionSource _gracefulShutdownProgress = new(); - private readonly IShardingMessageAdapter _messageAdapter; + private readonly IShardingBufferedMessageAdapter _bufferedMessageAdapter; /// /// TBD @@ -466,7 +466,7 @@ public ShardRegion( _nextRegistrationDelay = _initRegistrationDelay; var setup = Context.System.Settings.Setup.Get(); - _messageAdapter = setup.HasValue ? setup.Value.MessageAdapter : EmptyMessageAdapter.Instance; + _bufferedMessageAdapter = setup.HasValue ? setup.Value.BufferedMessageAdapter : EmptyBufferedMessageAdapter.Instance; SetupCoordinatedShutdown(); } @@ -816,7 +816,7 @@ private void BufferMessage(ShardId shardId, Msg message, IActorRef sender) } else { - _shardBuffers.Append(shardId, _messageAdapter.Adapt(message), sender); + _shardBuffers.Append(shardId, _bufferedMessageAdapter.Adapt(message), sender); // log some insight to how buffers are filled up every 10% of the buffer capacity var total = totalBufferSize + 1; diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardingSetup.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardingSetup.cs index 1267d578a1e..9be8be73892 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardingSetup.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardingSetup.cs @@ -14,13 +14,13 @@ namespace Akka.Cluster.Sharding; [InternalApi] public class ShardingSetup: Setup { - public static ShardingSetup Create(IShardingMessageAdapter messageAdapter) - => new (messageAdapter); + public static ShardingSetup Create(IShardingBufferedMessageAdapter bufferedMessageAdapter) + => new (bufferedMessageAdapter); - internal ShardingSetup(IShardingMessageAdapter messageAdapter) + internal ShardingSetup(IShardingBufferedMessageAdapter bufferedMessageAdapter) { - MessageAdapter = messageAdapter; + BufferedMessageAdapter = bufferedMessageAdapter; } - public IShardingMessageAdapter MessageAdapter { get; } + public IShardingBufferedMessageAdapter BufferedMessageAdapter { get; } } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt index d8833b198eb..47fbd6b5b19 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt @@ -215,6 +215,11 @@ namespace Akka.Cluster.Sharding } public interface IShardRegionCommand { } public interface IShardRegionQuery { } + [Akka.Annotations.InternalApiAttribute()] + public interface IShardingBufferedMessageAdapter + { + object Adapt(object message); + } public interface IStartableAllocationStrategy : Akka.Actor.INoSerializationVerificationNeeded, Akka.Cluster.Sharding.IShardAllocationStrategy { void Start(); @@ -335,6 +340,13 @@ namespace Akka.Cluster.Sharding public override int GetHashCode() { } public override string ToString() { } } + [Akka.Annotations.InternalApiAttribute()] + [System.Runtime.CompilerServices.NullableAttribute(0)] + public class ShardingSetup : Akka.Actor.Setup.Setup + { + public Akka.Cluster.Sharding.IShardingBufferedMessageAdapter BufferedMessageAdapter { get; } + public static Akka.Cluster.Sharding.ShardingSetup Create(Akka.Cluster.Sharding.IShardingBufferedMessageAdapter bufferedMessageAdapter) { } + } public enum StateStoreMode { Persistence = 0, diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt index 31227d76d30..4311d052a99 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt @@ -215,6 +215,11 @@ namespace Akka.Cluster.Sharding } public interface IShardRegionCommand { } public interface IShardRegionQuery { } + [Akka.Annotations.InternalApiAttribute()] + public interface IShardingBufferedMessageAdapter + { + object Adapt(object message); + } public interface IStartableAllocationStrategy : Akka.Actor.INoSerializationVerificationNeeded, Akka.Cluster.Sharding.IShardAllocationStrategy { void Start(); @@ -335,6 +340,13 @@ namespace Akka.Cluster.Sharding public override int GetHashCode() { } public override string ToString() { } } + [Akka.Annotations.InternalApiAttribute()] + [System.Runtime.CompilerServices.NullableAttribute(0)] + public class ShardingSetup : Akka.Actor.Setup.Setup + { + public Akka.Cluster.Sharding.IShardingBufferedMessageAdapter BufferedMessageAdapter { get; } + public static Akka.Cluster.Sharding.ShardingSetup Create(Akka.Cluster.Sharding.IShardingBufferedMessageAdapter bufferedMessageAdapter) { } + } public enum StateStoreMode { Persistence = 0, From d8792fbb1dfc86e91d75042b98576b16f8844773 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Sat, 4 Jan 2025 03:10:22 +0700 Subject: [PATCH 3/8] Fix API --- ...eAdapter.cs => IShardingBufferMessageAdapter.cs} | 13 +++++++------ src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs | 6 +++--- .../cluster/Akka.Cluster.Sharding/ShardRegion.cs | 6 +++--- .../cluster/Akka.Cluster.Sharding/ShardingSetup.cs | 10 +++++----- 4 files changed, 18 insertions(+), 17 deletions(-) rename src/contrib/cluster/Akka.Cluster.Sharding/{IShardingBufferedMessageAdapter.cs => IShardingBufferMessageAdapter.cs} (57%) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/IShardingBufferedMessageAdapter.cs b/src/contrib/cluster/Akka.Cluster.Sharding/IShardingBufferMessageAdapter.cs similarity index 57% rename from src/contrib/cluster/Akka.Cluster.Sharding/IShardingBufferedMessageAdapter.cs rename to src/contrib/cluster/Akka.Cluster.Sharding/IShardingBufferMessageAdapter.cs index 39e209aeb55..d194bf21663 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/IShardingBufferedMessageAdapter.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/IShardingBufferMessageAdapter.cs @@ -5,24 +5,25 @@ // // ----------------------------------------------------------------------- +using Akka.Actor; using Akka.Annotations; namespace Akka.Cluster.Sharding; [InternalApi] -public interface IShardingBufferedMessageAdapter +public interface IShardingBufferMessageAdapter { - public object Adapt(object message); + public object Apply(object message, IActorContext context); } [InternalApi] -internal class EmptyBufferedMessageAdapter: IShardingBufferedMessageAdapter +internal class EmptyBufferMessageAdapter: IShardingBufferMessageAdapter { - public static EmptyBufferedMessageAdapter Instance { get; } = new (); + public static EmptyBufferMessageAdapter Instance { get; } = new (); - private EmptyBufferedMessageAdapter() + private EmptyBufferMessageAdapter() { } - public object Adapt(object message) => message; + public object Apply(object message, IActorContext context) => message; } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs index a5b1f62a98e..98581f1e188 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs @@ -963,7 +963,7 @@ public override string ToString() private readonly Lease? _lease; private readonly TimeSpan _leaseRetryInterval = TimeSpan.FromSeconds(5); // won't be used - private readonly IShardingBufferedMessageAdapter _bufferedMessageAdapter; + private readonly IShardingBufferMessageAdapter _bufferMessageAdapter; public ILoggingAdapter Log { get; } = Context.GetLogger(); public IStash Stash { get; set; } = null!; @@ -1021,7 +1021,7 @@ public Shard( } var setup = Context.System.Settings.Setup.Get(); - _bufferedMessageAdapter = setup.HasValue ? setup.Value.BufferedMessageAdapter : EmptyBufferedMessageAdapter.Instance; + _bufferMessageAdapter = setup.HasValue ? setup.Value.BufferMessageAdapter : EmptyBufferMessageAdapter.Instance; } protected override SupervisorStrategy SupervisorStrategy() @@ -1976,7 +1976,7 @@ private void AppendToMessageBuffer(EntityId id, object msg, IActorRef snd) if (Log.IsDebugEnabled) Log.Debug("{0}: Message of type [{1}] for entity [{2}] buffered", _typeName, msg.GetType().Name, id); - _messageBuffers.Append(id, _bufferedMessageAdapter.Adapt(msg), snd); + _messageBuffers.Append(id, _bufferMessageAdapter.Apply(msg, Context), snd); } } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs index 6116b3993b7..2fb246c1586 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs @@ -430,7 +430,7 @@ internal static Props ProxyProps( private readonly CoordinatedShutdown _coordShutdown = CoordinatedShutdown.Get(Context.System); private readonly TaskCompletionSource _gracefulShutdownProgress = new(); - private readonly IShardingBufferedMessageAdapter _bufferedMessageAdapter; + private readonly IShardingBufferMessageAdapter _bufferMessageAdapter; /// /// TBD @@ -466,7 +466,7 @@ public ShardRegion( _nextRegistrationDelay = _initRegistrationDelay; var setup = Context.System.Settings.Setup.Get(); - _bufferedMessageAdapter = setup.HasValue ? setup.Value.BufferedMessageAdapter : EmptyBufferedMessageAdapter.Instance; + _bufferMessageAdapter = setup.HasValue ? setup.Value.BufferMessageAdapter : EmptyBufferMessageAdapter.Instance; SetupCoordinatedShutdown(); } @@ -816,7 +816,7 @@ private void BufferMessage(ShardId shardId, Msg message, IActorRef sender) } else { - _shardBuffers.Append(shardId, _bufferedMessageAdapter.Adapt(message), sender); + _shardBuffers.Append(shardId, _bufferMessageAdapter.Apply(message, Context), sender); // log some insight to how buffers are filled up every 10% of the buffer capacity var total = totalBufferSize + 1; diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardingSetup.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardingSetup.cs index 9be8be73892..2b09794eb57 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardingSetup.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardingSetup.cs @@ -14,13 +14,13 @@ namespace Akka.Cluster.Sharding; [InternalApi] public class ShardingSetup: Setup { - public static ShardingSetup Create(IShardingBufferedMessageAdapter bufferedMessageAdapter) - => new (bufferedMessageAdapter); + public static ShardingSetup Create(IShardingBufferMessageAdapter bufferMessageAdapter) + => new (bufferMessageAdapter); - internal ShardingSetup(IShardingBufferedMessageAdapter bufferedMessageAdapter) + internal ShardingSetup(IShardingBufferMessageAdapter bufferMessageAdapter) { - BufferedMessageAdapter = bufferedMessageAdapter; + BufferMessageAdapter = bufferMessageAdapter; } - public IShardingBufferedMessageAdapter BufferedMessageAdapter { get; } + public IShardingBufferMessageAdapter BufferMessageAdapter { get; } } From b49ab399dd01c898f6580839a7b2fb7ec2fd7a0b Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Sat, 4 Jan 2025 04:26:40 +0700 Subject: [PATCH 4/8] Refactor setup class to extension --- src/Directory.Build.props | 10 ++---- .../ClusterShardingBufferAdapter.cs | 34 +++++++++++++++++++ .../cluster/Akka.Cluster.Sharding/Shard.cs | 3 +- .../Akka.Cluster.Sharding/ShardRegion.cs | 3 +- .../Akka.Cluster.Sharding/ShardingSetup.cs | 26 -------------- 5 files changed, 38 insertions(+), 38 deletions(-) create mode 100644 src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingBufferAdapter.cs delete mode 100644 src/contrib/cluster/Akka.Cluster.Sharding/ShardingSetup.cs diff --git a/src/Directory.Build.props b/src/Directory.Build.props index 5ba45dd6962..7f0857c2142 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -2,7 +2,7 @@ Copyright © 2013-2023 Akka.NET Team Akka.NET Team - 1.5.27.1 + 1.5.34 akkalogo.png https://github.com/akkadotnet/akka.net https://github.com/akkadotnet/akka.net/blob/master/LICENSE @@ -47,13 +47,7 @@ true - Akka.NET v1.5.27.1 is a minor patch to fix a race condition between the logging and remoting system. -[Akka: Fix Remoting-Logging DefaultAddress race condition](https://github.com/akkadotnet/akka.net/pull/7305) -To [see the full set of changes in Akka.NET v1.5.27.1, click here](https://github.com/akkadotnet/akka.net/milestone/110). -| COMMITS | LOC+ | LOC- | AUTHOR | -|---------|------|------|---------------------| -| 1 | 4 | 0 | Aaron Stannard | -| 1 | 10 | 3 | Gregorius Soedharmo | + Placeholder for nightlies* diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingBufferAdapter.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingBufferAdapter.cs new file mode 100644 index 00000000000..bd124019a69 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingBufferAdapter.cs @@ -0,0 +1,34 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2025 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +// ----------------------------------------------------------------------- + +using Akka.Actor; +using Akka.Annotations; + +#nullable enable +namespace Akka.Cluster.Sharding; + +[InternalApi] +public class ClusterShardingBufferAdapter : IExtension +{ + public static ClusterShardingBufferAdapter Get(ActorSystem system) + { + return system.WithExtension(); + } + + public IShardingBufferMessageAdapter BufferMessageAdapter { get; private set; } = EmptyBufferMessageAdapter.Instance; + + public void SetShardingBufferMessageAdapter(IShardingBufferMessageAdapter? bufferMessageAdapter) + { + BufferMessageAdapter = bufferMessageAdapter ?? EmptyBufferMessageAdapter.Instance; + } +} + +[InternalApi] +public sealed class ClusterShardingBufferAdapterExtensionProvider : ExtensionIdProvider +{ + public override ClusterShardingBufferAdapter CreateExtension(ExtendedActorSystem system) => new (); +} diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs index 98581f1e188..916bb003731 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs @@ -1020,8 +1020,7 @@ public Shard( _leaseRetryInterval = settings.LeaseSettings.LeaseRetryInterval; } - var setup = Context.System.Settings.Setup.Get(); - _bufferMessageAdapter = setup.HasValue ? setup.Value.BufferMessageAdapter : EmptyBufferMessageAdapter.Instance; + _bufferMessageAdapter = ClusterShardingBufferAdapter.Get(Context.System).BufferMessageAdapter; } protected override SupervisorStrategy SupervisorStrategy() diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs index 2fb246c1586..623e22a48b1 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs @@ -465,8 +465,7 @@ public ShardRegion( _initRegistrationDelay = TimeSpan.FromMilliseconds(100).Max(new TimeSpan(_retryInterval.Ticks / 2 / 2 / 2)); _nextRegistrationDelay = _initRegistrationDelay; - var setup = Context.System.Settings.Setup.Get(); - _bufferMessageAdapter = setup.HasValue ? setup.Value.BufferMessageAdapter : EmptyBufferMessageAdapter.Instance; + _bufferMessageAdapter = ClusterShardingBufferAdapter.Get(Context.System).BufferMessageAdapter; SetupCoordinatedShutdown(); } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardingSetup.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardingSetup.cs deleted file mode 100644 index 2b09794eb57..00000000000 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardingSetup.cs +++ /dev/null @@ -1,26 +0,0 @@ -// ----------------------------------------------------------------------- -// -// Copyright (C) 2009-2025 Lightbend Inc. -// Copyright (C) 2013-2025 .NET Foundation -// -// ----------------------------------------------------------------------- - -using Akka.Actor.Setup; -using Akka.Annotations; - -#nullable enable -namespace Akka.Cluster.Sharding; - -[InternalApi] -public class ShardingSetup: Setup -{ - public static ShardingSetup Create(IShardingBufferMessageAdapter bufferMessageAdapter) - => new (bufferMessageAdapter); - - internal ShardingSetup(IShardingBufferMessageAdapter bufferMessageAdapter) - { - BufferMessageAdapter = bufferMessageAdapter; - } - - public IShardingBufferMessageAdapter BufferMessageAdapter { get; } -} From 4f63a7fd1d25bedf55981341628f9436b1dc450b Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Mon, 6 Jan 2025 21:14:49 +0700 Subject: [PATCH 5/8] Refactor extension, merge the property to ClusterSharding extension --- .../Akka.Cluster.Sharding/ClusterSharding.cs | 10 ++++++ .../ClusterShardingBufferAdapter.cs | 34 ------------------- .../cluster/Akka.Cluster.Sharding/Shard.cs | 2 +- .../Akka.Cluster.Sharding/ShardRegion.cs | 2 +- 4 files changed, 12 insertions(+), 36 deletions(-) delete mode 100644 src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingBufferAdapter.cs diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs index 02deaaa83ad..54c4238d986 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs @@ -16,6 +16,7 @@ using System.Threading.Tasks; using Akka.Actor; +using Akka.Annotations; using Akka.Cluster.Tools.Singleton; using Akka.Configuration; using Akka.Dispatch; @@ -366,6 +367,9 @@ public ClusterSharding(ExtendedActorSystem system) /// public ClusterShardingSettings Settings { get; } + [InternalApi] + public IShardingBufferMessageAdapter BufferMessageAdapter { get; private set; } = EmptyBufferMessageAdapter.Instance; + /// /// Default HOCON settings for cluster sharding. /// @@ -376,6 +380,12 @@ public static Config DefaultConfig() .WithFallback(DistributedData.DistributedData.DefaultConfig()); } + [InternalApi] + public void SetShardingBufferMessageAdapter(IShardingBufferMessageAdapter? bufferMessageAdapter) + { + BufferMessageAdapter = bufferMessageAdapter ?? EmptyBufferMessageAdapter.Instance; + } + /// /// Register a named entity type by defining the of the entity actor /// and functions to extract entity and shard identifier from messages. The actor diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingBufferAdapter.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingBufferAdapter.cs deleted file mode 100644 index bd124019a69..00000000000 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingBufferAdapter.cs +++ /dev/null @@ -1,34 +0,0 @@ -// ----------------------------------------------------------------------- -// -// Copyright (C) 2009-2025 Lightbend Inc. -// Copyright (C) 2013-2025 .NET Foundation -// -// ----------------------------------------------------------------------- - -using Akka.Actor; -using Akka.Annotations; - -#nullable enable -namespace Akka.Cluster.Sharding; - -[InternalApi] -public class ClusterShardingBufferAdapter : IExtension -{ - public static ClusterShardingBufferAdapter Get(ActorSystem system) - { - return system.WithExtension(); - } - - public IShardingBufferMessageAdapter BufferMessageAdapter { get; private set; } = EmptyBufferMessageAdapter.Instance; - - public void SetShardingBufferMessageAdapter(IShardingBufferMessageAdapter? bufferMessageAdapter) - { - BufferMessageAdapter = bufferMessageAdapter ?? EmptyBufferMessageAdapter.Instance; - } -} - -[InternalApi] -public sealed class ClusterShardingBufferAdapterExtensionProvider : ExtensionIdProvider -{ - public override ClusterShardingBufferAdapter CreateExtension(ExtendedActorSystem system) => new (); -} diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs index 916bb003731..cb454b08387 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs @@ -1020,7 +1020,7 @@ public Shard( _leaseRetryInterval = settings.LeaseSettings.LeaseRetryInterval; } - _bufferMessageAdapter = ClusterShardingBufferAdapter.Get(Context.System).BufferMessageAdapter; + _bufferMessageAdapter = ClusterSharding.Get(Context.System).BufferMessageAdapter; } protected override SupervisorStrategy SupervisorStrategy() diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs index 623e22a48b1..e5e8cef25b9 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs @@ -465,7 +465,7 @@ public ShardRegion( _initRegistrationDelay = TimeSpan.FromMilliseconds(100).Max(new TimeSpan(_retryInterval.Ticks / 2 / 2 / 2)); _nextRegistrationDelay = _initRegistrationDelay; - _bufferMessageAdapter = ClusterShardingBufferAdapter.Get(Context.System).BufferMessageAdapter; + _bufferMessageAdapter = ClusterSharding.Get(Context.System).BufferMessageAdapter; SetupCoordinatedShutdown(); } From 32ca8d94d6ed6ae5bbd17d0bdd7d30142021432a Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Mon, 6 Jan 2025 21:46:14 +0700 Subject: [PATCH 6/8] Add unit test --- .../ShardingBufferAdapterSpec.cs | 190 ++++++++++++++++++ 1 file changed, 190 insertions(+) create mode 100644 src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardingBufferAdapterSpec.cs diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardingBufferAdapterSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardingBufferAdapterSpec.cs new file mode 100644 index 00000000000..2e319c3f575 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardingBufferAdapterSpec.cs @@ -0,0 +1,190 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2025 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Collections.Immutable; +using Akka.Actor; +using Akka.Cluster.Tools.Singleton; +using Akka.Configuration; +using Akka.TestKit; +using Akka.Util.Internal; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; +using static FluentAssertions.FluentActions; + +namespace Akka.Cluster.Sharding.Tests; + +public class ShardingBufferAdapterSpec: AkkaSpec +{ + private sealed class MessageExtractor: IMessageExtractor + { + public string EntityId(object message) + => message switch + { + int i => i.ToString(), + _ => null + }; + + public object EntityMessage(object message) + => message; + + public string ShardId(object message) + => message switch + { + int i => (i % 10).ToString(), + _ => null + }; + + public string ShardId(string entityId, object messageHint = null) + => (int.Parse(entityId) % 10).ToString(); + } + + private class EntityActor : ActorBase + { + protected override bool Receive(object message) + { + Sender.Tell(message); + return true; + } + } + + private class TestMessageAdapter: IShardingBufferMessageAdapter + { + private readonly AtomicCounter _counter; + + public TestMessageAdapter(AtomicCounter counter) + { + _counter = counter; + } + + public object Apply(object message, IActorContext context) + { + _counter.IncrementAndGet(); + return message; + } + } + + private const string ShardTypeName = "Caat"; + + private static Config SpecConfig => + ConfigurationFactory.ParseString(@" + akka.loglevel = DEBUG + akka.actor.provider = cluster + akka.remote.dot-netty.tcp.port = 0 + akka.remote.log-remote-lifecycle-events = off + + akka.test.single-expect-default = 5 s + akka.cluster.sharding.state-store-mode = ""ddata"" + akka.cluster.sharding.verbose-debug-logging = on + akka.cluster.sharding.fail-on-invalid-entity-state-transition = on + akka.cluster.sharding.distributed-data.durable.keys = []") + .WithFallback(ClusterSingletonManager.DefaultConfig() + .WithFallback(ClusterSharding.DefaultConfig())); + + private readonly AtomicCounter _counterA = new (0); + private readonly AtomicCounter _counterB = new (0); + + private readonly ActorSystem _sysA; + private readonly ActorSystem _sysB; + + private readonly TestProbe _pA; + private readonly TestProbe _pB; + + private readonly IActorRef _regionA; + private readonly IActorRef _regionB; + + public ShardingBufferAdapterSpec(ITestOutputHelper helper) : base(SpecConfig, helper) + { + _sysA = Sys; + _sysB = ActorSystem.Create(Sys.Name, Sys.Settings.Config); + + InitializeLogger(_sysB, "[sysB]"); + + _pA = CreateTestProbe(_sysA); + _pB = CreateTestProbe(_sysB); + + ClusterSharding.Get(_sysA).SetShardingBufferMessageAdapter(new TestMessageAdapter(_counterA)); + ClusterSharding.Get(_sysB).SetShardingBufferMessageAdapter(new TestMessageAdapter(_counterB)); + + _regionA = StartShard(_sysA); + _regionB = StartShard(_sysB); + } + + protected override void AfterAll() + { + if(_sysA != null) + Shutdown(_sysA); + if(_sysB != null) + Shutdown(_sysB); + base.AfterAll(); + } + + private IActorRef StartShard(ActorSystem sys) + { + return ClusterSharding.Get(sys).Start( + ShardTypeName, + Props.Create(() => new EntityActor()), + ClusterShardingSettings.Create(Sys).WithRememberEntities(true), + new MessageExtractor()); + } + + [Fact(DisplayName = "ClusterSharding buffer message adapter must be called when message was buffered")] + public void ClusterSharding_must_initialize_cluster_and_allocate_sharded_actors() + { + Cluster.Get(_sysA).Join(Cluster.Get(_sysA).SelfAddress); // coordinator on A + + AwaitAssert(() => + { + Cluster.Get(_sysA).SelfMember.Status.Should().Be(MemberStatus.Up); + }, TimeSpan.FromSeconds(1)); + + Cluster.Get(_sysB).Join(Cluster.Get(_sysA).SelfAddress); + + Within(TimeSpan.FromSeconds(10), () => + { + AwaitAssert(() => + { + foreach (var s in ImmutableHashSet.Create(_sysA, _sysB)) + { + Cluster.Get(s).SendCurrentClusterState(TestActor); + ExpectMsg().Members.Count.Should().Be(2); + } + }); + }); + + _regionA.Tell(1, _pA.Ref); + _pA.ExpectMsg(1); + + _regionB.Tell(2, _pB.Ref); + _pB.ExpectMsg(2); + + _regionB.Tell(3, _pB.Ref); + _pB.ExpectMsg(3); + + var counterAValue = _counterA.Current; + var counterBValue = _counterB.Current; + + // Each newly instantiated entities should buffer their message at least once + // Buffer message adapter should be called everytime a message is buffered + counterAValue.Should().BeGreaterOrEqualTo(1); + counterBValue.Should().BeGreaterOrEqualTo(2); + + _regionA.Tell(1, _pA.Ref); + _pA.ExpectMsg(1); + + _regionB.Tell(2, _pB.Ref); + _pB.ExpectMsg(2); + + _regionB.Tell(3, _pB.Ref); + _pB.ExpectMsg(3); + + // Each entity should not buffer their message once they were instantiated + _counterA.Current.Should().Be(counterAValue); + _counterB.Current.Should().Be(counterBValue); + } +} \ No newline at end of file From 9cdcf136b311820a6b91d546f3ae4bb901b56807 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Mon, 6 Jan 2025 21:46:34 +0700 Subject: [PATCH 7/8] Update API approval list --- ...pec.ApproveClusterSharding.DotNet.verified.txt | 15 ++++++--------- ...PISpec.ApproveClusterSharding.Net.verified.txt | 15 ++++++--------- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt index 47fbd6b5b19..7c8df1bb4ed 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt @@ -12,11 +12,15 @@ namespace Akka.Cluster.Sharding public class ClusterSharding : Akka.Actor.IExtension { public ClusterSharding(Akka.Actor.ExtendedActorSystem system) { } + [Akka.Annotations.InternalApiAttribute()] + public Akka.Cluster.Sharding.IShardingBufferMessageAdapter BufferMessageAdapter { get; } public Akka.Cluster.Sharding.ClusterShardingSettings Settings { get; } public System.Collections.Immutable.ImmutableHashSet ShardTypeNames { get; } public static Akka.Configuration.Config DefaultConfig() { } public Akka.Cluster.Sharding.IShardAllocationStrategy DefaultShardAllocationStrategy(Akka.Cluster.Sharding.ClusterShardingSettings settings) { } public static Akka.Cluster.Sharding.ClusterSharding Get(Akka.Actor.ActorSystem system) { } + [Akka.Annotations.InternalApiAttribute()] + public void SetShardingBufferMessageAdapter(Akka.Cluster.Sharding.IShardingBufferMessageAdapter bufferMessageAdapter) { } public Akka.Actor.IActorRef ShardRegion(string typeName) { } public Akka.Actor.IActorRef ShardRegionProxy(string typeName) { } [System.ObsoleteAttribute("Use one of the overloads that accepts an IMessageExtractor instead")] @@ -216,9 +220,9 @@ namespace Akka.Cluster.Sharding public interface IShardRegionCommand { } public interface IShardRegionQuery { } [Akka.Annotations.InternalApiAttribute()] - public interface IShardingBufferedMessageAdapter + public interface IShardingBufferMessageAdapter { - object Adapt(object message); + object Apply(object message, Akka.Actor.IActorContext context); } public interface IStartableAllocationStrategy : Akka.Actor.INoSerializationVerificationNeeded, Akka.Cluster.Sharding.IShardAllocationStrategy { @@ -340,13 +344,6 @@ namespace Akka.Cluster.Sharding public override int GetHashCode() { } public override string ToString() { } } - [Akka.Annotations.InternalApiAttribute()] - [System.Runtime.CompilerServices.NullableAttribute(0)] - public class ShardingSetup : Akka.Actor.Setup.Setup - { - public Akka.Cluster.Sharding.IShardingBufferedMessageAdapter BufferedMessageAdapter { get; } - public static Akka.Cluster.Sharding.ShardingSetup Create(Akka.Cluster.Sharding.IShardingBufferedMessageAdapter bufferedMessageAdapter) { } - } public enum StateStoreMode { Persistence = 0, diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt index 4311d052a99..20920646b0d 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt @@ -12,11 +12,15 @@ namespace Akka.Cluster.Sharding public class ClusterSharding : Akka.Actor.IExtension { public ClusterSharding(Akka.Actor.ExtendedActorSystem system) { } + [Akka.Annotations.InternalApiAttribute()] + public Akka.Cluster.Sharding.IShardingBufferMessageAdapter BufferMessageAdapter { get; } public Akka.Cluster.Sharding.ClusterShardingSettings Settings { get; } public System.Collections.Immutable.ImmutableHashSet ShardTypeNames { get; } public static Akka.Configuration.Config DefaultConfig() { } public Akka.Cluster.Sharding.IShardAllocationStrategy DefaultShardAllocationStrategy(Akka.Cluster.Sharding.ClusterShardingSettings settings) { } public static Akka.Cluster.Sharding.ClusterSharding Get(Akka.Actor.ActorSystem system) { } + [Akka.Annotations.InternalApiAttribute()] + public void SetShardingBufferMessageAdapter(Akka.Cluster.Sharding.IShardingBufferMessageAdapter bufferMessageAdapter) { } public Akka.Actor.IActorRef ShardRegion(string typeName) { } public Akka.Actor.IActorRef ShardRegionProxy(string typeName) { } [System.ObsoleteAttribute("Use one of the overloads that accepts an IMessageExtractor instead")] @@ -216,9 +220,9 @@ namespace Akka.Cluster.Sharding public interface IShardRegionCommand { } public interface IShardRegionQuery { } [Akka.Annotations.InternalApiAttribute()] - public interface IShardingBufferedMessageAdapter + public interface IShardingBufferMessageAdapter { - object Adapt(object message); + object Apply(object message, Akka.Actor.IActorContext context); } public interface IStartableAllocationStrategy : Akka.Actor.INoSerializationVerificationNeeded, Akka.Cluster.Sharding.IShardAllocationStrategy { @@ -340,13 +344,6 @@ namespace Akka.Cluster.Sharding public override int GetHashCode() { } public override string ToString() { } } - [Akka.Annotations.InternalApiAttribute()] - [System.Runtime.CompilerServices.NullableAttribute(0)] - public class ShardingSetup : Akka.Actor.Setup.Setup - { - public Akka.Cluster.Sharding.IShardingBufferedMessageAdapter BufferedMessageAdapter { get; } - public static Akka.Cluster.Sharding.ShardingSetup Create(Akka.Cluster.Sharding.IShardingBufferedMessageAdapter bufferedMessageAdapter) { } - } public enum StateStoreMode { Persistence = 0, From 6b6d5c0fdbcb08eb21d8f3a7602b8754ce239cf4 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Mon, 6 Jan 2025 21:49:31 +0700 Subject: [PATCH 8/8] Fix comment wording --- .../Akka.Cluster.Sharding.Tests/ShardingBufferAdapterSpec.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardingBufferAdapterSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardingBufferAdapterSpec.cs index 2e319c3f575..e935c369f4f 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardingBufferAdapterSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardingBufferAdapterSpec.cs @@ -169,7 +169,7 @@ public void ClusterSharding_must_initialize_cluster_and_allocate_sharded_actors( var counterAValue = _counterA.Current; var counterBValue = _counterB.Current; - // Each newly instantiated entities should buffer their message at least once + // Each newly instantiated entities should have their messages buffered at least once // Buffer message adapter should be called everytime a message is buffered counterAValue.Should().BeGreaterOrEqualTo(1); counterBValue.Should().BeGreaterOrEqualTo(2); @@ -183,7 +183,7 @@ public void ClusterSharding_must_initialize_cluster_and_allocate_sharded_actors( _regionB.Tell(3, _pB.Ref); _pB.ExpectMsg(3); - // Each entity should not buffer their message once they were instantiated + // Each entity should not have their messages buffered once they were instantiated _counterA.Current.Should().Be(counterAValue); _counterB.Current.Should().Be(counterBValue); }