Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure catchError functions always return source iterator #373

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/asynciterable/creating.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ let value, done;

## Brief Interlude - `AsyncSink`

Very rarely will we ever need to create these async-iterables by hand, however, if you need a collection that you can add to as well as iterate, we have the `AsyncSink` class. This class serves as a basis for some of our operators such as binding to events and DOM and Node.js streams.
Very rarely will we ever need to create these async-iterables by hand, however, if you need a collection that you can add to as well as iterate, we have the `AsyncSink` class. This class serves as a basis for some of our operators such as binding to events and DOM and Node.js streams.

```typescript
import { AsyncSink } from 'ix/asynciterable';
Expand Down
3 changes: 1 addition & 2 deletions docs/asynciterable/transforming.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,5 @@ await subscription.pipe(
.forEach(handleBatch)
```

Using this operator makes sure that if messages slow down you'll still
handle them in a reasonable time whereas using `buffer` would leave you stuck until you get
Using this operator makes sure that if messages slow down you'll still handle them in a reasonable time whereas using `buffer` would leave you stuck until you get
the right amount of messages.
22 changes: 21 additions & 1 deletion spec/asynciterable-operators/catcherror-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
import { jest } from '@jest/globals';
import '../asynciterablehelpers.js';
import { of, range, sequenceEqual, single, throwError } from 'ix/asynciterable/index.js';
import {
first,
from,
of,
range,
sequenceEqual,
single,
throwError,
} from 'ix/asynciterable/index.js';
import { catchError } from 'ix/asynciterable/operators/index.js';

test('AsyncIterable#catchError error catches', async () => {
Expand All @@ -26,3 +35,14 @@ test('AsyncIterable#catchError source and handler types are composed', async ()
const res = xs.pipe(catchError(async (_: Error) => of('foo')));
await expect(sequenceEqual(res, xs)).resolves.toBeTruthy();
});

test('AsyncIterable#catchError calls return() on source iterator when stopped early', async () => {
const xs = range(0, 10)[Symbol.asyncIterator]();
const returnSpy = jest.spyOn(xs, 'return');

const res = from(xs).pipe(catchError((_: Error) => from([])));

await first(res);

expect(returnSpy).toHaveBeenCalled();
});
14 changes: 13 additions & 1 deletion spec/asynciterable-operators/skip-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { jest } from '@jest/globals';
import { hasNext, noNext } from '../asynciterablehelpers.js';
import { of, throwError } from 'ix/asynciterable/index.js';
import { as, first, of, range, throwError } from 'ix/asynciterable/index.js';
import { skip } from 'ix/asynciterable/operators/index.js';

test('AsyncIterable#skip skips some', async () => {
Expand Down Expand Up @@ -40,3 +41,14 @@ test('AsyncIterable#skip throws', async () => {
const it = ys[Symbol.asyncIterator]();
await expect(it.next()).rejects.toThrow(err);
});

test('Iterable#skip calls return() on source iterator when stopped early', async () => {
const xs = range(0, 10)[Symbol.asyncIterator]();
const returnSpy = jest.spyOn(xs, 'return');

const res = as(xs).pipe(skip(2));

await first(res);

expect(returnSpy).toHaveBeenCalled();
});
34 changes: 29 additions & 5 deletions spec/asynciterable/catcherror-spec.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
import { jest } from '@jest/globals';
import { skip } from 'ix/asynciterable/operators.js';
import { hasNext } from '../asynciterablehelpers.js';
import { catchError, concat, range, sequenceEqual, throwError } from 'ix/asynciterable/index.js';

test('AsyncIterable#catch with no errors', async () => {
import {
catchError,
concat,
first,
from,
range,
sequenceEqual,
throwError,
} from 'ix/asynciterable/index.js';

test('AsyncIterable#catchError with no errors', async () => {
const res = catchError(range(0, 5), range(5, 5));
expect(await sequenceEqual(res, range(0, 5))).toBeTruthy();
});

test('AsyncIterable#catch with concat error', async () => {
test('AsyncIterable#catchError with concat error', async () => {
const res = catchError(concat(range(0, 5), throwError(new Error())), range(5, 5));

expect(await sequenceEqual(res, range(0, 10))).toBeTruthy();
});

test('AsyncIterable#catch still throws', async () => {
test('AsyncIterable#catchError still throws', async () => {
const e1 = new Error();
const er1 = throwError(e1);

Expand All @@ -31,3 +41,17 @@ test('AsyncIterable#catch still throws', async () => {
await hasNext(it, 3);
await expect(it.next()).rejects.toThrow();
});

test('AsyncIterable#catchError calls return() on source iterator when stopped early', async () => {
const e1 = new Error();
const er1 = throwError(e1);

const xs2 = range(2, 2)[Symbol.asyncIterator]();
const returnSpy = jest.spyOn(xs2, 'return');

const res = catchError(concat(range(0, 2), er1), from(xs2)).pipe(skip(2));

await first(res);

expect(returnSpy).toHaveBeenCalled();
});
14 changes: 13 additions & 1 deletion spec/iterable-operators/catcherror-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { jest } from '@jest/globals';
import '../iterablehelpers';
import { of, range, sequenceEqual, single, throwError } from 'ix/iterable/index.js';
import { first, from, of, range, sequenceEqual, single, throwError } from 'ix/iterable/index.js';
import { catchError } from 'ix/iterable/operators/index.js';

test('Iterable#catchError error catches', () => {
Expand All @@ -26,3 +27,14 @@ test('Iterable#catchError source and handler types are composed', () => {
const res = xs.pipe(catchError((_: Error) => of('foo')));
expect(sequenceEqual(res, xs)).toBeTruthy();
});

test('Iterable#catchError calls return() on source iterator when stopped early', () => {
const xs = range(0, 10)[Symbol.iterator]();
const returnSpy = jest.spyOn(xs, 'return');

const res = from(xs).pipe(catchError((_: Error) => from([])));

first(res);

expect(returnSpy).toHaveBeenCalled();
});
14 changes: 13 additions & 1 deletion spec/iterable-operators/skip-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { jest } from '@jest/globals';
import { hasNext, noNext } from '../iterablehelpers.js';
import { as, throwError } from 'ix/iterable/index.js';
import { as, first, range, throwError } from 'ix/iterable/index.js';
import { skip } from 'ix/iterable/operators/index.js';

test('Iterable#skip skips some', () => {
Expand Down Expand Up @@ -39,3 +40,14 @@ test('Iterable#skip throws', () => {
const it = ys[Symbol.iterator]();
expect(() => it.next()).toThrow();
});

test('Iterable#skip calls return() on source iterator when stopped early', () => {
const xs = range(0, 10)[Symbol.iterator]();
const returnSpy = jest.spyOn(xs, 'return');

const res = as(xs).pipe(skip(2));

first(res);

expect(returnSpy).toHaveBeenCalled();
});
26 changes: 25 additions & 1 deletion spec/iterable/catcherror-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
import { jest } from '@jest/globals';
import { skip } from 'ix/iterable/operators.js';
import { hasNext } from '../iterablehelpers.js';
import { catchError, concat, range, sequenceEqual, throwError } from 'ix/iterable/index.js';
import {
from,
catchError,
concat,
range,
sequenceEqual,
throwError,
first,
} from 'ix/iterable/index.js';

test('Iterable.catchError with no errors', () => {
const res = catchError(range(0, 5), range(5, 5));
Expand Down Expand Up @@ -31,3 +41,17 @@ test('Iterable.catchError still throws', () => {
hasNext(it, 3);
expect(() => it.next()).toThrow();
});

test('Iterable.catchError calls return() on source iterator when stopped early', () => {
const e1 = new Error();
const er1 = throwError(e1);

const xs2 = range(2, 2)[Symbol.iterator]();
const returnSpy = jest.spyOn(xs2, 'return');

const res = catchError(concat(range(0, 2), er1), from(xs2)).pipe(skip(2));

first(res);

expect(returnSpy).toHaveBeenCalled();
});
30 changes: 16 additions & 14 deletions src/asynciterable/catcherror.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,26 @@ export class CatchAllAsyncIterable<TSource> extends AsyncIterableX<TSource> {
error = null;
hasError = false;

while (1) {
let c = <TSource>{};
try {
while (1) {
let c = <TSource>{};

try {
const { done, value } = await it.next();
if (done) {
await returnAsyncIterator(it);
try {
const { done, value } = await it.next();
if (done) {
break;
}
c = value;
} catch (e) {
error = e;
hasError = true;
break;
}
c = value;
} catch (e) {
error = e;
hasError = true;
await returnAsyncIterator(it);
break;
}

yield c;
yield c;
}
} finally {
await returnAsyncIterator(it);
}

if (!hasError) {
Expand Down
29 changes: 16 additions & 13 deletions src/asynciterable/operators/catcherror.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,26 @@ export class CatchWithAsyncIterable<TSource, TResult> extends AsyncIterableX<TSo
let hasError = false;
const source = wrapWithAbort(this._source, signal);
const it = source[Symbol.asyncIterator]();
while (1) {
let c = <IteratorResult<TSource>>{};

try {
c = await it.next();
if (c.done) {
await returnAsyncIterator(it);
try {
while (1) {
let c = <IteratorResult<TSource>>{};

try {
c = await it.next();
if (c.done) {
break;
}
} catch (e) {
err = await this._handler(e, signal);
hasError = true;
break;
}
} catch (e) {
err = await this._handler(e, signal);
hasError = true;
await returnAsyncIterator(it);
break;
}

yield c.value;
yield c.value;
}
} finally {
await returnAsyncIterator(it);
}

if (hasError) {
Expand Down
18 changes: 12 additions & 6 deletions src/asynciterable/operators/skip.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { AsyncIterableX } from '../asynciterablex.js';
import { MonoTypeOperatorAsyncFunction } from '../../interfaces.js';
import { wrapWithAbort } from './withabort.js';
import { throwIfAborted } from '../../aborterror.js';
import { returnAsyncIterator } from '../../util/returniterator.js';

/** @ignore */
export class SkipAsyncIterable<TSource> extends AsyncIterableX<TSource> {
Expand All @@ -20,13 +21,18 @@ export class SkipAsyncIterable<TSource> extends AsyncIterableX<TSource> {
const it = source[Symbol.asyncIterator]();
let count = this._count;
let next;
while (count > 0 && !(next = await it.next()).done) {
count--;
}
if (count <= 0) {
while (!(next = await it.next()).done) {
yield next.value;

try {
while (count > 0 && !(next = await it.next()).done) {
count--;
}
if (count <= 0) {
while (!(next = await it.next()).done) {
yield next.value;
}
}
} finally {
returnAsyncIterator(it);
}
}
}
Expand Down
30 changes: 16 additions & 14 deletions src/iterable/catcherror.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,26 @@ export class CatchIterable<TSource> extends IterableX<TSource> {
error = null;
hasError = false;

while (1) {
let c = <TSource>{};
try {
while (1) {
let c = <TSource>{};

try {
const { done, value } = it.next();
if (done) {
returnIterator(it);
try {
const { done, value } = it.next();
if (done) {
break;
}
c = value;
} catch (e) {
error = e;
hasError = true;
break;
}
c = value;
} catch (e) {
error = e;
hasError = true;
returnIterator(it);
break;
}

yield c;
yield c;
}
} finally {
returnIterator(it);
}

if (!hasError) {
Expand Down
Loading
Loading