Skip to content

Wallet worker restart #16

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions src/wallet/wallet.interface.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { TransactionReceipt, TransactionRequest, TransactionResponse } from 'ethers6';
import { MessagePort } from 'worker_threads';
import { WalletTransactionOptions, WalletTransactionRequestMessage, WalletTransactionRequestResponse } from './wallet.types';
import { tryErrorToString } from 'src/common/utils';
import { WALLET_WORKER_CRASHED_MESSAGE_ID } from './wallet.service';

export interface TransactionResult<T = any> {
txRequest: TransactionRequest;
@@ -30,19 +30,31 @@ export class WalletInterface {
const messageId = this.getNextPortMessageId();

const resultPromise = new Promise<TransactionResult<T>>(resolve => {
const listener = (data: WalletTransactionRequestResponse<T>) => {
if (data.messageId == messageId) {
const listener = (data: any) => {
if (data.messageId === messageId) {
this.port.off("message", listener);

const walletResponse = data as WalletTransactionRequestResponse<T>;

const result = {
txRequest: data.txRequest,
metadata: data.metadata,
tx: data.tx,
txReceipt: data.txReceipt,
submissionError: tryErrorToString(data.submissionError),
confirmationError: tryErrorToString(data.confirmationError)
txRequest: walletResponse.txRequest,
metadata: walletResponse.metadata,
tx: walletResponse.tx,
txReceipt: walletResponse.txReceipt,
submissionError: data.submissionError,
confirmationError: data.confirmationError
};
resolve(result);
} else if (data.messageId === WALLET_WORKER_CRASHED_MESSAGE_ID) {
this.port.off("message", listener);

const result = {
txRequest: transaction,
metadata,
submissionError: new Error('Wallet crashed.'), //TODO use a custom error type?
confirmationError: new Error('Wallet crashed.'), //TODO use a custom error type?
};
resolve(result);
}
};
this.port.on("message", listener);
191 changes: 146 additions & 45 deletions src/wallet/wallet.service.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import { Global, Injectable, OnModuleInit } from '@nestjs/common';
import { join } from 'path';
import { LoggerOptions } from 'pino';
import { Worker, MessagePort } from 'worker_threads';
import { Worker, MessagePort, MessageChannel } from 'worker_threads';
import { ConfigService } from 'src/config/config.service';
import { LoggerService, STATUS_LOG_INTERVAL } from 'src/logger/logger.service';
import { WalletGetPortMessage, WalletGetPortResponse } from './wallet.types';
import { WalletServiceRoutingMessage, WalletTransactionRequestMessage } from './wallet.types';
import { Wallet } from 'ethers6';
import { tryErrorToString } from 'src/common/utils';

export const WALLET_WORKER_CRASHED_MESSAGE_ID = -1;

const DEFAULT_WALLET_RETRY_INTERVAL = 30000;
const DEFAULT_WALLET_PROCESSING_INTERVAL = 100;
const DEFAULT_WALLET_MAX_TRIES = 3;
@@ -56,18 +58,29 @@ export interface WalletWorkerData {

}

interface PortDescription {
chainId: string;
port: MessagePort;
}

@Global()
@Injectable()
export class WalletService implements OnModuleInit {
private readonly defaultWorkerConfig: DefaultWalletWorkerData;

private workers: Record<string, Worker | null> = {};
private requestPortMessageId = 0;
private portsCount = 0;
private readonly ports: Record<number, PortDescription> = {};

private readonly queuedRequests: Record<string, WalletServiceRoutingMessage[]> = {};

readonly publicKey: string;

constructor(
private readonly configService: ConfigService,
private readonly loggerService: LoggerService,
) {
this.defaultWorkerConfig = this.loadDefaultWorkerConfig();
this.publicKey = (new Wallet(this.configService.globalConfig.privateKey)).address;
}

@@ -80,31 +93,9 @@ export class WalletService implements OnModuleInit {
}

private async initializeWorkers(): Promise<void> {
const defaultWorkerConfig = this.loadDefaultWorkerConfig();

for (const [chainId,] of this.configService.chainsConfig) {

const workerData = this.loadWorkerConfig(chainId, defaultWorkerConfig);

const worker = new Worker(join(__dirname, 'wallet.worker.js'), {
workerData
});
this.workers[chainId] = worker;

worker.on('error', (error) =>
this.loggerService.fatal(
{ error: tryErrorToString(error), chainId },
`Error on wallet worker.`,
),
);

worker.on('exit', (exitCode) => {
this.workers[chainId] = null;
this.loggerService.fatal(
{ exitCode, chainId },
`Wallet worker exited.`,
);
});
this.spawnWorker(chainId);
}

// Add a small delay to wait for the workers to be initialized
@@ -152,9 +143,10 @@ export class WalletService implements OnModuleInit {

private loadWorkerConfig(
chainId: string,
defaultConfig: DefaultWalletWorkerData
): WalletWorkerData {

const defaultConfig = this.defaultWorkerConfig;

const chainConfig = this.configService.chainsConfig.get(chainId);
if (chainConfig == undefined) {
throw new Error(`Unable to load config for chain ${chainId}`);
@@ -215,6 +207,56 @@ export class WalletService implements OnModuleInit {
};
}

private spawnWorker(
chainId: string
): void {
const workerData = this.loadWorkerConfig(chainId);
this.loggerService.info(
{
chainId,
workerData,
},
`Spawning wallet worker.`
);

const worker = new Worker(join(__dirname, 'wallet.worker.js'), {
workerData
});
this.workers[chainId] = worker;

worker.on('error', (error) =>
this.loggerService.error(
{ error: tryErrorToString(error), chainId },
`Error on wallet worker.`,
),
);

worker.on('exit', (exitCode) => {
this.workers[chainId] = null;
this.loggerService.error(
{ exitCode, chainId },
`Wallet worker exited.`,
);

this.abortPendingRequests(chainId);
this.spawnWorker(chainId);
this.recoverQueuedMessages(chainId);
});

worker.on('message', (message: WalletServiceRoutingMessage) => {
const portDescription = this.ports[message.portId];
if (portDescription == undefined) {
this.loggerService.error(
message,
`Unable to route transaction response on wallet: port id not found.`
);
return;
}

portDescription.port.postMessage(message.data);
});
}

private initiateIntervalStatusLog(): void {
const logStatus = () => {
const activeWorkers = [];
@@ -232,32 +274,91 @@ export class WalletService implements OnModuleInit {
setInterval(logStatus, STATUS_LOG_INTERVAL);
}

async attachToWallet(chainId: string): Promise<MessagePort> {

const portId = this.portsCount++;

const { port1, port2 } = new MessageChannel();

private getNextRequestPortMessageId(): number {
return this.requestPortMessageId++;
port1.on('message', (message: WalletTransactionRequestMessage) => {
this.handleTransactionRequestMessage(
chainId,
portId,
message,
);
});

this.ports[portId] = {
chainId,
port: port1,
};

return port2;
}

async attachToWallet(chainId: string): Promise<MessagePort> {
private handleTransactionRequestMessage(
chainId: string,
portId: number,
message: WalletTransactionRequestMessage
): void {
const worker = this.workers[chainId];

const routingMessage: WalletServiceRoutingMessage = {
portId,
data: message
};

if (worker == undefined) {
throw new Error(`Wallet does not exist for chain ${chainId}`);
}
this.loggerService.warn(
{
chainId,
portId,
message
},
`Wallet does not exist for the requested chain. Queueing message.`
);

const messageId = this.getNextRequestPortMessageId();
const portPromise = new Promise<MessagePort>((resolve) => {
const listener = (data: WalletGetPortResponse) => {
if (data.messageId == messageId) {
worker.off("message", listener);
resolve(data.port);
}
};
worker.on("message", listener);
if (!(chainId in this.queuedRequests)) {
this.queuedRequests[chainId] = [];
}
this.queuedRequests[chainId]!.push(routingMessage);
} else {
worker.postMessage(routingMessage);
}
}

const portMessage: WalletGetPortMessage = { messageId };
worker.postMessage(portMessage);
});
private abortPendingRequests(
chainId: string,
): void {
for (const portDescription of Object.values(this.ports)) {
if (portDescription.chainId === chainId) {
portDescription.port.postMessage({
messageId: WALLET_WORKER_CRASHED_MESSAGE_ID
});
}
}
}

return portPromise;
private recoverQueuedMessages(
chainId: string,
): void {
const queuedRequests = this.queuedRequests[chainId] ?? [];
this.queuedRequests[chainId] = [];

this.loggerService.info(
{
chainId,
count: queuedRequests.length,
},
`Recovering queued wallet requests.`
);

for (const request of queuedRequests) {
this.handleTransactionRequestMessage(
chainId,
request.portId,
request.data,
);
}
}
}
11 changes: 3 additions & 8 deletions src/wallet/wallet.types.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
import { TransactionRequest, TransactionReceipt, TransactionResponse } from "ethers6";
import { MessagePort } from "worker_threads";



// Port Channels Types
// ************************************************************************************************
export interface WalletGetPortMessage {
messageId: number;
}

export interface WalletGetPortResponse {
messageId: number;
port: MessagePort;
export interface WalletServiceRoutingMessage<T = any> {
portId: number;
data: T;
}

//TODO add 'priority'
57 changes: 19 additions & 38 deletions src/wallet/wallet.worker.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { JsonRpcProvider, Wallet, Provider, AbstractProvider, ZeroAddress, TransactionResponse, TransactionReceipt, TransactionRequest } from "ethers6";
import pino, { LoggerOptions } from "pino";
import { workerData, parentPort, MessageChannel, MessagePort } from 'worker_threads';
import { workerData, parentPort, MessagePort } from 'worker_threads';
import { tryErrorToString, wait } from "src/common/utils";
import { STATUS_LOG_INTERVAL } from "src/logger/logger.service";
import { TransactionHelper } from "./transaction-helper";
import { ConfirmQueue } from "./queues/confirm-queue";
import { WalletWorkerData } from "./wallet.service";
import { ConfirmedTransaction, GasFeeConfig, WalletGetPortMessage, WalletGetPortResponse, PendingTransaction, WalletTransactionOptions, WalletTransactionRequest, WalletTransactionRequestMessage, WalletTransactionRequestResponse, BalanceConfig } from "./wallet.types";
import { ConfirmedTransaction, GasFeeConfig, PendingTransaction, WalletTransactionOptions, WalletTransactionRequest, WalletTransactionRequestResponse, BalanceConfig, WalletServiceRoutingMessage } from "./wallet.types";
import { SubmitQueue } from "./queues/submit-queue";


@@ -66,7 +66,7 @@ class WalletWorker {
this.logger
);

this.initializePorts();
this.initializePort();

this.initiateIntervalStatusLog();
}
@@ -149,35 +149,16 @@ class WalletWorker {
};
}

private initializePorts(): void {
parentPort!.on('message', (message: WalletGetPortMessage) => {
const port = this.registerNewPort();
const response: WalletGetPortResponse = {
messageId: message.messageId,
port
};
parentPort!.postMessage(response, [port])
});
}

private registerNewPort(): MessagePort {

const portId = this.portsCount++;

const { port1, port2 } = new MessageChannel();

port1.on('message', (message: WalletTransactionRequestMessage) => {
private initializePort(): void {
parentPort!.on('message', (message: WalletServiceRoutingMessage) => {
this.addTransaction(
portId,
message.messageId,
message.txRequest,
message.metadata,
message.options
message.portId,
message.data.messageId,
message.data.txRequest,
message.data.metadata,
message.data.options
);
})
this.ports[portId] = port1;

return port2;
});
}

private addTransaction(
@@ -477,13 +458,7 @@ class WalletWorker {
confirmationError?: any,
): void {

const port = this.ports[request.portId];
if (port == undefined) {
this.logger.error({ request }, 'Failed to send transaction result: invalid portId.');
return;
}

const response: WalletTransactionRequestResponse = {
const transactionResponse: WalletTransactionRequestResponse = {
messageId: request.messageId,
txRequest: request.txRequest,
metadata: request.metadata,
@@ -492,7 +467,13 @@ class WalletWorker {
submissionError: tryErrorToString(submissionError),
confirmationError: tryErrorToString(confirmationError),
}
port.postMessage(response);

const routingResponse: WalletServiceRoutingMessage = {
portId: request.portId,
data: transactionResponse,
}

parentPort!.postMessage(routingResponse);
}

private isNonceExpiredError(error: any, includeUnderpricedError?: boolean): boolean {