Skip to content

Commit 251d09f

Browse files
committed
fix(groupBy): resolve upstream request handling issue
The groupBy operator was not correctly managing upstream requests. It now makes prefetch amount of requests each time the drain loop does not emit any items. This commit includes both a reproducer and the fix for the issue.
1 parent 7634309 commit 251d09f

File tree

2 files changed

+44
-2
lines changed

2 files changed

+44
-2
lines changed

implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiGroupByOp.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -417,8 +417,6 @@ void drain() {
417417
requested.addAndGet(-e);
418418
}
419419
parent.getUpstreamSubscription().request(e);
420-
} else {
421-
parent.getUpstreamSubscription().request(parent.prefetch);
422420
}
423421
}
424422

implementation/src/test/java/io/smallrye/mutiny/operators/MultiGroupTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,13 @@
1414
import java.util.Collections;
1515
import java.util.List;
1616
import java.util.NoSuchElementException;
17+
import java.util.concurrent.ExecutorService;
18+
import java.util.concurrent.Executors;
1719
import java.util.concurrent.Flow.Subscriber;
1820
import java.util.concurrent.Flow.Subscription;
1921
import java.util.concurrent.atomic.AtomicBoolean;
2022
import java.util.concurrent.atomic.AtomicInteger;
23+
import java.util.concurrent.atomic.AtomicLong;
2124
import java.util.concurrent.atomic.AtomicReference;
2225
import java.util.function.Consumer;
2326

@@ -1037,4 +1040,45 @@ public void rejectGroupByBadPrefetch() {
10371040
assertThrows(IllegalArgumentException.class,
10381041
() -> Multi.createFrom().range(1, 10).group().by(i -> i % 2, 0L));
10391042
}
1043+
1044+
@Test
1045+
void testUpstreamRequestsNotBlownOutOfProportion() {
1046+
ExecutorService executor = Executors.newFixedThreadPool(10);
1047+
AtomicLong requestCounter = new AtomicLong(0);
1048+
AtomicLong itemCounter = new AtomicLong(0);
1049+
AtomicReference<MultiEmitter<? super Integer>> e = new AtomicReference<>();
1050+
1051+
Multi.createFrom().<Integer> emitter(e::set)
1052+
.onRequest().invoke(requestCounter::addAndGet)
1053+
.group().by(i -> i / 10)
1054+
.onItem().transformToMulti(g -> g.map(i -> g.key() + " : " + i)
1055+
.emitOn(executor)
1056+
.invoke(s -> {
1057+
try {
1058+
Thread.sleep(100);
1059+
itemCounter.incrementAndGet();
1060+
} catch (InterruptedException ex) {
1061+
throw new RuntimeException(ex);
1062+
}
1063+
}))
1064+
.merge()
1065+
.subscribe().with(s -> {
1066+
});
1067+
1068+
int itemCount = 100;
1069+
MultiEmitter<? super Integer> emitter = e.get();
1070+
new Thread(() -> {
1071+
int i = 0;
1072+
while (true) {
1073+
if (emitter.requested() > 0) {
1074+
emitter.emit(i);
1075+
i++;
1076+
}
1077+
}
1078+
}).start();
1079+
1080+
await().untilAsserted(() -> assertThat(itemCounter).hasValueGreaterThanOrEqualTo(itemCount));
1081+
System.out.println(requestCounter.get());
1082+
assertThat(requestCounter.get()).isLessThan(10000L); // this should not blow up
1083+
}
10401084
}

0 commit comments

Comments
 (0)