Skip to content

Commit 226c2aa

Browse files
Merge pull request #176 from akkadotnet/dev
v1.4.14 Release
2 parents aeaff6c + 481fa67 commit 226c2aa

11 files changed

+57
-27
lines changed

RELEASE_NOTES.md

+2-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
#### 1.4.12 November 26 2020 ####
1+
#### 1.4.14 January 13 2021 ####
22

3-
* Bump Akka version to 1.4.12
4-
* Corrected `CurrentPersistentIds` query and `AllPersistentIds` queries to be more memory efficient and query entity ID data directly from Mongo
5-
* Introduced `AllEvents` and `CurrentEvents` query to read the entire MongoDb journal
6-
* Deprecated previous `GetMaxSeqNo` behavior - we no longer query the max sequence number directly from the journal AND the metadata collection. We only get that data directly from the metadata collection itself, which should make this query an O(1) operation rather than O(n)
3+
* Bump [Akka.NET version to 1.4.14](https://github.com/akkadotnet/akka.net/releases/tag/1.4.14), which adds the `Timestamp` to the `EventEnvelope` data structure - so it can be used for sorting / ordering after the fact in an Akka.Persistence.Query

src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
</PackageReference>
1414
<PackageReference Include="xunit" Version="$(XunitVersion)" />
1515
<PackageReference Include="Akka.Persistence.TCK" Version="$(AkkaVersion)" />
16-
<PackageReference Include="FluentAssertions" Version="5.10.3" />
17-
<PackageReference Include="Mongo2Go" Version="2.2.14" />
16+
<PackageReference Include="FluentAssertions" Version="$(FluentAssertionsVersion)" />
17+
<PackageReference Include="Mongo2Go" Version="2.2.16" />
1818
<PackageReference Include="System.Net.NetworkInformation" Version="4.3.0" />
1919
</ItemGroup>
2020

src/Akka.Persistence.MongoDb.Tests/Bug61FixSpec.cs

+5-3
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ public async Task Bug61_Events_Recovered_By_Id_Should_Match_Tag()
7979
public void Bug80_CurrentEventsByTag_should_Recover_until_end()
8080
{
8181
var actor = Sys.ActorOf(TagActor.Props("y"));
82-
var msgCount = 1200;
82+
//increased this to test for non-collision with the generated timestamps
83+
var msgCount = 5000;
8384
actor.Tell(msgCount);
8485
ExpectMsg($"{msgCount}-done", TimeSpan.FromSeconds(20));
8586

@@ -96,7 +97,8 @@ public void Bug80_CurrentEventsByTag_should_Recover_until_end()
9697
public void Bug80_AllEventsByTag_should_Recover_all_messages()
9798
{
9899
var actor = Sys.ActorOf(TagActor.Props("y"));
99-
var msgCount = 1200;
100+
//increased this to test for non-collision with the generated timestamps
101+
var msgCount = 5000;
100102
actor.Tell(msgCount);
101103
ExpectMsg($"{msgCount}-done", TimeSpan.FromSeconds(20));
102104

@@ -176,7 +178,7 @@ public object ToJournal(object evt)
176178
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
177179
{
178180
var specString = @"
179-
akka.test.single-expect-default = 3s
181+
akka.test.single-expect-default = 10s
180182
akka.persistence {
181183
publish-plugin-commands = on
182184
journal {

src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByPersistenceIdsSpec.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
namespace Akka.Persistence.MongoDb.Tests
1717
{
1818
[Collection("MongoDbSpec")]
19-
public class MongoDbCurrentEventsByPersistenceIdsSpec : Akka.Persistence.TCK.Query.CurrentEventsByPersistenceIdSpec, IClassFixture<DatabaseFixture>
19+
public class MongoDbCurrentEventsByPersistenceIdsSpec : TCK.Query.CurrentEventsByPersistenceIdSpec, IClassFixture<DatabaseFixture>
2020
{
2121
public static readonly AtomicCounter Counter = new AtomicCounter(0);
2222
private readonly ITestOutputHelper _output;

src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@
1111
<ItemGroup>
1212
<PackageReference Include="Akka.Persistence.Query" Version="$(AkkaVersion)" />
1313
<PackageReference Include="akka.streams" Version="$(AkkaVersion)" />
14-
<PackageReference Include="MongoDB.Driver" Version="2.11.4" />
14+
<PackageReference Include="MongoDB.Driver" Version="2.11.5" />
1515
</ItemGroup>
1616
</Project>

src/Akka.Persistence.MongoDb/Journal/JournalEntry.cs

-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ public class JournalEntry
3535
[BsonElement("Manifest")]
3636
public string Manifest { get; set; }
3737

38-
3938
[BsonElement("Ordering")]
4039
public BsonTimestamp Ordering { get; set; }
4140

src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs

+32-9
Original file line numberDiff line numberDiff line change
@@ -317,9 +317,7 @@ private JournalEntry ToJournalEntry(IPersistentRepresentation message)
317317
return new JournalEntry
318318
{
319319
Id = message.PersistenceId + "_" + message.SequenceNr,
320-
//Ordering = _sequenceRepository.GetSequenceValue("journalentry"),
321320
Ordering = new BsonTimestamp(0), // Auto-populates with timestamp
322-
//Timestamp = new BsonTimestamp(0),
323321
IsDeleted = message.IsDeleted,
324322
Payload = payload,
325323
PersistenceId = message.PersistenceId,
@@ -338,9 +336,7 @@ private JournalEntry ToJournalEntry(IPersistentRepresentation message)
338336
return new JournalEntry
339337
{
340338
Id = message.PersistenceId + "_" + message.SequenceNr,
341-
//Ordering = _sequenceRepository.GetSequenceValue("journalentry"),
342339
Ordering = new BsonTimestamp(0), // Auto-populates with timestamp
343-
//Timestamp = new BsonTimestamp(0),
344340
IsDeleted = message.IsDeleted,
345341
Payload = binary,
346342
PersistenceId = message.PersistenceId,
@@ -351,8 +347,25 @@ private JournalEntry ToJournalEntry(IPersistentRepresentation message)
351347
};
352348
}
353349

350+
private static long ToTicks(BsonTimestamp bson)
351+
{
352+
353+
354+
// BSON Timestamps are stored natively as Unix epoch seconds + an ordinal value
355+
356+
// need to use BsonTimestamp.Timestamp because the ordinal value doesn't actually have any
357+
// bearing on the time - it's used to try to somewhat order the events that all occurred concurrently
358+
// according to the MongoDb clock. No need to include that data in the EventEnvelope.Timestamp field
359+
// which is used entirely for end-user purposes.
360+
//
361+
// See https://docs.mongodb.com/manual/reference/bson-types/#timestamps
362+
363+
return DateTimeOffset.FromUnixTimeSeconds(bson.Timestamp).Ticks;
364+
}
365+
354366
private Persistent ToPersistenceRepresentation(JournalEntry entry, IActorRef sender)
355367
{
368+
356369
if (_settings.LegacySerialization)
357370
{
358371
var manifest = string.IsNullOrEmpty(entry.Manifest) ? entry.Payload.GetType().TypeQualifiedName() : entry.Manifest;
@@ -363,14 +376,24 @@ private Persistent ToPersistenceRepresentation(JournalEntry entry, IActorRef sen
363376
entry.PersistenceId,
364377
manifest,
365378
entry.IsDeleted,
366-
sender);
379+
sender,
380+
timestamp: ToTicks(entry.Ordering)); // MongoDb timestamps are stored as Unix Epoch
367381
}
368382

369383
var legacy = entry.SerializerId.HasValue || !string.IsNullOrEmpty(entry.Manifest);
370384
if (!legacy)
371385
{
372386
var ser = _serialization.FindSerializerForType(typeof(Persistent));
373-
return ser.FromBinary<Persistent>((byte[]) entry.Payload);
387+
var output = ser.FromBinary<Persistent>((byte[])entry.Payload);
388+
389+
// backwards compatibility for https://github.com/akkadotnet/akka.net/pull/4680
390+
// it the timestamp is not defined in the binary payload
391+
if (output.Timestamp == 0L)
392+
{
393+
output = (Persistent)output.WithTimestamp(ToTicks(entry.Ordering));
394+
}
395+
396+
return output;
374397
}
375398

376399
int? serializerId = null;
@@ -396,14 +419,14 @@ private Persistent ToPersistenceRepresentation(JournalEntry entry, IActorRef sen
396419
}
397420

398421
if (deserialized is Persistent p)
399-
return p;
422+
return (Persistent)p.WithTimestamp(ToTicks(entry.Ordering));
400423

401-
return new Persistent(deserialized, entry.SequenceNr, entry.PersistenceId, entry.Manifest, entry.IsDeleted, sender);
424+
return new Persistent(deserialized, entry.SequenceNr, entry.PersistenceId, entry.Manifest, entry.IsDeleted, sender, timestamp: ToTicks(entry.Ordering));
402425
}
403426
else // backwards compat for object serialization - Payload was already deserialized by BSON
404427
{
405428
return new Persistent(entry.Payload, entry.SequenceNr, entry.PersistenceId, entry.Manifest,
406-
entry.IsDeleted, sender);
429+
entry.IsDeleted, sender, timestamp: ToTicks(entry.Ordering));
407430
}
408431

409432
}

src/Akka.Persistence.MongoDb/Query/AllEventsPublisher.cs

+1
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ protected bool Replaying(object message)
113113
offset: new Sequence(replayed.Offset),
114114
persistenceId: replayed.Persistent.PersistenceId,
115115
sequenceNr: replayed.Persistent.SequenceNr,
116+
timestamp: replayed.Persistent.Timestamp,
116117
@event: replayed.Persistent.Payload));
117118

118119
CurrentOffset = replayed.Offset;

src/Akka.Persistence.MongoDb/Query/EventByPersistenceIdPublisher.cs

+1
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ protected Receive Replaying(int limit)
131131
offset: new Sequence(seqNr),
132132
persistenceId: PersistenceId,
133133
sequenceNr: seqNr,
134+
timestamp: replayed.Persistent.Timestamp,
134135
@event: replayed.Persistent.Payload));
135136
CurrentSequenceNr = seqNr + 1;
136137
Buffer.DeliverBuffer(TotalDemand);

src/Akka.Persistence.MongoDb/Query/EventsByTagPublisher.cs

+1
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ protected Receive Replaying(int limit)
125125
offset: new Sequence(replayed.Offset),
126126
persistenceId: replayed.Persistent.PersistenceId,
127127
sequenceNr: replayed.Persistent.SequenceNr,
128+
timestamp: replayed.Persistent.Timestamp,
128129
@event: replayed.Persistent.Payload));
129130

130131
CurrentOffset = replayed.Offset;

src/common.props

+11-5
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,26 @@
11
<Project>
22
<PropertyGroup>
3-
<Copyright>Copyright © 2013-2020 Akka.NET Project</Copyright>
3+
<Copyright>Copyright © 2013-2021 Akka.NET Project</Copyright>
44
<Authors>Akka.NET Contrib</Authors>
5-
<VersionPrefix>1.4.1</VersionPrefix>
5+
<VersionPrefix>1.4.14</VersionPrefix>
66
<PackageIconUrl>http://getakka.net/images/akkalogo.png</PackageIconUrl>
77
<PackageProjectUrl>https://github.com/akkadotnet/Akka.Persistence.MongoDB</PackageProjectUrl>
88
<PackageLicenseUrl>https://github.com/akkadotnet/Akka.Persistence.MongoDB/blob/master/LICENSE.md</PackageLicenseUrl>
9-
<PackageReleaseNotes>Bump Akka version to 1.4.1</PackageReleaseNotes>
9+
<PackageReleaseNotes>
10+
Bump Akka version to 1.4.14
11+
Corrected `CurrentPersistentIds` query and `AllPersistentIds` queries to be more memory efficient and query entity ID data directly from Mongo
12+
Introduced `AllEvents` and `CurrentEvents` query to read the entire MongoDb journal
13+
Deprecated previous `GetMaxSeqNo` behavior - we no longer query the max sequence number directly from the journal AND the metadata collection. We only get that data directly from the metadata collection itself, which should make this query an O(1) operation rather than O(n)
14+
</PackageReleaseNotes>
1015
<GenerateDocumentationFile>true</GenerateDocumentationFile>
1116
<Description>Akka Persistence journal and snapshot store backed by MongoDB database.</Description>
1217
<NoWarn>$(NoWarn);CS1591</NoWarn>
1318
</PropertyGroup>
1419
<PropertyGroup>
1520
<XunitVersion>2.4.1</XunitVersion>
16-
<TestSdkVersion>16.8.0</TestSdkVersion>
17-
<AkkaVersion>1.4.12</AkkaVersion>
21+
<TestSdkVersion>16.8.3</TestSdkVersion>
22+
<AkkaVersion>1.4.14</AkkaVersion>
23+
<FluentAssertionsVersion>4.14.0</FluentAssertionsVersion>
1824
</PropertyGroup>
1925
<!-- SourceLink support for all Akka.NET projects -->
2026
<ItemGroup>

0 commit comments

Comments
 (0)