Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit ccce22d

Browse files
author
wmz7year
committedApr 20, 2024
Add SQS module @SnsNotificationMessage batch messages support
1 parent 9116c28 commit ccce22d

File tree

4 files changed

+87
-11
lines changed

4 files changed

+87
-11
lines changed
 

‎docs/src/main/asciidoc/sqs.adoc

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,11 @@ NOTE: Queues declared in the same annotation will share the container, though ea
618618
===== SNS Messages
619619

620620
Since 3.1.1, when receiving SNS messages through the `@SqsListener`, the message includes all attributes of the `SnsNotification`. To only receive need the `Message` part of the payload, you can utilize the `@SnsNotificationMessage` annotation.
621+
622+
The @SnsNotificationMessage annotation can be used as follows:
623+
624+
For handling individual message processing, the @SnsNotificationMessage annotation can be used in the following manner:
625+
621626
[source, java]
622627
----
623628
@SqsListener("my-queue")
@@ -626,6 +631,16 @@ public void listen(@SnsNotificationMessage Pojo pojo) {
626631
}
627632
----
628633

634+
For batch message processing, use the @SnsNotificationMessage annotation with a List<Pojo> parameter.
635+
636+
[source, java]
637+
----
638+
@SqsListener("my-queue")
639+
public void listen(@SnsNotificationMessage List<Pojo> pojos) {
640+
System.out.println(pojos.size());
641+
}
642+
----
643+
629644
===== Specifying a MessageListenerContainerFactory
630645
A `MessageListenerContainerFactory` can be specified through the `factory` property.
631646
Such factory will then be used to create the container for the annotated method.

‎spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SnsMessageConverter.java

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@
1717

1818
import com.fasterxml.jackson.databind.JsonNode;
1919
import com.fasterxml.jackson.databind.ObjectMapper;
20+
import java.lang.reflect.ParameterizedType;
21+
import java.lang.reflect.Type;
22+
import java.util.List;
23+
import org.springframework.core.GenericTypeResolver;
24+
import org.springframework.core.MethodParameter;
25+
import org.springframework.core.ResolvableType;
2026
import org.springframework.lang.Nullable;
2127
import org.springframework.messaging.Message;
2228
import org.springframework.messaging.MessageHeaders;
@@ -29,6 +35,7 @@
2935
/**
3036
* @author Michael Sosa
3137
* @author gustavomonarin
38+
* @author Wei Jiang
3239
* @since 3.1.1
3340
*/
3441
public class SnsMessageConverter implements SmartMessageConverter {
@@ -45,10 +52,31 @@ public SnsMessageConverter(MessageConverter payloadConverter, ObjectMapper jsonM
4552
}
4653

4754
@Override
55+
@SuppressWarnings("unchecked")
4856
public Object fromMessage(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
4957
Assert.notNull(message, "message must not be null");
5058
Assert.notNull(targetClass, "target class must not be null");
5159

60+
Object payload = message.getPayload();
61+
62+
if (payload instanceof List messages) {
63+
return fromGenericMessages(messages, targetClass, conversionHint);
64+
}
65+
else {
66+
return fromGenericMessage((GenericMessage<?>) message, targetClass, conversionHint);
67+
}
68+
}
69+
70+
private Object fromGenericMessages(List<GenericMessage<?>> messages, Class<?> targetClass,
71+
@Nullable Object conversionHint) {
72+
Type resolvedType = getResolvedType(targetClass, conversionHint);
73+
Class<?> resolvedClazz = ResolvableType.forType(resolvedType).resolve();
74+
75+
return messages.stream().map(message -> fromGenericMessage(message, resolvedClazz, resolvedType)).toList();
76+
}
77+
78+
private Object fromGenericMessage(GenericMessage<?> message, Class<?> targetClass,
79+
@Nullable Object conversionHint) {
5280
JsonNode jsonNode;
5381
try {
5482
jsonNode = this.jsonMapper.readTree(message.getPayload().toString());
@@ -77,7 +105,7 @@ public Object fromMessage(Message<?> message, Class<?> targetClass, @Nullable Ob
77105
? ((SmartMessageConverter) this.payloadConverter).fromMessage(genericMessage, targetClass,
78106
conversionHint)
79107
: this.payloadConverter.fromMessage(genericMessage, targetClass);
80-
return new SnsMessageWrapper(jsonNode.path("Subject").asText(), convertedMessage);
108+
return convertedMessage;
81109
}
82110

83111
@Override
@@ -97,10 +125,23 @@ public Message<?> toMessage(Object payload, MessageHeaders headers, Object conve
97125
"This converter only supports reading a SNS notification and not writing them");
98126
}
99127

100-
/**
101-
* SNS Message wrapper.
102-
*/
103-
public record SnsMessageWrapper(String subject, Object message) {
128+
static Type getResolvedType(Class<?> targetClass, @Nullable Object conversionHint) {
129+
if (conversionHint instanceof MethodParameter param) {
130+
param = param.nestedIfOptional();
131+
if (Message.class.isAssignableFrom(param.getParameterType())) {
132+
param = param.nested();
133+
}
134+
Type genericParameterType = param.getNestedGenericParameterType();
135+
Class<?> contextClass = param.getContainingClass();
136+
Type resolveType = GenericTypeResolver.resolveType(genericParameterType, contextClass);
137+
if (resolveType instanceof ParameterizedType parameterizedType) {
138+
return parameterizedType.getActualTypeArguments()[0];
139+
}
140+
else {
141+
return resolveType;
142+
}
143+
}
144+
return targetClass;
104145
}
105146

106147
}

‎spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/resolver/NotificationMessageArgumentResolver.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@
2323
import org.springframework.messaging.converter.MessageConverter;
2424
import org.springframework.messaging.converter.SmartMessageConverter;
2525
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
26-
import org.springframework.util.Assert;
2726

2827
/**
2928
* @author Michael Sosa
3029
* @author gustavomonarin
30+
* @author Wei Jiang
3131
* @since 3.1.1
3232
*/
3333
public class NotificationMessageArgumentResolver implements HandlerMethodArgumentResolver {
@@ -45,10 +45,7 @@ public boolean supportsParameter(MethodParameter parameter) {
4545

4646
@Override
4747
public Object resolveArgument(MethodParameter par, Message<?> msg) {
48-
Object object = this.converter.fromMessage(msg, par.getParameterType(), par);
49-
Assert.isInstanceOf(SnsMessageConverter.SnsMessageWrapper.class, object);
50-
SnsMessageConverter.SnsMessageWrapper nr = (SnsMessageConverter.SnsMessageWrapper) object;
51-
return nr.message();
48+
return this.converter.fromMessage(msg, par.getParameterType(), par);
5249
}
5350

5451
}

‎spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsMessageConversionIntegrationTests.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
*
5757
* @author Tomaz Fernandes
5858
* @author Mikhail Strokov
59+
* @author Wei Jiang
5960
*/
6061
@SpringBootTest
6162
class SqsMessageConversionIntegrationTests extends BaseSqsIntegrationTest {
@@ -69,6 +70,7 @@ class SqsMessageConversionIntegrationTests extends BaseSqsIntegrationTest {
6970
static final String RESOLVES_POJO_FROM_HEADER_QUEUE_NAME = "resolves_pojo_from_mapping_test_queue";
7071
static final String RESOLVES_MY_OTHER_POJO_FROM_HEADER_QUEUE_NAME = "resolves_my_other_pojo_from_mapping_test_queue";
7172
static final String RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_QUEUE_NAME = "resolves_pojo_from_notification_message_queue";
73+
static final String RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_LIST_QUEUE_NAME = "resolves_pojo_from_notification_message_list_test_queue";
7274

7375
@Autowired
7476
LatchContainer latchContainer;
@@ -85,7 +87,8 @@ static void beforeTests() {
8587
createQueue(client, RESOLVES_POJO_MESSAGE_LIST_QUEUE_NAME),
8688
createQueue(client, RESOLVES_POJO_FROM_HEADER_QUEUE_NAME),
8789
createQueue(client, RESOLVES_MY_OTHER_POJO_FROM_HEADER_QUEUE_NAME),
88-
createQueue(client, RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_QUEUE_NAME)).join();
90+
createQueue(client, RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_QUEUE_NAME),
91+
createQueue(client, RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_LIST_QUEUE_NAME)).join();
8992
}
9093

9194
@Test
@@ -151,6 +154,17 @@ void resolvesMyPojoFromNotificationMessage() throws Exception {
151154
assertThat(latchContainer.resolvesPojoNotificationMessageLatch.await(10, TimeUnit.SECONDS)).isTrue();
152155
}
153156

157+
@Test
158+
void resolvesMyPojoFromNotificationMessageList() throws Exception {
159+
byte[] notificationJsonContent = FileCopyUtils
160+
.copyToByteArray(getClass().getClassLoader().getResourceAsStream("notificationMessage.json"));
161+
String payload = new String(notificationJsonContent);
162+
sqsTemplate.send(RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_LIST_QUEUE_NAME, payload);
163+
logger.debug("Sent message to queue {} with messageBody {}",
164+
RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_LIST_QUEUE_NAME, payload);
165+
assertThat(latchContainer.resolvesPojoNotificationMessageListLatch.await(10, TimeUnit.SECONDS)).isTrue();
166+
}
167+
154168
private Map<String, Object> getHeaderMapping(Class<?> clazz) {
155169
return Collections.singletonMap(SqsHeaders.SQS_DEFAULT_TYPE_HEADER, clazz.getName());
156170
}
@@ -247,6 +261,14 @@ void listen(@SnsNotificationMessage MyEnvelope<MyPojo> myPojo) {
247261
RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_QUEUE_NAME);
248262
latchContainer.resolvesPojoNotificationMessageLatch.countDown();
249263
}
264+
265+
@SqsListener(queueNames = RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_LIST_QUEUE_NAME, id = "resolves-pojo-with-notification-message-list", factory = "defaultSqsListenerContainerFactory")
266+
void listen(@SnsNotificationMessage List<MyEnvelope<MyPojo>> myPojos) {
267+
Assert.notEmpty(myPojos, "Received empty messages");
268+
logger.debug("Received messages {} from queue {}", myPojos,
269+
RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_LIST_QUEUE_NAME);
270+
latchContainer.resolvesPojoNotificationMessageListLatch.countDown();
271+
}
250272
}
251273

252274
static class LatchContainer {
@@ -258,6 +280,7 @@ static class LatchContainer {
258280
CountDownLatch resolvesPojoFromMappingLatch = new CountDownLatch(1);
259281
CountDownLatch resolvesMyOtherPojoFromMappingLatch = new CountDownLatch(1);
260282
CountDownLatch resolvesPojoNotificationMessageLatch = new CountDownLatch(1);
283+
CountDownLatch resolvesPojoNotificationMessageListLatch = new CountDownLatch(1);
261284

262285
}
263286

0 commit comments

Comments
 (0)
Please sign in to comment.