diff --git a/sdk/core/core-amqp/src/ConnectionContextBase.ts b/sdk/core/core-amqp/src/ConnectionContextBase.ts index 875558ce6d34..7a9c64ea206f 100644 --- a/sdk/core/core-amqp/src/ConnectionContextBase.ts +++ b/sdk/core/core-amqp/src/ConnectionContextBase.ts @@ -1,7 +1,17 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import { Connection, ConnectionOptions, generate_uuid } from "rhea-promise"; +import { + AwaitableSender, + Connection, + ConnectionOptions, + CreateAwaitableSenderOptions, + CreateReceiverOptions, + CreateSenderOptions, + Receiver, + Sender, + generate_uuid, +} from "rhea-promise"; import { getFrameworkInfo, getPlatformInfo } from "./util/runtimeInfo"; import { CbsClient } from "./cbs"; import { ConnectionConfig } from "./connectionConfig/connectionConfig"; @@ -100,6 +110,53 @@ export interface CreateConnectionContextBaseParameters { operationTimeoutInMs?: number; } +const maxListenerLimit = 1000; + +class CoreAmqpConnection extends Connection { + /** + * Creates an amqp sender link. Max listener limit on the sender is set to 1000 because the + * default value of 10 in NodeJS is too low. + * @param options - Optional parameters to create a sender link. + * @returns Promise. + */ + async createSender(options?: CreateSenderOptions): Promise { + const sender = await super.createSender(options); + sender.setMaxListeners(maxListenerLimit); + return sender; + } + + /** + * Creates an awaitable amqp sender. Max listener limit on the sender is set to 1000 because the + * default value of 10 in NodeJS is too low. + * @param options - Optional parameters to create an awaitable sender link. + * - If `onError` and `onSessionError` handlers are not provided then the `AwaitableSender` will + * clear the timer and reject the Promise for all the entries of inflight send operation in its + * `deliveryDispositionMap`. + * - If the user is handling the reconnection of sender link or the underlying connection in it's + * app, then the `onError` and `onSessionError` handlers must be provided by the user and (s)he + * shall be responsible of clearing the `deliveryDispositionMap` of inflight `send()` operation. + * + * @returns Promise. + */ + async createAwaitableSender(options?: CreateAwaitableSenderOptions): Promise { + const sender = await super.createAwaitableSender(options); + sender.setMaxListeners(maxListenerLimit); + return sender; + } + + /** + * Creates an amqp receiver link. Max listener limit on the sender is set to 1000 because the + * default value of 10 in NodeJS is too low. + * @param options - Optional parameters to create a receiver link. + * @returns Promise. + */ + async createReceiver(options?: CreateReceiverOptions): Promise { + const receiver = await super.createReceiver(options); + receiver.setMaxListeners(maxListenerLimit); + return receiver; + } +} + // eslint-disable-next-line @typescript-eslint/no-redeclare -- renaming constant would be a breaking change. export const ConnectionContextBase = { /** @@ -157,7 +214,7 @@ export const ConnectionContextBase = { }; } - const connection = new Connection(connectionOptions); + const connection = new CoreAmqpConnection(connectionOptions); const connectionLock = `${Constants.establishConnection}-${generate_uuid()}`; const connectionContextBase: ConnectionContextBase = { wasConnectionCloseCalled: false, @@ -168,7 +225,7 @@ export const ConnectionContextBase = { cbsSession: new CbsClient(connection, connectionLock), config: parameters.config, refreshConnection() { - const newConnection = new Connection(connectionOptions); + const newConnection = new CoreAmqpConnection(connectionOptions); const newConnectionLock = `${Constants.establishConnection}-${generate_uuid()}`; this.wasConnectionCloseCalled = false; this.connectionLock = newConnectionLock; diff --git a/sdk/eventhub/event-hubs/package.json b/sdk/eventhub/event-hubs/package.json index 273455e9cee1..80b99a3844e4 100644 --- a/sdk/eventhub/event-hubs/package.json +++ b/sdk/eventhub/event-hubs/package.json @@ -107,7 +107,7 @@ }, "dependencies": { "@azure/abort-controller": "^1.0.0", - "@azure/core-amqp": "^3.0.0", + "@azure/core-amqp": "^3.1.0", "@azure/core-asynciterator-polyfill": "^1.0.0", "@azure/core-auth": "^1.3.0", "@azure/core-tracing": "1.0.0-preview.13", diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 67dab584865c..706935669b99 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -523,7 +523,6 @@ export class EventHubSender extends LinkEntity { this.name, options ); - sender.setMaxListeners(1000); // It is possible for someone to close the sender and then start it again. // Thus make sure that the sender is present in the client cache. diff --git a/sdk/servicebus/service-bus/src/core/messageSender.ts b/sdk/servicebus/service-bus/src/core/messageSender.ts index 09386852cd6f..8286f69b7e7e 100644 --- a/sdk/servicebus/service-bus/src/core/messageSender.ts +++ b/sdk/servicebus/service-bus/src/core/messageSender.ts @@ -282,10 +282,8 @@ export class MessageSender extends LinkEntity { return retry(config); } - protected async createRheaLink(options: AwaitableSenderOptions): Promise { - const sender = await this._context.connection.createAwaitableSender(options); - sender.setMaxListeners(1000); - return sender; + protected createRheaLink(options: AwaitableSenderOptions): Promise { + return this._context.connection.createAwaitableSender(options); } /**