diff --git a/lib/connection.ts b/lib/connection.ts index a2abc31..8ba3939 100644 --- a/lib/connection.ts +++ b/lib/connection.ts @@ -166,7 +166,7 @@ export interface ReqResLink { */ receiver: Receiver; /** - * @property {Session} session - The underlying session on whicn the sender and receiver links + * @property {Session} session - The underlying session on which the sender and receiver links * exist. */ session: Session; @@ -180,6 +180,13 @@ export declare interface Connection { on(event: ConnectionEvents, listener: OnAmqpEvent): this; } +function onDisconnectOccurrence( + context: RheaEventContext, + disconnectEventAudienceMap: Map void> +): void { + disconnectEventAudienceMap.forEach((callback) => callback(context)); +} + /** * Describes the AMQP Connection. * @class Connection @@ -190,6 +197,15 @@ export class Connection extends Entity { * connection. */ options: ConnectionOptions; + /** + * Maintains a map of the audience(sessions/senders/receivers) interested in "disconnected" event. + * This helps us with not needing to create too many listeners on the "disconnected" event, + * which is particularly useful when dealing with 1000s of sessions at the same time. + */ + _disconnectEventAudienceMap: Map void> = new Map< + string, + (context: RheaEventContext) => void + >(); /** * @property {Container} container The underlying Container instance on which the connection * exists. @@ -232,6 +248,11 @@ export class Connection extends Entity { this.options = this._connection.options; this.options.operationTimeoutInSeconds = options.operationTimeoutInSeconds; + // Disconnected event listener for the disconnectEventAudienceMap + this._connection.on(ConnectionEvents.disconnected, (context) => { + onDisconnectOccurrence(context, this._disconnectEventAudienceMap); + }); + this._initializeEventListeners(); } diff --git a/lib/session.ts b/lib/session.ts index dc010a7..7e2b8f9 100644 --- a/lib/session.ts +++ b/lib/session.ts @@ -171,7 +171,7 @@ export class Session extends Entity { this.actionInitiated--; this._session.removeListener(SessionEvents.sessionError, onError); this._session.removeListener(SessionEvents.sessionClose, onClose); - this._session.connection.removeListener(ConnectionEvents.disconnected, onDisconnected); + this._connection._disconnectEventAudienceMap.delete(this.id); if (abortSignal) { abortSignal.removeEventListener("abort", onAbort); } }; @@ -215,7 +215,10 @@ export class Session extends Entity { // listeners that we add for completing the operation are added directly to rhea's objects. this._session.once(SessionEvents.sessionClose, onClose); this._session.once(SessionEvents.sessionError, onError); - this._session.connection.once(ConnectionEvents.disconnected, onDisconnected); + this._connection._disconnectEventAudienceMap.set( + this.id, + onDisconnected + ); log.session("[%s] Calling session.close() for amqp session '%s'.", this.connection.id, this.id); waitTimer = setTimeout(actionAfterTimeout, this.connection.options!.operationTimeoutInSeconds! * 1000); this._session.close(); @@ -404,7 +407,7 @@ export class Session extends Entity { * `deliveryDispositionMap`. * - If the user is handling the reconnection of sender link or the underlying connection in it's * app, then the `onError` and `onSessionError` handlers must be provided by the user and (s)he - * shall be responsible of clearing the `deliveryDispotionMap` of inflight `send()` operation. + * shall be responsible of clearing the `deliveryDispositionMap` of inflight `send()` operation. * * @return Promise * - **Resolves** the promise with the Sender object when rhea emits the "sender_open" event. diff --git a/test/session.spec.ts b/test/session.spec.ts index b9b1e41..a1a8ecf 100644 --- a/test/session.spec.ts +++ b/test/session.spec.ts @@ -42,6 +42,67 @@ describe("Session", () => { assert.isFalse(session.isOpen(), "Session should not be open."); }); + it("Single disconnected listener shared among all the sessions when close() is called in parallel", async () => { + const _connection = (connection as any)._connection; + const getDisconnectListenerCount = () => { + return _connection.listenerCount(rhea.ConnectionEvents.disconnected); + }; + const sessionCount = 1000; + const sessions: Session[] = []; + const callbackCalledForSessionId: { [key: string]: boolean } = {}; + for (let i = 0; i < sessionCount; i++) { + const session = await connection.createSession(); + sessions.push(session); + callbackCalledForSessionId[session.id] = false; + } + const disconnectListenerCountBefore = getDisconnectListenerCount(); + await Promise.all( + sessions + .map((session) => { + session.close(); + }) + .concat([ + (() => { + assert.equal( + getDisconnectListenerCount(), + disconnectListenerCountBefore, + `Unexpected number of "disconnected" listeners - originated from the close() calls` + ); + assert.equal( + connection._disconnectEventAudienceMap.size, + sessionCount, + `Unexpected number of items in _disconnectEventAudienceMap` + ); + for (let [ + key, + callback, + ] of connection._disconnectEventAudienceMap) { + connection._disconnectEventAudienceMap.set( + key, + (context: rhea.EventContext) => { + callbackCalledForSessionId[key] = true; + callback(context); + } + ); + } + _connection.emit(rhea.ConnectionEvents.disconnected, {}); + })(), + ]) + ); + for (const session of sessions) { + assert.equal( + callbackCalledForSessionId[session.id as string], + true, + `callback not called for ${session.id} - this is unexpected` + ); + } + assert.equal( + getDisconnectListenerCount(), + disconnectListenerCountBefore, + `Unexpected number of "disconnected" listeners after sessions were closed` + ); + }); + it(".remove() removes event listeners", async () => { const session = new Session( connection, @@ -73,7 +134,6 @@ describe("Session", () => { assert.strictEqual(session.listenerCount(SessionEvents.sessionOpen), 0); }); - describe("supports events", () => { it("sessionOpen", (done: Function) => { const session = new Session( @@ -137,7 +197,10 @@ describe("Session", () => { session.on(SessionEvents.sessionError, async (event) => { assert.exists(event, "Expected an AMQP event."); - assert.exists(event.session, "Expected session to be defined on AMQP event."); + assert.exists( + event.session, + "Expected session to be defined on AMQP event." + ); if (event.session) { const error = event.session.error as rhea.ConnectionError; assert.exists(error, "Expected an AMQP error."); @@ -173,7 +236,7 @@ describe("Session", () => { session.on(SessionEvents.sessionOpen, async () => { try { await session.close(); - throw new Error("boo") + throw new Error("boo"); } catch (error) { assert.exists(error, "Expected an AMQP error."); assert.strictEqual(error.condition, errorCondition); @@ -212,9 +275,12 @@ describe("Session", () => { abortErrorThrown = error.name === abortErrorName; } - assert.isTrue(abortErrorThrown, "AbortError should have been thrown.") + assert.isTrue(abortErrorThrown, "AbortError should have been thrown."); assert.isFalse(session.isOpen(), "Session should not be open."); - assert.isTrue(session["_session"].is_remote_open(), "Session remote endpoint should not have gotten a chance to close."); + assert.isTrue( + session["_session"].is_remote_open(), + "Session remote endpoint should not have gotten a chance to close." + ); await connection.close(); }); @@ -243,9 +309,12 @@ describe("Session", () => { abortErrorThrown = error.name === abortErrorName; } - assert.isTrue(abortErrorThrown, "AbortError should have been thrown.") + assert.isTrue(abortErrorThrown, "AbortError should have been thrown."); assert.isFalse(session.isOpen(), "Session should not be open."); - assert.isTrue(session["_session"].is_remote_open(), "Session remote endpoint should not have gotten a chance to close."); + assert.isTrue( + session["_session"].is_remote_open(), + "Session remote endpoint should not have gotten a chance to close." + ); await connection.close(); }); @@ -315,7 +384,9 @@ describe("Session", () => { // Pass an already aborted signal to createAwaitableSender() abortController.abort(); - const createAwaitableSenderPromise = session.createAwaitableSender({ abortSignal }); + const createAwaitableSenderPromise = session.createAwaitableSender({ + abortSignal, + }); let abortErrorThrown = false; try { @@ -340,7 +411,9 @@ describe("Session", () => { const abortSignal = abortController.signal; // Abort the signal after passing it to createAwaitableSender() - const createAwaitableSenderPromise = session.createAwaitableSender({ abortSignal }); + const createAwaitableSenderPromise = session.createAwaitableSender({ + abortSignal, + }); abortController.abort(); let abortErrorThrown = false;