Skip to content

Commit

Permalink
Added ShardingEnvelope checks for ShardingMessageAdapter (akkadot…
Browse files Browse the repository at this point in the history
…net#7449)

* cleaned up `ShardingBufferAdapterSpec`

* added `ShardEnvelope` check condition
  • Loading branch information
Aaronontheweb authored Jan 9, 2025
1 parent 468546c commit 80f73bc
Showing 1 changed file with 33 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Collections.Immutable;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
Expand Down Expand Up @@ -72,18 +73,20 @@ public object Apply(object message, IActorContext context)
private const string ShardTypeName = "Caat";

private static Config SpecConfig =>
ConfigurationFactory.ParseString(@"
akka.loglevel = DEBUG
akka.actor.provider = cluster
akka.remote.dot-netty.tcp.port = 0
akka.remote.log-remote-lifecycle-events = off
akka.test.single-expect-default = 5 s
akka.cluster.sharding.state-store-mode = ""ddata""
akka.cluster.sharding.verbose-debug-logging = on
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on
akka.cluster.sharding.distributed-data.durable.keys = []")
.WithFallback(ClusterSingletonManager.DefaultConfig()
ConfigurationFactory.ParseString("""
akka.loglevel = DEBUG
akka.actor.provider = cluster
akka.remote.dot-netty.tcp.port = 0
akka.remote.log-remote-lifecycle-events = off
akka.test.single-expect-default = 5 s
akka.cluster.sharding.state-store-mode = "ddata"
akka.cluster.sharding.verbose-debug-logging = on
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on
akka.cluster.sharding.distributed-data.durable.keys = []
""")
.WithFallback(ClusterSingleton.DefaultConfig()
.WithFallback(ClusterSharding.DefaultConfig()));

private readonly AtomicCounter _counterA = new (0);
Expand All @@ -105,9 +108,11 @@ public ShardingBufferAdapterSpec(ITestOutputHelper helper) : base(SpecConfig, he

InitializeLogger(_sysB, "[sysB]");

// ReSharper disable VirtualMemberCallInConstructor
_pA = CreateTestProbe(_sysA);
_pB = CreateTestProbe(_sysB);

// ReSharper restore VirtualMemberCallInConstructor

ClusterSharding.Get(_sysA).SetShardingBufferMessageAdapter(new TestMessageAdapter(_counterA));
ClusterSharding.Get(_sysB).SetShardingBufferMessageAdapter(new TestMessageAdapter(_counterB));

Expand All @@ -134,37 +139,38 @@ private IActorRef StartShard(ActorSystem sys)
}

[Fact(DisplayName = "ClusterSharding buffer message adapter must be called when message was buffered")]
public void ClusterSharding_must_initialize_cluster_and_allocate_sharded_actors()
public async Task ClusterSharding_must_initialize_cluster_and_allocate_sharded_actors()
{
Cluster.Get(_sysA).Join(Cluster.Get(_sysA).SelfAddress); // coordinator on A
await Cluster.Get(_sysA).JoinAsync(Cluster.Get(_sysA).SelfAddress); // coordinator on A

AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
Cluster.Get(_sysA).SelfMember.Status.Should().Be(MemberStatus.Up);
}, TimeSpan.FromSeconds(1));

Cluster.Get(_sysB).Join(Cluster.Get(_sysA).SelfAddress);
await Cluster.Get(_sysB).JoinAsync(Cluster.Get(_sysA).SelfAddress);

Within(TimeSpan.FromSeconds(10), () =>
await WithinAsync(TimeSpan.FromSeconds(10), async () =>
{
AwaitAssert(() =>
await AwaitAssertAsync(async () =>
{
foreach (var s in ImmutableHashSet.Create(_sysA, _sysB))
{
Cluster.Get(s).SendCurrentClusterState(TestActor);
ExpectMsg<ClusterEvent.CurrentClusterState>().Members.Count.Should().Be(2);
(await ExpectMsgAsync<ClusterEvent.CurrentClusterState>()).Members.Count.Should().Be(2);
}
});
});

_regionA.Tell(1, _pA.Ref);
_pA.ExpectMsg(1);
// need to make sure that ShardingEnvelope doesn't impacted by this change
_regionA.Tell(new ShardingEnvelope("1", 1), _pA.Ref);
await _pA.ExpectMsgAsync(1);

_regionB.Tell(2, _pB.Ref);
_pB.ExpectMsg(2);
await _pB.ExpectMsgAsync(2);

_regionB.Tell(3, _pB.Ref);
_pB.ExpectMsg(3);
await _pB.ExpectMsgAsync(3);

var counterAValue = _counterA.Current;
var counterBValue = _counterB.Current;
Expand All @@ -175,13 +181,13 @@ public void ClusterSharding_must_initialize_cluster_and_allocate_sharded_actors(
counterBValue.Should().BeGreaterOrEqualTo(2);

_regionA.Tell(1, _pA.Ref);
_pA.ExpectMsg(1);
await _pA.ExpectMsgAsync(1);

_regionB.Tell(2, _pB.Ref);
_pB.ExpectMsg(2);
await _pB.ExpectMsgAsync(2);

_regionB.Tell(3, _pB.Ref);
_pB.ExpectMsg(3);
await _pB.ExpectMsgAsync(3);

// Each entity should not have their messages buffered once they were instantiated
_counterA.Current.Should().Be(counterAValue);
Expand Down

0 comments on commit 80f73bc

Please sign in to comment.