7
7
8
8
using System ;
9
9
using System . Collections . Immutable ;
10
+ using System . Threading . Tasks ;
10
11
using Akka . Actor ;
11
12
using Akka . Cluster . Tools . Singleton ;
12
13
using Akka . Configuration ;
@@ -72,18 +73,20 @@ public object Apply(object message, IActorContext context)
72
73
private const string ShardTypeName = "Caat" ;
73
74
74
75
private static Config SpecConfig =>
75
- ConfigurationFactory . ParseString ( @"
76
- akka.loglevel = DEBUG
77
- akka.actor.provider = cluster
78
- akka.remote.dot-netty.tcp.port = 0
79
- akka.remote.log-remote-lifecycle-events = off
80
-
81
- akka.test.single-expect-default = 5 s
82
- akka.cluster.sharding.state-store-mode = ""ddata""
83
- akka.cluster.sharding.verbose-debug-logging = on
84
- akka.cluster.sharding.fail-on-invalid-entity-state-transition = on
85
- akka.cluster.sharding.distributed-data.durable.keys = []" )
86
- . WithFallback ( ClusterSingletonManager . DefaultConfig ( )
76
+ ConfigurationFactory . ParseString ( """
77
+
78
+ akka.loglevel = DEBUG
79
+ akka.actor.provider = cluster
80
+ akka.remote.dot-netty.tcp.port = 0
81
+ akka.remote.log-remote-lifecycle-events = off
82
+
83
+ akka.test.single-expect-default = 5 s
84
+ akka.cluster.sharding.state-store-mode = "ddata"
85
+ akka.cluster.sharding.verbose-debug-logging = on
86
+ akka.cluster.sharding.fail-on-invalid-entity-state-transition = on
87
+ akka.cluster.sharding.distributed-data.durable.keys = []
88
+ """ )
89
+ . WithFallback ( ClusterSingleton . DefaultConfig ( )
87
90
. WithFallback ( ClusterSharding . DefaultConfig ( ) ) ) ;
88
91
89
92
private readonly AtomicCounter _counterA = new ( 0 ) ;
@@ -105,9 +108,11 @@ public ShardingBufferAdapterSpec(ITestOutputHelper helper) : base(SpecConfig, he
105
108
106
109
InitializeLogger ( _sysB , "[sysB]" ) ;
107
110
111
+ // ReSharper disable VirtualMemberCallInConstructor
108
112
_pA = CreateTestProbe ( _sysA ) ;
109
113
_pB = CreateTestProbe ( _sysB ) ;
110
-
114
+ // ReSharper restore VirtualMemberCallInConstructor
115
+
111
116
ClusterSharding . Get ( _sysA ) . SetShardingBufferMessageAdapter ( new TestMessageAdapter ( _counterA ) ) ;
112
117
ClusterSharding . Get ( _sysB ) . SetShardingBufferMessageAdapter ( new TestMessageAdapter ( _counterB ) ) ;
113
118
@@ -134,37 +139,38 @@ private IActorRef StartShard(ActorSystem sys)
134
139
}
135
140
136
141
[ Fact ( DisplayName = "ClusterSharding buffer message adapter must be called when message was buffered" ) ]
137
- public void ClusterSharding_must_initialize_cluster_and_allocate_sharded_actors ( )
142
+ public async Task ClusterSharding_must_initialize_cluster_and_allocate_sharded_actors ( )
138
143
{
139
- Cluster . Get ( _sysA ) . Join ( Cluster . Get ( _sysA ) . SelfAddress ) ; // coordinator on A
144
+ await Cluster . Get ( _sysA ) . JoinAsync ( Cluster . Get ( _sysA ) . SelfAddress ) ; // coordinator on A
140
145
141
- AwaitAssert ( ( ) =>
146
+ await AwaitAssertAsync ( ( ) =>
142
147
{
143
148
Cluster . Get ( _sysA ) . SelfMember . Status . Should ( ) . Be ( MemberStatus . Up ) ;
144
149
} , TimeSpan . FromSeconds ( 1 ) ) ;
145
150
146
- Cluster . Get ( _sysB ) . Join ( Cluster . Get ( _sysA ) . SelfAddress ) ;
151
+ await Cluster . Get ( _sysB ) . JoinAsync ( Cluster . Get ( _sysA ) . SelfAddress ) ;
147
152
148
- Within ( TimeSpan . FromSeconds ( 10 ) , ( ) =>
153
+ await WithinAsync ( TimeSpan . FromSeconds ( 10 ) , async ( ) =>
149
154
{
150
- AwaitAssert ( ( ) =>
155
+ await AwaitAssertAsync ( async ( ) =>
151
156
{
152
157
foreach ( var s in ImmutableHashSet . Create ( _sysA , _sysB ) )
153
158
{
154
159
Cluster . Get ( s ) . SendCurrentClusterState ( TestActor ) ;
155
- ExpectMsg < ClusterEvent . CurrentClusterState > ( ) . Members . Count . Should ( ) . Be ( 2 ) ;
160
+ ( await ExpectMsgAsync < ClusterEvent . CurrentClusterState > ( ) ) . Members . Count . Should ( ) . Be ( 2 ) ;
156
161
}
157
162
} ) ;
158
163
} ) ;
159
164
160
- _regionA . Tell ( 1 , _pA . Ref ) ;
161
- _pA . ExpectMsg ( 1 ) ;
165
+ // need to make sure that ShardingEnvelope doesn't impacted by this change
166
+ _regionA . Tell ( new ShardingEnvelope ( "1" , 1 ) , _pA . Ref ) ;
167
+ await _pA . ExpectMsgAsync ( 1 ) ;
162
168
163
169
_regionB . Tell ( 2 , _pB . Ref ) ;
164
- _pB . ExpectMsg ( 2 ) ;
170
+ await _pB . ExpectMsgAsync ( 2 ) ;
165
171
166
172
_regionB . Tell ( 3 , _pB . Ref ) ;
167
- _pB . ExpectMsg ( 3 ) ;
173
+ await _pB . ExpectMsgAsync ( 3 ) ;
168
174
169
175
var counterAValue = _counterA . Current ;
170
176
var counterBValue = _counterB . Current ;
@@ -175,13 +181,13 @@ public void ClusterSharding_must_initialize_cluster_and_allocate_sharded_actors(
175
181
counterBValue . Should ( ) . BeGreaterOrEqualTo ( 2 ) ;
176
182
177
183
_regionA . Tell ( 1 , _pA . Ref ) ;
178
- _pA . ExpectMsg ( 1 ) ;
184
+ await _pA . ExpectMsgAsync ( 1 ) ;
179
185
180
186
_regionB . Tell ( 2 , _pB . Ref ) ;
181
- _pB . ExpectMsg ( 2 ) ;
187
+ await _pB . ExpectMsgAsync ( 2 ) ;
182
188
183
189
_regionB . Tell ( 3 , _pB . Ref ) ;
184
- _pB . ExpectMsg ( 3 ) ;
190
+ await _pB . ExpectMsgAsync ( 3 ) ;
185
191
186
192
// Each entity should not have their messages buffered once they were instantiated
187
193
_counterA . Current . Should ( ) . Be ( counterAValue ) ;
0 commit comments