From 3aec66e765bf9b1e945b27590d0d357cbcc884c6 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Wed, 26 Jan 2022 23:22:24 +0000 Subject: [PATCH 1/3] [core-amqp] set the max listener limit to 1000 for sender and receiver NodeJS would issue warning if the number of disconnected listeners on an event emitter exceeds 10. When many sessions on a connection are closed but the removal of the listener hasn't caught up, we will see this warning because the default limit of 10 in NodeJS is too low. The disconnected listeners DO get removed eventually in our code. We already do this for senders created in Service Bus. This PR increase the limit to 1000 for senders and receivers created off `ConnectionContextBase.connection` so that the limites apply to both Service Bus and Event Hubs. --- .../core-amqp/src/ConnectionContextBase.ts | 61 ++++++++++++++++++- 1 file changed, 58 insertions(+), 3 deletions(-) diff --git a/sdk/core/core-amqp/src/ConnectionContextBase.ts b/sdk/core/core-amqp/src/ConnectionContextBase.ts index 875558ce6d34..f1b8abc48588 100644 --- a/sdk/core/core-amqp/src/ConnectionContextBase.ts +++ b/sdk/core/core-amqp/src/ConnectionContextBase.ts @@ -1,7 +1,17 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import { Connection, ConnectionOptions, generate_uuid } from "rhea-promise"; +import { + AwaitableSender, + Connection, + ConnectionOptions, + CreateAwaitableSenderOptions, + CreateReceiverOptions, + CreateSenderOptions, + Receiver, + Sender, + generate_uuid, +} from "rhea-promise"; import { getFrameworkInfo, getPlatformInfo } from "./util/runtimeInfo"; import { CbsClient } from "./cbs"; import { ConnectionConfig } from "./connectionConfig/connectionConfig"; @@ -100,6 +110,51 @@ export interface CreateConnectionContextBaseParameters { operationTimeoutInMs?: number; } +class CoreAmqpConnection extends Connection { + /** + * Creates an amqp sender link. Max listener limit on the sender is set to 1000 because the + * default value of 10 in NodeJS is too low. + * @param options - Optional parameters to create a sender link. + * @returns Promise. + */ + async createSender(options?: CreateSenderOptions): Promise { + const sender = await super.createSender(options); + sender.setMaxListeners(1000); + return sender; + } + + /** + * Creates an awaitable amqp sender. Max listener limit on the sender is set to 1000 because the + * default value of 10 in NodeJS is too low. + * @param options - Optional parameters to create an awaitable sender link. + * - If `onError` and `onSessionError` handlers are not provided then the `AwaitableSender` will + * clear the timer and reject the Promise for all the entries of inflight send operation in its + * `deliveryDispositionMap`. + * - If the user is handling the reconnection of sender link or the underlying connection in it's + * app, then the `onError` and `onSessionError` handlers must be provided by the user and (s)he + * shall be responsible of clearing the `deliveryDispositionMap` of inflight `send()` operation. + * + * @returns Promise. + */ + async createAwaitableSender(options?: CreateAwaitableSenderOptions): Promise { + const sender = await super.createAwaitableSender(options); + sender.setMaxListeners(1000); + return sender; + } + + /** + * Creates an amqp receiver link. Max listener limit on the sender is set to 1000 because the + * default value of 10 in NodeJS is too low. + * @param options - Optional parameters to create a receiver link. + * @returns Promise. + */ + async createReceiver(options?: CreateReceiverOptions): Promise { + const receiver = await super.createReceiver(options); + receiver.setMaxListeners(1000); + return receiver; + } +} + // eslint-disable-next-line @typescript-eslint/no-redeclare -- renaming constant would be a breaking change. export const ConnectionContextBase = { /** @@ -157,7 +212,7 @@ export const ConnectionContextBase = { }; } - const connection = new Connection(connectionOptions); + const connection = new CoreAmqpConnection(connectionOptions); const connectionLock = `${Constants.establishConnection}-${generate_uuid()}`; const connectionContextBase: ConnectionContextBase = { wasConnectionCloseCalled: false, @@ -168,7 +223,7 @@ export const ConnectionContextBase = { cbsSession: new CbsClient(connection, connectionLock), config: parameters.config, refreshConnection() { - const newConnection = new Connection(connectionOptions); + const newConnection = new CoreAmqpConnection(connectionOptions); const newConnectionLock = `${Constants.establishConnection}-${generate_uuid()}`; this.wasConnectionCloseCalled = false; this.connectionLock = newConnectionLock; From 620757c8c4efbd2624771fd74f911ee2c0fec284 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Wed, 2 Feb 2022 01:03:49 +0000 Subject: [PATCH 2/3] EH and SB no longer need to set max listener limit. --- sdk/eventhub/event-hubs/package.json | 2 +- sdk/eventhub/event-hubs/src/eventHubSender.ts | 1 - sdk/servicebus/service-bus/src/core/messageSender.ts | 6 ++---- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/sdk/eventhub/event-hubs/package.json b/sdk/eventhub/event-hubs/package.json index 273455e9cee1..80b99a3844e4 100644 --- a/sdk/eventhub/event-hubs/package.json +++ b/sdk/eventhub/event-hubs/package.json @@ -107,7 +107,7 @@ }, "dependencies": { "@azure/abort-controller": "^1.0.0", - "@azure/core-amqp": "^3.0.0", + "@azure/core-amqp": "^3.1.0", "@azure/core-asynciterator-polyfill": "^1.0.0", "@azure/core-auth": "^1.3.0", "@azure/core-tracing": "1.0.0-preview.13", diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 67dab584865c..706935669b99 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -523,7 +523,6 @@ export class EventHubSender extends LinkEntity { this.name, options ); - sender.setMaxListeners(1000); // It is possible for someone to close the sender and then start it again. // Thus make sure that the sender is present in the client cache. diff --git a/sdk/servicebus/service-bus/src/core/messageSender.ts b/sdk/servicebus/service-bus/src/core/messageSender.ts index 09386852cd6f..8286f69b7e7e 100644 --- a/sdk/servicebus/service-bus/src/core/messageSender.ts +++ b/sdk/servicebus/service-bus/src/core/messageSender.ts @@ -282,10 +282,8 @@ export class MessageSender extends LinkEntity { return retry(config); } - protected async createRheaLink(options: AwaitableSenderOptions): Promise { - const sender = await this._context.connection.createAwaitableSender(options); - sender.setMaxListeners(1000); - return sender; + protected createRheaLink(options: AwaitableSenderOptions): Promise { + return this._context.connection.createAwaitableSender(options); } /** From 263b6bd4decdb70dd03bca8c16b140f73f7940af Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Wed, 2 Feb 2022 01:09:37 +0000 Subject: [PATCH 3/3] extract 1000 to a constant --- sdk/core/core-amqp/src/ConnectionContextBase.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sdk/core/core-amqp/src/ConnectionContextBase.ts b/sdk/core/core-amqp/src/ConnectionContextBase.ts index f1b8abc48588..7a9c64ea206f 100644 --- a/sdk/core/core-amqp/src/ConnectionContextBase.ts +++ b/sdk/core/core-amqp/src/ConnectionContextBase.ts @@ -110,6 +110,8 @@ export interface CreateConnectionContextBaseParameters { operationTimeoutInMs?: number; } +const maxListenerLimit = 1000; + class CoreAmqpConnection extends Connection { /** * Creates an amqp sender link. Max listener limit on the sender is set to 1000 because the @@ -119,7 +121,7 @@ class CoreAmqpConnection extends Connection { */ async createSender(options?: CreateSenderOptions): Promise { const sender = await super.createSender(options); - sender.setMaxListeners(1000); + sender.setMaxListeners(maxListenerLimit); return sender; } @@ -138,7 +140,7 @@ class CoreAmqpConnection extends Connection { */ async createAwaitableSender(options?: CreateAwaitableSenderOptions): Promise { const sender = await super.createAwaitableSender(options); - sender.setMaxListeners(1000); + sender.setMaxListeners(maxListenerLimit); return sender; } @@ -150,7 +152,7 @@ class CoreAmqpConnection extends Connection { */ async createReceiver(options?: CreateReceiverOptions): Promise { const receiver = await super.createReceiver(options); - receiver.setMaxListeners(1000); + receiver.setMaxListeners(maxListenerLimit); return receiver; } }