From a8f90559b3a992baf51326a1d511aa6d53f628c7 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 3 Feb 2021 12:35:49 -0600 Subject: [PATCH 1/5] Disable PR support for Akka.Persistence.Linq2Db release pipeline --- build-system/windows-release.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/build-system/windows-release.yaml b/build-system/windows-release.yaml index d1647f98..439bc99b 100644 --- a/build-system/windows-release.yaml +++ b/build-system/windows-release.yaml @@ -10,6 +10,8 @@ trigger: include: - refs/tags/* +pr: none + variables: - group: signingSecrets #create this group with SECRET variables `signingUsername` and `signingPassword` - group: nugetKeys #create this group with SECRET variables `nugetKey` From 24a798de0d2dbf3700351c44abd34301e856c93f Mon Sep 17 00:00:00 2001 From: Drew Date: Wed, 23 Jun 2021 15:06:13 -0400 Subject: [PATCH 2/5] Serialization pipeline Fixes/improvements (#22) * hang on to some bits * Use a proper yield enumerator in `NumericRangeEntry * Update Akka.Net version, Clean up dead code in `ByteArrayJournalSerializer`, remove perf numbers till we make new ones * remove csharp 9 features from main lib * more csproj fixes * Update Fixture usage to get unit tests working. * add some awaits on compatibility spec to help with raciness on actor shutdown. * harden snapshottableconfiguration, upgrade linq2db, add more tuning parameters --- README.md | 31 +--- ...tence.Linq2Db.Benchmark.DockerTests.csproj | 6 +- ...Persistence.Linq2Db.Benchmark.Tests.csproj | 6 +- .../DupJournalPerfSpec.cs | 3 +- ...e.Linq2Db.Compatibility.DockerTests.csproj | 4 +- ...istence.Linq2Db.Compatibility.Tests.csproj | 4 +- .../SqlCommonJournalCompatibilitySpec.cs | 2 + .../Sqlite/SQLiteCompatibilitySpecConfig.cs | 6 +- ...istence.Linq2Db.Journal.Query.Tests.csproj | 8 +- ...Persistence.Sql.Linq2Db.DockerTests.csproj | 16 +-- .../Docker/PostgreSQLFixture.cs | 83 ++++++----- .../Docker/SqlServerFixture.cs | 45 +++--- .../Akka.Persistence.Sql.Linq2Db.Tests.csproj | 16 +-- .../Akka.Persistence.Sql.Linq2Db.csproj | 12 +- .../Config/BaseByteArrayJournalDaoConfig.cs | 8 ++ .../Config/SnapshotTableConfiguration.cs | 9 +- .../Journal/DAO/BaseByteArrayJournalDao.cs | 88 +++++++++--- .../DAO/BaseJournalDaoWithReadMessages.cs | 2 +- .../Journal/DAO/ByteArrayJournalSerializer.cs | 134 +++++++++++------- .../Journal/DAO/FlowControl.cs | 12 ++ .../Journal/Linq2DbWriteJournal.cs | 2 +- .../Journal/Types/JournalRow.cs | 4 + .../Journal/Types/ReplayCompletion.cs | 21 ++- .../Query/Dao/BaseByteReadArrayJournalDAO.cs | 9 +- .../Query/Linq2DbReadJournal.cs | 4 +- .../Query/NumericRangeEntry.cs | 15 +- .../Serialization/PersistentReprSerializer.cs | 34 +---- .../Snapshot/ByteArraySnapshotSerializer.cs | 11 +- .../persistence.conf | 18 ++- .../snapshot.conf | 4 +- 30 files changed, 351 insertions(+), 266 deletions(-) diff --git a/README.md b/README.md index b18528e7..033c41e3 100644 --- a/README.md +++ b/README.md @@ -2,13 +2,13 @@ A Cross-SQL-DB Engine Akka.Persistence plugin with broad database compatibility thanks to Linq2Db. -This is a Fairly-Naive port of the amazing akka-persistence-jdbc package from Scala to C#. +This is a port of the amazing akka-persistence-jdbc package from Scala, with a few improvements based on C# as well as our choice of data library. Please read the documentation carefully. Some features may be specific to use case and have trade-offs (namely, compatibility modes) ## Status -- Usable for basic Read/Writes + - Implements the following for `Akka.Persistence.Query`: - IPersistenceIdsQuery - ICurrentPersistenceIdsQuery @@ -20,9 +20,7 @@ Please read the documentation carefully. Some features may be specific to use ca - ICurrentAllEventsQuery - Snapshot Store Support -#### This is still a WORK IN PROGRESS - -**Pull Requests are Welcome** but please note this is still considered 'work in progress' and only used if one understands the risks. While the TCK Specs pass you should still test in a 'safe' non-production environment carefully before deciding to fully deploy. +See something you want to add or improve? **Pull Requests are Welcome!** Working: @@ -102,27 +100,8 @@ Compatibility with existing Providers is partially implemented via `table-compat # Performance -Tests based on i-7 8750H, 32GB Ram, 2TB SSD, Windows 10 version Version 10.0.19041.630. -Databases running on Docker WSL2. - -All numbers are in msg/sec. - -|Test |SqlServer (normal) | SqlServer Batching | Linq2Db |vs Normal| vs Batching| -|:------------- |:------------- | :----------: | -----------: | -----------: | -----------: | -|Persist | 164|427| 235|143.29%|55.04%| -|PersistAll | 782|875| 5609|717.26%|641.03%| -|PersistAsync | 630|846| 16099|2555.40%|1902.96%| -|PersistAllAsync | 2095|902| 15681|748.50%|1738.47%| -|PersistGroup10 | 590|680| 1069|181.19%|157.21%| -|PersistGroup100 | 607|965| 5537|912.19%|573.78%| -|PersistGroup200 | 628|1356| 7966|1268.47%|587.46%| -|PersistGroup25 | 629|675| 2189|348.01%|324.30%| -|PersistGroup400 | 612|1011| 7237|1182.52%|715.83%| -|PersistGroup50 | 612|654| 3867|631.86%|591.28%| -|Recovering | 41903|38766| 42592|101.64%|109.87%| -|Recovering8 | 75466|65515| 63960|84.75%|97.63%| -|RecoveringFour | 59259|51355| 58437|98.61%|113.79%| -|RecoveringTwo | 41745|35512| 41108|98.47%|115.76%| +Updated Performance numbers pending. + ## Sql.Common Compatibility modes - Delete Compatibility mode is available. diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Akka.Persistence.Linq2Db.Benchmark.DockerTests.csproj b/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Akka.Persistence.Linq2Db.Benchmark.DockerTests.csproj index 506f3ae7..24982456 100644 --- a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Akka.Persistence.Linq2Db.Benchmark.DockerTests.csproj +++ b/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Akka.Persistence.Linq2Db.Benchmark.DockerTests.csproj @@ -13,12 +13,12 @@ - - + + - + diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.Tests/Akka.Persistence.Linq2Db.Benchmark.Tests.csproj b/src/Akka.Persistence.Linq2Db.Benchmark.Tests/Akka.Persistence.Linq2Db.Benchmark.Tests.csproj index 378a9226..fe771662 100644 --- a/src/Akka.Persistence.Linq2Db.Benchmark.Tests/Akka.Persistence.Linq2Db.Benchmark.Tests.csproj +++ b/src/Akka.Persistence.Linq2Db.Benchmark.Tests/Akka.Persistence.Linq2Db.Benchmark.Tests.csproj @@ -16,12 +16,12 @@ - - + + - + diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.Tests/DupJournalPerfSpec.cs b/src/Akka.Persistence.Linq2Db.Benchmark.Tests/DupJournalPerfSpec.cs index be9d7899..8bb31b23 100644 --- a/src/Akka.Persistence.Linq2Db.Benchmark.Tests/DupJournalPerfSpec.cs +++ b/src/Akka.Persistence.Linq2Db.Benchmark.Tests/DupJournalPerfSpec.cs @@ -136,6 +136,7 @@ internal void MeasureGroup(Func msg, Action block, int numMsg, { var measurements = new List(MeasurementIterations); + block(); block(); //warm-up int i = 0; @@ -550,7 +551,7 @@ public void PersistenceActor_performance_must_measure_Recovering8() FeedAndExpectLastSpecific(p6, "p", Commands); FeedAndExpectLastSpecific(p7, "p", Commands); FeedAndExpectLastSpecific(p8, "p", Commands); - MeasureGroup(d => $"Recovering {EventsCount} took {d.TotalMilliseconds} ms", () => + MeasureGroup(d => $"Recovering {EventsCount} took {d.TotalMilliseconds} ms , {(EventsCount*8 / d.TotalMilliseconds) * 1000} total msg/sec", () => { var task1 = Task.Run(()=> { diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Akka.Persistence.Linq2Db.Compatibility.DockerTests.csproj b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Akka.Persistence.Linq2Db.Compatibility.DockerTests.csproj index bbb72821..61f8693a 100644 --- a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Akka.Persistence.Linq2Db.Compatibility.DockerTests.csproj +++ b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Akka.Persistence.Linq2Db.Compatibility.DockerTests.csproj @@ -12,8 +12,8 @@ - - + + diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Akka.Persistence.Linq2Db.Compatibility.Tests.csproj b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Akka.Persistence.Linq2Db.Compatibility.Tests.csproj index 68626e8d..a9124902 100644 --- a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Akka.Persistence.Linq2Db.Compatibility.Tests.csproj +++ b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Akka.Persistence.Linq2Db.Compatibility.Tests.csproj @@ -9,8 +9,8 @@ - - + + diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonJournalCompatibilitySpec.cs b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonJournalCompatibilitySpec.cs index 97789cc8..c442f370 100644 --- a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonJournalCompatibilitySpec.cs +++ b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonJournalCompatibilitySpec.cs @@ -50,6 +50,7 @@ public async Task Can_Recover_SqlCommon_Journal() persistRef.Tell(new SomeEvent(){EventName = "rec-test", Guid = ourGuid, Number = 1}); Assert.True(persistRef.Ask(new ContainsEvent(){Guid = ourGuid}, TimeSpan.FromSeconds(5)).Result); await persistRef.GracefulStop(TimeSpan.FromSeconds(5)); + await Task.Delay(1000); persistRef = sys1.ActorOf(Props.Create(() => new JournalCompatActor(NewJournal, "p-1")), "test-recover-1"); @@ -90,6 +91,7 @@ public async Task SqlCommon_Journal_Can_Recover_L2Db_Journal() persistRef.Tell(new SomeEvent(){EventName = "rec-test", Guid = ourGuid, Number = 1}); Assert.True(persistRef.Ask(new ContainsEvent(){Guid = ourGuid}, TimeSpan.FromSeconds(5)).Result); await persistRef.GracefulStop(TimeSpan.FromSeconds(5)); + await Task.Delay(1000); persistRef = sys1.ActorOf(Props.Create(() => new JournalCompatActor(OldJournal, "p-3")), "test-recover-2"); diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Sqlite/SQLiteCompatibilitySpecConfig.cs b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Sqlite/SQLiteCompatibilitySpecConfig.cs index 92d9649d..43a97dfc 100644 --- a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Sqlite/SQLiteCompatibilitySpecConfig.cs +++ b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Sqlite/SQLiteCompatibilitySpecConfig.cs @@ -51,12 +51,13 @@ class = ""{typeof(Linq2DbSnapshotStore).AssemblyQualifiedName}"" provider-name = """ + LinqToDB.ProviderName.SQLiteMS + $@""" #use-clone-connection = true table-compatibility-mode = sqlite + table-name = ""{tablename}"" tables {{ snapshot {{ auto-init = true warn-on-auto-init-fail = false - table-name = ""{tablename}"" + #table-name = ""{tablename}"" }} }} }} @@ -92,11 +93,12 @@ class = ""{typeof(Linq2DbWriteJournal).AssemblyQualifiedName}"" #connection-string = ""FullUri=file:test.db&cache=shared"" provider-name = ""{LinqToDB.ProviderName.SQLiteMS}"" parallelism = 3 + table-name = ""{tablename}"" table-compatibility-mode = ""sqlite"" tables.journal {{ auto-init = true warn-on-auto-init-fail = false - table-name = ""{tablename}"" + #table-name = ""{tablename}"" metadata-table-name = ""{metadatatablename}"" }} diff --git a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/Akka.Persistence.Linq2Db.Journal.Query.Tests.csproj b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/Akka.Persistence.Linq2Db.Journal.Query.Tests.csproj index 476d40ec..39c48968 100644 --- a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/Akka.Persistence.Linq2Db.Journal.Query.Tests.csproj +++ b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/Akka.Persistence.Linq2Db.Journal.Query.Tests.csproj @@ -8,10 +8,10 @@ - - - - + + + + diff --git a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Akka.Persistence.Sql.Linq2Db.DockerTests.csproj b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Akka.Persistence.Sql.Linq2Db.DockerTests.csproj index fc69ab38..5e0ec9d1 100644 --- a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Akka.Persistence.Sql.Linq2Db.DockerTests.csproj +++ b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Akka.Persistence.Sql.Linq2Db.DockerTests.csproj @@ -12,16 +12,16 @@ - - - - - - + + + + + + - + - + diff --git a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreSQLFixture.cs b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreSQLFixture.cs index dd9f6b94..130c5c86 100644 --- a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreSQLFixture.cs +++ b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreSQLFixture.cs @@ -15,7 +15,8 @@ namespace Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker /// public class PostgreSQLFixture : IAsyncLifetime { - protected readonly string PostgreSqlImageName = $"PostgreSQL-{Guid.NewGuid():N}"; + protected readonly string PostgresContainerName = $"postgresSqlServer-{Guid.NewGuid():N}"; + //protected readonly string PostgreSqlImageName = $"PostgreSQL-{Guid.NewGuid():N}"; protected DockerClient Client; public PostgreSQLFixture() @@ -31,34 +32,33 @@ public PostgreSQLFixture() Client = config.CreateClient(); } - protected string PostgreSQLImageName - { - get - { - //if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - // return "microsoft/mssql-server-windows-express"; - return "postgres"; - } - } + + protected string ImageName => "postgres"; + protected string Tag => "latest"; - protected string SqlServerImageTag - { - get - { - //if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - // return "2017-latest"; - return "latest"; - } - } + protected string PostgresImageName => $"{ImageName}:{Tag}"; public string ConnectionString { get; private set; } public async Task InitializeAsync() { - var images = await Client.Images.ListImagesAsync(new ImagesListParameters {MatchName = PostgreSQLImageName}); + var images = await Client.Images.ListImagesAsync(new ImagesListParameters + { + Filters = new Dictionary> + { + { + "reference", + new Dictionary + { + {PostgresImageName, true} + } + } + } + }); + if (images.Count == 0) await Client.Images.CreateImageAsync( - new ImagesCreateParameters {FromImage = PostgreSQLImageName, Tag = "latest"}, null, + new ImagesCreateParameters { FromImage = ImageName, Tag = Tag }, null, new Progress(message => { Console.WriteLine(!string.IsNullOrEmpty(message.ErrorMessage) @@ -66,13 +66,13 @@ await Client.Images.CreateImageAsync( : $"{message.ID} {message.Status} {message.ProgressMessage}"); })); - var postgresPort = ThreadLocalRandom.Current.Next(9000, 10000); + var sqlServerHostPort = ThreadLocalRandom.Current.Next(9000, 10000); // create the container await Client.Containers.CreateContainerAsync(new CreateContainerParameters { - Image = PostgreSQLImageName, - Name = PostgreSqlImageName, + Image = PostgresImageName, + Name = PostgresContainerName, Tty = true, ExposedPorts = new Dictionary { @@ -88,37 +88,44 @@ await Client.Containers.CreateContainerAsync(new CreateContainerParameters { new PortBinding { - HostPort = $"{postgresPort}" + HostPort = $"{sqlServerHostPort}" } } } } }, - Env = new[] {"POSTGRES_PASSWORD=l0lTh1sIsOpenSource"} + Env = new[] + { + "POSTGRES_PASSWORD=postgres", + "POSTGRES_USER=postgres" + } }); // start the container - await Client.Containers.StartContainerAsync(PostgreSqlImageName, new ContainerStartParameters()); + await Client.Containers.StartContainerAsync(PostgresContainerName, new ContainerStartParameters()); - // Provide a 30 second startup delay - await Task.Delay(TimeSpan.FromSeconds(30)); + // Provide a 10 second startup delay + await Task.Delay(TimeSpan.FromSeconds(10)); - var connectionString = new NpgsqlConnectionStringBuilder() - { - Host = "localhost", Password = "l0lTh1sIsOpenSource", - Username = "postgres", Database = "postgres", - Port = postgresPort - }; + ConnectionString = $"Server=127.0.0.1;Port={sqlServerHostPort};" + + "Database=postgres;User Id=postgres;Password=postgres"; - ConnectionString = connectionString.ToString(); + //var connectionString = new NpgsqlConnectionStringBuilder() + //{ + // Host = "localhost", Password = "l0lTh1sIsOpenSource", + // Username = "postgres", Database = "postgres", + // Port = sqlServerHostPort + //}; + // + //ConnectionString = connectionString.ToString(); } public async Task DisposeAsync() { if (Client != null) { - await Client.Containers.StopContainerAsync(PostgreSqlImageName, new ContainerStopParameters()); - await Client.Containers.RemoveContainerAsync(PostgreSqlImageName, + await Client.Containers.StopContainerAsync(PostgresContainerName, new ContainerStopParameters()); + await Client.Containers.RemoveContainerAsync(PostgresContainerName, new ContainerRemoveParameters {Force = true}); Client.Dispose(); } diff --git a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/SqlServerFixture.cs b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/SqlServerFixture.cs index 1737bc23..e04f9267 100644 --- a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/SqlServerFixture.cs +++ b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/SqlServerFixture.cs @@ -31,34 +31,35 @@ public SqlServerFixture() Client = config.CreateClient(); } - protected string SqlServerImageName - { - get - { - //if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - // return "microsoft/mssql-server-windows-express"; - return "mcr.microsoft.com/mssql/server"; - } - } - - protected string SqlServerImageTag - { - get - { - //if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - // return "2017-latest"; - return "2017-latest-ubuntu"; - } - } + protected string ImageName => + //RuntimeInformation.IsOSPlatform(OSPlatform.Windows) + // ? "microsoft/mssql-server-windows-express" + // : + "mcr.microsoft.com/mssql/server"; + protected string Tag => "2017-latest"; + protected string SqlServerImageName => $"{ImageName}:{Tag}"; public string ConnectionString { get; private set; } public async Task InitializeAsync() { - var images = await Client.Images.ListImagesAsync(new ImagesListParameters {MatchName = SqlServerImageName}); + var images = await Client.Images.ListImagesAsync(new ImagesListParameters + { + Filters = new Dictionary> + { + { + "reference", + new Dictionary + { + {SqlServerImageName, true} + } + } + } + }); + if (images.Count == 0) await Client.Images.CreateImageAsync( - new ImagesCreateParameters {FromImage = SqlServerImageName, Tag = "latest"}, null, + new ImagesCreateParameters {FromImage = ImageName, Tag = Tag}, null, new Progress(message => { Console.WriteLine(!string.IsNullOrEmpty(message.ErrorMessage) @@ -107,7 +108,7 @@ await Client.Containers.CreateContainerAsync(new CreateContainerParameters var connectionString = new DbConnectionStringBuilder { ConnectionString = - "data source=.;database=akka_persistence_tests;user id=sa;password=l0lTh1sIsOpenSource;" + "data source=.;database=akka_persistence_tests;user id=sa;password=l0lTh1sIsOpenSource" }; connectionString["Data Source"] = $"localhost,{sqlServerHostPort}"; diff --git a/src/Akka.Persistence.Sql.Linq2Db.Tests/Akka.Persistence.Sql.Linq2Db.Tests.csproj b/src/Akka.Persistence.Sql.Linq2Db.Tests/Akka.Persistence.Sql.Linq2Db.Tests.csproj index 2cacaa68..7369e828 100644 --- a/src/Akka.Persistence.Sql.Linq2Db.Tests/Akka.Persistence.Sql.Linq2Db.Tests.csproj +++ b/src/Akka.Persistence.Sql.Linq2Db.Tests/Akka.Persistence.Sql.Linq2Db.Tests.csproj @@ -11,16 +11,16 @@ - - - - - - + + + + + + - + - + diff --git a/src/Akka.Persistence.Sql.Linq2Db/Akka.Persistence.Sql.Linq2Db.csproj b/src/Akka.Persistence.Sql.Linq2Db/Akka.Persistence.Sql.Linq2Db.csproj index ea42a61f..25ab378a 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Akka.Persistence.Sql.Linq2Db.csproj +++ b/src/Akka.Persistence.Sql.Linq2Db/Akka.Persistence.Sql.Linq2Db.csproj @@ -3,18 +3,18 @@ - 8 + default $(LibraryFramework) An Akka Persistence Module for SQL Databases using Linq2Db. - - - - + + + + - + diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs index e3dbce24..0410df3d 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs @@ -9,6 +9,10 @@ public BaseByteArrayJournalDaoConfig(Configuration.Config config) BufferSize = config.GetInt("buffer-size", 5000); BatchSize = config.GetInt("batch-size", 100); + DbRoundTripBatchSize = config.GetInt("db-round-trip-max-batch-size", 1000); + PreferParametersOnMultiRowInsert = + config.GetBoolean("prefer-parameters-on-multirow-insert", + false); ReplayBatchSize = config.GetInt("replay-batch-size", 1000); Parallelism = config.GetInt("parallelism", 2); LogicalDelete = config.GetBoolean("logical-delete", false); @@ -17,6 +21,10 @@ public BaseByteArrayJournalDaoConfig(Configuration.Config config) config.GetBoolean("delete-compatibility-mode", true); } + public bool PreferParametersOnMultiRowInsert { get; set; } + + public int DbRoundTripBatchSize { get; set; } + /// /// Specifies the batch size at which point /// will switch to 'Default' instead of 'MultipleRows'. For smaller sets diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/SnapshotTableConfiguration.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/SnapshotTableConfiguration.cs index ac4246ae..1b39ddda 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Config/SnapshotTableConfiguration.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Config/SnapshotTableConfiguration.cs @@ -9,12 +9,11 @@ public class SnapshotTableConfiguration { public SnapshotTableConfiguration(Configuration.Config config) { - config = - config.SafeWithFallback(Linq2DbSnapshotStore - .DefaultConfiguration); - var localcfg = config.GetConfig("tables.snapshot"); + + var localcfg = config.GetConfig("tables.snapshot") + .SafeWithFallback(config).SafeWithFallback(Configuration.Config.Empty); ColumnNames= new SnapshotTableColumnNames(config); - TableName = localcfg.GetString("table-name", "snapshot"); + TableName = config.GetString("table-name", localcfg.GetString("table-name", "snapshot")); SchemaName = localcfg.GetString("schema-name", null); AutoInitialize = localcfg.GetBoolean("auto-init", false); WarnOnAutoInitializeFail = diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs index 3afe256c..0ef35926 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Collections.Immutable; +using System.Data; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -37,6 +38,7 @@ public abstract class BaseByteArrayJournalDao : public bool logicalDelete; protected readonly ILoggingAdapter _logger; private Flow, long)>, NotUsed> deserializeFlow; + private Flow, NotUsed> deserializeFlowMapped; protected BaseByteArrayJournalDao(IAdvancedScheduler sched, IMaterializer materializerr, @@ -49,6 +51,7 @@ protected BaseByteArrayJournalDao(IAdvancedScheduler sched, logicalDelete = _journalConfig.DaoConfig.LogicalDelete; Serializer = serializer; deserializeFlow = Serializer.DeserializeFlow(); + deserializeFlowMapped = Serializer.DeserializeFlow().Select(MessageWithBatchMapper()); //Due to C# rules we have to initialize WriteQueue here //Keeping it here vs init function prevents accidental moving of init //to where variables aren't set yet. @@ -132,13 +135,38 @@ private async Task QueueWriteJournalRows(Seq xs) private async Task WriteJournalRows(Seq xs) { - using (var db = _connectionFactory.GetConnection()) { //hot path: //If we only have one row, penalty for BulkCopy //Isn't worth it due to insert caching/etc. if (xs.Count > 1) { + await InsertMultiple(xs); + } + else if (xs.Count > 0) + { + await InsertSingle(xs); + } + } + + } + + private async Task InsertSingle(Seq xs) + { + using (var db = _connectionFactory.GetConnection()) + { + await db.InsertAsync(xs.Head); + } + } + + private async Task InsertMultiple(Seq xs) + { + using (var db = _connectionFactory.GetConnection()) + { + try + { + await db.BeginTransactionAsync(IsolationLevel + .ReadCommitted); await db.GetTable() .BulkCopyAsync( new BulkCopyOptions() @@ -148,30 +176,50 @@ await db.GetTable() .MaxRowByRowSize ? BulkCopyType.Default : BulkCopyType.MultipleRows, - UseInternalTransaction = true + //TODO: When Parameters are allowed, + //Make a Config Option + //Or default to true + UseParameters = _journalConfig.DaoConfig.PreferParametersOnMultiRowInsert, + MaxBatchSize = _journalConfig.DaoConfig.DbRoundTripBatchSize }, xs); + await db.CommitTransactionAsync(); } - else if (xs.Count > 0) + catch (Exception e) { - await db.InsertAsync(xs.Head); + try + { + await db.RollbackTransactionAsync(); + } + catch (Exception exception) + { + throw e; + } + + throw; } } - } - + public async Task> AsyncWriteMessages( IEnumerable messages, long timeStamp = 0) { - var serializedTries = Serializer.Serialize(messages, timeStamp); //Just a little bit of magic here; //.ToList() keeps it all working later for whatever reason //while still keeping our allocations in check. + + /*var trySet = new List(); + foreach (var serializedTry in serializedTries) + { + trySet.AddRange(serializedTry.Success.GetOrElse(new List(0))); + } + + var rows = Seq(trySet);*/ var rows = Seq(serializedTries.SelectMany(serializedTry => serializedTry.Success.GetOrElse(new List(0))) .ToList()); - + // return await QueueWriteJournalRows(rows).ContinueWith(task => @@ -400,30 +448,27 @@ public override { query = query.Take((int) max); } - - var runninng = query.ToListAsync(); - return Source.FromTask(runninng).SelectMany(r => r) - .Via(deserializeFlow.Select(MessageWithBatchMapper())); + + + return + Source.FromTask(query.ToListAsync()) + .SelectMany(r => r) + .Via(deserializeFlowMapped); //return AsyncSource.FromEnumerable(query,async q=>await q.ToListAsync()) // .Via( // deserializeFlow).Select(MessageWithBatchMapper()); } } - private static Func, long)>, Util.Try> MessageWithBatchMapper() - { - return sertry => + private static Func, long)>, Util.Try> MessageWithBatchMapper() => + sertry => { if (sertry.IsSuccess) { - var success = sertry.Success.Value; return new Util.Try( - new ReplayCompletion() - { - repr = success.Item1, - Ordering = success.Item3 - }); + new ReplayCompletion( sertry.Success.Value + )); } else { @@ -432,6 +477,5 @@ public override sertry.Failure.Value); } }; - } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs index 2ff59c08..78871b38 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs @@ -61,7 +61,7 @@ protected BaseJournalDaoWithReadMessages(IAdvancedScheduler ec, Util.Option lastSeq = Util.Option.None; if (lastMsg != null && lastMsg.IsSuccess) { - lastSeq = lastMsg.Success.Select(r => r.repr.SequenceNr); + lastSeq = lastMsg.Success.Select(r => r.Repr.SequenceNr); } else if (lastMsg != null && lastMsg.Failure.HasValue) { diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs index 16e418b0..21ec4de9 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs @@ -28,41 +28,67 @@ public ByteArrayJournalSerializer(IProviderConfig journalCon _separator = separator; _separatorArray = new[] {_separator}; } + + /// + /// Concatenates a set of tags using a provided separator. + /// + /// + /// + /// + private static string StringSep(IImmutableSet tags, + string separator) + { + if (tags.Count == 0) + { + return ""; + } + + return tags.Aggregate((tl, tr) => + tl + separator + tr); + } + protected override Try Serialize(IPersistentRepresentation persistentRepr, IImmutableSet tTags, long timeStamp = 0) { try { - var serializer = _serializer.FindSerializerForType(persistentRepr.Payload.GetType(),_journalConfig.DefaultSerializer); // TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811 - string manifest = ""; - var binary = Akka.Serialization.Serialization.WithTransport(_serializer.System, () => - { - - if (serializer is SerializerWithStringManifest stringManifest) - { - manifest = - stringManifest.Manifest(persistentRepr.Payload); - } - else - { - if (serializer.IncludeManifest) + return Akka.Serialization.Serialization.WithTransport( + _serializer.System, (persistentRepr + , _serializer.FindSerializerForType(persistentRepr.Payload.GetType(),_journalConfig.DefaultSerializer), + StringSep(tTags,_separator), + timeStamp + ), + state => { - manifest = persistentRepr.Payload.GetType().TypeQualifiedName(); - } - } - - return serializer.ToBinary(persistentRepr.Payload); - }); - return new Try(new JournalRow() - { - manifest = manifest, - message = binary, - persistenceId = persistentRepr.PersistenceId, - tags = tTags.Any()? tTags.Aggregate((tl, tr) => tl + _separator + tr) : "", - Identifier = serializer.Identifier, - sequenceNumber = persistentRepr.SequenceNr, - Timestamp = persistentRepr.Timestamp==0? timeStamp: persistentRepr.Timestamp - }); + var (_persistentRepr, serializer,tags,ts) = state; + string thisManifest = ""; + if (serializer is SerializerWithStringManifest withStringManifest) + { + thisManifest = + withStringManifest.Manifest(_persistentRepr.Payload); + } + else + { + if (serializer.IncludeManifest) + { + thisManifest = _persistentRepr.Payload + .GetType().TypeQualifiedName(); + } + } + return new Try(new JournalRow() + { + message = + serializer.ToBinary(_persistentRepr.Payload), + manifest = thisManifest, + persistenceId = _persistentRepr.PersistenceId, + tags = tags, + Identifier = serializer.Identifier, + sequenceNumber = _persistentRepr.SequenceNr, + Timestamp = _persistentRepr.Timestamp == 0 + ? ts + : _persistentRepr.Timestamp + }); + }); } catch (Exception e) { @@ -74,34 +100,44 @@ protected override Try Serialize(IPersistentRepresentation persisten { try { - object deserialized = null; - if (t.Identifier.HasValue == false) + //object deserialized = null; + var identifierMaybe = t.Identifier; + if (identifierMaybe.HasValue == false) { var type = System.Type.GetType(t.manifest, true); - var deserializer = - _serializer.FindSerializerForType(type, null); + // TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811 - deserialized = - Akka.Serialization.Serialization.WithTransport( - _serializer.System, - () => deserializer.FromBinary(t.message, type)); + + return new Try<(IPersistentRepresentation, IImmutableSet + , long)>(( + new Persistent( Akka.Serialization.Serialization.WithTransport( + _serializer.System, (_serializer.FindSerializerForType(type,_journalConfig.DefaultSerializer),t.message,type), + (state) => + { + return state.Item1.FromBinary( + state.message, state.type); + }), t.sequenceNumber, + t.persistenceId, + t.manifest, t.deleted, ActorRefs.NoSender, null, t.Timestamp), + t.tags?.Split(_separatorArray, + StringSplitOptions.RemoveEmptyEntries) + .ToImmutableHashSet() ?? ImmutableHashSet.Empty, + t.ordering)); } else { - var serializerId = t.Identifier.Value; + return new Try<(IPersistentRepresentation, IImmutableSet + , long)>(( + new Persistent(_serializer.Deserialize(t.message, + identifierMaybe.Value,t.manifest), t.sequenceNumber, + t.persistenceId, + t.manifest, t.deleted, ActorRefs.NoSender, null, t.Timestamp), + t.tags?.Split(_separatorArray, + StringSplitOptions.RemoveEmptyEntries) + .ToImmutableHashSet() ?? ImmutableHashSet.Empty, + t.ordering)); // TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811 - deserialized = _serializer.Deserialize(t.message, - serializerId,t.manifest); } - - return ( - new Persistent(deserialized, t.sequenceNumber, - t.persistenceId, - t.manifest, t.deleted, ActorRefs.NoSender, null, t.Timestamp), - t.tags?.Split(_separatorArray, - StringSplitOptions.RemoveEmptyEntries) - .ToImmutableHashSet() ?? ImmutableHashSet.Empty, - t.ordering); } catch (Exception e) { diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/FlowControl.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/FlowControl.cs index ffbbb005..2a575137 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/FlowControl.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/FlowControl.cs @@ -4,16 +4,28 @@ public class FlowControl { public class Continue : FlowControl { + private Continue() + { + } + public static Continue Instance = new Continue(); } public class ContinueDelayed : FlowControl { + private ContinueDelayed() + { + } + public static ContinueDelayed Instance = new ContinueDelayed(); } public class Stop : FlowControl { + private Stop() + { + } + public static Stop Instance = new Stop(); } } diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs index f78cd947..7093bf17 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs @@ -133,7 +133,7 @@ await _journal.MessagesWithBatch(persistenceId, fromSequenceNr, t.Failure.Value)) .RunForeach(r => { - recoveryCallback(r.repr); + recoveryCallback(r.Repr); }, _mat); } diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs index 80b771cc..4574e9d6 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs @@ -5,6 +5,10 @@ namespace Akka.Persistence.Sql.Linq2Db.Journal.Types { public sealed class JournalRow { + public JournalRow() + { + + } public long ordering { get; set; } public long Timestamp { get; set; } = 0; diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/ReplayCompletion.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/ReplayCompletion.cs index c5e105f6..28cad0ff 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/ReplayCompletion.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/ReplayCompletion.cs @@ -1,8 +1,23 @@ -namespace Akka.Persistence.Sql.Linq2Db.Journal.Types +using System.Collections.Immutable; + +namespace Akka.Persistence.Sql.Linq2Db.Journal.Types { public class ReplayCompletion { - public IPersistentRepresentation repr { get; set; } - public long Ordering { get; set; } + public ReplayCompletion(IPersistentRepresentation repr, long ordering) + { + Repr = repr; + Ordering = ordering; + } + + public readonly IPersistentRepresentation Repr; + public readonly long Ordering; + + public ReplayCompletion((IPersistentRepresentation, IImmutableSet, long) success) + { + //(Repr, _, Ordering) = success; + Repr = success.Item1; + Ordering = success.Item3; + } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs b/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs index 28afb73d..9d1ff11f 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs @@ -156,11 +156,10 @@ private Flow perfectlyMatchTag( DataConnection dc, string persistenceId, long fromSequenceNr, long toSequenceNr, long max) { - var toTake = MaxTake(max); return AsyncSource.FromEnumerable( new { - dc, persistenceId, fromSequenceNr, toSequenceNr, toTake, + dc, persistenceId, fromSequenceNr, toSequenceNr,toTake= MaxTake(max), includeDeleted }, async (state) => @@ -180,10 +179,8 @@ await baseQueryStatic(state.dc, state.includeDeleted) { var val = t.Get(); return new Akka.Util.Try( - new ReplayCompletion() - { - repr = val.Item1, Ordering = val.Item3 - }); + new ReplayCompletion(val.Item1,val.Item3) + ); } catch (Exception e) { diff --git a/src/Akka.Persistence.Sql.Linq2Db/Query/Linq2DbReadJournal.cs b/src/Akka.Persistence.Sql.Linq2Db/Query/Linq2DbReadJournal.cs index e03f9f33..03bca12e 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Query/Linq2DbReadJournal.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Query/Linq2DbReadJournal.cs @@ -154,8 +154,8 @@ private Source _eventsByPersistenceIdSource( toSequenceNr, batchSize, refreshInterval) .SelectAsync, ReplayCompletion, NotUsed>(1, reprAndOrdNr => Task.FromResult(reprAndOrdNr.Get())) - .SelectMany((ReplayCompletion r) => _adaptEvents(r.repr) - .Select(p => new {repr = r.repr, ordNr = r.Ordering})) + .SelectMany((ReplayCompletion r) => _adaptEvents(r.Repr) + .Select(p => new {repr = r.Repr, ordNr = r.Ordering})) .Select(r => new EventEnvelope(new Sequence(r.ordNr), r.repr.PersistenceId, r.repr.SequenceNr, r.repr.Payload,r.repr.Timestamp)); } diff --git a/src/Akka.Persistence.Sql.Linq2Db/Query/NumericRangeEntry.cs b/src/Akka.Persistence.Sql.Linq2Db/Query/NumericRangeEntry.cs index ad6892cf..24c67720 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Query/NumericRangeEntry.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Query/NumericRangeEntry.cs @@ -25,23 +25,10 @@ public bool InRange(long number) public IEnumerable ToEnumerable() { - var itemCount = until - from; - List returnList; - if (itemCount < Int32.MaxValue) - { - returnList = new List(); - } - else - { - returnList = new List(); - } - for (long i = from; i < until; i++) { - returnList.Add(i); + yield return i; } - - return returnList; } public IEnumerator GetEnumerator() diff --git a/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs b/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs index a897c59a..c2b1e347 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs @@ -44,6 +44,7 @@ public abstract class PersistentReprSerializer if (opt.HasValue) { retList.Add(opt.Value); + return new Util.Try>(retList); } else { @@ -65,43 +66,16 @@ public abstract class PersistentReprSerializer return new Util.Try>(ser.Failure.Value); } } + + return new Util.Try>(retList); } - return new Util.Try>(retList); + //return new Util.Try>(retList); }).ToList(); } - public Seq>> SerializeSeq( - IEnumerable messages) - { - return messages.Select(aw => - { - return Util.Try>.From(() => - { - var serialized = - (aw.Payload as IEnumerable) - .Select(p=> Serialize(p).Get()); - return serialized.ToSeq(); - }); - }).ToSeq(); - - //return messages.Select(aw => - //{ - // var serialized = (aw.Payload as IEnumerable) - // .Select(Serialize). - //}) - //return Seq(messages.Select(aw => - //{ - // var serialized = - // (aw.Payload as IEnumerable) - // .Select(Serialize); - // return TrySeq.SequenceSeq(serialized); - //})); - } - - public Akka.Util.Try Serialize(IPersistentRepresentation persistentRepr, long timeStamp = 0) { switch (persistentRepr.Payload) diff --git a/src/Akka.Persistence.Sql.Linq2Db/Snapshot/ByteArraySnapshotSerializer.cs b/src/Akka.Persistence.Sql.Linq2Db/Snapshot/ByteArraySnapshotSerializer.cs index e7275493..90372dbf 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Snapshot/ByteArraySnapshotSerializer.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Snapshot/ByteArraySnapshotSerializer.cs @@ -45,8 +45,11 @@ protected object GetSnapshot(SnapshotRow reader) { var type = Type.GetType(manifest, true); // TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811 - var serializer = _serialization.FindSerializerForType(type, _config.DefaultSerializer); - obj = Akka.Serialization.Serialization.WithTransport(_serialization.System, () => serializer.FromBinary(binary, type)); + obj = Akka.Serialization.Serialization.WithTransport( + _serialization.System, + (serializer: _serialization.FindSerializerForType(type, _config.DefaultSerializer), + binary, type), + (state) => state.serializer.FromBinary(state.binary, state.type)); } else { @@ -60,8 +63,8 @@ private SnapshotRow ToSnapshotEntry(SnapshotMetadata metadata, object snapshot) { var snapshotType = snapshot.GetType(); var serializer = _serialization.FindSerializerForType(snapshotType, _config.DefaultSerializer); - var binary = Akka.Serialization.Serialization.WithTransport(_serialization.System, - () => serializer.ToBinary(snapshot)); + var binary = Akka.Serialization.Serialization.WithTransport(_serialization.System,(serializer, snapshot), + (state ) => state.serializer.ToBinary(state.snapshot)); string manifest = ""; if (serializer is SerializerWithStringManifest) { diff --git a/src/Akka.Persistence.Sql.Linq2Db/persistence.conf b/src/Akka.Persistence.Sql.Linq2Db/persistence.conf index 0efd8e7b..b0000294 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/persistence.conf +++ b/src/Akka.Persistence.Sql.Linq2Db/persistence.conf @@ -18,7 +18,9 @@ # Rather than actual physical deletions logical-delete = false - # If true, journal_metadata is created + # If true, journal_metadata is created and used for deletes + # and max sequence number queries. + # note that there is a performance penalty for using this. delete-compatibility-mode = true # If "sqlite" or "sqlserver", default column names are compatible with @@ -32,11 +34,25 @@ buffer-size = 5000 #Batch size refers to the number of items included in a batch to DB + # (In cases where an AtomicWrite is greater than batch-size, + # The Atomic write will still be handled in a single batch.) #JDBC Default is/was 400 but testing against scenarios indicates #100 is better for overall latency. That said, #larger batches may be better if you have A fast/local DB. batch-size = 100 + #This batch size controls the maximum number of rows that will be sent + #In a single round trip to the DB. This is different than the -actual- batch size, + #And intentionally set larger than batch-size, + #to help atomicwrites be faster + #Note that Linq2Db may use a lower number per round-trip in some cases. + db-round-trip-max-batch-size = 1000 + + #Linq2Db by default will use a built string for multi-row inserts + #Somewhat counterintuitively, this is faster than using parameters in most cases, + #But if you would prefer parameters, you can set this to true. + prefer-parameters-on-multirow-insert = true + # Denotes the number of messages retrieved # Per round-trip to DB on recovery. # This is to limit both size of dataset from DB (possibly lowering locking requirements) diff --git a/src/Akka.Persistence.Sql.Linq2Db/snapshot.conf b/src/Akka.Persistence.Sql.Linq2Db/snapshot.conf index d30aa2d8..40ca63b0 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/snapshot.conf +++ b/src/Akka.Persistence.Sql.Linq2Db/snapshot.conf @@ -23,8 +23,7 @@ # Column names will be compatible with Akka.Persistence.Sql # You still must set your table name! table-compatibility-mode = false - tables - { + tables. snapshot { schema-name = null @@ -69,7 +68,6 @@ serializerId: "serializer_id", } } - } } } } \ No newline at end of file From 103d4830bab2ceb9524efc57157e45f0a3c48b44 Mon Sep 17 00:00:00 2001 From: Drew Date: Sat, 3 Jul 2021 12:44:40 -0400 Subject: [PATCH 3/5] Fix pendingwrite clear on persistasync edge case (#25) * fix edge case on multiple PersistAsync calls and MaxSequenceNumber * update nuget packages * Try adding some delays after gracefulstop to deal with racy CI runs * Move SqlCommon Benchmark tests used for comparison to separate Project so we can skip in CI --- ...2Db.Benchmark.DockerComparisonTests.csproj | 31 +++++++++++++++++ .../PostgreSQLSpecsFixture.cs | 11 ++++++ .../DockerBatchingSqlServerJournalPerfSpec.cs | 4 +-- .../DockerPostgreSQLJournalPerfSpec.cs | 4 +-- .../DockerSqlServerJournalPerfSpec.cs | 4 +-- .../SqlServerSpecsFixture.cs | 11 ++++++ Akka.Persistence.Linq2Db.sln | 7 ++++ ...tence.Linq2Db.Benchmark.DockerTests.csproj | 8 ++--- .../DockerLinq2DbSqlServerJournalPerfSpec.cs | 1 + ...Persistence.Linq2Db.Benchmark.Tests.csproj | 8 ++--- ...e.Linq2Db.Compatibility.DockerTests.csproj | 4 +-- ...istence.Linq2Db.Compatibility.Tests.csproj | 4 +-- .../SqlCommonSnapshotCompatibilitySpec.cs | 4 +++ ...istence.Linq2Db.Journal.Query.Tests.csproj | 8 ++--- ...Persistence.Sql.Linq2Db.DockerTests.csproj | 14 ++++---- .../Akka.Persistence.Sql.Linq2Db.Tests.csproj | 14 ++++---- .../Akka.Persistence.Sql.Linq2Db.csproj | 10 +++--- .../Journal/DAO/BaseByteArrayJournalDao.cs | 3 +- .../Journal/Linq2DbWriteJournal.cs | 34 +++++++++---------- .../Util/NoThrowAwaiter.cs | 20 +++++++++++ 20 files changed, 143 insertions(+), 61 deletions(-) create mode 100644 Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests.csproj create mode 100644 Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/PostgreSQLSpecsFixture.cs rename {src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker => Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests}/SqlCommon/DockerBatchingSqlServerJournalPerfSpec.cs (93%) rename {src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker => Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests}/SqlCommon/DockerPostgreSQLJournalPerfSpec.cs (93%) rename {src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker => Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests}/SqlCommon/DockerSqlServerJournalPerfSpec.cs (93%) create mode 100644 Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlServerSpecsFixture.cs create mode 100644 src/Akka.Persistence.Sql.Linq2Db/Util/NoThrowAwaiter.cs diff --git a/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests.csproj b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests.csproj new file mode 100644 index 00000000..0b02836b --- /dev/null +++ b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests.csproj @@ -0,0 +1,31 @@ + + + + netcoreapp3.1 + + false + + + + false + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + + diff --git a/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/PostgreSQLSpecsFixture.cs b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/PostgreSQLSpecsFixture.cs new file mode 100644 index 00000000..71e39ccf --- /dev/null +++ b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/PostgreSQLSpecsFixture.cs @@ -0,0 +1,11 @@ +using Akka.Persistence.Sql.Linq2Db.Tests.Docker; +using Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker; +using Xunit; + +namespace Akka.Persistence.Linq2Db.BenchmarkTests.Docker.Linq2Db +{ + [CollectionDefinition("PostgreSQLSpec")] + public sealed class PostgreSQLSpecsFixture : ICollectionFixture + { + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/SqlCommon/DockerBatchingSqlServerJournalPerfSpec.cs b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerBatchingSqlServerJournalPerfSpec.cs similarity index 93% rename from src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/SqlCommon/DockerBatchingSqlServerJournalPerfSpec.cs rename to Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerBatchingSqlServerJournalPerfSpec.cs index 376eff29..e7072e06 100644 --- a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/SqlCommon/DockerBatchingSqlServerJournalPerfSpec.cs +++ b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerBatchingSqlServerJournalPerfSpec.cs @@ -1,10 +1,10 @@ using Akka.Configuration; -using Akka.Persistence.Sql.Linq2Db.Tests.Docker; +using Akka.Persistence.Linq2Db.BenchmarkTests; using Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker; using Xunit; using Xunit.Abstractions; -namespace Akka.Persistence.Linq2Db.BenchmarkTests.Docker.SqlCommon +namespace Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests.SqlCommon { [Collection("SqlServerSpec")] public class DockerBatchingSqlServerJournalPerfSpec : L2dbJournalPerfSpec diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/SqlCommon/DockerPostgreSQLJournalPerfSpec.cs b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerPostgreSQLJournalPerfSpec.cs similarity index 93% rename from src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/SqlCommon/DockerPostgreSQLJournalPerfSpec.cs rename to Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerPostgreSQLJournalPerfSpec.cs index b0a17968..95183d4e 100644 --- a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/SqlCommon/DockerPostgreSQLJournalPerfSpec.cs +++ b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerPostgreSQLJournalPerfSpec.cs @@ -1,10 +1,10 @@ using Akka.Configuration; -using Akka.Persistence.Sql.Linq2Db.Tests.Docker; +using Akka.Persistence.Linq2Db.BenchmarkTests; using Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker; using Xunit; using Xunit.Abstractions; -namespace Akka.Persistence.Linq2Db.BenchmarkTests.Docker.SqlCommon +namespace Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests.SqlCommon { [Collection("PostgreSQLSpec")] public class DockerPostgreSQLJournalPerfSpec : L2dbJournalPerfSpec diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/SqlCommon/DockerSqlServerJournalPerfSpec.cs b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerSqlServerJournalPerfSpec.cs similarity index 93% rename from src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/SqlCommon/DockerSqlServerJournalPerfSpec.cs rename to Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerSqlServerJournalPerfSpec.cs index c2fd1d62..b1f72492 100644 --- a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/SqlCommon/DockerSqlServerJournalPerfSpec.cs +++ b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerSqlServerJournalPerfSpec.cs @@ -1,10 +1,10 @@ using Akka.Configuration; -using Akka.Persistence.Sql.Linq2Db.Tests.Docker; +using Akka.Persistence.Linq2Db.BenchmarkTests; using Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker; using Xunit; using Xunit.Abstractions; -namespace Akka.Persistence.Linq2Db.BenchmarkTests.Docker.SqlCommon +namespace Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests.SqlCommon { [Collection("SqlServerSpec")] public class DockerSqlServerJournalPerfSpec : L2dbJournalPerfSpec diff --git a/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlServerSpecsFixture.cs b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlServerSpecsFixture.cs new file mode 100644 index 00000000..6f2bf65a --- /dev/null +++ b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlServerSpecsFixture.cs @@ -0,0 +1,11 @@ +using Akka.Persistence.Sql.Linq2Db.Tests.Docker; +using Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker; +using Xunit; + +namespace Akka.Persistence.Linq2Db.BenchmarkTests.Docker.Linq2Db +{ + [CollectionDefinition("SqlServerSpec")] + public sealed class SqlServerSpecsFixture : ICollectionFixture + { + } +} \ No newline at end of file diff --git a/Akka.Persistence.Linq2Db.sln b/Akka.Persistence.Linq2Db.sln index 770be733..f84ac72e 100644 --- a/Akka.Persistence.Linq2Db.sln +++ b/Akka.Persistence.Linq2Db.sln @@ -31,8 +31,11 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "_", "_", "{20C26B2D-59EA-42 ProjectSection(SolutionItems) = preProject src\common.props = src\common.props README.md = README.md + RELEASE_NOTES.md = RELEASE_NOTES.md EndProjectSection EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests", "Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests\Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests.csproj", "{170698FA-DA1E-40BC-896D-AFA67976C0EB}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -71,6 +74,10 @@ Global {095E392C-C439-4EBC-ABCB-6AA87FCBD9E9}.Debug|Any CPU.Build.0 = Debug|Any CPU {095E392C-C439-4EBC-ABCB-6AA87FCBD9E9}.Release|Any CPU.ActiveCfg = Release|Any CPU {095E392C-C439-4EBC-ABCB-6AA87FCBD9E9}.Release|Any CPU.Build.0 = Release|Any CPU + {170698FA-DA1E-40BC-896D-AFA67976C0EB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {170698FA-DA1E-40BC-896D-AFA67976C0EB}.Debug|Any CPU.Build.0 = Debug|Any CPU + {170698FA-DA1E-40BC-896D-AFA67976C0EB}.Release|Any CPU.ActiveCfg = Release|Any CPU + {170698FA-DA1E-40BC-896D-AFA67976C0EB}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Akka.Persistence.Linq2Db.Benchmark.DockerTests.csproj b/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Akka.Persistence.Linq2Db.Benchmark.DockerTests.csproj index 24982456..74ee4282 100644 --- a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Akka.Persistence.Linq2Db.Benchmark.DockerTests.csproj +++ b/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Akka.Persistence.Linq2Db.Benchmark.DockerTests.csproj @@ -13,12 +13,12 @@ - - - + + + - + diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/Linq2Db/DockerLinq2DbSqlServerJournalPerfSpec.cs b/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/Linq2Db/DockerLinq2DbSqlServerJournalPerfSpec.cs index 3c5eb7eb..daf873c5 100644 --- a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/Linq2Db/DockerLinq2DbSqlServerJournalPerfSpec.cs +++ b/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/Linq2Db/DockerLinq2DbSqlServerJournalPerfSpec.cs @@ -33,6 +33,7 @@ class = ""{0}"" #connection-string = ""FullUri=file:test.db&cache=shared"" provider-name = """ + LinqToDB.ProviderName.SqlServer2017 + @""" use-clone-connection = true + #prefer-parameters-on-multirow-insert = true tables.journal {{ auto-init = true warn-on-auto-init-fail = false diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.Tests/Akka.Persistence.Linq2Db.Benchmark.Tests.csproj b/src/Akka.Persistence.Linq2Db.Benchmark.Tests/Akka.Persistence.Linq2Db.Benchmark.Tests.csproj index fe771662..e74918f5 100644 --- a/src/Akka.Persistence.Linq2Db.Benchmark.Tests/Akka.Persistence.Linq2Db.Benchmark.Tests.csproj +++ b/src/Akka.Persistence.Linq2Db.Benchmark.Tests/Akka.Persistence.Linq2Db.Benchmark.Tests.csproj @@ -16,12 +16,12 @@ - - - + + + - + diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Akka.Persistence.Linq2Db.Compatibility.DockerTests.csproj b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Akka.Persistence.Linq2Db.Compatibility.DockerTests.csproj index 61f8693a..342de0a8 100644 --- a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Akka.Persistence.Linq2Db.Compatibility.DockerTests.csproj +++ b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Akka.Persistence.Linq2Db.Compatibility.DockerTests.csproj @@ -12,8 +12,8 @@ - - + + diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Akka.Persistence.Linq2Db.Compatibility.Tests.csproj b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Akka.Persistence.Linq2Db.Compatibility.Tests.csproj index a9124902..249d0002 100644 --- a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Akka.Persistence.Linq2Db.Compatibility.Tests.csproj +++ b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Akka.Persistence.Linq2Db.Compatibility.Tests.csproj @@ -9,8 +9,8 @@ - - + + diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonSnapshotCompatibilitySpec.cs b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonSnapshotCompatibilitySpec.cs index b4726c7c..77584e8b 100644 --- a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonSnapshotCompatibilitySpec.cs +++ b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonSnapshotCompatibilitySpec.cs @@ -44,6 +44,7 @@ public async Task Can_Recover_SqlCommon_Snapshot() Assert.True(persistRef.Ask(new ContainsEvent(){Guid = ourGuid}, TimeSpan.FromSeconds(5)).Result); await Task.Delay(TimeSpan.FromSeconds(2)); await persistRef.GracefulStop(TimeSpan.FromSeconds(5)); + await Task.Delay(TimeSpan.FromSeconds(2)); persistRef = sys1.ActorOf(Props.Create(() => new SnapshotCompatActor(NewSnapshot, "p-1")), "test-snap-recover-1"); @@ -63,6 +64,7 @@ public async Task Can_Persist_SqlCommon_Snapshot() Assert.True(persistRef.Ask(new ContainsEvent(){Guid = ourGuid}, TimeSpan.FromSeconds(5)).Result); await Task.Delay(TimeSpan.FromSeconds(2)); await persistRef.GracefulStop(TimeSpan.FromSeconds(5)); + await Task.Delay(TimeSpan.FromSeconds(2)); persistRef = sys1.ActorOf(Props.Create(() => new SnapshotCompatActor(NewSnapshot, "p-2")), "test-snap-persist-1"); @@ -86,6 +88,7 @@ public async Task SqlCommon_Snapshot_Can_Recover_L2Db_Snapshot() Assert.True(persistRef.Ask(new ContainsEvent(){Guid = ourGuid}, TimeSpan.FromSeconds(5)).Result); await Task.Delay(TimeSpan.FromSeconds(2)); await persistRef.GracefulStop(TimeSpan.FromSeconds(5)); + await Task.Delay(TimeSpan.FromSeconds(2)); persistRef = sys1.ActorOf(Props.Create(() => new SnapshotCompatActor(OldSnapshot, "p-3")), "test-snap-recover-2"); @@ -105,6 +108,7 @@ public async Task SqlCommon_Snapshot_Can_Persist_L2db_Snapshot() Assert.True(persistRef.Ask(new ContainsEvent(){Guid = ourGuid}, TimeSpan.FromSeconds(5)).Result); await Task.Delay(TimeSpan.FromSeconds(2)); await persistRef.GracefulStop(TimeSpan.FromSeconds(5)); + await Task.Delay(TimeSpan.FromSeconds(2)); persistRef = sys1.ActorOf(Props.Create(() => new SnapshotCompatActor(OldSnapshot, "p-4")), "test-snap-persist-2"); diff --git a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/Akka.Persistence.Linq2Db.Journal.Query.Tests.csproj b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/Akka.Persistence.Linq2Db.Journal.Query.Tests.csproj index 39c48968..b2c6f4b5 100644 --- a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/Akka.Persistence.Linq2Db.Journal.Query.Tests.csproj +++ b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/Akka.Persistence.Linq2Db.Journal.Query.Tests.csproj @@ -8,10 +8,10 @@ - - - - + + + + diff --git a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Akka.Persistence.Sql.Linq2Db.DockerTests.csproj b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Akka.Persistence.Sql.Linq2Db.DockerTests.csproj index 5e0ec9d1..1ff734b6 100644 --- a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Akka.Persistence.Sql.Linq2Db.DockerTests.csproj +++ b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Akka.Persistence.Sql.Linq2Db.DockerTests.csproj @@ -12,16 +12,16 @@ - - - - - - + + + + + + - + diff --git a/src/Akka.Persistence.Sql.Linq2Db.Tests/Akka.Persistence.Sql.Linq2Db.Tests.csproj b/src/Akka.Persistence.Sql.Linq2Db.Tests/Akka.Persistence.Sql.Linq2Db.Tests.csproj index 7369e828..904f2731 100644 --- a/src/Akka.Persistence.Sql.Linq2Db.Tests/Akka.Persistence.Sql.Linq2Db.Tests.csproj +++ b/src/Akka.Persistence.Sql.Linq2Db.Tests/Akka.Persistence.Sql.Linq2Db.Tests.csproj @@ -11,16 +11,16 @@ - - - - - - + + + + + + - + diff --git a/src/Akka.Persistence.Sql.Linq2Db/Akka.Persistence.Sql.Linq2Db.csproj b/src/Akka.Persistence.Sql.Linq2Db/Akka.Persistence.Sql.Linq2Db.csproj index 25ab378a..e6a1c1b0 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Akka.Persistence.Sql.Linq2Db.csproj +++ b/src/Akka.Persistence.Sql.Linq2Db/Akka.Persistence.Sql.Linq2Db.csproj @@ -9,12 +9,12 @@ - - - - + + + + - + diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs index 0ef35926..d80fc3bc 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs @@ -287,8 +287,7 @@ await db.GetTable() }, jmd => new JournalMetaData() { - PersistenceId = persistenceId, - SequenceNumber = maxMarkedDeletion + }, () => new JournalMetaData() { diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs index 7093bf17..d35a87d9 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs @@ -1,6 +1,8 @@ using System; using System.Collections.Generic; using System.Collections.Immutable; +using System.Linq; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using Akka.Actor; @@ -14,11 +16,12 @@ using Akka.Persistence.Sql.Linq2Db.Utility; using Akka.Streams; using Akka.Streams.Dsl; -using Akka.Util; using Akka.Util.Internal; +using LanguageExt; namespace Akka.Persistence.Sql.Linq2Db.Journal { + public class DateTimeHelpers { private static DateTime UnixEpoch = new DateTime(1970,1,1,0,0,0,DateTimeKind.Utc); @@ -102,14 +105,18 @@ protected override bool ReceivePluginInternal(object message) { if (message is WriteFinished wf) { - writeInProgress.Remove(wf.PersistenceId); + + if (writeInProgress.TryGetValue(wf.PersistenceId, + out Task latestPending) & latestPending == wf.Future) + { + writeInProgress.Remove(wf.PersistenceId); + } + return true; } else { return false; } - - return true; } public override void AroundPreRestart(Exception cause, object message) @@ -125,7 +132,7 @@ public override async Task ReplayMessagesAsync(IActorContext context, string per { await _journal.MessagesWithBatch(persistenceId, fromSequenceNr, toSequenceNr, _journalConfig.DaoConfig.ReplayBatchSize, - Option<(TimeSpan, IScheduler)>.None) + Util.Option<(TimeSpan, IScheduler)>.None) .Take(max).SelectAsync(1, t => t.IsSuccess ? Task.FromResult(t.Success.Value) @@ -142,18 +149,9 @@ public override async Task ReadHighestSequenceNrAsync(string persistenceId { if (writeInProgress.ContainsKey(persistenceId)) { - try - { - await writeInProgress[persistenceId]; - } - catch (Exception) - { - //We don't have 'Recover' in C# so this is intentionally empty - //Basically we just wanted to wait for write to succeed OR fail - } - var hsn =await _journal.HighestSequenceNr(persistenceId, - fromSequenceNr); - return hsn; + //We don't care whether the write succeeded or failed + //We just want it to finish. + await new NoThrowAwaiter(writeInProgress[persistenceId]); } return await _journal.HighestSequenceNr(persistenceId, fromSequenceNr); } @@ -167,7 +165,7 @@ protected override async Task> var persistenceId = messages.Head().PersistenceId; var future = _journal.AsyncWriteMessages(messages,currentTime); - writeInProgress.AddOrSet(persistenceId, future); + writeInProgress[persistenceId] = future; var self = Self; //When we are done, we want to send a 'WriteFinished' so that diff --git a/src/Akka.Persistence.Sql.Linq2Db/Util/NoThrowAwaiter.cs b/src/Akka.Persistence.Sql.Linq2Db/Util/NoThrowAwaiter.cs new file mode 100644 index 00000000..f62243dd --- /dev/null +++ b/src/Akka.Persistence.Sql.Linq2Db/Util/NoThrowAwaiter.cs @@ -0,0 +1,20 @@ +using System; +using System.Runtime.CompilerServices; +using System.Threading.Tasks; + +namespace Akka.Persistence.Sql.Linq2Db.Utility +{ + /// + /// An awaiter that never throws. + /// + internal struct NoThrowAwaiter : ICriticalNotifyCompletion + { + private readonly Task _task; + public NoThrowAwaiter(Task task) { _task = task; } + public NoThrowAwaiter GetAwaiter() => this; + public bool IsCompleted => _task.IsCompleted; + public void GetResult() { } + public void OnCompleted(Action continuation) => _task.GetAwaiter().OnCompleted(continuation); + public void UnsafeOnCompleted(Action continuation) => OnCompleted(continuation); + } +} \ No newline at end of file From 1432e22e8e2ad554186dbfec9c29ef251a4cde5f Mon Sep 17 00:00:00 2001 From: Drew Date: Mon, 5 Jul 2021 17:52:48 -0400 Subject: [PATCH 4/5] update readme.md (#26) --- README.md | 32 ++++++++++++++++--- src/Akka.Persistence.Sql.Linq2Db/Readme.MD | 30 +++++++++++++++-- .../persistence.conf | 2 +- 3 files changed, 55 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 033c41e3..dc2bb158 100644 --- a/README.md +++ b/README.md @@ -119,19 +119,21 @@ Updated Performance numbers pending. ### Journal: -Please note that you -must- provide a Connection String and Provider name. +Please note that you -must- provide a Connection String (`connection-string`) and Provider name (`provider-name`). - Refer to the Members of `LinqToDb.ProviderName` for included providers. - Note: For best performance, one should use the most specific provider name possible. i.e. `LinqToDB.ProviderName.SqlServer2012` instead of `LinqToDB.ProviderName.SqlServer`. Otherwise certain provider detections have to run more frequently which may impair performance slightly. - `parallelism` controls the number of Akka.Streams Queues used to write to the DB. - - Default in JVM is `8`. We use `2` - - For SQL Server, Based on testing `2` is a fairly optimal number in .NET and thusly chosen as the default. You may wish to adjust up if you are dealing with a large number of actors. + - Default in JVM is `8`. We use `3` + - For SQL Server, Based on testing `3` is a fairly optimal number in .NET and thusly chosen as the default. You may wish to adjust up if you are dealing with a large number of actors. - Testing indicates that `2` will provide performance on par or better than both batching and non-batching journal. - For SQLite, you may want to just put `1` here, because SQLite allows at most a single writer at a time even in WAL mode. - Keep in mind there may be some latency/throughput trade-offs if your write-set gets large. - - Note that these run on the threadpool, not on dedicated threads. Setting this number too high may steal work from other actors. - - It's worth noting that LinqToDb's Bulk Copy implementations are -very- efficient here, since under many DBs the batch can be done in a single round-trip. + - Note that unless `materializer-dispatcher` is changed, by default these run on the threadpool, not on dedicated threads. Setting this number too high may steal work from other actors. + - It's worth noting that LinqToDb's Bulk Copy implementations are very efficient here, since under many DBs the batch can be done in a single async round-trip. +- `materializer-dispatcher` may be used to change the dispatcher that the Akka.Streams Queues use for scheduling. + - You can define a different dispatcher here if worried about stealing from the thread-pool, for instance a Dedicated thread-pool dispatcher. - `logical-delete` if `true` will only set the deleted flag for items, i.e. will not actually delete records from DB. - if `false` all records are set as deleted, and then all but the top record is deleted. This top record is used for sequence number tracking in case no other records exist in the table. - `delete-compatibility-mode` specifies to perform deletes in a way that is compatible with Akka.Persistence.Sql.Common. @@ -140,6 +142,14 @@ Please note that you -must- provide a Connection String and Provider name. - `use-clone-connection` is a bit of a hack. Currently Linq2Db has a performance penalty for custom mapping schemas. Cloning the connection is faster but may not work for all scenarios. - tl;dr - If a password or similar is in the connection string, leave `use-clone-connection` set to `false`. - If you don't have a password or similar, run some tests with it set to `true`. You'll see improved write and read performance. +- Batching options: + - `batch-size` controls the maximum size of the batch used in the Akka.Streams Batch. A single batch is written to the DB in a transaction, with 1 or more round trips. + - If more than `batch-size` is in a single `AtomicWrite`, That atomic write will still be atomic, just treated as it's own batch. + - `db-round-trip-max-batch-size` tries to hint to Linq2Db multirow insert the maximum number of rows to send in a round-trip to the DB. + - multiple round-trips will still be contained in a single transaction. + - You will want to Keep this number higher than `batch-size`, if you are persisting lots of events with `PersistAll/(Async)`. + - `prefer-parameters-on-multirow-insert` controls whether Linq2Db will try to use parameters instead of building raw strings for inserts. + - Linq2Db is incredibly speed and memory efficent at building binary strings. In most cases, this will be faster than the cost of parsing/marshalling parameters by ADO and the DB. - For Table Configuration: - Note that Tables/Columns will be created with the casing provided, and selected in the same way (i.e. if using a DB with case sensitive columns, be careful!) @@ -188,6 +198,18 @@ akka.persistence { #larger batches may be better if you have A fast/local DB. batch-size = 100 + #This batch size controls the maximum number of rows that will be sent + #In a single round trip to the DB. This is different than the -actual- batch size, + #And intentionally set larger than batch-size, + #to help atomicwrites be faster + #Note that Linq2Db may use a lower number per round-trip in some cases. + db-round-trip-max-batch-size = 1000 + + #Linq2Db by default will use a built string for multi-row inserts + #Somewhat counterintuitively, this is faster than using parameters in most cases, + #But if you would prefer parameters, you can set this to true. + prefer-parameters-on-multirow-insert = true + # Denotes the number of messages retrieved # Per round-trip to DB on recovery. # This is to limit both size of dataset from DB (possibly lowering locking requirements) diff --git a/src/Akka.Persistence.Sql.Linq2Db/Readme.MD b/src/Akka.Persistence.Sql.Linq2Db/Readme.MD index 58891cee..fe826cff 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Readme.MD +++ b/src/Akka.Persistence.Sql.Linq2Db/Readme.MD @@ -149,8 +149,10 @@ Please note that you -must- provide a Connection String and Provider name. - Testing indicates that `2` will provide performance on par or better than both batching and non-batching journal. - For SQLite, you may want to just put `1` here, because SQLite allows at most a single writer at a time even in WAL mode. - Keep in mind there may be some latency/throughput trade-offs if your write-set gets large. - - Note that these run on the threadpool, not on dedicated threads. Setting this number too high may steal work from other actors. - - It's worth noting that LinqToDb's Bulk Copy implementations are -very- efficient here, since under many DBs the batch can be done in a single async round-trip. + - Note that unless `materializer-dispatcher` is changed, by default these run on the threadpool, not on dedicated threads. Setting this number too high may steal work from other actors. + - It's worth noting that LinqToDb's Bulk Copy implementations are very efficient here, since under many DBs the batch can be done in a single async round-trip. + - `materializer-dispatcher` may be used to change the dispatcher that the Akka.Streams Queues use for scheduling. + - You can define a different dispatcher here if worried about stealing from the thread-pool, for instance a Dedicated thread-pool dispatcher. - `logical-delete` if `true` will only set the deleted flag for items, i.e. will not actually delete records from DB. - if `false` all records are set as deleted, and then all but the top record is deleted. This top record is used for sequence number tracking in case no other records exist in the table. - `delete-compatibility-mode` specifies to perform deletes in a way that is compatible with Akka.Persistence.Sql.Common. @@ -159,6 +161,16 @@ Please note that you -must- provide a Connection String and Provider name. - `use-clone-connection` is a bit of a hack. Currently Linq2Db has a performance penalty for custom mapping schemas. Cloning the connection is faster but may not work for all scenarios. - tl;dr - If a password or similar is in the connection string, leave `use-clone-connection` set to `false`. - If you don't have a password or similar, run some tests with it set to `true`. You'll see improved write and read performance. + - Batching options: + - `batch-size` controls the maximum size of the batch used in the Akka.Streams Batch. A single batch is written to the DB in a transaction, with 1 or more round trips. + - If more than `batch-size` is in a single `AtomicWrite`, That atomic write will still be atomic, just treated as it's own batch. + - `db-round-trip-max-batch-size` tries to hint to Linq2Db multirow insert the maximum number of rows to send in a round-trip to the DB. + - multiple round-trips will still be contained in a single transaction. + - You will want to Keep this number higher than `batch-size`, if you are persisting lots of events with `PersistAll/(Async)`. + - `prefer-parameters-on-multirow-insert` controls whether Linq2Db will try to use parameters instead of building raw strings for inserts. + - Linq2Db is incredibly speed and memory efficent at building binary strings. In most cases, this will be faster than the cost of parsing/marshalling parameters by ADO and the DB. + + - For Table Configuration: - Note that Tables/Columns will be created with the casing provided, and selected in the same way (i.e. if using a DB with case sensitive columns, be careful!) @@ -206,12 +218,24 @@ akka.persistence { #For that penalty. buffer-size = 5000 - #Batch size refers to the number of items included in a batch to DB + #Batch size refers to the maximum number of items included in a batch (transaction) to DB #JDBC Default is/was 400 but testing against scenarios indicates #100 is better for overall latency. That said, #larger batches may be better if you have A fast/local DB. batch-size = 100 + #This batch size controls the maximum number of rows that will be sent + #In a single round trip to the DB. This is different than the -actual- batch size, + #And intentionally set larger than batch-size, + #to help atomicwrites be faster + #Note that Linq2Db may use a lower number per round-trip in some cases. + db-round-trip-max-batch-size = 1000 + + #Linq2Db by default will use a built string for multi-row inserts + #Somewhat counterintuitively, this is faster than using parameters in most cases, + #But if you would prefer parameters, you can set this to true. + prefer-parameters-on-multirow-insert = true + # Denotes the number of messages retrieved # Per round-trip to DB on recovery. # This is to limit both size of dataset from DB (possibly lowering locking requirements) diff --git a/src/Akka.Persistence.Sql.Linq2Db/persistence.conf b/src/Akka.Persistence.Sql.Linq2Db/persistence.conf index b0000294..988b0b72 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/persistence.conf +++ b/src/Akka.Persistence.Sql.Linq2Db/persistence.conf @@ -23,7 +23,7 @@ # note that there is a performance penalty for using this. delete-compatibility-mode = true - # If "sqlite" or "sqlserver", default column names are compatible with + # If "sqlite", "sqlserver", or "postgres", default column names are compatible with # Akka.Persistence.Sql Default Column names. table-compatibility-mode = null From 644f270056ff97a5e511020abc9e6f6a561bcccc Mon Sep 17 00:00:00 2001 From: Drew Date: Mon, 5 Jul 2021 18:03:09 -0400 Subject: [PATCH 5/5] release notes (#27) Co-authored-by: Aaron Stannard --- RELEASE_NOTES.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 6c85f6d8..11a65b86 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,3 +1,20 @@ +#### 1.4.21 July 6 2021 #### +**First official Release for Akka.Persistence.Linq2Db** + +Akka.Persistence.Linq2Db is an Akka.Net Persistence plug-in that is designed for both high performance as well as easy cross-database compatibility. + +There is a compatibility mode also available for those who wish to migrate from the existing Sql.Common journals. + +This release contains fixes for Transactions around batched writes, a fix for Sequence Number reading on Aggressive PersistAsync Usage, improved serialization/deserialization pipelines for improved write speed, and easier snapshot compatibility with Akka.Persistence.Sql.Common. + +We are still looking for community help with adding tests/configurations for MySql and Oracle, as well as trying out the new plugin and [providing feedback](https://github.com/akkadotnet/Akka.Persistence.Linq2Db/issues). + +[Please refer to the project page](https://github.com/akkadotnet/Akka.Persistence.Linq2Db/) for information on configuration. + + + + + #### 0.90.1 Feb 3 2021 #### **Preview Release for Akka.Persistence.Linq2Db**