Skip to content

Commit 383d16e

Browse files
Batch SnsNotification Support (#1191)
* Add SQS module @SnsNotificationMessage batch messages support * Polishing Fixes #1129 --------- Co-authored-by: maxjiang153 <[email protected]>
1 parent 4a90b73 commit 383d16e

File tree

4 files changed

+88
-11
lines changed

4 files changed

+88
-11
lines changed

docs/src/main/asciidoc/sqs.adoc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,9 @@ NOTE: Queues declared in the same annotation will share the container, though ea
618618
===== SNS Messages
619619

620620
Since 3.1.1, when receiving SNS messages through the `@SqsListener`, the message includes all attributes of the `SnsNotification`. To only receive need the `Message` part of the payload, you can utilize the `@SnsNotificationMessage` annotation.
621+
622+
For handling individual message processing, the @SnsNotificationMessage annotation can be used in the following manner:
623+
621624
[source, java]
622625
----
623626
@SqsListener("my-queue")
@@ -626,6 +629,16 @@ public void listen(@SnsNotificationMessage Pojo pojo) {
626629
}
627630
----
628631

632+
For batch message processing, use the @SnsNotificationMessage annotation with a List<Pojo> parameter.
633+
634+
[source, java]
635+
----
636+
@SqsListener("my-queue")
637+
public void listen(@SnsNotificationMessage List<Pojo> pojos) {
638+
System.out.println(pojos.size());
639+
}
640+
----
641+
629642
===== Specifying a MessageListenerContainerFactory
630643
A `MessageListenerContainerFactory` can be specified through the `factory` property.
631644
Such factory will then be used to create the container for the annotated method.

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SnsMessageConverter.java

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@
1717

1818
import com.fasterxml.jackson.databind.JsonNode;
1919
import com.fasterxml.jackson.databind.ObjectMapper;
20+
import java.lang.reflect.ParameterizedType;
21+
import java.lang.reflect.Type;
22+
import java.util.List;
23+
import org.springframework.core.GenericTypeResolver;
24+
import org.springframework.core.MethodParameter;
25+
import org.springframework.core.ResolvableType;
2026
import org.springframework.lang.Nullable;
2127
import org.springframework.messaging.Message;
2228
import org.springframework.messaging.MessageHeaders;
@@ -29,6 +35,7 @@
2935
/**
3036
* @author Michael Sosa
3137
* @author gustavomonarin
38+
* @author Wei Jiang
3239
* @since 3.1.1
3340
*/
3441
public class SnsMessageConverter implements SmartMessageConverter {
@@ -45,10 +52,31 @@ public SnsMessageConverter(MessageConverter payloadConverter, ObjectMapper jsonM
4552
}
4653

4754
@Override
55+
@SuppressWarnings("unchecked")
4856
public Object fromMessage(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
4957
Assert.notNull(message, "message must not be null");
5058
Assert.notNull(targetClass, "target class must not be null");
5159

60+
Object payload = message.getPayload();
61+
62+
if (payload instanceof List messages) {
63+
return fromGenericMessages(messages, targetClass, conversionHint);
64+
}
65+
else {
66+
return fromGenericMessage((GenericMessage<?>) message, targetClass, conversionHint);
67+
}
68+
}
69+
70+
private Object fromGenericMessages(List<GenericMessage<?>> messages, Class<?> targetClass,
71+
@Nullable Object conversionHint) {
72+
Type resolvedType = getResolvedType(targetClass, conversionHint);
73+
Class<?> resolvedClazz = ResolvableType.forType(resolvedType).resolve();
74+
75+
return messages.stream().map(message -> fromGenericMessage(message, resolvedClazz, resolvedType)).toList();
76+
}
77+
78+
private Object fromGenericMessage(GenericMessage<?> message, Class<?> targetClass,
79+
@Nullable Object conversionHint) {
5280
JsonNode jsonNode;
5381
try {
5482
jsonNode = this.jsonMapper.readTree(message.getPayload().toString());
@@ -77,7 +105,7 @@ public Object fromMessage(Message<?> message, Class<?> targetClass, @Nullable Ob
77105
? ((SmartMessageConverter) this.payloadConverter).fromMessage(genericMessage, targetClass,
78106
conversionHint)
79107
: this.payloadConverter.fromMessage(genericMessage, targetClass);
80-
return new SnsMessageWrapper(jsonNode.path("Subject").asText(), convertedMessage);
108+
return convertedMessage;
81109
}
82110

83111
@Override
@@ -97,10 +125,23 @@ public Message<?> toMessage(Object payload, MessageHeaders headers, Object conve
97125
"This converter only supports reading a SNS notification and not writing them");
98126
}
99127

100-
/**
101-
* SNS Message wrapper.
102-
*/
103-
public record SnsMessageWrapper(String subject, Object message) {
128+
private static Type getResolvedType(Class<?> targetClass, @Nullable Object conversionHint) {
129+
if (conversionHint instanceof MethodParameter param) {
130+
param = param.nestedIfOptional();
131+
if (Message.class.isAssignableFrom(param.getParameterType())) {
132+
param = param.nested();
133+
}
134+
Type genericParameterType = param.getNestedGenericParameterType();
135+
Class<?> contextClass = param.getContainingClass();
136+
Type resolveType = GenericTypeResolver.resolveType(genericParameterType, contextClass);
137+
if (resolveType instanceof ParameterizedType parameterizedType) {
138+
return parameterizedType.getActualTypeArguments()[0];
139+
}
140+
else {
141+
return resolveType;
142+
}
143+
}
144+
return targetClass;
104145
}
105146

106147
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/resolver/NotificationMessageArgumentResolver.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@
2323
import org.springframework.messaging.converter.MessageConverter;
2424
import org.springframework.messaging.converter.SmartMessageConverter;
2525
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
26-
import org.springframework.util.Assert;
2726

2827
/**
2928
* @author Michael Sosa
3029
* @author gustavomonarin
30+
* @author Wei Jiang
3131
* @since 3.1.1
3232
*/
3333
public class NotificationMessageArgumentResolver implements HandlerMethodArgumentResolver {
@@ -45,10 +45,7 @@ public boolean supportsParameter(MethodParameter parameter) {
4545

4646
@Override
4747
public Object resolveArgument(MethodParameter par, Message<?> msg) {
48-
Object object = this.converter.fromMessage(msg, par.getParameterType(), par);
49-
Assert.isInstanceOf(SnsMessageConverter.SnsMessageWrapper.class, object);
50-
SnsMessageConverter.SnsMessageWrapper nr = (SnsMessageConverter.SnsMessageWrapper) object;
51-
return nr.message();
48+
return this.converter.fromMessage(msg, par.getParameterType(), par);
5249
}
5350

5451
}

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsMessageConversionIntegrationTests.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import java.util.concurrent.CompletableFuture;
3535
import java.util.concurrent.CountDownLatch;
3636
import java.util.concurrent.TimeUnit;
37+
import java.util.stream.IntStream;
38+
3739
import org.junit.jupiter.api.BeforeAll;
3840
import org.junit.jupiter.api.Test;
3941
import org.slf4j.Logger;
@@ -56,6 +58,7 @@
5658
*
5759
* @author Tomaz Fernandes
5860
* @author Mikhail Strokov
61+
* @author Wei Jiang
5962
*/
6063
@SpringBootTest
6164
class SqsMessageConversionIntegrationTests extends BaseSqsIntegrationTest {
@@ -69,6 +72,7 @@ class SqsMessageConversionIntegrationTests extends BaseSqsIntegrationTest {
6972
static final String RESOLVES_POJO_FROM_HEADER_QUEUE_NAME = "resolves_pojo_from_mapping_test_queue";
7073
static final String RESOLVES_MY_OTHER_POJO_FROM_HEADER_QUEUE_NAME = "resolves_my_other_pojo_from_mapping_test_queue";
7174
static final String RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_QUEUE_NAME = "resolves_pojo_from_notification_message_queue";
75+
static final String RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_LIST_QUEUE_NAME = "resolves_pojo_from_notification_message_list_test_queue";
7276

7377
@Autowired
7478
LatchContainer latchContainer;
@@ -85,7 +89,8 @@ static void beforeTests() {
8589
createQueue(client, RESOLVES_POJO_MESSAGE_LIST_QUEUE_NAME),
8690
createQueue(client, RESOLVES_POJO_FROM_HEADER_QUEUE_NAME),
8791
createQueue(client, RESOLVES_MY_OTHER_POJO_FROM_HEADER_QUEUE_NAME),
88-
createQueue(client, RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_QUEUE_NAME)).join();
92+
createQueue(client, RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_QUEUE_NAME),
93+
createQueue(client, RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_LIST_QUEUE_NAME)).join();
8994
}
9095

9196
@Test
@@ -151,6 +156,18 @@ void resolvesMyPojoFromNotificationMessage() throws Exception {
151156
assertThat(latchContainer.resolvesPojoNotificationMessageLatch.await(10, TimeUnit.SECONDS)).isTrue();
152157
}
153158

159+
@Test
160+
void resolvesMyPojoFromNotificationMessageList() throws Exception {
161+
byte[] notificationJsonContent = FileCopyUtils
162+
.copyToByteArray(getClass().getClassLoader().getResourceAsStream("notificationMessage.json"));
163+
String payload = new String(notificationJsonContent);
164+
List<Message<String>> messages = IntStream.range(0, 10).mapToObj(index -> MessageBuilder.withPayload(payload).build()).toList();
165+
sqsTemplate.sendMany(RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_LIST_QUEUE_NAME, messages);
166+
logger.debug("Sent message to queue {} with messageBody {}",
167+
RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_LIST_QUEUE_NAME, payload);
168+
assertThat(latchContainer.resolvesPojoNotificationMessageListLatch.await(10, TimeUnit.SECONDS)).isTrue();
169+
}
170+
154171
private Map<String, Object> getHeaderMapping(Class<?> clazz) {
155172
return Collections.singletonMap(SqsHeaders.SQS_DEFAULT_TYPE_HEADER, clazz.getName());
156173
}
@@ -247,6 +264,14 @@ void listen(@SnsNotificationMessage MyEnvelope<MyPojo> myPojo) {
247264
RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_QUEUE_NAME);
248265
latchContainer.resolvesPojoNotificationMessageLatch.countDown();
249266
}
267+
268+
@SqsListener(queueNames = RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_LIST_QUEUE_NAME, id = "resolves-pojo-with-notification-message-list", factory = "defaultSqsListenerContainerFactory")
269+
void listen(@SnsNotificationMessage List<MyEnvelope<MyPojo>> myPojos) {
270+
Assert.notEmpty(myPojos, "Received empty messages");
271+
logger.debug("Received messages {} from queue {}", myPojos,
272+
RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_LIST_QUEUE_NAME);
273+
latchContainer.resolvesPojoNotificationMessageListLatch.countDown();
274+
}
250275
}
251276

252277
static class LatchContainer {
@@ -258,6 +283,7 @@ static class LatchContainer {
258283
CountDownLatch resolvesPojoFromMappingLatch = new CountDownLatch(1);
259284
CountDownLatch resolvesMyOtherPojoFromMappingLatch = new CountDownLatch(1);
260285
CountDownLatch resolvesPojoNotificationMessageLatch = new CountDownLatch(1);
286+
CountDownLatch resolvesPojoNotificationMessageListLatch = new CountDownLatch(1);
261287

262288
}
263289

0 commit comments

Comments
 (0)