@@ -76,11 +76,13 @@ protected override bool ReceivePluginInternal(object message)
76
76
{
77
77
case SelectCurrentPersistenceIds msg :
78
78
SelectAllPersistenceIdsAsync ( msg . Offset )
79
- . PipeTo ( msg . ReplyTo , success : h => new CurrentPersistenceIds ( h . Ids , h . LastOrdering ) , failure : e => new Status . Failure ( e ) ) ;
79
+ . PipeTo ( msg . ReplyTo , success : h => new CurrentPersistenceIds ( h . Ids , h . LastOrdering ) ,
80
+ failure : e => new Status . Failure ( e ) ) ;
80
81
return true ;
81
82
case ReplayTaggedMessages replay :
82
83
ReplayTaggedMessagesAsync ( replay )
83
- . PipeTo ( replay . ReplyTo , success : h => new RecoverySuccess ( h ) , failure : e => new ReplayMessagesFailure ( e ) ) ;
84
+ . PipeTo ( replay . ReplyTo , success : h => new RecoverySuccess ( h ) ,
85
+ failure : e => new ReplayMessagesFailure ( e ) ) ;
84
86
return true ;
85
87
case ReplayAllEvents replay :
86
88
ReplayAllEventsAsync ( replay )
@@ -122,11 +124,13 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(IEnu
122
124
else eventToTags . Add ( p , ImmutableHashSet < string > . Empty ) ;
123
125
124
126
if ( IsTagId ( p . PersistenceId ) )
125
- throw new InvalidOperationException ( $ "Persistence Id { p . PersistenceId } must not start with { QueryExecutor . Configuration . TagsColumnName } ") ;
127
+ throw new InvalidOperationException (
128
+ $ "Persistence Id { p . PersistenceId } must not start with { QueryExecutor . Configuration . TagsColumnName } ") ;
126
129
}
127
130
128
131
var batch = new WriteJournalBatch ( eventToTags ) ;
129
- using ( var cancellationToken = CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
132
+ using ( var cancellationToken =
133
+ CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
130
134
await QueryExecutor . InsertBatchAsync ( connection , cancellationToken . Token , batch ) ;
131
135
}
132
136
} ) . ToArray ( ) ;
@@ -149,15 +153,20 @@ protected virtual async Task<long> ReplayTaggedMessagesAsync(ReplayTaggedMessage
149
153
using ( var connection = CreateDbConnection ( ) )
150
154
{
151
155
await connection . OpenAsync ( ) ;
152
- using ( var cancellationToken = CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
156
+ using ( var cancellationToken =
157
+ CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
153
158
{
154
159
return await QueryExecutor
155
- . SelectByTagAsync ( connection , cancellationToken . Token , replay . Tag , replay . FromOffset , replay . ToOffset , replay . Max , replayedTagged => {
156
- foreach ( var adapted in AdaptFromJournal ( replayedTagged . Persistent ) )
157
- {
158
- replay . ReplyTo . Tell ( new ReplayedTaggedMessage ( adapted , replayedTagged . Tag , replayedTagged . Offset ) , ActorRefs . NoSender ) ;
159
- }
160
- } ) ;
160
+ . SelectByTagAsync ( connection , cancellationToken . Token , replay . Tag , replay . FromOffset ,
161
+ replay . ToOffset , replay . Max , replayedTagged =>
162
+ {
163
+ foreach ( var adapted in AdaptFromJournal ( replayedTagged . Persistent ) )
164
+ {
165
+ replay . ReplyTo . Tell (
166
+ new ReplayedTaggedMessage ( adapted , replayedTagged . Tag , replayedTagged . Offset ) ,
167
+ ActorRefs . NoSender ) ;
168
+ }
169
+ } ) ;
161
170
}
162
171
}
163
172
}
@@ -167,34 +176,41 @@ protected virtual async Task<long> ReplayAllEventsAsync(ReplayAllEvents replay)
167
176
using ( var connection = CreateDbConnection ( ) )
168
177
{
169
178
await connection . OpenAsync ( ) ;
170
- using ( var cancellationToken = CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
179
+ using ( var cancellationToken =
180
+ CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
171
181
{
172
182
return await QueryExecutor
173
183
. SelectAllEventsAsync (
174
184
connection ,
175
- cancellationToken . Token ,
176
- replay . FromOffset ,
185
+ cancellationToken . Token ,
186
+ replay . FromOffset ,
177
187
replay . ToOffset ,
178
- replay . Max ,
179
- replayedEvent => {
188
+ replay . Max ,
189
+ replayedEvent =>
190
+ {
180
191
foreach ( var adapted in AdaptFromJournal ( replayedEvent . Persistent ) )
181
192
{
182
- replay . ReplyTo . Tell ( new ReplayedEvent ( adapted , replayedEvent . Offset ) , ActorRefs . NoSender ) ;
193
+ replay . ReplyTo . Tell ( new ReplayedEvent ( adapted , replayedEvent . Offset ) ,
194
+ ActorRefs . NoSender ) ;
183
195
}
184
196
} ) ;
185
197
}
186
198
}
187
199
}
188
200
189
- protected virtual async Task < ( IEnumerable < string > Ids , long LastOrdering ) > SelectAllPersistenceIdsAsync ( long offset )
201
+ protected virtual async Task < ( IEnumerable < string > Ids , long LastOrdering ) > SelectAllPersistenceIdsAsync (
202
+ long offset )
190
203
{
191
204
using ( var connection = CreateDbConnection ( ) )
192
205
{
193
206
await connection . OpenAsync ( ) ;
194
- using ( var cancellationToken = CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
207
+ using ( var cancellationToken =
208
+ CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
195
209
{
196
- var lastOrdering = await QueryExecutor . SelectHighestSequenceNrAsync ( connection , cancellationToken . Token ) ;
197
- var ids = await QueryExecutor . SelectAllPersistenceIdsAsync ( connection , cancellationToken . Token , offset ) ;
210
+ var lastOrdering =
211
+ await QueryExecutor . SelectHighestSequenceNrAsync ( connection , cancellationToken . Token ) ;
212
+ var ids = await QueryExecutor . SelectAllPersistenceIdsAsync ( connection , cancellationToken . Token ,
213
+ offset ) ;
198
214
return ( ids , lastOrdering ) ;
199
215
}
200
216
}
@@ -210,15 +226,18 @@ protected virtual async Task<long> ReplayAllEventsAsync(ReplayAllEvents replay)
210
226
/// <param name="max">TBD</param>
211
227
/// <param name="recoveryCallback">TBD</param>
212
228
/// <returns>TBD</returns>
213
- public override async Task ReplayMessagesAsync ( IActorContext context , string persistenceId , long fromSequenceNr , long toSequenceNr , long max ,
229
+ public override async Task ReplayMessagesAsync ( IActorContext context , string persistenceId , long fromSequenceNr ,
230
+ long toSequenceNr , long max ,
214
231
Action < IPersistentRepresentation > recoveryCallback )
215
232
{
216
233
using ( var connection = CreateDbConnection ( ) )
217
234
{
218
235
await connection . OpenAsync ( ) ;
219
- using ( var cancellationToken = CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
236
+ using ( var cancellationToken =
237
+ CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
220
238
{
221
- await QueryExecutor . SelectByPersistenceIdAsync ( connection , cancellationToken . Token , persistenceId , fromSequenceNr , toSequenceNr , max , recoveryCallback ) ;
239
+ await QueryExecutor . SelectByPersistenceIdAsync ( connection , cancellationToken . Token , persistenceId ,
240
+ fromSequenceNr , toSequenceNr , max , recoveryCallback ) ;
222
241
}
223
242
}
224
243
}
@@ -257,7 +276,7 @@ protected bool WaitingForInitialization(object message)
257
276
return true ;
258
277
case Status . Failure fail :
259
278
Log . Error ( fail . Cause , "Failure during {0} initialization." , Self ) ;
260
-
279
+
261
280
// trigger a restart so we have some hope of succeeding in the future even if initialization failed
262
281
throw new ApplicationException ( "Failed to initialize SQL Journal." , fail . Cause ) ;
263
282
default :
@@ -268,15 +287,16 @@ protected bool WaitingForInitialization(object message)
268
287
269
288
private async Task < object > Initialize ( )
270
289
{
271
- if ( ! Settings . AutoInitialize )
290
+ if ( ! Settings . AutoInitialize )
272
291
return new Status . Success ( NotUsed . Instance ) ;
273
292
274
293
try
275
294
{
276
295
using ( var connection = CreateDbConnection ( ) )
277
296
{
278
297
await connection . OpenAsync ( ) ;
279
- using ( var cancellationToken = CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
298
+ using ( var cancellationToken =
299
+ CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
280
300
{
281
301
await QueryExecutor . CreateTablesAsync ( connection , cancellationToken . Token ) ;
282
302
}
@@ -286,6 +306,7 @@ private async Task<object> Initialize()
286
306
{
287
307
return new Status . Failure ( e ) ;
288
308
}
309
+
289
310
return new Status . Success ( NotUsed . Instance ) ;
290
311
}
291
312
@@ -328,9 +349,11 @@ protected override async Task DeleteMessagesToAsync(string persistenceId, long t
328
349
using ( var connection = CreateDbConnection ( ) )
329
350
{
330
351
await connection . OpenAsync ( ) ;
331
- using ( var cancellationToken = CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
352
+ using ( var cancellationToken =
353
+ CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
332
354
{
333
- await QueryExecutor . DeleteBatchAsync ( connection , cancellationToken . Token , persistenceId , toSequenceNr ) ;
355
+ await QueryExecutor . DeleteBatchAsync ( connection , cancellationToken . Token , persistenceId ,
356
+ toSequenceNr ) ;
334
357
}
335
358
}
336
359
}
@@ -346,9 +369,11 @@ public override async Task<long> ReadHighestSequenceNrAsync(string persistenceId
346
369
using ( var connection = CreateDbConnection ( ) )
347
370
{
348
371
await connection . OpenAsync ( ) ;
349
- using ( var cancellationToken = CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
372
+ using ( var cancellationToken =
373
+ CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
350
374
{
351
- return await QueryExecutor . SelectHighestSequenceNrAsync ( connection , cancellationToken . Token , persistenceId ) ;
375
+ return await QueryExecutor . SelectHighestSequenceNrAsync ( connection , cancellationToken . Token ,
376
+ persistenceId ) ;
352
377
}
353
378
}
354
379
}
@@ -361,15 +386,18 @@ protected virtual string GetConnectionString()
361
386
{
362
387
var connectionString = Settings . ConnectionString ;
363
388
389
+ #if NETSTANDARD
364
390
if ( string . IsNullOrEmpty ( connectionString ) )
365
391
{
366
- connectionString = System . Configuration . ConfigurationManager . ConnectionStrings [ Settings . ConnectionStringName ] . ConnectionString ;
392
+ connectionString =
393
+ System . Configuration . ConfigurationManager . ConnectionStrings [ Settings . ConnectionStringName ] . ConnectionString ;
367
394
}
395
+ #endif
368
396
369
397
return connectionString ;
370
398
}
371
399
372
400
protected ITimestampProvider GetTimestampProvider ( string typeName ) =>
373
401
TimestampProviderProvider . GetTimestampProvider ( typeName , Context ) ;
374
402
}
375
- }
403
+ }
0 commit comments