diff --git a/smallrye-reactive-messaging-aws-sqs/pom.xml b/smallrye-reactive-messaging-aws-sqs/pom.xml
index 08d4006f36..fb86283484 100644
--- a/smallrye-reactive-messaging-aws-sqs/pom.xml
+++ b/smallrye-reactive-messaging-aws-sqs/pom.xml
@@ -34,6 +34,11 @@
smallrye-reactive-messaging-provider
${project.version}
+
+ io.smallrye.reactive
+ smallrye-reactive-messaging-otel
+ ${project.version}
+
org.eclipse.microprofile.config
microprofile-config-api
@@ -77,6 +82,16 @@
${jackson.version}
test
+
+ io.opentelemetry
+ opentelemetry-sdk-trace
+ test
+
+
+ io.opentelemetry
+ opentelemetry-sdk-testing
+ test
+
io.smallrye.reactive
smallrye-connector-attribute-processor
diff --git a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsConnector.java b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsConnector.java
index 7489cb66de..7bbe377b60 100644
--- a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsConnector.java
+++ b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsConnector.java
@@ -22,6 +22,7 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
+import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
@@ -41,6 +42,7 @@
@ConnectorAttribute(name = "endpoint-override", type = "string", direction = INCOMING_AND_OUTGOING, description = "The endpoint override")
@ConnectorAttribute(name = "credentials-provider", type = "string", direction = INCOMING_AND_OUTGOING, description = "The credential provider to be used in the client")
@ConnectorAttribute(name = "health-enabled", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether health reporting is enabled (default) or disabled", defaultValue = "true")
+@ConnectorAttribute(name = "tracing-enabled", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether tracing is enabled (default) or disabled", defaultValue = "true")
@ConnectorAttribute(name = "group.id", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "When set, sends messages with the specified group id")
@ConnectorAttribute(name = "batch", type = "boolean", direction = ConnectorAttribute.Direction.OUTGOING, description = "When set, sends messages in batches of maximum 10 messages", defaultValue = "false")
@@ -80,6 +82,9 @@ public class SqsConnector implements InboundConnector, OutboundConnector, Health
@Any
Instance failureHandlerFactories;
+ @Inject
+ Instance openTelemetryInstance;
+
Vertx vertx;
private final List inboundChannels = new CopyOnWriteArrayList<>();
@@ -106,8 +111,8 @@ public Publisher extends Message>> getPublisher(Config config) {
var conf = new SqsConnectorIncomingConfiguration(config);
var customizer = CDIUtils.getInstanceById(customizers, conf.getReceiveRequestCustomizer().orElse(conf.getChannel()),
() -> null);
- var channel = new SqsInboundChannel(conf, vertx, sqsManager, customizer, jsonMapping, ackHandlerFactories,
- failureHandlerFactories);
+ var channel = new SqsInboundChannel(conf, vertx, sqsManager, customizer, jsonMapping, openTelemetryInstance,
+ ackHandlerFactories, failureHandlerFactories);
inboundChannels.add(channel);
return channel.getStream();
}
@@ -115,7 +120,7 @@ public Publisher extends Message>> getPublisher(Config config) {
@Override
public Subscriber extends Message>> getSubscriber(Config config) {
var conf = new SqsConnectorOutgoingConfiguration(config);
- var channel = new SqsOutboundChannel(conf, sqsManager, jsonMapping);
+ var channel = new SqsOutboundChannel(conf, sqsManager, jsonMapping, openTelemetryInstance);
outboundChannels.add(channel);
return channel.getSubscriber();
}
diff --git a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsInboundChannel.java b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsInboundChannel.java
index ac21cc4acd..8d013040b4 100644
--- a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsInboundChannel.java
+++ b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsInboundChannel.java
@@ -15,9 +15,12 @@
import org.eclipse.microprofile.reactive.messaging.Message;
+import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.aws.sqs.ack.SqsIgnoreAckHandler;
+import io.smallrye.reactive.messaging.aws.sqs.tracing.SqsOpenTelemetryInstrumenter;
+import io.smallrye.reactive.messaging.aws.sqs.tracing.SqsTrace;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
@@ -47,13 +50,18 @@ public class SqsInboundChannel {
private final boolean healthEnabled;
private final List messageAttributeNames;
private final Integer visibilityTimeout;
+ private final boolean tracingEnabled;
+ private final SqsOpenTelemetryInstrumenter sqsInstrumenter;
+ private volatile String queueUrl;
public SqsInboundChannel(SqsConnectorIncomingConfiguration conf, Vertx vertx, SqsManager sqsManager,
SqsReceiveMessageRequestCustomizer customizer, JsonMapping jsonMapper,
+ Instance openTelemetryInstance,
Instance ackHandlerFactories,
Instance failureHandlerFactories) {
this.channel = conf.getChannel();
this.healthEnabled = conf.getHealthEnabled();
+ this.tracingEnabled = conf.getTracingEnabled();
this.retries = conf.getReceiveRequestRetries();
this.client = sqsManager.getClient(conf);
this.queueUrlUni = sqsManager.getQueueUrl(conf).memoize().indefinitely();
@@ -65,7 +73,11 @@ public SqsInboundChannel(SqsConnectorIncomingConfiguration conf, Vertx vertx, Sq
this.maxNumberOfMessages = conf.getMaxNumberOfMessages();
this.messageAttributeNames = getMessageAttributeNames(conf);
this.customizer = customizer;
-
+ if (tracingEnabled) {
+ sqsInstrumenter = SqsOpenTelemetryInstrumenter.createForSource(openTelemetryInstance);
+ } else {
+ sqsInstrumenter = null;
+ }
SqsAckHandler ackHandler = createAckHandler(ackHandlerFactories, conf, vertx, client, queueUrlUni);
SqsFailureHandler failureHandler = createFailureHandler(failureHandlerFactories, conf, client, queueUrlUni);
PausablePollingStream, software.amazon.awssdk.services.sqs.model.Message> pollingStream = new PausablePollingStream<>(
@@ -77,15 +89,27 @@ channel, request(null, 0), (messages, processor) -> {
}
}, requestExecutor, maxNumberOfMessages * 2, conf.getReceiveRequestPauseResume());
this.stream = Multi.createFrom()
- .deferred(() -> queueUrlUni.onItem().transformToMulti(queueUrl -> pollingStream.getStream()))
+ .deferred(() -> queueUrlUni.onItem().invoke(this::setQueueUrl)
+ .onItem().transformToMulti(queueUrl -> pollingStream.getStream()))
.emitOn(context::runOnContext, conf.getMaxNumberOfMessages())
.onItem().transform(message -> new SqsMessage<>(message, jsonMapper, ackHandler, failureHandler))
+ .onItem().invoke(this::incomingTrace)
.onFailure().invoke(throwable -> {
log.errorReceivingMessage(channel, throwable);
reportFailure(throwable, false);
});
}
+ private void setQueueUrl(String queueUrl) {
+ this.queueUrl = queueUrl;
+ }
+
+ private void incomingTrace(SqsMessage> sqsMessage) {
+ if (tracingEnabled) {
+ sqsInstrumenter.traceIncoming(sqsMessage, new SqsTrace(this.queueUrl, sqsMessage.getMessage()));
+ }
+ }
+
private SqsFailureHandler createFailureHandler(Instance failureHandlerFactories,
SqsConnectorIncomingConfiguration conf,
SqsAsyncClient client, Uni queueUrlUni) {
@@ -107,6 +131,10 @@ private SqsAckHandler createAckHandler(Instance ackHandle
private List getMessageAttributeNames(SqsConnectorIncomingConfiguration conf) {
List names = new ArrayList<>();
names.add(SqsConnector.CLASS_NAME_ATTRIBUTE);
+ if (tracingEnabled) {
+ names.add("traceparent");
+ names.add("tracestate");
+ }
conf.getReceiveRequestMessageAttributeNames().ifPresent(s -> names.addAll(Arrays.asList(s.split(","))));
return names;
}
diff --git a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsOutboundChannel.java b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsOutboundChannel.java
index 5298e0ec4c..b4ec37ba6f 100644
--- a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsOutboundChannel.java
+++ b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsOutboundChannel.java
@@ -1,33 +1,28 @@
package io.smallrye.reactive.messaging.aws.sqs;
import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import jakarta.enterprise.inject.Instance;
+
import org.eclipse.microprofile.reactive.messaging.Message;
+import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.aws.sqs.i18n.AwsSqsLogging;
+import io.smallrye.reactive.messaging.aws.sqs.tracing.SqsOpenTelemetryInstrumenter;
+import io.smallrye.reactive.messaging.aws.sqs.tracing.SqsTrace;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
-import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
-import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
-import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
-import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
-import software.amazon.awssdk.services.sqs.model.SendMessageBatchResultEntry;
-import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
-import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
+import software.amazon.awssdk.services.sqs.model.*;
public class SqsOutboundChannel {
@@ -43,10 +38,14 @@ public class SqsOutboundChannel {
private final boolean batch;
private final Duration batchDelay;
private final int batchSize;
+ private final boolean tracingEnabled;
+ private final SqsOpenTelemetryInstrumenter sqsInstrumenter;
- public SqsOutboundChannel(SqsConnectorOutgoingConfiguration conf, SqsManager sqsManager, JsonMapping jsonMapping) {
+ public SqsOutboundChannel(SqsConnectorOutgoingConfiguration conf, SqsManager sqsManager, JsonMapping jsonMapping,
+ Instance openTelemetryInstance) {
this.channel = conf.getChannel();
this.healthEnabled = conf.getHealthEnabled();
+ this.tracingEnabled = conf.getTracingEnabled();
this.client = sqsManager.getClient(conf);
this.batch = conf.getBatch();
this.batchSize = conf.getBatchSize();
@@ -69,6 +68,11 @@ public SqsOutboundChannel(SqsConnectorOutgoingConfiguration conf, SqsManager sqs
AwsSqsLogging.log.unableToDispatch(channel, f);
reportFailure(f);
}));
+ if (tracingEnabled) {
+ sqsInstrumenter = SqsOpenTelemetryInstrumenter.createForSink(openTelemetryInstance);
+ } else {
+ sqsInstrumenter = null;
+ }
}
public Flow.Subscriber extends Message>> getSubscriber() {
@@ -178,15 +182,29 @@ private SendMessageBatchRequestEntry sendMessageBatchRequestEntry(String channel
private SendMessageRequest getSendMessageRequest(String channelQueueUrl, Message> m) {
Object payload = m.getPayload();
String queueUrl = channelQueueUrl;
- if (payload instanceof SendMessageRequest) {
- return (SendMessageRequest) payload;
+ if (payload instanceof SendMessageRequest request) {
+ if (tracingEnabled) {
+ SendMessageRequest.Builder builder = request.toBuilder();
+ Map mutableAttributes = new HashMap<>(request.messageAttributes());
+ outgoingTrace(m, mutableAttributes);
+ builder.messageAttributes(mutableAttributes);
+ return builder.build();
+ }
+ return request;
}
- if (payload instanceof SendMessageRequest.Builder) {
- SendMessageRequest.Builder builder = ((SendMessageRequest.Builder) payload)
- .queueUrl(queueUrl);
+ if (payload instanceof SendMessageRequest.Builder builder) {
+ builder.queueUrl(queueUrl);
if (groupId != null) {
builder.messageGroupId(groupId);
}
+ if (tracingEnabled) {
+ SendMessageRequest request = builder.build();
+ Map mutableAttributes = new HashMap<>(request.messageAttributes());
+ outgoingTrace(m, mutableAttributes);
+ return request.toBuilder()
+ .messageAttributes(mutableAttributes)
+ .build();
+ }
return builder.build();
}
SendMessageRequest.Builder builder = SendMessageRequest.builder();
@@ -216,6 +234,7 @@ private SendMessageRequest getSendMessageRequest(String channelQueueUrl, Message
if (msg.hasAttributes()) {
msgAttributes.putAll(msg.messageAttributes());
}
+ outgoingTrace(m, msgAttributes);
return builder
.queueUrl(queueUrl)
.messageGroupId(groupId)
@@ -224,6 +243,7 @@ private SendMessageRequest getSendMessageRequest(String channelQueueUrl, Message
.build();
}
String messageBody = outgoingPayloadClassName(payload, msgAttributes);
+ outgoingTrace(m, msgAttributes);
return builder
.queueUrl(queueUrl)
.messageGroupId(groupId)
@@ -258,6 +278,13 @@ private boolean isPrimitiveBoxed(Class> c) {
|| c.equals(Long.class);
}
+ private void outgoingTrace(Message> message, Map attributes) {
+ if (tracingEnabled) {
+ String indefinitely = queueUrlUni.await().indefinitely(); // memoized
+ sqsInstrumenter.traceOutgoing(message, new SqsTrace(indefinitely, attributes));
+ }
+ }
+
public void close() {
closed.set(true);
}
diff --git a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsAttributesExtractor.java b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsAttributesExtractor.java
new file mode 100644
index 0000000000..5af0e90f10
--- /dev/null
+++ b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsAttributesExtractor.java
@@ -0,0 +1,98 @@
+package io.smallrye.reactive.messaging.aws.sqs.tracing;
+
+import java.util.Collections;
+import java.util.List;
+
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
+import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
+
+public class SqsAttributesExtractor implements AttributesExtractor {
+ private final MessagingAttributesGetter messagingAttributesGetter;
+
+ public SqsAttributesExtractor() {
+ this.messagingAttributesGetter = new SqsMessagingAttributesGetter();
+ }
+
+ @Override
+ public void onStart(final AttributesBuilder attributes, final Context parentContext, final SqsTrace SqsTrace) {
+
+ }
+
+ @Override
+ public void onEnd(
+ final AttributesBuilder attributes,
+ final Context context,
+ final SqsTrace SqsTrace,
+ final Void unused,
+ final Throwable error) {
+
+ }
+
+ public MessagingAttributesGetter getMessagingAttributesGetter() {
+ return messagingAttributesGetter;
+ }
+
+ private static final class SqsMessagingAttributesGetter implements MessagingAttributesGetter {
+ @Override
+ public String getSystem(final SqsTrace sqsTrace) {
+ return "sqs";
+ }
+
+ @Override
+ public String getDestination(final SqsTrace sqsTrace) {
+ return sqsTrace.getQueue();
+ }
+
+ @Override
+ public boolean isTemporaryDestination(final SqsTrace SqsTrace) {
+ return false;
+ }
+
+ @Override
+ public String getConversationId(final SqsTrace SqsTrace) {
+ return null;
+ }
+
+ @Override
+ public String getMessageId(final SqsTrace sqsTrace, final Void unused) {
+ return sqsTrace.getMessageId();
+ }
+
+ @Override
+ public List getMessageHeader(SqsTrace sqsTrace, String name) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public String getDestinationTemplate(SqsTrace sqsTrace) {
+ return null;
+ }
+
+ @Override
+ public boolean isAnonymousDestination(SqsTrace sqsTrace) {
+ return false;
+ }
+
+ @Override
+ public Long getMessageBodySize(SqsTrace sqsTrace) {
+ return null;
+ }
+
+ @Override
+ public Long getMessageEnvelopeSize(SqsTrace sqsTrace) {
+ return null;
+ }
+
+ @Override
+ public String getClientId(SqsTrace sqsTrace) {
+ return null;
+ }
+
+ @Override
+ public Long getBatchMessageCount(SqsTrace sqsTrace, Void unused) {
+ return null;
+ }
+ }
+}
diff --git a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsOpenTelemetryInstrumenter.java b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsOpenTelemetryInstrumenter.java
new file mode 100644
index 0000000000..93089e1ecf
--- /dev/null
+++ b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsOpenTelemetryInstrumenter.java
@@ -0,0 +1,69 @@
+package io.smallrye.reactive.messaging.aws.sqs.tracing;
+
+import jakarta.enterprise.inject.Instance;
+
+import org.eclipse.microprofile.reactive.messaging.Message;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
+import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
+import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
+import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor;
+import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
+import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
+import io.smallrye.reactive.messaging.tracing.TracingUtils;
+
+/**
+ * Encapsulates the OpenTelemetry instrumentation API so that those classes are only needed if
+ * users explicitly enable tracing.
+ */
+public class SqsOpenTelemetryInstrumenter {
+
+ private final Instrumenter instrumenter;
+
+ private SqsOpenTelemetryInstrumenter(Instrumenter instrumenter) {
+ this.instrumenter = instrumenter;
+ }
+
+ public static SqsOpenTelemetryInstrumenter createForSource(Instance openTelemetryInstance) {
+ return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), true);
+ }
+
+ public static SqsOpenTelemetryInstrumenter createForSink(Instance openTelemetryInstance) {
+ return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), false);
+ }
+
+ private static SqsOpenTelemetryInstrumenter create(OpenTelemetry openTelemetry, boolean source) {
+
+ MessageOperation messageOperation = source ? MessageOperation.RECEIVE : MessageOperation.PUBLISH;
+
+ SqsAttributesExtractor sqsAttributesExtractor = new SqsAttributesExtractor();
+ MessagingAttributesGetter messagingAttributesGetter = sqsAttributesExtractor
+ .getMessagingAttributesGetter();
+ InstrumenterBuilder builder = Instrumenter.builder(openTelemetry,
+ "io.smallrye.reactive.messaging",
+ MessagingSpanNameExtractor.create(messagingAttributesGetter, messageOperation));
+
+ builder
+ .addAttributesExtractor(
+ MessagingAttributesExtractor.create(messagingAttributesGetter, messageOperation))
+ .addAttributesExtractor(sqsAttributesExtractor);
+
+ Instrumenter instrumenter;
+ if (source) {
+ instrumenter = builder.buildConsumerInstrumenter(SqsTraceTextMapGetter.INSTANCE);
+ } else {
+ instrumenter = builder.buildProducerInstrumenter(SqsTraceTextMapSetter.INSTANCE);
+ }
+
+ return new SqsOpenTelemetryInstrumenter(instrumenter);
+ }
+
+ public Message> traceIncoming(Message> message, SqsTrace sqsTrace) {
+ return TracingUtils.traceIncoming(instrumenter, message, sqsTrace);
+ }
+
+ public void traceOutgoing(Message> message, SqsTrace sqsTrace) {
+ TracingUtils.traceOutgoing(instrumenter, message, sqsTrace);
+ }
+}
diff --git a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsTrace.java b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsTrace.java
new file mode 100644
index 0000000000..b158880385
--- /dev/null
+++ b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsTrace.java
@@ -0,0 +1,53 @@
+package io.smallrye.reactive.messaging.aws.sqs.tracing;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
+
+public class SqsTrace {
+ private final String queue;
+ private final String messageId;
+ private final Map attributes;
+
+ public SqsTrace(String queue, Message sqsMessage) {
+ this.queue = queue;
+ this.messageId = sqsMessage.messageId();
+ this.attributes = sqsMessage.hasMessageAttributes() ? sqsMessage.messageAttributes() : new HashMap<>();
+ }
+
+ public SqsTrace(String queue, Map attributes) {
+ this.queue = queue;
+ this.messageId = null;
+ this.attributes = attributes;
+ }
+
+ public String getQueue() {
+ return queue;
+ }
+
+ public List getPropertyNames() {
+ return new ArrayList<>(attributes.keySet());
+ }
+
+ public String getProperty(final String key) {
+ MessageAttributeValue attributeValue = attributes.get(key);
+ if (attributeValue == null) {
+ return null;
+ }
+ return attributeValue.stringValue();
+ }
+
+ public void setProperty(final String key, final String value) {
+ attributes.put(key, MessageAttributeValue.builder()
+ .dataType("String")
+ .stringValue(value).build());
+ }
+
+ public String getMessageId() {
+ return messageId;
+ }
+}
diff --git a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsTraceTextMapGetter.java b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsTraceTextMapGetter.java
new file mode 100644
index 0000000000..73b08fefc7
--- /dev/null
+++ b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsTraceTextMapGetter.java
@@ -0,0 +1,20 @@
+package io.smallrye.reactive.messaging.aws.sqs.tracing;
+
+import io.opentelemetry.context.propagation.TextMapGetter;
+
+public enum SqsTraceTextMapGetter implements TextMapGetter {
+ INSTANCE;
+
+ @Override
+ public Iterable keys(final SqsTrace carrier) {
+ return carrier.getPropertyNames();
+ }
+
+ @Override
+ public String get(final SqsTrace carrier, final String key) {
+ if (carrier != null) {
+ return carrier.getProperty(key);
+ }
+ return null;
+ }
+}
diff --git a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsTraceTextMapSetter.java b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsTraceTextMapSetter.java
new file mode 100644
index 0000000000..c808731fc7
--- /dev/null
+++ b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsTraceTextMapSetter.java
@@ -0,0 +1,14 @@
+package io.smallrye.reactive.messaging.aws.sqs.tracing;
+
+import io.opentelemetry.context.propagation.TextMapSetter;
+
+public enum SqsTraceTextMapSetter implements TextMapSetter {
+ INSTANCE;
+
+ @Override
+ public void set(final SqsTrace carrier, final String key, final String value) {
+ if (carrier != null) {
+ carrier.setProperty(key, value);
+ }
+ }
+}
diff --git a/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/HeaderPropagationTest.java b/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/HeaderPropagationTest.java
new file mode 100644
index 0000000000..e048d22627
--- /dev/null
+++ b/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/HeaderPropagationTest.java
@@ -0,0 +1,120 @@
+package io.smallrye.reactive.messaging.aws.sqs;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.reactive.messaging.Message;
+import org.eclipse.microprofile.reactive.messaging.Metadata;
+import org.eclipse.microprofile.reactive.messaging.Outgoing;
+import org.junit.jupiter.api.Test;
+
+import io.smallrye.mutiny.Multi;
+import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
+import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
+
+public class HeaderPropagationTest extends SqsTestBase {
+
+ @Test
+ public void testFromAppToSqs() {
+ SqsClientProvider.client = getSqsClient();
+ addBeans(SqsClientProvider.class);
+ String queueUrl = createQueue(queue);
+ runApplication(new MapBasedConfig()
+ .with("mp.messaging.outgoing.sqs.connector", SqsConnector.CONNECTOR_NAME)
+ .with("mp.messaging.outgoing.sqs.queue", queue)
+ .with("mp.messaging.outgoing.sqs.tracing-enabled", true), MyAppGeneratingData.class);
+
+ var messages = receiveMessages(queueUrl, r -> r.messageAttributeNames("prop"), 10, Duration.ofSeconds(10));
+
+ assertThat(messages).hasSize(10).allSatisfy(entry -> {
+ assertThat(entry.body()).isNotNull();
+ assertThat(entry.messageAttributes().get("prop").stringValue()).isEqualTo("bar");
+ });
+ }
+
+ @Test
+ public void testFromSqsToAppToSqs() {
+ SqsClientProvider.client = getSqsClient();
+ addBeans(SqsClientProvider.class);
+ String queueUrl = createQueue(queue);
+ String resultQueueUrl = createQueue(queue + "-result");
+ runApplication(new MapBasedConfig()
+ .with("mp.messaging.incoming.source.connector", SqsConnector.CONNECTOR_NAME)
+ .with("mp.messaging.incoming.source.queue", queue)
+ .with("mp.messaging.incoming.source.tracing-enabled", true)
+ .with("mp.messaging.outgoing.sqs.connector", SqsConnector.CONNECTOR_NAME)
+ .with("mp.messaging.outgoing.sqs.queue", queue + "-result")
+ .with("mp.messaging.outgoing.sqs.tracing-enabled", true), MyAppProcessingData.class);
+
+ sendMessage(queueUrl, 10, (i, r) -> {
+ Map attributes = new HashMap<>();
+ attributes.put("_classname", MessageAttributeValue.builder()
+ .stringValue("java.lang.Integer").dataType("String").build());
+ r.messageAttributes(attributes)
+ .messageBody(Integer.toString(i));
+ });
+
+ var messages = receiveMessages(resultQueueUrl, r -> r.messageAttributeNames("prop"), 10, Duration.ofSeconds(10));
+
+ assertThat(messages).hasSize(10).allSatisfy(entry -> {
+ assertThat(entry.body()).isNotNull();
+ assertThat(entry.messageAttributes().get("prop").stringValue()).isEqualTo("bar");
+ });
+ }
+
+ @ApplicationScoped
+ public static class MyAppGeneratingData {
+
+ @Outgoing("source")
+ public Multi source() {
+ return Multi.createFrom().range(0, 10);
+ }
+
+ @Incoming("source")
+ @Outgoing("p1")
+ public Message processMessage(Message input) {
+ HashMap attributes = new HashMap<>();
+ attributes.put("prop", MessageAttributeValue.builder()
+ .stringValue("bar").dataType("String").build());
+ return Message.of(input.getPayload())
+ .withMetadata(Metadata.of(SqsOutboundMetadata.builder()
+ .messageAttributes(attributes)
+ .build()));
+ }
+
+ @Incoming("p1")
+ @Outgoing("sqs")
+ public String processPayload(int payload) {
+ return Integer.toString(payload);
+ }
+ }
+
+ @ApplicationScoped
+ public static class MyAppProcessingData {
+
+ @Incoming("source")
+ @Outgoing("p1")
+ public Message processMessage(Message input) {
+ HashMap attributes = new HashMap<>();
+ attributes.put("prop", MessageAttributeValue.builder()
+ .stringValue("bar").dataType("String").build());
+ return Message.of(input.getPayload())
+ .withMetadata(Metadata.of(SqsOutboundMetadata.builder()
+ .messageAttributes(attributes)
+ .build()));
+ }
+
+ @Incoming("p1")
+ @Outgoing("sqs")
+ public String processPayload(int payload) {
+ return Integer.toString(payload);
+ }
+ }
+
+}
diff --git a/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/tracing/MessagePropertiesExtractAdapterTest.java b/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/tracing/MessagePropertiesExtractAdapterTest.java
new file mode 100644
index 0000000000..f6e1388925
--- /dev/null
+++ b/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/tracing/MessagePropertiesExtractAdapterTest.java
@@ -0,0 +1,26 @@
+package io.smallrye.reactive.messaging.aws.sqs.tracing;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+
+import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
+
+class MessagePropertiesExtractAdapterTest {
+ @Test
+ public void verifyNullHeaderHandled() {
+ Map messageProperties = new HashMap<>();
+ messageProperties.put("test_null_header", null);
+
+ SqsTrace sqsTrace = new SqsTrace("", messageProperties);
+
+ String headerValue = SqsTraceTextMapGetter.INSTANCE.get(sqsTrace, "test_null_header");
+ SqsTraceTextMapSetter.INSTANCE.set(sqsTrace, "test_other_header", "value");
+
+ assertThat(headerValue).isNull();
+ assertThat(messageProperties.get("test_other_header").stringValue()).isEqualTo("value");
+ }
+}
diff --git a/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/tracing/TracingPropagationTest.java b/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/tracing/TracingPropagationTest.java
new file mode 100644
index 0000000000..4e0c5d2fe5
--- /dev/null
+++ b/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/tracing/TracingPropagationTest.java
@@ -0,0 +1,324 @@
+package io.smallrye.reactive.messaging.aws.sqs.tracing;
+
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.*;
+import static java.util.stream.Collectors.toList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.reactive.messaging.Message;
+import org.eclipse.microprofile.reactive.messaging.Outgoing;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanId;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.context.propagation.ContextPropagators;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.common.CompletableResultCode;
+import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
+import io.opentelemetry.sdk.trace.SdkTracerProvider;
+import io.opentelemetry.sdk.trace.SpanProcessor;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
+import io.opentelemetry.sdk.trace.samplers.Sampler;
+import io.smallrye.mutiny.Multi;
+import io.smallrye.reactive.messaging.aws.sqs.SqsClientProvider;
+import io.smallrye.reactive.messaging.aws.sqs.SqsConnector;
+import io.smallrye.reactive.messaging.aws.sqs.SqsIncomingMetadata;
+import io.smallrye.reactive.messaging.aws.sqs.SqsTestBase;
+import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
+import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
+
+public class TracingPropagationTest extends SqsTestBase {
+ private SdkTracerProvider tracerProvider;
+ private InMemorySpanExporter spanExporter;
+
+ @BeforeEach
+ public void setup() {
+ GlobalOpenTelemetry.resetForTest();
+
+ spanExporter = InMemorySpanExporter.create();
+ SpanProcessor spanProcessor = SimpleSpanProcessor.create(spanExporter);
+
+ tracerProvider = SdkTracerProvider.builder()
+ .addSpanProcessor(spanProcessor)
+ .setSampler(Sampler.alwaysOn())
+ .build();
+
+ OpenTelemetrySdk.builder()
+ .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
+ .setTracerProvider(tracerProvider)
+ .buildAndRegisterGlobal();
+ }
+
+ @AfterAll
+ static void shutdown() {
+ GlobalOpenTelemetry.resetForTest();
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ @Test
+ public void testFromAppToSqs() {
+ SqsClientProvider.client = getSqsClient();
+ addBeans(SqsClientProvider.class, ProducerApp.class);
+ String queueUrl = createQueue(queue);
+ MyAppReceivingData bean = runApplication(new MapBasedConfig()
+ .with("mp.messaging.outgoing.sink.connector", SqsConnector.CONNECTOR_NAME)
+ .with("mp.messaging.outgoing.sink.queue", queue)
+ .with("mp.messaging.outgoing.sink.tracing-enabled", true)
+ .with("mp.messaging.incoming.data.connector", SqsConnector.CONNECTOR_NAME)
+ .with("mp.messaging.incoming.data.queue", queue)
+ .with("mp.messaging.incoming.data.tracing-enabled", true),
+ MyAppReceivingData.class);
+
+ await().until(() -> bean.results().size() >= 10);
+ assertThat(bean.results())
+ .extracting(m -> m.getMetadata(SqsIncomingMetadata.class).get())
+ .allSatisfy(m -> assertThat(m.getMessage().messageAttributes().get("traceparent")).isNotNull())
+ .extracting(m -> Integer.parseInt(m.getMessage().body()))
+ .containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+ CompletableResultCode completableResultCode = tracerProvider.forceFlush();
+ completableResultCode.join(10, TimeUnit.SECONDS);
+ List spans = spanExporter.getFinishedSpanItems();
+ assertEquals(20, spans.size());
+
+ assertEquals(10, spans.stream().map(SpanData::getTraceId).collect(Collectors.toSet()).size());
+
+ SpanData span = spans.get(0);
+ assertEquals(SpanKind.PRODUCER, span.getKind());
+ assertEquals(3, span.getAttributes().size());
+ assertEquals("sqs", span.getAttributes().get(MESSAGING_SYSTEM));
+ assertEquals("publish", span.getAttributes().get(MESSAGING_OPERATION));
+ assertEquals(queueUrl, span.getAttributes().get(MESSAGING_DESTINATION_NAME));
+ assertEquals(queueUrl + " publish", span.getName());
+ }
+
+ @Test
+ public void testFromSqsToAppToSqs() {
+ SqsClientProvider.client = getSqsClient();
+ addBeans(SqsClientProvider.class);
+ String inQueueUrl = createQueue(queue);
+ String resultQueue = queue + "-result";
+ String resultQueueUrl = createQueue(resultQueue);
+ MyAppProcessingData myAppProcessingData = runApplication(new MapBasedConfig()
+ .with("mp.messaging.incoming.source.connector", SqsConnector.CONNECTOR_NAME)
+ .with("mp.messaging.incoming.source.queue", queue)
+ .with("mp.messaging.incoming.source.tracing-enabled", true)
+ .with("mp.messaging.outgoing.sqs.connector", SqsConnector.CONNECTOR_NAME)
+ .with("mp.messaging.outgoing.sqs.queue", resultQueue)
+ .with("mp.messaging.outgoing.sqs.tracing-enabled", true), MyAppProcessingData.class);
+
+ sendMessage(inQueueUrl, 10, (i, r) -> r.messageBody(String.valueOf(i))
+ .messageAttributes(Map.of(SqsConnector.CLASS_NAME_ATTRIBUTE, MessageAttributeValue.builder()
+ .dataType("String").stringValue(Integer.class.getName()).build())));
+
+ var messages = receiveMessages(resultQueueUrl, 10, Duration.ofSeconds(10));
+ assertThat(messages)
+ .extracting(m -> Integer.valueOf(m.body()))
+ .containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+
+ CompletableResultCode completableResultCode = tracerProvider.forceFlush();
+ completableResultCode.join(10, TimeUnit.SECONDS);
+ List spans = spanExporter.getFinishedSpanItems();
+ assertEquals(20, spans.size());
+
+ List parentSpans = spans.stream()
+ .filter(spanData -> spanData.getParentSpanId().equals(SpanId.getInvalid()))
+ .collect(toList());
+ assertEquals(10, parentSpans.size());
+
+ SpanData consumer = parentSpans.get(0);
+ assertEquals(SpanKind.CONSUMER, consumer.getKind());
+ assertEquals(4, consumer.getAttributes().size());
+ assertEquals("sqs", consumer.getAttributes().get(MESSAGING_SYSTEM));
+ assertEquals("receive", consumer.getAttributes().get(MESSAGING_OPERATION));
+ assertEquals(inQueueUrl, consumer.getAttributes().get(MESSAGING_DESTINATION_NAME));
+ assertEquals(inQueueUrl + " receive", consumer.getName());
+
+ for (SpanData span : spans) {
+ System.out.println(span.getKind() + " " + span.getSpanId() + " -> " + span.getParentSpanId());
+ }
+ SpanData producerSpan = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(consumer.getSpanId()))
+ .findFirst().get();
+ assertEquals(SpanKind.PRODUCER, producerSpan.getKind());
+ assertEquals(3, producerSpan.getAttributes().size());
+ assertEquals("sqs", producerSpan.getAttributes().get(MESSAGING_SYSTEM));
+ assertEquals("publish", producerSpan.getAttributes().get(MESSAGING_OPERATION));
+ assertEquals(resultQueueUrl, producerSpan.getAttributes().get(MESSAGING_DESTINATION_NAME));
+ assertEquals(resultQueueUrl + " publish", producerSpan.getName());
+ }
+
+ @Test
+ public void testFromSqsToAppWithParentSpan() {
+ SqsClientProvider.client = getSqsClient();
+ addBeans(SqsClientProvider.class);
+ String queueUrl = createQueue(queue);
+ MapBasedConfig config = new MapBasedConfig()
+ .with("mp.messaging.incoming.data.connector", SqsConnector.CONNECTOR_NAME)
+ .with("mp.messaging.incoming.data.queue", queue);
+ MyAppReceivingPayload bean = runApplication(config, MyAppReceivingPayload.class);
+
+ Map properties = new HashMap<>();
+ try (Scope ignored = Context.current().makeCurrent()) {
+ Tracer tracer = GlobalOpenTelemetry.getTracerProvider().get("io.smallrye.reactive.messaging");
+ Span span = tracer.spanBuilder("producer").setSpanKind(SpanKind.PRODUCER).startSpan();
+ Context current = Context.current().with(span);
+ GlobalOpenTelemetry.getPropagators()
+ .getTextMapPropagator()
+ .inject(current, properties, (carrier, key, value) -> carrier.put(key, value));
+ span.end();
+ }
+
+ sendMessage(queueUrl, 10, (i, r) -> {
+ HashMap attributes = new HashMap<>();
+ attributes.put("_classname", MessageAttributeValue.builder()
+ .stringValue("java.lang.Integer").dataType("String").build());
+ properties.forEach((k, v) -> attributes.put(k, MessageAttributeValue.builder()
+ .stringValue(v).dataType("String").build()));
+ r.messageAttributes(attributes)
+ .messageBody(String.valueOf(i));
+ });
+
+ await().until(() -> bean.list().size() >= 10);
+
+ assertThat(bean.list()).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+ CompletableResultCode completableResultCode = tracerProvider.forceFlush();
+ completableResultCode.join(10, TimeUnit.SECONDS);
+ // 1 Parent, 10 Children
+ List spans = spanExporter.getFinishedSpanItems();
+ assertEquals(11, spans.size());
+
+ // All should use the same Trace
+ assertEquals(1, spans.stream().map(SpanData::getTraceId).collect(Collectors.toSet()).size());
+
+ List parentSpans = spans.stream()
+ .filter(spanData -> spanData.getParentSpanId().equals(SpanId.getInvalid())).collect(toList());
+ assertEquals(1, parentSpans.size());
+
+ for (SpanData parentSpan : parentSpans) {
+ assertEquals(10,
+ spans.stream().filter(spanData -> spanData.getParentSpanId().equals(parentSpan.getSpanId())).count());
+ }
+
+ SpanData producerSpan = parentSpans.get(0);
+ assertEquals(SpanKind.PRODUCER, producerSpan.getKind());
+
+ SpanData consumer = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(producerSpan.getSpanId()))
+ .findFirst().get();
+ assertEquals(4, consumer.getAttributes().size());
+ assertEquals("sqs", consumer.getAttributes().get(MESSAGING_SYSTEM));
+ assertEquals("receive", consumer.getAttributes().get(MESSAGING_OPERATION));
+ assertEquals(queueUrl, consumer.getAttributes().get(MESSAGING_DESTINATION_NAME));
+ assertEquals(queueUrl + " receive", consumer.getName());
+
+ }
+
+ @Test
+ public void testFromSqsToAppWithNoParent() {
+ SqsClientProvider.client = getSqsClient();
+ addBeans(SqsClientProvider.class);
+ String queueUrl = createQueue(queue);
+ MyAppReceivingPayload bean = runApplication(new MapBasedConfig()
+ .with("mp.messaging.incoming.data.connector", SqsConnector.CONNECTOR_NAME)
+ .with("mp.messaging.incoming.data.queue", queue)
+ .with("mp.messaging.incoming.data.tracing-enabled", true), MyAppReceivingPayload.class);
+
+ sendMessage(queueUrl, 10, (i, r) -> {
+ Map attributes = new HashMap<>();
+ attributes.put("_classname", MessageAttributeValue.builder()
+ .stringValue("java.lang.Integer").dataType("String").build());
+ r.messageAttributes(attributes)
+ .messageBody(Integer.toString(i));
+ });
+
+ await().until(() -> bean.list().size() >= 10);
+
+ CompletableResultCode completableResultCode = tracerProvider.forceFlush();
+ completableResultCode.join(10, TimeUnit.SECONDS);
+ List spans = spanExporter.getFinishedSpanItems();
+ assertThat(spans).hasSize(10).allSatisfy(span -> {
+ assertThat(span.getKind()).isEqualTo(SpanKind.CONSUMER);
+ assertThat(span.getSpanId()).isNotEqualTo(SpanId.getInvalid());
+ assertThat(span.getParentSpanId()).isEqualTo(SpanId.getInvalid());
+ });
+ }
+
+ @ApplicationScoped
+ public static class MyAppProcessingData {
+
+ private final List list = new CopyOnWriteArrayList<>();
+
+ @Incoming("source")
+ @Outgoing("sqs")
+ public Message processMessage(Message input) {
+ list.add(input.getPayload());
+ return input.withPayload(input.getPayload() + 1);
+ }
+
+ public List list() {
+ return list;
+ }
+ }
+
+ @ApplicationScoped
+ public static class MyAppReceivingData {
+ private final List> results = new CopyOnWriteArrayList<>();
+
+ @Incoming("data")
+ public CompletionStage consume(Message input) {
+ results.add(input);
+ return input.ack();
+ }
+
+ public List> results() {
+ return results;
+ }
+ }
+
+ @ApplicationScoped
+ public static class MyAppReceivingPayload {
+ private final List results = new CopyOnWriteArrayList<>();
+
+ @Incoming("data")
+ public void consume(Integer input) {
+ results.add(input);
+ }
+
+ public List list() {
+ return results;
+ }
+ }
+
+ @ApplicationScoped
+ public static class ProducerApp {
+
+ @Outgoing("sink")
+ Multi produce() {
+ return Multi.createFrom().range(0, 10);
+ }
+
+ }
+
+}
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkTest.java
index 0392d5446e..a0a7f7a190 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkTest.java
@@ -730,7 +730,8 @@ public void testTargetedWithTombstoneRecords() {
.containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
assertThat(consumed1.getRecords())
.extracting(ConsumerRecord::value)
- .containsExactly("value-0", "value-1", "value-2", "value-3", "value-4", "value-5", "value-6", "value-7", "value-8", "value-9");
+ .containsExactly("value-0", "value-1", "value-2", "value-3", "value-4", "value-5", "value-6", "value-7",
+ "value-8", "value-9");
// Verify topic2 receives tombstone records (null values)
assertThat(consumed2.getRecords())