Skip to content

Commit b191973

Browse files
committed
refactor: simplify flatMap upstream request fix
1 parent 169e3b0 commit b191973

File tree

1 file changed

+2
-29
lines changed

1 file changed

+2
-29
lines changed

implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiFlatMapOp.java

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -223,10 +223,7 @@ void tryEmit(FlatMapInner<O> inner, O item) {
223223
}
224224

225225
inner.request(1);
226-
int queuedInners = nonEmptyInnerQueues();
227-
if (queuedInners == 0) {
228-
upstream.request(1);
229-
}
226+
upstream.request(1);
230227
} else {
231228
if (q == null) {
232229
q = getOrCreateInnerQueue(inner);
@@ -364,7 +361,7 @@ void drainLoop() {
364361
d = inner.done;
365362
boolean empty = q.isEmpty();
366363
if (!d && empty && replenishMain == 0) {
367-
// replenishMain++;
364+
replenishMain++;
368365
}
369366
}
370367
}
@@ -413,11 +410,6 @@ void drainLoop() {
413410
}
414411
}
415412

416-
int queuedInners = nonEmptyInnerQueues();
417-
if (queuedInners == 0) {
418-
replenishMain++;
419-
}
420-
421413
if (replenishMain != 0L && !done && !cancelled) {
422414
upstream.request(replenishMain);
423415
}
@@ -525,25 +517,6 @@ Queue<O> getOrCreateInnerQueue(FlatMapInner<O> inner) {
525517
return q;
526518
}
527519

528-
int nonEmptyInnerQueues() {
529-
FlatMapInner<O>[] inners = get();
530-
if (inners == null || isEmpty()) {
531-
return -1;
532-
}
533-
int queuedInners = 0;
534-
for (int i = 0; i < inners.length; i++) {
535-
FlatMapInner<O> inner = inners[i];
536-
if (inner == null || inner.done) {
537-
continue;
538-
}
539-
Queue<O> q = inner.queue;
540-
if (q != null && !q.isEmpty()) {
541-
queuedInners++;
542-
}
543-
}
544-
return queuedInners;
545-
}
546-
547520
@Override
548521
public Context context() {
549522
if (downstream instanceof ContextSupport) {

0 commit comments

Comments
 (0)