Interrupting a consumer's partition stream interrupts the outer assignment stream #1410

@ex-ratt

Description

I stumbled upon a strange issue that I don't understand. Maybe someone can enlighten me?

I have a consumer and interrupt the inner partition streams after a while (it doesn't matter whether I use interruptAfter or interruptWhen). I expect the outer stream containing the current assignment to keep running forever, even when the inner streams are interrupted. This works as expected if the interruption happens before any records arrive, or if it happens to fall between chunks. But if the interruption happens while a chunk is being processed, then not only is the partition stream interrupted, but everything else, too - the outer stream stops as if stopConsuming had been called on the consumer.
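
For reference, these are the two interruption variants I tried (they behave identically here; partitionStream is just a placeholder for one of the inner streams from partitionsMapStream):

// both variants show the same behavior in my tests
partitionStream.interruptAfter(800.milliseconds)
partitionStream.interruptWhen(Stream.sleep[IO](800.milliseconds).as(true))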

I tried to reproduce the issue with plain fs2 (without any Kafka parts), staying as close as possible to how the consumer works, including the use of queues etc., but I just couldn't reproduce it. What am I missing?
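
For illustration, this is roughly the simplified structure of my pure-fs2 attempt (a sketch, with a queue-fed inner stream standing in for a partition stream); here the interruption stays local to the inner stream, as I would expect:

import cats.effect.*
import cats.effect.std.Queue
import fs2.Stream

import scala.concurrent.duration.*

object PureFs2Test extends IOApp.Simple {

  override def run: IO[Unit] =
    Stream
      .eval(Queue.unbounded[IO, Int])
      .flatMap { queue =>
        // the feeder stands in for the poll loop that fills the partition queue
        val feeder =
          Stream.iterate(1)(_ + 1).covary[IO].metered(10.milliseconds).foreach(queue.offer)
        // the outer stream stands in for partitionsMapStream: it emits one inner
        // "partition" stream and then stays open, like an assignment that never changes
        val outer =
          Stream.emit(Stream.fromQueueUnterminated(queue).interruptAfter(800.milliseconds)) ++
            Stream.never[IO]
        outer
          .flatMap(_.onFinalize(IO.println("Interrupted inner stream")))
          .concurrently(feeder)
      }
      .evalTap(n => IO.println(n))
      .interruptAfter(3.seconds) // ends the demo; the outer stream would otherwise run forever
      .onFinalize(IO.println("Finished"))
      .compile
      .drain
}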

The following code should reproduce it (you just need a running Kafka instance). It is best to disable logging, as the Apache Kafka client will otherwise obscure the output.

import cats.effect.*
import cats.syntax.all.*
import fs2.Stream
import fs2.kafka.*
import org.apache.kafka.clients.admin.NewTopic

import scala.concurrent.duration.*
import scala.jdk.OptionConverters.*

object KafkaTest extends IOApp.Simple {

  private val bootstrapServerUrl = ??? // todo needs URL to Kafka broker
  private val topic = "test-topic"
  private val partitions = 4

  override def run: IO[Unit] =
    IO.unit
      >> createAndFillTopic
      >> readTopic
      >> deleteTopic

  private def readTopic: IO[Unit] =
    KafkaConsumer.stream(consumerSettings(Deserializer.string, Deserializer.int, AutoOffsetReset.Earliest))
      .evalTap(_.assign(topic))
      .flatMap(_.partitionsMapStream)
      .evalTap(partitionsMap => IO.println(s"Found ${partitionsMap.size} partitions"))
      .map { partitionsMap =>
        partitionsMap.map { case (tp, stream) =>
          val logStart = IO.println(s"Reading from partition ${tp.partition}")
          val logEnd = IO.println(s"Interrupted partition ${tp.partition}")
          tp -> (Stream.exec(logStart) ++ stream).onFinalize(logEnd).interruptAfter(800.milliseconds)
        }
      }
      .flatMap(_.values.reduce(_ ++ _)) // .flatMap(_.values.head) also reproduces the issue
      .map(_.record)
      .evalTap(record => IO.println(s"${record.partition}: ${record.key} -> ${record.value}"))
      .evalTap(_ => IO.sleep(50.milliseconds)) // not necessary, but less output spam before interruption
      .onFinalize(IO.println("Finished")) // we wouldn't expect to see this without interrupting the whole program
      .compile
      .drain

  private def createAndFillTopic: IO[Unit] =
    createTopic >> fillTopic

  private def createTopic: IO[Unit] =
    adminClient.use(_.createTopic(new NewTopic(topic, Int.box(partitions).some.toJava, none[java.lang.Short].toJava)))

  private def deleteTopic: IO[Unit] =
    adminClient.use(_.deleteTopics(List(topic)))

  private def fillTopic: IO[Unit] =
    KafkaProducer.stream(producerSettings(Serializer.string, Serializer.int))
      .flatMap { producer =>
        Stream("a", "c", "d", "f")
          .covary[IO]
          .flatMap(letter => Stream.iterate(1)(_ + 1).take(5000).map(letter -> _))
          .map { case (letter, number) => ProducerRecord(topic, letter, number) }
          .map(ProducerRecords.one)
          .evalMap(producer.produce)
          .groupWithin(1000, 1.second)
          .evalMap(_.sequence)
          .void
      }
      .compile
      .drain

  private def adminClient: Resource[IO, KafkaAdminClient[IO]] =
    KafkaAdminClient.resource(adminSettings)

  private def adminSettings: AdminClientSettings =
    AdminClientSettings(bootstrapServerUrl)

  private def producerSettings[K, V](
    keySerializer: KeySerializer[IO, K],
    valueSerializer: ValueSerializer[IO, V],
  ): ProducerSettings[IO, K, V] =
    ProducerSettings(keySerializer, valueSerializer)
      .withBootstrapServers(bootstrapServerUrl)

  private def consumerSettings[K, V](
    keyDeserializer: KeyDeserializer[IO, K],
    valueDeserializer: ValueDeserializer[IO, V],
    autoOffsetReset: AutoOffsetReset,
  ): ConsumerSettings[IO, K, V] =
    ConsumerSettings(keyDeserializer, valueDeserializer)
      .withBootstrapServers(bootstrapServerUrl)
      .withAutoOffsetReset(autoOffsetReset)
}

I would expect the output to show that all partitions are read from and interrupted, with some record output in between, and the program to then do nothing further. Instead, only one partition is read from, and after its interruption it prints "Finished" and the program ends, like so:

Found 4 partitions
Reading from partition 0
0: a -> 1
0: a -> 2
0: a -> 3
0: a -> 4
0: a -> 5
Interrupted partition 0
Finished

You can make it work as expected by removing the sleep between records and shortening the interruption time, such that no records are consumed. Or maybe you are lucky and hit a timing where the partitions end with a record whose number is divisible by 500 (the number of records per chunk, from ConsumerConfig.DEFAULT_MAX_POLL_RECORDS), because then it also works as expected.
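
If I understand the chunking correctly, lowering the chunk size should make that lucky timing easier to hit, because chunk boundaries then occur more often. Something like this in consumerSettings (assuming withMaxPollRecords, which should map to max.poll.records):

// smaller poll batches mean more chunk boundaries, so the interruption is
// more likely to fall between chunks, where everything works as expected
ConsumerSettings(keyDeserializer, valueDeserializer)
  .withBootstrapServers(bootstrapServerUrl)
  .withAutoOffsetReset(autoOffsetReset)
  .withMaxPollRecords(10) // default is 500 (ConsumerConfig.DEFAULT_MAX_POLL_RECORDS)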
