Zipping finite KafkaConsumer-based streams doesn't behave as expected (first stream gets recycled after first 1000 elements) #1293

@ppodlovics

Hello,

I've run into an issue with finite fs2-kafka streams and zipping. If I take more than 1000 elements from each of two default-configured streams and then zip them, the first stream's elements are recycled after the 1000th one: it starts over from the 0th element. The example code snippet below makes this clearer.

Scala version: 2.12.18
fs2.kafka version: 3.1.0

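{
      import java.nio.charset.StandardCharsets

      import cats.effect.IO
      import fs2.kafka._

      // kafkaHostPort is supplied by the surrounding test setup (not shown here)
      val bootstrapServers = kafkaHostPort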
      val topic1Name = "test-topic-1"
      val topic2Name = "test-topic-2"

      val producerSettings: ProducerSettings[IO, String, String] =
        ProducerSettings(
          keySerializer = Serializer.string[IO](StandardCharsets.UTF_8),
          valueSerializer = Serializer.string[IO](StandardCharsets.UTF_8),
        )
          .withBootstrapServers(bootstrapServers)
          .withEnableIdempotence(true)
          .withRetries(3)

      val consumerSettings: ConsumerSettings[IO, String, String] =
        ConsumerSettings(
          Deserializer.string[IO],
          Deserializer.string[IO],
        )
          .withAutoOffsetReset(AutoOffsetReset.Earliest)
          .withBootstrapServers(bootstrapServers)
          .withGroupId("test-group")

      // Produce k0/v0 .. k1010/v1010 (1011 records) to each of the two topics
      val records = (0 to 1010).flatMap { i =>
        Vector(
          ProducerRecord(topic1Name, s"k${i}", s"v${i}"),
          ProducerRecord(topic2Name, s"k${i}", s"v${i}"),
        )
      }.toVector
      val producerRecords = ProducerRecords(records)

      KafkaProducer.resource(producerSettings).use { producer =>
        for {
          _ <- producer.produce(producerRecords)
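          // Two finite streams built from the same consumerSettings, one per topic,
          // each limited to the first 1005 records
          topic1Stream = KafkaConsumer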
            .stream(consumerSettings)
            .subscribeTo(topic1Name)
            .records
            .take(1005)
          topic2Stream = KafkaConsumer
            .stream(consumerSettings)
            .subscribeTo(topic2Name)
            .records
            .take(1005)
          // Pair the streams element-wise; every pair is expected to have matching key and value
          result <- (topic1Stream zip topic2Stream)
            .evalTap { case (lhs, rhs) => IO.println(lhs) >> IO.println(rhs) }
            .compile
            .toVector
        } yield {
          result.foreach { case (lhs, rhs) =>
            assert(lhs.record.key == rhs.record.key && lhs.record.value == rhs.record.value)
          }
        }
      }
}

This code fails on the assertion with "k[]0" did not equal "k[100]0" (ScalaTest's diff notation for "k0" vs "k1000"). I added some logging to see more clearly what is happening, and it shows the following:

CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 997, key = k997, value = v997, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 4, serializedValueSize = 4, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 998, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 997, key = k997, value = v997, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 4, serializedValueSize = 4, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 998, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 998, key = k998, value = v998, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 4, serializedValueSize = 4, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 999, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 998, key = k998, value = v998, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 4, serializedValueSize = 4, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 999, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 999, key = k999, value = v999, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 4, serializedValueSize = 4, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 1000, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 999, key = k999, value = v999, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 4, serializedValueSize = 4, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 1000, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 0, key = k0, value = v0, timestamp = Timestamp(createTime = 1706178880476), serializedKeySize = 2, serializedValueSize = 2, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 1, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 1000, key = k1000, value = v1000, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 5, serializedValueSize = 5, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 1001, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 1, key = k1, value = v1, timestamp = Timestamp(createTime = 1706178880477), serializedKeySize = 2, serializedValueSize = 2, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 2, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 1001, key = k1001, value = v1001, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 5, serializedValueSize = 5, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 1002, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 2, key = k2, value = v2, timestamp = Timestamp(createTime = 1706178880477), serializedKeySize = 2, serializedValueSize = 2, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 3, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 1002, key = k1002, value = v1002, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 5, serializedValueSize = 5, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 1003, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 3, key = k3, value = v3, timestamp = Timestamp(createTime = 1706178880477), serializedKeySize = 2, serializedValueSize = 2, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 4, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 1003, key = k1003, value = v1003, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 5, serializedValueSize = 5, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 1004, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 4, key = k4, value = v4, timestamp = Timestamp(createTime = 1706178880477), serializedKeySize = 2, serializedValueSize = 2, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 5, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 1004, key = k1004, value = v1004, timestamp = Timestamp(createTime = 1706178880599), serializedKeySize = 5, serializedValueSize = 5, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 1005, test-group))
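
The rewind is easy to miss in the raw output, so here is a minimal stdlib-only sketch for spotting it. The Rec and Rewind case classes and the findRewinds helper are hypothetical names made up for this check, not fs2-kafka API; they only model the topic, partition and offset fields printed above. The helper walks the records in the printed order and reports every point where a partition's offset fails to increase.

```scala
// Hypothetical model of the logged fields (not part of fs2-kafka).
final case class Rec(topic: String, partition: Int, offset: Long)
final case class Rewind(topic: String, partition: Int, from: Long, to: Long)

object RewindCheck {
  // Walk the records in consumption order, remembering the last offset seen per
  // (topic, partition). A rewind is any record whose offset is not greater than
  // the previous offset seen for the same partition.
  def findRewinds(records: Seq[Rec]): Vector[Rewind] = {
    val (_, rewinds) =
      records.foldLeft((Map.empty[(String, Int), Long], Vector.empty[Rewind])) {
        case ((lastSeen, acc), r) =>
          val key = (r.topic, r.partition)
          val next = lastSeen.get(key) match {
            case Some(prev) if r.offset <= prev =>
              acc :+ Rewind(r.topic, r.partition, prev, r.offset)
            case _ => acc
          }
          (lastSeen.updated(key, r.offset), next)
      }
    rewinds
  }

  def main(args: Array[String]): Unit = {
    // Offsets copied from the log above, in the order they were printed.
    val logged = Vector(
      Rec("test-topic-2", 0, 997), Rec("test-topic-1", 0, 997),
      Rec("test-topic-2", 0, 998), Rec("test-topic-1", 0, 998),
      Rec("test-topic-2", 0, 999), Rec("test-topic-1", 0, 999),
      Rec("test-topic-2", 0, 0),   Rec("test-topic-1", 0, 1000),
      Rec("test-topic-2", 0, 1),   Rec("test-topic-1", 0, 1001),
      Rec("test-topic-2", 0, 2),   Rec("test-topic-1", 0, 1002),
      Rec("test-topic-2", 0, 3),   Rec("test-topic-1", 0, 1003),
      Rec("test-topic-2", 0, 4),   Rec("test-topic-1", 0, 1004)
    )
    findRewinds(logged).foreach(println)
  }
}
```

Run against the offsets from the log above, it prints a single line, Rewind(test-topic-2,0,999,0): test-topic-2 jumps back from offset 999 to 0 while test-topic-1 keeps advancing.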

Could you help me resolve this?

P.S.: this other issue might be related: #1292