From d355afc6445228aa6c683dc5b5f6fd67e99e1367 Mon Sep 17 00:00:00 2001 From: Lumian Zhang <84360903+Cirilla-zmh@users.noreply.github.com> Date: Thu, 7 Nov 2024 23:59:07 +0800 Subject: [PATCH] fix: Kafka initialization occasionally failed due to concurrent injection of OpenTelemetryMetricsReporter (to #12538) (#12583) --- .../KafkaMetricsConsumerInstrumentation.java | 22 +++++++++++++++---- .../KafkaMetricsProducerInstrumentation.java | 22 +++++++++++++++---- 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsConsumerInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsConsumerInstrumentation.java index c9775596465d..45c0d6078d4a 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsConsumerInstrumentation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsConsumerInstrumentation.java @@ -42,10 +42,24 @@ public static class ConstructorMapAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static void onEnter( @Advice.Argument(value = 0, readOnly = false) Map config) { - // ensure config is a mutable map - if (config.getClass() != HashMap.class) { - config = new HashMap<>(config); - } + + // In versions of spring-kafka prior to 2.5.0.RC1, when the `ProducerPerThread` + // of DefaultKafkaProducerFactory is set to true, the `config` object entering + // this advice block can be shared across multiple threads. Directly modifying + // `config` could lead to unexpected item loss due to race conditions, where + // some entries might be lost as different threads attempt to modify it + // concurrently. + // + // To prevent such issues, a copy of the `config` should be created here before + // any modifications are made. This ensures that each thread operates on its + // own independent copy of the configuration, thereby eliminating the risk of + // configurations corruption. + // + // More detailed information: + // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/12538 + + // ensure config is a mutable map and avoid concurrency conflicts + config = new HashMap<>(config); enhanceConfig(config); } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsProducerInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsProducerInstrumentation.java index 821bb8443a1d..3c199255522f 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsProducerInstrumentation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsProducerInstrumentation.java @@ -42,10 +42,24 @@ public static class ConstructorMapAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static void onEnter( @Advice.Argument(value = 0, readOnly = false) Map config) { - // ensure config is a mutable map - if (config.getClass() != HashMap.class) { - config = new HashMap<>(config); - } + + // In versions of spring-kafka prior to 2.5.0.RC1, when the `ProducerPerThread` + // of DefaultKafkaProducerFactory is set to true, the `config` object entering + // this advice block can be shared across multiple threads. Directly modifying + // `config` could lead to unexpected item loss due to race conditions, where + // some entries might be lost as different threads attempt to modify it + // concurrently. + // + // To prevent such issues, a copy of the `config` should be created here before + // any modifications are made. This ensures that each thread operates on its + // own independent copy of the configuration, thereby eliminating the risk of + // configurations corruption. + // + // More detailed information: + // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/12538 + + // ensure config is a mutable map and avoid concurrency conflicts + config = new HashMap<>(config); enhanceConfig(config); } }