Skip to content

Behaviour of ReceiverOptions.addAssignListener changed #334

@gorkemaygul

Description

@gorkemaygul

On version 1.3.13, after rebalance operation, addAssignListener callback is called only if a partition is assigned to a consumer. So, when we have more consumers than partition size, addAssignListener callback is only triggering on the instances with partition assignment.

After version 1.3.15 it's called(with empty list) even when there is no assignment to the consumer. It is also working like this on version 1.3.17.

In our case, we had a logic inside that method and we made a fix to handle this case.
I didn't see any changes about this on 1.3.15 release notes so I'm wondering if it is working as intended now or it is a bug?

Expected Behavior

When we have more consumers(with same group-id) than topic partitions, addAssignListener callback should be triggered only on partition assigned consumers.

Actual Behavior

When we have more consumers(with same group-id) than partitions, addAssignListener callback is triggering on every consumer(With empty partition list) even if the consumer is not assigned to a partition.

Steps to Reproduce

I have a topic with 1 partition and I'm running multiple consumers with same group-id. When I start first consumer instance addAssignListener callback is called with [topic-0] . Then I start a new instance and after rebalance let's say first instance is assigned again to partition, revoke listener and assign listener is called on the first instance as expected.

Previously(on version 1.3.13), assignListener callback wasn't triggering on 2nd instance since there is no partitions left to consume, however after version 1.3.15, it is triggering with an empty array.

 Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(props);

        receiverOptions.subscription(Collections.singleton(topic))
                .addAssignListener(partitions -> {
                    log.info("Partitions assigned <{}>.", partitions);
                   //...
                    log.info("Initialization completed for partitions <{}>.",
                            partitions);
                })
                .addRevokeListener(partitions -> {
                    log.info("onPartitionsRevoked to be executed on context {}", partitions);
                    //...
                    log.info("onPartitionsRevoked executed on context {}", partitions);
                });

        var kafkaReceiver = KafkaReceiver.create(receiverOptions);
        var flux = kafkaReceiver.receive();

        flux.subscribe(reactiveSubscriber);
        log.info("Pipeline started.");

Your Environment

  • Reactor version(s) used: 1.3.17 (It occured after version 1.3.15)
  • Other relevant libraries versions (eg. netty, ...):
  • JVM version (java -version): 17
  • OS and version (eg uname -a):

Metadata

Metadata

Assignees

No one assigned

    Labels

    ❓need-triageThis issue needs triage, hasn't been looked at by a team member yet

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions