Skip to content

Commit

Permalink
Add support for optional snapshots (akkadotnet#7444)
Browse files Browse the repository at this point in the history
* Add support for optional snapshots

* Fix markdown title formatting

* Fix markdown consecutive blank lines

---------

Co-authored-by: Chris Hoare <[email protected]>
Co-authored-by: Aaron Stannard <[email protected]>
  • Loading branch information
3 people authored Jan 8, 2025
1 parent 26383ec commit 468546c
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 6 deletions.
11 changes: 11 additions & 0 deletions docs/articles/persistence/snapshots.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ persistent actors should use the `deleteSnapshots` method. Depending on the jour
best practice to do specific deletes with `deleteSnapshot` or to include a `minSequenceNr` as well as a `maxSequenceNr`
for the `SnapshotSelectionCriteria`.

## Optional Snapshots

By default, the persistent actor will unconditionally be stopped if the snapshot can't be loaded in the recovery.
It is possible to make snapshot loading optional. This can be useful when it is alright to ignore snapshot in case
of for example deserialization errors. When snapshot loading fails it will instead recover by replaying all events.

Enable this feature by setting `snapshot-is-optional = true` in the snapshot store configuration.

> [!WARNING]
>Don't set `snapshot-is-optional = true` if events have been deleted because that would result in wrong recovered state if snapshot load fails.
## Snapshot Status Handling

Saving or deleting snapshots can either succeed or fail – this information is reported back to the persistent actor via status messages as illustrated in the following table.
Expand Down
37 changes: 37 additions & 0 deletions src/core/Akka.Persistence.Tests/SnapshotFailureRobustnessSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -292,4 +292,41 @@ public void PersistentActor_with_a_failing_snapshot_should_receive_failure_messa
m.Cause.Message.Contains("Failed to delete"));
}
}

public class SnapshotIsOptionalSpec : PersistenceSpec
{
public SnapshotIsOptionalSpec() : base(Configuration("SnapshotIsOptionalSpec", serialization: "off",
extraConfig: @"
akka.persistence.snapshot-store.local.snapshot-is-optional = true
akka.persistence.snapshot-store.local.class = ""Akka.Persistence.Tests.SnapshotFailureRobustnessSpec+FailingLocalSnapshotStore, Akka.Persistence.Tests""
"))
{
}

[Fact]
public void PersistentActor_with_a_failing_snapshot_with_snapshot_is_optional_true_falls_back_to_events()
{
var spref = Sys.ActorOf(Props.Create(() => new SnapshotFailureRobustnessSpec.SaveSnapshotTestActor(Name, TestActor)));

ExpectMsg<RecoveryCompleted>();
spref.Tell(new SnapshotFailureRobustnessSpec.Cmd("boom"));
ExpectMsg(1L);

Sys.EventStream.Subscribe(TestActor, typeof(Error));
try
{

var lpref = Sys.ActorOf(Props.Create(() => new SnapshotFailureRobustnessSpec.LoadSnapshotTestActor(Name, TestActor)));
ExpectMsg<Error>(m => m.Message.ToString().StartsWith("Error loading snapshot"));
ExpectMsg("boom-1");
ExpectMsg<RecoveryCompleted>();

}
finally
{
Sys.EventStream.Unsubscribe(TestActor, typeof(Error));
}

}
}
}
24 changes: 18 additions & 6 deletions src/core/Akka.Persistence/Eventsourced.Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using Akka.Actor;
using Akka.Event;
using Akka.Persistence.Internal;

namespace Akka.Persistence
Expand Down Expand Up @@ -61,7 +62,9 @@ private EventsourcedState RecoveryStarted(long maxReplays)
// protect against snapshot stalling forever because journal overloaded and such
var timeout = Extension.JournalConfigFor(JournalPluginId).GetTimeSpan("recovery-event-timeout", null, false);
var timeoutCancelable = Context.System.Scheduler.ScheduleTellOnceCancelable(timeout, Self, new RecoveryTick(true), Self);


var snapshotIsOptional = Extension.SnapshotStoreConfigFor(SnapshotPluginId).GetBoolean("snapshot-is-optional", false);

bool RecoveryBehavior(object message)
{
Receive receiveRecover = ReceiveRecover;
Expand Down Expand Up @@ -120,15 +123,24 @@ bool RecoveryBehavior(object message)
}
case LoadSnapshotFailed failed:
timeoutCancelable.Cancel();
try
if (snapshotIsOptional)
{
OnRecoveryFailure(failed.Cause);
Log.Info("Snapshot load error for persistenceId [{0}]. Replaying all events since snapshot-is-optional=true", PersistenceId);
ChangeState(Recovering(RecoveryBehavior, timeout));
Journal.Tell(new ReplayMessages(LastSequenceNr +1L, long.MaxValue, maxReplays, PersistenceId, Self));
}
finally
else
{
Context.Stop(Self);
try
{
OnRecoveryFailure(failed.Cause);
}
finally
{
Context.Stop(Self);
}
ReturnRecoveryPermit();
}
ReturnRecoveryPermit();
break;
case RecoveryTick { Snapshot: true }:
try
Expand Down
16 changes: 16 additions & 0 deletions src/core/Akka.Persistence/Persistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,22 @@ internal Config JournalConfigFor(string journalPluginId)
var configPath = string.IsNullOrEmpty(journalPluginId) ? _defaultJournalPluginId.Value : journalPluginId;
return PluginHolderFor(configPath, JournalFallbackConfigPath).Config;
}

/// <summary>
/// Returns the plugin config identified by <paramref name="snapshotPluginId"/>.
/// When empty, looks in `akka.persistence.snapshot-store.plugin` to find configuration entry path.
/// When configured, uses <paramref name="snapshotPluginId"/> as absolute path to the journal configuration entry.
/// </summary>
/// <param name="snapshotPluginId">TBD</param>
/// <exception cref="ArgumentException">
/// This exception is thrown when either the plugin class name is undefined or the configuration path is missing.
/// </exception>
/// <returns>TBD</returns>
internal Config SnapshotStoreConfigFor(string snapshotPluginId)
{
var configPath = string.IsNullOrEmpty(snapshotPluginId) ? _defaultSnapshotPluginId.Value : snapshotPluginId;
return PluginHolderFor(configPath, SnapshotStoreFallbackConfigPath).Config;
}

/// <summary>
/// Looks up the plugin config by plugin's ActorRef.
Expand Down
8 changes: 8 additions & 0 deletions src/core/Akka.Persistence/persistence.conf
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,14 @@ akka.persistence {
call-timeout = 20s
reset-timeout = 60s
}

# Set this to true if successful loading of snapshot is not necessary.
# This can be useful when it is alright to ignore snapshot in case of
# for example deserialization errors. When snapshot loading fails it will instead
# recover by replaying all events.
# Don't set to true if events are deleted because that would
# result in wrong recovered state if snapshot load fails.
snapshot-is-optional = false
}

fsm {
Expand Down

0 comments on commit 468546c

Please sign in to comment.