From c7899d1318d69b872b918aea117aabfd7e300c43 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Fri, 18 Oct 2024 10:04:56 +0200
Subject: [PATCH] feat: Skip backtracking when far behind, Accept PubSub events after idle (#614)

* feat: Accept PubSub events after idle

* PubSub events are ignored if they are too far ahead of backtracking.
* That means that they will always be ignored after an idle period.
* This emits heartbeat events when the query is idle and thereby progresses the backtracking timestamp in the PubSub filter.
* base relative heartbeat timestamps on initial database time
* previous query wall clock minus backtracking behind current time
* including fake persistenceId corresponding to the slice
* don't deduplicate heartbeat
* mima filter
* unique heartbeat persistenceId
* filter heartbeats early in pubsub merge
* feat: Skip backtracking when far behind

---------

Co-authored-by: Peter Vlugter <59895+pvlugter@users.noreply.github.com>
---
 .../heartbeat.excludes                        |   6 +
 .../SkipBacktracking.excludes                 |   2 +
 .../r2dbc/internal/BySliceQuery.scala         | 209 ++++++++++++------
 .../r2dbc/internal/ContinuousQuery.scala      |  15 +-
 .../r2dbc/internal/EnvelopeOrigin.scala       |  10 +
 .../query/scaladsl/R2dbcReadJournal.scala     |  95 ++++++--
 .../scaladsl/R2dbcDurableStateStore.scala     |   6 +-
 core/src/test/resources/logback-test.xml      |   2 +-
 .../query/EventsBySliceBacktrackingSpec.scala | 124 ++++++++++-
 .../EventsBySlicePubSubBacktrackingSpec.scala | 151 +++++++++++++
 .../r2dbc/query/EventsBySliceSpec.scala       |   2 +-
 11 files changed, 531 insertions(+), 91 deletions(-)
 create mode 100644 core/src/main/mima-filters/1.2.5.backwards.excludes/heartbeat.excludes
 create mode 100644 core/src/main/mima-filters/1.3.0-M1.backwards.excludes/SkipBacktracking.excludes
 create mode 100644 core/src/test/scala/akka/persistence/r2dbc/query/EventsBySlicePubSubBacktrackingSpec.scala

diff --git a/core/src/main/mima-filters/1.2.5.backwards.excludes/heartbeat.excludes b/core/src/main/mima-filters/1.2.5.backwards.excludes/heartbeat.excludes
new file mode 100644
index 00000000..646505ec
--- /dev/null
+++ b/core/src/main/mima-filters/1.2.5.backwards.excludes/heartbeat.excludes
@@ -0,0 +1,6 @@
+# PR #599 internals
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#QueryState.copy")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.r2dbc.internal.BySliceQuery#QueryState.copy*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#QueryState.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#QueryState.apply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.r2dbc.internal.BySliceQuery#QueryState.unapply")
diff --git a/core/src/main/mima-filters/1.3.0-M1.backwards.excludes/SkipBacktracking.excludes b/core/src/main/mima-filters/1.3.0-M1.backwards.excludes/SkipBacktracking.excludes
new file mode 100644
index 00000000..600ff835
--- /dev/null
+++ b/core/src/main/mima-filters/1.3.0-M1.backwards.excludes/SkipBacktracking.excludes
@@ -0,0 +1,2 @@
+# internals
+ProblemFilters.exclude[Problem]("akka.persistence.r2dbc.internal.BySliceQuery#QueryState*")
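A sketch of the heartbeat timestamp arithmetic described in the bullets above, with made-up example values; the actual computation is the heartbeat method added to BySliceQuery.scala further down:

    import java.time.{ Duration => JDuration, Instant }

    // Illustrative values, mirroring the QueryState fields added by this patch.
    val startTimestamp = Instant.parse("2024-10-18T08:00:00Z") // database time when the query started
    val startWallClock = Instant.parse("2024-10-18T08:00:02Z") // local wall clock at the same moment
    val previousQueryWallClock = Instant.parse("2024-10-18T08:05:02Z") // wall clock of the previous query cycle
    val backtrackingBehindCurrentTime = JDuration.ofSeconds(10)

    // Database start time plus elapsed wall-clock time, held back by the
    // backtracking-behind-current-time margin so that backtracking can still
    // deliver any events that were missed in that window:
    val heartbeatTimestamp = startTimestamp.plus(
      JDuration.between(startWallClock, previousQueryWallClock.minus(backtrackingBehindCurrentTime)))
    // => 2024-10-18T08:04:50Z, roughly backtracking-behind-current-time behind the wall clock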
diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala
index ff14e3bf..db4dbf07 100644
--- a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala
+++ b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala
@@ -4,14 +4,18 @@
 package akka.persistence.r2dbc.internal
 
+import java.time.Clock
+
 import scala.collection.immutable
 import java.time.Instant
 import java.time.{ Duration => JDuration }
+
 import scala.annotation.tailrec
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration.Duration
 import scala.concurrent.duration.FiniteDuration
+
 import akka.NotUsed
 import akka.annotation.InternalApi
 import akka.persistence.query.Offset
@@ -42,7 +46,11 @@ import org.slf4j.Logger
       backtrackingExpectFiltered = 0,
       buckets = Buckets.empty,
       previous = TimestampOffset.Zero,
-      previousBacktracking = TimestampOffset.Zero)
+      previousBacktracking = TimestampOffset.Zero,
+      startTimestamp = Instant.EPOCH,
+      startWallClock = Instant.EPOCH,
+      currentQueryWallClock = Instant.EPOCH,
+      previousQueryWallClock = Instant.EPOCH)
   }
 
   final case class QueryState(
@@ -57,7 +65,11 @@ import org.slf4j.Logger
       backtrackingExpectFiltered: Int,
       buckets: Buckets,
       previous: TimestampOffset,
-      previousBacktracking: TimestampOffset) {
+      previousBacktracking: TimestampOffset,
+      startTimestamp: Instant,
+      startWallClock: Instant,
+      currentQueryWallClock: Instant,
+      previousQueryWallClock: Instant) {
 
     def backtracking: Boolean = backtrackingCount > 0
 
@@ -65,16 +77,20 @@ import org.slf4j.Logger
       if (backtracking) latestBacktracking
       else latest
 
-    def nextQueryFromTimestamp: Instant =
-      if (backtracking) latestBacktracking.timestamp
-      else latest.timestamp
+    def nextQueryFromTimestamp(backtrackingWindow: JDuration): Instant =
+      if (backtracking && latest.timestamp.minus(backtrackingWindow).isAfter(latestBacktracking.timestamp))
+        latest.timestamp.minus(backtrackingWindow)
+      else if (backtracking)
+        latestBacktracking.timestamp
+      else
+        latest.timestamp
 
     def nextQueryFromSeqNr: Option[Long] =
       if (backtracking) highestSeenSeqNr(previousBacktracking, latestBacktracking)
       else highestSeenSeqNr(previous, latest)
 
-    def nextQueryToTimestamp(atLeastNumberOfEvents: Int): Option[Instant] = {
-      buckets.findTimeForLimit(nextQueryFromTimestamp, atLeastNumberOfEvents) match {
+    def nextQueryToTimestamp(backtrackingWindow: JDuration, atLeastNumberOfEvents: Int): Option[Instant] = {
+      buckets.findTimeForLimit(nextQueryFromTimestamp(backtrackingWindow), atLeastNumberOfEvents) match {
         case Some(t) =>
           if (backtracking)
             if (t.isAfter(latest.timestamp)) Some(latest.timestamp) else Some(t)
@@ -208,6 +224,8 @@ import org.slf4j.Logger
     dao: BySliceQuery.Dao[Row],
     createEnvelope: (TimestampOffset, Row) => Envelope,
     extractOffset: Envelope => TimestampOffset,
+    createHeartbeat: Instant => Option[Envelope],
+    clock: Clock,
     settings: R2dbcSettings,
     log: Logger)(implicit val ec: ExecutionContext) {
   import BySliceQuery._
@@ -215,8 +233,9 @@ import org.slf4j.Logger
 
   private val backtrackingWindow = JDuration.ofMillis(settings.querySettings.backtrackingWindow.toMillis)
   private val halfBacktrackingWindow = backtrackingWindow.dividedBy(2)
-  private val firstBacktrackingQueryWindow =
-    backtrackingWindow.plus(JDuration.ofMillis(settings.querySettings.backtrackingBehindCurrentTime.toMillis))
+  private val backtrackingBehindCurrentTime =
+    JDuration.ofMillis(settings.querySettings.backtrackingBehindCurrentTime.toMillis)
+  private val firstBacktrackingQueryWindow = backtrackingWindow.plus(backtrackingBehindCurrentTime)
   private val eventBucketCountInterval = JDuration.ofSeconds(60)
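The reworked nextQueryFromTimestamp above is where backtracking is skipped when it has fallen far behind: while backtracking, the starting point is capped at the latest normal-query timestamp minus the backtracking window. A small sketch with illustrative values:

    import java.time.{ Duration => JDuration, Instant }

    val backtrackingWindow = JDuration.ofMinutes(2)
    val latest = Instant.parse("2024-10-18T08:10:00Z")             // latest normal-query offset
    val latestBacktracking = Instant.parse("2024-10-18T07:00:00Z") // backtracking has fallen far behind

    // Mirrors QueryState.nextQueryFromTimestamp when backtracking:
    val fromTimestamp =
      if (latest.minus(backtrackingWindow).isAfter(latestBacktracking))
        latest.minus(backtrackingWindow) // skip ahead to 08:08:00 instead of replaying from 07:00:00
      else
        latestBacktracking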
   def currentBySlices(
       filterEventsBeforeSnapshots: (String, Long, String) => Boolean = (_, _, _) => true): Source[Envelope, NotUsed] = {
     val initialOffset = toTimestampOffset(offset)
 
-    def nextOffset(state: QueryState, envelope: Envelope): QueryState =
-      state.copy(latest = extractOffset(envelope), rowCount = state.rowCount + 1)
+    def nextOffset(state: QueryState, envelope: Envelope): QueryState = {
+      if (EnvelopeOrigin.isHeartbeatEvent(envelope))
+        state
+      else
+        state.copy(latest = extractOffset(envelope), rowCount = state.rowCount + 1)
+    }
 
     def nextQuery(state: QueryState, endTimestamp: Instant): (QueryState, Option[Source[Envelope, NotUsed]]) = {
       // Note that we can't know how many events with the same timestamp that are filtered out
@@ -241,7 +264,7 @@ import org.slf4j.Logger
       val fromTimestamp = state.latest.timestamp
       val fromSeqNr = highestSeenSeqNr(state.previous, state.latest)
 
-      val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize) match {
+      val toTimestamp = newState.nextQueryToTimestamp(backtrackingWindow, settings.querySettings.bufferSize) match {
         case Some(t) =>
           if (t.isBefore(endTimestamp)) t else endTimestamp
         case None =>
@@ -333,45 +356,49 @@ import org.slf4j.Logger
         initialOffset.timestamp)
 
     def nextOffset(state: QueryState, envelope: Envelope): QueryState = {
-      val offset = extractOffset(envelope)
-      if (state.backtracking) {
-        if (offset.timestamp.isBefore(state.latestBacktracking.timestamp))
-          throw new IllegalArgumentException(
-            s"Unexpected offset [$offset] before latestBacktracking [${state.latestBacktracking}].")
-
-        val newSeenCount =
-          if (offset.timestamp == state.latestBacktracking.timestamp &&
-            highestSeenSeqNr(state.previousBacktracking, offset) ==
-            highestSeenSeqNr(state.previousBacktracking, state.latestBacktracking))
-            state.latestBacktrackingSeenCount + 1
-          else 1
-
-        state.copy(
-          latestBacktracking = offset,
-          latestBacktrackingSeenCount = newSeenCount,
-          rowCount = state.rowCount + 1)
+      if (EnvelopeOrigin.isHeartbeatEvent(envelope))
+        state
+      else {
+        val offset = extractOffset(envelope)
+        if (state.backtracking) {
+          if (offset.timestamp.isBefore(state.latestBacktracking.timestamp))
+            throw new IllegalArgumentException(
+              s"Unexpected offset [$offset] before latestBacktracking [${state.latestBacktracking}].")
+
+          val newSeenCount =
+            if (offset.timestamp == state.latestBacktracking.timestamp &&
+              highestSeenSeqNr(state.previousBacktracking, offset) ==
+              highestSeenSeqNr(state.previousBacktracking, state.latestBacktracking))
+              state.latestBacktrackingSeenCount + 1
+            else 1
 
-      } else {
-        if (offset.timestamp.isBefore(state.latest.timestamp))
-          throw new IllegalArgumentException(s"Unexpected offset [$offset] before latest [${state.latest}].")
+          state.copy(
+            latestBacktracking = offset,
+            latestBacktrackingSeenCount = newSeenCount,
+            rowCount = state.rowCount + 1)
 
-        if (log.isDebugEnabled()) {
-          if (state.latestBacktracking.seen.nonEmpty &&
-            offset.timestamp.isAfter(state.latestBacktracking.timestamp.plus(firstBacktrackingQueryWindow)))
-            log.debug(
-              "{} next offset is outside the backtracking window, latestBacktracking: [{}], offset: [{}]",
-              logPrefix,
-              state.latestBacktracking,
-              offset)
-        }
+        } else {
+          if (offset.timestamp.isBefore(state.latest.timestamp))
+            throw new IllegalArgumentException(s"Unexpected offset [$offset] before latest [${state.latest}].")
+
+          if (log.isDebugEnabled()) {
+            if (state.latestBacktracking.seen.nonEmpty &&
+              offset.timestamp.isAfter(state.latestBacktracking.timestamp.plus(firstBacktrackingQueryWindow)))
+              log.debug(
+ "{} next offset is outside the backtracking window, latestBacktracking: [{}], offset: [{}]", + logPrefix, + state.latestBacktracking, + offset) + } - state.copy(latest = offset, rowCount = state.rowCount + 1) + state.copy(latest = offset, rowCount = state.rowCount + 1) + } } } def delayNextQuery(state: QueryState): Option[FiniteDuration] = { if (switchFromBacktracking(state)) { - // switch from from backtracking immediately + // switch from backtracking immediately None } else { val delay = ContinuousQuery.adjustNextDelay( @@ -398,20 +425,38 @@ import org.slf4j.Logger state.backtracking && state.rowCount < settings.querySettings.bufferSize - state.backtrackingExpectFiltered } + def switchToBacktracking(state: QueryState, newIdleCount: Long): Boolean = { + // Note that when starting the query with offset = NoOffset it will switch to backtracking immediately after + // the first normal query because between(latestBacktracking.timestamp, latest.timestamp) > halfBacktrackingWindow + + val qSettings = settings.querySettings + + def disableBacktrackingWhenFarBehindCurrentWallClockTime: Boolean = { + val aheadOfInitial = + initialOffset == TimestampOffset.Zero || state.latestBacktracking.timestamp.isAfter(initialOffset.timestamp) + val previousTimestamp = + if (state.previous == TimestampOffset.Zero) state.latest.timestamp else state.previous.timestamp + aheadOfInitial && + previousTimestamp.isBefore(clock.instant().minus(firstBacktrackingQueryWindow)) + } + + qSettings.backtrackingEnabled && + !state.backtracking && + state.latest != TimestampOffset.Zero && + !disableBacktrackingWhenFarBehindCurrentWallClockTime && + (newIdleCount >= 5 || + state.rowCountSinceBacktracking + state.rowCount >= qSettings.bufferSize * 3 || + JDuration + .between(state.latestBacktracking.timestamp, state.latest.timestamp) + .compareTo(halfBacktrackingWindow) > 0) + } + def nextQuery(state: QueryState): (QueryState, Option[Source[Envelope, NotUsed]]) = { val newIdleCount = if (state.rowCount == 0) state.idleCount + 1 else 0 + // only start tracking query wall clock (for heartbeats) after initial backtracking query + val newQueryWallClock = if (state.latestBacktracking != TimestampOffset.Zero) clock.instant() else Instant.EPOCH val newState = - if (settings.querySettings.backtrackingEnabled && !state.backtracking && state.latest != TimestampOffset.Zero && - (newIdleCount >= 5 || - state.rowCountSinceBacktracking + state.rowCount >= settings.querySettings.bufferSize * 3 || - JDuration - .between(state.latestBacktracking.timestamp, state.latest.timestamp) - .compareTo(halfBacktrackingWindow) > 0)) { - // FIXME config for newIdleCount >= 5 and maybe something like `newIdleCount % 5 == 0` - - // Note that when starting the query with offset = NoOffset it will switch to backtracking immediately after - // the first normal query because between(latestBacktracking.timestamp, latest.timestamp) > halfBacktrackingWindow - + if (switchToBacktracking(state, newIdleCount)) { // switching to backtracking val fromOffset = if (state.latestBacktracking == TimestampOffset.Zero) @@ -426,15 +471,19 @@ import org.slf4j.Logger idleCount = newIdleCount, backtrackingCount = 1, latestBacktracking = fromOffset, - backtrackingExpectFiltered = state.latestBacktrackingSeenCount) + backtrackingExpectFiltered = state.latestBacktrackingSeenCount, + currentQueryWallClock = newQueryWallClock, + previousQueryWallClock = state.currentQueryWallClock) } else if (switchFromBacktracking(state)) { - // switch from backtracking + // switching from 
     def nextQuery(state: QueryState): (QueryState, Option[Source[Envelope, NotUsed]]) = {
       val newIdleCount = if (state.rowCount == 0) state.idleCount + 1 else 0
+      // only start tracking query wall clock (for heartbeats) after initial backtracking query
+      val newQueryWallClock = if (state.latestBacktracking != TimestampOffset.Zero) clock.instant() else Instant.EPOCH
       val newState =
-        if (settings.querySettings.backtrackingEnabled && !state.backtracking && state.latest != TimestampOffset.Zero &&
-          (newIdleCount >= 5 ||
-          state.rowCountSinceBacktracking + state.rowCount >= settings.querySettings.bufferSize * 3 ||
-          JDuration
-            .between(state.latestBacktracking.timestamp, state.latest.timestamp)
-            .compareTo(halfBacktrackingWindow) > 0)) {
-          // FIXME config for newIdleCount >= 5 and maybe something like `newIdleCount % 5 == 0`
-
-          // Note that when starting the query with offset = NoOffset it will switch to backtracking immediately after
-          // the first normal query because between(latestBacktracking.timestamp, latest.timestamp) > halfBacktrackingWindow
-
+        if (switchToBacktracking(state, newIdleCount)) {
           // switching to backtracking
           val fromOffset =
             if (state.latestBacktracking == TimestampOffset.Zero)
@@ -426,15 +471,19 @@ import org.slf4j.Logger
             idleCount = newIdleCount,
             backtrackingCount = 1,
             latestBacktracking = fromOffset,
-            backtrackingExpectFiltered = state.latestBacktrackingSeenCount)
+            backtrackingExpectFiltered = state.latestBacktrackingSeenCount,
+            currentQueryWallClock = newQueryWallClock,
+            previousQueryWallClock = state.currentQueryWallClock)
         } else if (switchFromBacktracking(state)) {
-          // switch from backtracking
+          // switching from backtracking
           state.copy(
             rowCount = 0,
             rowCountSinceBacktracking = 0,
             queryCount = state.queryCount + 1,
             idleCount = newIdleCount,
-            backtrackingCount = 0)
+            backtrackingCount = 0,
+            currentQueryWallClock = newQueryWallClock,
+            previousQueryWallClock = state.currentQueryWallClock)
         } else {
           // continue
           val newBacktrackingCount = if (state.backtracking) state.backtrackingCount + 1 else 0
@@ -444,16 +493,18 @@ import org.slf4j.Logger
             queryCount = state.queryCount + 1,
             idleCount = newIdleCount,
             backtrackingCount = newBacktrackingCount,
-            backtrackingExpectFiltered = state.latestBacktrackingSeenCount)
+            backtrackingExpectFiltered = state.latestBacktrackingSeenCount,
+            currentQueryWallClock = newQueryWallClock,
+            previousQueryWallClock = state.currentQueryWallClock)
         }
 
       val behindCurrentTime =
         if (newState.backtracking) settings.querySettings.backtrackingBehindCurrentTime
         else settings.querySettings.behindCurrentTime
 
-      val fromTimestamp = newState.nextQueryFromTimestamp
+      val fromTimestamp = newState.nextQueryFromTimestamp(backtrackingWindow)
       val fromSeqNr = newState.nextQueryFromSeqNr
-      val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize)
+      val toTimestamp = newState.nextQueryToTimestamp(backtrackingWindow, settings.querySettings.bufferSize)
 
       if (log.isDebugEnabled()) {
         val backtrackingInfo =
@@ -501,12 +552,38 @@ import org.slf4j.Logger
             .via(deserializeAndAddOffset(newState.currentOffset)))
     }
 
-    ContinuousQuery[QueryState, Envelope](
-      initialState = QueryState.empty.copy(latest = initialOffset),
-      updateState = nextOffset,
-      delayNextQuery = delayNextQuery,
-      nextQuery = nextQuery,
-      beforeQuery = beforeQuery(logPrefix, entityType, minSlice, maxSlice, _))
+    def heartbeat(state: QueryState): Option[Envelope] = {
+      if (state.idleCount >= 1 && state.previousQueryWallClock != Instant.EPOCH) {
+        // using wall clock to measure duration since the start time (database timestamp) up to idle backtracking limit
+        val timestamp = state.startTimestamp.plus(
+          JDuration.between(state.startWallClock, state.previousQueryWallClock.minus(backtrackingBehindCurrentTime)))
+        createHeartbeat(timestamp)
+      } else
+        None
+    }
+
+    val nextHeartbeat: QueryState => Option[Envelope] =
+      if (settings.journalPublishEvents) heartbeat else _ => None
+
+    val currentTimestamp =
+      if (settings.useAppTimestamp) Future.successful(InstantFactory.now())
+      else dao.currentDbTimestamp(minSlice)
+
+    Source
+      .futureSource[Envelope, NotUsed] {
+        currentTimestamp.map { currentTime =>
+          val currentWallClock = clock.instant()
+          ContinuousQuery[QueryState, Envelope](
+            initialState = QueryState.empty
+              .copy(latest = initialOffset, startTimestamp = currentTime, startWallClock = currentWallClock),
+            updateState = nextOffset,
+            delayNextQuery = delayNextQuery,
+            nextQuery = nextQuery,
+            beforeQuery = beforeQuery(logPrefix, entityType, minSlice, maxSlice, _),
+            heartbeat = nextHeartbeat)
+        }
+      }
+      .mapMaterializedValue(_ => NotUsed)
   }
 
   private def beforeQuery(
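The heartbeat hook that this wiring relies on is added to ContinuousQuery in the next file: when the previous cycle was idle, the synthetic envelope is emitted ahead of the next query cycle's own results. A stand-alone model of that semantics (the real ContinuousQuery is internal to the plugin; this only illustrates the concat behavior):

    import akka.NotUsed
    import akka.stream.scaladsl.Source

    // One query cycle: if the heartbeat function yields an element for the
    // current state, emit it before the cycle's own results.
    def cycleWithHeartbeat[S, T](
        state: S,
        query: S => Source[T, NotUsed],
        heartbeat: S => Option[T]): Source[T, NotUsed] =
      heartbeat(state) match {
        case Some(h) => Source.single(h).concat(query(state))
        case None    => query(state)
      }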
diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/ContinuousQuery.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/ContinuousQuery.scala
index 5d93e034..0b035ba8 100644
--- a/core/src/main/scala/akka/persistence/r2dbc/internal/ContinuousQuery.scala
+++ b/core/src/main/scala/akka/persistence/r2dbc/internal/ContinuousQuery.scala
@@ -29,8 +29,10 @@ private[r2dbc] object ContinuousQuery {
       updateState: (S, T) => S,
       delayNextQuery: S => Option[FiniteDuration],
       nextQuery: S => (S, Option[Source[T, NotUsed]]),
-      beforeQuery: S => Option[Future[S]] = (_: S) => None): Source[T, NotUsed] =
-    Source.fromGraph(new ContinuousQuery[S, T](initialState, updateState, delayNextQuery, nextQuery, beforeQuery))
+      beforeQuery: S => Option[Future[S]] = (_: S) => None,
+      heartbeat: S => Option[T] = (_: S) => None): Source[T, NotUsed] =
+    Source.fromGraph(
+      new ContinuousQuery[S, T](initialState, updateState, delayNextQuery, nextQuery, beforeQuery, heartbeat))
 
   private case object NextQuery
 
@@ -68,7 +70,8 @@
 final private[r2dbc] class ContinuousQuery[S, T](
     updateState: (S, T) => S,
     delayNextQuery: S => Option[FiniteDuration],
     nextQuery: S => (S, Option[Source[T, NotUsed]]),
-    beforeQuery: S => Option[Future[S]])
+    beforeQuery: S => Option[Future[S]],
+    heartbeat: S => Option[T])
     extends GraphStage[SourceShape[T]] {
   import ContinuousQuery._
 
@@ -149,8 +152,12 @@
             next()
           }
         })
+        val sourceWithHeartbeat = heartbeat(newState) match {
+          case None    => source
+          case Some(h) => Source.single(h).concat(source)
+        }
         val graph = Source
-          .fromGraph(source)
+          .fromGraph(sourceWithHeartbeat)
           .to(sinkIn.sink)
         interpreter.subFusingMaterializer.materialize(graph)
         sinkIn.pull()
diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/EnvelopeOrigin.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/EnvelopeOrigin.scala
index 6d18ee13..4fcfe94b 100644
--- a/core/src/main/scala/akka/persistence/r2dbc/internal/EnvelopeOrigin.scala
+++ b/core/src/main/scala/akka/persistence/r2dbc/internal/EnvelopeOrigin.scala
@@ -16,6 +16,7 @@ import akka.persistence.query.typed.EventEnvelope
   val SourceBacktracking = "BT"
   val SourcePubSub = "PS"
   val SourceSnapshot = "SN"
+  val SourceHeartbeat = "HB"
 
   def fromQuery(env: EventEnvelope[_]): Boolean =
     env.source == SourceQuery
@@ -32,6 +33,15 @@ import akka.persistence.query.typed.EventEnvelope
   def fromSnapshot(env: EventEnvelope[_]): Boolean =
     env.source == SourceSnapshot
 
+  def fromHeartbeat(env: EventEnvelope[_]): Boolean =
+    env.source == SourceHeartbeat
+
+  def isHeartbeatEvent(env: Any): Boolean =
+    env match {
+      case e: EventEnvelope[_] => fromHeartbeat(e)
+      case _                   => false
+    }
+
   def isFilteredEvent(env: Any): Boolean =
     env match {
       case e: EventEnvelope[_] => e.filtered
diff --git a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
index a6758ce1..f4c58e8f 100644
--- a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
+++ b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
@@ -4,9 +4,13 @@
 package akka.persistence.r2dbc.query.scaladsl
 
+import java.time.Clock
 import java.time.Instant
 import java.time.{ Duration => JDuration }
+import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
 
+import scala.annotation.tailrec
 import scala.collection.immutable
 import scala.collection.mutable
 import scala.concurrent.Future
@@ -55,7 +59,6 @@ import com.typesafe.config.Config
 import org.slf4j.LoggerFactory
 
 import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
-import akka.util.OptionVal
 
 object R2dbcReadJournal {
   val Identifier = "akka.persistence.r2dbc.query"
@@ -109,19 +112,55 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
 
   private val filteredPayloadSerId = SerializationExtension(system).findSerializerFor(FilteredPayload).identifier
 
+  // key is tuple of entity type and slice
+  private val heartbeatPersistenceIds = new ConcurrentHashMap[(String, Int), String]()
+  private val heartbeatUuid = UUID.randomUUID().toString
+  log.debug("Using heartbeat UUID [{}]", heartbeatUuid)
+
+  private def heartbeatPersistenceId(entityType: String, slice: Int): String = {
+    val key = entityType -> slice
+    heartbeatPersistenceIds.get(key) match {
+      case null =>
+        // no need to block other threads, it's just a cache
+        val pid = generateHeartbeatPersistenceId(entityType, slice)
+        heartbeatPersistenceIds.put(key, pid)
+        pid
+      case pid => pid
+    }
+  }
+
+  @tailrec private def generateHeartbeatPersistenceId(entityType: String, slice: Int, n: Int = 1): String = {
+    if (n < 1000000) {
+      // including a uuid to make sure it is not the same as any persistence id of the application
+      val pid = PersistenceId.concat(entityType, s"_hb-$heartbeatUuid-$n")
+      if (persistenceExt.sliceForPersistenceId(pid) == slice)
+        pid
+      else
+        generateHeartbeatPersistenceId(entityType, slice, n + 1)
+    } else
+      throw new IllegalStateException(s"Couldn't find heartbeat persistenceId for [$entityType] with slice [$slice]")
+
+  }
+
   private def deserializePayload[Event](row: SerializedJournalRow): Option[Event] =
     row.payload.map(payload => serialization.deserialize(payload, row.serId, row.serManifest).get.asInstanceOf[Event])
 
-  private val _bySlice: BySliceQuery[SerializedJournalRow, EventEnvelope[Any]] = {
-    val createEnvelope: (TimestampOffset, SerializedJournalRow) => EventEnvelope[Any] = createEventEnvelope
+  private val clock = Clock.systemUTC()
 
-    val extractOffset: EventEnvelope[Any] => TimestampOffset = env => env.offset.asInstanceOf[TimestampOffset]
+  private def bySlice[Event](
+      entityType: String,
+      minSlice: Int): BySliceQuery[SerializedJournalRow, EventEnvelope[Event]] = {
+    val createEnvelope: (TimestampOffset, SerializedJournalRow) => EventEnvelope[Event] = createEventEnvelope
 
-    new BySliceQuery(queryDao, createEnvelope, extractOffset, settings, log)(typedSystem.executionContext)
-  }
+    val extractOffset: EventEnvelope[Event] => TimestampOffset = env => env.offset.asInstanceOf[TimestampOffset]
 
-  private def bySlice[Event]: BySliceQuery[SerializedJournalRow, EventEnvelope[Event]] =
-    _bySlice.asInstanceOf[BySliceQuery[SerializedJournalRow, EventEnvelope[Event]]]
+    val createHeartbeat: Instant => Option[EventEnvelope[Event]] = { timestamp =>
+      Some(createEventEnvelopeHeartbeat(entityType, minSlice, timestamp))
+    }
+
+    new BySliceQuery(queryDao, createEnvelope, extractOffset, createHeartbeat, clock, settings, log)(
+      typedSystem.executionContext)
+  }
 
   private def deserializeBySliceRow[Event](row: SerializedJournalRow): EventEnvelope[Event] = {
     val offset = TimestampOffset(row.dbTimestamp, row.readDbTimestamp, Map(row.persistenceId -> row.seqNr))
@@ -148,6 +187,21 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
       tags = row.tags)
   }
 
+  def createEventEnvelopeHeartbeat[Event](entityType: String, slice: Int, timestamp: Instant): EventEnvelope[Event] = {
+    new EventEnvelope(
+      TimestampOffset(timestamp, Map.empty),
+      heartbeatPersistenceId(entityType, slice),
+      1L,
+      eventOption = None,
+      timestamp.toEpochMilli,
+      eventMetadata = None,
+      entityType,
+      slice,
+      filtered = true,
+      source = EnvelopeOrigin.SourceHeartbeat,
+      Set.empty)
+  }
+
   private def deserializeRow(row: SerializedJournalRow): ClassicEventEnvelope = {
     val event = deserializePayload(row)
     // note that it's not possible to filter out FilteredPayload here
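The generated heartbeat persistence id above must hash to the slice it stands in for. A stand-alone sketch of that search; sliceFor here is a stand-in hash, whereas the patch uses Persistence(system).sliceForPersistenceId and PersistenceId.concat:

    import scala.annotation.tailrec

    def sliceFor(persistenceId: String, numberOfSlices: Int = 1024): Int =
      math.abs(persistenceId.hashCode % numberOfSlices) // stand-in for the real slice function

    @tailrec
    def findHeartbeatPid(entityType: String, uuid: String, slice: Int, n: Int = 1): String = {
      // the uuid makes collisions with real application persistence ids practically impossible
      val candidate = s"$entityType|_hb-$uuid-$n"
      if (sliceFor(candidate) == slice) candidate
      else if (n < 1000000) findHeartbeatPid(entityType, uuid, slice, n + 1)
      else throw new IllegalStateException(s"Couldn't find heartbeat persistenceId for slice [$slice]")
    }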
@@ -161,13 +215,20 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
   }
 
   private def snapshotsBySlice[Snapshot, Event](
+      entityType: String,
+      minSlice: Int,
       transformSnapshot: Snapshot => Event): BySliceQuery[SerializedSnapshotRow, EventEnvelope[Event]] = {
     val createEnvelope: (TimestampOffset, SerializedSnapshotRow) => EventEnvelope[Event] =
       (offset, row) => createEnvelopeFromSnapshot(row, offset, transformSnapshot)
 
     val extractOffset: EventEnvelope[Event] => TimestampOffset = env => env.offset.asInstanceOf[TimestampOffset]
 
-    new BySliceQuery(snapshotDao, createEnvelope, extractOffset, settings, log)(typedSystem.executionContext)
+    val createHeartbeat: Instant => Option[EventEnvelope[Event]] = { timestamp =>
+      Some(createEventEnvelopeHeartbeat(entityType, minSlice, timestamp).asInstanceOf[EventEnvelope[Event]])
+    }
+
+    new BySliceQuery(snapshotDao, createEnvelope, extractOffset, createHeartbeat, clock, settings, log)(
+      typedSystem.executionContext)
   }
 
   private def createEnvelopeFromSnapshot[Snapshot, Event](
@@ -208,7 +269,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
       minSlice: Int,
       maxSlice: Int,
       offset: Offset): Source[EventEnvelope[Event], NotUsed] = {
-    bySlice
+    bySlice(entityType, minSlice)
       .currentBySlices("currentEventsBySlices", entityType, minSlice, maxSlice, offset)
   }
 
@@ -250,7 +311,8 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
       minSlice: Int,
       maxSlice: Int,
       offset: Offset): Source[EventEnvelope[Event], NotUsed] = {
-    val dbSource = bySlice[Event].liveBySlices("eventsBySlices", entityType, minSlice, maxSlice, offset)
+    val dbSource =
+      bySlice[Event](entityType, minSlice).liveBySlices("eventsBySlices", entityType, minSlice, maxSlice, offset)
     if (settings.journalPublishEvents) {
       val pubSubSource = eventsBySlicesPubSubSource[Event](entityType, minSlice, maxSlice)
       mergeDbAndPubSubSources(dbSource, pubSubSource)
@@ -282,7 +344,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
     val timestampOffset = toTimestampOffset(offset)
 
     val snapshotSource =
-      snapshotsBySlice[Snapshot, Event](transformSnapshot)
+      snapshotsBySlice[Snapshot, Event](entityType, minSlice, transformSnapshot)
         .currentBySlices("currentSnapshotsBySlices", entityType, minSlice, maxSlice, offset)
 
     Source.fromGraph(
@@ -305,7 +367,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
             initOffset,
             snapshotOffsets.size)
 
-          bySlice.currentBySlices(
+          bySlice(entityType, minSlice).currentBySlices(
             "currentEventsBySlices",
             entityType,
             minSlice,
@@ -339,7 +401,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
     val timestampOffset = toTimestampOffset(offset)
 
     val snapshotSource =
-      snapshotsBySlice[Snapshot, Event](transformSnapshot)
+      snapshotsBySlice[Snapshot, Event](entityType, minSlice, transformSnapshot)
        .currentBySlices("snapshotsBySlices", entityType, minSlice, maxSlice, offset)
 
     Source.fromGraph(
@@ -363,7 +425,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
             snapshotOffsets.size)
 
           val dbSource =
-            bySlice[Event].liveBySlices(
+            bySlice[Event](entityType, minSlice).liveBySlices(
               "eventsBySlices",
               entityType,
               minSlice,
@@ -523,6 +585,9 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
         if (EnvelopeOrigin.fromBacktracking(env)) {
           latestBacktracking = t.timestamp
          env :: Nil
+        } else if (EnvelopeOrigin.fromHeartbeat(env)) {
+          latestBacktracking = t.timestamp
+          Nil // always drop heartbeats
         } else if (EnvelopeOrigin.fromPubSub(env) && latestBacktracking == Instant.EPOCH) {
           log.trace(
             "Dropping pubsub event for persistenceId [{}] seqNr [{}] because no event from backtracking yet.",
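Condensed decision table for the merge stage above; the real code also deduplicates and applies the backtracking-window check described in the commit message, which is not shown in this hunk:

    import java.time.Instant

    // env.source values: "BT" backtracking, "HB" heartbeat, "PS" pubsub.
    def emit(source: String, latestBacktracking: Instant): Boolean =
      source match {
        case "BT" => true  // emitted, and advances latestBacktracking
        case "HB" => false // advances latestBacktracking but is never emitted
        case "PS" => latestBacktracking != Instant.EPOCH // dropped until backtracking has progressed
        case _    => true
      }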
diff --git a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
index 995bfa61..ed7ef198 100644
--- a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
+++ b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
@@ -4,6 +4,7 @@
 package akka.persistence.r2dbc.state.scaladsl
 
+import java.time.Clock
 import java.time.Instant
 import java.util.UUID
 
@@ -86,6 +87,8 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg
     if (settings.journalPublishEvents) Some(PubSub(typedSystem))
     else None
 
+  private val clock = Clock.systemUTC()
+
   private val bySlice: BySliceQuery[SerializedStateRow, DurableStateChange[A]] = {
     val createEnvelope: (TimestampOffset, SerializedStateRow) => DurableStateChange[A] = (offset, row) => {
       row.payload match {
@@ -107,7 +110,8 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg
 
     val extractOffset: DurableStateChange[A] => TimestampOffset = env => env.offset.asInstanceOf[TimestampOffset]
 
-    new BySliceQuery(stateDao, createEnvelope, extractOffset, settings, log)(typedSystem.executionContext)
+    new BySliceQuery(stateDao, createEnvelope, extractOffset, createHeartbeat = _ => None, clock, settings, log)(
+      typedSystem.executionContext)
   }
 
   override def getObject(persistenceId: String): Future[GetObjectResult[A]] = {
diff --git a/core/src/test/resources/logback-test.xml b/core/src/test/resources/logback-test.xml
index bcac0cfe..82b541d7 100644
--- a/core/src/test/resources/logback-test.xml
+++ b/core/src/test/resources/logback-test.xml
@@ -24,4 +24,4 @@
 
-</configuration>
\ No newline at end of file
+</configuration>
diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
index 16f26b6d..abad9410 100644
--- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
+++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
@@ -37,6 +37,7 @@ import akka.persistence.typed.PersistenceId
 import akka.serialization.SerializationExtension
 import akka.stream.testkit.TestSubscriber
 import akka.stream.testkit.scaladsl.TestSink
+import scala.jdk.DurationConverters._
 
 object EventsBySliceBacktrackingSpec {
   private val BufferSize = 10 // small buffer for testing
@@ -103,7 +104,7 @@ class EventsBySliceBacktrackingSpec
       val sinkProbe = TestSink.probe[EventEnvelope[String]](system.classicSystem)
 
       // don't let behind-current-time be a reason for not finding events
-      val startTime = InstantFactory.now().minusSeconds(10 * 60)
+      val startTime = InstantFactory.now().minusSeconds(90)
 
       writeEvent(slice1, pid1, 1L, startTime, "e1-1")
       writeEvent(slice1, pid1, 2L, startTime.plusMillis(1), "e1-2")
@@ -194,7 +195,7 @@ class EventsBySliceBacktrackingSpec
       val sinkProbe = TestSink.probe[EventEnvelope[String]](system.classicSystem)
 
       // don't let behind-current-time be a reason for not finding events
-      val startTime = InstantFactory.now().minusSeconds(10 * 60)
+      val startTime = InstantFactory.now().minusSeconds(90)
 
       writeEvent(slice1, pid1, 1L, startTime, "e1-1")
       writeEvent(slice1, pid1, 2L, startTime.plusMillis(2), "e1-2")
@@ -243,6 +244,119 @@ class EventsBySliceBacktrackingSpec
       result2.cancel()
     }
 
+    "skip backtracking when far behind current time" in {
+      pendingIfMoreThanOneDataPartition()
+
+      val entityType = nextEntityType()
+      val pid1 = nextPid(entityType)
+      val pid2 = nextPid(entityType)
+      val slice1 = query.sliceForPersistenceId(pid1)
+      val slice2 = query.sliceForPersistenceId(pid2)
+      val sinkProbe = TestSink.probe[EventEnvelope[String]](system.classicSystem)
+
+      val startTime = InstantFactory.now().minusSeconds(60 * 60 * 24)
+
+      (1 to 100).foreach { n =>
+        writeEvent(slice1, pid1, n, startTime.plusSeconds(n).plusMillis(1), s"e1-$n")
+        writeEvent(slice2, pid2, n, startTime.plusSeconds(n).plusMillis(2), s"e2-$n")
+      }
+
+      def startQuery(offset: Offset): TestSubscriber.Probe[EventEnvelope[String]] =
+        query
+          .eventsBySlices[String](entityType, 0, persistenceExt.numberOfSlices - 1, offset)
+          .runWith(sinkProbe)
+          .request(1000)
+
+      def expect(env: EventEnvelope[String], pid: String, seqNr: Long, eventOption: Option[String]): Offset = {
+        env.persistenceId shouldBe pid
+        env.sequenceNr shouldBe seqNr
+        if (eventOption.isEmpty)
+          env.source shouldBe EnvelopeOrigin.SourceBacktracking
+        else
+          env.source shouldBe EnvelopeOrigin.SourceQuery
+        env.eventOption shouldBe eventOption
+        env.offset
+      }
+
+      val result1 = startQuery(NoOffset)
+      (1 to 100).foreach { n =>
+        expect(result1.expectNext(), pid1, n, Some(s"e1-$n"))
+        expect(result1.expectNext(), pid2, n, Some(s"e2-$n"))
+      }
+      // no backtracking
+      result1.expectNoMessage()
+
+      val now = InstantFactory.now().minus(settings.querySettings.backtrackingBehindCurrentTime.toJava)
+      writeEvent(slice1, pid1, 101, now, "e1-101")
+      writeEvent(slice2, pid2, 101, now.plusMillis(1), "e2-101")
+
+      expect(result1.expectNext(), pid1, 101, Some("e1-101"))
+      expect(result1.expectNext(), pid2, 101, Some("e2-101"))
+
+      // backtracking events
+      expect(result1.expectNext(), pid1, 101, None)
+      expect(result1.expectNext(), pid2, 101, None)
+
+      result1.cancel()
+    }
+
+    "still make initial backtracking until ahead of start offset" in {
+      pendingIfMoreThanOneDataPartition()
+
+      val entityType = nextEntityType()
+      val pid1 = nextPid(entityType)
+      val pid2 = nextPid(entityType)
+      val slice1 = query.sliceForPersistenceId(pid1)
+      val slice2 = query.sliceForPersistenceId(pid2)
+      val sinkProbe = TestSink.probe[EventEnvelope[String]](system.classicSystem)
+
+      val startTime = InstantFactory.now().minusSeconds(60 * 60 * 24)
+
+      writeEvent(slice1, pid1, 1, startTime.plusMillis(1), "e1-1")
+      writeEvent(slice2, pid2, 1, startTime.plusMillis(2), "e2-1")
+      writeEvent(slice1, pid1, 2, startTime.plusMillis(3), "e1-2")
+      writeEvent(slice2, pid2, 2, startTime.plusMillis(4), "e2-2")
+
+      (3 to 10).foreach { n =>
+        writeEvent(slice1, pid1, n, startTime.plusSeconds(20 + n).plusMillis(1), s"e1-$n")
+        writeEvent(slice2, pid2, n, startTime.plusSeconds(20 + n).plusMillis(2), s"e2-$n")
+      }
+
+      def startQuery(offset: Offset): TestSubscriber.Probe[EventEnvelope[String]] =
+        query
+          .eventsBySlices[String](entityType, 0, persistenceExt.numberOfSlices - 1, offset)
+          .runWith(sinkProbe)
+          .request(1000)
+
+      def expect(env: EventEnvelope[String], pid: String, seqNr: Long, eventOption: Option[String]): Offset = {
+        env.persistenceId shouldBe pid
+        env.sequenceNr shouldBe seqNr
+        if (eventOption.isEmpty)
+          env.source shouldBe EnvelopeOrigin.SourceBacktracking
+        else
+          env.source shouldBe EnvelopeOrigin.SourceQuery
+        env.eventOption shouldBe eventOption
+        env.offset
+      }
+
+      val result1 = startQuery(TimestampOffset(startTime.plusSeconds(20), Map.empty))
+      // from backtracking
+      expect(result1.expectNext(), pid1, 1, None)
+      expect(result1.expectNext(), pid2, 1, None)
+      expect(result1.expectNext(), pid1, 2, None)
+      expect(result1.expectNext(), pid2, 2, None)
+
+      // from normal
+      (3 to 10).foreach { n =>
+        expect(result1.expectNext(), pid1, n, Some(s"e1-$n"))
+        expect(result1.expectNext(), pid2, n, Some(s"e2-$n"))
+      }
+      // no backtracking
+      result1.expectNoMessage()
+
+      result1.cancel()
+    }
+
     "predict backtracking filtered events based on latest seen counts" in {
       pendingIfMoreThanOneDataPartition()
 
@@ -252,7 +366,11 @@ class EventsBySliceBacktrackingSpec
       val sinkProbe = TestSink[EventEnvelope[String]]()(system.classicSystem)
 
       // use times in the past well outside behind-current-time
-      val timeZero = InstantFactory.now().truncatedTo(ChronoUnit.SECONDS).minusSeconds(10 * 60)
+      val timeZero = InstantFactory
+        .now()
+        .truncatedTo(ChronoUnit.SECONDS)
+        .minus(settings.querySettings.backtrackingBehindCurrentTime.toJava)
+        .minusSeconds(10)
 
       // events around the buffer size (of 10) will share the same timestamp
       // to test tracking of seen events that will be filtered on the next cycle
diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySlicePubSubBacktrackingSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySlicePubSubBacktrackingSpec.scala
new file mode 100644
index 00000000..2da9b586
--- /dev/null
+++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySlicePubSubBacktrackingSpec.scala
@@ -0,0 +1,151 @@
+/*
+ * Copyright (C) 2022 - 2023 Lightbend Inc.
+ */
+
+package akka.persistence.r2dbc.query
+
+import scala.annotation.tailrec
+import scala.concurrent.duration._
+
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import org.scalatest.wordspec.AnyWordSpecLike
+
+import akka.Done
+import akka.actor.testkit.typed.scaladsl.LogCapturing
+import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import akka.actor.typed.ActorSystem
+import akka.actor.typed.internal.pubsub.TopicImpl
+import akka.persistence.query.NoOffset
+import akka.persistence.query.PersistenceQuery
+import akka.persistence.query.typed.EventEnvelope
+import akka.persistence.r2dbc.TestActors
+import akka.persistence.r2dbc.TestActors.Persister.Persist
+import akka.persistence.r2dbc.TestActors.Persister.PersistWithAck
+import akka.persistence.r2dbc.TestConfig
+import akka.persistence.r2dbc.TestData
+import akka.persistence.r2dbc.TestDbLifecycle
+import akka.persistence.r2dbc.internal.EnvelopeOrigin
+import akka.persistence.r2dbc.internal.PubSub
+import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
+import akka.stream.testkit.TestSubscriber
+import akka.stream.testkit.scaladsl.TestSink
+
+object EventsBySlicePubSubBacktrackingSpec {
+  def config: Config = ConfigFactory
+    .parseString("""
+    akka.persistence.r2dbc {
+      journal.publish-events = on
+      query {
+        refresh-interval = 1 s
+
+        # To make the test predictable, PubSub arriving first
+        behind-current-time = 2 s
+
+        backtracking {
+          behind-current-time = 3 s
+          # enough space for heartbeats (previous query - behind current time)
+          window = 6 s
+        }
+      }
+    }
+    akka.actor.testkit.typed.filter-leeway = 20.seconds
+    """)
+    .withFallback(TestConfig.config)
+}
+
+class EventsBySlicePubSubBacktrackingSpec
+    extends ScalaTestWithActorTestKit(EventsBySlicePubSubBacktrackingSpec.config)
+    with AnyWordSpecLike
+    with TestDbLifecycle
+    with TestData
+    with LogCapturing {
+
+  override def typedSystem: ActorSystem[_] = system
+
+  private val query = PersistenceQuery(testKit.system).readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
+
+  private class Setup {
+    val entityType = nextEntityType()
+    val persistenceId = nextPid(entityType)
+    val slice = query.sliceForPersistenceId(persistenceId)
+    val persister = spawn(TestActors.Persister(persistenceId))
+    val probe = createTestProbe[Done]()
+    val sinkProbe = TestSink.probe[EventEnvelope[String]](system.classicSystem)
+  }
+
+  s"EventsBySlices pub-sub with backtracking enabled" should {
+
+    "publish events" in new Setup {
+
+      val result: TestSubscriber.Probe[EventEnvelope[String]] =
+        query.eventsBySlices[String](this.entityType, slice, slice, NoOffset).runWith(sinkProbe).request(1000)
+
+      val topicStatsProbe = createTestProbe[TopicImpl.TopicStats]()
+      eventually {
+        PubSub(typedSystem).eventTopic[String](this.entityType, slice) ! TopicImpl.GetTopicStats(topicStatsProbe.ref)
+        topicStatsProbe.receiveMessage().localSubscriberCount shouldBe 1
+      }
+
+      for (i <- 1 to 9) {
+        persister ! Persist(s"e-$i")
+      }
+      persister ! PersistWithAck("e-10", probe.ref)
+      probe.expectMessage(Done)
+      // Initial PubSub events are dropped because no backtracking events yet
+      result.expectNoMessage(500.millis)
+
+      for (i <- 1 to 10) {
+        val env = result.expectNext()
+        env.event shouldBe s"e-$i"
+        env.source shouldBe EnvelopeOrigin.SourceQuery
+      }
+
+      result.expectNoMessage(1.second)
+      for (i <- 1 to 10) {
+        val env = result.expectNext()
+        env.sequenceNr shouldBe i
+        env.source shouldBe EnvelopeOrigin.SourceBacktracking
+      }
+
+      // after backtracking the PubSub events will get through
+      for (i <- 11 to 19) {
+        persister ! Persist(s"e-$i")
+      }
+      persister ! PersistWithAck("e-20", probe.ref)
+      for (i <- 11 to 20) {
+        val env = result.expectNext()
+        env.event shouldBe s"e-$i"
+        env.source shouldBe EnvelopeOrigin.SourcePubSub
+      }
+      // and then the ordinary query
+      for (i <- 11 to 20) {
+        val env = result.expectNext()
+        env.event shouldBe s"e-$i"
+        env.source shouldBe EnvelopeOrigin.SourceQuery
+      }
+      // and backtracking
+      for (i <- 11 to 20) {
+        val env = result.expectNext()
+        env.sequenceNr shouldBe i
+        env.source shouldBe EnvelopeOrigin.SourceBacktracking
+      }
+
+      // after idle it will emit heartbeat
+      Thread.sleep(6000)
+
+      // and because of the heartbeat it will accept PubSub even though it's now > backtracking.window
+      persister ! PersistWithAck("e-21", probe.ref)
+
+      {
+        val env = result.expectNext()
+        env.event shouldBe s"e-21"
+        env.source shouldBe EnvelopeOrigin.SourcePubSub
+      }
+
+      result.cancel()
+    }
+
+  }
+
+}
diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala
index c87aebb6..a42b92ae 100644
--- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala
+++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala
@@ -240,7 +240,7 @@ class EventsBySliceSpec
     val slice3 = query.sliceForPersistenceId(pid3)
     val slice4 = query.sliceForPersistenceId(pid4)
     val slices = Seq(slice1, slice2, slice3, slice4)
-    val t1 = InstantFactory.now()
+    val t1 = InstantFactory.now().minusSeconds(10)
     val t2 = t1.plusMillis(1)
 
     writeEvent(slice1, pid1, 1L, t1, "A1")
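A rough check of the timing configuration in the new pub-sub spec, assuming a PubSub event is accepted while its timestamp is within the backtracking window of the heartbeat-advanced backtracking timestamp:

    import java.time.Duration

    val refreshInterval = Duration.ofSeconds(1)
    val backtrackingBehindCurrentTime = Duration.ofSeconds(3)
    val backtrackingWindow = Duration.ofSeconds(6)

    // While idle, heartbeats keep the filter's backtracking timestamp at about
    // previous-query wall clock minus backtracking-behind-current-time:
    val heartbeatLag = refreshInterval.plus(backtrackingBehindCurrentTime) // ~4 s behind "now"

    // A fresh PubSub event (timestamp ~ now) then stays inside the 6 s window:
    assert(heartbeatLag.compareTo(backtrackingWindow) < 0)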