Skip to content
Draft
15 changes: 15 additions & 0 deletions src/abortcontroller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
export function createLinkedAbortController(...signals: AbortSignal[]) {
const controller = new AbortController();

Array.from(signals).forEach((signal) => {
signal.addEventListener(
'abort',
() => {
controller.abort();
},
{ once: true }
);
});

return controller;
}
159 changes: 159 additions & 0 deletions src/asyncobservable/asyncobservablex.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import {
AsyncObservable,
AsyncObserver,
AsyncSubscription,
OperatorAsyncObservableFunction,
PartialAsyncObserver,
SYMBOL_ASYNC_DISPOSABLE,
UnaryFunction,
} from '../interfaces';
import { AsyncObserverX } from './asyncobserverx';

class AutoDetachObserver<T> extends AsyncObserverX<T> {
private _observer: AsyncObserver<T>;
private _subscription?: AsyncSubscription;
private _unsubscribing: boolean = false;
private _task?: Promise<void>;

constructor(observer: AsyncObserver<T>) {
super();
this._observer = observer;
}

async assign(subscription: AsyncSubscription) {
let shouldUnsubscribe = false;
if (this._unsubscribing) {
shouldUnsubscribe = true;
} else {
this._subscription = subscription;
}

if (shouldUnsubscribe) {
await this._subscription![SYMBOL_ASYNC_DISPOSABLE]();
}
}

async _next(value: T) {
if (this._unsubscribing) {
return;
}
this._task = this._observer.next(value);
try {
await this._task;
} finally {
this._task = undefined;
}
}

async _error(err: any) {
if (this._unsubscribing) {
return;
}
this._task = this._observer.error(err);
try {
await this._task;
} finally {
await this._finish();
}
}

async _complete() {
if (this._unsubscribing) {
return;
}
this._task = this._observer.complete();
try {
await this._task;
} finally {
await this._finish();
}
}

async [SYMBOL_ASYNC_DISPOSABLE]() {
let subscription;

if (!this._unsubscribing) {
this._unsubscribing = true;
subscription = this._subscription;
}

if (subscription) {
await subscription[SYMBOL_ASYNC_DISPOSABLE]();
}
}

private async _finish() {
let subscription;
if (!this._unsubscribing) {
this._unsubscribing = true;
subscription = this._subscription;
}

this._task = undefined;

if (subscription) {
await subscription[SYMBOL_ASYNC_DISPOSABLE]();
}
}
}

export class SafeObserver<T> extends AsyncObserverX<T> {
private _observer: PartialAsyncObserver<T>;

constructor(observer: PartialAsyncObserver<T>) {
super();
this._observer = observer;
}

async _next(value: T) {
if (this._observer.next) {
await this._observer.next(value);
}
}

async _error(err: any) {
if (this._observer.error) {
await this._observer.error(err);
} else {
throw err;
}
}

async _complete() {
if (this._observer.complete) {
await this._observer.complete();
}
}
}

export abstract class AsyncObservableX<T> implements AsyncObservable<T> {
async subscribeAsync(
observer: PartialAsyncObserver<T>,
signal?: AbortSignal
): Promise<AsyncSubscription> {
const safeObserver = new SafeObserver<T>(observer);
const autoDetachObserver = new AutoDetachObserver<T>(safeObserver);
const subscription = await this._subscribeAsync(autoDetachObserver, signal);
await autoDetachObserver.assign(subscription);
return autoDetachObserver;
}

abstract _subscribeAsync(
observer: AsyncObserver<T>,
signal?: AbortSignal
): Promise<AsyncSubscription>;

/** @nocollapse */
pipe<R>(...operations: UnaryFunction<AsyncObservable<T>, R>[]): R;
pipe<R>(...operations: OperatorAsyncObservableFunction<T, R>[]): AsyncObservableX<R>;
pipe(...args: any[]) {
let i = -1;
const n = args.length;
let acc: any = this;
while (++i < n) {
// TODO: Cast using `as`
acc = args[i](acc);
}
return acc;
}
}
55 changes: 55 additions & 0 deletions src/asyncobservable/asyncobserverx.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { AsyncObserver } from '../interfaces';

enum ObserverState {
Idle,
Busy,
Done,
}

export abstract class AsyncObserverX<T> implements AsyncObserver<T> {
private _state: ObserverState = ObserverState.Idle;

next(value: T) {
this._tryEnter();
try {
return this._next(value);
} finally {
this._state = ObserverState.Idle;
}
}

abstract _next(value: T): Promise<void>;

error(err: any) {
this._tryEnter();
try {
return this._error(err);
} finally {
this._state = ObserverState.Done;
}
}

abstract _error(err: any): Promise<void>;

complete() {
this._tryEnter();
try {
return this._complete();
} finally {
this._state = ObserverState.Done;
}
}

abstract _complete(): Promise<void>;

private _tryEnter() {
const old = this._state;
if (old === ObserverState.Idle) {
this._state = ObserverState.Busy;
} else if (old === ObserverState.Busy) {
throw new Error('Observer is already busy');
} else if (old === ObserverState.Done) {
throw new Error('Observer has already terminated');
}
}
}
26 changes: 26 additions & 0 deletions src/asyncobservable/concurrency/_delay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { AbortError } from '../../aborterror';

export function delay(dueTime: number, signal: AbortSignal): Promise<void> {
return new Promise((resolve, reject) => {
if (signal.aborted) {
reject(new AbortError());
}

const id = setTimeout(() => {
if (signal.aborted) {
reject(new AbortError());
} else {
resolve();
}
}, dueTime);

signal.addEventListener(
'abort',
() => {
clearTimeout(id);
reject(new AbortError());
},
{ once: true }
);
});
}
70 changes: 70 additions & 0 deletions src/asyncobservable/concurrency/asyncschedulerx.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { throwIfAborted } from 'ix/aborterror';
import { AsyncScheduler, AsyncSubscription, SYMBOL_ASYNC_DISPOSABLE } from '../../interfaces';

function normalizeTime(time: number) {
return time < 0 ? 0 : time;
}

export abstract class AsyncSchedulerX implements AsyncScheduler {
get now() {
return Date.now();
}

scheduleNowAsync(action: (signal: AbortSignal) => Promise<void>, signal?: AbortSignal) {
return this._scheduleAsync(action, signal);
}

scheduleFutureAsync(
action: (signal: AbortSignal) => Promise<void>,
dueTime: number,
signal?: AbortSignal
) {
const newTime = normalizeTime(dueTime);

return this._scheduleAsync(async (innerSignal) => {
await this._delay(newTime, innerSignal);
await action(innerSignal);
}, signal);
}

async _scheduleAsync(
action: (signal: AbortSignal) => Promise<void>,
signal?: AbortSignal
): Promise<AsyncSubscription> {
throwIfAborted(signal);

const cas = new CancellationAsyncSubscription();
cas.link(signal);
await this._scheduleCoreAsync(action, cas.signal);
return cas;
}

abstract _scheduleCoreAsync(
action: (signal: AbortSignal) => Promise<void>,
signal: AbortSignal
): Promise<void>;

abstract _delay(dueTime: number, signal: AbortSignal): Promise<void>;
}

export class CancellationAsyncSubscription implements AsyncSubscription {
private _controller: AbortController;

constructor() {
this._controller = new AbortController();
}

get signal() {
return this._controller.signal;
}

link(signal?: AbortSignal) {
if (signal) {
signal.addEventListener('abort', () => this._controller.abort(), { once: true });
}
}

async [SYMBOL_ASYNC_DISPOSABLE]() {
this._controller.abort();
}
}
20 changes: 20 additions & 0 deletions src/asyncobservable/concurrency/immediateasyncscheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { AsyncSchedulerX } from './asyncschedulerx';
import { delay } from './_delay';

export class ImmediateAsyncScheduler extends AsyncSchedulerX {
private static _instance = new ImmediateAsyncScheduler();
static get instance() {
return ImmediateAsyncScheduler._instance;
}

async _scheduleCoreAsync(
action: (signal: AbortSignal) => Promise<void>,
signal: AbortSignal
): Promise<void> {
await action(signal);
}

async _delay(dueTime: number, signal: AbortSignal): Promise<void> {
await delay(dueTime, signal);
}
}
22 changes: 22 additions & 0 deletions src/asyncobservable/concurrency/microtaskscheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { AsyncSchedulerX } from './asyncschedulerx';
import { delay } from './_delay';

export class MicroTaskAsyncScheduler extends AsyncSchedulerX {
private static _instance = new MicroTaskAsyncScheduler();
static get instance() {
return MicroTaskAsyncScheduler._instance;
}

async _scheduleCoreAsync(
action: (signal: AbortSignal) => Promise<void>,
signal: AbortSignal
): Promise<void> {
return Promise.resolve().then(() => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return Promise.resolve().then(() => {
return await Promise.resolve().then(() => {

return action(signal);
});
}

async _delay(dueTime: number, signal: AbortSignal): Promise<void> {
await delay(dueTime, signal);
}
}
Loading