Commit e225414

feat: Configuration of number of pub-sub topics, #252 (#341)
1 parent 21ae530

5 files changed: +82 −4 lines

core/src/main/resources/reference.conf

Lines changed: 12 additions & 0 deletions
```diff
@@ -34,6 +34,18 @@ akka.persistence.r2dbc {
       throughput-collect-interval = 10 seconds
     }
 
+    # Group the slices for an entity type into this number of topics. Most efficient is to use
+    # the same number as the number of projection instances. If configured to less than the number
+    # of projection instances the overhead is that events will be sent more than once and discarded
+    # on the destination side. If configured to more than the number of projection instances
+    # the events will only be sent once but there is a risk of exceeding the limits of number
+    # of topics that PubSub can handle (e.g. OversizedPayloadException).
+    # Must be between 1 and 1024 and a whole number divisor of 1024 (number of slices).
+    # This configuration can be changed in a rolling update, but there might be some events
+    # that are not delivered via the pub-sub path and instead delivered later by the queries.
+    # This configuration cannot be defined per journal, but is global for the ActorSystem.
+    publish-events-number-of-topics = 128
+
     # replay filter not needed for this plugin
     replay-filter.mode = off
   }
```
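
The "whole number divisor of 1024" constraint exists because topics are derived from Akka's fixed set of 1024 persistence slices. A minimal sketch of how the grouping can be inspected via the public `Persistence` extension (not code from this commit; the system name and printed values are illustrative):

```scala
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.Persistence

object SliceRangesSketch extends App {
  val system = ActorSystem(Behaviors.empty, "sketch")

  // With the default of 128 topics, the 1024 slices are grouped into
  // 128 contiguous ranges of 8 slices each: 0-7, 8-15, ..., 1016-1023.
  val ranges = Persistence(system).sliceRanges(128)
  ranges.take(3).foreach(r => println(s"${r.min}-${r.max}")) // 0-7, 8-15, 16-23

  system.terminate()
}
```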

core/src/main/scala/akka/persistence/r2dbc/internal/PubSub.scala

Lines changed: 24 additions & 2 deletions
```diff
@@ -49,6 +49,13 @@ import org.slf4j.LoggerFactory
 
   private val settings = new PublishEventsDynamicSettings(
     system.settings.config.getConfig("akka.persistence.r2dbc.journal.publish-events-dynamic"))
+
+  private val sliceRanges = {
+    val numberOfTopics = system.settings.config.getInt("akka.persistence.r2dbc.journal.publish-events-number-of-topics")
+    persistenceExt.sliceRanges(numberOfTopics)
+  }
+  private val sliceRangeLookup = new ConcurrentHashMap[Int, Range]
+
   private val throughputCollectIntervalMillis = settings.throughputCollectInterval.toMillis
   private val throughputThreshold = settings.throughputThreshold.toDouble
   private val throughputSampler =
@@ -64,8 +71,23 @@ import org.slf4j.LoggerFactory
       .narrow[Topic.Command[EventEnvelope[Event]]]
   }
 
-  private def topicName(entityType: String, slice: Int): String =
-    URLEncoder.encode(s"r2dbc-$entityType-$slice", StandardCharsets.UTF_8.name())
+  def eventTopics[Event](
+      entityType: String,
+      minSlice: Int,
+      maxSlice: Int): Set[ActorRef[Topic.Command[EventEnvelope[Event]]]] = {
+    (minSlice to maxSlice).map(eventTopic[Event](entityType, _)).toSet
+  }
+
+  private def topicName(entityType: String, slice: Int): String = {
+    val range = sliceRangeLookup.computeIfAbsent(
+      slice,
+      _ =>
+        sliceRanges
+          .find(_.contains(slice))
+          .getOrElse(throw new IllegalArgumentException(s"Slice [$slice] not found in " +
+            s"slice ranges [${sliceRanges.mkString(", ")}]")))
+    URLEncoder.encode(s"r2dbc-$entityType-${range.min}-${range.max}", StandardCharsets.UTF_8.name())
+  }
 
   def publish(pr: PersistentRepr, timestamp: Instant): Unit = {
```
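
The effect of this change is that topic names now encode a slice range instead of a single slice. A self-contained sketch of the naming scheme (mirroring, not reusing, the plugin's private `topicName`; the entity type `MyEntity` is just an example):

```scala
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

object TopicNameSketch extends App {
  val numberOfSlices = 1024
  val numberOfTopics = 128

  // Same grouping rule as the commit: contiguous ranges of equal size.
  val sliceRanges: Seq[Range] = {
    val rangeSize = numberOfSlices / numberOfTopics
    (0 until numberOfTopics).map(i => (i * rangeSize) until ((i + 1) * rangeSize))
  }

  // One topic per slice range instead of one topic per slice.
  def topicName(entityType: String, slice: Int): String = {
    val range = sliceRanges.find(_.contains(slice)).get
    URLEncoder.encode(s"r2dbc-$entityType-${range.min}-${range.max}", StandardCharsets.UTF_8.name())
  }

  println(topicName("MyEntity", 0))  // r2dbc-MyEntity-0-7
  println(topicName("MyEntity", 13)) // r2dbc-MyEntity-8-15
}
```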

core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala

Lines changed: 6 additions & 2 deletions
```diff
@@ -162,11 +162,15 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
         bufferSize = settings.querySettings.bufferSize,
         overflowStrategy = OverflowStrategy.dropNew)
       .mapMaterializedValue { ref =>
-        (minSlice to maxSlice).foreach { slice =>
+        pubSub.eventTopics[Event](entityType, minSlice, maxSlice).foreach { topic =>
           import akka.actor.typed.scaladsl.adapter._
-          pubSub.eventTopic(entityType, slice) ! Topic.Subscribe(ref.toTyped[EventEnvelope[Event]])
+          topic ! Topic.Subscribe(ref.toTyped[EventEnvelope[Event]])
         }
       }
+      .filter { env =>
+        val slice = sliceForPersistenceId(env.persistenceId)
+        minSlice <= slice && slice <= maxSlice
+      }
     dbSource
       .mergePrioritized(pubSubSource, leftPriority = 1, rightPriority = 10)
       .via(deduplicate(settings.querySettings.deduplicateCapacity))
```
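
The new `.filter` stage compensates for the coarser subscription: when the queried slice range is narrower than a topic's range, envelopes for out-of-range slices still arrive over pub-sub and must be discarded on the subscriber side. A sketch of the predicate with hypothetical ranges:

```scala
object PubSubFilterSketch extends App {
  // Hypothetical values: the query asks for slices 12-15, but with few
  // topics its subscription covers the whole topic range 8-15.
  val (minSlice, maxSlice) = (12, 15)
  val topicSlices = 8 to 15

  // Same predicate as the added .filter stage: keep only envelopes whose
  // slice falls inside the queried range; the rest were sent because the
  // topic is coarser than the query.
  val delivered = topicSlices.filter(slice => minSlice <= slice && slice <= maxSlice)
  println(delivered.mkString(", ")) // 12, 13, 14, 15 -- slices 8-11 discarded
}
```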

core/src/test/scala/akka/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala

Lines changed: 38 additions & 0 deletions
```diff
@@ -4,6 +4,7 @@
 
 package akka.persistence.r2dbc.query
 
+import scala.collection.immutable
 import java.time.Instant
 
 import scala.concurrent.Await
@@ -15,6 +16,7 @@ import akka.actor.testkit.typed.scaladsl.LoggingTestKit
 import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
 import akka.actor.typed.ActorSystem
 import akka.actor.typed.internal.pubsub.TopicImpl
+import akka.persistence.Persistence
 import akka.persistence.query.NoOffset
 import akka.persistence.query.PersistenceQuery
 import akka.persistence.query.TimestampOffset
@@ -44,6 +46,7 @@ object EventsBySlicePubSubSpec {
     .parseString("""
     akka.persistence.r2dbc {
       journal.publish-events = on
+      #journal.publish-events-number-of-topics = 4
       journal.publish-events-dynamic {
         throughput-threshold = 50
         throughput-collect-interval = 1 second
@@ -234,6 +237,41 @@ class EventsBySlicePubSubSpec
 
     }
 
+    "group slices into topics" in new Setup {
+
+      val numberOfTopics =
+        typedSystem.settings.config.getInt("akka.persistence.r2dbc.journal.publish-events-number-of-topics")
+      //
+      val querySliceRanges = Persistence(typedSystem).sliceRanges(numberOfTopics * 2)
+      val queries: immutable.IndexedSeq[TestSubscriber.Probe[EventEnvelope[String]]] = {
+        querySliceRanges.map { range =>
+          query.eventsBySlices[String](entityType, range.min, range.max, NoOffset).runWith(sinkProbe).request(100)
+        }
+      }
+
+      val topicStatsProbe = createTestProbe[TopicImpl.TopicStats]()
+      eventually {
+        (0 until 1024).foreach { i =>
+          withClue(s"slice $i: ") {
+            PubSub(typedSystem).eventTopic[String](entityType, i) ! TopicImpl.GetTopicStats(topicStatsProbe.ref)
+            topicStatsProbe.receiveMessage().localSubscriberCount shouldBe 2
+          }
+        }
+      }
+
+      for (i <- 1 to 10) {
+        persister ! PersistWithAck(s"e-$i", probe.ref)
+        probe.expectMessage(Done)
+      }
+
+      for (i <- 1 to 10) {
+        val queryIndex = querySliceRanges.indexOf(querySliceRanges.find(_.contains(slice)).get)
+        queries(queryIndex).expectNext().event shouldBe s"e-$i"
+      }
+
+      queries.foreach(_.cancel())
+    }
+
   }
 
 }
```
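
The `localSubscriberCount shouldBe 2` expectation follows from range arithmetic: the spec starts twice as many queries as there are topics (`numberOfTopics * 2`), so each topic range contains exactly two query ranges. A sketch with illustrative numbers (4 topics rather than the spec's effective default of 128):

```scala
object SubscriberCountSketch extends App {
  val numberOfSlices = 1024
  val numberOfTopics = 4 // illustrative only

  def ranges(n: Int): Seq[Range] = {
    val size = numberOfSlices / n
    (0 until n).map(i => (i * size) until ((i + 1) * size))
  }

  val topicRanges = ranges(numberOfTopics)
  val queryRanges = ranges(numberOfTopics * 2) // twice as many queries as topics

  // Each topic range contains exactly two query ranges, hence the two
  // local subscribers per topic that the spec asserts.
  topicRanges.foreach { tr =>
    val subscribers = queryRanges.count(qr => tr.contains(qr.min))
    println(s"topic ${tr.min}-${tr.max}: $subscribers subscribers") // always 2
  }
}
```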

docs/src/main/paradox/query.md

Lines changed: 2 additions & 0 deletions
````diff
@@ -112,6 +112,8 @@ Disable publishing of events with configuration:
 akka.persistence.r2dbc.journal.publish-events = off
 ```
 
+If you use many queries or Projection instances you should consider adjusting the `akka.persistence.r2dbc.journal.publish-events-number-of-topics` configuration, see @ref:[Configuration](#configuration).
+
 ## Durable state queries
 
 @apidoc[R2dbcDurableStateStore] implements the following @extref:[Persistence Queries](akka:durable-state/persistence-query.html):
````
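
As a usage sketch, the setting can be overridden when the ActorSystem is created (the value 8 is a hypothetical match for eight projection instances, not a recommendation from this commit):

```scala
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import com.typesafe.config.ConfigFactory

object NumberOfTopicsConfigSketch extends App {
  // With eight projection instances, using eight topics means each event
  // is published exactly once, to exactly one topic.
  val config = ConfigFactory
    .parseString("akka.persistence.r2dbc.journal.publish-events-number-of-topics = 8")
    .withFallback(ConfigFactory.load())

  val system = ActorSystem(Behaviors.empty, "sketch", config)
  system.terminate()
}
```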
