|
| 1 | +/* |
| 2 | + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. |
| 3 | + * |
| 4 | + * This program is licensed to you under the Snowplow Community License Version 1.0, |
| 5 | + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. |
| 6 | + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 |
| 7 | + */ |
| 8 | +package com.snowplowanalytics.snowplow.sources.pubsub.v2 |
| 9 | + |
| 10 | +import cats.effect.{Async, Ref, Resource, Sync} |
| 11 | +import cats.effect.kernel.Unique |
| 12 | +import cats.implicits._ |
| 13 | +import cats.effect.implicits._ |
| 14 | +import org.typelevel.log4cats.Logger |
| 15 | +import org.typelevel.log4cats.slf4j.Slf4jLogger |
| 16 | + |
| 17 | +import com.google.cloud.pubsub.v1.stub.SubscriberStub |
| 18 | + |
| 19 | +private trait LeaseManager[F[_], A] { |
| 20 | + def manageLeases(in: A): F[Unique.Token] |
| 21 | + def stopManagingLeases(tokens: Vector[Unique.Token]): F[Unit] |
| 22 | +} |
| 23 | + |
| 24 | +private object LeaseManager { |
| 25 | + |
| 26 | + private implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] |
| 27 | + |
| 28 | + def resource[F[_]: Async]( |
| 29 | + config: PubsubSourceConfigV2, |
| 30 | + stub: SubscriberStub, |
| 31 | + ref: Ref[F, Map[Unique.Token, PubsubBatchState]], |
| 32 | + channelAffinity: Int |
| 33 | + ): Resource[F, LeaseManager[F, SubscriberAction.ProcessRecords]] = |
| 34 | + extendDeadlinesInBackground[F](config, stub, ref, channelAffinity) |
| 35 | + .as(impl(config, ref, channelAffinity)) |
| 36 | + |
| 37 | + private def impl[F[_]: Sync]( |
| 38 | + config: PubsubSourceConfigV2, |
| 39 | + ref: Ref[F, Map[Unique.Token, PubsubBatchState]], |
| 40 | + channelAffinity: Int |
| 41 | + ): LeaseManager[F, SubscriberAction.ProcessRecords] = new LeaseManager[F, SubscriberAction.ProcessRecords] { |
| 42 | + |
| 43 | + def manageLeases(in: SubscriberAction.ProcessRecords): F[Unique.Token] = |
| 44 | + Unique[F].unique.flatMap { token => |
| 45 | + val deadline = in.timeReceived.plusMillis(config.durationPerAckExtension.toMillis) |
| 46 | + val ackIds = in.records.map(_.getAckId) |
| 47 | + val state = PubsubBatchState(deadline, ackIds, channelAffinity) |
| 48 | + ref.update(_ + (token -> state)).as(token) |
| 49 | + } |
| 50 | + |
| 51 | + def stopManagingLeases(tokens: Vector[Unique.Token]): F[Unit] = |
| 52 | + ref.update(_.removedAll(tokens)) |
| 53 | + } |
| 54 | + |
| 55 | + private def extendDeadlinesInBackground[F[_]: Async]( |
| 56 | + config: PubsubSourceConfigV2, |
| 57 | + stub: SubscriberStub, |
| 58 | + refStates: Ref[F, Map[Unique.Token, PubsubBatchState]], |
| 59 | + channelAffinity: Int |
| 60 | + ): Resource[F, Unit] = { |
| 61 | + def go: F[Unit] = for { |
| 62 | + now <- Sync[F].realTimeInstant |
| 63 | + minAllowedDeadline = now.plusMillis((config.minRemainingDeadline * config.durationPerAckExtension.toMillis).toLong) |
| 64 | + newDeadline = now.plusMillis(config.durationPerAckExtension.toMillis) |
| 65 | + toExtend <- refStates.modify { m => |
| 66 | + val toExtend = m.filter { case (_, batchState) => |
| 67 | + batchState.channelAffinity === channelAffinity && batchState.currentDeadline.isBefore(minAllowedDeadline) |
| 68 | + } |
| 69 | + val fixed = toExtend.view |
| 70 | + .mapValues(_.copy(currentDeadline = newDeadline)) |
| 71 | + .toMap |
| 72 | + (m ++ fixed, toExtend.values.toVector) |
| 73 | + } |
| 74 | + _ <- if (toExtend.isEmpty) |
| 75 | + Sync[F].sleep(0.5 * config.minRemainingDeadline * config.durationPerAckExtension) |
| 76 | + else { |
| 77 | + val ackIds = toExtend.sortBy(_.currentDeadline).flatMap(_.ackIds.toVector) |
| 78 | + Utils.modAck[F](config.subscription, stub, ackIds, config.durationPerAckExtension, channelAffinity) |
| 79 | + } |
| 80 | + _ <- go |
| 81 | + } yield () |
| 82 | + go.background.void |
| 83 | + } |
| 84 | + |
| 85 | +} |
0 commit comments