Fixed halting of the node in rare conditions (#1914)
My node halted during synchronization with the dev cluster. According to
the logs, block importing stopped because something was still processing
the previous block. I've added a 20-second timeout on waiting for the
previous block's processing to finish, so this issue is easier to debug
in the future.
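
A minimal, self-contained sketch of that timeout pattern (assuming the
notification channel behaves like a `tokio::sync::oneshot` receiver; the
real change is in `importer.rs` below):

```rust
use std::time::Duration;
use tokio::{sync::oneshot, time::timeout};

#[tokio::main]
async fn main() {
    // Hypothetical stand-in for the channel that signals that all receivers
    // have finished processing the previous block's `ImportResult`.
    let (_tx, rx) = oneshot::channel::<()>();

    // Instead of awaiting the signal forever, give up after 20 seconds and
    // report an error, mirroring the guard added to `importer.rs` below.
    match timeout(Duration::from_secs(20), rx).await {
        Ok(_) => println!("previous block was processed"),
        Err(_elapsed) => {
            eprintln!("the previous block processing was not finished for 20 seconds");
        }
    }
}
```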

During my node's graceful shutdown, the `PoA` service was waiting for
the child `PoASyncTask` to finish its work. The child task was blocked
while still holding the `ImportResult`, which in turn blocked the
importer service. I assume one of `DeadlineClock`'s `await` functions
caused that. I haven't looked into exactly how it could happen; I just
replaced the custom `DeadlineClock` with a `tokio::time::Interval`,
since resetting it doesn't require an `await`. Calling `await` inside
`tokio::select!` arms is bad practice anyway.

Later we may want to remove `DeadlineClock` from the `PoA` service as
well, to minimize the `await`s inside `tokio::select!`, or rewrite it
without async functions.
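
For illustration only (not the actual `SyncTask` code), a minimal sketch of
the pattern this fix moves to: an optional `Interval` that is reset
synchronously, with the tick future built before `tokio::select!` instead of
being awaited inside an arm.

```rust
use std::{future::Future, pin::Pin, time::Duration};
use tokio::time::{interval, Instant, Interval, MissedTickBehavior};

struct Task {
    timer: Option<Interval>,
}

impl Task {
    fn new(period: Duration) -> Self {
        // A zero period means "no timer at all" rather than a busy loop.
        let timer = if period == Duration::ZERO {
            None
        } else {
            let mut timer = interval(period);
            timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
            Some(timer)
        };
        Self { timer }
    }

    // Resetting is synchronous: no `await`, so nothing can block here.
    fn restart_timer(&mut self) {
        if let Some(timer) = &mut self.timer {
            timer.reset();
        }
    }

    async fn run_once(&mut self) {
        // Build the tick future *before* entering the select; a missing timer
        // becomes a future that never resolves.
        let tick: Pin<Box<dyn Future<Output = Instant> + '_>> =
            if let Some(timer) = &mut self.timer {
                Box::pin(timer.tick())
            } else {
                let never = core::future::pending();
                Box::pin(never)
            };

        tokio::select! {
            _ = tick => {
                // The timer fired; react and rearm it without any `await`.
                self.restart_timer();
            }
            // ... other branches: peer updates, new blocks, shutdown ...
        }
    }
}

#[tokio::main]
async fn main() {
    let mut task = Task::new(Duration::from_secs(1));
    // The first tick of a fresh `Interval` completes immediately.
    task.run_once().await;
}
```

With a zero period the timer is simply absent and its select branch becomes a
pending future, which mirrors how the task behaves when no sync timeout is
configured.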

### Before requesting review
- [x] I have reviewed the code myself
xgreenx authored May 30, 2024
1 parent 8eeae5d commit acd8d19
Showing 3 changed files with 50 additions and 24 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
### Removed
- [#1913](https://github.com/FuelLabs/fuel-core/pull/1913): Removed dead code from the project.

### Fixed
- [#1914](https://github.com/FuelLabs/fuel-core/pull/1914): Fixed halting of the node during synchronization in PoA service.

## [Version 0.27.0]

### Added
58 changes: 35 additions & 23 deletions crates/services/consensus_module/poa/src/sync.rs
@@ -4,21 +4,23 @@ use std::{
};

use fuel_core_services::{
stream::BoxStream,
stream::{
BoxFuture,
BoxStream,
},
RunnableService,
RunnableTask,
StateWatcher,
};
use fuel_core_types::services::block_importer::BlockImportInfo;

use fuel_core_types::blockchain::header::BlockHeader;
use tokio::sync::watch;
use tokio_stream::StreamExt;

use crate::deadline_clock::{
DeadlineClock,
OnConflict,
use fuel_core_types::{
blockchain::header::BlockHeader,
services::block_importer::BlockImportInfo,
};
use tokio::{
sync::watch,
time::MissedTickBehavior,
};
use tokio_stream::StreamExt;

#[derive(Debug, Clone, PartialEq)]
pub enum SyncState {
@@ -42,14 +44,13 @@ impl SyncState {

pub struct SyncTask {
min_connected_reserved_peers: usize,
time_until_synced: Duration,
peer_connections_stream: BoxStream<usize>,
block_stream: BoxStream<BlockImportInfo>,
state_sender: watch::Sender<SyncState>,
// shared with `MainTask` via SyncTask::SharedState
state_receiver: watch::Receiver<SyncState>,
inner_state: InnerSyncState,
timer: DeadlineClock,
timer: Option<tokio::time::Interval>,
}

impl SyncTask {
@@ -65,7 +66,13 @@ impl SyncTask {
time_until_synced,
block_header.clone(),
);
let timer = DeadlineClock::new();
let timer = if time_until_synced == Duration::ZERO {
None
} else {
let mut timer = tokio::time::interval(time_until_synced);
timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
Some(timer)
};

let initial_sync_state = SyncState::from_config(
min_connected_reserved_peers,
@@ -79,7 +86,6 @@
Self {
peer_connections_stream,
min_connected_reserved_peers,
time_until_synced,
block_stream,
state_sender,
state_receiver,
@@ -100,10 +106,10 @@
});
}

async fn restart_timer(&mut self) {
self.timer
.set_timeout(self.time_until_synced, OnConflict::Overwrite)
.await;
fn restart_timer(&mut self) {
if let Some(timer) = &mut self.timer {
timer.reset();
}
}
}

@@ -135,6 +141,12 @@ impl RunnableTask for SyncTask {
async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
let mut should_continue = true;

let tick: BoxFuture<tokio::time::Instant> = if let Some(timer) = &mut self.timer {
Box::pin(timer.tick())
} else {
let future = core::future::pending();
Box::pin(future)
};
tokio::select! {
biased;
_ = watcher.while_started() => {
@@ -146,11 +158,11 @@
match &self.inner_state {
InnerSyncState::InsufficientPeers(block_header) if sufficient_peers => {
self.inner_state = InnerSyncState::SufficientPeers(block_header.clone());
self.restart_timer().await;
self.restart_timer();
}
InnerSyncState::SufficientPeers(block_header) if !sufficient_peers => {
self.inner_state = InnerSyncState::InsufficientPeers(block_header.clone());
self.timer.clear().await;
self.restart_timer();
}
InnerSyncState::Synced { block_header, .. } => {
self.inner_state = InnerSyncState::Synced {
@@ -170,7 +182,7 @@
}
InnerSyncState::SufficientPeers(block_header) if new_block_height > block_header.height() => {
self.inner_state = InnerSyncState::SufficientPeers(block_info.block_header);
self.restart_timer().await;
self.restart_timer();
}
InnerSyncState::Synced { block_header, has_sufficient_peers } if new_block_height > block_header.height() => {
if block_info.is_locally_produced() {
@@ -183,7 +195,7 @@
// we considered to be synced but we're obviously not!
if *has_sufficient_peers {
self.inner_state = InnerSyncState::SufficientPeers(block_info.block_header);
self.restart_timer().await;
self.restart_timer();
} else {
self.inner_state = InnerSyncState::InsufficientPeers(block_info.block_header);
}
@@ -194,7 +206,7 @@
_ => {}
}
}
_ = self.timer.wait() => {
_ = tick => {
if let InnerSyncState::SufficientPeers(block_header) = &self.inner_state {
let block_header = block_header.clone();
self.inner_state = InnerSyncState::Synced {
13 changes: 12 additions & 1 deletion crates/services/importer/src/importer.rs
@@ -468,7 +468,18 @@ where

// Await until all receivers of the notification process the result.
if let Some(channel) = previous_block_result {
let _ = channel.await;
const TIMEOUT: u64 = 20;
let result =
tokio::time::timeout(tokio::time::Duration::from_secs(TIMEOUT), channel)
.await;

if result.is_err() {
tracing::error!(
"The previous block processing \
was not finished for {TIMEOUT} seconds."
);
return Err(Error::PreviousBlockProcessingNotFinished)
}
}

let start = Instant::now();
