Skip to content

Commit ab02df3

Browse files
committed
Add test for negative credits
Signed-off-by: Tim Terlegård <[email protected]>
1 parent d493d48 commit ab02df3

File tree

2 files changed

+50
-7
lines changed

2 files changed

+50
-7
lines changed

smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpCreditBasedSender.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ private long setCreditsAndRequest(AmqpSender sender) {
168168
subscription.request(request);
169169
return credits;
170170
}
171-
if (credits == 0L && subscription != Subscriptions.CANCELLED) {
171+
if (credits <= 0L && subscription != Subscriptions.CANCELLED) {
172172
onNoMoreCredit(sender);
173173
}
174174
return 0L;
@@ -195,7 +195,7 @@ public void onNext(Message<?> message) {
195195
if (tuple != null) { // No serialization issue
196196
subscriber.onNext(tuple.getItem1());
197197
long remainingCredits = tuple.getItem3();
198-
if (remainingCredits == 0) { // no more credit, request more
198+
if (remainingCredits <= 0) { // no more credit, request more
199199
onNoMoreCredit(tuple.getItem2());
200200
} else { // keep the request one more message
201201
requestUpstream();
@@ -221,9 +221,9 @@ private void onNoMoreCredit(AmqpSender sender) {
221221
return;
222222
}
223223
long c = setCreditsAndRequest(sender);
224-
if (c <= 0L) { // still no credits, schedule a periodic retry
224+
if (c == 0L) { // still no credits, schedule a periodic retry
225225
holder.getVertx().setPeriodic(configuration.getCreditRetrievalPeriod(), id -> {
226-
if (setCreditsAndRequest(sender) > 0L || isCancelled()) {
226+
if (setCreditsAndRequest(sender) != 0L || isCancelled()) {
227227
// Got our new credits or the application has been terminated,
228228
// we cancel the periodic task.
229229
holder.getVertx().cancelTimer(id);

smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/AmqpCreditTest.java

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static org.assertj.core.api.Assertions.assertThat;
44

5+
import java.time.Duration;
56
import java.util.ArrayList;
67
import java.util.HashMap;
78
import java.util.List;
@@ -10,6 +11,7 @@
1011
import java.util.concurrent.CountDownLatch;
1112
import java.util.concurrent.Flow;
1213
import java.util.concurrent.TimeUnit;
14+
import java.util.function.Consumer;
1315
import java.util.stream.Collectors;
1416
import java.util.stream.IntStream;
1517

@@ -22,8 +24,10 @@
2224
import org.junit.jupiter.api.Timeout;
2325

2426
import io.smallrye.mutiny.Multi;
27+
import io.smallrye.mutiny.Uni;
2528
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
2629
import io.vertx.core.Vertx;
30+
import io.vertx.proton.ProtonReceiver;
2731

2832
public class AmqpCreditTest extends AmqpTestBase {
2933

@@ -48,10 +52,36 @@ public void testCreditBasedFlowControl() throws Exception {
4852
CountDownLatch msgsReceived = new CountDownLatch(msgCount);
4953
List<Object> payloadsReceived = new ArrayList<>(msgCount);
5054

51-
server = setupMockServer(msgCount, msgsReceived, payloadsReceived, executionHolder.vertx().getDelegate());
55+
server = setupMockServer(msgCount, msgsReceived, payloadsReceived, executionHolder.vertx().getDelegate(),
56+
msgCount / 10, r -> {
57+
});
58+
59+
Flow.Subscriber<? extends Message<?>> sink = createProviderAndSink(UUID.randomUUID().toString(),
60+
server.actualPort());
61+
//noinspection unchecked
62+
Multi.createFrom().range(0, msgCount)
63+
.map(Message::of)
64+
.subscribe((Flow.Subscriber<? super Message<Integer>>) sink);
65+
66+
assertThat(msgsReceived.await(20, TimeUnit.SECONDS))
67+
.withFailMessage("Sent %s msgs but %s remain outstanding", msgCount, msgsReceived.getCount()).isTrue();
68+
List<Integer> expectedPayloads = IntStream.range(0, msgCount).boxed().collect(Collectors.toList());
69+
assertThat(payloadsReceived).containsAll(expectedPayloads);
70+
}
71+
72+
@Test
73+
@Timeout(30)
74+
public void testNegativeCredit() throws Exception {
75+
int msgCount = 10;
76+
CountDownLatch msgsReceived = new CountDownLatch(msgCount);
77+
List<Object> payloadsReceived = new ArrayList<>(msgCount);
78+
79+
server = setupMockServer(msgCount, msgsReceived, payloadsReceived, executionHolder.vertx().getDelegate(), -2,
80+
this::updateCreditsEveryTwoSeconds);
5281

5382
Flow.Subscriber<? extends Message<?>> sink = createProviderAndSink(UUID.randomUUID().toString(),
5483
server.actualPort());
84+
5585
//noinspection unchecked
5686
Multi.createFrom().range(0, msgCount)
5787
.map(Message::of)
@@ -78,7 +108,18 @@ private Flow.Subscriber<? extends Message<?>> createProviderAndSink(String topic
78108
return provider.getSubscriber(new MapBasedConfig(config));
79109
}
80110

81-
private MockServer setupMockServer(int msgCount, CountDownLatch latch, List<Object> payloads, Vertx vertx)
111+
private void updateCreditsEveryTwoSeconds(ProtonReceiver serverReceiver) {
112+
Multi.createFrom().items(2, 4, 6)
113+
.onItem().transformToUniAndMerge(seconds -> {
114+
Duration delay = Duration.ofSeconds(seconds);
115+
return Uni.createFrom().item(seconds)
116+
.onItem().delayIt().by(delay);
117+
})
118+
.subscribe().with(item -> serverReceiver.flow(1));
119+
}
120+
121+
private MockServer setupMockServer(int msgCount, CountDownLatch latch, List<Object> payloads, Vertx vertx,
122+
int initialCredits, Consumer<ProtonReceiver> receiverFunction)
82123
throws Exception {
83124
assertThat(msgCount % 10 == 0).isTrue();
84125
int creditBatch = msgCount / 10;
@@ -114,9 +155,11 @@ private MockServer setupMockServer(int msgCount, CountDownLatch latch, List<Obje
114155
}
115156
});
116157

158+
receiverFunction.accept(serverReceiver);
159+
117160
serverReceiver.open();
118161

119-
serverReceiver.flow(creditBatch);
162+
serverReceiver.flow(initialCredits);
120163
});
121164
});
122165
}

0 commit comments

Comments
 (0)