Skip to content

Commit f77906c

Browse files
authored
feat: Recovery based on only the last event (#651)
* feat: Recovery based on only the last event * bump: Akka core 2.10.1 * EventEnvelope constructor * sql server limit 1
1 parent 64e6f22 commit f77906c

File tree

12 files changed

+298
-36
lines changed

12 files changed

+298
-36
lines changed

core/src/main/scala/akka/persistence/r2dbc/internal/QueryDao.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ private[r2dbc] trait QueryDao extends BySliceQuery.Dao[SerializedJournalRow] {
2626
def timestampOfEvent(persistenceId: String, seqNr: Long): Future[Option[Instant]]
2727
def loadEvent(persistenceId: String, seqNr: Long, includePayload: Boolean): Future[Option[SerializedJournalRow]]
2828

29+
def loadLastEvent(persistenceId: String, toSeqNr: Long, includeDeleted: Boolean): Future[Option[SerializedJournalRow]]
30+
2931
def eventsByPersistenceId(
3032
persistenceId: String,
3133
fromSequenceNr: Long,

core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala

Lines changed: 81 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import scala.concurrent.Future
1212
import scala.concurrent.duration.Duration
1313
import scala.concurrent.duration.FiniteDuration
1414

15+
import io.r2dbc.spi.Row
1516
import io.r2dbc.spi.Statement
1617
import org.slf4j.Logger
1718
import org.slf4j.LoggerFactory
@@ -137,23 +138,44 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
137138
protected def selectOneEventSql(slice: Int): String =
138139
sqlCache.get(slice, "selectOneEventSql") {
139140
sql"""
140-
SELECT slice, entity_type, db_timestamp, $sqlDbTimestamp AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, meta_ser_id, meta_ser_manifest, meta_payload, tags
141+
SELECT entity_type, db_timestamp, $sqlDbTimestamp AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, meta_ser_id, meta_ser_manifest, meta_payload, tags
141142
FROM ${journalTable(slice)}
142143
WHERE persistence_id = ? AND seq_nr = ? AND deleted = $sqlFalse"""
143144
}
144145

145146
protected def selectOneEventWithoutPayloadSql(slice: Int): String =
146147
sqlCache.get(slice, "selectOneEventWithoutPayloadSql") {
147148
sql"""
148-
SELECT slice, entity_type, db_timestamp, CURRENT_TIMESTAMP AS read_db_timestamp, event_ser_id, event_ser_manifest, meta_ser_id, meta_ser_manifest, meta_payload, tags
149+
SELECT entity_type, db_timestamp, $sqlDbTimestamp AS read_db_timestamp, event_ser_id, event_ser_manifest, meta_ser_id, meta_ser_manifest, meta_payload, tags
149150
FROM ${journalTable(slice)}
150151
WHERE persistence_id = ? AND seq_nr = ? AND deleted = $sqlFalse"""
151152
}
152153

154+
protected def selectLastEventSql(slice: Int): String =
155+
sqlCache.get(slice, "selectLastEventSql") {
156+
sql"""
157+
SELECT entity_type, seq_nr, db_timestamp, $sqlDbTimestamp AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, meta_payload, tags
158+
FROM ${journalTable(slice)}
159+
WHERE persistence_id = ? AND seq_nr <= ? AND deleted = $sqlFalse
160+
ORDER BY seq_nr DESC
161+
LIMIT 1"""
162+
}
163+
164+
protected def selectLastEventIncludeDeletedSql(slice: Int): String =
165+
sqlCache.get(slice, "selectLastEventIncludeDeletedSql") {
166+
sql"""
167+
SELECT entity_type, seq_nr, db_timestamp, $sqlDbTimestamp AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, meta_payload, tags, deleted
168+
FROM ${journalTable(slice)}
169+
WHERE persistence_id = ? AND seq_nr <= ?
170+
ORDER BY seq_nr DESC
171+
LIMIT 1
172+
"""
173+
}
174+
153175
protected def selectEventsSql(slice: Int): String =
154176
sqlCache.get(slice, "selectEventsSql") {
155177
sql"""
156-
SELECT slice, entity_type, seq_nr, db_timestamp, CURRENT_TIMESTAMP AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, meta_payload, tags
178+
SELECT entity_type, seq_nr, db_timestamp, $sqlDbTimestamp AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, meta_payload, tags
157179
from ${journalTable(slice)}
158180
WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?
159181
AND deleted = false
@@ -164,7 +186,7 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
164186
protected def selectEventsIncludeDeletedSql(slice: Int): String =
165187
sqlCache.get(slice, "selectEventsIncludeDeletedSql") {
166188
sql"""
167-
SELECT slice, entity_type, seq_nr, db_timestamp, CURRENT_TIMESTAMP AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, meta_payload, tags, deleted
189+
SELECT entity_type, seq_nr, db_timestamp, $sqlDbTimestamp AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, meta_payload, tags, deleted
168190
from ${journalTable(slice)}
169191
WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?
170192
ORDER BY seq_nr
@@ -393,7 +415,7 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
393415
Some(row.getPayload("event_payload"))
394416
else None
395417
SerializedJournalRow(
396-
slice = row.get[Integer]("slice", classOf[Integer]),
418+
slice = slice,
397419
entityType = row.get("entity_type", classOf[String]),
398420
persistenceId,
399421
seqNr,
@@ -408,6 +430,42 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
408430
})
409431
}
410432

433+
override def loadLastEvent(
434+
persistenceId: String,
435+
toSeqNr: Long,
436+
includeDeleted: Boolean): Future[Option[SerializedJournalRow]] = {
437+
val slice = persistenceExt.sliceForPersistenceId(persistenceId)
438+
val executor = executorProvider.executorFor(slice)
439+
val selectSql = if (includeDeleted) selectLastEventIncludeDeletedSql(slice) else selectLastEventSql(slice)
440+
executor.selectOne("select last event")(
441+
connection => {
442+
connection
443+
.createStatement(selectSql)
444+
.bind(0, persistenceId)
445+
.bind(1, toSeqNr)
446+
},
447+
row => {
448+
if (includeDeleted && row.get[java.lang.Boolean]("deleted", classOf[java.lang.Boolean])) {
449+
// deleted row
450+
deletedJournalRow(slice, persistenceId, row)
451+
} else {
452+
SerializedJournalRow(
453+
slice = slice,
454+
entityType = row.get("entity_type", classOf[String]),
455+
persistenceId,
456+
seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
457+
dbTimestamp = row.getTimestamp("db_timestamp"),
458+
readDbTimestamp = row.getTimestamp("read_db_timestamp"),
459+
payload = Some(row.getPayload("event_payload")),
460+
serId = row.get[Integer]("event_ser_id", classOf[Integer]),
461+
serManifest = row.get("event_ser_manifest", classOf[String]),
462+
writerUuid = row.get("writer", classOf[String]),
463+
tags = row.getTags("tags"),
464+
metadata = readMetadata(row))
465+
}
466+
})
467+
}
468+
411469
override def eventsByPersistenceId(
412470
persistenceId: String,
413471
fromSequenceNr: Long,
@@ -424,22 +482,10 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
424482
row =>
425483
if (includeDeleted && row.get[java.lang.Boolean]("deleted", classOf[java.lang.Boolean])) {
426484
// deleted row
427-
SerializedJournalRow(
428-
slice = row.get[Integer]("slice", classOf[Integer]),
429-
entityType = row.get("entity_type", classOf[String]),
430-
persistenceId = persistenceId,
431-
seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
432-
dbTimestamp = row.getTimestamp("db_timestamp"),
433-
readDbTimestamp = row.getTimestamp("read_db_timestamp"),
434-
payload = None,
435-
serId = 0,
436-
serManifest = "",
437-
writerUuid = "",
438-
tags = Set.empty,
439-
metadata = None)
485+
deletedJournalRow(slice, persistenceId, row)
440486
} else {
441487
SerializedJournalRow(
442-
slice = row.get[Integer]("slice", classOf[Integer]),
488+
slice = slice,
443489
entityType = row.get("entity_type", classOf[String]),
444490
persistenceId = persistenceId,
445491
seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
@@ -459,6 +505,22 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
459505
Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ => NotUsed)
460506
}
461507

508+
private def deletedJournalRow(slice: Int, persistenceId: String, row: Row): SerializedJournalRow = {
509+
SerializedJournalRow(
510+
slice = slice,
511+
entityType = row.get("entity_type", classOf[String]),
512+
persistenceId = persistenceId,
513+
seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
514+
dbTimestamp = row.getTimestamp("db_timestamp"),
515+
readDbTimestamp = row.getTimestamp("read_db_timestamp"),
516+
payload = None,
517+
serId = 0,
518+
serManifest = "",
519+
writerUuid = "",
520+
tags = Set.empty,
521+
metadata = None)
522+
}
523+
462524
protected def bindPersistenceIdsForEntityTypeAfterSql(
463525
stmt: Statement,
464526
entityType: String,

core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,26 @@ private[r2dbc] class SqlServerQueryDao(executorProvider: R2dbcExecutorProvider)
163163
stmt
164164
}
165165

166+
override protected def selectLastEventSql(slice: Int): String =
167+
sqlCache.get(slice, "selectLastEventSql") {
168+
sql"""
169+
SELECT TOP(1) entity_type, seq_nr, db_timestamp, $sqlDbTimestamp AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, meta_payload, tags
170+
FROM ${journalTable(slice)}
171+
WHERE persistence_id = ? AND seq_nr <= ? AND deleted = $sqlFalse
172+
ORDER BY seq_nr DESC
173+
"""
174+
}
175+
176+
override protected def selectLastEventIncludeDeletedSql(slice: Int): String =
177+
sqlCache.get(slice, "selectLastEventIncludeDeletedSql") {
178+
sql"""
179+
SELECT TOP(1) entity_type, seq_nr, db_timestamp, $sqlDbTimestamp AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, meta_payload, tags, deleted
180+
FROM ${journalTable(slice)}
181+
WHERE persistence_id = ? AND seq_nr <= ?
182+
ORDER BY seq_nr DESC
183+
"""
184+
}
185+
166186
override protected def persistenceIdsForEntityTypeAfterSql(minSlice: Int): String =
167187
sqlCache.get(minSlice, "persistenceIdsForEntityTypeAfterSql") {
168188
sql"""

core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,11 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends
220220

221221
override def replayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
222222
recoveryCallback: PersistentRepr => Unit): Future[Long] = {
223-
log.debug("replayMessages [{}] [{}]", persistenceId, fromSequenceNr)
223+
if (fromSequenceNr == -1)
224+
log.debug("replayMessages [{}] [{}]", persistenceId, "last")
225+
else
226+
log.debug("replayMessages [{}] [{}]", persistenceId, fromSequenceNr)
227+
224228
val pendingWrite = Option(writesInProgress.get(persistenceId)) match {
225229
case Some(f) =>
226230
log.debug("Write in progress for [{}], deferring replayMessages until write completed", persistenceId)
@@ -229,7 +233,23 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends
229233
case None => FutureDone
230234
}
231235
pendingWrite.flatMap { _ =>
232-
if (toSequenceNr == Long.MaxValue && max == Long.MaxValue) {
236+
if (toSequenceNr <= 0 || max == 0) {
237+
// no replay
238+
journalDao.readHighestSequenceNr(persistenceId, fromSequenceNr)
239+
} else if (fromSequenceNr == -1) {
240+
// recover from last event only
241+
query.internalLastEventByPersistenceId(persistenceId, toSequenceNr, includeDeleted = true).map {
242+
case Some(item) =>
243+
// payload is empty for deleted item
244+
if (item.payload.isDefined) {
245+
val repr = deserializeRow(serialization, item)
246+
recoveryCallback(repr)
247+
}
248+
item.seqNr
249+
case None =>
250+
0L
251+
}
252+
} else if (toSequenceNr == Long.MaxValue && max == Long.MaxValue) {
233253
// this is the normal case, highest sequence number from last event
234254
query
235255
.internalCurrentEventsByPersistenceId(
@@ -246,9 +266,6 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends
246266
}
247267
item.seqNr
248268
})
249-
} else if (toSequenceNr <= 0) {
250-
// no replay
251-
journalDao.readHighestSequenceNr(persistenceId, fromSequenceNr)
252269
} else {
253270
// replay to custom sequence number
254271

core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
194194
1L,
195195
eventOption = None,
196196
timestamp.toEpochMilli,
197-
eventMetadata = None,
197+
_eventMetadata = None,
198198
entityType,
199199
slice,
200200
filtered = true,
@@ -725,6 +725,16 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
725725
.mapMaterializedValue(_ => NotUsed)
726726
}
727727

728+
/**
729+
* INTERNAL API
730+
*/
731+
@InternalApi private[r2dbc] def internalLastEventByPersistenceId(
732+
persistenceId: String,
733+
toSequenceNr: Long,
734+
includeDeleted: Boolean): Future[Option[SerializedJournalRow]] = {
735+
queryDao.loadLastEvent(persistenceId, toSequenceNr, includeDeleted)
736+
}
737+
728738
// EventTimestampQuery
729739
override def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]] = {
730740
val result = queryDao.timestampOfEvent(persistenceId, sequenceNr)

core/src/main/scala/akka/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ object R2dbcSnapshotStore {
3838
.deserialize(serializedMeta.payload, serializedMeta.serializerId, serializedMeta.serializerManifest)
3939
.get)),
4040
serialization.deserialize(snap.snapshot, snap.serializerId, snap.serializerManifest).get)
41+
42+
private val FutureNone = Future.successful(None)
4143
}
4244

4345
/**
@@ -49,6 +51,7 @@ object R2dbcSnapshotStore {
4951
@InternalApi
5052
private[r2dbc] final class R2dbcSnapshotStore(cfg: Config, cfgPath: String) extends SnapshotStore {
5153
import R2dbcSnapshotStore.deserializeSnapshotRow
54+
import R2dbcSnapshotStore.FutureNone
5255

5356
private implicit val ec: ExecutionContext = context.dispatcher
5457
private val serialization: Serialization = SerializationExtension(context.system)
@@ -69,10 +72,14 @@ private[r2dbc] final class R2dbcSnapshotStore(cfg: Config, cfgPath: String) exte
6972
private val dao = settings.connectionFactorySettings.dialect.createSnapshotDao(executorProvider)
7073
private val queryDao = settings.connectionFactorySettings.dialect.createQueryDao(executorProvider)
7174

72-
def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] =
73-
dao
74-
.load(persistenceId, criteria)
75-
.map(_.map(row => deserializeSnapshotRow(row, serialization)))
75+
def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = {
76+
if (criteria.maxSequenceNr <= 0)
77+
FutureNone
78+
else
79+
dao
80+
.load(persistenceId, criteria)
81+
.map(_.map(row => deserializeSnapshotRow(row, serialization)))
82+
}
7683

7784
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = {
7885
val entityType = PersistenceId.extractEntityType(metadata.persistenceId)

core/src/test/scala/akka/persistence/r2dbc/TestActors.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import akka.persistence.typed.ReplicaId
1414
import akka.persistence.typed.ReplicationId
1515
import akka.persistence.typed.SnapshotCompleted
1616
import akka.persistence.typed.scaladsl.EventSourcedBehavior
17+
import akka.persistence.typed.scaladsl.Recovery
1718
import akka.persistence.typed.scaladsl.ReplicatedEventSourcing
1819
import akka.persistence.typed.state.scaladsl.DurableStateBehavior
1920

@@ -82,6 +83,16 @@ object TestActors {
8283
}
8384
}
8485

86+
def withRecovery(pid: PersistenceId, recovery: Recovery): Behavior[Command] = {
87+
Behaviors.setup { context =>
88+
eventSourcedBehavior(pid, context)
89+
.snapshotWhen { case (_, event, _) =>
90+
event.toString.contains("snap")
91+
}
92+
.withRecovery(recovery)
93+
}
94+
}
95+
8596
def eventSourcedBehavior(
8697
pid: PersistenceId,
8798
context: ActorContext[Command]): EventSourcedBehavior[Command, Any, String] = {
@@ -198,8 +209,11 @@ object TestActors {
198209
ReplicationId(entityType, entityId, ReplicaId("dc-1")),
199210
Set(ReplicaId("dc-1")),
200211
R2dbcReadJournal.Identifier) { replicationContext =>
201-
Persister.eventSourcedBehavior(PersistenceId(entityType, entityId), context)
212+
Persister.eventSourcedBehavior(replicationContext.persistenceId, context)
202213
}
203214
}
204215
}
216+
217+
def replicatedEventSourcedPersistenceId(entityType: String, entityId: String): PersistenceId =
218+
ReplicationId(entityType, entityId, ReplicaId("dc-1")).persistenceId
205219
}

core/src/test/scala/akka/persistence/r2dbc/journal/R2dbcJournalSpec.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,14 @@ object R2dbcJournalSpec {
3333
}
3434

3535
class R2dbcJournalSpec extends JournalSpec(R2dbcJournalSpec.config) with TestDbLifecycle {
36-
override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = CapabilityFlag.off()
36+
override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = false
37+
override protected def supportsReplayOnlyLast: CapabilityFlag = true
3738
override def typedSystem: ActorSystem[_] = system.toTyped
3839
}
3940

4041
class R2dbcJournalWithMetaSpec extends JournalSpec(R2dbcJournalSpec.configWithMeta) with TestDbLifecycle {
41-
override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = CapabilityFlag.off()
42-
protected override def supportsMetadata: CapabilityFlag = CapabilityFlag.on()
42+
override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = false
43+
protected override def supportsMetadata: CapabilityFlag = true
44+
override protected def supportsReplayOnlyLast: CapabilityFlag = true
4345
override def typedSystem: ActorSystem[_] = system.toTyped
4446
}

core/src/test/scala/akka/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,14 +155,15 @@ class EventsByPersistenceIdSpec
155155
val probe = testKit.createTestProbe[Done]()
156156
val entityType = nextEntityType()
157157
val entityId = "entity-1"
158+
val pid = TestActors.replicatedEventSourcedPersistenceId(entityType, entityId).id
158159

159160
val persister = testKit.spawn(TestActors.replicatedEventSourcedPersister(entityType, entityId))
160161
persister ! Persister.PersistWithAck("e-1", probe.ref)
161162
probe.expectMessage(Done)
162163
persister ! Persister.PersistWithAck("e-2", probe.ref)
163164
probe.expectMessage(Done)
164165

165-
val sub = doQuery(PersistenceId(entityType, entityId).id, 0, Long.MaxValue)
166+
val sub = doQuery(pid, 0, Long.MaxValue)
166167
.runWith(TestSink())
167168
.request(10)
168169

core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ class EventsBySliceSpec
242242
val probe = testKit.createTestProbe[Done]()
243243
val entityType = nextEntityType()
244244
val entityId = "entity-1"
245-
val persistenceId = PersistenceId(entityType, entityId)
245+
val persistenceId = TestActors.replicatedEventSourcedPersistenceId(entityType, entityId)
246246
val slice = query.sliceForPersistenceId(persistenceId.id)
247247

248248
val persister = testKit.spawn(TestActors.replicatedEventSourcedPersister(entityType, entityId))

0 commit comments

Comments
 (0)