Skip to content

Commit

Permalink
fix: only initial backtracking up to start offset when far behind (#631)
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw authored Jan 9, 2025
1 parent aab0557 commit b1c2070
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -412,9 +412,12 @@ import org.slf4j.Logger

def disableBacktrackingWhenFarBehindCurrentWallClockTime: Boolean = {
val aheadOfInitial =
initialOffset == TimestampOffset.Zero || state.latestBacktracking.timestamp.isAfter(initialOffset.timestamp)
initialOffset == TimestampOffset.Zero ||
state.latestBacktracking.timestamp.compareTo(initialOffset.timestamp) >= 0

val previousTimestamp =
if (state.previous == TimestampOffset.Zero) state.latest.timestamp else state.previous.timestamp

aheadOfInitial &&
previousTimestamp.isBefore(clock.instant().minus(firstBacktrackingQueryWindow))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ class EventsBySliceBacktrackingSpec
result1.cancel()
}

"still make initial backtracking until ahead of start offset" in {
"still make initial backtracking until caught up to start offset, then skip backtracking" in {
pendingIfMoreThanOneDataPartition()

val entityType = nextEntityType()
Expand All @@ -317,7 +317,11 @@ class EventsBySliceBacktrackingSpec
writeEvent(slice1, pid1, 2, startTime.plusMillis(3), "e1-2")
writeEvent(slice2, pid2, 2, startTime.plusMillis(4), "e2-2")

(3 to 10).foreach { n =>
// will start query at next event
val startOffset = TimestampOffset(startTime.plusSeconds(23).plusMillis(1), Map.empty)

// go past switch-to-backtracking trigger of 3 * buffer size (of 10)
(3 to 30).foreach { n =>
writeEvent(slice1, pid1, n, startTime.plusSeconds(20 + n).plusMillis(1), s"e1-$n")
writeEvent(slice2, pid2, n, startTime.plusSeconds(20 + n).plusMillis(2), s"e2-$n")
}
Expand All @@ -339,15 +343,16 @@ class EventsBySliceBacktrackingSpec
env.offset
}

val result1 = startQuery(TimestampOffset(startTime.plusSeconds(20), Map.empty))
val result1 = startQuery(startOffset)
// from backtracking
expect(result1.expectNext(), pid1, 1, None)
expect(result1.expectNext(), pid2, 1, None)
expect(result1.expectNext(), pid1, 2, None)
expect(result1.expectNext(), pid2, 2, None)
expect(result1.expectNext(), pid1, 3, None) // start offset

// from normal
(3 to 10).foreach { n =>
(3 to 30).foreach { n =>
expect(result1.expectNext(), pid1, n, Some(s"e1-$n"))
expect(result1.expectNext(), pid2, n, Some(s"e2-$n"))
}
Expand Down

0 comments on commit b1c2070

Please sign in to comment.