Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

attempting to fix racy Akka.Persistence.TestKit.Tests #7068

Open
wants to merge 34 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
adab09d
attempting to fix racy Akka.Persistence.TestKit.Tests
Aaronontheweb Jan 19, 2024
e1e3c02
Merge branch 'dev' into fix-AkkaPersistenceTestKitTests
Aaronontheweb Jan 19, 2024
185d5ad
added debug log
Aaronontheweb Jan 19, 2024
272b962
looking into some suspicious `await` calls inside the `AsyncWriteJour…
Aaronontheweb Jan 19, 2024
fa5bd37
Merge branch 'dev' into fix-AkkaPersistenceTestKitTests
Aaronontheweb Jan 23, 2024
7b8b538
minor code clean up
Aaronontheweb Jan 23, 2024
ef8fd68
`MemoryJournal` cleanup
Aaronontheweb Jan 23, 2024
caf93d7
added more robust external error logging
Aaronontheweb Jan 23, 2024
30897ff
disable sync thread dispatcher
Aaronontheweb Jan 23, 2024
fbb0980
Revert "disable sync thread dispatcher"
Aaronontheweb Jan 23, 2024
0178cd3
added debugging capabilities to `TestJournal`
Aaronontheweb Jan 23, 2024
7206cca
added debug logging to `CounterActor` specs
Aaronontheweb Jan 23, 2024
daf2c8c
added debug logging to `Bug4762FixSpec`
Aaronontheweb Jan 23, 2024
26108a7
added debug logging to `TestSnapshotStore`
Aaronontheweb Jan 23, 2024
852375f
attempting to fix some continuation mess inside the snapshot store
Aaronontheweb Jan 24, 2024
0de3cc9
fixed final cases
Aaronontheweb Jan 24, 2024
ef22f50
formatting
Aaronontheweb Jan 24, 2024
a33110a
fixed snapshot saving errors
Aaronontheweb Jan 24, 2024
b8a7245
enable DEBUG logging
Aaronontheweb Jan 24, 2024
4af443a
more debug logging
Aaronontheweb Jan 24, 2024
190e972
Merge branch 'dev' into fix-AkkaPersistenceTestKitTests
Aaronontheweb Jan 24, 2024
663a88d
more debug logging
Aaronontheweb Jan 25, 2024
8192cc8
Merge branch 'dev' into fix-AkkaPersistenceTestKitTests
Arkatufus Jan 31, 2024
a221b7e
Clean-up and modernize ReplayFilter
Arkatufus Jan 31, 2024
f963565
Fix configuration, make sure that default values are sensible
Arkatufus Jan 31, 2024
7788e72
Fix journal interceptor, make sure that Ask operation are short enoug…
Arkatufus Jan 31, 2024
aeb68c8
Turn on relpay filter debug mode
Arkatufus Jan 31, 2024
c777bc6
Merge branch 'fix-AkkaPersistenceTestKitTests' of github.com:Aaronont…
Arkatufus Jan 31, 2024
afc24df
Bump timeout, testing if recovery is delayed or permanently stuck
Arkatufus Jan 31, 2024
8f84ff5
Fix AsyncWriteJournal replay handler not closing over self
Arkatufus Feb 1, 2024
68e2192
Fix SnapshotStore snapshot load not closing over self
Arkatufus Feb 1, 2024
a7da6a6
Merge branch 'dev' into fix-AkkaPersistenceTestKitTests
Arkatufus Feb 1, 2024
55349bf
Merge branch 'dev' into fix-AkkaPersistenceTestKitTests
Arkatufus Feb 5, 2024
8050937
Merge branch 'dev' into fix-AkkaPersistenceTestKitTests
Aaronontheweb Mar 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ protected override void OnRecover(object message)
return;
}
}

protected override void PostStop()
{
_log.Info("Shutting down");
}

protected override void PreStart()
{
_log.Info("Starting up");
}
}

public class CounterActorTests : PersistenceTestKit
Expand All @@ -86,9 +96,10 @@ public Task CounterActor_internal_state_will_be_lost_if_underlying_persistence_s
var counterProps = Props.Create(() => new CounterActor("test"));
var actor = ActorOf(counterProps, "counter");

Watch(actor);
Sys.Log.Info("Messaging actor");
await WatchAsync(actor);
actor.Tell("inc", TestActor);
await ExpectMsgAsync<Terminated>(TimeSpan.FromSeconds(3));
await ExpectTerminatedAsync(actor);

// need to restart actor
actor = ActorOf(counterProps, "counter1");
Expand Down
1 change: 0 additions & 1 deletion src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public Task TestJournal_PersistAll_should_only_count_each_event_exceptions_once(
return WithJournalWrite(write => write.Pass(), async () =>
{
var actor = ActorOf(() => new TestActor2(probe));
Watch(actor);

var command = new WriteMessage();
actor.Tell(command, actor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ internal class Noop : IJournalInterceptor
{
public static readonly IJournalInterceptor Instance = new Noop();

public Task InterceptAsync(IPersistentRepresentation message) => Task.FromResult(true);
public Task InterceptAsync(IPersistentRepresentation message) => Task.CompletedTask;
}

internal class Failure : IJournalInterceptor
Expand Down
27 changes: 15 additions & 12 deletions src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -259,20 +259,11 @@ private void HandleReplayMessages(ReplayMessages message)

async Task ExecuteHighestSequenceNr()
{
void CompleteHighSeqNo(long highSeqNo)
{
replyTo.Tell(new RecoverySuccess(highSeqNo));

if (CanPublish)
{
eventStream.Publish(message);
}
}

try
{
var highSequenceNr = await _breaker.WithCircuitBreaker((message, readHighestSequenceNrFrom, awj: this), state =>
state.awj.ReadHighestSequenceNrAsync(state.message.PersistenceId, state.readHighestSequenceNrFrom));
state.awj.ReadHighestSequenceNrAsync(state.message.PersistenceId, state.readHighestSequenceNrFrom))
.ConfigureAwait(false);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if there's some issues with the Akka.Persistence Journal capturing context here, since these tasks run as detatched. I doubt it's the issue but it's one of the things that stood out to me here.

Copy link
Contributor

@Arkatufus Arkatufus Jan 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The task isn't detached though, its actually awaited inside the circuit breaker, except when the circuit breaker is open, in which it just throws an open circuit exception.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true - we have some other racy CircuitBreaker specs too (independently from any Akka.Persistence plugin) - wonder if that's the common issue?

Copy link
Contributor

@Arkatufus Arkatufus Feb 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its not the CircuitBreaker, the giant lag on recovery is between PreStart and when the LoadSnapshot message was received by the snapshot store actor:

[INFO][02/01/2024 18:30:23.919Z][Thread 0017][PersistenceExtension (akka://test)] Auto-starting journal plugin `akka.persistence.journal.test`
[INFO][02/01/2024 18:30:23.940Z][Thread 0017][PersistenceExtension (akka://test)] Auto-starting snapshot store `akka.persistence.snapshot-store.test`
[INFO][02/01/2024 18:30:23.999Z][Thread 0005][akka://test/system/akka.persistence.journal.test] Using write interceptor Noop
[INFO][02/01/2024 18:30:24.041Z][Thread 0012][akka://test/user/$b] Starting up and beginning recovery
[INFO][02/01/2024 18:30:29.807Z][Thread 0012][akka://test/system/akka.persistence.snapshot-store.test] LoadSnapshot message received.
[INFO][02/01/2024 18:30:29.809Z][Thread 0012][akka://test/system/akka.persistence.snapshot-store.test] Starting LoadSnapshotAsync circuit breaker.
[INFO][02/01/2024 18:30:29.810Z][Thread 0012][akka://test/system/akka.persistence.snapshot-store.test] Invoking LoadAsync inside circuit breaker.
[INFO][02/01/2024 18:30:29.812Z][Thread 0012][akka://test/system/akka.persistence.snapshot-store.test] Starting to intercept snapshot foo loading using interceptor Noop
[INFO][02/01/2024 18:30:29.812Z][Thread 0012][akka://test/system/akka.persistence.snapshot-store.test] Completed intercept snapshot foo loading using interceptor Noop
[INFO][02/01/2024 18:30:29.814Z][Thread 0012][akka://test/system/akka.persistence.snapshot-store.test] LoadSnapshotAsync circuit breaker completed.
[DEBUG][02/01/2024 18:30:29.838Z][Thread 0012][akka://test/system/akka.persistence.journal.test/$a] Replay completed: RecoverySuccess<highestSequenceNr: 0>
[INFO][02/01/2024 18:30:29.839Z][Thread 0012][akka://test/user/$b] Received recover Akka.Persistence.RecoveryCompleted

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes me think that the bug is actually in the RecoveryPermitter class

var toSequenceNr = Math.Min(message.ToSequenceNr, highSequenceNr);
if (toSequenceNr <= 0L || message.FromSequenceNr > toSequenceNr)
{
Expand All @@ -293,7 +284,7 @@ await ReplayMessagesAsync(context, message.PersistenceId, message.FromSequenceNr
replyTo.Tell(new ReplayedMessage(adaptedRepresentation), ActorRefs.NoSender);
}
}
});
}).ConfigureAwait(false);

CompleteHighSeqNo(highSequenceNr);
}
Expand All @@ -309,6 +300,18 @@ await ReplayMessagesAsync(context, message.PersistenceId, message.FromSequenceNr
{
replyTo.Tell(new ReplayMessagesFailure(TryUnwrapException(ex)));
}

return;

void CompleteHighSeqNo(long highSeqNo)
{
replyTo.Tell(new RecoverySuccess(highSeqNo));

if (CanPublish)
{
eventStream.Publish(message);
}
}
}

// instead of ContinueWith
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Persistence/Journal/MemoryJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public override Task ReplayMessagesAsync(IActorContext context, string persisten
var highest = HighestSequenceNr(persistenceId);
if (highest != 0L && max != 0L)
Read(persistenceId, fromSequenceNr, Math.Min(toSequenceNr, highest), max).ForEach(recoveryCallback);
return Task.FromResult(new object());
return Task.CompletedTask;
}

/// <summary>
Expand All @@ -154,7 +154,7 @@ protected override Task DeleteMessagesToAsync(string persistenceId, long toSeque
_meta.AddOrUpdate(persistenceId, highestSeqNr, (_, _) => highestSeqNr);
for (var snr = 1L; snr <= toSeqNr; snr++)
Delete(persistenceId, snr);
return Task.FromResult(new object());
return Task.CompletedTask;
}

protected override bool ReceivePluginInternal(object message)
Expand Down
Loading