Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace task run() return result with custom enum #2429

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Changed
- [2378](https://github.com/FuelLabs/fuel-core/pull/2378): Use cached hash of the topic instead of calculating it on each publishing gossip message.
- [2429](https://github.com/FuelLabs/fuel-core/pull/2429): Introduce custom enum for representing result of running service tasks

## [Version 0.40.0]

Expand Down
15 changes: 10 additions & 5 deletions crates/fuel-core/src/graphql_api/api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ use fuel_core_services::{
RunnableService,
RunnableTask,
StateWatcher,
TaskRunResult,
};
use fuel_core_storage::transactional::AtomicView;
use fuel_core_types::fuel_types::BlockHeight;
Expand Down Expand Up @@ -196,11 +197,15 @@ impl RunnableService for GraphqlService {

#[async_trait::async_trait]
impl RunnableTask for Task {
async fn run(&mut self, _: &mut StateWatcher) -> anyhow::Result<bool> {
self.server.as_mut().await?;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change introduces a non-trivial amount of noise with the removal of the ? operators.

AFAICT, there isn't a way to implement Try for arbitrary types in Rust which is a real bummer.

// The `axum::Server` has its internal loop. If `await` is finished, we get an internal
// error or stop signal.
Ok(false /* should_continue */)
async fn run(&mut self, _: &mut StateWatcher) -> TaskRunResult {
match self.server.as_mut().await {
Ok(()) => {
// The `axum::Server` has its internal loop. If `await` is finished, we get an internal
// error or stop signal.
TaskRunResult::Stop
}
Err(err) => TaskRunResult::ErrorContinue(err.into()),
}
}

async fn shutdown(self) -> anyhow::Result<()> {
Expand Down
5 changes: 3 additions & 2 deletions crates/fuel-core/src/graphql_api/worker_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use fuel_core_services::{
RunnableTask,
ServiceRunner,
StateWatcher,
TaskRunResult,
};
use fuel_core_storage::{
Error as StorageError,
Expand Down Expand Up @@ -551,7 +552,7 @@ where
TxPool: ports::worker::TxPool,
D: ports::worker::OffChainDatabase,
{
async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
async fn run(&mut self, watcher: &mut StateWatcher) -> TaskRunResult {
let should_continue;
tokio::select! {
biased;
Expand All @@ -577,7 +578,7 @@ where
}
}
}
Ok(should_continue)
TaskRunResult::should_continue(should_continue)
}

async fn shutdown(mut self) -> anyhow::Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/graphql_api/worker_service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn run__relayed_transaction_events_are_added_to_storage() {
// when
let mut task =
worker_task_with_block_importer_and_db(block_importer, database.clone());
task.run(&mut state_watcher).await.unwrap();
task.run(&mut state_watcher).await;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// then
Expand Down
6 changes: 3 additions & 3 deletions crates/fuel-core/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use fuel_core_services::{
ServiceRunner,
State,
StateWatcher,
TaskRunResult,
};
use fuel_core_storage::{
not_found,
Expand Down Expand Up @@ -428,7 +429,7 @@ impl RunnableService for Task {
#[async_trait::async_trait]
impl RunnableTask for Task {
#[tracing::instrument(skip_all)]
async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
async fn run(&mut self, watcher: &mut StateWatcher) -> TaskRunResult {
let mut stop_signals = vec![];
for service in self.services.iter() {
stop_signals.push(service.await_stop())
Expand All @@ -443,8 +444,7 @@ impl RunnableTask for Task {

// We received the stop signal from any of one source, so stop this service and
// all sub-services.
let should_continue = false;
Ok(should_continue)
TaskRunResult::Stop
}

async fn shutdown(self) -> anyhow::Result<()> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use fuel_core_services::{
ServiceRunner,
SharedMutex,
StateWatcher,
TaskRunResult,
};
use fuel_core_storage::{
not_found,
Expand Down Expand Up @@ -111,7 +112,7 @@ impl SharedState {

#[async_trait::async_trait]
impl RunnableTask for Task {
async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
async fn run(&mut self, watcher: &mut StateWatcher) -> TaskRunResult {
let should_continue;
tokio::select! {
biased;
Expand All @@ -135,16 +136,15 @@ impl RunnableTask for Task {
}
Err(err) => {
tracing::error!("Failed to cache consensus parameters: {:?}", err);
should_continue = false;
return Ok(should_continue)
return TaskRunResult::Stop
}
}
}
should_continue = true;
}
}

Ok(should_continue)
TaskRunResult::should_continue(should_continue)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I though the idea was to remove should_continue variable and replace with the usage of the enum=D

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My goal was to change the interface.

I'm happy to do some more refactoring too.

}

async fn shutdown(self) -> anyhow::Result<()> {
Expand Down Expand Up @@ -358,7 +358,7 @@ mod tests {
// When
let result_with_new_version = result_with_new_version(new_version);
let _ = block_sender.send(result_with_new_version);
task.run(&mut StateWatcher::started()).await.unwrap();
task.run(&mut StateWatcher::started()).await;

// Then
assert_eq!(
Expand Down
45 changes: 30 additions & 15 deletions crates/services/consensus_module/poa/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use fuel_core_services::{
Service as OtherService,
ServiceRunner,
StateWatcher,
TaskRunResult,
};
use fuel_core_storage::transactional::Changes;
use fuel_core_types::{
Expand Down Expand Up @@ -518,16 +519,15 @@ where
PB: PredefinedBlocks,
C: GetTime,
{
async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
async fn run(&mut self, watcher: &mut StateWatcher) -> TaskRunResult {
let should_continue;
let mut sync_state = self.sync_task_handle.shared.clone();
// make sure we're synced first
if *sync_state.borrow_and_update() == SyncState::NotSynced {
tokio::select! {
biased;
result = watcher.while_started() => {
should_continue = result?.started();
return Ok(should_continue);
return result.map(|state| state.started()).into()
}
_ = sync_state.changed() => {}
}
Expand All @@ -538,20 +538,31 @@ where
}

let next_height = self.next_height();
let maybe_block = self.predefined_blocks.get_block(&next_height)?;
let maybe_block = match self.predefined_blocks.get_block(&next_height) {
Ok(option) => option,
Err(err) => return TaskRunResult::ErrorContinue(err),
};
if let Some(block) = maybe_block {
self.produce_predefined_block(&block).await?;
should_continue = true;
return Ok(should_continue)
let res = self.produce_predefined_block(&block).await;
return match res {
Ok(()) => TaskRunResult::Continue,
Err(err) => TaskRunResult::ErrorContinue(err),
}
}

let next_block_production: BoxFuture<()> = match self.trigger {
Trigger::Never | Trigger::Instant => Box::pin(core::future::pending()),
Trigger::Interval { block_time } => Box::pin(sleep_until(
self.last_block_created
Trigger::Interval { block_time } => {
let next_block_time = match self
.last_block_created
.checked_add(block_time)
.ok_or(anyhow!("Time exceeds system limits"))?,
)),
.ok_or(anyhow!("Time exceeds system limits"))
{
Ok(time) => time,
Err(err) => return TaskRunResult::ErrorContinue(err),
};
Box::pin(sleep_until(next_block_time))
}
};

tokio::select! {
Expand Down Expand Up @@ -579,17 +590,21 @@ where
Err(err) => {
// Wait some time in case of error to avoid spamming retry block production
tokio::time::sleep(Duration::from_secs(1)).await;
return Err(err);
return TaskRunResult::ErrorContinue(err);
}
};
}
_ = self.new_txs_watcher.changed() => {
self.on_txpool_event().await.context("While processing txpool event")?;
should_continue = true;
match self.on_txpool_event().await.context("While processing txpool event") {
Ok(()) => should_continue = true,
Err(err) => {
return TaskRunResult::ErrorContinue(err);
}
};
}
}

Ok(should_continue)
TaskRunResult::should_continue(should_continue)
}

async fn shutdown(self) -> anyhow::Result<()> {
Expand Down
6 changes: 3 additions & 3 deletions crates/services/consensus_module/poa/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use fuel_core_services::{
RunnableService,
RunnableTask,
StateWatcher,
TaskRunResult,
};
use fuel_core_types::{
blockchain::header::BlockHeader,
Expand Down Expand Up @@ -137,8 +138,7 @@ impl RunnableService for SyncTask {

#[async_trait::async_trait]
impl RunnableTask for SyncTask {
#[tracing::instrument(level = "debug", skip_all, err, ret)]
Copy link
Member Author

@MitchTurner MitchTurner Nov 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to remove this from a couple locations since the instrument macro assumes the function returns Result

async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
async fn run(&mut self, watcher: &mut StateWatcher) -> TaskRunResult {
let mut should_continue = true;

let tick: BoxFuture<tokio::time::Instant> = if let Some(timer) = &mut self.timer {
Expand Down Expand Up @@ -218,7 +218,7 @@ impl RunnableTask for SyncTask {
}
}

Ok(should_continue)
TaskRunResult::should_continue(should_continue)
}

async fn shutdown(self) -> anyhow::Result<()> {
Expand Down
25 changes: 17 additions & 8 deletions crates/services/gas_price_service/src/v0/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use async_trait::async_trait;
use fuel_core_services::{
RunnableTask,
StateWatcher,
TaskRunResult,
};
use fuel_gas_price_algorithm::v0::{
AlgorithmUpdaterV0,
Expand Down Expand Up @@ -121,24 +122,32 @@ where
L2: L2BlockSource,
Metadata: MetadataStorage,
{
async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
let should_continue;
async fn run(&mut self, watcher: &mut StateWatcher) -> TaskRunResult {
tokio::select! {
biased;
_ = watcher.while_started() => {
tracing::debug!("Stopping gas price service");
should_continue = false;
TaskRunResult::Stop
}
l2_block_res = self.l2_block_source.get_l2_block() => {
tracing::info!("Received L2 block result: {:?}", l2_block_res);
let block = l2_block_res?;
let block = match l2_block_res {
Ok(block) => block,
Err(err) => {
return anyhow!(err).into()
}
};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this pattern in many places. Maybe it would be simpler to defined a helper trait with into_continue_task_result which either returns TaskRunResult::Continue or TaskRunResult::ErrorContinue


tracing::debug!("Updating gas price algorithm");
self.apply_block_info_to_gas_algorithm(block).await?;
should_continue = true;
match self.apply_block_info_to_gas_algorithm(block).await {
Ok(_) => {},
Err(err) => {
return err.into()
}
}
TaskRunResult::Continue
}
}
Ok(should_continue)
}

async fn shutdown(mut self) -> anyhow::Result<()> {
Expand Down Expand Up @@ -253,7 +262,7 @@ mod tests {
let initial_price = read_algo.next_gas_price();

// when
service.run(&mut watcher).await.unwrap();
service.run(&mut watcher).await;
l2_block_sender.send(l2_block).await.unwrap();
service.shutdown().await.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions crates/services/gas_price_service/src/v0/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ async fn next_gas_price__affected_by_new_l2_block() {
let mut watcher = StateWatcher::default();

// when
service.run(&mut watcher).await.unwrap();
service.run(&mut watcher).await;
l2_block_sender.send(l2_block).await.unwrap();
service.shutdown().await.unwrap();

Expand Down Expand Up @@ -213,7 +213,7 @@ async fn next__new_l2_block_saves_old_metadata() {
let mut watcher = StateWatcher::default();
let start = read_algo.next_gas_price();

service.run(&mut watcher).await.unwrap();
service.run(&mut watcher).await;
l2_block_sender.send(l2_block).await.unwrap();
service.shutdown().await.unwrap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use fuel_core_services::{
RunnableTask,
ServiceRunner,
StateWatcher,
TaskRunResult,
};
use std::time::Duration;
use tokio::{
Expand Down Expand Up @@ -100,21 +101,23 @@ where
{
/// This function polls the source according to a polling interval
/// described by the DaBlockCostsService
async fn run(&mut self, state_watcher: &mut StateWatcher) -> Result<bool> {
let continue_running;

async fn run(&mut self, state_watcher: &mut StateWatcher) -> TaskRunResult {
tokio::select! {
biased;
_ = state_watcher.while_started() => {
continue_running = false;
TaskRunResult::Stop
}
_ = self.poll_interval.tick() => {
let da_block_costs = self.source.request_da_block_cost().await?;
self.shared_state.0.send(da_block_costs)?;
continue_running = true;
match self.source.request_da_block_cost().await.and_then(|da_block_costs| self.shared_state.0.send(da_block_costs).map_err(|err| anyhow::anyhow!(err))) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is very hard to read, maybe we could it inside of the function and process the result of the function?=)

Ok(da_block_costs) => {
TaskRunResult::Continue
}
Err(err) => {
TaskRunResult::ErrorContinue(err)
}
}
}
}
Ok(continue_running)
}

/// There are no shutdown hooks required by the sources *yet*
Expand Down
Loading
Loading