From c2c51f9d8f23840caf438d40c942d197879646c8 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 14 Jun 2024 02:01:48 +0700 Subject: [PATCH] Make sure that generated query config is valid (#404) * Make sure that generated query config is valid * Add end to end unit test --- .../Akka.Persistence.Sql.Hosting.Tests.csproj | 1 + .../ConfigExtensions.cs | 112 +++++++ .../CustomSqlEndToEndSpec.cs | 275 ++++++++++++++++++ .../JournalSettingsSpec.cs | 232 ++++++++++----- .../SqlJournalOptions.cs | 49 ++-- 5 files changed, 581 insertions(+), 88 deletions(-) create mode 100644 src/Akka.Persistence.Sql.Hosting.Tests/ConfigExtensions.cs create mode 100644 src/Akka.Persistence.Sql.Hosting.Tests/CustomSqlEndToEndSpec.cs diff --git a/src/Akka.Persistence.Sql.Hosting.Tests/Akka.Persistence.Sql.Hosting.Tests.csproj b/src/Akka.Persistence.Sql.Hosting.Tests/Akka.Persistence.Sql.Hosting.Tests.csproj index d0351fcd..e51270ce 100644 --- a/src/Akka.Persistence.Sql.Hosting.Tests/Akka.Persistence.Sql.Hosting.Tests.csproj +++ b/src/Akka.Persistence.Sql.Hosting.Tests/Akka.Persistence.Sql.Hosting.Tests.csproj @@ -7,6 +7,7 @@ + diff --git a/src/Akka.Persistence.Sql.Hosting.Tests/ConfigExtensions.cs b/src/Akka.Persistence.Sql.Hosting.Tests/ConfigExtensions.cs new file mode 100644 index 00000000..4babfa78 --- /dev/null +++ b/src/Akka.Persistence.Sql.Hosting.Tests/ConfigExtensions.cs @@ -0,0 +1,112 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2023 .NET Foundation +// +// ----------------------------------------------------------------------- + +using Akka.Persistence.Sql.Extensions; +using FluentAssertions; + +namespace Akka.Persistence.Sql.Hosting.Tests; + +public static class ConfigExtensions +{ + public static void AssertType(this Configuration.Config actual, Configuration.Config expected, string key, Type? value = null) + { + expected.HasPath(key).Should().BeTrue(); + actual.HasPath(key).Should().BeTrue(); + if (value is not null) + Type.GetType(actual.GetString(key)).Should().Be(value); + actual.GetString(key).Should().Be(expected.GetString(key)); + } + + public static void AssertString(this Configuration.Config actual, Configuration.Config expected, string key, string? value = null) + { + expected.HasPath(key).Should().BeTrue(); + actual.HasPath(key).Should().BeTrue(); + if (value is not null) + actual.GetString(key).Should().Be(value); + actual.GetString(key).Should().Be(expected.GetString(key)); + } + + public static void AssertInt(this Configuration.Config actual, Configuration.Config expected, string key, int? value = null) + { + expected.HasPath(key).Should().BeTrue(); + actual.HasPath(key).Should().BeTrue(); + if (value is not null) + actual.GetInt(key).Should().Be(value.Value); + actual.GetInt(key).Should().Be(expected.GetInt(key)); + } + + public static void AssertBool(this Configuration.Config actual, Configuration.Config expected, string key, bool? value = null) + { + expected.HasPath(key).Should().BeTrue(); + actual.HasPath(key).Should().BeTrue(); + if (value is not null) + actual.GetBoolean(key).Should().Be(value.Value); + actual.GetBoolean(key).Should().Be(expected.GetBoolean(key)); + } + + public static void AssertTimeSpan(this Configuration.Config actual, Configuration.Config expected, string key, TimeSpan? value = null) + { + expected.HasPath(key).Should().BeTrue(); + actual.HasPath(key).Should().BeTrue(); + if (value is not null) + actual.GetTimeSpan(key).Should().Be(value.Value); + actual.GetTimeSpan(key).Should().Be(expected.GetTimeSpan(key)); + } + + public static void AssertIsolationLevel(this Configuration.Config actual, Configuration.Config expected, string key) + { + expected.HasPath(key).Should().BeTrue(); + actual.HasPath(key).Should().BeTrue(); + actual.GetIsolationLevel(key).Should().Be(expected.GetIsolationLevel(key)); + } + + public static void AssertMappingEquals(this Configuration.Config actual, Configuration.Config expected, string key) + { + var actualMapConfig = actual.GetConfig(key); + var expectedMapConfig = expected.GetConfig(key); + + actualMapConfig.AssertString(expectedMapConfig, "schema-name"); + + var actualJournalConfig = actualMapConfig.GetConfig("journal"); + actualJournalConfig.Should().NotBeNull(); + var expectedJournalConfig = expectedMapConfig.GetConfig("journal"); + expectedJournalConfig.Should().NotBeNull(); + + actualJournalConfig.AssertBool(expectedJournalConfig, "use-writer-uuid-column"); + actualJournalConfig.AssertString(expectedJournalConfig, "table-name"); + actualJournalConfig.AssertString(expectedJournalConfig, "columns.ordering"); + actualJournalConfig.AssertString(expectedJournalConfig, "columns.deleted"); + actualJournalConfig.AssertString(expectedJournalConfig, "columns.persistence-id"); + actualJournalConfig.AssertString(expectedJournalConfig, "columns.sequence-number"); + actualJournalConfig.AssertString(expectedJournalConfig, "columns.created"); + actualJournalConfig.AssertString(expectedJournalConfig, "columns.tags"); + actualJournalConfig.AssertString(expectedJournalConfig, "columns.message"); + actualJournalConfig.AssertString(expectedJournalConfig, "columns.identifier"); + actualJournalConfig.AssertString(expectedJournalConfig, "columns.manifest"); + actualJournalConfig.AssertString(expectedJournalConfig, "columns.writer-uuid"); + + var actualMetaConfig = actualMapConfig.GetConfig("metadata"); + actualMetaConfig.Should().NotBeNull(); + var expectedMetaConfig = expectedMapConfig.GetConfig("metadata"); + expectedMetaConfig.Should().NotBeNull(); + + actualMetaConfig.AssertString(expectedMetaConfig, "table-name"); + actualMetaConfig.AssertString(expectedMetaConfig, "columns.persistence-id"); + actualMetaConfig.AssertString(expectedMetaConfig, "columns.sequence-number"); + + var actualTagConfig = actualMapConfig.GetConfig("tag"); + actualTagConfig.Should().NotBeNull(); + var expectedTagConfig = expectedMapConfig.GetConfig("tag"); + expectedTagConfig.Should().NotBeNull(); + + actualTagConfig.AssertString(expectedTagConfig, "table-name"); + actualTagConfig.AssertString(expectedTagConfig, "columns.ordering-id"); + actualTagConfig.AssertString(expectedTagConfig, "columns.tag-value"); + actualTagConfig.AssertString(expectedTagConfig, "columns.persistence-id"); + actualTagConfig.AssertString(expectedTagConfig, "columns.sequence-nr"); + } + +} diff --git a/src/Akka.Persistence.Sql.Hosting.Tests/CustomSqlEndToEndSpec.cs b/src/Akka.Persistence.Sql.Hosting.Tests/CustomSqlEndToEndSpec.cs new file mode 100644 index 00000000..27fdc236 --- /dev/null +++ b/src/Akka.Persistence.Sql.Hosting.Tests/CustomSqlEndToEndSpec.cs @@ -0,0 +1,275 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2023 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System.Data.SQLite; +using Akka.Actor; +using Akka.Hosting; +using Akka.Persistence.Query; +using Akka.Persistence.Sql.Query; +using Akka.Persistence.Sql.Tests.Common.Containers; +using Akka.Streams; +using Akka.Streams.TestKit; +using FluentAssertions; +using LanguageExt.UnitsOfMeasure; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.Sql.Hosting.Tests +{ + public class CustomSqlEndToEndSpec : Akka.Hosting.TestKit.TestKit, IClassFixture + { + private const string GetAll = "getAll"; + private const string Ack = "ACK"; + private const string SnapshotAck = "SnapACK"; + private const string PId = "ac1"; + + private readonly SqliteContainer _fixture; + + public CustomSqlEndToEndSpec(ITestOutputHelper output, SqliteContainer fixture) : base(nameof(SqlEndToEndSpec), output) + => _fixture = fixture; + + protected override async Task BeforeTestStart() + { + await base.BeforeTestStart(); + await _fixture.InitializeAsync(); + } + + protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider) + { + builder + .WithSqlPersistence( + connectionString: _fixture.ConnectionString, + providerName: _fixture.ProviderName) + .WithSqlPersistence( + journal => + { + journal.IsDefaultPlugin = false; + journal.ConnectionString = _fixture.ConnectionString; + journal.ProviderName = _fixture.ProviderName; + journal.Identifier = "custom"; + journal.DatabaseOptions = JournalDatabaseOptions.Default; + journal.DatabaseOptions.JournalTable!.TableName = "journal2"; + journal.DatabaseOptions.MetadataTable!.TableName = "journal_metadata2"; + journal.DatabaseOptions.TagTable!.TableName = "tags2"; + }, + snapshot => + { + snapshot.IsDefaultPlugin = false; + snapshot.ConnectionString = _fixture.ConnectionString; + snapshot.ProviderName = _fixture.ProviderName; + snapshot.Identifier = "custom"; + snapshot.DatabaseOptions = SnapshotDatabaseOptions.Default; + snapshot.DatabaseOptions.SnapshotTable!.TableName = "snapshot2"; + }) + .StartActors( + (system, registry) => + { + var myActor = system.ActorOf(Props.Create(() => new MyPersistenceActor(PId)), "default"); + registry.Register(myActor); + + myActor = system.ActorOf(Props.Create(() => new MyCustomPersistenceActor(PId)), "custom"); + registry.Register(myActor); + }); + } + + [Fact] + public async Task Should_Start_ActorSystem_wth_Sql_Persistence() + { + var timeout = 3.Seconds(); + + #region Default SQL plugin + + // arrange + var myPersistentActor = await ActorRegistry.GetAsync(); + + // act + myPersistentActor.Tell(1, TestActor); + ExpectMsg(Ack); + myPersistentActor.Tell(2, TestActor); + ExpectMsg(Ack); + ExpectMsg(SnapshotAck); + var snapshot = await myPersistentActor.Ask(GetAll, timeout); + + // assert + snapshot.Should().BeEquivalentTo(new[] { 1, 2 }); + + #endregion + + #region Custom SQL plugin + + // arrange + var customMyPersistentActor = await ActorRegistry.GetAsync(); + + // act + customMyPersistentActor.Tell(1, TestActor); + ExpectMsg(Ack); + customMyPersistentActor.Tell(2, TestActor); + ExpectMsg(Ack); + ExpectMsg(SnapshotAck); + var customSnapshot = await customMyPersistentActor.Ask(GetAll, timeout); + + // assert + customSnapshot.Should().BeEquivalentTo(new[] { 1, 2 }); + + #endregion + + + // kill + recreate actor with same PersistentId + await myPersistentActor.GracefulStop(timeout); + var myPersistentActor2 = Sys.ActorOf(Props.Create(() => new MyPersistenceActor(PId))); + + var snapshot2 = await myPersistentActor2.Ask(GetAll, timeout); + snapshot2.Should().BeEquivalentTo(new[] { 1, 2 }); + + await customMyPersistentActor.GracefulStop(timeout); + var customMyPersistentActor2 = Sys.ActorOf(Props.Create(() => new MyCustomPersistenceActor(PId))); + + var customSnapshot2 = await customMyPersistentActor2.Ask(GetAll, timeout); + customSnapshot2.Should().BeEquivalentTo(new[] { 1, 2 }); + + // validate configs + var config = Sys.Settings.Config; + config.GetString("akka.persistence.journal.plugin").Should().Be("akka.persistence.journal.sql"); + config.GetString("akka.persistence.snapshot-store.plugin").Should().Be("akka.persistence.snapshot-store.sql"); + + var customJournalConfig = config.GetConfig("akka.persistence.journal.custom"); + customJournalConfig.Should().NotBeNull(); + customJournalConfig.GetString("connection-string").Should().Be(_fixture.ConnectionString); + customJournalConfig.GetString("provider-name").Should().Be(_fixture.ProviderName); + + var customSnapshotConfig = config.GetConfig("akka.persistence.snapshot-store.custom"); + customSnapshotConfig.Should().NotBeNull(); + customSnapshotConfig.GetString("connection-string").Should().Be(_fixture.ConnectionString); + customSnapshotConfig.GetString("provider-name").Should().Be(_fixture.ProviderName); + + // validate that query is working + var readJournal = Sys.ReadJournalFor("akka.persistence.query.journal.sql"); + var source = readJournal.AllEvents(Offset.NoOffset()); + var probe = source.RunWith(this.SinkProbe(), Sys.Materializer()); + probe.Request(2); + probe.ExpectNext(p => p.PersistenceId == PId && p.SequenceNr == 1L && p.Event.Equals(1)); + probe.ExpectNext(p => p.PersistenceId == PId && p.SequenceNr == 2L && p.Event.Equals(2)); + await probe.CancelAsync(); + + var customReadJournal = Sys.ReadJournalFor("akka.persistence.query.journal.custom"); + var customSource = customReadJournal.AllEvents(Offset.NoOffset()); + var customProbe = customSource.RunWith(this.SinkProbe(), Sys.Materializer()); + customProbe.Request(2); + customProbe.ExpectNext(p => p.PersistenceId == PId && p.SequenceNr == 1L && p.Event.Equals(1)); + customProbe.ExpectNext(p => p.PersistenceId == PId && p.SequenceNr == 2L && p.Event.Equals(2)); + await customProbe.CancelAsync(); + + // Probe the database directly to make sure that all tables were created properly + var tables = await GetTableNamesAsync(_fixture.ConnectionString); + + tables.Should().Contain("journal"); + tables.Should().Contain("tags"); + tables.Should().Contain("snapshot"); + tables.Should().Contain("journal2"); + tables.Should().Contain("tags2"); + tables.Should().Contain("snapshot2"); + } + + private static async Task GetTableNamesAsync(string connectionString) + { + await using var conn = new SQLiteConnection(connectionString); + await conn.OpenAsync(); + + var cmd = new SQLiteCommand("SELECT name FROM sqlite_schema WHERE type = 'table' AND name NOT LIKE 'sqlite_%'", conn); + var reader = await cmd.ExecuteReaderAsync(); + var tables = new List(); + while (await reader.ReadAsync()) + { + tables.Add(reader.GetString(0)); + } + await reader.CloseAsync(); + + return tables.ToArray(); + } + + private sealed class MyPersistenceActor : ReceivePersistentActor + { + private List _values = new(); + private IActorRef? _sender; + + public MyPersistenceActor(string persistenceId) + { + PersistenceId = persistenceId; + JournalPluginId = "akka.persistence.journal.sql"; + SnapshotPluginId = "akka.persistence.snapshot-store.sql"; + + Recover(offer => + { + if (offer.Snapshot is IEnumerable ints) + _values = new List(ints); + }); + + Recover(_values.Add); + + Command( i => + { + _sender = Sender; + Persist( + i, + _ => + { + _values.Add(i); + if (LastSequenceNr % 2 == 0) + SaveSnapshot(_values); + _sender.Tell(Ack); + }); + }); + + Command(str => str.Equals(GetAll), _ => Sender.Tell(_values.ToArray())); + + Command(_ => _sender.Tell(SnapshotAck)); + } + + public override string PersistenceId { get; } + } + + private sealed class MyCustomPersistenceActor : ReceivePersistentActor + { + private List _values = new(); + private IActorRef? _sender; + + public MyCustomPersistenceActor(string persistenceId) + { + PersistenceId = persistenceId; + JournalPluginId = "akka.persistence.journal.custom"; + SnapshotPluginId = "akka.persistence.snapshot-store.custom"; + + Recover(offer => + { + if (offer.Snapshot is IEnumerable ints) + _values = new List(ints); + }); + + Recover(_values.Add); + + Command( i => + { + _sender = Sender; + Persist( + i, + _ => + { + _values.Add(i); + if (LastSequenceNr % 2 == 0) + SaveSnapshot(_values); + _sender.Tell(Ack); + }); + }); + + Command(str => str.Equals(GetAll), _ => Sender.Tell(_values.ToArray())); + + Command(_ => _sender.Tell(SnapshotAck)); + } + + public override string PersistenceId { get; } + } + } +} diff --git a/src/Akka.Persistence.Sql.Hosting.Tests/JournalSettingsSpec.cs b/src/Akka.Persistence.Sql.Hosting.Tests/JournalSettingsSpec.cs index 1d1ad112..9794f8d3 100644 --- a/src/Akka.Persistence.Sql.Hosting.Tests/JournalSettingsSpec.cs +++ b/src/Akka.Persistence.Sql.Hosting.Tests/JournalSettingsSpec.cs @@ -8,6 +8,9 @@ using Akka.Configuration; using Akka.Persistence.Sql.Config; using Akka.Persistence.Sql.Extensions; +using Akka.Persistence.Sql.Journal; +using Akka.Persistence.Sql.Journal.Dao; +using Akka.Persistence.Sql.Query; using FluentAssertions; using LanguageExt.UnitsOfMeasure; using Xunit; @@ -19,65 +22,98 @@ public class JournalSettingsSpec [Fact(DisplayName = "Default options should not override default hocon config")] public void DefaultOptionsTest() { - var defaultConfig = ConfigurationFactory.ParseString( - @" -akka.persistence.journal.sql { - connection-string = a - provider-name = b -}") - .WithFallback(SqlPersistence.DefaultConfiguration); - - defaultConfig = defaultConfig.GetConfig(SqlPersistence.JournalConfigPath); + #region Setup + + var expectedConfig = ConfigurationFactory.ParseString( + """ + akka.persistence.journal.sql { + connection-string = a + provider-name = b + } + akka.persistence.query.journal.sql { + connection-string = a + provider-name = b + } + """) + .WithFallback(SqlPersistence.DefaultConfiguration) + .WithFallback(SqlPersistence.DefaultQueryConfiguration); var opt = new SqlJournalOptions { ConnectionString = "a", ProviderName = "b", }; - var actualConfig = opt.ToConfig().WithFallback(SqlPersistence.DefaultConfiguration); - - actualConfig = actualConfig.GetConfig(SqlPersistence.JournalConfigPath); - - actualConfig.GetString("connection-string").Should().Be(defaultConfig.GetString("connection-string")); - actualConfig.GetString("provider-name").Should().Be(defaultConfig.GetString("provider-name")); - actualConfig.GetString("table-mapping").Should().Be(defaultConfig.GetString("table-mapping")); - actualConfig.GetString("serializer").Should().Be(defaultConfig.GetString("serializer")); - actualConfig.GetBoolean("auto-initialize").Should().Be(defaultConfig.GetBoolean("auto-initialize")); - actualConfig.GetIsolationLevel("read-isolation-level").Should().Be(defaultConfig.GetIsolationLevel("read-isolation-level")); - actualConfig.GetIsolationLevel("write-isolation-level").Should().Be(defaultConfig.GetIsolationLevel("write-isolation-level")); - actualConfig.GetString("default.schema-name").Should().Be(defaultConfig.GetString("default.schema-name")); - - var defaultJournalConfig = defaultConfig.GetConfig("default.journal"); - var actualJournalConfig = actualConfig.GetConfig("default.journal"); - - actualJournalConfig.GetBoolean("use-writer-uuid-column").Should().Be(defaultJournalConfig.GetBoolean("use-writer-uuid-column")); - actualJournalConfig.GetString("table-name").Should().Be(defaultJournalConfig.GetString("table-name")); - actualJournalConfig.GetString("columns.ordering").Should().Be(defaultJournalConfig.GetString("columns.ordering")); - actualJournalConfig.GetString("columns.deleted").Should().Be(defaultJournalConfig.GetString("columns.deleted")); - actualJournalConfig.GetString("columns.persistence-id").Should().Be(defaultJournalConfig.GetString("columns.persistence-id")); - actualJournalConfig.GetString("columns.sequence-number").Should().Be(defaultJournalConfig.GetString("columns.sequence-number")); - actualJournalConfig.GetString("columns.created").Should().Be(defaultJournalConfig.GetString("columns.created")); - actualJournalConfig.GetString("columns.tags").Should().Be(defaultJournalConfig.GetString("columns.tags")); - actualJournalConfig.GetString("columns.message").Should().Be(defaultJournalConfig.GetString("columns.message")); - actualJournalConfig.GetString("columns.identifier").Should().Be(defaultJournalConfig.GetString("columns.identifier")); - actualJournalConfig.GetString("columns.manifest").Should().Be(defaultJournalConfig.GetString("columns.manifest")); - actualJournalConfig.GetString("columns.writer-uuid").Should().Be(defaultJournalConfig.GetString("columns.writer-uuid")); - - var defaultMetaConfig = defaultConfig.GetConfig("default.metadata"); - var actualMetaConfig = actualConfig.GetConfig("default.metadata"); - - actualMetaConfig.GetString("table-name").Should().Be(defaultMetaConfig.GetString("table-name")); - actualMetaConfig.GetString("columns.persistence-id").Should().Be(defaultMetaConfig.GetString("columns.persistence-id")); - actualMetaConfig.GetString("columns.sequence-number").Should().Be(defaultMetaConfig.GetString("columns.sequence-number")); - - var defaultTagConfig = defaultConfig.GetConfig("default.tag"); - var actualTagConfig = actualConfig.GetConfig("default.tag"); - - actualTagConfig.GetString("table-name").Should().Be(defaultTagConfig.GetString("table-name")); - actualTagConfig.GetString("columns.ordering-id").Should().Be(defaultTagConfig.GetString("columns.ordering-id")); - actualTagConfig.GetString("columns.tag-value").Should().Be(defaultTagConfig.GetString("columns.tag-value")); - actualTagConfig.GetString("columns.persistence-id").Should().Be(defaultTagConfig.GetString("columns.persistence-id")); - actualTagConfig.GetString("columns.sequence-nr").Should().Be(defaultTagConfig.GetString("columns.sequence-nr")); + var config = opt.ToConfig() + .WithFallback(SqlPersistence.DefaultConfiguration) + .WithFallback(SqlPersistence.DefaultQueryConfiguration); + + #endregion + + #region Journal configuration + + var defaultConfig = expectedConfig.GetConfig(SqlPersistence.JournalConfigPath); + var actualConfig = config.GetConfig(SqlPersistence.JournalConfigPath); + + actualConfig.AssertType(defaultConfig, "class", typeof(SqlWriteJournal)); + actualConfig.AssertString(defaultConfig, "plugin-dispatcher"); + actualConfig.AssertString(defaultConfig, "connection-string", "a"); + actualConfig.AssertString(defaultConfig, "provider-name", "b"); + actualConfig.AssertBool(defaultConfig, "delete-compatibility-mode", false); + actualConfig.AssertString(defaultConfig, "table-mapping", "default"); + actualConfig.AssertInt(defaultConfig, "buffer-size", 5000); + actualConfig.AssertInt(defaultConfig, "batch-size", 100); + actualConfig.AssertInt(defaultConfig, "db-round-trip-max-batch-size", 1000); + actualConfig.AssertBool(defaultConfig, "prefer-parameters-on-multirow-insert", false); + actualConfig.AssertInt(defaultConfig, "replay-batch-size", 1000); + actualConfig.AssertInt(defaultConfig, "parallelism", 3); + actualConfig.AssertInt(defaultConfig, "max-row-by-row-size", 100); + actualConfig.AssertBool(defaultConfig, "use-clone-connection", true); + actualConfig.AssertString(defaultConfig, "materializer-dispatcher"); + actualConfig.AssertString(defaultConfig, "tag-write-mode", "TagTable"); + actualConfig.AssertString(defaultConfig, "tag-separator", ";"); + actualConfig.AssertBool(defaultConfig, "auto-initialize", true); + actualConfig.AssertBool(defaultConfig, "warn-on-auto-init-fail", true); + actualConfig.AssertType(defaultConfig, "dao", typeof(ByteArrayJournalDao)); + actualConfig.AssertString(defaultConfig, "serializer"); + actualConfig.AssertIsolationLevel(defaultConfig, "read-isolation-level"); + actualConfig.AssertIsolationLevel(defaultConfig, "write-isolation-level"); + + actualConfig.AssertMappingEquals(defaultConfig, "default"); + #endregion + + #region Query configuration + + var defaultQueryConfig = expectedConfig.GetConfig(SqlPersistence.QueryConfigPath); + var actualQueryConfig = config.GetConfig(SqlPersistence.QueryConfigPath); + + actualQueryConfig.AssertType(defaultQueryConfig, "class", typeof(SqlReadJournalProvider)); + actualQueryConfig.AssertString(defaultQueryConfig, "write-plugin"); + actualQueryConfig.AssertInt(defaultQueryConfig, "max-buffer-size", 500); + actualQueryConfig.AssertTimeSpan(defaultQueryConfig, "refresh-interval", 1.Seconds()); + actualQueryConfig.AssertString(defaultQueryConfig, "connection-string", "a"); + actualQueryConfig.AssertString(defaultQueryConfig, "tag-read-mode", "TagTable"); + + actualQueryConfig.AssertInt(defaultQueryConfig, "journal-sequence-retrieval.batch-size", 10000); + actualQueryConfig.AssertInt(defaultQueryConfig, "journal-sequence-retrieval.max-tries", 10); + actualQueryConfig.AssertTimeSpan(defaultQueryConfig, "journal-sequence-retrieval.query-delay", 1.Seconds()); + actualQueryConfig.AssertTimeSpan(defaultQueryConfig, "journal-sequence-retrieval.max-backoff-query-delay", 60.Seconds()); + actualQueryConfig.AssertTimeSpan(defaultQueryConfig, "journal-sequence-retrieval.ask-timeout", 1.Seconds()); + + actualQueryConfig.AssertString(defaultQueryConfig, "provider-name", "b"); + actualQueryConfig.AssertString(defaultQueryConfig, "table-mapping", "default"); + actualQueryConfig.AssertInt(defaultQueryConfig, "buffer-size", 5000); + actualQueryConfig.AssertInt(defaultQueryConfig, "batch-size", 100); + actualQueryConfig.AssertInt(defaultQueryConfig, "replay-batch-size", 1000); + actualQueryConfig.AssertInt(defaultQueryConfig, "parallelism", 3); + actualQueryConfig.AssertInt(defaultQueryConfig, "max-row-by-row-size", 100); + actualQueryConfig.AssertBool(defaultQueryConfig, "use-clone-connection", true); + actualQueryConfig.AssertString(defaultQueryConfig, "tag-separator", ";"); + actualQueryConfig.AssertIsolationLevel(defaultQueryConfig, "read-isolation-level"); + actualQueryConfig.AssertType(defaultQueryConfig, "dao", typeof(ByteArrayJournalDao)); + + actualQueryConfig.AssertMappingEquals(defaultQueryConfig, "default"); + + #endregion } [Fact(DisplayName = "Custom Options should modify default config")] @@ -92,6 +128,9 @@ public void ModifiedOptionsTest() Serializer = "hyperion", ReadIsolationLevel = IsolationLevel.Snapshot, WriteIsolationLevel = IsolationLevel.Snapshot, + TagStorageMode = TagMode.Csv, + TagSeparator = ":", + DeleteCompatibilityMode = true, DatabaseOptions = new JournalDatabaseOptions(DatabaseMapping.SqlServer) { SchemaName = "schema", @@ -128,23 +167,79 @@ public void ModifiedOptionsTest() }; var fullConfig = opt.ToConfig(); - var journalConfig = fullConfig - .GetConfig("akka.persistence.journal.custom") - .WithFallback(SqlPersistence.DefaultJournalConfiguration); - var config = new JournalConfig(journalConfig); - fullConfig.GetTimeSpan("akka.persistence.query.journal.sql.refresh-interval").Should().Be(5.Seconds()); + #region Journal configuration + var journalConfig = new JournalConfig( + fullConfig + .GetConfig("akka.persistence.journal.custom") + .WithFallback(SqlPersistence.DefaultJournalConfiguration)); + + journalConfig.AutoInitialize.Should().BeFalse(); + journalConfig.ConnectionString.Should().Be("a"); + journalConfig.ProviderName.Should().Be("b"); + journalConfig.DefaultSerializer.Should().Be("hyperion"); + journalConfig.UseCloneConnection.Should().BeTrue(); // non-overridable + journalConfig.ReadIsolationLevel.Should().Be(IsolationLevel.Snapshot); + journalConfig.WriteIsolationLevel.Should().Be(IsolationLevel.Snapshot); + + journalConfig.PluginConfig.TagSeparator.Should().Be(":"); + journalConfig.PluginConfig.TagMode.Should().Be(TagMode.Csv); + + journalConfig.DaoConfig.SqlCommonCompatibilityMode.Should().BeTrue(); + + journalConfig.TableConfig.SchemaName.Should().Be("schema"); + + var journalTable = journalConfig.TableConfig.EventJournalTable; + journalTable.UseWriterUuidColumn.Should().BeFalse(); + journalTable.Name.Should().Be("aa"); + journalTable.ColumnNames.Ordering.Should().Be("a"); + journalTable.ColumnNames.Deleted.Should().Be("b"); + journalTable.ColumnNames.PersistenceId.Should().Be("c"); + journalTable.ColumnNames.SequenceNumber.Should().Be("d"); + journalTable.ColumnNames.Created.Should().Be("e"); + journalTable.ColumnNames.Tags.Should().Be("f"); + journalTable.ColumnNames.Message.Should().Be("g"); + journalTable.ColumnNames.Identifier.Should().Be("h"); + journalTable.ColumnNames.Manifest.Should().Be("i"); + journalTable.ColumnNames.WriterUuid.Should().Be("j"); + + var metaTable = journalConfig.TableConfig.MetadataTable; + metaTable.Name.Should().Be("bb"); + metaTable.ColumnNames.PersistenceId.Should().Be("a"); + metaTable.ColumnNames.SequenceNumber.Should().Be("b"); + + var tagTable = journalConfig.TableConfig.TagTable; + tagTable.Name.Should().Be("cc"); + tagTable.ColumnNames.OrderingId.Should().Be("a"); + tagTable.ColumnNames.Tag.Should().Be("b"); + tagTable.ColumnNames.PersistenceId.Should().Be("c"); + tagTable.ColumnNames.SequenceNumber.Should().Be("d"); + #endregion + + #region Query configuration + var queryConfig = new ReadJournalConfig( + fullConfig + .GetConfig("akka.persistence.query.journal.custom") + .WithFallback(SqlPersistence.DefaultQueryConfiguration)); + + queryConfig.ConnectionString.Should().Be("a"); + queryConfig.ProviderName.Should().Be("b"); + queryConfig.DefaultSerializer.Should().Be("hyperion"); + queryConfig.UseCloneConnection.Should().BeTrue(); // non-overridable + queryConfig.RefreshInterval.Should().Be(5.Seconds()); - config.AutoInitialize.Should().BeFalse(); - config.ConnectionString.Should().Be("a"); - config.ProviderName.Should().Be("b"); - config.DefaultSerializer.Should().Be("hyperion"); - config.ReadIsolationLevel.Should().Be(IsolationLevel.Snapshot); - config.WriteIsolationLevel.Should().Be(IsolationLevel.Snapshot); + queryConfig.PluginConfig.TagSeparator.Should().Be(":"); + queryConfig.PluginConfig.TagMode.Should().Be(TagMode.Csv); - config.TableConfig.SchemaName.Should().Be("schema"); + queryConfig.JournalSequenceRetrievalConfiguration.BatchSize.Should().Be(10000); // non-overridable + queryConfig.JournalSequenceRetrievalConfiguration.MaxTries.Should().Be(10); // non-overridable + queryConfig.JournalSequenceRetrievalConfiguration.QueryDelay.Should().Be(1.Seconds()); // non-overridable + queryConfig.JournalSequenceRetrievalConfiguration.MaxBackoffQueryDelay.Should().Be(60.Seconds()); // non-overridable + queryConfig.JournalSequenceRetrievalConfiguration.AskTimeout.Should().Be(1.Seconds()); // non-overridable + + queryConfig.TableConfig.SchemaName.Should().Be("schema"); - var journalTable = config.TableConfig.EventJournalTable; + journalTable = queryConfig.TableConfig.EventJournalTable; journalTable.UseWriterUuidColumn.Should().BeFalse(); journalTable.Name.Should().Be("aa"); journalTable.ColumnNames.Ordering.Should().Be("a"); @@ -158,17 +253,18 @@ public void ModifiedOptionsTest() journalTable.ColumnNames.Manifest.Should().Be("i"); journalTable.ColumnNames.WriterUuid.Should().Be("j"); - var metaTable = config.TableConfig.MetadataTable; + metaTable = queryConfig.TableConfig.MetadataTable; metaTable.Name.Should().Be("bb"); metaTable.ColumnNames.PersistenceId.Should().Be("a"); metaTable.ColumnNames.SequenceNumber.Should().Be("b"); - var tagTable = config.TableConfig.TagTable; + tagTable = queryConfig.TableConfig.TagTable; tagTable.Name.Should().Be("cc"); tagTable.ColumnNames.OrderingId.Should().Be("a"); tagTable.ColumnNames.Tag.Should().Be("b"); tagTable.ColumnNames.PersistenceId.Should().Be("c"); tagTable.ColumnNames.SequenceNumber.Should().Be("d"); + #endregion } } } diff --git a/src/Akka.Persistence.Sql.Hosting/SqlJournalOptions.cs b/src/Akka.Persistence.Sql.Hosting/SqlJournalOptions.cs index f4185444..3c9169a7 100644 --- a/src/Akka.Persistence.Sql.Hosting/SqlJournalOptions.cs +++ b/src/Akka.Persistence.Sql.Hosting/SqlJournalOptions.cs @@ -154,32 +154,41 @@ protected override StringBuilder Build(StringBuilder sb) base.Build(sb); - if (IsDefaultPlugin) - { - sb.AppendLine("akka.persistence.query.journal.sql {"); - sb.AppendLine($"connection-string = {ConnectionString.ToHocon()}"); - sb.AppendLine($"provider-name = {ProviderName.ToHocon()}"); + BuildQueryConfig(sb, QueryPluginId); + + if (IsDefaultPlugin && Identifier is not "sql") + BuildQueryConfig(sb, "akka.persistence.query.journal.sql"); + + return sb; + } + + private StringBuilder BuildQueryConfig(StringBuilder sb, string queryPluginId) + { + sb.Append(queryPluginId).AppendLine("{"); + sb.AppendLine($"connection-string = {ConnectionString.ToHocon()}"); + sb.AppendLine($"provider-name = {ProviderName.ToHocon()}"); - if (DatabaseOptions is not null) - sb.AppendLine($"table-mapping = {DatabaseOptions.Mapping.Name().ToHocon()}"); + if (DatabaseOptions is not null) + sb.AppendLine($"table-mapping = {DatabaseOptions.Mapping.Name().ToHocon()}"); - if (TagStorageMode is not null) - sb.AppendLine($"tag-read-mode = {TagStorageMode.ToString().ToHocon()}"); + if (TagStorageMode is not null) + sb.AppendLine($"tag-read-mode = {TagStorageMode.ToString().ToHocon()}"); - if (TagSeparator is not null) - sb.AppendLine($"tag-separator = {TagSeparator.ToHocon()}"); + if (TagSeparator is not null) + sb.AppendLine($"tag-separator = {TagSeparator.ToHocon()}"); - if (ReadIsolationLevel is not null) - sb.AppendLine($"read-isolation-level = {ReadIsolationLevel.ToHocon()}"); - - DatabaseOptions?.Build(sb); - - sb.AppendLine("}"); - } + if (ReadIsolationLevel is not null) + sb.AppendLine($"read-isolation-level = {ReadIsolationLevel.ToHocon()}"); if (QueryRefreshInterval is not null) - sb.AppendLine($"akka.persistence.query.journal.sql.refresh-interval = {QueryRefreshInterval.ToHocon()}"); - + sb.AppendLine($"refresh-interval = {QueryRefreshInterval.ToHocon()}"); + + sb.AppendLine($"serializer = {Serializer.ToHocon()}"); + + DatabaseOptions?.Build(sb); + + sb.AppendLine("}"); + return sb; } }