4141import io .smallrye .reactive .messaging .kafka .tracing .HeaderInjectAdapter ;
4242import io .vertx .core .AsyncResult ;
4343import io .vertx .core .json .JsonObject ;
44+ import io .vertx .core .tracing .TracingPolicy ;
45+ import io .vertx .kafka .client .common .KafkaClientOptions ;
4446import io .vertx .kafka .client .producer .KafkaWriteStream ;
4547import io .vertx .mutiny .core .Vertx ;
4648
@@ -69,7 +71,14 @@ public KafkaSink(Vertx vertx, KafkaConnectorOutgoingConfiguration config, KafkaC
6971 JsonObject kafkaConfiguration = extractProducerConfiguration (config );
7072
7173 Map <String , Object > kafkaConfigurationMap = kafkaConfiguration .getMap ();
72- stream = KafkaWriteStream .create (vertx .getDelegate (), kafkaConfigurationMap );
74+ isTracingEnabled = config .getTracingEnabled ();
75+ final KafkaClientOptions clientOptions = KafkaClientOptions .fromMap (kafkaConfigurationMap , true );
76+ clientOptions .setConfig (kafkaConfigurationMap );
77+ if (isTracingEnabled ) {
78+ // Disable Vert.x Kafka Client traces, will be handled directly
79+ clientOptions .setTracingPolicy (TracingPolicy .IGNORE );
80+ }
81+ stream = KafkaWriteStream .create (vertx .getDelegate (), clientOptions );
7382 stream .exceptionHandler (e -> {
7483 if (config .getTopic ().isPresent ()) {
7584 log .unableToWrite (config .getChannel (), config .getTopic ().get (), e );
@@ -88,7 +97,6 @@ public KafkaSink(Vertx vertx, KafkaConnectorOutgoingConfiguration config, KafkaC
8897 deliveryTimeoutMs = kafkaConfiguration .getInteger (ProducerConfig .DELIVERY_TIMEOUT_MS_CONFIG , defaultDeliveryTimeoutMs );
8998 topic = config .getTopic ().orElseGet (config ::getChannel );
9099 key = config .getKey ().orElse (null );
91- isTracingEnabled = config .getTracingEnabled ();
92100 writeCloudEvents = config .getCloudEvents ();
93101 writeAsBinaryCloudEvent = config .getCloudEventsMode ().equalsIgnoreCase ("binary" );
94102 boolean waitForWriteCompletion = config .getWaitForWriteCompletion ();
0 commit comments