Skip to content

Commit c4eaada

Browse files
authored
feat: Mark snapshot envelopes with snapshot source (#469)
Includes fix for snapshot stage double pull race condition
1 parent ac25135 commit c4eaada

File tree

4 files changed

+12
-6
lines changed

4 files changed

+12
-6
lines changed

core/src/main/scala/akka/persistence/r2dbc/internal/EnvelopeOrigin.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import akka.persistence.query.typed.EventEnvelope
1515
val SourceQuery = ""
1616
val SourceBacktracking = "BT"
1717
val SourcePubSub = "PS"
18+
val SourceSnapshot = "SN"
1819

1920
def fromQuery(env: EventEnvelope[_]): Boolean =
2021
env.source == SourceQuery
@@ -28,6 +29,9 @@ import akka.persistence.query.typed.EventEnvelope
2829
def fromPubSub(env: EventEnvelope[_]): Boolean =
2930
env.source == SourcePubSub
3031

32+
def fromSnapshot(env: EventEnvelope[_]): Boolean =
33+
env.source == SourceSnapshot
34+
3135
def isFilteredEvent(env: Any): Boolean =
3236
env match {
3337
case e: EventEnvelope[_] => e.filtered

core/src/main/scala/akka/persistence/r2dbc/internal/StartingFromSnapshotStage.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ import akka.stream.stage.OutHandler
6666
}
6767

6868
override def onUpstreamFinish(): Unit = {
69-
val primaryHandler = new PrimaryHandler
69+
val primaryHandler = new PrimaryHandler(isAvailable(out))
7070
self.setHandler(out, primaryHandler)
7171

7272
subFusingMaterializer.materialize(
@@ -80,9 +80,9 @@ import akka.stream.stage.OutHandler
8080
}
8181
}
8282

83-
class PrimaryHandler extends OutHandler with InHandler {
83+
class PrimaryHandler(pullImmediately: Boolean) extends OutHandler with InHandler {
8484
val subSink = new SubSinkInlet[EventEnvelope[Event]]("snapshots")
85-
subSink.pull()
85+
if (pullImmediately) subSink.pull()
8686
subSink.setHandler(this)
8787

8888
override def onPull(): Unit = {

core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
162162
row.entityType,
163163
row.slice,
164164
filtered = false,
165-
source = "",
165+
source = EnvelopeOrigin.SourceSnapshot,
166166
tags = row.tags)
167167
}
168168

core/src/test/scala/akka/persistence/r2dbc/query/EventsByPersistenceIdStartingFromSnapshotSpec.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package akka.persistence.r2dbc.query
66

77
import java.time.Instant
8-
98
import akka.Done
109
import akka.NotUsed
1110
import akka.actor.testkit.typed.scaladsl.LogCapturing
@@ -20,6 +19,7 @@ import akka.persistence.r2dbc.TestActors.Persister.PersistWithAck
2019
import akka.persistence.r2dbc.TestConfig
2120
import akka.persistence.r2dbc.TestData
2221
import akka.persistence.r2dbc.TestDbLifecycle
22+
import akka.persistence.r2dbc.internal.EnvelopeOrigin
2323
import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
2424
import akka.persistence.typed.PersistenceId
2525
import akka.stream.scaladsl.Source
@@ -124,7 +124,9 @@ class EventsByPersistenceIdStartingFromSnapshotSpec
124124
.runWith(sinkProbe)
125125
.request(21)
126126

127-
result.expectNext().event shouldBe expectedSnapshotEvent(17)
127+
val evt17 = result.expectNext()
128+
evt17.event shouldBe expectedSnapshotEvent(17)
129+
EnvelopeOrigin.fromSnapshot(evt17) shouldBe true
128130
for (i <- 18 to 20) {
129131
result.expectNext().event shouldBe s"e-$i"
130132
}

0 commit comments

Comments
 (0)