Skip to content

Commit b66cdb1

Browse files
authored
Allow and handle negative credits, fixes invalid request number error (#3213)
1 parent 7a13887 commit b66cdb1

File tree

2 files changed

+49
-7
lines changed

2 files changed

+49
-7
lines changed

smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpCreditBasedSender.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,14 +161,14 @@ public void onSubscribe(Subscription subscription) {
161161
private long setCreditsAndRequest(AmqpSender sender) {
162162
long credits = sender.remainingCredits();
163163
Subscription subscription = upstream.get();
164-
if (credits != 0L && subscription != Subscriptions.CANCELLED) {
164+
if (credits > 0L && subscription != Subscriptions.CANCELLED) {
165165
// Request upfront the sender remaining credits or the max inflights
166166
long request = maxInflights > 0 ? Math.min(credits, maxInflights) : credits;
167167
log.retrievedCreditsForChannel(configuration.getChannel(), credits);
168168
subscription.request(request);
169169
return credits;
170170
}
171-
if (credits == 0L && subscription != Subscriptions.CANCELLED) {
171+
if (credits <= 0L && subscription != Subscriptions.CANCELLED) {
172172
onNoMoreCredit(sender);
173173
}
174174
return 0L;
@@ -195,7 +195,7 @@ public void onNext(Message<?> message) {
195195
if (tuple != null) { // No serialization issue
196196
subscriber.onNext(tuple.getItem1());
197197
long remainingCredits = tuple.getItem3();
198-
if (remainingCredits == 0) { // no more credit, request more
198+
if (remainingCredits <= 0) { // no more credit, request more
199199
onNoMoreCredit(tuple.getItem2());
200200
} else { // keep the request one more message
201201
requestUpstream();

smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/AmqpCreditTest.java

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static org.assertj.core.api.Assertions.assertThat;
44

5+
import java.time.Duration;
56
import java.util.ArrayList;
67
import java.util.HashMap;
78
import java.util.List;
@@ -10,6 +11,7 @@
1011
import java.util.concurrent.CountDownLatch;
1112
import java.util.concurrent.Flow;
1213
import java.util.concurrent.TimeUnit;
14+
import java.util.function.Consumer;
1315
import java.util.stream.Collectors;
1416
import java.util.stream.IntStream;
1517

@@ -22,8 +24,10 @@
2224
import org.junit.jupiter.api.Timeout;
2325

2426
import io.smallrye.mutiny.Multi;
27+
import io.smallrye.mutiny.Uni;
2528
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
2629
import io.vertx.core.Vertx;
30+
import io.vertx.proton.ProtonReceiver;
2731

2832
public class AmqpCreditTest extends AmqpTestBase {
2933

@@ -48,10 +52,36 @@ public void testCreditBasedFlowControl() throws Exception {
4852
CountDownLatch msgsReceived = new CountDownLatch(msgCount);
4953
List<Object> payloadsReceived = new ArrayList<>(msgCount);
5054

51-
server = setupMockServer(msgCount, msgsReceived, payloadsReceived, executionHolder.vertx().getDelegate());
55+
server = setupMockServer(msgCount, msgsReceived, payloadsReceived, executionHolder.vertx().getDelegate(),
56+
msgCount / 10, r -> {
57+
});
58+
59+
Flow.Subscriber<? extends Message<?>> sink = createProviderAndSink(UUID.randomUUID().toString(),
60+
server.actualPort());
61+
//noinspection unchecked
62+
Multi.createFrom().range(0, msgCount)
63+
.map(Message::of)
64+
.subscribe((Flow.Subscriber<? super Message<Integer>>) sink);
65+
66+
assertThat(msgsReceived.await(20, TimeUnit.SECONDS))
67+
.withFailMessage("Sent %s msgs but %s remain outstanding", msgCount, msgsReceived.getCount()).isTrue();
68+
List<Integer> expectedPayloads = IntStream.range(0, msgCount).boxed().collect(Collectors.toList());
69+
assertThat(payloadsReceived).containsAll(expectedPayloads);
70+
}
71+
72+
@Test
73+
@Timeout(30)
74+
public void testNegativeCredit() throws Exception {
75+
int msgCount = 10;
76+
CountDownLatch msgsReceived = new CountDownLatch(msgCount);
77+
List<Object> payloadsReceived = new ArrayList<>(msgCount);
78+
79+
server = setupMockServer(msgCount, msgsReceived, payloadsReceived, executionHolder.vertx().getDelegate(), -2,
80+
this::updateCreditsEveryTwoSeconds);
5281

5382
Flow.Subscriber<? extends Message<?>> sink = createProviderAndSink(UUID.randomUUID().toString(),
5483
server.actualPort());
84+
5585
//noinspection unchecked
5686
Multi.createFrom().range(0, msgCount)
5787
.map(Message::of)
@@ -78,8 +108,18 @@ private Flow.Subscriber<? extends Message<?>> createProviderAndSink(String topic
78108
return provider.getSubscriber(new MapBasedConfig(config));
79109
}
80110

81-
private MockServer setupMockServer(int msgCount, CountDownLatch latch, List<Object> payloads, Vertx vertx)
82-
throws Exception {
111+
private void updateCreditsEveryTwoSeconds(ProtonReceiver serverReceiver) {
112+
Multi.createFrom().items(2, 4, 6)
113+
.onItem().transformToUniAndMerge(seconds -> {
114+
Duration delay = Duration.ofSeconds(seconds);
115+
return Uni.createFrom().item(seconds)
116+
.onItem().delayIt().by(delay);
117+
})
118+
.subscribe().with(item -> serverReceiver.flow(1));
119+
}
120+
121+
private MockServer setupMockServer(int msgCount, CountDownLatch latch, List<Object> payloads, Vertx vertx,
122+
int initialCredits, Consumer<ProtonReceiver> receiverFunction) throws Exception {
83123
assertThat(msgCount % 10 == 0).isTrue();
84124
int creditBatch = msgCount / 10;
85125

@@ -114,9 +154,11 @@ private MockServer setupMockServer(int msgCount, CountDownLatch latch, List<Obje
114154
}
115155
});
116156

157+
receiverFunction.accept(serverReceiver);
158+
117159
serverReceiver.open();
118160

119-
serverReceiver.flow(creditBatch);
161+
serverReceiver.flow(initialCredits);
120162
});
121163
});
122164
}

0 commit comments

Comments
 (0)