Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for class MultiplexedSession #2191

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
287 changes: 287 additions & 0 deletions src/multiplexed-session.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
import {EventEmitter} from 'events';
import PQueue from 'p-queue';
import {Database} from './database';
import {Session} from './session';
import {Transaction} from './transaction';

interface MuxSession {
multiplexedSession: Session | null;
}

/**
* @callback GetSessionCallback
* @param {?Error} error Request error, if any.
* @param {Session} session The read-write session.
* @param {Transaction} transaction The transaction object.
*/
export interface GetSessionCallback {
(
err: Error | null,
session?: Session | null,
transaction?: Transaction | null
): void;
}

/**
* Interface for implementing custom session pooling logic, it should extend the
* {@link https://nodejs.org/api/events.html|EventEmitter} class and emit any
* asynchronous errors via an error event.
*
* @interface SessionPoolInterface
* @extends external:{@link https://nodejs.org/api/events.html|EventEmitter}
*
* @constructs SessionPoolInterface
* @param {Database} database The database to create a pool for.
*/
export interface MultiplexedSessionInterface {
/**
* When called creates a multiplexed session.
*
* @name MultiplexedSessionInterface#createSession
*/
createSession(): void;

/**
* When called returns a multiplexed session.
*
* @name MultiplexedSessionInterface#getSession
* @param {GetSessionCallback} callback The callback function.
*/
getSession(callback: GetSessionCallback): void;
}

/**
* Multiplexed session configuration options.
*
* @typedef {object} MultiplexedSessionOptions
* @property {number} [refreshRate=30] How often to check for expiration of multiplexed session, in
* minutes. Must not be greater than 7 days.
*/

export interface MultiplexedSessionOptions {
refreshRate?: number;
databaseRole?: string | null;
}

const DEFAULTS: MultiplexedSessionOptions = {
refreshRate: 30,
databaseRole: null,
};

/**
* Class used to manage connections to Spanner using multiplexed session.
*
* **You don't need to use this class directly, connections will be handled for
* you.**
*
* @class
* @extends {EventEmitter}
*/
export class MultiplexedSession
extends EventEmitter
implements MultiplexedSessionInterface
{
database: Database;
options: MultiplexedSessionOptions;
_acquires: PQueue;
_muxSession!: MuxSession;
_pingHandle!: NodeJS.Timer;
_refreshHandle!: NodeJS.Timer;
constructor(
database: Database,
multiplexedSessionOptions?: MultiplexedSessionOptions
) {
super();
this.database = database;
this.options = Object.assign({}, DEFAULTS, multiplexedSessionOptions);
this.options.databaseRole = this.options.databaseRole
? this.options.databaseRole
: database.databaseRole;
this._muxSession = {
multiplexedSession: null,
};
this._acquires = new PQueue({
concurrency: 1,
});
}

/**
* Creates a new multiplexed session and manages its maintenance.
*
* This method initiates the session creation process by calling the `_createSession` method, which returns a Promise.
*/
createSession(): void {
this._createSession().then(() => {
this._maintain();
});
}

/**
* Creates a new multiplexed session.
*
* This method sends a request to the database to create a new session with multiplexing enabled.
* The response from the database would be an array, the first value of the array will be containing the multiplexed session.
*
* @returns {Promise<void>} A Promise that resolves when the session has been successfully created and assigned, an event
* `mux-session-available` will be emitted to signal that the session is ready.
*
* @throws {Error} If there is an issue with the database or session creation, the Promise will be rejected, and the
* `unimplemented` event is emitted along with the error.
*
* @private
*/
async _createSession(): Promise<void> {
const [createSessionResponse] = await this.database.createSession({
multiplexed: true,
});
this._muxSession.multiplexedSession = createSessionResponse;
this.emit('mux-session-available');
}

/**
* Maintains the multiplexed session by periodically refreshing it.
*
* This method sets up a periodic refresh interval for maintaining the session. The interval duration
* is determined by the @param refreshRate option, which is provided in minutes.
* The default value is 30 minutes.
*
* @returns {void} This method does not return any value.
*
* @throws {Error} If the `refreshRate` option is not defined, this method could throw an error.
*/
_maintain(): void {
const refreshRate = this.options.refreshRate! * 60000;
this._refreshHandle = setInterval(() => this._refresh(), refreshRate);
this._refreshHandle.unref();
}

/**
* Creates a transaction for a session.
*
* @private
*
* @param {Session} session The session object.
* @param {object} options The transaction options.
*/
_prepareTransaction(session: Session | null): void {
const transaction = session!.transaction(
(session!.parent as Database).queryOptions_
);
session!.txn = transaction;
}

/**
* Refreshes the session by checking its expiration time and recreating the session if expired.
* Every 7th day a new Multiplexed Session will get created. However, a Multiplexed Session will
* be alive for 30 days in the backend.
*
* @returns {Promise<void>} A Promise that resolves once the refresh process is completed.
* If the session is expired, a new session will be created.
*
* @throws {Error} If there is an issue with retrieving the session metadata or calculating the expiration time.
*/
async _refresh(): Promise<void> {
const metadata = await this._muxSession.multiplexedSession?.getMetadata();
const createTime =
parseInt(metadata![0].createTime.seconds) * 1000 +
metadata![0].createTime.nanos / 1000000;

// Calculate expiration time (7 days after session creation)
const expireTime = createTime + 7 * 24 * 60 * 60 * 1000;

// If the current time exceeds the expiration time, create a new session
if (Date.now() > expireTime) {
this.createSession();
}
}

/**
* Retrieves a session asynchronously and invokes a callback with the session details.
*
* @param {GetSessionCallback} callback - The callback to be invoked once the session is acquired or an error occurs.
*
* @returns {void} This method does not return any value, as it operates asynchronously and relies on the callback.
*
* @throws {Error} If the `_acquire()` method fails, an error will be logged, but not thrown explicitly.
*/
getSession(callback: GetSessionCallback): void {
this._acquire().then(
session => callback(null, session, session?.txn),
callback
);
}

/**
* Acquires a session asynchronously, with retry logic, and prepares the transaction for the session.
*
* Once a session is successfully acquired, it returns the session object (which may be `null` if unsuccessful).
*
* @returns {Promise<Session | null>}
* A Promise that resolves with the acquired session (or `null` if no session is available after retries).
*
*/
async _acquire(): Promise<Session | null> {
const getSession = async (): Promise<Session | null> => {
const session = await this._getSession();

if (session) {
return session;
}

return getSession();
};

const session = await this._acquires.add(getSession);
this._prepareTransaction(session);
return session;
}

/**
* Retrieves the current multiplexed session.
*
* Returns the current multiplexed session associated with this instance.
*
* @returns {Session | null} The current multiplexed session if available, or `null` if no session is present.
*/
_multiplexedSession(): Session | null {
return this._muxSession.multiplexedSession;
}

/**
* Attempts to get a session, waiting for it to become available if necessary.
*
* Waits for the `mux-session-available` event to be emitted if the multiplexed session is not yet available
* or for an `unimplemented` error event in case of any error. The method listens for these events, and once
* `mux-session-available` is emitted, it resolves and returns the session. If the `unimplemented` event is
* emitted instead, the method rejects with an error.
*
* @returns {Promise<Session | null>} A promise that resolves with the current multiplexed session if available,
* or `null` if the session is not available.
*
* @throws {Error} If the `unimplemented` event is triggered, indicating that the multiplexed session cannot be implemented.
*
* @private
*/
async _getSession(): Promise<Session | null> {
// Check if the multiplexed session is already available
if (this._muxSession.multiplexedSession !== null) {
return this._multiplexedSession();
}

// Define event and promises to wait for the session to become available or for an error
const availableEvent = 'mux-session-available';
let removeListener: Function;
const promise = new Promise(resolve => {
this.once(availableEvent, resolve);
removeListener = this.removeListener.bind(this, availableEvent, resolve);
});

try {
await promise;
} finally {
removeListener!();
}
// Return the multiplexed session after it becomes available
return this._multiplexedSession();
}
}
1 change: 1 addition & 0 deletions src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ export interface GetSessionMetadataResponse {
createTime?: google.protobuf.ITimestamp | null;
approximateLastUseTime?: google.protobuf.ITimestamp | null;
databaseRole?: string | null;
multiplexed?: boolean;
}

export type GetSessionMetadataCallback =
Expand Down
Loading
Loading