Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove System.ConfigurationManager support from .NET 6+ #7456

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,13 @@ protected override bool ReceivePluginInternal(object message)
{
case SelectCurrentPersistenceIds msg:
SelectAllPersistenceIdsAsync(msg.Offset)
.PipeTo(msg.ReplyTo, success: h => new CurrentPersistenceIds(h.Ids, h.LastOrdering), failure: e => new Status.Failure(e));
.PipeTo(msg.ReplyTo, success: h => new CurrentPersistenceIds(h.Ids, h.LastOrdering),
failure: e => new Status.Failure(e));
return true;
case ReplayTaggedMessages replay:
ReplayTaggedMessagesAsync(replay)
.PipeTo(replay.ReplyTo, success: h => new RecoverySuccess(h), failure: e => new ReplayMessagesFailure(e));
.PipeTo(replay.ReplyTo, success: h => new RecoverySuccess(h),
failure: e => new ReplayMessagesFailure(e));
return true;
case ReplayAllEvents replay:
ReplayAllEventsAsync(replay)
Expand Down Expand Up @@ -122,11 +124,13 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(IEnu
else eventToTags.Add(p, ImmutableHashSet<string>.Empty);

if (IsTagId(p.PersistenceId))
throw new InvalidOperationException($"Persistence Id {p.PersistenceId} must not start with {QueryExecutor.Configuration.TagsColumnName}");
throw new InvalidOperationException(
$"Persistence Id {p.PersistenceId} must not start with {QueryExecutor.Configuration.TagsColumnName}");
}

var batch = new WriteJournalBatch(eventToTags);
using(var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var cancellationToken =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
await QueryExecutor.InsertBatchAsync(connection, cancellationToken.Token, batch);
}
}).ToArray();
Expand All @@ -149,15 +153,20 @@ protected virtual async Task<long> ReplayTaggedMessagesAsync(ReplayTaggedMessage
using (var connection = CreateDbConnection())
{
await connection.OpenAsync();
using(var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var cancellationToken =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
return await QueryExecutor
.SelectByTagAsync(connection, cancellationToken.Token, replay.Tag, replay.FromOffset, replay.ToOffset, replay.Max, replayedTagged => {
foreach(var adapted in AdaptFromJournal(replayedTagged.Persistent))
{
replay.ReplyTo.Tell(new ReplayedTaggedMessage(adapted, replayedTagged.Tag, replayedTagged.Offset), ActorRefs.NoSender);
}
});
.SelectByTagAsync(connection, cancellationToken.Token, replay.Tag, replay.FromOffset,
replay.ToOffset, replay.Max, replayedTagged =>
{
foreach (var adapted in AdaptFromJournal(replayedTagged.Persistent))
{
replay.ReplyTo.Tell(
new ReplayedTaggedMessage(adapted, replayedTagged.Tag, replayedTagged.Offset),
ActorRefs.NoSender);
}
});
}
}
}
Expand All @@ -167,34 +176,41 @@ protected virtual async Task<long> ReplayAllEventsAsync(ReplayAllEvents replay)
using (var connection = CreateDbConnection())
{
await connection.OpenAsync();
using (var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var cancellationToken =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
return await QueryExecutor
.SelectAllEventsAsync(
connection,
cancellationToken.Token,
replay.FromOffset,
cancellationToken.Token,
replay.FromOffset,
replay.ToOffset,
replay.Max,
replayedEvent => {
replay.Max,
replayedEvent =>
{
foreach (var adapted in AdaptFromJournal(replayedEvent.Persistent))
{
replay.ReplyTo.Tell(new ReplayedEvent(adapted, replayedEvent.Offset), ActorRefs.NoSender);
replay.ReplyTo.Tell(new ReplayedEvent(adapted, replayedEvent.Offset),
ActorRefs.NoSender);
}
});
}
}
}

protected virtual async Task<(IEnumerable<string> Ids, long LastOrdering)> SelectAllPersistenceIdsAsync(long offset)
protected virtual async Task<(IEnumerable<string> Ids, long LastOrdering)> SelectAllPersistenceIdsAsync(
long offset)
{
using (var connection = CreateDbConnection())
{
await connection.OpenAsync();
using (var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var cancellationToken =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
var lastOrdering = await QueryExecutor.SelectHighestSequenceNrAsync(connection, cancellationToken.Token);
var ids = await QueryExecutor.SelectAllPersistenceIdsAsync(connection, cancellationToken.Token, offset);
var lastOrdering =
await QueryExecutor.SelectHighestSequenceNrAsync(connection, cancellationToken.Token);
var ids = await QueryExecutor.SelectAllPersistenceIdsAsync(connection, cancellationToken.Token,
offset);
return (ids, lastOrdering);
}
}
Expand All @@ -210,15 +226,18 @@ protected virtual async Task<long> ReplayAllEventsAsync(ReplayAllEvents replay)
/// <param name="max">TBD</param>
/// <param name="recoveryCallback">TBD</param>
/// <returns>TBD</returns>
public override async Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max,
public override async Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr,
long toSequenceNr, long max,
Action<IPersistentRepresentation> recoveryCallback)
{
using (var connection = CreateDbConnection())
{
await connection.OpenAsync();
using (var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var cancellationToken =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await QueryExecutor.SelectByPersistenceIdAsync(connection, cancellationToken.Token, persistenceId, fromSequenceNr, toSequenceNr, max, recoveryCallback);
await QueryExecutor.SelectByPersistenceIdAsync(connection, cancellationToken.Token, persistenceId,
fromSequenceNr, toSequenceNr, max, recoveryCallback);
}
}
}
Expand Down Expand Up @@ -257,7 +276,7 @@ protected bool WaitingForInitialization(object message)
return true;
case Status.Failure fail:
Log.Error(fail.Cause, "Failure during {0} initialization.", Self);

// trigger a restart so we have some hope of succeeding in the future even if initialization failed
throw new ApplicationException("Failed to initialize SQL Journal.", fail.Cause);
default:
Expand All @@ -268,15 +287,16 @@ protected bool WaitingForInitialization(object message)

private async Task<object> Initialize()
{
if (!Settings.AutoInitialize)
if (!Settings.AutoInitialize)
return new Status.Success(NotUsed.Instance);

try
{
using (var connection = CreateDbConnection())
{
await connection.OpenAsync();
using (var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var cancellationToken =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await QueryExecutor.CreateTablesAsync(connection, cancellationToken.Token);
}
Expand All @@ -286,6 +306,7 @@ private async Task<object> Initialize()
{
return new Status.Failure(e);
}

return new Status.Success(NotUsed.Instance);
}

Expand Down Expand Up @@ -328,9 +349,11 @@ protected override async Task DeleteMessagesToAsync(string persistenceId, long t
using (var connection = CreateDbConnection())
{
await connection.OpenAsync();
using (var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var cancellationToken =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await QueryExecutor.DeleteBatchAsync(connection, cancellationToken.Token, persistenceId, toSequenceNr);
await QueryExecutor.DeleteBatchAsync(connection, cancellationToken.Token, persistenceId,
toSequenceNr);
}
}
}
Expand All @@ -346,9 +369,11 @@ public override async Task<long> ReadHighestSequenceNrAsync(string persistenceId
using (var connection = CreateDbConnection())
{
await connection.OpenAsync();
using (var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var cancellationToken =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
return await QueryExecutor.SelectHighestSequenceNrAsync(connection, cancellationToken.Token, persistenceId);
return await QueryExecutor.SelectHighestSequenceNrAsync(connection, cancellationToken.Token,
persistenceId);
}
}
}
Expand All @@ -361,15 +386,18 @@ protected virtual string GetConnectionString()
{
var connectionString = Settings.ConnectionString;

#if NETSTANDARD
if (string.IsNullOrEmpty(connectionString))
{
connectionString = System.Configuration.ConfigurationManager.ConnectionStrings[Settings.ConnectionStringName].ConnectionString;
connectionString =
System.Configuration.ConfigurationManager.ConnectionStrings[Settings.ConnectionStringName].ConnectionString;
}
#endif

return connectionString;
}

protected ITimestampProvider GetTimestampProvider(string typeName) =>
TimestampProviderProvider.GetTimestampProvider(typeName, Context);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ namespace Akka.Persistence.Sql.Common.Snapshot
public abstract class SqlSnapshotStore : SnapshotStore, IWithUnboundedStash
{
#region messages

private sealed class Initialized
{
public static readonly Initialized Instance = new Initialized();
private Initialized() { }

private Initialized()
{
}
}

#endregion

/// <summary>
Expand All @@ -57,6 +60,7 @@ protected SqlSnapshotStore(Config config)
/// TBD
/// </summary>
protected ILoggingAdapter Log => _log ?? (_log ?? Context.GetLogger());

private ILoggingAdapter _log;

/// <summary>
Expand Down Expand Up @@ -114,7 +118,8 @@ private async Task<object> Initialize()
try
{
using (var connection = CreateDbConnection())
using (var nestedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var nestedCancellationTokenSource =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await connection.OpenAsync(nestedCancellationTokenSource.Token);
await QueryExecutor.CreateTableAsync(connection, nestedCancellationTokenSource.Token);
Expand All @@ -129,15 +134,15 @@ private async Task<object> Initialize()

private bool WaitingForInitialization(object message)
{
switch(message)
switch (message)
{
case Initialized _:
UnbecomeStacked();
Stash.UnstashAll();
return true;
case Status.Failure msg:
Log.Error(msg.Cause, "Error during snapshot store initialization");

// trigger a restart so we have some hope of succeeding in the future even if initialization failed
throw new ApplicationException("Failed to initialize SQL SnapshotStore.", msg.Cause);
default:
Expand All @@ -154,10 +159,13 @@ protected virtual string GetConnectionString()
{
var connectionString = Settings.ConnectionString;

#if NETSTANDARD
if (string.IsNullOrEmpty(connectionString))
{
connectionString = System.Configuration.ConfigurationManager.ConnectionStrings[Settings.ConnectionStringName].ConnectionString;
connectionString =
System.Configuration.ConfigurationManager.ConnectionStrings[Settings.ConnectionStringName].ConnectionString;
}
#endif

return connectionString;
}
Expand All @@ -168,13 +176,16 @@ protected virtual string GetConnectionString()
/// <param name="persistenceId">TBD</param>
/// <param name="criteria">TBD</param>
/// <returns>TBD</returns>
protected override async Task<SelectedSnapshot> LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria)
protected override async Task<SelectedSnapshot> LoadAsync(string persistenceId,
SnapshotSelectionCriteria criteria)
{
using (var connection = CreateDbConnection())
using (var nestedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var nestedCancellationTokenSource =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await connection.OpenAsync(nestedCancellationTokenSource.Token);
return await QueryExecutor.SelectSnapshotAsync(connection, nestedCancellationTokenSource.Token, persistenceId, criteria.MaxSequenceNr, criteria.MaxTimeStamp);
return await QueryExecutor.SelectSnapshotAsync(connection, nestedCancellationTokenSource.Token,
persistenceId, criteria.MaxSequenceNr, criteria.MaxTimeStamp);
}
}

Expand All @@ -187,7 +198,8 @@ protected override async Task<SelectedSnapshot> LoadAsync(string persistenceId,
protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot)
{
using (var connection = CreateDbConnection())
using (var nestedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var nestedCancellationTokenSource =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await connection.OpenAsync(nestedCancellationTokenSource.Token);
await QueryExecutor.InsertAsync(connection, nestedCancellationTokenSource.Token, snapshot, metadata);
Expand All @@ -202,11 +214,13 @@ protected override async Task SaveAsync(SnapshotMetadata metadata, object snapsh
protected override async Task DeleteAsync(SnapshotMetadata metadata)
{
using (var connection = CreateDbConnection())
using (var nestedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var nestedCancellationTokenSource =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await connection.OpenAsync(nestedCancellationTokenSource.Token);
DateTime? timestamp = metadata.Timestamp != DateTime.MinValue ? metadata.Timestamp : default(DateTime?);
await QueryExecutor.DeleteAsync(connection, nestedCancellationTokenSource.Token, metadata.PersistenceId, metadata.SequenceNr, timestamp);
await QueryExecutor.DeleteAsync(connection, nestedCancellationTokenSource.Token, metadata.PersistenceId,
metadata.SequenceNr, timestamp);
}
}

Expand All @@ -219,11 +233,13 @@ protected override async Task DeleteAsync(SnapshotMetadata metadata)
protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria)
{
using (var connection = CreateDbConnection())
using (var nestedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var nestedCancellationTokenSource =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await connection.OpenAsync(nestedCancellationTokenSource.Token);
await QueryExecutor.DeleteBatchAsync(connection, nestedCancellationTokenSource.Token, persistenceId, criteria.MaxSequenceNr, criteria.MaxTimeStamp);
await QueryExecutor.DeleteBatchAsync(connection, nestedCancellationTokenSource.Token, persistenceId,
criteria.MaxSequenceNr, criteria.MaxTimeStamp);
}
}
}
}
}
Loading
Loading