From 152e02c4921cc30bc8c973c29391ec940c8359bc Mon Sep 17 00:00:00 2001 From: Jorge Sanmiguel <8038323+jsanmigimeno@users.noreply.github.com> Date: Thu, 23 May 2024 14:40:54 +0000 Subject: [PATCH 1/2] chore: Refactor wallet ports (route via wallet service) --- src/wallet/wallet.service.ts | 49 +++++++++++++++++++++--------------- src/wallet/wallet.types.ts | 11 +++----- src/wallet/wallet.worker.ts | 37 ++++++++++++++------------- 3 files changed, 51 insertions(+), 46 deletions(-) diff --git a/src/wallet/wallet.service.ts b/src/wallet/wallet.service.ts index 12cca2e..2cf7b12 100644 --- a/src/wallet/wallet.service.ts +++ b/src/wallet/wallet.service.ts @@ -1,10 +1,10 @@ import { Global, Injectable, OnModuleInit } from '@nestjs/common'; import { join } from 'path'; import { LoggerOptions } from 'pino'; -import { Worker, MessagePort } from 'worker_threads'; +import { Worker, MessagePort, MessageChannel } from 'worker_threads'; import { ConfigService } from 'src/config/config.service'; import { LoggerService, STATUS_LOG_INTERVAL } from 'src/logger/logger.service'; -import { WalletGetPortMessage, WalletGetPortResponse } from './wallet.types'; +import { WalletServiceRoutingMessage, WalletTransactionRequestMessage } from './wallet.types'; import { Wallet } from 'ethers6'; import { tryErrorToString } from 'src/common/utils'; @@ -60,7 +60,8 @@ export interface WalletWorkerData { @Injectable() export class WalletService implements OnModuleInit { private workers: Record = {}; - private requestPortMessageId = 0; + private portsCount = 0; + private readonly ports: Record = {}; readonly publicKey: string; @@ -105,6 +106,19 @@ export class WalletService implements OnModuleInit { `Wallet worker exited.`, ); }); + + worker.on('message', (message: WalletServiceRoutingMessage) => { + const port = this.ports[message.portId]; + if (port == undefined) { + this.loggerService.error( + message, + `Unable to route transaction response on wallet: port id not found.` + ); + return; + } + + port.postMessage(message.data); + }) } // Add a small delay to wait for the workers to be initialized @@ -232,32 +246,27 @@ export class WalletService implements OnModuleInit { setInterval(logStatus, STATUS_LOG_INTERVAL); } - - private getNextRequestPortMessageId(): number { - return this.requestPortMessageId++; - } - async attachToWallet(chainId: string): Promise { const worker = this.workers[chainId]; if (worker == undefined) { throw new Error(`Wallet does not exist for chain ${chainId}`); } + + const portId = this.portsCount++; - const messageId = this.getNextRequestPortMessageId(); - const portPromise = new Promise((resolve) => { - const listener = (data: WalletGetPortResponse) => { - if (data.messageId == messageId) { - worker.off("message", listener); - resolve(data.port); - } - }; - worker.on("message", listener); + const { port1, port2 } = new MessageChannel(); - const portMessage: WalletGetPortMessage = { messageId }; - worker.postMessage(portMessage); + port1.on('message', (message: WalletTransactionRequestMessage) => { + const routingMessage: WalletServiceRoutingMessage = { + portId, + data: message + }; + worker.postMessage(routingMessage); }); - return portPromise; + this.ports[portId] = port1; + + return port2; } } diff --git a/src/wallet/wallet.types.ts b/src/wallet/wallet.types.ts index 0de3580..a800af5 100644 --- a/src/wallet/wallet.types.ts +++ b/src/wallet/wallet.types.ts @@ -1,17 +1,12 @@ import { TransactionRequest, TransactionReceipt, TransactionResponse } from "ethers6"; -import { MessagePort } from "worker_threads"; // Port Channels Types // ************************************************************************************************ -export interface WalletGetPortMessage { - messageId: number; -} - -export interface WalletGetPortResponse { - messageId: number; - port: MessagePort; +export interface WalletServiceRoutingMessage { + portId: number; + data: T; } //TODO add 'priority' diff --git a/src/wallet/wallet.worker.ts b/src/wallet/wallet.worker.ts index edf107f..21b8b00 100644 --- a/src/wallet/wallet.worker.ts +++ b/src/wallet/wallet.worker.ts @@ -6,7 +6,7 @@ import { STATUS_LOG_INTERVAL } from "src/logger/logger.service"; import { TransactionHelper } from "./transaction-helper"; import { ConfirmQueue } from "./queues/confirm-queue"; import { WalletWorkerData } from "./wallet.service"; -import { ConfirmedTransaction, GasFeeConfig, WalletGetPortMessage, WalletGetPortResponse, PendingTransaction, WalletTransactionOptions, WalletTransactionRequest, WalletTransactionRequestMessage, WalletTransactionRequestResponse, BalanceConfig } from "./wallet.types"; +import { ConfirmedTransaction, GasFeeConfig, PendingTransaction, WalletTransactionOptions, WalletTransactionRequest, WalletTransactionRequestMessage, WalletTransactionRequestResponse, BalanceConfig, WalletServiceRoutingMessage } from "./wallet.types"; import { SubmitQueue } from "./queues/submit-queue"; @@ -66,7 +66,7 @@ class WalletWorker { this.logger ); - this.initializePorts(); + this.initializePort(); this.initiateIntervalStatusLog(); } @@ -149,14 +149,15 @@ class WalletWorker { }; } - private initializePorts(): void { - parentPort!.on('message', (message: WalletGetPortMessage) => { - const port = this.registerNewPort(); - const response: WalletGetPortResponse = { - messageId: message.messageId, - port - }; - parentPort!.postMessage(response, [port]) + private initializePort(): void { + parentPort!.on('message', (message: WalletServiceRoutingMessage) => { + this.addTransaction( + message.portId, + message.data.messageId, + message.data.txRequest, + message.data.metadata, + message.data.options + ); }); } @@ -477,13 +478,7 @@ class WalletWorker { confirmationError?: any, ): void { - const port = this.ports[request.portId]; - if (port == undefined) { - this.logger.error({ request }, 'Failed to send transaction result: invalid portId.'); - return; - } - - const response: WalletTransactionRequestResponse = { + const transactionResponse: WalletTransactionRequestResponse = { messageId: request.messageId, txRequest: request.txRequest, metadata: request.metadata, @@ -492,7 +487,13 @@ class WalletWorker { submissionError: tryErrorToString(submissionError), confirmationError: tryErrorToString(confirmationError), } - port.postMessage(response); + + const routingResponse: WalletServiceRoutingMessage = { + portId: request.portId, + data: transactionResponse, + } + + parentPort!.postMessage(routingResponse); } private isNonceExpiredError(error: any, includeUnderpricedError?: boolean): boolean { From 775a19495922d2bc0fb76313d5140cc92a1af743 Mon Sep 17 00:00:00 2001 From: Jorge Sanmiguel <8038323+jsanmigimeno@users.noreply.github.com> Date: Sat, 25 May 2024 17:56:57 +0000 Subject: [PATCH 2/2] feat: Wallet worker restart mechanism --- src/wallet/wallet.interface.ts | 30 ++++-- src/wallet/wallet.service.ts | 188 ++++++++++++++++++++++++--------- src/wallet/wallet.worker.ts | 24 +---- 3 files changed, 163 insertions(+), 79 deletions(-) diff --git a/src/wallet/wallet.interface.ts b/src/wallet/wallet.interface.ts index f962688..7df2c29 100644 --- a/src/wallet/wallet.interface.ts +++ b/src/wallet/wallet.interface.ts @@ -1,7 +1,7 @@ import { TransactionReceipt, TransactionRequest, TransactionResponse } from 'ethers6'; import { MessagePort } from 'worker_threads'; import { WalletTransactionOptions, WalletTransactionRequestMessage, WalletTransactionRequestResponse } from './wallet.types'; -import { tryErrorToString } from 'src/common/utils'; +import { WALLET_WORKER_CRASHED_MESSAGE_ID } from './wallet.service'; export interface TransactionResult { txRequest: TransactionRequest; @@ -30,19 +30,31 @@ export class WalletInterface { const messageId = this.getNextPortMessageId(); const resultPromise = new Promise>(resolve => { - const listener = (data: WalletTransactionRequestResponse) => { - if (data.messageId == messageId) { + const listener = (data: any) => { + if (data.messageId === messageId) { this.port.off("message", listener); + const walletResponse = data as WalletTransactionRequestResponse; + const result = { - txRequest: data.txRequest, - metadata: data.metadata, - tx: data.tx, - txReceipt: data.txReceipt, - submissionError: tryErrorToString(data.submissionError), - confirmationError: tryErrorToString(data.confirmationError) + txRequest: walletResponse.txRequest, + metadata: walletResponse.metadata, + tx: walletResponse.tx, + txReceipt: walletResponse.txReceipt, + submissionError: data.submissionError, + confirmationError: data.confirmationError }; resolve(result); + } else if (data.messageId === WALLET_WORKER_CRASHED_MESSAGE_ID) { + this.port.off("message", listener); + + const result = { + txRequest: transaction, + metadata, + submissionError: new Error('Wallet crashed.'), //TODO use a custom error type? + confirmationError: new Error('Wallet crashed.'), //TODO use a custom error type? + }; + resolve(result); } }; this.port.on("message", listener); diff --git a/src/wallet/wallet.service.ts b/src/wallet/wallet.service.ts index 2cf7b12..566a5d9 100644 --- a/src/wallet/wallet.service.ts +++ b/src/wallet/wallet.service.ts @@ -8,6 +8,8 @@ import { WalletServiceRoutingMessage, WalletTransactionRequestMessage } from './ import { Wallet } from 'ethers6'; import { tryErrorToString } from 'src/common/utils'; +export const WALLET_WORKER_CRASHED_MESSAGE_ID = -1; + const DEFAULT_WALLET_RETRY_INTERVAL = 30000; const DEFAULT_WALLET_PROCESSING_INTERVAL = 100; const DEFAULT_WALLET_MAX_TRIES = 3; @@ -56,12 +58,21 @@ export interface WalletWorkerData { } +interface PortDescription { + chainId: string; + port: MessagePort; +} + @Global() @Injectable() export class WalletService implements OnModuleInit { + private readonly defaultWorkerConfig: DefaultWalletWorkerData; + private workers: Record = {}; private portsCount = 0; - private readonly ports: Record = {}; + private readonly ports: Record = {}; + + private readonly queuedRequests: Record = {}; readonly publicKey: string; @@ -69,6 +80,7 @@ export class WalletService implements OnModuleInit { private readonly configService: ConfigService, private readonly loggerService: LoggerService, ) { + this.defaultWorkerConfig = this.loadDefaultWorkerConfig(); this.publicKey = (new Wallet(this.configService.globalConfig.privateKey)).address; } @@ -81,44 +93,9 @@ export class WalletService implements OnModuleInit { } private async initializeWorkers(): Promise { - const defaultWorkerConfig = this.loadDefaultWorkerConfig(); for (const [chainId,] of this.configService.chainsConfig) { - - const workerData = this.loadWorkerConfig(chainId, defaultWorkerConfig); - - const worker = new Worker(join(__dirname, 'wallet.worker.js'), { - workerData - }); - this.workers[chainId] = worker; - - worker.on('error', (error) => - this.loggerService.fatal( - { error: tryErrorToString(error), chainId }, - `Error on wallet worker.`, - ), - ); - - worker.on('exit', (exitCode) => { - this.workers[chainId] = null; - this.loggerService.fatal( - { exitCode, chainId }, - `Wallet worker exited.`, - ); - }); - - worker.on('message', (message: WalletServiceRoutingMessage) => { - const port = this.ports[message.portId]; - if (port == undefined) { - this.loggerService.error( - message, - `Unable to route transaction response on wallet: port id not found.` - ); - return; - } - - port.postMessage(message.data); - }) + this.spawnWorker(chainId); } // Add a small delay to wait for the workers to be initialized @@ -166,9 +143,10 @@ export class WalletService implements OnModuleInit { private loadWorkerConfig( chainId: string, - defaultConfig: DefaultWalletWorkerData ): WalletWorkerData { + const defaultConfig = this.defaultWorkerConfig; + const chainConfig = this.configService.chainsConfig.get(chainId); if (chainConfig == undefined) { throw new Error(`Unable to load config for chain ${chainId}`); @@ -229,6 +207,56 @@ export class WalletService implements OnModuleInit { }; } + private spawnWorker( + chainId: string + ): void { + const workerData = this.loadWorkerConfig(chainId); + this.loggerService.info( + { + chainId, + workerData, + }, + `Spawning wallet worker.` + ); + + const worker = new Worker(join(__dirname, 'wallet.worker.js'), { + workerData + }); + this.workers[chainId] = worker; + + worker.on('error', (error) => + this.loggerService.error( + { error: tryErrorToString(error), chainId }, + `Error on wallet worker.`, + ), + ); + + worker.on('exit', (exitCode) => { + this.workers[chainId] = null; + this.loggerService.error( + { exitCode, chainId }, + `Wallet worker exited.`, + ); + + this.abortPendingRequests(chainId); + this.spawnWorker(chainId); + this.recoverQueuedMessages(chainId); + }); + + worker.on('message', (message: WalletServiceRoutingMessage) => { + const portDescription = this.ports[message.portId]; + if (portDescription == undefined) { + this.loggerService.error( + message, + `Unable to route transaction response on wallet: port id not found.` + ); + return; + } + + portDescription.port.postMessage(message.data); + }); + } + private initiateIntervalStatusLog(): void { const logStatus = () => { const activeWorkers = []; @@ -247,26 +275,90 @@ export class WalletService implements OnModuleInit { } async attachToWallet(chainId: string): Promise { - const worker = this.workers[chainId]; - - if (worker == undefined) { - throw new Error(`Wallet does not exist for chain ${chainId}`); - } const portId = this.portsCount++; const { port1, port2 } = new MessageChannel(); port1.on('message', (message: WalletTransactionRequestMessage) => { - const routingMessage: WalletServiceRoutingMessage = { + this.handleTransactionRequestMessage( + chainId, portId, - data: message - }; - worker.postMessage(routingMessage); + message, + ); }); - this.ports[portId] = port1; + this.ports[portId] = { + chainId, + port: port1, + }; return port2; } + + private handleTransactionRequestMessage( + chainId: string, + portId: number, + message: WalletTransactionRequestMessage + ): void { + const worker = this.workers[chainId]; + + const routingMessage: WalletServiceRoutingMessage = { + portId, + data: message + }; + + if (worker == undefined) { + this.loggerService.warn( + { + chainId, + portId, + message + }, + `Wallet does not exist for the requested chain. Queueing message.` + ); + + if (!(chainId in this.queuedRequests)) { + this.queuedRequests[chainId] = []; + } + this.queuedRequests[chainId]!.push(routingMessage); + } else { + worker.postMessage(routingMessage); + } + } + + private abortPendingRequests( + chainId: string, + ): void { + for (const portDescription of Object.values(this.ports)) { + if (portDescription.chainId === chainId) { + portDescription.port.postMessage({ + messageId: WALLET_WORKER_CRASHED_MESSAGE_ID + }); + } + } + } + + private recoverQueuedMessages( + chainId: string, + ): void { + const queuedRequests = this.queuedRequests[chainId] ?? []; + this.queuedRequests[chainId] = []; + + this.loggerService.info( + { + chainId, + count: queuedRequests.length, + }, + `Recovering queued wallet requests.` + ); + + for (const request of queuedRequests) { + this.handleTransactionRequestMessage( + chainId, + request.portId, + request.data, + ); + } + } } diff --git a/src/wallet/wallet.worker.ts b/src/wallet/wallet.worker.ts index 21b8b00..70b0605 100644 --- a/src/wallet/wallet.worker.ts +++ b/src/wallet/wallet.worker.ts @@ -1,12 +1,12 @@ import { JsonRpcProvider, Wallet, Provider, AbstractProvider, ZeroAddress, TransactionResponse, TransactionReceipt, TransactionRequest } from "ethers6"; import pino, { LoggerOptions } from "pino"; -import { workerData, parentPort, MessageChannel, MessagePort } from 'worker_threads'; +import { workerData, parentPort, MessagePort } from 'worker_threads'; import { tryErrorToString, wait } from "src/common/utils"; import { STATUS_LOG_INTERVAL } from "src/logger/logger.service"; import { TransactionHelper } from "./transaction-helper"; import { ConfirmQueue } from "./queues/confirm-queue"; import { WalletWorkerData } from "./wallet.service"; -import { ConfirmedTransaction, GasFeeConfig, PendingTransaction, WalletTransactionOptions, WalletTransactionRequest, WalletTransactionRequestMessage, WalletTransactionRequestResponse, BalanceConfig, WalletServiceRoutingMessage } from "./wallet.types"; +import { ConfirmedTransaction, GasFeeConfig, PendingTransaction, WalletTransactionOptions, WalletTransactionRequest, WalletTransactionRequestResponse, BalanceConfig, WalletServiceRoutingMessage } from "./wallet.types"; import { SubmitQueue } from "./queues/submit-queue"; @@ -161,26 +161,6 @@ class WalletWorker { }); } - private registerNewPort(): MessagePort { - - const portId = this.portsCount++; - - const { port1, port2 } = new MessageChannel(); - - port1.on('message', (message: WalletTransactionRequestMessage) => { - this.addTransaction( - portId, - message.messageId, - message.txRequest, - message.metadata, - message.options - ); - }) - this.ports[portId] = port1; - - return port2; - } - private addTransaction( portId: number, messageId: number,