Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace task run() return result with custom enum #2429

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Changed
- [2378](https://github.com/FuelLabs/fuel-core/pull/2378): Use cached hash of the topic instead of calculating it on each publishing gossip message.
- [2429](https://github.com/FuelLabs/fuel-core/pull/2429): Introduce custom enum for representing result of running service tasks
- [2377](https://github.com/FuelLabs/fuel-core/pull/2377): Add more errors that can be returned as responses when using protocol `/fuel/req_res/0.0.2`. The errors supported are `ProtocolV1EmptyResponse` (status code `0`) for converting empty responses sent via protocol `/fuel/req_res/0.0.1`, `RequestedRangeTooLarge`(status code `1`) if the client requests a range of objects such as sealed block headers or transactions too large, `Timeout` (status code `2`) if the remote peer takes too long to fulfill a request, or `SyncProcessorOutOfCapacity` if the remote peer is fulfilling too many requests concurrently.

#### Breaking
Expand Down
15 changes: 10 additions & 5 deletions crates/fuel-core/src/graphql_api/api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ use fuel_core_services::{
RunnableService,
RunnableTask,
StateWatcher,
TaskNextAction,
};
use fuel_core_storage::transactional::AtomicView;
use fuel_core_types::fuel_types::BlockHeight;
Expand Down Expand Up @@ -196,11 +197,15 @@ impl RunnableService for GraphqlService {

#[async_trait::async_trait]
impl RunnableTask for Task {
async fn run(&mut self, _: &mut StateWatcher) -> anyhow::Result<bool> {
self.server.as_mut().await?;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change introduces a non-trivial amount of noise with the removal of the ? operators.

AFAICT, there isn't a way to implement Try for arbitrary types in Rust which is a real bummer.

// The `axum::Server` has its internal loop. If `await` is finished, we get an internal
// error or stop signal.
Ok(false /* should_continue */)
async fn run(&mut self, _: &mut StateWatcher) -> TaskNextAction {
match self.server.as_mut().await {
Ok(()) => {
// The `axum::Server` has its internal loop. If `await` is finished, we get an internal
// error or stop signal.
TaskNextAction::Stop
}
Err(err) => TaskNextAction::ErrorContinue(err.into()),
}
}

async fn shutdown(self) -> anyhow::Result<()> {
Expand Down
18 changes: 10 additions & 8 deletions crates/fuel-core/src/graphql_api/worker_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use fuel_core_services::{
RunnableTask,
ServiceRunner,
StateWatcher,
TaskNextAction,
};
use fuel_core_storage::{
Error as StorageError,
Expand Down Expand Up @@ -551,13 +552,12 @@ where
TxPool: ports::worker::TxPool,
D: ports::worker::OffChainDatabase,
{
async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
let should_continue;
async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
tokio::select! {
biased;

_ = watcher.while_started() => {
should_continue = false;
TaskNextAction::Stop
}

result = self.block_importer.next() => {
Expand All @@ -567,17 +567,19 @@ where
// In the case of an error, shut down the service to avoid a huge
// de-synchronization between on-chain and off-chain databases.
if let Err(e) = result {
tracing::error!("Error processing block: {:?}", e);
should_continue = self.continue_on_error;
if self.continue_on_error {
TaskNextAction::ErrorContinue(e)
} else {
TaskNextAction::Stop
}
} else {
should_continue = true
TaskNextAction::Continue
}
} else {
should_continue = false
TaskNextAction::Stop
}
}
}
Ok(should_continue)
}

async fn shutdown(mut self) -> anyhow::Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/graphql_api/worker_service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn run__relayed_transaction_events_are_added_to_storage() {
// when
let mut task =
worker_task_with_block_importer_and_db(block_importer, database.clone());
task.run(&mut state_watcher).await.unwrap();
task.run(&mut state_watcher).await;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// then
Expand Down
6 changes: 3 additions & 3 deletions crates/fuel-core/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use fuel_core_services::{
ServiceRunner,
State,
StateWatcher,
TaskNextAction,
};
use fuel_core_storage::{
not_found,
Expand Down Expand Up @@ -428,7 +429,7 @@ impl RunnableService for Task {
#[async_trait::async_trait]
impl RunnableTask for Task {
#[tracing::instrument(skip_all)]
async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
let mut stop_signals = vec![];
for service in self.services.iter() {
stop_signals.push(service.await_stop())
Expand All @@ -443,8 +444,7 @@ impl RunnableTask for Task {

// We received the stop signal from any of one source, so stop this service and
// all sub-services.
let should_continue = false;
Ok(should_continue)
TaskNextAction::Stop
}

async fn shutdown(self) -> anyhow::Result<()> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use fuel_core_services::{
ServiceRunner,
SharedMutex,
StateWatcher,
TaskNextAction,
};
use fuel_core_storage::{
not_found,
Expand Down Expand Up @@ -111,13 +112,12 @@ impl SharedState {

#[async_trait::async_trait]
impl RunnableTask for Task {
async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
let should_continue;
async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
tokio::select! {
biased;

_ = watcher.while_started() => {
should_continue = false;
TaskNextAction::Stop
}

Some(event) = self.blocks_events.next() => {
Expand All @@ -135,16 +135,13 @@ impl RunnableTask for Task {
}
Err(err) => {
tracing::error!("Failed to cache consensus parameters: {:?}", err);
should_continue = false;
return Ok(should_continue)
return TaskNextAction::Stop
}
}
}
should_continue = true;
TaskNextAction::Continue
}
}

Ok(should_continue)
}

async fn shutdown(self) -> anyhow::Result<()> {
Expand Down Expand Up @@ -358,7 +355,7 @@ mod tests {
// When
let result_with_new_version = result_with_new_version(new_version);
let _ = block_sender.send(result_with_new_version);
task.run(&mut StateWatcher::started()).await.unwrap();
task.run(&mut StateWatcher::started()).await;

// Then
assert_eq!(
Expand Down
52 changes: 30 additions & 22 deletions crates/services/consensus_module/poa/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use fuel_core_services::{
Service as OtherService,
ServiceRunner,
StateWatcher,
TaskNextAction,
};
use fuel_core_storage::transactional::Changes;
use fuel_core_types::{
Expand Down Expand Up @@ -518,16 +519,14 @@ where
PB: PredefinedBlocks,
C: GetTime,
{
async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
let should_continue;
async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
let mut sync_state = self.sync_task_handle.shared.clone();
// make sure we're synced first
if *sync_state.borrow_and_update() == SyncState::NotSynced {
tokio::select! {
biased;
result = watcher.while_started() => {
should_continue = result?.started();
return Ok(should_continue);
return result.map(|state| state.started()).into()
}
_ = sync_state.changed() => {}
}
Expand All @@ -538,26 +537,37 @@ where
}

let next_height = self.next_height();
let maybe_block = self.predefined_blocks.get_block(&next_height)?;
let maybe_block = match self.predefined_blocks.get_block(&next_height) {
Ok(option) => option,
Err(err) => return TaskNextAction::ErrorContinue(err),
};
if let Some(block) = maybe_block {
self.produce_predefined_block(&block).await?;
should_continue = true;
return Ok(should_continue)
let res = self.produce_predefined_block(&block).await;
return match res {
Ok(()) => TaskNextAction::Continue,
Err(err) => TaskNextAction::ErrorContinue(err),
}
}

let next_block_production: BoxFuture<()> = match self.trigger {
Trigger::Never | Trigger::Instant => Box::pin(core::future::pending()),
Trigger::Interval { block_time } => Box::pin(sleep_until(
self.last_block_created
Trigger::Interval { block_time } => {
let next_block_time = match self
.last_block_created
.checked_add(block_time)
.ok_or(anyhow!("Time exceeds system limits"))?,
)),
.ok_or(anyhow!("Time exceeds system limits"))
{
Ok(time) => time,
Err(err) => return TaskNextAction::ErrorContinue(err),
};
Box::pin(sleep_until(next_block_time))
}
};

tokio::select! {
biased;
_ = watcher.while_started() => {
should_continue = false;
TaskNextAction::Stop
}
request = self.request_receiver.recv() => {
if let Some(request) = request {
Expand All @@ -567,29 +577,27 @@ where
let _ = response.send(result);
}
}
should_continue = true;
TaskNextAction::Continue
} else {
tracing::error!("The PoA task should be the holder of the `Sender`");
should_continue = false;
TaskNextAction::Stop
}
}
_ = next_block_production => {
match self.on_timer().await.context("While processing timer event") {
Ok(()) => should_continue = true,
Ok(()) => TaskNextAction::Continue,
Err(err) => {
// Wait some time in case of error to avoid spamming retry block production
tokio::time::sleep(Duration::from_secs(1)).await;
return Err(err);
TaskNextAction::ErrorContinue(err)
}
};
}
}
_ = self.new_txs_watcher.changed() => {
self.on_txpool_event().await.context("While processing txpool event")?;
should_continue = true;
let res = self.on_txpool_event().await.context("While processing txpool event");
TaskNextAction::always_continue(res)
}
}

Ok(should_continue)
}

async fn shutdown(self) -> anyhow::Result<()> {
Expand Down
13 changes: 6 additions & 7 deletions crates/services/consensus_module/poa/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use fuel_core_services::{
RunnableService,
RunnableTask,
StateWatcher,
TaskNextAction,
};
use fuel_core_types::{
blockchain::header::BlockHeader,
Expand Down Expand Up @@ -137,10 +138,7 @@ impl RunnableService for SyncTask {

#[async_trait::async_trait]
impl RunnableTask for SyncTask {
#[tracing::instrument(level = "debug", skip_all, err, ret)]
Copy link
Member Author

@MitchTurner MitchTurner Nov 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to remove this from a couple locations since the instrument macro assumes the function returns Result

async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
let mut should_continue = true;

async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
let tick: BoxFuture<tokio::time::Instant> = if let Some(timer) = &mut self.timer {
Box::pin(timer.tick())
} else {
Expand All @@ -150,7 +148,7 @@ impl RunnableTask for SyncTask {
tokio::select! {
biased;
_ = watcher.while_started() => {
should_continue = false;
TaskNextAction::Stop
}
Some(latest_peer_count) = self.peer_connections_stream.next() => {
let sufficient_peers = latest_peer_count >= self.min_connected_reserved_peers;
Expand All @@ -172,6 +170,7 @@ impl RunnableTask for SyncTask {
}
_ => {},
}
TaskNextAction::Continue
}
Some(block_info) = self.block_stream.next() => {
let new_block_height = block_info.block_header.height();
Expand Down Expand Up @@ -205,6 +204,7 @@ impl RunnableTask for SyncTask {
}
_ => {}
}
TaskNextAction::Continue
}
_ = tick => {
if let InnerSyncState::SufficientPeers(block_header) = &self.inner_state {
Expand All @@ -215,10 +215,9 @@ impl RunnableTask for SyncTask {
};
self.update_sync_state(SyncState::Synced(Arc::new(block_header)));
}
TaskNextAction::Continue
}
}

Ok(should_continue)
}

async fn shutdown(self) -> anyhow::Result<()> {
Expand Down
Loading
Loading