Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 58 additions & 3 deletions sdk/core/core-amqp/src/ConnectionContextBase.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { Connection, ConnectionOptions, generate_uuid } from "rhea-promise";
import {
AwaitableSender,
Connection,
ConnectionOptions,
CreateAwaitableSenderOptions,
CreateReceiverOptions,
CreateSenderOptions,
Receiver,
Sender,
generate_uuid,
} from "rhea-promise";
import { getFrameworkInfo, getPlatformInfo } from "./util/runtimeInfo";
import { CbsClient } from "./cbs";
import { ConnectionConfig } from "./connectionConfig/connectionConfig";
Expand Down Expand Up @@ -100,6 +110,51 @@ export interface CreateConnectionContextBaseParameters {
operationTimeoutInMs?: number;
}

class CoreAmqpConnection extends Connection {
/**
* Creates an amqp sender link. Max listener limit on the sender is set to 1000 because the
* default value of 10 in NodeJS is too low.
* @param options - Optional parameters to create a sender link.
* @returns Promise<Sender>.
*/
async createSender(options?: CreateSenderOptions): Promise<Sender> {
const sender = await super.createSender(options);
sender.setMaxListeners(1000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this 1000 be put in a variable instead so it is easier to update it everywhere if there is a need to?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extracted

return sender;
}

/**
* Creates an awaitable amqp sender. Max listener limit on the sender is set to 1000 because the
* default value of 10 in NodeJS is too low.
* @param options - Optional parameters to create an awaitable sender link.
* - If `onError` and `onSessionError` handlers are not provided then the `AwaitableSender` will
* clear the timer and reject the Promise for all the entries of inflight send operation in its
* `deliveryDispositionMap`.
* - If the user is handling the reconnection of sender link or the underlying connection in it's
* app, then the `onError` and `onSessionError` handlers must be provided by the user and (s)he
* shall be responsible of clearing the `deliveryDispositionMap` of inflight `send()` operation.
*
* @returns Promise<AwaitableSender>.
*/
async createAwaitableSender(options?: CreateAwaitableSenderOptions): Promise<AwaitableSender> {
const sender = await super.createAwaitableSender(options);
sender.setMaxListeners(1000);
return sender;
}

/**
* Creates an amqp receiver link. Max listener limit on the sender is set to 1000 because the
* default value of 10 in NodeJS is too low.
* @param options - Optional parameters to create a receiver link.
* @returns Promise<Receiver>.
*/
async createReceiver(options?: CreateReceiverOptions): Promise<Receiver> {
const receiver = await super.createReceiver(options);
receiver.setMaxListeners(1000);
return receiver;
}
}

// eslint-disable-next-line @typescript-eslint/no-redeclare -- renaming constant would be a breaking change.
export const ConnectionContextBase = {
/**
Expand Down Expand Up @@ -157,7 +212,7 @@ export const ConnectionContextBase = {
};
}

const connection = new Connection(connectionOptions);
const connection = new CoreAmqpConnection(connectionOptions);
const connectionLock = `${Constants.establishConnection}-${generate_uuid()}`;
const connectionContextBase: ConnectionContextBase = {
wasConnectionCloseCalled: false,
Expand All @@ -168,7 +223,7 @@ export const ConnectionContextBase = {
cbsSession: new CbsClient(connection, connectionLock),
config: parameters.config,
refreshConnection() {
const newConnection = new Connection(connectionOptions);
const newConnection = new CoreAmqpConnection(connectionOptions);
const newConnectionLock = `${Constants.establishConnection}-${generate_uuid()}`;
this.wasConnectionCloseCalled = false;
this.connectionLock = newConnectionLock;
Expand Down