Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ val scala213 = "2.13.16"

val scala3 = "3.3.5"

ThisBuild / tlBaseVersion := "3.8"
ThisBuild / tlBaseVersion := "3.9"

ThisBuild / tlCiReleaseBranches := Seq("series/3.x")

Expand Down Expand Up @@ -299,7 +299,25 @@ ThisBuild / mimaBinaryIssueFilters ++= {
ProblemFilters.exclude[DirectMissingMethodProblem](
"fs2.kafka.ProducerSettings#ProducerSettingsImpl.apply"
),
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.kafka.KafkaProducer.produceRecord")
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.kafka.KafkaProducer.produceRecord"),
ProblemFilters.exclude[ReversedMissingMethodProblem](
"fs2.kafka.ConsumerSettings.sessionTimeout"
),
ProblemFilters.exclude[ReversedMissingMethodProblem](
"fs2.kafka.ConsumerSettings.rebalanceRevokeMode"
),
ProblemFilters.exclude[ReversedMissingMethodProblem](
"fs2.kafka.ConsumerSettings.withRebalanceRevokeMode"
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"fs2.kafka.ConsumerSettings#ConsumerSettingsImpl.copy"
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"fs2.kafka.ConsumerSettings#ConsumerSettingsImpl.this"
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"fs2.kafka.ConsumerSettings#ConsumerSettingsImpl.apply"
)
)
}

Expand Down
42 changes: 42 additions & 0 deletions docs/src/main/mdoc/consumers.md
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,48 @@ You may notice, that actual graceful shutdown implementation requires a decent a

Also note that even if you implement a graceful shutdown, your application may still fail with an error, in which case the graceful shutdown will not be invoked. This means that your application should be prepared for _at least once_ semantics even when a graceful shutdown is implemented. If you need _exactly once_ semantics, consider using [transactions](transactions.md).

### Graceful partition revoke

In addition to a graceful shutdown of the whole consumer, you can configure your consumer to wait for the streams
to finish processing a partition before "releasing" it. This behavior can be enabled via the following settings:

```scala mdoc:silent
object WithGracefulPartitionRevoke extends IOApp.Simple {
val run: IO[Unit] = {
// Stand-in for real per-record processing: just prints the record.
def processRecord(record: CommittableConsumerRecord[IO, String, String]): IO[Unit] =
IO(println(s"Processing record: $record"))

// Subscribes to "topic", processes every record and passes the record's offset
// on to `commitBatchWithin(100, 15.seconds)`.
def run(consumer: KafkaConsumer[IO, String, String]): IO[Unit] = {
consumer.subscribeTo("topic") >> consumer
.stream
.evalMap { msg =>
processRecord(msg).as(msg.offset)
}
.through(commitBatchWithin(100, 15.seconds))
.compile
.drain
}

// `RebalanceRevokeMode.Graceful` turns on the graceful partition revoke described
// in this section; the session timeout is set to 2 seconds for this example.
val consumerSettings =
ConsumerSettings[IO, String, String]
.withRebalanceRevokeMode(RebalanceRevokeMode.Graceful)
.withSessionTimeout(2.seconds)

KafkaConsumer
.resource(consumerSettings)
.use { consumer =>
run(consumer)
}
}
}
```

Please note that this setting does not guarantee that all commits will be performed before a partition is revoked.
Also note that the example above sets `session.timeout.ms` to a lower value (2 seconds). Be aware that waiting too long
for a partition processor to finish will cause processing of the whole topic to be suspended.

Awaiting the completion of commits might be implemented in the future.

[commitrecovery-default]: @API_BASE_URL@/CommitRecovery$.html#Default:fs2.kafka.CommitRecovery
[committableconsumerrecord]: @API_BASE_URL@/CommittableConsumerRecord.html
[committableoffset]: @API_BASE_URL@/CommittableOffset.html
Expand Down
42 changes: 40 additions & 2 deletions modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package fs2.kafka

import scala.concurrent.duration.*
import scala.concurrent.ExecutionContext
import scala.util.Try

import cats.effect.Resource
import cats.Show
Expand Down Expand Up @@ -151,6 +152,17 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
*/
def withMaxPollInterval(maxPollInterval: FiniteDuration): ConsumerSettings[F, K, V]

/**
* Returns the session timeout, i.e. the value of the following property:
*
* {{{
* ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG
* }}}
*
* The value is returned as a [[FiniteDuration]] for convenience. The implementation in this
* file falls back to 45000 milliseconds when the property is not set or cannot be parsed as a
* number of milliseconds.
*/
def sessionTimeout: FiniteDuration

/**
* Returns a new [[ConsumerSettings]] instance with the specified session timeout. This is
* equivalent to setting the following property using the [[withProperty]] function, except you
Expand Down Expand Up @@ -373,6 +385,17 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
*/
def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings[F, K, V]

/**
* The mode of operation used by [[KafkaConsumer.partitionsMapStream]] when partitions are
* revoked during a rebalance; one of two possible modes. See [[RebalanceRevokeMode]] for a
* detailed explanation of the differences between them.
*/
def rebalanceRevokeMode: RebalanceRevokeMode

/**
* Returns a new [[ConsumerSettings]] instance with the specified [[rebalanceRevokeMode]]. See
* [[RebalanceRevokeMode]] for the available modes.
*/
def withRebalanceRevokeMode(rebalanceRevokeMode: RebalanceRevokeMode): ConsumerSettings[F, K, V]

}

object ConsumerSettings {
Expand All @@ -388,7 +411,8 @@ object ConsumerSettings {
override val pollTimeout: FiniteDuration,
override val commitRecovery: CommitRecovery,
override val recordMetadata: ConsumerRecord[K, V] => String,
override val maxPrefetchBatches: Int
override val maxPrefetchBatches: Int,
override val rebalanceRevokeMode: RebalanceRevokeMode
) extends ConsumerSettings[F, K, V] {

override def withCustomBlockingContext(ec: ExecutionContext): ConsumerSettings[F, K, V] =
Expand Down Expand Up @@ -422,6 +446,14 @@ object ConsumerSettings {
override def withMaxPollInterval(maxPollInterval: FiniteDuration): ConsumerSettings[F, K, V] =
withProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval.toMillis.toString)

// `Try(_.toLong)` is used for parsing so that no separate implementation is
// needed for Scala 2.12. An absent or unparsable property falls back to
// 45000 milliseconds (presumably the Kafka client default - confirm).
override def sessionTimeout: FiniteDuration = {
  val configured: Option[FiniteDuration] =
    for {
      raw    <- properties.get(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG)
      parsed <- Try(raw.toLong).toOption
    } yield parsed.millis

  configured.getOrElse(45000.millis)
}

override def withSessionTimeout(sessionTimeout: FiniteDuration): ConsumerSettings[F, K, V] =
withProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout.toMillis.toString)

Expand Down Expand Up @@ -509,6 +541,11 @@ object ConsumerSettings {
): ConsumerSettings[F, K, V] =
withProperties(credentialsStore.properties)

// Only the revoke mode is replaced; every other setting is carried over by `copy`.
override def withRebalanceRevokeMode(
  rebalanceRevokeMode: RebalanceRevokeMode
): ConsumerSettings[F, K, V] = {
  val requestedMode = rebalanceRevokeMode
  copy(rebalanceRevokeMode = requestedMode)
}

// Renders only closeTimeout, commitTimeout, pollInterval, pollTimeout and commitRecovery;
// other settings (e.g. rebalanceRevokeMode, maxPrefetchBatches) are not part of the output.
override def toString: String =
s"ConsumerSettings(closeTimeout = $closeTimeout, commitTimeout = $commitTimeout, pollInterval = $pollInterval, pollTimeout = $pollTimeout, commitRecovery = $commitRecovery)"

Expand Down Expand Up @@ -542,7 +579,8 @@ object ConsumerSettings {
pollTimeout = 50.millis,
commitRecovery = CommitRecovery.Default,
recordMetadata = _ => OffsetFetchResponse.NO_METADATA,
maxPrefetchBatches = 2
maxPrefetchBatches = 2,
rebalanceRevokeMode = RebalanceRevokeMode.Eager
)

def apply[F[_], K, V](
Expand Down
123 changes: 103 additions & 20 deletions modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import scala.collection.immutable.SortedSet
import scala.concurrent.duration.FiniteDuration
import scala.util.matching.Regex

import cats.{Foldable, Functor, Reducible}
import cats.{Applicative, Foldable, Functor, Reducible}
import cats.data.{NonEmptySet, OptionT}
import cats.effect.*
import cats.effect.implicits.*
Expand Down Expand Up @@ -126,6 +126,7 @@ object KafkaConsumer {

private def createKafkaConsumer[F[_], K, V](
requests: QueueSink[F, Request[F, K, V]],
settings: ConsumerSettings[F, K, V],
actor: KafkaConsumerActor[F, K, V],
fiber: Fiber[F, Throwable, Unit],
id: Int,
Expand All @@ -141,7 +142,8 @@ object KafkaConsumer {
type PartitionsMapQueue = Queue[F, Option[PartitionsMap]]

def partitionStream(
partition: TopicPartition
partition: TopicPartition,
signalCompletion: F[Unit]
): Stream[F, CommittableConsumerRecord[F, K, V]] =
Stream.force {
actor
Expand All @@ -155,54 +157,91 @@ object KafkaConsumer {
.void
.attempt

Stream.fromQueueUnterminated(chunksQueue, 1).unchunks.interruptWhen(stopStream)
Stream
.fromQueueUnterminated(chunksQueue, 1)
.unchunks
.interruptWhen(stopStream)
.onFinalize(signalCompletion)
}
}

def enqueueAssignment(
assigned: Set[TopicPartition],
assigned: Map[TopicPartition, AssignmentSignals[F]],
partitionsMapQueue: PartitionsMapQueue
): F[Unit] =
stopConsumingDeferred
.tryGet
.flatMap {
case None =>
val assignment: PartitionsMap = assigned
.view
.map { partition =>
partition -> partitionStream(partition)
}
.toMap
val assignment: PartitionsMap = assigned.map { case (partition, signals) =>
partition -> partitionStream(partition, signals.signalStreamFinished.void)
}

partitionsMapQueue.offer(Some(assignment))

case Some(()) =>
F.unit
}

def onRebalance(partitionsMapQueue: PartitionsMapQueue): OnRebalance[F] =
def onRebalance(
assignmentRef: Ref[F, Map[TopicPartition, AssignmentSignals[F]]],
partitionsMapQueue: PartitionsMapQueue
): OnRebalance[F] =
OnRebalance(
onRevoked = _ => F.unit,
onAssigned = assigned => enqueueAssignment(assigned, partitionsMapQueue)
onRevoked = revoked =>
for {
assignment <- assignmentRef.get
_ <- revoked.toVector.flatMap(assignment.get).traverse_(_.awaitStreamFinishedSignal)
} yield (),
onAssigned = assigned =>
for {
assignment <- buildAssignment(assigned)
_ <- assignmentRef.update(_ ++ assignment)
_ <- enqueueAssignment(assignment, partitionsMapQueue)
} yield ()
)

def requestAssignment(partitionsMapQueue: PartitionsMapQueue): F[Set[TopicPartition]] = {
/**
  * Pairs every assigned partition with the [[AssignmentSignals]] matching the configured
  * `settings.rebalanceRevokeMode`: eager signals need no state, while graceful signals are
  * backed by a fresh `Deferred` per partition.
  */
def buildAssignment(
  assignedPartitions: SortedSet[TopicPartition]
): F[Map[TopicPartition, AssignmentSignals[F]]] = {
  // Builds the (partition, signals) entry for a single partition.
  def signalsFor(partition: TopicPartition): F[(TopicPartition, AssignmentSignals[F])] =
    settings.rebalanceRevokeMode match {
      case RebalanceRevokeMode.EagerMode =>
        F.pure(partition -> AssignmentSignals.eager[F])
      case RebalanceRevokeMode.GracefulMode =>
        Deferred[F, Unit].map { finished =>
          partition -> AssignmentSignals.graceful(finished)
        }
    }

  assignedPartitions.toVector.traverse(signalsFor).map(_.toMap)
}

def requestAssignment(
assignmentRef: Ref[F, Map[TopicPartition, AssignmentSignals[F]]],
partitionsMapQueue: PartitionsMapQueue
): F[Map[TopicPartition, AssignmentSignals[F]]] = {
val assignment = this.assignment(
Some(
onRebalance(partitionsMapQueue)
onRebalance(assignmentRef, partitionsMapQueue)
)
)

F.race(awaitTermination.attempt, assignment)
.flatMap {
case Left(_) => F.pure(Set.empty)
case Right(assigned) => F.pure(assigned)
case Left(_) => F.pure(Map.empty)
case Right(assigned) => buildAssignment(assigned).flatTap(assignmentRef.set)
}
}

def initialEnqueue(partitionsMapQueue: PartitionsMapQueue): F[Unit] =
def initialEnqueue(
assignmentRef: Ref[F, Map[TopicPartition, AssignmentSignals[F]]],
partitionsMapQueue: PartitionsMapQueue
): F[Unit] =
for {
assigned <- requestAssignment(partitionsMapQueue)
assigned <- requestAssignment(assignmentRef, partitionsMapQueue)
_ <- enqueueAssignment(assigned, partitionsMapQueue)
} yield ()

Expand All @@ -212,7 +251,9 @@ object KafkaConsumer {
case None =>
for {
partitionsMapQueue <- Stream.eval(Queue.unbounded[F, Option[PartitionsMap]])
_ <- Stream.eval(initialEnqueue(partitionsMapQueue))
assignmentRef <-
Stream.eval(Ref[F].of(Map.empty[TopicPartition, AssignmentSignals[F]]))
_ <- Stream.eval(initialEnqueue(assignmentRef, partitionsMapQueue))
out <- Stream
.fromQueueNoneTerminated(partitionsMapQueue)
.interruptWhen(awaitTermination.attempt)
Expand Down Expand Up @@ -574,6 +615,7 @@ object KafkaConsumer {
fiber <- startBackgroundConsumer(requests, polls, actor, settings.pollInterval)
} yield createKafkaConsumer(
requests,
settings,
actor,
fiber,
id,
Expand Down Expand Up @@ -695,6 +737,47 @@ object KafkaConsumer {

}

/**
* Utility class to provide clarity for internals. The goal is to make [[RebalanceRevokeMode]]
* transparent to the rest of the implementation internals: callers signal and await through
* this interface without checking which mode is configured.
*
* @tparam F
* effect used
*/
sealed abstract private class AssignmentSignals[F[_]] {

// Signals that the stream of the partition has finished. The eager implementation always
// yields `true`; the graceful one yields the result of completing its `Deferred`.
def signalStreamFinished: F[Boolean]
// Waits for the finished signal. The eager implementation returns immediately; the
// graceful one waits on its `Deferred`.
def awaitStreamFinishedSignal: F[Unit]

}

private object AssignmentSignals {

  /**
    * Signals that never wait: signalling is a no-op yielding `true` and awaiting completes
    * immediately.
    */
  def eager[F[_]: Applicative]: AssignmentSignals[F] =
    EagerSignals[F]()

  /**
    * Signals backed by `revokeFinisher`: signalling completes it and awaiting waits on it.
    */
  def graceful[F[_]](revokeFinisher: Deferred[F, Unit]): AssignmentSignals[F] =
    GracefulSignals(revokeFinisher)

  // Stateless implementation; both operations are pure values lifted into `F`.
  final private case class EagerSignals[F[_]: Applicative]() extends AssignmentSignals[F] {

    override def signalStreamFinished: F[Boolean] = Applicative[F].pure(true)

    override def awaitStreamFinishedSignal: F[Unit] = Applicative[F].unit

  }

  // Implementation delegating to the supplied `Deferred`.
  final private case class GracefulSignals[F[_]](
    revokeFinisher: Deferred[F, Unit]
  ) extends AssignmentSignals[F] {

    override def signalStreamFinished: F[Boolean] = revokeFinisher.complete(())

    override def awaitStreamFinishedSignal: F[Unit] = revokeFinisher.get

  }

}

/*
* Prevents the default `MkConsumer` instance from being implicitly available
* to code defined in this object, ensuring factory methods require an instance
Expand Down
Loading