Skip to content

Commit 7856940

Browse files
authored
Merge pull request #2231 from cescoffier/acknowledgement-coordination
Provide a utility method to coordinate (negative) acknowledgement when multiple messages are created from one
2 parents 4ab36e9 + dfb79f8 commit 7856940

File tree

2 files changed

+295
-1
lines changed

2 files changed

+295
-1
lines changed

api/src/main/java/io/smallrye/reactive/messaging/Messages.java

Lines changed: 153 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,63 @@
11
package io.smallrye.reactive.messaging;
22

33
import java.util.ArrayList;
4+
import java.util.Arrays;
45
import java.util.Collections;
56
import java.util.List;
67
import java.util.Optional;
78
import java.util.concurrent.CompletableFuture;
89
import java.util.concurrent.CompletionStage;
10+
import java.util.concurrent.CopyOnWriteArrayList;
11+
import java.util.concurrent.atomic.AtomicBoolean;
912
import java.util.function.Function;
1013
import java.util.function.Supplier;
1114
import java.util.stream.Collectors;
1215

1316
import org.eclipse.microprofile.reactive.messaging.Message;
1417
import org.eclipse.microprofile.reactive.messaging.Metadata;
1518

19+
import io.smallrye.common.annotation.CheckReturnValue;
20+
21+
/**
22+
* A class handling coordination between messages.
23+
*/
1624
public class Messages {
1725

1826
private Messages() {
1927
// Avoid direct instantiation.
2028
}
2129

30+
/**
31+
* Chains the given message with some other messages.
32+
* It coordinates the acknowledgement. When all the other messages are acknowledged successfully, the passed
33+
* message is acknowledged. If one of the other messages is acknowledged negatively, the passed message is also
34+
* nacked (with the same reason). Subsequent ack/nack will be ignored.
35+
* <p>
36+
*
37+
* @param message the message
38+
* @return the chain builder that let you decide how the metadata are passed, and the set of messages.
39+
*/
40+
@CheckReturnValue
41+
public static MessageChainBuilder chain(Message<?> message) {
42+
return new MessageChainBuilder(message);
43+
}
44+
45+
/**
46+
* Merges multiple messages into a single one.
47+
* This is an implementation of a <em>merge pattern</em>: n messages combined into 1.
48+
* <p>
49+
* Whe resulting message payload is computed using the combinator function.
50+
* When the returned message is acked/nacked, the passes messages are acked/nacked accordingly.
51+
* <p>
52+
* Metadata are also merged. The metadata of all the messages are copied into the resulting message. If, for a given
53+
* class, the metadata is already present in the result message, it's either ignored, or merged if the class
54+
* implements {@link MergeableMetadata}.
55+
*
56+
* @param list the list of message, must not be empty, must not be null
57+
* @param combinator the combinator method, must not be null
58+
* @param <T> the payload type of the produced message
59+
* @return the resulting message
60+
*/
2261
public static <T> Message<T> merge(List<Message<?>> list, Function<List<?>, T> combinator) {
2362
if (list.isEmpty()) {
2463
return Message.of(combinator.apply(Collections.emptyList()));
@@ -59,6 +98,20 @@ public static <T> Message<T> merge(List<Message<?>> list, Function<List<?>, T> c
5998
.withMetadata(metadata);
6099
}
61100

101+
/**
102+
* Merges multiple messages into a single one.
103+
* <p>
104+
* Whe resulting message payload is computed using the combinator function.
105+
* When the returned message is acked/nacked, the passes messages are acked/nacked accordingly.
106+
* <p>
107+
* Metadata are also merged. The metadata of all the messages are copied into the resulting message. If, for a given
108+
* class, the metadata is already present in the result message, it's either ignored, or merged if the class
109+
* implements {@link MergeableMetadata}.
110+
*
111+
* @param list the list of message, must not be empty, must not be null
112+
* @param <T> the payload type of the passed messages
113+
* @return the resulting message
114+
*/
62115
public static <T> Message<List<T>> merge(List<Message<T>> list) {
63116
if (list.isEmpty()) {
64117
return Message.of(Collections.emptyList());
@@ -90,7 +143,7 @@ public static <T> Message<List<T>> merge(List<Message<T>> list) {
90143
.withMetadata(metadata);
91144
}
92145

93-
@SuppressWarnings("unchecked")
146+
@SuppressWarnings({ "unchecked", "rawtypes" })
94147
private static Metadata merge(Metadata first, Metadata second) {
95148
Metadata result = first;
96149
for (Object meta : second) {
@@ -121,4 +174,103 @@ private static Metadata merge(Metadata first, Metadata second) {
121174
return result;
122175
}
123176

177+
/**
178+
* The message chain builder allows chaining message and configure metadata propagation.
179+
* By default, all the metadata from the given message are copied into the chained messages.
180+
*/
181+
public static class MessageChainBuilder {
182+
private final Message<?> input;
183+
private Metadata metadata;
184+
185+
private MessageChainBuilder(Message<?> message) {
186+
this.input = message;
187+
this.metadata = message.getMetadata().copy();
188+
}
189+
190+
/**
191+
* Do not copy any metadata from the initial message to the chained message.
192+
*
193+
* @return the current {@link MessageChainBuilder}
194+
*/
195+
@CheckReturnValue
196+
public MessageChainBuilder withoutMetadata() {
197+
this.metadata = Metadata.empty();
198+
return this;
199+
}
200+
201+
/**
202+
* Copy the given metadata of the given classes from the initial message to the chained message, if the initial
203+
* message does not include a metadata object of the given class.
204+
*
205+
* In general, this method must be used after {@link #withoutMetadata()}.
206+
*
207+
* @return the current {@link MessageChainBuilder}
208+
*/
209+
@CheckReturnValue
210+
public MessageChainBuilder withMetadata(Class<?>... mc) {
211+
for (Class<?> clazz : mc) {
212+
Optional<?> o = input.getMetadata().get(clazz);
213+
o.ifPresent(value -> this.metadata = metadata.with(value));
214+
}
215+
return this;
216+
}
217+
218+
/**
219+
* Do not the given metadata of the given classes from the initial message to the chained message, if the initial
220+
* message does not include a metadata object of the given class.
221+
*
222+
* @return the current {@link MessageChainBuilder}
223+
*/
224+
@CheckReturnValue
225+
public MessageChainBuilder withoutMetadata(Class<?>... mc) {
226+
for (Class<?> clazz : mc) {
227+
this.metadata = this.metadata.without(clazz);
228+
}
229+
return this;
230+
}
231+
232+
/**
233+
* Passed the chained messages.
234+
* The messages are not modified, but should not be used afterward, and should be replaced by the messages contained
235+
* in the returned list.
236+
* This method preserve the order. So, the first message corresponds to the first message in the returned list.
237+
* The message from the returned list have the necessary logic to chain the ack/nack signals and the copied metadata.
238+
*
239+
* @param messages the chained messages, must not be empty, must not be null, must not contain null
240+
* @return the list of modified messages
241+
*/
242+
public List<Message<?>> with(Message<?>... messages) {
243+
AtomicBoolean done = new AtomicBoolean();
244+
245+
// Must be modifiable
246+
List<Message<?>> trackers = Arrays.stream(messages).collect(Collectors.toCollection(CopyOnWriteArrayList::new));
247+
List<Message<?>> outcomes = new ArrayList<>();
248+
for (Message<?> message : messages) {
249+
Message<?> tmp = message;
250+
for (Object metadatum : metadata) {
251+
tmp = tmp.addMetadata(metadatum);
252+
}
253+
outcomes.add(tmp
254+
.withAck(() -> {
255+
CompletionStage<Void> acked = message.ack();
256+
if (trackers.remove(message)) {
257+
if (trackers.isEmpty() && done.compareAndSet(false, true)) {
258+
return acked.thenCompose(x -> input.ack());
259+
}
260+
}
261+
return acked;
262+
})
263+
.withNack((reason) -> {
264+
CompletionStage<Void> nacked = message.nack(reason);
265+
if (trackers.remove(message)) {
266+
if (done.compareAndSet(false, true)) {
267+
return nacked.thenCompose(x -> input.nack(reason));
268+
}
269+
}
270+
return nacked;
271+
}));
272+
}
273+
return outcomes;
274+
}
275+
}
124276
}

api/src/test/java/io/smallrye/reactive/messaging/MessagesTest.java

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.util.List;
99
import java.util.concurrent.CompletableFuture;
1010
import java.util.concurrent.atomic.AtomicBoolean;
11+
import java.util.concurrent.atomic.AtomicInteger;
1112

1213
import org.eclipse.microprofile.reactive.messaging.Message;
1314
import org.junit.jupiter.api.Test;
@@ -216,6 +217,147 @@ void checkWithEmptyList() {
216217
assertThat(Messages.merge(List.of()).getPayload()).isEqualTo(Collections.emptyList());
217218
}
218219

220+
@Test
221+
void checkSimpleChainAcknowledgement() {
222+
AtomicBoolean o1Ack = new AtomicBoolean();
223+
AtomicBoolean o2Ack = new AtomicBoolean();
224+
AtomicInteger i1Ack = new AtomicInteger();
225+
Message<String> o1 = Message.of("foo", () -> {
226+
o1Ack.set(true);
227+
return CompletableFuture.completedFuture(null);
228+
});
229+
Message<String> o2 = Message.of("bar", () -> {
230+
o2Ack.set(true);
231+
return CompletableFuture.completedFuture(null);
232+
});
233+
234+
Message<Integer> i = Message.of(1, () -> {
235+
i1Ack.incrementAndGet();
236+
return CompletableFuture.completedFuture(null);
237+
});
238+
239+
List<Message<?>> outcomes = Messages.chain(i).with(o1, o2);
240+
assertThat(i1Ack).hasValue(0);
241+
assertThat(o1Ack).isFalse();
242+
assertThat(o2Ack).isFalse();
243+
244+
outcomes.get(0).ack();
245+
assertThat(i1Ack).hasValue(0);
246+
assertThat(o1Ack).isTrue();
247+
assertThat(o2Ack).isFalse();
248+
249+
outcomes.get(1).ack();
250+
assertThat(i1Ack).hasValue(1);
251+
assertThat(o1Ack).isTrue();
252+
assertThat(o1Ack).isTrue();
253+
254+
outcomes.get(1).ack();
255+
outcomes.get(0).ack();
256+
assertThat(i1Ack).hasValue(1);
257+
258+
outcomes.get(1).nack(new Exception("boom"));
259+
outcomes.get(0).nack(new Exception("boom"));
260+
assertThat(i1Ack).hasValue(1);
261+
}
262+
263+
@Test
264+
void checkSimpleChainNegativeAcknowledgement() {
265+
AtomicBoolean o1Ack = new AtomicBoolean();
266+
AtomicBoolean o2Ack = new AtomicBoolean();
267+
AtomicBoolean o1Nack = new AtomicBoolean();
268+
AtomicBoolean o2Nack = new AtomicBoolean();
269+
AtomicInteger i1Ack = new AtomicInteger();
270+
AtomicInteger i1Nack = new AtomicInteger();
271+
272+
Message<String> o1 = Message.of("foo", () -> {
273+
o1Ack.set(true);
274+
return CompletableFuture.completedFuture(null);
275+
}, t -> {
276+
o1Nack.set(true);
277+
return CompletableFuture.completedFuture(null);
278+
});
279+
Message<String> o2 = Message.of("bar", () -> {
280+
o2Ack.set(true);
281+
return CompletableFuture.completedFuture(null);
282+
}, t -> {
283+
o2Nack.set(true);
284+
return CompletableFuture.completedFuture(null);
285+
});
286+
287+
Message<Integer> i = Message.of(1, () -> {
288+
i1Ack.incrementAndGet();
289+
return CompletableFuture.completedFuture(null);
290+
}, t -> {
291+
i1Nack.incrementAndGet();
292+
return CompletableFuture.completedFuture(null);
293+
});
294+
295+
List<Message<?>> outcomes = Messages.chain(i).with(o1, o2);
296+
assertThat(i1Ack).hasValue(0);
297+
assertThat(o1Ack).isFalse();
298+
assertThat(o2Ack).isFalse();
299+
assertThat(i1Nack).hasValue(0);
300+
assertThat(o1Nack).isFalse();
301+
assertThat(o2Nack).isFalse();
302+
303+
outcomes.get(0).ack();
304+
assertThat(i1Ack).hasValue(0);
305+
assertThat(o1Ack).isTrue();
306+
assertThat(o2Ack).isFalse();
307+
assertThat(i1Nack).hasValue(0);
308+
assertThat(o1Nack).isFalse();
309+
assertThat(o2Nack).isFalse();
310+
311+
outcomes.get(0).nack(new Exception("boom"));
312+
assertThat(i1Ack).hasValue(0);
313+
assertThat(i1Nack).hasValue(0);
314+
315+
outcomes.get(1).nack(new Exception("boom"));
316+
assertThat(i1Nack).hasValue(1);
317+
assertThat(i1Ack).hasValue(0);
318+
assertThat(o2Nack).isTrue();
319+
320+
outcomes.get(1).ack();
321+
assertThat(i1Nack).hasValue(1);
322+
assertThat(i1Ack).hasValue(0);
323+
}
324+
325+
@Test
326+
void testChainWithMetadataSelection() {
327+
Message<Integer> i = Message.of(1)
328+
.withMetadata(List.of(new NonMergeableMetadata("hello"), new MergeableMetadata("hello"),
329+
new AnotherMetadata("hello")));
330+
331+
Message<String> m1 = Message.of("a");
332+
AnotherMetadata am = new AnotherMetadata("hello");
333+
Message<String> m2 = Message.of("b").addMetadata(am);
334+
335+
// No metadata copied from the original message
336+
List<Message<?>> out = Messages.chain(i).withoutMetadata().with(m1, m2);
337+
assertThat(out.get(0).getMetadata()).isEmpty();
338+
assertThat(out.get(1).getMetadata()).hasSize(1).containsOnly(am);
339+
340+
// All metadata are copied from the original message
341+
out = Messages.chain(i).with(m1, m2);
342+
assertThat(out.get(0).getMetadata()).hasSize(3);
343+
assertThat(out.get(1).getMetadata()).hasSize(3).doesNotContain(am);
344+
345+
// All metadata but MergeableMetadata are copied from the original message
346+
out = Messages.chain(i).withoutMetadata(MergeableMetadata.class).with(m1, m2);
347+
assertThat(out.get(0).getMetadata()).hasSize(2);
348+
assertThat(out.get(1).getMetadata()).hasSize(2).doesNotContain(am);
349+
350+
// All metadata but AnotherMetadata are copied from the original message
351+
out = Messages.chain(i).withoutMetadata(AnotherMetadata.class).with(m1, m2);
352+
assertThat(out.get(0).getMetadata()).hasSize(2);
353+
assertThat(out.get(1).getMetadata()).hasSize(3).contains(am);
354+
355+
out = Messages.chain(i).withoutMetadata().withMetadata(AnotherMetadata.class).with(m1, m2);
356+
assertThat(out.get(0).getMetadata()).hasSize(1);
357+
assertThat(out.get(1).getMetadata()).hasSize(1);
358+
359+
}
360+
219361
public static class NonMergeableMetadata {
220362
String value;
221363

0 commit comments

Comments
 (0)