diff --git a/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests.csproj b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests.csproj
new file mode 100644
index 00000000..0b02836b
--- /dev/null
+++ b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests.csproj
@@ -0,0 +1,31 @@
+
+
+
+ netcoreapp3.1
+
+ false
+
+
+
+ false
+
+
+
+
+
+
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+ all
+
+
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+ all
+
+
+
+
+
+
+
+
+
diff --git a/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/PostgreSQLSpecsFixture.cs b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/PostgreSQLSpecsFixture.cs
new file mode 100644
index 00000000..71e39ccf
--- /dev/null
+++ b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/PostgreSQLSpecsFixture.cs
@@ -0,0 +1,11 @@
+using Akka.Persistence.Sql.Linq2Db.Tests.Docker;
+using Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker;
+using Xunit;
+
+namespace Akka.Persistence.Linq2Db.BenchmarkTests.Docker.Linq2Db
+{
+ [CollectionDefinition("PostgreSQLSpec")]
+ public sealed class PostgreSQLSpecsFixture : ICollectionFixture
+ {
+ }
+}
\ No newline at end of file
diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/SqlCommon/DockerBatchingSqlServerJournalPerfSpec.cs b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerBatchingSqlServerJournalPerfSpec.cs
similarity index 93%
rename from src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/SqlCommon/DockerBatchingSqlServerJournalPerfSpec.cs
rename to Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerBatchingSqlServerJournalPerfSpec.cs
index 376eff29..e7072e06 100644
--- a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/SqlCommon/DockerBatchingSqlServerJournalPerfSpec.cs
+++ b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerBatchingSqlServerJournalPerfSpec.cs
@@ -1,10 +1,10 @@
using Akka.Configuration;
-using Akka.Persistence.Sql.Linq2Db.Tests.Docker;
+using Akka.Persistence.Linq2Db.BenchmarkTests;
using Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker;
using Xunit;
using Xunit.Abstractions;
-namespace Akka.Persistence.Linq2Db.BenchmarkTests.Docker.SqlCommon
+namespace Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests.SqlCommon
{
[Collection("SqlServerSpec")]
public class DockerBatchingSqlServerJournalPerfSpec : L2dbJournalPerfSpec
diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/SqlCommon/DockerPostgreSQLJournalPerfSpec.cs b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerPostgreSQLJournalPerfSpec.cs
similarity index 93%
rename from src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/SqlCommon/DockerPostgreSQLJournalPerfSpec.cs
rename to Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerPostgreSQLJournalPerfSpec.cs
index b0a17968..95183d4e 100644
--- a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/SqlCommon/DockerPostgreSQLJournalPerfSpec.cs
+++ b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerPostgreSQLJournalPerfSpec.cs
@@ -1,10 +1,10 @@
using Akka.Configuration;
-using Akka.Persistence.Sql.Linq2Db.Tests.Docker;
+using Akka.Persistence.Linq2Db.BenchmarkTests;
using Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker;
using Xunit;
using Xunit.Abstractions;
-namespace Akka.Persistence.Linq2Db.BenchmarkTests.Docker.SqlCommon
+namespace Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests.SqlCommon
{
[Collection("PostgreSQLSpec")]
public class DockerPostgreSQLJournalPerfSpec : L2dbJournalPerfSpec
diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/SqlCommon/DockerSqlServerJournalPerfSpec.cs b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerSqlServerJournalPerfSpec.cs
similarity index 93%
rename from src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/SqlCommon/DockerSqlServerJournalPerfSpec.cs
rename to Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerSqlServerJournalPerfSpec.cs
index c2fd1d62..b1f72492 100644
--- a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/SqlCommon/DockerSqlServerJournalPerfSpec.cs
+++ b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerSqlServerJournalPerfSpec.cs
@@ -1,10 +1,10 @@
using Akka.Configuration;
-using Akka.Persistence.Sql.Linq2Db.Tests.Docker;
+using Akka.Persistence.Linq2Db.BenchmarkTests;
using Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker;
using Xunit;
using Xunit.Abstractions;
-namespace Akka.Persistence.Linq2Db.BenchmarkTests.Docker.SqlCommon
+namespace Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests.SqlCommon
{
[Collection("SqlServerSpec")]
public class DockerSqlServerJournalPerfSpec : L2dbJournalPerfSpec
diff --git a/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlServerSpecsFixture.cs b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlServerSpecsFixture.cs
new file mode 100644
index 00000000..6f2bf65a
--- /dev/null
+++ b/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlServerSpecsFixture.cs
@@ -0,0 +1,11 @@
+using Akka.Persistence.Sql.Linq2Db.Tests.Docker;
+using Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker;
+using Xunit;
+
+namespace Akka.Persistence.Linq2Db.BenchmarkTests.Docker.Linq2Db
+{
+ [CollectionDefinition("SqlServerSpec")]
+ public sealed class SqlServerSpecsFixture : ICollectionFixture
+ {
+ }
+}
\ No newline at end of file
diff --git a/Akka.Persistence.Linq2Db.sln b/Akka.Persistence.Linq2Db.sln
index 770be733..f84ac72e 100644
--- a/Akka.Persistence.Linq2Db.sln
+++ b/Akka.Persistence.Linq2Db.sln
@@ -31,8 +31,11 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "_", "_", "{20C26B2D-59EA-42
ProjectSection(SolutionItems) = preProject
src\common.props = src\common.props
README.md = README.md
+ RELEASE_NOTES.md = RELEASE_NOTES.md
EndProjectSection
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests", "Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests\Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests.csproj", "{170698FA-DA1E-40BC-896D-AFA67976C0EB}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -71,6 +74,10 @@ Global
{095E392C-C439-4EBC-ABCB-6AA87FCBD9E9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{095E392C-C439-4EBC-ABCB-6AA87FCBD9E9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{095E392C-C439-4EBC-ABCB-6AA87FCBD9E9}.Release|Any CPU.Build.0 = Release|Any CPU
+ {170698FA-DA1E-40BC-896D-AFA67976C0EB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {170698FA-DA1E-40BC-896D-AFA67976C0EB}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {170698FA-DA1E-40BC-896D-AFA67976C0EB}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {170698FA-DA1E-40BC-896D-AFA67976C0EB}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/README.md b/README.md
index b18528e7..dc2bb158 100644
--- a/README.md
+++ b/README.md
@@ -2,13 +2,13 @@
A Cross-SQL-DB Engine Akka.Persistence plugin with broad database compatibility thanks to Linq2Db.
-This is a Fairly-Naive port of the amazing akka-persistence-jdbc package from Scala to C#.
+This is a port of the amazing akka-persistence-jdbc package from Scala, with a few improvements based on C# as well as our choice of data library.
Please read the documentation carefully. Some features may be specific to use case and have trade-offs (namely, compatibility modes)
## Status
-- Usable for basic Read/Writes
+
- Implements the following for `Akka.Persistence.Query`:
- IPersistenceIdsQuery
- ICurrentPersistenceIdsQuery
@@ -20,9 +20,7 @@ Please read the documentation carefully. Some features may be specific to use ca
- ICurrentAllEventsQuery
- Snapshot Store Support
-#### This is still a WORK IN PROGRESS
-
-**Pull Requests are Welcome** but please note this is still considered 'work in progress' and only used if one understands the risks. While the TCK Specs pass you should still test in a 'safe' non-production environment carefully before deciding to fully deploy.
+See something you want to add or improve? **Pull Requests are Welcome!**
Working:
@@ -102,27 +100,8 @@ Compatibility with existing Providers is partially implemented via `table-compat
# Performance
-Tests based on i-7 8750H, 32GB Ram, 2TB SSD, Windows 10 version Version 10.0.19041.630.
-Databases running on Docker WSL2.
-
-All numbers are in msg/sec.
-
-|Test |SqlServer (normal) | SqlServer Batching | Linq2Db |vs Normal| vs Batching|
-|:------------- |:------------- | :----------: | -----------: | -----------: | -----------: |
-|Persist | 164|427| 235|143.29%|55.04%|
-|PersistAll | 782|875| 5609|717.26%|641.03%|
-|PersistAsync | 630|846| 16099|2555.40%|1902.96%|
-|PersistAllAsync | 2095|902| 15681|748.50%|1738.47%|
-|PersistGroup10 | 590|680| 1069|181.19%|157.21%|
-|PersistGroup100 | 607|965| 5537|912.19%|573.78%|
-|PersistGroup200 | 628|1356| 7966|1268.47%|587.46%|
-|PersistGroup25 | 629|675| 2189|348.01%|324.30%|
-|PersistGroup400 | 612|1011| 7237|1182.52%|715.83%|
-|PersistGroup50 | 612|654| 3867|631.86%|591.28%|
-|Recovering | 41903|38766| 42592|101.64%|109.87%|
-|Recovering8 | 75466|65515| 63960|84.75%|97.63%|
-|RecoveringFour | 59259|51355| 58437|98.61%|113.79%|
-|RecoveringTwo | 41745|35512| 41108|98.47%|115.76%|
+Updated Performance numbers pending.
+
## Sql.Common Compatibility modes
- Delete Compatibility mode is available.
@@ -140,19 +119,21 @@ All numbers are in msg/sec.
### Journal:
-Please note that you -must- provide a Connection String and Provider name.
+Please note that you -must- provide a Connection String (`connection-string`) and Provider name (`provider-name`).
- Refer to the Members of `LinqToDb.ProviderName` for included providers.
- Note: For best performance, one should use the most specific provider name possible. i.e. `LinqToDB.ProviderName.SqlServer2012` instead of `LinqToDB.ProviderName.SqlServer`. Otherwise certain provider detections have to run more frequently which may impair performance slightly.
- `parallelism` controls the number of Akka.Streams Queues used to write to the DB.
- - Default in JVM is `8`. We use `2`
- - For SQL Server, Based on testing `2` is a fairly optimal number in .NET and thusly chosen as the default. You may wish to adjust up if you are dealing with a large number of actors.
+ - Default in JVM is `8`. We use `3`
+ - For SQL Server, Based on testing `3` is a fairly optimal number in .NET and thusly chosen as the default. You may wish to adjust up if you are dealing with a large number of actors.
- Testing indicates that `2` will provide performance on par or better than both batching and non-batching journal.
- For SQLite, you may want to just put `1` here, because SQLite allows at most a single writer at a time even in WAL mode.
- Keep in mind there may be some latency/throughput trade-offs if your write-set gets large.
- - Note that these run on the threadpool, not on dedicated threads. Setting this number too high may steal work from other actors.
- - It's worth noting that LinqToDb's Bulk Copy implementations are -very- efficient here, since under many DBs the batch can be done in a single round-trip.
+ - Note that unless `materializer-dispatcher` is changed, by default these run on the threadpool, not on dedicated threads. Setting this number too high may steal work from other actors.
+ - It's worth noting that LinqToDb's Bulk Copy implementations are very efficient here, since under many DBs the batch can be done in a single async round-trip.
+- `materializer-dispatcher` may be used to change the dispatcher that the Akka.Streams Queues use for scheduling.
+ - You can define a different dispatcher here if worried about stealing from the thread-pool, for instance a Dedicated thread-pool dispatcher.
- `logical-delete` if `true` will only set the deleted flag for items, i.e. will not actually delete records from DB.
- if `false` all records are set as deleted, and then all but the top record is deleted. This top record is used for sequence number tracking in case no other records exist in the table.
- `delete-compatibility-mode` specifies to perform deletes in a way that is compatible with Akka.Persistence.Sql.Common.
@@ -161,6 +142,14 @@ Please note that you -must- provide a Connection String and Provider name.
- `use-clone-connection` is a bit of a hack. Currently Linq2Db has a performance penalty for custom mapping schemas. Cloning the connection is faster but may not work for all scenarios.
- tl;dr - If a password or similar is in the connection string, leave `use-clone-connection` set to `false`.
- If you don't have a password or similar, run some tests with it set to `true`. You'll see improved write and read performance.
+- Batching options:
+ - `batch-size` controls the maximum size of the batch used in the Akka.Streams Batch. A single batch is written to the DB in a transaction, with 1 or more round trips.
+ - If more than `batch-size` is in a single `AtomicWrite`, That atomic write will still be atomic, just treated as it's own batch.
+ - `db-round-trip-max-batch-size` tries to hint to Linq2Db multirow insert the maximum number of rows to send in a round-trip to the DB.
+ - multiple round-trips will still be contained in a single transaction.
+ - You will want to Keep this number higher than `batch-size`, if you are persisting lots of events with `PersistAll/(Async)`.
+ - `prefer-parameters-on-multirow-insert` controls whether Linq2Db will try to use parameters instead of building raw strings for inserts.
+ - Linq2Db is incredibly speed and memory efficent at building binary strings. In most cases, this will be faster than the cost of parsing/marshalling parameters by ADO and the DB.
- For Table Configuration:
- Note that Tables/Columns will be created with the casing provided, and selected in the same way (i.e. if using a DB with case sensitive columns, be careful!)
@@ -209,6 +198,18 @@ akka.persistence {
#larger batches may be better if you have A fast/local DB.
batch-size = 100
+ #This batch size controls the maximum number of rows that will be sent
+ #In a single round trip to the DB. This is different than the -actual- batch size,
+ #And intentionally set larger than batch-size,
+ #to help atomicwrites be faster
+ #Note that Linq2Db may use a lower number per round-trip in some cases.
+ db-round-trip-max-batch-size = 1000
+
+ #Linq2Db by default will use a built string for multi-row inserts
+ #Somewhat counterintuitively, this is faster than using parameters in most cases,
+ #But if you would prefer parameters, you can set this to true.
+ prefer-parameters-on-multirow-insert = true
+
# Denotes the number of messages retrieved
# Per round-trip to DB on recovery.
# This is to limit both size of dataset from DB (possibly lowering locking requirements)
diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index 6c85f6d8..11a65b86 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -1,3 +1,20 @@
+#### 1.4.21 July 6 2021 ####
+**First official Release for Akka.Persistence.Linq2Db**
+
+Akka.Persistence.Linq2Db is an Akka.Net Persistence plug-in that is designed for both high performance as well as easy cross-database compatibility.
+
+There is a compatibility mode also available for those who wish to migrate from the existing Sql.Common journals.
+
+This release contains fixes for Transactions around batched writes, a fix for Sequence Number reading on Aggressive PersistAsync Usage, improved serialization/deserialization pipelines for improved write speed, and easier snapshot compatibility with Akka.Persistence.Sql.Common.
+
+We are still looking for community help with adding tests/configurations for MySql and Oracle, as well as trying out the new plugin and [providing feedback](https://github.com/akkadotnet/Akka.Persistence.Linq2Db/issues).
+
+[Please refer to the project page](https://github.com/akkadotnet/Akka.Persistence.Linq2Db/) for information on configuration.
+
+
+
+
+
#### 0.90.1 Feb 3 2021 ####
**Preview Release for Akka.Persistence.Linq2Db**
diff --git a/build-system/windows-release.yaml b/build-system/windows-release.yaml
index d1647f98..439bc99b 100644
--- a/build-system/windows-release.yaml
+++ b/build-system/windows-release.yaml
@@ -10,6 +10,8 @@ trigger:
include:
- refs/tags/*
+pr: none
+
variables:
- group: signingSecrets #create this group with SECRET variables `signingUsername` and `signingPassword`
- group: nugetKeys #create this group with SECRET variables `nugetKey`
diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Akka.Persistence.Linq2Db.Benchmark.DockerTests.csproj b/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Akka.Persistence.Linq2Db.Benchmark.DockerTests.csproj
index 506f3ae7..74ee4282 100644
--- a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Akka.Persistence.Linq2Db.Benchmark.DockerTests.csproj
+++ b/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Akka.Persistence.Linq2Db.Benchmark.DockerTests.csproj
@@ -13,12 +13,12 @@
-
-
-
+
+
+
-
+
diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/Linq2Db/DockerLinq2DbSqlServerJournalPerfSpec.cs b/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/Linq2Db/DockerLinq2DbSqlServerJournalPerfSpec.cs
index 3c5eb7eb..daf873c5 100644
--- a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/Linq2Db/DockerLinq2DbSqlServerJournalPerfSpec.cs
+++ b/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/Linq2Db/DockerLinq2DbSqlServerJournalPerfSpec.cs
@@ -33,6 +33,7 @@ class = ""{0}""
#connection-string = ""FullUri=file:test.db&cache=shared""
provider-name = """ + LinqToDB.ProviderName.SqlServer2017 + @"""
use-clone-connection = true
+ #prefer-parameters-on-multirow-insert = true
tables.journal {{
auto-init = true
warn-on-auto-init-fail = false
diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.Tests/Akka.Persistence.Linq2Db.Benchmark.Tests.csproj b/src/Akka.Persistence.Linq2Db.Benchmark.Tests/Akka.Persistence.Linq2Db.Benchmark.Tests.csproj
index 378a9226..e74918f5 100644
--- a/src/Akka.Persistence.Linq2Db.Benchmark.Tests/Akka.Persistence.Linq2Db.Benchmark.Tests.csproj
+++ b/src/Akka.Persistence.Linq2Db.Benchmark.Tests/Akka.Persistence.Linq2Db.Benchmark.Tests.csproj
@@ -16,12 +16,12 @@
-
-
-
+
+
+
-
+
diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.Tests/DupJournalPerfSpec.cs b/src/Akka.Persistence.Linq2Db.Benchmark.Tests/DupJournalPerfSpec.cs
index be9d7899..8bb31b23 100644
--- a/src/Akka.Persistence.Linq2Db.Benchmark.Tests/DupJournalPerfSpec.cs
+++ b/src/Akka.Persistence.Linq2Db.Benchmark.Tests/DupJournalPerfSpec.cs
@@ -136,6 +136,7 @@ internal void MeasureGroup(Func msg, Action block, int numMsg,
{
var measurements = new List(MeasurementIterations);
+ block();
block(); //warm-up
int i = 0;
@@ -550,7 +551,7 @@ public void PersistenceActor_performance_must_measure_Recovering8()
FeedAndExpectLastSpecific(p6, "p", Commands);
FeedAndExpectLastSpecific(p7, "p", Commands);
FeedAndExpectLastSpecific(p8, "p", Commands);
- MeasureGroup(d => $"Recovering {EventsCount} took {d.TotalMilliseconds} ms", () =>
+ MeasureGroup(d => $"Recovering {EventsCount} took {d.TotalMilliseconds} ms , {(EventsCount*8 / d.TotalMilliseconds) * 1000} total msg/sec", () =>
{
var task1 = Task.Run(()=>
{
diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Akka.Persistence.Linq2Db.Compatibility.DockerTests.csproj b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Akka.Persistence.Linq2Db.Compatibility.DockerTests.csproj
index bbb72821..342de0a8 100644
--- a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Akka.Persistence.Linq2Db.Compatibility.DockerTests.csproj
+++ b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Akka.Persistence.Linq2Db.Compatibility.DockerTests.csproj
@@ -12,8 +12,8 @@
-
-
+
+
diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Akka.Persistence.Linq2Db.Compatibility.Tests.csproj b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Akka.Persistence.Linq2Db.Compatibility.Tests.csproj
index 68626e8d..249d0002 100644
--- a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Akka.Persistence.Linq2Db.Compatibility.Tests.csproj
+++ b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Akka.Persistence.Linq2Db.Compatibility.Tests.csproj
@@ -9,8 +9,8 @@
-
-
+
+
diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonJournalCompatibilitySpec.cs b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonJournalCompatibilitySpec.cs
index 97789cc8..c442f370 100644
--- a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonJournalCompatibilitySpec.cs
+++ b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonJournalCompatibilitySpec.cs
@@ -50,6 +50,7 @@ public async Task Can_Recover_SqlCommon_Journal()
persistRef.Tell(new SomeEvent(){EventName = "rec-test", Guid = ourGuid, Number = 1});
Assert.True(persistRef.Ask(new ContainsEvent(){Guid = ourGuid}, TimeSpan.FromSeconds(5)).Result);
await persistRef.GracefulStop(TimeSpan.FromSeconds(5));
+ await Task.Delay(1000);
persistRef = sys1.ActorOf(Props.Create(() =>
new JournalCompatActor(NewJournal,
"p-1")), "test-recover-1");
@@ -90,6 +91,7 @@ public async Task SqlCommon_Journal_Can_Recover_L2Db_Journal()
persistRef.Tell(new SomeEvent(){EventName = "rec-test", Guid = ourGuid, Number = 1});
Assert.True(persistRef.Ask(new ContainsEvent(){Guid = ourGuid}, TimeSpan.FromSeconds(5)).Result);
await persistRef.GracefulStop(TimeSpan.FromSeconds(5));
+ await Task.Delay(1000);
persistRef = sys1.ActorOf(Props.Create(() =>
new JournalCompatActor(OldJournal,
"p-3")), "test-recover-2");
diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonSnapshotCompatibilitySpec.cs b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonSnapshotCompatibilitySpec.cs
index b4726c7c..77584e8b 100644
--- a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonSnapshotCompatibilitySpec.cs
+++ b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonSnapshotCompatibilitySpec.cs
@@ -44,6 +44,7 @@ public async Task Can_Recover_SqlCommon_Snapshot()
Assert.True(persistRef.Ask(new ContainsEvent(){Guid = ourGuid}, TimeSpan.FromSeconds(5)).Result);
await Task.Delay(TimeSpan.FromSeconds(2));
await persistRef.GracefulStop(TimeSpan.FromSeconds(5));
+ await Task.Delay(TimeSpan.FromSeconds(2));
persistRef = sys1.ActorOf(Props.Create(() =>
new SnapshotCompatActor(NewSnapshot,
"p-1")), "test-snap-recover-1");
@@ -63,6 +64,7 @@ public async Task Can_Persist_SqlCommon_Snapshot()
Assert.True(persistRef.Ask(new ContainsEvent(){Guid = ourGuid}, TimeSpan.FromSeconds(5)).Result);
await Task.Delay(TimeSpan.FromSeconds(2));
await persistRef.GracefulStop(TimeSpan.FromSeconds(5));
+ await Task.Delay(TimeSpan.FromSeconds(2));
persistRef = sys1.ActorOf(Props.Create(() =>
new SnapshotCompatActor(NewSnapshot,
"p-2")), "test-snap-persist-1");
@@ -86,6 +88,7 @@ public async Task SqlCommon_Snapshot_Can_Recover_L2Db_Snapshot()
Assert.True(persistRef.Ask(new ContainsEvent(){Guid = ourGuid}, TimeSpan.FromSeconds(5)).Result);
await Task.Delay(TimeSpan.FromSeconds(2));
await persistRef.GracefulStop(TimeSpan.FromSeconds(5));
+ await Task.Delay(TimeSpan.FromSeconds(2));
persistRef = sys1.ActorOf(Props.Create(() =>
new SnapshotCompatActor(OldSnapshot,
"p-3")), "test-snap-recover-2");
@@ -105,6 +108,7 @@ public async Task SqlCommon_Snapshot_Can_Persist_L2db_Snapshot()
Assert.True(persistRef.Ask(new ContainsEvent(){Guid = ourGuid}, TimeSpan.FromSeconds(5)).Result);
await Task.Delay(TimeSpan.FromSeconds(2));
await persistRef.GracefulStop(TimeSpan.FromSeconds(5));
+ await Task.Delay(TimeSpan.FromSeconds(2));
persistRef = sys1.ActorOf(Props.Create(() =>
new SnapshotCompatActor(OldSnapshot,
"p-4")), "test-snap-persist-2");
diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Sqlite/SQLiteCompatibilitySpecConfig.cs b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Sqlite/SQLiteCompatibilitySpecConfig.cs
index 92d9649d..43a97dfc 100644
--- a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Sqlite/SQLiteCompatibilitySpecConfig.cs
+++ b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Sqlite/SQLiteCompatibilitySpecConfig.cs
@@ -51,12 +51,13 @@ class = ""{typeof(Linq2DbSnapshotStore).AssemblyQualifiedName}""
provider-name = """ + LinqToDB.ProviderName.SQLiteMS + $@"""
#use-clone-connection = true
table-compatibility-mode = sqlite
+ table-name = ""{tablename}""
tables
{{
snapshot {{
auto-init = true
warn-on-auto-init-fail = false
- table-name = ""{tablename}""
+ #table-name = ""{tablename}""
}}
}}
}}
@@ -92,11 +93,12 @@ class = ""{typeof(Linq2DbWriteJournal).AssemblyQualifiedName}""
#connection-string = ""FullUri=file:test.db&cache=shared""
provider-name = ""{LinqToDB.ProviderName.SQLiteMS}""
parallelism = 3
+ table-name = ""{tablename}""
table-compatibility-mode = ""sqlite""
tables.journal {{
auto-init = true
warn-on-auto-init-fail = false
- table-name = ""{tablename}""
+ #table-name = ""{tablename}""
metadata-table-name = ""{metadatatablename}""
}}
diff --git a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/Akka.Persistence.Linq2Db.Journal.Query.Tests.csproj b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/Akka.Persistence.Linq2Db.Journal.Query.Tests.csproj
index 476d40ec..b2c6f4b5 100644
--- a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/Akka.Persistence.Linq2Db.Journal.Query.Tests.csproj
+++ b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/Akka.Persistence.Linq2Db.Journal.Query.Tests.csproj
@@ -8,10 +8,10 @@
-
-
-
-
+
+
+
+
diff --git a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Akka.Persistence.Sql.Linq2Db.DockerTests.csproj b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Akka.Persistence.Sql.Linq2Db.DockerTests.csproj
index fc69ab38..1ff734b6 100644
--- a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Akka.Persistence.Sql.Linq2Db.DockerTests.csproj
+++ b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Akka.Persistence.Sql.Linq2Db.DockerTests.csproj
@@ -12,16 +12,16 @@
-
-
-
-
-
-
+
+
+
+
+
+
-
+
-
+
diff --git a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreSQLFixture.cs b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreSQLFixture.cs
index dd9f6b94..130c5c86 100644
--- a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreSQLFixture.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreSQLFixture.cs
@@ -15,7 +15,8 @@ namespace Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker
///
public class PostgreSQLFixture : IAsyncLifetime
{
- protected readonly string PostgreSqlImageName = $"PostgreSQL-{Guid.NewGuid():N}";
+ protected readonly string PostgresContainerName = $"postgresSqlServer-{Guid.NewGuid():N}";
+ //protected readonly string PostgreSqlImageName = $"PostgreSQL-{Guid.NewGuid():N}";
protected DockerClient Client;
public PostgreSQLFixture()
@@ -31,34 +32,33 @@ public PostgreSQLFixture()
Client = config.CreateClient();
}
- protected string PostgreSQLImageName
- {
- get
- {
- //if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
- // return "microsoft/mssql-server-windows-express";
- return "postgres";
- }
- }
+
+ protected string ImageName => "postgres";
+ protected string Tag => "latest";
- protected string SqlServerImageTag
- {
- get
- {
- //if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
- // return "2017-latest";
- return "latest";
- }
- }
+ protected string PostgresImageName => $"{ImageName}:{Tag}";
public string ConnectionString { get; private set; }
public async Task InitializeAsync()
{
- var images = await Client.Images.ListImagesAsync(new ImagesListParameters {MatchName = PostgreSQLImageName});
+ var images = await Client.Images.ListImagesAsync(new ImagesListParameters
+ {
+ Filters = new Dictionary>
+ {
+ {
+ "reference",
+ new Dictionary
+ {
+ {PostgresImageName, true}
+ }
+ }
+ }
+ });
+
if (images.Count == 0)
await Client.Images.CreateImageAsync(
- new ImagesCreateParameters {FromImage = PostgreSQLImageName, Tag = "latest"}, null,
+ new ImagesCreateParameters { FromImage = ImageName, Tag = Tag }, null,
new Progress(message =>
{
Console.WriteLine(!string.IsNullOrEmpty(message.ErrorMessage)
@@ -66,13 +66,13 @@ await Client.Images.CreateImageAsync(
: $"{message.ID} {message.Status} {message.ProgressMessage}");
}));
- var postgresPort = ThreadLocalRandom.Current.Next(9000, 10000);
+ var sqlServerHostPort = ThreadLocalRandom.Current.Next(9000, 10000);
// create the container
await Client.Containers.CreateContainerAsync(new CreateContainerParameters
{
- Image = PostgreSQLImageName,
- Name = PostgreSqlImageName,
+ Image = PostgresImageName,
+ Name = PostgresContainerName,
Tty = true,
ExposedPorts = new Dictionary
{
@@ -88,37 +88,44 @@ await Client.Containers.CreateContainerAsync(new CreateContainerParameters
{
new PortBinding
{
- HostPort = $"{postgresPort}"
+ HostPort = $"{sqlServerHostPort}"
}
}
}
}
},
- Env = new[] {"POSTGRES_PASSWORD=l0lTh1sIsOpenSource"}
+ Env = new[]
+ {
+ "POSTGRES_PASSWORD=postgres",
+ "POSTGRES_USER=postgres"
+ }
});
// start the container
- await Client.Containers.StartContainerAsync(PostgreSqlImageName, new ContainerStartParameters());
+ await Client.Containers.StartContainerAsync(PostgresContainerName, new ContainerStartParameters());
- // Provide a 30 second startup delay
- await Task.Delay(TimeSpan.FromSeconds(30));
+ // Provide a 10 second startup delay
+ await Task.Delay(TimeSpan.FromSeconds(10));
- var connectionString = new NpgsqlConnectionStringBuilder()
- {
- Host = "localhost", Password = "l0lTh1sIsOpenSource",
- Username = "postgres", Database = "postgres",
- Port = postgresPort
- };
+ ConnectionString = $"Server=127.0.0.1;Port={sqlServerHostPort};" +
+ "Database=postgres;User Id=postgres;Password=postgres";
- ConnectionString = connectionString.ToString();
+ //var connectionString = new NpgsqlConnectionStringBuilder()
+ //{
+ // Host = "localhost", Password = "l0lTh1sIsOpenSource",
+ // Username = "postgres", Database = "postgres",
+ // Port = sqlServerHostPort
+ //};
+ //
+ //ConnectionString = connectionString.ToString();
}
public async Task DisposeAsync()
{
if (Client != null)
{
- await Client.Containers.StopContainerAsync(PostgreSqlImageName, new ContainerStopParameters());
- await Client.Containers.RemoveContainerAsync(PostgreSqlImageName,
+ await Client.Containers.StopContainerAsync(PostgresContainerName, new ContainerStopParameters());
+ await Client.Containers.RemoveContainerAsync(PostgresContainerName,
new ContainerRemoveParameters {Force = true});
Client.Dispose();
}
diff --git a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/SqlServerFixture.cs b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/SqlServerFixture.cs
index 1737bc23..e04f9267 100644
--- a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/SqlServerFixture.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/SqlServerFixture.cs
@@ -31,34 +31,35 @@ public SqlServerFixture()
Client = config.CreateClient();
}
- protected string SqlServerImageName
- {
- get
- {
- //if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
- // return "microsoft/mssql-server-windows-express";
- return "mcr.microsoft.com/mssql/server";
- }
- }
-
- protected string SqlServerImageTag
- {
- get
- {
- //if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
- // return "2017-latest";
- return "2017-latest-ubuntu";
- }
- }
+ protected string ImageName =>
+ //RuntimeInformation.IsOSPlatform(OSPlatform.Windows)
+ // ? "microsoft/mssql-server-windows-express"
+ // :
+ "mcr.microsoft.com/mssql/server";
+ protected string Tag => "2017-latest";
+ protected string SqlServerImageName => $"{ImageName}:{Tag}";
public string ConnectionString { get; private set; }
public async Task InitializeAsync()
{
- var images = await Client.Images.ListImagesAsync(new ImagesListParameters {MatchName = SqlServerImageName});
+ var images = await Client.Images.ListImagesAsync(new ImagesListParameters
+ {
+ Filters = new Dictionary>
+ {
+ {
+ "reference",
+ new Dictionary
+ {
+ {SqlServerImageName, true}
+ }
+ }
+ }
+ });
+
if (images.Count == 0)
await Client.Images.CreateImageAsync(
- new ImagesCreateParameters {FromImage = SqlServerImageName, Tag = "latest"}, null,
+ new ImagesCreateParameters {FromImage = ImageName, Tag = Tag}, null,
new Progress(message =>
{
Console.WriteLine(!string.IsNullOrEmpty(message.ErrorMessage)
@@ -107,7 +108,7 @@ await Client.Containers.CreateContainerAsync(new CreateContainerParameters
var connectionString = new DbConnectionStringBuilder
{
ConnectionString =
- "data source=.;database=akka_persistence_tests;user id=sa;password=l0lTh1sIsOpenSource;"
+ "data source=.;database=akka_persistence_tests;user id=sa;password=l0lTh1sIsOpenSource"
};
connectionString["Data Source"] = $"localhost,{sqlServerHostPort}";
diff --git a/src/Akka.Persistence.Sql.Linq2Db.Tests/Akka.Persistence.Sql.Linq2Db.Tests.csproj b/src/Akka.Persistence.Sql.Linq2Db.Tests/Akka.Persistence.Sql.Linq2Db.Tests.csproj
index 2cacaa68..904f2731 100644
--- a/src/Akka.Persistence.Sql.Linq2Db.Tests/Akka.Persistence.Sql.Linq2Db.Tests.csproj
+++ b/src/Akka.Persistence.Sql.Linq2Db.Tests/Akka.Persistence.Sql.Linq2Db.Tests.csproj
@@ -11,16 +11,16 @@
-
-
-
-
-
-
+
+
+
+
+
+
-
+
-
+
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Akka.Persistence.Sql.Linq2Db.csproj b/src/Akka.Persistence.Sql.Linq2Db/Akka.Persistence.Sql.Linq2Db.csproj
index ea42a61f..e6a1c1b0 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Akka.Persistence.Sql.Linq2Db.csproj
+++ b/src/Akka.Persistence.Sql.Linq2Db/Akka.Persistence.Sql.Linq2Db.csproj
@@ -3,18 +3,18 @@
- 8
+ default
$(LibraryFramework)
An Akka Persistence Module for SQL Databases using Linq2Db.
-
-
-
-
+
+
+
+
-
+
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs
index e3dbce24..0410df3d 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs
@@ -9,6 +9,10 @@ public BaseByteArrayJournalDaoConfig(Configuration.Config config)
BufferSize = config.GetInt("buffer-size", 5000);
BatchSize = config.GetInt("batch-size", 100);
+ DbRoundTripBatchSize = config.GetInt("db-round-trip-max-batch-size", 1000);
+ PreferParametersOnMultiRowInsert =
+ config.GetBoolean("prefer-parameters-on-multirow-insert",
+ false);
ReplayBatchSize = config.GetInt("replay-batch-size", 1000);
Parallelism = config.GetInt("parallelism", 2);
LogicalDelete = config.GetBoolean("logical-delete", false);
@@ -17,6 +21,10 @@ public BaseByteArrayJournalDaoConfig(Configuration.Config config)
config.GetBoolean("delete-compatibility-mode", true);
}
+ public bool PreferParametersOnMultiRowInsert { get; set; }
+
+ public int DbRoundTripBatchSize { get; set; }
+
///
/// Specifies the batch size at which point
/// will switch to 'Default' instead of 'MultipleRows'. For smaller sets
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/SnapshotTableConfiguration.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/SnapshotTableConfiguration.cs
index ac4246ae..1b39ddda 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Config/SnapshotTableConfiguration.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Config/SnapshotTableConfiguration.cs
@@ -9,12 +9,11 @@ public class SnapshotTableConfiguration
{
public SnapshotTableConfiguration(Configuration.Config config)
{
- config =
- config.SafeWithFallback(Linq2DbSnapshotStore
- .DefaultConfiguration);
- var localcfg = config.GetConfig("tables.snapshot");
+
+ var localcfg = config.GetConfig("tables.snapshot")
+ .SafeWithFallback(config).SafeWithFallback(Configuration.Config.Empty);
ColumnNames= new SnapshotTableColumnNames(config);
- TableName = localcfg.GetString("table-name", "snapshot");
+ TableName = config.GetString("table-name", localcfg.GetString("table-name", "snapshot"));
SchemaName = localcfg.GetString("schema-name", null);
AutoInitialize = localcfg.GetBoolean("auto-init", false);
WarnOnAutoInitializeFail =
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs
index 3afe256c..d80fc3bc 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
+using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@@ -37,6 +38,7 @@ public abstract class BaseByteArrayJournalDao :
public bool logicalDelete;
protected readonly ILoggingAdapter _logger;
private Flow, long)>, NotUsed> deserializeFlow;
+ private Flow, NotUsed> deserializeFlowMapped;
protected BaseByteArrayJournalDao(IAdvancedScheduler sched,
IMaterializer materializerr,
@@ -49,6 +51,7 @@ protected BaseByteArrayJournalDao(IAdvancedScheduler sched,
logicalDelete = _journalConfig.DaoConfig.LogicalDelete;
Serializer = serializer;
deserializeFlow = Serializer.DeserializeFlow();
+ deserializeFlowMapped = Serializer.DeserializeFlow().Select(MessageWithBatchMapper());
//Due to C# rules we have to initialize WriteQueue here
//Keeping it here vs init function prevents accidental moving of init
//to where variables aren't set yet.
@@ -132,13 +135,38 @@ private async Task QueueWriteJournalRows(Seq xs)
private async Task WriteJournalRows(Seq xs)
{
- using (var db = _connectionFactory.GetConnection())
{
//hot path:
//If we only have one row, penalty for BulkCopy
//Isn't worth it due to insert caching/etc.
if (xs.Count > 1)
{
+ await InsertMultiple(xs);
+ }
+ else if (xs.Count > 0)
+ {
+ await InsertSingle(xs);
+ }
+ }
+
+ }
+
+ private async Task InsertSingle(Seq xs)
+ {
+ using (var db = _connectionFactory.GetConnection())
+ {
+ await db.InsertAsync(xs.Head);
+ }
+ }
+
+ private async Task InsertMultiple(Seq xs)
+ {
+ using (var db = _connectionFactory.GetConnection())
+ {
+ try
+ {
+ await db.BeginTransactionAsync(IsolationLevel
+ .ReadCommitted);
await db.GetTable()
.BulkCopyAsync(
new BulkCopyOptions()
@@ -148,30 +176,50 @@ await db.GetTable()
.MaxRowByRowSize
? BulkCopyType.Default
: BulkCopyType.MultipleRows,
- UseInternalTransaction = true
+ //TODO: When Parameters are allowed,
+ //Make a Config Option
+ //Or default to true
+ UseParameters = _journalConfig.DaoConfig.PreferParametersOnMultiRowInsert,
+ MaxBatchSize = _journalConfig.DaoConfig.DbRoundTripBatchSize
}, xs);
+ await db.CommitTransactionAsync();
}
- else if (xs.Count > 0)
+ catch (Exception e)
{
- await db.InsertAsync(xs.Head);
+ try
+ {
+ await db.RollbackTransactionAsync();
+ }
+ catch (Exception exception)
+ {
+ throw e;
+ }
+
+ throw;
}
}
-
}
-
+
public async Task> AsyncWriteMessages(
IEnumerable messages, long timeStamp = 0)
{
-
var serializedTries = Serializer.Serialize(messages, timeStamp);
//Just a little bit of magic here;
//.ToList() keeps it all working later for whatever reason
//while still keeping our allocations in check.
+
+ /*var trySet = new List();
+ foreach (var serializedTry in serializedTries)
+ {
+ trySet.AddRange(serializedTry.Success.GetOrElse(new List(0)));
+ }
+
+ var rows = Seq(trySet);*/
var rows = Seq(serializedTries.SelectMany(serializedTry =>
serializedTry.Success.GetOrElse(new List(0)))
.ToList());
-
+ //
return await QueueWriteJournalRows(rows).ContinueWith(task =>
@@ -239,8 +287,7 @@ await db.GetTable()
},
jmd => new JournalMetaData()
{
- PersistenceId = persistenceId,
- SequenceNumber = maxMarkedDeletion
+
},
() => new JournalMetaData()
{
@@ -400,30 +447,27 @@ public override
{
query = query.Take((int) max);
}
-
- var runninng = query.ToListAsync();
- return Source.FromTask(runninng).SelectMany(r => r)
- .Via(deserializeFlow.Select(MessageWithBatchMapper()));
+
+
+ return
+ Source.FromTask(query.ToListAsync())
+ .SelectMany(r => r)
+ .Via(deserializeFlowMapped);
//return AsyncSource.FromEnumerable(query,async q=>await q.ToListAsync())
// .Via(
// deserializeFlow).Select(MessageWithBatchMapper());
}
}
- private static Func, long)>, Util.Try> MessageWithBatchMapper()
- {
- return sertry =>
+ private static Func, long)>, Util.Try> MessageWithBatchMapper() =>
+ sertry =>
{
if (sertry.IsSuccess)
{
- var success = sertry.Success.Value;
return new
Util.Try(
- new ReplayCompletion()
- {
- repr = success.Item1,
- Ordering = success.Item3
- });
+ new ReplayCompletion( sertry.Success.Value
+ ));
}
else
{
@@ -432,6 +476,5 @@ public override
sertry.Failure.Value);
}
};
- }
}
}
\ No newline at end of file
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs
index 2ff59c08..78871b38 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs
@@ -61,7 +61,7 @@ protected BaseJournalDaoWithReadMessages(IAdvancedScheduler ec,
Util.Option lastSeq = Util.Option.None;
if (lastMsg != null && lastMsg.IsSuccess)
{
- lastSeq = lastMsg.Success.Select(r => r.repr.SequenceNr);
+ lastSeq = lastMsg.Success.Select(r => r.Repr.SequenceNr);
}
else if (lastMsg != null && lastMsg.Failure.HasValue)
{
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs
index 16e418b0..21ec4de9 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs
@@ -28,41 +28,67 @@ public ByteArrayJournalSerializer(IProviderConfig journalCon
_separator = separator;
_separatorArray = new[] {_separator};
}
+
+ ///
+ /// Concatenates a set of tags using a provided separator.
+ ///
+ ///
+ ///
+ ///
+ private static string StringSep(IImmutableSet tags,
+ string separator)
+ {
+ if (tags.Count == 0)
+ {
+ return "";
+ }
+
+ return tags.Aggregate((tl, tr) =>
+ tl + separator + tr);
+ }
+
protected override Try Serialize(IPersistentRepresentation persistentRepr, IImmutableSet tTags, long timeStamp = 0)
{
try
{
- var serializer = _serializer.FindSerializerForType(persistentRepr.Payload.GetType(),_journalConfig.DefaultSerializer);
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
- string manifest = "";
- var binary = Akka.Serialization.Serialization.WithTransport(_serializer.System, () =>
- {
-
- if (serializer is SerializerWithStringManifest stringManifest)
- {
- manifest =
- stringManifest.Manifest(persistentRepr.Payload);
- }
- else
- {
- if (serializer.IncludeManifest)
+ return Akka.Serialization.Serialization.WithTransport(
+ _serializer.System, (persistentRepr
+ , _serializer.FindSerializerForType(persistentRepr.Payload.GetType(),_journalConfig.DefaultSerializer),
+ StringSep(tTags,_separator),
+ timeStamp
+ ),
+ state =>
{
- manifest = persistentRepr.Payload.GetType().TypeQualifiedName();
- }
- }
-
- return serializer.ToBinary(persistentRepr.Payload);
- });
- return new Try(new JournalRow()
- {
- manifest = manifest,
- message = binary,
- persistenceId = persistentRepr.PersistenceId,
- tags = tTags.Any()? tTags.Aggregate((tl, tr) => tl + _separator + tr) : "",
- Identifier = serializer.Identifier,
- sequenceNumber = persistentRepr.SequenceNr,
- Timestamp = persistentRepr.Timestamp==0? timeStamp: persistentRepr.Timestamp
- });
+ var (_persistentRepr, serializer,tags,ts) = state;
+ string thisManifest = "";
+ if (serializer is SerializerWithStringManifest withStringManifest)
+ {
+ thisManifest =
+ withStringManifest.Manifest(_persistentRepr.Payload);
+ }
+ else
+ {
+ if (serializer.IncludeManifest)
+ {
+ thisManifest = _persistentRepr.Payload
+ .GetType().TypeQualifiedName();
+ }
+ }
+ return new Try(new JournalRow()
+ {
+ message =
+ serializer.ToBinary(_persistentRepr.Payload),
+ manifest = thisManifest,
+ persistenceId = _persistentRepr.PersistenceId,
+ tags = tags,
+ Identifier = serializer.Identifier,
+ sequenceNumber = _persistentRepr.SequenceNr,
+ Timestamp = _persistentRepr.Timestamp == 0
+ ? ts
+ : _persistentRepr.Timestamp
+ });
+ });
}
catch (Exception e)
{
@@ -74,34 +100,44 @@ protected override Try Serialize(IPersistentRepresentation persisten
{
try
{
- object deserialized = null;
- if (t.Identifier.HasValue == false)
+ //object deserialized = null;
+ var identifierMaybe = t.Identifier;
+ if (identifierMaybe.HasValue == false)
{
var type = System.Type.GetType(t.manifest, true);
- var deserializer =
- _serializer.FindSerializerForType(type, null);
+
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
- deserialized =
- Akka.Serialization.Serialization.WithTransport(
- _serializer.System,
- () => deserializer.FromBinary(t.message, type));
+
+ return new Try<(IPersistentRepresentation, IImmutableSet
+ , long)>((
+ new Persistent( Akka.Serialization.Serialization.WithTransport(
+ _serializer.System, (_serializer.FindSerializerForType(type,_journalConfig.DefaultSerializer),t.message,type),
+ (state) =>
+ {
+ return state.Item1.FromBinary(
+ state.message, state.type);
+ }), t.sequenceNumber,
+ t.persistenceId,
+ t.manifest, t.deleted, ActorRefs.NoSender, null, t.Timestamp),
+ t.tags?.Split(_separatorArray,
+ StringSplitOptions.RemoveEmptyEntries)
+ .ToImmutableHashSet() ?? ImmutableHashSet.Empty,
+ t.ordering));
}
else
{
- var serializerId = t.Identifier.Value;
+ return new Try<(IPersistentRepresentation, IImmutableSet
+ , long)>((
+ new Persistent(_serializer.Deserialize(t.message,
+ identifierMaybe.Value,t.manifest), t.sequenceNumber,
+ t.persistenceId,
+ t.manifest, t.deleted, ActorRefs.NoSender, null, t.Timestamp),
+ t.tags?.Split(_separatorArray,
+ StringSplitOptions.RemoveEmptyEntries)
+ .ToImmutableHashSet() ?? ImmutableHashSet.Empty,
+ t.ordering));
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
- deserialized = _serializer.Deserialize(t.message,
- serializerId,t.manifest);
}
-
- return (
- new Persistent(deserialized, t.sequenceNumber,
- t.persistenceId,
- t.manifest, t.deleted, ActorRefs.NoSender, null, t.Timestamp),
- t.tags?.Split(_separatorArray,
- StringSplitOptions.RemoveEmptyEntries)
- .ToImmutableHashSet() ?? ImmutableHashSet.Empty,
- t.ordering);
}
catch (Exception e)
{
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/FlowControl.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/FlowControl.cs
index ffbbb005..2a575137 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/FlowControl.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/FlowControl.cs
@@ -4,16 +4,28 @@ public class FlowControl
{
public class Continue : FlowControl
{
+ private Continue()
+ {
+ }
+
public static Continue Instance = new Continue();
}
public class ContinueDelayed : FlowControl
{
+ private ContinueDelayed()
+ {
+ }
+
public static ContinueDelayed Instance = new ContinueDelayed();
}
public class Stop : FlowControl
{
+ private Stop()
+ {
+ }
+
public static Stop Instance = new Stop();
}
}
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs
index f78cd947..d35a87d9 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
+using System.Linq;
+using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
@@ -14,11 +16,12 @@
using Akka.Persistence.Sql.Linq2Db.Utility;
using Akka.Streams;
using Akka.Streams.Dsl;
-using Akka.Util;
using Akka.Util.Internal;
+using LanguageExt;
namespace Akka.Persistence.Sql.Linq2Db.Journal
{
+
public class DateTimeHelpers
{
private static DateTime UnixEpoch = new DateTime(1970,1,1,0,0,0,DateTimeKind.Utc);
@@ -102,14 +105,18 @@ protected override bool ReceivePluginInternal(object message)
{
if (message is WriteFinished wf)
{
- writeInProgress.Remove(wf.PersistenceId);
+
+ if (writeInProgress.TryGetValue(wf.PersistenceId,
+ out Task latestPending) & latestPending == wf.Future)
+ {
+ writeInProgress.Remove(wf.PersistenceId);
+ }
+ return true;
}
else
{
return false;
}
-
- return true;
}
public override void AroundPreRestart(Exception cause, object message)
@@ -125,7 +132,7 @@ public override async Task ReplayMessagesAsync(IActorContext context, string per
{
await _journal.MessagesWithBatch(persistenceId, fromSequenceNr,
toSequenceNr, _journalConfig.DaoConfig.ReplayBatchSize,
- Option<(TimeSpan, IScheduler)>.None)
+ Util.Option<(TimeSpan, IScheduler)>.None)
.Take(max).SelectAsync(1,
t => t.IsSuccess
? Task.FromResult(t.Success.Value)
@@ -133,7 +140,7 @@ await _journal.MessagesWithBatch(persistenceId, fromSequenceNr,
t.Failure.Value))
.RunForeach(r =>
{
- recoveryCallback(r.repr);
+ recoveryCallback(r.Repr);
}, _mat);
}
@@ -142,18 +149,9 @@ public override async Task ReadHighestSequenceNrAsync(string persistenceId
{
if (writeInProgress.ContainsKey(persistenceId))
{
- try
- {
- await writeInProgress[persistenceId];
- }
- catch (Exception)
- {
- //We don't have 'Recover' in C# so this is intentionally empty
- //Basically we just wanted to wait for write to succeed OR fail
- }
- var hsn =await _journal.HighestSequenceNr(persistenceId,
- fromSequenceNr);
- return hsn;
+ //We don't care whether the write succeeded or failed
+ //We just want it to finish.
+ await new NoThrowAwaiter(writeInProgress[persistenceId]);
}
return await _journal.HighestSequenceNr(persistenceId, fromSequenceNr);
}
@@ -167,7 +165,7 @@ protected override async Task>
var persistenceId = messages.Head().PersistenceId;
var future = _journal.AsyncWriteMessages(messages,currentTime);
- writeInProgress.AddOrSet(persistenceId, future);
+ writeInProgress[persistenceId] = future;
var self = Self;
//When we are done, we want to send a 'WriteFinished' so that
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs
index 80b771cc..4574e9d6 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs
@@ -5,6 +5,10 @@ namespace Akka.Persistence.Sql.Linq2Db.Journal.Types
{
public sealed class JournalRow
{
+ public JournalRow()
+ {
+
+ }
public long ordering { get; set; }
public long Timestamp { get; set; } = 0;
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/ReplayCompletion.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/ReplayCompletion.cs
index c5e105f6..28cad0ff 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/ReplayCompletion.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/ReplayCompletion.cs
@@ -1,8 +1,23 @@
-namespace Akka.Persistence.Sql.Linq2Db.Journal.Types
+using System.Collections.Immutable;
+
+namespace Akka.Persistence.Sql.Linq2Db.Journal.Types
{
public class ReplayCompletion
{
- public IPersistentRepresentation repr { get; set; }
- public long Ordering { get; set; }
+ public ReplayCompletion(IPersistentRepresentation repr, long ordering)
+ {
+ Repr = repr;
+ Ordering = ordering;
+ }
+
+ public readonly IPersistentRepresentation Repr;
+ public readonly long Ordering;
+
+ public ReplayCompletion((IPersistentRepresentation, IImmutableSet, long) success)
+ {
+ //(Repr, _, Ordering) = success;
+ Repr = success.Item1;
+ Ordering = success.Item3;
+ }
}
}
\ No newline at end of file
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs b/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs
index 28afb73d..9d1ff11f 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs
@@ -156,11 +156,10 @@ private Flow perfectlyMatchTag(
DataConnection dc, string persistenceId, long fromSequenceNr,
long toSequenceNr, long max)
{
- var toTake = MaxTake(max);
return AsyncSource.FromEnumerable(
new
{
- dc, persistenceId, fromSequenceNr, toSequenceNr, toTake,
+ dc, persistenceId, fromSequenceNr, toSequenceNr,toTake= MaxTake(max),
includeDeleted
}, async (state) =>
@@ -180,10 +179,8 @@ await baseQueryStatic(state.dc, state.includeDeleted)
{
var val = t.Get();
return new Akka.Util.Try(
- new ReplayCompletion()
- {
- repr = val.Item1, Ordering = val.Item3
- });
+ new ReplayCompletion(val.Item1,val.Item3)
+ );
}
catch (Exception e)
{
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Query/Linq2DbReadJournal.cs b/src/Akka.Persistence.Sql.Linq2Db/Query/Linq2DbReadJournal.cs
index e03f9f33..03bca12e 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Query/Linq2DbReadJournal.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Query/Linq2DbReadJournal.cs
@@ -154,8 +154,8 @@ private Source _eventsByPersistenceIdSource(
toSequenceNr, batchSize, refreshInterval)
.SelectAsync, ReplayCompletion, NotUsed>(1,
reprAndOrdNr => Task.FromResult(reprAndOrdNr.Get()))
- .SelectMany((ReplayCompletion r) => _adaptEvents(r.repr)
- .Select(p => new {repr = r.repr, ordNr = r.Ordering}))
+ .SelectMany((ReplayCompletion r) => _adaptEvents(r.Repr)
+ .Select(p => new {repr = r.Repr, ordNr = r.Ordering}))
.Select(r => new EventEnvelope(new Sequence(r.ordNr),
r.repr.PersistenceId, r.repr.SequenceNr, r.repr.Payload,r.repr.Timestamp));
}
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Query/NumericRangeEntry.cs b/src/Akka.Persistence.Sql.Linq2Db/Query/NumericRangeEntry.cs
index ad6892cf..24c67720 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Query/NumericRangeEntry.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Query/NumericRangeEntry.cs
@@ -25,23 +25,10 @@ public bool InRange(long number)
public IEnumerable ToEnumerable()
{
- var itemCount = until - from;
- List returnList;
- if (itemCount < Int32.MaxValue)
- {
- returnList = new List();
- }
- else
- {
- returnList = new List();
- }
-
for (long i = from; i < until; i++)
{
- returnList.Add(i);
+ yield return i;
}
-
- return returnList;
}
public IEnumerator GetEnumerator()
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Readme.MD b/src/Akka.Persistence.Sql.Linq2Db/Readme.MD
index 58891cee..fe826cff 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Readme.MD
+++ b/src/Akka.Persistence.Sql.Linq2Db/Readme.MD
@@ -149,8 +149,10 @@ Please note that you -must- provide a Connection String and Provider name.
- Testing indicates that `2` will provide performance on par or better than both batching and non-batching journal.
- For SQLite, you may want to just put `1` here, because SQLite allows at most a single writer at a time even in WAL mode.
- Keep in mind there may be some latency/throughput trade-offs if your write-set gets large.
- - Note that these run on the threadpool, not on dedicated threads. Setting this number too high may steal work from other actors.
- - It's worth noting that LinqToDb's Bulk Copy implementations are -very- efficient here, since under many DBs the batch can be done in a single async round-trip.
+ - Note that unless `materializer-dispatcher` is changed, by default these run on the threadpool, not on dedicated threads. Setting this number too high may steal work from other actors.
+ - It's worth noting that LinqToDb's Bulk Copy implementations are very efficient here, since under many DBs the batch can be done in a single async round-trip.
+ - `materializer-dispatcher` may be used to change the dispatcher that the Akka.Streams Queues use for scheduling.
+ - You can define a different dispatcher here if worried about stealing from the thread-pool, for instance a Dedicated thread-pool dispatcher.
- `logical-delete` if `true` will only set the deleted flag for items, i.e. will not actually delete records from DB.
- if `false` all records are set as deleted, and then all but the top record is deleted. This top record is used for sequence number tracking in case no other records exist in the table.
- `delete-compatibility-mode` specifies to perform deletes in a way that is compatible with Akka.Persistence.Sql.Common.
@@ -159,6 +161,16 @@ Please note that you -must- provide a Connection String and Provider name.
- `use-clone-connection` is a bit of a hack. Currently Linq2Db has a performance penalty for custom mapping schemas. Cloning the connection is faster but may not work for all scenarios.
- tl;dr - If a password or similar is in the connection string, leave `use-clone-connection` set to `false`.
- If you don't have a password or similar, run some tests with it set to `true`. You'll see improved write and read performance.
+ - Batching options:
+ - `batch-size` controls the maximum size of the batch used in the Akka.Streams Batch. A single batch is written to the DB in a transaction, with 1 or more round trips.
+ - If more than `batch-size` is in a single `AtomicWrite`, That atomic write will still be atomic, just treated as it's own batch.
+ - `db-round-trip-max-batch-size` tries to hint to Linq2Db multirow insert the maximum number of rows to send in a round-trip to the DB.
+ - multiple round-trips will still be contained in a single transaction.
+ - You will want to Keep this number higher than `batch-size`, if you are persisting lots of events with `PersistAll/(Async)`.
+ - `prefer-parameters-on-multirow-insert` controls whether Linq2Db will try to use parameters instead of building raw strings for inserts.
+ - Linq2Db is incredibly speed and memory efficent at building binary strings. In most cases, this will be faster than the cost of parsing/marshalling parameters by ADO and the DB.
+
+
- For Table Configuration:
- Note that Tables/Columns will be created with the casing provided, and selected in the same way (i.e. if using a DB with case sensitive columns, be careful!)
@@ -206,12 +218,24 @@ akka.persistence {
#For that penalty.
buffer-size = 5000
- #Batch size refers to the number of items included in a batch to DB
+ #Batch size refers to the maximum number of items included in a batch (transaction) to DB
#JDBC Default is/was 400 but testing against scenarios indicates
#100 is better for overall latency. That said,
#larger batches may be better if you have A fast/local DB.
batch-size = 100
+ #This batch size controls the maximum number of rows that will be sent
+ #In a single round trip to the DB. This is different than the -actual- batch size,
+ #And intentionally set larger than batch-size,
+ #to help atomicwrites be faster
+ #Note that Linq2Db may use a lower number per round-trip in some cases.
+ db-round-trip-max-batch-size = 1000
+
+ #Linq2Db by default will use a built string for multi-row inserts
+ #Somewhat counterintuitively, this is faster than using parameters in most cases,
+ #But if you would prefer parameters, you can set this to true.
+ prefer-parameters-on-multirow-insert = true
+
# Denotes the number of messages retrieved
# Per round-trip to DB on recovery.
# This is to limit both size of dataset from DB (possibly lowering locking requirements)
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs b/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs
index a897c59a..c2b1e347 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs
@@ -44,6 +44,7 @@ public abstract class PersistentReprSerializer
if (opt.HasValue)
{
retList.Add(opt.Value);
+ return new Util.Try>(retList);
}
else
{
@@ -65,43 +66,16 @@ public abstract class PersistentReprSerializer
return new Util.Try>(ser.Failure.Value);
}
}
+
+ return new Util.Try>(retList);
}
- return new Util.Try>(retList);
+ //return new Util.Try>(retList);
}).ToList();
}
- public Seq>> SerializeSeq(
- IEnumerable messages)
- {
- return messages.Select(aw =>
- {
- return Util.Try>.From(() =>
- {
- var serialized =
- (aw.Payload as IEnumerable)
- .Select(p=> Serialize(p).Get());
- return serialized.ToSeq();
- });
- }).ToSeq();
-
- //return messages.Select(aw =>
- //{
- // var serialized = (aw.Payload as IEnumerable)
- // .Select(Serialize).
- //})
- //return Seq(messages.Select(aw =>
- //{
- // var serialized =
- // (aw.Payload as IEnumerable)
- // .Select(Serialize);
- // return TrySeq.SequenceSeq(serialized);
- //}));
- }
-
-
public Akka.Util.Try Serialize(IPersistentRepresentation persistentRepr, long timeStamp = 0)
{
switch (persistentRepr.Payload)
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Snapshot/ByteArraySnapshotSerializer.cs b/src/Akka.Persistence.Sql.Linq2Db/Snapshot/ByteArraySnapshotSerializer.cs
index e7275493..90372dbf 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Snapshot/ByteArraySnapshotSerializer.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Snapshot/ByteArraySnapshotSerializer.cs
@@ -45,8 +45,11 @@ protected object GetSnapshot(SnapshotRow reader)
{
var type = Type.GetType(manifest, true);
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
- var serializer = _serialization.FindSerializerForType(type, _config.DefaultSerializer);
- obj = Akka.Serialization.Serialization.WithTransport(_serialization.System, () => serializer.FromBinary(binary, type));
+ obj = Akka.Serialization.Serialization.WithTransport(
+ _serialization.System,
+ (serializer: _serialization.FindSerializerForType(type, _config.DefaultSerializer),
+ binary, type),
+ (state) => state.serializer.FromBinary(state.binary, state.type));
}
else
{
@@ -60,8 +63,8 @@ private SnapshotRow ToSnapshotEntry(SnapshotMetadata metadata, object snapshot)
{
var snapshotType = snapshot.GetType();
var serializer = _serialization.FindSerializerForType(snapshotType, _config.DefaultSerializer);
- var binary = Akka.Serialization.Serialization.WithTransport(_serialization.System,
- () => serializer.ToBinary(snapshot));
+ var binary = Akka.Serialization.Serialization.WithTransport(_serialization.System,(serializer, snapshot),
+ (state ) => state.serializer.ToBinary(state.snapshot));
string manifest = "";
if (serializer is SerializerWithStringManifest)
{
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Util/NoThrowAwaiter.cs b/src/Akka.Persistence.Sql.Linq2Db/Util/NoThrowAwaiter.cs
new file mode 100644
index 00000000..f62243dd
--- /dev/null
+++ b/src/Akka.Persistence.Sql.Linq2Db/Util/NoThrowAwaiter.cs
@@ -0,0 +1,20 @@
+using System;
+using System.Runtime.CompilerServices;
+using System.Threading.Tasks;
+
+namespace Akka.Persistence.Sql.Linq2Db.Utility
+{
+ ///
+ /// An awaiter that never throws.
+ ///
+ internal struct NoThrowAwaiter : ICriticalNotifyCompletion
+ {
+ private readonly Task _task;
+ public NoThrowAwaiter(Task task) { _task = task; }
+ public NoThrowAwaiter GetAwaiter() => this;
+ public bool IsCompleted => _task.IsCompleted;
+ public void GetResult() { }
+ public void OnCompleted(Action continuation) => _task.GetAwaiter().OnCompleted(continuation);
+ public void UnsafeOnCompleted(Action continuation) => OnCompleted(continuation);
+ }
+}
\ No newline at end of file
diff --git a/src/Akka.Persistence.Sql.Linq2Db/persistence.conf b/src/Akka.Persistence.Sql.Linq2Db/persistence.conf
index 0efd8e7b..988b0b72 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/persistence.conf
+++ b/src/Akka.Persistence.Sql.Linq2Db/persistence.conf
@@ -18,10 +18,12 @@
# Rather than actual physical deletions
logical-delete = false
- # If true, journal_metadata is created
+ # If true, journal_metadata is created and used for deletes
+ # and max sequence number queries.
+ # note that there is a performance penalty for using this.
delete-compatibility-mode = true
- # If "sqlite" or "sqlserver", default column names are compatible with
+ # If "sqlite", "sqlserver", or "postgres", default column names are compatible with
# Akka.Persistence.Sql Default Column names.
table-compatibility-mode = null
@@ -32,11 +34,25 @@
buffer-size = 5000
#Batch size refers to the number of items included in a batch to DB
+ # (In cases where an AtomicWrite is greater than batch-size,
+ # The Atomic write will still be handled in a single batch.)
#JDBC Default is/was 400 but testing against scenarios indicates
#100 is better for overall latency. That said,
#larger batches may be better if you have A fast/local DB.
batch-size = 100
+ #This batch size controls the maximum number of rows that will be sent
+ #In a single round trip to the DB. This is different than the -actual- batch size,
+ #And intentionally set larger than batch-size,
+ #to help atomicwrites be faster
+ #Note that Linq2Db may use a lower number per round-trip in some cases.
+ db-round-trip-max-batch-size = 1000
+
+ #Linq2Db by default will use a built string for multi-row inserts
+ #Somewhat counterintuitively, this is faster than using parameters in most cases,
+ #But if you would prefer parameters, you can set this to true.
+ prefer-parameters-on-multirow-insert = true
+
# Denotes the number of messages retrieved
# Per round-trip to DB on recovery.
# This is to limit both size of dataset from DB (possibly lowering locking requirements)
diff --git a/src/Akka.Persistence.Sql.Linq2Db/snapshot.conf b/src/Akka.Persistence.Sql.Linq2Db/snapshot.conf
index d30aa2d8..40ca63b0 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/snapshot.conf
+++ b/src/Akka.Persistence.Sql.Linq2Db/snapshot.conf
@@ -23,8 +23,7 @@
# Column names will be compatible with Akka.Persistence.Sql
# You still must set your table name!
table-compatibility-mode = false
- tables
- {
+ tables.
snapshot
{
schema-name = null
@@ -69,7 +68,6 @@
serializerId: "serializer_id",
}
}
- }
}
}
}
\ No newline at end of file