Skip to content

Commit cc4aa83

Browse files
authored
perf: Avoid asyncReadHighestSequenceNr query (#583)
* perf: Avoid asyncReadHighestSequenceNr query * AsyncReplay added in Akka akka/akka#32434 * Akka 2.9.4
1 parent 12cfe90 commit cc4aa83

File tree

8 files changed

+133
-45
lines changed

8 files changed

+133
-45
lines changed

build.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ inThisBuild(
3232
resolvers += "Akka library repository".at("https://repo.akka.io/maven"),
3333
// add snapshot repo when Akka version overridden
3434
resolvers ++=
35-
(if (System.getProperty("override.akka.version") != null)
35+
(if (Dependencies.AkkaVersion.endsWith("-SNAPSHOT"))
3636
Seq("Akka library snapshot repository".at("https://repo.akka.io/snapshots"))
3737
else Seq.empty)))
3838

core/src/main/scala/akka/persistence/r2dbc/internal/QueryDao.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ private[r2dbc] trait QueryDao extends BySliceQuery.Dao[SerializedJournalRow] {
2929
def eventsByPersistenceId(
3030
persistenceId: String,
3131
fromSequenceNr: Long,
32-
toSequenceNr: Long): Source[SerializedJournalRow, NotUsed]
32+
toSequenceNr: Long,
33+
includeDeleted: Boolean): Source[SerializedJournalRow, NotUsed]
3334

3435
def persistenceIds(entityType: String, afterId: Option[String], limit: Long): Source[String, NotUsed]
3536

core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala

+45-16
Original file line numberDiff line numberDiff line change
@@ -144,14 +144,24 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
144144
protected def selectEventsSql(slice: Int): String =
145145
sqlCache.get(slice, "selectEventsSql") {
146146
sql"""
147-
SELECT slice, entity_type, persistence_id, seq_nr, db_timestamp, CURRENT_TIMESTAMP AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, meta_payload, tags
147+
SELECT slice, entity_type, seq_nr, db_timestamp, CURRENT_TIMESTAMP AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, meta_payload, tags
148148
from ${journalTable(slice)}
149149
WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?
150150
AND deleted = false
151151
ORDER BY seq_nr
152152
LIMIT ?"""
153153
}
154154

155+
protected def selectEventsIncludeDeletedSql(slice: Int): String =
156+
sqlCache.get(slice, "selectEventsIncludeDeletedSql") {
157+
sql"""
158+
SELECT slice, entity_type, seq_nr, db_timestamp, CURRENT_TIMESTAMP AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, meta_payload, tags, deleted
159+
from ${journalTable(slice)}
160+
WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?
161+
ORDER BY seq_nr
162+
LIMIT ?"""
163+
}
164+
155165
protected def bindSelectEventsSql(
156166
stmt: Statement,
157167
persistenceId: String,
@@ -378,28 +388,47 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
378388
override def eventsByPersistenceId(
379389
persistenceId: String,
380390
fromSequenceNr: Long,
381-
toSequenceNr: Long): Source[SerializedJournalRow, NotUsed] = {
391+
toSequenceNr: Long,
392+
includeDeleted: Boolean): Source[SerializedJournalRow, NotUsed] = {
382393
val slice = persistenceExt.sliceForPersistenceId(persistenceId)
383394
val executor = executorProvider.executorFor(slice)
384395
val result = executor.select(s"select eventsByPersistenceId [$persistenceId]")(
385396
connection => {
386-
val stmt = connection.createStatement(selectEventsSql(slice))
397+
val selectSql = if (includeDeleted) selectEventsIncludeDeletedSql(slice) else selectEventsSql(slice)
398+
val stmt = connection.createStatement(selectSql)
387399
bindSelectEventsSql(stmt, persistenceId, fromSequenceNr, toSequenceNr, settings.querySettings.bufferSize)
388400
},
389401
row =>
390-
SerializedJournalRow(
391-
slice = row.get[Integer]("slice", classOf[Integer]),
392-
entityType = row.get("entity_type", classOf[String]),
393-
persistenceId = row.get("persistence_id", classOf[String]),
394-
seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
395-
dbTimestamp = row.getTimestamp("db_timestamp"),
396-
readDbTimestamp = row.getTimestamp("read_db_timestamp"),
397-
payload = Some(row.getPayload("event_payload")),
398-
serId = row.get[Integer]("event_ser_id", classOf[Integer]),
399-
serManifest = row.get("event_ser_manifest", classOf[String]),
400-
writerUuid = row.get("writer", classOf[String]),
401-
tags = row.getTags("tags"),
402-
metadata = readMetadata(row)))
402+
if (includeDeleted && row.get[java.lang.Boolean]("deleted", classOf[java.lang.Boolean])) {
403+
// deleted row
404+
SerializedJournalRow(
405+
slice = row.get[Integer]("slice", classOf[Integer]),
406+
entityType = row.get("entity_type", classOf[String]),
407+
persistenceId = persistenceId,
408+
seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
409+
dbTimestamp = row.getTimestamp("db_timestamp"),
410+
readDbTimestamp = row.getTimestamp("read_db_timestamp"),
411+
payload = None,
412+
serId = 0,
413+
serManifest = "",
414+
writerUuid = "",
415+
tags = Set.empty,
416+
metadata = None)
417+
} else {
418+
SerializedJournalRow(
419+
slice = row.get[Integer]("slice", classOf[Integer]),
420+
entityType = row.get("entity_type", classOf[String]),
421+
persistenceId = persistenceId,
422+
seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
423+
dbTimestamp = row.getTimestamp("db_timestamp"),
424+
readDbTimestamp = row.getTimestamp("read_db_timestamp"),
425+
payload = Some(row.getPayload("event_payload")),
426+
serId = row.get[Integer]("event_ser_id", classOf[Integer]),
427+
serManifest = row.get("event_ser_manifest", classOf[String]),
428+
writerUuid = row.get("writer", classOf[String]),
429+
tags = row.getTags("tags"),
430+
metadata = readMetadata(row))
431+
})
403432

404433
if (log.isDebugEnabled)
405434
result.foreach(rows => log.debug("Read [{}] events for persistenceId [{}]", rows.size, persistenceId))

core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala

+9
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,15 @@ private[r2dbc] class SqlServerQueryDao(executorProvider: R2dbcExecutorProvider)
5757
ORDER BY seq_nr"""
5858
}
5959

60+
override protected def selectEventsIncludeDeletedSql(slice: Int): String =
61+
sqlCache.get(slice, "selectEventsIncludeDeletedSql") {
62+
sql"""
63+
SELECT TOP(@limit) slice, entity_type, persistence_id, seq_nr, db_timestamp, SYSUTCDATETIME() AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, meta_payload, tags, deleted
64+
from ${journalTable(slice)}
65+
WHERE persistence_id = @persistenceId AND seq_nr >= @from AND seq_nr <= @to
66+
ORDER BY seq_nr"""
67+
}
68+
6069
/**
6170
* custom binding because the first param in the query is @limit (or '0' when using positional binding)
6271
*

core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala

+66-21
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import akka.stream.scaladsl.Sink
4242
import com.typesafe.config.Config
4343
import org.slf4j.LoggerFactory
4444

45+
import akka.persistence.journal.AsyncReplay
4546
import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
4647

4748
/**
@@ -72,13 +73,16 @@ private[r2dbc] object R2dbcJournal {
7273
}
7374
reprWithMeta
7475
}
76+
77+
val FutureDone: Future[Done] = Future.successful(Done)
7578
}
7679

7780
/**
7881
* INTERNAL API
7982
*/
8083
@InternalApi
81-
private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends AsyncWriteJournal {
84+
private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends AsyncWriteJournal with AsyncReplay {
85+
import R2dbcJournal.FutureDone
8286
import R2dbcJournal.WriteFinished
8387
import R2dbcJournal.deserializeRow
8488

@@ -215,30 +219,71 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends
215219
journalDao.deleteEventsTo(persistenceId, toSequenceNr, resetSequenceNumber = false)
216220
}
217221

218-
override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
219-
recoveryCallback: PersistentRepr => Unit): Future[Unit] = {
220-
log.debug("asyncReplayMessages persistenceId [{}], fromSequenceNr [{}]", persistenceId, fromSequenceNr)
221-
val effectiveToSequenceNr =
222-
if (max == Long.MaxValue) toSequenceNr
223-
else math.min(toSequenceNr, fromSequenceNr + max - 1)
224-
query
225-
.internalCurrentEventsByPersistenceId(persistenceId, fromSequenceNr, effectiveToSequenceNr)
226-
.runWith(Sink.foreach { row =>
227-
val repr = deserializeRow(serialization, row)
228-
recoveryCallback(repr)
229-
})
230-
.map(_ => ())
231-
}
232-
233-
override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
234-
log.debug("asyncReadHighestSequenceNr [{}] [{}]", persistenceId, fromSequenceNr)
222+
override def replayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
223+
recoveryCallback: PersistentRepr => Unit): Future[Long] = {
224+
log.debug("replayMessages [{}] [{}]", persistenceId, fromSequenceNr)
235225
val pendingWrite = Option(writesInProgress.get(persistenceId)) match {
236226
case Some(f) =>
237-
log.debug("Write in progress for [{}], deferring highest seq nr until write completed", persistenceId)
227+
log.debug("Write in progress for [{}], deferring replayMessages until write completed", persistenceId)
238228
// we only want to make write - replay sequential, not fail if previous write failed
239229
f.recover { case _ => Done }(ExecutionContexts.parasitic)
240-
case None => Future.successful(Done)
230+
case None => FutureDone
231+
}
232+
pendingWrite.flatMap { _ =>
233+
if (toSequenceNr == Long.MaxValue && max == Long.MaxValue) {
234+
// this is the normal case, highest sequence number from last event
235+
query
236+
.internalCurrentEventsByPersistenceId(
237+
persistenceId,
238+
fromSequenceNr,
239+
toSequenceNr,
240+
readHighestSequenceNr = false,
241+
includeDeleted = true)
242+
.runWith(Sink.fold(0L) { (_, item) =>
243+
// payload is empty for deleted item
244+
if (item.payload.isDefined) {
245+
val repr = deserializeRow(serialization, item)
246+
recoveryCallback(repr)
247+
}
248+
item.seqNr
249+
})
250+
} else if (toSequenceNr <= 0) {
251+
// no replay
252+
journalDao.readHighestSequenceNr(persistenceId, fromSequenceNr)
253+
} else {
254+
// replay to custom sequence number
255+
256+
val highestSeqNr = journalDao.readHighestSequenceNr(persistenceId, fromSequenceNr)
257+
258+
val effectiveToSequenceNr =
259+
if (max == Long.MaxValue) toSequenceNr
260+
else math.min(toSequenceNr, fromSequenceNr + max - 1)
261+
262+
query
263+
.internalCurrentEventsByPersistenceId(
264+
persistenceId,
265+
fromSequenceNr,
266+
effectiveToSequenceNr,
267+
readHighestSequenceNr = false,
268+
includeDeleted = false)
269+
.runWith(Sink
270+
.foreach { item =>
271+
val repr = deserializeRow(serialization, item)
272+
recoveryCallback(repr)
273+
})
274+
.flatMap(_ => highestSeqNr)
275+
}
241276
}
242-
pendingWrite.flatMap(_ => journalDao.readHighestSequenceNr(persistenceId, fromSequenceNr))
277+
}
278+
279+
override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
280+
recoveryCallback: PersistentRepr => Unit): Future[Unit] = {
281+
throw new IllegalStateException(
282+
"asyncReplayMessages is not supposed to be called when implementing AsyncReplay. This is a bug, please report.")
283+
}
284+
285+
override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
286+
throw new IllegalStateException(
287+
"asyncReadHighestSequenceNr is not supposed to be called when implementing AsyncReplay. This is a bug, please report.")
243288
}
244289
}

core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala

+8-4
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ import com.typesafe.config.Config
5656
import org.slf4j.LoggerFactory
5757

5858
import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
59+
import akka.util.OptionVal
5960

6061
object R2dbcReadJournal {
6162
val Identifier = "akka.persistence.r2dbc.query"
@@ -569,7 +570,9 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
569570
@InternalApi private[r2dbc] def internalCurrentEventsByPersistenceId(
570571
persistenceId: String,
571572
fromSequenceNr: Long,
572-
toSequenceNr: Long): Source[SerializedJournalRow, NotUsed] = {
573+
toSequenceNr: Long,
574+
readHighestSequenceNr: Boolean = true,
575+
includeDeleted: Boolean = false): Source[SerializedJournalRow, NotUsed] = {
573576

574577
def updateState(state: ByPersistenceIdState, row: SerializedJournalRow): ByPersistenceIdState =
575578
state.copy(rowCount = state.rowCount + 1, latestSeqNr = row.seqNr)
@@ -591,7 +594,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
591594

592595
newState -> Some(
593596
queryDao
594-
.eventsByPersistenceId(persistenceId, state.latestSeqNr + 1, highestSeqNr))
597+
.eventsByPersistenceId(persistenceId, state.latestSeqNr + 1, highestSeqNr, includeDeleted))
595598
} else {
596599
log.debugN(
597600
"currentEventsByPersistenceId query [{}] for persistenceId [{}] completed. Found [{}] rows in previous query.",
@@ -611,7 +614,8 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
611614
toSequenceNr)
612615

613616
val highestSeqNrFut =
614-
if (toSequenceNr == Long.MaxValue) journalDao.readHighestSequenceNr(persistenceId, fromSequenceNr)
617+
if (readHighestSequenceNr && toSequenceNr == Long.MaxValue)
618+
journalDao.readHighestSequenceNr(persistenceId, fromSequenceNr)
615619
else Future.successful(toSequenceNr)
616620

617621
Source
@@ -707,7 +711,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
707711
newState ->
708712
Some(
709713
queryDao
710-
.eventsByPersistenceId(persistenceId, state.latestSeqNr + 1, toSequenceNr))
714+
.eventsByPersistenceId(persistenceId, state.latestSeqNr + 1, toSequenceNr, includeDeleted = false))
711715
}
712716
}
713717

native-image-tests/build.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ scalaVersion := "2.13.14"
66

77
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
88

9-
lazy val akkaVersion = sys.props.getOrElse("akka.version", "2.9.3")
9+
lazy val akkaVersion = sys.props.getOrElse("akka.version", "2.9.4")
1010
lazy val akkaR2dbcVersion = sys.props.getOrElse("akka.r2dbc.version", "1.2.3")
1111

1212
fork := true

project/Dependencies.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ object Dependencies {
99
val Scala3 = "3.3.3"
1010
val Scala2Versions = Seq(Scala213)
1111
val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3
12-
val AkkaVersion = System.getProperty("override.akka.version", "2.9.3")
12+
val AkkaVersion = System.getProperty("override.akka.version", "2.9.4")
1313
val AkkaVersionInDocs = VersionNumber(AkkaVersion).numbers match { case Seq(major, minor, _*) => s"$major.$minor" }
1414
val AkkaPersistenceJdbcVersion = "5.4.0" // only in migration tool tests
1515
val AkkaProjectionVersionInDocs = "current"

0 commit comments

Comments
 (0)