Skip to content

Commit dc151cc

Browse files
committed
Allows the usage of CustomProjection with AggregateByStream() and strong typed identifiers for the aggregate
1 parent 5511915 commit dc151cc

File tree

2 files changed

+142
-0
lines changed

2 files changed

+142
-0
lines changed

src/EventSourcingTests/Aggregation/CustomProjectionTests.cs

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
24
using System.Threading;
35
using System.Threading.Tasks;
46
using JasperFx.Core.Reflection;
7+
using Marten;
58
using Marten.Events;
69
using Marten.Events.Aggregation;
710
using Marten.Events.Projections;
@@ -300,6 +303,46 @@ public async Task use_inline_synchronous()
300303
DCount = 1
301304
});
302305
}
306+
307+
[Fact]
308+
public async Task use_strong_typed_guid_based_identifier()
309+
{
310+
var mapping = new DocumentMapping(typeof(MyCustomGuidAggregate), new StoreOptions());
311+
mapping.IdMember.Name.ShouldBe("Id");
312+
313+
StoreOptions(opts =>
314+
{
315+
opts.Projections.Add(new MyCustomGuidProjection(), ProjectionLifecycle.Inline);
316+
});
317+
318+
var streamId = Guid.NewGuid();
319+
theSession.Events.StartStream<MyCustomGuidAggregate>(streamId, new AEvent(), new BEvent(), new BEvent());
320+
await theSession.SaveChangesAsync();
321+
322+
var aggregate = await theSession.LoadAsync<MyCustomGuidAggregate>(new MyCustomGuidId(streamId));
323+
aggregate.A.ShouldBe(1);
324+
aggregate.B.ShouldBe(2);
325+
aggregate.C.ShouldBe(0);
326+
}
327+
328+
[Fact]
329+
public async Task use_strong_typed_string_based_identifier()
330+
{
331+
StoreOptions(opts =>
332+
{
333+
opts.Events.StreamIdentity = StreamIdentity.AsString;
334+
opts.Projections.Add(new MyCustomStreamProjection(), ProjectionLifecycle.Inline);
335+
});
336+
337+
var streamId = Guid.NewGuid().ToString();
338+
theSession.Events.StartStream<MyCustomStringAggregate>(streamId, new AEvent(), new BEvent(), new BEvent());
339+
await theSession.SaveChangesAsync();
340+
341+
var aggregate = await theSession.LoadAsync<MyCustomStringAggregate>(new MyCustomStringId(streamId));
342+
aggregate.A.ShouldBe(1);
343+
aggregate.B.ShouldBe(2);
344+
aggregate.C.ShouldBe(0);
345+
}
303346
}
304347

305348
public class CustomEvent: INumbered
@@ -344,6 +387,94 @@ public override async ValueTask ApplyChangesAsync(DocumentSessionBase session, E
344387
}
345388
}
346389

390+
public record struct MyCustomStringId(string Value);
391+
392+
public class MyCustomStringAggregate
393+
{
394+
public MyCustomStringId Id { get; set; }
395+
public int A { get; set; }
396+
public int B { get; set; }
397+
public int C { get; set; }
398+
public int D { get; set; }
399+
}
400+
401+
public class MyCustomStreamProjection: CustomProjection<MyCustomStringAggregate, MyCustomStringId>
402+
{
403+
public MyCustomStreamProjection()
404+
{
405+
AggregateByStream();
406+
}
407+
408+
public override MyCustomStringAggregate Apply(MyCustomStringAggregate snapshot, IReadOnlyList<IEvent> events)
409+
{
410+
snapshot ??= new MyCustomStringAggregate();
411+
foreach (var e in events.Select(x => x.Data))
412+
{
413+
switch (e)
414+
{
415+
case AEvent:
416+
snapshot.A++;
417+
break;
418+
case BEvent:
419+
snapshot.B++;
420+
break;
421+
case CEvent:
422+
snapshot.C++;
423+
break;
424+
case DEvent:
425+
snapshot.D++;
426+
break;
427+
}
428+
}
429+
430+
return snapshot;
431+
}
432+
}
433+
434+
public class MyCustomGuidAggregate
435+
{
436+
public MyCustomGuidId Id { get; set; }
437+
public int A { get; set; }
438+
public int B { get; set; }
439+
public int C { get; set; }
440+
public int D { get; set; }
441+
}
442+
443+
public class MyCustomGuidProjection: CustomProjection<MyCustomGuidAggregate, MyCustomGuidId>
444+
{
445+
public MyCustomGuidProjection()
446+
{
447+
AggregateByStream();
448+
}
449+
450+
public override MyCustomGuidAggregate Apply(MyCustomGuidAggregate snapshot, IReadOnlyList<IEvent> events)
451+
{
452+
snapshot ??= new MyCustomGuidAggregate();
453+
foreach (var e in events.Select(x => x.Data))
454+
{
455+
switch (e)
456+
{
457+
case AEvent:
458+
snapshot.A++;
459+
break;
460+
case BEvent:
461+
snapshot.B++;
462+
break;
463+
case CEvent:
464+
snapshot.C++;
465+
break;
466+
case DEvent:
467+
snapshot.D++;
468+
break;
469+
}
470+
}
471+
472+
return snapshot;
473+
}
474+
}
475+
476+
public record struct MyCustomGuidId(Guid Value);
477+
347478
public class MyCustomProjection: CustomProjection<CustomAggregate, int>
348479
{
349480
public MyCustomProjection()

src/Marten/Events/Aggregation/CustomProjection.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
using Marten.Events.Daemon.Internals;
1111
using Marten.Events.Projections;
1212
using Marten.Exceptions;
13+
using Marten.Internal;
1314
using Marten.Internal.Sessions;
1415
using Marten.Internal.Storage;
1516
using Marten.Schema;
@@ -141,6 +142,8 @@ public virtual async ValueTask ApplyChangesAsync(DocumentSessionBase session, Ev
141142
snapshot = await BuildAsync(session, snapshot, slice.Events()).ConfigureAwait(false);
142143
ApplyMetadata(snapshot, slice.Events().Last());
143144

145+
session.StorageFor<TDoc, TId>().SetIdentity(snapshot, slice.Id);
146+
144147
slice.Aggregate = snapshot;
145148
session.Store(snapshot);
146149
}
@@ -294,6 +297,14 @@ public void AggregateByStream()
294297
{
295298
Slicer = (IEventSlicer<TDoc, TId>)new ByStreamKey<TDoc>();
296299
}
300+
else if (typeof(TId).GetProperties().Any(x => x.PropertyType == typeof(Guid)))
301+
{
302+
Slicer = new ByStreamId<TDoc, TId>(new StoreOptions().RegisterValueType(typeof(TId)));
303+
}
304+
else if (typeof(TId).GetProperties().Any(x => x.PropertyType == typeof(string)))
305+
{
306+
Slicer = new ByStreamKey<TDoc, TId>(new StoreOptions().RegisterValueType(typeof(TId)));
307+
}
297308
else
298309
{
299310
throw new InvalidProjectionException(

0 commit comments

Comments
 (0)