Skip to content

Commit 282dacb

Browse files
committed
Initial version of query for non stale aggregate data.
1 parent 7ecdb00 commit 282dacb

File tree

5 files changed

+62
-3
lines changed

5 files changed

+62
-3
lines changed

src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs

+29
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using EventSourcingTests.FetchForWriting;
66
using EventSourcingTests.Projections;
77
using JasperFx.Core;
8+
using Marten;
89
using Marten.Events;
910
using Marten.Events.Daemon;
1011
using Marten.Events.Projections;
@@ -95,4 +96,32 @@ await theStore.Storage.Database.WaitForNonStaleProjectionDataAsync(typeof(Simple
9596
}
9697

9798
}
99+
100+
[Fact]
101+
public async Task try_to_query_for_non_stale_data_by_aggregate_type()
102+
{
103+
var task = Task.Run(async () =>
104+
{
105+
using var session = theStore.LightweightSession();
106+
107+
for (int i = 0; i < 20; i++)
108+
{
109+
session.Events.StartStream(new AEvent(), new BEvent());
110+
session.Events.StartStream(new CEvent(), new BEvent());
111+
session.Events.StartStream(new DEvent(), new AEvent());
112+
var streamId = session.Events.StartStream(new DEvent(), new CEvent());
113+
await session.SaveChangesAsync();
114+
}
115+
});
116+
117+
using var daemon = await theStore.BuildProjectionDaemonAsync();
118+
await daemon.StartAllAsync();
119+
120+
await Task.Delay(1.Seconds());
121+
122+
var items = await theSession.QueryForNonStaleData<SimpleAggregate>(30.Seconds()).ToListAsync();
123+
items.Count.ShouldBeGreaterThan(0);
124+
125+
await task;
126+
}
98127
}

src/Marten/Events/AsyncProjectionTestingExtensions.cs

+1-3
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,7 @@ public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase
140140
{
141141
// Number of active projection shards, plus the high water mark
142142
var shards = database.As<MartenDatabase>().Options.Projections.AsyncShardsPublishingType(aggregationType);
143-
if (!shards.Any())
144-
throw new InvalidOperationException(
145-
$"Cannot find any registered async projection shards for aggregate type {aggregationType.FullNameInCode()}");
143+
if (!shards.Any()) throw new InvalidOperationException($"Cannot find any registered async projection shards for aggregate type {aggregationType.FullNameInCode()}");
146144

147145
var all = shards.Concat([ShardName.HighWaterMark]).ToArray();
148146
var tracking = new Dictionary<string, long>();

src/Marten/IQuerySession.cs

+13
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,17 @@ public interface IQuerySession: IDisposable, IAsyncDisposable
165165

166166
#endregion
167167

168+
/// <summary>
169+
/// If you are querying for data against a projected event aggregation that is updated asynchronously
170+
/// through the async daemon, this method will ensure that you are querying against the latest events appended
171+
/// to the system by waiting for the aggregate to catch up to the current "high water mark" of the event store
172+
/// at the time this query is executed.
173+
/// </summary>
174+
/// <param name="timeout"></param>
175+
/// <typeparam name="T"></typeparam>
176+
/// <returns></returns>
177+
IMartenQueryable<T> QueryForNonStaleData<T>(TimeSpan timeout);
178+
168179
/// <summary>
169180
/// Queries the document storage table for the document type T by supplied SQL. See
170181
/// https://martendb.io/documents/querying/sql.html for more information on usage.
@@ -763,4 +774,6 @@ Task<DocumentMetadata> MetadataForAsync<T>(T entity,
763774
/// <typeparam name="T"></typeparam>
764775
/// <returns></returns>
765776
Task<T> QueryByPlanAsync<T>(IQueryPlan<T> plan, CancellationToken token = default);
777+
778+
766779
}

src/Marten/Internal/Sessions/QuerySession.Querying.cs

+9
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#nullable enable
2+
using System;
23
using System.Collections.Generic;
34
using System.Threading;
45
using System.Threading.Tasks;
@@ -22,6 +23,14 @@ public IMartenQueryable<T> Query<T>()
2223
return new MartenLinqQueryable<T>(this);
2324
}
2425

26+
public IMartenQueryable<T> QueryForNonStaleData<T>(TimeSpan timeout)
27+
{
28+
var queryable = new MartenLinqQueryable<T>(this);
29+
queryable.MartenProvider.Waiter = new WaitForAggregate(timeout);
30+
31+
return queryable;
32+
}
33+
2534
public IReadOnlyList<T> Query<T>(string sql, params object[] parameters)
2635
{
2736
assertNotDisposed();

src/Marten/Linq/MartenLinqQueryProvider.cs

+10
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using System.Runtime.CompilerServices;
88
using System.Threading;
99
using System.Threading.Tasks;
10+
using Marten.Events;
1011
using Marten.Exceptions;
1112
using Marten.Internal.Sessions;
1213
using Marten.Linq.Parsing;
@@ -16,6 +17,8 @@
1617

1718
namespace Marten.Linq;
1819

20+
internal record WaitForAggregate(TimeSpan Timeout);
21+
1922
internal class MartenLinqQueryProvider: IQueryProvider
2023
{
2124
private readonly QuerySession _session;
@@ -28,6 +31,8 @@ public MartenLinqQueryProvider(QuerySession session, Type type)
2831

2932
public Type SourceType { get; }
3033

34+
internal WaitForAggregate? Waiter { get; set; }
35+
3136
internal QueryStatistics Statistics { get; set; } = null!;
3237

3338
public IQueryable CreateQuery(Expression expression)
@@ -70,6 +75,11 @@ internal async ValueTask EnsureStorageExistsAsync(LinqQueryParser parser,
7075
{
7176
await _session.Database.EnsureStorageExistsAsync(documentType, cancellationToken).ConfigureAwait(false);
7277
}
78+
79+
if (Waiter != null)
80+
{
81+
await _session.Database.WaitForNonStaleProjectionDataAsync(SourceType, Waiter.Timeout, cancellationToken).ConfigureAwait(false);
82+
}
7383
}
7484

7585

0 commit comments

Comments
 (0)