Pubsub source v2 uses non-streaming pull
istreeter committed Oct 10, 2024
1 parent e8ea8ea commit 03fad04
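
This commit swaps the v2 Pub/Sub source from a long-lived streaming pull (a StreamingPullRequest served through a ResponseObserver and bridged into fs2 via a LinkedBlockingQueue) to repeated unary Pull requests issued through stub.pullCallable, with ack deadlines extended concurrently and transient gRPC failures retried. The config options that only made sense for the streaming flow (progressTimeout, modackOnProgressTimeout, cancelOnProgressTimeout, consistentClientId) are dropped. A minimal sketch of the new loop's shape, assuming a SubscriberStub named stub; the IO effect and the pulls name are illustrative, not the module's exact code:

```scala
import cats.effect.IO
import com.google.api.gax.grpc.GrpcCallContext
import com.google.cloud.pubsub.v1.stub.SubscriberStub
import com.google.pubsub.v1.{PullRequest, PullResponse}
import fs2.Stream

// Issue unary pulls forever; only non-empty responses flow downstream.
// Retries and deadline extension are omitted here for brevity.
def pulls(stub: SubscriberStub, subscription: String): Stream[IO, PullResponse] =
  Stream
    .eval(IO.blocking {
      val request = PullRequest.newBuilder.setSubscription(subscription).build
      stub.pullCallable.call(request, GrpcCallContext.createDefault)
    })
    .filter(_.getReceivedMessagesCount > 0)
    .repeat
```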
Showing 5 changed files with 83 additions and 176 deletions.
modules/gcp/src/main/resources/application.conf
4 changes: 0 additions & 4 deletions
@@ -14,10 +14,6 @@
// V2 defaults
"durationPerAckExtension": "10 minutes"
"minRemainingDeadline": 0.1
"progressTimeout": "10 seconds"
"modackOnProgressTimeout": true
"cancelOnProgressTimeout": false
"consistentClientId": true
}
"output": {
"bad": ${snowplow.defaults.sinks.pubsub}
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.sources.pubsub.v2

import cats.effect.Async
import cats.implicits._
import com.google.api.core.{ApiFuture, ApiFutureCallback, ApiFutures}
import com.google.common.util.concurrent.MoreExecutors

object FutureInterop {
def fromFuture[F[_]: Async, A](fut: ApiFuture[A]): F[A] =
Async[F]
.async[A] { cb =>
val cancel = Async[F].delay {
fut.cancel(false)
}.void
Async[F].delay {
addCallback(fut, cb)
Some(cancel)
}
}

def fromFuture_[F[_]: Async, A](fut: ApiFuture[A]): F[Unit] =
fromFuture(fut).void

private def addCallback[A](fut: ApiFuture[A], cb: Either[Throwable, A] => Unit): Unit = {
val apiFutureCallback = new ApiFutureCallback[A] {
def onFailure(t: Throwable): Unit = cb(Left(t))
def onSuccess(result: A): Unit = cb(Right(result))
}
ApiFutures.addCallback(fut, apiFutureCallback, MoreExecutors.directExecutor)
}

}
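
For orientation, a hedged sketch of how this helper pairs with the unary pull added later in this commit; the pullOnce name and the concrete IO effect are illustrative:

```scala
import cats.effect.IO
import com.google.cloud.pubsub.v1.stub.SubscriberStub
import com.google.pubsub.v1.{PullRequest, PullResponse}
import com.snowplowanalytics.snowplow.sources.pubsub.v2.FutureInterop

// fromFuture registers a callback on the direct executor and, if the
// surrounding IO is cancelled, cancels the underlying ApiFuture.
def pullOnce(stub: SubscriberStub, subscription: String): IO[PullResponse] =
  for {
    request <- IO.pure(PullRequest.newBuilder.setSubscription(subscription).build)
    fut <- IO.delay(stub.pullCallable.futureCall(request))
    res <- FutureInterop.fromFuture[IO, PullResponse](fut)
  } yield res
```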
@@ -23,11 +23,7 @@ case class PubsubSourceConfigV2(
durationPerAckExtension: FiniteDuration,
minRemainingDeadline: Double,
gcpUserAgent: GcpUserAgent,
maxPullsPerTransportChannel: Int,
progressTimeout: FiniteDuration,
modackOnProgressTimeout: Boolean,
cancelOnProgressTimeout: Boolean,
consistentClientId: Boolean
maxPullsPerTransportChannel: Int
)

object PubsubSourceConfigV2 {
@@ -8,9 +8,7 @@
package com.snowplowanalytics.snowplow.sources.pubsub.v2

import cats.effect.{Async, Deferred, Ref, Resource, Sync}
import cats.effect.std.{Hotswap, Queue, QueueSink}
import cats.effect.kernel.Unique
import cats.effect.implicits._
import cats.implicits._
import fs2.{Chunk, Pipe, Stream}
import org.typelevel.log4cats.Logger
@@ -21,24 +19,22 @@ import java.time.Instant
// pubsub
import com.google.api.gax.core.{ExecutorProvider, FixedExecutorProvider}
import com.google.api.gax.grpc.{ChannelPoolSettings, GrpcCallContext}
import com.google.api.gax.rpc.{ResponseObserver, StreamController}
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings
import com.google.pubsub.v1.{StreamingPullRequest, StreamingPullResponse}
import com.google.pubsub.v1.{PullRequest, PullResponse}
import com.google.cloud.pubsub.v1.stub.{GrpcSubscriberStub, SubscriberStub}
import io.grpc.Status
import org.threeten.bp.{Duration => ThreetenDuration}

// snowplow
import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent
import com.snowplowanalytics.snowplow.sources.SourceAndAck
import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource}
import com.snowplowanalytics.snowplow.sources.pubsub.v2.PubsubRetryOps.implicits._

import scala.concurrent.duration.{Duration, DurationDouble, FiniteDuration}
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

import java.util.concurrent.{ExecutorService, Executors, LinkedBlockingQueue}
import java.util.UUID
import java.util.concurrent.{ExecutorService, Executors}

object PubsubSourceV2 {

@@ -83,49 +79,50 @@ object PubsubSourceV2 {
stub: SubscriberStub,
refStates: Ref[F, Map[Unique.Token, PubsubBatchState]],
channelAffinity: Int
): Stream[F, LowLevelEvents[Vector[Unique.Token]]] = {
val jQueue = new LinkedBlockingQueue[SubscriberAction]()
val clientId = UUID.randomUUID
val resource = initializeStreamingPull[F](config, stub, jQueue, channelAffinity, clientId)
): Stream[F, LowLevelEvents[Vector[Unique.Token]]] =
Stream
.eval[F, PullResponse](pullFromSubscription(config, stub, channelAffinity))
.filter(_.getReceivedMessagesCount > 0)
.through(addToRefStates(config, stub, refStates, channelAffinity))
.repeat
.prefetch
.concurrently(extendDeadlines(config, stub, refStates, channelAffinity))

for {
(hotswap, _) <- Stream.resource(Hotswap(resource))
fs2Queue <- Stream.eval(Queue.synchronous[F, SubscriberAction])
_ <- extendDeadlines(config, stub, refStates, channelAffinity).spawn
_ <- Stream.eval(queueToQueue(config, jQueue, fs2Queue, stub, channelAffinity)).repeat.spawn
lle <- Stream
.fromQueueUnterminated(fs2Queue)
.through(toLowLevelEvents(config, refStates, hotswap, resource, channelAffinity))
} yield lle
private def pullFromSubscription[F[_]: Async](
config: PubsubSourceConfigV2,
stub: SubscriberStub,
channelAffinity: Int
): F[PullResponse] = {
val context = GrpcCallContext.createDefault.withChannelAffinity(channelAffinity)
val request = PullRequest.newBuilder
.setSubscription(config.subscription.show)
.build
val io = for {
apiFuture <- Sync[F].delay(stub.pullCallable.futureCall(request, context))
res <- FutureInterop.fromFuture[F, PullResponse](apiFuture)
} yield res
io.retryingOnTransientGrpcFailures
}
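
retryingOnTransientGrpcFailures comes from PubsubRetryOps, whose body this diff does not show. A rough sketch of the kind of policy the name suggests — an assumption, not the real implementation, which may differ in which status codes it retries and how it backs off:

```scala
import cats.effect.IO
import io.grpc.{Status, StatusRuntimeException}
import scala.concurrent.duration._

// Hypothetical retry-on-transient-failure loop with capped doubling backoff.
def retryTransient[A](fa: IO[A], delay: FiniteDuration = 1.second): IO[A] =
  fa.handleErrorWith {
    case e: StatusRuntimeException if e.getStatus.getCode == Status.Code.UNAVAILABLE =>
      IO.sleep(delay) *> retryTransient(fa, (delay * 2).min(30.seconds))
    case other => IO.raiseError(other)
  }
```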

private def queueToQueue[F[_]: Async](
private def addToRefStates[F[_]: Async](
config: PubsubSourceConfigV2,
jQueue: LinkedBlockingQueue[SubscriberAction],
fs2Queue: QueueSink[F, SubscriberAction],
stub: SubscriberStub,
refStates: Ref[F, Map[Unique.Token, PubsubBatchState]],
channelAffinity: Int
): F[Unit] =
resolveNextAction(jQueue).flatMap {
case action @ SubscriberAction.ProcessRecords(records, controller, _) =>
val fallback = if (config.modackOnProgressTimeout) {
val ackIds = records.map(_.getAckId)
if (config.cancelOnProgressTimeout)
Logger[F].debug(s"Cancelling Pubsub channel $channelAffinity for not making progress") *>
Sync[F].delay(controller.cancel()) *> Utils.modAck(config.subscription, stub, ackIds, Duration.Zero, channelAffinity)
else
Logger[F].debug(s"Nacking on Pubsub channel $channelAffinity for not making progress") *>
Sync[F].delay(controller.request(1)) *> Utils.modAck(config.subscription, stub, ackIds, Duration.Zero, channelAffinity)
} else {
if (config.cancelOnProgressTimeout)
Logger[F].debug(s"Cancelling Pubsub channel $channelAffinity for not making progress") *>
Sync[F].delay(controller.cancel()) *> fs2Queue.offer(action)
else
fs2Queue.offer(action)
}
fs2Queue.offer(action).timeoutTo(config.progressTimeout, fallback)
case action: SubscriberAction.SubscriberError =>
fs2Queue.offer(action)
): Pipe[F, PullResponse, LowLevelEvents[Vector[Unique.Token]]] =
_.evalMap { response =>
val records = response.getReceivedMessagesList.asScala.toVector
val chunk = Chunk.from(records.map(_.getMessage.getData.asReadOnlyByteBuffer()))
val (tstampSeconds, tstampNanos) =
records.map(r => (r.getMessage.getPublishTime.getSeconds, r.getMessage.getPublishTime.getNanos)).min
val ackIds = records.map(_.getAckId)
for {
timeReceived <- Sync[F].realTimeInstant
_ <- Utils.modAck[F](config.subscription, stub, ackIds, config.durationPerAckExtension, channelAffinity)
token <- Unique[F].unique
currentDeadline = timeReceived.plusMillis(config.durationPerAckExtension.toMillis)
_ <- refStates.update(_ + (token -> PubsubBatchState(currentDeadline, ackIds, channelAffinity)))
} yield LowLevelEvents(chunk, Vector(token), Some(Instant.ofEpochSecond(tstampSeconds, tstampNanos.toLong)))
}
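
One subtlety above: the batch timestamp is the .min of (seconds, nanos) pairs, and Scala orders tuples lexicographically, so the earliest publish time in the batch wins. A tiny illustration:

```scala
// Tuples compare lexicographically: seconds first, then nanos.
val publishTimes = Vector((170L, 500), (169L, 999), (170L, 1))
val earliest = publishTimes.min
assert(earliest == (169L, 999))
```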

/**
@@ -174,72 +171,6 @@
.repeat
.drain
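
The body of extendDeadlines is collapsed above. A hedged sketch of the check its configuration implies, assuming minRemainingDeadline (0.1 in the defaults) is the fraction of durationPerAckExtension that may remain before a modack re-extends the deadline; the helper and its semantics are an assumption, not taken from the diff:

```scala
import java.time.Instant
import scala.concurrent.duration.FiniteDuration

// Hypothetical check: extend this batch's ack deadline once less than
// minRemainingDeadline (e.g. 10%) of the extension window remains.
def needsExtension(
  now: Instant,
  currentDeadline: Instant,
  durationPerAckExtension: FiniteDuration, // "10 minutes" in the defaults
  minRemainingDeadline: Double // 0.1 in the defaults
): Boolean = {
  val remainingMillis = currentDeadline.toEpochMilli - now.toEpochMilli
  remainingMillis < durationPerAckExtension.toMillis * minRemainingDeadline
}
```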

/**
* Pipe from SubscriberAction to LowLevelEvents TODO: Say what else this does
*
* @param config
* The source configuration
* @param refStates
* A map from tokens to the data held about a batch of messages received from pubsub. This
* function must update the state to add new batches.
* @param hotswap
* A Hotswap wrapping the Resource that is populating the queue
* @param toSwap
* Initializes the Resource which is populating the queue. If we get an error from the queue
* then need to swap in the new Resource into the Hotswap
* @param channelAffinity
* Identifies the GRPC channel (TCP connection) creating these Actions. Each GRPC channel has
* its own queue, observer, and puller.
*/
private def toLowLevelEvents[F[_]: Async](
config: PubsubSourceConfigV2,
refStates: Ref[F, Map[Unique.Token, PubsubBatchState]],
hotswap: Hotswap[F, Unit],
toSwap: Resource[F, Unit],
channelAffinity: Int
): Pipe[F, SubscriberAction, LowLevelEvents[Vector[Unique.Token]]] =
_.flatMap {
case SubscriberAction.ProcessRecords(records, controller, timeReceived) =>
val chunk = Chunk.from(records.map(_.getMessage.getData.asReadOnlyByteBuffer()))
val (tstampSeconds, tstampNanos) =
records.map(r => (r.getMessage.getPublishTime.getSeconds, r.getMessage.getPublishTime.getNanos)).min
val ackIds = records.map(_.getAckId)
Stream.eval {
for {
token <- Unique[F].unique
currentDeadline = timeReceived.plusMillis(config.durationPerAckExtension.toMillis)
_ <- refStates.update(_ + (token -> PubsubBatchState(currentDeadline, ackIds, channelAffinity)))
_ <- Sync[F].delay(controller.request(1))
} yield LowLevelEvents(chunk, Vector(token), Some(Instant.ofEpochSecond(tstampSeconds, tstampNanos.toLong)))
}
case SubscriberAction.SubscriberError(t) =>
if (PubsubRetryOps.isRetryableException(t)) {
// val nextDelay = (2 * delayOnSubscriberError).min((10 + scala.util.Random.nextDouble()).second)
// Log at debug level because retryable errors are very frequent.
// In particular, if the pubsub subscription is empty then a streaming pull returns UNAVAILABLE
Stream.eval {
Logger[F].debug(s"Retryable error on PubSub channel $channelAffinity: ${t.getMessage}") *>
hotswap.clear *>
Async[F].sleep((1.0 + scala.util.Random.nextDouble()).second) *> // TODO expotential backoff
hotswap.swap(toSwap)
}.drain
} else if (t.isInstanceOf[java.util.concurrent.CancellationException]) {
Stream.eval {
Logger[F].debug("Cancellation exception on PubSub channel") *>
hotswap.clear *>
hotswap.swap(toSwap)
}.drain
} else {
Stream.eval(Logger[F].error(t)("Exception from PubSub source")) *> Stream.raiseError[F](t)
}
}

private def resolveNextAction[F[_]: Sync, A](queue: LinkedBlockingQueue[A]): F[A] =
Sync[F].delay(Option[A](queue.poll)).flatMap {
case Some(action) => Sync[F].pure(action)
case None => Sync[F].interruptible(queue.take)
}

private def stubResource[F[_]: Async](
config: PubsubSourceConfigV2,
channelCount: Int
@@ -276,60 +207,6 @@
Resource.make(Sync[F].delay(GrpcSubscriberStub.create(stubSettings)))(stub => Sync[F].blocking(stub.shutdownNow))
}

private def initializeStreamingPull[F[_]: Sync](
config: PubsubSourceConfigV2,
subStub: SubscriberStub,
actionQueue: LinkedBlockingQueue[SubscriberAction],
channelAffinity: Int,
clientId: UUID
): Resource[F, Unit] = {

val observer = new ResponseObserver[StreamingPullResponse] {
var controller: StreamController = _
override def onResponse(response: StreamingPullResponse): Unit = {
val messages = response.getReceivedMessagesList.asScala.toVector
if (messages.isEmpty) {
controller.request(1)
} else {
val action = SubscriberAction.ProcessRecords(messages, controller, Instant.now())
actionQueue.put(action)
}
}

override def onStart(c: StreamController): Unit = {
controller = c
controller.disableAutoInboundFlowControl()
controller.request(1)
}

override def onError(t: Throwable): Unit =
actionQueue.put(SubscriberAction.SubscriberError(t))

override def onComplete(): Unit = ()

}

val context = GrpcCallContext.createDefault.withChannelAffinity(channelAffinity)

val request = StreamingPullRequest.newBuilder
.setSubscription(config.subscription.show)
.setStreamAckDeadlineSeconds(config.durationPerAckExtension.toSeconds.toInt)
.setClientId(if (config.consistentClientId) clientId.toString else UUID.randomUUID.toString)
.setMaxOutstandingMessages(0)
.setMaxOutstandingBytes(0)
.build

Resource
.make(Sync[F].delay(subStub.streamingPullCallable.splitCall(observer, context))) { stream =>
Sync[F].delay(stream.closeSendWithError(Status.CANCELLED.asException))
}
.evalMap { stream =>
Sync[F].delay(stream.send(request))
}
.void

}

private def executorResource[F[_]: Sync, E <: ExecutorService](make: F[E]): Resource[F, E] =
Resource.make(make)(es => Sync[F].blocking(es.shutdown()))

@@ -14,7 +14,6 @@ import org.typelevel.log4cats.Logger
import com.google.api.gax.grpc.GrpcCallContext
import com.google.cloud.pubsub.v1.stub.SubscriberStub
import com.google.pubsub.v1.ModifyAckDeadlineRequest
import com.snowplowanalytics.snowplow.pubsub.FutureInterop
import com.snowplowanalytics.snowplow.sources.pubsub.v2.PubsubRetryOps.implicits._

import scala.concurrent.duration.FiniteDuration
Expand All @@ -38,7 +37,7 @@ private object Utils {
val context = GrpcCallContext.createDefault.withChannelAffinity(channelAffinity)
val io = for {
apiFuture <- Sync[F].delay(stub.modifyAckDeadlineCallable.futureCall(request, context))
_ <- FutureInterop.fromFuture(apiFuture)
_ <- FutureInterop.fromFuture_(apiFuture)
} yield ()

io.retryingOnTransientGrpcFailures
