From 2bb6d47a8caef17be1e9753df125e266783f84f8 Mon Sep 17 00:00:00 2001 From: jsanmigimeno <8038323+jsanmigimeno@users.noreply.github.com> Date: Fri, 19 Jul 2024 12:52:37 +0200 Subject: [PATCH] [chore]: Store Refactor (#47) * refactor: Complete store refactor * chore: Recover 'transactionBlockNumber' naming * fix: Wormhole recovery (txHash and block data) * fix: LZ messageIdentifier storage * fix: LayerZero AMBMessage generation --- src/collector/layer-zero/layer-zero.worker.ts | 80 +- src/collector/mock/mock.worker.ts | 46 +- src/collector/polymer/polymer.worker.ts | 26 +- .../wormhole/wormhole-engine.worker.ts | 51 +- .../wormhole-message-sniffer.worker.ts | 58 +- .../wormhole/wormhole-recovery.worker.ts | 130 ++- src/getter/getter.worker.ts | 72 +- src/store/persister/persister.worker.ts | 59 +- src/store/postgres/postgres.transformer.ts | 29 +- src/store/store.controller.ts | 16 +- src/store/store.gateway.ts | 13 +- src/store/store.lib.ts | 976 +++++++++--------- src/store/store.types.ts | 149 +++ src/store/types/bounty.enum.ts | 5 - src/store/types/store.types.ts | 72 -- src/submitter/queues/eval-queue.ts | 56 +- src/submitter/queues/submit-queue.ts | 8 +- src/submitter/submitter.types.ts | 16 + src/submitter/submitter.worker.ts | 57 +- 19 files changed, 1056 insertions(+), 863 deletions(-) create mode 100644 src/store/store.types.ts delete mode 100644 src/store/types/bounty.enum.ts delete mode 100644 src/store/types/store.types.ts diff --git a/src/collector/layer-zero/layer-zero.worker.ts b/src/collector/layer-zero/layer-zero.worker.ts index 9467ba3..a04cd79 100644 --- a/src/collector/layer-zero/layer-zero.worker.ts +++ b/src/collector/layer-zero/layer-zero.worker.ts @@ -36,7 +36,7 @@ import { UlnConfigStruct, UlnConfigStructOutput, } from 'src/contracts/ReceiveULN302'; -import { AmbPayload } from 'src/store/types/store.types'; +import { AMBMessage, AMBProof } from 'src/store/store.types'; import { LayerZeroEnpointV2__factory } from 'src/contracts'; import { Resolver, loadResolver } from 'src/resolvers/resolver'; import { ParsePayload } from 'src/payload/decode.payload'; @@ -47,6 +47,11 @@ interface LayerZeroWorkerDataWithMapping extends LayerZeroWorkerData { layerZeroChainIdMap: Record; } +interface LayerZeroPayloadData { + messageIdentifier: string, + payload: string, +} + class LayerZeroWorker { private readonly config: LayerZeroWorkerDataWithMapping; private readonly chainId: string; @@ -72,7 +77,7 @@ class LayerZeroWorker { this.chainId = this.config.chainId; this.layerZeroChainIdMap = this.config.layerZeroChainIdMap; this.incentivesAddresses = this.config.incentivesAddresses; - this.store = new Store(this.chainId); + this.store = new Store(); this.provider = this.initializeProvider(this.config.rpc); this.logger = this.initializeLogger(this.chainId); this.receiveULN302 = ReceiveULN302__factory.connect( @@ -390,21 +395,29 @@ class LayerZeroWorker { ); const transactionBlockNumber = await this.resolver.getTransactionBlockNumber(log.blockNumber); - await this.store.setAmb( - { - messageIdentifier: decodedMessage.messageIdentifier, - amb: 'layer-zero', - sourceChain: srcEidMapped.toString(), - destinationChain: dstEidMapped.toString(), - sourceEscrow: packet.sender, - payload: decodedMessage.message, - recoveryContext: '0x', - blockNumber: log.blockNumber, - transactionBlockNumber, - blockHash: log.blockHash, - transactionHash: log.transactionHash, - }, - log.transactionHash, + + const messageIdentifier = '0x' + decodedMessage.messageIdentifier; + const ambMessage: AMBMessage = { + messageIdentifier, + + amb: 'layer-zero', + fromChainId: srcEidMapped.toString(), + toChainId: dstEidMapped.toString(), + fromIncentivesAddress: '0x' + packet.sender.slice(24), // Keep only the relevant bytes (i.e. discard the first 12 bytes) + // toIncentivesAddress: , //TODO + + incentivesPayload: '0x' + packet.message, + + transactionBlockNumber, + + blockNumber: log.blockNumber, + blockHash: log.blockHash, + transactionHash: log.transactionHash, + } + + await this.store.setAMBMessage( + this.chainId, + ambMessage, ); const payloadHash = this.calculatePayloadHash( @@ -412,15 +425,18 @@ class LayerZeroWorker { packet.message, ); - await this.store.setPayload('layer-zero', 'ambMessage', payloadHash, { - messageIdentifier: decodedMessage.messageIdentifier, - destinationChain: dstEidMapped, - payload: encodedPayload, - }); + await this.store.setAdditionalAMBData( + 'layer-zero', + payloadHash.toLowerCase(), + { + messageIdentifier, + payload: encodedPayload + }, + ); this.logger.info( { - messageIdentifier: decodedMessage.messageIdentifier, + messageIdentifier, transactionHash: log.transactionHash, payloadHash }, @@ -469,7 +485,10 @@ class LayerZeroWorker { { dvn, decodedHeader, confirmations, proofHash }, 'PayloadVerified event decoded.', ); - const payloadData = await this.store.getPayload('layer-zero', 'ambMessage', proofHash); + const payloadData = await this.store.getAdditionalAMBData( + 'layer-zero', + proofHash.toLowerCase() + ); if (!payloadData) { this.logger.error( { proofHash }, @@ -490,17 +509,20 @@ class LayerZeroWorker { proofHash, ); if (isVerifiable) { - const ambPayload: AmbPayload = { - messageIdentifier: '0x' + payloadData.messageIdentifier, + const ambProof: AMBProof = { + messageIdentifier: payloadData.messageIdentifier, + amb: 'layer-zero', - destinationChainId: dstEidMapped.toString(), + fromChainId: srcEidMapped.toString(), + toChainId: dstEidMapped.toString(), + message: payloadData.payload, messageCtx: '0x', }; this.logger.info({ proofHash }, `LayerZero proof found.`); - await this.store.submitProof( + await this.store.setAMBProof( this.layerZeroChainIdMap[decodedHeader.dstEid]!, - ambPayload, + ambProof, ); } else { this.logger.debug('Payload could not be verified'); diff --git a/src/collector/mock/mock.worker.ts b/src/collector/mock/mock.worker.ts index 462b831..333e7dd 100644 --- a/src/collector/mock/mock.worker.ts +++ b/src/collector/mock/mock.worker.ts @@ -3,7 +3,7 @@ import pino from 'pino'; import { convertHexToDecimal, tryErrorToString, wait } from 'src/common/utils'; import { IncentivizedMockEscrow__factory } from 'src/contracts'; import { Store } from 'src/store/store.lib'; -import { AmbMessage, AmbPayload } from 'src/store/types/store.types'; +import { AMBMessage, AMBProof } from 'src/store/store.types'; import { workerData, MessagePort } from 'worker_threads'; import { decodeMockMessage, @@ -65,7 +65,7 @@ class MockCollectorWorker { // Get a connection to the redis store. // The redis store has been wrapped into a lib to make it easier to standardise // communication between the various components. - this.store = new Store(this.chainId); + this.store = new Store(); // Get an Ethers provider with which to collect the bounties information. this.provider = this.initializeProvider(this.config.rpc); @@ -320,25 +320,30 @@ class MockCollectorWorker { log.blockNumber ); - const amb: AmbMessage = { - ...decodedMessage, + const ambMessage: AMBMessage = { + messageIdentifier: decodedMessage.messageIdentifier, + amb: 'mock', - sourceEscrow: this.config.incentivesAddress, - blockNumber: log.blockNumber, + fromChainId: decodedMessage.sourceChain, + toChainId: decodedMessage.destinationChain, + fromIncentivesAddress: this.config.incentivesAddress, + toIncentivesAddress: messageEvent.recipient, + + incentivesPayload: decodedMessage.payload, + transactionBlockNumber, + + blockNumber: log.blockNumber, blockHash: log.blockHash, transactionHash: log.transactionHash } // Set the collect message on-chain. This is not the proof but the raw message. // It can be used by plugins to facilitate other jobs. - await this.store.setAmb(amb, log.transactionHash); - - // Set destination address for the bounty. - await this.store.registerDestinationAddress({ - messageIdentifier: amb.messageIdentifier, - destinationAddress: messageEvent.recipient, - }); + await this.store.setAMBMessage( + this.chainId, + ambMessage + ); // Encode and sign the message for delivery. // This is the proof which enables us to submit the transaciton later. @@ -348,27 +353,30 @@ class MockCollectorWorker { const signature = this.signingKey.sign(keccak256(encodedMessage)); const executionContext = encodeSignature(signature); - const destinationChainId = convertHexToDecimal(amb.destinationChain); + const destinationChainId = convertHexToDecimal(ambMessage.toChainId); // Construct the payload. - const ambPayload: AmbPayload = { - messageIdentifier: amb.messageIdentifier, + const ambPayload: AMBProof = { + messageIdentifier: ambMessage.messageIdentifier, + amb: 'mock', - destinationChainId, + fromChainId: this.chainId, + toChainId: destinationChainId, + message: encodedMessage, messageCtx: executionContext, // If the generalised incentives implementation does not use the context set it to "0x". }; this.logger.info( { - messageIdentifier: amb.messageIdentifier, + messageIdentifier: ambMessage.messageIdentifier, destinationChainId: destinationChainId, }, `Mock message found.`, ); // Submit the proofs to any listeners. If there is a submitter, it will process the proof and submit it. - await this.store.submitProof(destinationChainId, ambPayload); + await this.store.setAMBProof(destinationChainId, ambPayload); } diff --git a/src/collector/polymer/polymer.worker.ts b/src/collector/polymer/polymer.worker.ts index 3ce5163..428c7e7 100644 --- a/src/collector/polymer/polymer.worker.ts +++ b/src/collector/polymer/polymer.worker.ts @@ -2,7 +2,7 @@ import pino from 'pino'; import { tryErrorToString, wait } from 'src/common/utils'; import { IbcEventEmitter__factory } from 'src/contracts'; import { Store } from 'src/store/store.lib'; -import { AmbMessage } from 'src/store/types/store.types'; +import { AMBMessage } from 'src/store/store.types'; import { workerData, MessagePort } from 'worker_threads'; import { PolymerWorkerData } from './polymer'; import { AbiCoder, JsonRpcProvider, Log, LogDescription, zeroPadValue } from 'ethers6'; @@ -40,7 +40,7 @@ class PolymerCollectorSnifferWorker { this.chainId = this.config.chainId; - this.store = new Store(this.chainId); + this.store = new Store(); this.provider = this.initializeProvider(this.config.rpc); this.logger = this.initializeLogger(this.chainId); this.resolver = this.loadResolver( @@ -295,26 +295,30 @@ class PolymerCollectorSnifferWorker { log.blockNumber ); - const amb: AmbMessage = { + const ambMessage: AMBMessage = { messageIdentifier, + amb: 'polymer', - sourceChain: this.chainId, - destinationChain, - sourceEscrow: event.sourcePortAddress, - payload: packet, - blockNumber: log.blockNumber, + fromChainId: this.chainId, + toChainId: destinationChain, + fromIncentivesAddress: event.sourcePortAddress, + + incentivesPayload: packet, + transactionBlockNumber, + + blockNumber: log.blockNumber, blockHash: log.blockHash, transactionHash: log.transactionHash - }; + } // Set the collect message on-chain. This is not the proof but the raw message. // It can be used by plugins to facilitate other jobs. - await this.store.setAmb(amb, log.transactionHash); + await this.store.setAMBMessage(this.chainId, ambMessage); this.logger.info( { - messageIdentifier: amb.messageIdentifier, + messageIdentifier, destinationChainId: destinationChain, }, `Polymer message found.`, diff --git a/src/collector/wormhole/wormhole-engine.worker.ts b/src/collector/wormhole/wormhole-engine.worker.ts index 1a14db3..5f44645 100644 --- a/src/collector/wormhole/wormhole-engine.worker.ts +++ b/src/collector/wormhole/wormhole-engine.worker.ts @@ -8,10 +8,9 @@ import { decodeWormholeMessage } from 'src/collector/wormhole/wormhole.utils'; import { add0X } from 'src/common/utils'; import { workerData } from 'worker_threads'; import { Store } from 'src/store/store.lib'; -import { AmbPayload } from 'src/store/types/store.types'; +import { AMBProof } from 'src/store/store.types'; import pino, { LoggerOptions } from 'pino'; import { - WormholeChainConfig, WormholeChainId, WormholeRelayerEngineWorkerData, } from './wormhole.types'; @@ -35,13 +34,13 @@ class WormholeEngineWorker { private readonly config: WormholeRelayerEngineWorkerData; private readonly logger: pino.Logger; - private readonly stores: Map; + private readonly store: Store; constructor() { this.config = workerData as WormholeRelayerEngineWorkerData; this.logger = this.initializeLogger(this.config.loggerOptions); - this.stores = this.loadStores(this.config.wormholeChainConfigs); + this.store = new Store(); } // Initialization helpers @@ -67,17 +66,6 @@ class WormholeEngineWorker { }); } - private loadStores( - wormholeChainConfig: Map, - ): Map { - const stores: Map = new Map(); - for (const [chainId, wormholeConfig] of wormholeChainConfig) { - stores.set(wormholeConfig.wormholeChainId, new Store(chainId)); - } - - return stores; - } - private async loadWormholeRelayerEngine(): Promise> { const enviroment = this.config.isTestnet ? Environment.TESTNET @@ -206,6 +194,20 @@ class WormholeEngineWorker { add0X(vaa.payload.toString('hex')), ); + const sourceChainId = this.config.wormholeChainIdMap.get( + vaa.emitterChain, + ); + if (sourceChainId == undefined) { + this.logger.warn( + { + vaa, + sourceWormholeChainId: vaa.emitterChain, + }, + `Failed to process VAA: source chain id given Wormhole chain id not found.`, + ); + return; + } + const destinationChainId = this.config.wormholeChainIdMap.get( wormholeInfo.destinationWormholeChainId, ); @@ -221,10 +223,13 @@ class WormholeEngineWorker { return; } - const ambPayload: AmbPayload = { + const ambProof: AMBProof = { messageIdentifier: wormholeInfo.messageIdentifier, + amb: 'wormhole', - destinationChainId, + fromChainId: sourceChainId, + toChainId: destinationChainId, + message: add0X(vaa.bytes.toString('hex')), messageCtx: '0x', }; @@ -234,17 +239,7 @@ class WormholeEngineWorker { `Wormhole VAA found.`, ); - const store = this.stores.get(vaa.emitterChain); - if (store != undefined) { - await store.submitProof(destinationChainId, ambPayload); - } else { - this.logger.warn( - { - wormholeVAAEmitterChain: vaa.emitterChain, - }, - `No 'Store' found for the Wormhole VAA emitter chain id.`, - ); - } + await this.store.setAMBProof(destinationChainId, ambProof); } } diff --git a/src/collector/wormhole/wormhole-message-sniffer.worker.ts b/src/collector/wormhole/wormhole-message-sniffer.worker.ts index e9a89bc..f62c548 100644 --- a/src/collector/wormhole/wormhole-message-sniffer.worker.ts +++ b/src/collector/wormhole/wormhole-message-sniffer.worker.ts @@ -16,6 +16,7 @@ import { AbiCoder, JsonRpcProvider } from 'ethers6'; import { MonitorInterface, MonitorStatus } from 'src/monitor/monitor.interface'; import { Resolver, loadResolver } from 'src/resolvers/resolver'; import { STATUS_LOG_INTERVAL } from 'src/logger/logger.service'; +import { AMBMessage } from 'src/store/store.types'; const defaultAbiCoder = AbiCoder.defaultAbiCoder(); @@ -45,7 +46,7 @@ class WormholeMessageSnifferWorker { this.chainId = this.config.chainId; - this.store = new Store(this.chainId); + this.store = new Store(); this.logger = this.initializeLogger( this.chainId, this.config.loggerOptions, @@ -281,23 +282,6 @@ class WormholeMessageSnifferWorker { log.blockNumber ); - await this.store.setAmb( - { - messageIdentifier: decodedWormholeMessage.messageIdentifier, - amb: 'wormhole', - sourceChain: this.chainId, - destinationChain, - sourceEscrow: log.args.sender, - payload: decodedWormholeMessage.payload, - recoveryContext: log.args.sequence.toString(), - blockNumber: log.blockNumber, - transactionBlockNumber, - blockHash: log.blockHash, - transactionHash: log.transactionHash, - }, - log.transactionHash, - ); - // Decode payload const decodedPayload = ParsePayload(decodedWormholeMessage.payload); if (decodedPayload === undefined) { @@ -305,16 +289,36 @@ class WormholeMessageSnifferWorker { return; } - // Set destination address for the bounty. - await this.store.registerDestinationAddress({ + //TODO the following contract call could fail. Set to 'undefined' and continue on that case? + //TODO cache the query + const toIncentivesAddress = await this.messageEscrowContract.implementationAddress( + decodedPayload?.sourceApplicationAddress, + defaultAbiCoder.encode(['uint256'], [destinationChain]), + ); + + const ambMessage: AMBMessage = { messageIdentifier: decodedWormholeMessage.messageIdentifier, - //TODO the following contract call could fail - destinationAddress: - await this.messageEscrowContract.implementationAddress( - decodedPayload?.sourceApplicationAddress, - defaultAbiCoder.encode(['uint256'], [destinationChain]), - ), - }); + + amb: 'wormhole', + fromChainId: this.chainId, + toChainId: destinationChain, + fromIncentivesAddress: log.args.sender, + toIncentivesAddress, + + incentivesPayload: decodedWormholeMessage.payload, + recoveryContext: log.args.sequence.toString(), + + transactionBlockNumber, + + blockNumber: log.blockNumber, + blockHash: log.blockHash, + transactionHash: log.transactionHash, + }; + + await this.store.setAMBMessage( + this.chainId, + ambMessage, + ); } diff --git a/src/collector/wormhole/wormhole-recovery.worker.ts b/src/collector/wormhole/wormhole-recovery.worker.ts index 8dff720..148b35b 100644 --- a/src/collector/wormhole/wormhole-recovery.worker.ts +++ b/src/collector/wormhole/wormhole-recovery.worker.ts @@ -7,7 +7,7 @@ import { } from '@wormhole-foundation/relayer-engine'; import { decodeWormholeMessage } from './wormhole.utils'; import { add0X } from 'src/common/utils'; -import { AmbPayload } from 'src/store/types/store.types'; +import { AMBMessage, AMBProof } from 'src/store/store.types'; import { ParsePayload } from 'src/payload/decode.payload'; import { IncentivizedMessageEscrow, @@ -19,6 +19,10 @@ import { fetchVAAs } from './api-utils'; const defaultAbiCoder = AbiCoder.defaultAbiCoder(); +interface RecoveredVAAData { + vaa: ParsedVaaWithBytes, + transactionHash: string, +} class WormholeRecoveryWorker { private readonly store: Store; @@ -37,7 +41,7 @@ class WormholeRecoveryWorker { this.chainId = this.config.chainId; - this.store = new Store(this.chainId); + this.store = new Store(); this.logger = this.initializeLogger( this.chainId, this.config.loggerOptions, @@ -92,7 +96,7 @@ class WormholeRecoveryWorker { this.config.stoppingBlock, ); - const vaas = await this.recoverVAAs( + const recoveredVAAs = await this.recoverVAAs( timestamps.startingTimestamp, timestamps.stoppingTimestamp, this.config.wormholeChainId, @@ -101,27 +105,29 @@ class WormholeRecoveryWorker { ); // Store VAAs oldest to newest - for (const [, parsedVAA] of Array.from(vaas).reverse()) { + for (const [, data] of Array.from(recoveredVAAs).reverse()) { try { - await this.processVAA(parsedVAA); + await this.processVAA(data); } catch (error) { this.logger.warn( - { emitterAddress: parsedVAA.emitterAddress, error }, + { emitterAddress: data.vaa.emitterAddress, error }, 'Failed to process recovered VAA', ); } } } - private async processVAA(vaa: ParsedVaaWithBytes): Promise { - await this.processVAAMessage(vaa); - await this.processVAAProof(vaa); + private async processVAA(recoveredVAAData: RecoveredVAAData): Promise { + await this.processVAAMessage(recoveredVAAData); + await this.processVAAProof(recoveredVAAData); } - private async processVAAMessage(vaa: ParsedVaaWithBytes): Promise { + private async processVAAMessage(recoveredVAAData: RecoveredVAAData): Promise { // The following effectively runs the same logic as the 'wormhole.service.ts' worker. When // recovering VAAs, both this and the 'wormhole.service.ts' are executed to prevent VAAs from // being missed in some edge cases (when recovering right before the latest blocks). + const vaa = recoveredVAAData.vaa; + const decodedWormholeMessage = decodeWormholeMessage( vaa.payload.toString('hex'), ); @@ -150,58 +156,89 @@ class WormholeRecoveryWorker { // Emitter address is a 32 bytes buffer: convert to hex string and keep the last 20 bytes const sender = '0x' + vaa.emitterAddress.toString('hex').slice(24); - await this.store.setAmb( - { - messageIdentifier: decodedWormholeMessage.messageIdentifier, - amb: 'wormhole', - sourceChain, - destinationChain, //TODO this should be the chainId and not the wormholeChainId - sourceEscrow: sender, - payload: decodedWormholeMessage.payload, - recoveryContext: vaa.sequence.toString(), - }, - vaa.hash.toString('hex'), - ); - // Decode payload const decodedPayload = ParsePayload(decodedWormholeMessage.payload); if (decodedPayload === undefined) { throw new Error('Could not decode VAA payload.'); } - // Set destination address for the bounty. - await this.store.registerDestinationAddress({ + //TODO the following contract call could fail. Set to 'undefined' and continue on that case? + //TODO cache the query + const toIncentivesAddress = await this.messageEscrowContract.implementationAddress( + decodedPayload?.sourceApplicationAddress, + defaultAbiCoder.encode( + ['uint256'], + [decodedWormholeMessage.destinationWormholeChainId], + ), + ); + + // TODO the following query could fail. Add a retry mechanism. + const transactionHash = recoveredVAAData.transactionHash; + const transactionReceipt = await this.provider.getTransactionReceipt(transactionHash); + + if (transactionReceipt == null) { + throw new Error( + `Failed to recover wormhole VAA: transaction receipt not found for the given hash (${transactionHash}).` + ); + } + + const ambMessage: AMBMessage = { messageIdentifier: decodedWormholeMessage.messageIdentifier, - destinationAddress: - //TODO the following contract call could fail - await this.messageEscrowContract.implementationAddress( - decodedPayload?.sourceApplicationAddress, - defaultAbiCoder.encode( - ['uint256'], - [decodedWormholeMessage.destinationWormholeChainId], - ), - ), - }); + + amb: 'wormhole', + fromChainId: sourceChain, + toChainId: destinationChain, + fromIncentivesAddress: sender, + toIncentivesAddress, + + incentivesPayload: decodedWormholeMessage.payload, + recoveryContext: vaa.sequence.toString(), + + // transactionBlockNumber: , //TODO add resolver and translate the block number + + transactionHash, + blockHash: transactionReceipt.blockHash, + blockNumber: transactionReceipt.blockNumber, + }; + + await this.store.setAMBMessage( + this.chainId, + ambMessage, + ); } - private async processVAAProof(vaa: ParsedVaaWithBytes): Promise { + private async processVAAProof(recoveredVAAData: RecoveredVAAData): Promise { + const vaa = recoveredVAAData.vaa; + const wormholeInfo = decodeWormholeMessage( add0X(vaa.payload.toString('hex')), ); + const sourceChain = this.config.wormholeChainIdMap.get( + vaa.emitterChain, + ); + if (sourceChain == undefined) { + throw new Error( + `Source chain id not found for the given wormhole chain id (${vaa.emitterChain})` + ) + } + const destinationChain = this.config.wormholeChainIdMap.get( wormholeInfo.destinationWormholeChainId, ); if (destinationChain == undefined) { throw new Error( - `Destination chain id not found for the given wormhole chain id (${vaa.emitterChain}`, + `Destination chain id not found for the given wormhole chain id (${wormholeInfo.destinationWormholeChainId}`, ); } - const ambPayload: AmbPayload = { + const ambPayload: AMBProof = { messageIdentifier: wormholeInfo.messageIdentifier, + amb: 'wormhole', - destinationChainId: destinationChain, + fromChainId: sourceChain, + toChainId: destinationChain, + message: add0X(vaa.bytes.toString('hex')), messageCtx: '0x', }; @@ -210,7 +247,7 @@ class WormholeRecoveryWorker { `Wormhole VAA found.`, ); - await this.store.submitProof(destinationChain, ambPayload); + await this.store.setAMBProof(destinationChain, ambPayload); } private async getTimestampsFromBlockNumbers( @@ -259,8 +296,8 @@ class WormholeRecoveryWorker { emitterAddress: string, pageSize = 1000, searchDelay = 1000, - ): Promise> { - const foundVAAs = new Map(); + ): Promise> { + const foundVAAs = new Map(); let pageIndex = 0; while (true) { @@ -288,7 +325,14 @@ class WormholeRecoveryWorker { const parsedVaa = parseVaaWithBytes(Buffer.from(vaa.vaa, 'base64')); - foundVAAs.set(vaa.sequence, parsedVaa); // Use 'Map' to avoid duplicates + // Use 'Map' to avoid duplicates + foundVAAs.set( + vaa.sequence, + { + vaa: parsedVaa, + transactionHash: '0x' + vaa.txHash, + } + ); } if (searchComplete) break; diff --git a/src/getter/getter.worker.ts b/src/getter/getter.worker.ts index 3503829..0fe5aee 100644 --- a/src/getter/getter.worker.ts +++ b/src/getter/getter.worker.ts @@ -8,6 +8,7 @@ import { JsonRpcProvider, Log, LogDescription } from 'ethers6'; import { BountyClaimedEvent, BountyIncreasedEvent, BountyPlacedEvent, IMessageEscrowEventsInterface, MessageDeliveredEvent } from 'src/contracts/IMessageEscrowEvents'; import { MonitorInterface, MonitorStatus } from 'src/monitor/monitor.interface'; import { STATUS_LOG_INTERVAL } from 'src/logger/logger.service'; +import { BountyClaimedEventDetails, BountyIncreasedEventDetails, BountyPlacedEventDetails, MessageDeliveredEventDetails } from 'src/store/store.types'; class GetterWorker { @@ -34,7 +35,7 @@ class GetterWorker { this.chainId = this.config.chainId; - this.store = new Store(this.chainId); + this.store = new Store(); this.provider = this.initializeProvider(this.config.rpc); this.logger = this.initializeLogger(this.chainId); @@ -289,16 +290,29 @@ class GetterWorker { const event = parsedLog.args as unknown as BountyPlacedEvent.OutputObject; const messageIdentifier = event.messageIdentifier; - const incentive = event.incentive; this.logger.info({ messageIdentifier }, `BountyPlaced event found.`); - await this.store.registerBountyPlaced({ - messageIdentifier, - incentive, - incentivesAddress: log.address, + const eventDetails: BountyPlacedEventDetails = { transactionHash: log.transactionHash, - }); + blockHash: log.blockHash, + blockNumber: log.blockNumber, + + fromChainId: this.chainId, + incentivesAddress: log.address, + + maxGasDelivery: event.incentive.maxGasDelivery, + maxGasAck: event.incentive.maxGasAck, + refundGasTo: event.incentive.refundGasTo, + priceOfDeliveryGas: event.incentive.priceOfDeliveryGas, + priceOfAckGas: event.incentive.priceOfAckGas, + targetDelta: event.incentive.targetDelta, + }; + + await this.store.setBountyPlaced( + messageIdentifier, + eventDetails, + ); }; private async handleBountyClaimedEvent( @@ -312,11 +326,16 @@ class GetterWorker { this.logger.info({ messageIdentifier }, `BountyClaimed event found.`); - await this.store.registerBountyClaimed({ - messageIdentifier, - incentivesAddress: log.address, + const eventDetails: BountyClaimedEventDetails = { transactionHash: log.transactionHash, - }); + blockHash: log.blockHash, + blockNumber: log.blockNumber, + }; + + await this.store.setBountyClaimed( + messageIdentifier, + eventDetails + ); }; private async handleMessageDeliveredEvent( @@ -330,11 +349,18 @@ class GetterWorker { this.logger.info({ messageIdentifier }, `MessageDelivered event found.`); - await this.store.registerMessageDelivered({ - messageIdentifier, - incentivesAddress: log.address, + const eventDetails: MessageDeliveredEventDetails = { transactionHash: log.transactionHash, - }); + blockHash: log.blockHash, + blockNumber: log.blockNumber, + + toChainId: this.chainId, + }; + + await this.store.setMessageDelivered( + messageIdentifier, + eventDetails, + ); }; private async handleBountyIncreasedEvent( @@ -348,13 +374,19 @@ class GetterWorker { this.logger.info({ messageIdentifier }, `BountyIncreased event found.`); - await this.store.registerBountyIncreased({ - messageIdentifier, + const eventDetails: BountyIncreasedEventDetails = { + transactionHash: log.transactionHash, + blockHash: log.blockHash, + blockNumber: log.blockNumber, + newDeliveryGasPrice: event.newDeliveryGasPrice, newAckGasPrice: event.newAckGasPrice, - incentivesAddress: log.address, - transactionHash: log.transactionHash, - }); + }; + + await this.store.setBountyIncreased( + messageIdentifier, + eventDetails, + ); }; diff --git a/src/store/persister/persister.worker.ts b/src/store/persister/persister.worker.ts index 82afaab..9ec42c3 100644 --- a/src/store/persister/persister.worker.ts +++ b/src/store/persister/persister.worker.ts @@ -4,7 +4,7 @@ import { workerData } from 'worker_threads'; import { NodePgDatabase, drizzle } from 'drizzle-orm/node-postgres'; import { migrate } from 'drizzle-orm/node-postgres/migrator'; import { Client } from 'pg'; -import { AmbMessage, BountyJson } from '../types/store.types'; +import { AMBMessage, KeyActionMessage, RelayStateJSON } from '../store.types'; import { bounties, transactions, @@ -13,10 +13,6 @@ import { import { bountyFromJson } from '../postgres/postgres.transformer'; import { and, eq } from 'drizzle-orm'; -type StoreUpdate = { - key: string; - action: 'set' | 'del'; -}; const REDIS_QUEUE_KEY = 'relayer:presister:queue'; @@ -76,16 +72,16 @@ class PersisterWorker { } } - async queueGet(): Promise { + async queueGet(): Promise { const queue = await this.store.redis.get(REDIS_QUEUE_KEY); if (queue) return JSON.parse(queue); return undefined; } - async queueShift(): Promise { + async queueShift(): Promise { const queue = await this.store.redis.get(REDIS_QUEUE_KEY); if (queue) { - const parsedQueue: StoreUpdate[] = JSON.parse(queue); + const parsedQueue: KeyActionMessage[] = JSON.parse(queue); const returnElement = parsedQueue.shift(); await this.store.redis.set(REDIS_QUEUE_KEY, JSON.stringify(parsedQueue)); @@ -95,10 +91,10 @@ class PersisterWorker { return undefined; } - async queuePush(message: StoreUpdate) { + async queuePush(message: KeyActionMessage) { const queue = await this.store.redis.get(REDIS_QUEUE_KEY); if (queue) { - const parsedQueue: StoreUpdate[] = JSON.parse(queue); + const parsedQueue: KeyActionMessage[] = JSON.parse(queue); parsedQueue.push(message); return this.store.redis.set(REDIS_QUEUE_KEY, JSON.stringify(parsedQueue)); @@ -133,7 +129,7 @@ class PersisterWorker { await this.store.on('key', (event: any) => { //TODO verify event format - const message = event as StoreUpdate; + const message = event as KeyActionMessage; void this.queuePush(message); }); // Listen for proofs. Notice that proofs aren't submitted so we need to listen seperately. @@ -149,20 +145,19 @@ class PersisterWorker { // I don't know why, but without the function factory, 'this' is empty and we cannot use this.store or this.logger. async examineKey(key: string) { const keyKeys = key.split(':'); - if (keyKeys.includes(Store.bountyMidfix)) { + if (keyKeys.includes(Store.RELAY_STATE_KEY_PREFIX)) { this.logger.debug(`${key}, bounty`); const value: string | null = await this.store.get(key); if (value === null) return; - const parsedValue: BountyJson = JSON.parse(value); + const parsedValue: RelayStateJSON = JSON.parse(value); // Get or set all transactions. - const { - fromChainId, - toChainId, - submitTransactionHash, - execTransactionHash, - ackTransactionHash, - } = parsedValue; + const fromChainId = parsedValue.bountyPlacedEvent?.fromChainId; + const toChainId = parsedValue.messageDeliveredEvent?.toChainId; + const submitTransactionHash = parsedValue?.bountyPlacedEvent?.transactionHash; + const execTransactionHash = parsedValue?.messageDeliveredEvent?.transactionHash; + const ackTransactionHash = parsedValue?.bountyClaimedEvent?.transactionHash; + let submitTransactionId: number | undefined, execTransactionId: number | undefined, ackTransactionId: number | undefined; @@ -270,11 +265,11 @@ class PersisterWorker { eq(bounties.bountyIdentifier, sqlReadyBounty.bountyIdentifier), ); } - } else if (keyKeys.includes(Store.ambMidfix)) { + } else if (keyKeys.includes(Store.AMB_MESSAGE_KEY_PREFIX)) { this.logger.debug(`${key}, amb`); const value: string | null = await this.store.get(key); if (value === null) return; - const parsedValue: AmbMessage = JSON.parse(value); + const parsedValue: AMBMessage = JSON.parse(value); let bountyId: number; // Get or set an associated bounty. @@ -291,20 +286,30 @@ class PersisterWorker { bountyId = ( await this.db .insert(bounties) - .values({ bountyIdentifier: parsedValue.messageIdentifier }) + .values({ + bountyIdentifier: parsedValue.messageIdentifier, + destinationAddress: parsedValue.toIncentivesAddress, + }) .returning({ id: bounties.id }) )[0]!.id; } else { // Set the bountyId based on the selected. bountyId = bountiesSelected[0]!.id; + // Set the `destinationAddress` within the existing record + await this.db + .update(bounties) + .set({ destinationAddress: parsedValue.toIncentivesAddress }) + .where( + eq(bounties.bountyIdentifier, parsedValue.messageIdentifier), + ); } const sqlReadyBounty: typeof ambPayloads.$inferInsert = { bountyId, amb: parsedValue.amb, - sourceChain: parsedValue.sourceChain, - destinationChain: parsedValue.destinationChain, - payload: parsedValue.payload, + sourceChain: parsedValue.fromChainId, + destinationChain: parsedValue.toChainId, + payload: parsedValue.incentivesPayload, recoveryContext: parsedValue.recoveryContext, }; @@ -327,7 +332,7 @@ class PersisterWorker { this.logger.debug(`Inserting ${key} as amb`); return this.db.insert(ambPayloads).values(sqlReadyBounty); } - } else if (keyKeys.includes(Store.proofMidfix)) { + } else if (keyKeys.includes(Store.AMB_PROOF_KEY_PREFIX)) { this.logger.debug(`${key}, proof`); } return undefined; diff --git a/src/store/postgres/postgres.transformer.ts b/src/store/postgres/postgres.transformer.ts index d4dde18..92194e3 100644 --- a/src/store/postgres/postgres.transformer.ts +++ b/src/store/postgres/postgres.transformer.ts @@ -1,8 +1,8 @@ -import { BountyJson } from '../types/store.types'; +import { RelayStateJSON } from '../store.types'; import { bounties } from './postgres.schema'; export function bountyFromJson( - bounty: BountyJson, + bounty: RelayStateJSON, ): typeof bounties.$inferInsert & { submitTransactionHash?: string; execTransactionHash?: string; @@ -10,17 +10,22 @@ export function bountyFromJson( } { return { bountyIdentifier: bounty.messageIdentifier, - fromChainId: bounty.fromChainId, - toChainId: bounty.toChainId, - maxGasDelivery: bounty.maxGasDelivery?.toString(), - maxGasAck: bounty.maxGasAck?.toString(), - refundGasTo: bounty.refundGasTo, - priceOfDeliveryGas: bounty.priceOfDeliveryGas, - priceOfAckGas: bounty.priceOfAckGas, - targetDelta: bounty.targetDelta, + + fromChainId: bounty.bountyPlacedEvent?.fromChainId, + toChainId: bounty.messageDeliveredEvent?.toChainId, + + maxGasDelivery: bounty.bountyPlacedEvent?.maxGasDelivery?.toString(), + maxGasAck: bounty.bountyPlacedEvent?.maxGasAck?.toString(), + refundGasTo: bounty.bountyPlacedEvent?.refundGasTo, + priceOfDeliveryGas: bounty.bountyIncreasedEvent?.newDeliveryGasPrice + ?? bounty.bountyPlacedEvent?.priceOfDeliveryGas, + priceOfAckGas: bounty.bountyIncreasedEvent?.newAckGasPrice + ?? bounty.bountyPlacedEvent?.priceOfAckGas, + targetDelta: bounty.bountyPlacedEvent?.targetDelta, + bountyStatus: bounty.status, - sourceAddress: bounty.sourceAddress, - destinationAddress: bounty.destinationAddress, + + sourceAddress: bounty.bountyPlacedEvent?.incentivesAddress, }; } diff --git a/src/store/store.controller.ts b/src/store/store.controller.ts index 6d2d8bc..7f18db4 100644 --- a/src/store/store.controller.ts +++ b/src/store/store.controller.ts @@ -1,14 +1,19 @@ import { Body, Controller, Get, Post, Query } from '@nestjs/common'; import { LoggerService } from 'src/logger/logger.service'; import { Store } from './store.lib'; -import { PrioritiseMessage } from './types/store.types'; +import { PrioritiseMessage } from './store.types'; @Controller() export class StoreController { + + private readonly store: Store; + constructor( private readonly loggerService: LoggerService, - ) {} + ) { + this.store = new Store(); + } @Get('getAMBMessages') async getAMBMessages(@Query() query: any): Promise { @@ -19,8 +24,7 @@ export class StoreController { return undefined; //TODO return error } - const store = new Store(chainId); - const amb = await store.getAMBsByTxHash(chainId, txHash); + const amb = await this.store.getAMBMessagesByTransactionHash(chainId, txHash); if (amb != null) return JSON.stringify(amb); } @@ -38,8 +42,8 @@ export class StoreController { `Message prioritisation requested.` ) - const store = new Store(); - await store.setAmbPriority( + await this.store.setAMBMessagePriority( + body.sourceChainId, body.messageIdentifier, true, ); diff --git a/src/store/store.gateway.ts b/src/store/store.gateway.ts index 3a6257b..6b26df3 100644 --- a/src/store/store.gateway.ts +++ b/src/store/store.gateway.ts @@ -1,7 +1,7 @@ import { OnGatewayInit, SubscribeMessage, WebSocketGateway, WsResponse } from "@nestjs/websockets"; import { Observable, Subject } from 'rxjs'; import { LoggerService } from "src/logger/logger.service"; -import { AmbMessage } from "./types/store.types"; +import { AMBMessage } from "./store.types"; import { Store } from "./store.lib"; const newAMBMessageEventName = 'ambMessage'; @@ -12,7 +12,7 @@ export class StoreGateway implements OnGatewayInit { constructor(private readonly loggerService: LoggerService) {} private store = new Store(); - private onAMBMessageObservable = new Subject>(); + private onAMBMessageObservable = new Subject>(); async afterInit() { this.loggerService.info("Store gateway initialized."); @@ -20,7 +20,7 @@ export class StoreGateway implements OnGatewayInit { } @SubscribeMessage(newAMBMessageEventName) - subscribeToAMBMessages(): Observable> { + subscribeToAMBMessages(): Observable> { this.loggerService.info("Client subscribed to new AMB messages.") return this.onAMBMessageObservable; } @@ -28,10 +28,11 @@ export class StoreGateway implements OnGatewayInit { private async listenForNewAMBMessages(): Promise { this.loggerService.info(`Listening for new AMB messages to broadcast.`); - await this.store.on(Store.newAMBChannel, (event: any) => { + const onAMBMessageChannelPattern = Store.getOnAMBMessageChannel('*'); - //TODO verify event format - const message = event as AmbMessage; + await this.store.onPattern(onAMBMessageChannelPattern, (event: any) => { + + const message = event as AMBMessage; this.onAMBMessageObservable.next({ event: newAMBMessageEventName, data: message diff --git a/src/store/store.lib.ts b/src/store/store.lib.ts index a285bdc..4577e26 100644 --- a/src/store/store.lib.ts +++ b/src/store/store.lib.ts @@ -1,127 +1,136 @@ import { Redis } from 'ioredis'; -import { BountyStatus } from 'src/store/types/bounty.enum'; import { - AmbMessage, - AmbPayload, - Bounty, - BountyJson, -} from 'src/store/types/store.types'; + AMBMessage, + AMBProof, + BountyClaimedEventDetails, + BountyIncreasedEventDetails, + BountyPlacedEventDetails, + RelayState, + RelayStateJSON, + RelayStatus, + MessageDeliveredEventDetails, + KeyActionMessage, + AMBMessageJSON, + AMBProofJSON, +} from 'src/store/store.types'; // Monkey patch BigInt. https://github.com/GoogleChromeLabs/jsbi/issues/30#issuecomment-1006086291 (BigInt.prototype as any).toJSON = function () { return this.toString(); }; + +// Constants const DEFAULT_REDIS_PORT = 6379; const DEFAULT_REDIS_DB_INDEX = 4; -//---------- STORE LAYOUT ----------// -// The redis store is used for 2 things: -// 1. Storing bounty information. -// 2. pub/sub for communication between workers. - -// For the storage, bounties are stored against their messageIdentifier. - -// For pub/sub, below are a list of the channels being used: -// 'submit-': { messageIdentifier, destinationChain, message, messageCtx, priority? } -// 'amb': { messageIdentifier, destinationChain, payload } -// 'key': { key, action} - -// Below is a list of general todos on this library. -// TODO: add chainId to the index. -// TODO: Carify storage types: Move Bounty type here? -// TODO: Fix cases where bounty doesn't exist. - -//---------- UNIQUE KEYS ----------// -// Generalised Incentives uses a message identifier: -// return keccak256( -// abi.encodePacked( -// bytes32(block.number), -// chainId(), <-- Assume to be ~unique for an AMB. -// destinationIdentifier, -// message -// ) -// ); -// Assume that only 1 AMB exist -// The result is that the message identifier is unique. -// In a single block, there cannot be 2 equal message identifiers since there is a check to protect -// against 2 equal active message identifiers. Between 2 different blocks they aren't the same since -// hash contains the block number. - -// Now assume multiple exists. None of the previous findings apply since if any -// 2 AMBs share similar chain ids structures you could compute an equal value. - -// As a result, we need to add the AMB id to the message identifier. -// TODO: Figure out how to do. + +// ! IMPORTANT +// ! The 'RelayState' information (i.e. the bounty/message state information gathered from the +// ! GeneralisedIncentives events) is stored just under the `messageIdentifier` that is present on +// ! the queried events. This `messageIdentifier` is generated by the GeneralisedIncentives +// ! contracts; nothing prevents a malicious contract from crafting an identifier that clashes with +// ! that of another GenerlisedIncentives contract. +// ! +// ! Ideally, the 'RelayState' data storage should be also indexed by the source chain id and the +// ! emitting contract address. However, given the limited information contained by the +// ! GeneralisedIncentives events, this is quite complex to achieve. +// ! +// ! For the correct operation of this store it is very important for the `messageIdentifier`s +// ! across the GeneralisedIncentives implementations not to collide. This will always be the +// ! case when using the unaltered GeneralisedIncentives contracts, each deployed with a unique +// ! `uniqueSourceIdentifier`. + + +// TODO find a way to add the chainId to the RelayState index? +// TODO add the 'amb' to the RelayState index? + export class Store { + + // Redis Keys + static readonly KEY_SEPARATOR: string = ':'; + + static readonly RELAY_STATE_KEY_PREFIX: string = 'relay_state'; + static readonly AMB_MESSAGE_KEY_PREFIX: string = 'amb_message'; + static readonly AMB_PROOF_KEY_PREFIX: string = 'amb_proof'; + static readonly AMB_TRANSACTION_HASH_MAP_KEY_PREFIX: string = 'amb_transaction_hash_map'; + + // Redis Channels + static readonly ON_KEY_CHANGE_CHANNEL: string = 'on_key_change_channel'; + static readonly ON_AMB_MESSAGE_CHANNEL_PREFIX: string = 'on_amb_message_channel'; + static readonly ON_AMB_PROOF_CHANNEL_PREFIX: string = 'on_amb_proof_channel'; + + readonly redis: Redis; + readonly redisSubscriptions: Redis; readonly redisHost: string | undefined; readonly redisPort: number; readonly redisDBIndex: number; - // When a redis connection is used to listen for subscriptions, it cannot be - // used for anything except to modify the subscription set which is being listened - // to. As a result, we need a dedicated connection if we ever decide to listen to - // subscriptions. - redisSubscriptions: Redis | undefined; - - static readonly relayerStorePrefix: string = 'relayer'; - static readonly bountyMidfix: string = 'bounty'; - static readonly ambMidfix: string = 'amb'; - static readonly ambPayloadMidfix: string = 'ambPayload'; - static readonly hashAmbMapMidfix: string = 'hashAmbMap'; - static readonly proofMidfix: string = 'proof'; - static readonly wordConnecter: string = ':'; - static readonly destinationAddressPostfix: string = 'destinationAddress'; + private readonly redisConnections: Redis[] = []; - static readonly newAMBChannel: string = 'newAMBChannel'; - readonly chainId: string | null; - - // If chainId is set to null, this should only be used for reading. - constructor(chainId: string | null = null) { - this.chainId = chainId; + constructor() { this.redisHost = this.loadRedisHost(); this.redisPort = this.loadRedisPort(); this.redisDBIndex = this.loadRedisDBIndex(); - this.redis = new Redis(this.redisPort, { - db: this.redisDBIndex, - host: this.redisHost, - }); + // Once a 'redis' connection is used for subscriptions, it may not be used to manage the + // redis storage. Keep 2 distinct redis objects for each purpose. + this.redis = this.getRedisConnection(); + this.redisSubscriptions = this.getRedisConnection(); } + + + // Initialization and state/redis management utils + // ******************************************************************************************** + private loadRedisHost(): string | undefined { return process.env['REDIS_HOST']; } private loadRedisPort(): number { - return process.env['REDIS_PORT'] ? parseInt(process.env['REDIS_PORT']) : DEFAULT_REDIS_PORT; + return process.env['REDIS_PORT'] + ? parseInt(process.env['REDIS_PORT']) + : DEFAULT_REDIS_PORT; } private loadRedisDBIndex(): number { - return process.env['REDIS_DB_INDEX'] ? parseInt(process.env['REDIS_DB_INDEX']) : DEFAULT_REDIS_DB_INDEX; + return process.env['REDIS_DB_INDEX'] + ? parseInt(process.env['REDIS_DB_INDEX']) + : DEFAULT_REDIS_DB_INDEX; + } + + getRedisConnection(): Redis { + const redis = new Redis(this.redisPort, { + db: this.redisDBIndex, + host: this.redisHost, + }); + + this.redisConnections.push(redis); + + return redis; } async quit(): Promise { - await this.redis.quit(); + const quitPromises = this.redisConnections.map( + (redis) => redis.quit() + ); + + await Promise.all(quitPromises); } - // ----- Translation ----- + - async scan(callback: (key: string) => void) { - const stream = this.redis.scanStream({ - match: `${Store.relayerStorePrefix}:*`, - }); + // Key management helpers + // ******************************************************************************************** - stream.on('data', (keys) => { - for (const key of keys) { - callback(key); - } - }); + static formatKey(...components: string[]): string { + return components.join(Store.KEY_SEPARATOR); } async get(key: string) { @@ -129,529 +138,476 @@ export class Store { } async set(key: string, value: string) { - // We want to notify a potential subscribed that there has been a change to this key. - // Lets set the key first. await this.redis.set(key, value); - // Then post that message. - await this.postMessage('key', { key, action: 'set' }); + await this.postMessage( + Store.ON_KEY_CHANGE_CHANNEL, + { key, action: 'set' } + ); } async del(key: string) { await this.redis.del(key); - await this.postMessage('key', { key, action: 'del' }); - } - - // ----- Subscriptions ------ - - /** - * @notice Use this function to get a redis connection for any subscriptions. - * This is because when a SUBSCRIBE calls goes out to redis, the connection can - * only be used to modify the subscriptions or receive them. As a result, any - * redis.get or redis.set or redis.del does not work. - */ - getOrOpenSubscription(): Redis { - if (!this.redisSubscriptions) { - this.redisSubscriptions = new Redis(this.redisPort, { - db: this.redisDBIndex, - host: this.redisHost, - }); - } - return this.redisSubscriptions; + await this.postMessage( + Store.ON_KEY_CHANGE_CHANNEL, + { key, action: 'del' } + ); } + - static getChannel(channel: string, describer: string): string { - return Store.combineString(channel, describer); - } - static combineString(...vals: string[]): string { - return vals.join(Store.wordConnecter); + // Channel helpers + // ******************************************************************************************** + + static getChannel(...components: string[]): string { + return components.join(Store.KEY_SEPARATOR); } - async postMessage(channel: string, payload: { [key: string]: any }) { + async postMessage( + channel: string, + payload: T, + ) { return this.redis.publish( - Store.combineString(Store.relayerStorePrefix, channel), + channel, JSON.stringify(payload), ); } async on( channel: string, - callback: (payload: { [key: string]: any }) => void, + callback: (payload: Record) => void, ) { - const redisSubscriptions = this.getOrOpenSubscription(); - // Subscribe to the channel so that we get messages. - const channelWithprefix = Store.combineString( - Store.relayerStorePrefix, - channel, - ); - await redisSubscriptions.subscribe(channelWithprefix); - // Set the callback when we receive messages function. - redisSubscriptions.on('message', (redis_channel, redis_message) => { - if (redis_channel === channelWithprefix) - callback(JSON.parse(redis_message)); + await this.redisSubscriptions.subscribe(channel); + + this.redisSubscriptions.on('message', (callbackChannel, callbackMessage) => { + if (callbackChannel === channel) { + callback(JSON.parse(callbackMessage)); + } + }); + } + + async onPattern( + pattern: string, + callback: (payload: Record) => void, + ) { + await this.redisSubscriptions.psubscribe(pattern); + + this.redisSubscriptions.on('pmessage', (callbackPattern, _, callbackMessage) => { + if (callbackPattern === pattern) { + callback(JSON.parse(callbackMessage)); + } }); } - // ----- Bounties ------ - //TODO also filter by chain? - async getBounty(messageIdentifier: string): Promise { - const query: string | null = await this.redis.get( - Store.combineString( - Store.relayerStorePrefix, - Store.bountyMidfix, - messageIdentifier, - ), + + // Relay state utils + // ******************************************************************************************** + + static getRelayStateKey(messageIdentifier: string): string { + return Store.formatKey( + Store.RELAY_STATE_KEY_PREFIX, + messageIdentifier.toLowerCase(), ); - const bounty: Bounty | null = - query === null ? undefined : JSON.parse(query); - if ( - bounty != null && - bounty.priceOfDeliveryGas && - bounty.priceOfAckGas && - bounty.targetDelta - ) { - bounty.priceOfDeliveryGas = BigInt(bounty.priceOfDeliveryGas); - bounty.priceOfAckGas = BigInt(bounty.priceOfAckGas); - bounty.targetDelta = BigInt(bounty.targetDelta); - } else { - // TODO: handle this case better. + } + + + async getRelayStateByKey(key: string): Promise { + + const data = await this.redis.get(key); + + const rawState = data != null + ? JSON.parse(data) + : null; + + if (rawState == null) { return null; } - if (bounty.deliveryGasCost != undefined) { - bounty.deliveryGasCost = BigInt(bounty.deliveryGasCost); + // Recover format of types lost on JSON.stringify (bigints). + return { + ...rawState, + + bountyPlacedEvent: rawState.bountyPlacedEvent != undefined + ? { + ...rawState.bountyPlacedEvent, + maxGasDelivery: BigInt(rawState.bountyPlacedEvent.maxGasDelivery), + maxGasAck: BigInt(rawState.bountyPlacedEvent.maxGasAck), + priceOfDeliveryGas: BigInt(rawState.bountyPlacedEvent.priceOfDeliveryGas), + priceOfAckGas: BigInt(rawState.bountyPlacedEvent.priceOfAckGas), + targetDelta: BigInt(rawState.bountyPlacedEvent.targetDelta), + } + : undefined, + + bountyIncreasedEvent: rawState.bountyIncreasedEvent != undefined + ? { + ...rawState.bountyIncreasedEvent, + newDeliveryGasPrice: BigInt(rawState.bountyIncreasedEvent.newDeliveryGasPrice), + newAckGasPrice: BigInt(rawState.bountyIncreasedEvent.newAckGasPrice), + } + : undefined, + + deliveryGasCost: rawState.deliveryGasCost != undefined + ? BigInt(rawState.deliveryGasCost) + : undefined, + }; + } + + async getRelayState(messageIdentifier: string): Promise { + const key = Store.getRelayStateKey(messageIdentifier); + return this.getRelayStateByKey(key); + } + + + // Update the saved RelayState using the existing and incoming state information. + private async updateRelayState(incomingState: RelayState): Promise { + + const key = Store.getRelayStateKey(incomingState.messageIdentifier); + + const data = await this.redis.get(key); + const currentState: RelayStateJSON | null = data != null + ? JSON.parse(data) + : null; + + const overridingState = currentState != null; + const newState = overridingState ? currentState : incomingState; + + if (overridingState) { + newState.bountyPlacedEvent = incomingState.bountyPlacedEvent + ?? currentState.bountyPlacedEvent; + newState.messageDeliveredEvent = incomingState.messageDeliveredEvent + ?? currentState.messageDeliveredEvent; + newState.bountyClaimedEvent = incomingState.bountyClaimedEvent + ?? currentState.bountyClaimedEvent; + + // Only override the 'BountyIncreased' event data if the new prices are higher than the + // currently registered ones (events may not be processed in order). + if (currentState.bountyIncreasedEvent && incomingState.bountyIncreasedEvent) { + const currentIncreaseEvent = currentState.bountyIncreasedEvent; + const incomingIncreaseEvent = incomingState.bountyIncreasedEvent; + + // NOTE: if either field increases, it is guaranteed that the other field will not + // have decreased. + if ( + incomingIncreaseEvent.newDeliveryGasPrice > BigInt(currentIncreaseEvent.newDeliveryGasPrice) + || incomingIncreaseEvent.newAckGasPrice > BigInt(currentIncreaseEvent.newAckGasPrice) + ) { + newState.bountyIncreasedEvent = incomingState.bountyIncreasedEvent; + } else { + newState.bountyIncreasedEvent = currentState.bountyIncreasedEvent; + } + } + else { + newState.bountyIncreasedEvent = currentState.bountyIncreasedEvent + ?? incomingState.bountyIncreasedEvent; + } + + newState.deliveryGasCost = incomingState.deliveryGasCost + ?? currentState.deliveryGasCost; } - return bounty; - } - - /** - * @dev This is generally assumed to be the first time that a bounty is ever seen. - * However, we also wanna be able to run the relayer in a way where registerBountyPlaced might be called - * AFTER it has already been stored. (say the relayer is rerun from scratch on an already populated redis store) - */ - async registerBountyPlaced(event: { - messageIdentifier: string; - incentive: any; - incentivesAddress: string; - transactionHash: string; - }) { - const chainId = this.chainId; - if (chainId === null) - throw new Error('ChainId is not set: This connection is readonly'); - const messageIdentifier = event.messageIdentifier; - const incentive = event.incentive; - - //TODO after the ethers 6 upgrade `maxGasDelivery` and `maxGasAck` are now a bigint. The rest of the code should be refactored/reviewed to work with this change (including persister/explorer). - let bounty: Bounty = { - messageIdentifier: messageIdentifier, - fromChainId: chainId, - maxGasDelivery: Number(incentive.maxGasDelivery), - maxGasAck: Number(incentive.maxGasAck), - refundGasTo: incentive.refundGasTo, - priceOfDeliveryGas: incentive.priceOfDeliveryGas, - priceOfAckGas: incentive.priceOfAckGas, - targetDelta: incentive.targetDelta, - status: BountyStatus.BountyPlaced, - sourceAddress: event.incentivesAddress, - finalised: false, - submitTransactionHash: event.transactionHash, + if (newState.bountyClaimedEvent) { + newState.status = RelayStatus.BountyClaimed; + } else if (newState.messageDeliveredEvent) { + newState.status = RelayStatus.MessageDelivered; + } else { + newState.status = RelayStatus.BountyPlaced; + } + + await this.set(key, JSON.stringify(newState)); + } + + + async setBountyPlaced( + messageIdentifier: string, + bountyPlacedEvent: BountyPlacedEventDetails, + ) { + const relayState: RelayState = { + status: RelayStatus.BountyPlaced, + messageIdentifier, + bountyPlacedEvent, }; - const key = Store.combineString( - Store.relayerStorePrefix, - Store.bountyMidfix, + await this.updateRelayState(relayState); + } + + async setMessageDelivered( + messageIdentifier: string, + messageDeliveredEvent: MessageDeliveredEventDetails, + ) { + const relayState: RelayState = { + status: RelayStatus.MessageDelivered, messageIdentifier, - ); + messageDeliveredEvent, + }; - // Check if there exists an object already here. - const existingValue = await this.get(key); - if (existingValue) { - // There are 2 ways for there to already be a key here. - // 1. The key was set by another event on another chain because the origin chain is too slow / we missed the event. - // 2. We are going over blocks again and hit this event. Either way, we should set the key again but not modify anything which could have - // been entered or modified. - // As a result, fill out the bounty but override the bounty information by anything which might already be present. - bounty = { - ...bounty, // Init the dictionary with the bounty. - ...JSON.parse(existingValue), // Then overwrite with anything that is already stored. - }; - } - await this.set(key, JSON.stringify(bounty)); - } - - /** - * Register the destination address of a bounty. - */ - async registerDestinationAddress(event: { - messageIdentifier: string; - destinationAddress: string; - }) { - const chainId = this.chainId; - if (chainId === null) - throw new Error('ChainId is not set: This connection is readonly'); - const messageIdentifier = event.messageIdentifier; - - // Lets get the bounty. - const key = Store.combineString( - Store.relayerStorePrefix, - Store.bountyMidfix, + await this.updateRelayState(relayState); + } + + async setBountyClaimed( + messageIdentifier: string, + bountyClaimedEvent: BountyClaimedEventDetails, + ) { + const relayState: RelayState = { + status: RelayStatus.MessageDelivered, messageIdentifier, - Store.destinationAddressPostfix, - ); + bountyClaimedEvent, + }; - const bounty = { + await this.updateRelayState(relayState); + } + + async setBountyIncreased( + messageIdentifier: string, + bountyIncreasedEvent: BountyIncreasedEventDetails, + ) { + const relayState: RelayState = { + status: RelayStatus.MessageDelivered, messageIdentifier, - destinationAddress: event.destinationAddress, + bountyIncreasedEvent, }; - // We can set this value now. - return this.set(key, JSON.stringify(bounty)); - } - - /** - * Register how much gas was used to delivery the message associated with the Bounty. - */ - async registerDeliveryCost(event: { - messageIdentifier: string; - deliveryGasCost: bigint; - }): Promise { - const chainId = this.chainId; - if (chainId === null) - throw new Error('ChainId is not set: This connection is readonly'); - - const messageIdentifier = event.messageIdentifier; - - // Get the bounty - const key = Store.combineString( - Store.relayerStorePrefix, - Store.bountyMidfix, + + await this.updateRelayState(relayState); + } + + async setDeliveryCost( + messageIdentifier: string, + deliveryGasCost: bigint, + ): Promise { + const relayState: RelayState = { + status: RelayStatus.MessageDelivered, messageIdentifier, + deliveryGasCost, + }; + + await this.updateRelayState(relayState); + } + + + + // AMB data utils + // ******************************************************************************************** + + // ! NOTE: AMB messages are stored under the source chain id, whereas AMB proofs are stored + // ! under the destination chain id. + + // ! Filter by `chainId` to prevent message deliveries/acks from overwriting each other. + static getAMBMessageKey( + chainId: string, + messageIdentifier: string + ): string { + return Store.formatKey( + Store.AMB_MESSAGE_KEY_PREFIX, + chainId.toLowerCase(), + messageIdentifier.toLowerCase(), ); - const existingValue = await this.redis.get(key); - if (!existingValue) { - return; //TODO This case should never be reached. Add log. - } + } - // Update the bounty information - const bounty: BountyJson = JSON.parse(existingValue); - // It's fine to override the 'BountyJson' type to 'Bounty', as the newly added information - // will be converted on JSON.stringify() (i.e. bigint => string). - (bounty as unknown as Bounty).deliveryGasCost = event.deliveryGasCost; - await this.set(key, JSON.stringify(bounty)); - } - - /** - * @dev This is generally assumed to be the first time the event is seen and the second time that a bounty is seen. - * However, we also wanna be able to run the relayer in a way where the event is seen for the second time AND/OR - * it is the first time the bounty has been seen. - */ - async registerMessageDelivered(event: { - messageIdentifier: string; - incentivesAddress: string; - transactionHash: string; - }) { - const chainId = this.chainId; - if (chainId === null) - throw new Error('ChainId is not set: This connection is readonly'); - const messageIdentifier = event.messageIdentifier; - - // Lets get the bounty. - const key = Store.combineString( - Store.relayerStorePrefix, - Store.bountyMidfix, - messageIdentifier, + // ! Filter by `chainId` to prevent message deliveries/acks from overwriting each other. + static getAMBProofKey( + chainId: string, + messageIdentifier: string + ): string { + return Store.formatKey( + Store.AMB_PROOF_KEY_PREFIX, + chainId.toLowerCase(), + messageIdentifier.toLowerCase(), ); - const existingValue = await this.redis.get(key); - if (!existingValue) { - // Then we need to create some kind of baseline with the information we know. - const bounty = { - messageIdentifier: messageIdentifier, // we know the ID. The ID isn't going to change. - status: BountyStatus.MessageDelivered, // Well, we know the the message has now been delivered. - execTransactionHash: event.transactionHash, - toChainId: this.chainId, - }; - // We can set this value now. - return this.set(key, JSON.stringify(bounty)); - } - // Okay, we know a bounty exists at this value. Lets try to update it without destorying any information. - const bountyAsRead: BountyJson = JSON.parse(existingValue); - const bounty = { - destinationAddress: event.incentivesAddress, - ...bountyAsRead, - status: Math.max(bountyAsRead.status, BountyStatus.MessageDelivered), - execTransactionHash: event.transactionHash, - toChainId: this.chainId, - }; - await this.set(key, JSON.stringify(bounty)); - } - - /** - * @dev This is generally assumed to be the first time the event is seen and the third time that a bounty is seen. - * However, we also wanna be able to run the relayer in a way where the event is seen for the second time AND/OR - * it is the first time the bounty has been seen. - */ - async registerBountyClaimed(event: { - messageIdentifier: string; - incentivesAddress: string; - transactionHash: string; - }) { - const chainId = this.chainId; - if (chainId === null) - throw new Error('ChainId is not set: This connection is readonly'); - const messageIdentifier = event.messageIdentifier; - - // Lets get the bounty. - const key = Store.combineString( - Store.relayerStorePrefix, - Store.bountyMidfix, - messageIdentifier, + } + + // Key to be used to store any additional information required by the AMB-specific collector. + static getAdditionalAMBDataKey( + amb: string, + key: string + ): string { + return Store.formatKey( + Store.AMB_PROOF_KEY_PREFIX, + amb, + key, ); - const existingValue = await this.redis.get(key); - if (!existingValue) { - // Then we need to create some kind of baseline with the information we know. - const bounty = { - messageIdentifier: messageIdentifier, // we know the ID. The ID isn't going to change. - status: BountyStatus.BountyClaimed, // Well, we know the the message has now been delivered. - ackTransactionHash: event.transactionHash, - fromChainId: this.chainId, - sourceAddress: event.incentivesAddress, - }; - // We can set this value now. - return this.set(key, JSON.stringify(bounty)); - } - // Okay, we know a bounty exists at this value. Lets try to update it without destorying any information. - const bountyAsRead: BountyJson = JSON.parse(existingValue); - const bounty = { - sourceAddress: event.incentivesAddress, - ...bountyAsRead, - status: Math.max(BountyStatus.BountyClaimed, bountyAsRead.status), - ackTransactionHash: event.transactionHash, - fromChainId: this.chainId, - }; - await this.set(key, JSON.stringify(bounty)); - } - - /** - * @dev This is STRICTLY assumed to be the second time that a bounty is seen. - * This function cannot handle the case where it is the first time that the bounty is seen. - */ - async registerBountyIncreased(event: { - messageIdentifier: string; - newDeliveryGasPrice: bigint; - newAckGasPrice: bigint; - incentivesAddress: string; - transactionHash: string; - }) { - const chainId = this.chainId; - if (chainId === null) - throw new Error('ChainId is not set: This connection is readonly'); - const messageIdentifier = event.messageIdentifier; - const newDeliveryGasPrice: bigint = event.newDeliveryGasPrice; - const newAckGasPrice: bigint = event.newAckGasPrice; - - // Lets get the bounty. - const key = Store.combineString( - Store.relayerStorePrefix, - Store.bountyMidfix, - messageIdentifier, + } + + static getAMBMessagesByTransactionHashKey( + chainId: string, + txHash: string, + ): string { + return Store.formatKey( + Store.AMB_TRANSACTION_HASH_MAP_KEY_PREFIX, + chainId.toLowerCase(), + txHash.toLowerCase(), ); - const existingValue = await this.redis.get(key); - if (!existingValue) { - // Then we need to create some kind of baseline with the information we know. - const bounty = { - messageIdentifier: messageIdentifier, // we know the ID. The ID isn't going to change. - priceOfDeliveryGas: newDeliveryGasPrice, - priceOfAckGas: newAckGasPrice, - sourceAddress: event.incentivesAddress, - }; - // We can set this value now. - return this.set(key, JSON.stringify(bounty)); - } - // We know a bounty exists at this value. - const bountyAsRead: BountyJson = JSON.parse(existingValue); - // Lets check if there exists values for it, otherwise set to 0. Remember, these are stored as strings. - const currentDeliveryGasPrice = BigInt( - bountyAsRead.priceOfDeliveryGas ?? '0', + } + + + + static getOnAMBMessageChannel( + chainId: string, + ): string { + return Store.getChannel( + Store.ON_AMB_MESSAGE_CHANNEL_PREFIX, + chainId.toLowerCase(), + ); + } + + static getOnAMBProofChannel( + chainId: string, + ): string { + return Store.getChannel( + Store.ON_AMB_PROOF_CHANNEL_PREFIX, + chainId.toLowerCase(), ); - const currentAckGasPrice = BigInt(bountyAsRead.priceOfAckGas ?? '0'); - // Otherwise we need to check get the maximums of current and observed new values. - const hasDeliveryGasPriceIncreased = - currentDeliveryGasPrice < newDeliveryGasPrice; - const hasAckGasPriceIncreased = currentAckGasPrice < newAckGasPrice; - const hasChanged = hasDeliveryGasPriceIncreased || hasAckGasPriceIncreased; - // If hasChanged is false, then we don't need to do anything. - if (hasChanged) { - const newBounty = { - sourceAddress: event.incentivesAddress, - ...bountyAsRead, - priceOfDeliveryGas: hasDeliveryGasPriceIncreased - ? newDeliveryGasPrice - : currentDeliveryGasPrice, - priceOfAckGas: hasAckGasPriceIncreased - ? newAckGasPrice - : currentAckGasPrice, - }; - - await this.set(key, JSON.stringify(newBounty)); - } } - // ----- AMB ------ - async getAmb(swapIdentifier: string): Promise { - const query: string | null = await this.redis.get( - Store.combineString( - Store.relayerStorePrefix, - Store.ambMidfix, - swapIdentifier, - ), + + async getAMBMessage( + chainId: string, + messageIdentifier: string + ): Promise { + const key = Store.getAMBMessageKey( + chainId, + messageIdentifier, ); - const amb: AmbMessage | null = - query === null ? undefined : JSON.parse(query); - return amb; + const data = await this.redis.get(key); + + const parsedData: AMBMessageJSON | null = data != null + ? JSON.parse(data) + : null; + + return parsedData; } - async getAmbPayload( + async getAMBProof( chainId: string, messageIdentifier: string, - ): Promise { - const query: string | null = await this.redis.get( - Store.combineString( - Store.relayerStorePrefix, - Store.ambPayloadMidfix, - chainId, - messageIdentifier, - ), + ): Promise { + const key = Store.getAMBProofKey( + chainId, + messageIdentifier, ); - const amb: AmbPayload | null = - query === null ? undefined : JSON.parse(query); - return amb; + const data = await this.redis.get(key); + + const parsedData: AMBProofJSON | null = data != null + ? JSON.parse(data) + : null; + + return parsedData; } - async getAMBsByTxHash( + async getAMBMessagesByTransactionHash( chainId: string, - txHash: string, - ): Promise { - const query: string | null = await this.redis.get( - Store.combineString( - Store.relayerStorePrefix, - Store.hashAmbMapMidfix, - chainId, - txHash, - ), + transactionHash: string, + ): Promise { + const key = Store.getAMBMessagesByTransactionHashKey( + chainId, + transactionHash, ); + const messageIdentifiers: string[] = await this.redis.lrange(key, 0, -1); - const messageIdentifiers: string[] = - query === null ? [] : JSON.parse(query); - - const ambs: Promise[] = []; + const ambMessagesPromise: Promise[] = []; for (const messageId of messageIdentifiers) { - ambs.push(this.getAmb(messageId)); + ambMessagesPromise.push(this.getAMBMessage(chainId, messageId)); } - return (await Promise.all(ambs)).filter( - (amb) => amb != undefined, - ) as AmbMessage[]; + return (await Promise.all(ambMessagesPromise)).filter( + (amb): amb is AMBMessage => amb != undefined, + ); } - /** - * Set an Amb message (not payload). - */ - async setAmb(amb: AmbMessage, txHash: string): Promise { - const key = Store.combineString( - Store.relayerStorePrefix, - Store.ambMidfix, - amb.messageIdentifier, + + async setAMBMessage( + chainId: string, + ambMessage: AMBMessage, + ): Promise { + + const key = Store.getAMBMessageKey( + chainId, + ambMessage.messageIdentifier, ); - await this.set(key, JSON.stringify(amb)); - await this.registerAmbTxHash( - amb.sourceChain, - amb.messageIdentifier, - txHash, + await this.set(key, JSON.stringify(ambMessage)); + + await this.setAMBMessageTransactionHash( + ambMessage.fromChainId, + ambMessage.messageIdentifier, + ambMessage.transactionHash, ); - await this.postMessage(Store.newAMBChannel, amb); + const channel = Store.getOnAMBMessageChannel( + chainId + ); + await this.postMessage(channel, ambMessage); } - async setAmbPriority(messageIdentifier: string, priority: boolean): Promise { - const key = Store.combineString( - Store.relayerStorePrefix, - Store.ambMidfix, - messageIdentifier, + async setAMBMessagePriority( + chainId: string, + messageIdentifier: string, + priority: boolean + ): Promise { + const key = Store.getAMBMessageKey( + chainId, + messageIdentifier ); - const query: string | null = await this.redis.get(key); + const data = await this.redis.get(key); - if (query == null) { + if (data == null) { throw new Error( - `Unable to set AMB priority: AMB message not found (message identifier: ${messageIdentifier}` + `Unable to set AMB priority: AMB message not found (message identifier: ${messageIdentifier})` ); } - const amb: AmbMessage = JSON.parse(query); + const amb: AMBMessageJSON = JSON.parse(data); amb.priority = priority; - await this.set(key, JSON.stringify(amb)); - } - async setAmbPayload(amb: AmbPayload): Promise { - const chainId = this.chainId; - if (chainId === null) - throw new Error('ChainId is not set: This connection is readonly'); - const key = Store.combineString( - Store.relayerStorePrefix, - Store.ambPayloadMidfix, - chainId, - amb.messageIdentifier, - ); await this.set(key, JSON.stringify(amb)); } - async registerAmbTxHash( + async setAMBMessageTransactionHash( chainId: string, messageIdentifier: string, - txHash: string, + transactionHash: string, ): Promise { - const key = Store.combineString( - Store.relayerStorePrefix, - Store.hashAmbMapMidfix, + const key = Store.getAMBMessagesByTransactionHashKey( chainId, - txHash, + transactionHash, ); - const currentValue = await this.get(key); - if (currentValue == null) { - await this.set(key, JSON.stringify([messageIdentifier])); - } else { - const parsedValue = JSON.parse(currentValue); - parsedValue.push(messageIdentifier); - await this.set(key, JSON.stringify(parsedValue)); - } + await this.redis.rpush(key, messageIdentifier); } - /** - * Proofs are only available through subscriptions. This posts the messages to any - * listeners on Store.getChannel('submit', destinationChain). - */ - async submitProof(destinationChain: string, ambPayload: AmbPayload) { - const emitToChannel = Store.getChannel('submit', destinationChain); + async setAMBProof(chainId: string, ambProof: AMBProof) { + const key = Store.getAMBProofKey( + chainId, + ambProof.messageIdentifier + ); + await this.set(key, JSON.stringify(ambProof)); - await this.setAmbPayload(ambPayload); - await this.postMessage(emitToChannel, ambPayload); + const channel = Store.getOnAMBProofChannel( + chainId + ); + await this.postMessage(channel, ambProof); } - async setPayload(prefix: string, suffix: string, payloadHash: string, dataToStore: any): Promise { - const key = `${prefix}:${suffix}:${payloadHash.toLowerCase()}`; - const data = JSON.stringify(dataToStore); - await this.redis.set(key, data); + async setAdditionalAMBData(amb: string, key: string, data: T): Promise { + const fullKey = Store.getAdditionalAMBDataKey( + amb, + key + ); + await this.set(fullKey, JSON.stringify(data)); } - async getPayload(prefix: string, suffix: string, payloadHash: string): Promise { - const key = `${prefix}:${suffix}:${payloadHash.toLowerCase()}`; - const result = await this.redis.get(key); - return result ? JSON.parse(result) : null; + async getAdditionalAMBData(amb: string, key: string): Promise { + const fullKey = Store.getAdditionalAMBDataKey( + amb, + key + ); + const rawData = await this.get(fullKey); + + return rawData != null + ? JSON.parse(rawData) + : null; } + } diff --git a/src/store/store.types.ts b/src/store/store.types.ts new file mode 100644 index 0000000..f80553b --- /dev/null +++ b/src/store/store.types.ts @@ -0,0 +1,149 @@ +// Store types +// ************************************************************************************************ + +export interface TransactionDescription { + transactionHash: string; + blockHash: string; + blockNumber: number; +} + +export interface KeyActionMessage { + key: string; + action: 'set' | 'del'; +} + + + +// AMB related types +// ************************************************************************************************ + +export interface AMBMessage extends TransactionDescription { + messageIdentifier: string; + + amb: string; + fromChainId: string; + toChainId: string; + fromIncentivesAddress: string; + toIncentivesAddress?: string; + + incentivesPayload: string; + recoveryContext?: string; + additionalData?: T; + + transactionBlockNumber?: number; // The block number as seen by the transaction. + + priority?: boolean; +}; + +export type AMBMessageJSON = AMBMessage; + + +export interface AMBProof { + messageIdentifier: string; + + amb: string; + fromChainId: string; + toChainId: string; + + message: string; + messageCtx?: string; +}; + +export type AMBProofJSON = AMBProof; + + + +// Relay and Bounty types +// ************************************************************************************************ + +export enum RelayStatus { + BountyPlaced, + MessageDelivered, + BountyClaimed, +} + +export interface RelayState { + + // Common fields (derived from events) + status: RelayStatus; + messageIdentifier: string; + + // GeneralisedIncentives specific details + bountyPlacedEvent?: BountyPlacedEventDetails; + messageDeliveredEvent?: MessageDeliveredEventDetails; + bountyClaimedEvent?: BountyClaimedEventDetails; + bountyIncreasedEvent?: BountyIncreasedEventDetails; + + // Delivery information + deliveryGasCost?: bigint; +} + +export interface BountyPlacedEventDetails extends TransactionDescription { + fromChainId: string; + incentivesAddress: string; + + maxGasDelivery: bigint; + maxGasAck: bigint; + refundGasTo: string; + priceOfDeliveryGas: bigint; + priceOfAckGas: bigint; + targetDelta: bigint; +} + +export interface MessageDeliveredEventDetails extends TransactionDescription { + toChainId: string; +} + +export interface BountyClaimedEventDetails extends TransactionDescription { +} + +export interface BountyIncreasedEventDetails extends TransactionDescription { + newDeliveryGasPrice: bigint; + newAckGasPrice: bigint; +} + + +export interface RelayStateJSON { + status: RelayStatus; + messageIdentifier: string; + + bountyPlacedEvent?: BountyPlacedEventDetailsJSON; + messageDeliveredEvent?: MessageDeliveredEventDetailsJSON; + bountyClaimedEvent?: BountyClaimedEventDetailsJSON; + bountyIncreasedEvent?: BountyIncreasedEventDetailsJSON; + + deliveryGasCost?: string; +} + +export interface BountyPlacedEventDetailsJSON extends TransactionDescription { + fromChainId: string; + incentivesAddress: string; + + maxGasDelivery: string; + maxGasAck: string; + refundGasTo: string; + priceOfDeliveryGas: string; + priceOfAckGas: string; + targetDelta: string; +} + +export type MessageDeliveredEventDetailsJSON = MessageDeliveredEventDetails; + +export type BountyClaimedEventDetailsJSON = BountyClaimedEventDetails; + +export interface BountyIncreasedEventDetailsJSON extends TransactionDescription { + newDeliveryGasPrice: string; + newAckGasPrice: string; +} + + + +// Controller Types +// ************************************************************************************************ + +export interface PrioritiseMessage { + messageIdentifier: string; + amb: string; + sourceChainId: string; + destinationChainId: string; +}; diff --git a/src/store/types/bounty.enum.ts b/src/store/types/bounty.enum.ts deleted file mode 100644 index 33fff83..0000000 --- a/src/store/types/bounty.enum.ts +++ /dev/null @@ -1,5 +0,0 @@ -export enum BountyStatus { - BountyPlaced, - MessageDelivered, - BountyClaimed, -} diff --git a/src/store/types/store.types.ts b/src/store/types/store.types.ts deleted file mode 100644 index 9ffc41f..0000000 --- a/src/store/types/store.types.ts +++ /dev/null @@ -1,72 +0,0 @@ -import { BountyStatus } from './bounty.enum'; - -export type AmbMessage = { - messageIdentifier: string; - amb: string; - sourceChain: string; - destinationChain: string; - sourceEscrow: string; - payload: string; // This is specifically Generalised Incentive payload. - recoveryContext?: string; // Normally we would listen for the proofs but sometimes we might miss or somethings goes wrong. If this field is set, then it can be used to recover the tx. The encoding scheme depends entirely on the amb. - priority?: boolean; - blockNumber?: number; // The block number as returned by the rpc. - transactionBlockNumber?: number; // The block number as seen by the transaction. - blockHash?: string; - transactionHash?: string; -}; - -export type AmbPayload = { - messageIdentifier: string; - amb: string; - destinationChainId: string; - message: string; - messageCtx?: string; - priority?: boolean; -}; - -export type PrioritiseMessage = { - messageIdentifier: string; - amb: string; - sourceChainId: string; - destinationChainId: string; -}; - -export type Bounty = { - messageIdentifier: string; - fromChainId: string; - toChainId?: string; - maxGasDelivery: number; - maxGasAck: number; - refundGasTo: string; - priceOfDeliveryGas: bigint; - priceOfAckGas: bigint; - targetDelta: bigint; - status: BountyStatus; - sourceAddress: string; - destinationAddress?: string; - finalised?: boolean; - submitTransactionHash?: string; - execTransactionHash?: string; - ackTransactionHash?: string; - deliveryGasCost?: bigint; -}; - -export type BountyJson = { - messageIdentifier: string; - fromChainId?: string; - toChainId?: string; - maxGasDelivery?: number; - maxGasAck?: number; - refundGasTo?: string; - priceOfDeliveryGas?: string; - priceOfAckGas?: string; - targetDelta?: string; - status: BountyStatus; - sourceAddress?: string; - destinationAddress?: string; - finalised?: boolean; - submitTransactionHash?: string; - execTransactionHash?: string; - ackTransactionHash?: string; - deliveryGasCost?: string; -}; diff --git a/src/submitter/queues/eval-queue.ts b/src/submitter/queues/eval-queue.ts index 6aba20e..f3c8ffe 100644 --- a/src/submitter/queues/eval-queue.ts +++ b/src/submitter/queues/eval-queue.ts @@ -2,11 +2,10 @@ import { HandleOrderResult, ProcessingQueue, } from '../../processing-queue/processing-queue'; -import { BountyEvaluationConfig, EvalOrder, SubmitOrder } from '../submitter.types'; +import { Bounty, BountyEvaluationConfig, EvalOrder, SubmitOrder } from '../submitter.types'; import pino from 'pino'; import { Store } from 'src/store/store.lib'; -import { Bounty } from 'src/store/types/store.types'; -import { BountyStatus } from 'src/store/types/bounty.enum'; +import { RelayState, RelayStatus } from 'src/store/store.types'; import { IncentivizedMockEscrow__factory } from 'src/contracts'; import { tryErrorToString } from 'src/common/utils'; import { AbstractProvider, BytesLike, MaxUint256, TransactionRequest, zeroPadValue } from 'ethers6'; @@ -55,8 +54,15 @@ export class EvalQueue extends ProcessingQueue { `Handling submitter eval order.`, ); - const bounty = await this.queryBountyInfo(order.messageIdentifier); - if (bounty === null || bounty === undefined) { + const relayState = await this.store.getRelayState(order.messageIdentifier); + if (relayState == null) { + throw Error( + `Relay state of message not found on evaluation (message ${order.messageIdentifier})`, + ); + } + + const bounty = this.getBountyFromRelayState(relayState); + if (bounty == null) { throw Error( `Bounty of message not found on evaluation (message ${order.messageIdentifier})`, ); @@ -66,7 +72,7 @@ export class EvalQueue extends ProcessingQueue { const isDelivery = bounty.fromChainId != this.chainId; if (isDelivery) { // Source to Destination - if (bounty.status >= BountyStatus.MessageDelivered) { + if (relayState.status >= RelayStatus.MessageDelivered) { this.logger.info( { messageIdentifier: bounty.messageIdentifier }, `Bounty evaluation (source to destination). Message already delivered.`, @@ -75,7 +81,7 @@ export class EvalQueue extends ProcessingQueue { } } else { // Destination to Source - if (bounty.status >= BountyStatus.BountyClaimed) { + if (relayState.status >= RelayStatus.BountyClaimed) { this.logger.info( { messageIdentifier: bounty.messageIdentifier }, `Bounty evaluation (destination to source). Ack already delivered.`, @@ -187,15 +193,6 @@ export class EvalQueue extends ProcessingQueue { } } - /** - * TODO: What is the point of this helper? - */ - private async queryBountyInfo( - messageIdentifier: string, - ): Promise { - return this.store.getBounty(messageIdentifier); - } - private async evaluateRelaySubmission( gasEstimateComponents: GasEstimateComponents, value: bigint, @@ -552,4 +549,31 @@ export class EvalQueue extends ProcessingQueue { return null; } + + private getBountyFromRelayState(relayState: RelayState): Bounty | null { + const bountyPlacedEvent = relayState.bountyPlacedEvent; + if (bountyPlacedEvent == undefined) { + return null; + } + + const priceOfDeliveryGas = relayState.bountyIncreasedEvent?.newDeliveryGasPrice + ?? bountyPlacedEvent.priceOfDeliveryGas; + const priceOfAckGas = relayState.bountyIncreasedEvent?.newAckGasPrice + ?? bountyPlacedEvent.priceOfAckGas; + + return { + messageIdentifier: relayState.messageIdentifier, + + fromChainId: bountyPlacedEvent.fromChainId, + + maxGasDelivery: bountyPlacedEvent.maxGasDelivery, + maxGasAck: bountyPlacedEvent.maxGasAck, + refundGasTo: bountyPlacedEvent.refundGasTo, + priceOfDeliveryGas, + priceOfAckGas, + targetDelta: bountyPlacedEvent.targetDelta, + + deliveryGasCost: relayState.deliveryGasCost, + } + } } diff --git a/src/submitter/queues/submit-queue.ts b/src/submitter/queues/submit-queue.ts index 7841eb3..ea49788 100644 --- a/src/submitter/queues/submit-queue.ts +++ b/src/submitter/queues/submit-queue.ts @@ -146,10 +146,10 @@ export class SubmitQueue extends ProcessingQueue< ): Promise { // Currently the 'ack' submission cost is not registered. if (order.isDelivery) { - void this.store.registerDeliveryCost({ - messageIdentifier: order.messageIdentifier, - deliveryGasCost: gasUsed - }); + void this.store.setDeliveryCost( + order.messageIdentifier, + gasUsed, + ); } } } diff --git a/src/submitter/submitter.types.ts b/src/submitter/submitter.types.ts index 5f78e8c..f25c368 100644 --- a/src/submitter/submitter.types.ts +++ b/src/submitter/submitter.types.ts @@ -32,6 +32,22 @@ export interface PendingOrder { } +export interface Bounty { + messageIdentifier: string; + + fromChainId: string; + + maxGasDelivery: bigint; + maxGasAck: bigint; + refundGasTo: string; + priceOfDeliveryGas: bigint; + priceOfAckGas: bigint; + targetDelta: bigint; + + deliveryGasCost?: bigint; +} + + export interface BountyEvaluationConfig { evaluationRetryInterval: number, maxEvaluationDuration: number, diff --git a/src/submitter/submitter.worker.ts b/src/submitter/submitter.worker.ts index dbfee72..8bfcda9 100644 --- a/src/submitter/submitter.worker.ts +++ b/src/submitter/submitter.worker.ts @@ -2,7 +2,7 @@ import { BytesLike, JsonRpcProvider } from 'ethers6'; import pino, { LoggerOptions } from 'pino'; import { Store } from 'src/store/store.lib'; import { workerData } from 'worker_threads'; -import { AmbPayload } from 'src/store/types/store.types'; +import { AMBProof } from 'src/store/store.types'; import { STATUS_LOG_INTERVAL } from 'src/logger/logger.service'; import { BountyEvaluationConfig, EvalOrder, PendingOrder } from './submitter.types'; import { EvalQueue } from './queues/eval-queue'; @@ -39,7 +39,7 @@ class SubmitterWorker { this.chainId = this.config.chainId; - this.store = new Store(this.chainId); + this.store = new Store(); this.logger = this.initializeLogger( this.chainId, this.config.loggerOptions, @@ -234,37 +234,38 @@ class SubmitterWorker { * Subscribe to the Store to listen for relevant payloads to submit. */ private async listenForOrders(): Promise { - const listenToChannel = Store.getChannel('submit', this.chainId); + const onAMBProofChannel = Store.getOnAMBProofChannel(this.chainId); this.logger.info( - { globalChannel: listenToChannel }, + { channel: onAMBProofChannel }, `Listing for messages to submit.`, ); - await this.store.on(listenToChannel, (event: any) => { - - //TODO verify event format - const message = event as AmbPayload; - - void this.store.getAmb(message.messageIdentifier) - .then(ambMessage => { - if (ambMessage == null) { - this.logger.warn( - { - messageIdentifier: message.messageIdentifier, - }, - `AMB message not found on submit order. Submission evaluation will be less accurate.` - ); - } - - return this.addSubmitOrder( - message.amb, - message.messageIdentifier, - message.message, - message.messageCtx ?? '', - ambMessage?.priority ?? false, // eval priority => undefined = false. - ambMessage?.payload, + await this.store.on(onAMBProofChannel, (event: any) => { + + const ambProof = event as AMBProof; + + void this.store.getAMBMessage( + ambProof.fromChainId, + ambProof.messageIdentifier + ).then(ambMessage => { + if (ambMessage == null) { + this.logger.warn( + { + messageIdentifier: ambProof.messageIdentifier, + }, + `AMB message not found on submit order. Submission evaluation will be less accurate.` ); - }) + } + + return this.addSubmitOrder( + ambProof.amb, + ambProof.messageIdentifier, + ambProof.message, + ambProof.messageCtx ?? '0x', + ambMessage?.priority ?? false, + ambMessage?.incentivesPayload, + ); + }); }); }