Skip to content

Commit

Permalink
Alternative implementation of a pubsub source
Browse files Browse the repository at this point in the history
The pubsub Source from common-streams is a wrapper around `Subscriber`
provided by the 3rd-party pubsub sdk. That `Subscriber` is a wrapper
around a lower-level GRPC stub.

This commit adds an alternative Source which directly wraps the GRPC
stub, not the higher-level Subscriber.

Compared with the previous Source implementation it has these
differences in behaviour:

- In the V1 source, ack extension periods were adjusted dynamically
  according to runtime heuristics of message processing times. In the V2
  source, the ack extension period is a fixed configurable period.
- The V1 source made a modack request (extending ack deadline)
  immediately after receiving any message.  Whereas the V2 source does
  not modack a message unless its deadline is about the expire.
- The V1 source periodically modacks all unacked messages currently held
  in memory. This is a problem for e.g. the Lake Loader which can have a
  very large number of unacked messages at any one time.  The V2 source
  only modacks messages when they are approaching their ack deadline.
- The V2 source uses a smaller thread pool for GRPC callbacks. The V1
  source needed a very large thread pool to avoid deadlocks in setups
  that opened a large number of streaming pulls.
- V2 opens PullRequests with pubsub, whereas V1 opened
  StreamingPullRequests

If this experimental V2 Source is successful, it is likely to be the
replacement of the V1 Source in a future release of common-streams.
  • Loading branch information
istreeter committed Oct 10, 2024
1 parent dc7e9a7 commit 7612dc7
Show file tree
Hide file tree
Showing 12 changed files with 629 additions and 5 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ on:

jobs:
test:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v2
- uses: coursier/cache-action@v6
Expand All @@ -24,7 +24,7 @@ jobs:
publish_docker:
needs: test
if: github.ref_type == 'tag'
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
strategy:
matrix:
sbtProject:
Expand Down
5 changes: 5 additions & 0 deletions modules/gcp/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
"input": ${snowplow.defaults.sources.pubsub}
"input": {
"gcpUserAgent": ${gcpUserAgent}

// V2 defaults
"durationPerAckExtension": "10 minutes"
"minRemainingDeadline": 0.1
"maxMessagesPerPull": 1000
}
"output": {
"bad": ${snowplow.defaults.sinks.pubsub}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@

package com.snowplowanalytics.snowplow.lakes

import com.snowplowanalytics.snowplow.sources.pubsub.{PubsubSource, PubsubSourceConfig}
import com.snowplowanalytics.snowplow.sources.pubsub.{PubsubSource, PubsubSourceAlternative}
import com.snowplowanalytics.snowplow.sources.pubsub.v2.PubsubSourceV2
import com.snowplowanalytics.snowplow.sinks.pubsub.{PubsubSink, PubsubSinkConfig}

object GcpApp extends LoaderApp[PubsubSourceConfig, PubsubSinkConfig](BuildInfo) {
object GcpApp extends LoaderApp[PubsubSourceAlternative, PubsubSinkConfig](BuildInfo) {

override def source: SourceProvider = PubsubSource.build(_)
override def source: SourceProvider = {
case PubsubSourceAlternative.V1(c) => PubsubSource.build(c)
case PubsubSourceAlternative.V2(c) => PubsubSourceV2.build(c)
}

override def badSink: SinkProvider = PubsubSink.resource(_)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.sources.pubsub

import io.circe.Decoder
import cats.implicits._

import com.snowplowanalytics.snowplow.sources.pubsub.v2.PubsubSourceConfigV2

/**
* Allows experimental support for the V2 source, while loading the V1 source by default
*
* Users can select the v2 Source by setting `"version": "v2"` in the hocon file
*/
sealed trait PubsubSourceAlternative

object PubsubSourceAlternative {
case class V1(config: PubsubSourceConfig) extends PubsubSourceAlternative
case class V2(config: PubsubSourceConfigV2) extends PubsubSourceAlternative

implicit def decoder: Decoder[PubsubSourceAlternative] = Decoder.decodeJsonObject.flatMap {
case obj if obj("version").flatMap(_.asString) === Some("v2") =>
implicitly[Decoder[PubsubSourceConfigV2]].map(V2(_))
case _ =>
implicitly[Decoder[PubsubSourceConfig]].map(V1(_))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### common-streams-extensions

Code in this directory is destined to be migrated to common-streams.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.sources.pubsub.v2

import cats.effect.Async
import cats.implicits._
import com.google.api.core.{ApiFuture, ApiFutureCallback, ApiFutures}
import com.google.common.util.concurrent.MoreExecutors

object FutureInterop {
def fromFuture[F[_]: Async, A](fut: ApiFuture[A]): F[A] =
Async[F]
.async[A] { cb =>
val cancel = Async[F].delay {
fut.cancel(false)
}.void
Async[F].delay {
addCallback(fut, cb)
Some(cancel)
}
}

def fromFuture_[F[_]: Async, A](fut: ApiFuture[A]): F[Unit] =
fromFuture(fut).void

private def addCallback[A](fut: ApiFuture[A], cb: Either[Throwable, A] => Unit): Unit = {
val apiFutureCallback = new ApiFutureCallback[A] {
def onFailure(t: Throwable): Unit = cb(Left(t))
def onSuccess(result: A): Unit = cb(Right(result))
}
ApiFutures.addCallback(fut, apiFutureCallback, MoreExecutors.directExecutor)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.sources.pubsub.v2

import java.time.Instant

/**
* Data held about a batch of messages pulled from a pubsub subscription
*
* @param currentDeadline
* The deadline before which we must either ack, nack, or extend the deadline to something further
* in the future. This is updated over time if we approach a deadline.
* @param ackIds
* The IDs which are needed to ack all messages in the batch
* @param channelAffinity
* Corresponds to the GRPC channel (TCP connection) on which this batch was pulled. We ack and
* modack on the same channel from where the messages came.
*/
private case class PubsubBatchState(
currentDeadline: Instant,
ackIds: Vector[String],
channelAffinity: Int
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.sources.pubsub.v2

import cats.effect.implicits._
import cats.implicits._
import cats.effect.kernel.Unique
import cats.effect.{Async, Deferred, Ref, Sync}
import com.google.cloud.pubsub.v1.stub.SubscriberStub
import com.google.pubsub.v1.AcknowledgeRequest
import com.google.api.gax.grpc.GrpcCallContext
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import scala.jdk.CollectionConverters._
import scala.concurrent.duration.Duration

import com.snowplowanalytics.snowplow.sources.internal.Checkpointer
import com.snowplowanalytics.snowplow.pubsub.FutureInterop
import com.snowplowanalytics.snowplow.sources.pubsub.v2.PubsubRetryOps.implicits._

/**
* The Pubsub checkpointer
*
* @param subscription
* Pubsub subscription name
* @param deferredResources
* Resources needed so we can ack/nack messages. This is wrapped in `Deferred` because the
* resources are not available until the application calls `.stream` on the `LowLevelSource`. This
* is a limitation in the design of the common-streams Source interface.
*/
class PubsubCheckpointer[F[_]: Async](
subscription: PubsubSourceConfigV2.Subscription,
deferredResources: Deferred[F, PubsubCheckpointer.Resources[F]]
) extends Checkpointer[F, Vector[Unique.Token]] {

import PubsubCheckpointer._

private implicit def logger: Logger[F] = Slf4jLogger.getLogger[F]

override def combine(x: Vector[Unique.Token], y: Vector[Unique.Token]): Vector[Unique.Token] =
x |+| y

override val empty: Vector[Unique.Token] = Vector.empty

/**
* Ack some batches of messages received from pubsub
*
* @param c
* tokens which are keys to batch data held in the shared state
*/
override def ack(c: Vector[Unique.Token]): F[Unit] =
for {
Resources(stub, refAckIds) <- deferredResources.get
ackDatas <- refAckIds.modify(m => (m.removedAll(c), c.flatMap(m.get)))
grouped = ackDatas.groupBy(_.channelAffinity)
_ <- grouped.toVector.parTraverse_ { case (channelAffinity, ackDatas) =>
ackDatas.flatMap(_.ackIds).grouped(1000).toVector.traverse_ { ackIds =>
val request = AcknowledgeRequest.newBuilder.setSubscription(subscription.show).addAllAckIds(ackIds.asJava).build
val context = GrpcCallContext.createDefault.withChannelAffinity(channelAffinity)
val attempt = for {
apiFuture <- Sync[F].delay(stub.acknowledgeCallable.futureCall(request, context))
_ <- FutureInterop.fromFuture[F, com.google.protobuf.Empty](apiFuture)
} yield ()
attempt.retryingOnTransientGrpcFailures
.recoveringOnGrpcInvalidArgument { s =>
// This can happen if ack IDs have expired before we acked
Logger[F].info(s"Ignoring error from GRPC when acking: ${s.getDescription}")
}
}
}
} yield ()

/**
* Nack some batches of messages received from pubsub
*
* @param c
* tokens which are keys to batch data held in the shared state
*/
override def nack(c: Vector[Unique.Token]): F[Unit] =
for {
Resources(stub, refAckIds) <- deferredResources.get
ackDatas <- refAckIds.modify(m => (m.removedAll(c), c.flatMap(m.get)))
grouped = ackDatas.groupBy(_.channelAffinity)
_ <- grouped.toVector.parTraverse_ { case (channelAffinity, ackDatas) =>
val ackIds = ackDatas.flatMap(_.ackIds)
// A nack is just a modack with zero duration
Utils.modAck[F](subscription, stub, ackIds, Duration.Zero, channelAffinity)
}
} yield ()
}

private object PubsubCheckpointer {

/**
* Resources needed by `PubsubCheckpointer` so it can ack/nack messages
*
* @param stub
* The GRPC stub needed to execute the ack/nack RPCs
* @param refState
* A map from tokens to the data held about a batch of messages received from pubsub. The map is
* wrapped in a `Ref` because it is concurrently modified by the source adding new batches to
* the state.
*/
case class Resources[F[_]](stub: SubscriberStub, refState: Ref[F, Map[Unique.Token, PubsubBatchState]])

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.sources.pubsub.v2

import cats.implicits._
import cats.effect.Async
import com.google.api.gax.rpc.{ApiException, StatusCode}
import io.grpc.Status
import org.typelevel.log4cats.Logger
import retry.RetryPolicies
import retry.implicits._

import scala.concurrent.duration.DurationDouble

private object PubsubRetryOps {

object implicits {
implicit class Ops[F[_], A](val f: F[A]) extends AnyVal {

def retryingOnTransientGrpcFailures(implicit F: Async[F], L: Logger[F]): F[A] =
f.retryingOnSomeErrors(
isWorthRetrying = { e => isRetryableException(e).pure[F] },
policy = RetryPolicies.fullJitter(1.second),
onError = { case (t, _) =>
Logger[F].info(t)(s"Pubsub retryable GRPC error will be retried: ${t.getMessage}")
}
)

def recoveringOnGrpcInvalidArgument(f2: Status => F[A])(implicit F: Async[F]): F[A] =
f.recoverWith {
case StatusFromThrowable(s) if s.getCode.equals(Status.Code.INVALID_ARGUMENT) =>
f2(s)
}
}
}

private object StatusFromThrowable {
def unapply(t: Throwable): Option[Status] =
Some(Status.fromThrowable(t))
}

def isRetryableException: Throwable => Boolean = {
case apiException: ApiException =>
apiException.getStatusCode.getCode match {
case StatusCode.Code.DEADLINE_EXCEEDED => true
case StatusCode.Code.INTERNAL => true
case StatusCode.Code.CANCELLED => true
case StatusCode.Code.RESOURCE_EXHAUSTED => true
case StatusCode.Code.ABORTED => true
case StatusCode.Code.UNKNOWN => true
case StatusCode.Code.UNAVAILABLE => !apiException.getMessage().contains("Server shutdownNow invoked")
case _ => false
}
case _ =>
false
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.sources.pubsub.v2

import cats.Show
import io.circe.Decoder
import io.circe.generic.semiauto._
import io.circe.config.syntax._
import com.google.pubsub.v1.ProjectSubscriptionName

import scala.concurrent.duration.FiniteDuration

import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent

case class PubsubSourceConfigV2(
subscription: PubsubSourceConfigV2.Subscription,
parallelPullFactor: BigDecimal,
durationPerAckExtension: FiniteDuration,
minRemainingDeadline: Double,
gcpUserAgent: GcpUserAgent,
maxPullsPerTransportChannel: Int,
maxMessagesPerPull: Int
)

object PubsubSourceConfigV2 {

case class Subscription(projectId: String, subscriptionId: String)

object Subscription {
implicit def show: Show[Subscription] = Show[Subscription] { s =>
ProjectSubscriptionName.of(s.projectId, s.subscriptionId).toString
}
}

private implicit def subscriptionDecoder: Decoder[Subscription] =
Decoder.decodeString
.map(_.split("/"))
.emap {
case Array("projects", projectId, "subscriptions", subscriptionId) =>
Right(Subscription(projectId, subscriptionId))
case _ =>
Left("Expected format: projects/<project>/subscriptions/<subscription>")
}

implicit def decoder: Decoder[PubsubSourceConfigV2] = deriveDecoder[PubsubSourceConfigV2]
}
Loading

0 comments on commit 7612dc7

Please sign in to comment.