Skip to content

Commit 35757be

Browse files
authored
[core-amqp] set the max listener limit to 1000 for sender and receiver (#20088)
NodeJS would issue warning if the number of disconnected listeners on an event emitter exceeds 10. When many sessions on a connection are closed but the removal of the listener hasn't caught up, we will see this warning because the default limit of 10 in NodeJS is too low. The disconnected listeners DO get removed eventually in our code. We already do this for senders created in Service Bus. This PR increase the limit to 1000 for senders and receivers created off `ConnectionContextBase.connection` so that the limites apply to both Service Bus and Event Hubs. * EH and SB no longer need to set max listener limit. For Issue #12161
1 parent 1dbd41d commit 35757be

File tree

4 files changed

+63
-9
lines changed

4 files changed

+63
-9
lines changed

sdk/core/core-amqp/src/ConnectionContextBase.ts

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,17 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT license.
33

4-
import { Connection, ConnectionOptions, generate_uuid } from "rhea-promise";
4+
import {
5+
AwaitableSender,
6+
Connection,
7+
ConnectionOptions,
8+
CreateAwaitableSenderOptions,
9+
CreateReceiverOptions,
10+
CreateSenderOptions,
11+
Receiver,
12+
Sender,
13+
generate_uuid,
14+
} from "rhea-promise";
515
import { getFrameworkInfo, getPlatformInfo } from "./util/runtimeInfo";
616
import { CbsClient } from "./cbs";
717
import { ConnectionConfig } from "./connectionConfig/connectionConfig";
@@ -100,6 +110,53 @@ export interface CreateConnectionContextBaseParameters {
100110
operationTimeoutInMs?: number;
101111
}
102112

113+
const maxListenerLimit = 1000;
114+
115+
class CoreAmqpConnection extends Connection {
116+
/**
117+
* Creates an amqp sender link. Max listener limit on the sender is set to 1000 because the
118+
* default value of 10 in NodeJS is too low.
119+
* @param options - Optional parameters to create a sender link.
120+
* @returns Promise<Sender>.
121+
*/
122+
async createSender(options?: CreateSenderOptions): Promise<Sender> {
123+
const sender = await super.createSender(options);
124+
sender.setMaxListeners(maxListenerLimit);
125+
return sender;
126+
}
127+
128+
/**
129+
* Creates an awaitable amqp sender. Max listener limit on the sender is set to 1000 because the
130+
* default value of 10 in NodeJS is too low.
131+
* @param options - Optional parameters to create an awaitable sender link.
132+
* - If `onError` and `onSessionError` handlers are not provided then the `AwaitableSender` will
133+
* clear the timer and reject the Promise for all the entries of inflight send operation in its
134+
* `deliveryDispositionMap`.
135+
* - If the user is handling the reconnection of sender link or the underlying connection in it's
136+
* app, then the `onError` and `onSessionError` handlers must be provided by the user and (s)he
137+
* shall be responsible of clearing the `deliveryDispositionMap` of inflight `send()` operation.
138+
*
139+
* @returns Promise<AwaitableSender>.
140+
*/
141+
async createAwaitableSender(options?: CreateAwaitableSenderOptions): Promise<AwaitableSender> {
142+
const sender = await super.createAwaitableSender(options);
143+
sender.setMaxListeners(maxListenerLimit);
144+
return sender;
145+
}
146+
147+
/**
148+
* Creates an amqp receiver link. Max listener limit on the sender is set to 1000 because the
149+
* default value of 10 in NodeJS is too low.
150+
* @param options - Optional parameters to create a receiver link.
151+
* @returns Promise<Receiver>.
152+
*/
153+
async createReceiver(options?: CreateReceiverOptions): Promise<Receiver> {
154+
const receiver = await super.createReceiver(options);
155+
receiver.setMaxListeners(maxListenerLimit);
156+
return receiver;
157+
}
158+
}
159+
103160
// eslint-disable-next-line @typescript-eslint/no-redeclare -- renaming constant would be a breaking change.
104161
export const ConnectionContextBase = {
105162
/**
@@ -157,7 +214,7 @@ export const ConnectionContextBase = {
157214
};
158215
}
159216

160-
const connection = new Connection(connectionOptions);
217+
const connection = new CoreAmqpConnection(connectionOptions);
161218
const connectionLock = `${Constants.establishConnection}-${generate_uuid()}`;
162219
const connectionContextBase: ConnectionContextBase = {
163220
wasConnectionCloseCalled: false,
@@ -168,7 +225,7 @@ export const ConnectionContextBase = {
168225
cbsSession: new CbsClient(connection, connectionLock),
169226
config: parameters.config,
170227
refreshConnection() {
171-
const newConnection = new Connection(connectionOptions);
228+
const newConnection = new CoreAmqpConnection(connectionOptions);
172229
const newConnectionLock = `${Constants.establishConnection}-${generate_uuid()}`;
173230
this.wasConnectionCloseCalled = false;
174231
this.connectionLock = newConnectionLock;

sdk/eventhub/event-hubs/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@
107107
},
108108
"dependencies": {
109109
"@azure/abort-controller": "^1.0.0",
110-
"@azure/core-amqp": "^3.0.0",
110+
"@azure/core-amqp": "^3.1.0",
111111
"@azure/core-asynciterator-polyfill": "^1.0.0",
112112
"@azure/core-auth": "^1.3.0",
113113
"@azure/core-tracing": "1.0.0-preview.13",

sdk/eventhub/event-hubs/src/eventHubSender.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,6 @@ export class EventHubSender extends LinkEntity {
523523
this.name,
524524
options
525525
);
526-
sender.setMaxListeners(1000);
527526

528527
// It is possible for someone to close the sender and then start it again.
529528
// Thus make sure that the sender is present in the client cache.

sdk/servicebus/service-bus/src/core/messageSender.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -282,10 +282,8 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
282282
return retry<void>(config);
283283
}
284284

285-
protected async createRheaLink(options: AwaitableSenderOptions): Promise<AwaitableSender> {
286-
const sender = await this._context.connection.createAwaitableSender(options);
287-
sender.setMaxListeners(1000);
288-
return sender;
285+
protected createRheaLink(options: AwaitableSenderOptions): Promise<AwaitableSender> {
286+
return this._context.connection.createAwaitableSender(options);
289287
}
290288

291289
/**

0 commit comments

Comments
 (0)