Skip to content

Commit caf9622

Browse files
ozangunalpcescoffier
authored andcommitted
Fix Pulsar log messages to include channel-specific configuration
Fixes #3154
1 parent f629147 commit caf9622

File tree

3 files changed

+12
-2
lines changed

3 files changed

+12
-2
lines changed

smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/ConfigResolver.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import org.apache.pulsar.client.api.RedeliveryBackoff;
2525
import org.apache.pulsar.client.api.SubscriptionType;
2626
import org.apache.pulsar.client.impl.ClientBuilderImpl;
27+
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
2728
import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff;
29+
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
2830
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
2931
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
3032
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -356,4 +358,12 @@ private static boolean allCaps(String key) {
356358
return key.toUpperCase().equals(key);
357359
}
358360

361+
public ConsumerConfigurationData<?> getConfig(ConsumerBuilder<?> consumerBuilder) {
362+
return ((ConsumerBuilderImpl<?>) consumerBuilder).getConf();
363+
}
364+
365+
public ProducerConfigurationData getConfig(ProducerBuilder<?> producerBuilder) {
366+
return ((ProducerBuilderImpl<?>) producerBuilder).getConf();
367+
}
368+
359369
}

smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema<T> schema,
7575
}
7676
ConsumerBuilder<T> builder = configResolver.configure(client.newConsumer(schema), ic, conf);
7777
this.consumer = builder.subscribe();
78-
log.createdConsumerWithConfig(channel, SchemaResolver.getSchemaName(schema), conf);
78+
log.createdConsumerWithConfig(channel, SchemaResolver.getSchemaName(schema), configResolver.getConfig(builder));
7979
this.ackHandler = ackHandlerFactory.create(consumer, ic);
8080
this.failureHandler = failureHandlerFactory.create(consumer, ic, this::reportFailure);
8181
this.context = ((VertxInternal) vertx.getDelegate()).createEventLoopContext();

smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarOutgoingChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public PulsarOutgoingChannel(PulsarClient client, Schema<T> schema, PulsarConnec
5959
}
6060
ProducerBuilder<T> builder = configResolver.configure(client.newProducer(schema), oc, conf);
6161
this.producer = builder.create();
62-
log.createdProducerWithConfig(channel, SchemaResolver.getSchemaName(schema), conf);
62+
log.createdProducerWithConfig(channel, SchemaResolver.getSchemaName(schema), configResolver.getConfig(builder));
6363
long requests = getRequests(oc, conf);
6464

6565
processor = new SenderProcessor(requests, oc.getWaitForWriteCompletion(), this::sendMessage);

0 commit comments

Comments
 (0)