Skip to content

Commit eac1651

Browse files
committed
Add Otel tracing to SQS connector
Fixes #3188
1 parent 8b8dc62 commit eac1651

File tree

13 files changed

+824
-24
lines changed

13 files changed

+824
-24
lines changed

smallrye-reactive-messaging-aws-sqs/pom.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@
3434
<artifactId>smallrye-reactive-messaging-provider</artifactId>
3535
<version>${project.version}</version>
3636
</dependency>
37+
<dependency>
38+
<groupId>io.smallrye.reactive</groupId>
39+
<artifactId>smallrye-reactive-messaging-otel</artifactId>
40+
<version>${project.version}</version>
41+
</dependency>
3742
<dependency>
3843
<groupId>org.eclipse.microprofile.config</groupId>
3944
<artifactId>microprofile-config-api</artifactId>
@@ -77,6 +82,16 @@
7782
<version>${jackson.version}</version>
7883
<scope>test</scope>
7984
</dependency>
85+
<dependency>
86+
<groupId>io.opentelemetry</groupId>
87+
<artifactId>opentelemetry-sdk-trace</artifactId>
88+
<scope>test</scope>
89+
</dependency>
90+
<dependency>
91+
<groupId>io.opentelemetry</groupId>
92+
<artifactId>opentelemetry-sdk-testing</artifactId>
93+
<scope>test</scope>
94+
</dependency>
8095
<dependency>
8196
<groupId>io.smallrye.reactive</groupId>
8297
<artifactId>smallrye-connector-attribute-processor</artifactId>

smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsConnector.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.eclipse.microprofile.reactive.messaging.Message;
2323
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
2424

25+
import io.opentelemetry.api.OpenTelemetry;
2526
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
2627
import io.smallrye.reactive.messaging.connector.InboundConnector;
2728
import io.smallrye.reactive.messaging.connector.OutboundConnector;
@@ -41,6 +42,7 @@
4142
@ConnectorAttribute(name = "endpoint-override", type = "string", direction = INCOMING_AND_OUTGOING, description = "The endpoint override")
4243
@ConnectorAttribute(name = "credentials-provider", type = "string", direction = INCOMING_AND_OUTGOING, description = "The credential provider to be used in the client")
4344
@ConnectorAttribute(name = "health-enabled", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether health reporting is enabled (default) or disabled", defaultValue = "true")
45+
@ConnectorAttribute(name = "tracing-enabled", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether tracing is enabled (default) or disabled", defaultValue = "true")
4446

4547
@ConnectorAttribute(name = "group.id", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "When set, sends messages with the specified group id")
4648
@ConnectorAttribute(name = "batch", type = "boolean", direction = ConnectorAttribute.Direction.OUTGOING, description = "When set, sends messages in batches of maximum 10 messages", defaultValue = "false")
@@ -80,6 +82,9 @@ public class SqsConnector implements InboundConnector, OutboundConnector, Health
8082
@Any
8183
Instance<SqsFailureHandler.Factory> failureHandlerFactories;
8284

85+
@Inject
86+
Instance<OpenTelemetry> openTelemetryInstance;
87+
8388
Vertx vertx;
8489

8590
private final List<SqsInboundChannel> inboundChannels = new CopyOnWriteArrayList<>();
@@ -106,16 +111,16 @@ public Publisher<? extends Message<?>> getPublisher(Config config) {
106111
var conf = new SqsConnectorIncomingConfiguration(config);
107112
var customizer = CDIUtils.getInstanceById(customizers, conf.getReceiveRequestCustomizer().orElse(conf.getChannel()),
108113
() -> null);
109-
var channel = new SqsInboundChannel(conf, vertx, sqsManager, customizer, jsonMapping, ackHandlerFactories,
110-
failureHandlerFactories);
114+
var channel = new SqsInboundChannel(conf, vertx, sqsManager, customizer, jsonMapping, openTelemetryInstance,
115+
ackHandlerFactories, failureHandlerFactories);
111116
inboundChannels.add(channel);
112117
return channel.getStream();
113118
}
114119

115120
@Override
116121
public Subscriber<? extends Message<?>> getSubscriber(Config config) {
117122
var conf = new SqsConnectorOutgoingConfiguration(config);
118-
var channel = new SqsOutboundChannel(conf, sqsManager, jsonMapping);
123+
var channel = new SqsOutboundChannel(conf, sqsManager, jsonMapping, openTelemetryInstance);
119124
outboundChannels.add(channel);
120125
return channel.getSubscriber();
121126
}

smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsInboundChannel.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515

1616
import org.eclipse.microprofile.reactive.messaging.Message;
1717

18+
import io.opentelemetry.api.OpenTelemetry;
1819
import io.smallrye.mutiny.Multi;
1920
import io.smallrye.mutiny.Uni;
2021
import io.smallrye.reactive.messaging.aws.sqs.ack.SqsIgnoreAckHandler;
22+
import io.smallrye.reactive.messaging.aws.sqs.tracing.SqsOpenTelemetryInstrumenter;
23+
import io.smallrye.reactive.messaging.aws.sqs.tracing.SqsTrace;
2124
import io.smallrye.reactive.messaging.health.HealthReport;
2225
import io.smallrye.reactive.messaging.json.JsonMapping;
2326
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
@@ -47,13 +50,18 @@ public class SqsInboundChannel {
4750
private final boolean healthEnabled;
4851
private final List<String> messageAttributeNames;
4952
private final Integer visibilityTimeout;
53+
private final boolean tracingEnabled;
54+
private final SqsOpenTelemetryInstrumenter sqsInstrumenter;
55+
private volatile String queueUrl;
5056

5157
public SqsInboundChannel(SqsConnectorIncomingConfiguration conf, Vertx vertx, SqsManager sqsManager,
5258
SqsReceiveMessageRequestCustomizer customizer, JsonMapping jsonMapper,
59+
Instance<OpenTelemetry> openTelemetryInstance,
5360
Instance<SqsAckHandler.Factory> ackHandlerFactories,
5461
Instance<SqsFailureHandler.Factory> failureHandlerFactories) {
5562
this.channel = conf.getChannel();
5663
this.healthEnabled = conf.getHealthEnabled();
64+
this.tracingEnabled = conf.getTracingEnabled();
5765
this.retries = conf.getReceiveRequestRetries();
5866
this.client = sqsManager.getClient(conf);
5967
this.queueUrlUni = sqsManager.getQueueUrl(conf).memoize().indefinitely();
@@ -65,7 +73,11 @@ public SqsInboundChannel(SqsConnectorIncomingConfiguration conf, Vertx vertx, Sq
6573
this.maxNumberOfMessages = conf.getMaxNumberOfMessages();
6674
this.messageAttributeNames = getMessageAttributeNames(conf);
6775
this.customizer = customizer;
68-
76+
if (tracingEnabled) {
77+
sqsInstrumenter = SqsOpenTelemetryInstrumenter.createForSource(openTelemetryInstance);
78+
} else {
79+
sqsInstrumenter = null;
80+
}
6981
SqsAckHandler ackHandler = createAckHandler(ackHandlerFactories, conf, vertx, client, queueUrlUni);
7082
SqsFailureHandler failureHandler = createFailureHandler(failureHandlerFactories, conf, client, queueUrlUni);
7183
PausablePollingStream<List<software.amazon.awssdk.services.sqs.model.Message>, software.amazon.awssdk.services.sqs.model.Message> pollingStream = new PausablePollingStream<>(
@@ -77,15 +89,27 @@ channel, request(null, 0), (messages, processor) -> {
7789
}
7890
}, requestExecutor, maxNumberOfMessages * 2, conf.getReceiveRequestPauseResume());
7991
this.stream = Multi.createFrom()
80-
.deferred(() -> queueUrlUni.onItem().transformToMulti(queueUrl -> pollingStream.getStream()))
92+
.deferred(() -> queueUrlUni.onItem().invoke(this::setQueueUrl)
93+
.onItem().transformToMulti(queueUrl -> pollingStream.getStream()))
8194
.emitOn(context::runOnContext, conf.getMaxNumberOfMessages())
8295
.onItem().transform(message -> new SqsMessage<>(message, jsonMapper, ackHandler, failureHandler))
96+
.onItem().invoke(this::incomingTrace)
8397
.onFailure().invoke(throwable -> {
8498
log.errorReceivingMessage(channel, throwable);
8599
reportFailure(throwable, false);
86100
});
87101
}
88102

103+
private void setQueueUrl(String queueUrl) {
104+
this.queueUrl = queueUrl;
105+
}
106+
107+
private void incomingTrace(SqsMessage<?> sqsMessage) {
108+
if (tracingEnabled) {
109+
sqsInstrumenter.traceIncoming(sqsMessage, new SqsTrace(this.queueUrl, sqsMessage.getMessage()));
110+
}
111+
}
112+
89113
private SqsFailureHandler createFailureHandler(Instance<SqsFailureHandler.Factory> failureHandlerFactories,
90114
SqsConnectorIncomingConfiguration conf,
91115
SqsAsyncClient client, Uni<String> queueUrlUni) {
@@ -107,6 +131,10 @@ private SqsAckHandler createAckHandler(Instance<SqsAckHandler.Factory> ackHandle
107131
private List<String> getMessageAttributeNames(SqsConnectorIncomingConfiguration conf) {
108132
List<String> names = new ArrayList<>();
109133
names.add(SqsConnector.CLASS_NAME_ATTRIBUTE);
134+
if (tracingEnabled) {
135+
names.add("traceparent");
136+
names.add("tracestate");
137+
}
110138
conf.getReceiveRequestMessageAttributeNames().ifPresent(s -> names.addAll(Arrays.asList(s.split(","))));
111139
return names;
112140
}

smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsOutboundChannel.java

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,28 @@
11
package io.smallrye.reactive.messaging.aws.sqs;
22

33
import java.time.Duration;
4-
import java.util.ArrayList;
5-
import java.util.HashMap;
6-
import java.util.List;
7-
import java.util.Map;
8-
import java.util.Optional;
4+
import java.util.*;
95
import java.util.concurrent.Flow;
106
import java.util.concurrent.atomic.AtomicBoolean;
117
import java.util.stream.Collectors;
128
import java.util.stream.IntStream;
139

10+
import jakarta.enterprise.inject.Instance;
11+
1412
import org.eclipse.microprofile.reactive.messaging.Message;
1513

14+
import io.opentelemetry.api.OpenTelemetry;
1615
import io.smallrye.mutiny.Multi;
1716
import io.smallrye.mutiny.Uni;
1817
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
1918
import io.smallrye.reactive.messaging.aws.sqs.i18n.AwsSqsLogging;
19+
import io.smallrye.reactive.messaging.aws.sqs.tracing.SqsOpenTelemetryInstrumenter;
20+
import io.smallrye.reactive.messaging.aws.sqs.tracing.SqsTrace;
2021
import io.smallrye.reactive.messaging.health.HealthReport;
2122
import io.smallrye.reactive.messaging.json.JsonMapping;
2223
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
2324
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
24-
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
25-
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
26-
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
27-
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
28-
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResultEntry;
29-
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
30-
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
25+
import software.amazon.awssdk.services.sqs.model.*;
3126

3227
public class SqsOutboundChannel {
3328

@@ -43,10 +38,14 @@ public class SqsOutboundChannel {
4338
private final boolean batch;
4439
private final Duration batchDelay;
4540
private final int batchSize;
41+
private final boolean tracingEnabled;
42+
private final SqsOpenTelemetryInstrumenter sqsInstrumenter;
4643

47-
public SqsOutboundChannel(SqsConnectorOutgoingConfiguration conf, SqsManager sqsManager, JsonMapping jsonMapping) {
44+
public SqsOutboundChannel(SqsConnectorOutgoingConfiguration conf, SqsManager sqsManager, JsonMapping jsonMapping,
45+
Instance<OpenTelemetry> openTelemetryInstance) {
4846
this.channel = conf.getChannel();
4947
this.healthEnabled = conf.getHealthEnabled();
48+
this.tracingEnabled = conf.getTracingEnabled();
5049
this.client = sqsManager.getClient(conf);
5150
this.batch = conf.getBatch();
5251
this.batchSize = conf.getBatchSize();
@@ -69,6 +68,11 @@ public SqsOutboundChannel(SqsConnectorOutgoingConfiguration conf, SqsManager sqs
6968
AwsSqsLogging.log.unableToDispatch(channel, f);
7069
reportFailure(f);
7170
}));
71+
if (tracingEnabled) {
72+
sqsInstrumenter = SqsOpenTelemetryInstrumenter.createForSink(openTelemetryInstance);
73+
} else {
74+
sqsInstrumenter = null;
75+
}
7276
}
7377

7478
public Flow.Subscriber<? extends Message<?>> getSubscriber() {
@@ -178,15 +182,29 @@ private SendMessageBatchRequestEntry sendMessageBatchRequestEntry(String channel
178182
private SendMessageRequest getSendMessageRequest(String channelQueueUrl, Message<?> m) {
179183
Object payload = m.getPayload();
180184
String queueUrl = channelQueueUrl;
181-
if (payload instanceof SendMessageRequest) {
182-
return (SendMessageRequest) payload;
185+
if (payload instanceof SendMessageRequest request) {
186+
if (tracingEnabled) {
187+
SendMessageRequest.Builder builder = request.toBuilder();
188+
Map<String, MessageAttributeValue> mutableAttributes = new HashMap<>(request.messageAttributes());
189+
outgoingTrace(m, mutableAttributes);
190+
builder.messageAttributes(mutableAttributes);
191+
return builder.build();
192+
}
193+
return request;
183194
}
184-
if (payload instanceof SendMessageRequest.Builder) {
185-
SendMessageRequest.Builder builder = ((SendMessageRequest.Builder) payload)
186-
.queueUrl(queueUrl);
195+
if (payload instanceof SendMessageRequest.Builder builder) {
196+
builder.queueUrl(queueUrl);
187197
if (groupId != null) {
188198
builder.messageGroupId(groupId);
189199
}
200+
if (tracingEnabled) {
201+
SendMessageRequest request = builder.build();
202+
Map<String, MessageAttributeValue> mutableAttributes = new HashMap<>(request.messageAttributes());
203+
outgoingTrace(m, mutableAttributes);
204+
return request.toBuilder()
205+
.messageAttributes(mutableAttributes)
206+
.build();
207+
}
190208
return builder.build();
191209
}
192210
SendMessageRequest.Builder builder = SendMessageRequest.builder();
@@ -216,6 +234,7 @@ private SendMessageRequest getSendMessageRequest(String channelQueueUrl, Message
216234
if (msg.hasAttributes()) {
217235
msgAttributes.putAll(msg.messageAttributes());
218236
}
237+
outgoingTrace(m, msgAttributes);
219238
return builder
220239
.queueUrl(queueUrl)
221240
.messageGroupId(groupId)
@@ -224,6 +243,7 @@ private SendMessageRequest getSendMessageRequest(String channelQueueUrl, Message
224243
.build();
225244
}
226245
String messageBody = outgoingPayloadClassName(payload, msgAttributes);
246+
outgoingTrace(m, msgAttributes);
227247
return builder
228248
.queueUrl(queueUrl)
229249
.messageGroupId(groupId)
@@ -258,6 +278,13 @@ private boolean isPrimitiveBoxed(Class<?> c) {
258278
|| c.equals(Long.class);
259279
}
260280

281+
private void outgoingTrace(Message<?> message, Map<String, MessageAttributeValue> attributes) {
282+
if (tracingEnabled) {
283+
String indefinitely = queueUrlUni.await().indefinitely(); // memoized
284+
sqsInstrumenter.traceOutgoing(message, new SqsTrace(indefinitely, attributes));
285+
}
286+
}
287+
261288
public void close() {
262289
closed.set(true);
263290
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package io.smallrye.reactive.messaging.aws.sqs.tracing;
2+
3+
import java.util.Collections;
4+
import java.util.List;
5+
6+
import io.opentelemetry.api.common.AttributesBuilder;
7+
import io.opentelemetry.context.Context;
8+
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
9+
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
10+
11+
public class SqsAttributesExtractor implements AttributesExtractor<SqsTrace, Void> {
12+
private final MessagingAttributesGetter<SqsTrace, Void> messagingAttributesGetter;
13+
14+
public SqsAttributesExtractor() {
15+
this.messagingAttributesGetter = new SqsMessagingAttributesGetter();
16+
}
17+
18+
@Override
19+
public void onStart(final AttributesBuilder attributes, final Context parentContext, final SqsTrace SqsTrace) {
20+
21+
}
22+
23+
@Override
24+
public void onEnd(
25+
final AttributesBuilder attributes,
26+
final Context context,
27+
final SqsTrace SqsTrace,
28+
final Void unused,
29+
final Throwable error) {
30+
31+
}
32+
33+
public MessagingAttributesGetter<SqsTrace, Void> getMessagingAttributesGetter() {
34+
return messagingAttributesGetter;
35+
}
36+
37+
private static final class SqsMessagingAttributesGetter implements MessagingAttributesGetter<SqsTrace, Void> {
38+
@Override
39+
public String getSystem(final SqsTrace sqsTrace) {
40+
return "sqs";
41+
}
42+
43+
@Override
44+
public String getDestination(final SqsTrace sqsTrace) {
45+
return sqsTrace.getQueue();
46+
}
47+
48+
@Override
49+
public boolean isTemporaryDestination(final SqsTrace SqsTrace) {
50+
return false;
51+
}
52+
53+
@Override
54+
public String getConversationId(final SqsTrace SqsTrace) {
55+
return null;
56+
}
57+
58+
@Override
59+
public String getMessageId(final SqsTrace sqsTrace, final Void unused) {
60+
return sqsTrace.getMessageId();
61+
}
62+
63+
@Override
64+
public List<String> getMessageHeader(SqsTrace sqsTrace, String name) {
65+
return Collections.emptyList();
66+
}
67+
68+
@Override
69+
public String getDestinationTemplate(SqsTrace sqsTrace) {
70+
return null;
71+
}
72+
73+
@Override
74+
public boolean isAnonymousDestination(SqsTrace sqsTrace) {
75+
return false;
76+
}
77+
78+
@Override
79+
public Long getMessageBodySize(SqsTrace sqsTrace) {
80+
return null;
81+
}
82+
83+
@Override
84+
public Long getMessageEnvelopeSize(SqsTrace sqsTrace) {
85+
return null;
86+
}
87+
88+
@Override
89+
public String getClientId(SqsTrace sqsTrace) {
90+
return null;
91+
}
92+
93+
@Override
94+
public Long getBatchMessageCount(SqsTrace sqsTrace, Void unused) {
95+
return null;
96+
}
97+
}
98+
}

0 commit comments

Comments
 (0)