Skip to content

Commit

Permalink
Make sure that generated query config is valid (#404)
Browse files Browse the repository at this point in the history
* Make sure that generated query config is valid

* Add end to end unit test
  • Loading branch information
Arkatufus authored Jun 13, 2024
1 parent 8f47e1e commit c2c51f9
Show file tree
Hide file tree
Showing 5 changed files with 581 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka" />
<PackageReference Include="Akka.Hosting.TestKit" />

<PackageReference Include="Microsoft.NET.Test.Sdk" />
Expand Down
112 changes: 112 additions & 0 deletions src/Akka.Persistence.Sql.Hosting.Tests/ConfigExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// -----------------------------------------------------------------------
// <copyright file="ConfigExtensions.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Persistence.Sql.Extensions;
using FluentAssertions;

namespace Akka.Persistence.Sql.Hosting.Tests;

public static class ConfigExtensions
{
public static void AssertType(this Configuration.Config actual, Configuration.Config expected, string key, Type? value = null)
{
expected.HasPath(key).Should().BeTrue();
actual.HasPath(key).Should().BeTrue();
if (value is not null)
Type.GetType(actual.GetString(key)).Should().Be(value);
actual.GetString(key).Should().Be(expected.GetString(key));
}

public static void AssertString(this Configuration.Config actual, Configuration.Config expected, string key, string? value = null)
{
expected.HasPath(key).Should().BeTrue();
actual.HasPath(key).Should().BeTrue();
if (value is not null)
actual.GetString(key).Should().Be(value);
actual.GetString(key).Should().Be(expected.GetString(key));
}

public static void AssertInt(this Configuration.Config actual, Configuration.Config expected, string key, int? value = null)
{
expected.HasPath(key).Should().BeTrue();
actual.HasPath(key).Should().BeTrue();
if (value is not null)
actual.GetInt(key).Should().Be(value.Value);
actual.GetInt(key).Should().Be(expected.GetInt(key));
}

public static void AssertBool(this Configuration.Config actual, Configuration.Config expected, string key, bool? value = null)
{
expected.HasPath(key).Should().BeTrue();
actual.HasPath(key).Should().BeTrue();
if (value is not null)
actual.GetBoolean(key).Should().Be(value.Value);
actual.GetBoolean(key).Should().Be(expected.GetBoolean(key));
}

public static void AssertTimeSpan(this Configuration.Config actual, Configuration.Config expected, string key, TimeSpan? value = null)
{
expected.HasPath(key).Should().BeTrue();
actual.HasPath(key).Should().BeTrue();
if (value is not null)
actual.GetTimeSpan(key).Should().Be(value.Value);
actual.GetTimeSpan(key).Should().Be(expected.GetTimeSpan(key));
}

public static void AssertIsolationLevel(this Configuration.Config actual, Configuration.Config expected, string key)
{
expected.HasPath(key).Should().BeTrue();
actual.HasPath(key).Should().BeTrue();
actual.GetIsolationLevel(key).Should().Be(expected.GetIsolationLevel(key));
}

public static void AssertMappingEquals(this Configuration.Config actual, Configuration.Config expected, string key)
{
var actualMapConfig = actual.GetConfig(key);
var expectedMapConfig = expected.GetConfig(key);

actualMapConfig.AssertString(expectedMapConfig, "schema-name");

var actualJournalConfig = actualMapConfig.GetConfig("journal");
actualJournalConfig.Should().NotBeNull();
var expectedJournalConfig = expectedMapConfig.GetConfig("journal");
expectedJournalConfig.Should().NotBeNull();

actualJournalConfig.AssertBool(expectedJournalConfig, "use-writer-uuid-column");
actualJournalConfig.AssertString(expectedJournalConfig, "table-name");
actualJournalConfig.AssertString(expectedJournalConfig, "columns.ordering");
actualJournalConfig.AssertString(expectedJournalConfig, "columns.deleted");
actualJournalConfig.AssertString(expectedJournalConfig, "columns.persistence-id");
actualJournalConfig.AssertString(expectedJournalConfig, "columns.sequence-number");
actualJournalConfig.AssertString(expectedJournalConfig, "columns.created");
actualJournalConfig.AssertString(expectedJournalConfig, "columns.tags");
actualJournalConfig.AssertString(expectedJournalConfig, "columns.message");
actualJournalConfig.AssertString(expectedJournalConfig, "columns.identifier");
actualJournalConfig.AssertString(expectedJournalConfig, "columns.manifest");
actualJournalConfig.AssertString(expectedJournalConfig, "columns.writer-uuid");

var actualMetaConfig = actualMapConfig.GetConfig("metadata");
actualMetaConfig.Should().NotBeNull();
var expectedMetaConfig = expectedMapConfig.GetConfig("metadata");
expectedMetaConfig.Should().NotBeNull();

actualMetaConfig.AssertString(expectedMetaConfig, "table-name");
actualMetaConfig.AssertString(expectedMetaConfig, "columns.persistence-id");
actualMetaConfig.AssertString(expectedMetaConfig, "columns.sequence-number");

var actualTagConfig = actualMapConfig.GetConfig("tag");
actualTagConfig.Should().NotBeNull();
var expectedTagConfig = expectedMapConfig.GetConfig("tag");
expectedTagConfig.Should().NotBeNull();

actualTagConfig.AssertString(expectedTagConfig, "table-name");
actualTagConfig.AssertString(expectedTagConfig, "columns.ordering-id");
actualTagConfig.AssertString(expectedTagConfig, "columns.tag-value");
actualTagConfig.AssertString(expectedTagConfig, "columns.persistence-id");
actualTagConfig.AssertString(expectedTagConfig, "columns.sequence-nr");
}

}
275 changes: 275 additions & 0 deletions src/Akka.Persistence.Sql.Hosting.Tests/CustomSqlEndToEndSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
// -----------------------------------------------------------------------
// <copyright file="SqlEndToEndSpec.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System.Data.SQLite;
using Akka.Actor;
using Akka.Hosting;
using Akka.Persistence.Query;
using Akka.Persistence.Sql.Query;
using Akka.Persistence.Sql.Tests.Common.Containers;
using Akka.Streams;
using Akka.Streams.TestKit;
using FluentAssertions;
using LanguageExt.UnitsOfMeasure;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.Sql.Hosting.Tests
{
public class CustomSqlEndToEndSpec : Akka.Hosting.TestKit.TestKit, IClassFixture<SqliteContainer>
{
private const string GetAll = "getAll";
private const string Ack = "ACK";
private const string SnapshotAck = "SnapACK";
private const string PId = "ac1";

private readonly SqliteContainer _fixture;

public CustomSqlEndToEndSpec(ITestOutputHelper output, SqliteContainer fixture) : base(nameof(SqlEndToEndSpec), output)
=> _fixture = fixture;

protected override async Task BeforeTestStart()
{
await base.BeforeTestStart();
await _fixture.InitializeAsync();
}

protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider)
{
builder
.WithSqlPersistence(
connectionString: _fixture.ConnectionString,
providerName: _fixture.ProviderName)
.WithSqlPersistence(
journal =>
{
journal.IsDefaultPlugin = false;
journal.ConnectionString = _fixture.ConnectionString;
journal.ProviderName = _fixture.ProviderName;
journal.Identifier = "custom";
journal.DatabaseOptions = JournalDatabaseOptions.Default;
journal.DatabaseOptions.JournalTable!.TableName = "journal2";
journal.DatabaseOptions.MetadataTable!.TableName = "journal_metadata2";
journal.DatabaseOptions.TagTable!.TableName = "tags2";
},
snapshot =>
{
snapshot.IsDefaultPlugin = false;
snapshot.ConnectionString = _fixture.ConnectionString;
snapshot.ProviderName = _fixture.ProviderName;
snapshot.Identifier = "custom";
snapshot.DatabaseOptions = SnapshotDatabaseOptions.Default;
snapshot.DatabaseOptions.SnapshotTable!.TableName = "snapshot2";
})
.StartActors(
(system, registry) =>
{
var myActor = system.ActorOf(Props.Create(() => new MyPersistenceActor(PId)), "default");
registry.Register<MyPersistenceActor>(myActor);

myActor = system.ActorOf(Props.Create(() => new MyCustomPersistenceActor(PId)), "custom");
registry.Register<MyCustomPersistenceActor>(myActor);
});
}

[Fact]
public async Task Should_Start_ActorSystem_wth_Sql_Persistence()
{
var timeout = 3.Seconds();

#region Default SQL plugin

// arrange
var myPersistentActor = await ActorRegistry.GetAsync<MyPersistenceActor>();

// act
myPersistentActor.Tell(1, TestActor);
ExpectMsg<string>(Ack);
myPersistentActor.Tell(2, TestActor);
ExpectMsg<string>(Ack);
ExpectMsg<string>(SnapshotAck);
var snapshot = await myPersistentActor.Ask<int[]>(GetAll, timeout);

// assert
snapshot.Should().BeEquivalentTo(new[] { 1, 2 });

#endregion

#region Custom SQL plugin

// arrange
var customMyPersistentActor = await ActorRegistry.GetAsync<MyCustomPersistenceActor>();

// act
customMyPersistentActor.Tell(1, TestActor);
ExpectMsg<string>(Ack);
customMyPersistentActor.Tell(2, TestActor);
ExpectMsg<string>(Ack);
ExpectMsg<string>(SnapshotAck);
var customSnapshot = await customMyPersistentActor.Ask<int[]>(GetAll, timeout);

// assert
customSnapshot.Should().BeEquivalentTo(new[] { 1, 2 });

#endregion


// kill + recreate actor with same PersistentId
await myPersistentActor.GracefulStop(timeout);
var myPersistentActor2 = Sys.ActorOf(Props.Create(() => new MyPersistenceActor(PId)));

var snapshot2 = await myPersistentActor2.Ask<int[]>(GetAll, timeout);
snapshot2.Should().BeEquivalentTo(new[] { 1, 2 });

await customMyPersistentActor.GracefulStop(timeout);
var customMyPersistentActor2 = Sys.ActorOf(Props.Create(() => new MyCustomPersistenceActor(PId)));

var customSnapshot2 = await customMyPersistentActor2.Ask<int[]>(GetAll, timeout);
customSnapshot2.Should().BeEquivalentTo(new[] { 1, 2 });

// validate configs
var config = Sys.Settings.Config;
config.GetString("akka.persistence.journal.plugin").Should().Be("akka.persistence.journal.sql");
config.GetString("akka.persistence.snapshot-store.plugin").Should().Be("akka.persistence.snapshot-store.sql");

var customJournalConfig = config.GetConfig("akka.persistence.journal.custom");
customJournalConfig.Should().NotBeNull();
customJournalConfig.GetString("connection-string").Should().Be(_fixture.ConnectionString);
customJournalConfig.GetString("provider-name").Should().Be(_fixture.ProviderName);

var customSnapshotConfig = config.GetConfig("akka.persistence.snapshot-store.custom");
customSnapshotConfig.Should().NotBeNull();
customSnapshotConfig.GetString("connection-string").Should().Be(_fixture.ConnectionString);
customSnapshotConfig.GetString("provider-name").Should().Be(_fixture.ProviderName);

// validate that query is working
var readJournal = Sys.ReadJournalFor<SqlReadJournal>("akka.persistence.query.journal.sql");
var source = readJournal.AllEvents(Offset.NoOffset());
var probe = source.RunWith(this.SinkProbe<EventEnvelope>(), Sys.Materializer());
probe.Request(2);
probe.ExpectNext<EventEnvelope>(p => p.PersistenceId == PId && p.SequenceNr == 1L && p.Event.Equals(1));
probe.ExpectNext<EventEnvelope>(p => p.PersistenceId == PId && p.SequenceNr == 2L && p.Event.Equals(2));
await probe.CancelAsync();

var customReadJournal = Sys.ReadJournalFor<SqlReadJournal>("akka.persistence.query.journal.custom");
var customSource = customReadJournal.AllEvents(Offset.NoOffset());
var customProbe = customSource.RunWith(this.SinkProbe<EventEnvelope>(), Sys.Materializer());
customProbe.Request(2);
customProbe.ExpectNext<EventEnvelope>(p => p.PersistenceId == PId && p.SequenceNr == 1L && p.Event.Equals(1));
customProbe.ExpectNext<EventEnvelope>(p => p.PersistenceId == PId && p.SequenceNr == 2L && p.Event.Equals(2));
await customProbe.CancelAsync();

// Probe the database directly to make sure that all tables were created properly
var tables = await GetTableNamesAsync(_fixture.ConnectionString);

tables.Should().Contain("journal");
tables.Should().Contain("tags");
tables.Should().Contain("snapshot");
tables.Should().Contain("journal2");
tables.Should().Contain("tags2");
tables.Should().Contain("snapshot2");
}

private static async Task<string[]> GetTableNamesAsync(string connectionString)
{
await using var conn = new SQLiteConnection(connectionString);
await conn.OpenAsync();

var cmd = new SQLiteCommand("SELECT name FROM sqlite_schema WHERE type = 'table' AND name NOT LIKE 'sqlite_%'", conn);
var reader = await cmd.ExecuteReaderAsync();
var tables = new List<string>();
while (await reader.ReadAsync())
{
tables.Add(reader.GetString(0));
}
await reader.CloseAsync();

return tables.ToArray();
}

private sealed class MyPersistenceActor : ReceivePersistentActor
{
private List<int> _values = new();
private IActorRef? _sender;

public MyPersistenceActor(string persistenceId)
{
PersistenceId = persistenceId;
JournalPluginId = "akka.persistence.journal.sql";
SnapshotPluginId = "akka.persistence.snapshot-store.sql";

Recover<SnapshotOffer>(offer =>
{
if (offer.Snapshot is IEnumerable<int> ints)
_values = new List<int>(ints);
});

Recover<int>(_values.Add);

Command<int>( i =>
{
_sender = Sender;
Persist(
i,
_ =>
{
_values.Add(i);
if (LastSequenceNr % 2 == 0)
SaveSnapshot(_values);
_sender.Tell(Ack);
});
});

Command<string>(str => str.Equals(GetAll), _ => Sender.Tell(_values.ToArray()));

Command<SaveSnapshotSuccess>(_ => _sender.Tell(SnapshotAck));
}

public override string PersistenceId { get; }
}

private sealed class MyCustomPersistenceActor : ReceivePersistentActor
{
private List<int> _values = new();
private IActorRef? _sender;

public MyCustomPersistenceActor(string persistenceId)
{
PersistenceId = persistenceId;
JournalPluginId = "akka.persistence.journal.custom";
SnapshotPluginId = "akka.persistence.snapshot-store.custom";

Recover<SnapshotOffer>(offer =>
{
if (offer.Snapshot is IEnumerable<int> ints)
_values = new List<int>(ints);
});

Recover<int>(_values.Add);

Command<int>( i =>
{
_sender = Sender;
Persist(
i,
_ =>
{
_values.Add(i);
if (LastSequenceNr % 2 == 0)
SaveSnapshot(_values);
_sender.Tell(Ack);
});
});

Command<string>(str => str.Equals(GetAll), _ => Sender.Tell(_values.ToArray()));

Command<SaveSnapshotSuccess>(_ => _sender.Tell(SnapshotAck));
}

public override string PersistenceId { get; }
}
}
}
Loading

0 comments on commit c2c51f9

Please sign in to comment.