Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import static org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory.INCOMING_PREFIX;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -112,12 +111,12 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
Vertx vertx,
KafkaConsumer<?, ?> consumer,
BiConsumer<Throwable, Boolean> reportFailure) {
Map<String, Object> deadQueueProducerConfig = new HashMap<>(consumer.configuration());
String keyDeserializer = (String) deadQueueProducerConfig.remove(KEY_DESERIALIZER_CLASS_CONFIG);
String valueDeserializer = (String) deadQueueProducerConfig.remove(VALUE_DESERIALIZER_CLASS_CONFIG);
// Get deserializers from the consumer configuration
String keyDeserializer = (String) consumer.configuration().get(KEY_DESERIALIZER_CLASS_CONFIG);
String valueDeserializer = (String) consumer.configuration().get(VALUE_DESERIALIZER_CLASS_CONFIG);

String consumerClientId = (String) consumer.configuration().get(CLIENT_ID_CONFIG);
ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(),
ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, config.config(),
KafkaConnector.CONNECTOR_NAME, config.getChannel(), CHANNEL_DLQ_SUFFIX,
Map.of(KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer),
VALUE_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(valueDeserializer),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -129,9 +128,8 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
Vertx vertx,
KafkaConsumer<?, ?> consumer,
BiConsumer<Throwable, Boolean> reportFailure) {
Map<String, Object> delayedRetryTopicProducerConfig = new HashMap<>(consumer.configuration());
String keyDeserializer = (String) delayedRetryTopicProducerConfig.remove(KEY_DESERIALIZER_CLASS_CONFIG);
String valueDeserializer = (String) delayedRetryTopicProducerConfig.remove(VALUE_DESERIALIZER_CLASS_CONFIG);
String keyDeserializer = (String) consumer.configuration().get(KEY_DESERIALIZER_CLASS_CONFIG);
String valueDeserializer = (String) consumer.configuration().get(VALUE_DESERIALIZER_CLASS_CONFIG);

List<String> retryTopics = config.getDelayedRetryTopicTopics()
.map(topics -> Arrays.stream(topics.split(",")).collect(Collectors.toList()))
Expand All @@ -144,7 +142,7 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
String deadQueueTopic = config.getDeadLetterQueueTopic().orElse(null);

String consumerClientId = (String) consumer.configuration().get(CLIENT_ID_CONFIG);
ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(),
ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, config.config(),
KafkaConnector.CONNECTOR_NAME, config.getChannel(), CHANNEL_DLQ_SUFFIX,
Map.of(KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer),
VALUE_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(valueDeserializer),
Expand All @@ -165,7 +163,7 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
wireOutgoingConnectorToUpstream(processor, kafkaSink.getSink(), subscriberDecorators,
producerConfig.getChannel() + "-" + CHANNEL_DLQ_SUFFIX);

ConnectorConfig retryConsumerConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(),
ConnectorConfig retryConsumerConfig = new OverrideConnectorConfig(INCOMING_PREFIX, config.config(),
KafkaConnector.CONNECTOR_NAME, config.getChannel(), "delayed-retry-topic.consumer",
Map.of("lazy-client", c -> true,
CLIENT_ID_CONFIG, c -> "kafka-delayed-retry-topic-" + consumerClientId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
Expand All @@ -27,6 +28,7 @@
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase;
import io.smallrye.reactive.messaging.kafka.converters.ConsumerRecordConverter;
import io.smallrye.reactive.messaging.kafka.fault.KafkaDelayedRetryTopic;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import io.vertx.core.impl.cpu.CpuCoreSensor;

Expand Down Expand Up @@ -125,6 +127,173 @@ public void testConcurrentStreamInjectingBean() {
});
}

@Test
public void testConcurrentConsumerWithDLQ() {
addBeans(ConsumerRecordConverter.class);
companion.topics().createAndWait(topic, 3);

String dlqTopic = topic + "-dlq";

MapBasedConfig config = kafkaConfig("mp.messaging.incoming.data")
.with("group.id", UUID.randomUUID().toString())
.with("topic", topic)
.with("concurrency", 3)
.with("failure-strategy", "dead-letter-queue")
.with("dead-letter-queue.topic", dlqTopic)
.with("auto.offset.reset", "earliest")
.with("value.deserializer", IntegerDeserializer.class.getName());

produceMessages();
MyConsumerBeanWithFailures bean = runApplication(config, MyConsumerBeanWithFailures.class);

await().untilAsserted(() -> {
assertThat(bean.getResults())
.hasSizeGreaterThanOrEqualTo(10)
.contains(1, 2, 4, 5, 7, 8, 10);
});

// Verify messages 3, 6, 9 were sent to DLQ
await().untilAsserted(() -> {
List<Integer> dlqMessages = companion.consumeIntegers()
.fromTopics(dlqTopic, 3)
.awaitCompletion()
.getRecords().stream()
.map(r -> r.value())
.toList();
assertThat(dlqMessages).containsExactlyInAnyOrder(3, 6, 9);
});
}

@Test
public void testConcurrentConsumerWithNestedDLQConfig() {
addBeans(ConsumerRecordConverter.class);
companion.topics().createAndWait(topic, 3);

String dlqTopicDefault = topic + "-dlq";
String dlqTopicOverride = topic + "-dlq-override";

// Configure DLQ for base channel and override topic for one concurrent channel via nested config
MapBasedConfig config = kafkaConfig("mp.messaging.incoming.data")
.with("group.id", UUID.randomUUID().toString())
.with("topic", topic)
.with("concurrency", 3)
.with("failure-strategy", "dead-letter-queue")
.with("dead-letter-queue.topic", dlqTopicDefault)
.with("auto.offset.reset", "earliest")
.with("value.deserializer", IntegerDeserializer.class.getName())
.withPrefix("")
// Override DLQ topic for first concurrent channel to test nested config
.with("mp.messaging.incoming.data$1.dead-letter-queue.topic", dlqTopicOverride);

produceMessages();
MyConsumerBeanWithFailures bean = runApplication(config, MyConsumerBeanWithFailures.class);

await().untilAsserted(() -> {
assertThat(bean.getResults())
.hasSizeGreaterThanOrEqualTo(10)
.contains(1, 2, 4, 5, 7, 8, 10);
});

// Verify messages 3, 6, 9 were sent to DLQ topics
// At least one should go to the override topic (proving nested config works)
await().untilAsserted(() -> {
var records = companion.consumeIntegers()
.fromTopics(Set.of(dlqTopicDefault, dlqTopicOverride), 3)
.awaitCompletion()
.getRecords();

// Verify all 3 messages are in DLQ
List<Integer> allDlqMessages = records.stream()
.map(r -> r.value())
.toList();
assertThat(allDlqMessages).containsExactlyInAnyOrder(3, 6, 9);

// Verify that nested configuration was actually used by checking some messages went to override topic
long overrideCount = records.stream()
.filter(r -> dlqTopicOverride.equals(r.topic()))
.count();
assertThat(overrideCount)
.as("Override DLQ topic should have received at least one message (proving nested config works)")
.isGreaterThan(0);
});
}

@Test
public void testConcurrentConsumerWithDelayedRetryTopic() {
addBeans(ConsumerRecordConverter.class, KafkaDelayedRetryTopic.Factory.class);
companion.topics().createAndWait(topic, 3);

String retryTopic1 = KafkaDelayedRetryTopic.getRetryTopic(topic, 1000);
String retryTopic2 = KafkaDelayedRetryTopic.getRetryTopic(topic, 2000);
String dlqTopic = topic + "-dlq";

// Configure delayed-retry-topic with concurrency
// This test verifies that the retry topic producer inherits the main channel config
// which was the issue in #2766
MapBasedConfig config = kafkaConfig("mp.messaging.incoming.data")
.with("group.id", UUID.randomUUID().toString())
.with("topic", topic)
.with("concurrency", 3)
.with("failure-strategy", "delayed-retry-topic")
.with("delayed-retry-topic.topics", retryTopic1 + "," + retryTopic2)
.with("dead-letter-queue.topic", dlqTopic)
.with("auto.offset.reset", "earliest")
.with("value.deserializer", IntegerDeserializer.class.getName());

produceMessages();
MyConsumerBeanWithFailures bean = runApplication(config, MyConsumerBeanWithFailures.class);

// All messages should be processed (successful ones plus retries)
await().untilAsserted(() -> {
assertThat(bean.getResults())
.hasSizeGreaterThanOrEqualTo(10)
.contains(1, 2, 4, 5, 7, 8, 10);
});

// Verify messages 3, 6, 9 were sent to retry topics
// This proves that the delayed retry topic producer was created successfully
// with the correct configuration inherited from the main channel
await().untilAsserted(() -> {
List<Integer> retryMessages = companion.consumeIntegers()
.fromTopics(Set.of(retryTopic1, retryTopic2), 6)
.awaitCompletion()
.getRecords().stream()
.map(r -> r.value())
.toList();
assertThat(retryMessages).hasSizeGreaterThanOrEqualTo(3);
assertThat(retryMessages).contains(3, 6, 9);
});
}

@Test
public void testConcurrentConsumerWithDelayedRetryTopicAndCustomBootstrap() {
addBeans(ConsumerRecordConverter.class, KafkaDelayedRetryTopic.Factory.class);
companion.topics().createAndWait(topic, 3);

String retryTopic1 = KafkaDelayedRetryTopic.getRetryTopic(topic, 1000);
String retryTopic2 = KafkaDelayedRetryTopic.getRetryTopic(topic, 2000);
String dlqTopic = topic + "-dlq";

MapBasedConfig config = kafkaConfig("mp.messaging.incoming.data")
.with("group.id", UUID.randomUUID().toString())
.with("topic", topic)
.with("concurrency", 3)
.with("failure-strategy", "delayed-retry-topic")
.with("delayed-retry-topic.topics", retryTopic1 + "," + retryTopic2)
.with("dead-letter-queue.topic", dlqTopic)
.with("auto.offset.reset", "earliest")
.with("value.deserializer", IntegerDeserializer.class.getName())
// Custom bootstrap.servers that MUST be inherited by DLQ producer and retry consumer
.with("dead-letter-queue.bootstrap.servers", "localhost:1234")
.with("dead-letter-queue.client.id", "dlq-producer-should-not-connect");

produceMessages();
MyConsumerBeanWithFailures bean = runApplication(config, MyConsumerBeanWithFailures.class);

// All messages should be processed (successful ones plus retries)
await().untilAsserted(() -> assertThat(bean.getResults()).hasSizeLessThan(10));
}

@ApplicationScoped
public static class MyConsumerBean {

Expand All @@ -149,6 +318,31 @@ public Map<Thread, List<Integer>> getPerThread() {
}
}

@ApplicationScoped
public static class MyConsumerBeanWithFailures {

private final List<Integer> list = new CopyOnWriteArrayList<>();

@Incoming("data")
public Uni<Void> process(Message<Integer> message) {
int value = message.getPayload();
int next = value + 1;
list.add(next);

// Nack messages where value is divisible by 3 (values 3, 6, 9)
if (value != 0 && value % 3 == 0) {
return Uni.createFrom().completionStage(message.nack(new IllegalArgumentException("nack " + value)));
}

return Uni.createFrom().completionStage(message.ack())
.onItem().delayIt().by(Duration.ofMillis(100));
}

public List<Integer> getResults() {
return list;
}
}

@ApplicationScoped
public static class MyProcessorBean {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public <T> Optional<T> getOriginalValue(String propertyName, Class<T> propertyTy
public <T> T getValue(String propertyName, Class<T> propertyType) {
if (nestedChannel != null) {
// First check if the nestedChannel channel configuration contains the desired attribute.
Optional<T> maybeResult = super.getOptionalValue(nestedChannelKey(propertyName), propertyType);
Optional<T> maybeResult = getOptionalValueFromSuper(nestedChannelKey(propertyName), propertyType);
if (maybeResult.isPresent()) {
return maybeResult.get();
}
Expand All @@ -74,14 +74,14 @@ public <T> T getValue(String propertyName, Class<T> propertyType) {
}
}
}
return super.getValue(propertyName, propertyType);
return getValueFromSuper(propertyName, propertyType);
}

@Override
public <T> Optional<T> getOptionalValue(String propertyName, Class<T> propertyType) {
if (nestedChannel != null) {
// First check if the nestedChannel channel configuration contains the desired attribute.
Optional<T> maybe = super.getOptionalValue(nestedChannelKey(propertyName), propertyType);
Optional<T> maybe = getOptionalValueFromSuper(nestedChannelKey(propertyName), propertyType);
if (maybe.isPresent()) {
return maybe;
}
Expand All @@ -99,7 +99,7 @@ public <T> Optional<T> getOptionalValue(String propertyName, Class<T> propertyTy
return Optional.empty();
}
}
return super.getOptionalValue(propertyName, propertyType);
return getOptionalValueFromSuper(propertyName, propertyType);
}

/**
Expand Down Expand Up @@ -178,6 +178,8 @@ public Iterable<String> getPropertyNames() {
if (nameExists(prefix + computed)) {
names.add(computed);
}
} else if (overall instanceof ConnectorConfig) {
names.add(name);
}
}

Expand All @@ -186,4 +188,14 @@ public Iterable<String> getPropertyNames() {
return names;
}

private <T> Optional<T> getOptionalValueFromSuper(String propertyName, Class<T> propertyType) {
return overall instanceof ConnectorConfig ? overall.getOptionalValue(propertyName, propertyType)
: super.getOptionalValue(propertyName, propertyType);
}

private <T> T getValueFromSuper(String propertyName, Class<T> propertyType) {
return overall instanceof ConnectorConfig ? overall.getValue(propertyName, propertyType)
: super.getValue(propertyName, propertyType);
}

}
Loading