diff --git a/.github/dependabot.yml b/.github/dependabot.yml
new file mode 100644
index 00000000..851593ab
--- /dev/null
+++ b/.github/dependabot.yml
@@ -0,0 +1,8 @@
+version: 2
+updates:
+- package-ecosystem: nuget
+  directory: "/"
+  schedule:
+    interval: daily
+    time: "11:00"
+  open-pull-requests-limit: 10
\ No newline at end of file
diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index 11a65b86..f4f4a1a4 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -1,3 +1,8 @@
+#### 1.4.28 Nov 18 2021 ####
+**Perf Enhancements and fixes to failure reporting**
+
+An issue was found where persistence failures were being reported as rejections when they should not have been. This has been fixed, alongside some logic cleanup that should lead to more consistent performance.
+
 #### 1.4.21 July 6 2021 ####
 **First official Release for Akka.Persistence.Linq2Db**
diff --git a/build-system/pr-validation.yaml b/build-system/pr-validation.yaml
index 5afcc597..d1db25b8 100644
--- a/build-system/pr-validation.yaml
+++ b/build-system/pr-validation.yaml
@@ -27,8 +27,8 @@ jobs:
     parameters:
       name: 'linux_pr'
       displayName: 'Linux PR Validation'
-      vmImage: 'ubuntu-16.04'
+      vmImage: 'ubuntu-20.04'
       scriptFileName: ./build.sh
       scriptArgs: all
       outputDirectory: 'bin/nuget'
-      artifactName: 'nuget_pack-$(Build.BuildId)'
\ No newline at end of file
+      artifactName: 'nuget_pack-$(Build.BuildId)'
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs
index d80fc3bc..7987cb27 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs
@@ -3,6 +3,7 @@
 using System.Collections.Immutable;
 using System.Data;
 using System.Linq;
+using System.Linq.Expressions;
 using System.Threading;
 using System.Threading.Tasks;
 using Akka.Actor;
@@ -138,21 +139,18 @@ private async Task WriteJournalRows(Seq xs)
         {
             //hot path:
             //If we only have one row, penalty for BulkCopy
-            //Isn't worth it due to insert caching/etc.
-            if (xs.Count > 1)
-            {
+            //Isn't worth it due to insert caching/transaction/etc.
+            var count = xs.Count;
+            if (count > 1)
                 await InsertMultiple(xs);
-            }
-            else if (xs.Count > 0)
-            {
-                await InsertSingle(xs);
-            }
+            else if (count == 1) await InsertSingle(xs);
         }
-
     }

     private async Task InsertSingle(Seq xs)
     {
+        //If we are writing a single row,
+        //we don't need to worry about transactions.
         using (var db = _connectionFactory.GetConnection())
         {
             await db.InsertAsync(xs.Head);
         }
@@ -176,9 +174,6 @@ await db.GetTable()
                             .MaxRowByRowSize
                             ? BulkCopyType.Default
                             : BulkCopyType.MultipleRows,
-                        //TODO: When Parameters are allowed,
-                        //Make a Config Option
-                        //Or default to true
                         UseParameters = _journalConfig.DaoConfig.PreferParametersOnMultiRowInsert,
                         MaxBatchSize = _journalConfig.DaoConfig.DbRoundTripBatchSize
                     }, xs);
@@ -200,45 +195,61 @@ await db.GetTable()
             }
         }

-        public async Task> AsyncWriteMessages(
-            IEnumerable messages, long timeStamp = 0)
-        {
-            var serializedTries = Serializer.Serialize(messages, timeStamp);
-
-            //Just a little bit of magic here;
-            //.ToList() keeps it all working later for whatever reason
-            //while still keeping our allocations in check.
+        //By using a custom flatten here, we avoid an Enumerable/LINQ allocation
+        //And are able to have a little more control over default capacity of array.
+        static List FlattenListOfListsToList(List>> source)
         {
-            /*var trySet = new List();
-            foreach (var serializedTry in serializedTries)
+            //List ResultSet(
+            //    Akka.Util.Try> item)
+            //{
+            //    return item.Success.GetOrElse(new List(0));
+            //}
+
+            List rows = new List(source.Count > 4 ? source.Count : 4);
+            for (var index = 0; index < source.Count; index++)
             {
-                trySet.AddRange(serializedTry.Success.GetOrElse(new List(0)));
+                var item = source[index].Success.Value;
+                if (item != null)
+                {
+                    rows.AddRange(item);
+                }
+                //rows.AddRange(ResultSet(source[index]));
             }
-            var rows = Seq(trySet);*/
-            var rows = Seq(serializedTries.SelectMany(serializedTry =>
-                    serializedTry.Success.GetOrElse(new List(0)))
-                .ToList());
-            //
+            return rows;
+        }

+        public async Task> AsyncWriteMessages(
+            IEnumerable messages, long timeStamp = 0)
+        {
+            var serializedTries = Serializer.Serialize(messages, timeStamp);

-            return await QueueWriteJournalRows(rows).ContinueWith(task =>
-            {
-                //We actually are trying to interleave our tasks here...
-                //Basically, if serialization failed our task will likely
-                //Show success
-                //But we instead should display the serialization failure
-                return serializedTries.Select(r =>
-                    r.IsSuccess
-                        ? (task.IsFaulted
-                            ? TryUnwrapException(task.Exception)
-                            : null)
-                        : r.Failure.Value).ToImmutableList();
-            }, CancellationToken.None,
-                TaskContinuationOptions.ExecuteSynchronously,
-                TaskScheduler.Default);
+            //Fold our List of Lists into a single sequence
+            var rows = Seq(FlattenListOfListsToList(serializedTries));
+            //Wait for the write to go through. If Task fails, write will be captured
+            //As WriteMessagesFailure.
+            await QueueWriteJournalRows(rows);
+            //If we get here, we build an ImmutableList containing our rejections.
+            //These will be captured as WriteMessagesRejected
+            return BaseByteArrayJournalDao
+                .BuildWriteRejections(serializedTries);
         }

+        protected static ImmutableList BuildWriteRejections(
+            List>> serializedTries)
+        {
+            Exception[] builderEx =
+                new Exception[serializedTries.Count];
+            for (int i = 0; i < serializedTries.Count; i++)
+            {
+                builderEx[i] = (serializedTries[i].Failure.Value);
+            }
+            return ImmutableList.CreateRange(builderEx);
+        }
+        protected static ImmutableList FailWriteThrowHelper(Exception e)
+        {
+            throw TryUnwrapException(e);
+        }

         protected static Exception TryUnwrapException(Exception e)
         {
             var aggregateException = e as AggregateException;
@@ -346,18 +357,31 @@ protected IQueryable MaxMarkedForDeletionMaxPersistenceIdQuery(DataConnect
                 .OrderByDescending(r => r.sequenceNumber)
                 .Select(r => r.sequenceNumber).Take(1);
         }
+
+        static readonly Expression> metaDataSelector =
+
+            md =>
+                new PersistenceIdAndSequenceNumber()
+                {
+                    SequenceNumber = md.SequenceNumber,
+                    PersistenceId = md.PersistenceId
+                };
+        static readonly Expression> rowDataSelector =
+            md =>
+                new PersistenceIdAndSequenceNumber()
+                {
+                    SequenceNumber = md.sequenceNumber,
+                    PersistenceId = md.persistenceId
+                };

         private IQueryable MaxSeqNumberForPersistenceIdQuery(
             DataConnection db, string persistenceId,
             long minSequenceNumber = 0)
         {
+
+
             var queryable = db.GetTable()
-                .Where(r => r.persistenceId == persistenceId).Select(r =>
-                    new
-                    {
-                        SequenceNumber = r.sequenceNumber,
-                        PersistenceId = r.persistenceId
-                    });
+                .Where(r => r.persistenceId == persistenceId).Select(rowDataSelector);
             if (minSequenceNumber != 0)
             {
                 queryable = queryable.Where(r =>
@@ -370,18 +394,19 @@ private IQueryable MaxSeqNumberForPersistenceIdQuery(
                     .Where(r => r.SequenceNumber > minSequenceNumber &&
                                 r.PersistenceId == persistenceId);

-                queryable = queryable.Union(nextQuery.Select(md =>
-                    new
-                    {
-                        SequenceNumber = md.SequenceNumber,
-                        PersistenceId = md.PersistenceId
-                    }));
+                queryable = queryable.Union(nextQuery.Select(metaDataSelector));
             }

-            return queryable.OrderByDescending(r => r.SequenceNumber)
-                .Select(r => r.SequenceNumber).Take(1);
+            return queryable.OrderByDescending(sequenceNumberSelector)
+                .Select(sequenceNumberSelector).Take(1);
         }

+        private static readonly
+            Expression>
+            sequenceNumberSelector =
+                r => r.SequenceNumber;
+
+
         public async Task Update(string persistenceId, long sequenceNr,
             object payload)
         {
@@ -477,4 +502,10 @@ public override
             }
         };
     }
+
+    public class PersistenceIdAndSequenceNumber
+    {
+        public long SequenceNumber { get; set; }
+        public string PersistenceId { get; set; }
+    }
 }
\ No newline at end of file
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs
index 78871b38..b201b7ef 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs
@@ -33,14 +33,14 @@ protected BaseJournalDaoWithReadMessages(IAdvancedScheduler ec,
         public Source, NotUsed> MessagesWithBatch(string persistenceId,
             long fromSequenceNr, long toSequenceNr, int batchSize,
             Util.Option<(TimeSpan,IScheduler)> refreshInterval)
         {
-            var src = Source
-                .UnfoldAsync<(long, FlowControl),
+            return Source
+                .UnfoldAsync<(long, FlowControlEnum),
                     Seq>>(
                     (Math.Max(1, fromSequenceNr),
-                        FlowControl.Continue.Instance),
+                        FlowControlEnum.Continue),
                     async opt =>
                     {
-                        async Task>)>>
+                        async Task>)>>
                             RetrieveNextBatch()
                         {
                             Seq<
@@ -48,76 +48,75 @@ protected BaseJournalDaoWithReadMessages(IAdvancedScheduler ec,
                             using (var conn = _connectionFactory.GetConnection())
                             {
-                                var waited = Messages(conn, persistenceId,
-                                    opt.Item1,
-                                    toSequenceNr, batchSize);
-                                msg = await waited
+                                msg = await Messages(conn, persistenceId,
+                                        opt.Item1,
+                                        toSequenceNr, batchSize)
                                     .RunWith(
                                         ExtSeq.Seq>(), mat);
                             }

                             var hasMoreEvents = msg.Count == batchSize;
-                            var lastMsg = msg.LastOrDefault();
+                            //var lastMsg = msg.IsEmpty.LastOrDefault();
                             Util.Option lastSeq = Util.Option.None;
-                            if (lastMsg != null && lastMsg.IsSuccess)
+                            if (msg.IsEmpty == false)
                             {
-                                lastSeq = lastMsg.Success.Select(r => r.Repr.SequenceNr);
-                            }
-                            else if (lastMsg != null && lastMsg.Failure.HasValue)
-                            {
-                                throw lastMsg.Failure.Value;
+
+                                lastSeq = msg.Last.Get().Repr.SequenceNr;
                             }

-                            var hasLastEvent =
-                                lastSeq.HasValue &&
-                                lastSeq.Value >= toSequenceNr;
-                            FlowControl nextControl = null;
-                            if (hasLastEvent || opt.Item1 > toSequenceNr)
+
+                            FlowControlEnum nextControl = FlowControlEnum.Unknown;
+                            if ((lastSeq.HasValue &&
+                                 lastSeq.Value >= toSequenceNr) || opt.Item1 > toSequenceNr)
                             {
-                                nextControl = FlowControl.Stop.Instance;
+                                nextControl = FlowControlEnum.Stop;
                             }
                             else if (hasMoreEvents)
                             {
-                                nextControl = FlowControl.Continue.Instance;
+                                nextControl = FlowControlEnum.Continue;
                             }
                             else if (refreshInterval.HasValue == false)
                             {
-                                nextControl = FlowControl.Stop.Instance;
+                                nextControl = FlowControlEnum.Stop;
                             }
                             else
                             {
-                                nextControl = FlowControl.ContinueDelayed
-                                    .Instance;
+                                nextControl = FlowControlEnum.ContinueDelayed;
                             }

-                            long nextFrom = 0;
+                            long nextFrom = opt.Item1;
                             if (lastSeq.HasValue)
                             {
                                 nextFrom = lastSeq.Value + 1;
                             }
-                            else
-                            {
-                                nextFrom = opt.Item1;
-                            }
-                            return new Util.Option<((long, FlowControl), Seq>)>((
+                            return new Util.Option<((long, FlowControlEnum), Seq>)>((
                                 (nextFrom, nextControl), msg));
                         }

                         switch (opt.Item2)
                         {
-                            case FlowControl.Stop _:
-                                return Util.Option<((long, FlowControl), Seq>)>.None;
-                            case FlowControl.Continue _:
+                            case FlowControlEnum.Stop:
+                                return Util.Option<((long, FlowControlEnum), Seq>)>.None;
+                            case FlowControlEnum.Continue:
                                 return await RetrieveNextBatch();
-                            case FlowControl.ContinueDelayed _ when refreshInterval.HasValue:
+                            case FlowControlEnum.ContinueDelayed when refreshInterval.HasValue:
                                 return await FutureTimeoutSupport.After(refreshInterval.Value.Item1,refreshInterval.Value.Item2, RetrieveNextBatch);
                             default:
-                                throw new Exception($"Got invalid FlowControl from Queue! Type : {opt.Item2.GetType()}");
+                                return InvalidFlowThrowHelper(opt);
                         }
-                    });
+                    }).SelectMany(r => r);
+        }

-            return src.SelectMany(r => r);
+        private static Util.Option MessagesWithBatchThrowHelper(Util.Try lastMsg)
+        {
+            throw lastMsg.Failure.Value;
+        }
+
+        private static Util.Option<((long, FlowControlEnum), Seq>)> InvalidFlowThrowHelper((long, FlowControlEnum) opt)
+        {
+            throw new Exception(
+                $"Got invalid FlowControl from Queue! Type : {opt.Item2.ToString()}");
         }
     }
 }
\ No newline at end of file
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/FlowControl.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/FlowControl.cs
index 2a575137..f7413f18 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/FlowControl.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/FlowControl.cs
@@ -1,32 +1,10 @@
 namespace Akka.Persistence.Sql.Linq2Db.Journal.DAO
 {
-    public class FlowControl
+    public enum FlowControlEnum
     {
-        public class Continue : FlowControl
-        {
-            private Continue()
-            {
-            }
-
-            public static Continue Instance = new Continue();
-        }
-
-        public class ContinueDelayed : FlowControl
-        {
-            private ContinueDelayed()
-            {
-            }
-
-            public static ContinueDelayed Instance = new ContinueDelayed();
-        }
-
-        public class Stop : FlowControl
-        {
-            private Stop()
-            {
-            }
-
-            public static Stop Instance = new Stop();
-        }
+        Unknown,
+        Continue,
+        Stop,
+        ContinueDelayed
     }
 }
\ No newline at end of file
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs
index d35a87d9..28d0d474 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs
@@ -147,11 +147,12 @@ await _journal.MessagesWithBatch(persistenceId, fromSequenceNr,
         public override async Task ReadHighestSequenceNrAsync(string persistenceId,
             long fromSequenceNr)
         {
-            if (writeInProgress.ContainsKey(persistenceId))
+
+            if (writeInProgress.TryGetValue(persistenceId, out Task wip))
             {
                 //We don't care whether the write succeeded or failed
                 //We just want it to finish.
-                await new NoThrowAwaiter(writeInProgress[persistenceId]);
+                await new NoThrowAwaiter(wip);
             }
             return await _journal.HighestSequenceNr(persistenceId, fromSequenceNr);
         }
@@ -170,8 +171,10 @@ protected override async Task>
             //When we are done, we want to send a 'WriteFinished' so that
             //Sequence Number reads won't block/await/etc.
+#pragma warning disable 4014
             future.ContinueWith((p) =>
-                    self.Tell(new WriteFinished(persistenceId, future)),
+#pragma warning restore 4014
+                    self.Tell(new WriteFinished(persistenceId, p)),
                 CancellationToken.None,
                 TaskContinuationOptions.ExecuteSynchronously,
                 TaskScheduler.Default);
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Query/Linq2DbReadJournal.cs b/src/Akka.Persistence.Sql.Linq2Db/Query/Linq2DbReadJournal.cs
index 03bca12e..27c6fd68 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Query/Linq2DbReadJournal.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Query/Linq2DbReadJournal.cs
@@ -220,11 +220,11 @@ private Source _eventsByTag(string tag,
                 .JournalSequenceRetrievalConfiguration.AskTimeout;
             var batchSize = readJournalConfig.MaxBufferSize;
             return Source
-                .UnfoldAsync<(long, FlowControl), IImmutableList
-                >((offset, FlowControl.Continue.Instance),
+                .UnfoldAsync<(long, FlowControlEnum), IImmutableList
+                >((offset, FlowControlEnum.Continue),
                     uf =>
                     {
-                        async Task)>>
+                        async Task)>>
                             retrieveNextBatch()
                         {
                             var queryUntil =
@@ -237,25 +237,24 @@ await _currentJournalEventsByTag(tag, uf.Item1,
                                     Sink.Seq(), _mat);

                             var hasMoreEvents = xs.Count == batchSize;
-                            FlowControl nextControl = null;
+                            FlowControlEnum nextControl = FlowControlEnum.Unknown;
                             if (terminateAfterOffset.HasValue)
                             {
                                 if (!hasMoreEvents &&
                                     terminateAfterOffset.Value <= queryUntil.Max)
-                                    nextControl = FlowControl.Stop.Instance;
+                                    nextControl = FlowControlEnum.Stop;
                                 if (xs.Exists(r =>
                                     (r.Offset is Sequence s) &&
                                     s.Value >= terminateAfterOffset.Value))
-                                    nextControl = FlowControl.Stop.Instance;
+                                    nextControl = FlowControlEnum.Stop;
                             }

-                            if (nextControl == null)
+                            if (nextControl == FlowControlEnum.Unknown)
                             {
                                 nextControl = hasMoreEvents
-                                    ? (FlowControl) FlowControl.Continue
-                                        .Instance
-                                    : FlowControl.ContinueDelayed.Instance;
+                                    ? FlowControlEnum.Continue
+                                    : FlowControlEnum.ContinueDelayed;
                             }

                             var nextStartingOffset = (xs.Count == 0)
@@ -264,7 +263,7 @@ await _currentJournalEventsByTag(tag, uf.Item1,
                                     .Where(r => r != null).Max(t => t.Value);

                             return new Akka.Util.Option<((long nextStartingOffset,
-                                FlowControl
+                                FlowControlEnum
                                 nextControl), IImmutableList xs)
                                 >((
@@ -273,15 +272,15 @@ await _currentJournalEventsByTag(tag, uf.Item1,
                         switch (uf.Item2)
                         {
-                            case FlowControl.Stop _:
+                            case FlowControlEnum.Stop:
                                 return Task.FromResult(Akka.Util
-                                    .Option<((long, FlowControl),
+                                    .Option<((long, FlowControlEnum),
                                         IImmutableList)>
                                     .None);
-                            case FlowControl.Continue _:
+                            case FlowControlEnum.Continue:
                                 return retrieveNextBatch();
-                            case FlowControl.ContinueDelayed _:
+                            case FlowControlEnum.ContinueDelayed:
                                 return Akka.Pattern.FutureTimeoutSupport.After(
                                     readJournalConfig.RefreshInterval,
                                     system.Scheduler,
@@ -289,7 +288,7 @@ await _currentJournalEventsByTag(tag, uf.Item1,
                             default:
                                 return Task.FromResult(Akka.Util
-                                    .Option<((long, FlowControl),
+                                    .Option<((long, FlowControlEnum),
                                         IImmutableList)>
                                     .None);
                         }
@@ -317,11 +316,11 @@ private Source _events(
                 .JournalSequenceRetrievalConfiguration.AskTimeout;
             var batchSize = readJournalConfig.MaxBufferSize;
             return Source
-                .UnfoldAsync<(long, FlowControl), IImmutableList
-                >((offset, FlowControl.Continue.Instance),
+                .UnfoldAsync<(long, FlowControlEnum), IImmutableList
+                >((offset, FlowControlEnum.Continue),
                     uf =>
                     {
-                        async Task)>>
+                        async Task)>>
                             retrieveNextBatch()
                         {
                             var queryUntil =
@@ -334,25 +333,24 @@ await _currentJournalEvents(uf.Item1,
                                     Sink.Seq(), _mat);

                             var hasMoreEvents = xs.Count == batchSize;
-                            FlowControl nextControl = null;
+                            FlowControlEnum nextControl = FlowControlEnum.Unknown;
                             if (terminateAfterOffset.HasValue)
                             {
                                 if (!hasMoreEvents &&
                                     terminateAfterOffset.Value <= queryUntil.Max)
-                                    nextControl = FlowControl.Stop.Instance;
+                                    nextControl = FlowControlEnum.Stop;
                                 if (xs.Exists(r =>
                                     (r.Offset is Sequence s) &&
                                     s.Value >= terminateAfterOffset.Value))
-                                    nextControl = FlowControl.Stop.Instance;
+                                    nextControl = FlowControlEnum.Stop;
                             }

-                            if (nextControl == null)
+                            if (nextControl == FlowControlEnum.Unknown)
                             {
                                 nextControl = hasMoreEvents
-                                    ? (FlowControl) FlowControl.Continue
-                                        .Instance
-                                    : FlowControl.ContinueDelayed.Instance;
+                                    ? FlowControlEnum.Continue
+                                    : FlowControlEnum.ContinueDelayed;
                             }

                             var nextStartingOffset = (xs.Count == 0)
@@ -361,7 +359,7 @@ await _currentJournalEvents(uf.Item1,
                                     .Where(r => r != null).Max(t => t.Value);

                             return new Akka.Util.Option<((long nextStartingOffset,
-                                FlowControl
+                                FlowControlEnum
                                 nextControl), IImmutableList xs)
                                 >((
@@ -370,15 +368,15 @@ await _currentJournalEvents(uf.Item1,
                         switch (uf.Item2)
                         {
-                            case FlowControl.Stop _:
+                            case FlowControlEnum.Stop:
                                 return Task.FromResult(Akka.Util
-                                    .Option<((long, FlowControl),
+                                    .Option<((long, FlowControlEnum),
                                         IImmutableList)>
                                     .None);
-                            case FlowControl.Continue _:
+                            case FlowControlEnum.Continue:
                                 return retrieveNextBatch();
-                            case FlowControl.ContinueDelayed _:
+                            case FlowControlEnum.ContinueDelayed:
                                 return Akka.Pattern.FutureTimeoutSupport.After(
                                     readJournalConfig.RefreshInterval,
                                     system.Scheduler,
@@ -386,7 +384,7 @@ await _currentJournalEvents(uf.Item1,
                             default:
                                 return Task.FromResult(Akka.Util
-                                    .Option<((long, FlowControl),
+                                    .Option<((long, FlowControlEnum),
                                         IImmutableList)>
                                     .None);
                         }
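Reviewer note on the failure-versus-rejection contract this change targets: in Akka.Persistence, an exception thrown out of the write task is reported as a write failure for the whole batch, while a non-null entry in the list returned from the write method is reported as a per-message rejection. The reworked AsyncWriteMessages above awaits the database write first (so database errors propagate as failures) and only materializes serialization errors into the returned list. Below is a minimal, self-contained sketch of that split; TryLike, WriteAsync, and writeToDb are illustrative stand-ins, not types or members from this PR.

```csharp
// Hypothetical sketch of the failure-vs-rejection split that AsyncWriteMessages now follows.
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;

public static class WriteContractSketch
{
    // Stand-in for a serialization result: either a payload or the error that
    // occurred while serializing that particular message.
    public sealed class TryLike<T>
    {
        public TryLike(T value, Exception error) { Value = value; Error = error; }
        public T Value { get; }
        public Exception Error { get; }
        public bool IsSuccess => Error == null;
    }

    public static async Task<IImmutableList<Exception>> WriteAsync(
        IEnumerable<TryLike<byte[]>> serialized,
        Func<IReadOnlyList<byte[]>, Task> writeToDb)
    {
        var tries = serialized.ToList();

        // Only rows that serialized successfully are written to the database.
        var rows = tries.Where(t => t.IsSuccess).Select(t => t.Value).ToList();

        // If the database write throws, the exception escapes this method and the
        // journal reports a batch-level failure; it is NOT converted into rejections.
        await writeToDb(rows);

        // Serialization errors, and only those, are returned per message; a non-null
        // entry marks that message as rejected while the batch itself succeeded.
        return tries.Select(t => t.IsSuccess ? null : t.Error).ToImmutableList();
    }
}
```

The removed ContinueWith-based version folded task faults into this returned list (task.IsFaulted mapped onto each entry), which is what caused genuine persistence failures to surface as rejections, the behavior the release notes call out as fixed.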