Skip to content

Commit ad1b4ea

Browse files
committed
Pubsub source v2 re-sends streaming request when blocked
1 parent e8ea8ea commit ad1b4ea

File tree

2 files changed

+27
-31
lines changed

2 files changed

+27
-31
lines changed

modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceConfigV2.scala

+1-4
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,7 @@ case class PubsubSourceConfigV2(
2424
minRemainingDeadline: Double,
2525
gcpUserAgent: GcpUserAgent,
2626
maxPullsPerTransportChannel: Int,
27-
progressTimeout: FiniteDuration,
28-
modackOnProgressTimeout: Boolean,
29-
cancelOnProgressTimeout: Boolean,
30-
consistentClientId: Boolean
27+
progressTimeout: FiniteDuration
3128
)
3229

3330
object PubsubSourceConfigV2 {

modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceV2.scala

+26-27
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent
3434
import com.snowplowanalytics.snowplow.sources.SourceAndAck
3535
import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource}
3636

37-
import scala.concurrent.duration.{Duration, DurationDouble, FiniteDuration}
37+
import scala.concurrent.duration.{DurationDouble, FiniteDuration}
3838
import scala.jdk.CollectionConverters._
3939

4040
import java.util.concurrent.{ExecutorService, Executors, LinkedBlockingQueue}
@@ -92,7 +92,7 @@ object PubsubSourceV2 {
9292
(hotswap, _) <- Stream.resource(Hotswap(resource))
9393
fs2Queue <- Stream.eval(Queue.synchronous[F, SubscriberAction])
9494
_ <- extendDeadlines(config, stub, refStates, channelAffinity).spawn
95-
_ <- Stream.eval(queueToQueue(config, jQueue, fs2Queue, stub, channelAffinity)).repeat.spawn
95+
_ <- Stream.eval(queueToQueue(config, jQueue, fs2Queue, channelAffinity, hotswap)).repeat.spawn
9696
lle <- Stream
9797
.fromQueueUnterminated(fs2Queue)
9898
.through(toLowLevelEvents(config, refStates, hotswap, resource, channelAffinity))
@@ -103,26 +103,18 @@ object PubsubSourceV2 {
103103
config: PubsubSourceConfigV2,
104104
jQueue: LinkedBlockingQueue[SubscriberAction],
105105
fs2Queue: QueueSink[F, SubscriberAction],
106-
stub: SubscriberStub,
107-
channelAffinity: Int
106+
channelAffinity: Int,
107+
hotswap: Hotswap[F, KeepAlive[F]]
108108
): F[Unit] =
109109
resolveNextAction(jQueue).flatMap {
110-
case action @ SubscriberAction.ProcessRecords(records, controller, _) =>
111-
val fallback = if (config.modackOnProgressTimeout) {
112-
val ackIds = records.map(_.getAckId)
113-
if (config.cancelOnProgressTimeout)
114-
Logger[F].debug(s"Cancelling Pubsub channel $channelAffinity for not making progress") *>
115-
Sync[F].delay(controller.cancel()) *> Utils.modAck(config.subscription, stub, ackIds, Duration.Zero, channelAffinity)
116-
else
117-
Logger[F].debug(s"Nacking on Pubsub channel $channelAffinity for not making progress") *>
118-
Sync[F].delay(controller.request(1)) *> Utils.modAck(config.subscription, stub, ackIds, Duration.Zero, channelAffinity)
119-
} else {
120-
if (config.cancelOnProgressTimeout)
121-
Logger[F].debug(s"Cancelling Pubsub channel $channelAffinity for not making progress") *>
122-
Sync[F].delay(controller.cancel()) *> fs2Queue.offer(action)
123-
else
124-
fs2Queue.offer(action)
125-
}
110+
case action @ SubscriberAction.ProcessRecords(_, _, _) =>
111+
def fallback: F[Unit] =
112+
hotswap.get.use {
113+
case Some(keepAlive) =>
114+
Logger[F].debug(s"Sending keepalie for channel $channelAffinity") *>
115+
keepAlive.keepAlive
116+
case None => Sync[F].unit
117+
} >> fs2Queue.offer(action).timeoutTo(config.progressTimeout, fallback)
126118
fs2Queue.offer(action).timeoutTo(config.progressTimeout, fallback)
127119
case action: SubscriberAction.SubscriberError =>
128120
fs2Queue.offer(action)
@@ -194,8 +186,8 @@ object PubsubSourceV2 {
194186
private def toLowLevelEvents[F[_]: Async](
195187
config: PubsubSourceConfigV2,
196188
refStates: Ref[F, Map[Unique.Token, PubsubBatchState]],
197-
hotswap: Hotswap[F, Unit],
198-
toSwap: Resource[F, Unit],
189+
hotswap: Hotswap[F, KeepAlive[F]],
190+
toSwap: Resource[F, KeepAlive[F]],
199191
channelAffinity: Int
200192
): Pipe[F, SubscriberAction, LowLevelEvents[Vector[Unique.Token]]] =
201193
_.flatMap {
@@ -282,7 +274,7 @@ object PubsubSourceV2 {
282274
actionQueue: LinkedBlockingQueue[SubscriberAction],
283275
channelAffinity: Int,
284276
clientId: UUID
285-
): Resource[F, Unit] = {
277+
): Resource[F, KeepAlive[F]] = {
286278

287279
val observer = new ResponseObserver[StreamingPullResponse] {
288280
var controller: StreamController = _
@@ -314,7 +306,7 @@ object PubsubSourceV2 {
314306
val request = StreamingPullRequest.newBuilder
315307
.setSubscription(config.subscription.show)
316308
.setStreamAckDeadlineSeconds(config.durationPerAckExtension.toSeconds.toInt)
317-
.setClientId(if (config.consistentClientId) clientId.toString else UUID.randomUUID.toString)
309+
.setClientId(clientId.toString)
318310
.setMaxOutstandingMessages(0)
319311
.setMaxOutstandingBytes(0)
320312
.build
@@ -323,11 +315,18 @@ object PubsubSourceV2 {
323315
.make(Sync[F].delay(subStub.streamingPullCallable.splitCall(observer, context))) { stream =>
324316
Sync[F].delay(stream.closeSendWithError(Status.CANCELLED.asException))
325317
}
326-
.evalMap { stream =>
327-
Sync[F].delay(stream.send(request))
318+
.map { stream =>
319+
new KeepAlive[F] {
320+
def keepAlive: F[Unit] =
321+
Sync[F].delay(stream.send(request))
322+
}
328323
}
329-
.void
324+
.evalTap(_.keepAlive)
325+
326+
}
330327

328+
trait KeepAlive[F[_]] {
329+
def keepAlive: F[Unit]
331330
}
332331

333332
private def executorResource[F[_]: Sync, E <: ExecutorService](make: F[E]): Resource[F, E] =

0 commit comments

Comments
 (0)