1
+ /**
2
+ * This file initializes and manages Layer Zero workers.
3
+ *
4
+ * Inputs:
5
+ * - ConfigService: Service to fetch configuration settings.
6
+ * - MonitorService: Service to monitor and manage workers.
7
+ * - LoggerService: Service to log information and errors.
8
+ * - ChainConfig: Configuration settings for different chains.
9
+ * - CollectorModuleInterface: Interface for the collector module.
10
+ *
11
+ * Outputs:
12
+ * - Initializes and manages Layer Zero worker threads.
13
+ * - Logs status and errors related to the worker threads.
14
+ */
15
+
16
+ import { join } from 'path' ;
17
+ import { Worker , MessagePort } from 'worker_threads' ;
18
+ import { CollectorModuleInterface } from '../collector.controller' ;
19
+ import {
20
+ DEFAULT_GETTER_RETRY_INTERVAL ,
21
+ DEFAULT_GETTER_PROCESSING_INTERVAL ,
22
+ DEFAULT_GETTER_MAX_BLOCKS ,
23
+ } from '../../getter/getter.service' ;
24
+ import {
25
+ LoggerService ,
26
+ STATUS_LOG_INTERVAL ,
27
+ } from '../../logger/logger.service' ;
28
+ import { ConfigService } from '../../config/config.service' ;
29
+ import { ChainConfig } from '../../config/config.types' ;
30
+ import { LoggerOptions } from 'pino' ;
31
+ import { MonitorService } from '../../monitor/monitor.service' ;
32
+
33
+ interface GlobalLayerZeroConfig {
34
+ retryInterval : number ;
35
+ processingInterval : number ;
36
+ maxBlocks : number | null ;
37
+ layerZeroChainIdMap : Record < string , string > ;
38
+ incentivesAddresses : Record < string , string > ;
39
+ }
40
+
41
+ export interface LayerZeroWorkerData {
42
+ chainId : string ;
43
+ rpc : string ;
44
+ resolver : string | null ;
45
+ startingBlock ?: number ;
46
+ stoppingBlock ?: number ;
47
+ retryInterval : number ;
48
+ processingInterval : number ;
49
+ maxBlocks : number | null ;
50
+ layerZeroChainId : string ;
51
+ bridgeAddress : string ;
52
+ incentivesAddress : string ;
53
+ receiverAddress : string ;
54
+ monitorPort : MessagePort ;
55
+ loggerOptions : LoggerOptions ;
56
+ incentivesAddresses : Record < string , string > ;
57
+ layerZeroChainIdMap : Record < string , string > ;
58
+ }
59
+
60
+ /**
61
+ * Loads global configuration settings specific to Layer Zero.
62
+ *
63
+ * @param configService - Service to fetch configuration settings.
64
+ * @returns Global configuration settings for Layer Zero.
65
+ */
66
+ function loadGlobalLayerZeroConfig ( configService : ConfigService ) : GlobalLayerZeroConfig {
67
+
68
+ const getterConfig = configService . globalConfig . getter ;
69
+ const retryInterval = getterConfig . retryInterval ?? DEFAULT_GETTER_RETRY_INTERVAL ;
70
+ const processingInterval = getterConfig . processingInterval ?? DEFAULT_GETTER_PROCESSING_INTERVAL ;
71
+ const maxBlocks = getterConfig . maxBlocks ?? DEFAULT_GETTER_MAX_BLOCKS ;
72
+
73
+ const layerZeroChainIdMap : Record < string , string > = { } ;
74
+ const incentivesAddresses : Record < string , string > = { } ;
75
+
76
+ for ( const [ chainId ] of configService . chainsConfig ) {
77
+ const layerZeroChainId : string | undefined = configService . getAMBConfig < string > (
78
+ 'layer-zero' ,
79
+ 'layerZeroChainId' ,
80
+ chainId . toString ( ) ,
81
+ ) ;
82
+ const incentivesAddress : string | undefined = configService . getAMBConfig < string > (
83
+ 'layer-zero' ,
84
+ 'incentivesAddress' ,
85
+ chainId . toString ( ) ,
86
+ ) ;
87
+
88
+ if ( layerZeroChainId !== undefined ) {
89
+ layerZeroChainIdMap [ layerZeroChainId ] = chainId ;
90
+ if ( incentivesAddress !== undefined ) {
91
+ incentivesAddresses [ chainId ] = incentivesAddress . toLowerCase ( ) ;
92
+ }
93
+ }
94
+ }
95
+
96
+ return {
97
+ retryInterval,
98
+ processingInterval,
99
+ maxBlocks,
100
+ layerZeroChainIdMap,
101
+ incentivesAddresses,
102
+ } ;
103
+ }
104
+
105
+ /**
106
+ * Loads worker data for a specific chain configuration.
107
+ *
108
+ * @param configService - Service to fetch configuration settings.
109
+ * @param monitorService - Service to monitor and manage workers.
110
+ * @param loggerService - Service to log information and errors.
111
+ * @param chainConfig - Configuration settings for a specific chain.
112
+ * @param globalConfig - Global configuration settings for Layer Zero.
113
+ * @returns Worker data for the specific chain or null if configuration is incomplete.
114
+ */
115
+ async function loadWorkerData (
116
+ configService : ConfigService ,
117
+ monitorService : MonitorService ,
118
+ loggerService : LoggerService ,
119
+ chainConfig : ChainConfig ,
120
+ globalConfig : GlobalLayerZeroConfig ,
121
+ ) : Promise < LayerZeroWorkerData | null > {
122
+ const chainId = chainConfig . chainId . toString ( ) ;
123
+ const rpc = chainConfig . rpc ;
124
+
125
+ try {
126
+ const layerZeroChainId = configService . getAMBConfig < string > ( 'layer-zero' , 'layerZeroChainId' , chainId ) ;
127
+ const bridgeAddress = configService . getAMBConfig < string > ( 'layer-zero' , 'bridgeAddress' , chainId ) ;
128
+ const incentivesAddress = configService . getAMBConfig < string > ( 'layer-zero' , 'incentivesAddress' , chainId ) ;
129
+ const receiverAddress = configService . getAMBConfig < string > ( 'layer-zero' , 'receiverAddress' , chainId ) ;
130
+
131
+ if ( layerZeroChainId == null || bridgeAddress == null || incentivesAddress == null || receiverAddress == null ) {
132
+ return null ;
133
+ }
134
+
135
+ const port = await monitorService . attachToMonitor ( chainId ) ;
136
+
137
+ return {
138
+ chainId,
139
+ rpc,
140
+ resolver : chainConfig . resolver ,
141
+ startingBlock : chainConfig . startingBlock ,
142
+ stoppingBlock : chainConfig . stoppingBlock ,
143
+ retryInterval : chainConfig . getter . retryInterval ?? globalConfig . retryInterval ,
144
+ processingInterval : chainConfig . getter . processingInterval ?? globalConfig . processingInterval ,
145
+ maxBlocks : chainConfig . getter . maxBlocks ?? globalConfig . maxBlocks ,
146
+ layerZeroChainId,
147
+ bridgeAddress : bridgeAddress . toLowerCase ( ) ,
148
+ incentivesAddress : incentivesAddress . toLowerCase ( ) ,
149
+ receiverAddress : receiverAddress . toLowerCase ( ) ,
150
+ monitorPort : port ,
151
+ loggerOptions : loggerService . loggerOptions ,
152
+ incentivesAddresses : globalConfig . incentivesAddresses ,
153
+ layerZeroChainIdMap : globalConfig . layerZeroChainIdMap ,
154
+ } ;
155
+ } catch ( error ) {
156
+ loggerService . error ( error , 'Failed to load Layer Zero module: missing configuration.' ) ;
157
+ throw error ;
158
+ }
159
+ }
160
+
161
+ /**
162
+ * Main function for initializing Layer Zero workers.
163
+ *
164
+ * @param moduleInterface - Interface for the collector module.
165
+ */
166
+ export default async ( moduleInterface : CollectorModuleInterface ) => {
167
+ const { configService, monitorService, loggerService } = moduleInterface ;
168
+ const globalLayerZeroConfig = loadGlobalLayerZeroConfig ( configService ) ;
169
+
170
+ const workers : Record < string , Worker | null > = { } ;
171
+ const workersData : LayerZeroWorkerData [ ] = [ ] ;
172
+
173
+ for ( const [ , chainConfig ] of configService . chainsConfig ) {
174
+ const workerData = await loadWorkerData ( configService , monitorService , loggerService , chainConfig , globalLayerZeroConfig ) ;
175
+ if ( workerData ) {
176
+ workersData . push ( workerData ) ;
177
+ }
178
+ }
179
+
180
+ if ( workersData . length === 0 ) {
181
+ loggerService . warn ( 'Skipping Layer Zero worker initialization: no valid Layer Zero chain configs found' ) ;
182
+ return ;
183
+ }
184
+
185
+ initializeWorkers ( workersData , workers , loggerService ) ;
186
+
187
+ setInterval ( ( ) => logStatus ( workers , loggerService ) , STATUS_LOG_INTERVAL ) ;
188
+ } ;
189
+
190
+ /**
191
+ * Initializes workers with the given data and logs errors and exit statuses.
192
+ *
193
+ * @param workersData - Array of worker data to initialize.
194
+ * @param workers - Record to keep track of active workers.
195
+ * @param loggerService - Service to log information and errors.
196
+ */
197
+ function initializeWorkers (
198
+ workersData : LayerZeroWorkerData [ ] ,
199
+ workers : Record < string , Worker | null > ,
200
+ loggerService : LoggerService ,
201
+ ) {
202
+ for ( const workerData of workersData ) {
203
+ const worker = new Worker ( join ( __dirname , 'layer-zero.worker.js' ) , {
204
+ workerData : workerData ,
205
+ transferList : [ workerData . monitorPort ] ,
206
+ } ) ;
207
+ workers [ workerData . chainId ] = worker ;
208
+
209
+ worker . on ( 'error' , ( error ) => loggerService . fatal ( error , 'Error on Layer Zero Worker.' ) ) ;
210
+ worker . on ( 'exit' , ( exitCode ) => {
211
+ workers [ workerData . chainId ] = null ;
212
+ loggerService . info ( { exitCode, chainId : workerData . chainId } , `Layer Zero Worker exited.` ) ;
213
+ } ) ;
214
+ }
215
+ }
216
+
217
+ /**
218
+ * Logs the status of active and inactive workers.
219
+ *
220
+ * @param workers - Record of active and inactive workers.
221
+ * @param loggerService - Service to log information.
222
+ */
223
+ function logStatus ( workers : Record < string , Worker | null > , loggerService : LoggerService ) {
224
+ const activeWorkers = [ ] ;
225
+ const inactiveWorkers = [ ] ;
226
+
227
+ for ( const chainId of Object . keys ( workers ) ) {
228
+ if ( workers [ chainId ] ) {
229
+ activeWorkers . push ( chainId ) ;
230
+ } else {
231
+ inactiveWorkers . push ( chainId ) ;
232
+ }
233
+ }
234
+
235
+ const status = {
236
+ activeWorkers,
237
+ inactiveWorkers,
238
+ } ;
239
+ loggerService . info ( status , 'Layer Zero collector workers status.' ) ;
240
+ }
0 commit comments