Skip to content

Commit 5bf28a4

Browse files
committed
Allow and handle negative credits, fixes invalid request number error
1 parent 1d6b3c9 commit 5bf28a4

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpCreditBasedSender.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ public void onSubscribe(Subscription subscription) {
161161
private long setCreditsAndRequest(AmqpSender sender) {
162162
long credits = sender.remainingCredits();
163163
Subscription subscription = upstream.get();
164-
if (credits != 0L && subscription != Subscriptions.CANCELLED) {
164+
if (credits > 0L && subscription != Subscriptions.CANCELLED) {
165165
// Request upfront the sender remaining credits or the max inflights
166166
long request = maxInflights > 0 ? Math.min(credits, maxInflights) : credits;
167167
log.retrievedCreditsForChannel(configuration.getChannel(), credits);
@@ -221,9 +221,9 @@ private void onNoMoreCredit(AmqpSender sender) {
221221
return;
222222
}
223223
long c = setCreditsAndRequest(sender);
224-
if (c == 0L) { // still no credits, schedule a periodic retry
224+
if (c <= 0L) { // still no credits, schedule a periodic retry
225225
holder.getVertx().setPeriodic(configuration.getCreditRetrievalPeriod(), id -> {
226-
if (setCreditsAndRequest(sender) != 0L || isCancelled()) {
226+
if (setCreditsAndRequest(sender) > 0L || isCancelled()) {
227227
// Got our new credits or the application has been terminated,
228228
// we cancel the periodic task.
229229
holder.getVertx().cancelTimer(id);

0 commit comments

Comments
 (0)