Description
Hey,
during deployments we have noticed a strange issue with record consumption.
Versions
FS2-Kafka version = 3.0.0-M7 (also observed on M4)
Kafka client version = 3.2.0
Background
Let's assume that we are working with a topic called `topic`, which has 100 partitions.
Our consumer is running as a single pod (`pod-1`), consuming all 100 partitions. During a Kubernetes rolling deployment another instance is created; let's call it `pod-2`. The first rebalance is triggered, leaving `pod-1` and `pod-2` consuming 50 partitions each.
Let's assume that:

- `pod-1` is consuming partitions 0, 1, 2, 3, ..., 49
- `pod-2` is consuming partitions 50, 51, 52, 53, ..., 99
After a few seconds, `pod-1` gets shut down and `pod-2` is the only working pod. A second rebalance is triggered and now `pod-2` is consuming all partitions.
What we have noticed is that for partitions which were previously assigned, some messages are not consumed right after the second re-assignment.
See the example output below.
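For reference, here is a minimal sketch that logs each assignment the consumer receives (assumptions: fs2-kafka's `partitionsMapStream`, placeholder `String` deserializers and a localhost broker); every rebalance emits a fresh partition map, so the two rebalances described above would show up as two log lines:

```scala
import cats.effect.{IO, IOApp}
import fs2.kafka._

object AssignmentLogger extends IOApp.Simple {
  val settings: ConsumerSettings[IO, String, String] =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092") // placeholder broker
      .withGroupId("TestGroup")

  val run: IO[Unit] =
    KafkaConsumer
      .stream(settings)
      .evalTap(_.subscribeTo("topic"))
      .flatMap(_.partitionsMapStream) // emits once per (re)assignment
      .evalMap { assignment =>
        // The per-partition record streams are ignored here;
        // this only observes which partitions were assigned.
        IO.println(s"Assigned partitions: ${assignment.keySet.map(_.partition).toList.sorted}")
      }
      .compile
      .drain
}
```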
Code
This issue is really hard to reproduce. It happens very rarely.
```scala
import cats.effect.Sync
import cats.syntax.all._
import fs2.Stream
import fs2.kafka._

import scala.concurrent.duration._

val settings =
  ConsumerSettings[F, Option[String], Array[Byte]](
    keyDeserializer = Deserializer[F, Option[String]],
    valueDeserializer = Deserializer[F, Array[Byte]]
  ).withBootstrapServers("<configuration>")
    .withGroupId("TestGroup")
    .withAutoOffsetReset(AutoOffsetReset.Earliest)
    .withClientId("TestClientId")
    .withEnableAutoCommit(false)
    .withPollInterval(50.millis)

// Important note: the following assignment strategies are included:
// partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
def recordProcessor(
records: Stream[F, CommittableConsumerRecord[F, Option[String], Array[Byte]]]
): Stream[F, CommittableOffset[F]] =
records.evalMap { record =>
val recordFormatted =
s"Published record: ${record.record.topic}-${record.record.partition}:${record.record.offset}"
Sync[F].delay(println(recordFormatted)) >> Sync[F].pure(record.offset)
}
KafkaConsumer
  .stream(settings)
  .evalTap(_.subscribeTo("topic"))
  .flatMap(_.partitionedStream.map(recordProcessor))
  .map(
    _.through(
      _.groupWithin(100, 50.millis)
        .evalMap(CommittableOffsetBatch.fromFoldable(_).commit)
    )
  )
  .parJoinUnbounded
  .scope
```
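Note that `partition.assignment.strategy` above lists both an eager strategy (RangeAssignor) and a cooperative one (CooperativeStickyAssignor). In case it is relevant for reproduction, a hypothetical variant that pins a single strategy through fs2-kafka's raw property passthrough could look like this:

```scala
import org.apache.kafka.clients.consumer.{ConsumerConfig, CooperativeStickyAssignor}

// Hypothetical variant of the settings above: pin a single assignment
// strategy instead of listing both the eager and the cooperative one.
val pinnedSettings =
  settings.withProperty(
    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    classOf[CooperativeStickyAssignor].getName
  )
```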
Example output showing the issue

```
// pod-2 has been started. Consumer is being prepared...
INFO o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=TestClientId, groupId=TestGroup] Setting offset for partition topic-59 to the committed offset FetchPosition{offset=1000028, offsetEpoch=Optional.empty, ...}

// pod-1 has been shut down. Rebalance has started again and now all partitions are assigned to pod-2
INFO o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=TestClientId, groupId=TestGroup] Setting offset for partition topic-59 to the committed offset FetchPosition{offset=1000032, offsetEpoch=Optional.empty, ...}

Published record: topic-59:1000037 // Consumption is not moved back to the committed offset 1000032, but it should be, since RangeAssignor is EAGER
// Commit happens, setting the committed offset to 1000038.
Published record: topic-59:1000040 // Missed records: 1000038 and 1000039, which exist in the topic but were skipped by the consumer.
Published record: topic-59:1000041
Published record: topic-59:1000042
```
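To double-check that offsets 1000038 and 1000039 really exist on the broker, a minimal verification sketch (plain Kafka Java client driven from Scala; the localhost broker address is a placeholder, topic/partition/offsets are taken from the log above) that assigns the partition manually and seeks past the committed offset:

```scala
import java.time.Duration
import java.util.Properties

import scala.jdk.CollectionConverters._

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object SkippedOffsetCheck extends App {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)

  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
  val tp = new TopicPartition("topic", 59)

  // Manual assignment bypasses the consumer group, so no rebalance is involved.
  consumer.assign(java.util.List.of(tp))
  consumer.seek(tp, 1000038L)

  // If the records were skipped (rather than missing from the topic),
  // the first two offsets printed here should be 1000038 and 1000039.
  consumer
    .poll(Duration.ofSeconds(5))
    .records(tp)
    .asScala
    .take(2)
    .foreach(r => println(s"${r.topic}-${r.partition}:${r.offset}"))

  consumer.close()
}
```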