Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,17 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using JasperFx.Events;
using JasperFx.Events.Aggregation;
using JasperFx.Events.Daemon;
using JasperFx.Events.Grouping;
using JasperFx.Events.Projections;
using Marten;
using Marten.Events;
using Marten.Events.Aggregation;
using Marten.Events.Projections;
using Marten.Exceptions;
using Marten.Internal.Sessions;
using Marten.Metadata;
using Marten.Schema;
using Marten.Storage;
using Marten.Testing.Documents;
using Marten.Testing.Harness;
using Marten.Util;
using NSubstitute;
using Shouldly;
using Xunit;
using Xunit.Abstractions;
Expand All @@ -47,7 +38,7 @@ public using_explicit_code_for_aggregations(ITestOutputHelper output)
[InlineData(false, EventAppendMode.Quick, ProjectionLifecycle.Inline, false)]
public void configure_mapping(bool isSingleGrouper, EventAppendMode appendMode, ProjectionLifecycle lifecycle, bool useVersionFromStream)
{
var projection = isSingleGrouper ? (IAggregateProjection)new MySingleStreamProjection(){Lifecycle = lifecycle} : new MyCustomProjection(){Lifecycle = lifecycle};
var projection = isSingleGrouper ? (IAggregateProjection)new MySingleStreamProjection() { Lifecycle = lifecycle } : new MyCustomProjection() { Lifecycle = lifecycle };
var mapping = DocumentMapping.For<MyAggregate>();
mapping.StoreOptions.Events.AppendMode = appendMode;

Expand Down Expand Up @@ -555,6 +546,47 @@ public async Task use_fetch_latest_with_custom_projection()
aggregate.Deleted.ShouldBeTrue();
}

[Fact]
public async Task return_correct_data_after_restarts()
{
var streamId = Guid.NewGuid();
theSession.Events.StartStream<StartAndStopAggregate>(streamId,
// 1
new Start(),
// 2
new Increment(),
// 3
new Increment());

await theSession.SaveChangesAsync();

// start state
var firstResult = await theSession.Query<StartAndStopAggregate>().FirstOrDefaultAsync(x => x.Id == streamId);
Assert.NotNull(firstResult);

var stream = await theSession.Events.FetchForWriting<StartAndStopAggregate>(streamId, CancellationToken.None);
stream.AppendOne(new End());

await theSession.SaveChangesAsync();

// soft end state
var softEndState = await theSession.Query<StartAndStopAggregate>().FirstOrDefaultAsync(x => x.Id == streamId);
Assert.Null(softEndState);

var softEndStateByLoadAsync = await theSession.LoadAsync<StartAndStopAggregate>(streamId);
Assert.NotNull(softEndStateByLoadAsync);
Assert.True(softEndStateByLoadAsync.Deleted);

var streamForRestart = await theSession.Events.FetchForWriting<StartAndStopAggregate>(streamId, CancellationToken.None);
streamForRestart.AppendOne(new Restart());

await theSession.SaveChangesAsync();

// soft end state
var restartState = await theSession.Query<StartAndStopAggregate>().FirstOrDefaultAsync(x => x.Id == streamId);
Assert.NotNull(restartState);
Assert.False(restartState.Deleted);
}
}

#region sample_StartAndStopAggregate
Expand Down Expand Up @@ -638,7 +670,8 @@ public override (StartAndStopAggregate?, ActionType) DetermineAction(StartAndSto

case Increment when snapshot is { Deleted: false }:

if (actionType == ActionType.StoreThenSoftDelete) continue;
if (actionType == ActionType.StoreThenSoftDelete)
continue;

// Use explicit code to only apply this event
// if the snapshot already exists
Expand Down
Loading