From adab09d8f1e7cb1fe16f1c48bbd0f0e1c32ee917 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 19 Jan 2024 09:30:24 -0600 Subject: [PATCH 01/26] attempting to fix racy Akka.Persistence.TestKit.Tests --- .../Actors/CounterActor.cs | 14 ++++++++++++-- .../Bug4762FixSpec.cs | 1 - 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs index ca404a5a0a4..2b19e30b745 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs @@ -72,6 +72,16 @@ protected override void OnRecover(object message) return; } } + + protected override void PostStop() + { + _log.Info("Shutting down"); + } + + protected override void PreStart() + { + _log.Info("Starting up"); + } } public class CounterActorTests : PersistenceTestKit @@ -86,9 +96,9 @@ public Task CounterActor_internal_state_will_be_lost_if_underlying_persistence_s var counterProps = Props.Create(() => new CounterActor("test")); var actor = ActorOf(counterProps, "counter"); - Watch(actor); + await WatchAsync(actor); actor.Tell("inc", TestActor); - await ExpectMsgAsync(TimeSpan.FromSeconds(3)); + await ExpectTerminatedAsync(actor); // need to restart actor actor = ActorOf(counterProps, "counter1"); diff --git a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs index d9cb3099aa2..dee3bcd0f45 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs @@ -80,7 +80,6 @@ public Task TestJournal_PersistAll_should_only_count_each_event_exceptions_once( return WithJournalWrite(write => write.Pass(), async () => { var actor = ActorOf(() => new TestActor2(probe)); - Watch(actor); var command = new WriteMessage(); actor.Tell(command, actor); From 185d5adcd7cbe0df0ecf8747505db8d63cce4833 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 19 Jan 2024 09:53:55 -0600 Subject: [PATCH 02/26] added debug log --- src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs index 2b19e30b745..d4275c6f059 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs @@ -96,6 +96,7 @@ public Task CounterActor_internal_state_will_be_lost_if_underlying_persistence_s var counterProps = Props.Create(() => new CounterActor("test")); var actor = ActorOf(counterProps, "counter"); + Sys.Log.Info("Messaging actor"); await WatchAsync(actor); actor.Tell("inc", TestActor); await ExpectTerminatedAsync(actor); From 272b962eebf3819e94002a090cbf55157990673f Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 19 Jan 2024 10:27:14 -0600 Subject: [PATCH 03/26] looking into some suspicious `await` calls inside the `AsyncWriteJournal` --- .../Journal/JournalInterceptors.cs | 2 +- .../Journal/AsyncWriteJournal.cs | 27 ++++++++++--------- .../Akka.Persistence/Journal/MemoryJournal.cs | 4 +-- 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/src/core/Akka.Persistence.TestKit/Journal/JournalInterceptors.cs b/src/core/Akka.Persistence.TestKit/Journal/JournalInterceptors.cs index 22f3eadf865..41ff1a3e3b0 100644 --- a/src/core/Akka.Persistence.TestKit/Journal/JournalInterceptors.cs +++ b/src/core/Akka.Persistence.TestKit/Journal/JournalInterceptors.cs @@ -16,7 +16,7 @@ internal class Noop : IJournalInterceptor { public static readonly IJournalInterceptor Instance = new Noop(); - public Task InterceptAsync(IPersistentRepresentation message) => Task.FromResult(true); + public Task InterceptAsync(IPersistentRepresentation message) => Task.CompletedTask; } internal class Failure : IJournalInterceptor diff --git a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs index a30590147b5..d3f88339d93 100644 --- a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs +++ b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs @@ -259,20 +259,11 @@ private void HandleReplayMessages(ReplayMessages message) async Task ExecuteHighestSequenceNr() { - void CompleteHighSeqNo(long highSeqNo) - { - replyTo.Tell(new RecoverySuccess(highSeqNo)); - - if (CanPublish) - { - eventStream.Publish(message); - } - } - try { var highSequenceNr = await _breaker.WithCircuitBreaker((message, readHighestSequenceNrFrom, awj: this), state => - state.awj.ReadHighestSequenceNrAsync(state.message.PersistenceId, state.readHighestSequenceNrFrom)); + state.awj.ReadHighestSequenceNrAsync(state.message.PersistenceId, state.readHighestSequenceNrFrom)) + .ConfigureAwait(false); var toSequenceNr = Math.Min(message.ToSequenceNr, highSequenceNr); if (toSequenceNr <= 0L || message.FromSequenceNr > toSequenceNr) { @@ -293,7 +284,7 @@ await ReplayMessagesAsync(context, message.PersistenceId, message.FromSequenceNr replyTo.Tell(new ReplayedMessage(adaptedRepresentation), ActorRefs.NoSender); } } - }); + }).ConfigureAwait(false); CompleteHighSeqNo(highSequenceNr); } @@ -309,6 +300,18 @@ await ReplayMessagesAsync(context, message.PersistenceId, message.FromSequenceNr { replyTo.Tell(new ReplayMessagesFailure(TryUnwrapException(ex))); } + + return; + + void CompleteHighSeqNo(long highSeqNo) + { + replyTo.Tell(new RecoverySuccess(highSeqNo)); + + if (CanPublish) + { + eventStream.Publish(message); + } + } } // instead of ContinueWith diff --git a/src/core/Akka.Persistence/Journal/MemoryJournal.cs b/src/core/Akka.Persistence/Journal/MemoryJournal.cs index 25a861db3ac..445b8d38917 100644 --- a/src/core/Akka.Persistence/Journal/MemoryJournal.cs +++ b/src/core/Akka.Persistence/Journal/MemoryJournal.cs @@ -137,7 +137,7 @@ public override Task ReplayMessagesAsync(IActorContext context, string persisten var highest = HighestSequenceNr(persistenceId); if (highest != 0L && max != 0L) Read(persistenceId, fromSequenceNr, Math.Min(toSequenceNr, highest), max).ForEach(recoveryCallback); - return Task.FromResult(new object()); + return Task.CompletedTask; } /// @@ -154,7 +154,7 @@ protected override Task DeleteMessagesToAsync(string persistenceId, long toSeque _meta.AddOrUpdate(persistenceId, highestSeqNr, (_, _) => highestSeqNr); for (var snr = 1L; snr <= toSeqNr; snr++) Delete(persistenceId, snr); - return Task.FromResult(new object()); + return Task.CompletedTask; } protected override bool ReceivePluginInternal(object message) From 7b8b538a5b8a8b102198f93b4496c35070f86f6a Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 08:09:54 -0600 Subject: [PATCH 04/26] minor code clean up --- .../PersistenceTestKit.cs | 10 +++++----- .../SnapshotStore/SnapshotStoreSaveBehaviorSetter.cs | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs b/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs index 406dab3d5d6..e7b4d74b1dc 100644 --- a/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs +++ b/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs @@ -157,7 +157,7 @@ public Task WithJournalRecovery(Func behaviorSele if (execution == null) throw new ArgumentNullException(nameof(execution)); execution(); - return Task.FromResult(new object()); + return Task.CompletedTask; }); /// @@ -175,7 +175,7 @@ public Task WithJournalWrite(Func behaviorSelector, if (execution == null) throw new ArgumentNullException(nameof(execution)); execution(); - return Task.FromResult(new object()); + return Task.CompletedTask; }); /// @@ -268,7 +268,7 @@ public Task WithSnapshotSave(Func behaviorSelec if (execution == null) throw new ArgumentNullException(nameof(execution)); execution(); - return Task.FromResult(true); + return Task.CompletedTask; }); /// @@ -286,7 +286,7 @@ public Task WithSnapshotLoad(Func behaviorSelec if (execution == null) throw new ArgumentNullException(nameof(execution)); execution(); - return Task.FromResult(true); + return Task.CompletedTask; }); /// @@ -304,7 +304,7 @@ public Task WithSnapshotDelete(Func behaviorS if (execution == null) throw new ArgumentNullException(nameof(execution)); execution(); - return Task.FromResult(true); + return Task.CompletedTask; }); /// diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreSaveBehaviorSetter.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreSaveBehaviorSetter.cs index 67ccccb0d49..84e7fc8ef77 100644 --- a/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreSaveBehaviorSetter.cs +++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreSaveBehaviorSetter.cs @@ -18,7 +18,7 @@ internal class SnapshotStoreSaveBehaviorSetter : ISnapshotStoreBehaviorSetter { internal SnapshotStoreSaveBehaviorSetter(IActorRef snapshots) { - this._snapshots = snapshots; + _snapshots = snapshots; } private readonly IActorRef _snapshots; From ef8fd689462ec1bbb3fc28a5c4063cbeba927a94 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 08:20:59 -0600 Subject: [PATCH 05/26] `MemoryJournal` cleanup --- .../Akka.Persistence/Journal/MemoryJournal.cs | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/src/core/Akka.Persistence/Journal/MemoryJournal.cs b/src/core/Akka.Persistence/Journal/MemoryJournal.cs index 445b8d38917..7358359fc60 100644 --- a/src/core/Akka.Persistence/Journal/MemoryJournal.cs +++ b/src/core/Akka.Persistence/Journal/MemoryJournal.cs @@ -91,7 +91,7 @@ protected override Task> WriteMessagesAsync(IEnumerabl var persistentRepresentation = p.WithTimestamp(DateTime.UtcNow.Ticks); Add(persistentRepresentation); _allMessages.AddLast(persistentRepresentation); - if (!(p.Payload is Tagged tagged)) continue; + if (p.Payload is not Tagged tagged) continue; foreach (var tag in tagged.Tags) { @@ -197,7 +197,7 @@ private Task ReplayTaggedMessagesAsync(ReplayTaggedMessages replay) if (!_tagsToMessagesMapping.ContainsKey(replay.Tag)) return Task.FromResult(0); - int index = 0; + var index = 0; foreach (var persistence in _tagsToMessagesMapping[replay.Tag] .Skip(replay.FromOffset) .Take(replay.ToOffset)) @@ -212,7 +212,7 @@ private Task ReplayTaggedMessagesAsync(ReplayTaggedMessages replay) private Task ReplayAllEventsAsync(ReplayAllEvents replay) { - int index = 0; + var index = 0; var replayed = _allMessages .Skip(replay.FromOffset) .Take(replay.ToOffset - replay.FromOffset) @@ -557,7 +557,7 @@ public Messages Add(IPersistentRepresentation persistent) /// TBD public Messages Update(string pid, long seqNr, Func updater) { - if (Messages.TryGetValue(pid, out LinkedList persistents)) + if (Messages.TryGetValue(pid, out var persistents)) { var node = persistents.First; while (node != null) @@ -580,7 +580,7 @@ public Messages Update(string pid, long seqNr, FuncTBD public Messages Delete(string pid, long seqNr) { - if (Messages.TryGetValue(pid, out LinkedList persistents)) + if (Messages.TryGetValue(pid, out var persistents)) { var node = persistents.First; while (node != null) @@ -605,7 +605,7 @@ public Messages Delete(string pid, long seqNr) /// TBD public IEnumerable Read(string pid, long fromSeqNr, long toSeqNr, long max) { - if (Messages.TryGetValue(pid, out LinkedList persistents)) + if (Messages.TryGetValue(pid, out var persistents)) { return persistents .Where(x => x.SequenceNr >= fromSeqNr && x.SequenceNr <= toSeqNr) @@ -622,13 +622,10 @@ public IEnumerable Read(string pid, long fromSeqNr, l /// TBD public long HighestSequenceNr(string pid) { - if (Messages.TryGetValue(pid, out LinkedList persistents)) - { - var last = persistents.LastOrDefault(); - return last?.SequenceNr ?? 0L; - } + if (!Messages.TryGetValue(pid, out var persistents)) return 0L; + var last = persistents.LastOrDefault(); + return last?.SequenceNr ?? 0L; - return 0L; } #endregion From caf93d767df96d9226f787b54801e500f8173f96 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 08:35:27 -0600 Subject: [PATCH 06/26] added more robust external error logging --- .../PersistenceTestKit.cs | 26 +++++++++++++++++++ .../Akka.Persistence/Journal/MemoryJournal.cs | 2 +- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs b/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs index e7b4d74b1dc..44a420a4e8d 100644 --- a/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs +++ b/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs @@ -7,6 +7,7 @@ using Akka.Actor.Setup; using Akka.Configuration; +using Akka.Event; namespace Akka.Persistence.TestKit { @@ -111,6 +112,11 @@ public async Task WithJournalRecovery(Func behavi await behaviorSelector(Journal.OnRecovery); await execution(); } + catch (Exception ex) + { + Log.Error(ex, "Error during execution of WithJournalRecovery"); + throw; + } finally { await Journal.OnRecovery.Pass(); @@ -136,6 +142,11 @@ public async Task WithJournalWrite(Func behaviorSele await behaviorSelector(Journal.OnWrite); await execution(); } + catch (Exception ex) + { + Log.Error(ex, "Error during execution of WithJournalWrite"); + throw; + } finally { await Journal.OnWrite.Pass(); @@ -197,6 +208,11 @@ public async Task WithSnapshotSave(Func behavio await behaviorSelector(Snapshots.OnSave); await execution(); } + catch (Exception ex) + { + Log.Error(ex, "Error during execution of WithSnapshotSave"); + throw; + } finally { await Snapshots.OnSave.Pass(); @@ -222,6 +238,11 @@ public async Task WithSnapshotLoad(Func behavio await behaviorSelector(Snapshots.OnLoad); await execution(); } + catch (Exception ex) + { + Log.Error(ex, "Error during execution of WithSnapshotLoad"); + throw; + } finally { await Snapshots.OnLoad.Pass(); @@ -247,6 +268,11 @@ public async Task WithSnapshotDelete(Func beh await behaviorSelector(Snapshots.OnDelete); await execution(); } + catch (Exception ex) + { + Log.Error(ex, "Error during execution of WithSnapshotDelete"); + throw; + } finally { await Snapshots.OnDelete.Pass(); diff --git a/src/core/Akka.Persistence/Journal/MemoryJournal.cs b/src/core/Akka.Persistence/Journal/MemoryJournal.cs index 7358359fc60..89cc7756835 100644 --- a/src/core/Akka.Persistence/Journal/MemoryJournal.cs +++ b/src/core/Akka.Persistence/Journal/MemoryJournal.cs @@ -118,7 +118,7 @@ protected override Task> WriteMessagesAsync(IEnumerabl /// TBD public override Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) { - return Task.FromResult(Math.Max(HighestSequenceNr(persistenceId), _meta.TryGetValue(persistenceId, out long metaSeqNr) ? metaSeqNr : 0L)); + return Task.FromResult(Math.Max(HighestSequenceNr(persistenceId), _meta.TryGetValue(persistenceId, out var metaSeqNr) ? metaSeqNr : 0L)); } /// From 30897ff96abe769670aab69b0f3e81e4ea43048b Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 09:02:16 -0600 Subject: [PATCH 07/26] disable sync thread dispatcher --- src/core/Akka.TestKit/TestKitBase.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.TestKit/TestKitBase.cs b/src/core/Akka.TestKit/TestKitBase.cs index c639719304b..d13da2333b0 100644 --- a/src/core/Akka.TestKit/TestKitBase.cs +++ b/src/core/Akka.TestKit/TestKitBase.cs @@ -701,8 +701,9 @@ public IActorRef CreateTestActor(string name) private IActorRef CreateTestActor(ActorSystem system, string name) { - var testActorProps = Props.Create(() => new InternalTestActor(new BlockingCollectionTestActorQueue(_testState.Queue))) - .WithDispatcher("akka.test.test-actor.dispatcher"); + var testActorProps = Props.Create(() => + new InternalTestActor(new BlockingCollectionTestActorQueue(_testState.Queue))); + //.WithDispatcher("akka.test.test-actor.dispatcher"); var testActor = system.AsInstanceOf().SystemActorOf(testActorProps, name); return testActor; } From fbb0980640f3c12a8bbe4c400ba8c9e3bfd4c64e Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 09:33:23 -0600 Subject: [PATCH 08/26] Revert "disable sync thread dispatcher" This reverts commit 30897ff96abe769670aab69b0f3e81e4ea43048b. --- src/core/Akka.TestKit/TestKitBase.cs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/core/Akka.TestKit/TestKitBase.cs b/src/core/Akka.TestKit/TestKitBase.cs index d13da2333b0..c639719304b 100644 --- a/src/core/Akka.TestKit/TestKitBase.cs +++ b/src/core/Akka.TestKit/TestKitBase.cs @@ -701,9 +701,8 @@ public IActorRef CreateTestActor(string name) private IActorRef CreateTestActor(ActorSystem system, string name) { - var testActorProps = Props.Create(() => - new InternalTestActor(new BlockingCollectionTestActorQueue(_testState.Queue))); - //.WithDispatcher("akka.test.test-actor.dispatcher"); + var testActorProps = Props.Create(() => new InternalTestActor(new BlockingCollectionTestActorQueue(_testState.Queue))) + .WithDispatcher("akka.test.test-actor.dispatcher"); var testActor = system.AsInstanceOf().SystemActorOf(testActorProps, name); return testActor; } From 0178cd3906af0b09c170387acbc5c07836c115eb Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 09:46:18 -0600 Subject: [PATCH 09/26] added debugging capabilities to `TestJournal` --- .../Journal/TestJournal.cs | 29 +++++++++++++++++++ src/core/Akka.Persistence.TestKit/config.conf | 5 +++- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs b/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs index 56aa3f6b528..9f572ca34f1 100644 --- a/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs +++ b/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs @@ -5,6 +5,9 @@ // //----------------------------------------------------------------------- +using Akka.Configuration; +using Akka.Event; + namespace Akka.Persistence.TestKit { using Akka.Actor; @@ -20,19 +23,31 @@ namespace Akka.Persistence.TestKit /// public sealed class TestJournal : MemoryJournal { + private readonly ILoggingAdapter _log = Context.GetLogger(); private IJournalInterceptor _writeInterceptor = JournalInterceptors.Noop.Instance; private IJournalInterceptor _recoveryInterceptor = JournalInterceptors.Noop.Instance; + public TestJournal(Config journalConfig) + { + DebugEnabled = journalConfig.GetBoolean("debug", false); + } + + private bool DebugEnabled { get; } + protected override bool ReceivePluginInternal(object message) { switch (message) { case UseWriteInterceptor use: + if(DebugEnabled) + _log.Info("Using write interceptor {0}", use.Interceptor.GetType().Name); _writeInterceptor = use.Interceptor; Sender.Tell(Ack.Instance); return true; case UseRecoveryInterceptor use: + if(DebugEnabled) + _log.Info("Using recovery interceptor {0}", use.Interceptor.GetType().Name); _recoveryInterceptor = use.Interceptor; Sender.Tell(Ack.Instance); return true; @@ -51,7 +66,12 @@ protected override async Task> WriteMessagesAsync(IEnu { foreach (var p in (IEnumerable)w.Payload) { + if(DebugEnabled) + _log.Info("Beginning write intercept of message {0} with interceptor {1}", p, _writeInterceptor.GetType().Name); await _writeInterceptor.InterceptAsync(p); + + if(DebugEnabled) + _log.Info("Completed write intercept of message {0} with interceptor {1}", p, _writeInterceptor.GetType().Name); Add(p); } } @@ -75,6 +95,10 @@ protected override async Task> WriteMessagesAsync(IEnu public override async Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, Action recoveryCallback) { var highest = HighestSequenceNr(persistenceId); + + if(DebugEnabled) + _log.Info("Replaying messages from {0} to {1} for persistenceId {2}", fromSequenceNr, toSequenceNr, persistenceId); + if (highest != 0L && max != 0L) { var messages = Read(persistenceId, fromSequenceNr, Math.Min(toSequenceNr, highest), max); @@ -82,7 +106,12 @@ public override async Task ReplayMessagesAsync(IActorContext context, string per { try { + if(DebugEnabled) + _log.Info("Beginning recovery intercept of message {0} with interceptor {1}", p, _recoveryInterceptor.GetType().Name); await _recoveryInterceptor.InterceptAsync(p); + + if(DebugEnabled) + _log.Info("Completed recovery intercept of message {0} with interceptor {1}", p, _recoveryInterceptor.GetType().Name); recoveryCallback(p); } catch (TestJournalFailureException) diff --git a/src/core/Akka.Persistence.TestKit/config.conf b/src/core/Akka.Persistence.TestKit/config.conf index dda3d635272..a416e0fb7ca 100644 --- a/src/core/Akka.Persistence.TestKit/config.conf +++ b/src/core/Akka.Persistence.TestKit/config.conf @@ -7,6 +7,9 @@ akka { test { class = "Akka.Persistence.TestKit.TestJournal, Akka.Persistence.TestKit" plugin-dispatcher = "akka.actor.default-dispatcher" + + # enables debug mode, which adds verbose logging to each of the TestJournal stages + debug = false } } @@ -16,7 +19,7 @@ akka { test { class = "Akka.Persistence.TestKit.TestSnapshotStore, Akka.Persistence.TestKit" - plugin-dispatcher = "akka.actor.default-dispatcher" + plugin-dispatcher = "akka.actor.default-dispatcher" } } } From 7206ccac995c54185684a023a31bc791678aa93e Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 10:00:58 -0600 Subject: [PATCH 10/26] added debug logging to `CounterActor` specs --- .../Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs | 6 +++++- .../Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs index d4275c6f059..51b7fd8c9c3 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs @@ -5,6 +5,7 @@ // //----------------------------------------------------------------------- +using Akka.Configuration; using Akka.Event; using Xunit.Abstractions; @@ -86,7 +87,10 @@ protected override void PreStart() public class CounterActorTests : PersistenceTestKit { - public CounterActorTests(ITestOutputHelper output) : base(output:output){} + // create a Config that enables debug mode on the TestJournal + private static readonly Config Config = ConfigurationFactory.ParseString("akka.persistence.journal.test.debug = on"); + + public CounterActorTests(ITestOutputHelper output) : base(Config, output:output){} [Fact] public Task CounterActor_internal_state_will_be_lost_if_underlying_persistence_store_is_not_available() diff --git a/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs b/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs index 44a420a4e8d..f35070446b2 100644 --- a/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs +++ b/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs @@ -374,7 +374,7 @@ private static Config GetConfig(Config customConfig) { var defaultConfig = ConfigurationFactory.FromResource("Akka.Persistence.TestKit.config.conf"); if (customConfig == Config.Empty) return defaultConfig; - else return defaultConfig.SafeWithFallback(customConfig); + else return customConfig.WithFallback(defaultConfig); } } } From daf2c8cb77988cf6706fed6d5c4ff4a64d073889 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 10:07:18 -0600 Subject: [PATCH 11/26] added debug logging to `Bug4762FixSpec` --- .../Bug4762FixSpec.cs | 25 +++++++++++-------- .../Journal/TestJournal.cs | 3 +++ 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs index dee3bcd0f45..3e78a44d13f 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs @@ -11,6 +11,7 @@ using System.Text; using System.Threading.Tasks; using Akka.Actor; +using Akka.Configuration; using Akka.Event; using Xunit; using Xunit.Abstractions; @@ -22,16 +23,21 @@ namespace Akka.Persistence.TestKit.Tests /// public class Bug4762FixSpec : PersistenceTestKit { - public Bug4762FixSpec(ITestOutputHelper outputHelper) : base(output: outputHelper) + // create a Config that enables debug mode on the TestJournal + private static readonly Config Config = + ConfigurationFactory.ParseString("akka.persistence.journal.test.debug = on"); + + public Bug4762FixSpec(ITestOutputHelper outputHelper) : base(Config, output: outputHelper) { - } - + private class WriteMessage - { } + { + } private class TestEvent - { } + { + } private class TestActor2 : UntypedPersistentActor { @@ -48,17 +54,14 @@ public TestActor2(IActorRef probe) protected override void OnCommand(object message) { _log.Info("Received command {0}", message); - + switch (message) { case WriteMessage _: var event1 = new TestEvent(); var event2 = new TestEvent(); var events = new List { event1, event2 }; - PersistAll(events, _ => - { - _probe.Tell(Done.Instance); - }); + PersistAll(events, _ => { _probe.Tell(Done.Instance); }); break; default: @@ -91,4 +94,4 @@ public Task TestJournal_PersistAll_should_only_count_each_event_exceptions_once( }); } } -} +} \ No newline at end of file diff --git a/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs b/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs index 9f572ca34f1..bf9ea165013 100644 --- a/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs +++ b/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs @@ -121,6 +121,9 @@ public override async Task ReplayMessagesAsync(IActorContext context, string per throw; } } + + if(DebugEnabled) + _log.Info("Completed replaying messages from {0} to {1} for persistenceId {2}", fromSequenceNr, toSequenceNr, persistenceId); } } From 26108a79dd081b7f2ba631055950a5e920a207a1 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 16:13:37 -0600 Subject: [PATCH 12/26] added debug logging to `TestSnapshotStore` --- .../Actors/CounterActor.cs | 6 ++- .../Bug4762FixSpec.cs | 5 ++- .../Journal/TestJournal.cs | 14 +++--- .../SnapshotStore/TestSnapshotStore.cs | 45 ++++++++++++++++++- src/core/Akka.Persistence.TestKit/config.conf | 3 ++ 5 files changed, 62 insertions(+), 11 deletions(-) diff --git a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs index 51b7fd8c9c3..e2d8e2e9445 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs @@ -88,7 +88,11 @@ protected override void PreStart() public class CounterActorTests : PersistenceTestKit { // create a Config that enables debug mode on the TestJournal - private static readonly Config Config = ConfigurationFactory.ParseString("akka.persistence.journal.test.debug = on"); + private static readonly Config Config = + ConfigurationFactory.ParseString(""" + akka.persistence.journal.test.debug = on + akka.persistence.snapshot-store.test.debug = on + """); public CounterActorTests(ITestOutputHelper output) : base(Config, output:output){} diff --git a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs index 3e78a44d13f..a590aa2e3e7 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs @@ -25,7 +25,10 @@ public class Bug4762FixSpec : PersistenceTestKit { // create a Config that enables debug mode on the TestJournal private static readonly Config Config = - ConfigurationFactory.ParseString("akka.persistence.journal.test.debug = on"); + ConfigurationFactory.ParseString(""" + akka.persistence.journal.test.debug = on + akka.persistence.snapshot-store.test.debug = on + """); public Bug4762FixSpec(ITestOutputHelper outputHelper) : base(Config, output: outputHelper) { diff --git a/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs b/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs index bf9ea165013..4333efeb7ad 100644 --- a/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs +++ b/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs @@ -7,17 +7,15 @@ using Akka.Configuration; using Akka.Event; +using Akka.Actor; +using Akka.Persistence.Journal; +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Threading.Tasks; namespace Akka.Persistence.TestKit { - using Akka.Actor; - using Akka.Persistence; - using Akka.Persistence.Journal; - using System; - using System.Collections.Generic; - using System.Collections.Immutable; - using System.Threading.Tasks; - /// /// In-memory persistence journal implementation which behavior could be controlled by interceptors. /// diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs index 838f06ce40e..2be0704d986 100644 --- a/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs +++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs @@ -5,6 +5,9 @@ // //----------------------------------------------------------------------- +using Akka.Configuration; +using Akka.Event; + namespace Akka.Persistence.TestKit { using System.Threading.Tasks; @@ -19,22 +22,36 @@ public class TestSnapshotStore : MemorySnapshotStore private ISnapshotStoreInterceptor _saveInterceptor = SnapshotStoreInterceptors.Noop.Instance; private ISnapshotStoreInterceptor _loadInterceptor = SnapshotStoreInterceptors.Noop.Instance; private ISnapshotStoreInterceptor _deleteInterceptor = SnapshotStoreInterceptors.Noop.Instance; + private readonly ILoggingAdapter _log = Context.GetLogger(); + + public TestSnapshotStore(Config snapshotStoreConfig) + { + DebugEnabled = snapshotStoreConfig.GetBoolean("debug", false); + } + + private bool DebugEnabled { get; } protected override bool ReceivePluginInternal(object message) { switch (message) { case UseSaveInterceptor use: + if(DebugEnabled) + _log.Info("Using save interceptor {0}", use.Interceptor.GetType().Name); _saveInterceptor = use.Interceptor; Sender.Tell(Ack.Instance); return true; case UseLoadInterceptor use: + if(DebugEnabled) + _log.Info("Using load interceptor {0}", use.Interceptor.GetType().Name); _loadInterceptor = use.Interceptor; Sender.Tell(Ack.Instance); return true; case UseDeleteInterceptor use: + if(DebugEnabled) + _log.Info("Using delete interceptor {0}", use.Interceptor.GetType().Name); _deleteInterceptor = use.Interceptor; Sender.Tell(Ack.Instance); return true; @@ -46,29 +63,55 @@ protected override bool ReceivePluginInternal(object message) protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot) { + if(DebugEnabled) + _log.Info("Starting to intercept snapshot {0} saving using interceptor {1}", metadata, _saveInterceptor.GetType().Name); + await _saveInterceptor.InterceptAsync(metadata.PersistenceId, ToSelectionCriteria(metadata)); + + if(DebugEnabled) + _log.Info("Completed intercept snapshot {0} saving using interceptor {1}", metadata, _saveInterceptor.GetType().Name); await base.SaveAsync(metadata, snapshot); } protected override async Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria) { + if(DebugEnabled) + _log.Info("Starting to intercept snapshot {0} loading using interceptor {1}", persistenceId, _loadInterceptor.GetType().Name); await _loadInterceptor.InterceptAsync(persistenceId, criteria); + + if(DebugEnabled) + _log.Info("Completed intercept snapshot {0} loading using interceptor {1}", persistenceId, _loadInterceptor.GetType().Name); + return await base.LoadAsync(persistenceId, criteria); } protected override async Task DeleteAsync(SnapshotMetadata metadata) { + if(DebugEnabled) + _log.Info("Starting to intercept snapshot {0} deletion using interceptor {1}", metadata, _deleteInterceptor.GetType().Name); + await _deleteInterceptor.InterceptAsync(metadata.PersistenceId, ToSelectionCriteria(metadata)); + + if(DebugEnabled) + _log.Info("Completed intercept snapshot {0} deletion using interceptor {1}", metadata, _deleteInterceptor.GetType().Name); + await base.DeleteAsync(metadata); } protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria) { + if(DebugEnabled) + _log.Info("Starting to intercept snapshot {0} deletion using interceptor {1}", persistenceId, _deleteInterceptor.GetType().Name); + await _deleteInterceptor.InterceptAsync(persistenceId, criteria); + + if(DebugEnabled) + _log.Info("Completed intercept snapshot {0} deletion using interceptor {1}", persistenceId, _deleteInterceptor.GetType().Name); + await base.DeleteAsync(persistenceId, criteria); } - static SnapshotSelectionCriteria ToSelectionCriteria(SnapshotMetadata metadata) + private static SnapshotSelectionCriteria ToSelectionCriteria(SnapshotMetadata metadata) => new(metadata.SequenceNr, metadata.Timestamp, metadata.SequenceNr, metadata.Timestamp); /// diff --git a/src/core/Akka.Persistence.TestKit/config.conf b/src/core/Akka.Persistence.TestKit/config.conf index a416e0fb7ca..f03792b2baa 100644 --- a/src/core/Akka.Persistence.TestKit/config.conf +++ b/src/core/Akka.Persistence.TestKit/config.conf @@ -20,6 +20,9 @@ akka { test { class = "Akka.Persistence.TestKit.TestSnapshotStore, Akka.Persistence.TestKit" plugin-dispatcher = "akka.actor.default-dispatcher" + + # enables debug mode, which adds verbose logging to each of the TestSnapshotStore stages + debug = false } } } From 852375f3c1a79e00b630515ed1a7a712f3e70d44 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 19:11:57 -0600 Subject: [PATCH 13/26] attempting to fix some continuation mess inside the snapshot store --- .../Snapshot/SnapshotStore.cs | 127 +++++++++++------- 1 file changed, 77 insertions(+), 50 deletions(-) diff --git a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs index 8474c1ab4bb..bcb1c15d219 100644 --- a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs +++ b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs @@ -32,7 +32,8 @@ protected SnapshotStore() var extension = Persistence.Instance.Apply(Context.System); if (extension == null) { - throw new ArgumentException("Couldn't initialize SnapshotStore instance, because associated Persistence extension has not been used in current actor system context."); + throw new ArgumentException( + "Couldn't initialize SnapshotStore instance, because associated Persistence extension has not been used in current actor system context."); } _publish = extension.Settings.Internal.PublishPluginCommands; @@ -57,35 +58,11 @@ private bool ReceiveSnapshotStore(object message) if (message is LoadSnapshot loadSnapshot) { - if (loadSnapshot.Criteria == SnapshotSelectionCriteria.None) - { - senderPersistentActor.Tell(new LoadSnapshotResult(null, loadSnapshot.ToSequenceNr)); - } - else - { - _breaker.WithCircuitBreaker(() => LoadAsync(loadSnapshot.PersistenceId, loadSnapshot.Criteria.Limit(loadSnapshot.ToSequenceNr))) - .ContinueWith(t => (!t.IsFaulted && !t.IsCanceled) - ? new LoadSnapshotResult(t.Result, loadSnapshot.ToSequenceNr) as ISnapshotResponse - : new LoadSnapshotFailed(t.IsFaulted - ? TryUnwrapException(t.Exception) - : new OperationCanceledException("LoadAsync canceled, possibly due to timing out.")), - _continuationOptions) - .PipeTo(senderPersistentActor); - } + LoadSnapshotAsync(loadSnapshot, senderPersistentActor); } else if (message is SaveSnapshot saveSnapshot) { - var metadata = new SnapshotMetadata(saveSnapshot.Metadata.PersistenceId, saveSnapshot.Metadata.SequenceNr, DateTime.UtcNow); - - _breaker.WithCircuitBreaker(() => SaveAsync(metadata, saveSnapshot.Snapshot)) - .ContinueWith(t => (!t.IsFaulted && !t.IsCanceled) - ? new SaveSnapshotSuccess(metadata) as ISnapshotResponse - : new SaveSnapshotFailure(saveSnapshot.Metadata, - t.IsFaulted - ? TryUnwrapException(t.Exception) - : new OperationCanceledException("SaveAsync canceled, possibly due to timing out.")), - _continuationOptions) - .PipeTo(self, senderPersistentActor); + SaveSnapshotAsync(saveSnapshot, self, senderPersistentActor); } else if (message is SaveSnapshotSuccess) { @@ -103,7 +80,7 @@ private bool ReceiveSnapshotStore(object message) try { ReceivePluginInternal(message); - _breaker.WithCircuitBreaker(() => DeleteAsync(saveSnapshotFailure.Metadata)); + _breaker.WithCircuitBreaker((msg: saveSnapshotFailure, ss: this), state => state.ss.DeleteAsync(state.msg.Metadata)); } finally { @@ -112,21 +89,7 @@ private bool ReceiveSnapshotStore(object message) } else if (message is DeleteSnapshot deleteSnapshot) { - var eventStream = Context.System.EventStream; - _breaker.WithCircuitBreaker(() => DeleteAsync(deleteSnapshot.Metadata)) - .ContinueWith(t => (!t.IsFaulted && !t.IsCanceled) - ? new DeleteSnapshotSuccess(deleteSnapshot.Metadata) as ISnapshotResponse - : new DeleteSnapshotFailure(deleteSnapshot.Metadata, - t.IsFaulted - ? TryUnwrapException(t.Exception) - : new OperationCanceledException("DeleteAsync canceled, possibly due to timing out.")), - _continuationOptions) - .PipeTo(self, senderPersistentActor) - .ContinueWith(_ => - { - if (_publish) - eventStream.Publish(message); - }, _continuationOptions); + DeleteSnapshotAsync(deleteSnapshot, self, senderPersistentActor); } else if (message is DeleteSnapshotSuccess) { @@ -155,12 +118,13 @@ private bool ReceiveSnapshotStore(object message) var eventStream = Context.System.EventStream; _breaker.WithCircuitBreaker(() => DeleteAsync(deleteSnapshots.PersistenceId, deleteSnapshots.Criteria)) .ContinueWith(t => (!t.IsFaulted && !t.IsCanceled) - ? new DeleteSnapshotsSuccess(deleteSnapshots.Criteria) as ISnapshotResponse - : new DeleteSnapshotsFailure(deleteSnapshots.Criteria, - t.IsFaulted - ? TryUnwrapException(t.Exception) - : new OperationCanceledException("DeleteAsync canceled, possibly due to timing out.")), - _continuationOptions) + ? new DeleteSnapshotsSuccess(deleteSnapshots.Criteria) as ISnapshotResponse + : new DeleteSnapshotsFailure(deleteSnapshots.Criteria, + t.IsFaulted + ? TryUnwrapException(t.Exception) + : new OperationCanceledException( + "DeleteAsync canceled, possibly due to timing out.")), + _continuationOptions) .PipeTo(self, senderPersistentActor) .ContinueWith(_ => { @@ -191,9 +155,71 @@ private bool ReceiveSnapshotStore(object message) } } else return false; + return true; } + private async Task DeleteSnapshotAsync(DeleteSnapshot deleteSnapshot, IActorRef self, + IActorRef senderPersistentActor) + { + var eventStream = Context.System.EventStream; + + try + { + await _breaker.WithCircuitBreaker((msg: deleteSnapshot, ss: this), + state => state.ss.DeleteAsync(state.msg.Metadata)); + + self.Tell(new DeleteSnapshotSuccess(deleteSnapshot.Metadata), senderPersistentActor); + } + catch (Exception ex) + { + self.Tell(new DeleteSnapshotFailure(deleteSnapshot.Metadata, ex), senderPersistentActor); + } + + if (_publish) + eventStream.Publish(deleteSnapshot); + } + + private async Task SaveSnapshotAsync(SaveSnapshot saveSnapshot, IActorRef self, IActorRef senderPersistentActor) + { + var metadata = new SnapshotMetadata(saveSnapshot.Metadata.PersistenceId, + saveSnapshot.Metadata.SequenceNr, DateTime.UtcNow); + + try + { + await _breaker.WithCircuitBreaker((msg: metadata, save: saveSnapshot, ss: this), + state => state.ss.SaveAsync(state.msg, state.save)); + self.Tell(new SaveSnapshotSuccess(metadata), senderPersistentActor); + } + catch (Exception ex) + { + self.Tell(new SaveSnapshotFailure(metadata, ex), senderPersistentActor); + } + } + + private async Task LoadSnapshotAsync(LoadSnapshot loadSnapshot, IActorRef senderPersistentActor) + { + if (loadSnapshot.Criteria == SnapshotSelectionCriteria.None) + { + senderPersistentActor.Tell(new LoadSnapshotResult(null, loadSnapshot.ToSequenceNr)); + } + else + { + try + { + var result = await _breaker.WithCircuitBreaker((msg: loadSnapshot, ss: this), + state => state.ss.LoadAsync(state.msg.PersistenceId, + state.msg.Criteria.Limit(state.msg.ToSequenceNr))); + + senderPersistentActor.Tell(new LoadSnapshotResult(result, loadSnapshot.ToSequenceNr)); + } + catch (Exception ex) + { + senderPersistentActor.Tell(new LoadSnapshotFailed(ex)); + } + } + } + private Exception TryUnwrapException(Exception e) { var aggregateException = e as AggregateException; @@ -203,6 +229,7 @@ private Exception TryUnwrapException(Exception e) if (aggregateException.InnerExceptions.Count == 1) return aggregateException.InnerExceptions[0]; } + return e; } @@ -256,4 +283,4 @@ protected virtual bool ReceivePluginInternal(object message) return false; } } -} +} \ No newline at end of file From 0de3cc9ecf420fb9c7197ef352efe216f9c5e2f0 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 19:16:49 -0600 Subject: [PATCH 14/26] fixed final cases --- .../Snapshot/SnapshotStore.cs | 195 +++++++++--------- 1 file changed, 100 insertions(+), 95 deletions(-) diff --git a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs index bcb1c15d219..67a6e333925 100644 --- a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs +++ b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs @@ -55,108 +55,113 @@ private bool ReceiveSnapshotStore(object message) { var senderPersistentActor = Sender; // Sender is PersistentActor var self = Self; //Self MUST BE CLOSED OVER here, or the code below will be subject to race conditions - - if (message is LoadSnapshot loadSnapshot) - { - LoadSnapshotAsync(loadSnapshot, senderPersistentActor); - } - else if (message is SaveSnapshot saveSnapshot) - { - SaveSnapshotAsync(saveSnapshot, self, senderPersistentActor); - } - else if (message is SaveSnapshotSuccess) - { - try - { - ReceivePluginInternal(message); - } - finally - { - senderPersistentActor.Tell(message); - } - } - else if (message is SaveSnapshotFailure saveSnapshotFailure) - { - try - { - ReceivePluginInternal(message); - _breaker.WithCircuitBreaker((msg: saveSnapshotFailure, ss: this), state => state.ss.DeleteAsync(state.msg.Metadata)); - } - finally - { - senderPersistentActor.Tell(message); - } - } - else if (message is DeleteSnapshot deleteSnapshot) +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + switch (message) { - DeleteSnapshotAsync(deleteSnapshot, self, senderPersistentActor); - } - else if (message is DeleteSnapshotSuccess) - { - try - { - ReceivePluginInternal(message); - } - finally - { - senderPersistentActor.Tell(message); - } - } - else if (message is DeleteSnapshotFailure) - { - try - { - ReceivePluginInternal(message); - } - finally - { - senderPersistentActor.Tell(message); - } - } - else if (message is DeleteSnapshots deleteSnapshots) - { - var eventStream = Context.System.EventStream; - _breaker.WithCircuitBreaker(() => DeleteAsync(deleteSnapshots.PersistenceId, deleteSnapshots.Criteria)) - .ContinueWith(t => (!t.IsFaulted && !t.IsCanceled) - ? new DeleteSnapshotsSuccess(deleteSnapshots.Criteria) as ISnapshotResponse - : new DeleteSnapshotsFailure(deleteSnapshots.Criteria, - t.IsFaulted - ? TryUnwrapException(t.Exception) - : new OperationCanceledException( - "DeleteAsync canceled, possibly due to timing out.")), - _continuationOptions) - .PipeTo(self, senderPersistentActor) - .ContinueWith(_ => + case LoadSnapshot loadSnapshot: + + LoadSnapshotAsync(loadSnapshot, senderPersistentActor); + break; + case SaveSnapshot saveSnapshot: + SaveSnapshotAsync(saveSnapshot, self, senderPersistentActor); + break; + case SaveSnapshotSuccess: + try { - if (_publish) - eventStream.Publish(message); - }, _continuationOptions); + ReceivePluginInternal(message); + } + finally + { + senderPersistentActor.Tell(message); + } + + break; + case SaveSnapshotFailure saveSnapshotFailure: + try + { + ReceivePluginInternal(message); + _breaker.WithCircuitBreaker((msg: saveSnapshotFailure, ss: this), state => state.ss.DeleteAsync(state.msg.Metadata)); + } + finally + { + senderPersistentActor.Tell(message); + } + + break; + case DeleteSnapshot deleteSnapshot: + DeleteSnapshotAsync(deleteSnapshot, self, senderPersistentActor); + break; + case DeleteSnapshotSuccess: + try + { + ReceivePluginInternal(message); + } + finally + { + senderPersistentActor.Tell(message); + } + + break; + case DeleteSnapshotFailure: + try + { + ReceivePluginInternal(message); + } + finally + { + senderPersistentActor.Tell(message); + } + + break; + case DeleteSnapshots deleteSnapshots: + DeleteSnapshotsAsync(deleteSnapshots, self, senderPersistentActor); + break; + case DeleteSnapshotsSuccess: + try + { + ReceivePluginInternal(message); + } + finally + { + senderPersistentActor.Tell(message); + } + + break; + case DeleteSnapshotsFailure: + try + { + ReceivePluginInternal(message); + } + finally + { + senderPersistentActor.Tell(message); + } + + break; + default: + return false; } - else if (message is DeleteSnapshotsSuccess) +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + return true; + } + + private async Task DeleteSnapshotsAsync(DeleteSnapshots deleteSnapshots, IActorRef self, IActorRef senderPersistentActor) + { + var eventStream = Context.System.EventStream; + try { - try - { - ReceivePluginInternal(message); - } - finally - { - senderPersistentActor.Tell(message); - } + await _breaker.WithCircuitBreaker((msg: deleteSnapshots, ss: this), + state => state.ss.DeleteAsync(state.msg.PersistenceId, state.msg.Criteria)); + + self.Tell(new DeleteSnapshotsSuccess(deleteSnapshots.Criteria), senderPersistentActor); } - else if (message is DeleteSnapshotsFailure) + catch (Exception ex) { - try - { - ReceivePluginInternal(message); - } - finally - { - senderPersistentActor.Tell(message); - } + self.Tell(new DeleteSnapshotsFailure(deleteSnapshots.Criteria, ex), senderPersistentActor); } - else return false; - - return true; + + if (_publish) + eventStream.Publish(deleteSnapshots); } private async Task DeleteSnapshotAsync(DeleteSnapshot deleteSnapshot, IActorRef self, From ef22f501cbd0d69fb65de5d46985c744fcfb4b7b Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 19:17:05 -0600 Subject: [PATCH 15/26] formatting --- src/core/Akka.Persistence/Snapshot/SnapshotStore.cs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs index 67a6e333925..bb0c7a6ef81 100644 --- a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs +++ b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs @@ -80,7 +80,8 @@ private bool ReceiveSnapshotStore(object message) try { ReceivePluginInternal(message); - _breaker.WithCircuitBreaker((msg: saveSnapshotFailure, ss: this), state => state.ss.DeleteAsync(state.msg.Metadata)); + _breaker.WithCircuitBreaker((msg: saveSnapshotFailure, ss: this), + state => state.ss.DeleteAsync(state.msg.Metadata)); } finally { @@ -145,7 +146,8 @@ private bool ReceiveSnapshotStore(object message) return true; } - private async Task DeleteSnapshotsAsync(DeleteSnapshots deleteSnapshots, IActorRef self, IActorRef senderPersistentActor) + private async Task DeleteSnapshotsAsync(DeleteSnapshots deleteSnapshots, IActorRef self, + IActorRef senderPersistentActor) { var eventStream = Context.System.EventStream; try @@ -159,7 +161,7 @@ await _breaker.WithCircuitBreaker((msg: deleteSnapshots, ss: this), { self.Tell(new DeleteSnapshotsFailure(deleteSnapshots.Criteria, ex), senderPersistentActor); } - + if (_publish) eventStream.Publish(deleteSnapshots); } @@ -168,7 +170,7 @@ private async Task DeleteSnapshotAsync(DeleteSnapshot deleteSnapshot, IActorRef IActorRef senderPersistentActor) { var eventStream = Context.System.EventStream; - + try { await _breaker.WithCircuitBreaker((msg: deleteSnapshot, ss: this), @@ -180,7 +182,7 @@ await _breaker.WithCircuitBreaker((msg: deleteSnapshot, ss: this), { self.Tell(new DeleteSnapshotFailure(deleteSnapshot.Metadata, ex), senderPersistentActor); } - + if (_publish) eventStream.Publish(deleteSnapshot); } From a33110a832e5792fd8527aa1b1da4398356ef77f Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 19:44:55 -0600 Subject: [PATCH 16/26] fixed snapshot saving errors --- .../LocalSnapshotStoreSpec.cs | 13 +++++++++---- src/core/Akka.Persistence/Snapshot/SnapshotStore.cs | 2 +- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/core/Akka.Persistence.TCK.Tests/LocalSnapshotStoreSpec.cs b/src/core/Akka.Persistence.TCK.Tests/LocalSnapshotStoreSpec.cs index b728d354816..4c5866915f7 100644 --- a/src/core/Akka.Persistence.TCK.Tests/LocalSnapshotStoreSpec.cs +++ b/src/core/Akka.Persistence.TCK.Tests/LocalSnapshotStoreSpec.cs @@ -46,10 +46,15 @@ public void LocalSnapshotStore_can_snapshot_actors_with_PersistenceId_containing ExpectMsg(); SnapshotStore.Tell(new LoadSnapshot(pid, SnapshotSelectionCriteria.Latest, long.MaxValue), TestActor); - ExpectMsg(res => - res.Snapshot.Snapshot.Equals("sample data") - && res.Snapshot.Metadata.PersistenceId == pid - && res.Snapshot.Metadata.SequenceNr == 1); + ExpectMsg(IsMessage); + bool IsMessage(LoadSnapshotResult res) + { + var result = res.Snapshot.Snapshot.Equals("sample data") + && res.Snapshot.Metadata.PersistenceId == pid + && res.Snapshot.Metadata.SequenceNr == 1; + + return result; + } } } } diff --git a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs index bb0c7a6ef81..158b94ca2b8 100644 --- a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs +++ b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs @@ -195,7 +195,7 @@ private async Task SaveSnapshotAsync(SaveSnapshot saveSnapshot, IActorRef self, try { await _breaker.WithCircuitBreaker((msg: metadata, save: saveSnapshot, ss: this), - state => state.ss.SaveAsync(state.msg, state.save)); + state => state.ss.SaveAsync(state.msg, state.save.Snapshot)); self.Tell(new SaveSnapshotSuccess(metadata), senderPersistentActor); } catch (Exception ex) From b8a7245a26106bc625586a1d70ef31456a9a4cb7 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 24 Jan 2024 07:54:20 -0600 Subject: [PATCH 17/26] enable DEBUG logging --- src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs index a590aa2e3e7..48840062f54 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs @@ -26,6 +26,7 @@ public class Bug4762FixSpec : PersistenceTestKit // create a Config that enables debug mode on the TestJournal private static readonly Config Config = ConfigurationFactory.ParseString(""" + akka.loglevel = DEBUG akka.persistence.journal.test.debug = on akka.persistence.snapshot-store.test.debug = on """); From 4af443afc4482d551d5b6dfabbf86e27b1a19cb0 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 24 Jan 2024 07:59:09 -0600 Subject: [PATCH 18/26] more debug logging --- src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs index 48840062f54..d1a2682b5bd 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs @@ -73,6 +73,12 @@ protected override void OnCommand(object message) } } + protected override void PreStart() + { + _log.Info("Starting up and beginning recovery"); + base.PreStart(); + } + protected override void OnRecover(object message) { _log.Info("Received recover {0}", message); From 663a88da56e2b00f6aa0b0cac53f7196c1469da8 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 25 Jan 2024 08:21:02 -0600 Subject: [PATCH 19/26] more debug logging --- .../SnapshotStore/TestSnapshotStore.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs index 2be0704d986..a576a61f2e1 100644 --- a/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs +++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs @@ -33,6 +33,9 @@ public TestSnapshotStore(Config snapshotStoreConfig) protected override bool ReceivePluginInternal(object message) { + if(DebugEnabled) + _log.Info("Received plugin internal message {0}", message); + switch (message) { case UseSaveInterceptor use: From a221b7efe5cd1ff6d36e61fcb66cc917c62620b2 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 1 Feb 2024 00:43:49 +0700 Subject: [PATCH 20/26] Clean-up and modernize ReplayFilter --- .../Akka.Persistence/Journal/ReplayFilter.cs | 228 ++++++++++-------- 1 file changed, 121 insertions(+), 107 deletions(-) diff --git a/src/core/Akka.Persistence/Journal/ReplayFilter.cs b/src/core/Akka.Persistence/Journal/ReplayFilter.cs index e6fda67e051..476a1521544 100644 --- a/src/core/Akka.Persistence/Journal/ReplayFilter.cs +++ b/src/core/Akka.Persistence/Journal/ReplayFilter.cs @@ -131,28 +131,63 @@ public static Props Props(IActorRef persistentActor, ReplayFilterMode mode, int /// TBD protected override bool Receive(object message) { - if (message is ReplayedMessage value) + switch (message) { - if (DebugEnabled && _log.IsDebugEnabled) - _log.Debug($"Replay: {value.Persistent}"); + case ReplayedMessage value: + if (DebugEnabled && _log.IsDebugEnabled) + _log.Debug($"Replay: {value.Persistent}"); - try - { - if (_buffer.Count == WindowSize) + try { - var msg = _buffer.First; - _buffer.RemoveFirst(); - PersistentActor.Tell(msg.Value, ActorRefs.NoSender); - } + if (_buffer.Count == WindowSize) + { + var msg = _buffer.First; + _buffer.RemoveFirst(); + PersistentActor.Tell(msg.Value, ActorRefs.NoSender); + } - if (value.Persistent.WriterGuid.Equals(_writerUuid)) - { - // from same writer - if (value.Persistent.SequenceNr < _sequenceNr) + if (value.Persistent.WriterGuid.Equals(_writerUuid)) { - var errMsg = $@"Invalid replayed event [sequenceNr={value.Persistent.SequenceNr}, writerUUID={value.Persistent.WriterGuid}] as - the sequenceNr should be equal to or greater than already-processed event [sequenceNr={_sequenceNr}, writerUUID={_writerUuid}] from the same writer, for the same persistenceId [{value.Persistent.PersistenceId}]. - Perhaps, events were journaled out of sequence, or duplicate PersistentId for different entities?"; + // from same writer + if (value.Persistent.SequenceNr < _sequenceNr) + { + var errMsg = + $"Invalid replayed event [sequenceNr={value.Persistent.SequenceNr}, writerUUID={value.Persistent.WriterGuid}] as " + + $"the sequenceNr should be equal to or greater than already-processed event [sequenceNr={_sequenceNr}, " + + $"writerUUID={_writerUuid}] from the same writer, for the same persistenceId [{value.Persistent.PersistenceId}]. " + + "Perhaps, events were journaled out of sequence, or duplicate PersistentId for different entities?"; + LogIssue(errMsg); + switch (Mode) + { + case ReplayFilterMode.RepairByDiscardOld: + //discard + break; + case ReplayFilterMode.Fail: + throw new IllegalStateException(errMsg); + case ReplayFilterMode.Warn: + _buffer.AddLast(value); + break; + case ReplayFilterMode.Disabled: + throw new ArgumentException("Mode must not be Disabled"); + default: + throw new ArgumentException($"Unknown {nameof(ReplayFilterMode)} value: {Mode}"); + } + } + else + { + // note that it is alright with == _sequenceNr, since such may be emitted by EventSeq + _buffer.AddLast(value); + _sequenceNr = value.Persistent.SequenceNr; + } + } + else if (_oldWriters.Contains(value.Persistent.WriterGuid)) + { + // from old writer + var errMsg = + $"Invalid replayed event [sequenceNr={value.Persistent.SequenceNr}, writerUUID={value.Persistent.WriterGuid}]. " + + $"There was already a newer writer whose last replayed event was [sequenceNr={_sequenceNr}, " + + $"writerUUID={_writerUuid}] for the same persistenceId [{value.Persistent.PersistenceId}]. " + + "Perhaps, the old writer kept journaling messages after the new writer created, or duplicate PersistentId for different entities?"; LogIssue(errMsg); switch (Mode) { @@ -166,98 +201,78 @@ protected override bool Receive(object message) break; case ReplayFilterMode.Disabled: throw new ArgumentException("Mode must not be Disabled"); + default: + throw new ArgumentException($"Unknown {nameof(ReplayFilterMode)} value: {Mode}"); } } else { - // note that it is alright with == _sequenceNr, since such may be emitted by EventSeq - _buffer.AddLast(value); + // from new writer + if (!string.IsNullOrEmpty(_writerUuid)) + _oldWriters.AddLast(_writerUuid); + if (_oldWriters.Count > MaxOldWriters) + _oldWriters.RemoveFirst(); + _writerUuid = value.Persistent.WriterGuid; _sequenceNr = value.Persistent.SequenceNr; - } - } - else if (_oldWriters.Contains(value.Persistent.WriterGuid)) - { - // from old writer - var errMsg = $@"Invalid replayed event [sequenceNr={value.Persistent.SequenceNr}, writerUUID={value.Persistent.WriterGuid}]. - There was already a newer writer whose last replayed event was [sequenceNr={_sequenceNr}, writerUUID={_writerUuid}] for the same persistenceId [{value.Persistent.PersistenceId}]. - Perhaps, the old writer kept journaling messages after the new writer created, or duplicate PersistentId for different entities?"; - LogIssue(errMsg); - switch (Mode) - { - case ReplayFilterMode.RepairByDiscardOld: - //discard - break; - case ReplayFilterMode.Fail: - throw new IllegalStateException(errMsg); - case ReplayFilterMode.Warn: - _buffer.AddLast(value); - break; - case ReplayFilterMode.Disabled: - throw new ArgumentException("Mode must not be Disabled"); - } - } - else - { - // from new writer - if (!string.IsNullOrEmpty(_writerUuid)) - _oldWriters.AddLast(_writerUuid); - if (_oldWriters.Count > MaxOldWriters) - _oldWriters.RemoveFirst(); - _writerUuid = value.Persistent.WriterGuid; - _sequenceNr = value.Persistent.SequenceNr; - // clear the buffer from messages from other writers with higher SequenceNr - var node = _buffer.First; - while (node != null) - { - var next = node.Next; - var msg = node.Value; - if (msg.Persistent.SequenceNr >= _sequenceNr) + // clear the buffer from messages from other writers with higher SequenceNr + var node = _buffer.First; + while (node != null) { - var errMsg = $@"Invalid replayed event [sequenceNr=${value.Persistent.SequenceNr}, writerUUID=${value.Persistent.WriterGuid}] from a new writer. - An older writer already sent an event [sequenceNr=${msg.Persistent.SequenceNr}, writerUUID=${msg.Persistent.WriterGuid}] whose sequence number was equal or greater for the same persistenceId [${value.Persistent.PersistenceId}]. - Perhaps, the new writer journaled the event out of sequence, or duplicate PersistentId for different entities?"; - LogIssue(errMsg); - switch (Mode) + var next = node.Next; + var msg = node.Value; + if (msg.Persistent.SequenceNr >= _sequenceNr) { - case ReplayFilterMode.RepairByDiscardOld: - _buffer.Remove(node); - //discard - break; - case ReplayFilterMode.Fail: - throw new IllegalStateException(errMsg); - case ReplayFilterMode.Warn: - // keep - break; - case ReplayFilterMode.Disabled: - throw new ArgumentException("Mode must not be Disabled"); + var errMsg = + $"Invalid replayed event [sequenceNr=${value.Persistent.SequenceNr}, writerUUID=${value.Persistent.WriterGuid}] from a new writer. " + + $"An older writer already sent an event [sequenceNr=${msg.Persistent.SequenceNr}, " + + $"writerUUID=${msg.Persistent.WriterGuid}] whose sequence number was equal or greater " + + $"for the same persistenceId [${value.Persistent.PersistenceId}]. " + + "Perhaps, the new writer journaled the event out of sequence, or duplicate PersistentId for different entities?"; + LogIssue(errMsg); + switch (Mode) + { + case ReplayFilterMode.RepairByDiscardOld: + _buffer.Remove(node); + //discard + break; + case ReplayFilterMode.Fail: + throw new IllegalStateException(errMsg); + case ReplayFilterMode.Warn: + // keep + break; + case ReplayFilterMode.Disabled: + throw new ArgumentException("Mode must not be Disabled"); + default: + throw new ArgumentException($"Unknown {nameof(ReplayFilterMode)} value: {Mode}"); + } } + node = next; } - node = next; + _buffer.AddLast(value); } - _buffer.AddLast(value); } + catch (IllegalStateException ex) + { + if (Mode == ReplayFilterMode.Fail) + Fail(ex); + else + throw; + } + return true; + + case RecoverySuccess or ReplayMessagesFailure: + if (DebugEnabled) + _log.Debug($"Replay completed: {message}"); - } - catch (IllegalStateException ex) - { - if (Mode == ReplayFilterMode.Fail) - Fail(ex); - else - throw; - } - } - else if (message is RecoverySuccess or ReplayMessagesFailure) - { - if (DebugEnabled) - _log.Debug($"Replay completed: {message}"); - - SendBuffered(); - PersistentActor.Tell(message, ActorRefs.NoSender); - Context.Stop(Self); + SendBuffered(); + PersistentActor.Tell(message, ActorRefs.NoSender); + Context.Stop(Self); + return true; + + default: + return false; } - else return false; - return true; } private void SendBuffered() @@ -282,6 +297,8 @@ private void LogIssue(string errMsg) break; case ReplayFilterMode.Disabled: throw new ArgumentException("mode must not be Disabled"); + default: + throw new ArgumentException($"Unknown {nameof(ReplayFilterMode)}: {Mode} value. Message: {errMsg}"); } } @@ -291,20 +308,17 @@ private void Fail(IllegalStateException cause) PersistentActor.Tell(new ReplayMessagesFailure(cause), ActorRefs.NoSender); Context.Become(message => { - if (message is ReplayedMessage) - { - // discard - } - else if (message is RecoverySuccess or ReplayMessagesFailure) - { - Context.Stop(Self); - } - else + switch (message) { - return false; + case ReplayedMessage: + // discard + return true; + case RecoverySuccess or ReplayMessagesFailure: + Context.Stop(Self); + return true; + default: + return false; } - - return true; }); } } From f963565ca284a87d082899ef8ad776576ea2d8e6 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 1 Feb 2024 00:44:34 +0700 Subject: [PATCH 21/26] Fix configuration, make sure that default values are sensible --- src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs index d3f88339d93..b08088208a8 100644 --- a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs +++ b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs @@ -54,9 +54,9 @@ protected AsyncWriteJournal() var config = extension.ConfigFor(Self); _breaker = new CircuitBreaker( Context.System.Scheduler, - config.GetInt("circuit-breaker.max-failures", 0), - config.GetTimeSpan("circuit-breaker.call-timeout", null), - config.GetTimeSpan("circuit-breaker.reset-timeout", null)); + config.GetInt("circuit-breaker.max-failures", 10), + config.GetTimeSpan("circuit-breaker.call-timeout", TimeSpan.FromSeconds(10)), + config.GetTimeSpan("circuit-breaker.reset-timeout", TimeSpan.FromSeconds(30))); var replayFilterMode = config.GetString("replay-filter.mode", "").ToLowerInvariant(); switch (replayFilterMode) @@ -77,8 +77,8 @@ protected AsyncWriteJournal() throw new ConfigurationException($"Invalid replay-filter.mode [{replayFilterMode}], supported values [off, repair-by-discard-old, fail, warn]"); } _isReplayFilterEnabled = _replayFilterMode != ReplayFilterMode.Disabled; - _replayFilterWindowSize = config.GetInt("replay-filter.window-size", 0); - _replayFilterMaxOldWriters = config.GetInt("replay-filter.max-old-writers", 0); + _replayFilterWindowSize = config.GetInt("replay-filter.window-size", 100); + _replayFilterMaxOldWriters = config.GetInt("replay-filter.max-old-writers", 10); _replayDebugEnabled = config.GetBoolean("replay-filter.debug", false); _resequencer = Context.System.ActorOf(Props.Create(() => new Resequencer())); From 7788e726e9b383876e1da88b8a13b1ee68156d94 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 1 Feb 2024 00:46:08 +0700 Subject: [PATCH 22/26] Fix journal interceptor, make sure that Ask operation are short enough so that it wouldn't be masked by other exceptions --- .../Journal/JournalRecoveryBehaviorSetter.cs | 2 +- .../Journal/JournalWriteBehaviorSetter.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.Persistence.TestKit/Journal/JournalRecoveryBehaviorSetter.cs b/src/core/Akka.Persistence.TestKit/Journal/JournalRecoveryBehaviorSetter.cs index b9cc55c534c..ffc76445643 100644 --- a/src/core/Akka.Persistence.TestKit/Journal/JournalRecoveryBehaviorSetter.cs +++ b/src/core/Akka.Persistence.TestKit/Journal/JournalRecoveryBehaviorSetter.cs @@ -26,7 +26,7 @@ internal JournalRecoveryBehaviorSetter(IActorRef journal) public Task SetInterceptorAsync(IJournalInterceptor interceptor) => _journal.Ask( new TestJournal.UseRecoveryInterceptor(interceptor), - TimeSpan.FromSeconds(3) + TimeSpan.FromSeconds(0.5) ); } } diff --git a/src/core/Akka.Persistence.TestKit/Journal/JournalWriteBehaviorSetter.cs b/src/core/Akka.Persistence.TestKit/Journal/JournalWriteBehaviorSetter.cs index 7e554be403a..ed9f2e67a60 100644 --- a/src/core/Akka.Persistence.TestKit/Journal/JournalWriteBehaviorSetter.cs +++ b/src/core/Akka.Persistence.TestKit/Journal/JournalWriteBehaviorSetter.cs @@ -26,7 +26,7 @@ internal JournalWriteBehaviorSetter(IActorRef journal) public Task SetInterceptorAsync(IJournalInterceptor interceptor) => _journal.Ask( new TestJournal.UseWriteInterceptor(interceptor), - TimeSpan.FromSeconds(3) + TimeSpan.FromSeconds(0.5) ); } } From aeb68c85871a1c65b4bd55fa331f00968740e5e0 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 1 Feb 2024 00:46:36 +0700 Subject: [PATCH 23/26] Turn on relpay filter debug mode --- src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs index d1a2682b5bd..f89bab5472c 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs @@ -28,6 +28,7 @@ public class Bug4762FixSpec : PersistenceTestKit ConfigurationFactory.ParseString(""" akka.loglevel = DEBUG akka.persistence.journal.test.debug = on + akka.persistence.journal.test.replay-filter.debug = on akka.persistence.snapshot-store.test.debug = on """); From afc24df297b0ab7420d80599b6291af15949f9b7 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 1 Feb 2024 01:20:05 +0700 Subject: [PATCH 24/26] Bump timeout, testing if recovery is delayed or permanently stuck --- src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs index f89bab5472c..8d4b0f7890d 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs @@ -13,6 +13,7 @@ using Akka.Actor; using Akka.Configuration; using Akka.Event; +using FluentAssertions.Extensions; using Xunit; using Xunit.Abstractions; @@ -98,7 +99,7 @@ public Task TestJournal_PersistAll_should_only_count_each_event_exceptions_once( var command = new WriteMessage(); actor.Tell(command, actor); - await probe.ExpectMsgAsync(); + await probe.ExpectMsgAsync(10.Seconds()); await probe.ExpectMsgAsync(); await probe.ExpectMsgAsync(); await probe.ExpectNoMsgAsync(3000); From 8f84ff5079fef870b653fa3420e70229397021cf Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 1 Feb 2024 23:48:36 +0700 Subject: [PATCH 25/26] Fix AsyncWriteJournal replay handler not closing over self --- src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs index b08088208a8..d4103bfee61 100644 --- a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs +++ b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs @@ -251,6 +251,7 @@ private void HandleReplayMessages(ReplayMessages message) ? Context.ActorOf(ReplayFilter.Props(message.PersistentActor, _replayFilterMode, _replayFilterWindowSize, _replayFilterMaxOldWriters, _replayDebugEnabled)) : message.PersistentActor; + var self = Context.Self; var context = Context; var eventStream = Context.System.EventStream; @@ -294,18 +295,18 @@ await ReplayMessagesAsync(context, message.PersistenceId, message.FromSequenceNr // operation failed because a CancellationToken was invoked // wrap the original exception and throw it, with some additional callsite context var newEx = new OperationCanceledException("ReplayMessagesAsync canceled, possibly due to timing out.", cx); - replyTo.Tell(new ReplayMessagesFailure(newEx)); + replyTo.Tell(new ReplayMessagesFailure(newEx), self); } catch (Exception ex) { - replyTo.Tell(new ReplayMessagesFailure(TryUnwrapException(ex))); + replyTo.Tell(new ReplayMessagesFailure(TryUnwrapException(ex)), self); } return; void CompleteHighSeqNo(long highSeqNo) { - replyTo.Tell(new RecoverySuccess(highSeqNo)); + replyTo.Tell(new RecoverySuccess(highSeqNo), self); if (CanPublish) { From 68e2192bd869721135c7bd975b650e320349f536 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 2 Feb 2024 00:01:17 +0700 Subject: [PATCH 26/26] Fix SnapshotStore snapshot load not closing over self --- src/core/Akka.Persistence/Snapshot/SnapshotStore.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs index 158b94ca2b8..fbd0693e652 100644 --- a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs +++ b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs @@ -60,7 +60,7 @@ private bool ReceiveSnapshotStore(object message) { case LoadSnapshot loadSnapshot: - LoadSnapshotAsync(loadSnapshot, senderPersistentActor); + LoadSnapshotAsync(loadSnapshot, self, senderPersistentActor); break; case SaveSnapshot saveSnapshot: SaveSnapshotAsync(saveSnapshot, self, senderPersistentActor); @@ -204,11 +204,11 @@ await _breaker.WithCircuitBreaker((msg: metadata, save: saveSnapshot, ss: this), } } - private async Task LoadSnapshotAsync(LoadSnapshot loadSnapshot, IActorRef senderPersistentActor) + private async Task LoadSnapshotAsync(LoadSnapshot loadSnapshot, IActorRef self, IActorRef senderPersistentActor) { if (loadSnapshot.Criteria == SnapshotSelectionCriteria.None) { - senderPersistentActor.Tell(new LoadSnapshotResult(null, loadSnapshot.ToSequenceNr)); + senderPersistentActor.Tell(new LoadSnapshotResult(null, loadSnapshot.ToSequenceNr), self); } else { @@ -218,11 +218,11 @@ private async Task LoadSnapshotAsync(LoadSnapshot loadSnapshot, IActorRef sender state => state.ss.LoadAsync(state.msg.PersistenceId, state.msg.Criteria.Limit(state.msg.ToSequenceNr))); - senderPersistentActor.Tell(new LoadSnapshotResult(result, loadSnapshot.ToSequenceNr)); + senderPersistentActor.Tell(new LoadSnapshotResult(result, loadSnapshot.ToSequenceNr), self); } catch (Exception ex) { - senderPersistentActor.Tell(new LoadSnapshotFailed(ex)); + senderPersistentActor.Tell(new LoadSnapshotFailed(ex), self); } } }