Skip to content

Commit

Permalink
✨ add support for iterator/async iterator/acquire abortSignal, etc
Browse files Browse the repository at this point in the history
  • Loading branch information
harvey-woo committed Dec 10, 2023
1 parent 5d5948e commit f3624c5
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 36 deletions.
98 changes: 97 additions & 1 deletion src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,28 @@ describe('Pool', () => {
expect(pool.size).toBe(1);
});

it('should throw error when abort signal which is passed to acquire is aborted', async () => {
pool.acquire();
pool.acquire();
pool.acquire();

expect(pool.size).toBe(0);

expect(() => {
const abortCtrl = new AbortController();
abortCtrl.abort();
const acquirePromise = pool.acquire(true, abortCtrl.signal);
return acquirePromise;
}).rejects.toThrow();

expect(() => {
const abortCtrl = new AbortController();
const acquirePromise = pool.acquire(true, abortCtrl.signal);
abortCtrl.abort();
return acquirePromise;
}).rejects.toThrow();
});

it('should limit concurrent execution with limiter', async () => {
const limiter = pool.limit({ minDuration: 100 });
const delays = [50, 200, 300, 400, 500];
Expand Down Expand Up @@ -285,7 +307,81 @@ describe('Pool', () => {
limiter.abort();
// biome-ignore lint/style/noNonNullAssertion: <explanation>
pool.release(resource1!);
await expect(result).rejects.toThrow('user abort');
await expect(result).rejects.toThrow();
expect(fn).toHaveBeenCalledTimes(0);
});

it('should resolve when all pool resources are released', async () => {
const pool = new Pool({
create: (i) => (i >= 3 ? undefined : { value: i }),
reset: (item) => {},
initialSize: 3,
});
const resource1 = pool.acquire();
const resource2 = pool.acquire();
const resource3 = pool.acquire();

const resolve = jest.fn();

pool.then(resolve);

// biome-ignore lint/style/noNonNullAssertion: <explanation>
pool.release(resource1!);
await wait(10);
expect(resolve).toHaveBeenCalledTimes(0);
// biome-ignore lint/style/noNonNullAssertion: <explanation>
pool.release(resource2!);
await wait(10);
expect(resolve).toHaveBeenCalledTimes(0);
// biome-ignore lint/style/noNonNullAssertion: <explanation>
pool.release(resource3!);
await wait(10);
expect(resolve).toHaveBeenCalledTimes(1);
});

it('should iterate resources', async () => {
const pool = new Pool({
create: (i) => (i >= 3 ? undefined : { value: i }),
reset: (item) => {},
initialSize: 3,
});
const resource1 = pool.acquire();
const resource2 = pool.acquire();
const resource3 = pool.acquire();
// biome-ignore lint/style/noNonNullAssertion: <explanation>
pool.release(resource1!);
// biome-ignore lint/style/noNonNullAssertion: <explanation>
pool.release(resource2!);
// biome-ignore lint/style/noNonNullAssertion: <explanation>
pool.release(resource3!);

const resources = [...pool];
expect(resources).toEqual([resource1, resource2, resource3]);
});

it('should async iterate resources', async () => {
const pool = new Pool({
create: (i) => (i >= 3 ? undefined : { value: i }),
reset: (item) => {},
initialSize: 3,
});
const resource1 = pool.acquire();
const resource2 = pool.acquire();
const resource3 = pool.acquire();
// biome-ignore lint/style/noNonNullAssertion: <explanation>
pool.release(resource1!);
// biome-ignore lint/style/noNonNullAssertion: <explanation>
pool.release(resource2!);
// biome-ignore lint/style/noNonNullAssertion: <explanation>
pool.release(resource3!);

const resources: { value: number }[] = [];
for await (const resource of pool) {
resources.push(resource);
if (resources.length === 3) {
break;
}
}
expect(resources).toEqual([resource1, resource2, resource3]);
});
});
128 changes: 93 additions & 35 deletions src/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,25 @@ export interface CreateLimiterOptions {
abortError?: unknown;
}

interface AbortSignalLike {
aborted: boolean;
reason?: unknown;
addEventListener(type: 'abort', listener: () => void): void;
removeEventListener(type: 'abort', listener: () => void): void;
}

function waitAbortSignal(signal: AbortSignalLike) {
return new Promise<void>((resolve, reject) => {
if (signal.aborted) {
reject(signal.reason);
return;
}
signal.addEventListener('abort', () => {
reject(signal.reason);
});
});
}

/**
* Options for creating a new `Pool` instance.
*/
Expand Down Expand Up @@ -92,7 +111,9 @@ function normalizeOptions<T extends Resource & object = Resource>(
* Represents a generic object pool.
* @template T The type of objects in the pool.
*/
export class Pool<T extends Resource & object = Resource> {
export class Pool<T extends Resource & object = Resource>
implements PromiseLike<void>
{
// events handling, can be replaced with EventEmitter

#handlers = {
Expand Down Expand Up @@ -158,11 +179,18 @@ export class Pool<T extends Resource & object = Resource> {

#resources: Set<T> = new Set<T>();

#inUse: Set<T> = new Set<T>();
// Map of inUser resources and their release promise resolvers
#inUse = new Map<
T,
{
resolve: (resource: T) => void;
promise: Promise<T>;
}
>();

/** exsiting resource size, excluding the ones not created yet */
get totalSize() {
return this.#inUse.size + this.#resources.size;
return this.inUseSize + this.size;
}

/** get inUse resource size */
Expand All @@ -175,34 +203,27 @@ export class Pool<T extends Resource & object = Resource> {
return this.#resources.size;
}

#untilRelease() {
return new Promise<void>((resolve) => {
const releaseListener = (item: T) => {
this.off('release', releaseListener);
resolve();
};
this.on('release', releaseListener);
});
}

/**
* Acquires a resource from the pool.
* @param wait if true, will wait until a resource is released
* @returns if `wait` is true, returns a promise that resolves to the acquired resource, otherwise returns the acquired resource or undefined
*/
acquire(wait?: false): T | undefined;
acquire(wait: true): Promise<T>;
acquire(wait?: boolean): Promise<T> | T | undefined {
acquire(wait: true, abortSignal?: AbortSignalLike): Promise<T>;
acquire(
wait?: boolean,
abortSignal?: AbortSignalLike,
): Promise<T> | T | undefined {
if (this.#resources.size) {
const item = this.#resources.values().next().value;
this.#resources.delete(item);
this.#inUse.add(item);
this.#inUse.set(item, withResolvers<T>());
this.#emit('acquire', item);
return item;
}
const createdItem = this.#create(this.totalSize);
if (createdItem !== undefined) {
this.#inUse.add(createdItem);
this.#inUse.set(createdItem, withResolvers<T>());
this.#emit('acquire', createdItem);
return createdItem;
}
Expand All @@ -211,22 +232,25 @@ export class Pool<T extends Resource & object = Resource> {
return undefined;
}

return this.#untilRelease().then(() => {
return this.acquire(true);
});
return Promise.race([
...Array.from(this.#inUse.values()).map((resolvers) => resolvers.promise),
abortSignal ? waitAbortSignal(abortSignal) : new Promise(() => {}),
]).then(() => this.acquire(true, abortSignal));
}
/**
* Releases a resource and puts it back into the pool.
* @param item The resource to release.
*/
release(item: T) {
if (this.#inUse.has(item)) {
const resolvers = this.#inUse.get(item);
if (resolvers) {
this.#inUse.delete(item);
if (this.#reset) {
this.#reset(item);
}
this.#emit('release', item);
this.#resources.add(item);
resolvers.resolve(item);
}
}
/**
Expand All @@ -237,6 +261,48 @@ export class Pool<T extends Resource & object = Resource> {
this.#resources.clear();
}

/**
* Returns a promise that resolves when all resources are released.
* @param onfulfilled
* @param onrejected
* @returns
*/
then<TResult1 = void, TResult2 = never>(
onfulfilled?: (() => TResult1 | PromiseLike<TResult1>) | undefined | null,
onrejected?:
| ((reason: unknown) => TResult2 | PromiseLike<TResult2>)
| undefined
| null,
): Promise<TResult1 | TResult2> {
return Promise.all(
Array.from(this.#inUse.values()).map((resolvers) => resolvers.promise),
).then(onfulfilled, onrejected);
}

[Symbol.asyncIterator]() {
return {
next: async () => {
const item = await this.acquire(true);
return {
done: item === undefined,
value: item,
};
},
};
}

[Symbol.iterator]() {
return {
next: () => {
const item = this.acquire();
return {
done: item === undefined,
value: item,
};
},
};
}

/**
* Creates a new `Pool` instance from an existing array.
* @param items An array of items to initialize the pool with.
Expand Down Expand Up @@ -267,21 +333,16 @@ export class Pool<T extends Resource & object = Resource> {
abortError = new Error('user abort'),
}: CreateLimiterOptions = {}): Limiter<T> {
const pool = this;
const aborts = new Set<(reason: unknown) => void>();
const abortCtrl = new AbortController();
return Object.assign(
async function limited(fn, ...args) {
const resolvers = withResolvers<T>();
const abort = (reason: unknown) => {
resolvers.reject(reason);
};
aborts.add(abort);
async function limited(fn, ...args: readonly unknown[]) {
// biome-ignore lint/suspicious/noAsyncPromiseExecutor: <explanation>
return new Promise(async (resolve, reject) => {
return new Promise<ReturnType<typeof fn>>(async (resolve, reject) => {
// mybe we can use this in the future
// await using ctx = await pool.acquire(true);
let ctx: T | undefined;
try {
ctx = await Promise.race([pool.acquire(true), resolvers.promise]);
ctx = await pool.acquire(true, abortCtrl.signal);
const start = Date.now();
const result = await fn.call(ctx, ...args);
resolve(result);
Expand All @@ -295,18 +356,15 @@ export class Pool<T extends Resource & object = Resource> {
if (ctx) {
pool.release(ctx);
}
aborts.delete(abort);
}
});
},
{
abort(abortedReason: unknown = abortError) {
for (const abort of aborts.values()) {
abort(abortedReason);
}
abortCtrl.abort(abortedReason);
},
},
) as Limiter<T>;
);
}

/**
Expand Down

0 comments on commit f3624c5

Please sign in to comment.