Skip to content

Commit 8ced0b9

Browse files
committed
Pubsub source v2 pre-fetches more messages when blocked
1 parent e8ea8ea commit 8ced0b9

9 files changed

+523
-212
lines changed

modules/gcp/src/main/resources/application.conf

+2-3
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@
1515
"durationPerAckExtension": "10 minutes"
1616
"minRemainingDeadline": 0.1
1717
"progressTimeout": "10 seconds"
18-
"modackOnProgressTimeout": true
19-
"cancelOnProgressTimeout": false
20-
"consistentClientId": true
18+
"prefetchMin": 1
19+
"prefetchMax": 10
2120
}
2221
"output": {
2322
"bad": ${snowplow.defaults.sinks.pubsub}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
3+
*
4+
* This program is licensed to you under the Snowplow Community License Version 1.0,
5+
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
6+
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
7+
*/
8+
package com.snowplowanalytics.snowplow.sources.pubsub.v2
9+
10+
import cats.effect.{Async, Ref, Resource, Sync}
11+
import cats.effect.kernel.Unique
12+
import cats.implicits._
13+
import cats.effect.implicits._
14+
import org.typelevel.log4cats.Logger
15+
import org.typelevel.log4cats.slf4j.Slf4jLogger
16+
17+
import com.google.cloud.pubsub.v1.stub.SubscriberStub
18+
19+
private trait LeaseManager[F[_], A] {
20+
def manageLeases(in: A): F[Unique.Token]
21+
def stopManagingLeases(tokens: Vector[Unique.Token]): F[Unit]
22+
}
23+
24+
private object LeaseManager {
25+
26+
private implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F]
27+
28+
def resource[F[_]: Async](
29+
config: PubsubSourceConfigV2,
30+
stub: SubscriberStub,
31+
ref: Ref[F, Map[Unique.Token, PubsubBatchState]],
32+
channelAffinity: Int
33+
): Resource[F, LeaseManager[F, SubscriberAction.ProcessRecords]] =
34+
extendDeadlinesInBackground[F](config, stub, ref, channelAffinity)
35+
.as(impl(config, ref, channelAffinity))
36+
37+
private def impl[F[_]: Sync](
38+
config: PubsubSourceConfigV2,
39+
ref: Ref[F, Map[Unique.Token, PubsubBatchState]],
40+
channelAffinity: Int
41+
): LeaseManager[F, SubscriberAction.ProcessRecords] = new LeaseManager[F, SubscriberAction.ProcessRecords] {
42+
43+
def manageLeases(in: SubscriberAction.ProcessRecords): F[Unique.Token] =
44+
Unique[F].unique.flatMap { token =>
45+
val deadline = in.timeReceived.plusMillis(config.durationPerAckExtension.toMillis)
46+
val ackIds = in.records.map(_.getAckId)
47+
val state = PubsubBatchState(deadline, ackIds, channelAffinity)
48+
ref.update(_ + (token -> state)).as(token)
49+
}
50+
51+
def stopManagingLeases(tokens: Vector[Unique.Token]): F[Unit] =
52+
ref.update(_.removedAll(tokens))
53+
}
54+
55+
private def extendDeadlinesInBackground[F[_]: Async](
56+
config: PubsubSourceConfigV2,
57+
stub: SubscriberStub,
58+
refStates: Ref[F, Map[Unique.Token, PubsubBatchState]],
59+
channelAffinity: Int
60+
): Resource[F, Unit] = {
61+
def go: F[Unit] = for {
62+
now <- Sync[F].realTimeInstant
63+
minAllowedDeadline = now.plusMillis((config.minRemainingDeadline * config.durationPerAckExtension.toMillis).toLong)
64+
newDeadline = now.plusMillis(config.durationPerAckExtension.toMillis)
65+
toExtend <- refStates.modify { m =>
66+
val toExtend = m.filter { case (_, batchState) =>
67+
batchState.channelAffinity === channelAffinity && batchState.currentDeadline.isBefore(minAllowedDeadline)
68+
}
69+
val fixed = toExtend.view
70+
.mapValues(_.copy(currentDeadline = newDeadline))
71+
.toMap
72+
(m ++ fixed, toExtend.values.toVector)
73+
}
74+
_ <- if (toExtend.isEmpty)
75+
Sync[F].sleep(0.5 * config.minRemainingDeadline * config.durationPerAckExtension)
76+
else {
77+
val ackIds = toExtend.sortBy(_.currentDeadline).flatMap(_.ackIds.toVector)
78+
Utils.modAck[F](config.subscription, stub, ackIds, config.durationPerAckExtension, channelAffinity)
79+
}
80+
_ <- go
81+
} yield ()
82+
go.background.void
83+
}
84+
85+
}

modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubBatchState.scala

+3-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
*/
88
package com.snowplowanalytics.snowplow.sources.pubsub.v2
99

10+
import cats.data.NonEmptyVector
11+
1012
import java.time.Instant
1113

1214
/**
@@ -23,6 +25,6 @@ import java.time.Instant
2325
*/
2426
private case class PubsubBatchState(
2527
currentDeadline: Instant,
26-
ackIds: Vector[String],
28+
ackIds: NonEmptyVector[String],
2729
channelAffinity: Int
2830
)

modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubCheckpointer.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class PubsubCheckpointer[F[_]: Async](
6060
ackDatas <- refAckIds.modify(m => (m.removedAll(c), c.flatMap(m.get)))
6161
grouped = ackDatas.groupBy(_.channelAffinity)
6262
_ <- grouped.toVector.parTraverse_ { case (channelAffinity, ackDatas) =>
63-
ackDatas.flatMap(_.ackIds).grouped(1000).toVector.traverse_ { ackIds =>
63+
ackDatas.flatMap(_.ackIds.toVector).grouped(1000).toVector.traverse_ { ackIds =>
6464
val request = AcknowledgeRequest.newBuilder.setSubscription(subscription.show).addAllAckIds(ackIds.asJava).build
6565
val context = GrpcCallContext.createDefault.withChannelAffinity(channelAffinity)
6666
val attempt = for {
@@ -88,7 +88,7 @@ class PubsubCheckpointer[F[_]: Async](
8888
ackDatas <- refAckIds.modify(m => (m.removedAll(c), c.flatMap(m.get)))
8989
grouped = ackDatas.groupBy(_.channelAffinity)
9090
_ <- grouped.toVector.parTraverse_ { case (channelAffinity, ackDatas) =>
91-
val ackIds = ackDatas.flatMap(_.ackIds)
91+
val ackIds = ackDatas.flatMap(_.ackIds.toVector)
9292
// A nack is just a modack with zero duration
9393
Utils.modAck[F](subscription, stub, ackIds, Duration.Zero, channelAffinity)
9494
}

modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceConfigV2.scala

+2-3
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,8 @@ case class PubsubSourceConfigV2(
2525
gcpUserAgent: GcpUserAgent,
2626
maxPullsPerTransportChannel: Int,
2727
progressTimeout: FiniteDuration,
28-
modackOnProgressTimeout: Boolean,
29-
cancelOnProgressTimeout: Boolean,
30-
consistentClientId: Boolean
28+
prefetchMin: Int,
29+
prefetchMax: Int
3130
)
3231

3332
object PubsubSourceConfigV2 {

0 commit comments

Comments
 (0)