Skip to content

Commit 7221be2

Browse files
authored
fix(flatMap): extend flatMap's concurrency limit to outer values (#376)
* Fix memory memory management for flatMap. Prevent the flatMap operator from accumulating in an internal array while concurrency is maxed out
1 parent b8890f1 commit 7221be2

File tree

1 file changed

+7
-1
lines changed

1 file changed

+7
-1
lines changed

src/asynciterable/operators/_flatten.ts

+7-1
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,12 @@ export class FlattenConcurrentAsyncIterable<TSource, TResult> extends AsyncItera
9393
}
9494
if (active < concurrent) {
9595
pullNextOuter(value as TSource);
96+
results[0] = outer.next();
9697
} else {
98+
// remove the outer iterator from the race, we're full
99+
results[0] = NEVER_PROMISE;
97100
outerValues.push(value as TSource);
98101
}
99-
results[0] = outer.next();
100102
break;
101103
}
102104
case Type.INNER: {
@@ -119,6 +121,10 @@ export class FlattenConcurrentAsyncIterable<TSource, TResult> extends AsyncItera
119121
}
120122
case Type.INNER: {
121123
--active;
124+
// add the outer iterator to the race, if its been removed and we are not yet done with it
125+
if (results[0] === NEVER_PROMISE && !outerComplete) {
126+
results[0] = outer.next();
127+
}
122128
// return the current slot to the pool
123129
innerIndices.push(index);
124130
// synchronously drain the `outerValues` buffer

0 commit comments

Comments
 (0)