
cumulus/client: added external rpc connection retry logic #5515

Merged
1 change: 1 addition & 0 deletions Cargo.lock


3 changes: 3 additions & 0 deletions cumulus/client/relay-chain-rpc-interface/Cargo.toml
@@ -9,6 +9,9 @@ license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[lints]
workspace = true

[dev-dependencies]
portpicker = "0.1.1"

[dependencies]
polkadot-overseer = { workspace = true, default-features = true }

@@ -34,7 +34,7 @@ use jsonrpsee::{
use sc_rpc_api::chain::ChainApiClient;
use schnellru::{ByLength, LruMap};
use sp_runtime::generic::SignedBlock;
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use tokio::sync::mpsc::{
channel as tokio_channel, Receiver as TokioReceiver, Sender as TokioSender,
};
@@ -43,6 +43,9 @@ use url::Url;
use crate::rpc_client::{distribute_header, RpcDispatcherMessage};

const LOG_TARGET: &str = "reconnecting-websocket-client";
const DEFAULT_EXTERNAL_RPC_CONN_RETRIES: usize = 5;
const DEFAULT_SLEEP_TIME_MS_BETWEEN_RETRIES: u64 = 1000;
const DEFAULT_SLEEP_EXP_BACKOFF_BETWEEN_RETRIES: i32 = 2;

/// Worker that should be used in combination with [`RelayChainRpcClient`].
///
@@ -93,16 +96,45 @@ struct RelayChainSubscriptions {
best_subscription: Subscription<RelayHeader>,
}

/// Try to find a new RPC server to connect to.
/// Try to find a new RPC server to connect to. Uses naive retry
/// logic with an exponential backoff between full passes over the
/// URL list. A constant bounds how many passes of connection
/// attempts over all URLs are allowed. We return early as soon as
/// a connection is made.
async fn connect_next_available_rpc_server(
urls: &Vec<String>,
starting_position: usize,
) -> Result<(usize, Arc<JsonRpcClient>), ()> {
tracing::debug!(target: LOG_TARGET, starting_position, "Connecting to RPC server.");
for (counter, url) in urls.iter().cycle().skip(starting_position).take(urls.len()).enumerate() {

let mut prev_iteration: u32 = 0;
for (counter, url) in urls
.iter()
.cycle()
.skip(starting_position)
.take(urls.len() * DEFAULT_EXTERNAL_RPC_CONN_RETRIES)
.enumerate()
{
// If we reached the end of the urls list, backoff before retrying
// connections to the entire list once more.
let Ok(current_iteration) = (counter / urls.len()).try_into() else {
tracing::error!(target: LOG_TARGET, "Too many connection attempts to the RPC servers, aborting...");
@michalkucharczyk (Contributor), Aug 30, 2024, suggested change:

tracing::error!(target: LOG_TARGET, "Too many connection attempts to the RPC servers, aborting...");
tracing::error!(target: LOG_TARGET, "Too many failed connection attempts to the RPC servers, aborting...");

Contributor:

hm, will this error ever be printed?

Contributor:

I think we need an extra check to print the error if the loop concluded w/o an actual connection.

Contributor Author:

> hm, will this error ever be printed?

In practice it shouldn't ever be printed, but if we do see this log then something's weird (either we're iterating too many times in the retry logic or memory gets corrupted at runtime).

> I think we need an extra check to print the error if the loop concluded w/o an actual connection.

You mean outside of the loop, if not concluding with a connection? I added a log here: d47a965. Let me know if that's what you meant.

Contributor:

So when talking about too many attempts, are we actually talking about ~2^64 too many? Does it make sense to impose such limits? I.e., how much time would have to pass to actually hit it?

Contributor:

We should never hit this branch, since this will loop for a maximum of urls.len() * DEFAULT_EXTERNAL_RPC_CONN_RETRIES times.

break;
};
if current_iteration > prev_iteration {
// Safe conversion: we only convert positive i32 values, which are well below u64::MAX.
tokio::time::sleep(Duration::from_millis(
DEFAULT_SLEEP_TIME_MS_BETWEEN_RETRIES *
DEFAULT_SLEEP_EXP_BACKOFF_BETWEEN_RETRIES.pow(prev_iteration) as u64,
))
.await;
prev_iteration = current_iteration;
}

let index = (starting_position + counter) % urls.len();
tracing::info!(
target: LOG_TARGET,
current_iteration,
Contributor:

Nit: From the user's perspective we should probably just print an attempt number here and summarize index and iteration.

index,
url,
"Trying to connect to next external relaychain node.",
@@ -112,6 +144,8 @@ async fn connect_next_available_rpc_server(
Err(err) => tracing::debug!(target: LOG_TARGET, url, ?err, "Unable to connect."),
};
}

tracing::error!(target: LOG_TARGET, "Retrying to connect to any external relaychain node failed.");
Contributor:

nit: should we maybe display the list of nodes?

Contributor Author:

That's individually displayed with each iteration for each url. I think we should be fine.

Contributor:

Assuming info level is enabled, which may not be the case for defaults. Anyway, it is minor.

Err(())
}

@@ -431,9 +465,14 @@ impl ReconnectingWebsocketWorker {

#[cfg(test)]
mod test {
use super::url_to_string_with_port;
use std::time::Duration;

use super::{url_to_string_with_port, ClientManager};
use jsonrpsee::Methods;
use url::Url;

const SERVER_STARTUP_DELAY_SECONDS: u64 = 10;

#[test]
fn url_to_string_works() {
let url = Url::parse("wss://something/path").unwrap();
@@ -460,4 +499,29 @@ mod test {
url_to_string_with_port(url)
);
}

#[tokio::test]
// Testing the retry logic in full would add about half a minute to CI
// with the current constants, so let's test it best-effort.
async fn client_manager_retry_logic() {
let port = portpicker::pick_unused_port().unwrap();
let server = jsonrpsee::server::Server::builder()
.build(format!("0.0.0.0:{}", port))
.await
.unwrap();

// Start the server.
let server = tokio::spawn(async {
tokio::time::sleep(Duration::from_secs(SERVER_STARTUP_DELAY_SECONDS)).await;
server.start(Methods::default())
});

// Start the client. Not exiting right away with an error means it
// gracefully handles the connection refusals received while the server
// is starting.
let res = ClientManager::new(vec![format!("ws://127.0.0.1:{}", port)]).await;
assert!(res.is_ok());

server.await.unwrap();
}
}
15 changes: 15 additions & 0 deletions prdoc/pr_5515.prdoc
@@ -0,0 +1,15 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: Add retry logic in relay chain rpc interface

doc:
- audience: [ Node Dev, Node Operator ]
description: |
Added basic retry logic for collators connecting to external RPC servers. The collator
will try 5 times to connect to each RPC server from the provided list. Between
iterations it will wait for a duration that increases exponentially by a factor of two.
The maximum time a collator can spend in the retry logic is 1 + 2 + 4 + 8 + 16 = 31 seconds.
crates:
- name: cumulus-relay-chain-rpc-interface
bump: minor