cumulus/client: added external rpc connection retry logic #5515

Merged

Changes from 4 commits
1 change: 1 addition & 0 deletions Cargo.lock

3 changes: 3 additions & 0 deletions cumulus/client/relay-chain-rpc-interface/Cargo.toml
@@ -9,6 +9,9 @@ license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[lints]
workspace = true

[dev-dependencies]
portpicker = "0.1.1"

[dependencies]
polkadot-overseer = { workspace = true, default-features = true }

@@ -34,7 +34,7 @@ use jsonrpsee::{
use sc_rpc_api::chain::ChainApiClient;
use schnellru::{ByLength, LruMap};
use sp_runtime::generic::SignedBlock;
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use tokio::sync::mpsc::{
channel as tokio_channel, Receiver as TokioReceiver, Sender as TokioSender,
};
@@ -43,6 +43,9 @@ use url::Url;
use crate::rpc_client::{distribute_header, RpcDispatcherMessage};

const LOG_TARGET: &str = "reconnecting-websocket-client";
const DEFAULT_EXTERNAL_RPC_CONN_RETRIES: usize = 5;
const DEFAULT_SLEEP_TIME_MS_BETWEEN_RETRIES: u64 = 1000;
const DEFAULT_SLEEP_EXP_BACKOFF_BETWEEN_RETRIES: i32 = 2;

/// Worker that should be used in combination with [`RelayChainRpcClient`].
///
@@ -93,16 +96,45 @@ struct RelayChainSubscriptions {
best_subscription: Subscription<RelayHeader>,
}

/// Try to find a new RPC server to connect to.
/// Try to find a new RPC server to connect to. Uses a naive retry
/// logic that applies an exponential backoff between iterations
/// through all URLs in the list. A constant caps how many iterations
/// of connection attempts over all URLs we allow. We return early
/// as soon as a connection is made.
async fn connect_next_available_rpc_server(
urls: &Vec<String>,
starting_position: usize,
) -> Result<(usize, Arc<JsonRpcClient>), ()> {
tracing::debug!(target: LOG_TARGET, starting_position, "Connecting to RPC server.");
for (counter, url) in urls.iter().cycle().skip(starting_position).take(urls.len()).enumerate() {

let mut prev_iteration: u32 = 0;
for (counter, url) in urls
.iter()
.cycle()
.skip(starting_position)
.take(urls.len() * DEFAULT_EXTERNAL_RPC_CONN_RETRIES)
.enumerate()
{
// If we reached the end of the urls list, back off before retrying
// connections to the entire list once more.
let Ok(current_iteration) = (counter / urls.len()).try_into() else {
tracing::error!(target: LOG_TARGET, "Too many connection attempts to the RPC servers, aborting...");
michalkucharczyk (Contributor) commented on Aug 30, 2024:

Suggested change:
- tracing::error!(target: LOG_TARGET, "Too many connection attempts to the RPC servers, aborting...");
+ tracing::error!(target: LOG_TARGET, "Too many failed connection attempts to the RPC servers, aborting...");

Contributor:

hm, will this error ever be printed?

Contributor:

I think we need an extra check to print the error if the loop concluded without an actual connection.

Contributor Author:

> hm, will this error ever be printed?

In practice it shouldn't ever be printed, but if we do see the log then something is weird (either we're iterating too many times in the retry logic or memory gets corrupted at runtime).

> I think we need an extra check to print the error if the loop concluded without an actual connection.

You mean outside of the loop, if it doesn't conclude with a connection? I added a log here: d47a965. Let me know if that's what you mean.
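
A minimal sketch of what such a post-loop log could look like (hypothetical shape; the actual change landed in d47a965):

// Hypothetical sketch: the loop above fell through without establishing a
// connection, so log once before handing the error back to the caller.
tracing::error!(
    target: LOG_TARGET,
    "Failed to connect to any external relay chain RPC server after {} attempts.",
    urls.len() * DEFAULT_EXTERNAL_RPC_CONN_RETRIES
);
Err(())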

Contributor:

So when talking about too many attempts we are actually talking about ~2^64 too many? Does it make sense to impose such a limit? I.e., how much time would have to pass to actually hit it?

Contributor:

We should never hit this branch, since the loop runs for at most urls.len() * DEFAULT_EXTERNAL_RPC_CONN_RETRIES iterations.
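
For reference, a standalone sketch of the bound discussed above (not part of the PR; urls_len stands in for urls.len()):

// Sketch: the pass index (counter / urls_len) tops out at the retry constant
// minus one, so the conversion to u32 inside the loop can never fail.
fn main() {
    const DEFAULT_EXTERNAL_RPC_CONN_RETRIES: usize = 5;
    let urls_len: usize = 3; // example URL list length
    let max_counter = urls_len * DEFAULT_EXTERNAL_RPC_CONN_RETRIES - 1;
    let max_pass: u32 = (max_counter / urls_len).try_into().unwrap();
    assert_eq!(max_pass, (DEFAULT_EXTERNAL_RPC_CONN_RETRIES - 1) as u32);
}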

break;
};
if current_iteration > prev_iteration {
// Safe conversion given we convert positive i32s which are lower than u64::MAX.
tokio::time::sleep(Duration::from_millis(
DEFAULT_SLEEP_TIME_MS_BETWEEN_RETRIES *
DEFAULT_SLEEP_EXP_BACKOFF_BETWEEN_RETRIES.pow(prev_iteration) as u64,
))
.await;
prev_iteration = current_iteration;
}

let index = (starting_position + counter) % urls.len();
tracing::info!(
target: LOG_TARGET,
current_iteration,
Contributor:

Nit: From the user's perspective we should probably just print attempt here and summarize index and iteration.
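
One possible shape for the condensed log line suggested in this nit (a sketch, not what the PR does; attempt is a hypothetical field derived from the existing counter):

tracing::info!(
    target: LOG_TARGET,
    attempt = counter + 1,
    url,
    "Trying to connect to next external relaychain node.",
);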

index,
url,
"Trying to connect to next external relaychain node.",
@@ -112,6 +144,7 @@ async fn connect_next_available_rpc_server(
Err(err) => tracing::debug!(target: LOG_TARGET, url, ?err, "Unable to connect."),
};
}

Err(())
}

@@ -431,7 +464,10 @@ impl ReconnectingWebsocketWorker {

#[cfg(test)]
mod test {
use super::url_to_string_with_port;
use std::time::Duration;

use super::{url_to_string_with_port, ClientManager};
use jsonrpsee::Methods;
use url::Url;

#[test]
Expand Down Expand Up @@ -460,4 +496,48 @@ mod test {
url_to_string_with_port(url)
);
}

#[tokio::test]
// Testing the retry logic in full would add about half a minute to CI according
// to the current logic, so let's test it best effort.
async fn client_manager_retry_logic() {
let port = portpicker::pick_unused_port().unwrap();
let server = jsonrpsee::server::Server::builder()
.build(format!("0.0.0.0:{}", port))
.await
.unwrap();

// Wait three seconds while attempting connection.
let conn_res = tokio::spawn(async move {
tokio::time::timeout(
Duration::from_secs(3),
Contributor:

Using sleep in tests has a high potential to make them flaky. I don't understand why we wait here for 3 seconds, only to expect the obvious: there is no server up to connect to. What can happen is that the server might actually start if CI is overloaded, and this will fail.

For testing the logic here I think there are other ways to do it. I mean you just want to see it trying multiple servers in the list and increasing the backoff. This stuff is not actually tested here at all.

Contributor Author:

Haven't seen this. It came a bit earlier than this: 71e44b4. I agree with your points.

> I don't understand why we wait here for 3 seconds, only to expect the obvious: there is no server up to connect to.

The idea of the wait is that the ClientManager::new call doesn't exit immediately as it should have (because of the connection-refused error), but exercises the retry logic hoping the server would come up. The error we're asserting against is the Elapsed error, triggered because the ClientManager::new call timed out, which means the retry logic worked behind the scenes.

I think, though, we can simplify the test by starting the server in a tokio task with a 10-second delay (10 because it is not too big/small for our use case), and then creating a client on the "main" task which exercises the retry logic until the server comes up and returns an Ok result.

> I mean you just want to see it trying multiple servers in the list and increasing the backoff.

I think testing the backoff involves some other timers with timeouts, given the logic doesn't produce testable side effects, which can add more sources of flakiness. I am open to suggestions if you see it differently. As a matter of fact I wasn't interested in testing the backoff, but the fact that the collators' relay chain interface tries a few more times to connect to the external RPCs at collator startup. I heard there weren't complaints about this in practice, meaning the collators usually find available RPC nodes to connect to. This is more relevant to local testing on slow/overloaded machines, so testing anything other than a slight reliability improvement brings diminishing returns.
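
For reference, a minimal sketch of how backoff sleeps could be exercised deterministically with Tokio's paused clock (assumes the tokio test-util feature; not something this PR does):

#[tokio::test(start_paused = true)]
async fn backoff_schedule_with_paused_time() {
    // With the clock paused, `sleep` resolves as soon as the runtime is idle,
    // so an exponential backoff schedule can be asserted on without real delays.
    let start = tokio::time::Instant::now();
    for pass in 0..4u32 {
        tokio::time::sleep(std::time::Duration::from_millis(1_000u64 * 2u64.pow(pass))).await;
    }
    // 1s + 2s + 4s + 8s of virtual time elapsed, in milliseconds of real time.
    assert!(start.elapsed() >= std::time::Duration::from_secs(15));
}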

Contributor Author:

Pushed a simpler version of the test here: ee68d04. Let me know if this changes anything!

ClientManager::new(vec![format!("ws://127.0.0.1:{}", port)]),
)
.await
});

// Start the server too.
let server = tokio::spawn(async {
tokio::time::sleep(Duration::from_secs(10)).await;
server.start(Methods::default())
});

// By this time the client can not make a connection because the server is not up.
assert!(conn_res.await.unwrap().is_err());

// Trying to connect again to the RPC with a client that stays around for long
// enough to catch the RPC server coming online and connect to it.
iulianbarbu marked this conversation as resolved.
let conn_res = tokio::spawn(async move {
tokio::time::timeout(
Duration::from_secs(8),
michalkucharczyk (Contributor) commented on Aug 30, 2024:

CI tests are run on quite overloaded machines. 1s margin may not be enough, but let's see how it goes.

Contributor Author:

Pushed a simpler version of the test here: ee68d04.

ClientManager::new(vec![format!("ws://127.0.0.1:{}", port)]),
)
.await
});
let res = conn_res.await.unwrap();
assert!(res.is_ok());
assert!(res.unwrap().is_ok());

server.await.unwrap();
}
}
15 changes: 15 additions & 0 deletions prdoc/pr_5515.prdoc
@@ -0,0 +1,15 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: Add retry logic in relay chain rpc interface

doc:
- audience: [ Node Dev, Node Operator ]
description: |
Added basic retry logic for collators connecting to external RPC servers. The collator
will try 5 times to connect to each RPC server from the provided list. In between
iterations it will wait for a duration which increases exponentially by a factor of two.
The maximum time a collator can spend in the retry logic is 1 + 2 + 4 + 8 + 16 = 31 seconds.
crates:
- name: cumulus-relay-chain-rpc-interface
bump: minor