-
Notifications
You must be signed in to change notification settings - Fork 76
add min settlement blocks #821
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,14 +3,18 @@ use crate::eth::error::EthereumClientError; | |
| use crate::eth::StarknetCoreContract::LogMessageToL2; | ||
| use crate::messaging::MessageToL2WithMetadata; | ||
| use alloy::contract::EventPoller; | ||
| use alloy::providers::Provider; | ||
| use alloy::rpc::types::Log; | ||
| use alloy::transports::http::{Client, Http}; | ||
| use futures::Stream; | ||
| use mp_convert::{Felt, ToFelt}; | ||
| use mp_transactions::{L1HandlerTransaction, L1HandlerTransactionWithFee}; | ||
| use std::iter; | ||
| use std::pin::Pin; | ||
| use std::sync::atomic::{AtomicU64, Ordering}; | ||
| use std::sync::Arc; | ||
| use std::task::{Context, Poll}; | ||
| use std::time::Duration; | ||
|
|
||
| // Event conversion | ||
| impl TryFrom<(LogMessageToL2, Log)> for MessageToL2WithMetadata { | ||
|
|
@@ -85,6 +89,98 @@ impl Stream for EthereumEventStream { | |
| } | ||
| } | ||
|
|
||
| /// A stream wrapper that filters events based on dynamic block confirmation depth | ||
| /// | ||
| /// This uses a background task to periodically update the latest block number, | ||
| /// making the stream implementation much simpler. It's generic and works with any | ||
| /// settlement layer provider. | ||
| pub struct ConfirmationDepthFilteredStream<S> { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how can we write a test case for this? |
||
| inner: S, | ||
| min_confirmations: u64, | ||
| latest_block: Arc<AtomicU64>, | ||
| _update_task: tokio::task::JoinHandle<()>, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we remove it if not used?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's used. it keeps the latest block updating task in memory so that it's not dropped. we should add a comment @byteZorvin |
||
| } | ||
|
|
||
| impl<S> ConfirmationDepthFilteredStream<S> | ||
| where | ||
| S: Stream<Item = Result<MessageToL2WithMetadata, SettlementClientError>> + Unpin, | ||
| { | ||
| pub fn new(inner: S, provider: Arc<alloy::providers::ReqwestProvider>, min_confirmations: u64) -> Self { | ||
| let latest_block = Arc::new(AtomicU64::new(0)); | ||
|
|
||
| // Spawn background task to periodically update the latest block number | ||
| let latest_block_clone = Arc::clone(&latest_block); | ||
| let update_task = tokio::spawn(async move { | ||
| let mut interval = tokio::time::interval(Duration::from_secs(12)); | ||
| loop { | ||
| interval.tick().await; | ||
| match provider.get_block_number().await { | ||
| Ok(block) => { | ||
| latest_block_clone.store(block, Ordering::Relaxed); | ||
| tracing::trace!("Updated latest L1 block number to {}", block); | ||
| } | ||
| Err(e) => { | ||
| tracing::warn!("Failed to update latest L1 block number: {}", e); | ||
| } | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| Self { inner, min_confirmations, latest_block, _update_task: update_task } | ||
| } | ||
| } | ||
|
|
||
| impl<S> Drop for ConfirmationDepthFilteredStream<S> { | ||
| fn drop(&mut self) { | ||
| // Abort the background task when the stream is dropped | ||
| self._update_task.abort(); | ||
| } | ||
| } | ||
|
|
||
| impl<S> Stream for ConfirmationDepthFilteredStream<S> | ||
| where | ||
| S: Stream<Item = Result<MessageToL2WithMetadata, SettlementClientError>> + Unpin, | ||
| { | ||
| type Item = Result<MessageToL2WithMetadata, SettlementClientError>; | ||
|
|
||
| fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
| // Poll the inner stream for the next event | ||
| loop { | ||
| match Pin::new(&mut self.inner).poll_next(cx) { | ||
| Poll::Ready(Some(Ok(event))) => { | ||
| // Check if this event meets the confirmation depth requirement | ||
| let latest = self.latest_block.load(Ordering::Relaxed); | ||
| let threshold = latest.saturating_sub(self.min_confirmations); | ||
|
|
||
| if event.l1_block_number <= threshold { | ||
| return Poll::Ready(Some(Ok(event))); | ||
| } | ||
|
|
||
| // Event doesn't have enough confirmations yet, skip it and continue polling | ||
| tracing::debug!( | ||
| "Skipping event at block {} (needs {} confirmations, latest L1 block: {})", | ||
| event.l1_block_number, | ||
| self.min_confirmations, | ||
| latest | ||
| ); | ||
| // Continue the loop to get the next event | ||
| } | ||
|
Comment on lines
+150
to
+167
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are we sure this works? seeems to me that if the depth isn't met then we will not do anything and that event will never come again? |
||
| Poll::Ready(Some(Err(e))) => { | ||
| // Pass through errors | ||
| return Poll::Ready(Some(Err(e))); | ||
| } | ||
| Poll::Ready(None) => { | ||
| // Stream ended | ||
| return Poll::Ready(None); | ||
| } | ||
| Poll::Pending => { | ||
| return Poll::Pending; | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| pub mod eth_event_stream_tests { | ||
| use super::*; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,7 +9,10 @@ use starknet_core::types::{BlockId, EmittedEvent, EventFilter}; | |
| use starknet_providers::{Provider, ProviderError}; | ||
| use starknet_types_core::felt::Felt; | ||
| use std::iter; | ||
| use std::pin::Pin; | ||
| use std::sync::atomic::{AtomicU64, Ordering}; | ||
| use std::sync::Arc; | ||
| use std::task::{Context, Poll}; | ||
| use std::time::Duration; | ||
|
|
||
| // Starknet event conversion | ||
|
|
@@ -155,6 +158,100 @@ pub fn watch_events( | |
| .try_flatten() | ||
| } | ||
|
|
||
| /// A stream wrapper that filters events based on dynamic block confirmation depth for Starknet | ||
| /// | ||
| /// This uses a background task to periodically update the latest block number, | ||
| /// making the stream implementation much simpler. | ||
| pub struct ConfirmationDepthFilteredStream<S, P> { | ||
| inner: S, | ||
| min_confirmations: u64, | ||
| latest_block: Arc<AtomicU64>, | ||
| _update_task: tokio::task::JoinHandle<()>, | ||
| _phantom: std::marker::PhantomData<fn() -> P>, | ||
| } | ||
|
Comment on lines
+165
to
+171
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we also have it in ethereum settlement client? why is it duplicated?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we create a single struct and a trait and implement that for different clients (if we need to)? |
||
|
|
||
| impl<S, P> ConfirmationDepthFilteredStream<S, P> | ||
| where | ||
| S: Stream<Item = Result<MessageToL2WithMetadata, SettlementClientError>> + Unpin, | ||
| P: Provider + Send + Sync + 'static, | ||
| { | ||
| pub fn new(inner: S, provider: Arc<P>, min_confirmations: u64) -> Self { | ||
| let latest_block = Arc::new(AtomicU64::new(0)); | ||
|
|
||
| // Spawn background task to periodically update the latest block number | ||
| let latest_block_clone = Arc::clone(&latest_block); | ||
| let update_task = tokio::spawn(async move { | ||
| let mut interval = tokio::time::interval(Duration::from_secs(6)); // Starknet block time | ||
| loop { | ||
| interval.tick().await; | ||
| match provider.block_number().await { | ||
| Ok(block) => { | ||
| latest_block_clone.store(block, Ordering::Relaxed); | ||
| tracing::trace!("Updated latest Starknet block number to {}", block); | ||
| } | ||
| Err(e) => { | ||
| tracing::warn!("Failed to update latest Starknet block number: {}", e); | ||
| } | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| Self { inner, min_confirmations, latest_block, _update_task: update_task, _phantom: std::marker::PhantomData } | ||
| } | ||
| } | ||
|
|
||
| impl<S, P> Drop for ConfirmationDepthFilteredStream<S, P> { | ||
| fn drop(&mut self) { | ||
| // Abort the background task when the stream is dropped | ||
| self._update_task.abort(); | ||
| } | ||
| } | ||
|
|
||
| impl<S, P> Stream for ConfirmationDepthFilteredStream<S, P> | ||
| where | ||
| S: Stream<Item = Result<MessageToL2WithMetadata, SettlementClientError>> + Unpin, | ||
| { | ||
| type Item = Result<MessageToL2WithMetadata, SettlementClientError>; | ||
|
|
||
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
| let this = self.get_mut(); | ||
| // Poll the inner stream for the next event | ||
| loop { | ||
| match Pin::new(&mut this.inner).poll_next(cx) { | ||
| Poll::Ready(Some(Ok(event))) => { | ||
| // Check if this event meets the confirmation depth requirement | ||
| let latest = this.latest_block.load(Ordering::Relaxed); | ||
| let threshold = latest.saturating_sub(this.min_confirmations); | ||
|
|
||
| if event.l1_block_number <= threshold { | ||
| return Poll::Ready(Some(Ok(event))); | ||
| } | ||
|
|
||
| // Event doesn't have enough confirmations yet, skip it and continue polling | ||
| tracing::debug!( | ||
| "Skipping event at block {} (needs {} confirmations, latest Starknet block: {})", | ||
| event.l1_block_number, | ||
| this.min_confirmations, | ||
| latest | ||
| ); | ||
| // Continue the loop to get the next event | ||
| } | ||
| Poll::Ready(Some(Err(e))) => { | ||
| // Pass through errors | ||
| return Poll::Ready(Some(Err(e))); | ||
| } | ||
| Poll::Ready(None) => { | ||
| // Stream ended | ||
| return Poll::Ready(None); | ||
| } | ||
| Poll::Pending => { | ||
| return Poll::Pending; | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod starknet_event_stream_tests { | ||
| use super::*; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if it can be used with any settlement client, why are we writing it again for starknet client below?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed, we should have one fileter and make it work for both