Skip to content

Commit

Permalink
feat: Skip backtracking when far behind,Accept PubSub events after id…
Browse files Browse the repository at this point in the history
…le (#614)

* feat: Accept PubSub events after idle
  * PubSub events are ignored if they are too far ahead of backtracking.
  * That means that they will always be ignored after an idle period.
  * This emits heartbeat events when the query is idle and thereby progress
    the backtracking timestamp in the PubSub filter.
  * base relative heartbeat timestamps on initial database time
  * previous query wall clock minus backtracking behind current time
  * including fake persistenceId corresponding to the slice
  * don't deduplicate heartbeat
  * mima filter
  * unique heartbeat persistenceId
  * filter heartbeats early in pubsub merge

* feat: Skip backtracking when far behind

---------

Co-authored-by: Peter Vlugter <[email protected]>
  • Loading branch information
patriknw and pvlugter authored Oct 18, 2024
1 parent 56552e4 commit c7899d1
Show file tree
Hide file tree
Showing 11 changed files with 531 additions and 91 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# PR #599 internals
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#QueryState.copy")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.r2dbc.internal.BySliceQuery#QueryState.copy*")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#QueryState.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#QueryState.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.r2dbc.internal.BySliceQuery#QueryState.unapply")
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# internals
ProblemFilters.exclude[Problem]("akka.persistence.r2dbc.internal.BySliceQuery#QueryState*")
209 changes: 143 additions & 66 deletions core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@

package akka.persistence.r2dbc.internal

import java.time.Clock

import scala.collection.immutable
import java.time.Instant
import java.time.{ Duration => JDuration }

import scala.annotation.tailrec
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration

import akka.NotUsed
import akka.annotation.InternalApi
import akka.persistence.query.Offset
Expand Down Expand Up @@ -42,7 +46,11 @@ import org.slf4j.Logger
backtrackingExpectFiltered = 0,
buckets = Buckets.empty,
previous = TimestampOffset.Zero,
previousBacktracking = TimestampOffset.Zero)
previousBacktracking = TimestampOffset.Zero,
startTimestamp = Instant.EPOCH,
startWallClock = Instant.EPOCH,
currentQueryWallClock = Instant.EPOCH,
previousQueryWallClock = Instant.EPOCH)
}

final case class QueryState(
Expand All @@ -57,24 +65,32 @@ import org.slf4j.Logger
backtrackingExpectFiltered: Int,
buckets: Buckets,
previous: TimestampOffset,
previousBacktracking: TimestampOffset) {
previousBacktracking: TimestampOffset,
startTimestamp: Instant,
startWallClock: Instant,
currentQueryWallClock: Instant,
previousQueryWallClock: Instant) {

def backtracking: Boolean = backtrackingCount > 0

def currentOffset: TimestampOffset =
if (backtracking) latestBacktracking
else latest

def nextQueryFromTimestamp: Instant =
if (backtracking) latestBacktracking.timestamp
else latest.timestamp
def nextQueryFromTimestamp(backtrackingWindow: JDuration): Instant =
if (backtracking && latest.timestamp.minus(backtrackingWindow).isAfter(latestBacktracking.timestamp))
latest.timestamp.minus(backtrackingWindow)
else if (backtracking)
latestBacktracking.timestamp
else
latest.timestamp

def nextQueryFromSeqNr: Option[Long] =
if (backtracking) highestSeenSeqNr(previousBacktracking, latestBacktracking)
else highestSeenSeqNr(previous, latest)

def nextQueryToTimestamp(atLeastNumberOfEvents: Int): Option[Instant] = {
buckets.findTimeForLimit(nextQueryFromTimestamp, atLeastNumberOfEvents) match {
def nextQueryToTimestamp(backtrackingWindow: JDuration, atLeastNumberOfEvents: Int): Option[Instant] = {
buckets.findTimeForLimit(nextQueryFromTimestamp(backtrackingWindow), atLeastNumberOfEvents) match {
case Some(t) =>
if (backtracking)
if (t.isAfter(latest.timestamp)) Some(latest.timestamp) else Some(t)
Expand Down Expand Up @@ -208,15 +224,18 @@ import org.slf4j.Logger
dao: BySliceQuery.Dao[Row],
createEnvelope: (TimestampOffset, Row) => Envelope,
extractOffset: Envelope => TimestampOffset,
createHeartbeat: Instant => Option[Envelope],
clock: Clock,
settings: R2dbcSettings,
log: Logger)(implicit val ec: ExecutionContext) {
import BySliceQuery._
import TimestampOffset.toTimestampOffset

private val backtrackingWindow = JDuration.ofMillis(settings.querySettings.backtrackingWindow.toMillis)
private val halfBacktrackingWindow = backtrackingWindow.dividedBy(2)
private val firstBacktrackingQueryWindow =
backtrackingWindow.plus(JDuration.ofMillis(settings.querySettings.backtrackingBehindCurrentTime.toMillis))
private val backtrackingBehindCurrentTime =
JDuration.ofMillis(settings.querySettings.backtrackingBehindCurrentTime.toMillis)
private val firstBacktrackingQueryWindow = backtrackingWindow.plus(backtrackingBehindCurrentTime)
private val eventBucketCountInterval = JDuration.ofSeconds(60)

def currentBySlices(
Expand All @@ -228,8 +247,12 @@ import org.slf4j.Logger
filterEventsBeforeSnapshots: (String, Long, String) => Boolean = (_, _, _) => true): Source[Envelope, NotUsed] = {
val initialOffset = toTimestampOffset(offset)

def nextOffset(state: QueryState, envelope: Envelope): QueryState =
state.copy(latest = extractOffset(envelope), rowCount = state.rowCount + 1)
def nextOffset(state: QueryState, envelope: Envelope): QueryState = {
if (EnvelopeOrigin.isHeartbeatEvent(envelope))
state
else
state.copy(latest = extractOffset(envelope), rowCount = state.rowCount + 1)
}

def nextQuery(state: QueryState, endTimestamp: Instant): (QueryState, Option[Source[Envelope, NotUsed]]) = {
// Note that we can't know how many events with the same timestamp that are filtered out
Expand All @@ -241,7 +264,7 @@ import org.slf4j.Logger
val fromTimestamp = state.latest.timestamp
val fromSeqNr = highestSeenSeqNr(state.previous, state.latest)

val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize) match {
val toTimestamp = newState.nextQueryToTimestamp(backtrackingWindow, settings.querySettings.bufferSize) match {
case Some(t) =>
if (t.isBefore(endTimestamp)) t else endTimestamp
case None =>
Expand Down Expand Up @@ -333,45 +356,49 @@ import org.slf4j.Logger
initialOffset.timestamp)

def nextOffset(state: QueryState, envelope: Envelope): QueryState = {
val offset = extractOffset(envelope)
if (state.backtracking) {
if (offset.timestamp.isBefore(state.latestBacktracking.timestamp))
throw new IllegalArgumentException(
s"Unexpected offset [$offset] before latestBacktracking [${state.latestBacktracking}].")

val newSeenCount =
if (offset.timestamp == state.latestBacktracking.timestamp &&
highestSeenSeqNr(state.previousBacktracking, offset) ==
highestSeenSeqNr(state.previousBacktracking, state.latestBacktracking))
state.latestBacktrackingSeenCount + 1
else 1

state.copy(
latestBacktracking = offset,
latestBacktrackingSeenCount = newSeenCount,
rowCount = state.rowCount + 1)
if (EnvelopeOrigin.isHeartbeatEvent(envelope))
state
else {
val offset = extractOffset(envelope)
if (state.backtracking) {
if (offset.timestamp.isBefore(state.latestBacktracking.timestamp))
throw new IllegalArgumentException(
s"Unexpected offset [$offset] before latestBacktracking [${state.latestBacktracking}].")

val newSeenCount =
if (offset.timestamp == state.latestBacktracking.timestamp &&
highestSeenSeqNr(state.previousBacktracking, offset) ==
highestSeenSeqNr(state.previousBacktracking, state.latestBacktracking))
state.latestBacktrackingSeenCount + 1
else 1

} else {
if (offset.timestamp.isBefore(state.latest.timestamp))
throw new IllegalArgumentException(s"Unexpected offset [$offset] before latest [${state.latest}].")
state.copy(
latestBacktracking = offset,
latestBacktrackingSeenCount = newSeenCount,
rowCount = state.rowCount + 1)

if (log.isDebugEnabled()) {
if (state.latestBacktracking.seen.nonEmpty &&
offset.timestamp.isAfter(state.latestBacktracking.timestamp.plus(firstBacktrackingQueryWindow)))
log.debug(
"{} next offset is outside the backtracking window, latestBacktracking: [{}], offset: [{}]",
logPrefix,
state.latestBacktracking,
offset)
}
} else {
if (offset.timestamp.isBefore(state.latest.timestamp))
throw new IllegalArgumentException(s"Unexpected offset [$offset] before latest [${state.latest}].")

if (log.isDebugEnabled()) {
if (state.latestBacktracking.seen.nonEmpty &&
offset.timestamp.isAfter(state.latestBacktracking.timestamp.plus(firstBacktrackingQueryWindow)))
log.debug(
"{} next offset is outside the backtracking window, latestBacktracking: [{}], offset: [{}]",
logPrefix,
state.latestBacktracking,
offset)
}

state.copy(latest = offset, rowCount = state.rowCount + 1)
state.copy(latest = offset, rowCount = state.rowCount + 1)
}
}
}

def delayNextQuery(state: QueryState): Option[FiniteDuration] = {
if (switchFromBacktracking(state)) {
// switch from from backtracking immediately
// switch from backtracking immediately
None
} else {
val delay = ContinuousQuery.adjustNextDelay(
Expand All @@ -398,20 +425,38 @@ import org.slf4j.Logger
state.backtracking && state.rowCount < settings.querySettings.bufferSize - state.backtrackingExpectFiltered
}

def switchToBacktracking(state: QueryState, newIdleCount: Long): Boolean = {
// Note that when starting the query with offset = NoOffset it will switch to backtracking immediately after
// the first normal query because between(latestBacktracking.timestamp, latest.timestamp) > halfBacktrackingWindow

val qSettings = settings.querySettings

def disableBacktrackingWhenFarBehindCurrentWallClockTime: Boolean = {
val aheadOfInitial =
initialOffset == TimestampOffset.Zero || state.latestBacktracking.timestamp.isAfter(initialOffset.timestamp)
val previousTimestamp =
if (state.previous == TimestampOffset.Zero) state.latest.timestamp else state.previous.timestamp
aheadOfInitial &&
previousTimestamp.isBefore(clock.instant().minus(firstBacktrackingQueryWindow))
}

qSettings.backtrackingEnabled &&
!state.backtracking &&
state.latest != TimestampOffset.Zero &&
!disableBacktrackingWhenFarBehindCurrentWallClockTime &&
(newIdleCount >= 5 ||
state.rowCountSinceBacktracking + state.rowCount >= qSettings.bufferSize * 3 ||
JDuration
.between(state.latestBacktracking.timestamp, state.latest.timestamp)
.compareTo(halfBacktrackingWindow) > 0)
}

def nextQuery(state: QueryState): (QueryState, Option[Source[Envelope, NotUsed]]) = {
val newIdleCount = if (state.rowCount == 0) state.idleCount + 1 else 0
// only start tracking query wall clock (for heartbeats) after initial backtracking query
val newQueryWallClock = if (state.latestBacktracking != TimestampOffset.Zero) clock.instant() else Instant.EPOCH
val newState =
if (settings.querySettings.backtrackingEnabled && !state.backtracking && state.latest != TimestampOffset.Zero &&
(newIdleCount >= 5 ||
state.rowCountSinceBacktracking + state.rowCount >= settings.querySettings.bufferSize * 3 ||
JDuration
.between(state.latestBacktracking.timestamp, state.latest.timestamp)
.compareTo(halfBacktrackingWindow) > 0)) {
// FIXME config for newIdleCount >= 5 and maybe something like `newIdleCount % 5 == 0`

// Note that when starting the query with offset = NoOffset it will switch to backtracking immediately after
// the first normal query because between(latestBacktracking.timestamp, latest.timestamp) > halfBacktrackingWindow

if (switchToBacktracking(state, newIdleCount)) {
// switching to backtracking
val fromOffset =
if (state.latestBacktracking == TimestampOffset.Zero)
Expand All @@ -426,15 +471,19 @@ import org.slf4j.Logger
idleCount = newIdleCount,
backtrackingCount = 1,
latestBacktracking = fromOffset,
backtrackingExpectFiltered = state.latestBacktrackingSeenCount)
backtrackingExpectFiltered = state.latestBacktrackingSeenCount,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
} else if (switchFromBacktracking(state)) {
// switch from backtracking
// switching from backtracking
state.copy(
rowCount = 0,
rowCountSinceBacktracking = 0,
queryCount = state.queryCount + 1,
idleCount = newIdleCount,
backtrackingCount = 0)
backtrackingCount = 0,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
} else {
// continue
val newBacktrackingCount = if (state.backtracking) state.backtrackingCount + 1 else 0
Expand All @@ -444,16 +493,18 @@ import org.slf4j.Logger
queryCount = state.queryCount + 1,
idleCount = newIdleCount,
backtrackingCount = newBacktrackingCount,
backtrackingExpectFiltered = state.latestBacktrackingSeenCount)
backtrackingExpectFiltered = state.latestBacktrackingSeenCount,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
}

val behindCurrentTime =
if (newState.backtracking) settings.querySettings.backtrackingBehindCurrentTime
else settings.querySettings.behindCurrentTime

val fromTimestamp = newState.nextQueryFromTimestamp
val fromTimestamp = newState.nextQueryFromTimestamp(backtrackingWindow)
val fromSeqNr = newState.nextQueryFromSeqNr
val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize)
val toTimestamp = newState.nextQueryToTimestamp(backtrackingWindow, settings.querySettings.bufferSize)

if (log.isDebugEnabled()) {
val backtrackingInfo =
Expand Down Expand Up @@ -501,12 +552,38 @@ import org.slf4j.Logger
.via(deserializeAndAddOffset(newState.currentOffset)))
}

ContinuousQuery[QueryState, Envelope](
initialState = QueryState.empty.copy(latest = initialOffset),
updateState = nextOffset,
delayNextQuery = delayNextQuery,
nextQuery = nextQuery,
beforeQuery = beforeQuery(logPrefix, entityType, minSlice, maxSlice, _))
def heartbeat(state: QueryState): Option[Envelope] = {
if (state.idleCount >= 1 && state.previousQueryWallClock != Instant.EPOCH) {
// using wall clock to measure duration since the start time (database timestamp) up to idle backtracking limit
val timestamp = state.startTimestamp.plus(
JDuration.between(state.startWallClock, state.previousQueryWallClock.minus(backtrackingBehindCurrentTime)))
createHeartbeat(timestamp)
} else
None
}

val nextHeartbeat: QueryState => Option[Envelope] =
if (settings.journalPublishEvents) heartbeat else _ => None

val currentTimestamp =
if (settings.useAppTimestamp) Future.successful(InstantFactory.now())
else dao.currentDbTimestamp(minSlice)

Source
.futureSource[Envelope, NotUsed] {
currentTimestamp.map { currentTime =>
val currentWallClock = clock.instant()
ContinuousQuery[QueryState, Envelope](
initialState = QueryState.empty
.copy(latest = initialOffset, startTimestamp = currentTime, startWallClock = currentWallClock),
updateState = nextOffset,
delayNextQuery = delayNextQuery,
nextQuery = nextQuery,
beforeQuery = beforeQuery(logPrefix, entityType, minSlice, maxSlice, _),
heartbeat = nextHeartbeat)
}
}
.mapMaterializedValue(_ => NotUsed)
}

private def beforeQuery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ private[r2dbc] object ContinuousQuery {
updateState: (S, T) => S,
delayNextQuery: S => Option[FiniteDuration],
nextQuery: S => (S, Option[Source[T, NotUsed]]),
beforeQuery: S => Option[Future[S]] = (_: S) => None): Source[T, NotUsed] =
Source.fromGraph(new ContinuousQuery[S, T](initialState, updateState, delayNextQuery, nextQuery, beforeQuery))
beforeQuery: S => Option[Future[S]] = (_: S) => None,
heartbeat: S => Option[T] = (_: S) => None): Source[T, NotUsed] =
Source.fromGraph(
new ContinuousQuery[S, T](initialState, updateState, delayNextQuery, nextQuery, beforeQuery, heartbeat))

private case object NextQuery

Expand Down Expand Up @@ -68,7 +70,8 @@ final private[r2dbc] class ContinuousQuery[S, T](
updateState: (S, T) => S,
delayNextQuery: S => Option[FiniteDuration],
nextQuery: S => (S, Option[Source[T, NotUsed]]),
beforeQuery: S => Option[Future[S]])
beforeQuery: S => Option[Future[S]],
heartbeat: S => Option[T])
extends GraphStage[SourceShape[T]] {
import ContinuousQuery._

Expand Down Expand Up @@ -149,8 +152,12 @@ final private[r2dbc] class ContinuousQuery[S, T](
next()
}
})
val sourceWithHeartbeat = heartbeat(newState) match {
case None => source
case Some(h) => Source.single(h).concat(source)
}
val graph = Source
.fromGraph(source)
.fromGraph(sourceWithHeartbeat)
.to(sinkIn.sink)
interpreter.subFusingMaterializer.materialize(graph)
sinkIn.pull()
Expand Down
Loading

0 comments on commit c7899d1

Please sign in to comment.