Skip to content

Commit

Permalink
Merge pull request #44 from akkadotnet/dev
Browse files Browse the repository at this point in the history
v1.4.28 Release
  • Loading branch information
Aaronontheweb authored Nov 19, 2021
2 parents 170c39a + 7469ac7 commit d9de166
Show file tree
Hide file tree
Showing 8 changed files with 181 additions and 159 deletions.
8 changes: 8 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Dependabot configuration (schema version 2).
version: 2
updates:
# Watch NuGet package references, resolved from the repository root.
- package-ecosystem: nuget
directory: "/"
# Check for new package versions once a day at 11:00.
schedule:
interval: daily
time: "11:00"
# Cap the number of simultaneously open dependency-update PRs.
open-pull-requests-limit: 10
5 changes: 5 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
#### 1.4.28 Nov 18 2021 ####
**Perf Enhancements and fixes to failure reporting**

There was an issue where persistence failures were being reported as rejections when they should not have been. This has been fixed alongside some logic cleanup that should lead to more consistent performance.

#### 1.4.21 July 6 2021 ####
**First official Release for Akka.Persistence.Linq2Db**

Expand Down
4 changes: 2 additions & 2 deletions build-system/pr-validation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ jobs:
parameters:
name: 'linux_pr'
displayName: 'Linux PR Validation'
vmImage: 'ubuntu-16.04'
vmImage: 'ubuntu-20.04'
scriptFileName: ./build.sh
scriptArgs: all
outputDirectory: 'bin/nuget'
artifactName: 'nuget_pack-$(Build.BuildId)'
artifactName: 'nuget_pack-$(Build.BuildId)'
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Immutable;
using System.Data;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
Expand Down Expand Up @@ -138,21 +139,18 @@ private async Task WriteJournalRows(Seq<JournalRow> xs)
{
//hot path:
//If we only have one row, penalty for BulkCopy
//Isn't worth it due to insert caching/etc.
if (xs.Count > 1)
{
//Isn't worth it due to insert caching/transaction/etc.
var count = xs.Count;
if (count > 1)
await InsertMultiple(xs);
}
else if (xs.Count > 0)
{
await InsertSingle(xs);
}
else if (count == 1) await InsertSingle(xs);
}

}

private async Task InsertSingle(Seq<JournalRow> xs)
{
//If we are writing a single row,
//we don't need to worry about transactions.
using (var db = _connectionFactory.GetConnection())
{
await db.InsertAsync(xs.Head);
Expand All @@ -176,9 +174,6 @@ await db.GetTable<JournalRow>()
.MaxRowByRowSize
? BulkCopyType.Default
: BulkCopyType.MultipleRows,
//TODO: When Parameters are allowed,
//Make a Config Option
//Or default to true
UseParameters = _journalConfig.DaoConfig.PreferParametersOnMultiRowInsert,
MaxBatchSize = _journalConfig.DaoConfig.DbRoundTripBatchSize
}, xs);
Expand All @@ -200,45 +195,61 @@ await db.GetTable<JournalRow>()
}
}

public async Task<IImmutableList<Exception>> AsyncWriteMessages(
IEnumerable<AtomicWrite> messages, long timeStamp = 0)
{
var serializedTries = Serializer.Serialize(messages, timeStamp);

//Just a little bit of magic here;
//.ToList() keeps it all working later for whatever reason
//while still keeping our allocations in check.
//By using a custom flatten here, we avoid an Enumerable/LINQ allocation
//And are able to have a little more control over default capacity of array.
static List<JournalRow> FlattenListOfListsToList(List<Akka.Util.Try<List<JournalRow>>> source) {

/*var trySet = new List<JournalRow>();
foreach (var serializedTry in serializedTries)
//List<JournalRow> ResultSet(
// Akka.Util.Try<List<JournalRow>> item)
//{
// return item.Success.GetOrElse(new List<JournalRow>(0));
//}

List<JournalRow> rows = new List<JournalRow>(source.Count > 4 ? source.Count:4);
for (var index = 0; index < source.Count; index++)
{
trySet.AddRange(serializedTry.Success.GetOrElse(new List<JournalRow>(0)));
var item = source[index].Success.Value;
if (item != null)
{
rows.AddRange(item);
}
//rows.AddRange(ResultSet(source[index]));
}

var rows = Seq(trySet);*/
var rows = Seq(serializedTries.SelectMany(serializedTry =>
serializedTry.Success.GetOrElse(new List<JournalRow>(0)))
.ToList());
//
return rows;
}

public async Task<IImmutableList<Exception>> AsyncWriteMessages(
IEnumerable<AtomicWrite> messages, long timeStamp = 0)
{
var serializedTries = Serializer.Serialize(messages, timeStamp);

return await QueueWriteJournalRows(rows).ContinueWith(task =>
{
//We actually are trying to interleave our tasks here...
//Basically, if serialization failed our task will likely
//Show success
//But we instead should display the serialization failure
return serializedTries.Select(r =>
r.IsSuccess
? (task.IsFaulted
? TryUnwrapException(task.Exception)
: null)
: r.Failure.Value).ToImmutableList();
}, CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
//Fold our List of Lists into a single sequence
var rows = Seq(FlattenListOfListsToList(serializedTries));
//Wait for the write to go through. If Task fails, write will be captured
//As WriteMessagesFailure.
await QueueWriteJournalRows(rows);
//If we get here, we build an ImmutableList containing our rejections.
//These will be captured as WriteMessagesRejected
return BaseByteArrayJournalDao
.BuildWriteRejections(serializedTries);
}

protected static ImmutableList<Exception> BuildWriteRejections(
List<Akka.Util.Try<List<JournalRow>>> serializedTries)
{
//Translate each serialization attempt into its rejection slot,
//preserving one entry per AtomicWrite, in order. Failed tries
//contribute their exception; for successful tries Failure.Value is
//presumably a null/no-op marker — confirm against Akka.Util.Try.
var rejections = new Exception[serializedTries.Count];
var slot = 0;
foreach (var attempt in serializedTries)
{
rejections[slot++] = attempt.Failure.Value;
}
return ImmutableList.CreateRange<Exception>(rejections);
}
protected static ImmutableList<Exception> FailWriteThrowHelper(Exception e)
{
//Throw helper: presumably keeps the cold unwrap-and-throw path out of
//callers' bodies. The return type exists only to satisfy call-site
//type inference; this method never returns normally.
var unwrapped = TryUnwrapException(e);
throw unwrapped;
}
protected static Exception TryUnwrapException(Exception e)
{
var aggregateException = e as AggregateException;
Expand Down Expand Up @@ -346,18 +357,31 @@ protected IQueryable<long> MaxMarkedForDeletionMaxPersistenceIdQuery(DataConnect
.OrderByDescending(r => r.sequenceNumber)
.Select(r => r.sequenceNumber).Take(1);
}

//Cached projection from JournalMetaData to the shared
//PersistenceIdAndSequenceNumber shape. Held in a static readonly field so
//the same Expression instance is reused on every query build instead of
//being re-allocated per call; presumably the named result type (rather
//than an anonymous type) lets row and metadata queries be Union'd — TODO
//confirm against MaxSeqNumberForPersistenceIdQuery.
static readonly Expression<Func<JournalMetaData, PersistenceIdAndSequenceNumber>> metaDataSelector =

md =>
new PersistenceIdAndSequenceNumber()
{
SequenceNumber = md.SequenceNumber,
PersistenceId = md.PersistenceId
};

//Cached projection from JournalRow to the shared
//PersistenceIdAndSequenceNumber shape; the static readonly field avoids
//rebuilding the Expression on every query. Note the lowercase member
//names on JournalRow (sequenceNumber/persistenceId) versus the PascalCase
//members on the projection type.
static readonly Expression<Func<JournalRow, PersistenceIdAndSequenceNumber>> rowDataSelector =
md =>
new PersistenceIdAndSequenceNumber()
{
SequenceNumber = md.sequenceNumber,
PersistenceId = md.persistenceId
};
private IQueryable<long> MaxSeqNumberForPersistenceIdQuery(
DataConnection db, string persistenceId, long minSequenceNumber = 0)
{



var queryable = db.GetTable<JournalRow>()
.Where(r => r.persistenceId == persistenceId).Select(r =>
new
{
SequenceNumber = r.sequenceNumber,
PersistenceId = r.persistenceId
});
.Where(r => r.persistenceId == persistenceId).Select(rowDataSelector);
if (minSequenceNumber != 0)
{
queryable = queryable.Where(r =>
Expand All @@ -370,18 +394,19 @@ private IQueryable<long> MaxSeqNumberForPersistenceIdQuery(
.Where(r =>
r.SequenceNumber > minSequenceNumber &&
r.PersistenceId == persistenceId);
queryable = queryable.Union(nextQuery.Select(md =>
new
{
SequenceNumber = md.SequenceNumber,
PersistenceId = md.PersistenceId
}));
queryable = queryable.Union(nextQuery.Select(metaDataSelector));
}

return queryable.OrderByDescending(r => r.SequenceNumber)
.Select(r => r.SequenceNumber).Take(1);
return queryable.OrderByDescending(sequenceNumberSelector)
.Select(sequenceNumberSelector).Take(1);
}

//Cached key selector extracting SequenceNumber; reused by both the
//OrderByDescending and Select calls in MaxSeqNumberForPersistenceIdQuery
//so a fresh Expression is not allocated per invocation.
private static readonly
Expression<Func<PersistenceIdAndSequenceNumber, long>>
sequenceNumberSelector =
r => r.SequenceNumber;


public async Task<Done> Update(string persistenceId, long sequenceNr,
object payload)
{
Expand Down Expand Up @@ -477,4 +502,10 @@ public override
}
};
}

/// <summary>
/// Lightweight projection holding a persistence id together with a
/// sequence number. A named type (rather than an anonymous type) so the
/// cached selector expressions in this file can share one result shape.
/// </summary>
public class PersistenceIdAndSequenceNumber
{
public string PersistenceId { get; set; }
public long SequenceNumber { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,91 +33,90 @@ protected BaseJournalDaoWithReadMessages(IAdvancedScheduler ec,
public Source<Util.Try<ReplayCompletion>, NotUsed> MessagesWithBatch(string persistenceId, long fromSequenceNr,
long toSequenceNr, int batchSize, Util.Option<(TimeSpan,IScheduler)> refreshInterval)
{
var src = Source
.UnfoldAsync<(long, FlowControl),
return Source
.UnfoldAsync<(long, FlowControlEnum),
Seq<Util.Try<ReplayCompletion>>>(
(Math.Max(1, fromSequenceNr),
FlowControl.Continue.Instance),
FlowControlEnum.Continue),
async opt =>
{
async Task<Util.Option<((long, FlowControl), Seq<Util.Try<ReplayCompletion>>)>>
async Task<Util.Option<((long, FlowControlEnum), Seq<Util.Try<ReplayCompletion>>)>>
RetrieveNextBatch()
{
Seq<
Util.Try<ReplayCompletion>> msg;
using (var conn =
_connectionFactory.GetConnection())
{
var waited = Messages(conn, persistenceId,
opt.Item1,
toSequenceNr, batchSize);
msg = await waited
msg = await Messages(conn, persistenceId,
opt.Item1,
toSequenceNr, batchSize)
.RunWith(
ExtSeq.Seq<Util.Try<ReplayCompletion>>(), mat);
}

var hasMoreEvents = msg.Count == batchSize;
var lastMsg = msg.LastOrDefault();
//var lastMsg = msg.IsEmpty.LastOrDefault();
Util.Option<long> lastSeq = Util.Option<long>.None;
if (lastMsg != null && lastMsg.IsSuccess)
if (msg.IsEmpty == false)
{
lastSeq = lastMsg.Success.Select(r => r.Repr.SequenceNr);
}
else if (lastMsg != null && lastMsg.Failure.HasValue)
{
throw lastMsg.Failure.Value;

lastSeq = msg.Last.Get().Repr.SequenceNr;
}

var hasLastEvent =
lastSeq.HasValue &&
lastSeq.Value >= toSequenceNr;
FlowControl nextControl = null;
if (hasLastEvent || opt.Item1 > toSequenceNr)

FlowControlEnum nextControl = FlowControlEnum.Unknown;
if ((lastSeq.HasValue &&
lastSeq.Value >= toSequenceNr) || opt.Item1 > toSequenceNr)
{
nextControl = FlowControl.Stop.Instance;
nextControl = FlowControlEnum.Stop;
}
else if (hasMoreEvents)
{
nextControl = FlowControl.Continue.Instance;
nextControl = FlowControlEnum.Continue;
}
else if (refreshInterval.HasValue == false)
{
nextControl = FlowControl.Stop.Instance;
nextControl = FlowControlEnum.Stop;
}
else
{
nextControl = FlowControl.ContinueDelayed
.Instance;
nextControl = FlowControlEnum.ContinueDelayed;
}

long nextFrom = 0;
long nextFrom = opt.Item1;
if (lastSeq.HasValue)
{
nextFrom = lastSeq.Value + 1;
}
else
{
nextFrom = opt.Item1;
}

return new Util.Option<((long, FlowControl), Seq<Util.Try<ReplayCompletion>>)>((
return new Util.Option<((long, FlowControlEnum), Seq<Util.Try<ReplayCompletion>>)>((
(nextFrom, nextControl), msg));
}

switch (opt.Item2)
{
case FlowControl.Stop _:
return Util.Option<((long, FlowControl), Seq<Util.Try<ReplayCompletion>>)>.None;
case FlowControl.Continue _:
case FlowControlEnum.Stop:
return Util.Option<((long, FlowControlEnum), Seq<Util.Try<ReplayCompletion>>)>.None;
case FlowControlEnum.Continue:
return await RetrieveNextBatch();
case FlowControl.ContinueDelayed _ when refreshInterval.HasValue:
case FlowControlEnum.ContinueDelayed when refreshInterval.HasValue:
return await FutureTimeoutSupport.After(refreshInterval.Value.Item1,refreshInterval.Value.Item2, RetrieveNextBatch);
default:
throw new Exception($"Got invalid FlowControl from Queue! Type : {opt.Item2.GetType()}");
return InvalidFlowThrowHelper(opt);
}
});
}).SelectMany(r => r);;
}

return src.SelectMany(r => r);
private static Util.Option<long> MessagesWithBatchThrowHelper(Util.Try<ReplayCompletion> lastMsg)
{
//Surfaces the failure stored on a replay Try as a thrown exception.
//Declared with an Option<long> return purely for call-site type
//inference; control never leaves this method normally.
var failure = lastMsg.Failure.Value;
throw failure;
}

/// <summary>
/// Throw helper for the unfold state machine: invoked when the stream
/// state carries a FlowControlEnum value outside the handled set.
/// Never returns; the return type only satisfies call-site inference.
/// </summary>
private static Util.Option<((long, FlowControlEnum), Seq<Util.Try<ReplayCompletion>>)> InvalidFlowThrowHelper((long, FlowControlEnum) opt)
{
//String interpolation invokes ToString() implicitly, so the explicit
//.ToString() call was redundant; the rendered message is unchanged.
throw new Exception(
$"Got invalid FlowControl from Queue! Type : {opt.Item2}");
}
}
}
Loading

0 comments on commit d9de166

Please sign in to comment.