From 61fb2056da022d10278c592d6fe319e12152a6e8 Mon Sep 17 00:00:00 2001 From: Drew Date: Tue, 31 Aug 2021 12:49:24 -0400 Subject: [PATCH 1/4] Fix issue where Persist failures were being reported as rejects. (#29) * Fix issue where Persist failures were being reported as rejects. Use an enum instead of a class on flowcontrol. * fix syntax that makes build server angry --- .../Journal/DAO/BaseByteArrayJournalDao.cs | 145 +++++++++++------- .../DAO/BaseJournalDaoWithReadMessages.cs | 75 +++++---- .../Journal/DAO/FlowControl.cs | 32 +--- .../Journal/Linq2DbWriteJournal.cs | 9 +- .../Query/Linq2DbReadJournal.cs | 62 ++++---- 5 files changed, 166 insertions(+), 157 deletions(-) diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs index d80fc3bc..7987cb27 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs @@ -3,6 +3,7 @@ using System.Collections.Immutable; using System.Data; using System.Linq; +using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; using Akka.Actor; @@ -138,21 +139,18 @@ private async Task WriteJournalRows(Seq xs) { //hot path: //If we only have one row, penalty for BulkCopy - //Isn't worth it due to insert caching/etc. - if (xs.Count > 1) - { + //Isn't worth it due to insert caching/transaction/etc. + var count = xs.Count; + if (count > 1) await InsertMultiple(xs); - } - else if (xs.Count > 0) - { - await InsertSingle(xs); - } + else if (count == 1) await InsertSingle(xs); } - } private async Task InsertSingle(Seq xs) { + //If we are writing a single row, + //we don't need to worry about transactions. using (var db = _connectionFactory.GetConnection()) { await db.InsertAsync(xs.Head); @@ -176,9 +174,6 @@ await db.GetTable() .MaxRowByRowSize ? BulkCopyType.Default : BulkCopyType.MultipleRows, - //TODO: When Parameters are allowed, - //Make a Config Option - //Or default to true UseParameters = _journalConfig.DaoConfig.PreferParametersOnMultiRowInsert, MaxBatchSize = _journalConfig.DaoConfig.DbRoundTripBatchSize }, xs); @@ -200,45 +195,61 @@ await db.GetTable() } } - public async Task> AsyncWriteMessages( - IEnumerable messages, long timeStamp = 0) - { - var serializedTries = Serializer.Serialize(messages, timeStamp); - - //Just a little bit of magic here; - //.ToList() keeps it all working later for whatever reason - //while still keeping our allocations in check. + //By using a custom flatten here, we avoid an Enumerable/LINQ allocation + //And are able to have a little more control over default capacity of array. + static List FlattenListOfListsToList(List>> source) { - /*var trySet = new List(); - foreach (var serializedTry in serializedTries) + //List ResultSet( + // Akka.Util.Try> item) + //{ + // return item.Success.GetOrElse(new List(0)); + //} + + List rows = new List(source.Count > 4 ? source.Count:4); + for (var index = 0; index < source.Count; index++) { - trySet.AddRange(serializedTry.Success.GetOrElse(new List(0))); + var item = source[index].Success.Value; + if (item != null) + { + rows.AddRange(item); + } + //rows.AddRange(ResultSet(source[index])); } - var rows = Seq(trySet);*/ - var rows = Seq(serializedTries.SelectMany(serializedTry => - serializedTry.Success.GetOrElse(new List(0))) - .ToList()); - // + return rows; + } + public async Task> AsyncWriteMessages( + IEnumerable messages, long timeStamp = 0) + { + var serializedTries = Serializer.Serialize(messages, timeStamp); - return await QueueWriteJournalRows(rows).ContinueWith(task => - { - //We actually are trying to interleave our tasks here... - //Basically, if serialization failed our task will likely - //Show success - //But we instead should display the serialization failure - return serializedTries.Select(r => - r.IsSuccess - ? (task.IsFaulted - ? TryUnwrapException(task.Exception) - : null) - : r.Failure.Value).ToImmutableList(); - }, CancellationToken.None, - TaskContinuationOptions.ExecuteSynchronously, - TaskScheduler.Default); + //Fold our List of Lists into a single sequence + var rows = Seq(FlattenListOfListsToList(serializedTries)); + //Wait for the write to go through. If Task fails, write will be captured + //As WriteMessagesFailure. + await QueueWriteJournalRows(rows); + //If we get here, we build an ImmutableList containing our rejections. + //These will be captured as WriteMessagesRejected + return BaseByteArrayJournalDao + .BuildWriteRejections(serializedTries); } + protected static ImmutableList BuildWriteRejections( + List>> serializedTries) + { + Exception[] builderEx = + new Exception[serializedTries.Count]; + for (int i = 0; i < serializedTries.Count; i++) + { + builderEx[i] = (serializedTries[i].Failure.Value); + } + return ImmutableList.CreateRange(builderEx); + } + protected static ImmutableList FailWriteThrowHelper(Exception e) + { + throw TryUnwrapException(e); + } protected static Exception TryUnwrapException(Exception e) { var aggregateException = e as AggregateException; @@ -346,18 +357,31 @@ protected IQueryable MaxMarkedForDeletionMaxPersistenceIdQuery(DataConnect .OrderByDescending(r => r.sequenceNumber) .Select(r => r.sequenceNumber).Take(1); } + + static readonly Expression> metaDataSelector = + + md => + new PersistenceIdAndSequenceNumber() + { + SequenceNumber = md.SequenceNumber, + PersistenceId = md.PersistenceId + }; + static readonly Expression> rowDataSelector = + md => + new PersistenceIdAndSequenceNumber() + { + SequenceNumber = md.sequenceNumber, + PersistenceId = md.persistenceId + }; private IQueryable MaxSeqNumberForPersistenceIdQuery( DataConnection db, string persistenceId, long minSequenceNumber = 0) { + + var queryable = db.GetTable() - .Where(r => r.persistenceId == persistenceId).Select(r => - new - { - SequenceNumber = r.sequenceNumber, - PersistenceId = r.persistenceId - }); + .Where(r => r.persistenceId == persistenceId).Select(rowDataSelector); if (minSequenceNumber != 0) { queryable = queryable.Where(r => @@ -370,18 +394,19 @@ private IQueryable MaxSeqNumberForPersistenceIdQuery( .Where(r => r.SequenceNumber > minSequenceNumber && r.PersistenceId == persistenceId); - queryable = queryable.Union(nextQuery.Select(md => - new - { - SequenceNumber = md.SequenceNumber, - PersistenceId = md.PersistenceId - })); + queryable = queryable.Union(nextQuery.Select(metaDataSelector)); } - return queryable.OrderByDescending(r => r.SequenceNumber) - .Select(r => r.SequenceNumber).Take(1); + return queryable.OrderByDescending(sequenceNumberSelector) + .Select(sequenceNumberSelector).Take(1); } + private static readonly + Expression> + sequenceNumberSelector = + r => r.SequenceNumber; + + public async Task Update(string persistenceId, long sequenceNr, object payload) { @@ -477,4 +502,10 @@ public override } }; } + + public class PersistenceIdAndSequenceNumber + { + public long SequenceNumber { get; set; } + public string PersistenceId { get; set; } + } } \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs index 78871b38..b201b7ef 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs @@ -33,14 +33,14 @@ protected BaseJournalDaoWithReadMessages(IAdvancedScheduler ec, public Source, NotUsed> MessagesWithBatch(string persistenceId, long fromSequenceNr, long toSequenceNr, int batchSize, Util.Option<(TimeSpan,IScheduler)> refreshInterval) { - var src = Source - .UnfoldAsync<(long, FlowControl), + return Source + .UnfoldAsync<(long, FlowControlEnum), Seq>>( (Math.Max(1, fromSequenceNr), - FlowControl.Continue.Instance), + FlowControlEnum.Continue), async opt => { - async Task>)>> + async Task>)>> RetrieveNextBatch() { Seq< @@ -48,76 +48,75 @@ protected BaseJournalDaoWithReadMessages(IAdvancedScheduler ec, using (var conn = _connectionFactory.GetConnection()) { - var waited = Messages(conn, persistenceId, - opt.Item1, - toSequenceNr, batchSize); - msg = await waited + msg = await Messages(conn, persistenceId, + opt.Item1, + toSequenceNr, batchSize) .RunWith( ExtSeq.Seq>(), mat); } var hasMoreEvents = msg.Count == batchSize; - var lastMsg = msg.LastOrDefault(); + //var lastMsg = msg.IsEmpty.LastOrDefault(); Util.Option lastSeq = Util.Option.None; - if (lastMsg != null && lastMsg.IsSuccess) + if (msg.IsEmpty == false) { - lastSeq = lastMsg.Success.Select(r => r.Repr.SequenceNr); - } - else if (lastMsg != null && lastMsg.Failure.HasValue) - { - throw lastMsg.Failure.Value; + + lastSeq = msg.Last.Get().Repr.SequenceNr; } - var hasLastEvent = - lastSeq.HasValue && - lastSeq.Value >= toSequenceNr; - FlowControl nextControl = null; - if (hasLastEvent || opt.Item1 > toSequenceNr) + + FlowControlEnum nextControl = FlowControlEnum.Unknown; + if ((lastSeq.HasValue && + lastSeq.Value >= toSequenceNr) || opt.Item1 > toSequenceNr) { - nextControl = FlowControl.Stop.Instance; + nextControl = FlowControlEnum.Stop; } else if (hasMoreEvents) { - nextControl = FlowControl.Continue.Instance; + nextControl = FlowControlEnum.Continue; } else if (refreshInterval.HasValue == false) { - nextControl = FlowControl.Stop.Instance; + nextControl = FlowControlEnum.Stop; } else { - nextControl = FlowControl.ContinueDelayed - .Instance; + nextControl = FlowControlEnum.ContinueDelayed; } - long nextFrom = 0; + long nextFrom = opt.Item1; if (lastSeq.HasValue) { nextFrom = lastSeq.Value + 1; } - else - { - nextFrom = opt.Item1; - } - return new Util.Option<((long, FlowControl), Seq>)>(( + return new Util.Option<((long, FlowControlEnum), Seq>)>(( (nextFrom, nextControl), msg)); } switch (opt.Item2) { - case FlowControl.Stop _: - return Util.Option<((long, FlowControl), Seq>)>.None; - case FlowControl.Continue _: + case FlowControlEnum.Stop: + return Util.Option<((long, FlowControlEnum), Seq>)>.None; + case FlowControlEnum.Continue: return await RetrieveNextBatch(); - case FlowControl.ContinueDelayed _ when refreshInterval.HasValue: + case FlowControlEnum.ContinueDelayed when refreshInterval.HasValue: return await FutureTimeoutSupport.After(refreshInterval.Value.Item1,refreshInterval.Value.Item2, RetrieveNextBatch); default: - throw new Exception($"Got invalid FlowControl from Queue! Type : {opt.Item2.GetType()}"); + return InvalidFlowThrowHelper(opt); } - }); + }).SelectMany(r => r);; + } - return src.SelectMany(r => r); + private static Util.Option MessagesWithBatchThrowHelper(Util.Try lastMsg) + { + throw lastMsg.Failure.Value; + } + + private static Util.Option<((long, FlowControlEnum), Seq>)> InvalidFlowThrowHelper((long, FlowControlEnum) opt) + { + throw new Exception( + $"Got invalid FlowControl from Queue! Type : {opt.Item2.ToString()}"); } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/FlowControl.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/FlowControl.cs index 2a575137..f7413f18 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/FlowControl.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/FlowControl.cs @@ -1,32 +1,10 @@ namespace Akka.Persistence.Sql.Linq2Db.Journal.DAO { - public class FlowControl + public enum FlowControlEnum { - public class Continue : FlowControl - { - private Continue() - { - } - - public static Continue Instance = new Continue(); - } - - public class ContinueDelayed : FlowControl - { - private ContinueDelayed() - { - } - - public static ContinueDelayed Instance = new ContinueDelayed(); - } - - public class Stop : FlowControl - { - private Stop() - { - } - - public static Stop Instance = new Stop(); - } + Unknown, + Continue, + Stop, + ContinueDelayed } } \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs index d35a87d9..28d0d474 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs @@ -147,11 +147,12 @@ await _journal.MessagesWithBatch(persistenceId, fromSequenceNr, public override async Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) { - if (writeInProgress.ContainsKey(persistenceId)) + + if (writeInProgress.TryGetValue(persistenceId, out Task wip)) { //We don't care whether the write succeeded or failed //We just want it to finish. - await new NoThrowAwaiter(writeInProgress[persistenceId]); + await new NoThrowAwaiter(wip); } return await _journal.HighestSequenceNr(persistenceId, fromSequenceNr); } @@ -170,8 +171,10 @@ protected override async Task> //When we are done, we want to send a 'WriteFinished' so that //Sequence Number reads won't block/await/etc. +#pragma warning disable 4014 future.ContinueWith((p) => - self.Tell(new WriteFinished(persistenceId, future)), +#pragma warning restore 4014 + self.Tell(new WriteFinished(persistenceId, p)), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); diff --git a/src/Akka.Persistence.Sql.Linq2Db/Query/Linq2DbReadJournal.cs b/src/Akka.Persistence.Sql.Linq2Db/Query/Linq2DbReadJournal.cs index 03bca12e..27c6fd68 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Query/Linq2DbReadJournal.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Query/Linq2DbReadJournal.cs @@ -220,11 +220,11 @@ private Source _eventsByTag(string tag, .JournalSequenceRetrievalConfiguration.AskTimeout; var batchSize = readJournalConfig.MaxBufferSize; return Source - .UnfoldAsync<(long, FlowControl), IImmutableList - >((offset, FlowControl.Continue.Instance), + .UnfoldAsync<(long, FlowControlEnum), IImmutableList + >((offset, FlowControlEnum.Continue), uf => { - async Task)>> retrieveNextBatch() { var queryUntil = @@ -237,25 +237,24 @@ await _currentJournalEventsByTag(tag, uf.Item1, Sink.Seq(), _mat); var hasMoreEvents = xs.Count == batchSize; - FlowControl nextControl = null; + FlowControlEnum nextControl = FlowControlEnum.Unknown; if (terminateAfterOffset.HasValue) { if (!hasMoreEvents && terminateAfterOffset.Value <= queryUntil.Max) - nextControl = FlowControl.Stop.Instance; + nextControl = FlowControlEnum.Stop; if (xs.Exists(r => (r.Offset is Sequence s) && s.Value >= terminateAfterOffset.Value)) - nextControl = FlowControl.Stop.Instance; + nextControl = FlowControlEnum.Stop; } - if (nextControl == null) + if (nextControl == FlowControlEnum.Unknown) { nextControl = hasMoreEvents - ? (FlowControl) FlowControl.Continue - .Instance - : FlowControl.ContinueDelayed.Instance; + ? FlowControlEnum.Continue + : FlowControlEnum.ContinueDelayed; } var nextStartingOffset = (xs.Count == 0) @@ -264,7 +263,7 @@ await _currentJournalEventsByTag(tag, uf.Item1, .Where(r => r != null).Max(t => t.Value); return new Akka.Util.Option<((long nextStartingOffset, - FlowControl + FlowControlEnum nextControl), IImmutableList xs) >(( @@ -273,15 +272,15 @@ await _currentJournalEventsByTag(tag, uf.Item1, switch (uf.Item2) { - case FlowControl.Stop _: + case FlowControlEnum.Stop: return Task.FromResult(Akka.Util - .Option<((long, FlowControl), + .Option<((long, FlowControlEnum), IImmutableList)> .None); - case FlowControl.Continue _: + case FlowControlEnum.Continue: return retrieveNextBatch(); - case FlowControl.ContinueDelayed _: + case FlowControlEnum.ContinueDelayed : return Akka.Pattern.FutureTimeoutSupport.After( readJournalConfig.RefreshInterval, system.Scheduler, @@ -289,7 +288,7 @@ await _currentJournalEventsByTag(tag, uf.Item1, default: return Task.FromResult(Akka.Util - .Option<((long, FlowControl), + .Option<((long, FlowControlEnum), IImmutableList)> .None); } @@ -317,11 +316,11 @@ private Source _events( .JournalSequenceRetrievalConfiguration.AskTimeout; var batchSize = readJournalConfig.MaxBufferSize; return Source - .UnfoldAsync<(long, FlowControl), IImmutableList - >((offset, FlowControl.Continue.Instance), + .UnfoldAsync<(long, FlowControlEnum), IImmutableList + >((offset, FlowControlEnum.Continue), uf => { - async Task)>> retrieveNextBatch() { var queryUntil = @@ -334,25 +333,24 @@ await _currentJournalEvents(uf.Item1, Sink.Seq(), _mat); var hasMoreEvents = xs.Count == batchSize; - FlowControl nextControl = null; + FlowControlEnum nextControl = FlowControlEnum.Unknown; if (terminateAfterOffset.HasValue) { if (!hasMoreEvents && terminateAfterOffset.Value <= queryUntil.Max) - nextControl = FlowControl.Stop.Instance; + nextControl = FlowControlEnum.Stop; if (xs.Exists(r => (r.Offset is Sequence s) && s.Value >= terminateAfterOffset.Value)) - nextControl = FlowControl.Stop.Instance; + nextControl = FlowControlEnum.Stop; } - if (nextControl == null) + if (nextControl == FlowControlEnum.Unknown) { nextControl = hasMoreEvents - ? (FlowControl) FlowControl.Continue - .Instance - : FlowControl.ContinueDelayed.Instance; + ? FlowControlEnum.Continue + : FlowControlEnum.ContinueDelayed; } var nextStartingOffset = (xs.Count == 0) @@ -361,7 +359,7 @@ await _currentJournalEvents(uf.Item1, .Where(r => r != null).Max(t => t.Value); return new Akka.Util.Option<((long nextStartingOffset, - FlowControl + FlowControlEnum nextControl), IImmutableList xs) >(( @@ -370,15 +368,15 @@ await _currentJournalEvents(uf.Item1, switch (uf.Item2) { - case FlowControl.Stop _: + case FlowControlEnum.Stop: return Task.FromResult(Akka.Util - .Option<((long, FlowControl), + .Option<((long, FlowControlEnum), IImmutableList)> .None); - case FlowControl.Continue _: + case FlowControlEnum.Continue: return retrieveNextBatch(); - case FlowControl.ContinueDelayed _: + case FlowControlEnum.ContinueDelayed: return Akka.Pattern.FutureTimeoutSupport.After( readJournalConfig.RefreshInterval, system.Scheduler, @@ -386,7 +384,7 @@ await _currentJournalEvents(uf.Item1, default: return Task.FromResult(Akka.Util - .Option<((long, FlowControl), + .Option<((long, FlowControlEnum), IImmutableList)> .None); } From 7b48f783789beea9d8006ded7c6bd8ad2f6c8de3 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 18 Nov 2021 18:47:23 -0600 Subject: [PATCH 2/4] Upgrade to Ubuntu 20.04 (#31) Ubuntu 16.04 is no longer supported. --- build-system/pr-validation.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build-system/pr-validation.yaml b/build-system/pr-validation.yaml index 5afcc597..d1db25b8 100644 --- a/build-system/pr-validation.yaml +++ b/build-system/pr-validation.yaml @@ -27,8 +27,8 @@ jobs: parameters: name: 'linux_pr' displayName: 'Linux PR Validation' - vmImage: 'ubuntu-16.04' + vmImage: 'ubuntu-20.04' scriptFileName: ./build.sh scriptArgs: all outputDirectory: 'bin/nuget' - artifactName: 'nuget_pack-$(Build.BuildId)' \ No newline at end of file + artifactName: 'nuget_pack-$(Build.BuildId)' From 1588ffea19eb14ec4c464a293707b094495f6cf4 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 18 Nov 2021 19:16:32 -0600 Subject: [PATCH 3/4] added dependabot.yml (#32) --- .github/dependabot.yml | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 .github/dependabot.yml diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 00000000..851593ab --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,8 @@ +version: 2 +updates: +- package-ecosystem: nuget + directory: "/" + schedule: + interval: daily + time: "11:00" + open-pull-requests-limit: 10 \ No newline at end of file From 7469ac751b13c5f92679b7fe8ec53eb074d6dabe Mon Sep 17 00:00:00 2001 From: Drew Date: Thu, 18 Nov 2021 20:29:14 -0500 Subject: [PATCH 4/4] release notes (#30) Co-authored-by: Aaron Stannard --- RELEASE_NOTES.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 11a65b86..f4f4a1a4 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,3 +1,8 @@ +#### 1.4.28 Nov 18 2021 #### +**Perf Enhancements and fixes to failure reporting** + +There was an issue found where persistence failures were being reported as rejections when they should not have. This has been fixed alongside some logic cleanup that should lead to more consistent performance. + #### 1.4.21 July 6 2021 #### **First official Release for Akka.Persistence.Linq2Db**