Skip to content

Reactive kafka consumer doesn't pause on calling the consumer.pause() method #375

@ghost

Description

I am trying to implement a circuit breaker with resilience 4j and I need my reactive kafka consumer to pause/resume based on circuit breaker events.

Code flow

There is a reactive kafka receiver that receives records and in turn calls a downstream API. The circuit breaker is implemented such that when the downstream API fails, the circuit state becomes open. Once this happens, we are trying to pause the kafka consumer using the below code snippet:

case CLOSED_TO_FORCED_OPEN:
                            log.warn("Circuit state : OPEN");
                            reactiveKafkaReceiver.doOnConsumer(consumer -> {
                                log.warn("Inside doOnConsumer");
                                consumer.pause(consumer.assignment());
                                return consumer;
                            }).doOnSuccess(success -> {
                                log.info("Successful pause {}", success.paused());
                                    }).doOnError(e -> log.error("Error in pausing", e))
                                    .subscribe();
                            break;

The log "Successful pause" with the correct partitions is getting printed but the consumer is not pausing.

Expected Behavior

The consumer should be paused until it is resumed manually using consumer.resume(consumer.assignment())

Actual Behavior

Either one of 2 things is happening:

  1. Consumer is not getting paused at all
  2. Consumer is getting paused but it is again being resumed automatically

Steps to Reproduce

Wrap a circuit breaker on top of the processRecord() method and add the pause functionality on circuit breaker state transition from closed to open

reactiveKafkaReceiverRecord
                .doOnNext(record -> {
                    log.info("Reading offset {}, partition:{}, message:{}", record.offset(),record.partition(), record.value());
                })
                .concatMap(r -> this.processRecord(r)
                        .doOnSuccess(event -> r.receiverOffset().acknowledge())
                        .doOnError(error -> log.error("Error processing consumer events: {} ", error.getMessage()))
                        .retryWhen(Retry.backoff(3, Duration.ofSeconds(2)).transientErrors(true))
                        .onErrorResume(error -> {
                            log.error("Retries exhausted for " + r.offset());
                            return Mono.empty();
                        }))
                .repeat()
                .subscribe();

Your Environment

  • Reactor version(s) used: 1.3.19
  • JVM version (java -version): 17

Metadata

Metadata

Assignees

No one assigned

    Labels

    ❓need-triageThis issue needs triage, hasn't been looked at by a team member yet

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions