Skip to content

Commit ab00001

Browse files
committed
Pubsub source v2 re-sends streaming request when blocked
1 parent e8ea8ea commit ab00001

File tree

3 files changed

+33
-47
lines changed

3 files changed

+33
-47
lines changed

.github/workflows/ci.yml

+3-15
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,9 @@ on:
66
- 'tbr'
77

88
jobs:
9-
test:
10-
runs-on: ubuntu-22.04
11-
steps:
12-
- uses: actions/checkout@v2
13-
- uses: coursier/cache-action@v6
14-
- name: Set up JDK 11
15-
uses: actions/setup-java@v1
16-
with:
17-
java-version: 11
18-
- name: Check Scala formatting
19-
run: sbt scalafmtCheckAll scalafmtSbtCheck
20-
- name: Run tests
21-
run: sbt test
22-
239
publish_docker:
2410
needs: test
25-
runs-on: ubuntu-22.04
11+
runs-on: ubuntu-latest
2612
strategy:
2713
matrix:
2814
sbtProject:
@@ -34,6 +20,8 @@ jobs:
3420
dockerSuffix: gcp
3521
dockerTagSuffix: ""
3622
steps:
23+
- name: Install sbt
24+
uses: sbt/setup-sbt@v1
3725
- name: Checkout Github
3826
uses: actions/checkout@v2
3927
- uses: coursier/cache-action@v6

modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceConfigV2.scala

+1-4
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,7 @@ case class PubsubSourceConfigV2(
2424
minRemainingDeadline: Double,
2525
gcpUserAgent: GcpUserAgent,
2626
maxPullsPerTransportChannel: Int,
27-
progressTimeout: FiniteDuration,
28-
modackOnProgressTimeout: Boolean,
29-
cancelOnProgressTimeout: Boolean,
30-
consistentClientId: Boolean
27+
progressTimeout: FiniteDuration
3128
)
3229

3330
object PubsubSourceConfigV2 {

modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceV2.scala

+29-28
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent
3434
import com.snowplowanalytics.snowplow.sources.SourceAndAck
3535
import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource}
3636

37-
import scala.concurrent.duration.{Duration, DurationDouble, FiniteDuration}
37+
import scala.concurrent.duration.{DurationDouble, FiniteDuration}
3838
import scala.jdk.CollectionConverters._
3939

4040
import java.util.concurrent.{ExecutorService, Executors, LinkedBlockingQueue}
@@ -92,7 +92,7 @@ object PubsubSourceV2 {
9292
(hotswap, _) <- Stream.resource(Hotswap(resource))
9393
fs2Queue <- Stream.eval(Queue.synchronous[F, SubscriberAction])
9494
_ <- extendDeadlines(config, stub, refStates, channelAffinity).spawn
95-
_ <- Stream.eval(queueToQueue(config, jQueue, fs2Queue, stub, channelAffinity)).repeat.spawn
95+
_ <- Stream.eval(queueToQueue(config, jQueue, fs2Queue, channelAffinity, hotswap)).repeat.spawn
9696
lle <- Stream
9797
.fromQueueUnterminated(fs2Queue)
9898
.through(toLowLevelEvents(config, refStates, hotswap, resource, channelAffinity))
@@ -103,26 +103,18 @@ object PubsubSourceV2 {
103103
config: PubsubSourceConfigV2,
104104
jQueue: LinkedBlockingQueue[SubscriberAction],
105105
fs2Queue: QueueSink[F, SubscriberAction],
106-
stub: SubscriberStub,
107-
channelAffinity: Int
106+
channelAffinity: Int,
107+
hotswap: Hotswap[F, KeepAlive[F]]
108108
): F[Unit] =
109109
resolveNextAction(jQueue).flatMap {
110-
case action @ SubscriberAction.ProcessRecords(records, controller, _) =>
111-
val fallback = if (config.modackOnProgressTimeout) {
112-
val ackIds = records.map(_.getAckId)
113-
if (config.cancelOnProgressTimeout)
114-
Logger[F].debug(s"Cancelling Pubsub channel $channelAffinity for not making progress") *>
115-
Sync[F].delay(controller.cancel()) *> Utils.modAck(config.subscription, stub, ackIds, Duration.Zero, channelAffinity)
116-
else
117-
Logger[F].debug(s"Nacking on Pubsub channel $channelAffinity for not making progress") *>
118-
Sync[F].delay(controller.request(1)) *> Utils.modAck(config.subscription, stub, ackIds, Duration.Zero, channelAffinity)
119-
} else {
120-
if (config.cancelOnProgressTimeout)
121-
Logger[F].debug(s"Cancelling Pubsub channel $channelAffinity for not making progress") *>
122-
Sync[F].delay(controller.cancel()) *> fs2Queue.offer(action)
123-
else
124-
fs2Queue.offer(action)
125-
}
110+
case action @ SubscriberAction.ProcessRecords(_, _, _) =>
111+
def fallback: F[Unit] =
112+
hotswap.get.use {
113+
case Some(keepAlive) =>
114+
Logger[F].debug(s"Sending keepalie for channel $channelAffinity") *>
115+
keepAlive.keepAlive
116+
case None => Sync[F].unit
117+
} >> fs2Queue.offer(action).timeoutTo(config.progressTimeout, fallback)
126118
fs2Queue.offer(action).timeoutTo(config.progressTimeout, fallback)
127119
case action: SubscriberAction.SubscriberError =>
128120
fs2Queue.offer(action)
@@ -194,8 +186,8 @@ object PubsubSourceV2 {
194186
private def toLowLevelEvents[F[_]: Async](
195187
config: PubsubSourceConfigV2,
196188
refStates: Ref[F, Map[Unique.Token, PubsubBatchState]],
197-
hotswap: Hotswap[F, Unit],
198-
toSwap: Resource[F, Unit],
189+
hotswap: Hotswap[F, KeepAlive[F]],
190+
toSwap: Resource[F, KeepAlive[F]],
199191
channelAffinity: Int
200192
): Pipe[F, SubscriberAction, LowLevelEvents[Vector[Unique.Token]]] =
201193
_.flatMap {
@@ -282,7 +274,7 @@ object PubsubSourceV2 {
282274
actionQueue: LinkedBlockingQueue[SubscriberAction],
283275
channelAffinity: Int,
284276
clientId: UUID
285-
): Resource[F, Unit] = {
277+
): Resource[F, KeepAlive[F]] = {
286278

287279
val observer = new ResponseObserver[StreamingPullResponse] {
288280
var controller: StreamController = _
@@ -311,25 +303,34 @@ object PubsubSourceV2 {
311303

312304
val context = GrpcCallContext.createDefault.withChannelAffinity(channelAffinity)
313305

314-
val request = StreamingPullRequest.newBuilder
306+
val openRequest = StreamingPullRequest.newBuilder
315307
.setSubscription(config.subscription.show)
316308
.setStreamAckDeadlineSeconds(config.durationPerAckExtension.toSeconds.toInt)
317-
.setClientId(if (config.consistentClientId) clientId.toString else UUID.randomUUID.toString)
309+
.setClientId(clientId.toString)
318310
.setMaxOutstandingMessages(0)
319311
.setMaxOutstandingBytes(0)
320312
.build
321313

314+
val keepAliveRequest = openRequest.toBuilder.clearSubscription.build
315+
322316
Resource
323317
.make(Sync[F].delay(subStub.streamingPullCallable.splitCall(observer, context))) { stream =>
324318
Sync[F].delay(stream.closeSendWithError(Status.CANCELLED.asException))
325319
}
326-
.evalMap { stream =>
327-
Sync[F].delay(stream.send(request))
320+
.evalTap(stream => Sync[F].delay(stream.send(openRequest)))
321+
.map { stream =>
322+
new KeepAlive[F] {
323+
def keepAlive: F[Unit] =
324+
Sync[F].delay(stream.send(keepAliveRequest))
325+
}
328326
}
329-
.void
330327

331328
}
332329

330+
trait KeepAlive[F[_]] {
331+
def keepAlive: F[Unit]
332+
}
333+
333334
private def executorResource[F[_]: Sync, E <: ExecutorService](make: F[E]): Resource[F, E] =
334335
Resource.make(make)(es => Sync[F].blocking(es.shutdown()))
335336

0 commit comments

Comments
 (0)