Commit 1f5010d

Propagate delayed retry queue tracing
Fixes #3209
1 parent b1883de · commit 1f5010d

3 files changed (+317, -4 lines)

smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java

Lines changed: 4 additions & 3 deletions
@@ -183,14 +183,15 @@ public KafkaSource(Vertx vertx,
             return commitHandler.received(record);
         }).concatenate();
 
-        if (config.getTracingEnabled()) {
-            incomingMulti = incomingMulti.onItem().invoke(record -> incomingTrace(record, false));
-        }
         if (failureHandler instanceof KafkaDelayedRetryTopic) {
             Multi<IncomingKafkaRecord<K, V>> retryStream = (Multi<IncomingKafkaRecord<K, V>>) ((KafkaDelayedRetryTopic) failureHandler)
                     .retryStream();
             incomingMulti = Multi.createBy().merging().withConcurrency(2).streams(incomingMulti, retryStream);
         }
+
+        if (config.getTracingEnabled()) {
+            incomingMulti = incomingMulti.onItem().invoke(record -> incomingTrace(record, false));
+        }
         this.stream = incomingMulti
                 .onFailure().invoke(t -> reportFailure(t, false));
         this.batchStream = null;
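
Why the reordering fixes the issue: in the old order the tracing hook was attached before the retry stream was merged in, so records replayed from the delayed-retry topics never passed through incomingTrace. Moving the hook after the merge lets a single onItem().invoke() observe records from both upstreams. Below is a minimal Mutiny sketch of that ordering; it is illustrative only (String items and a println stand in for Kafka records and the real incomingTrace callback), not the actual KafkaSource code.

import io.smallrye.mutiny.Multi;

public class MergeThenTraceSketch {
    public static void main(String[] args) {
        // Stand-ins for the main consumer stream and the delayed-retry replay stream.
        Multi<String> incoming = Multi.createFrom().items("rec-0", "rec-1");
        Multi<String> retries = Multi.createFrom().items("retry-0");

        // Merge first (as the commit now does), then attach the per-item hook:
        // items from both upstreams flow through the same callback.
        Multi<String> traced = Multi.createBy().merging().withConcurrency(2)
                .streams(incoming, retries)
                .onItem().invoke(item -> System.out.println("trace " + item));

        traced.subscribe().with(item -> { /* downstream processing */ });
    }
}

With tracing applied only to the pre-merge stream, the retried records bypassed the hook entirely, which is the gap this commit closes.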

smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkTest.java

Lines changed: 2 additions & 1 deletion
@@ -730,7 +730,8 @@ public void testTargetedWithTombstoneRecords() {
                 .containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
         assertThat(consumed1.getRecords())
                 .extracting(ConsumerRecord::value)
-                .containsExactly("value-0", "value-1", "value-2", "value-3", "value-4", "value-5", "value-6", "value-7", "value-8", "value-9");
+                .containsExactly("value-0", "value-1", "value-2", "value-3", "value-4", "value-5", "value-6", "value-7",
+                        "value-8", "value-9");
 
         // Verify topic2 receives tombstone records (null values)
         assertThat(consumed2.getRecords())

smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/tracing/KafkaFailureHandlerTracingTest.java (new file)

Lines changed: 311 additions & 0 deletions

@@ -0,0 +1,311 @@
package io.smallrye.reactive.messaging.kafka.tracing;

import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_KAFKA_OFFSET;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
import static io.smallrye.reactive.messaging.kafka.fault.KafkaDelayedRetryTopic.*;
import static io.smallrye.reactive.messaging.kafka.fault.KafkaDelayedRetryTopic.DELAYED_RETRY_CAUSE;
import static io.smallrye.reactive.messaging.kafka.fault.KafkaDelayedRetryTopic.DELAYED_RETRY_CAUSE_CLASS_NAME;
import static io.smallrye.reactive.messaging.kafka.fault.KafkaDelayedRetryTopic.DELAYED_RETRY_COUNT;
import static io.smallrye.reactive.messaging.kafka.fault.KafkaDelayedRetryTopic.DELAYED_RETRY_FIRST_PROCESSING_TIMESTAMP;
import static io.smallrye.reactive.messaging.kafka.fault.KafkaDelayedRetryTopic.DELAYED_RETRY_OFFSET;
import static io.smallrye.reactive.messaging.kafka.fault.KafkaDelayedRetryTopic.DELAYED_RETRY_ORIGINAL_TIMESTAMP;
import static io.smallrye.reactive.messaging.kafka.fault.KafkaDelayedRetryTopic.DELAYED_RETRY_PARTITION;
import static io.smallrye.reactive.messaging.kafka.fault.KafkaDelayedRetryTopic.DELAYED_RETRY_TOPIC;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanId;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase;
import io.smallrye.reactive.messaging.kafka.base.KafkaMapBasedConfig;
import io.smallrye.reactive.messaging.kafka.companion.ConsumerTask;
import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue;
import io.smallrye.reactive.messaging.kafka.fault.KafkaDelayedRetryTopic;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandlerTest;

public class KafkaFailureHandlerTracingTest extends KafkaCompanionTestBase {

    private SdkTracerProvider tracerProvider;
    private InMemorySpanExporter spanExporter;

    @BeforeEach
    public void setup() {
        GlobalOpenTelemetry.resetForTest();

        spanExporter = InMemorySpanExporter.create();
        SpanProcessor spanProcessor = SimpleSpanProcessor.create(spanExporter);

        tracerProvider = SdkTracerProvider.builder()
                .addSpanProcessor(spanProcessor)
                .setSampler(Sampler.alwaysOn())
                .build();

        OpenTelemetrySdk.builder()
                .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
                .setTracerProvider(tracerProvider)
                .buildAndRegisterGlobal();
    }

    @AfterAll
    static void shutdown() {
        GlobalOpenTelemetry.resetForTest();
    }

    @Test
    public void testDelayedRetryStrategyWithTracing() {
        addBeans(KafkaDelayedRetryTopic.Factory.class, KafkaFailureHandlerTest.MyObservationCollector.class);
        List<String> delayedRetryTopics = List.of(getRetryTopic(topic, 2000), getRetryTopic(topic, 4000));
        KafkaFailureHandlerTest.MyReceiverBean bean = runApplication(getDelayedRetryConfig(topic, delayedRetryTopics)
                .with("tracing-enabled", true), KafkaFailureHandlerTest.MyReceiverBean.class);
        await().until(this::isReady);

        companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10);

        await().atMost(20, TimeUnit.SECONDS)
                .untilAsserted(() -> assertThat(bean.list())
                        .hasSizeGreaterThanOrEqualTo(16)
                        .containsOnlyOnce(0, 1, 2, 4, 5, 7, 8)
                        .contains(0, 1, 2, 3, 4, 5, 6, 7, 8, 9));

        List<SpanData> spans = spanExporter.getFinishedSpanItems();
        assertThat(spans).hasSize(25);

        List<SpanData> producerSpans = verifyProducerSpans(spans, delayedRetryTopics);

        List<SpanData> consumerSpans = verifyConsumerSpans(spans, delayedRetryTopics);

        verifyProducerConsumerParents(producerSpans, consumerSpans);

        verifyMultipleRetryTraces(spans);

        ConsumerTask<String, Integer> records = companion.consumeIntegers()
                .fromTopics(delayedRetryTopics.toArray(String[]::new));

        await().untilAsserted(() -> assertThat(records.getRecords()).hasSizeGreaterThanOrEqualTo(6));
        assertThat(records.getRecords()).allSatisfy(r -> {
            assertThat(r.topic()).isIn(delayedRetryTopics);
            assertThat(r.value()).isIn(3, 6, 9);
            assertThat(new String(r.headers().lastHeader(DELAYED_RETRY_EXCEPTION_CLASS_NAME).value()))
                    .isEqualTo(IllegalArgumentException.class.getName());
            assertThat(new String(r.headers().lastHeader(DELAYED_RETRY_REASON).value())).startsWith("nack 3 -");
            assertThat(r.headers().lastHeader(DELAYED_RETRY_CAUSE)).isNull();
            assertThat(r.headers().lastHeader(DELAYED_RETRY_CAUSE_CLASS_NAME)).isNull();
            assertThat(new String(r.headers().lastHeader(DELAYED_RETRY_PARTITION).value())).isEqualTo("0");
            assertThat(new String(r.headers().lastHeader(DELAYED_RETRY_TOPIC).value())).isEqualTo(topic);
            assertThat(new String(r.headers().lastHeader(DELAYED_RETRY_OFFSET).value())).isNotNull().isIn("3", "6", "9");
            assertThat(r.headers().lastHeader(DELAYED_RETRY_ORIGINAL_TIMESTAMP)).isNotNull();
            assertThat(r.headers().lastHeader(DELAYED_RETRY_FIRST_PROCESSING_TIMESTAMP)).isNotNull();
            assertThat(r.headers().lastHeader(DELAYED_RETRY_COUNT)).isNotNull();
            assertThat(r.headers().lastHeader("traceparent")).isNotNull();
        });

        assertThat(isAlive()).isTrue();

    }

    private static void verifyProducerConsumerParents(List<SpanData> producerSpans, List<SpanData> consumerSpans) {
        // Verify that producer spans (delayed retry) have correct parent consumer spans
        for (SpanData producerSpan : producerSpans) {
            // Each producer span should have a parent consumer span
            String parentSpanId = producerSpan.getParentSpanId();
            assertThat(parentSpanId).isNotEqualTo(SpanId.getInvalid());

            // Find the parent consumer span
            List<SpanData> parentSpans = consumerSpans.stream()
                    .filter(cs -> cs.getSpanId().equals(parentSpanId))
                    .toList();
            assertThat(parentSpans).hasSize(1);

            // Verify they share the same trace ID
            SpanData parentConsumerSpan = parentSpans.get(0);
            assertThat(producerSpan.getTraceId()).isEqualTo(parentConsumerSpan.getTraceId());
        }
    }

    private List<SpanData> verifyProducerSpans(List<SpanData> spans, List<String> delayedRetryTopics) {
        List<SpanData> producerSpans = spans.stream()
                .filter(s -> s.getKind() == SpanKind.PRODUCER)
                .toList();
        assertThat(producerSpans).hasSize(9);

        // Filter only retry topic producer spans (not DLQ spans)
        List<SpanData> retryProducerSpans = producerSpans.stream()
                .filter(s -> delayedRetryTopics.stream()
                        .anyMatch(retryTopic -> s.getAttributes().get(MESSAGING_DESTINATION_NAME).equals(retryTopic)))
                .toList();

        assertThat(retryProducerSpans).hasSize(6);

        // Verify producer spans have correct attributes
        for (SpanData producerSpan : retryProducerSpans) {
            Attributes attrs = producerSpan.getAttributes();
            assertThat(attrs.get(MESSAGING_SYSTEM)).isEqualTo("kafka");
            assertThat(attrs.get(MESSAGING_OPERATION)).isEqualTo("publish");
            assertThat(attrs.get(MESSAGING_DESTINATION_NAME)).isIn(delayedRetryTopics);
            assertThat(attrs.get(MESSAGING_KAFKA_OFFSET)).isNotNull();
            // Verify span name follows pattern "{topic} publish"
            assertThat(producerSpan.getName()).endsWith(" publish");
        }
        return producerSpans;
    }

    private List<SpanData> verifyConsumerSpans(List<SpanData> spans, List<String> delayedRetryTopics) {
        // Verify span parent propagation
        // Filter to get consumer spans (parent spans from incoming messages)
        List<SpanData> consumerSpans = spans.stream()
                .filter(s -> s.getKind() == SpanKind.CONSUMER)
                .toList();
        assertThat(consumerSpans).hasSize(16); // 10 original + 6 retries

        // Verify consumer spans have correct attributes
        for (SpanData consumerSpan : consumerSpans) {
            Attributes attrs = consumerSpan.getAttributes();
            assertThat(attrs.get(MESSAGING_SYSTEM)).isEqualTo("kafka");
            assertThat(attrs.get(MESSAGING_OPERATION)).isEqualTo("receive");
            String destinationName = attrs.get(MESSAGING_DESTINATION_NAME);
            assertThat(destinationName).isNotNull();
            assertThat(destinationName).satisfiesAnyOf(
                    name -> assertThat(name).isEqualTo(topic),
                    name -> assertThat(name).isIn(delayedRetryTopics));
            assertThat(attrs.get(MESSAGING_KAFKA_OFFSET)).isNotNull();
            // Verify span name follows pattern "{topic} receive"
            assertThat(consumerSpan.getName()).endsWith(" receive");
        }
        return consumerSpans;
    }

    private void verifyMultipleRetryTraces(List<SpanData> spans) {
        // Group spans by trace ID to track individual message traces
        var spansByTrace = spans.stream()
                .filter(s -> s.getKind() == SpanKind.CONSUMER || s.getKind() == SpanKind.PRODUCER)
                .collect(Collectors.groupingBy(SpanData::getTraceId));

        // For messages that go through multiple retry levels (3, 6, 9)
        // Each trace should contain: original consumer -> retry producer -> retry consumer -> retry producer -> retry consumer
        List<List<SpanData>> multiRetryTraces = spansByTrace.values().stream()
                .filter(traceSpans -> traceSpans.size() >= 5) // At least 2 retries
                .toList();

        assertThat(multiRetryTraces).isNotEmpty();

        for (List<SpanData> traceSpans : multiRetryTraces) {
            // Sort spans by start time to follow the flow
            List<SpanData> sortedSpans = traceSpans.stream()
                    .sorted((s1, s2) -> Long.compare(s1.getStartEpochNanos(), s2.getStartEpochNanos()))
                    .toList();

            // Verify parent-child relationships
            for (int i = 1; i < sortedSpans.size(); i++) {
                SpanData currentSpan = sortedSpans.get(i);
                String parentSpanId = currentSpan.getParentSpanId();

                // Each span (except root) should have a parent in the same trace
                if (!parentSpanId.equals(SpanId.getInvalid())) {
                    boolean hasParent = sortedSpans.stream()
                            .anyMatch(s -> s.getSpanId().equals(parentSpanId));
                    assertThat(hasParent)
                            .as("Span should have parent in same trace")
                            .isTrue();
                }
            }
        }
    }

    @Test
    public void testDeadLetterQueueWithTracing() {
        addBeans(KafkaDeadLetterQueue.Factory.class, KafkaFailureHandlerTest.MyObservationCollector.class);
        String dlqTopic = topic + "-dlq";
        KafkaFailureHandlerTest.MyReceiverBean bean = runApplication(getDLQConfig(topic, dlqTopic)
                .with("tracing-enabled", true), KafkaFailureHandlerTest.MyReceiverBean.class);
        await().until(this::isReady);

        companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10);

        ConsumerTask<String, Integer> dlqRecords = companion.consumeIntegers().fromTopics(dlqTopic, 3)
                .awaitCompletion();

        // Verify traceparent header is present in DLQ messages
        assertThat(dlqRecords.getRecords()).allSatisfy(r -> {
            Header traceparent = r.headers().lastHeader("traceparent");
            assertThat(traceparent).isNotNull();
        });

        List<SpanData> spans = spanExporter.getFinishedSpanItems();
        List<SpanData> producerSpans = spans.stream()
                .filter(s -> s.getKind() == SpanKind.PRODUCER)
                .toList();

        assertThat(producerSpans).isNotEmpty();

        // Verify DLQ producer spans have correct parent consumer spans
        List<SpanData> consumerSpans = spans.stream()
                .filter(s -> s.getKind() == SpanKind.CONSUMER)
                .toList();

        for (SpanData producerSpan : producerSpans) {
            String parentSpanId = producerSpan.getParentSpanId();
            assertThat(parentSpanId).isNotEqualTo(SpanId.getInvalid());

            List<SpanData> parentSpans = consumerSpans.stream()
                    .filter(cs -> cs.getSpanId().equals(parentSpanId))
                    .toList();
            assertThat(parentSpans).hasSize(1);

            // Verify they share the same trace ID
            assertThat(producerSpan.getTraceId()).isEqualTo(parentSpans.get(0).getTraceId());
        }
    }

    private KafkaMapBasedConfig getDelayedRetryConfig(String topic, List<String> topics) {
        KafkaMapBasedConfig config = kafkaConfig("mp.messaging.incoming.kafka");
        config.put("topic", topic);
        config.put("group.id", UUID.randomUUID().toString());
        config.put("value.deserializer", IntegerDeserializer.class.getName());
        config.put("enable.auto.commit", "false");
        config.put("auto.offset.reset", "earliest");
        config.put("failure-strategy", "delayed-retry-topic");
        config.put("dead-letter-queue.topic", topic + "-dlq");
        config.put("delayed-retry-topic.topics", String.join(",", topics));

        return config;
    }

    private KafkaMapBasedConfig getDLQConfig(String topic, String dlqTopic) {
        KafkaMapBasedConfig config = kafkaConfig("mp.messaging.incoming.kafka");
        config.put("topic", topic);
        config.put("group.id", UUID.randomUUID().toString());
        config.put("value.deserializer", IntegerDeserializer.class.getName());
        config.put("enable.auto.commit", "false");
        config.put("auto.offset.reset", "earliest");
        config.put("failure-strategy", "dead-letter-queue");
        config.put("dead-letter-queue.topic", dlqTopic);

        return config;
    }

}
