@@ -3,9 +3,10 @@ import { identity } from '../util/identity.js';
3
3
import { wrapWithAbort } from './operators/withabort.js' ;
4
4
import { throwIfAborted } from '../aborterror.js' ;
5
5
import { safeRace } from '../util/safeRace.js' ;
6
+ import { returnAsyncIterator } from '../util/returniterator.js' ;
6
7
7
8
// eslint-disable-next-line @typescript-eslint/no-empty-function
8
- const NEVER_PROMISE = new Promise ( ( ) => { } ) ;
9
+ const NEVER_PROMISE = new Promise < never > ( ( ) => { } ) ;
9
10
10
11
type MergeResult < T > = { value : T ; index : number } ;
11
12
@@ -28,39 +29,46 @@ export class CombineLatestAsyncIterable<TSource> extends AsyncIterableX<TSource[
28
29
const length = this . _sources . length ;
29
30
const iterators = new Array < AsyncIterator < TSource > > ( length ) ;
30
31
const nexts = new Array < Promise < MergeResult < IteratorResult < TSource > > > > ( length ) ;
31
- let hasValueAll = false ;
32
- const values = new Array < TSource > ( length ) ;
33
- const hasValues = new Array < boolean > ( length ) ;
34
- let active = length ;
35
32
36
- hasValues . fill ( false ) ;
33
+ let active = length ;
34
+ let allValuesAvailable = false ;
35
+ const values = new Array < TSource > ( length ) ;
36
+ const hasValues = new Array < boolean > ( length ) . fill ( false ) ;
37
37
38
38
for ( let i = 0 ; i < length ; i ++ ) {
39
39
const iterator = wrapWithAbort ( this . _sources [ i ] , signal ) [ Symbol . asyncIterator ] ( ) ;
40
40
iterators [ i ] = iterator ;
41
41
nexts [ i ] = wrapPromiseWithIndex ( iterator . next ( ) , i ) ;
42
42
}
43
43
44
- while ( active > 0 ) {
45
- const next = safeRace ( nexts ) ;
46
- const {
47
- value : { value : value$ , done : done$ } ,
48
- index,
49
- } = await next ;
50
- if ( done$ ) {
51
- nexts [ index ] = < Promise < MergeResult < IteratorResult < TSource > > > > NEVER_PROMISE ;
52
- active -- ;
53
- } else {
54
- values [ index ] = value$ ;
55
- hasValues [ index ] = true ;
56
-
57
- const iterator$ = iterators [ index ] ;
58
- nexts [ index ] = wrapPromiseWithIndex ( iterator$ . next ( ) , index ) ;
59
-
60
- if ( hasValueAll || ( hasValueAll = hasValues . every ( identity ) ) ) {
61
- yield values ;
44
+ try {
45
+ while ( active > 0 ) {
46
+ const next = safeRace ( nexts ) ;
47
+
48
+ const {
49
+ value : { value, done } ,
50
+ index,
51
+ } = await next ;
52
+
53
+ if ( done ) {
54
+ nexts [ index ] = NEVER_PROMISE ;
55
+ active -- ;
56
+ } else {
57
+ values [ index ] = value ;
58
+ hasValues [ index ] = true ;
59
+ allValuesAvailable = allValuesAvailable || hasValues . every ( identity ) ;
60
+
61
+ nexts [ index ] = wrapPromiseWithIndex ( iterators [ index ] . next ( ) , index ) ;
62
+
63
+ if ( allValuesAvailable ) {
64
+ yield values ;
65
+ }
62
66
}
63
67
}
68
+ } finally {
69
+ for ( const iterator of iterators ) {
70
+ await returnAsyncIterator ( iterator ) ;
71
+ }
64
72
}
65
73
}
66
74
}
@@ -176,5 +184,5 @@ export function combineLatest<T, T2, T3, T4, T5, T6>(
176
184
*/
177
185
export function combineLatest < T > ( ...sources : AsyncIterable < T > [ ] ) : AsyncIterableX < T [ ] > ;
178
186
export function combineLatest < T > ( ...sources : any [ ] ) : AsyncIterableX < T [ ] > {
179
- return new CombineLatestAsyncIterable < T > ( sources ) ;
187
+ return new CombineLatestAsyncIterable ( sources ) ;
180
188
}
0 commit comments