Skip to content

Commit 8feded1

Browse files
committed
Retrieve kafka config in places where Override connector config is used
1 parent ed964b8 commit 8feded1

File tree

4 files changed

+28
-7
lines changed

4 files changed

+28
-7
lines changed

smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import io.smallrye.reactive.messaging.kafka.KafkaProducer;
3737
import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler;
3838
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
39+
import io.smallrye.reactive.messaging.kafka.impl.ConfigHelper;
3940
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer;
4041
import io.smallrye.reactive.messaging.providers.impl.ConnectorConfig;
4142
import io.smallrye.reactive.messaging.providers.impl.OverrideConnectorConfig;
@@ -84,6 +85,10 @@ public static class Factory implements KafkaFailureHandler.Factory {
8485
@Inject
8586
Instance<Config> rootConfig;
8687

88+
@Inject
89+
@Any
90+
Instance<Map<String, Object>> configurations;
91+
8792
@Override
8893
public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
8994
Vertx vertx,
@@ -104,7 +109,8 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
104109
"key-serialization-failure-handler", c -> "dlq-serialization",
105110
"value-serialization-failure-handler", c -> "dlq-serialization",
106111
INTERCEPTOR_CLASSES_CONFIG, c -> ""));
107-
KafkaConnectorOutgoingConfiguration producerConfig = new KafkaConnectorOutgoingConfiguration(connectorConfig);
112+
Config kafkaConfig = ConfigHelper.retrieveChannelConfiguration(configurations, connectorConfig);
113+
KafkaConnectorOutgoingConfiguration producerConfig = new KafkaConnectorOutgoingConfiguration(kafkaConfig);
108114

109115
String deadQueueTopic = config.getDeadLetterQueueTopic().orElse("dead-letter-topic-" + config.getChannel());
110116

smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDelayedRetryTopic.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
5555
import io.smallrye.reactive.messaging.kafka.commit.ContextHolder;
5656
import io.smallrye.reactive.messaging.kafka.commit.KafkaLatestCommit;
57+
import io.smallrye.reactive.messaging.kafka.impl.ConfigHelper;
5758
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer;
5859
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer;
5960
import io.smallrye.reactive.messaging.providers.impl.ConnectorConfig;
@@ -101,6 +102,10 @@ public static class Factory implements KafkaFailureHandler.Factory {
101102
@Inject
102103
Instance<Config> rootConfig;
103104

105+
@Inject
106+
@Any
107+
Instance<Map<String, Object>> configurations;
108+
104109
@Override
105110
public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
106111
Vertx vertx,
@@ -131,7 +136,8 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
131136
"key-serialization-failure-handler", c -> "dlq-serialization",
132137
"value-serialization-failure-handler", c -> "dlq-serialization",
133138
INTERCEPTOR_CLASSES_CONFIG, c -> ""));
134-
KafkaConnectorOutgoingConfiguration producerConfig = new KafkaConnectorOutgoingConfiguration(connectorConfig);
139+
Config kafkaConfig = ConfigHelper.retrieveChannelConfiguration(configurations, connectorConfig);
140+
KafkaConnectorOutgoingConfiguration producerConfig = new KafkaConnectorOutgoingConfiguration(kafkaConfig);
135141

136142
log.delayedRetryTopic(config.getChannel(), retryTopics, maxRetries, retryTimeout, deadQueueTopic);
137143

@@ -144,7 +150,8 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
144150
"lazy-client", c -> true,
145151
CLIENT_ID_CONFIG, c -> "kafka-delayed-retry-topic-" + consumerClientId,
146152
GROUP_ID_CONFIG, c -> "kafka-delayed-retry-topic-" + consumerClientId));
147-
KafkaConnectorIncomingConfiguration retryConfig = new KafkaConnectorIncomingConfiguration(retryConsumerConfig);
153+
Config retryKafkaConfig = ConfigHelper.retrieveChannelConfiguration(configurations, retryConsumerConfig);
154+
KafkaConnectorIncomingConfiguration retryConfig = new KafkaConnectorIncomingConfiguration(retryKafkaConfig);
148155
ReactiveKafkaConsumer<Object, Object> retryConsumer = new ReactiveKafkaConsumer<>(retryConfig,
149156
deserializationFailureHandlers,
150157
retryConsumerConfig.getValue(GROUP_ID_CONFIG, String.class), -1,

smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.lang.reflect.ParameterizedType;
44
import java.lang.reflect.Type;
5+
import java.util.Map;
56

67
import jakarta.enterprise.context.ApplicationScoped;
78
import jakarta.enterprise.inject.Any;
@@ -70,9 +71,13 @@ public class KafkaRequestReplyFactory implements EmitterFactory<KafkaRequestRepl
7071
@Inject
7172
Instance<Config> config;
7273

74+
@Inject
75+
@Any
76+
Instance<Map<String, Object>> configurations;
77+
7378
@Override
7479
public KafkaRequestReplyImpl<Object, Object> createEmitter(EmitterConfiguration configuration, long defaultBufferSize) {
75-
return new KafkaRequestReplyImpl<>(configuration, defaultBufferSize, config.get(), holder.vertx(),
80+
return new KafkaRequestReplyImpl<>(configuration, defaultBufferSize, config.get(), configurations, holder.vertx(),
7681
kafkaCDIEvents, commitStrategyFactories, failureStrategyFactories, failureHandlers,
7782
correlationIdHandlers, replyFailureHandlers, rebalanceListeners);
7883
}

smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
4444
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
4545
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
46+
import io.smallrye.reactive.messaging.kafka.impl.ConfigHelper;
4647
import io.smallrye.reactive.messaging.kafka.impl.KafkaSource;
4748
import io.smallrye.reactive.messaging.kafka.impl.TopicPartitions;
4849
import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterImpl;
@@ -75,7 +76,8 @@ public class KafkaRequestReplyImpl<Req, Rep> extends MutinyEmitterImpl<Req>
7576

7677
public KafkaRequestReplyImpl(EmitterConfiguration config,
7778
long defaultBufferSize,
78-
Config rootConfig,
79+
Config channelConfiguration,
80+
Instance<Map<String, Object>> configurations,
7981
Vertx vertx,
8082
KafkaCDIEvents kafkaCDIEvents,
8183
Instance<KafkaCommitHandler.Factory> commitHandlerFactory,
@@ -86,12 +88,13 @@ public KafkaRequestReplyImpl(EmitterConfiguration config,
8688
Instance<KafkaConsumerRebalanceListener> rebalanceListeners) {
8789
super(config, defaultBufferSize);
8890
this.channel = config.name();
89-
ConnectorConfig connectorConfig = new OverrideConnectorConfig(OUTGOING_PREFIX, rootConfig, channel,
91+
ConnectorConfig connectorConfig = new OverrideConnectorConfig(OUTGOING_PREFIX, channelConfiguration, channel,
9092
"reply", Map.of(
9193
"topic", c -> c.getOriginalValue("topic", String.class).orElse(channel) + DEFAULT_REPLIES_TOPIC_SUFFIX,
9294
"assign-seek",
9395
c -> c.getOriginalValue(REPLY_PARTITION_KEY, Integer.class).map(String::valueOf).orElse(null)));
94-
KafkaConnectorIncomingConfiguration consumerConfig = new KafkaConnectorIncomingConfiguration(connectorConfig);
96+
Config replyKafkaConfig = ConfigHelper.retrieveChannelConfiguration(configurations, connectorConfig);
97+
KafkaConnectorIncomingConfiguration consumerConfig = new KafkaConnectorIncomingConfiguration(replyKafkaConfig);
9598
this.replyTopic = consumerConfig.getTopic().orElse(null);
9699
this.replyPartition = connectorConfig.getOptionalValue(REPLY_PARTITION_KEY, Integer.class).orElse(-1);
97100
this.replyTimeout = Duration.ofMillis(connectorConfig.getOptionalValue(REPLY_TIMEOUT_KEY, Integer.class).orElse(5000));

0 commit comments

Comments
 (0)