@@ -179,9 +179,9 @@ impl SettlementLayerProvider for EthereumClient {
179179 logs. into_iter ( ) . rev ( ) . map ( |log| log. log_decode :: < StarknetCoreContract :: LogStateUpdate > ( ) ) . next ( ) ;
180180
181181 match latest_logs {
182- Some ( Ok ( log) ) => log
183- . block_number
184- . ok_or_else ( || -> SettlementClientError { EthereumClientError :: MissingField ( "block_number" ) . into ( ) } ) ,
182+ Some ( Ok ( log) ) => log. block_number . ok_or_else ( || -> SettlementClientError {
183+ EthereumClientError :: MissingField ( " block_number" ) . into ( )
184+ } ) ,
185185 Some ( Err ( e) ) => Err ( SettlementClientError :: Ethereum ( EthereumClientError :: Contract ( e. to_string ( ) ) ) ) ,
186186 None => Err ( SettlementClientError :: Ethereum ( EthereumClientError :: EventProcessing {
187187 message : format ! ( "no LogStateUpdate event found in block range [None, {}]" , latest_block) ,
@@ -263,18 +263,20 @@ impl SettlementLayerProvider for EthereumClient {
263263 // Try to create the event stream with retry logic
264264 let event_filter = self . l1_core_contract . event_filter :: < StarknetCoreContract :: LogStateUpdate > ( ) ;
265265
266- let event_stream_result = ctx. run_until_cancelled ( async {
267- self . retry_l1_call ( "watch_events" , || async {
268- event_filter. watch ( ) . await . map_err ( |e| -> SettlementClientError {
269- EthereumClientError :: EventStream { message : format ! ( "Failed to watch events: {}" , e) } . into ( )
266+ let event_stream_result = ctx
267+ . run_until_cancelled ( async {
268+ self . retry_l1_call ( "watch_events" , || async {
269+ event_filter. watch ( ) . await . map_err ( |e| -> SettlementClientError {
270+ EthereumClientError :: EventStream { message : format ! ( "Failed to watch events: {}" , e) }
271+ . into ( )
272+ } )
270273 } )
271- } ) . await
272- } ) . await ;
274+ . await
275+ } )
276+ . await ;
273277
274278 let mut event_stream = match event_stream_result {
275- Some ( Ok ( stream) ) => {
276- stream. into_stream ( )
277- } ,
279+ Some ( Ok ( stream) ) => stream. into_stream ( ) ,
278280 Some ( Err ( e) ) => {
279281 // This shouldn't happen since retry_l1_call has infinite retry,
280282 // but handle it just in case
@@ -474,18 +476,20 @@ impl SettlementLayerProvider for EthereumClient {
474476 ) -> Result < BoxStream < ' static , Result < MessageToL2WithMetadata , SettlementClientError > > , SettlementClientError > {
475477 // Wrap the watch call with retry logic to handle L1 being down
476478 // Note: We need to recreate the filter inside the closure to avoid move issues
477- let event_stream = self . retry_l1_call ( "watch_message_events" , || async {
478- let filter = self . l1_core_contract . event_filter :: < LogMessageToL2 > ( ) ;
479- filter. from_block ( from_l1_block_n) . to_block ( BlockNumberOrTag :: Finalized ) . watch ( ) . await . map_err (
480- |e| -> SettlementClientError {
481- EthereumClientError :: ArchiveRequired ( format ! (
482- "Could not fetch events, archive node may be required: {}" ,
483- e
484- ) )
485- . into ( )
486- } ,
487- )
488- } ) . await ?;
479+ let event_stream = self
480+ . retry_l1_call ( "watch_message_events" , || async {
481+ let filter = self . l1_core_contract . event_filter :: < LogMessageToL2 > ( ) ;
482+ filter. from_block ( from_l1_block_n) . to_block ( BlockNumberOrTag :: Finalized ) . watch ( ) . await . map_err (
483+ |e| -> SettlementClientError {
484+ EthereumClientError :: ArchiveRequired ( format ! (
485+ "Could not fetch events, archive node may be required: {}" ,
486+ e
487+ ) )
488+ . into ( )
489+ } ,
490+ )
491+ } )
492+ . await ?;
489493
490494 Ok ( EthereumEventStream :: new ( event_stream) . boxed ( ) )
491495 }
@@ -645,14 +649,14 @@ mod l1_messaging_tests {
645649 transports:: http:: { Client , Http } ,
646650 } ;
647651 use mc_db:: MadaraBackend ;
648- use mp_resilience:: ConnectionHealth ;
649- use tokio:: sync:: RwLock ;
650652 use mp_chain_config:: ChainConfig ;
653+ use mp_resilience:: ConnectionHealth ;
651654 use mp_transactions:: { L1HandlerTransaction , L1HandlerTransactionWithFee } ;
652655 use mp_utils:: service:: ServiceContext ;
653656 use rstest:: * ;
654657 use starknet_types_core:: felt:: Felt ;
655658 use std:: { sync:: Arc , time:: Duration } ;
659+ use tokio:: sync:: RwLock ;
656660 use tracing_test:: traced_test;
657661 use url:: Url ;
658662
0 commit comments