diff --git a/madara/crates/client/settlement_client/src/client.rs b/madara/crates/client/settlement_client/src/client.rs index 2320b6da28..4a680365f5 100644 --- a/madara/crates/client/settlement_client/src/client.rs +++ b/madara/crates/client/settlement_client/src/client.rs @@ -118,9 +118,10 @@ pub trait SettlementLayerProvider: Send + Sync { /// /// # Arguments /// * `from_l1_block_n` - Start returning events from this block_n. - /// * `end_l1_block_n` - Stop returning events at this block_n. None to keep continuing. + /// * `min_settlement_blocks` - Min no of settlement blocks after which to return messages. async fn messages_to_l2_stream( &self, from_l1_block_n: u64, + min_settlement_blocks: u64 ) -> Result>, SettlementClientError>; } diff --git a/madara/crates/client/settlement_client/src/eth/event.rs b/madara/crates/client/settlement_client/src/eth/event.rs index b3b71b1b4f..4f49d7712c 100644 --- a/madara/crates/client/settlement_client/src/eth/event.rs +++ b/madara/crates/client/settlement_client/src/eth/event.rs @@ -3,6 +3,7 @@ use crate::eth::error::EthereumClientError; use crate::eth::StarknetCoreContract::LogMessageToL2; use crate::messaging::MessageToL2WithMetadata; use alloy::contract::EventPoller; +use alloy::providers::Provider; use alloy::rpc::types::Log; use alloy::transports::http::{Client, Http}; use futures::Stream; @@ -10,7 +11,10 @@ use mp_convert::{Felt, ToFelt}; use mp_transactions::{L1HandlerTransaction, L1HandlerTransactionWithFee}; use std::iter; use std::pin::Pin; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; use std::task::{Context, Poll}; +use std::time::Duration; // Event conversion impl TryFrom<(LogMessageToL2, Log)> for MessageToL2WithMetadata { @@ -85,6 +89,98 @@ impl Stream for EthereumEventStream { } } +/// A stream wrapper that filters events based on dynamic block confirmation depth +/// +/// This uses a background task to periodically update the latest block number, +/// making the stream implementation much simpler. It's generic and works with any +/// settlement layer provider. +pub struct ConfirmationDepthFilteredStream { + inner: S, + min_confirmations: u64, + latest_block: Arc, + _update_task: tokio::task::JoinHandle<()>, +} + +impl ConfirmationDepthFilteredStream +where + S: Stream> + Unpin, +{ + pub fn new(inner: S, provider: Arc, min_confirmations: u64) -> Self { + let latest_block = Arc::new(AtomicU64::new(0)); + + // Spawn background task to periodically update the latest block number + let latest_block_clone = Arc::clone(&latest_block); + let update_task = tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(12)); + loop { + interval.tick().await; + match provider.get_block_number().await { + Ok(block) => { + latest_block_clone.store(block, Ordering::Relaxed); + tracing::trace!("Updated latest L1 block number to {}", block); + } + Err(e) => { + tracing::warn!("Failed to update latest L1 block number: {}", e); + } + } + } + }); + + Self { inner, min_confirmations, latest_block, _update_task: update_task } + } +} + +impl Drop for ConfirmationDepthFilteredStream { + fn drop(&mut self) { + // Abort the background task when the stream is dropped + self._update_task.abort(); + } +} + +impl Stream for ConfirmationDepthFilteredStream +where + S: Stream> + Unpin, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // Poll the inner stream for the next event + loop { + match Pin::new(&mut self.inner).poll_next(cx) { + Poll::Ready(Some(Ok(event))) => { + // Check if this event meets the confirmation depth requirement + let latest = self.latest_block.load(Ordering::Relaxed); + let threshold = latest.saturating_sub(self.min_confirmations); + + if event.l1_block_number <= threshold { + return Poll::Ready(Some(Ok(event))); + } + + // Event doesn't have enough confirmations yet, skip it and continue polling + tracing::debug!( + "Skipping event at block {} (needs {} confirmations, latest L1 block: {})", + event.l1_block_number, + self.min_confirmations, + latest + ); + // Continue the loop to get the next event + } + Poll::Ready(Some(Err(e))) => { + // Pass through errors + return Poll::Ready(Some(Err(e))); + } + Poll::Ready(None) => { + // Stream ended + return Poll::Ready(None); + } + Poll::Pending => { + return Poll::Pending; + } + } + } + } +} + #[cfg(test)] pub mod eth_event_stream_tests { use super::*; diff --git a/madara/crates/client/settlement_client/src/eth/mod.rs b/madara/crates/client/settlement_client/src/eth/mod.rs index d962b96fce..d0d8d7dd6d 100644 --- a/madara/crates/client/settlement_client/src/eth/mod.rs +++ b/madara/crates/client/settlement_client/src/eth/mod.rs @@ -327,10 +327,11 @@ impl SettlementLayerProvider for EthereumClient { async fn messages_to_l2_stream( &self, from_l1_block_n: u64, + min_settlement_blocks: u64 ) -> Result>, SettlementClientError> { let filter = self.l1_core_contract.event_filter::(); let event_stream = - filter.from_block(from_l1_block_n).to_block(BlockNumberOrTag::Finalized).watch().await.map_err( + filter.from_block(from_l1_block_n).to_block(BlockNumberOrTag::Latest).watch().await.map_err( |e| -> SettlementClientError { EthereumClientError::ArchiveRequired(format!( "Could not fetch events, archive node may be required: {}", @@ -340,7 +341,14 @@ impl SettlementLayerProvider for EthereumClient { }, )?; - Ok(EthereumEventStream::new(event_stream).boxed()) + let base_stream = EthereumEventStream::new(event_stream); + let filtered_stream = event::ConfirmationDepthFilteredStream::new( + base_stream, + Arc::clone(&self.provider), + min_settlement_blocks, + ); + + Ok(filtered_stream.boxed()) } } @@ -652,7 +660,7 @@ mod l1_messaging_tests { let worker_handle = { let db = Arc::clone(&db); tokio::spawn(async move { - sync(Arc::new(eth_client), Arc::clone(&db), Default::default(), ServiceContext::new_for_testing()).await + sync(Arc::new(eth_client), Arc::clone(&db), Default::default(), ServiceContext::new_for_testing(), 0).await }) }; diff --git a/madara/crates/client/settlement_client/src/messaging.rs b/madara/crates/client/settlement_client/src/messaging.rs index 2f9bc276f9..89bb18c506 100644 --- a/madara/crates/client/settlement_client/src/messaging.rs +++ b/madara/crates/client/settlement_client/src/messaging.rs @@ -75,9 +75,10 @@ pub async fn sync( backend: Arc, notify_consumer: Arc, mut ctx: ServiceContext, + min_settlement_blocks: u64, ) -> Result<(), SettlementClientError> { // sync inner is cancellation safe. - ctx.run_until_cancelled(sync_inner(settlement_client, backend, notify_consumer)).await.transpose()?; + ctx.run_until_cancelled(sync_inner(settlement_client, backend, notify_consumer, min_settlement_blocks)).await.transpose()?; Ok(()) } @@ -85,6 +86,7 @@ async fn sync_inner( settlement_client: Arc, backend: Arc, notify_consumer: Arc, + min_settlement_blocks: u64 ) -> Result<(), SettlementClientError> { // Note: Reprocessing events. // It's really important to make sure we don't mess up, we really want a strong guarantee we can't, in any circumstance, include an @@ -93,7 +95,7 @@ async fn sync_inner( // We can't make this guarantee here though. As such, we allow ourselves to reprocess events here, re-include them as pending & cie. // We still do *some* checks, but we can't make the full guarantee here. Instead, block production is responsible to make sure // it filters out any messages that are duplicated. - // Thus, it's fine to reprocess some events :) there are caught during process message AND during block production. + // Thus, it's fine to reprocess some events :) they are caught during process message AND during block production. // In fact, we rely on that to restart sequencing on a clean database, or switch from full-node to sequencing. let replay_max_duration = backend.chain_config().l1_messages_replay_max_duration; @@ -115,7 +117,7 @@ async fn sync_inner( tracing::info!("⟠ Starting L1 Messages Syncing from block #{from_l1_block_n}..."); settlement_client - .messages_to_l2_stream(from_l1_block_n) + .messages_to_l2_stream(from_l1_block_n, min_settlement_blocks) .await .map_err(|e| SettlementClientError::StreamProcessing(format!("Failed to create messaging stream: {}", e)))? .map(|message| { @@ -263,7 +265,7 @@ mod messaging_module_tests { client .expect_messages_to_l2_stream() .times(1) - .returning(move |_| Ok(stream::iter(events.clone()).map(Ok).boxed())); + .returning(move |_, _| Ok(stream::iter(events.clone()).map(Ok).boxed())); // nonce 1, is pending, not being cancelled, not consumed in db. => OK mock_l1_handler_tx(&mut client, 1, true, false); @@ -280,7 +282,7 @@ mod messaging_module_tests { let db_backend_clone = db.clone(); // Spawn the sync task in a separate thread - let sync_handle = tokio::spawn(async move { sync(client, db_backend_clone, notify, ctx).await }); + let sync_handle = tokio::spawn(async move { sync(client, db_backend_clone, notify, ctx, 0).await }); // Wait sufficient time for event to be processed tokio::time::sleep(Duration::from_secs(5)).await; @@ -329,9 +331,9 @@ mod messaging_module_tests { let events = vec![mock_event1.clone()]; client .expect_messages_to_l2_stream() - .with(predicate::eq(from_l1_block_n)) + .with(predicate::eq(from_l1_block_n), predicate::always()) .times(1) - .returning(move |_| Ok(stream::iter(events.clone()).map(Ok).boxed())); + .returning(move |_, _| Ok(stream::iter(events.clone()).map(Ok).boxed())); // nonce 1, is pending, not being cancelled, not consumed in db. => OK mock_l1_handler_tx(&mut client, 1, true, false); @@ -347,7 +349,7 @@ mod messaging_module_tests { let db_backend_clone = db.clone(); // Spawn the sync task in a separate thread - let sync_handle = tokio::spawn(async move { sync(client, db_backend_clone, notify, ctx).await }); + let sync_handle = tokio::spawn(async move { sync(client, db_backend_clone, notify, ctx, 0).await }); // Wait sufficient time for event to be processed tokio::time::sleep(Duration::from_secs(5)).await; diff --git a/madara/crates/client/settlement_client/src/starknet/event.rs b/madara/crates/client/settlement_client/src/starknet/event.rs index ca714b0ef9..1a8327113c 100644 --- a/madara/crates/client/settlement_client/src/starknet/event.rs +++ b/madara/crates/client/settlement_client/src/starknet/event.rs @@ -9,7 +9,10 @@ use starknet_core::types::{BlockId, EmittedEvent, EventFilter}; use starknet_providers::{Provider, ProviderError}; use starknet_types_core::felt::Felt; use std::iter; +use std::pin::Pin; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use std::task::{Context, Poll}; use std::time::Duration; // Starknet event conversion @@ -155,6 +158,100 @@ pub fn watch_events( .try_flatten() } +/// A stream wrapper that filters events based on dynamic block confirmation depth for Starknet +/// +/// This uses a background task to periodically update the latest block number, +/// making the stream implementation much simpler. +pub struct ConfirmationDepthFilteredStream { + inner: S, + min_confirmations: u64, + latest_block: Arc, + _update_task: tokio::task::JoinHandle<()>, + _phantom: std::marker::PhantomData P>, +} + +impl ConfirmationDepthFilteredStream +where + S: Stream> + Unpin, + P: Provider + Send + Sync + 'static, +{ + pub fn new(inner: S, provider: Arc

, min_confirmations: u64) -> Self { + let latest_block = Arc::new(AtomicU64::new(0)); + + // Spawn background task to periodically update the latest block number + let latest_block_clone = Arc::clone(&latest_block); + let update_task = tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(6)); // Starknet block time + loop { + interval.tick().await; + match provider.block_number().await { + Ok(block) => { + latest_block_clone.store(block, Ordering::Relaxed); + tracing::trace!("Updated latest Starknet block number to {}", block); + } + Err(e) => { + tracing::warn!("Failed to update latest Starknet block number: {}", e); + } + } + } + }); + + Self { inner, min_confirmations, latest_block, _update_task: update_task, _phantom: std::marker::PhantomData } + } +} + +impl Drop for ConfirmationDepthFilteredStream { + fn drop(&mut self) { + // Abort the background task when the stream is dropped + self._update_task.abort(); + } +} + +impl Stream for ConfirmationDepthFilteredStream +where + S: Stream> + Unpin, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + // Poll the inner stream for the next event + loop { + match Pin::new(&mut this.inner).poll_next(cx) { + Poll::Ready(Some(Ok(event))) => { + // Check if this event meets the confirmation depth requirement + let latest = this.latest_block.load(Ordering::Relaxed); + let threshold = latest.saturating_sub(this.min_confirmations); + + if event.l1_block_number <= threshold { + return Poll::Ready(Some(Ok(event))); + } + + // Event doesn't have enough confirmations yet, skip it and continue polling + tracing::debug!( + "Skipping event at block {} (needs {} confirmations, latest Starknet block: {})", + event.l1_block_number, + this.min_confirmations, + latest + ); + // Continue the loop to get the next event + } + Poll::Ready(Some(Err(e))) => { + // Pass through errors + return Poll::Ready(Some(Err(e))); + } + Poll::Ready(None) => { + // Stream ended + return Poll::Ready(None); + } + Poll::Pending => { + return Poll::Pending; + } + } + } + } +} + #[cfg(test)] mod starknet_event_stream_tests { use super::*; diff --git a/madara/crates/client/settlement_client/src/starknet/mod.rs b/madara/crates/client/settlement_client/src/starknet/mod.rs index c3d97c94e9..ebb691a68a 100644 --- a/madara/crates/client/settlement_client/src/starknet/mod.rs +++ b/madara/crates/client/settlement_client/src/starknet/mod.rs @@ -362,8 +362,9 @@ impl SettlementLayerProvider for StarknetClient { async fn messages_to_l2_stream( &self, from_l1_block_n: u64, + min_settlement_blocks: u64 ) -> Result>, SettlementClientError> { - Ok(watch_events( + let base_stream = watch_events( self.provider.clone(), Some(from_l1_block_n), WatchEventFilter { @@ -382,7 +383,15 @@ impl SettlementLayerProvider for StarknetClient { ) .map_err(|e| SettlementClientError::from(StarknetClientError::Provider(format!("Provider error: {e:#}")))) .map(|r| r.and_then(MessageToL2WithMetadata::try_from)) - .boxed()) + .boxed(); // Box it to make it Unpin + + let filtered_stream = event::ConfirmationDepthFilteredStream::new( + base_stream, + self.provider.clone(), + min_settlement_blocks, + ); + + Ok(filtered_stream.boxed()) } } @@ -745,7 +754,7 @@ mod starknet_client_messaging_test { let starknet_client = fixture.starknet_client.clone(); tokio::spawn(async move { - sync(Arc::new(starknet_client), Arc::clone(&db), Default::default(), ServiceContext::new_for_testing()) + sync(Arc::new(starknet_client), Arc::clone(&db), Default::default(), ServiceContext::new_for_testing(), 0) .await .unwrap(); tracing::debug!("messaging worker stopped"); @@ -779,7 +788,7 @@ mod starknet_client_messaging_test { let starknet_client = fixture.starknet_client.clone(); tokio::spawn(async move { - sync(Arc::new(starknet_client), Arc::clone(&db), Default::default(), ServiceContext::new_for_testing()) + sync(Arc::new(starknet_client), Arc::clone(&db), Default::default(), ServiceContext::new_for_testing(), 0) .await }) }; diff --git a/madara/crates/client/settlement_client/src/sync.rs b/madara/crates/client/settlement_client/src/sync.rs index 491441611d..5a1c80cd2a 100644 --- a/madara/crates/client/settlement_client/src/sync.rs +++ b/madara/crates/client/settlement_client/src/sync.rs @@ -9,6 +9,7 @@ use mp_utils::service::ServiceContext; pub struct SyncWorkerConfig { pub gas_provider_config: GasPriceProviderConfig, + pub min_settlement_blocks: u64, pub l1_block_metrics: Arc, pub l1_head_sender: L1HeadSender, } @@ -30,6 +31,7 @@ impl L1ClientImpl { Arc::clone(&self.backend), self.notify_new_message_to_l2.clone(), ctx.clone(), + config.min_settlement_blocks, )); if !config.gas_provider_config.all_is_fixed() { diff --git a/madara/node/src/cli/l1.rs b/madara/node/src/cli/l1.rs index da24f3f222..6df00a8b02 100644 --- a/madara/node/src/cli/l1.rs +++ b/madara/node/src/cli/l1.rs @@ -122,4 +122,9 @@ pub struct L1SyncParams { default_value_t = MadaraSettlementLayer::Eth, )] pub settlement_layer: MadaraSettlementLayer, + + /// Minimum settlement blocks for L1 sync. + /// This is the minimum number of blocks that must be settled on L1 before the node can sync/process the messages. + #[clap(env = "MADARA_MIN_SETTLEMENT_BLOCKS", long, default_value = "10")] + pub min_settlement_blocks: u64, } diff --git a/madara/node/src/service/l1.rs b/madara/node/src/service/l1.rs index ace7057e4f..eb56a3fed2 100644 --- a/madara/node/src/service/l1.rs +++ b/madara/node/src/service/l1.rs @@ -99,6 +99,7 @@ impl L1SyncService { client: Some(client.into()), sync_worker_config: Some(SyncWorkerConfig { gas_provider_config, + min_settlement_blocks: config.min_settlement_blocks, l1_head_sender: sync_config.l1_head_snd, l1_block_metrics: sync_config.l1_block_metrics, }),