diff --git a/CHANGELOG.md b/CHANGELOG.md index e6e796ec8e1..d9ea8261b2f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +### Changed +- [#1942](https://github.com/FuelLabs/fuel-core/pull/1942): Sequential relayer's commits. + ### Fixed - [#1950](https://github.com/FuelLabs/fuel-core/pull/1950): Fix cursor `BlockHeight` encoding in `SortedTXCursor` diff --git a/crates/fuel-core/src/database.rs b/crates/fuel-core/src/database.rs index 35ffa1a4d6e..e46ce1a12de 100644 --- a/crates/fuel-core/src/database.rs +++ b/crates/fuel-core/src/database.rs @@ -479,9 +479,7 @@ where .advance_height() .ok_or(DatabaseError::FailedToAdvanceHeight)?; - // TODO: After https://github.com/FuelLabs/fuel-core/issues/451 - // we can replace `next_expected_height > new_height` with `next_expected_height != new_height`. - if next_expected_height > new_height { + if next_expected_height != new_height { return Err(DatabaseError::HeightsAreNotLinked { prev_height: prev_height.as_u64(), new_height: new_height.as_u64(), diff --git a/crates/services/relayer/src/service/get_logs.rs b/crates/services/relayer/src/service/get_logs.rs index 3b92c93e6b7..3d6d4bd24e3 100644 --- a/crates/services/relayer/src/service/get_logs.rs +++ b/crates/services/relayer/src/service/get_logs.rs @@ -4,18 +4,24 @@ use fuel_core_types::{ services::relayer::Event, }; use futures::TryStreamExt; -use std::collections::BTreeMap; +use std::collections::HashMap; #[cfg(test)] mod test; +pub struct DownloadedLogs { + pub start_height: u64, + pub last_height: u64, + pub logs: Vec, +} + /// Download the logs from the DA layer. pub(crate) fn download_logs<'a, P>( eth_sync_gap: &state::EthSyncGap, contracts: Vec, eth_node: &'a P, page_size: u64, -) -> impl futures::Stream), ProviderError>> + 'a +) -> impl futures::Stream> + 'a where P: Middleware + 'static, { @@ -41,16 +47,23 @@ where page.latest() ); + let oldest_block = page.oldest(); let latest_block = page.latest(); // Reduce the page. let page = page.reduce(); // Get the logs and return the reduced page. - eth_node - .get_logs(&filter) - .await - .map(|logs| Some(((latest_block, logs), page))) + eth_node.get_logs(&filter).await.map(|logs| { + Some(( + DownloadedLogs { + start_height: oldest_block, + last_height: latest_block, + logs, + }, + page, + )) + }) } } } @@ -62,12 +75,16 @@ where pub(crate) async fn write_logs(database: &mut D, logs: S) -> anyhow::Result<()> where D: RelayerDb, - S: futures::Stream), ProviderError>>, + S: futures::Stream>, { tokio::pin!(logs); - while let Some((last_height, events)) = logs.try_next().await? { - let last_height = last_height.into(); - let mut ordered_events = BTreeMap::>::new(); + while let Some(DownloadedLogs { + start_height, + last_height, + logs: events, + }) = logs.try_next().await? + { + let mut unordered_events = HashMap::>::new(); let sorted_events = sort_events_by_log_index(events)?; let fuel_events = sorted_events.into_iter().filter_map(|event| { match EthEventLog::try_from(&event) { @@ -90,21 +107,14 @@ where for event in fuel_events { let event = event?; let height = event.da_height(); - ordered_events.entry(height).or_default().push(event); - } - - let mut inserted_last_height = false; - for (height, events) in ordered_events { - database.insert_events(&height, &events)?; - if height == last_height { - inserted_last_height = true; - } + unordered_events.entry(height).or_default().push(event); } - // TODO: For https://github.com/FuelLabs/fuel-core/issues/451 we need to write each height - // (not only the last height), even if it's empty. - if !inserted_last_height { - database.insert_events(&last_height, &[])?; + let empty_events = Vec::new(); + for height in start_height..=last_height { + let height: DaBlockHeight = height.into(); + let events = unordered_events.get(&height).unwrap_or(&empty_events); + database.insert_events(&height, events)?; } } Ok(()) diff --git a/crates/services/relayer/src/service/get_logs/test.rs b/crates/services/relayer/src/service/get_logs/test.rs index 938780670d0..5125323d59d 100644 --- a/crates/services/relayer/src/service/get_logs/test.rs +++ b/crates/services/relayer/src/service/get_logs/test.rs @@ -137,7 +137,7 @@ async fn can_paginate_logs(input: Input) -> Expected { ð_node, DEFAULT_LOG_PAGE_SIZE, ) - .map_ok(|(_, l)| l) + .map_ok(|logs| logs.logs) .try_concat() .await .unwrap(); @@ -148,32 +148,39 @@ async fn can_paginate_logs(input: Input) -> Expected { } #[test_case(vec![ - Ok((1, messages_n(1, 0))) + Ok((1, 1, messages_n(1, 0))) ] => 1 ; "Can add single" )] #[test_case(vec![ - Ok((3, messages_n(3, 0))), - Ok((4, messages_n(1, 4))) + Ok((3, 3, messages_n(3, 0))), + Ok((4, 4, messages_n(1, 4))) ] => 4 ; "Can add two" )] #[test_case(vec![ - Ok((3, messages_n(3, 0))), - Ok((4, vec![])) + Ok((3, 3, messages_n(3, 0))), + Ok((4, 4, vec![])) ] => 4 ; "Can add empty" )] #[test_case(vec![ - Ok((7, messages_n(3, 0))), - Ok((19, messages_n(1, 4))), + Ok((1, 7, messages_n(3, 0))), + Ok((8, 19, messages_n(1, 4))), Err(ProviderError::CustomError("".to_string())) ] => 19 ; "Still adds height when error" )] #[tokio::test] +#[allow(clippy::type_complexity)] async fn test_da_height_updates( - stream: Vec), ProviderError>>, + stream: Vec), ProviderError>>, ) -> u64 { let mut mock_db = crate::mock_db::MockDb::default(); - let logs = futures::stream::iter(stream); + let logs = futures::stream::iter(stream).map(|result| { + result.map(|(start_height, last_height, logs)| DownloadedLogs { + start_height, + last_height, + logs, + }) + }); let _ = write_logs(&mut mock_db, logs).await; diff --git a/crates/services/relayer/src/service/test.rs b/crates/services/relayer/src/service/test.rs index 03870f01d15..860ba067424 100644 --- a/crates/services/relayer/src/service/test.rs +++ b/crates/services/relayer/src/service/test.rs @@ -38,7 +38,7 @@ async fn can_download_logs() { ð_node, DEFAULT_LOG_PAGE_SIZE, ) - .map_ok(|(_, l)| l) + .map_ok(|logs| logs.logs) .try_concat() .await .unwrap(); diff --git a/tests/test-helpers/src/builder.rs b/tests/test-helpers/src/builder.rs index 87b09a7244a..074412fe539 100644 --- a/tests/test-helpers/src/builder.rs +++ b/tests/test-helpers/src/builder.rs @@ -16,6 +16,7 @@ use fuel_core::{ use fuel_core_client::client::FuelClient; use fuel_core_poa::Trigger; use fuel_core_types::{ + blockchain::header::LATEST_STATE_TRANSITION_VERSION, fuel_asm::op, fuel_tx::{ field::Inputs, @@ -210,7 +211,7 @@ impl TestSetupBuilder { let latest_block = self.starting_block.map(|starting_block| LastBlockConfig { block_height: starting_block, - state_transition_version: 0, + state_transition_version: LATEST_STATE_TRANSITION_VERSION - 1, ..Default::default() });