diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts b/instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts index 4ef8391b042c..e63f28104f4f 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts @@ -51,8 +51,6 @@ testing { targets { all { testTask.configure { - usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) - jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=false") jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=false") @@ -65,10 +63,12 @@ testing { } tasks { - test { + withType().configureEach { usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) + systemProperty("testLatestDeps", latestDepTest) + } - systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean) + test { jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true") jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java index 6818653482ef..10922faa71f7 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java @@ -177,64 +177,126 @@ void shouldHandleFailureInKafkaListener() { satisfies(longKey("kafka.record.queue_time_ms"), AbstractLongAssert::isNotNegative)); AtomicReference producer = new AtomicReference<>(); - testing.waitAndAssertSortedTraces( - orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), - trace -> { - trace.hasSpansSatisfyingExactly( - span -> span.hasName("producer"), - span -> - span.hasName("testSingleTopic publish") - .hasKind(SpanKind.PRODUCER) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly( - equalTo(MESSAGING_SYSTEM, "kafka"), - equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"), - equalTo(MESSAGING_OPERATION, "publish"), - satisfies( - MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty), - satisfies( - MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), - equalTo(MESSAGING_KAFKA_MESSAGE_KEY, "10"), - satisfies( - MESSAGING_CLIENT_ID, - stringAssert -> stringAssert.startsWith("producer")))); - - producer.set(trace.getSpan(1)); - }, - trace -> - trace.hasSpansSatisfyingExactly( - receiveSpanAssert, - span -> - span.hasName("testSingleTopic process") - .hasKind(SpanKind.CONSUMER) - .hasParent(trace.getSpan(0)) - .hasLinks(LinkData.create(producer.get().getSpanContext())) - .hasStatus(StatusData.error()) - .hasException(new IllegalArgumentException("boom")) - .hasAttributesSatisfyingExactly(processAttributes), - span -> span.hasName("consumer").hasParent(trace.getSpan(1))), - trace -> + // trace structure differs in latest dep tests because CommonErrorHandler is only set for latest + // dep tests + if (Boolean.getBoolean("testLatestDeps")) { + testing.waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { trace.hasSpansSatisfyingExactly( - receiveSpanAssert, + span -> span.hasName("producer"), span -> - span.hasName("testSingleTopic process") - .hasKind(SpanKind.CONSUMER) + span.hasName("testSingleTopic publish") + .hasKind(SpanKind.PRODUCER) .hasParent(trace.getSpan(0)) - .hasLinks(LinkData.create(producer.get().getSpanContext())) - .hasStatus(StatusData.error()) - .hasException(new IllegalArgumentException("boom")) - .hasAttributesSatisfyingExactly(processAttributes), - span -> span.hasName("consumer").hasParent(trace.getSpan(1))), - trace -> + .hasAttributesSatisfyingExactly( + equalTo(MESSAGING_SYSTEM, "kafka"), + equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"), + equalTo(MESSAGING_OPERATION, "publish"), + satisfies( + MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), + satisfies( + MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), + equalTo(MESSAGING_KAFKA_MESSAGE_KEY, "10"), + satisfies( + MESSAGING_CLIENT_ID, + stringAssert -> stringAssert.startsWith("producer")))); + + producer.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + receiveSpanAssert, + span -> + span.hasName("testSingleTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producer.get().getSpanContext())) + .hasStatus(StatusData.error()) + .hasException(new IllegalArgumentException("boom")) + .hasAttributesSatisfyingExactly(processAttributes), + span -> span.hasName("consumer").hasParent(trace.getSpan(1)), + span -> span.hasName("handle exception").hasParent(trace.getSpan(1)), + span -> + span.hasName("testSingleTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producer.get().getSpanContext())) + .hasStatus(StatusData.error()) + .hasException(new IllegalArgumentException("boom")) + .hasAttributesSatisfyingExactly(processAttributes), + span -> span.hasName("consumer").hasParent(trace.getSpan(4)), + span -> span.hasName("handle exception").hasParent(trace.getSpan(4)), + span -> + span.hasName("testSingleTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producer.get().getSpanContext())) + .hasAttributesSatisfyingExactly(processAttributes), + span -> span.hasName("consumer").hasParent(trace.getSpan(7)))); + + } else { + testing.waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { trace.hasSpansSatisfyingExactly( - receiveSpanAssert, + span -> span.hasName("producer"), span -> - span.hasName("testSingleTopic process") - .hasKind(SpanKind.CONSUMER) + span.hasName("testSingleTopic publish") + .hasKind(SpanKind.PRODUCER) .hasParent(trace.getSpan(0)) - .hasLinks(LinkData.create(producer.get().getSpanContext())) - .hasAttributesSatisfyingExactly(processAttributes), - span -> span.hasName("consumer").hasParent(trace.getSpan(1)))); + .hasAttributesSatisfyingExactly( + equalTo(MESSAGING_SYSTEM, "kafka"), + equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"), + equalTo(MESSAGING_OPERATION, "publish"), + satisfies( + MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), + satisfies( + MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), + equalTo(MESSAGING_KAFKA_MESSAGE_KEY, "10"), + satisfies( + MESSAGING_CLIENT_ID, + stringAssert -> stringAssert.startsWith("producer")))); + + producer.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + receiveSpanAssert, + span -> + span.hasName("testSingleTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producer.get().getSpanContext())) + .hasStatus(StatusData.error()) + .hasException(new IllegalArgumentException("boom")) + .hasAttributesSatisfyingExactly(processAttributes), + span -> span.hasName("consumer").hasParent(trace.getSpan(1))), + trace -> + trace.hasSpansSatisfyingExactly( + receiveSpanAssert, + span -> + span.hasName("testSingleTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producer.get().getSpanContext())) + .hasStatus(StatusData.error()) + .hasException(new IllegalArgumentException("boom")) + .hasAttributesSatisfyingExactly(processAttributes), + span -> span.hasName("consumer").hasParent(trace.getSpan(1))), + trace -> + trace.hasSpansSatisfyingExactly( + receiveSpanAssert, + span -> + span.hasName("testSingleTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producer.get().getSpanContext())) + .hasAttributesSatisfyingExactly(processAttributes), + span -> span.hasName("consumer").hasParent(trace.getSpan(1)))); + } } @Test diff --git a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedRecordInterceptor.java b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedRecordInterceptor.java index 749924cb0255..cb77d40308e4 100644 --- a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedRecordInterceptor.java +++ b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedRecordInterceptor.java @@ -25,6 +25,7 @@ final class InstrumentedRecordInterceptor implements RecordInterceptor processInstrumenter; @Nullable private final RecordInterceptor decorated; + private static final ThreadLocal threadLocalState = new ThreadLocal<>(); InstrumentedRecordInterceptor( Instrumenter processInstrumenter, @@ -74,7 +75,10 @@ public void success(ConsumerRecord record, Consumer consumer) { decorated.success(record, consumer); } } finally { - end(record, null); + // if thread state is present span is ended in afterRecord + if (threadLocalState.get() == null) { + end(record, null); + } } } @@ -85,7 +89,13 @@ public void failure(ConsumerRecord record, Exception exception, Consumer record, @Nullable Throwable error) { @NoMuzzle // method was added in 2.8.0 @Override public void afterRecord(ConsumerRecord record, Consumer consumer) { + end(record, threadLocalState.get().error); if (decorated != null) { decorated.afterRecord(record, consumer); } @@ -110,6 +121,7 @@ public void afterRecord(ConsumerRecord record, Consumer consumer) { @NoMuzzle // method was added in 2.8.0 @Override public void setupThreadState(Consumer consumer) { + threadLocalState.set(new ThreadState()); if (decorated != null) { decorated.setupThreadState(consumer); } @@ -118,8 +130,14 @@ public void setupThreadState(Consumer consumer) { @NoMuzzle // method was added in 2.8.0 @Override public void clearThreadState(Consumer consumer) { + threadLocalState.remove(); if (decorated != null) { decorated.clearThreadState(consumer); } } + + private static class ThreadState { + // used to record the error in failure() so it could be used in afterRecord() + Throwable error; + } } diff --git a/instrumentation/spring/spring-kafka-2.7/testing/build.gradle.kts b/instrumentation/spring/spring-kafka-2.7/testing/build.gradle.kts index 6f8d2db1c9ed..c29d95a76d52 100644 --- a/instrumentation/spring/spring-kafka-2.7/testing/build.gradle.kts +++ b/instrumentation/spring/spring-kafka-2.7/testing/build.gradle.kts @@ -6,7 +6,7 @@ dependencies { implementation("io.opentelemetry.javaagent:opentelemetry-testing-common") implementation("org.testcontainers:testcontainers-kafka") - compileOnly("org.springframework.kafka:spring-kafka:2.7.0") + compileOnly("org.springframework.kafka:spring-kafka:2.9.0") compileOnly("org.springframework.boot:spring-boot-starter-test:2.5.3") compileOnly("org.springframework.boot:spring-boot-starter:2.5.3") } diff --git a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java index bcb57ad88c69..4fc5b1e91163 100644 --- a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java +++ b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java @@ -21,14 +21,17 @@ import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; +import io.opentelemetry.sdk.testing.assertj.SpanDataAssert; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.StatusData; import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import org.assertj.core.api.AbstractLongAssert; import org.assertj.core.api.AbstractStringAssert; import org.junit.jupiter.api.Test; @@ -127,51 +130,74 @@ void shouldHandleFailureInKafkaListener() { testing() .waitAndAssertTraces( - trace -> - trace.hasSpansSatisfyingExactly( - span -> span.hasName("producer"), - span -> - span.hasName("testSingleTopic publish") - .hasKind(SpanKind.PRODUCER) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly( - equalTo(MESSAGING_SYSTEM, "kafka"), - equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"), - equalTo(MESSAGING_OPERATION, "publish"), - satisfies( - MESSAGING_CLIENT_ID, - stringAssert -> stringAssert.startsWith("producer")), - satisfies( - MessagingIncubatingAttributes - .MESSAGING_DESTINATION_PARTITION_ID, - AbstractStringAssert::isNotEmpty), - satisfies( - MESSAGING_KAFKA_MESSAGE_OFFSET, - AbstractLongAssert::isNotNegative), - equalTo(MESSAGING_KAFKA_MESSAGE_KEY, "10")), - span -> - span.hasName("testSingleTopic process") - .hasKind(SpanKind.CONSUMER) - .hasParent(trace.getSpan(1)) - .hasStatus(StatusData.error()) - .hasException(new IllegalArgumentException("boom")) - .hasAttributesSatisfyingExactly(processAttributes), - span -> span.hasName("consumer").hasParent(trace.getSpan(2)), - span -> - span.hasName("testSingleTopic process") - .hasKind(SpanKind.CONSUMER) - .hasParent(trace.getSpan(1)) - .hasStatus(StatusData.error()) - .hasException(new IllegalArgumentException("boom")) - .hasAttributesSatisfyingExactly(processAttributes), - span -> span.hasName("consumer").hasParent(trace.getSpan(4)), - span -> - span.hasName("testSingleTopic process") - .hasKind(SpanKind.CONSUMER) - .hasParent(trace.getSpan(1)) - .hasStatus(StatusData.unset()) - .hasAttributesSatisfyingExactly(processAttributes), - span -> span.hasName("consumer").hasParent(trace.getSpan(6)))); + trace -> { + List> assertions = + new ArrayList<>( + Arrays.asList( + span -> span.hasName("producer"), + span -> + span.hasName("testSingleTopic publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(MESSAGING_SYSTEM, "kafka"), + equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"), + equalTo(MESSAGING_OPERATION, "publish"), + satisfies( + MESSAGING_CLIENT_ID, + stringAssert -> stringAssert.startsWith("producer")), + satisfies( + MessagingIncubatingAttributes + .MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), + satisfies( + MESSAGING_KAFKA_MESSAGE_OFFSET, + AbstractLongAssert::isNotNegative), + equalTo(MESSAGING_KAFKA_MESSAGE_KEY, "10")), + span -> + span.hasName("testSingleTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasStatus(StatusData.error()) + .hasException(new IllegalArgumentException("boom")) + .hasAttributesSatisfyingExactly(processAttributes), + span -> span.hasName("consumer").hasParent(trace.getSpan(2)))); + if (Boolean.getBoolean("testLatestDeps")) { + assertions.add( + span -> span.hasName("handle exception").hasParent(trace.getSpan(2))); + } + assertions.addAll( + Arrays.asList( + span -> + span.hasName("testSingleTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasStatus(StatusData.error()) + .hasException(new IllegalArgumentException("boom")) + .hasAttributesSatisfyingExactly(processAttributes), + span -> + span.hasName("consumer") + .hasParent( + trace.getSpan(Boolean.getBoolean("testLatestDeps") ? 5 : 4)))); + if (Boolean.getBoolean("testLatestDeps")) { + assertions.add( + span -> span.hasName("handle exception").hasParent(trace.getSpan(5))); + } + assertions.addAll( + Arrays.asList( + span -> + span.hasName("testSingleTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasStatus(StatusData.unset()) + .hasAttributesSatisfyingExactly(processAttributes), + span -> + span.hasName("consumer") + .hasParent( + trace.getSpan(Boolean.getBoolean("testLatestDeps") ? 8 : 6)))); + + trace.hasSpansSatisfyingExactly(assertions); + }); } @Test diff --git a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ConsumerConfig.java b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ConsumerConfig.java index 78fb2607b349..f63ae35ca240 100644 --- a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ConsumerConfig.java +++ b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ConsumerConfig.java @@ -68,6 +68,13 @@ public ConcurrentKafkaListenerContainerFactory singleFactory( factory.setConsumerFactory(consumerFactory); factory.setBatchListener(false); factory.setAutoStartup(true); + try { + // available since spring 2.8 + Class.forName("org.springframework.kafka.listener.CommonErrorHandler"); + ConsumerConfigUtil.addErrorHandler(factory); + } catch (ClassNotFoundException e) { + // ignore + } customizerProvider.ifAvailable(factory::setContainerCustomizer); return factory; } diff --git a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ConsumerConfigUtil.java b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ConsumerConfigUtil.java new file mode 100644 index 000000000000..6d17226b35a4 --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ConsumerConfigUtil.java @@ -0,0 +1,33 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.testing; + +import io.opentelemetry.instrumentation.testing.GlobalTraceUtil; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.listener.CommonErrorHandler; +import org.springframework.kafka.listener.MessageListenerContainer; + +class ConsumerConfigUtil { + static void addErrorHandler(ConcurrentKafkaListenerContainerFactory factory) { + factory.setCommonErrorHandler( + new CommonErrorHandler() { + + @Override + public boolean handleOne( + Exception thrownException, + ConsumerRecord record, + Consumer consumer, + MessageListenerContainer container) { + GlobalTraceUtil.runWithSpan("handle exception", () -> {}); + return false; + } + }); + } + + private ConsumerConfigUtil() {} +}