From f3624c5a4382cfec096e335d4adf34ff660dea7a Mon Sep 17 00:00:00 2001 From: Harvey Woo Date: Sun, 10 Dec 2023 15:28:16 +0000 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20add=20support=20for=20iterator/asyn?= =?UTF-8?q?c=20iterator/acquire=20abortSignal,=20etc?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/index.test.ts | 98 ++++++++++++++++++++++++++++++++++- src/pool.ts | 128 +++++++++++++++++++++++++++++++++------------- 2 files changed, 190 insertions(+), 36 deletions(-) diff --git a/src/index.test.ts b/src/index.test.ts index 011c5bc..89f6cd4 100644 --- a/src/index.test.ts +++ b/src/index.test.ts @@ -59,6 +59,28 @@ describe('Pool', () => { expect(pool.size).toBe(1); }); + it('should throw error when abort signal which is passed to acquire is aborted', async () => { + pool.acquire(); + pool.acquire(); + pool.acquire(); + + expect(pool.size).toBe(0); + + expect(() => { + const abortCtrl = new AbortController(); + abortCtrl.abort(); + const acquirePromise = pool.acquire(true, abortCtrl.signal); + return acquirePromise; + }).rejects.toThrow(); + + expect(() => { + const abortCtrl = new AbortController(); + const acquirePromise = pool.acquire(true, abortCtrl.signal); + abortCtrl.abort(); + return acquirePromise; + }).rejects.toThrow(); + }); + it('should limit concurrent execution with limiter', async () => { const limiter = pool.limit({ minDuration: 100 }); const delays = [50, 200, 300, 400, 500]; @@ -285,7 +307,81 @@ describe('Pool', () => { limiter.abort(); // biome-ignore lint/style/noNonNullAssertion: pool.release(resource1!); - await expect(result).rejects.toThrow('user abort'); + await expect(result).rejects.toThrow(); expect(fn).toHaveBeenCalledTimes(0); }); + + it('should resolve when all pool resources are released', async () => { + const pool = new Pool({ + create: (i) => (i >= 3 ? undefined : { value: i }), + reset: (item) => {}, + initialSize: 3, + }); + const resource1 = pool.acquire(); + const resource2 = pool.acquire(); + const resource3 = pool.acquire(); + + const resolve = jest.fn(); + + pool.then(resolve); + + // biome-ignore lint/style/noNonNullAssertion: + pool.release(resource1!); + await wait(10); + expect(resolve).toHaveBeenCalledTimes(0); + // biome-ignore lint/style/noNonNullAssertion: + pool.release(resource2!); + await wait(10); + expect(resolve).toHaveBeenCalledTimes(0); + // biome-ignore lint/style/noNonNullAssertion: + pool.release(resource3!); + await wait(10); + expect(resolve).toHaveBeenCalledTimes(1); + }); + + it('should iterate resources', async () => { + const pool = new Pool({ + create: (i) => (i >= 3 ? undefined : { value: i }), + reset: (item) => {}, + initialSize: 3, + }); + const resource1 = pool.acquire(); + const resource2 = pool.acquire(); + const resource3 = pool.acquire(); + // biome-ignore lint/style/noNonNullAssertion: + pool.release(resource1!); + // biome-ignore lint/style/noNonNullAssertion: + pool.release(resource2!); + // biome-ignore lint/style/noNonNullAssertion: + pool.release(resource3!); + + const resources = [...pool]; + expect(resources).toEqual([resource1, resource2, resource3]); + }); + + it('should async iterate resources', async () => { + const pool = new Pool({ + create: (i) => (i >= 3 ? undefined : { value: i }), + reset: (item) => {}, + initialSize: 3, + }); + const resource1 = pool.acquire(); + const resource2 = pool.acquire(); + const resource3 = pool.acquire(); + // biome-ignore lint/style/noNonNullAssertion: + pool.release(resource1!); + // biome-ignore lint/style/noNonNullAssertion: + pool.release(resource2!); + // biome-ignore lint/style/noNonNullAssertion: + pool.release(resource3!); + + const resources: { value: number }[] = []; + for await (const resource of pool) { + resources.push(resource); + if (resources.length === 3) { + break; + } + } + expect(resources).toEqual([resource1, resource2, resource3]); + }); }); diff --git a/src/pool.ts b/src/pool.ts index b6a15f4..4b066cd 100644 --- a/src/pool.ts +++ b/src/pool.ts @@ -21,6 +21,25 @@ export interface CreateLimiterOptions { abortError?: unknown; } +interface AbortSignalLike { + aborted: boolean; + reason?: unknown; + addEventListener(type: 'abort', listener: () => void): void; + removeEventListener(type: 'abort', listener: () => void): void; +} + +function waitAbortSignal(signal: AbortSignalLike) { + return new Promise((resolve, reject) => { + if (signal.aborted) { + reject(signal.reason); + return; + } + signal.addEventListener('abort', () => { + reject(signal.reason); + }); + }); +} + /** * Options for creating a new `Pool` instance. */ @@ -92,7 +111,9 @@ function normalizeOptions( * Represents a generic object pool. * @template T The type of objects in the pool. */ -export class Pool { +export class Pool + implements PromiseLike +{ // events handling, can be replaced with EventEmitter #handlers = { @@ -158,11 +179,18 @@ export class Pool { #resources: Set = new Set(); - #inUse: Set = new Set(); + // Map of inUser resources and their release promise resolvers + #inUse = new Map< + T, + { + resolve: (resource: T) => void; + promise: Promise; + } + >(); /** exsiting resource size, excluding the ones not created yet */ get totalSize() { - return this.#inUse.size + this.#resources.size; + return this.inUseSize + this.size; } /** get inUse resource size */ @@ -175,34 +203,27 @@ export class Pool { return this.#resources.size; } - #untilRelease() { - return new Promise((resolve) => { - const releaseListener = (item: T) => { - this.off('release', releaseListener); - resolve(); - }; - this.on('release', releaseListener); - }); - } - /** * Acquires a resource from the pool. * @param wait if true, will wait until a resource is released * @returns if `wait` is true, returns a promise that resolves to the acquired resource, otherwise returns the acquired resource or undefined */ acquire(wait?: false): T | undefined; - acquire(wait: true): Promise; - acquire(wait?: boolean): Promise | T | undefined { + acquire(wait: true, abortSignal?: AbortSignalLike): Promise; + acquire( + wait?: boolean, + abortSignal?: AbortSignalLike, + ): Promise | T | undefined { if (this.#resources.size) { const item = this.#resources.values().next().value; this.#resources.delete(item); - this.#inUse.add(item); + this.#inUse.set(item, withResolvers()); this.#emit('acquire', item); return item; } const createdItem = this.#create(this.totalSize); if (createdItem !== undefined) { - this.#inUse.add(createdItem); + this.#inUse.set(createdItem, withResolvers()); this.#emit('acquire', createdItem); return createdItem; } @@ -211,22 +232,25 @@ export class Pool { return undefined; } - return this.#untilRelease().then(() => { - return this.acquire(true); - }); + return Promise.race([ + ...Array.from(this.#inUse.values()).map((resolvers) => resolvers.promise), + abortSignal ? waitAbortSignal(abortSignal) : new Promise(() => {}), + ]).then(() => this.acquire(true, abortSignal)); } /** * Releases a resource and puts it back into the pool. * @param item The resource to release. */ release(item: T) { - if (this.#inUse.has(item)) { + const resolvers = this.#inUse.get(item); + if (resolvers) { this.#inUse.delete(item); if (this.#reset) { this.#reset(item); } this.#emit('release', item); this.#resources.add(item); + resolvers.resolve(item); } } /** @@ -237,6 +261,48 @@ export class Pool { this.#resources.clear(); } + /** + * Returns a promise that resolves when all resources are released. + * @param onfulfilled + * @param onrejected + * @returns + */ + then( + onfulfilled?: (() => TResult1 | PromiseLike) | undefined | null, + onrejected?: + | ((reason: unknown) => TResult2 | PromiseLike) + | undefined + | null, + ): Promise { + return Promise.all( + Array.from(this.#inUse.values()).map((resolvers) => resolvers.promise), + ).then(onfulfilled, onrejected); + } + + [Symbol.asyncIterator]() { + return { + next: async () => { + const item = await this.acquire(true); + return { + done: item === undefined, + value: item, + }; + }, + }; + } + + [Symbol.iterator]() { + return { + next: () => { + const item = this.acquire(); + return { + done: item === undefined, + value: item, + }; + }, + }; + } + /** * Creates a new `Pool` instance from an existing array. * @param items An array of items to initialize the pool with. @@ -267,21 +333,16 @@ export class Pool { abortError = new Error('user abort'), }: CreateLimiterOptions = {}): Limiter { const pool = this; - const aborts = new Set<(reason: unknown) => void>(); + const abortCtrl = new AbortController(); return Object.assign( - async function limited(fn, ...args) { - const resolvers = withResolvers(); - const abort = (reason: unknown) => { - resolvers.reject(reason); - }; - aborts.add(abort); + async function limited(fn, ...args: readonly unknown[]) { // biome-ignore lint/suspicious/noAsyncPromiseExecutor: - return new Promise(async (resolve, reject) => { + return new Promise>(async (resolve, reject) => { // mybe we can use this in the future // await using ctx = await pool.acquire(true); let ctx: T | undefined; try { - ctx = await Promise.race([pool.acquire(true), resolvers.promise]); + ctx = await pool.acquire(true, abortCtrl.signal); const start = Date.now(); const result = await fn.call(ctx, ...args); resolve(result); @@ -295,18 +356,15 @@ export class Pool { if (ctx) { pool.release(ctx); } - aborts.delete(abort); } }); }, { abort(abortedReason: unknown = abortError) { - for (const abort of aborts.values()) { - abort(abortedReason); - } + abortCtrl.abort(abortedReason); }, }, - ) as Limiter; + ); } /**