Skip to content

Commit 9153a3f

Browse files
committed
Fix OverrideConnectorConfig inheritance with concurrency
Fixes #2766
1 parent 1eed9ae commit 9153a3f

File tree

5 files changed

+290
-15
lines changed

5 files changed

+290
-15
lines changed

smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import static org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory.INCOMING_PREFIX;
1313

1414
import java.nio.charset.StandardCharsets;
15-
import java.util.HashMap;
1615
import java.util.Map;
1716
import java.util.concurrent.CompletableFuture;
1817
import java.util.function.BiConsumer;
@@ -112,12 +111,12 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
112111
Vertx vertx,
113112
KafkaConsumer<?, ?> consumer,
114113
BiConsumer<Throwable, Boolean> reportFailure) {
115-
Map<String, Object> deadQueueProducerConfig = new HashMap<>(consumer.configuration());
116-
String keyDeserializer = (String) deadQueueProducerConfig.remove(KEY_DESERIALIZER_CLASS_CONFIG);
117-
String valueDeserializer = (String) deadQueueProducerConfig.remove(VALUE_DESERIALIZER_CLASS_CONFIG);
114+
// Get deserializers from the consumer configuration
115+
String keyDeserializer = (String) consumer.configuration().get(KEY_DESERIALIZER_CLASS_CONFIG);
116+
String valueDeserializer = (String) consumer.configuration().get(VALUE_DESERIALIZER_CLASS_CONFIG);
118117

119118
String consumerClientId = (String) consumer.configuration().get(CLIENT_ID_CONFIG);
120-
ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(),
119+
ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, config.config(),
121120
KafkaConnector.CONNECTOR_NAME, config.getChannel(), CHANNEL_DLQ_SUFFIX,
122121
Map.of(KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer),
123122
VALUE_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(valueDeserializer),

smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDelayedRetryTopic.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import java.time.Instant;
1919
import java.time.temporal.ChronoUnit;
2020
import java.util.Arrays;
21-
import java.util.HashMap;
2221
import java.util.HashSet;
2322
import java.util.List;
2423
import java.util.Map;
@@ -129,9 +128,8 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
129128
Vertx vertx,
130129
KafkaConsumer<?, ?> consumer,
131130
BiConsumer<Throwable, Boolean> reportFailure) {
132-
Map<String, Object> delayedRetryTopicProducerConfig = new HashMap<>(consumer.configuration());
133-
String keyDeserializer = (String) delayedRetryTopicProducerConfig.remove(KEY_DESERIALIZER_CLASS_CONFIG);
134-
String valueDeserializer = (String) delayedRetryTopicProducerConfig.remove(VALUE_DESERIALIZER_CLASS_CONFIG);
131+
String keyDeserializer = (String) consumer.configuration().get(KEY_DESERIALIZER_CLASS_CONFIG);
132+
String valueDeserializer = (String) consumer.configuration().get(VALUE_DESERIALIZER_CLASS_CONFIG);
135133

136134
List<String> retryTopics = config.getDelayedRetryTopicTopics()
137135
.map(topics -> Arrays.stream(topics.split(",")).collect(Collectors.toList()))
@@ -144,7 +142,7 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
144142
String deadQueueTopic = config.getDeadLetterQueueTopic().orElse(null);
145143

146144
String consumerClientId = (String) consumer.configuration().get(CLIENT_ID_CONFIG);
147-
ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(),
145+
ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, config.config(),
148146
KafkaConnector.CONNECTOR_NAME, config.getChannel(), CHANNEL_DLQ_SUFFIX,
149147
Map.of(KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer),
150148
VALUE_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(valueDeserializer),
@@ -165,7 +163,7 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
165163
wireOutgoingConnectorToUpstream(processor, kafkaSink.getSink(), subscriberDecorators,
166164
producerConfig.getChannel() + "-" + CHANNEL_DLQ_SUFFIX);
167165

168-
ConnectorConfig retryConsumerConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(),
166+
ConnectorConfig retryConsumerConfig = new OverrideConnectorConfig(INCOMING_PREFIX, config.config(),
169167
KafkaConnector.CONNECTOR_NAME, config.getChannel(), "delayed-retry-topic.consumer",
170168
Map.of("lazy-client", c -> true,
171169
CLIENT_ID_CONFIG, c -> "kafka-delayed-retry-topic-" + consumerClientId,

smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ConcurrentProcessorTest.java

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.time.Duration;
77
import java.util.List;
88
import java.util.Map;
9+
import java.util.Set;
910
import java.util.UUID;
1011
import java.util.concurrent.ConcurrentHashMap;
1112
import java.util.concurrent.CopyOnWriteArrayList;
@@ -27,6 +28,7 @@
2728
import io.smallrye.mutiny.Uni;
2829
import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase;
2930
import io.smallrye.reactive.messaging.kafka.converters.ConsumerRecordConverter;
31+
import io.smallrye.reactive.messaging.kafka.fault.KafkaDelayedRetryTopic;
3032
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
3133
import io.vertx.core.impl.cpu.CpuCoreSensor;
3234

@@ -125,6 +127,173 @@ public void testConcurrentStreamInjectingBean() {
125127
});
126128
}
127129

130+
@Test
131+
public void testConcurrentConsumerWithDLQ() {
132+
addBeans(ConsumerRecordConverter.class);
133+
companion.topics().createAndWait(topic, 3);
134+
135+
String dlqTopic = topic + "-dlq";
136+
137+
MapBasedConfig config = kafkaConfig("mp.messaging.incoming.data")
138+
.with("group.id", UUID.randomUUID().toString())
139+
.with("topic", topic)
140+
.with("concurrency", 3)
141+
.with("failure-strategy", "dead-letter-queue")
142+
.with("dead-letter-queue.topic", dlqTopic)
143+
.with("auto.offset.reset", "earliest")
144+
.with("value.deserializer", IntegerDeserializer.class.getName());
145+
146+
produceMessages();
147+
MyConsumerBeanWithFailures bean = runApplication(config, MyConsumerBeanWithFailures.class);
148+
149+
await().untilAsserted(() -> {
150+
assertThat(bean.getResults())
151+
.hasSizeGreaterThanOrEqualTo(10)
152+
.contains(1, 2, 4, 5, 7, 8, 10);
153+
});
154+
155+
// Verify messages 3, 6, 9 were sent to DLQ
156+
await().untilAsserted(() -> {
157+
List<Integer> dlqMessages = companion.consumeIntegers()
158+
.fromTopics(dlqTopic, 3)
159+
.awaitCompletion()
160+
.getRecords().stream()
161+
.map(r -> r.value())
162+
.toList();
163+
assertThat(dlqMessages).containsExactlyInAnyOrder(3, 6, 9);
164+
});
165+
}
166+
167+
@Test
168+
public void testConcurrentConsumerWithNestedDLQConfig() {
169+
addBeans(ConsumerRecordConverter.class);
170+
companion.topics().createAndWait(topic, 3);
171+
172+
String dlqTopicDefault = topic + "-dlq";
173+
String dlqTopicOverride = topic + "-dlq-override";
174+
175+
// Configure DLQ for base channel and override topic for one concurrent channel via nested config
176+
MapBasedConfig config = kafkaConfig("mp.messaging.incoming.data")
177+
.with("group.id", UUID.randomUUID().toString())
178+
.with("topic", topic)
179+
.with("concurrency", 3)
180+
.with("failure-strategy", "dead-letter-queue")
181+
.with("dead-letter-queue.topic", dlqTopicDefault)
182+
.with("auto.offset.reset", "earliest")
183+
.with("value.deserializer", IntegerDeserializer.class.getName())
184+
.withPrefix("")
185+
// Override DLQ topic for first concurrent channel to test nested config
186+
.with("mp.messaging.incoming.data$1.dead-letter-queue.topic", dlqTopicOverride);
187+
188+
produceMessages();
189+
MyConsumerBeanWithFailures bean = runApplication(config, MyConsumerBeanWithFailures.class);
190+
191+
await().untilAsserted(() -> {
192+
assertThat(bean.getResults())
193+
.hasSizeGreaterThanOrEqualTo(10)
194+
.contains(1, 2, 4, 5, 7, 8, 10);
195+
});
196+
197+
// Verify messages 3, 6, 9 were sent to DLQ topics
198+
// At least one should go to the override topic (proving nested config works)
199+
await().untilAsserted(() -> {
200+
var records = companion.consumeIntegers()
201+
.fromTopics(Set.of(dlqTopicDefault, dlqTopicOverride), 3)
202+
.awaitCompletion()
203+
.getRecords();
204+
205+
// Verify all 3 messages are in DLQ
206+
List<Integer> allDlqMessages = records.stream()
207+
.map(r -> r.value())
208+
.toList();
209+
assertThat(allDlqMessages).containsExactlyInAnyOrder(3, 6, 9);
210+
211+
// Verify that nested configuration was actually used by checking some messages went to override topic
212+
long overrideCount = records.stream()
213+
.filter(r -> dlqTopicOverride.equals(r.topic()))
214+
.count();
215+
assertThat(overrideCount)
216+
.as("Override DLQ topic should have received at least one message (proving nested config works)")
217+
.isGreaterThan(0);
218+
});
219+
}
220+
221+
@Test
222+
public void testConcurrentConsumerWithDelayedRetryTopic() {
223+
addBeans(ConsumerRecordConverter.class, KafkaDelayedRetryTopic.Factory.class);
224+
companion.topics().createAndWait(topic, 3);
225+
226+
String retryTopic1 = KafkaDelayedRetryTopic.getRetryTopic(topic, 1000);
227+
String retryTopic2 = KafkaDelayedRetryTopic.getRetryTopic(topic, 2000);
228+
String dlqTopic = topic + "-dlq";
229+
230+
// Configure delayed-retry-topic with concurrency
231+
// This test verifies that the retry topic producer inherits the main channel config
232+
// which was the issue in #2766
233+
MapBasedConfig config = kafkaConfig("mp.messaging.incoming.data")
234+
.with("group.id", UUID.randomUUID().toString())
235+
.with("topic", topic)
236+
.with("concurrency", 3)
237+
.with("failure-strategy", "delayed-retry-topic")
238+
.with("delayed-retry-topic.topics", retryTopic1 + "," + retryTopic2)
239+
.with("dead-letter-queue.topic", dlqTopic)
240+
.with("auto.offset.reset", "earliest")
241+
.with("value.deserializer", IntegerDeserializer.class.getName());
242+
243+
produceMessages();
244+
MyConsumerBeanWithFailures bean = runApplication(config, MyConsumerBeanWithFailures.class);
245+
246+
// All messages should be processed (successful ones plus retries)
247+
await().untilAsserted(() -> {
248+
assertThat(bean.getResults())
249+
.hasSizeGreaterThanOrEqualTo(10)
250+
.contains(1, 2, 4, 5, 7, 8, 10);
251+
});
252+
253+
// Verify messages 3, 6, 9 were sent to retry topics
254+
// This proves that the delayed retry topic producer was created successfully
255+
// with the correct configuration inherited from the main channel
256+
await().untilAsserted(() -> {
257+
List<Integer> retryMessages = companion.consumeIntegers()
258+
.fromTopics(Set.of(retryTopic1, retryTopic2), 6)
259+
.awaitCompletion()
260+
.getRecords().stream()
261+
.map(r -> r.value())
262+
.toList();
263+
assertThat(retryMessages).hasSizeGreaterThanOrEqualTo(3);
264+
assertThat(retryMessages).contains(3, 6, 9);
265+
});
266+
}
267+
268+
@Test
269+
public void testConcurrentConsumerWithDelayedRetryTopicAndCustomBootstrap() {
270+
addBeans(ConsumerRecordConverter.class, KafkaDelayedRetryTopic.Factory.class);
271+
companion.topics().createAndWait(topic, 3);
272+
273+
String retryTopic1 = KafkaDelayedRetryTopic.getRetryTopic(topic, 1000);
274+
String retryTopic2 = KafkaDelayedRetryTopic.getRetryTopic(topic, 2000);
275+
String dlqTopic = topic + "-dlq";
276+
277+
MapBasedConfig config = kafkaConfig("mp.messaging.incoming.data")
278+
.with("group.id", UUID.randomUUID().toString())
279+
.with("topic", topic)
280+
.with("concurrency", 3)
281+
.with("failure-strategy", "delayed-retry-topic")
282+
.with("delayed-retry-topic.topics", retryTopic1 + "," + retryTopic2)
283+
.with("dead-letter-queue.topic", dlqTopic)
284+
.with("auto.offset.reset", "earliest")
285+
.with("value.deserializer", IntegerDeserializer.class.getName())
286+
// Custom bootstrap.servers that MUST be inherited by DLQ producer and retry consumer
287+
.with("dead-letter-queue.bootstrap.servers", "localhost:1234")
288+
.with("dead-letter-queue.client.id", "dlq-producer-should-not-connect");
289+
290+
produceMessages();
291+
MyConsumerBeanWithFailures bean = runApplication(config, MyConsumerBeanWithFailures.class);
292+
293+
// All messages should be processed (successful ones plus retries)
294+
await().untilAsserted(() -> assertThat(bean.getResults()).hasSizeLessThan(10));
295+
}
296+
128297
@ApplicationScoped
129298
public static class MyConsumerBean {
130299

@@ -149,6 +318,31 @@ public Map<Thread, List<Integer>> getPerThread() {
149318
}
150319
}
151320

321+
@ApplicationScoped
322+
public static class MyConsumerBeanWithFailures {
323+
324+
private final List<Integer> list = new CopyOnWriteArrayList<>();
325+
326+
@Incoming("data")
327+
public Uni<Void> process(Message<Integer> message) {
328+
int value = message.getPayload();
329+
int next = value + 1;
330+
list.add(next);
331+
332+
// Nack messages where value is divisible by 3 (values 3, 6, 9)
333+
if (value != 0 && value % 3 == 0) {
334+
return Uni.createFrom().completionStage(message.nack(new IllegalArgumentException("nack " + value)));
335+
}
336+
337+
return Uni.createFrom().completionStage(message.ack())
338+
.onItem().delayIt().by(Duration.ofMillis(100));
339+
}
340+
341+
public List<Integer> getResults() {
342+
return list;
343+
}
344+
}
345+
152346
@ApplicationScoped
153347
public static class MyProcessorBean {
154348

smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/OverrideConnectorConfig.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public <T> Optional<T> getOriginalValue(String propertyName, Class<T> propertyTy
5757
public <T> T getValue(String propertyName, Class<T> propertyType) {
5858
if (nestedChannel != null) {
5959
// First check if the nestedChannel channel configuration contains the desired attribute.
60-
Optional<T> maybeResult = super.getOptionalValue(nestedChannelKey(propertyName), propertyType);
60+
Optional<T> maybeResult = getOptionalValueFromSuper(nestedChannelKey(propertyName), propertyType);
6161
if (maybeResult.isPresent()) {
6262
return maybeResult.get();
6363
}
@@ -74,14 +74,14 @@ public <T> T getValue(String propertyName, Class<T> propertyType) {
7474
}
7575
}
7676
}
77-
return super.getValue(propertyName, propertyType);
77+
return getValueFromSuper(propertyName, propertyType);
7878
}
7979

8080
@Override
8181
public <T> Optional<T> getOptionalValue(String propertyName, Class<T> propertyType) {
8282
if (nestedChannel != null) {
8383
// First check if the nestedChannel channel configuration contains the desired attribute.
84-
Optional<T> maybe = super.getOptionalValue(nestedChannelKey(propertyName), propertyType);
84+
Optional<T> maybe = getOptionalValueFromSuper(nestedChannelKey(propertyName), propertyType);
8585
if (maybe.isPresent()) {
8686
return maybe;
8787
}
@@ -99,7 +99,7 @@ public <T> Optional<T> getOptionalValue(String propertyName, Class<T> propertyTy
9999
return Optional.empty();
100100
}
101101
}
102-
return super.getOptionalValue(propertyName, propertyType);
102+
return getOptionalValueFromSuper(propertyName, propertyType);
103103
}
104104

105105
/**
@@ -178,6 +178,8 @@ public Iterable<String> getPropertyNames() {
178178
if (nameExists(prefix + computed)) {
179179
names.add(computed);
180180
}
181+
} else if (overall instanceof ConnectorConfig) {
182+
names.add(name);
181183
}
182184
}
183185

@@ -186,4 +188,14 @@ public Iterable<String> getPropertyNames() {
186188
return names;
187189
}
188190

191+
private <T> Optional<T> getOptionalValueFromSuper(String propertyName, Class<T> propertyType) {
192+
return overall instanceof ConnectorConfig ? overall.getOptionalValue(propertyName, propertyType)
193+
: super.getOptionalValue(propertyName, propertyType);
194+
}
195+
196+
private <T> T getValueFromSuper(String propertyName, Class<T> propertyType) {
197+
return overall instanceof ConnectorConfig ? overall.getValue(propertyName, propertyType)
198+
: super.getValue(propertyName, propertyType);
199+
}
200+
189201
}

0 commit comments

Comments
 (0)