From 017a16c33c5bc9355fc782bfc30735a97c58a3a4 Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Thu, 18 Feb 2021 01:55:43 -0800 Subject: [PATCH 01/12] typo --- lib/session.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/session.ts b/lib/session.ts index dc010a7..9662616 100644 --- a/lib/session.ts +++ b/lib/session.ts @@ -404,7 +404,7 @@ export class Session extends Entity { * `deliveryDispositionMap`. * - If the user is handling the reconnection of sender link or the underlying connection in it's * app, then the `onError` and `onSessionError` handlers must be provided by the user and (s)he - * shall be responsible of clearing the `deliveryDispotionMap` of inflight `send()` operation. + * shall be responsible of clearing the `deliveryDispositionMap` of inflight `send()` operation. * * @return Promise * - **Resolves** the promise with the Sender object when rhea emits the "sender_open" event. From e4b9966227fe920f77456c80916334ec5823a7ac Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Thu, 18 Feb 2021 01:55:58 -0800 Subject: [PATCH 02/12] typo --- lib/connection.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/connection.ts b/lib/connection.ts index a2abc31..32a0b66 100644 --- a/lib/connection.ts +++ b/lib/connection.ts @@ -166,7 +166,7 @@ export interface ReqResLink { */ receiver: Receiver; /** - * @property {Session} session - The underlying session on whicn the sender and receiver links + * @property {Session} session - The underlying session on which the sender and receiver links * exist. */ session: Session; From ab6e40cc4f6aebe773b818ca29a548ea70d29eb3 Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Sun, 21 Feb 2021 22:51:45 -0800 Subject: [PATCH 03/12] onDisconnectOccurrence --- lib/connection.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/lib/connection.ts b/lib/connection.ts index 32a0b66..2dc19bc 100644 --- a/lib/connection.ts +++ b/lib/connection.ts @@ -180,6 +180,13 @@ export declare interface Connection { on(event: ConnectionEvents, listener: OnAmqpEvent): this; } +function onDisconnectOccurrence( + context: RheaEventContext, + disconnectEventAudienceMap: Map void> +): void { + disconnectEventAudienceMap.forEach((callback) => callback(context)); +} + /** * Describes the AMQP Connection. * @class Connection From 4b432a726fb112ead3be7a4b136a7e3279ec7c17 Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Sun, 21 Feb 2021 22:51:54 -0800 Subject: [PATCH 04/12] _disconnectEventAudienceMap --- lib/connection.ts | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/lib/connection.ts b/lib/connection.ts index 2dc19bc..db786c1 100644 --- a/lib/connection.ts +++ b/lib/connection.ts @@ -197,6 +197,15 @@ export class Connection extends Entity { * connection. */ options: ConnectionOptions; + /** + * Maintains a map of the audience(sessions/senders/receivers) interested in "disconnected" event. + * This helps us with not needing to create too many listeners on the "disconnected" event, + * which is particularly useful when dealing with 1000s of sessions at the same time. + */ + _disconnectEventAudienceMap: Map void> = new Map< + string, + (context: RheaEventContext) => void + >(); /** * @property {Container} container The underlying Container instance on which the connection * exists. From 89db282bc40bedf531f051c86dd8ad7b34d48c90 Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Sun, 21 Feb 2021 22:52:10 -0800 Subject: [PATCH 05/12] Disconnect event for the disconnectEventAudienceMap --- lib/connection.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/connection.ts b/lib/connection.ts index db786c1..2e7427d 100644 --- a/lib/connection.ts +++ b/lib/connection.ts @@ -248,6 +248,11 @@ export class Connection extends Entity { this.options = this._connection.options; this.options.operationTimeoutInSeconds = options.operationTimeoutInSeconds; + // Disconnect event for the disconnectEventAudienceMap + this._connection.on(ConnectionEvents.disconnected, (context) => { + onDisconnectOccurrence(context, this._disconnectEventAudienceMap); + }); + this._initializeEventListeners(); } From 8c82406753fd523dc0edad423212a9dbbc6bcff2 Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Sun, 21 Feb 2021 22:52:51 -0800 Subject: [PATCH 06/12] delete from map <=> equivalent to removing the disconnected listener --- lib/session.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/session.ts b/lib/session.ts index 9662616..9d7a020 100644 --- a/lib/session.ts +++ b/lib/session.ts @@ -167,6 +167,8 @@ export class Session extends Entity { let waitTimer: any; const removeListeners = () => { + // Remove the listener <=> delete from map + this._connection._disconnectEventAudienceMap.delete(this.id); clearTimeout(waitTimer); this.actionInitiated--; this._session.removeListener(SessionEvents.sessionError, onError); From 328a9a9b4dcae68e43ec6752608c7c1bb7edcb29 Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Sun, 21 Feb 2021 22:53:59 -0800 Subject: [PATCH 07/12] set id in the map to react to disconnected event <=> equivalent to addign disconnect event --- lib/session.ts | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/lib/session.ts b/lib/session.ts index 9d7a020..e826b9a 100644 --- a/lib/session.ts +++ b/lib/session.ts @@ -218,6 +218,28 @@ export class Session extends Entity { this._session.once(SessionEvents.sessionClose, onClose); this._session.once(SessionEvents.sessionError, onError); this._session.connection.once(ConnectionEvents.disconnected, onDisconnected); + // Add listener <=> set in the map + this._connection._disconnectEventAudienceMap.set( + this.id, (context: RheaEventContext) => { + removeListeners(); + const error = + context.connection && context.connection.error + ? context.connection.error + : context.error; + console.log( + "[%s] Connection got disconnected while closing amqp session '%s': %O.", + this.connection.id, + this.id, + error + ); + log.error( + "[%s] Connection got disconnected while closing amqp session '%s': %O.", + this.connection.id, + this.id, + error + ); + } + ); log.session("[%s] Calling session.close() for amqp session '%s'.", this.connection.id, this.id); waitTimer = setTimeout(actionAfterTimeout, this.connection.options!.operationTimeoutInSeconds! * 1000); this._session.close(); From d88ad6357065f33f53f15eaa3f572d2198bc886e Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Sun, 21 Feb 2021 22:54:16 -0800 Subject: [PATCH 08/12] remove console.log --- lib/session.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/lib/session.ts b/lib/session.ts index e826b9a..5726dc1 100644 --- a/lib/session.ts +++ b/lib/session.ts @@ -226,12 +226,6 @@ export class Session extends Entity { context.connection && context.connection.error ? context.connection.error : context.error; - console.log( - "[%s] Connection got disconnected while closing amqp session '%s': %O.", - this.connection.id, - this.id, - error - ); log.error( "[%s] Connection got disconnected while closing amqp session '%s': %O.", this.connection.id, From 69864ac8cce3a2f8de32d4deb62b56cac4039365 Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Sun, 21 Feb 2021 22:55:42 -0800 Subject: [PATCH 09/12] A new test --- test/session.spec.ts | 65 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 56 insertions(+), 9 deletions(-) diff --git a/test/session.spec.ts b/test/session.spec.ts index b9b1e41..704b5d2 100644 --- a/test/session.spec.ts +++ b/test/session.spec.ts @@ -42,6 +42,41 @@ describe("Session", () => { assert.isFalse(session.isOpen(), "Session should not be open."); }); + it("Single disconnected listener shared among all the sessions when close() is called in parallel", async () => { + const _connection = (connection as any)._connection; + const getDisconnectListenerCount = () => { + return _connection.listenerCount(rhea.ConnectionEvents.disconnected); + }; + const sessions: Session[] = []; + const sessionCount = 100; + for (let i = 0; i < sessionCount; i++) { + const session = await connection.createSession(); + sessions.push(session); + } + const disconnectListenerCountBefore = getDisconnectListenerCount(); + await Promise.all( + sessions + .map((session) => { + session.close(); + }) + .concat([ + (() => { + assert.equal( + getDisconnectListenerCount(), + sessionCount + disconnectListenerCountBefore, + `Unexpected number of "disconnected" listeners` + ); + _connection.emit(rhea.ConnectionEvents.disconnected, {}); + })(), + ]) + ); + assert.equal( + getDisconnectListenerCount(), + disconnectListenerCountBefore, + `Unexpected number of "disconnected" listeners after sessions were closed` + ); + }); + it(".remove() removes event listeners", async () => { const session = new Session( connection, @@ -73,7 +108,6 @@ describe("Session", () => { assert.strictEqual(session.listenerCount(SessionEvents.sessionOpen), 0); }); - describe("supports events", () => { it("sessionOpen", (done: Function) => { const session = new Session( @@ -137,7 +171,10 @@ describe("Session", () => { session.on(SessionEvents.sessionError, async (event) => { assert.exists(event, "Expected an AMQP event."); - assert.exists(event.session, "Expected session to be defined on AMQP event."); + assert.exists( + event.session, + "Expected session to be defined on AMQP event." + ); if (event.session) { const error = event.session.error as rhea.ConnectionError; assert.exists(error, "Expected an AMQP error."); @@ -173,7 +210,7 @@ describe("Session", () => { session.on(SessionEvents.sessionOpen, async () => { try { await session.close(); - throw new Error("boo") + throw new Error("boo"); } catch (error) { assert.exists(error, "Expected an AMQP error."); assert.strictEqual(error.condition, errorCondition); @@ -212,9 +249,12 @@ describe("Session", () => { abortErrorThrown = error.name === abortErrorName; } - assert.isTrue(abortErrorThrown, "AbortError should have been thrown.") + assert.isTrue(abortErrorThrown, "AbortError should have been thrown."); assert.isFalse(session.isOpen(), "Session should not be open."); - assert.isTrue(session["_session"].is_remote_open(), "Session remote endpoint should not have gotten a chance to close."); + assert.isTrue( + session["_session"].is_remote_open(), + "Session remote endpoint should not have gotten a chance to close." + ); await connection.close(); }); @@ -243,9 +283,12 @@ describe("Session", () => { abortErrorThrown = error.name === abortErrorName; } - assert.isTrue(abortErrorThrown, "AbortError should have been thrown.") + assert.isTrue(abortErrorThrown, "AbortError should have been thrown."); assert.isFalse(session.isOpen(), "Session should not be open."); - assert.isTrue(session["_session"].is_remote_open(), "Session remote endpoint should not have gotten a chance to close."); + assert.isTrue( + session["_session"].is_remote_open(), + "Session remote endpoint should not have gotten a chance to close." + ); await connection.close(); }); @@ -315,7 +358,9 @@ describe("Session", () => { // Pass an already aborted signal to createAwaitableSender() abortController.abort(); - const createAwaitableSenderPromise = session.createAwaitableSender({ abortSignal }); + const createAwaitableSenderPromise = session.createAwaitableSender({ + abortSignal, + }); let abortErrorThrown = false; try { @@ -340,7 +385,9 @@ describe("Session", () => { const abortSignal = abortController.signal; // Abort the signal after passing it to createAwaitableSender() - const createAwaitableSenderPromise = session.createAwaitableSender({ abortSignal }); + const createAwaitableSenderPromise = session.createAwaitableSender({ + abortSignal, + }); abortController.abort(); let abortErrorThrown = false; From fbbc0202c939ac231acb38f3e17536472b8441e7 Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Sun, 21 Feb 2021 23:01:47 -0800 Subject: [PATCH 10/12] Do not add and remove disconnected listeners --- lib/session.ts | 21 +++------------------ 1 file changed, 3 insertions(+), 18 deletions(-) diff --git a/lib/session.ts b/lib/session.ts index 5726dc1..7e2b8f9 100644 --- a/lib/session.ts +++ b/lib/session.ts @@ -167,13 +167,11 @@ export class Session extends Entity { let waitTimer: any; const removeListeners = () => { - // Remove the listener <=> delete from map - this._connection._disconnectEventAudienceMap.delete(this.id); clearTimeout(waitTimer); this.actionInitiated--; this._session.removeListener(SessionEvents.sessionError, onError); this._session.removeListener(SessionEvents.sessionClose, onClose); - this._session.connection.removeListener(ConnectionEvents.disconnected, onDisconnected); + this._connection._disconnectEventAudienceMap.delete(this.id); if (abortSignal) { abortSignal.removeEventListener("abort", onAbort); } }; @@ -217,22 +215,9 @@ export class Session extends Entity { // listeners that we add for completing the operation are added directly to rhea's objects. this._session.once(SessionEvents.sessionClose, onClose); this._session.once(SessionEvents.sessionError, onError); - this._session.connection.once(ConnectionEvents.disconnected, onDisconnected); - // Add listener <=> set in the map this._connection._disconnectEventAudienceMap.set( - this.id, (context: RheaEventContext) => { - removeListeners(); - const error = - context.connection && context.connection.error - ? context.connection.error - : context.error; - log.error( - "[%s] Connection got disconnected while closing amqp session '%s': %O.", - this.connection.id, - this.id, - error - ); - } + this.id, + onDisconnected ); log.session("[%s] Calling session.close() for amqp session '%s'.", this.connection.id, this.id); waitTimer = setTimeout(actionAfterTimeout, this.connection.options!.operationTimeoutInSeconds! * 1000); From 9649630c5f2a4ec9d6d7448f00d4b5d39258c791 Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Sun, 21 Feb 2021 23:41:52 -0800 Subject: [PATCH 11/12] check the callbacks are called and the listener count doesn't blow up --- test/session.spec.ts | 32 +++++++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/test/session.spec.ts b/test/session.spec.ts index 704b5d2..a1a8ecf 100644 --- a/test/session.spec.ts +++ b/test/session.spec.ts @@ -47,11 +47,13 @@ describe("Session", () => { const getDisconnectListenerCount = () => { return _connection.listenerCount(rhea.ConnectionEvents.disconnected); }; + const sessionCount = 1000; const sessions: Session[] = []; - const sessionCount = 100; + const callbackCalledForSessionId: { [key: string]: boolean } = {}; for (let i = 0; i < sessionCount; i++) { const session = await connection.createSession(); sessions.push(session); + callbackCalledForSessionId[session.id] = false; } const disconnectListenerCountBefore = getDisconnectListenerCount(); await Promise.all( @@ -63,13 +65,37 @@ describe("Session", () => { (() => { assert.equal( getDisconnectListenerCount(), - sessionCount + disconnectListenerCountBefore, - `Unexpected number of "disconnected" listeners` + disconnectListenerCountBefore, + `Unexpected number of "disconnected" listeners - originated from the close() calls` ); + assert.equal( + connection._disconnectEventAudienceMap.size, + sessionCount, + `Unexpected number of items in _disconnectEventAudienceMap` + ); + for (let [ + key, + callback, + ] of connection._disconnectEventAudienceMap) { + connection._disconnectEventAudienceMap.set( + key, + (context: rhea.EventContext) => { + callbackCalledForSessionId[key] = true; + callback(context); + } + ); + } _connection.emit(rhea.ConnectionEvents.disconnected, {}); })(), ]) ); + for (const session of sessions) { + assert.equal( + callbackCalledForSessionId[session.id as string], + true, + `callback not called for ${session.id} - this is unexpected` + ); + } assert.equal( getDisconnectListenerCount(), disconnectListenerCountBefore, From a747bd0ca99c5f0672bc670923730b56ef2b759c Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Sun, 21 Feb 2021 23:43:32 -0800 Subject: [PATCH 12/12] update comment --- lib/connection.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/connection.ts b/lib/connection.ts index 2e7427d..8ba3939 100644 --- a/lib/connection.ts +++ b/lib/connection.ts @@ -248,7 +248,7 @@ export class Connection extends Entity { this.options = this._connection.options; this.options.operationTimeoutInSeconds = options.operationTimeoutInSeconds; - // Disconnect event for the disconnectEventAudienceMap + // Disconnected event listener for the disconnectEventAudienceMap this._connection.on(ConnectionEvents.disconnected, (context) => { onDisconnectOccurrence(context, this._disconnectEventAudienceMap); });