Skip to content

Commit

Permalink
fix: kafka initialization occasionally failed due to concurrent injec…
Browse files Browse the repository at this point in the history
…tion of OpenTelemetryMetricsReporter. (#12538)
  • Loading branch information
Cirilla-zmh committed Nov 7, 2024
1 parent c8bd230 commit 719e4ee
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,24 @@ public static class ConstructorMapAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false) Map<String, Object> config) {
// ensure config is a mutable map
if (config.getClass() != HashMap.class) {
config = new HashMap<>(config);
}

// In versions of spring-kafka prior to 2.5.0.RC1, when the `ProducerPerThread`
// of DefaultKafkaProducerFactory is set to true, the `config` object entering
// this advice block can be shared across multiple threads. Directly modifying
// `config` could lead to unexpected item loss due to race conditions, where
// some entries might be lost as different threads attempt to modify it
// concurrently.
//
// To prevent such issues, a copy of the `config` should be created here before
// any modifications are made. This ensures that each thread operates on its
// own independent copy of the configuration, thereby eliminating the risk of
// configurations corruption.
//
// More detailed information:
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/12538

// ensure config is a mutable map and avoid concurrency conflicts
config = new HashMap<>(config);
enhanceConfig(config);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,24 @@ public static class ConstructorMapAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false) Map<String, Object> config) {
// ensure config is a mutable map
if (config.getClass() != HashMap.class) {
config = new HashMap<>(config);
}

// In versions of spring-kafka prior to 2.5.0.RC1, when the `ProducerPerThread`
// of DefaultKafkaProducerFactory is set to true, the `config` object entering
// this advice block can be shared across multiple threads. Directly modifying
// `config` could lead to unexpected item loss due to race conditions, where
// some entries might be lost as different threads attempt to modify it
// concurrently.
//
// To prevent such issues, a copy of the `config` should be created here before
// any modifications are made. This ensures that each thread operates on its
// own independent copy of the configuration, thereby eliminating the risk of
// configurations corruption.
//
// More detailed information:
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/12538

// ensure config is a mutable map and avoid concurrency conflicts
config = new HashMap<>(config);
enhanceConfig(config);
}
}
Expand Down

0 comments on commit 719e4ee

Please sign in to comment.