@@ -31,7 +31,19 @@ import org.slf4j.Logger
31
31
32
32
object QueryState {
33
33
val empty : QueryState =
34
- QueryState (TimestampOffset .Zero , 0 , 0 , 0 , 0 , backtrackingCount = 0 , TimestampOffset .Zero , 0 , 0 , Buckets .empty)
34
+ QueryState (
35
+ latest = TimestampOffset .Zero ,
36
+ rowCount = 0 ,
37
+ rowCountSinceBacktracking = 0 ,
38
+ queryCount = 0 ,
39
+ idleCount = 0 ,
40
+ backtrackingCount = 0 ,
41
+ latestBacktracking = TimestampOffset .Zero ,
42
+ latestBacktrackingSeenCount = 0 ,
43
+ backtrackingExpectFiltered = 0 ,
44
+ buckets = Buckets .empty,
45
+ previous = TimestampOffset .Zero ,
46
+ previousBacktracking = TimestampOffset .Zero )
35
47
}
36
48
37
49
final case class QueryState (
@@ -44,7 +56,9 @@ import org.slf4j.Logger
44
56
latestBacktracking : TimestampOffset ,
45
57
latestBacktrackingSeenCount : Int ,
46
58
backtrackingExpectFiltered : Int ,
47
- buckets : Buckets ) {
59
+ buckets : Buckets ,
60
+ previous : TimestampOffset ,
61
+ previousBacktracking : TimestampOffset ) {
48
62
49
63
def backtracking : Boolean = backtrackingCount > 0
50
64
@@ -56,6 +70,10 @@ import org.slf4j.Logger
56
70
if (backtracking) latestBacktracking.timestamp
57
71
else latest.timestamp
58
72
73
+ def nextQueryFromSeqNr : Option [Long ] =
74
+ if (backtracking) highestSeenSeqNr(previousBacktracking, latestBacktracking)
75
+ else highestSeenSeqNr(previous, latest)
76
+
59
77
def nextQueryToTimestamp (atLeastNumberOfEvents : Int ): Option [Instant ] = {
60
78
buckets.findTimeForLimit(nextQueryFromTimestamp, atLeastNumberOfEvents) match {
61
79
case Some (t) =>
@@ -70,6 +88,12 @@ import org.slf4j.Logger
70
88
}
71
89
}
72
90
91
+ // only filter by highest seen seq nr when the next query is the same timestamp (or when unknown for initial queries)
92
+ private def highestSeenSeqNr (previous : TimestampOffset , latest : TimestampOffset ): Option [Long ] =
93
+ Option .when((previous == TimestampOffset .Zero || previous.timestamp == latest.timestamp) && latest.seen.nonEmpty) {
94
+ latest.seen.values.max
95
+ }
96
+
73
97
object Buckets {
74
98
type EpochSeconds = Long
75
99
type Count = Long
@@ -157,6 +181,7 @@ import org.slf4j.Logger
157
181
minSlice : Int ,
158
182
maxSlice : Int ,
159
183
fromTimestamp : Instant ,
184
+ fromSeqNr : Option [Long ], // for events with same timestamp as `fromTimestamp`
160
185
toTimestamp : Option [Instant ],
161
186
behindCurrentTime : FiniteDuration ,
162
187
backtracking : Boolean ): Source [SerializedRow , NotUsed ]
@@ -212,7 +237,10 @@ import org.slf4j.Logger
212
237
// so continue until rowCount is 0. That means an extra query at the end to make sure there are no
213
238
// more to fetch.
214
239
if (state.queryCount == 0L || state.rowCount > 0 ) {
215
- val newState = state.copy(rowCount = 0 , queryCount = state.queryCount + 1 )
240
+ val newState = state.copy(rowCount = 0 , queryCount = state.queryCount + 1 , previous = state.latest)
241
+
242
+ val fromTimestamp = state.latest.timestamp
243
+ val fromSeqNr = highestSeenSeqNr(state.previous, state.latest)
216
244
217
245
val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize) match {
218
246
case Some (t) =>
@@ -228,7 +256,7 @@ import org.slf4j.Logger
228
256
state.queryCount,
229
257
minSlice,
230
258
maxSlice,
231
- state.latest.timestamp ,
259
+ fromTimestamp ,
232
260
toTimestamp,
233
261
state.rowCount)
234
262
@@ -238,7 +266,8 @@ import org.slf4j.Logger
238
266
entityType,
239
267
minSlice,
240
268
maxSlice,
241
- state.latest.timestamp,
269
+ fromTimestamp,
270
+ fromSeqNr,
242
271
toTimestamp = Some (toTimestamp),
243
272
behindCurrentTime = Duration .Zero ,
244
273
backtracking = false )
@@ -312,7 +341,11 @@ import org.slf4j.Logger
312
341
s " Unexpected offset [ $offset] before latestBacktracking [ ${state.latestBacktracking}]. " )
313
342
314
343
val newSeenCount =
315
- if (offset.timestamp == state.latestBacktracking.timestamp) state.latestBacktrackingSeenCount + 1 else 1
344
+ if (offset.timestamp == state.latestBacktracking.timestamp &&
345
+ highestSeenSeqNr(state.previousBacktracking, offset) ==
346
+ highestSeenSeqNr(state.previousBacktracking, state.latestBacktracking))
347
+ state.latestBacktrackingSeenCount + 1
348
+ else 1
316
349
317
350
state.copy(
318
351
latestBacktracking = offset,
@@ -420,6 +453,7 @@ import org.slf4j.Logger
420
453
else settings.querySettings.behindCurrentTime
421
454
422
455
val fromTimestamp = newState.nextQueryFromTimestamp
456
+ val fromSeqNr = newState.nextQueryFromSeqNr
423
457
val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize)
424
458
425
459
if (log.isDebugEnabled()) {
@@ -446,14 +480,19 @@ import org.slf4j.Logger
446
480
else s " Found [ ${state.rowCount}] rows in previous query. " )
447
481
}
448
482
449
- newState ->
483
+ val newStateWithPrevious =
484
+ if (newState.backtracking) newState.copy(previousBacktracking = newState.latestBacktracking)
485
+ else newState.copy(previous = newState.latest)
486
+
487
+ newStateWithPrevious ->
450
488
Some (
451
489
dao
452
490
.rowsBySlices(
453
491
entityType,
454
492
minSlice,
455
493
maxSlice,
456
494
fromTimestamp,
495
+ fromSeqNr,
457
496
toTimestamp,
458
497
behindCurrentTime,
459
498
backtracking = newState.backtracking)
0 commit comments