3 changes: 2 additions & 1 deletion madara/crates/client/settlement_client/src/client.rs
@@ -125,9 +125,10 @@ pub trait SettlementLayerProvider: Send + Sync {
///
/// # Arguments
/// * `from_l1_block_n` - Start returning events from this block_n.
/// * `end_l1_block_n` - Stop returning events at this block_n. None to keep continuing.
/// * `min_settlement_blocks` - Minimum number of settlement blocks after which messages are returned.
async fn messages_to_l2_stream(
&self,
from_l1_block_n: u64,
min_settlement_blocks: u64
) -> Result<BoxStream<'static, Result<MessageToL2WithMetadata, SettlementClientError>>, SettlementClientError>;
}
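
For orientation, a sketch of how a caller might drive the updated trait method; `drain_messages`, the start block, and the depth of 6 are illustrative and not part of this PR:

use futures::StreamExt;
use std::sync::Arc;

// Illustrative caller: stream messages that are at least 6 settlement blocks deep,
// starting from L1 block 0. `client` is any `SettlementLayerProvider` implementation.
async fn drain_messages(
    client: Arc<dyn SettlementLayerProvider>,
) -> Result<(), SettlementClientError> {
    let mut stream = client.messages_to_l2_stream(0, 6).await?;
    while let Some(message) = stream.next().await {
        let message = message?;
        tracing::info!("message to L2 seen at L1 block {}", message.l1_block_number);
    }
    Ok(())
}
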
96 changes: 96 additions & 0 deletions madara/crates/client/settlement_client/src/eth/event.rs
@@ -3,14 +3,18 @@ use crate::eth::error::EthereumClientError;
use crate::eth::StarknetCoreContract::LogMessageToL2;
use crate::messaging::MessageToL2WithMetadata;
use alloy::contract::EventPoller;
use alloy::providers::Provider;
use alloy::rpc::types::Log;
use alloy::transports::http::{Client, Http};
use futures::Stream;
use mp_convert::{Felt, ToFelt};
use mp_transactions::{L1HandlerTransaction, L1HandlerTransactionWithFee};
use std::iter;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

// Event conversion
impl TryFrom<(LogMessageToL2, Log)> for MessageToL2WithMetadata {
@@ -85,6 +89,98 @@ impl Stream for EthereumEventStream {
}
}

/// A stream wrapper that filters events based on dynamic block confirmation depth
///
/// This uses a background task to periodically update the latest block number,
/// making the stream implementation much simpler. It's generic and works with any
/// settlement layer provider.
Comment on lines +95 to +96

Contributor: if it can be used with any settlement client, why are we writing it again for starknet client below?

Contributor: agreed, we should have one filter and make it work for both

pub struct ConfirmationDepthFilteredStream<S> {
Contributor: how can we write a test case for this?

inner: S,
min_confirmations: u64,
latest_block: Arc<AtomicU64>,
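// Keeps the background task that refreshes `latest_block` alive for the lifetime of the stream; it is aborted in `Drop` below.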
_update_task: tokio::task::JoinHandle<()>,
Contributor: can we remove it if not used?

Contributor: it's used. it keeps the latest block updating task in memory so that it's not dropped. we should add a comment @byteZorvin

}

impl<S> ConfirmationDepthFilteredStream<S>
where
S: Stream<Item = Result<MessageToL2WithMetadata, SettlementClientError>> + Unpin,
{
pub fn new(inner: S, provider: Arc<alloy::providers::ReqwestProvider>, min_confirmations: u64) -> Self {
let latest_block = Arc::new(AtomicU64::new(0));

// Spawn background task to periodically update the latest block number
let latest_block_clone = Arc::clone(&latest_block);
let update_task = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(12));
loop {
interval.tick().await;
match provider.get_block_number().await {
Ok(block) => {
latest_block_clone.store(block, Ordering::Relaxed);
tracing::trace!("Updated latest L1 block number to {}", block);
}
Err(e) => {
tracing::warn!("Failed to update latest L1 block number: {}", e);
}
}
}
});

Self { inner, min_confirmations, latest_block, _update_task: update_task }
}
}

impl<S> Drop for ConfirmationDepthFilteredStream<S> {
fn drop(&mut self) {
// Abort the background task when the stream is dropped
self._update_task.abort();
}
}

impl<S> Stream for ConfirmationDepthFilteredStream<S>
where
S: Stream<Item = Result<MessageToL2WithMetadata, SettlementClientError>> + Unpin,
{
type Item = Result<MessageToL2WithMetadata, SettlementClientError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Poll the inner stream for the next event
loop {
match Pin::new(&mut self.inner).poll_next(cx) {
Poll::Ready(Some(Ok(event))) => {
// Check if this event meets the confirmation depth requirement
let latest = self.latest_block.load(Ordering::Relaxed);
let threshold = latest.saturating_sub(self.min_confirmations);

if event.l1_block_number <= threshold {
return Poll::Ready(Some(Ok(event)));
}

// Event doesn't have enough confirmations yet, skip it and continue polling
tracing::debug!(
"Skipping event at block {} (needs {} confirmations, latest L1 block: {})",
event.l1_block_number,
self.min_confirmations,
latest
);
// Continue the loop to get the next event
}
Comment on lines +150 to +167

Contributor: are we sure this works? seems to me that if the depth isn't met then we will not do anything and that event will never come again?

Poll::Ready(Some(Err(e))) => {
// Pass through errors
return Poll::Ready(Some(Err(e)));
}
Poll::Ready(None) => {
// Stream ended
return Poll::Ready(None);
}
Poll::Pending => {
return Poll::Pending;
}
}
}
}
}

#[cfg(test)]
pub mod eth_event_stream_tests {
use super::*;
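Regarding the review question above about testability: one minimal shape such a test could take inside this module, sketched here rather than taken from the PR. It assumes a hypothetical `mock_message(l1_block_n)` helper that builds a `MessageToL2WithMetadata` at the given L1 block, and it constructs the struct directly (same-module test) so no live RPC provider is needed:

// Sketch only: exercises the depth filter without a real provider. `mock_message`
// is an assumed helper, not an existing function in this crate.
#[tokio::test]
async fn confirmation_depth_filter_skips_shallow_events() {
    use futures::{stream, StreamExt};
    use std::sync::atomic::AtomicU64;
    use std::sync::Arc;

    // Events at L1 blocks 10, 95 and 100.
    let events: Vec<Result<MessageToL2WithMetadata, SettlementClientError>> =
        vec![Ok(mock_message(10)), Ok(mock_message(95)), Ok(mock_message(100))];

    // Build the filter directly, replacing the background refresh task with a no-op
    // and pinning the "latest block" at 100.
    let mut filtered = ConfirmationDepthFilteredStream {
        inner: stream::iter(events),
        min_confirmations: 10,
        latest_block: Arc::new(AtomicU64::new(100)),
        _update_task: tokio::spawn(async {}),
    };

    // threshold = 100 - 10 = 90: block 10 is emitted, blocks 95 and 100 are skipped,
    // and the stream ends once the inner stream is exhausted.
    assert_eq!(filtered.next().await.unwrap().unwrap().l1_block_number, 10);
    assert!(filtered.next().await.is_none());
}
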
14 changes: 11 additions & 3 deletions madara/crates/client/settlement_client/src/eth/mod.rs
@@ -327,10 +327,11 @@ impl SettlementLayerProvider for EthereumClient {
async fn messages_to_l2_stream(
&self,
from_l1_block_n: u64,
min_settlement_blocks: u64
) -> Result<BoxStream<'static, Result<MessageToL2WithMetadata, SettlementClientError>>, SettlementClientError> {
let filter = self.l1_core_contract.event_filter::<LogMessageToL2>();
let event_stream =
filter.from_block(from_l1_block_n).to_block(BlockNumberOrTag::Finalized).watch().await.map_err(
filter.from_block(from_l1_block_n).to_block(BlockNumberOrTag::Latest).watch().await.map_err(
|e| -> SettlementClientError {
EthereumClientError::ArchiveRequired(format!(
"Could not fetch events, archive node may be required: {}",
@@ -340,7 +341,7 @@ impl SettlementLayerProvider for EthereumClient {
},
)?;

Ok(EthereumEventStream::new(event_stream).boxed())
let base_stream = EthereumEventStream::new(event_stream);
let filtered_stream = event::ConfirmationDepthFilteredStream::new(
base_stream,
Arc::clone(&self.provider),
min_settlement_blocks,
);

Ok(filtered_stream.boxed())
}
}

@@ -652,7 +660,7 @@ mod l1_messaging_tests {
let worker_handle = {
let db = Arc::clone(&db);
tokio::spawn(async move {
sync(Arc::new(eth_client), Arc::clone(&db), Default::default(), ServiceContext::new_for_testing()).await
sync(Arc::new(eth_client), Arc::clone(&db), Default::default(), ServiceContext::new_for_testing(), 0).await
})
};

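For intuition on the wiring above (events are now fetched up to `Latest` and then gated by the confirmation-depth filter), a small worked example of the cutoff arithmetic; the numbers are illustrative:

// Illustrative only: the cutoff the filter applies to each incoming event.
let latest: u64 = 100;                                          // from the background task
let min_settlement_blocks: u64 = 3;
let threshold = latest.saturating_sub(min_settlement_blocks);   // 97
assert!(95u64 <= threshold);  // an event emitted at block 95 passes through
assert!(99u64 > threshold);   // an event emitted at block 99 is skipped
// Passing 0, as the updated call in `l1_messaging_tests` does, effectively disables the depth check.
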
18 changes: 10 additions & 8 deletions madara/crates/client/settlement_client/src/messaging.rs
@@ -75,16 +75,18 @@ pub async fn sync(
backend: Arc<MadaraBackend>,
notify_consumer: Arc<Notify>,
mut ctx: ServiceContext,
min_settlement_blocks: u64,
) -> Result<(), SettlementClientError> {
// sync inner is cancellation safe.
ctx.run_until_cancelled(sync_inner(settlement_client, backend, notify_consumer)).await.transpose()?;
ctx.run_until_cancelled(sync_inner(settlement_client, backend, notify_consumer, min_settlement_blocks)).await.transpose()?;
Ok(())
}

async fn sync_inner(
settlement_client: Arc<dyn SettlementLayerProvider>,
backend: Arc<MadaraBackend>,
notify_consumer: Arc<Notify>,
min_settlement_blocks: u64
) -> Result<(), SettlementClientError> {
// Note: Reprocessing events.
// It's really important to make sure we don't mess up, we really want a strong guarantee we can't, in any circumstance, include an
@@ -93,7 +95,7 @@ async fn sync_inner(
// We can't make this guarantee here though. As such, we allow ourselves to reprocess events here, re-include them as pending & cie.
// We still do *some* checks, but we can't make the full guarantee here. Instead, block production is responsible to make sure
// it filters out any messages that are duplicated.
// Thus, it's fine to reprocess some events :) there are caught during process message AND during block production.
// Thus, it's fine to reprocess some events :) they are caught during process message AND during block production.
// In fact, we rely on that to restart sequencing on a clean database, or switch from full-node to sequencing.

let replay_max_duration = backend.chain_config().l1_messages_replay_max_duration;
@@ -115,7 +117,7 @@ async fn sync_inner(
tracing::info!("⟠ Starting L1 Messages Syncing from block #{from_l1_block_n}...");

settlement_client
.messages_to_l2_stream(from_l1_block_n)
.messages_to_l2_stream(from_l1_block_n, min_settlement_blocks)
.await
.map_err(|e| SettlementClientError::StreamProcessing(format!("Failed to create messaging stream: {}", e)))?
.map(|message| {
@@ -263,7 +265,7 @@ mod messaging_module_tests {
client
.expect_messages_to_l2_stream()
.times(1)
.returning(move |_| Ok(stream::iter(events.clone()).map(Ok).boxed()));
.returning(move |_, _| Ok(stream::iter(events.clone()).map(Ok).boxed()));

// nonce 1, is pending, not being cancelled, not consumed in db. => OK
mock_l1_handler_tx(&mut client, 1, true, false);
@@ -280,7 +282,7 @@ mod messaging_module_tests {
let db_backend_clone = db.clone();

// Spawn the sync task in a separate thread
let sync_handle = tokio::spawn(async move { sync(client, db_backend_clone, notify, ctx).await });
let sync_handle = tokio::spawn(async move { sync(client, db_backend_clone, notify, ctx, 0).await });

// Wait sufficient time for event to be processed
tokio::time::sleep(Duration::from_secs(5)).await;
@@ -329,9 +331,9 @@ mod messaging_module_tests {
let events = vec![mock_event1.clone()];
client
.expect_messages_to_l2_stream()
.with(predicate::eq(from_l1_block_n))
.with(predicate::eq(from_l1_block_n), predicate::always())
.times(1)
.returning(move |_| Ok(stream::iter(events.clone()).map(Ok).boxed()));
.returning(move |_, _| Ok(stream::iter(events.clone()).map(Ok).boxed()));

// nonce 1, is pending, not being cancelled, not consumed in db. => OK
mock_l1_handler_tx(&mut client, 1, true, false);
@@ -347,7 +349,7 @@ mod messaging_module_tests {
let db_backend_clone = db.clone();

// Spawn the sync task in a separate thread
let sync_handle = tokio::spawn(async move { sync(client, db_backend_clone, notify, ctx).await });
let sync_handle = tokio::spawn(async move { sync(client, db_backend_clone, notify, ctx, 0).await });

// Wait sufficient time for event to be processed
tokio::time::sleep(Duration::from_secs(5)).await;
97 changes: 97 additions & 0 deletions madara/crates/client/settlement_client/src/starknet/event.rs
@@ -9,7 +9,10 @@ use starknet_core::types::{BlockId, EmittedEvent, EventFilter};
use starknet_providers::{Provider, ProviderError};
use starknet_types_core::felt::Felt;
use std::iter;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

// Starknet event conversion
@@ -155,6 +158,100 @@ pub fn watch_events(
.try_flatten()
}

/// A stream wrapper that filters events based on dynamic block confirmation depth for Starknet
///
/// This uses a background task to periodically update the latest block number,
/// making the stream implementation much simpler.
pub struct ConfirmationDepthFilteredStream<S, P> {
inner: S,
min_confirmations: u64,
latest_block: Arc<AtomicU64>,
_update_task: tokio::task::JoinHandle<()>,
_phantom: std::marker::PhantomData<fn() -> P>,
}
Comment on lines +165 to +171

Contributor: we also have it in ethereum settlement client? why is it duplicated?

Contributor: can we create a single struct and a trait and implement that for different clients (if we need to)?

impl<S, P> ConfirmationDepthFilteredStream<S, P>
where
S: Stream<Item = Result<MessageToL2WithMetadata, SettlementClientError>> + Unpin,
P: Provider + Send + Sync + 'static,
{
pub fn new(inner: S, provider: Arc<P>, min_confirmations: u64) -> Self {
let latest_block = Arc::new(AtomicU64::new(0));

// Spawn background task to periodically update the latest block number
let latest_block_clone = Arc::clone(&latest_block);
let update_task = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(6)); // Starknet block time
loop {
interval.tick().await;
match provider.block_number().await {
Ok(block) => {
latest_block_clone.store(block, Ordering::Relaxed);
tracing::trace!("Updated latest Starknet block number to {}", block);
}
Err(e) => {
tracing::warn!("Failed to update latest Starknet block number: {}", e);
}
}
}
});

Self { inner, min_confirmations, latest_block, _update_task: update_task, _phantom: std::marker::PhantomData }
}
}

impl<S, P> Drop for ConfirmationDepthFilteredStream<S, P> {
fn drop(&mut self) {
// Abort the background task when the stream is dropped
self._update_task.abort();
}
}

impl<S, P> Stream for ConfirmationDepthFilteredStream<S, P>
where
S: Stream<Item = Result<MessageToL2WithMetadata, SettlementClientError>> + Unpin,
{
type Item = Result<MessageToL2WithMetadata, SettlementClientError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
// Poll the inner stream for the next event
loop {
match Pin::new(&mut this.inner).poll_next(cx) {
Poll::Ready(Some(Ok(event))) => {
// Check if this event meets the confirmation depth requirement
let latest = this.latest_block.load(Ordering::Relaxed);
let threshold = latest.saturating_sub(this.min_confirmations);

if event.l1_block_number <= threshold {
return Poll::Ready(Some(Ok(event)));
}

// Event doesn't have enough confirmations yet, skip it and continue polling
tracing::debug!(
"Skipping event at block {} (needs {} confirmations, latest Starknet block: {})",
event.l1_block_number,
this.min_confirmations,
latest
);
// Continue the loop to get the next event
}
Poll::Ready(Some(Err(e))) => {
// Pass through errors
return Poll::Ready(Some(Err(e)));
}
Poll::Ready(None) => {
// Stream ended
return Poll::Ready(None);
}
Poll::Pending => {
return Poll::Pending;
}
}
}
}
}

#[cfg(test)]
mod starknet_event_stream_tests {
use super::*;
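On the reviewers' point that this stream duplicates the Ethereum version: one possible direction, sketched here only and not part of this PR, is a single filter generic over a small "latest block" trait. `LatestBlockProvider` and its method name are assumptions:

use futures::future::BoxFuture;

// Hypothetical abstraction (not in this PR) over "latest settled block number"; the
// alloy `ReqwestProvider` and the Starknet `Provider` wrapper would each get an impl
// that simply calls their existing `get_block_number()` / `block_number()` method.
trait LatestBlockProvider: Send + Sync + 'static {
    fn latest_block_number(&self) -> BoxFuture<'_, Result<u64, SettlementClientError>>;
}

// A single `ConfirmationDepthFilteredStream<S>` could then take an
// `Arc<dyn LatestBlockProvider>` in `new()`, spawn one background task that polls
// `latest_block_number()` on an interval, and keep the existing `poll_next` logic,
// letting the near-identical copy of the filter in `starknet/event.rs` be deleted.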