CommittablePartitionedSource Spawns Multiple Streams for the Same Topic-Partition #1290

Open
@andrii-kovalchuk-exa

Overview

Streams built with Consumer.committablePartitionedSource stall (emit few or no records) after multiple rebalances. After a restart on new pods the streams work fine, but they stall again after several more rebalances.

Details

While investigating the issue with DEBUG logging enabled for Alpakka, we found that the Kafka consumer of an old pod is assigned new partitions, but no new sub-sources are created for them, so the assigned partitions are simply revoked later:

2020-12-17 01:02:03.307 | DEBUG |  | 34014098 ms| akka.kafka.internal.SubSourceLogic       | [6969b#1] Assigning new partitions: PEB_HIPReq_RT_NkLjaR-27, PEB_HIPReq_RT_NkLjaR-28, PEB_HIPFree_RT_NkLjaR-26, PEB_HIPReq_RT_NkLjaR-35, PEB_HIPFree_RT_NkLjaR-29, PEB_HIPFree_RT_NkLjaR-34, PEB_HIPReq_RT_NkLjaR-26, PEB_HIPReq_RT_NkLjaR-33, PEB_HIPReq_RT_NkLjaR-29, PEB_HIPFree_RT_NkLjaR-27, PEB_HIPFree_RT_NkLjaR-33, PEB_HIPFree_RT_NkLjaR-30, PEB_HIPFree_RT_NkLjaR-24, PEB_HIPReq_RT_NkLjaR-34, PEB_HIPReq_RT_NkLjaR-32, PEB_HIPReq_RT_NkLjaR-25, PEB_HIPReq_RT_NkLjaR-31, PEB_HIPFree_RT_NkLjaR-35, PEB_HIPReq_RT_NkLjaR-30, PEB_HIPFree_RT_NkLjaR-28, PEB_HIPReq_RT_NkLjaR-24, PEB_HIPFree_RT_NkLjaR-31, PEB_HIPFree_RT_NkLjaR-25, PEB_HIPFree_RT_NkLjaR-32
2020-12-17 01:02:30.937 | DEBUG |  | 34041728 ms| akka.kafka.internal.SubSourceLogic       | [6969b#1] Closing SubSources for revoked partitions: 
2020-12-17 01:02:49.347 | DEBUG |  | 34060138 ms| akka.kafka.internal.SubSourceLogic       | [6969b#1] Closing SubSources for revoked partitions: PEB_HIPReq_RT_NkLjaR-27, PEB_HIPReq_RT_NkLjaR-28, PEB_HIPFree_RT_NkLjaR-26, PEB_HIPReq_RT_NkLjaR-35, PEB_HIPFree_RT_NkLjaR-29, PEB_HIPFree_RT_NkLjaR-34, PEB_HIPReq_RT_NkLjaR-26, PEB_HIPReq_RT_NkLjaR-33, PEB_HIPReq_RT_NkLjaR-29, PEB_HIPFree_RT_NkLjaR-27, PEB_HIPFree_RT_NkLjaR-33, PEB_HIPFree_RT_NkLjaR-30, PEB_HIPFree_RT_NkLjaR-24, PEB_HIPReq_RT_NkLjaR-34, PEB_HIPReq_RT_NkLjaR-32, PEB_HIPReq_RT_NkLjaR-25, PEB_HIPReq_RT_NkLjaR-31, PEB_HIPFree_RT_NkLjaR-35, PEB_HIPReq_RT_NkLjaR-30, PEB_HIPFree_RT_NkLjaR-28, PEB_HIPReq_RT_NkLjaR-24, PEB_HIPFree_RT_NkLjaR-31, PEB_HIPFree_RT_NkLjaR-25, PEB_HIPFree_RT_NkLjaR-32

When a pod is new, streams work fine:

2020-12-17 00:32:35.771 | DEBUG |  | 11404 ms| akka.kafka.internal.SubSourceLogic       | [7f0ee#1] Assigning new partitions: PEB_HIPFree_RT_NkLjaR-4, PEB_HIPFree_RT_NkLjaR-5, PEB_HIPFree_RT_NkLjaR-3, PEB_HIPReq_RT_NkLjaR-3, PEB_HIPReq_RT_NkLjaR-4, PEB_HIPReq_RT_NkLjaR-5
2020-12-17 00:32:35.774 | INFO  |  | 11407 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_HIPFree_RT_NkLjaR-4 state
2020-12-17 00:32:35.774 | INFO  |  | 11407 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_HIPFree_RT_NkLjaR-4 state
2020-12-17 00:32:35.899 | INFO  |  | 11532 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_HIPFree_RT_NkLjaR-5 state
2020-12-17 00:32:35.904 | INFO  |  | 11537 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_HIPFree_RT_NkLjaR-3 state
2020-12-17 00:32:35.912 | INFO  |  | 11545 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_HIPReq_RT_NkLjaR-3 state
2020-12-17 00:32:35.941 | INFO  |  | 11574 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_HIPReq_RT_NkLjaR-4 state
2020-12-17 00:32:35.942 | INFO  |  | 11575 ms| a.k.i.CommittableSubSourceStageLogic     | [0e287#1] Starting. Partition PEB_HIPReq_RT_NkLjaR-3
2020-12-17 00:32:35.944 | INFO  |  | 11577 ms| a.k.i.CommittableSubSourceStageLogic     | [05281#1] Starting. Partition PEB_HIPReq_RT_NkLjaR-4
2020-12-17 00:32:35.945 | INFO  |  | 11578 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_HIPReq_RT_NkLjaR-5 state
2020-12-17 00:32:35.948 | INFO  |  | 11581 ms| a.k.i.CommittableSubSourceStageLogic     | [b5dfc#1] Starting. Partition PEB_HIPReq_RT_NkLjaR-5
...

After digging deeper, we found the following logs, which appear to point at the root cause:

2020-12-15 17:48:23.603 | DEBUG |  | 72319 ms| akka.kafka.internal.SubSourceLogic       | [b51d3#1] Assigning new partitions: PEB_HIPReq_RT_NkLjaR-0, PEB_HIPFree_RT_NkLjaR-2, PEB_Mrg_RT_NkLjaR-0, PEB_HIPFree_RT_NkLjaR-0, PEB_HIPReq_RT_NkLjaR-2, PEB_HIPReq_RT_NkLjaR-1, PEB_HIPFree_RT_NkLjaR-1
...
2020-12-15 17:50:00.360 | INFO  |  | 169076 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_Mrg_RT_NkLjaR-0 state
2020-12-15 17:50:00.362 | INFO  |  | 169078 ms| a.k.i.CommittableSubSourceStageLogic     | [f7792#1] Starting. Partition PEB_Mrg_RT_NkLjaR-0
...
2020-12-15 17:50:05.393 | INFO  |  | 174109 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_Mrg_RT_NkLjaR-0 state
2020-12-15 17:50:05.397 | INFO  |  | 174113 ms| a.k.i.CommittableSubSourceStageLogic     | [e7f24#1] Starting. Partition PEB_Mrg_RT_NkLjaR-0
2020-12-15 17:50:10.412 | INFO  |  | 179128 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_Mrg_RT_NkLjaR-0 state
2020-12-15 17:50:10.414 | INFO  |  | 179130 ms| a.k.i.CommittableSubSourceStageLogic     | [f661e#1] Starting. Partition PEB_Mrg_RT_NkLjaR-0
2020-12-15 17:50:10.466 | WARN  |  | 179182 ms| akka.kafka.internal.KafkaConsumerActor   | [3ae81] RequestMessages from topic/partition Set(PEB_Mrg_RT_NkLjaR-0) already requested by other stage Set(PEB_Mrg_RT_NkLjaR-0)
2020-12-15 17:50:15.432 | INFO  |  | 184148 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_Mrg_RT_NkLjaR-0 state
2020-12-15 17:50:15.434 | INFO  |  | 184150 ms| a.k.i.CommittableSubSourceStageLogic     | [2ef31#1] Starting. Partition PEB_Mrg_RT_NkLjaR-0
2020-12-15 17:50:15.442 | WARN  |  | 184158 ms| akka.kafka.internal.KafkaConsumerActor   | [3ae81] RequestMessages from topic/partition Set(PEB_Mrg_RT_NkLjaR-0) already requested by other stage Set(PEB_Mrg_RT_NkLjaR-0)
2020-12-15 17:50:20.452 | INFO  |  | 189168 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_Mrg_RT_NkLjaR-0 state
2020-12-15 17:50:20.454 | INFO  |  | 189170 ms| a.k.i.CommittableSubSourceStageLogic     | [82b22#1] Starting. Partition PEB_Mrg_RT_NkLjaR-0
2020-12-15 17:50:20.463 | WARN  |  | 189179 ms| akka.kafka.internal.KafkaConsumerActor   | [3ae81] RequestMessages from topic/partition Set(PEB_Mrg_RT_NkLjaR-0) already requested by other stage Set(PEB_Mrg_RT_NkLjaR-0)
2020-12-15 17:50:25.472 | INFO  |  | 194188 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_Mrg_RT_NkLjaR-0 state
2020-12-15 17:50:25.474 | INFO  |  | 194190 ms| a.k.i.CommittableSubSourceStageLogic     | [a72e9#1] Starting. Partition PEB_Mrg_RT_NkLjaR-0

As can be seen, multiple sub-sources get spawned for the single partition PEB_Mrg_RT_NkLjaR-0.
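To check other partitions or other pods for the same symptom, the "Starting. Partition ..." lines can be counted per topic-partition. The following is only a minimal sketch in plain Scala, assuming the log format quoted above; `SubSourceStartCounter`, `startsPerPartition` and `duplicated` are hypothetical helper names, not part of Alpakka.

```scala
object SubSourceStartCounter {
  // Matches the "Starting. Partition <topic>-<partition>" suffix seen in the
  // CommittableSubSourceStageLogic lines quoted above (assumed log format).
  private val Starting = """Starting\. Partition (\S+)""".r.unanchored

  /** Counts sub-source starts per topic-partition. */
  def startsPerPartition(lines: Iterable[String]): Map[String, Int] =
    lines
      .collect { case Starting(tp) => tp }
      .groupBy(identity)
      .map { case (tp, hits) => tp -> hits.size }

  /** Topic-partitions for which more than one sub-source start was logged. */
  def duplicated(lines: Iterable[String]): Map[String, Int] =
    startsPerPartition(lines).filter { case (_, n) => n > 1 }
}

object SubSourceStartCounterDemo extends App {
  // Three lines copied from the logs above.
  val sample = Seq(
    "2020-12-15 17:50:00.362 | INFO  |  | 169078 ms| a.k.i.CommittableSubSourceStageLogic     | [f7792#1] Starting. Partition PEB_Mrg_RT_NkLjaR-0",
    "2020-12-15 17:50:05.397 | INFO  |  | 174113 ms| a.k.i.CommittableSubSourceStageLogic     | [e7f24#1] Starting. Partition PEB_Mrg_RT_NkLjaR-0",
    "2020-12-17 00:32:35.942 | INFO  |  | 11575 ms| a.k.i.CommittableSubSourceStageLogic     | [0e287#1] Starting. Partition PEB_HIPReq_RT_NkLjaR-3"
  )
  println(SubSourceStartCounter.duplicated(sample))
}
```

The count does not take revocations into account, so a partition that was legitimately revoked and re-assigned between two starts would also be flagged; the lines should be limited to a window without rebalances for the result to be meaningful.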

The streaming code is as follows:

Consumer
  .committablePartitionedSource(consumerSettings, topics)
  .mapAsyncUnordered(parallelism /* 32 */) {
    case (topicPartition, source) =>
      // bootstrap state and run the streamF here

We are on Alpakka 2.0.2 and Kafka 2.4.0.
