diff --git a/.gitignore b/.gitignore index 45c389f339..f0120d3600 100644 --- a/.gitignore +++ b/.gitignore @@ -110,3 +110,6 @@ s3 sequencer_venv/ sequencer_requirements.txt + +# Documentations of AI code gen chats +AI_docs diff --git a/.markdownlintignore b/.markdownlintignore index 7e3ab86bef..f1ed6b6e5d 100644 --- a/.markdownlintignore +++ b/.markdownlintignore @@ -2,3 +2,4 @@ node_modules/**/*.md **/target/**/*.md docs/**/*.md +AI_docs/*.md diff --git a/madara/Cargo.lock b/madara/Cargo.lock index 3ad301b02d..57ed239093 100644 --- a/madara/Cargo.lock +++ b/madara/Cargo.lock @@ -6530,6 +6530,7 @@ dependencies = [ "mp-gateway", "mp-oracle", "mp-receipt", + "mp-resilience", "mp-rpc", "mp-transactions", "mp-utils", @@ -6875,6 +6876,7 @@ dependencies = [ "futures", "http 1.3.1", "http-body-util", + "httpmock", "hyper 1.6.0", "hyper-tls", "hyper-util", @@ -6882,6 +6884,7 @@ dependencies = [ "mp-block", "mp-class", "mp-gateway", + "mp-resilience", "mp-rpc", "mp-transactions", "reqwest", @@ -7039,6 +7042,7 @@ dependencies = [ "mp-class", "mp-convert", "mp-oracle", + "mp-resilience", "mp-transactions", "mp-utils", "opentelemetry", @@ -7498,6 +7502,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "mp-resilience" +version = "0.8.0" +dependencies = [ + "rstest 0.18.2", + "tokio", + "tracing", +] + [[package]] name = "mp-rpc" version = "0.8.0" diff --git a/madara/Cargo.toml b/madara/Cargo.toml index a6fa09ac71..c1eb410107 100644 --- a/madara/Cargo.toml +++ b/madara/Cargo.toml @@ -23,6 +23,7 @@ members = [ "crates/primitives/gateway", "crates/primitives/rpc", "crates/primitives/receipt", + "crates/primitives/resilience", "crates/primitives/state_update", "crates/primitives/chain_config", "crates/primitives/utils", @@ -94,6 +95,7 @@ mp-state-update = { path = "crates/primitives/state_update", default-features = mp-utils = { path = "crates/primitives/utils", default-features = false } mp-chain-config = { path = "crates/primitives/chain_config", default-features = false } mp-oracle = { path = "crates/primitives/oracle", default-features = false } +mp-resilience = { path = "crates/primitives/resilience", default-features = false } # Madara client mc-analytics = { path = "crates/client/analytics" } diff --git a/madara/crates/client/gateway/client/Cargo.toml b/madara/crates/client/gateway/client/Cargo.toml index d6d2fc85c3..92910dd99a 100644 --- a/madara/crates/client/gateway/client/Cargo.toml +++ b/madara/crates/client/gateway/client/Cargo.toml @@ -25,6 +25,7 @@ mc-submit-tx.workspace = true mp-block.workspace = true mp-class.workspace = true mp-gateway.workspace = true +mp-resilience.workspace = true mp-rpc.workspace = true mp-transactions.workspace = true @@ -55,3 +56,4 @@ url.workspace = true rstest.workspace = true flate2.workspace = true reqwest = { workspace = true, features = ["json"] } +httpmock.workspace = true diff --git a/madara/crates/client/gateway/client/src/builder.rs b/madara/crates/client/gateway/client/src/builder.rs index 6dc16b58ea..3bc88599f3 100644 --- a/madara/crates/client/gateway/client/src/builder.rs +++ b/madara/crates/client/gateway/client/src/builder.rs @@ -28,13 +28,24 @@ type BodyTy = Full; type HttpsClient = Client, BodyTy>; type TimeoutRetryClient = Retry>; pub type PausedClient = PauseLayerMiddleware; -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct GatewayProvider { pub(crate) client: PausedClient, pub(crate) headers: HeaderMap, pub(crate) gateway_url: Url, pub(crate) feeder_gateway_url: Url, pub(crate) madara_specific_url: Option, + pub(crate) health: Arc>, 
+} + +impl std::fmt::Debug for GatewayProvider { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("GatewayProvider") + .field("gateway_url", &self.gateway_url) + .field("feeder_gateway_url", &self.feeder_gateway_url) + .field("madara_specific_url", &self.madara_specific_url) + .finish() + } } impl GatewayProvider { @@ -64,7 +75,19 @@ impl GatewayProvider { let retry_layer = Retry::new(retry_policy, timeout_layer); let client = PauseLayerMiddleware::new(retry_layer, Arc::clone(&pause_until)); - Self { client, gateway_url, feeder_gateway_url, madara_specific_url: None, headers: HeaderMap::new() } + Self { + client, + gateway_url, + feeder_gateway_url, + madara_specific_url: None, + headers: HeaderMap::new(), + health: Arc::new(RwLock::new(crate::health::GatewayHealth::new("Gateway"))), + } + } + + /// Get a reference to the health tracker for this gateway + pub fn health(&self) -> Arc> { + Arc::clone(&self.health) } pub fn with_header(mut self, name: HeaderName, value: HeaderValue) -> Self { diff --git a/madara/crates/client/gateway/client/src/health.rs b/madara/crates/client/gateway/client/src/health.rs new file mode 100644 index 0000000000..aa1232e002 --- /dev/null +++ b/madara/crates/client/gateway/client/src/health.rs @@ -0,0 +1,36 @@ +/// Gateway Health Tracking +/// +/// This module provides gateway-specific health monitoring by wrapping the generic +/// mp-resilience ConnectionHealth with gateway-specific context. +use mp_resilience::ConnectionHealth; +use std::sync::Arc; +use tokio::sync::RwLock; + +/// Gateway health tracker (wraps generic ConnectionHealth) +pub type GatewayHealth = ConnectionHealth; + +/// Start the background health monitor task for the gateway +/// +/// This spawns a tokio task that periodically logs gateway health status. +/// The task will run until the health Arc is dropped or the program exits. +/// +/// # Arguments +/// * `health` - Arc to the GatewayHealth instance to monitor +pub fn start_gateway_health_monitor(health: Arc>) { + mp_resilience::start_health_monitor(health); +} + +// Re-export HealthState for convenience +pub use mp_resilience::HealthState; + +#[cfg(test)] +mod tests { + use super::*; + use mp_resilience::HealthState; + + #[test] + fn test_gateway_health_creation() { + let health = GatewayHealth::new("Gateway"); + assert!(matches!(health.state(), HealthState::Healthy)); + } +} diff --git a/madara/crates/client/gateway/client/src/lib.rs b/madara/crates/client/gateway/client/src/lib.rs index 0823ab38b0..c3b31ac7d9 100644 --- a/madara/crates/client/gateway/client/src/lib.rs +++ b/madara/crates/client/gateway/client/src/lib.rs @@ -288,10 +288,17 @@ //! 
[invoke transactions]: mp_gateway::user_transaction::UserTransaction::InvokeFunction mod builder; +mod health; mod methods; mod request_builder; +mod retry; mod submit_tx; +#[cfg(test)] +mod tests; + pub use mp_rpc::v0_7_1::{BlockId, BlockTag}; pub use builder::GatewayProvider; +pub use health::{start_gateway_health_monitor, GatewayHealth, HealthState}; +pub use retry::{RetryConfig, RetryPhase, RetryState}; diff --git a/madara/crates/client/gateway/client/src/methods.rs b/madara/crates/client/gateway/client/src/methods.rs index 571d26076f..7ed2768493 100644 --- a/madara/crates/client/gateway/client/src/methods.rs +++ b/madara/crates/client/gateway/client/src/methods.rs @@ -1,4 +1,5 @@ use super::{builder::GatewayProvider, request_builder::RequestBuilder}; +use crate::retry::{GatewayRetryState, RetryConfig}; use blockifier::bouncer::BouncerWeights; use mp_class::{ContractClass, FlattenedSierraClass, LegacyContractClass}; use mp_gateway::block::ProviderBlockPreConfirmed; @@ -19,115 +20,178 @@ use serde_json::Value; use starknet_types_core::felt::Felt; use std::{borrow::Cow, sync::Arc}; -/// Maximum number of retry attempts for failed API requests. -/// When an API request fails due to transient errors (such as network issues, -/// rate limits, or temporary service unavailability), the client will -/// automatically retry the request up to this many times before raising an -/// exception. -/// Retries use exponential backoff to avoid overwhelming the service -const MAX_RETRIES: usize = 5; -const BASE_DELAY_MS: u64 = 100; -const BACKOFF_BASE: u32 = 2; - impl GatewayProvider { - /// Generic retry mechanism for GET requests - async fn retry_get(&self, request_fn: F) -> Result + /// Hybrid retry mechanism for GET requests with phase-based backoff. + /// + /// This implements a sophisticated retry strategy that adapts to different failure scenarios: + /// - Phase 1 (0-5 min): Aggressive retry every 2 seconds for quick recovery + /// - Phase 2 (5-30 min): Exponential backoff for prolonged outages + /// - Phase 3 (30+ min): Steady polling at max backoff interval + /// + /// The strategy considers error types and uses clean, emoji-prefixed logging. 
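    // A rough, assumed model of the delay schedule sketched in the phases above, for
    // orientation only: the 2 s phase-1 interval, the 5/30-minute phase boundaries and
    // the 60 s cap come from `RetryConfig::default()` (see the unit tests in retry.rs);
    // the exact phase-2 growth curve is an assumption and is not taken from
    // `RetryState::next_delay`.
    fn sketch_expected_delay(elapsed: std::time::Duration) -> std::time::Duration {
        use std::time::Duration;
        if elapsed < Duration::from_secs(5 * 60) {
            // Phase 1: aggressive fixed-interval retry
            Duration::from_secs(2)
        } else if elapsed < Duration::from_secs(30 * 60) {
            // Phase 2: assumed exponential ramp starting near 5 s, capped at max_backoff
            let minutes_into_phase2 = (elapsed.as_secs() - 5 * 60) / 60;
            Duration::from_secs((5u64 << minutes_into_phase2.min(4)).min(60))
        } else {
            // Phase 3: steady polling at max_backoff
            Duration::from_secs(60)
        }
    }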
+ async fn retry_get(&self, request_fn: F, operation: &str) -> Result where T: DeserializeOwned, F: Fn() -> Fut, Fut: std::future::Future>, { - let mut last_error = None; + let config = RetryConfig::default(); + let mut state = GatewayRetryState::new(config.clone()); - for attempt in 0..MAX_RETRIES { + loop { match request_fn().await { - Ok(result) => return Ok(result), + Ok(result) => { + // Report success to health tracker + self.health.write().await.report_success(); + return Ok(result); + } Err(e) => { - if attempt < MAX_RETRIES - 1 { - tracing::warn!("Failed to get with {:?}, retrying", e); - // Exponential backoff: BASE_DELAY_MS * BACKOFF_BASE^attempt - // attempt 0: 100ms, attempt 1: 200ms, attempt 2: 400ms, attempt 3: 800ms, attempt 4: 1600ms - let delay_ms = BASE_DELAY_MS * (BACKOFF_BASE as u64).pow(attempt as u32); - tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await; + // Check if this error is retryable + // Non-retryable errors (like NoBlockHeader, BlockNotFound) should be returned immediately + if !GatewayRetryState::is_retryable(&e) { + return Err(e); + } + + let retry_count = state.increment_retry(); + + // Report failure to health tracker + self.health.write().await.report_failure(operation); + + // Check if we should continue retrying + if !config.infinite_retry { + // For sequencers or other non-full-node modes, we might want to limit retries + // This is currently always true for full nodes + return Err(e); + } + + // Per-operation logging at DEBUG level (detailed diagnostics) + if state.should_log() { + let error_reason = GatewayRetryState::format_error_reason(&e); + let phase = state.current_phase(); + + tracing::debug!( + target: "mc_gateway_client::retry", + operation = operation, + reason = error_reason, + retries = retry_count, + phase = ?phase, + "Gateway unavailable" + ); } - last_error = Some(e); + + // Calculate delay based on error type and current phase + let delay = state.next_delay(&e); + + // Log phase transitions only once per operation (DEBUG level) + let current_phase = state.current_phase(); + if retry_count == 1 { + tracing::debug!( + target: "mc_gateway_client::retry", + operation = operation, + phase = ?current_phase, + interval_secs = delay.as_secs(), + "Retry strategy initialized" + ); + } + + tokio::time::sleep(delay).await; } } } - Err(last_error.expect("last_error should be Some after retry loop")) } pub async fn get_block(&self, block_id: BlockId) -> Result { - self.retry_get(|| async { - let request = RequestBuilder::new(&self.client, self.feeder_gateway_url.clone(), self.headers.clone()) - .add_uri_segment("get_block") - .expect("Failed to add URI segment. This should not fail in prod.") - .with_block_id(&block_id); - - request.send_get::().await - }) + self.retry_get( + || async { + let request = RequestBuilder::new(&self.client, self.feeder_gateway_url.clone(), self.headers.clone()) + .add_uri_segment("get_block") + .expect("Failed to add URI segment. This should not fail in prod.") + .with_block_id(&block_id); + + request.send_get::().await + }, + "get_block", + ) .await } pub async fn get_preconfirmed_block(&self, block_number: u64) -> Result { - self.retry_get(|| async { - let request = RequestBuilder::new(&self.client, self.feeder_gateway_url.clone(), self.headers.clone()) - .add_uri_segment("get_preconfirmed_block") - .expect("Failed to add URI segment. 
This should not fail in prod.") - .with_block_id(&BlockId::Number(block_number)); - - request.send_get::().await - }) + self.retry_get( + || async { + let request = RequestBuilder::new(&self.client, self.feeder_gateway_url.clone(), self.headers.clone()) + .add_uri_segment("get_preconfirmed_block") + .expect("Failed to add URI segment. This should not fail in prod.") + .with_block_id(&BlockId::Number(block_number)); + + request.send_get::().await + }, + "get_preconfirmed_block", + ) .await } pub async fn get_header(&self, block_id: BlockId) -> Result { - self.retry_get(|| async { - let request = RequestBuilder::new(&self.client, self.feeder_gateway_url.clone(), self.headers.clone()) - .add_uri_segment("get_block") - .expect("Failed to add URI segment. This should not fail in prod.") - .with_block_id(&block_id) - .add_param("headerOnly", "true"); - - request.send_get::().await - }) + self.retry_get( + || async { + let request = RequestBuilder::new(&self.client, self.feeder_gateway_url.clone(), self.headers.clone()) + .add_uri_segment("get_block") + .expect("Failed to add URI segment. This should not fail in prod.") + .with_block_id(&block_id) + .add_param("headerOnly", "true"); + + request.send_get::().await + }, + "get_header", + ) .await } pub async fn get_state_update(&self, block_id: BlockId) -> Result { - self.retry_get(|| async { - let request = RequestBuilder::new(&self.client, self.feeder_gateway_url.clone(), self.headers.clone()) - .add_uri_segment("get_state_update") - .expect("Failed to add URI segment. This should not fail in prod") - .with_block_id(&block_id); - - request.send_get::().await - }) + self.retry_get( + || async { + let request = RequestBuilder::new(&self.client, self.feeder_gateway_url.clone(), self.headers.clone()) + .add_uri_segment("get_state_update") + .expect("Failed to add URI segment. This should not fail in prod") + .with_block_id(&block_id); + + request.send_get::().await + }, + "get_state_update", + ) .await } pub async fn get_block_bouncer_weights(&self, block_number: u64) -> Result { - let request = RequestBuilder::new(&self.client, self.feeder_gateway_url.clone(), self.headers.clone()) - .add_uri_segment("get_block_bouncer_weights") - .expect("Failed to add URI segment. This should not fail in prod") - .with_block_id(&BlockId::Number(block_number)); - - request.send_get::().await + self.retry_get( + || async { + let request = RequestBuilder::new(&self.client, self.feeder_gateway_url.clone(), self.headers.clone()) + .add_uri_segment("get_block_bouncer_weights") + .expect("Failed to add URI segment. This should not fail in prod") + .with_block_id(&BlockId::Number(block_number)); + + request.send_get::().await + }, + "get_block_bouncer_weights", + ) + .await } pub async fn get_state_update_with_block( &self, block_id: BlockId, ) -> Result { - self.retry_get(|| async { - let request = RequestBuilder::new(&self.client, self.feeder_gateway_url.clone(), self.headers.clone()) - .add_uri_segment("get_state_update") - .expect("Failed to add URI segment. This should not fail in prod") - .with_block_id(&block_id) - .add_param(Cow::from("includeBlock"), "true"); - - request.send_get::().await - }) + self.retry_get( + || async { + let request = RequestBuilder::new(&self.client, self.feeder_gateway_url.clone(), self.headers.clone()) + .add_uri_segment("get_state_update") + .expect("Failed to add URI segment. 
This should not fail in prod") + .with_block_id(&block_id) + .add_param(Cow::from("includeBlock"), "true"); + + request.send_get::().await + }, + "get_state_update_with_block", + ) .await } @@ -136,14 +200,17 @@ impl GatewayProvider { return Err(StarknetError::no_signature_for_pending_block().into()); } - self.retry_get(|| async { - let request = RequestBuilder::new(&self.client, self.feeder_gateway_url.clone(), self.headers.clone()) - .add_uri_segment("get_signature") - .expect("Failed to add URI segment. This should not fail in prod") - .with_block_id(&block_id); + self.retry_get( + || async { + let request = RequestBuilder::new(&self.client, self.feeder_gateway_url.clone(), self.headers.clone()) + .add_uri_segment("get_signature") + .expect("Failed to add URI segment. This should not fail in prod") + .with_block_id(&block_id); - request.send_get::().await - }) + request.send_get::().await + }, + "get_signature", + ) .await } @@ -152,26 +219,29 @@ impl GatewayProvider { class_hash: Felt, block_id: BlockId, ) -> Result { - self.retry_get(|| async { - let request = RequestBuilder::new(&self.client, self.feeder_gateway_url.clone(), self.headers.clone()) - .add_uri_segment("get_class_by_hash") - .expect("Failed to add URI segment. This should not fail in prod.") - .with_block_id(&block_id) - .with_class_hash(class_hash); - - let value = request.send_get::().await?; - - if value.get("sierra_program").is_some() { - let sierra: FlattenedSierraClass = serde_json::from_value(value)?; - Ok(ContractClass::Sierra(Arc::new(sierra))) - } else if value.get("program").is_some() { - let legacy: mp_gateway::class::LegacyContractClass = serde_json::from_value(value)?; - Ok(ContractClass::Legacy(Arc::new(LegacyContractClass::from(legacy).compress()?.into()))) - } else { - let err = serde::de::Error::custom("Unknown contract type".to_string()); - Err(SequencerError::DeserializeBody { serde_error: err }) - } - }) + self.retry_get( + || async { + let request = RequestBuilder::new(&self.client, self.feeder_gateway_url.clone(), self.headers.clone()) + .add_uri_segment("get_class_by_hash") + .expect("Failed to add URI segment. This should not fail in prod.") + .with_block_id(&block_id) + .with_class_hash(class_hash); + + let value = request.send_get::().await?; + + if value.get("sierra_program").is_some() { + let sierra: FlattenedSierraClass = serde_json::from_value(value)?; + Ok(ContractClass::Sierra(Arc::new(sierra))) + } else if value.get("program").is_some() { + let legacy: mp_gateway::class::LegacyContractClass = serde_json::from_value(value)?; + Ok(ContractClass::Legacy(Arc::new(LegacyContractClass::from(legacy).compress()?.into()))) + } else { + let err = serde::de::Error::custom("Unknown contract type".to_string()); + Err(SequencerError::DeserializeBody { serde_error: err }) + } + }, + "get_class_by_hash", + ) .await } diff --git a/madara/crates/client/gateway/client/src/retry.rs b/madara/crates/client/gateway/client/src/retry.rs new file mode 100644 index 0000000000..5d2c12e598 --- /dev/null +++ b/madara/crates/client/gateway/client/src/retry.rs @@ -0,0 +1,230 @@ +/// Gateway-specific retry strategy with error handling +/// +/// This module wraps the generic mp-resilience retry logic with gateway-specific +/// error type handling for SequencerError. 
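// Configuration sketch: the fields below are the ones exercised by this module's unit
// tests; shortening phase 1 or turning off `infinite_retry` like this is only an
// illustration of the available knobs, not something the patch itself does anywhere.
#[allow(dead_code)]
fn sketch_bounded_config() -> RetryConfig {
    RetryConfig {
        infinite_retry: false,                                  // give up instead of retrying forever
        phase1_interval: std::time::Duration::from_millis(500), // probe faster than the default 2 s
        ..RetryConfig::default()
    }
}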
+use mp_gateway::error::{SequencerError, StarknetErrorCode}; +use std::time::Duration; + +// Re-export the generic retry types +pub use mp_resilience::{RetryConfig, RetryPhase, RetryState}; + +/// Gateway-specific retry state extensions +pub struct GatewayRetryState { + inner: RetryState, +} + +impl GatewayRetryState { + pub fn new(config: RetryConfig) -> Self { + Self { inner: RetryState::new(config) } + } + + /// Calculate delay for next retry based on current phase and error type + pub fn next_delay(&self, error: &SequencerError) -> Duration { + // Handle rate limiting separately + if self.is_rate_limited(error) { + return self.extract_retry_after(error).unwrap_or(Duration::from_secs(10)); + } + + self.inner.next_delay() + } + + /// Check if we should log this retry attempt (throttled logging) + pub fn should_log(&mut self) -> bool { + self.inner.should_log() + } + + /// Increment retry counter and return current count + pub fn increment_retry(&mut self) -> usize { + self.inner.increment_retry() + } + + /// Get current retry count + #[allow(dead_code)] + pub fn get_retry_count(&self) -> usize { + self.inner.get_retry_count() + } + + /// Determine current retry phase based on elapsed time + pub fn current_phase(&self) -> RetryPhase { + self.inner.current_phase() + } + + /// Get elapsed time since first retry + #[allow(dead_code)] + pub fn elapsed(&self) -> Duration { + self.inner.elapsed() + } + + /// Check if an error is retryable + /// + /// Retryable errors are transient network issues that may succeed on retry: + /// - HTTP/network errors (connection refused, timeout, etc.) + /// - Rate limiting errors + /// + /// Non-retryable errors are valid API responses that won't change on retry: + /// - NoBlockHeader, BlockNotFound, UndeclaredClass, etc. 
+ pub fn is_retryable(error: &SequencerError) -> bool { + match error { + // Network-level errors are always retryable + SequencerError::HttpCallError(_) | SequencerError::HyperError(_) => true, + + // For StarknetError, only rate limits are retryable + // All other StarknetErrors are valid API responses + SequencerError::StarknetError(e) => matches!(e.code, StarknetErrorCode::RateLimited), + + // Other error types are retryable by default + _ => true, + } + } + + /// Check if error is a rate limit error + fn is_rate_limited(&self, error: &SequencerError) -> bool { + matches!( + error, + SequencerError::StarknetError(e) if e.code == StarknetErrorCode::RateLimited + ) + } + + /// Extract Retry-After duration from error if available + fn extract_retry_after(&self, _error: &SequencerError) -> Option { + // TODO: Parse Retry-After header from HttpCallError if available + // For now, return None and use default rate limit delay + None + } + + /// Check if error is a connection error (network-level failure) + #[allow(dead_code)] + pub fn is_connection_error(error: &SequencerError) -> bool { + match error { + SequencerError::HttpCallError(e) => { + let error_str = e.to_string().to_lowercase(); + error_str.contains("connection refused") + || error_str.contains("network unreachable") + || error_str.contains("connection reset") + || error_str.contains("broken pipe") + } + SequencerError::HyperError(_) => true, + _ => false, + } + } + + /// Check if error is a timeout + #[allow(dead_code)] + pub fn is_timeout_error(error: &SequencerError) -> bool { + match error { + SequencerError::HttpCallError(e) => { + let error_str = e.to_string().to_lowercase(); + error_str.contains("timeout") || error_str.contains("timed out") + } + _ => false, + } + } + + /// Format error for user-friendly logging + pub fn format_error_reason(error: &SequencerError) -> String { + match error { + SequencerError::HttpCallError(e) => { + let error_str = e.to_string(); + if error_str.contains("Connection refused") { + "connection refused".to_string() + } else if error_str.contains("timeout") || error_str.contains("timed out") { + "timeout".to_string() + } else if error_str.contains("network unreachable") { + "network unreachable".to_string() + } else if error_str.contains("connection reset") { + "connection reset".to_string() + } else { + "network error".to_string() + } + } + SequencerError::StarknetError(e) if e.code == StarknetErrorCode::RateLimited => "rate limited".to_string(), + SequencerError::StarknetError(e) => format!("{:?}", e.code).to_lowercase().replace('_', " "), + SequencerError::HyperError(_) => "http client error".to_string(), + _ => "unknown error".to_string(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_default_config() { + let config = RetryConfig::default(); + assert_eq!(config.phase1_duration, Duration::from_secs(5 * 60)); + assert_eq!(config.phase1_interval, Duration::from_secs(2)); + assert_eq!(config.max_backoff, Duration::from_secs(60)); + assert!(config.infinite_retry); + } + + #[test] + fn test_phase_determination() { + let config = RetryConfig { phase1_duration: Duration::from_secs(10), ..Default::default() }; + let state = GatewayRetryState::new(config); + + // Should start in aggressive phase + assert_eq!(state.current_phase(), RetryPhase::Aggressive); + } + + #[test] + fn test_retry_count() { + let mut state = GatewayRetryState::new(RetryConfig::default()); + assert_eq!(state.get_retry_count(), 0); + + assert_eq!(state.increment_retry(), 1); + 
assert_eq!(state.increment_retry(), 2); + assert_eq!(state.get_retry_count(), 2); + } + + #[tokio::test] + async fn test_log_throttling() { + let config = RetryConfig { log_interval: Duration::from_millis(100), ..Default::default() }; + let mut state = GatewayRetryState::new(config); + + // First log should always be allowed + assert!(state.should_log()); + + // Immediate second log should be throttled + assert!(!state.should_log()); + + // Wait for less than log_interval + tokio::time::sleep(Duration::from_millis(50)).await; + assert!(!state.should_log(), "Still should be throttled"); + + // Wait for the rest of the interval + tokio::time::sleep(Duration::from_millis(60)).await; + assert!(state.should_log(), "Should log after interval"); + } + + #[test] + fn test_is_retryable_rate_limit() { + use mp_gateway::error::StarknetError; + + let rate_limit_error = SequencerError::StarknetError(StarknetError { + code: StarknetErrorCode::RateLimited, + message: "Rate limited".to_string(), + }); + + assert!(GatewayRetryState::is_retryable(&rate_limit_error), "Rate limit errors should be retryable"); + } + + #[test] + fn test_is_retryable_non_retryable_starknet_errors() { + use mp_gateway::error::StarknetError; + + // Test various non-retryable Starknet errors + let test_cases = vec![ + (StarknetErrorCode::NoBlockHeader, "NoBlockHeader"), + (StarknetErrorCode::BlockNotFound, "BlockNotFound"), + (StarknetErrorCode::UndeclaredClass, "UndeclaredClass"), + (StarknetErrorCode::NoSignatureForPendingBlock, "NoSignatureForPendingBlock"), + ]; + + for (code, name) in test_cases { + let error = SequencerError::StarknetError(StarknetError { code, message: format!("{} error", name) }); + + assert!(!GatewayRetryState::is_retryable(&error), "{} should not be retryable", name); + } + } +} diff --git a/madara/crates/client/gateway/client/src/tests/mod.rs b/madara/crates/client/gateway/client/src/tests/mod.rs new file mode 100644 index 0000000000..c771177ae4 --- /dev/null +++ b/madara/crates/client/gateway/client/src/tests/mod.rs @@ -0,0 +1 @@ +pub mod retry_tests; diff --git a/madara/crates/client/gateway/client/src/tests/retry_tests.rs b/madara/crates/client/gateway/client/src/tests/retry_tests.rs new file mode 100644 index 0000000000..96f2778565 --- /dev/null +++ b/madara/crates/client/gateway/client/src/tests/retry_tests.rs @@ -0,0 +1,97 @@ +/// Unit tests for gateway-specific retry error handling +use crate::retry::{GatewayRetryState, RetryConfig}; +use mp_gateway::error::SequencerError; +use std::time::Duration; + +#[tokio::test] +async fn test_connection_refused_error() { + let error = SequencerError::HttpCallError(Box::new(std::io::Error::new( + std::io::ErrorKind::ConnectionRefused, + "Connection refused", + ))); + + assert!(GatewayRetryState::is_connection_error(&error)); + + let reason = GatewayRetryState::format_error_reason(&error); + assert_eq!(reason, "connection refused"); +} + +#[tokio::test] +async fn test_timeout_error() { + let error = SequencerError::HttpCallError(Box::new(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "Operation timed out", + ))); + + assert!(GatewayRetryState::is_timeout_error(&error)); + + let reason = GatewayRetryState::format_error_reason(&error); + assert_eq!(reason, "timeout"); +} + +#[tokio::test] +async fn test_rate_limit_handling() { + use mp_gateway::error::StarknetError; + + let error = SequencerError::StarknetError(StarknetError::rate_limited()); + + let state = GatewayRetryState::new(RetryConfig::default()); + let delay = state.next_delay(&error); + + // 
Should respect rate limiting + assert!(delay >= Duration::from_secs(2), "Rate limited errors should have appropriate delay"); +} + +#[tokio::test] +async fn test_error_message_formatting() { + let test_cases = vec![ + ( + SequencerError::HttpCallError(Box::new(std::io::Error::new( + std::io::ErrorKind::ConnectionRefused, + "Connection refused", + ))), + "connection refused", + ), + ( + SequencerError::HttpCallError(Box::new(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout"))), + "timeout", + ), + (SequencerError::StarknetError(mp_gateway::error::StarknetError::rate_limited()), "rate limited"), + ]; + + for (error, expected) in test_cases { + let formatted = GatewayRetryState::format_error_reason(&error); + assert_eq!(formatted, expected, "Error formatting mismatch for: {:?}", error); + } +} + +#[tokio::test] +async fn test_mixed_error_types() { + let errors = vec![ + SequencerError::HttpCallError(Box::new(std::io::Error::new( + std::io::ErrorKind::ConnectionRefused, + "Connection refused", + ))), + SequencerError::HttpCallError(Box::new(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout"))), + SequencerError::StarknetError(mp_gateway::error::StarknetError::rate_limited()), + ]; + + let state = GatewayRetryState::new(RetryConfig::default()); + + for error in errors { + let delay = state.next_delay(&error); + let is_conn_error = GatewayRetryState::is_connection_error(&error); + let is_timeout = GatewayRetryState::is_timeout_error(&error); + let formatted = GatewayRetryState::format_error_reason(&error); + + // Verify basic properties + assert!(delay > Duration::from_secs(0), "Should have non-zero delay"); + + // At least one classification should match + assert!( + is_conn_error || is_timeout || !formatted.is_empty(), + "Error should be classified: {:?}", + error + ); + } +} diff --git a/madara/crates/client/settlement_client/Cargo.toml b/madara/crates/client/settlement_client/Cargo.toml index b23f8ea02d..a23a50271b 100644 --- a/madara/crates/client/settlement_client/Cargo.toml +++ b/madara/crates/client/settlement_client/Cargo.toml @@ -27,6 +27,7 @@ mp-block.workspace = true mp-chain-config.workspace = true mp-convert.workspace = true mp-oracle.workspace = true +mp-resilience.workspace = true mp-transactions.workspace = true mp-utils.workspace = true diff --git a/madara/crates/client/settlement_client/src/eth/mod.rs b/madara/crates/client/settlement_client/src/eth/mod.rs index d962b96fce..cbf8f50bd4 100644 --- a/madara/crates/client/settlement_client/src/eth/mod.rs +++ b/madara/crates/client/settlement_client/src/eth/mod.rs @@ -18,10 +18,12 @@ use error::EthereumClientError; use futures::stream::BoxStream; use futures::StreamExt; use mp_convert::{FeltExt, ToFelt}; +use mp_resilience::{ConnectionHealth, RetryConfig, RetryState}; use mp_transactions::L1HandlerTransactionWithFee; use mp_utils::service::ServiceContext; use std::str::FromStr; use std::sync::Arc; +use tokio::sync::RwLock; use url::Url; pub mod error; @@ -39,6 +41,7 @@ sol!( pub struct EthereumClient { pub provider: Arc, pub l1_core_contract: StarknetCoreContractInstance, RootProvider>>, + pub health: Arc>, } #[derive(Clone)] @@ -49,30 +52,92 @@ pub struct EthereumClientConfig { impl Clone for EthereumClient { fn clone(&self) -> Self { - EthereumClient { provider: Arc::clone(&self.provider), l1_core_contract: self.l1_core_contract.clone() } + EthereumClient { + provider: Arc::clone(&self.provider), + l1_core_contract: self.l1_core_contract.clone(), + health: Arc::clone(&self.health), + } } } impl EthereumClient { 
+ /// Create a new Ethereum client with lazy initialization. + /// + /// This method no longer blocks on L1 availability. Instead, it: + /// - Validates the contract address format (fails fast on config errors) + /// - Creates the provider and contract instances + /// - Defers actual L1 connection verification to the first RPC call + /// + /// This allows Madara to start even when L1 is temporarily unavailable. + /// The retry logic in individual RPC calls will handle connection failures. pub async fn new(config: EthereumClientConfig) -> Result { let provider = ProviderBuilder::new().on_http(config.rpc_url); let core_contract_address = Address::from_str(&config.core_contract_address).map_err(|e| -> SettlementClientError { EthereumClientError::Conversion(format!("Invalid core contract address: {e}")).into() })?; - // Check if contract exists - if !provider - .get_code_at(core_contract_address) - .await - .map_err(|e| -> SettlementClientError { EthereumClientError::Rpc(e.to_string()).into() })? - .is_empty() - { - let contract = StarknetCoreContract::new(core_contract_address, provider.clone()); - Ok(Self { provider: Arc::new(provider), l1_core_contract: contract }) - } else { - Err(SettlementClientError::Ethereum(EthereumClientError::Contract( - "Core contract not found at given address".into(), - ))) + + // Note: We no longer check if the contract exists here to avoid blocking startup + // The contract existence will be verified on the first RPC call, with retry logic + let contract = StarknetCoreContract::new(core_contract_address, provider.clone()); + let health = Arc::new(RwLock::new(ConnectionHealth::new("L1 Endpoint"))); + + tracing::info!("L1 client initialized (lazy mode) - will connect on first use"); + Ok(Self { provider: Arc::new(provider), l1_core_contract: contract, health }) + } + + /// Get a reference to the health tracker for this L1 client + pub fn health(&self) -> Arc> { + Arc::clone(&self.health) + } + + /// Retry wrapper for L1 RPC calls with health tracking. + /// + /// This implements infinite retry with phase-based backoff: + /// - Phase 1 (0-5 min): Aggressive retry every 2 seconds + /// - Phase 2 (5-30 min): Exponential backoff (5s → 60s) + /// - Phase 3 (30+ min): Steady polling every 60 seconds + /// + /// All failures are reported to the health tracker for monitoring. 
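    // A minimal consumption sketch, not part of the patch: it relies only on the
    // `health` handle and the `HealthState` variants defined in mp-resilience. A
    // monitoring task could read the tracker like this, or simply hand the handle
    // to `mp_resilience::start_health_monitor` for the built-in heartbeat logging.
    #[allow(dead_code)]
    async fn sketch_log_l1_health(&self) {
        use mp_resilience::HealthState;
        match self.health.read().await.state() {
            HealthState::Healthy => tracing::debug!("L1 endpoint healthy"),
            HealthState::Degraded { failure_rate } => {
                tracing::warn!(?failure_rate, "L1 endpoint degraded")
            }
            HealthState::Down => tracing::error!("L1 endpoint unreachable"),
        }
    }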
+ async fn retry_l1_call(&self, operation: &str, call_fn: F) -> Result + where + F: Fn() -> Fut, + Fut: std::future::Future>, + { + let config = RetryConfig::default(); + let mut state = RetryState::new(config.clone()); + + loop { + match call_fn().await { + Ok(result) => { + // Report success to health tracker + self.health.write().await.report_success(); + return Ok(result); + } + Err(e) => { + let retry_count = state.increment_retry(); + + // Report failure to health tracker + self.health.write().await.report_failure(operation); + + // Log with throttling to prevent spam + if state.should_log() { + let phase = state.current_phase(); + tracing::debug!( + target: "mc_settlement_client::retry", + operation = operation, + error = %e, + retries = retry_count, + phase = ?phase, + "L1 call failed - retrying" + ); + } + + // Calculate delay and wait + let delay = state.next_delay(); + tokio::time::sleep(delay).await; + } + } } } } @@ -87,76 +152,85 @@ impl SettlementLayerProvider for EthereumClient { /// Retrieves the latest Ethereum block number async fn get_latest_block_number(&self) -> Result { - self.provider - .get_block_number() - .await - .map(|n| n.as_u64()) - .map_err(|e| -> SettlementClientError { EthereumClientError::Rpc(e.to_string()).into() }) + self.retry_l1_call("get_latest_block_number", || async { + self.provider + .get_block_number() + .await + .map(|n| n.as_u64()) + .map_err(|e| -> SettlementClientError { EthereumClientError::Rpc(e.to_string()).into() }) + }) + .await } /// Get the block number of the last occurrence of the LogStateUpdate event. async fn get_last_event_block_number(&self) -> Result { - let latest_block = self.get_latest_block_number().await?; + self.retry_l1_call("get_last_event_block_number", || async { + let latest_block = self.get_latest_block_number().await?; - let filter = Filter::new().to_block(latest_block).address(*self.l1_core_contract.address()); + let filter = Filter::new().to_block(latest_block).address(*self.l1_core_contract.address()); - let logs = self - .provider - .get_logs(&filter) - .await - .map_err(|e| -> SettlementClientError { EthereumClientError::Rpc(e.to_string()).into() })?; - - let latest_logs = - logs.into_iter().rev().map(|log| log.log_decode::()).next(); - - match latest_logs { - Some(Ok(log)) => log - .block_number - .ok_or_else(|| -> SettlementClientError { EthereumClientError::MissingField("block_number").into() }), - Some(Err(e)) => Err(SettlementClientError::Ethereum(EthereumClientError::Contract(e.to_string()))), - None => Err(SettlementClientError::Ethereum(EthereumClientError::EventProcessing { - message: format!("no LogStateUpdate event found in block range [None, {}]", latest_block), - block_number: latest_block, - })), - } + let logs = self + .provider + .get_logs(&filter) + .await + .map_err(|e| -> SettlementClientError { EthereumClientError::Rpc(e.to_string()).into() })?; + + let latest_logs = + logs.into_iter().rev().map(|log| log.log_decode::()).next(); + + match latest_logs { + Some(Ok(log)) => log.block_number.ok_or_else(|| -> SettlementClientError { + EthereumClientError::MissingField("block_number").into() + }), + Some(Err(e)) => Err(SettlementClientError::Ethereum(EthereumClientError::Contract(e.to_string()))), + None => Err(SettlementClientError::Ethereum(EthereumClientError::EventProcessing { + message: format!("no LogStateUpdate event found in block range [None, {}]", latest_block), + block_number: latest_block, + })), + } + }) + .await } async fn get_current_core_contract_state(&self) -> Result { - // Get 
the latest block_n first, to guard against the case when the contract state changed in between the calls following calls. - let latest_block_n = self.get_latest_block_number().await?; - - let block_number = - self.l1_core_contract.stateBlockNumber().block(BlockId::number(latest_block_n)).call().await.map_err( - |e| -> SettlementClientError { - EthereumClientError::Contract(format!("Failed to get state block number: {e:#}")).into() - }, - )?; - // when the block 0 is not settled yet, this should be prev block number, this would be the output from the snos as well while - // executing the block 0. - // link: https://github.com/starkware-libs/cairo-lang/blob/master/src/starkware/starknet/solidity/StarknetState.sol#L32 - let block_number: Option = if block_number._0 == I256::MINUS_ONE { - None // initial contract state - } else { - Some(block_number._0.as_u64()) - }; - - let global_root = - self.l1_core_contract.stateRoot().block(BlockId::number(latest_block_n)).call().await.map_err( - |e| -> SettlementClientError { - EthereumClientError::Contract(format!("Failed to get state root: {e:#}")).into() - }, - )?; - let global_root = global_root._0.to_felt(); - - let block_hash = - self.l1_core_contract.stateBlockHash().block(BlockId::number(latest_block_n)).call().await.map_err( - |e| -> SettlementClientError { - EthereumClientError::Contract(format!("Failed to get state block number: {e:#}")).into() - }, - )?; - let block_hash = block_hash._0.to_felt(); - - Ok(StateUpdate { global_root, block_number, block_hash }) + self.retry_l1_call("get_current_core_contract_state", || async { + // Get the latest block_n first, to guard against the case when the contract state changed in between the calls following calls. + let latest_block_n = self.get_latest_block_number().await?; + + let block_number = + self.l1_core_contract.stateBlockNumber().block(BlockId::number(latest_block_n)).call().await.map_err( + |e| -> SettlementClientError { + EthereumClientError::Contract(format!("Failed to get state block number: {e:#}")).into() + }, + )?; + // when the block 0 is not settled yet, this should be prev block number, this would be the output from the snos as well while + // executing the block 0. + // link: https://github.com/starkware-libs/cairo-lang/blob/master/src/starkware/starknet/solidity/StarknetState.sol#L32 + let block_number: Option = if block_number._0 == I256::MINUS_ONE { + None // initial contract state + } else { + Some(block_number._0.as_u64()) + }; + + let global_root = + self.l1_core_contract.stateRoot().block(BlockId::number(latest_block_n)).call().await.map_err( + |e| -> SettlementClientError { + EthereumClientError::Contract(format!("Failed to get state root: {e:#}")).into() + }, + )?; + let global_root = global_root._0.to_felt(); + + let block_hash = + self.l1_core_contract.stateBlockHash().block(BlockId::number(latest_block_n)).call().await.map_err( + |e| -> SettlementClientError { + EthereumClientError::Contract(format!("Failed to get state block number: {e:#}")).into() + }, + )?; + let block_hash = block_hash._0.to_felt(); + + Ok(StateUpdate { global_root, block_number, block_hash }) + }) + .await } /// Listen for state update events from the L1 core contract and process them @@ -165,6 +239,11 @@ impl SettlementLayerProvider for EthereumClient { /// It will run until the context is cancelled. Each event is processed and used to update /// the L1 state in the backend database. 
/// + /// This implements infinite retry with exponential backoff: + /// - If the event stream fails to create, it retries with backoff + /// - If the stream dies during operation, it recreates the stream and retries + /// - All failures are reported to the health tracker + /// /// # Note /// This is a long-running function that blocks the current task until cancelled. async fn listen_for_update_state_events( @@ -172,67 +251,128 @@ impl SettlementLayerProvider for EthereumClient { mut ctx: ServiceContext, worker: StateUpdateWorker, ) -> Result<(), SettlementClientError> { - let event_filter = self.l1_core_contract.event_filter::(); - - let mut event_stream = match ctx.run_until_cancelled(event_filter.watch()).await { - Some(res) => res - .map_err(|e| -> SettlementClientError { - EthereumClientError::EventStream { message: format!("Failed to watch events: {}", e) }.into() - })? - .into_stream(), - None => return Ok(()), - }; - - // Process events in a loop until the context is cancelled - while let Some(Some(event_result)) = ctx.run_until_cancelled(event_stream.next()).await { - let log = event_result.map_err(|e| -> SettlementClientError { - EthereumClientError::EventStream { message: format!("Failed to process event: {e:#}") }.into() - })?; - - let format_event = convert_log_state_update(log.0.clone()).map_err(|e| -> SettlementClientError { - EthereumClientError::StateUpdate { message: format!("Failed to convert log state update: {e:#}") } - .into() - })?; + let config = RetryConfig::default(); + // Separate retry states for different failure modes: + // - stream_creation_retry: Tracks failures when creating the event stream connection + // - event_processing_retry: Tracks failures when processing events from an active stream + let mut stream_creation_retry = RetryState::new(config.clone()); + let mut event_processing_retry = RetryState::new(config); + + // Infinite retry loop for creating and maintaining the event stream + loop { + // Try to create the event stream with retry logic + let event_filter = self.l1_core_contract.event_filter::(); + + let event_stream_result = ctx + .run_until_cancelled(async { + self.retry_l1_call("watch_events", || async { + event_filter.watch().await.map_err(|e| -> SettlementClientError { + EthereumClientError::EventStream { message: format!("Failed to watch events: {}", e) } + .into() + }) + }) + .await + }) + .await; + + let mut event_stream = match event_stream_result { + Some(Ok(stream)) => stream.into_stream(), + Some(Err(e)) => { + // This shouldn't happen since retry_l1_call has infinite retry, + // but handle it just in case + tracing::error!("Failed to create event stream: {e}"); + let delay = stream_creation_retry.next_delay(); + stream_creation_retry.increment_retry(); + tokio::time::sleep(delay).await; + continue; + } + None => return Ok(()), // Context cancelled + }; + + // Process events from the stream + while let Some(event_option) = ctx.run_until_cancelled(event_stream.next()).await { + match event_option { + Some(Ok(log)) => { + // Successfully received an event + self.health.write().await.report_success(); + + match convert_log_state_update(log.0.clone()) { + Ok(format_event) => { + if let Err(e) = worker.update_state(format_event) { + tracing::error!("Failed to update L1 state: {e:#}"); + // Continue processing other events + } + } + Err(e) => { + tracing::error!("Failed to convert log state update: {e:#}"); + // Continue processing other events + } + } + } + Some(Err(e)) => { + // Stream error - report failure and recreate stream + 
tracing::warn!("Event stream error: {e:#} - will recreate stream"); + self.health.write().await.report_failure("event_stream"); + + let delay = event_processing_retry.next_delay(); + event_processing_retry.increment_retry(); + tokio::time::sleep(delay).await; + break; // Break inner loop to recreate stream + } + None => { + // Stream ended unexpectedly - recreate it + tracing::warn!("Event stream ended unexpectedly - will recreate stream"); + self.health.write().await.report_failure("event_stream"); + + let delay = event_processing_retry.next_delay(); + event_processing_retry.increment_retry(); + tokio::time::sleep(delay).await; + break; // Break inner loop to recreate stream + } + } + } - worker.update_state(format_event).map_err(|e| -> SettlementClientError { - EthereumClientError::StateUpdate { message: format!("Failed to update L1 state: {e:#}") }.into() - })?; + // If we broke out of the inner loop due to ctx cancellation, exit + if ctx.is_cancelled() { + return Ok(()); + } } - - Ok(()) } async fn get_gas_prices(&self) -> Result<(u128, u128), SettlementClientError> { - let block_number = self.get_latest_block_number().await?; - let fee_history = self - .provider - .get_fee_history(HISTORY_SIZE as u64, BlockNumberOrTag::Number(block_number), &[]) - .await - .map_err(|e| -> SettlementClientError { - EthereumClientError::GasPriceCalculation { - message: format!("Failed to get fee history for block {}: {}", block_number, e), - } - .into() + self.retry_l1_call("get_gas_prices", || async { + let block_number = self.get_latest_block_number().await?; + let fee_history = self + .provider + .get_fee_history(HISTORY_SIZE as u64, BlockNumberOrTag::Number(block_number), &[]) + .await + .map_err(|e| -> SettlementClientError { + EthereumClientError::GasPriceCalculation { + message: format!("Failed to get fee history for block {}: {}", block_number, e), + } + .into() + })?; + + // Calculate average blob base fee from recent blocks + // We use reverse iteration and take() to handle cases where the RPC might return + // more or fewer elements than requested, ensuring we use at most HISTORY_SIZE blocks + // for a more stable and representative average gas price + let avg_blob_base_fee = fee_history + .base_fee_per_blob_gas + .iter() + .rev() + .take(HISTORY_SIZE) + .sum::() + .checked_div(fee_history.base_fee_per_blob_gas.len() as u128) + .unwrap_or(0); + + let eth_gas_price = fee_history.base_fee_per_gas.last().ok_or_else(|| -> SettlementClientError { + EthereumClientError::MissingField("base_fee_per_gas in fee history response").into() })?; - // Calculate average blob base fee from recent blocks - // We use reverse iteration and take() to handle cases where the RPC might return - // more or fewer elements than requested, ensuring we use at most HISTORY_SIZE blocks - // for a more stable and representative average gas price - let avg_blob_base_fee = fee_history - .base_fee_per_blob_gas - .iter() - .rev() - .take(HISTORY_SIZE) - .sum::() - .checked_div(fee_history.base_fee_per_blob_gas.len() as u128) - .unwrap_or(0); - - let eth_gas_price = fee_history.base_fee_per_gas.last().ok_or_else(|| -> SettlementClientError { - EthereumClientError::MissingField("base_fee_per_gas in fee history response").into() - })?; - - Ok((*eth_gas_price, avg_blob_base_fee)) + Ok((*eth_gas_price, avg_blob_base_fee)) + }) + .await } fn calculate_message_hash(&self, event: &L1HandlerTransactionWithFee) -> Result, SettlementClientError> { @@ -263,17 +403,20 @@ impl SettlementLayerProvider for EthereumClient { /// - `true` if 
there is a cancellation request for this message to l2. /// - An Error if the call fail async fn message_to_l2_has_cancel_request(&self, msg_hash: &[u8]) -> Result { - let cancellation_timestamp = - self.l1_core_contract.l1ToL2MessageCancellations(B256::from_slice(msg_hash)).call().await.map_err( - |e| -> SettlementClientError { - EthereumClientError::L1ToL2Messaging { - message: format!("Failed to check message cancellation status: {}", e), - } - .into() - }, - )?; - - Ok(!cancellation_timestamp._0.is_zero()) + self.retry_l1_call("message_to_l2_has_cancel_request", || async { + let cancellation_timestamp = + self.l1_core_contract.l1ToL2MessageCancellations(B256::from_slice(msg_hash)).call().await.map_err( + |e| -> SettlementClientError { + EthereumClientError::L1ToL2Messaging { + message: format!("Failed to check message cancellation status: {}", e), + } + .into() + }, + )?; + + Ok(!cancellation_timestamp._0.is_zero()) + }) + .await } /// Get cancellation status of an L1 to L2 message @@ -291,19 +434,22 @@ impl SettlementLayerProvider for EthereumClient { /// - `true` if the message exists in the core contract /// - An Error if the call fail async fn message_to_l2_is_pending(&self, msg_hash: &[u8]) -> Result { - tracing::debug!("Calling l1ToL2Messages"); - let cancellation_timestamp = - self.l1_core_contract.l1ToL2Messages(B256::from_slice(msg_hash)).call().await.map_err( - |e| -> SettlementClientError { - EthereumClientError::L1ToL2Messaging { - message: format!("Failed to check that message exists, status: {}", e), - } - .into() - }, - )?; - - tracing::debug!("Returned"); - Ok(cancellation_timestamp._0 != U256::ZERO) + self.retry_l1_call("message_to_l2_is_pending", || async { + tracing::debug!("Calling l1ToL2Messages"); + let cancellation_timestamp = + self.l1_core_contract.l1ToL2Messages(B256::from_slice(msg_hash)).call().await.map_err( + |e| -> SettlementClientError { + EthereumClientError::L1ToL2Messaging { + message: format!("Failed to check that message exists, status: {}", e), + } + .into() + }, + )?; + + tracing::debug!("Returned"); + Ok(cancellation_timestamp._0 != U256::ZERO) + }) + .await } async fn get_block_n_timestamp(&self, l1_block_n: u64) -> Result { @@ -328,17 +474,22 @@ impl SettlementLayerProvider for EthereumClient { &self, from_l1_block_n: u64, ) -> Result>, SettlementClientError> { - let filter = self.l1_core_contract.event_filter::(); - let event_stream = - filter.from_block(from_l1_block_n).to_block(BlockNumberOrTag::Finalized).watch().await.map_err( - |e| -> SettlementClientError { - EthereumClientError::ArchiveRequired(format!( - "Could not fetch events, archive node may be required: {}", - e - )) - .into() - }, - )?; + // Wrap the watch call with retry logic to handle L1 being down + // Note: We need to recreate the filter inside the closure to avoid move issues + let event_stream = self + .retry_l1_call("watch_message_events", || async { + let filter = self.l1_core_contract.event_filter::(); + filter.from_block(from_l1_block_n).to_block(BlockNumberOrTag::Finalized).watch().await.map_err( + |e| -> SettlementClientError { + EthereumClientError::ArchiveRequired(format!( + "Could not fetch events, archive node may be required: {}", + e + )) + .into() + }, + ) + }) + .await?; Ok(EthereumEventStream::new(event_stream).boxed()) } @@ -378,7 +529,8 @@ pub mod eth_client_getter_test { let provider = ProviderBuilder::new().on_http(rpc_url.clone()); let address = Address::parse_checksummed(CORE_CONTRACT_ADDRESS, None).unwrap(); let contract = 
StarknetCoreContract::new(address, provider.clone()); - EthereumClient { provider: Arc::new(provider), l1_core_contract: contract } + let health = Arc::new(RwLock::new(ConnectionHealth::new("L1 Endpoint (Test)"))); + EthereumClient { provider: Arc::new(provider), l1_core_contract: contract, health } } #[tokio::test] @@ -456,7 +608,8 @@ pub mod eth_client_getter_test { let provider = ProviderBuilder::new().on_http(config.rpc_url); let contract = StarknetCoreContract::new(config.core_contract_address.parse().unwrap(), provider.clone()); - let eth_client = EthereumClient { provider: Arc::new(provider), l1_core_contract: contract }; + let health = Arc::new(RwLock::new(ConnectionHealth::new("L1 Endpoint (Test)"))); + let eth_client = EthereumClient { provider: Arc::new(provider), l1_core_contract: contract, health }; // Call contract and verify we get -1 as int256 let block_number = eth_client @@ -497,11 +650,13 @@ mod l1_messaging_tests { }; use mc_db::MadaraBackend; use mp_chain_config::ChainConfig; + use mp_resilience::ConnectionHealth; use mp_transactions::{L1HandlerTransaction, L1HandlerTransactionWithFee}; use mp_utils::service::ServiceContext; use rstest::*; use starknet_types_core::felt::Felt; use std::{sync::Arc, time::Duration}; + use tokio::sync::RwLock; use tracing_test::traced_test; use url::Url; @@ -624,8 +779,9 @@ mod l1_messaging_tests { let core_contract = StarknetCoreContract::new(*contract.address(), provider.clone()); + let health = Arc::new(RwLock::new(ConnectionHealth::new("L1 Endpoint (Test)"))); let eth_client = - EthereumClient { provider: Arc::new(provider.clone()), l1_core_contract: core_contract.clone() }; + EthereumClient { provider: Arc::new(provider.clone()), l1_core_contract: core_contract.clone(), health }; TestRunner { anvil, db_service: db, dummy_contract: contract, eth_client } } @@ -827,8 +983,9 @@ mod eth_client_event_subscription_test { let contract = DummyContract::deploy(provider.clone()).await.unwrap(); let core_contract = StarknetCoreContract::new(*contract.address(), provider.clone()); + let health = Arc::new(RwLock::new(ConnectionHealth::new("L1 Endpoint (Test)"))); let eth_client = - EthereumClient { provider: Arc::new(provider.clone()), l1_core_contract: core_contract.clone() }; + EthereumClient { provider: Arc::new(provider.clone()), l1_core_contract: core_contract.clone(), health }; let l1_block_metrics = L1BlockMetrics::register().unwrap(); let (snd, mut recv) = tokio::sync::watch::channel(None); diff --git a/madara/crates/client/settlement_client/src/gas_price.rs b/madara/crates/client/settlement_client/src/gas_price.rs index d372b73cf4..9ced176f17 100644 --- a/madara/crates/client/settlement_client/src/gas_price.rs +++ b/madara/crates/client/settlement_client/src/gas_price.rs @@ -197,13 +197,24 @@ pub async fn gas_price_worker( } } - if SystemTime::now().duration_since(last_update_timestamp).expect("SystemTime::now() < last_update_timestamp") - > gas_provider_config.poll_interval * 10 - { - return Err(SettlementClientError::GasPrice(format!( - "Failed to update gas prices for more than 10x poll interval: {:?}", - gas_provider_config.poll_interval - ))); + // Note: Removed the panic condition that would kill the worker after 10x poll interval + // The gas price worker now retries infinitely, relying on the underlying L1 calls' retry logic + // to handle transient failures. The health monitor tracks L1 connection status separately. 
+ let time_since_last_update = match SystemTime::now().duration_since(last_update_timestamp) { + Ok(duration) => duration, + Err(_) => { + // System time went backwards (NTP adjustment, VM snapshot, etc.) + tracing::warn!("System time went backwards, resetting gas price update timestamp"); + last_update_timestamp = SystemTime::now(); + Duration::from_secs(0) + } + }; + + if time_since_last_update > gas_provider_config.poll_interval * 10 { + tracing::warn!( + "Gas prices haven't been updated for {:?} (>10x poll interval) - L1 may be down, continuing to retry...", + time_since_last_update + ); } } Ok(()) diff --git a/madara/crates/client/settlement_client/src/lib.rs b/madara/crates/client/settlement_client/src/lib.rs index 1af6b00630..394cfaec01 100644 --- a/madara/crates/client/settlement_client/src/lib.rs +++ b/madara/crates/client/settlement_client/src/lib.rs @@ -160,9 +160,10 @@ use futures::{ StreamExt, }; use mc_db::MadaraBackend; +use mp_resilience::ConnectionHealth; use mp_transactions::L1HandlerTransactionWithFee; use std::sync::Arc; -use tokio::sync::Notify; +use tokio::sync::{Notify, RwLock}; use url::Url; mod client; @@ -186,17 +187,27 @@ pub struct L1ClientImpl { provider: Arc, backend: Arc, notify_new_message_to_l2: Arc, + health: Arc>, } impl L1ClientImpl { - fn new(backend: Arc, provider: Arc) -> Self { - Self { provider, backend, notify_new_message_to_l2: Default::default() } + fn new( + backend: Arc, + provider: Arc, + health: Arc>, + ) -> Self { + Self { provider, backend, notify_new_message_to_l2: Default::default(), health } } pub fn provider(&self) -> Arc { self.provider.clone() } + /// Get a reference to the L1 health tracker + pub fn provider_health(&self) -> Arc> { + Arc::clone(&self.health) + } + pub async fn new_ethereum( backend: Arc, rpc_url: Url, @@ -205,7 +216,8 @@ impl L1ClientImpl { let provider = EthereumClient::new(EthereumClientConfig { rpc_url, core_contract_address }) .await .context("Creating ethereum client")?; - Ok(Self::new(backend, Arc::new(provider))) + let health = provider.health(); + Ok(Self::new(backend, Arc::new(provider), health)) } pub async fn new_starknet( @@ -216,7 +228,8 @@ impl L1ClientImpl { let provider = StarknetClient::new(StarknetClientConfig { rpc_url, core_contract_address }) .await .context("Creating starknet client")?; - Ok(Self::new(backend, Arc::new(provider))) + let health = provider.health(); + Ok(Self::new(backend, Arc::new(provider), health)) } } diff --git a/madara/crates/client/settlement_client/src/messaging.rs b/madara/crates/client/settlement_client/src/messaging.rs index 2f9bc276f9..e28cf30e58 100644 --- a/madara/crates/client/settlement_client/src/messaging.rs +++ b/madara/crates/client/settlement_client/src/messaging.rs @@ -76,9 +76,53 @@ pub async fn sync( notify_consumer: Arc, mut ctx: ServiceContext, ) -> Result<(), SettlementClientError> { - // sync inner is cancellation safe. 
- ctx.run_until_cancelled(sync_inner(settlement_client, backend, notify_consumer)).await.transpose()?; - Ok(()) + use mp_resilience::{RetryConfig, RetryState}; + + let config = RetryConfig::default(); + let mut retry_state = RetryState::new(config); + + // Infinite retry loop for message syncing + loop { + let result = ctx + .run_until_cancelled(sync_inner( + Arc::clone(&settlement_client), + Arc::clone(&backend), + Arc::clone(¬ify_consumer), + )) + .await; + + match result { + Some(Ok(())) => { + // Sync completed successfully (shouldn't normally happen as sync_inner runs indefinitely) + return Ok(()); + } + Some(Err(e)) => { + // Sync failed - log and retry + retry_state.increment_retry(); + + if retry_state.should_log() { + let phase = retry_state.current_phase(); + tracing::warn!( + error = %e, + retries = retry_state.get_retry_count(), + phase = ?phase, + "L1 message sync failed - retrying" + ); + } + + let delay = retry_state.next_delay(); + + // Check for cancellation during sleep to ensure fast shutdown + if ctx.run_until_cancelled(tokio::time::sleep(delay)).await.is_none() { + return Ok(()); // Cancelled during sleep + } + } + None => { + // Context was cancelled + return Ok(()); + } + } + } } async fn sync_inner( @@ -149,7 +193,12 @@ async fn sync_inner( let notify_consumer = notify_consumer.clone(); async move { match block_n { - Err(err) => tracing::debug!("Error while parsing the next ethereum message: {err:#}"), + Err(err) => { + tracing::warn!( + error = %err, + "Failed to parse L1 message - this message will be skipped but sync continues" + ); + } Ok((tx_hash, block_n)) => { tracing::debug!("Processed {tx_hash:#x} {block_n}"); tracing::debug!("Set l1_messaging_sync_tip={block_n}"); diff --git a/madara/crates/client/settlement_client/src/starknet/mod.rs b/madara/crates/client/settlement_client/src/starknet/mod.rs index c3d97c94e9..3c0e19ab9d 100644 --- a/madara/crates/client/settlement_client/src/starknet/mod.rs +++ b/madara/crates/client/settlement_client/src/starknet/mod.rs @@ -39,6 +39,7 @@ pub struct StarknetClient { pub provider: Arc>, pub core_contract_address: Felt, pub processed_update_state_block: AtomicU64, + pub health: Arc>, } #[derive(Clone)] @@ -53,6 +54,7 @@ impl Clone for StarknetClient { provider: Arc::clone(&self.provider), core_contract_address: self.core_contract_address, processed_update_state_block: AtomicU64::new(self.processed_update_state_block.load(Ordering::Relaxed)), + health: Arc::clone(&self.health), } } } @@ -65,20 +67,25 @@ impl StarknetClient { Felt::from_hex(&config.core_contract_address).map_err(|e| -> SettlementClientError { StarknetClientError::Conversion(format!("Invalid core contract address: {e}")).into() })?; - // Check if l2 contract exists - provider.get_class_at(BlockId::Tag(BlockTag::Latest), core_contract_address).await.map_err( - |e| -> SettlementClientError { - StarknetClientError::NetworkConnection { message: format!("Failed to connect to L2 contract: {}", e) } - .into() - }, - )?; + + // Note: We no longer check if the contract exists here to avoid blocking startup + // The contract existence will be verified on the first RPC call, with retry logic + let health = Arc::new(tokio::sync::RwLock::new(mp_resilience::ConnectionHealth::new("L1 Endpoint"))); + + tracing::info!("L1 client initialized (lazy mode) - will connect on first use"); Ok(Self { provider: Arc::new(provider), core_contract_address, processed_update_state_block: AtomicU64::new(0), // Keeping this as 0 initially when client is initialized. 
+ health, }) } + + /// Get a reference to the health tracker for this L1 client + pub fn health(&self) -> Arc> { + Arc::clone(&self.health) + } } const POLL_INTERVAL: Duration = Duration::from_secs(5); // Interval between event polling attempts diff --git a/madara/crates/primitives/resilience/Cargo.toml b/madara/crates/primitives/resilience/Cargo.toml new file mode 100644 index 0000000000..9dc0957122 --- /dev/null +++ b/madara/crates/primitives/resilience/Cargo.toml @@ -0,0 +1,26 @@ +[package] +description = "Generic resilience primitives for connection health and retry logic" +name = "mp-resilience" +authors.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true +version.workspace = true +homepage.workspace = true + +[lints] +workspace = true + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] + +[dependencies] +# Async runtime +tokio = { workspace = true, features = ["sync", "time", "macros"] } + +# Logging +tracing.workspace = true + +# Serialization (for HashMap) +[dev-dependencies] +rstest.workspace = true diff --git a/madara/crates/primitives/resilience/src/health.rs b/madara/crates/primitives/resilience/src/health.rs new file mode 100644 index 0000000000..b0da87bf07 --- /dev/null +++ b/madara/crates/primitives/resilience/src/health.rs @@ -0,0 +1,465 @@ +/// Generic Connection Health Tracking System +/// +/// This module provides centralized health monitoring for external service connections. +/// It tracks overall connection health state and provides clean, aggregated logging +/// instead of per-operation spam. +/// +/// Features: +/// - Three health states: Healthy, Degraded, Down +/// - Adaptive heartbeat logging (5s → 10s → 30s based on outage duration) +/// - Recovery confirmation (waits for stable connection before declaring healthy) +/// - Per-operation failure tracking +/// - State transition logging +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::sync::RwLock; + +// Health state transition thresholds +const CONSECUTIVE_FAILURES_TO_DOWN: usize = 10; +const FAILURE_RATE_DEGRADED_THRESHOLD: f32 = 0.5; +const CONSECUTIVE_SUCCESSES_FOR_RECOVERY: usize = 3; +const RECOVERY_ATTEMPTS_THRESHOLD: usize = 10; +const FAILURE_RATE_HEALTHY_THRESHOLD: f32 = 0.1; + +// Minimum time to stay in Degraded state before transitioning to Healthy +// This prevents rapid state oscillations if connection is flaky +const MIN_TIME_IN_DEGRADED: Duration = Duration::from_secs(2); + +// Heartbeat intervals +const HEARTBEAT_INTERVAL_DEGRADED: Duration = Duration::from_secs(10); +const HEARTBEAT_INTERVAL_DOWN_PHASE1: Duration = Duration::from_secs(5); // 0-5 minutes +const HEARTBEAT_INTERVAL_DOWN_PHASE2: Duration = Duration::from_secs(10); // 5-30 minutes +const HEARTBEAT_INTERVAL_DOWN_PHASE3: Duration = Duration::from_secs(60); // 30+ minutes + +// Phase durations for adaptive logging +const PHASE1_DURATION: Duration = Duration::from_secs(5 * 60); // 5 minutes +const PHASE2_DURATION: Duration = Duration::from_secs(30 * 60); // 30 minutes + +/// Connection health state +#[derive(Debug, Clone, PartialEq)] +pub enum HealthState { + /// All operations succeeding normally + Healthy, + + /// Some operations failing, some succeeding (intermittent issues) + Degraded { failure_rate: f32 }, + + /// All or most operations failing (connection down or unreachable) + Down, +} + +/// Generic connection health tracker +#[derive(Debug)] +pub struct ConnectionHealth { + /// Name of the service being tracked 
(e.g., "Gateway", "L1 Endpoint") + service_name: String, + + /// Current health state + state: HealthState, + + /// When the first failure occurred (start of degradation/downtime) + first_failure_time: Option, + + /// When the state last changed + last_state_change: Instant, + + /// When we last logged a heartbeat message + last_heartbeat_log: Option, + + /// Total requests since last state transition + total_requests: usize, + + /// Failed requests since last state transition + failed_requests: usize, + + /// Consecutive successful requests + consecutive_successes: usize, + + /// Consecutive failed requests + consecutive_failures: usize, + + /// Per-operation failure tracking + failed_operations: HashMap, + + /// Recovery attempts (successes since entering recovery) + recovery_attempts: usize, +} + +impl ConnectionHealth { + pub fn new(service_name: impl Into) -> Self { + Self { + service_name: service_name.into(), + state: HealthState::Healthy, + first_failure_time: None, + last_state_change: Instant::now(), + last_heartbeat_log: None, + total_requests: 0, + failed_requests: 0, + consecutive_successes: 0, + consecutive_failures: 0, + failed_operations: HashMap::new(), + recovery_attempts: 0, + } + } + + /// Report a failed operation + pub fn report_failure(&mut self, operation: &str) { + self.total_requests += 1; + self.failed_requests += 1; + self.consecutive_failures += 1; + self.consecutive_successes = 0; + + *self.failed_operations.entry(operation.to_string()).or_insert(0) += 1; + + // Prevent unbounded memory growth: limit to top 50 failing operations + if self.failed_operations.len() > 50 { + // Keep only the 20 most frequently failing operations + let mut ops: Vec<_> = self.failed_operations.iter().map(|(k, v)| (k.clone(), *v)).collect(); + ops.sort_by(|a, b| b.1.cmp(&a.1)); // Sort by failure count descending + self.failed_operations = ops.into_iter().take(20).collect(); + } + + self.transition_on_failure(); + } + + /// Handle state transitions on failure + fn transition_on_failure(&mut self) { + match &self.state { + HealthState::Healthy => self.transition_healthy_to_degraded(), + HealthState::Degraded { .. } if self.should_transition_to_down() => self.transition_degraded_to_down(), + _ => {} + } + } + + fn transition_healthy_to_degraded(&mut self) { + self.state = HealthState::Degraded { failure_rate: self.failure_rate() }; + self.first_failure_time = Some(Instant::now()); + self.last_state_change = Instant::now(); + tracing::warn!("🟡 {} experiencing intermittent errors", self.service_name); + } + + fn transition_degraded_to_down(&mut self) { + self.state = HealthState::Down; + self.last_state_change = Instant::now(); + self.recovery_attempts = 0; + tracing::warn!("🔴 {} connection lost - retrying...", self.service_name); + } + + fn should_transition_to_down(&self) -> bool { + self.consecutive_failures >= CONSECUTIVE_FAILURES_TO_DOWN + || self.failure_rate() > FAILURE_RATE_DEGRADED_THRESHOLD + } + + /// Report a successful operation + pub fn report_success(&mut self) { + self.total_requests += 1; + self.consecutive_successes += 1; + self.consecutive_failures = 0; + + self.transition_on_success(); + } + + /// Handle state transitions on success + fn transition_on_success(&mut self) { + match &self.state { + HealthState::Healthy => {} + HealthState::Down => self.transition_down_to_degraded(), + HealthState::Degraded { .. 
} => self.try_transition_to_healthy(), + } + } + + fn transition_down_to_degraded(&mut self) { + let downtime = self.first_failure_time.map(|t| t.elapsed()).unwrap_or(Duration::from_secs(0)); + let failed_ops = self.failed_requests; + + // Reset counters to reflect current state, not historical outage + self.total_requests = 1; // The success that triggered this transition + self.failed_requests = 0; + self.consecutive_failures = 0; + self.consecutive_successes = 1; + self.failed_operations.clear(); + + self.state = HealthState::Degraded { failure_rate: 0.0 }; + self.recovery_attempts = 1; + self.last_state_change = Instant::now(); + + tracing::info!( + "🟡 {} partially restored - monitoring stability... (was down for {}, {} operations failed)", + self.service_name, + format_duration(downtime), + failed_ops + ); + + // Immediately check if we can transition to healthy + // If the operation that brought us back is successful (which it is), + // and we have no ongoing failures, transition immediately + self.try_transition_to_healthy(); + } + + fn try_transition_to_healthy(&mut self) { + self.recovery_attempts += 1; + + if self.should_transition_to_healthy() { + let downtime = self.first_failure_time.map(|t| t.elapsed()).unwrap_or(Duration::from_secs(0)); + let failed_ops = self.failed_requests; + + self.state = HealthState::Healthy; + self.last_state_change = Instant::now(); + + tracing::info!( + "🟢 {} UP - Restored after {} ({} operations failed during outage)", + self.service_name, + format_duration(downtime), + failed_ops + ); + + self.reset_metrics(); + } + } + + fn should_transition_to_healthy(&self) -> bool { + // Immediate transition if we have enough consecutive successes + if self.consecutive_successes >= CONSECUTIVE_SUCCESSES_FOR_RECOVERY { + return true; + } + + // Also transition immediately if no operations are failing (clean recovery) + // This allows fast transition from Down -> Degraded -> Healthy when L1 comes back up + if self.failed_operations.is_empty() && self.failure_rate() == 0.0 && self.recovery_attempts > 0 { + return true; + } + + // For ongoing partial failures, ensure we've been in Degraded state for minimum time + // This prevents rapid Down -> Degraded -> Healthy -> Down cycles on flaky connections + if self.last_state_change.elapsed() < MIN_TIME_IN_DEGRADED { + return false; + } + + // Standard recovery: enough attempts with low failure rate + self.recovery_attempts >= RECOVERY_ATTEMPTS_THRESHOLD && self.failure_rate() < FAILURE_RATE_HEALTHY_THRESHOLD + } + + /// Calculate current failure rate + fn failure_rate(&self) -> f32 { + if self.total_requests == 0 { + 0.0 + } else { + self.failed_requests as f32 / self.total_requests as f32 + } + } + + /// Check if we should log a heartbeat based on current state + pub fn should_log_heartbeat(&self) -> bool { + let interval = match &self.state { + HealthState::Healthy => return false, // Don't log when healthy + HealthState::Degraded { .. 
} => HEARTBEAT_INTERVAL_DEGRADED, + HealthState::Down => self.get_down_heartbeat_interval(), + }; + + match self.last_heartbeat_log { + None => true, + Some(last) => last.elapsed() >= interval, + } + } + + /// Get adaptive heartbeat interval for Down state based on outage duration + fn get_down_heartbeat_interval(&self) -> Duration { + let elapsed = self.first_failure_time.unwrap().elapsed(); + + if elapsed < PHASE1_DURATION { + HEARTBEAT_INTERVAL_DOWN_PHASE1 + } else if elapsed < PHASE2_DURATION { + HEARTBEAT_INTERVAL_DOWN_PHASE2 + } else { + HEARTBEAT_INTERVAL_DOWN_PHASE3 + } + } + + /// Log current status (called by heartbeat task) + pub fn log_status(&mut self) { + match &self.state { + HealthState::Healthy => { + // No logging - silence is golden + } + + HealthState::Degraded { failure_rate } => { + let duration = self.first_failure_time.map(|t| t.elapsed()).unwrap_or(Duration::from_secs(0)); + let affected_ops: Vec<_> = self.failed_operations.keys().map(|s| s.as_str()).collect(); + + // Don't log if no operations are affected (empty list means we're recovering) + if !affected_ops.is_empty() { + tracing::warn!( + "🟡 {} unstable ({}) - {:.0}% failure rate, operations affected: {}", + self.service_name, + format_duration(duration), + failure_rate * 100.0, + affected_ops.join(", ") + ); + } + } + + HealthState::Down => { + let duration = self.first_failure_time.map(|t| t.elapsed()).unwrap_or(Duration::from_secs(0)); + let phase = get_retry_phase(duration); + + tracing::warn!( + "🔴 {} down ({}) - Phase: {} → {} failed operations", + self.service_name, + format_duration(duration), + phase, + self.failed_requests + ); + } + } + + self.last_heartbeat_log = Some(Instant::now()); + } + + /// Reset metrics (called when transitioning to healthy) + fn reset_metrics(&mut self) { + self.first_failure_time = None; + self.total_requests = 0; + self.failed_requests = 0; + self.consecutive_successes = 0; + self.consecutive_failures = 0; + self.failed_operations.clear(); + self.recovery_attempts = 0; + } + + /// Get current health state + pub fn state(&self) -> &HealthState { + &self.state + } +} + +/// Start a background health monitor task +/// +/// This spawns a tokio task that periodically logs connection health status. +/// The returned JoinHandle can be used to stop the monitor during graceful shutdown. +/// +/// # Arguments +/// * `health` - Arc to the ConnectionHealth instance to monitor +/// +/// # Returns +/// A JoinHandle that can be awaited or aborted to stop the monitor +pub fn start_health_monitor(health: Arc>) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + let service_name = { + let guard = health.read().await; + guard.service_name.clone() + }; + + tracing::debug!("{} health monitor started", service_name); + + loop { + // Use tokio::select! to make the sleep cancellable + tokio::select! 
{ + _ = tokio::time::sleep(Duration::from_secs(1)) => { + let mut health_guard = health.write().await; + + if health_guard.should_log_heartbeat() { + health_guard.log_status(); + } + } + } + } + }) +} + +/// Format duration in human-readable form +fn format_duration(d: Duration) -> String { + let secs = d.as_secs(); + if secs < 60 { + format!("{}s", secs) + } else if secs < 3600 { + let mins = secs / 60; + let rem_secs = secs % 60; + if rem_secs == 0 { + format!("{}m", mins) + } else { + format!("{}m{}s", mins, rem_secs) + } + } else { + let hours = secs / 3600; + let mins = (secs % 3600) / 60; + if mins == 0 { + format!("{}h", hours) + } else { + format!("{}h{}m", hours, mins) + } + } +} + +/// Get retry phase name based on duration +fn get_retry_phase(duration: Duration) -> &'static str { + let secs = duration.as_secs(); + if secs < 5 * 60 { + "Aggressive" + } else if secs < 30 * 60 { + "Backoff" + } else { + "Steady" + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_format_duration() { + assert_eq!(format_duration(Duration::from_secs(5)), "5s"); + assert_eq!(format_duration(Duration::from_secs(60)), "1m"); + assert_eq!(format_duration(Duration::from_secs(65)), "1m5s"); + assert_eq!(format_duration(Duration::from_secs(135)), "2m15s"); + assert_eq!(format_duration(Duration::from_secs(3600)), "1h"); + assert_eq!(format_duration(Duration::from_secs(3660)), "1h1m"); + assert_eq!(format_duration(Duration::from_secs(5400)), "1h30m"); + } + + #[test] + fn test_get_retry_phase() { + assert_eq!(get_retry_phase(Duration::from_secs(30)), "Aggressive"); + assert_eq!(get_retry_phase(Duration::from_secs(299)), "Aggressive"); + assert_eq!(get_retry_phase(Duration::from_secs(300)), "Backoff"); + assert_eq!(get_retry_phase(Duration::from_secs(1000)), "Backoff"); + assert_eq!(get_retry_phase(Duration::from_secs(1799)), "Backoff"); + assert_eq!(get_retry_phase(Duration::from_secs(1800)), "Steady"); + assert_eq!(get_retry_phase(Duration::from_secs(5000)), "Steady"); + } + + #[test] + fn test_state_transitions() { + let mut health = ConnectionHealth::new("TestService"); + + // Start healthy + assert!(matches!(health.state, HealthState::Healthy)); + + // First failure -> Degraded + health.report_failure("test_op"); + assert!(matches!(health.state, HealthState::Degraded { .. })); + + // More failures -> Down + for _ in 0..10 { + health.report_failure("test_op"); + } + assert!(matches!(health.state, HealthState::Down)); + + // First success -> Immediately transitions to Healthy (clean recovery with no ongoing failures) + health.report_success(); + assert!(matches!(health.state, HealthState::Healthy)); + } + + #[test] + fn test_failure_rate() { + let mut health = ConnectionHealth::new("TestService"); + + health.report_success(); + health.report_failure("test"); + assert!((health.failure_rate() - 0.5).abs() < 0.01); + + health.report_success(); + assert!((health.failure_rate() - 0.33).abs() < 0.02); + } +} diff --git a/madara/crates/primitives/resilience/src/lib.rs b/madara/crates/primitives/resilience/src/lib.rs new file mode 100644 index 0000000000..a63b90ef4e --- /dev/null +++ b/madara/crates/primitives/resilience/src/lib.rs @@ -0,0 +1,42 @@ +/// Generic resilience primitives for connection health tracking and retry logic. +/// +/// This crate provides reusable components for building resilient external service +/// clients. 
It includes: +/// +/// - **Health Tracking**: Monitor connection health with state machine (Healthy → Degraded → Down) +/// - **Retry Strategy**: Phase-based retry with aggressive, backoff, and steady phases +/// - **Adaptive Logging**: Prevent log spam during outages with throttled heartbeats +/// +/// # Example: Gateway Client +/// +/// ```rust,ignore +/// use mp_resilience::{ConnectionHealth, RetryState, RetryConfig, start_health_monitor}; +/// use std::sync::Arc; +/// use tokio::sync::RwLock; +/// +/// let health = Arc::new(RwLock::new(ConnectionHealth::new("Gateway"))); +/// start_health_monitor(Arc::clone(&health)); +/// +/// let mut retry_state = RetryState::new(RetryConfig::default()); +/// +/// loop { +/// match make_request().await { +/// Ok(result) => { +/// health.write().await.report_success(); +/// return Ok(result); +/// } +/// Err(e) => { +/// health.write().await.report_failure("request"); +/// retry_state.increment_retry(); +/// let delay = retry_state.next_delay(); +/// tokio::time::sleep(delay).await; +/// } +/// } +/// } +/// ``` +pub mod health; +pub mod retry; + +// Re-export main types for convenience +pub use health::{start_health_monitor, ConnectionHealth, HealthState}; +pub use retry::{RetryConfig, RetryPhase, RetryState}; diff --git a/madara/crates/primitives/resilience/src/retry.rs b/madara/crates/primitives/resilience/src/retry.rs new file mode 100644 index 0000000000..feb4f54bf4 --- /dev/null +++ b/madara/crates/primitives/resilience/src/retry.rs @@ -0,0 +1,228 @@ +/// Hybrid retry strategy with phase-based backoff for external service connections. +/// +/// This module implements a sophisticated retry mechanism that adapts to different +/// failure scenarios: +/// +/// - **Phase 1 (Aggressive)**: 0-5 minutes - Quick recovery for temporary blips (2s intervals) +/// - **Phase 2 (Backoff)**: 5-30 minutes - Exponential backoff for prolonged outages (5s → 60s) +/// - **Phase 3 (Steady)**: 30+ minutes - Fixed polling for extended maintenance (60s intervals) +/// +/// # Design Trade-offs +/// +/// - **Infinite retry by default**: Full nodes MUST sync eventually, so we never give up on external services +/// - **Phase-based backoff**: Balances fast recovery during brief outages vs. 
resource efficiency during extended downtime +/// - **No jitter**: Simplified implementation, acceptable for single-instance full nodes where thundering herd isn't a concern +/// - **Separate retry states**: Different operations (RPC calls, stream creation, event processing) maintain independent retry contexts +/// +/// # Performance Characteristics +/// +/// - **Phase 1 (0-5min)**: ~150 retry attempts (2s interval) +/// - **Phase 2 (5-30min)**: ~25 retry attempts (exponential: 5s → 60s) +/// - **Phase 3 (30min+)**: 1 retry per minute indefinitely +/// +/// Total attempts in first hour: ~150 + 25 + 30 = ~205 attempts +/// +/// # Usage Example +/// +/// ```rust,ignore +/// use mp_resilience::{RetryConfig, RetryState}; +/// +/// let config = RetryConfig::default(); +/// let mut state = RetryState::new(config); +/// +/// loop { +/// match risky_operation().await { +/// Ok(result) => return Ok(result), +/// Err(e) => { +/// let retry_count = state.increment_retry(); +/// if state.should_log() { +/// tracing::warn!("Operation failed (attempt {}): {}", retry_count, e); +/// } +/// tokio::time::sleep(state.next_delay()).await; +/// } +/// } +/// } +/// ``` +use std::time::{Duration, Instant}; + +/// Configuration for the hybrid retry strategy +#[derive(Debug, Clone)] +pub struct RetryConfig { + /// Duration of Phase 1 (aggressive retry phase) + pub phase1_duration: Duration, + /// Retry interval during Phase 1 + pub phase1_interval: Duration, + /// Minimum delay for Phase 2 exponential backoff + pub phase2_min_delay: Duration, + /// Maximum backoff delay (cap for exponential growth) + pub max_backoff: Duration, + /// Interval for logging warnings during retries + pub log_interval: Duration, + /// Whether to enable infinite retries (true for full nodes) + pub infinite_retry: bool, +} + +impl Default for RetryConfig { + fn default() -> Self { + Self { + phase1_duration: Duration::from_secs(5 * 60), // 5 minutes + phase1_interval: Duration::from_secs(2), // 2 seconds + phase2_min_delay: Duration::from_secs(5), // 5 seconds + max_backoff: Duration::from_secs(60), // 60 seconds (1 minute) + log_interval: Duration::from_secs(10), // Log every 10 seconds + infinite_retry: true, // Full nodes should retry indefinitely + } + } +} + +/// Represents the current phase of the retry strategy +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RetryPhase { + /// Phase 1: Aggressive retry (0-5 minutes) + Aggressive, + /// Phase 2: Exponential backoff (5-30 minutes) + Backoff, + /// Phase 3: Steady state polling (30+ minutes) + Steady, +} + +/// State tracker for retry attempts +pub struct RetryState { + config: RetryConfig, + start_time: Instant, + retry_count: usize, + last_log_time: Option, +} + +impl std::fmt::Debug for RetryState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RetryState").field("config", &self.config).field("retry_count", &self.retry_count).finish() + } +} + +impl RetryState { + pub fn new(config: RetryConfig) -> Self { + Self { + config, + start_time: Instant::now(), + retry_count: 0, + last_log_time: None, + } + } + + /// Determine current retry phase based on elapsed time + pub fn current_phase(&self) -> RetryPhase { + let elapsed = self.start_time.elapsed(); + + if elapsed < self.config.phase1_duration { + RetryPhase::Aggressive + } else if elapsed < Duration::from_secs(30 * 60) { + // 30 minutes total (Phase 1 + Phase 2) + RetryPhase::Backoff + } else { + RetryPhase::Steady + } + } + + /// Calculate delay for next retry based on current 
phase + pub fn next_delay(&self) -> Duration { + match self.current_phase() { + RetryPhase::Aggressive => self.config.phase1_interval, + RetryPhase::Backoff => { + // Exponential backoff: 5s * 2^retry_count (cap exponent at 5 to prevent overflow) + let exponent = self.retry_count.saturating_sub(1).min(5) as u32; + let exponential_delay = self.config.phase2_min_delay.saturating_mul(2_u32.saturating_pow(exponent)); + exponential_delay.min(self.config.max_backoff) + } + RetryPhase::Steady => self.config.max_backoff, + } + } + + /// Check if we should log this retry attempt (throttled logging) + pub fn should_log(&mut self) -> bool { + match self.last_log_time { + None => { + // First log - always allow + self.last_log_time = Some(Instant::now()); + true + } + Some(last_log) => { + if last_log.elapsed() >= self.config.log_interval { + self.last_log_time = Some(Instant::now()); + true + } else { + false + } + } + } + } + + /// Increment retry counter and return current count + pub fn increment_retry(&mut self) -> usize { + self.retry_count += 1; + self.retry_count + } + + /// Get current retry count + pub fn get_retry_count(&self) -> usize { + self.retry_count + } + + /// Get elapsed time since first retry + pub fn elapsed(&self) -> Duration { + self.start_time.elapsed() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_default_config() { + let config = RetryConfig::default(); + assert_eq!(config.phase1_duration, Duration::from_secs(5 * 60)); + assert_eq!(config.phase1_interval, Duration::from_secs(2)); + assert_eq!(config.max_backoff, Duration::from_secs(60)); + assert!(config.infinite_retry); + } + + #[test] + fn test_phase_determination() { + let config = RetryConfig { phase1_duration: Duration::from_secs(10), ..Default::default() }; + let state = RetryState::new(config); + + // Should start in aggressive phase + assert_eq!(state.current_phase(), RetryPhase::Aggressive); + } + + #[test] + fn test_retry_count() { + let mut state = RetryState::new(RetryConfig::default()); + assert_eq!(state.get_retry_count(), 0); + + assert_eq!(state.increment_retry(), 1); + assert_eq!(state.increment_retry(), 2); + assert_eq!(state.get_retry_count(), 2); + } + + #[tokio::test] + async fn test_log_throttling() { + let config = RetryConfig { log_interval: Duration::from_millis(100), ..Default::default() }; + let mut state = RetryState::new(config); + + // First log should always be allowed + assert!(state.should_log()); + + // Immediate second log should be throttled + assert!(!state.should_log()); + + // Wait for less than log_interval + tokio::time::sleep(Duration::from_millis(50)).await; + assert!(!state.should_log(), "Still should be throttled"); + + // Wait for the rest of the interval + tokio::time::sleep(Duration::from_millis(60)).await; + assert!(state.should_log(), "Should log after interval"); + } +} diff --git a/madara/node/Cargo.toml b/madara/node/Cargo.toml index 06705fb638..dc464045be 100644 --- a/madara/node/Cargo.toml +++ b/madara/node/Cargo.toml @@ -38,6 +38,7 @@ mp-block = { workspace = true } mp-chain-config = { workspace = true } mp-class = { workspace = true } mp-oracle = { workspace = true } +mp-resilience = { workspace = true } mp-rpc = { workspace = true } mp-transactions = { workspace = true } mp-utils = { workspace = true } diff --git a/madara/node/src/cli/l2.rs b/madara/node/src/cli/l2.rs index d4743d0fa2..20bfded448 100644 --- a/madara/node/src/cli/l2.rs +++ b/madara/node/src/cli/l2.rs @@ -1,7 +1,7 @@ use anyhow::Context; use http::HeaderName; use 
http::HeaderValue; -use mc_gateway_client::GatewayProvider; +use mc_gateway_client::{start_gateway_health_monitor, GatewayProvider}; use mp_chain_config::ChainConfig; use mp_utils::parsers::parse_url; use serde::{Deserialize, Serialize}; @@ -114,6 +114,9 @@ impl L2SyncParams { ) } + // Start the gateway health monitor (runs in background) + start_gateway_health_monitor(client.health()); + Ok(Arc::new(client)) } } diff --git a/madara/node/src/main.rs b/madara/node/src/main.rs index 7f525f71e2..9530a22f86 100644 --- a/madara/node/src/main.rs +++ b/madara/node/src/main.rs @@ -262,6 +262,8 @@ async fn main() -> anyhow::Result<()> { tracing::info!("💾 Preconfirmed blocks will be saved to database"); } + tracing::info!("running latest code #2"); + let backend = MadaraBackend::open_rocksdb( &run_cmd.backend_params.base_path, chain_config.clone(), diff --git a/madara/node/src/service/l1.rs b/madara/node/src/service/l1.rs index ace7057e4f..4a8bea1e4b 100644 --- a/madara/node/src/service/l1.rs +++ b/madara/node/src/service/l1.rs @@ -85,6 +85,10 @@ impl L1SyncService { } }; + // Start the health monitor for L1 endpoint + mp_resilience::start_health_monitor(client.provider_health()); + tracing::info!("L1 health monitor started"); + if !gas_provider_config.all_is_fixed() { tracing::info!("⏳ Getting initial L1 gas prices"); // Gas prices are needed before starting the block producer
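
The hunks above add a `health: Arc<RwLock<ConnectionHealth>>` field to both settlement clients and expose it via `health()` / `provider_health()`, but the call sites that actually feed the tracker (the places that invoke `report_success` / `report_failure`) are not shown in this part of the diff. As a minimal sketch only, not part of the patch, with the helper name and generic signature being illustrative, the wiring presumably looks like a thin wrapper that reports each provider call's outcome and then hands the unchanged result back to the existing retry logic:

```rust
use std::future::Future;
use std::sync::Arc;

use mp_resilience::ConnectionHealth;
use tokio::sync::RwLock;

/// Hypothetical helper (not in the patch): record the outcome of a single
/// provider call in the shared health tracker, without altering the error,
/// so the caller's retry logic still decides what happens next.
async fn with_health_tracking<T, E, Fut>(
    health: &Arc<RwLock<ConnectionHealth>>,
    operation: &str,
    call: Fut,
) -> Result<T, E>
where
    Fut: Future<Output = Result<T, E>>,
{
    match call.await {
        Ok(value) => {
            // Successful call: feeds the recovery counters (consecutive successes).
            health.write().await.report_success();
            Ok(value)
        }
        Err(err) => {
            // Failed call: recorded under an operation label so the per-operation
            // failure map in ConnectionHealth stays meaningful.
            health.write().await.report_failure(operation);
            Err(err)
        }
    }
}
```

Used as, say, `with_health_tracking(&client.health, "get_block_number", fut).await`, this keeps health reporting orthogonal to error handling: the tracker only observes outcomes, while `RetryState` (or the existing timeout/retry middleware) still owns the retry decision.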
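
`start_health_monitor` documents that the returned `JoinHandle` can be used to stop the monitor during graceful shutdown, yet both call sites above (`start_gateway_health_monitor` in `l2.rs` and `mp_resilience::start_health_monitor` in `l1.rs`) drop the handle, so the 1-second polling loop runs until the process exits. A short sketch of how the handle could be retained and torn down; the shutdown path shown here is hypothetical and not part of the patch:

```rust
use std::sync::Arc;

use mp_resilience::{start_health_monitor, ConnectionHealth};
use tokio::sync::RwLock;

#[tokio::main]
async fn main() {
    let health = Arc::new(RwLock::new(ConnectionHealth::new("L1 Endpoint")));

    // Keep the handle instead of discarding it; the loop inside
    // start_health_monitor has no exit condition of its own.
    let monitor = start_health_monitor(Arc::clone(&health));

    // ... run the node's services here ...

    // On graceful shutdown, abort the background task and observe the
    // cancellation; awaiting an aborted handle yields a JoinError.
    monitor.abort();
    let _ = monitor.await;
}
```

Aborting the task is harmless for the tracker itself: all state lives in the shared `Arc<RwLock<ConnectionHealth>>`, and the monitor only reads it to decide whether a heartbeat line should be logged.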