KafkaConsumer closed prematurely on finite streams #1292

@ppodlovics

Hello,

I've run into an issue where my app consumes a finite number of records from a topic, does some processing, then commits the offsets. If I call take(...) on the record stream and then through(commitBatchWithin(...)), the commits time out. If I flip the order, it works. I'm not sure what's happening here exactly, but my guess is that the consumer closes its connection, so the underlying Java consumer cannot finish the commit and just hangs.

Scala version: 2.12.18
fs2.kafka version: 3.1.0

Some example code snippet:

import java.nio.charset.StandardCharsets

import scala.concurrent.duration._

import cats.effect.IO
import fs2.kafka._

{
  // kafkaHostPort is defined elsewhere in the test setup
  val bootstrapServers = kafkaHostPort
  val topicName = "test-topic"

  val producerSettings: ProducerSettings[IO, String, String] =
    ProducerSettings(
      keySerializer = Serializer.string[IO](StandardCharsets.UTF_8),
      valueSerializer = Serializer.string[IO](StandardCharsets.UTF_8),
    )
      .withBootstrapServers(bootstrapServers)
      .withEnableIdempotence(true)
      .withRetries(3)

  val consumerSettings: ConsumerSettings[IO, String, String] =
    ConsumerSettings(
      Deserializer.string[IO],
      Deserializer.string[IO],
    )
      .withAutoOffsetReset(AutoOffsetReset.Earliest)
      .withBootstrapServers(bootstrapServers)
      .withGroupId("test-group")

  val producerRecords = ProducerRecords(Vector(ProducerRecord(topicName, "k1", "v1"), ProducerRecord(topicName, "k2", "v2")))

  KafkaProducer.resource(producerSettings).use { producer =>
    for {
      // Produce two records so the consumer has something to read
      _ <- producer.produce(producerRecords)
      // take(1) BEFORE commitBatchWithin: this ordering times out on commit
      _ <- KafkaConsumer
        .stream(consumerSettings)
        .subscribeTo(topicName)
        .records
        .take(1)
        .map(_.offset)
        .through(commitBatchWithin(1, 1.second))
        .compile
        .toVector
    } yield ()
  }
}

The above code throws fs2.kafka.CommitTimeoutException, but if take(1) is moved below .through(commitBatchWithin(1, 1.second)), it finishes and commits the offset.
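For comparison, here is a minimal sketch of the reordered pipeline that completes. It assumes the same kind of consumer settings and topic as the snippet above. The wrapper object CommitThenTake and the method consumeOne are hypothetical names used only to keep the example self-contained.

```scala
import scala.concurrent.duration._

import cats.effect.IO
import fs2.kafka._

// Hypothetical wrapper, for illustration only.
object CommitThenTake {

  // Same pipeline as in the report, but with take(1) moved AFTER the commit pipe.
  def consumeOne(
    settings: ConsumerSettings[IO, String, String],
    topic: String
  ): IO[Unit] =
    KafkaConsumer
      .stream(settings)
      .subscribeTo(topic)
      .records
      .map(_.offset)
      // Emits one Unit per committed batch (here: batches of 1 offset, or every second)
      .through(commitBatchWithin(1, 1.second))
      // Stop after the first committed batch, i.e. once the commit has completed
      .take(1)
      .compile
      .drain
}
```

Note that in this ordering take(1) counts committed batches rather than records. With a batch size of 1 the two coincide, but with a larger batch size more than one record may be consumed before the stream terminates.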

P.S.: This other issue might be related: #1293