Skip to content

Commit 4c85ea4

Browse files
committed
Pubsub source v2 request zero messages when blocked
1 parent e8ea8ea commit 4c85ea4

File tree

2 files changed

+8
-26
lines changed

2 files changed

+8
-26
lines changed

modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceConfigV2.scala

+1-4
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,7 @@ case class PubsubSourceConfigV2(
2424
minRemainingDeadline: Double,
2525
gcpUserAgent: GcpUserAgent,
2626
maxPullsPerTransportChannel: Int,
27-
progressTimeout: FiniteDuration,
28-
modackOnProgressTimeout: Boolean,
29-
cancelOnProgressTimeout: Boolean,
30-
consistentClientId: Boolean
27+
progressTimeout: FiniteDuration
3128
)
3229

3330
object PubsubSourceConfigV2 {

modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceV2.scala

+7-22
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent
3434
import com.snowplowanalytics.snowplow.sources.SourceAndAck
3535
import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource}
3636

37-
import scala.concurrent.duration.{Duration, DurationDouble, FiniteDuration}
37+
import scala.concurrent.duration.{DurationDouble, FiniteDuration}
3838
import scala.jdk.CollectionConverters._
3939

4040
import java.util.concurrent.{ExecutorService, Executors, LinkedBlockingQueue}
@@ -92,7 +92,7 @@ object PubsubSourceV2 {
9292
(hotswap, _) <- Stream.resource(Hotswap(resource))
9393
fs2Queue <- Stream.eval(Queue.synchronous[F, SubscriberAction])
9494
_ <- extendDeadlines(config, stub, refStates, channelAffinity).spawn
95-
_ <- Stream.eval(queueToQueue(config, jQueue, fs2Queue, stub, channelAffinity)).repeat.spawn
95+
_ <- Stream.eval(queueToQueue(config, jQueue, fs2Queue)).repeat.spawn
9696
lle <- Stream
9797
.fromQueueUnterminated(fs2Queue)
9898
.through(toLowLevelEvents(config, refStates, hotswap, resource, channelAffinity))
@@ -102,27 +102,12 @@ object PubsubSourceV2 {
102102
private def queueToQueue[F[_]: Async](
103103
config: PubsubSourceConfigV2,
104104
jQueue: LinkedBlockingQueue[SubscriberAction],
105-
fs2Queue: QueueSink[F, SubscriberAction],
106-
stub: SubscriberStub,
107-
channelAffinity: Int
105+
fs2Queue: QueueSink[F, SubscriberAction]
108106
): F[Unit] =
109107
resolveNextAction(jQueue).flatMap {
110-
case action @ SubscriberAction.ProcessRecords(records, controller, _) =>
111-
val fallback = if (config.modackOnProgressTimeout) {
112-
val ackIds = records.map(_.getAckId)
113-
if (config.cancelOnProgressTimeout)
114-
Logger[F].debug(s"Cancelling Pubsub channel $channelAffinity for not making progress") *>
115-
Sync[F].delay(controller.cancel()) *> Utils.modAck(config.subscription, stub, ackIds, Duration.Zero, channelAffinity)
116-
else
117-
Logger[F].debug(s"Nacking on Pubsub channel $channelAffinity for not making progress") *>
118-
Sync[F].delay(controller.request(1)) *> Utils.modAck(config.subscription, stub, ackIds, Duration.Zero, channelAffinity)
119-
} else {
120-
if (config.cancelOnProgressTimeout)
121-
Logger[F].debug(s"Cancelling Pubsub channel $channelAffinity for not making progress") *>
122-
Sync[F].delay(controller.cancel()) *> fs2Queue.offer(action)
123-
else
124-
fs2Queue.offer(action)
125-
}
108+
case action @ SubscriberAction.ProcessRecords(_, controller, _) =>
109+
def fallback: F[Unit] =
110+
Sync[F].delay(controller.request(0)) *> fs2Queue.offer(action).timeoutTo(config.progressTimeout, fallback)
126111
fs2Queue.offer(action).timeoutTo(config.progressTimeout, fallback)
127112
case action: SubscriberAction.SubscriberError =>
128113
fs2Queue.offer(action)
@@ -314,7 +299,7 @@ object PubsubSourceV2 {
314299
val request = StreamingPullRequest.newBuilder
315300
.setSubscription(config.subscription.show)
316301
.setStreamAckDeadlineSeconds(config.durationPerAckExtension.toSeconds.toInt)
317-
.setClientId(if (config.consistentClientId) clientId.toString else UUID.randomUUID.toString)
302+
.setClientId(clientId.toString)
318303
.setMaxOutstandingMessages(0)
319304
.setMaxOutstandingBytes(0)
320305
.build

0 commit comments

Comments
 (0)