diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsConsumerInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsConsumerInstrumentation.java index c9775596465d..45c0d6078d4a 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsConsumerInstrumentation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsConsumerInstrumentation.java @@ -42,10 +42,24 @@ public static class ConstructorMapAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static void onEnter( @Advice.Argument(value = 0, readOnly = false) Map config) { - // ensure config is a mutable map - if (config.getClass() != HashMap.class) { - config = new HashMap<>(config); - } + + // In versions of spring-kafka prior to 2.5.0.RC1, when the `ProducerPerThread` + // of DefaultKafkaProducerFactory is set to true, the `config` object entering + // this advice block can be shared across multiple threads. Directly modifying + // `config` could lead to unexpected item loss due to race conditions, where + // some entries might be lost as different threads attempt to modify it + // concurrently. + // + // To prevent such issues, a copy of the `config` should be created here before + // any modifications are made. This ensures that each thread operates on its + // own independent copy of the configuration, thereby eliminating the risk of + // configurations corruption. + // + // More detailed information: + // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/12538 + + // ensure config is a mutable map and avoid concurrency conflicts + config = new HashMap<>(config); enhanceConfig(config); } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsProducerInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsProducerInstrumentation.java index 821bb8443a1d..3c199255522f 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsProducerInstrumentation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsProducerInstrumentation.java @@ -42,10 +42,24 @@ public static class ConstructorMapAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static void onEnter( @Advice.Argument(value = 0, readOnly = false) Map config) { - // ensure config is a mutable map - if (config.getClass() != HashMap.class) { - config = new HashMap<>(config); - } + + // In versions of spring-kafka prior to 2.5.0.RC1, when the `ProducerPerThread` + // of DefaultKafkaProducerFactory is set to true, the `config` object entering + // this advice block can be shared across multiple threads. Directly modifying + // `config` could lead to unexpected item loss due to race conditions, where + // some entries might be lost as different threads attempt to modify it + // concurrently. + // + // To prevent such issues, a copy of the `config` should be created here before + // any modifications are made. This ensures that each thread operates on its + // own independent copy of the configuration, thereby eliminating the risk of + // configurations corruption. + // + // More detailed information: + // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/12538 + + // ensure config is a mutable map and avoid concurrency conflicts + config = new HashMap<>(config); enhanceConfig(config); } }