Skip to content

Commit be8cfb4

Browse files
authored
Merge pull request #1131 from smallrye/backport-#1127-to-3.x
[3.x] Backport Improve Kafka source health check to handle the lazy consumption case
2 parents 7ed7821 + fc9cef1 commit be8cfb4

File tree

3 files changed

+76
-9
lines changed

3 files changed

+76
-9
lines changed

smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/health/KafkaSourceReadinessHealth.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.smallrye.reactive.messaging.health.HealthReport;
1717
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
1818
import io.smallrye.reactive.messaging.kafka.impl.KafkaAdminHelper;
19+
import io.smallrye.reactive.messaging.kafka.impl.KafkaSource;
1920
import io.vertx.mutiny.core.Vertx;
2021
import io.vertx.mutiny.kafka.admin.KafkaAdminClient;
2122

@@ -27,20 +28,22 @@ public class KafkaSourceReadinessHealth extends BaseHealth {
2728
private final String channel;
2829
private final Set<String> topics;
2930
private final Metric metric;
31+
private final KafkaSource<?, ?> source;
3032

31-
public KafkaSourceReadinessHealth(Vertx vertx, KafkaConnectorIncomingConfiguration config,
33+
public KafkaSourceReadinessHealth(KafkaSource<?, ?> source, Vertx vertx, KafkaConnectorIncomingConfiguration config,
3234
Map<String, String> kafkaConfiguration, Consumer<?, ?> consumer, Set<String> topics, Pattern pattern) {
3335
super(config.getChannel());
3436
this.config = config;
3537
this.channel = config.getChannel();
3638
this.topics = topics;
3739
this.pattern = pattern;
38-
40+
this.source = source;
3941
if (config.getHealthReadinessTopicVerification()) {
4042
// Do not create the client if the readiness health checks are disabled
4143
Map<String, Object> adminConfiguration = new HashMap<>(kafkaConfiguration);
4244
this.admin = KafkaAdminHelper.createAdminClient(vertx, adminConfiguration, config.getChannel(), true);
4345
this.metric = null;
46+
4447
} else {
4548
this.admin = null;
4649
Map<MetricName, ? extends Metric> metrics = consumer.metrics();
@@ -61,7 +64,15 @@ public void close() {
6164

6265
protected void metricsBasedHealthCheck(HealthReport.HealthReportBuilder builder) {
6366
if (metric != null) {
64-
builder.add(channel, (double) metric.metricValue() >= 1.0);
67+
boolean connected = (double) metric.metricValue() >= 1.0;
68+
boolean hasSubscribers = source.hasSubscribers();
69+
if (connected) {
70+
builder.add(channel, true);
71+
} else if (!hasSubscribers) {
72+
builder.add(channel, true, "no subscription yet, so no connection to the Kafka broker yet");
73+
} else {
74+
builder.add(channel, false);
75+
}
6576
} else {
6677
builder.add(channel, true).build();
6778
}

smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public class KafkaSource<K, V> {
5353
private final boolean isHealthEnabled;
5454
private final boolean isCloudEventEnabled;
5555
private final String channel;
56+
private volatile boolean subscribed;
5657
private final KafkaSourceReadinessHealth health;
5758

5859
public KafkaSource(Vertx vertx,
@@ -150,7 +151,7 @@ className, false, getDeserializationHandler(false, deserializationFailureHandler
150151
failureHandler = createFailureHandler(config, vertx, kafkaConfiguration, kafkaCDIEvents);
151152
this.consumer = kafkaConsumer;
152153
if (configuration.getHealthEnabled() && configuration.getHealthReadinessEnabled()) {
153-
health = new KafkaSourceReadinessHealth(vertx, configuration, kafkaConfiguration,
154+
health = new KafkaSourceReadinessHealth(this, vertx, configuration, kafkaConfiguration,
154155
consumer.getDelegate().unwrap(), topics, pattern);
155156
} else {
156157
health = null;
@@ -189,13 +190,15 @@ className, false, getDeserializationHandler(false, deserializationFailureHandler
189190
}
190191

191192
Multi<IncomingKafkaRecord<K, V>> incomingMulti = multi
193+
.onCancellation().invoke(() -> subscribed = false)
192194
.onSubscribe().call(s -> {
193195
this.consumer.exceptionHandler(t -> reportFailure(t, false));
194196
if (this.pattern != null) {
195197
BiConsumer<UniEmitter<?>, AsyncResult<Void>> completionHandler = (e, ar) -> {
196198
if (ar.failed()) {
197199
e.fail(ar.cause());
198200
} else {
201+
subscribed = true;
199202
e.complete(null);
200203
}
201204
};
@@ -206,7 +209,8 @@ className, false, getDeserializationHandler(false, deserializationFailureHandler
206209
delegate.subscribe(pattern, ar -> completionHandler.accept(e, ar));
207210
});
208211
} else {
209-
return this.consumer.subscribe(topics);
212+
return this.consumer.subscribe(topics)
213+
.onItem().invoke(() -> subscribed = true);
210214
}
211215
})
212216
.map(rec -> commitHandler
@@ -450,4 +454,8 @@ public void isReady(HealthReport.HealthReportBuilder builder) {
450454
public KafkaConsumer<K, V> getConsumer() {
451455
return this.consumer;
452456
}
457+
458+
public boolean hasSubscribers() {
459+
return subscribed;
460+
}
453461
}

smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/health/HealthCheckTest.java

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,20 @@
33
import static org.assertj.core.api.Assertions.assertThat;
44
import static org.awaitility.Awaitility.await;
55

6+
import java.time.Duration;
67
import java.util.concurrent.CountDownLatch;
78
import java.util.concurrent.TimeUnit;
89
import java.util.concurrent.atomic.AtomicInteger;
910

1011
import javax.enterprise.context.ApplicationScoped;
1112
import javax.inject.Inject;
1213

14+
import org.apache.kafka.clients.consumer.ConsumerConfig;
15+
import org.apache.kafka.clients.producer.ProducerRecord;
16+
import org.apache.kafka.common.serialization.IntegerDeserializer;
1317
import org.apache.kafka.common.serialization.IntegerSerializer;
1418
import org.eclipse.microprofile.config.inject.ConfigProperty;
15-
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
16-
import org.eclipse.microprofile.reactive.messaging.Incoming;
17-
import org.eclipse.microprofile.reactive.messaging.Message;
18-
import org.eclipse.microprofile.reactive.messaging.Outgoing;
19+
import org.eclipse.microprofile.reactive.messaging.*;
1920
import org.junit.jupiter.api.Test;
2021
import org.reactivestreams.Publisher;
2122

@@ -87,6 +88,41 @@ private KafkaMapBasedConfig getKafkaSinkConfigForProducingBean() {
8788
return builder.build();
8889
}
8990

91+
private KafkaMapBasedConfig getKafkaSourceConfig(String topic) {
92+
KafkaMapBasedConfig.Builder builder = KafkaMapBasedConfig.builder("mp.messaging.incoming.input")
93+
.put("value.deserializer", IntegerDeserializer.class.getName())
94+
.put("topic", topic)
95+
.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
96+
return builder.build();
97+
}
98+
99+
@Test
100+
public void testHealthOfApplicationWithChannel() {
101+
KafkaMapBasedConfig config = getKafkaSourceConfig(topic);
102+
LazyConsumingBean bean = runApplication(config, LazyConsumingBean.class);
103+
104+
AtomicInteger expected = new AtomicInteger(0);
105+
usage.produceIntegers(10, null,
106+
() -> new ProducerRecord<>(topic, "key", expected.getAndIncrement()));
107+
108+
await().until(this::isReady);
109+
await().until(this::isAlive);
110+
111+
Multi<Integer> channel = bean.getChannel();
112+
channel
113+
.select().first(10)
114+
.collect().asList()
115+
.await().atMost(Duration.ofSeconds(10));
116+
117+
HealthReport liveness = getHealth().getLiveness();
118+
HealthReport readiness = getHealth().getReadiness();
119+
120+
assertThat(liveness.isOk()).isTrue();
121+
assertThat(readiness.isOk()).isTrue();
122+
assertThat(liveness.getChannels()).hasSize(1);
123+
assertThat(readiness.getChannels()).hasSize(1);
124+
}
125+
90126
@ApplicationScoped
91127
public static class ProducingBean {
92128

@@ -109,4 +145,16 @@ public Publisher<Integer> source() {
109145

110146
}
111147

148+
@ApplicationScoped
149+
public static class LazyConsumingBean {
150+
151+
@Inject
152+
@Channel("input")
153+
Multi<Integer> channel;
154+
155+
public Multi<Integer> getChannel() {
156+
return channel;
157+
}
158+
}
159+
112160
}

0 commit comments

Comments
 (0)