Skip to content

Commit 089b21a

Browse files
authored
perf: use subqueries over series for latest event timestamp (#671)
* perf: use subqueries over series for latest event timestamp * adjust test for multiple partitions
1 parent ee48425 commit 089b21a

File tree

2 files changed

+39
-26
lines changed

2 files changed

+39
-26
lines changed

core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -135,14 +135,20 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
135135
WHERE persistence_id = ? AND seq_nr = ? AND deleted = $sqlFalse"""
136136
}
137137

138-
protected def selectLatestEventTimestampSql(minSlice: Int, maxSlice: Int): String =
139-
sqlCache.get(minSlice, s"selectLatestEventTimestampSql-$minSlice-$maxSlice") {
138+
protected def selectLatestEventTimestampSql(slice: Int): String =
139+
sqlCache.get(slice, "selectLatestEventTimestampSql") {
140140
sql"""
141-
SELECT MAX(db_timestamp) AS latest_timestamp
142-
FROM ${journalTable(minSlice)}
143-
WHERE entity_type = ?
144-
AND ${sliceCondition(minSlice, maxSlice)}
145-
AND deleted = $sqlFalse
141+
SELECT MAX(per_slice.latest_timestamp) AS latest_timestamp
142+
FROM (
143+
SELECT
144+
(SELECT MAX(db_timestamp)
145+
FROM ${journalTable(slice)}
146+
WHERE entity_type = ?
147+
AND slice = slice_range.slice
148+
AND deleted = $sqlFalse) AS latest_timestamp
149+
FROM (SELECT * FROM generate_series(?, ?)) AS slice_range(slice)
150+
) per_slice
151+
WHERE per_slice.latest_timestamp IS NOT NULL;
146152
"""
147153
}
148154

@@ -412,8 +418,10 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
412418
.selectOne(s"select latest event timestamp for entity type [$entityType] slice range [$minSlice - $maxSlice]")(
413419
connection =>
414420
connection
415-
.createStatement(selectLatestEventTimestampSql(minSlice, maxSlice))
416-
.bind(0, entityType),
421+
.createStatement(selectLatestEventTimestampSql(minSlice))
422+
.bind(0, entityType)
423+
.bind(1, minSlice)
424+
.bind(2, maxSlice),
417425
row => Option.when(row.get("latest_timestamp") ne null)(row.getTimestamp("latest_timestamp")))
418426
.map(_.flatten)(ExecutionContext.parasitic)
419427

core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -292,24 +292,29 @@ class EventsBySliceSpec
292292

293293
query shouldBe a[LatestEventTimestampQuery]
294294

295-
{
296-
// test all slice ranges, with the events expected in one of the ranges
297-
val numRanges = 4
298-
val rangeSize = 1024 / numRanges
299-
val expectedRangeIndex = slice / rangeSize
300-
301-
def sliceRange(rangeIndex: Int): (Int, Int) = {
302-
val minSlice = rangeIndex * rangeSize
303-
val maxSlice = minSlice + rangeSize - 1
304-
minSlice -> maxSlice
305-
}
295+
val partitions = settings.numberOfDataPartitions
296+
val testNumRanges =
297+
if (partitions > 1) List(partitions, partitions * 2, 1024)
298+
else List(1, 4, 1024)
299+
testNumRanges.foreach { numRanges =>
300+
withClue(s"numRanges=$numRanges: ") {
301+
// test all slice ranges, with the events expected in one of the ranges
302+
val rangeSize = 1024 / numRanges
303+
val expectedRangeIndex = slice / rangeSize
304+
305+
def sliceRange(rangeIndex: Int): (Int, Int) = {
306+
val minSlice = rangeIndex * rangeSize
307+
val maxSlice = minSlice + rangeSize - 1
308+
minSlice -> maxSlice
309+
}
306310

307-
for (rangeIndex <- 0 until numRanges) {
308-
val (minSlice, maxSlice) = sliceRange(rangeIndex)
309-
val expectedTimestamp =
310-
if (rangeIndex != expectedRangeIndex) None
311-
else query.timestampOf(persistenceId, 3L).futureValue
312-
query.latestEventTimestamp(entityType, minSlice, maxSlice).futureValue shouldBe expectedTimestamp
311+
for (rangeIndex <- 0 until numRanges) {
312+
val (minSlice, maxSlice) = sliceRange(rangeIndex)
313+
val expectedTimestamp =
314+
if (rangeIndex != expectedRangeIndex) None
315+
else query.timestampOf(persistenceId, 3L).futureValue
316+
query.latestEventTimestamp(entityType, minSlice, maxSlice).futureValue shouldBe expectedTimestamp
317+
}
313318
}
314319
}
315320
}

0 commit comments

Comments
 (0)