Skip to content

Commit

Permalink
Pubsub source v2 request zero messages when blocked
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Oct 10, 2024
1 parent e8ea8ea commit d0685a7
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ case class PubsubSourceConfigV2(
minRemainingDeadline: Double,
gcpUserAgent: GcpUserAgent,
maxPullsPerTransportChannel: Int,
progressTimeout: FiniteDuration,
modackOnProgressTimeout: Boolean,
cancelOnProgressTimeout: Boolean,
consistentClientId: Boolean
progressTimeout: FiniteDuration
)

object PubsubSourceConfigV2 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent
import com.snowplowanalytics.snowplow.sources.SourceAndAck
import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource}

import scala.concurrent.duration.{Duration, DurationDouble, FiniteDuration}
import scala.concurrent.duration.{DurationDouble, FiniteDuration}
import scala.jdk.CollectionConverters._

import java.util.concurrent.{ExecutorService, Executors, LinkedBlockingQueue}
Expand Down Expand Up @@ -92,7 +92,7 @@ object PubsubSourceV2 {
(hotswap, _) <- Stream.resource(Hotswap(resource))
fs2Queue <- Stream.eval(Queue.synchronous[F, SubscriberAction])
_ <- extendDeadlines(config, stub, refStates, channelAffinity).spawn
_ <- Stream.eval(queueToQueue(config, jQueue, fs2Queue, stub, channelAffinity)).repeat.spawn
_ <- Stream.eval(queueToQueue(config, jQueue, fs2Queue)).repeat.spawn
lle <- Stream
.fromQueueUnterminated(fs2Queue)
.through(toLowLevelEvents(config, refStates, hotswap, resource, channelAffinity))
Expand All @@ -102,27 +102,12 @@ object PubsubSourceV2 {
private def queueToQueue[F[_]: Async](
config: PubsubSourceConfigV2,
jQueue: LinkedBlockingQueue[SubscriberAction],
fs2Queue: QueueSink[F, SubscriberAction],
stub: SubscriberStub,
channelAffinity: Int
fs2Queue: QueueSink[F, SubscriberAction]
): F[Unit] =
resolveNextAction(jQueue).flatMap {
case action @ SubscriberAction.ProcessRecords(records, controller, _) =>
val fallback = if (config.modackOnProgressTimeout) {
val ackIds = records.map(_.getAckId)
if (config.cancelOnProgressTimeout)
Logger[F].debug(s"Cancelling Pubsub channel $channelAffinity for not making progress") *>
Sync[F].delay(controller.cancel()) *> Utils.modAck(config.subscription, stub, ackIds, Duration.Zero, channelAffinity)
else
Logger[F].debug(s"Nacking on Pubsub channel $channelAffinity for not making progress") *>
Sync[F].delay(controller.request(1)) *> Utils.modAck(config.subscription, stub, ackIds, Duration.Zero, channelAffinity)
} else {
if (config.cancelOnProgressTimeout)
Logger[F].debug(s"Cancelling Pubsub channel $channelAffinity for not making progress") *>
Sync[F].delay(controller.cancel()) *> fs2Queue.offer(action)
else
fs2Queue.offer(action)
}
case action @ SubscriberAction.ProcessRecords(_, controller, _) =>
def fallback: F[Unit] =
Sync[F].delay(controller.request(0)) >> fs2Queue.offer(action).timeoutTo(config.progressTimeout, fallback)
fs2Queue.offer(action).timeoutTo(config.progressTimeout, fallback)
case action: SubscriberAction.SubscriberError =>
fs2Queue.offer(action)
Expand Down Expand Up @@ -314,7 +299,7 @@ object PubsubSourceV2 {
val request = StreamingPullRequest.newBuilder
.setSubscription(config.subscription.show)
.setStreamAckDeadlineSeconds(config.durationPerAckExtension.toSeconds.toInt)
.setClientId(if (config.consistentClientId) clientId.toString else UUID.randomUUID.toString)
.setClientId(clientId.toString)
.setMaxOutstandingMessages(0)
.setMaxOutstandingBytes(0)
.build
Expand Down

0 comments on commit d0685a7

Please sign in to comment.