Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ testing {
targets {
all {
testTask.configure {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)

jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=false")
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=false")

Expand All @@ -65,10 +63,12 @@ testing {
}

tasks {
test {
withType<Test>().configureEach {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
systemProperty("testLatestDeps", latestDepTest)
}

systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
test {
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,64 +177,126 @@ void shouldHandleFailureInKafkaListener() {
satisfies(longKey("kafka.record.queue_time_ms"), AbstractLongAssert::isNotNegative));

AtomicReference<SpanData> producer = new AtomicReference<>();
testing.waitAndAssertSortedTraces(
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
trace -> {
trace.hasSpansSatisfyingExactly(
span -> span.hasName("producer"),
span ->
span.hasName("testSingleTopic publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(MESSAGING_SYSTEM, "kafka"),
equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"),
equalTo(MESSAGING_OPERATION, "publish"),
satisfies(
MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
satisfies(
MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative),
equalTo(MESSAGING_KAFKA_MESSAGE_KEY, "10"),
satisfies(
MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer"))));

producer.set(trace.getSpan(1));
},
trace ->
trace.hasSpansSatisfyingExactly(
receiveSpanAssert,
span ->
span.hasName("testSingleTopic process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(producer.get().getSpanContext()))
.hasStatus(StatusData.error())
.hasException(new IllegalArgumentException("boom"))
.hasAttributesSatisfyingExactly(processAttributes),
span -> span.hasName("consumer").hasParent(trace.getSpan(1))),
trace ->
// trace structure differs in latest dep tests because CommonErrorHandler is only set for latest
// dep tests
if (Boolean.getBoolean("testLatestDeps")) {
testing.waitAndAssertSortedTraces(
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
trace -> {
trace.hasSpansSatisfyingExactly(
receiveSpanAssert,
span -> span.hasName("producer"),
span ->
span.hasName("testSingleTopic process")
.hasKind(SpanKind.CONSUMER)
span.hasName("testSingleTopic publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(producer.get().getSpanContext()))
.hasStatus(StatusData.error())
.hasException(new IllegalArgumentException("boom"))
.hasAttributesSatisfyingExactly(processAttributes),
span -> span.hasName("consumer").hasParent(trace.getSpan(1))),
trace ->
.hasAttributesSatisfyingExactly(
equalTo(MESSAGING_SYSTEM, "kafka"),
equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"),
equalTo(MESSAGING_OPERATION, "publish"),
satisfies(
MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative),
equalTo(MESSAGING_KAFKA_MESSAGE_KEY, "10"),
satisfies(
MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer"))));

producer.set(trace.getSpan(1));
},
trace ->
trace.hasSpansSatisfyingExactly(
receiveSpanAssert,
span ->
span.hasName("testSingleTopic process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(producer.get().getSpanContext()))
.hasStatus(StatusData.error())
.hasException(new IllegalArgumentException("boom"))
.hasAttributesSatisfyingExactly(processAttributes),
span -> span.hasName("consumer").hasParent(trace.getSpan(1)),
span -> span.hasName("handle exception").hasParent(trace.getSpan(1)),
span ->
span.hasName("testSingleTopic process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(producer.get().getSpanContext()))
.hasStatus(StatusData.error())
.hasException(new IllegalArgumentException("boom"))
.hasAttributesSatisfyingExactly(processAttributes),
span -> span.hasName("consumer").hasParent(trace.getSpan(4)),
span -> span.hasName("handle exception").hasParent(trace.getSpan(4)),
span ->
span.hasName("testSingleTopic process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(producer.get().getSpanContext()))
.hasAttributesSatisfyingExactly(processAttributes),
span -> span.hasName("consumer").hasParent(trace.getSpan(7))));

} else {
testing.waitAndAssertSortedTraces(
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
trace -> {
trace.hasSpansSatisfyingExactly(
receiveSpanAssert,
span -> span.hasName("producer"),
span ->
span.hasName("testSingleTopic process")
.hasKind(SpanKind.CONSUMER)
span.hasName("testSingleTopic publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(producer.get().getSpanContext()))
.hasAttributesSatisfyingExactly(processAttributes),
span -> span.hasName("consumer").hasParent(trace.getSpan(1))));
.hasAttributesSatisfyingExactly(
equalTo(MESSAGING_SYSTEM, "kafka"),
equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"),
equalTo(MESSAGING_OPERATION, "publish"),
satisfies(
MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative),
equalTo(MESSAGING_KAFKA_MESSAGE_KEY, "10"),
satisfies(
MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer"))));

producer.set(trace.getSpan(1));
},
trace ->
trace.hasSpansSatisfyingExactly(
receiveSpanAssert,
span ->
span.hasName("testSingleTopic process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(producer.get().getSpanContext()))
.hasStatus(StatusData.error())
.hasException(new IllegalArgumentException("boom"))
.hasAttributesSatisfyingExactly(processAttributes),
span -> span.hasName("consumer").hasParent(trace.getSpan(1))),
trace ->
trace.hasSpansSatisfyingExactly(
receiveSpanAssert,
span ->
span.hasName("testSingleTopic process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(producer.get().getSpanContext()))
.hasStatus(StatusData.error())
.hasException(new IllegalArgumentException("boom"))
.hasAttributesSatisfyingExactly(processAttributes),
span -> span.hasName("consumer").hasParent(trace.getSpan(1))),
trace ->
trace.hasSpansSatisfyingExactly(
receiveSpanAssert,
span ->
span.hasName("testSingleTopic process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(producer.get().getSpanContext()))
.hasAttributesSatisfyingExactly(processAttributes),
span -> span.hasName("consumer").hasParent(trace.getSpan(1))));
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ final class InstrumentedRecordInterceptor<K, V> implements RecordInterceptor<K,

private final Instrumenter<KafkaProcessRequest, Void> processInstrumenter;
@Nullable private final RecordInterceptor<K, V> decorated;
private static final ThreadLocal<ThreadState> threadLocalState = new ThreadLocal<>();

InstrumentedRecordInterceptor(
Instrumenter<KafkaProcessRequest, Void> processInstrumenter,
Expand Down Expand Up @@ -74,7 +75,10 @@ public void success(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
decorated.success(record, consumer);
}
} finally {
end(record, null);
// if thread state is present span is ended in afterRecord
if (threadLocalState.get() == null) {
end(record, null);
}
}
}

Expand All @@ -85,7 +89,13 @@ public void failure(ConsumerRecord<K, V> record, Exception exception, Consumer<K
decorated.failure(record, exception, consumer);
}
} finally {
end(record, exception);
// if thread state is present span is ended in afterRecord
ThreadState threadState = threadLocalState.get();
if (threadState == null) {
end(record, exception);
} else {
threadState.error = exception;
}
}
}

Expand All @@ -102,6 +112,7 @@ private void end(ConsumerRecord<K, V> record, @Nullable Throwable error) {
@NoMuzzle // method was added in 2.8.0
@Override
public void afterRecord(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
end(record, threadLocalState.get().error);
if (decorated != null) {
decorated.afterRecord(record, consumer);
}
Expand All @@ -110,6 +121,7 @@ public void afterRecord(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
@NoMuzzle // method was added in 2.8.0
@Override
public void setupThreadState(Consumer<?, ?> consumer) {
threadLocalState.set(new ThreadState());
if (decorated != null) {
decorated.setupThreadState(consumer);
}
Expand All @@ -118,8 +130,14 @@ public void setupThreadState(Consumer<?, ?> consumer) {
@NoMuzzle // method was added in 2.8.0
@Override
public void clearThreadState(Consumer<?, ?> consumer) {
threadLocalState.remove();
if (decorated != null) {
decorated.clearThreadState(consumer);
}
}

private static class ThreadState {
// used to record the error in failure() so it could be used in afterRecord()
Throwable error;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ dependencies {
implementation("io.opentelemetry.javaagent:opentelemetry-testing-common")
implementation("org.testcontainers:testcontainers-kafka")

compileOnly("org.springframework.kafka:spring-kafka:2.7.0")
compileOnly("org.springframework.kafka:spring-kafka:2.9.0")
compileOnly("org.springframework.boot:spring-boot-starter-test:2.5.3")
compileOnly("org.springframework.boot:spring-boot-starter:2.5.3")
}
Loading
Loading