diff --git a/Cargo.lock b/Cargo.lock index 0d41be5c9bd1..e37b79cc58e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4457,6 +4457,7 @@ dependencies = [ "parity-scale-codec", "pin-project", "polkadot-overseer", + "portpicker", "rand", "sc-client-api", "sc-rpc-api", diff --git a/cumulus/client/relay-chain-rpc-interface/Cargo.toml b/cumulus/client/relay-chain-rpc-interface/Cargo.toml index 6c0730a56a26..c2deddc5341d 100644 --- a/cumulus/client/relay-chain-rpc-interface/Cargo.toml +++ b/cumulus/client/relay-chain-rpc-interface/Cargo.toml @@ -9,6 +9,9 @@ license = "GPL-3.0-or-later WITH Classpath-exception-2.0" [lints] workspace = true +[dev-dependencies] +portpicker = "0.1.1" + [dependencies] polkadot-overseer = { workspace = true, default-features = true } diff --git a/cumulus/client/relay-chain-rpc-interface/src/reconnecting_ws_client.rs b/cumulus/client/relay-chain-rpc-interface/src/reconnecting_ws_client.rs index 48d35dd3a55e..dc0e9d697b46 100644 --- a/cumulus/client/relay-chain-rpc-interface/src/reconnecting_ws_client.rs +++ b/cumulus/client/relay-chain-rpc-interface/src/reconnecting_ws_client.rs @@ -34,7 +34,7 @@ use jsonrpsee::{ use sc_rpc_api::chain::ChainApiClient; use schnellru::{ByLength, LruMap}; use sp_runtime::generic::SignedBlock; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use tokio::sync::mpsc::{ channel as tokio_channel, Receiver as TokioReceiver, Sender as TokioSender, }; @@ -43,6 +43,9 @@ use url::Url; use crate::rpc_client::{distribute_header, RpcDispatcherMessage}; const LOG_TARGET: &str = "reconnecting-websocket-client"; +const DEFAULT_EXTERNAL_RPC_CONN_RETRIES: usize = 5; +const DEFAULT_SLEEP_TIME_MS_BETWEEN_RETRIES: u64 = 1000; +const DEFAULT_SLEEP_EXP_BACKOFF_BETWEEN_RETRIES: i32 = 2; /// Worker that should be used in combination with [`RelayChainRpcClient`]. /// @@ -93,16 +96,45 @@ struct RelayChainSubscriptions { best_subscription: Subscription, } -/// Try to find a new RPC server to connect to. +/// Try to find a new RPC server to connect to. Uses a naive retry +/// logic that does an exponential backoff in between iterations +/// through all URLs from the list. It uses a constant to tell how +/// many iterations of connection attempts to all URLs we allow. We +/// return early when a connection is made. async fn connect_next_available_rpc_server( urls: &Vec, starting_position: usize, ) -> Result<(usize, Arc), ()> { tracing::debug!(target: LOG_TARGET, starting_position, "Connecting to RPC server."); - for (counter, url) in urls.iter().cycle().skip(starting_position).take(urls.len()).enumerate() { + + let mut prev_iteration: u32 = 0; + for (counter, url) in urls + .iter() + .cycle() + .skip(starting_position) + .take(urls.len() * DEFAULT_EXTERNAL_RPC_CONN_RETRIES) + .enumerate() + { + // If we reached the end of the urls list, backoff before retrying + // connections to the entire list once more. + let Ok(current_iteration) = (counter / urls.len()).try_into() else { + tracing::error!(target: LOG_TARGET, "Too many connection attempts to the RPC servers, aborting..."); + break; + }; + if current_iteration > prev_iteration { + // Safe conversion given we convert positive i32s which are lower than u64::MAX. + tokio::time::sleep(Duration::from_millis( + DEFAULT_SLEEP_TIME_MS_BETWEEN_RETRIES * + DEFAULT_SLEEP_EXP_BACKOFF_BETWEEN_RETRIES.pow(prev_iteration) as u64, + )) + .await; + prev_iteration = current_iteration; + } + let index = (starting_position + counter) % urls.len(); tracing::info!( target: LOG_TARGET, + attempt = current_iteration, index, url, "Trying to connect to next external relaychain node.", @@ -112,6 +144,8 @@ async fn connect_next_available_rpc_server( Err(err) => tracing::debug!(target: LOG_TARGET, url, ?err, "Unable to connect."), }; } + + tracing::error!(target: LOG_TARGET, "Retrying to connect to any external relaychain node failed."); Err(()) } @@ -431,9 +465,14 @@ impl ReconnectingWebsocketWorker { #[cfg(test)] mod test { - use super::url_to_string_with_port; + use std::time::Duration; + + use super::{url_to_string_with_port, ClientManager}; + use jsonrpsee::Methods; use url::Url; + const SERVER_STARTUP_DELAY_SECONDS: u64 = 10; + #[test] fn url_to_string_works() { let url = Url::parse("wss://something/path").unwrap(); @@ -460,4 +499,29 @@ mod test { url_to_string_with_port(url) ); } + + #[tokio::test] + // Testing the retry logic at full means increasing CI with half a minute according + // to the current logic, so lets test it best effort. + async fn client_manager_retry_logic() { + let port = portpicker::pick_unused_port().unwrap(); + let server = jsonrpsee::server::Server::builder() + .build(format!("0.0.0.0:{}", port)) + .await + .unwrap(); + + // Start the server. + let server = tokio::spawn(async { + tokio::time::sleep(Duration::from_secs(SERVER_STARTUP_DELAY_SECONDS)).await; + server.start(Methods::default()) + }); + + // Start the client. Not exitting right away with an error means it + // is handling gracefully received connections refused while the server + // is starting. + let res = ClientManager::new(vec![format!("ws://127.0.0.1:{}", port)]).await; + assert!(res.is_ok()); + + server.await.unwrap(); + } } diff --git a/prdoc/pr_5515.prdoc b/prdoc/pr_5515.prdoc new file mode 100644 index 000000000000..60f43b922c7f --- /dev/null +++ b/prdoc/pr_5515.prdoc @@ -0,0 +1,15 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: Add retry logic in relay chain rpc interface + +doc: + - audience: [ Node Dev, Node Operator ] + description: | + Added a basic retry logic for collators connecting to external RPC servers. The collator + will try for 5 times to connect to each RPC server from the provided list. In between + each iteration will wait a duration which will increase exponentailly by a factor of two. + The maximum time a collator can spend in the retry logic is 1 + 2 + 4 + 8 + 16 = 31 seconds. +crates: + - name: cumulus-relay-chain-rpc-interface + bump: minor