diff --git a/src/wallet/wallet.interface.ts b/src/wallet/wallet.interface.ts index f962688..7df2c29 100644 --- a/src/wallet/wallet.interface.ts +++ b/src/wallet/wallet.interface.ts @@ -1,7 +1,7 @@ import { TransactionReceipt, TransactionRequest, TransactionResponse } from 'ethers6'; import { MessagePort } from 'worker_threads'; import { WalletTransactionOptions, WalletTransactionRequestMessage, WalletTransactionRequestResponse } from './wallet.types'; -import { tryErrorToString } from 'src/common/utils'; +import { WALLET_WORKER_CRASHED_MESSAGE_ID } from './wallet.service'; export interface TransactionResult { txRequest: TransactionRequest; @@ -30,19 +30,31 @@ export class WalletInterface { const messageId = this.getNextPortMessageId(); const resultPromise = new Promise>(resolve => { - const listener = (data: WalletTransactionRequestResponse) => { - if (data.messageId == messageId) { + const listener = (data: any) => { + if (data.messageId === messageId) { this.port.off("message", listener); + const walletResponse = data as WalletTransactionRequestResponse; + const result = { - txRequest: data.txRequest, - metadata: data.metadata, - tx: data.tx, - txReceipt: data.txReceipt, - submissionError: tryErrorToString(data.submissionError), - confirmationError: tryErrorToString(data.confirmationError) + txRequest: walletResponse.txRequest, + metadata: walletResponse.metadata, + tx: walletResponse.tx, + txReceipt: walletResponse.txReceipt, + submissionError: data.submissionError, + confirmationError: data.confirmationError }; resolve(result); + } else if (data.messageId === WALLET_WORKER_CRASHED_MESSAGE_ID) { + this.port.off("message", listener); + + const result = { + txRequest: transaction, + metadata, + submissionError: new Error('Wallet crashed.'), //TODO use a custom error type? + confirmationError: new Error('Wallet crashed.'), //TODO use a custom error type? + }; + resolve(result); } }; this.port.on("message", listener); diff --git a/src/wallet/wallet.service.ts b/src/wallet/wallet.service.ts index 12cca2e..566a5d9 100644 --- a/src/wallet/wallet.service.ts +++ b/src/wallet/wallet.service.ts @@ -1,13 +1,15 @@ import { Global, Injectable, OnModuleInit } from '@nestjs/common'; import { join } from 'path'; import { LoggerOptions } from 'pino'; -import { Worker, MessagePort } from 'worker_threads'; +import { Worker, MessagePort, MessageChannel } from 'worker_threads'; import { ConfigService } from 'src/config/config.service'; import { LoggerService, STATUS_LOG_INTERVAL } from 'src/logger/logger.service'; -import { WalletGetPortMessage, WalletGetPortResponse } from './wallet.types'; +import { WalletServiceRoutingMessage, WalletTransactionRequestMessage } from './wallet.types'; import { Wallet } from 'ethers6'; import { tryErrorToString } from 'src/common/utils'; +export const WALLET_WORKER_CRASHED_MESSAGE_ID = -1; + const DEFAULT_WALLET_RETRY_INTERVAL = 30000; const DEFAULT_WALLET_PROCESSING_INTERVAL = 100; const DEFAULT_WALLET_MAX_TRIES = 3; @@ -56,11 +58,21 @@ export interface WalletWorkerData { } +interface PortDescription { + chainId: string; + port: MessagePort; +} + @Global() @Injectable() export class WalletService implements OnModuleInit { + private readonly defaultWorkerConfig: DefaultWalletWorkerData; + private workers: Record = {}; - private requestPortMessageId = 0; + private portsCount = 0; + private readonly ports: Record = {}; + + private readonly queuedRequests: Record = {}; readonly publicKey: string; @@ -68,6 +80,7 @@ export class WalletService implements OnModuleInit { private readonly configService: ConfigService, private readonly loggerService: LoggerService, ) { + this.defaultWorkerConfig = this.loadDefaultWorkerConfig(); this.publicKey = (new Wallet(this.configService.globalConfig.privateKey)).address; } @@ -80,31 +93,9 @@ export class WalletService implements OnModuleInit { } private async initializeWorkers(): Promise { - const defaultWorkerConfig = this.loadDefaultWorkerConfig(); for (const [chainId,] of this.configService.chainsConfig) { - - const workerData = this.loadWorkerConfig(chainId, defaultWorkerConfig); - - const worker = new Worker(join(__dirname, 'wallet.worker.js'), { - workerData - }); - this.workers[chainId] = worker; - - worker.on('error', (error) => - this.loggerService.fatal( - { error: tryErrorToString(error), chainId }, - `Error on wallet worker.`, - ), - ); - - worker.on('exit', (exitCode) => { - this.workers[chainId] = null; - this.loggerService.fatal( - { exitCode, chainId }, - `Wallet worker exited.`, - ); - }); + this.spawnWorker(chainId); } // Add a small delay to wait for the workers to be initialized @@ -152,9 +143,10 @@ export class WalletService implements OnModuleInit { private loadWorkerConfig( chainId: string, - defaultConfig: DefaultWalletWorkerData ): WalletWorkerData { + const defaultConfig = this.defaultWorkerConfig; + const chainConfig = this.configService.chainsConfig.get(chainId); if (chainConfig == undefined) { throw new Error(`Unable to load config for chain ${chainId}`); @@ -215,6 +207,56 @@ export class WalletService implements OnModuleInit { }; } + private spawnWorker( + chainId: string + ): void { + const workerData = this.loadWorkerConfig(chainId); + this.loggerService.info( + { + chainId, + workerData, + }, + `Spawning wallet worker.` + ); + + const worker = new Worker(join(__dirname, 'wallet.worker.js'), { + workerData + }); + this.workers[chainId] = worker; + + worker.on('error', (error) => + this.loggerService.error( + { error: tryErrorToString(error), chainId }, + `Error on wallet worker.`, + ), + ); + + worker.on('exit', (exitCode) => { + this.workers[chainId] = null; + this.loggerService.error( + { exitCode, chainId }, + `Wallet worker exited.`, + ); + + this.abortPendingRequests(chainId); + this.spawnWorker(chainId); + this.recoverQueuedMessages(chainId); + }); + + worker.on('message', (message: WalletServiceRoutingMessage) => { + const portDescription = this.ports[message.portId]; + if (portDescription == undefined) { + this.loggerService.error( + message, + `Unable to route transaction response on wallet: port id not found.` + ); + return; + } + + portDescription.port.postMessage(message.data); + }); + } + private initiateIntervalStatusLog(): void { const logStatus = () => { const activeWorkers = []; @@ -232,32 +274,91 @@ export class WalletService implements OnModuleInit { setInterval(logStatus, STATUS_LOG_INTERVAL); } + async attachToWallet(chainId: string): Promise { + + const portId = this.portsCount++; + + const { port1, port2 } = new MessageChannel(); - private getNextRequestPortMessageId(): number { - return this.requestPortMessageId++; + port1.on('message', (message: WalletTransactionRequestMessage) => { + this.handleTransactionRequestMessage( + chainId, + portId, + message, + ); + }); + + this.ports[portId] = { + chainId, + port: port1, + }; + + return port2; } - async attachToWallet(chainId: string): Promise { + private handleTransactionRequestMessage( + chainId: string, + portId: number, + message: WalletTransactionRequestMessage + ): void { const worker = this.workers[chainId]; + const routingMessage: WalletServiceRoutingMessage = { + portId, + data: message + }; + if (worker == undefined) { - throw new Error(`Wallet does not exist for chain ${chainId}`); - } + this.loggerService.warn( + { + chainId, + portId, + message + }, + `Wallet does not exist for the requested chain. Queueing message.` + ); - const messageId = this.getNextRequestPortMessageId(); - const portPromise = new Promise((resolve) => { - const listener = (data: WalletGetPortResponse) => { - if (data.messageId == messageId) { - worker.off("message", listener); - resolve(data.port); - } - }; - worker.on("message", listener); + if (!(chainId in this.queuedRequests)) { + this.queuedRequests[chainId] = []; + } + this.queuedRequests[chainId]!.push(routingMessage); + } else { + worker.postMessage(routingMessage); + } + } - const portMessage: WalletGetPortMessage = { messageId }; - worker.postMessage(portMessage); - }); + private abortPendingRequests( + chainId: string, + ): void { + for (const portDescription of Object.values(this.ports)) { + if (portDescription.chainId === chainId) { + portDescription.port.postMessage({ + messageId: WALLET_WORKER_CRASHED_MESSAGE_ID + }); + } + } + } - return portPromise; + private recoverQueuedMessages( + chainId: string, + ): void { + const queuedRequests = this.queuedRequests[chainId] ?? []; + this.queuedRequests[chainId] = []; + + this.loggerService.info( + { + chainId, + count: queuedRequests.length, + }, + `Recovering queued wallet requests.` + ); + + for (const request of queuedRequests) { + this.handleTransactionRequestMessage( + chainId, + request.portId, + request.data, + ); + } } } diff --git a/src/wallet/wallet.types.ts b/src/wallet/wallet.types.ts index 0de3580..a800af5 100644 --- a/src/wallet/wallet.types.ts +++ b/src/wallet/wallet.types.ts @@ -1,17 +1,12 @@ import { TransactionRequest, TransactionReceipt, TransactionResponse } from "ethers6"; -import { MessagePort } from "worker_threads"; // Port Channels Types // ************************************************************************************************ -export interface WalletGetPortMessage { - messageId: number; -} - -export interface WalletGetPortResponse { - messageId: number; - port: MessagePort; +export interface WalletServiceRoutingMessage { + portId: number; + data: T; } //TODO add 'priority' diff --git a/src/wallet/wallet.worker.ts b/src/wallet/wallet.worker.ts index edf107f..70b0605 100644 --- a/src/wallet/wallet.worker.ts +++ b/src/wallet/wallet.worker.ts @@ -1,12 +1,12 @@ import { JsonRpcProvider, Wallet, Provider, AbstractProvider, ZeroAddress, TransactionResponse, TransactionReceipt, TransactionRequest } from "ethers6"; import pino, { LoggerOptions } from "pino"; -import { workerData, parentPort, MessageChannel, MessagePort } from 'worker_threads'; +import { workerData, parentPort, MessagePort } from 'worker_threads'; import { tryErrorToString, wait } from "src/common/utils"; import { STATUS_LOG_INTERVAL } from "src/logger/logger.service"; import { TransactionHelper } from "./transaction-helper"; import { ConfirmQueue } from "./queues/confirm-queue"; import { WalletWorkerData } from "./wallet.service"; -import { ConfirmedTransaction, GasFeeConfig, WalletGetPortMessage, WalletGetPortResponse, PendingTransaction, WalletTransactionOptions, WalletTransactionRequest, WalletTransactionRequestMessage, WalletTransactionRequestResponse, BalanceConfig } from "./wallet.types"; +import { ConfirmedTransaction, GasFeeConfig, PendingTransaction, WalletTransactionOptions, WalletTransactionRequest, WalletTransactionRequestResponse, BalanceConfig, WalletServiceRoutingMessage } from "./wallet.types"; import { SubmitQueue } from "./queues/submit-queue"; @@ -66,7 +66,7 @@ class WalletWorker { this.logger ); - this.initializePorts(); + this.initializePort(); this.initiateIntervalStatusLog(); } @@ -149,35 +149,16 @@ class WalletWorker { }; } - private initializePorts(): void { - parentPort!.on('message', (message: WalletGetPortMessage) => { - const port = this.registerNewPort(); - const response: WalletGetPortResponse = { - messageId: message.messageId, - port - }; - parentPort!.postMessage(response, [port]) - }); - } - - private registerNewPort(): MessagePort { - - const portId = this.portsCount++; - - const { port1, port2 } = new MessageChannel(); - - port1.on('message', (message: WalletTransactionRequestMessage) => { + private initializePort(): void { + parentPort!.on('message', (message: WalletServiceRoutingMessage) => { this.addTransaction( - portId, - message.messageId, - message.txRequest, - message.metadata, - message.options + message.portId, + message.data.messageId, + message.data.txRequest, + message.data.metadata, + message.data.options ); - }) - this.ports[portId] = port1; - - return port2; + }); } private addTransaction( @@ -477,13 +458,7 @@ class WalletWorker { confirmationError?: any, ): void { - const port = this.ports[request.portId]; - if (port == undefined) { - this.logger.error({ request }, 'Failed to send transaction result: invalid portId.'); - return; - } - - const response: WalletTransactionRequestResponse = { + const transactionResponse: WalletTransactionRequestResponse = { messageId: request.messageId, txRequest: request.txRequest, metadata: request.metadata, @@ -492,7 +467,13 @@ class WalletWorker { submissionError: tryErrorToString(submissionError), confirmationError: tryErrorToString(confirmationError), } - port.postMessage(response); + + const routingResponse: WalletServiceRoutingMessage = { + portId: request.portId, + data: transactionResponse, + } + + parentPort!.postMessage(routingResponse); } private isNonceExpiredError(error: any, includeUnderpricedError?: boolean): boolean {