Skip to content

Commit 9fbf8a2

Browse files
authored
perf: Avoid too large bursts of backtracking events (#349)
1 parent 9756a6b commit 9fbf8a2

File tree

4 files changed

+145
-18
lines changed

4 files changed

+145
-18
lines changed

core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,13 @@ import org.slf4j.Logger
3131

3232
object QueryState {
3333
val empty: QueryState =
34-
QueryState(TimestampOffset.Zero, 0, 0, 0, backtrackingCount = 0, TimestampOffset.Zero, Buckets.empty)
34+
QueryState(TimestampOffset.Zero, 0, 0, 0, 0, backtrackingCount = 0, TimestampOffset.Zero, Buckets.empty)
3535
}
3636

3737
final case class QueryState(
3838
latest: TimestampOffset,
3939
rowCount: Int,
40+
rowCountSinceBacktracking: Long,
4041
queryCount: Long,
4142
idleCount: Long,
4243
backtrackingCount: Int,
@@ -252,23 +253,27 @@ import org.slf4j.Logger
252253
}
253254
}
254255

256+
// The "now" reference for this query: it is logged as the upper time bound and handed to nextQuery.
// Read from the database unless the settings ask for application timestamps.
val currentTimestamp =
  if (!settings.useAppTimestamp) dao.currentDbTimestamp()
  else Future.successful(InstantFactory.now())
259+
255260
Source
256261
.futureSource[Envelope, NotUsed] {
257-
dao.currentDbTimestamp().map { currentDbTime =>
262+
currentTimestamp.map { currentTime =>
258263
if (log.isDebugEnabled())
259264
log.debugN(
260265
"{} query slices [{} - {}], from time [{}] until now [{}].",
261266
logPrefix,
262267
minSlice,
263268
maxSlice,
264269
initialOffset.timestamp,
265-
currentDbTime)
270+
currentTime)
266271

267272
ContinuousQuery[QueryState, Envelope](
268273
initialState = QueryState.empty.copy(latest = initialOffset),
269274
updateState = nextOffset,
270275
delayNextQuery = _ => None,
271-
nextQuery = state => nextQuery(state, currentDbTime),
276+
nextQuery = state => nextQuery(state, currentTime),
272277
beforeQuery = beforeQuery(logPrefix, entityType, minSlice, maxSlice, _))
273278
}
274279
}
@@ -340,7 +345,9 @@ import org.slf4j.Logger
340345
val newIdleCount = if (state.rowCount == 0) state.idleCount + 1 else 0
341346
val newState =
342347
if (settings.querySettings.backtrackingEnabled && !state.backtracking && state.latest != TimestampOffset.Zero &&
343-
(newIdleCount >= 5 || JDuration
348+
(newIdleCount >= 5 ||
349+
state.rowCountSinceBacktracking + state.rowCount >= settings.querySettings.bufferSize * 3 ||
350+
JDuration
344351
.between(state.latestBacktracking.timestamp, state.latest.timestamp)
345352
.compareTo(halfBacktrackingWindow) > 0)) {
346353
// FIXME config for newIdleCount >= 5 and maybe something like `newIdleCount % 5 == 0`
@@ -357,18 +364,25 @@ import org.slf4j.Logger
357364

358365
state.copy(
359366
rowCount = 0,
367+
rowCountSinceBacktracking = 0,
360368
queryCount = state.queryCount + 1,
361369
idleCount = newIdleCount,
362370
backtrackingCount = 1,
363371
latestBacktracking = fromOffset)
364372
} else if (switchFromBacktracking(state)) {
365373
// switch from backtracking
366-
state.copy(rowCount = 0, queryCount = state.queryCount + 1, idleCount = newIdleCount, backtrackingCount = 0)
374+
state.copy(
375+
rowCount = 0,
376+
rowCountSinceBacktracking = 0,
377+
queryCount = state.queryCount + 1,
378+
idleCount = newIdleCount,
379+
backtrackingCount = 0)
367380
} else {
368381
// continue
369382
val newBacktrackingCount = if (state.backtracking) state.backtrackingCount + 1 else 0
370383
state.copy(
371384
rowCount = 0,
385+
rowCountSinceBacktracking = state.rowCountSinceBacktracking + state.rowCount,
372386
queryCount = state.queryCount + 1,
373387
idleCount = newIdleCount,
374388
backtrackingCount = newBacktrackingCount)
@@ -381,19 +395,29 @@ import org.slf4j.Logger
381395
val fromTimestamp = newState.nextQueryFromTimestamp
382396
val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize)
383397

384-
if (log.isDebugEnabled())
398+
if (log.isDebugEnabled()) {
399+
// Text fragment for the debug log describing how the backtracking mode changes
// between the previous query (state) and the next one (newState).
val backtrackingInfo = (state.backtracking, newState.backtracking) match {
  case (false, true) =>
    s" switching to backtracking mode, [${state.rowCountSinceBacktracking + state.rowCount}] events behind,"
  case (true, false) =>
    " switching from backtracking mode,"
  case (true, true) =>
    " in backtracking mode,"
  case (false, false) =>
    ""
}
385408
log.debugN(
386409
"{} next query [{}]{} from slices [{} - {}], between time [{} - {}]. {}",
387410
logPrefix,
388411
newState.queryCount,
389-
if (newState.backtracking) " in backtracking mode" else "",
412+
backtrackingInfo,
390413
minSlice,
391414
maxSlice,
392415
fromTimestamp,
393416
toTimestamp.getOrElse("None"),
394417
if (newIdleCount >= 3) s"Idle in [$newIdleCount] queries."
395418
else if (state.backtracking) s"Found [${state.rowCount}] rows in previous backtracking query."
396419
else s"Found [${state.rowCount}] rows in previous query.")
420+
}
397421

398422
newState ->
399423
Some(

core/src/main/scala/akka/persistence/r2dbc/internal/EnvelopeOrigin.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ import akka.persistence.query.typed.EventEnvelope
1616
val SourceBacktracking = "BT"
1717
val SourcePubSub = "PS"
1818

19+
/** Returns `true` when the `source` of the given envelope equals [[SourceQuery]]. */
def fromQuery(env: EventEnvelope[_]): Boolean = SourceQuery == env.source
21+
1922
/** Returns `true` when the `source` of the given envelope equals [[SourceBacktracking]]. */
def fromBacktracking(env: EventEnvelope[_]): Boolean = SourceBacktracking == env.source
2124

core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
8282
val event = row.payload.map(payload => serialization.deserialize(payload, row.serId, row.serManifest).get)
8383
val metadata = row.metadata.map(meta => serialization.deserialize(meta.payload, meta.serId, meta.serManifest).get)
8484
val source = if (event.isDefined) EnvelopeOrigin.SourceQuery else EnvelopeOrigin.SourceBacktracking
85+
8586
new EventEnvelope(
8687
offset,
8788
row.persistenceId,

core/src/test/scala/akka/persistence/r2dbc/query/EventsBySlicePerfSpec.scala

Lines changed: 109 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,43 @@ import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
1515
import akka.actor.typed.ActorSystem
1616
import akka.persistence.query.NoOffset
1717
import akka.persistence.query.PersistenceQuery
18+
import akka.persistence.query.TimestampOffset
1819
import akka.persistence.r2dbc.TestActors
1920
import akka.persistence.r2dbc.TestActors.Persister.Persist
2021
import akka.persistence.r2dbc.TestConfig
2122
import akka.persistence.r2dbc.TestData
2223
import akka.persistence.r2dbc.TestDbLifecycle
24+
import akka.persistence.r2dbc.internal.EnvelopeOrigin
2325
import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
2426
import akka.stream.scaladsl.Sink
27+
import akka.stream.scaladsl.Source
28+
import com.typesafe.config.ConfigFactory
2529
import org.scalatest.wordspec.AnyWordSpecLike
2630

31+
// Companion holding the configuration and helper types used by the EventsBySlicePerfSpec class.
object EventsBySlicePerfSpec {
32+
// Overrides for this spec: event publishing and backtracking are switched on, the query
// refresh-interval is 3s and application timestamps are used. Everything not set here
// falls back to TestConfig.config.
private val config = ConfigFactory
33+
.parseString("""
34+
akka.persistence.r2dbc.journal.publish-events = on
35+
akka.persistence.r2dbc.query {
36+
backtracking.enabled = on
37+
refresh-interval = 3s
38+
#buffer-size = 100
39+
}
40+
# to measure lag latency more accurately
41+
akka.persistence.r2dbc.use-app-timestamp = true
42+
""")
43+
.withFallback(TestConfig.config)
44+
45+
// Identifies one event by persistence id and sequence number; the spec collects these in a Set
// so that an event delivered more than once is only counted once.
private final case class PidSeqNr(pid: String, seqNr: Long)
46+
}
47+
2748
class EventsBySlicePerfSpec
28-
extends ScalaTestWithActorTestKit(TestConfig.backtrackingDisabledConfig.withFallback(TestConfig.config))
49+
extends ScalaTestWithActorTestKit(EventsBySlicePerfSpec.config)
2950
with AnyWordSpecLike
3051
with TestDbLifecycle
31-
with TestData
32-
with LogCapturing {
52+
with LogCapturing
53+
with TestData {
54+
import EventsBySlicePerfSpec.PidSeqNr
3355

3456
override def typedSystem: ActorSystem[_] = system
3557

@@ -39,11 +61,12 @@ class EventsBySlicePerfSpec
3961

4062
"retrieve from several slices" in {
4163
// increase these properties for "real" testing
64+
// also, remove LogCapturing and change logback log levels for "real" testing
4265
val numberOfPersisters = 30
4366
val numberOfEvents = 5
4467
val writeConcurrency = 10
4568
val numberOfSliceRanges = 4
46-
val iterations = 3
69+
val iterations = 2
4770
val totalNumberOfEvents = numberOfPersisters * numberOfEvents
4871

4972
val entityType = nextEntityType()
@@ -83,22 +106,98 @@ class EventsBySlicePerfSpec
83106
val counts: Seq[Future[Int]] = ranges.map { range =>
84107
query
85108
.currentEventsBySlices[String](entityType, range.min, range.max, NoOffset)
86-
.runWith(Sink.fold(0) { case (acc, _) =>
87-
if (acc > 0 && acc % 100 == 0)
88-
println(s"#$iteration Reading [$acc] events from slices [${range.min}-${range.max}] " +
89-
s"took [${(System.nanoTime() - t1) / 1000 / 1000}] ms")
90-
acc + 1
109+
.runWith(Sink.fold(0) { case (acc, env) =>
110+
if (EnvelopeOrigin.fromQuery(env)) {
111+
if (acc > 0 && acc % 100 == 0)
112+
println(s"#$iteration Reading [$acc] events from slices [${range.min}-${range.max}] " +
113+
s"took [${(System.nanoTime() - t1) / 1000 / 1000}] ms")
114+
acc + 1
115+
} else {
116+
acc
117+
}
91118
})
92119
}
93120
implicit val ec: ExecutionContext = testKit.system.executionContext
94121
val total = Await.result(Future.sequence(counts).map(_.sum), 30.seconds)
95122
total shouldBe totalNumberOfEvents
96123
println(
97-
s"#$iteration Reading all [$totalNumberOfEvents] events from [${ranges.size}] eventsBySlices " +
124+
s"#$iteration Reading all [$totalNumberOfEvents] events from [${ranges.size}] slices with currentEventsBySlices " +
98125
s"took [${(System.nanoTime() - t1) / 1000 / 1000}] ms")
99126
}
100127
}
101128

129+
"write and read concurrently" in {
130+
// increase these properties for "real" testing
131+
// also, remove LogCapturing and change logback log levels for "real" testing
132+
val numberOfEventsPerWriter = 20
133+
val writeConcurrency = 10
134+
val writeRps = 300
135+
val iterations = 2
136+
val totalNumberOfEvents = writeConcurrency * numberOfEventsPerWriter
137+
val verbosePrintLag = false
138+
139+
implicit val ec: ExecutionContext = testKit.system.executionContext
140+
141+
val entityType = nextEntityType()
142+
val persistenceIds = (1 to writeConcurrency).map(_ => nextPid(entityType)).toVector
143+
144+
(1 to iterations).foreach { iteration =>
145+
val t0 = System.nanoTime()
146+
val writeProbe = createTestProbe[Done]()
147+
val persisters = persistenceIds.map(pid => testKit.spawn(TestActors.Persister(pid)))
148+
Source(1 to numberOfEventsPerWriter)
149+
.mapConcat(n => persisters.map(ref => ref -> n))
150+
.throttle(writeRps / 10, 100.millis)
151+
.map { case (ref, n) =>
152+
ref ! Persist(s"e-$n")
153+
}
154+
.runWith(Sink.ignore)
155+
.foreach { _ =>
156+
// stop them at the end
157+
persisters.foreach(_ ! TestActors.Persister.Stop(writeProbe.ref))
158+
}
159+
160+
val done: Future[Done] =
161+
query
162+
.eventsBySlices[String](entityType, 0, persistenceExt.numberOfSlices - 1, NoOffset)
163+
.scan(Set.empty[PidSeqNr]) { case (acc, env) =>
164+
val newAcc = acc + PidSeqNr(env.persistenceId, env.sequenceNr)
165+
166+
if (verbosePrintLag) {
167+
val duplicate = if (newAcc.size == acc.size) " (duplicate)" else ""
168+
val lagMillis = System.currentTimeMillis() - env.timestamp
169+
// An envelope is reported as delayed when its lag exceeds the limit used for the path
// it arrived on:
//  - pubsub:        50 ms
//  - regular query: one refresh-interval plus 300 ms
//  - backtracking:  half the backtracking window plus 300 ms
// The last clause has to test fromBacktracking. Testing fromPubSub again there would be
// covered by the 50 ms pubsub clause already, and late backtracking envelopes would never
// be reported.
val delayed =
  (EnvelopeOrigin.fromPubSub(env) && lagMillis > 50) ||
  (EnvelopeOrigin.fromQuery(env) &&
    lagMillis > r2dbcSettings.querySettings.refreshInterval.toMillis + 300) ||
  (EnvelopeOrigin.fromBacktracking(env) &&
    lagMillis > r2dbcSettings.querySettings.backtrackingWindow.toMillis / 2 + 300)
175+
if (delayed)
176+
println(
177+
s"# received ${newAcc.size}$duplicate from ${env.source}: ${env.persistenceId} seqNr ${env.sequenceNr}, lag $lagMillis ms")
178+
}
179+
180+
if (newAcc.size != acc.size && (newAcc.size % 100 == 0))
181+
println(s"#$iteration Reading [${newAcc.size}] events " +
182+
s"took [${(System.nanoTime() - t0) / 1000 / 1000}] ms")
183+
newAcc
184+
185+
}
186+
.takeWhile(_.size < totalNumberOfEvents)
187+
.runWith(Sink.ignore)
188+
189+
writeProbe.receiveMessages(persisters.size, (totalNumberOfEvents / writeRps).seconds + 10.seconds)
190+
println(
191+
s"#$iteration Persisting all [$totalNumberOfEvents] events from [${persistenceIds.size}] persistent " +
192+
s"actors took [${(System.nanoTime() - t0) / 1000 / 1000}] ms")
193+
194+
Await.result(done, 30.seconds)
195+
println(
196+
s"#$iteration Reading all [$totalNumberOfEvents] events with eventsBySlices " +
197+
s"took [${(System.nanoTime() - t0) / 1000 / 1000}] ms")
198+
}
199+
}
200+
102201
}
103202

104203
}

0 commit comments

Comments
 (0)