Skip to content

Commit 6cdcf40

Browse files
authored
fix(backend:rpc): reconnect on connection error (#538)
* fix(rpc): reconnect on connection error * fix(health): return error status on unhealthy * fix: undo side-effect on rpc connection check * fix: into_response
1 parent bcaf5a2 commit 6cdcf40

File tree

7 files changed

+92
-42
lines changed

7 files changed

+92
-42
lines changed

backend/lib/src/api/handlers/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@ pub async fn value_props(State(services): State<Services>) -> Result<impl IntoRe
3232

3333
pub async fn msp_health(State(services): State<Services>) -> Result<impl IntoResponse, Error> {
3434
debug!("GET health check");
35-
let response = services.health.check_health().await;
36-
Ok(Json(response))
35+
Ok(services.health.check_health().await)
3736
}
3837

3938
// ==================== Payment Handler ====================

backend/lib/src/data/rpc/client.rs

Lines changed: 53 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,22 @@ impl StorageHubRpcClient {
2727
self.connection.is_connected().await
2828
}
2929

30+
/// Attempts to reconnect if the connection isn't connected
31+
async fn ensure_connected(&self) {
32+
if !self.is_connected().await {
33+
// TODO: More robust reconnection mechanism, like we do for the original connection
34+
_ = self.connection.reconnect().await;
35+
}
36+
}
37+
3038
/// Call a JSON-RPC method on the connected node
3139
pub async fn call<P, R>(&self, method: &str, params: P) -> RpcResult<R>
3240
where
3341
P: ToRpcParams + Send,
3442
R: DeserializeOwned,
3543
{
44+
self.ensure_connected().await;
45+
3646
self.connection.call(method, params).await
3747
}
3848

@@ -41,6 +51,8 @@ impl StorageHubRpcClient {
4151
where
4252
R: DeserializeOwned,
4353
{
54+
self.ensure_connected().await;
55+
4456
self.connection.call_no_params(method).await
4557
}
4658

@@ -54,15 +66,14 @@ impl StorageHubRpcClient {
5466
pub async fn get_current_price_per_giga_unit_per_tick(&self) -> RpcResult<u128> {
5567
debug!(target: "rpc::client::get_current_price_per_giga_unit_per_tick", "RPC call: get_current_price_per_giga_unit_per_tick");
5668

57-
self.connection.call_no_params(methods::CURRENT_PRICE).await
69+
self.call_no_params(methods::CURRENT_PRICE).await
5870
}
5971

6072
/// Returns whether the given `file_key` is expected to be received by the MSP node
6173
pub async fn is_file_key_expected(&self, file_key: &str) -> RpcResult<bool> {
6274
debug!(target: "rpc::client::is_file_key_expected", file_key = %file_key, "RPC call: is_file_key_expected");
6375

64-
self.connection
65-
.call(methods::FILE_KEY_EXPECTED, jsonrpsee::rpc_params![file_key])
76+
self.call(methods::FILE_KEY_EXPECTED, jsonrpsee::rpc_params![file_key])
6677
.await
6778
}
6879

@@ -77,12 +88,11 @@ impl StorageHubRpcClient {
7788
&self,
7889
file_key: &str,
7990
) -> RpcResult<GetFileFromFileStorageResult> {
80-
self.connection
81-
.call(
82-
methods::IS_FILE_IN_FILE_STORAGE,
83-
jsonrpsee::rpc_params![file_key],
84-
)
85-
.await
91+
self.call(
92+
methods::IS_FILE_IN_FILE_STORAGE,
93+
jsonrpsee::rpc_params![file_key],
94+
)
95+
.await
8696
}
8797

8898
/// Request the MSP node to export the given `file_key` to the given URL
@@ -94,12 +104,11 @@ impl StorageHubRpcClient {
94104
"RPC call: save_file_to_disk"
95105
);
96106

97-
self.connection
98-
.call(
99-
methods::SAVE_FILE_TO_DISK,
100-
jsonrpsee::rpc_params![file_key, url],
101-
)
102-
.await
107+
self.call(
108+
methods::SAVE_FILE_TO_DISK,
109+
jsonrpsee::rpc_params![file_key, url],
110+
)
111+
.await
103112
}
104113

105114
/// Request the MSP to accept a FileKeyProof (`proof`) for the given `file_key`
@@ -111,33 +120,32 @@ impl StorageHubRpcClient {
111120
"RPC call: receive_file_chunks"
112121
);
113122

114-
self.connection
115-
.call(
116-
methods::RECEIVE_FILE_CHUNKS,
117-
jsonrpsee::rpc_params![file_key, proof],
118-
)
119-
.await
123+
self.call(
124+
methods::RECEIVE_FILE_CHUNKS,
125+
jsonrpsee::rpc_params![file_key, proof],
126+
)
127+
.await
120128
}
121129

122130
/// Retrieve the Onchain Provider ID of the MSP Node (therefore the MSP ID)
123131
pub async fn get_provider_id(&self) -> RpcResult<RpcProviderId> {
124132
debug!(target: "rpc::client::get_provider_id", "RPC call: get_provider_id");
125133

126-
self.connection.call_no_params(methods::PROVIDER_ID).await
134+
self.call_no_params(methods::PROVIDER_ID).await
127135
}
128136

129137
/// Retrieve the list of value propositions of the MSP Node
130138
pub async fn get_value_props(&self) -> RpcResult<GetValuePropositionsResult> {
131139
debug!(target: "rpc::client::get_value_props", "RPC call: get_value_props");
132140

133-
self.connection.call_no_params(methods::VALUE_PROPS).await
141+
self.call_no_params(methods::VALUE_PROPS).await
134142
}
135143

136144
/// Retrieve the list of multiaddresses associated with the MSP Node
137145
pub async fn get_multiaddresses(&self) -> RpcResult<Vec<String>> {
138146
debug!(target: "rpc::client::get_multiaddresses", "RPC call: get_multiaddresses");
139147

140-
self.connection.call_no_params(methods::PEER_IDS).await
148+
self.call_no_params(methods::PEER_IDS).await
141149
}
142150
}
143151

@@ -175,6 +183,27 @@ mod tests {
175183
StorageHubRpcClient::new(connection)
176184
}
177185

186+
#[tokio::test]
187+
async fn reconnect_automatically() {
188+
let conn = MockConnection::new();
189+
conn.disconnect().await;
190+
let conn = Arc::new(AnyRpcConnection::Mock(conn));
191+
let client = StorageHubRpcClient::new(conn);
192+
193+
assert!(
194+
!client.is_connected().await,
195+
"Should not be connected initially"
196+
);
197+
198+
let result = client.get_provider_id().await;
199+
assert!(
200+
result.is_ok(),
201+
"Should reconnect and be able to retrieve provider id"
202+
);
203+
204+
assert!(client.is_connected().await, "Should be connected now");
205+
}
206+
178207
#[tokio::test]
179208
async fn get_current_price_per_unit_per_tick() {
180209
let client = mock_rpc();

backend/lib/src/data/rpc/connection/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,11 @@ impl RpcConnection for AnyRpcConnection {
7171
}
7272
}
7373

74-
async fn close(&self) -> RpcResult<()> {
74+
async fn reconnect(&self) -> RpcResult<()> {
7575
match self {
76-
AnyRpcConnection::Real(conn) => conn.close().await,
76+
AnyRpcConnection::Real(conn) => conn.reconnect().await,
7777
#[cfg(feature = "mocks")]
78-
AnyRpcConnection::Mock(conn) => conn.close().await,
78+
AnyRpcConnection::Mock(conn) => conn.reconnect().await,
7979
}
8080
}
8181
}

backend/lib/src/data/rpc/mock_connection.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ impl MockConnection {
102102
}
103103

104104
/// Simulate reconnection
105-
pub async fn reconnect(&self) {
105+
pub async fn connect(&self) {
106106
let mut connected = self.connected.write().await;
107107
*connected = true;
108108
}
@@ -291,8 +291,8 @@ impl RpcConnection for MockConnection {
291291
*connected
292292
}
293293

294-
async fn close(&self) -> RpcResult<()> {
295-
self.disconnect().await;
294+
async fn reconnect(&self) -> RpcResult<()> {
295+
self.connect().await;
296296
Ok(())
297297
}
298298
}
@@ -325,8 +325,8 @@ mod tests {
325325
// Test connection status
326326
assert!(conn.is_connected().await);
327327

328-
// Test close
329-
conn.close().await.unwrap();
328+
// Test disconnection
329+
conn.disconnect().await;
330330
assert!(!conn.is_connected().await);
331331
}
332332

@@ -434,9 +434,6 @@ mod tests {
434434
async fn test_connection_disconnect_reconnect() {
435435
let conn = MockConnection::new();
436436

437-
// Initially connected
438-
assert!(conn.is_connected().await);
439-
440437
// Disconnect
441438
conn.disconnect().await;
442439
assert!(!conn.is_connected().await);
@@ -446,7 +443,7 @@ mod tests {
446443
assert!(matches!(result, Err(RpcConnectionError::ConnectionClosed)));
447444

448445
// Reconnect
449-
conn.reconnect().await;
446+
conn.connect().await;
450447
assert!(conn.is_connected().await);
451448

452449
// Call should work now

backend/lib/src/data/rpc/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,6 @@ pub trait RpcConnection: Send + Sync {
4343
/// Check if the connection is currently active
4444
async fn is_connected(&self) -> bool;
4545

46-
/// Close the connection gracefully
47-
async fn close(&self) -> RpcResult<()>;
46+
/// Create a new instance of the connection, closing the current one gracefully
47+
async fn reconnect(&self) -> RpcResult<()>;
4848
}

backend/lib/src/data/rpc/ws_connection.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,10 @@ impl RpcConnection for WsConnection {
128128
}
129129
}
130130

131-
async fn close(&self) -> RpcResult<()> {
132-
self.client.write().await.take();
131+
async fn reconnect(&self) -> RpcResult<()> {
132+
let new_client = Self::build_client(&self.config).await?;
133133

134+
self.client.write().await.replace(Arc::new(new_client));
134135
Ok(())
135136
}
136137
}

backend/lib/src/services/health.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@
44
55
use std::sync::Arc;
66

7+
use axum::{
8+
body::Body,
9+
http::StatusCode,
10+
response::{IntoResponse, Response},
11+
Json,
12+
};
713
use serde::Serialize;
814
use shc_rpc::RpcProviderId;
915
use tracing::{debug, error};
@@ -18,6 +24,24 @@ pub struct DetailedHealthStatus {
1824
pub components: HealthComponents,
1925
}
2026

27+
impl DetailedHealthStatus {
28+
pub fn is_healthy(&self) -> bool {
29+
self.status == HealthService::HEALTHY
30+
}
31+
}
32+
33+
impl IntoResponse for DetailedHealthStatus {
34+
fn into_response(self) -> Response<Body> {
35+
let status = if self.is_healthy() {
36+
StatusCode::OK
37+
} else {
38+
StatusCode::SERVICE_UNAVAILABLE
39+
};
40+
41+
(status, Json(self)).into_response()
42+
}
43+
}
44+
2145
#[derive(Serialize)]
2246
pub struct HealthComponents {
2347
pub storage: ComponentHealth,

0 commit comments

Comments
 (0)