Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions smallrye-reactive-messaging-aws-sqs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
<artifactId>smallrye-reactive-messaging-provider</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-otel</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.microprofile.config</groupId>
<artifactId>microprofile-config-api</artifactId>
Expand Down Expand Up @@ -77,6 +82,16 @@
<version>${jackson.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-connector-attribute-processor</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
Expand All @@ -41,6 +42,7 @@
@ConnectorAttribute(name = "endpoint-override", type = "string", direction = INCOMING_AND_OUTGOING, description = "The endpoint override")
@ConnectorAttribute(name = "credentials-provider", type = "string", direction = INCOMING_AND_OUTGOING, description = "The credential provider to be used in the client")
@ConnectorAttribute(name = "health-enabled", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether health reporting is enabled (default) or disabled", defaultValue = "true")
@ConnectorAttribute(name = "tracing-enabled", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether tracing is enabled (default) or disabled", defaultValue = "true")

@ConnectorAttribute(name = "group.id", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "When set, sends messages with the specified group id")
@ConnectorAttribute(name = "batch", type = "boolean", direction = ConnectorAttribute.Direction.OUTGOING, description = "When set, sends messages in batches of maximum 10 messages", defaultValue = "false")
Expand Down Expand Up @@ -80,6 +82,9 @@ public class SqsConnector implements InboundConnector, OutboundConnector, Health
@Any
Instance<SqsFailureHandler.Factory> failureHandlerFactories;

@Inject
Instance<OpenTelemetry> openTelemetryInstance;

Vertx vertx;

private final List<SqsInboundChannel> inboundChannels = new CopyOnWriteArrayList<>();
Expand All @@ -106,16 +111,16 @@ public Publisher<? extends Message<?>> getPublisher(Config config) {
var conf = new SqsConnectorIncomingConfiguration(config);
var customizer = CDIUtils.getInstanceById(customizers, conf.getReceiveRequestCustomizer().orElse(conf.getChannel()),
() -> null);
var channel = new SqsInboundChannel(conf, vertx, sqsManager, customizer, jsonMapping, ackHandlerFactories,
failureHandlerFactories);
var channel = new SqsInboundChannel(conf, vertx, sqsManager, customizer, jsonMapping, openTelemetryInstance,
ackHandlerFactories, failureHandlerFactories);
inboundChannels.add(channel);
return channel.getStream();
}

@Override
public Subscriber<? extends Message<?>> getSubscriber(Config config) {
var conf = new SqsConnectorOutgoingConfiguration(config);
var channel = new SqsOutboundChannel(conf, sqsManager, jsonMapping);
var channel = new SqsOutboundChannel(conf, sqsManager, jsonMapping, openTelemetryInstance);
outboundChannels.add(channel);
return channel.getSubscriber();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@

import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.aws.sqs.ack.SqsIgnoreAckHandler;
import io.smallrye.reactive.messaging.aws.sqs.tracing.SqsOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.aws.sqs.tracing.SqsTrace;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
Expand Down Expand Up @@ -47,13 +50,18 @@ public class SqsInboundChannel {
private final boolean healthEnabled;
private final List<String> messageAttributeNames;
private final Integer visibilityTimeout;
private final boolean tracingEnabled;
private final SqsOpenTelemetryInstrumenter sqsInstrumenter;
private volatile String queueUrl;

public SqsInboundChannel(SqsConnectorIncomingConfiguration conf, Vertx vertx, SqsManager sqsManager,
SqsReceiveMessageRequestCustomizer customizer, JsonMapping jsonMapper,
Instance<OpenTelemetry> openTelemetryInstance,
Instance<SqsAckHandler.Factory> ackHandlerFactories,
Instance<SqsFailureHandler.Factory> failureHandlerFactories) {
this.channel = conf.getChannel();
this.healthEnabled = conf.getHealthEnabled();
this.tracingEnabled = conf.getTracingEnabled();
this.retries = conf.getReceiveRequestRetries();
this.client = sqsManager.getClient(conf);
this.queueUrlUni = sqsManager.getQueueUrl(conf).memoize().indefinitely();
Expand All @@ -65,7 +73,11 @@ public SqsInboundChannel(SqsConnectorIncomingConfiguration conf, Vertx vertx, Sq
this.maxNumberOfMessages = conf.getMaxNumberOfMessages();
this.messageAttributeNames = getMessageAttributeNames(conf);
this.customizer = customizer;

if (tracingEnabled) {
sqsInstrumenter = SqsOpenTelemetryInstrumenter.createForSource(openTelemetryInstance);
} else {
sqsInstrumenter = null;
}
SqsAckHandler ackHandler = createAckHandler(ackHandlerFactories, conf, vertx, client, queueUrlUni);
SqsFailureHandler failureHandler = createFailureHandler(failureHandlerFactories, conf, client, queueUrlUni);
PausablePollingStream<List<software.amazon.awssdk.services.sqs.model.Message>, software.amazon.awssdk.services.sqs.model.Message> pollingStream = new PausablePollingStream<>(
Expand All @@ -77,15 +89,27 @@ channel, request(null, 0), (messages, processor) -> {
}
}, requestExecutor, maxNumberOfMessages * 2, conf.getReceiveRequestPauseResume());
this.stream = Multi.createFrom()
.deferred(() -> queueUrlUni.onItem().transformToMulti(queueUrl -> pollingStream.getStream()))
.deferred(() -> queueUrlUni.onItem().invoke(this::setQueueUrl)
.onItem().transformToMulti(queueUrl -> pollingStream.getStream()))
.emitOn(context::runOnContext, conf.getMaxNumberOfMessages())
.onItem().transform(message -> new SqsMessage<>(message, jsonMapper, ackHandler, failureHandler))
.onItem().invoke(this::incomingTrace)
.onFailure().invoke(throwable -> {
log.errorReceivingMessage(channel, throwable);
reportFailure(throwable, false);
});
}

private void setQueueUrl(String queueUrl) {
this.queueUrl = queueUrl;
}

private void incomingTrace(SqsMessage<?> sqsMessage) {
if (tracingEnabled) {
sqsInstrumenter.traceIncoming(sqsMessage, new SqsTrace(this.queueUrl, sqsMessage.getMessage()));
}
}

private SqsFailureHandler createFailureHandler(Instance<SqsFailureHandler.Factory> failureHandlerFactories,
SqsConnectorIncomingConfiguration conf,
SqsAsyncClient client, Uni<String> queueUrlUni) {
Expand All @@ -107,6 +131,10 @@ private SqsAckHandler createAckHandler(Instance<SqsAckHandler.Factory> ackHandle
private List<String> getMessageAttributeNames(SqsConnectorIncomingConfiguration conf) {
List<String> names = new ArrayList<>();
names.add(SqsConnector.CLASS_NAME_ATTRIBUTE);
if (tracingEnabled) {
names.add("traceparent");
names.add("tracestate");
}
conf.getReceiveRequestMessageAttributeNames().ifPresent(s -> names.addAll(Arrays.asList(s.split(","))));
return names;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,28 @@
package io.smallrye.reactive.messaging.aws.sqs;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import jakarta.enterprise.inject.Instance;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.aws.sqs.i18n.AwsSqsLogging;
import io.smallrye.reactive.messaging.aws.sqs.tracing.SqsOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.aws.sqs.tracing.SqsTrace;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResultEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
import software.amazon.awssdk.services.sqs.model.*;

public class SqsOutboundChannel {

Expand All @@ -43,10 +38,14 @@ public class SqsOutboundChannel {
private final boolean batch;
private final Duration batchDelay;
private final int batchSize;
private final boolean tracingEnabled;
private final SqsOpenTelemetryInstrumenter sqsInstrumenter;

public SqsOutboundChannel(SqsConnectorOutgoingConfiguration conf, SqsManager sqsManager, JsonMapping jsonMapping) {
public SqsOutboundChannel(SqsConnectorOutgoingConfiguration conf, SqsManager sqsManager, JsonMapping jsonMapping,
Instance<OpenTelemetry> openTelemetryInstance) {
this.channel = conf.getChannel();
this.healthEnabled = conf.getHealthEnabled();
this.tracingEnabled = conf.getTracingEnabled();
this.client = sqsManager.getClient(conf);
this.batch = conf.getBatch();
this.batchSize = conf.getBatchSize();
Expand All @@ -69,6 +68,11 @@ public SqsOutboundChannel(SqsConnectorOutgoingConfiguration conf, SqsManager sqs
AwsSqsLogging.log.unableToDispatch(channel, f);
reportFailure(f);
}));
if (tracingEnabled) {
sqsInstrumenter = SqsOpenTelemetryInstrumenter.createForSink(openTelemetryInstance);
} else {
sqsInstrumenter = null;
}
}

public Flow.Subscriber<? extends Message<?>> getSubscriber() {
Expand Down Expand Up @@ -178,15 +182,29 @@ private SendMessageBatchRequestEntry sendMessageBatchRequestEntry(String channel
private SendMessageRequest getSendMessageRequest(String channelQueueUrl, Message<?> m) {
Object payload = m.getPayload();
String queueUrl = channelQueueUrl;
if (payload instanceof SendMessageRequest) {
return (SendMessageRequest) payload;
if (payload instanceof SendMessageRequest request) {
if (tracingEnabled) {
SendMessageRequest.Builder builder = request.toBuilder();
Map<String, MessageAttributeValue> mutableAttributes = new HashMap<>(request.messageAttributes());
outgoingTrace(m, mutableAttributes);
builder.messageAttributes(mutableAttributes);
return builder.build();
}
return request;
}
if (payload instanceof SendMessageRequest.Builder) {
SendMessageRequest.Builder builder = ((SendMessageRequest.Builder) payload)
.queueUrl(queueUrl);
if (payload instanceof SendMessageRequest.Builder builder) {
builder.queueUrl(queueUrl);
if (groupId != null) {
builder.messageGroupId(groupId);
}
if (tracingEnabled) {
SendMessageRequest request = builder.build();
Map<String, MessageAttributeValue> mutableAttributes = new HashMap<>(request.messageAttributes());
outgoingTrace(m, mutableAttributes);
return request.toBuilder()
.messageAttributes(mutableAttributes)
.build();
}
return builder.build();
}
SendMessageRequest.Builder builder = SendMessageRequest.builder();
Expand Down Expand Up @@ -216,6 +234,7 @@ private SendMessageRequest getSendMessageRequest(String channelQueueUrl, Message
if (msg.hasAttributes()) {
msgAttributes.putAll(msg.messageAttributes());
}
outgoingTrace(m, msgAttributes);
return builder
.queueUrl(queueUrl)
.messageGroupId(groupId)
Expand All @@ -224,6 +243,7 @@ private SendMessageRequest getSendMessageRequest(String channelQueueUrl, Message
.build();
}
String messageBody = outgoingPayloadClassName(payload, msgAttributes);
outgoingTrace(m, msgAttributes);
return builder
.queueUrl(queueUrl)
.messageGroupId(groupId)
Expand Down Expand Up @@ -258,6 +278,13 @@ private boolean isPrimitiveBoxed(Class<?> c) {
|| c.equals(Long.class);
}

private void outgoingTrace(Message<?> message, Map<String, MessageAttributeValue> attributes) {
if (tracingEnabled) {
String indefinitely = queueUrlUni.await().indefinitely(); // memoized
sqsInstrumenter.traceOutgoing(message, new SqsTrace(indefinitely, attributes));
}
}

public void close() {
closed.set(true);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package io.smallrye.reactive.messaging.aws.sqs.tracing;

import java.util.Collections;
import java.util.List;

import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;

public class SqsAttributesExtractor implements AttributesExtractor<SqsTrace, Void> {
private final MessagingAttributesGetter<SqsTrace, Void> messagingAttributesGetter;

public SqsAttributesExtractor() {
this.messagingAttributesGetter = new SqsMessagingAttributesGetter();
}

@Override
public void onStart(final AttributesBuilder attributes, final Context parentContext, final SqsTrace SqsTrace) {

}

@Override
public void onEnd(
final AttributesBuilder attributes,
final Context context,
final SqsTrace SqsTrace,
final Void unused,
final Throwable error) {

}

public MessagingAttributesGetter<SqsTrace, Void> getMessagingAttributesGetter() {
return messagingAttributesGetter;
}

private static final class SqsMessagingAttributesGetter implements MessagingAttributesGetter<SqsTrace, Void> {
@Override
public String getSystem(final SqsTrace sqsTrace) {
return "sqs";
}

@Override
public String getDestination(final SqsTrace sqsTrace) {
return sqsTrace.getQueue();
}

@Override
public boolean isTemporaryDestination(final SqsTrace SqsTrace) {
return false;
}

@Override
public String getConversationId(final SqsTrace SqsTrace) {
return null;
}

@Override
public String getMessageId(final SqsTrace sqsTrace, final Void unused) {
return sqsTrace.getMessageId();
}

@Override
public List<String> getMessageHeader(SqsTrace sqsTrace, String name) {
return Collections.emptyList();
}

@Override
public String getDestinationTemplate(SqsTrace sqsTrace) {
return null;
}

@Override
public boolean isAnonymousDestination(SqsTrace sqsTrace) {
return false;
}

@Override
public Long getMessageBodySize(SqsTrace sqsTrace) {
return null;
}

@Override
public Long getMessageEnvelopeSize(SqsTrace sqsTrace) {
return null;
}

@Override
public String getClientId(SqsTrace sqsTrace) {
return null;
}

@Override
public Long getBatchMessageCount(SqsTrace sqsTrace, Void unused) {
return null;
}
}
}
Loading