From 32fd034df8197fe26dae4ce0e9b81b642bdbe651 Mon Sep 17 00:00:00 2001 From: Steph Sinyakov Date: Wed, 30 Oct 2024 17:58:13 +0100 Subject: [PATCH 01/11] feat: `PipelineMetrics` common scheme, metrics for `DerivationPipeline` --- bin/client/src/kona.rs | 3 ++ bin/client/src/l1/driver.rs | 4 ++ crates/derive-alloy/src/pipeline.rs | 15 +++--- crates/derive/src/lib.rs | 1 + crates/derive/src/metrics/mod.rs | 34 +++++++++++++ crates/derive/src/metrics/noop.rs | 28 ++++++++++ crates/derive/src/pipeline/builder.rs | 17 +++++-- crates/derive/src/pipeline/core.rs | 65 ++++++++++++++++++------ crates/derive/src/test_utils/pipeline.rs | 4 +- crates/derive/src/traits/mod.rs | 4 +- crates/derive/src/traits/pipeline.rs | 12 +++++ 11 files changed, 160 insertions(+), 27 deletions(-) create mode 100644 crates/derive/src/metrics/mod.rs create mode 100644 crates/derive/src/metrics/noop.rs diff --git a/bin/client/src/kona.rs b/bin/client/src/kona.rs index e1d825a37..fc7d912bd 100644 --- a/bin/client/src/kona.rs +++ b/bin/client/src/kona.rs @@ -17,6 +17,7 @@ use kona_common_proc::client_entry; pub(crate) mod fault; use fault::{fpvm_handle_register, HINT_WRITER, ORACLE_READER}; +use kona_derive::metrics::PipelineMetrics; /// The size of the LRU cache in the oracle. const ORACLE_LRU_SIZE: usize = 1024; @@ -42,6 +43,7 @@ fn main() -> Result<()> { let l1_provider = OracleL1ChainProvider::new(boot.clone(), oracle.clone()); let l2_provider = OracleL2ChainProvider::new(boot.clone(), oracle.clone()); let beacon = OracleBlobProvider::new(oracle.clone()); + let metrics = PipelineMetrics::no_op(); //////////////////////////////////////////////////////////////// // DERIVATION & EXECUTION // @@ -54,6 +56,7 @@ fn main() -> Result<()> { beacon, l1_provider, l2_provider.clone(), + metrics, ) .await?; diff --git a/bin/client/src/l1/driver.rs b/bin/client/src/l1/driver.rs index 96da16245..9f03d4c10 100644 --- a/bin/client/src/l1/driver.rs +++ b/bin/client/src/l1/driver.rs @@ -13,6 +13,7 @@ use core::fmt::Debug; use kona_derive::{ attributes::StatefulAttributesBuilder, errors::{PipelineErrorKind, ResetError}, + metrics::PipelineMetrics, pipeline::{DerivationPipeline, Pipeline, PipelineBuilder, StepResult}, sources::EthereumDataSource, stages::{ @@ -37,6 +38,7 @@ use tracing::{error, info, warn}; pub type OraclePipeline = DerivationPipeline< OracleAttributesQueue, O>, OracleL2ChainProvider, + PipelineMetrics, >; /// An oracle-backed Ethereum data source. @@ -121,6 +123,7 @@ where blob_provider: B, mut chain_provider: OracleL1ChainProvider, mut l2_chain_provider: OracleL2ChainProvider, + metrics: PipelineMetrics, ) -> Result { let cfg = Arc::new(boot_info.rollup_config.clone()); @@ -158,6 +161,7 @@ where .chain_provider(chain_provider) .builder(attributes) .origin(l1_origin) + .metrics(metrics) .build(); Ok(Self { l2_safe_head, l2_safe_head_header, pipeline }) diff --git a/crates/derive-alloy/src/pipeline.rs b/crates/derive-alloy/src/pipeline.rs index d284b0a83..6ef4d9a60 100644 --- a/crates/derive-alloy/src/pipeline.rs +++ b/crates/derive-alloy/src/pipeline.rs @@ -1,7 +1,11 @@ //! Helper to construct a [DerivationPipeline] using online types. 
+use crate::{ + AlloyChainProvider, AlloyL2ChainProvider, OnlineBeaconClient, OnlineBlobProviderWithFallback, +}; use kona_derive::{ attributes::StatefulAttributesBuilder, + metrics::PipelineMetrics, pipeline::{DerivationPipeline, PipelineBuilder}, sources::EthereumDataSource, stages::{ @@ -13,13 +17,12 @@ use op_alloy_genesis::RollupConfig; use op_alloy_protocol::BlockInfo; use std::sync::Arc; -use crate::{ - AlloyChainProvider, AlloyL2ChainProvider, OnlineBeaconClient, OnlineBlobProviderWithFallback, -}; - /// An online derivation pipeline. -pub type OnlinePipeline = - DerivationPipeline, AlloyL2ChainProvider>; +pub type OnlinePipeline = DerivationPipeline< + OnlineAttributesQueue, + AlloyL2ChainProvider, + PipelineMetrics, +>; /// An `online` Ethereum data source. pub type OnlineDataProvider = EthereumDataSource< diff --git a/crates/derive/src/lib.rs b/crates/derive/src/lib.rs index bee907787..3b955bbf9 100644 --- a/crates/derive/src/lib.rs +++ b/crates/derive/src/lib.rs @@ -22,6 +22,7 @@ pub mod prelude { pub mod attributes; pub mod errors; +pub mod metrics; pub mod pipeline; pub mod sources; pub mod stages; diff --git a/crates/derive/src/metrics/mod.rs b/crates/derive/src/metrics/mod.rs new file mode 100644 index 000000000..50aeeb7bb --- /dev/null +++ b/crates/derive/src/metrics/mod.rs @@ -0,0 +1,34 @@ +//! Metrics for the derivation pipeline. + +mod noop; + +use alloc::sync::Arc; +use core::fmt::Debug; + +use crate::traits::{DerivationPipelineMetrics, StepResult}; + +/// Composite metrics struct containing metrics for all stages. +pub struct PipelineMetrics { + pub(crate) derivation_pipeline_metrics: Arc, + // todo: add more metrics here for each stage +} + +impl Debug for PipelineMetrics { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("PipelineMetrics").finish() + } +} + +impl DerivationPipelineMetrics for PipelineMetrics { + fn record_step_result(&self, result: &StepResult) { + self.derivation_pipeline_metrics.record_step_result(result) + } + + fn inc_reset_signals(&self) { + self.derivation_pipeline_metrics.inc_reset_signals() + } + + fn inc_flush_channel_signals(&self) { + self.derivation_pipeline_metrics.inc_flush_channel_signals() + } +} diff --git a/crates/derive/src/metrics/noop.rs b/crates/derive/src/metrics/noop.rs new file mode 100644 index 000000000..64690a6d6 --- /dev/null +++ b/crates/derive/src/metrics/noop.rs @@ -0,0 +1,28 @@ +use crate::{metrics::PipelineMetrics, pipeline::StepResult, traits::DerivationPipelineMetrics}; +use alloc::sync::Arc; + +impl PipelineMetrics { + /// No-op implementation for `PipelineMetrics`. + pub fn no_op() -> Self { + Self { + derivation_pipeline_metrics: Arc::new(NoopDerivationPipelineMetrics), + // todo: add more metrics here for each stage + } + } +} + +/// No-op implementation of `DerivationPipelineMetrics`. 
+#[derive(Debug)] +struct NoopDerivationPipelineMetrics; + +impl DerivationPipelineMetrics for NoopDerivationPipelineMetrics { + fn record_step_result(&self, _result: &StepResult) { + // No-op + } + fn inc_reset_signals(&self) { + // No-op + } + fn inc_flush_channel_signals(&self) { + // No-op + } +} diff --git a/crates/derive/src/pipeline/builder.rs b/crates/derive/src/pipeline/builder.rs index 8dc995412..ea4558dc2 100644 --- a/crates/derive/src/pipeline/builder.rs +++ b/crates/derive/src/pipeline/builder.rs @@ -2,6 +2,7 @@ use super::{AttributesBuilder, DataAvailabilityProvider, DerivationPipeline}; use crate::{ + metrics::PipelineMetrics, stages::{ AttributesQueue, BatchProvider, BatchStream, ChannelProvider, ChannelReader, FrameQueue, L1Retrieval, L1Traversal, @@ -37,6 +38,7 @@ where builder: Option, origin: Option, rollup_config: Option>, + metrics: Option, } impl Default for PipelineBuilder @@ -54,6 +56,7 @@ where builder: None, origin: None, rollup_config: None, + metrics: None, } } } @@ -106,14 +109,20 @@ where self } + /// Sets the metrics implementation for the pipeline. + pub fn metrics(mut self, metrics: PipelineMetrics) -> Self { + self.metrics = Some(metrics); + self + } + /// Builds the pipeline. - pub fn build(self) -> DerivationPipeline, T> { + pub fn build(self) -> DerivationPipeline, T, PipelineMetrics> { self.into() } } impl From> - for DerivationPipeline, T> + for DerivationPipeline, T, PipelineMetrics> where B: AttributesBuilder + Send + Debug, P: ChainProvider + Send + Sync + Debug, @@ -127,7 +136,9 @@ where let l2_chain_provider = builder.l2_chain_provider.expect("chain_provider must be set"); let dap_source = builder.dap_source.expect("dap_source must be set"); let attributes_builder = builder.builder.expect("builder must be set"); + let metrics = builder.metrics.expect("metrics must be set"); + // todo: add metrics to the stages // Compose the stage stack. let mut l1_traversal = L1Traversal::new(chain_provider, Arc::clone(&rollup_config)); l1_traversal.block = Some(builder.origin.expect("origin must be set")); @@ -143,6 +154,6 @@ where AttributesQueue::new(rollup_config.clone(), batch_provider, attributes_builder); // Create the pipeline. - Self::new(attributes, rollup_config, l2_chain_provider) + Self::new(attributes, rollup_config, l2_chain_provider, metrics) } } diff --git a/crates/derive/src/pipeline/core.rs b/crates/derive/src/pipeline/core.rs index 6d3f09f09..17cf31f1f 100644 --- a/crates/derive/src/pipeline/core.rs +++ b/crates/derive/src/pipeline/core.rs @@ -6,7 +6,10 @@ use super::{ }; use crate::{ errors::PipelineErrorKind, - traits::{ActivationSignal, L2ChainProvider, ResetSignal, Signal, SignalReceiver}, + traits::{ + ActivationSignal, DerivationPipelineMetrics, L2ChainProvider, ResetSignal, Signal, + SignalReceiver, + }, }; use alloc::{boxed::Box, collections::VecDeque, string::ToString, sync::Arc}; use async_trait::async_trait; @@ -18,10 +21,11 @@ use tracing::{error, trace, warn}; /// The derivation pipeline is responsible for deriving L2 inputs from L1 data. #[derive(Debug)] -pub struct DerivationPipeline +pub struct DerivationPipeline where S: NextAttributes + SignalReceiver + OriginProvider + OriginAdvancer + Debug + Send, P: L2ChainProvider + Send + Sync + Debug, + M: DerivationPipelineMetrics + Send + Sync, { /// A handle to the next attributes. pub attributes: S, @@ -33,37 +37,43 @@ where pub rollup_config: Arc, /// The L2 Chain Provider used to fetch the system config on reset. pub l2_chain_provider: P, + /// Metrics collector. 
+ pub metrics: M, } -impl DerivationPipeline +impl DerivationPipeline where S: NextAttributes + SignalReceiver + OriginProvider + OriginAdvancer + Debug + Send, P: L2ChainProvider + Send + Sync + Debug, + M: DerivationPipelineMetrics + Send + Sync, { /// Creates a new instance of the [DerivationPipeline]. pub const fn new( attributes: S, rollup_config: Arc, l2_chain_provider: P, + metrics: M, ) -> Self { - Self { attributes, prepared: VecDeque::new(), rollup_config, l2_chain_provider } + Self { attributes, prepared: VecDeque::new(), rollup_config, l2_chain_provider, metrics } } } -impl OriginProvider for DerivationPipeline +impl OriginProvider for DerivationPipeline where S: NextAttributes + SignalReceiver + OriginProvider + OriginAdvancer + Debug + Send, P: L2ChainProvider + Send + Sync + Debug, + M: DerivationPipelineMetrics + Send + Sync, { fn origin(&self) -> Option { self.attributes.origin() } } -impl Iterator for DerivationPipeline +impl Iterator for DerivationPipeline where S: NextAttributes + SignalReceiver + OriginProvider + OriginAdvancer + Debug + Send + Sync, P: L2ChainProvider + Send + Sync + Debug, + M: DerivationPipelineMetrics + Send + Sync, { type Item = OpAttributesWithParent; @@ -73,10 +83,11 @@ where } #[async_trait] -impl SignalReceiver for DerivationPipeline +impl SignalReceiver for DerivationPipeline where S: NextAttributes + SignalReceiver + OriginProvider + OriginAdvancer + Debug + Send + Sync, P: L2ChainProvider + Send + Sync + Debug, + M: DerivationPipelineMetrics + Send + Sync, { /// Signals the pipeline by calling the [`SignalReceiver::signal`] method. /// @@ -95,6 +106,8 @@ where match signal { s @ Signal::Reset(ResetSignal { l2_safe_head, .. }) | s @ Signal::Activation(ActivationSignal { l2_safe_head, .. }) => { + self.metrics.inc_reset_signals(); + let system_config = self .l2_chain_provider .system_config_by_number( @@ -117,6 +130,8 @@ where } } Signal::FlushChannel => { + self.metrics.inc_flush_channel_signals(); + self.attributes.signal(signal).await?; } } @@ -125,10 +140,11 @@ where } #[async_trait] -impl Pipeline for DerivationPipeline +impl Pipeline for DerivationPipeline where S: NextAttributes + SignalReceiver + OriginProvider + OriginAdvancer + Debug + Send + Sync, P: L2ChainProvider + Send + Sync + Debug, + M: DerivationPipelineMetrics + Send + Sync, { /// Peeks at the next prepared [OpAttributesWithParent] from the pipeline. 
fn peek(&self) -> Option<&OpAttributesWithParent> { @@ -148,7 +164,7 @@ where /// /// [PipelineError]: crate::errors::PipelineError async fn step(&mut self, cursor: L2BlockInfo) -> StepResult { - match self.attributes.next_attributes(cursor).await { + let result = match self.attributes.next_attributes(cursor).await { Ok(a) => { trace!(target: "pipeline", "Prepared L2 attributes: {:?}", a); self.prepared.push_back(a); @@ -167,13 +183,18 @@ where StepResult::StepFailed(err) } }, - } + }; + + self.metrics.record_step_result(&result); + + result } } #[cfg(test)] mod tests { use crate::{ + metrics::{NoopDerivationPipelineMetrics, PipelineMetrics}, pipeline::{DerivationPipeline, PipelineError, StepResult}, test_utils::{TestL2ChainProvider, *}, traits::{ActivationSignal, Pipeline, ResetSignal, Signal, SignalReceiver}, @@ -243,7 +264,9 @@ mod tests { let l2_chain_provider = TestL2ChainProvider::default(); let expected = default_test_payload_attributes(); let attributes = TestNextAttributes { next_attributes: Some(expected) }; - let mut pipeline = DerivationPipeline::new(attributes, rollup_config, l2_chain_provider); + let metrics = PipelineMetrics::no_op(); + let mut pipeline = + DerivationPipeline::new(attributes, rollup_config, l2_chain_provider, metrics); // Step on the pipeline and expect the result. let cursor = L2BlockInfo::default(); @@ -256,7 +279,9 @@ mod tests { let rollup_config = Arc::new(RollupConfig::default()); let l2_chain_provider = TestL2ChainProvider::default(); let attributes = TestNextAttributes::default(); - let mut pipeline = DerivationPipeline::new(attributes, rollup_config, l2_chain_provider); + let metrics = NoopDerivationPipelineMetrics; + let mut pipeline = + DerivationPipeline::new(attributes, rollup_config, l2_chain_provider, metrics); // Step on the pipeline and expect the result. let cursor = L2BlockInfo::default(); @@ -270,7 +295,9 @@ mod tests { let mut l2_chain_provider = TestL2ChainProvider::default(); l2_chain_provider.system_configs.insert(0, SystemConfig::default()); let attributes = TestNextAttributes::default(); - let mut pipeline = DerivationPipeline::new(attributes, rollup_config, l2_chain_provider); + let metrics = NoopDerivationPipelineMetrics; + let mut pipeline = + DerivationPipeline::new(attributes, rollup_config, l2_chain_provider, metrics); // Signal the pipeline to reset. let result = pipeline.signal(ActivationSignal::default().signal()).await; @@ -282,7 +309,9 @@ mod tests { let rollup_config = Arc::new(RollupConfig::default()); let l2_chain_provider = TestL2ChainProvider::default(); let attributes = TestNextAttributes::default(); - let mut pipeline = DerivationPipeline::new(attributes, rollup_config, l2_chain_provider); + let metrics = NoopDerivationPipelineMetrics; + let mut pipeline = + DerivationPipeline::new(attributes, rollup_config, l2_chain_provider, metrics); // Signal the pipeline to reset. let result = pipeline.signal(Signal::FlushChannel).await; @@ -294,7 +323,9 @@ mod tests { let rollup_config = Arc::new(RollupConfig::default()); let l2_chain_provider = TestL2ChainProvider::default(); let attributes = TestNextAttributes::default(); - let mut pipeline = DerivationPipeline::new(attributes, rollup_config, l2_chain_provider); + let metrics = NoopDerivationPipelineMetrics; + let mut pipeline = + DerivationPipeline::new(attributes, rollup_config, l2_chain_provider, metrics); // Signal the pipeline to reset. 
let result = pipeline.signal(ResetSignal::default().signal()).await.unwrap_err(); @@ -307,7 +338,9 @@ mod tests { let mut l2_chain_provider = TestL2ChainProvider::default(); l2_chain_provider.system_configs.insert(0, SystemConfig::default()); let attributes = TestNextAttributes::default(); - let mut pipeline = DerivationPipeline::new(attributes, rollup_config, l2_chain_provider); + let metrics = NoopDerivationPipelineMetrics; + let mut pipeline = + DerivationPipeline::new(attributes, rollup_config, l2_chain_provider, metrics); // Signal the pipeline to reset. let result = pipeline.signal(ResetSignal::default().signal()).await; diff --git a/crates/derive/src/test_utils/pipeline.rs b/crates/derive/src/test_utils/pipeline.rs index 8e721a30c..3b3721be7 100644 --- a/crates/derive/src/test_utils/pipeline.rs +++ b/crates/derive/src/test_utils/pipeline.rs @@ -13,6 +13,7 @@ use op_alloy_rpc_types_engine::OpAttributesWithParent; // Re-export these types used internally to the test pipeline. use crate::{ errors::PipelineError, + metrics::PipelineMetrics, pipeline::{DerivationPipeline, PipelineBuilder, PipelineResult}, stages::{ AttributesQueue, BatchStream, ChannelProvider, ChannelReader, FrameQueue, L1Retrieval, @@ -86,7 +87,8 @@ pub type TestBatchProvider = BatchProvider pub type TestAttributesQueue = AttributesQueue; /// A [DerivationPipeline] using test providers and sources. -pub type TestPipeline = DerivationPipeline; +pub type TestPipeline = + DerivationPipeline; /// Constructs a [DerivationPipeline] using test providers and sources. pub fn new_test_pipeline() -> TestPipeline { diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index 170d6325b..0e23ff623 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -2,7 +2,9 @@ //! pipeline. mod pipeline; -pub use pipeline::{ActivationSignal, Pipeline, ResetSignal, Signal, StepResult}; +pub use pipeline::{ + ActivationSignal, DerivationPipelineMetrics, Pipeline, ResetSignal, Signal, StepResult, +}; mod providers; pub use providers::{ChainProvider, L2ChainProvider}; diff --git a/crates/derive/src/traits/pipeline.rs b/crates/derive/src/traits/pipeline.rs index 3b1f28d55..0ee4c8f1c 100644 --- a/crates/derive/src/traits/pipeline.rs +++ b/crates/derive/src/traits/pipeline.rs @@ -101,6 +101,18 @@ pub trait Pipeline: OriginProvider + Iterator { async fn step(&mut self, cursor: L2BlockInfo) -> StepResult; } +/// Metrics trait for `DerivationPipeline`. +pub trait DerivationPipelineMetrics { + /// Records the result of a step in the pipeline. + fn record_step_result(&self, result: &StepResult); + + /// Increments the count of reset signals received. + fn inc_reset_signals(&self); + + /// Increments the count of flush channel signals received. 
+ fn inc_flush_channel_signals(&self); +} + #[cfg(test)] mod tests { use super::*; From a824ce3d98106f7dd6b4e99b188aa1f36125c2d1 Mon Sep 17 00:00:00 2001 From: Steph Sinyakov Date: Wed, 30 Oct 2024 18:14:55 +0100 Subject: [PATCH 02/11] fix: tests --- crates/derive-alloy/src/pipeline.rs | 1 + crates/derive/.idea/.gitignore | 6 ++++++ crates/derive/.idea/derive.iml | 10 ++++++++++ crates/derive/.idea/modules.xml | 8 ++++++++ crates/derive/.idea/vcs.xml | 6 ++++++ crates/derive/src/pipeline/core.rs | 12 ++++++------ crates/derive/src/test_utils/pipeline.rs | 1 + crates/mpt/.idea/.gitignore | 6 ++++++ crates/mpt/.idea/modules.xml | 8 ++++++++ crates/mpt/.idea/mpt.iml | 11 +++++++++++ crates/mpt/.idea/vcs.xml | 6 ++++++ 11 files changed, 69 insertions(+), 6 deletions(-) create mode 100644 crates/derive/.idea/.gitignore create mode 100644 crates/derive/.idea/derive.iml create mode 100644 crates/derive/.idea/modules.xml create mode 100644 crates/derive/.idea/vcs.xml create mode 100644 crates/mpt/.idea/.gitignore create mode 100644 crates/mpt/.idea/modules.xml create mode 100644 crates/mpt/.idea/mpt.iml create mode 100644 crates/mpt/.idea/vcs.xml diff --git a/crates/derive-alloy/src/pipeline.rs b/crates/derive-alloy/src/pipeline.rs index 6ef4d9a60..8151614c7 100644 --- a/crates/derive-alloy/src/pipeline.rs +++ b/crates/derive-alloy/src/pipeline.rs @@ -66,6 +66,7 @@ pub fn new_online_pipeline( .chain_provider(chain_provider) .builder(builder) .origin(origin) + .metrics(PipelineMetrics::no_op()) .build() } diff --git a/crates/derive/.idea/.gitignore b/crates/derive/.idea/.gitignore new file mode 100644 index 000000000..8bf4d45d6 --- /dev/null +++ b/crates/derive/.idea/.gitignore @@ -0,0 +1,6 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/crates/derive/.idea/derive.iml b/crates/derive/.idea/derive.iml new file mode 100644 index 000000000..54bc4f701 --- /dev/null +++ b/crates/derive/.idea/derive.iml @@ -0,0 +1,10 @@ + + + + + + + + + + \ No newline at end of file diff --git a/crates/derive/.idea/modules.xml b/crates/derive/.idea/modules.xml new file mode 100644 index 000000000..5cb66b3c2 --- /dev/null +++ b/crates/derive/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/crates/derive/.idea/vcs.xml b/crates/derive/.idea/vcs.xml new file mode 100644 index 000000000..b2bdec2d7 --- /dev/null +++ b/crates/derive/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/crates/derive/src/pipeline/core.rs b/crates/derive/src/pipeline/core.rs index 17cf31f1f..fc7bb19b0 100644 --- a/crates/derive/src/pipeline/core.rs +++ b/crates/derive/src/pipeline/core.rs @@ -194,7 +194,7 @@ where #[cfg(test)] mod tests { use crate::{ - metrics::{NoopDerivationPipelineMetrics, PipelineMetrics}, + metrics::{PipelineMetrics}, pipeline::{DerivationPipeline, PipelineError, StepResult}, test_utils::{TestL2ChainProvider, *}, traits::{ActivationSignal, Pipeline, ResetSignal, Signal, SignalReceiver}, @@ -279,7 +279,7 @@ mod tests { let rollup_config = Arc::new(RollupConfig::default()); let l2_chain_provider = TestL2ChainProvider::default(); let attributes = TestNextAttributes::default(); - let metrics = NoopDerivationPipelineMetrics; + let metrics = PipelineMetrics::no_op(); let mut pipeline = DerivationPipeline::new(attributes, rollup_config, l2_chain_provider, metrics); @@ -295,7 +295,7 @@ mod tests { let mut l2_chain_provider = TestL2ChainProvider::default(); 
l2_chain_provider.system_configs.insert(0, SystemConfig::default()); let attributes = TestNextAttributes::default(); - let metrics = NoopDerivationPipelineMetrics; + let metrics = PipelineMetrics::no_op(); let mut pipeline = DerivationPipeline::new(attributes, rollup_config, l2_chain_provider, metrics); @@ -309,7 +309,7 @@ mod tests { let rollup_config = Arc::new(RollupConfig::default()); let l2_chain_provider = TestL2ChainProvider::default(); let attributes = TestNextAttributes::default(); - let metrics = NoopDerivationPipelineMetrics; + let metrics = PipelineMetrics::no_op(); let mut pipeline = DerivationPipeline::new(attributes, rollup_config, l2_chain_provider, metrics); @@ -323,7 +323,7 @@ mod tests { let rollup_config = Arc::new(RollupConfig::default()); let l2_chain_provider = TestL2ChainProvider::default(); let attributes = TestNextAttributes::default(); - let metrics = NoopDerivationPipelineMetrics; + let metrics = PipelineMetrics::no_op(); let mut pipeline = DerivationPipeline::new(attributes, rollup_config, l2_chain_provider, metrics); @@ -338,7 +338,7 @@ mod tests { let mut l2_chain_provider = TestL2ChainProvider::default(); l2_chain_provider.system_configs.insert(0, SystemConfig::default()); let attributes = TestNextAttributes::default(); - let metrics = NoopDerivationPipelineMetrics; + let metrics = PipelineMetrics::no_op(); let mut pipeline = DerivationPipeline::new(attributes, rollup_config, l2_chain_provider, metrics); diff --git a/crates/derive/src/test_utils/pipeline.rs b/crates/derive/src/test_utils/pipeline.rs index 3b3721be7..150ccd01e 100644 --- a/crates/derive/src/test_utils/pipeline.rs +++ b/crates/derive/src/test_utils/pipeline.rs @@ -99,5 +99,6 @@ pub fn new_test_pipeline() -> TestPipeline { .builder(TestAttributesBuilder::default()) .chain_provider(TestChainProvider::default()) .l2_chain_provider(TestL2ChainProvider::default()) + .metrics(PipelineMetrics::no_op()) .build() } diff --git a/crates/mpt/.idea/.gitignore b/crates/mpt/.idea/.gitignore new file mode 100644 index 000000000..8bf4d45d6 --- /dev/null +++ b/crates/mpt/.idea/.gitignore @@ -0,0 +1,6 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/crates/mpt/.idea/modules.xml b/crates/mpt/.idea/modules.xml new file mode 100644 index 000000000..d37e5de01 --- /dev/null +++ b/crates/mpt/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/crates/mpt/.idea/mpt.iml b/crates/mpt/.idea/mpt.iml new file mode 100644 index 000000000..3c65f9b12 --- /dev/null +++ b/crates/mpt/.idea/mpt.iml @@ -0,0 +1,11 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/crates/mpt/.idea/vcs.xml b/crates/mpt/.idea/vcs.xml new file mode 100644 index 000000000..b2bdec2d7 --- /dev/null +++ b/crates/mpt/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file From 04f79bac7288e4ba9da3b1a2a0b5cb20a558b7a4 Mon Sep 17 00:00:00 2001 From: Steph Sinyakov Date: Wed, 30 Oct 2024 18:21:52 +0100 Subject: [PATCH 03/11] chore: fmt --- crates/derive/src/pipeline/core.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/derive/src/pipeline/core.rs b/crates/derive/src/pipeline/core.rs index fc7bb19b0..123160920 100644 --- a/crates/derive/src/pipeline/core.rs +++ b/crates/derive/src/pipeline/core.rs @@ -194,7 +194,7 @@ where #[cfg(test)] mod tests { use crate::{ - metrics::{PipelineMetrics}, + metrics::PipelineMetrics, pipeline::{DerivationPipeline, PipelineError, 
StepResult}, test_utils::{TestL2ChainProvider, *}, traits::{ActivationSignal, Pipeline, ResetSignal, Signal, SignalReceiver}, From 5b3cced447cd3d72cdf792e6ae3109dd22420712 Mon Sep 17 00:00:00 2001 From: Steph Sinyakov Date: Fri, 1 Nov 2024 13:33:12 +0100 Subject: [PATCH 04/11] fix: `.idea` dirs --- .gitignore | 3 +++ crates/derive/.idea/.gitignore | 6 ------ crates/derive/.idea/derive.iml | 10 ---------- crates/derive/.idea/modules.xml | 8 -------- crates/derive/.idea/vcs.xml | 6 ------ crates/mpt/.idea/.gitignore | 6 ------ crates/mpt/.idea/modules.xml | 8 -------- crates/mpt/.idea/mpt.iml | 11 ----------- crates/mpt/.idea/vcs.xml | 6 ------ 9 files changed, 3 insertions(+), 61 deletions(-) delete mode 100644 crates/derive/.idea/.gitignore delete mode 100644 crates/derive/.idea/derive.iml delete mode 100644 crates/derive/.idea/modules.xml delete mode 100644 crates/derive/.idea/vcs.xml delete mode 100644 crates/mpt/.idea/.gitignore delete mode 100644 crates/mpt/.idea/modules.xml delete mode 100644 crates/mpt/.idea/mpt.iml delete mode 100644 crates/mpt/.idea/vcs.xml diff --git a/.gitignore b/.gitignore index 7df26869d..cdb74debd 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,6 @@ monorepo/ # Environment Variables .env + +# JetBrains +**/.idea/ diff --git a/crates/derive/.idea/.gitignore b/crates/derive/.idea/.gitignore deleted file mode 100644 index 8bf4d45d6..000000000 --- a/crates/derive/.idea/.gitignore +++ /dev/null @@ -1,6 +0,0 @@ -# Default ignored files -/shelf/ -/workspace.xml -# Datasource local storage ignored files -/dataSources/ -/dataSources.local.xml diff --git a/crates/derive/.idea/derive.iml b/crates/derive/.idea/derive.iml deleted file mode 100644 index 54bc4f701..000000000 --- a/crates/derive/.idea/derive.iml +++ /dev/null @@ -1,10 +0,0 @@ - - - - - - - - - - \ No newline at end of file diff --git a/crates/derive/.idea/modules.xml b/crates/derive/.idea/modules.xml deleted file mode 100644 index 5cb66b3c2..000000000 --- a/crates/derive/.idea/modules.xml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/crates/derive/.idea/vcs.xml b/crates/derive/.idea/vcs.xml deleted file mode 100644 index b2bdec2d7..000000000 --- a/crates/derive/.idea/vcs.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/crates/mpt/.idea/.gitignore b/crates/mpt/.idea/.gitignore deleted file mode 100644 index 8bf4d45d6..000000000 --- a/crates/mpt/.idea/.gitignore +++ /dev/null @@ -1,6 +0,0 @@ -# Default ignored files -/shelf/ -/workspace.xml -# Datasource local storage ignored files -/dataSources/ -/dataSources.local.xml diff --git a/crates/mpt/.idea/modules.xml b/crates/mpt/.idea/modules.xml deleted file mode 100644 index d37e5de01..000000000 --- a/crates/mpt/.idea/modules.xml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/crates/mpt/.idea/mpt.iml b/crates/mpt/.idea/mpt.iml deleted file mode 100644 index 3c65f9b12..000000000 --- a/crates/mpt/.idea/mpt.iml +++ /dev/null @@ -1,11 +0,0 @@ - - - - - - - - - - - \ No newline at end of file diff --git a/crates/mpt/.idea/vcs.xml b/crates/mpt/.idea/vcs.xml deleted file mode 100644 index b2bdec2d7..000000000 --- a/crates/mpt/.idea/vcs.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file From 5f61eca8408b336ebf77a6ce0e85cd8af943961e Mon Sep 17 00:00:00 2001 From: Steph Sinyakov Date: Fri, 1 Nov 2024 13:38:28 +0100 Subject: [PATCH 05/11] fix: `DerivationPipelineMetrics` changed --- crates/derive/src/metrics/mod.rs | 10 +++------- 
crates/derive/src/metrics/noop.rs | 7 +++---- crates/derive/src/pipeline/core.rs | 4 ++-- crates/derive/src/traits/pipeline.rs | 7 ++----- 4 files changed, 10 insertions(+), 18 deletions(-) diff --git a/crates/derive/src/metrics/mod.rs b/crates/derive/src/metrics/mod.rs index 50aeeb7bb..2340b61ca 100644 --- a/crates/derive/src/metrics/mod.rs +++ b/crates/derive/src/metrics/mod.rs @@ -4,7 +4,7 @@ mod noop; use alloc::sync::Arc; use core::fmt::Debug; - +use crate::pipeline::Signal; use crate::traits::{DerivationPipelineMetrics, StepResult}; /// Composite metrics struct containing metrics for all stages. @@ -24,11 +24,7 @@ impl DerivationPipelineMetrics for PipelineMetrics { self.derivation_pipeline_metrics.record_step_result(result) } - fn inc_reset_signals(&self) { - self.derivation_pipeline_metrics.inc_reset_signals() - } - - fn inc_flush_channel_signals(&self) { - self.derivation_pipeline_metrics.inc_flush_channel_signals() + fn record_signal(&self, signal: &Signal) { + self.derivation_pipeline_metrics.record_signal(signal) } } diff --git a/crates/derive/src/metrics/noop.rs b/crates/derive/src/metrics/noop.rs index 64690a6d6..1cfaaf59c 100644 --- a/crates/derive/src/metrics/noop.rs +++ b/crates/derive/src/metrics/noop.rs @@ -1,5 +1,6 @@ use crate::{metrics::PipelineMetrics, pipeline::StepResult, traits::DerivationPipelineMetrics}; use alloc::sync::Arc; +use crate::pipeline::Signal; impl PipelineMetrics { /// No-op implementation for `PipelineMetrics`. @@ -19,10 +20,8 @@ impl DerivationPipelineMetrics for NoopDerivationPipelineMetrics { fn record_step_result(&self, _result: &StepResult) { // No-op } - fn inc_reset_signals(&self) { - // No-op - } - fn inc_flush_channel_signals(&self) { + + fn record_signal(&self, _signal: &Signal) { // No-op } } diff --git a/crates/derive/src/pipeline/core.rs b/crates/derive/src/pipeline/core.rs index 123160920..b62d9812b 100644 --- a/crates/derive/src/pipeline/core.rs +++ b/crates/derive/src/pipeline/core.rs @@ -106,7 +106,7 @@ where match signal { s @ Signal::Reset(ResetSignal { l2_safe_head, .. }) | s @ Signal::Activation(ActivationSignal { l2_safe_head, .. }) => { - self.metrics.inc_reset_signals(); + self.metrics.record_signal(&s); let system_config = self .l2_chain_provider @@ -130,7 +130,7 @@ where } } Signal::FlushChannel => { - self.metrics.inc_flush_channel_signals(); + self.metrics.record_signal(&signal); self.attributes.signal(signal).await?; } diff --git a/crates/derive/src/traits/pipeline.rs b/crates/derive/src/traits/pipeline.rs index 0ee4c8f1c..d261a59f8 100644 --- a/crates/derive/src/traits/pipeline.rs +++ b/crates/derive/src/traits/pipeline.rs @@ -106,11 +106,8 @@ pub trait DerivationPipelineMetrics { /// Records the result of a step in the pipeline. fn record_step_result(&self, result: &StepResult); - /// Increments the count of reset signals received. - fn inc_reset_signals(&self); - - /// Increments the count of flush channel signals received. - fn inc_flush_channel_signals(&self); + /// Records a signal event in the pipeline. 
+ fn record_signal(&self, signal: &Signal); } #[cfg(test)] From 82591bc4b2341c2427befa6a877f49975601cc74 Mon Sep 17 00:00:00 2001 From: Steph Sinyakov Date: Fri, 1 Nov 2024 14:02:42 +0100 Subject: [PATCH 06/11] fix: `DerivationPipeline` metrics changes --- crates/derive/src/pipeline/builder.rs | 4 ++-- crates/derive/src/pipeline/core.rs | 23 +++++++++-------------- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/crates/derive/src/pipeline/builder.rs b/crates/derive/src/pipeline/builder.rs index ea4558dc2..ab389afa3 100644 --- a/crates/derive/src/pipeline/builder.rs +++ b/crates/derive/src/pipeline/builder.rs @@ -116,13 +116,13 @@ where } /// Builds the pipeline. - pub fn build(self) -> DerivationPipeline, T, PipelineMetrics> { + pub fn build(self) -> DerivationPipeline, T> { self.into() } } impl From> - for DerivationPipeline, T, PipelineMetrics> + for DerivationPipeline, T> where B: AttributesBuilder + Send + Debug, P: ChainProvider + Send + Sync + Debug, diff --git a/crates/derive/src/pipeline/core.rs b/crates/derive/src/pipeline/core.rs index b62d9812b..3cae29fb2 100644 --- a/crates/derive/src/pipeline/core.rs +++ b/crates/derive/src/pipeline/core.rs @@ -18,14 +18,14 @@ use op_alloy_genesis::RollupConfig; use op_alloy_protocol::{BlockInfo, L2BlockInfo}; use op_alloy_rpc_types_engine::OpAttributesWithParent; use tracing::{error, trace, warn}; +use crate::metrics::PipelineMetrics; /// The derivation pipeline is responsible for deriving L2 inputs from L1 data. #[derive(Debug)] -pub struct DerivationPipeline +pub struct DerivationPipeline where S: NextAttributes + SignalReceiver + OriginProvider + OriginAdvancer + Debug + Send, P: L2ChainProvider + Send + Sync + Debug, - M: DerivationPipelineMetrics + Send + Sync, { /// A handle to the next attributes. pub attributes: S, @@ -38,42 +38,39 @@ where /// The L2 Chain Provider used to fetch the system config on reset. pub l2_chain_provider: P, /// Metrics collector. - pub metrics: M, + pub metrics: PipelineMetrics } -impl DerivationPipeline +impl DerivationPipeline where S: NextAttributes + SignalReceiver + OriginProvider + OriginAdvancer + Debug + Send, P: L2ChainProvider + Send + Sync + Debug, - M: DerivationPipelineMetrics + Send + Sync, { /// Creates a new instance of the [DerivationPipeline]. pub const fn new( attributes: S, rollup_config: Arc, l2_chain_provider: P, - metrics: M, + metrics: PipelineMetrics, ) -> Self { Self { attributes, prepared: VecDeque::new(), rollup_config, l2_chain_provider, metrics } } } -impl OriginProvider for DerivationPipeline +impl OriginProvider for DerivationPipeline where S: NextAttributes + SignalReceiver + OriginProvider + OriginAdvancer + Debug + Send, P: L2ChainProvider + Send + Sync + Debug, - M: DerivationPipelineMetrics + Send + Sync, { fn origin(&self) -> Option { self.attributes.origin() } } -impl Iterator for DerivationPipeline +impl Iterator for DerivationPipeline where S: NextAttributes + SignalReceiver + OriginProvider + OriginAdvancer + Debug + Send + Sync, P: L2ChainProvider + Send + Sync + Debug, - M: DerivationPipelineMetrics + Send + Sync, { type Item = OpAttributesWithParent; @@ -83,11 +80,10 @@ where } #[async_trait] -impl SignalReceiver for DerivationPipeline +impl SignalReceiver for DerivationPipeline where S: NextAttributes + SignalReceiver + OriginProvider + OriginAdvancer + Debug + Send + Sync, P: L2ChainProvider + Send + Sync + Debug, - M: DerivationPipelineMetrics + Send + Sync, { /// Signals the pipeline by calling the [`SignalReceiver::signal`] method. 
/// @@ -140,11 +136,10 @@ where } #[async_trait] -impl Pipeline for DerivationPipeline +impl Pipeline for DerivationPipeline where S: NextAttributes + SignalReceiver + OriginProvider + OriginAdvancer + Debug + Send + Sync, P: L2ChainProvider + Send + Sync + Debug, - M: DerivationPipelineMetrics + Send + Sync, { /// Peeks at the next prepared [OpAttributesWithParent] from the pipeline. fn peek(&self) -> Option<&OpAttributesWithParent> { From b27315daf276ab1d0eac1f2a9818978e347fbe52 Mon Sep 17 00:00:00 2001 From: Steph Sinyakov Date: Fri, 1 Nov 2024 14:26:45 +0100 Subject: [PATCH 07/11] feat: add `AttributesQueueMetrics`, refactoring --- bin/client/src/l1/driver.rs | 1 - crates/derive-alloy/src/pipeline.rs | 7 ++--- crates/derive/src/metrics/mod.rs | 14 +++++++-- crates/derive/src/metrics/noop.rs | 18 +++++++++-- crates/derive/src/pipeline/builder.rs | 8 +++-- crates/derive/src/pipeline/core.rs | 4 +-- crates/derive/src/stages/attributes_queue.rs | 33 +++++++++++++++----- crates/derive/src/test_utils/pipeline.rs | 3 +- crates/derive/src/traits/attributes.rs | 6 ++++ crates/derive/src/traits/mod.rs | 4 ++- 10 files changed, 73 insertions(+), 25 deletions(-) diff --git a/bin/client/src/l1/driver.rs b/bin/client/src/l1/driver.rs index 9f03d4c10..464ed4442 100644 --- a/bin/client/src/l1/driver.rs +++ b/bin/client/src/l1/driver.rs @@ -38,7 +38,6 @@ use tracing::{error, info, warn}; pub type OraclePipeline = DerivationPipeline< OracleAttributesQueue, O>, OracleL2ChainProvider, - PipelineMetrics, >; /// An oracle-backed Ethereum data source. diff --git a/crates/derive-alloy/src/pipeline.rs b/crates/derive-alloy/src/pipeline.rs index 8151614c7..e21a95fd3 100644 --- a/crates/derive-alloy/src/pipeline.rs +++ b/crates/derive-alloy/src/pipeline.rs @@ -18,11 +18,8 @@ use op_alloy_protocol::BlockInfo; use std::sync::Arc; /// An online derivation pipeline. -pub type OnlinePipeline = DerivationPipeline< - OnlineAttributesQueue, - AlloyL2ChainProvider, - PipelineMetrics, ->; +pub type OnlinePipeline = + DerivationPipeline, AlloyL2ChainProvider>; /// An `online` Ethereum data source. pub type OnlineDataProvider = EthereumDataSource< diff --git a/crates/derive/src/metrics/mod.rs b/crates/derive/src/metrics/mod.rs index 2340b61ca..21821bdd1 100644 --- a/crates/derive/src/metrics/mod.rs +++ b/crates/derive/src/metrics/mod.rs @@ -2,14 +2,18 @@ mod noop; +use crate::{ + pipeline::Signal, + traits::{AttributesQueueMetrics, DerivationPipelineMetrics, StepResult}, +}; use alloc::sync::Arc; use core::fmt::Debug; -use crate::pipeline::Signal; -use crate::traits::{DerivationPipelineMetrics, StepResult}; /// Composite metrics struct containing metrics for all stages. 
+#[derive(Clone)] pub struct PipelineMetrics { pub(crate) derivation_pipeline_metrics: Arc, + pub(crate) attributes_queue_metrics: Arc, // todo: add more metrics here for each stage } @@ -28,3 +32,9 @@ impl DerivationPipelineMetrics for PipelineMetrics { self.derivation_pipeline_metrics.record_signal(signal) } } + +impl AttributesQueueMetrics for PipelineMetrics { + fn record_some_metric(&self) { + self.attributes_queue_metrics.record_some_metric() + } +} diff --git a/crates/derive/src/metrics/noop.rs b/crates/derive/src/metrics/noop.rs index 1cfaaf59c..cf31c35b9 100644 --- a/crates/derive/src/metrics/noop.rs +++ b/crates/derive/src/metrics/noop.rs @@ -1,12 +1,16 @@ -use crate::{metrics::PipelineMetrics, pipeline::StepResult, traits::DerivationPipelineMetrics}; +use crate::{ + metrics::PipelineMetrics, + pipeline::{Signal, StepResult}, + traits::{AttributesQueueMetrics, DerivationPipelineMetrics}, +}; use alloc::sync::Arc; -use crate::pipeline::Signal; impl PipelineMetrics { /// No-op implementation for `PipelineMetrics`. pub fn no_op() -> Self { Self { derivation_pipeline_metrics: Arc::new(NoopDerivationPipelineMetrics), + attributes_queue_metrics: Arc::new(NoopAttributesQueueMetrics), // todo: add more metrics here for each stage } } @@ -25,3 +29,13 @@ impl DerivationPipelineMetrics for NoopDerivationPipelineMetrics { // No-op } } + +/// No-op implementation of `DerivationPipelineMetrics`. +#[derive(Debug)] +struct NoopAttributesQueueMetrics; + +impl AttributesQueueMetrics for NoopAttributesQueueMetrics { + fn record_some_metric(&self) { + // No-op + } +} diff --git a/crates/derive/src/pipeline/builder.rs b/crates/derive/src/pipeline/builder.rs index ab389afa3..d7cb35cbc 100644 --- a/crates/derive/src/pipeline/builder.rs +++ b/crates/derive/src/pipeline/builder.rs @@ -150,8 +150,12 @@ where BatchStream::new(channel_reader, rollup_config.clone(), l2_chain_provider.clone()); let batch_provider = BatchProvider::new(rollup_config.clone(), batch_stream, l2_chain_provider.clone()); - let attributes = - AttributesQueue::new(rollup_config.clone(), batch_provider, attributes_builder); + let attributes = AttributesQueue::new( + rollup_config.clone(), + batch_provider, + attributes_builder, + metrics.clone(), + ); // Create the pipeline. Self::new(attributes, rollup_config, l2_chain_provider, metrics) diff --git a/crates/derive/src/pipeline/core.rs b/crates/derive/src/pipeline/core.rs index 3cae29fb2..544acbb32 100644 --- a/crates/derive/src/pipeline/core.rs +++ b/crates/derive/src/pipeline/core.rs @@ -6,6 +6,7 @@ use super::{ }; use crate::{ errors::PipelineErrorKind, + metrics::PipelineMetrics, traits::{ ActivationSignal, DerivationPipelineMetrics, L2ChainProvider, ResetSignal, Signal, SignalReceiver, @@ -18,7 +19,6 @@ use op_alloy_genesis::RollupConfig; use op_alloy_protocol::{BlockInfo, L2BlockInfo}; use op_alloy_rpc_types_engine::OpAttributesWithParent; use tracing::{error, trace, warn}; -use crate::metrics::PipelineMetrics; /// The derivation pipeline is responsible for deriving L2 inputs from L1 data. #[derive(Debug)] @@ -38,7 +38,7 @@ where /// The L2 Chain Provider used to fetch the system config on reset. pub l2_chain_provider: P, /// Metrics collector. 
- pub metrics: PipelineMetrics + pub metrics: PipelineMetrics, } impl DerivationPipeline diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index f746d365a..4a41899f4 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -2,9 +2,10 @@ use crate::{ errors::{PipelineError, PipelineResult, ResetError}, + metrics::PipelineMetrics, traits::{ - AttributesBuilder, AttributesProvider, NextAttributes, OriginAdvancer, OriginProvider, - Signal, SignalReceiver, + AttributesBuilder, AttributesProvider, AttributesQueueMetrics, NextAttributes, + OriginAdvancer, OriginProvider, Signal, SignalReceiver, }, }; use alloc::{boxed::Box, sync::Arc}; @@ -44,6 +45,8 @@ where batch: Option, /// The attributes builder. builder: AB, + /// Metrics collector. + metrics: PipelineMetrics, } impl AttributesQueue @@ -52,12 +55,18 @@ where AB: AttributesBuilder + Debug, { /// Create a new [AttributesQueue] stage. - pub const fn new(cfg: Arc, prev: P, builder: AB) -> Self { - Self { cfg, prev, is_last_in_span: false, batch: None, builder } + pub const fn new( + cfg: Arc, + prev: P, + builder: AB, + metrics: PipelineMetrics, + ) -> Self { + Self { cfg, prev, is_last_in_span: false, batch: None, builder, metrics } } /// Loads a [SingleBatch] from the [AttributesProvider] if needed. pub async fn load_batch(&mut self, parent: L2BlockInfo) -> PipelineResult { + self.metrics.record_some_metric(); if self.batch.is_none() { let batch = self.prev.next_batch(parent).await?; self.batch = Some(batch); @@ -228,7 +237,12 @@ mod tests { let cfg = cfg.unwrap_or_default(); let mock_batch_queue = new_test_attributes_provider(origin, batches); let mock_attributes_builder = TestAttributesBuilder::default(); - AttributesQueue::new(Arc::new(cfg), mock_batch_queue, mock_attributes_builder) + AttributesQueue::new( + Arc::new(cfg), + mock_batch_queue, + mock_attributes_builder, + PipelineMetrics::no_op(), + ) } #[tokio::test] @@ -246,7 +260,8 @@ mod tests { let cfg = RollupConfig::default(); let mock = new_test_attributes_provider(None, vec![]); let mock_builder = TestAttributesBuilder::default(); - let mut aq = AttributesQueue::new(Arc::new(cfg), mock, mock_builder); + let mut aq = + AttributesQueue::new(Arc::new(cfg), mock, mock_builder, PipelineMetrics::no_op()); aq.batch = Some(SingleBatch::default()); assert!(!aq.prev.reset); aq.signal(ResetSignal::default().signal()).await.unwrap(); @@ -340,7 +355,8 @@ mod tests { let mut payload_attributes = default_optimism_payload_attributes(); let mock_builder = TestAttributesBuilder { attributes: vec![Ok(payload_attributes.clone())] }; - let mut aq = AttributesQueue::new(Arc::new(cfg), mock, mock_builder); + let mut aq = + AttributesQueue::new(Arc::new(cfg), mock, mock_builder, PipelineMetrics::no_op()); let parent = L2BlockInfo::default(); let txs = vec![Bytes::default(), Bytes::default()]; let batch = SingleBatch { transactions: txs.clone(), ..Default::default() }; @@ -368,7 +384,8 @@ mod tests { let mock = new_test_attributes_provider(None, vec![Ok(Default::default())]); let mut pa = default_optimism_payload_attributes(); let mock_builder = TestAttributesBuilder { attributes: vec![Ok(pa.clone())] }; - let mut aq = AttributesQueue::new(Arc::new(cfg), mock, mock_builder); + let mut aq = + AttributesQueue::new(Arc::new(cfg), mock, mock_builder, PipelineMetrics::no_op()); // If we load the batch, we should get the last in span. 
// But it won't take it so it will be available in the next_attributes call. let _ = aq.load_batch(L2BlockInfo::default()).await.unwrap(); diff --git a/crates/derive/src/test_utils/pipeline.rs b/crates/derive/src/test_utils/pipeline.rs index 150ccd01e..03ee9ab86 100644 --- a/crates/derive/src/test_utils/pipeline.rs +++ b/crates/derive/src/test_utils/pipeline.rs @@ -87,8 +87,7 @@ pub type TestBatchProvider = BatchProvider pub type TestAttributesQueue = AttributesQueue; /// A [DerivationPipeline] using test providers and sources. -pub type TestPipeline = - DerivationPipeline; +pub type TestPipeline = DerivationPipeline; /// Constructs a [DerivationPipeline] using test providers and sources. pub fn new_test_pipeline() -> TestPipeline { diff --git a/crates/derive/src/traits/attributes.rs b/crates/derive/src/traits/attributes.rs index 070ac724e..4f9f103c9 100644 --- a/crates/derive/src/traits/attributes.rs +++ b/crates/derive/src/traits/attributes.rs @@ -47,3 +47,9 @@ pub trait AttributesBuilder { epoch: BlockNumHash, ) -> PipelineResult; } + +/// Metrics trait for `AttributesQueue` stage. +pub trait AttributesQueueMetrics { + /// Records something. + fn record_some_metric(&self); +} diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index 0e23ff623..783b5c788 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -10,7 +10,9 @@ mod providers; pub use providers::{ChainProvider, L2ChainProvider}; mod attributes; -pub use attributes::{AttributesBuilder, AttributesProvider, NextAttributes}; +pub use attributes::{ + AttributesBuilder, AttributesProvider, AttributesQueueMetrics, NextAttributes, +}; mod data_sources; pub use data_sources::{AsyncIterator, BlobProvider, DataAvailabilityProvider}; From 014f07399c8ebb3f2b88c2e16aa8befff4370a7c Mon Sep 17 00:00:00 2001 From: Steph Sinyakov Date: Tue, 5 Nov 2024 21:18:02 +0100 Subject: [PATCH 08/11] feat: add no-op metrics for a few stages --- crates/derive/src/metrics/mod.rs | 76 +++++++++++++++-- crates/derive/src/metrics/noop.rs | 85 ++++++++++++++++++-- crates/derive/src/pipeline/builder.rs | 16 ++-- crates/derive/src/stages/attributes_queue.rs | 33 ++------ crates/derive/src/stages/frame_queue.rs | 40 ++++----- crates/derive/src/stages/l1_retrieval.rs | 58 +++++++------ crates/derive/src/stages/l1_traversal.rs | 36 +++++++-- crates/derive/src/stages/mod.rs | 4 +- crates/derive/src/test_utils/frame_queue.rs | 3 +- crates/derive/src/test_utils/frames.rs | 7 +- crates/derive/src/traits/attributes.rs | 6 -- crates/derive/src/traits/frame_queue.rs | 30 +++++++ crates/derive/src/traits/l1_retrieval.rs | 35 ++++++++ crates/derive/src/traits/l1_traversal.rs | 15 ++++ crates/derive/src/traits/mod.rs | 13 ++- 15 files changed, 341 insertions(+), 116 deletions(-) create mode 100644 crates/derive/src/traits/frame_queue.rs create mode 100644 crates/derive/src/traits/l1_retrieval.rs create mode 100644 crates/derive/src/traits/l1_traversal.rs diff --git a/crates/derive/src/metrics/mod.rs b/crates/derive/src/metrics/mod.rs index 21821bdd1..530107f22 100644 --- a/crates/derive/src/metrics/mod.rs +++ b/crates/derive/src/metrics/mod.rs @@ -3,8 +3,12 @@ mod noop; use crate::{ + errors::PipelineErrorKind, pipeline::Signal, - traits::{AttributesQueueMetrics, DerivationPipelineMetrics, StepResult}, + traits::{ + DerivationPipelineMetrics, FrameQueueMetrics, L1RetrievalMetrics, L1TraversalMetrics, + StepResult, + }, }; use alloc::sync::Arc; use core::fmt::Debug; @@ -13,7 +17,9 @@ use core::fmt::Debug; 
#[derive(Clone)] pub struct PipelineMetrics { pub(crate) derivation_pipeline_metrics: Arc, - pub(crate) attributes_queue_metrics: Arc, + pub(crate) l1_traversal_metrics: Arc, + pub(crate) l1_retrieval_metrics: Arc, + pub(crate) frame_queue_metrics: Arc, // todo: add more metrics here for each stage } @@ -33,8 +39,68 @@ impl DerivationPipelineMetrics for PipelineMetrics { } } -impl AttributesQueueMetrics for PipelineMetrics { - fn record_some_metric(&self) { - self.attributes_queue_metrics.record_some_metric() +impl L1TraversalMetrics for PipelineMetrics { + fn record_block_processed(&self, block_number: u64) { + self.l1_traversal_metrics.record_block_processed(block_number) + } + + fn record_system_config_update(&self) { + self.l1_traversal_metrics.record_system_config_update() + } + + fn record_reorg_detected(&self) { + self.l1_traversal_metrics.record_reorg_detected() + } + + fn record_holocene_activation(&self) { + self.l1_traversal_metrics.record_holocene_activation() + } + + fn record_error(&self, error: &PipelineErrorKind) { + self.l1_traversal_metrics.record_error(error) + } +} + +impl L1RetrievalMetrics for PipelineMetrics { + fn record_data_fetch_attempt(&self, block_number: u64) { + self.l1_retrieval_metrics.record_data_fetch_attempt(block_number) + } + + fn record_data_fetch_success(&self, block_number: u64) { + self.l1_retrieval_metrics.record_data_fetch_success(block_number) + } + + fn record_data_fetch_failure(&self, block_number: u64, error: &PipelineErrorKind) { + self.l1_retrieval_metrics.record_data_fetch_failure(block_number, error) + } + + fn record_block_processed(&self, block_number: u64) { + self.l1_retrieval_metrics.record_block_processed(block_number) + } + + fn record_error(&self, error: &PipelineErrorKind) { + self.l1_retrieval_metrics.record_error(error) + } +} + +impl FrameQueueMetrics for PipelineMetrics { + fn record_frames_decoded(&self, count: usize) { + self.frame_queue_metrics.record_frames_decoded(count) + } + + fn record_frames_dropped(&self, count: usize) { + self.frame_queue_metrics.record_frames_dropped(count) + } + + fn record_frames_queued(&self, count: usize) { + self.frame_queue_metrics.record_frames_queued(count) + } + + fn record_load_frames_attempt(&self) { + self.frame_queue_metrics.record_load_frames_attempt() + } + + fn record_error(&self, error: &PipelineErrorKind) { + self.frame_queue_metrics.record_error(error) } } diff --git a/crates/derive/src/metrics/noop.rs b/crates/derive/src/metrics/noop.rs index cf31c35b9..10b86e77e 100644 --- a/crates/derive/src/metrics/noop.rs +++ b/crates/derive/src/metrics/noop.rs @@ -1,7 +1,10 @@ use crate::{ + errors::PipelineErrorKind, metrics::PipelineMetrics, pipeline::{Signal, StepResult}, - traits::{AttributesQueueMetrics, DerivationPipelineMetrics}, + traits::{ + DerivationPipelineMetrics, FrameQueueMetrics, L1RetrievalMetrics, L1TraversalMetrics, + }, }; use alloc::sync::Arc; @@ -10,7 +13,9 @@ impl PipelineMetrics { pub fn no_op() -> Self { Self { derivation_pipeline_metrics: Arc::new(NoopDerivationPipelineMetrics), - attributes_queue_metrics: Arc::new(NoopAttributesQueueMetrics), + l1_traversal_metrics: Arc::new(NoopL1TraversalMetrics), + l1_retrieval_metrics: Arc::new(NoopL1RetrievalMetrics), + frame_queue_metrics: Arc::new(NoopFrameQueueMetrics), // todo: add more metrics here for each stage } } @@ -30,12 +35,80 @@ impl DerivationPipelineMetrics for NoopDerivationPipelineMetrics { } } -/// No-op implementation of `DerivationPipelineMetrics`. +/// No-op implementation of `L1TraversalMetrics`. 
#[derive(Debug)] -struct NoopAttributesQueueMetrics; +struct NoopL1TraversalMetrics; + +impl L1TraversalMetrics for NoopL1TraversalMetrics { + fn record_block_processed(&self, _block_number: u64) { + // No-op + } + + fn record_system_config_update(&self) { + // No-op + } + + fn record_reorg_detected(&self) { + // No-op + } + + fn record_holocene_activation(&self) { + // No-op + } + + fn record_error(&self, _error: &PipelineErrorKind) { + // No-op + } +} + +/// No-op implementation of `L1RetrievalMetrics`. +#[derive(Debug)] +struct NoopL1RetrievalMetrics; + +impl L1RetrievalMetrics for NoopL1RetrievalMetrics { + fn record_data_fetch_attempt(&self, _block_number: u64) { + // No-op + } + + fn record_data_fetch_success(&self, _block_number: u64) { + // No-op + } + + fn record_data_fetch_failure(&self, _block_number: u64, _error: &PipelineErrorKind) { + // No-op + } + + fn record_block_processed(&self, _block_number: u64) { + // No-op + } + + fn record_error(&self, _error: &PipelineErrorKind) { + // No-op + } +} + +/// No-op implementation of `FrameQueueMetrics`. +#[derive(Debug)] +struct NoopFrameQueueMetrics; + +impl FrameQueueMetrics for NoopFrameQueueMetrics { + fn record_frames_decoded(&self, _count: usize) { + // No-op + } + + fn record_frames_dropped(&self, _count: usize) { + // No-op + } + + fn record_frames_queued(&self, _count: usize) { + // No-op + } + + fn record_load_frames_attempt(&self) { + // No-op + } -impl AttributesQueueMetrics for NoopAttributesQueueMetrics { - fn record_some_metric(&self) { + fn record_error(&self, _error: &PipelineErrorKind) { // No-op } } diff --git a/crates/derive/src/pipeline/builder.rs b/crates/derive/src/pipeline/builder.rs index d7cb35cbc..ad3405f3e 100644 --- a/crates/derive/src/pipeline/builder.rs +++ b/crates/derive/src/pipeline/builder.rs @@ -140,22 +140,20 @@ where // todo: add metrics to the stages // Compose the stage stack. - let mut l1_traversal = L1Traversal::new(chain_provider, Arc::clone(&rollup_config)); + let mut l1_traversal = + L1Traversal::new(chain_provider, Arc::clone(&rollup_config), metrics.clone()); l1_traversal.block = Some(builder.origin.expect("origin must be set")); - let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source); - let frame_queue = FrameQueue::new(l1_retrieval, Arc::clone(&rollup_config)); + let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source, metrics.clone()); + let frame_queue = + FrameQueue::new(l1_retrieval, Arc::clone(&rollup_config), metrics.clone()); let channel_provider = ChannelProvider::new(Arc::clone(&rollup_config), frame_queue); let channel_reader = ChannelReader::new(channel_provider, Arc::clone(&rollup_config)); let batch_stream = BatchStream::new(channel_reader, rollup_config.clone(), l2_chain_provider.clone()); let batch_provider = BatchProvider::new(rollup_config.clone(), batch_stream, l2_chain_provider.clone()); - let attributes = AttributesQueue::new( - rollup_config.clone(), - batch_provider, - attributes_builder, - metrics.clone(), - ); + let attributes = + AttributesQueue::new(rollup_config.clone(), batch_provider, attributes_builder); // Create the pipeline. 
Self::new(attributes, rollup_config, l2_chain_provider, metrics) diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index 4a41899f4..f746d365a 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -2,10 +2,9 @@ use crate::{ errors::{PipelineError, PipelineResult, ResetError}, - metrics::PipelineMetrics, traits::{ - AttributesBuilder, AttributesProvider, AttributesQueueMetrics, NextAttributes, - OriginAdvancer, OriginProvider, Signal, SignalReceiver, + AttributesBuilder, AttributesProvider, NextAttributes, OriginAdvancer, OriginProvider, + Signal, SignalReceiver, }, }; use alloc::{boxed::Box, sync::Arc}; @@ -45,8 +44,6 @@ where batch: Option, /// The attributes builder. builder: AB, - /// Metrics collector. - metrics: PipelineMetrics, } impl AttributesQueue @@ -55,18 +52,12 @@ where AB: AttributesBuilder + Debug, { /// Create a new [AttributesQueue] stage. - pub const fn new( - cfg: Arc, - prev: P, - builder: AB, - metrics: PipelineMetrics, - ) -> Self { - Self { cfg, prev, is_last_in_span: false, batch: None, builder, metrics } + pub const fn new(cfg: Arc, prev: P, builder: AB) -> Self { + Self { cfg, prev, is_last_in_span: false, batch: None, builder } } /// Loads a [SingleBatch] from the [AttributesProvider] if needed. pub async fn load_batch(&mut self, parent: L2BlockInfo) -> PipelineResult { - self.metrics.record_some_metric(); if self.batch.is_none() { let batch = self.prev.next_batch(parent).await?; self.batch = Some(batch); @@ -237,12 +228,7 @@ mod tests { let cfg = cfg.unwrap_or_default(); let mock_batch_queue = new_test_attributes_provider(origin, batches); let mock_attributes_builder = TestAttributesBuilder::default(); - AttributesQueue::new( - Arc::new(cfg), - mock_batch_queue, - mock_attributes_builder, - PipelineMetrics::no_op(), - ) + AttributesQueue::new(Arc::new(cfg), mock_batch_queue, mock_attributes_builder) } #[tokio::test] @@ -260,8 +246,7 @@ mod tests { let cfg = RollupConfig::default(); let mock = new_test_attributes_provider(None, vec![]); let mock_builder = TestAttributesBuilder::default(); - let mut aq = - AttributesQueue::new(Arc::new(cfg), mock, mock_builder, PipelineMetrics::no_op()); + let mut aq = AttributesQueue::new(Arc::new(cfg), mock, mock_builder); aq.batch = Some(SingleBatch::default()); assert!(!aq.prev.reset); aq.signal(ResetSignal::default().signal()).await.unwrap(); @@ -355,8 +340,7 @@ mod tests { let mut payload_attributes = default_optimism_payload_attributes(); let mock_builder = TestAttributesBuilder { attributes: vec![Ok(payload_attributes.clone())] }; - let mut aq = - AttributesQueue::new(Arc::new(cfg), mock, mock_builder, PipelineMetrics::no_op()); + let mut aq = AttributesQueue::new(Arc::new(cfg), mock, mock_builder); let parent = L2BlockInfo::default(); let txs = vec![Bytes::default(), Bytes::default()]; let batch = SingleBatch { transactions: txs.clone(), ..Default::default() }; @@ -384,8 +368,7 @@ mod tests { let mock = new_test_attributes_provider(None, vec![Ok(Default::default())]); let mut pa = default_optimism_payload_attributes(); let mock_builder = TestAttributesBuilder { attributes: vec![Ok(pa.clone())] }; - let mut aq = - AttributesQueue::new(Arc::new(cfg), mock, mock_builder, PipelineMetrics::no_op()); + let mut aq = AttributesQueue::new(Arc::new(cfg), mock, mock_builder); // If we load the batch, we should get the last in span. // But it won't take it so it will be available in the next_attributes call. 
let _ = aq.load_batch(L2BlockInfo::default()).await.unwrap(); diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index 6e103785c..1efa1c4b5 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -2,29 +2,20 @@ use crate::{ errors::{PipelineError, PipelineResult}, + metrics::PipelineMetrics, stages::NextFrameProvider, - traits::{OriginAdvancer, OriginProvider, Signal, SignalReceiver}, + traits::{ + FrameQueueMetrics, FrameQueueProvider, OriginAdvancer, OriginProvider, Signal, + SignalReceiver, + }, }; use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; -use alloy_primitives::Bytes; use async_trait::async_trait; use core::fmt::Debug; use op_alloy_genesis::RollupConfig; use op_alloy_protocol::{BlockInfo, Frame}; use tracing::{debug, error, trace}; -/// Provides data frames for the [FrameQueue] stage. -#[async_trait] -pub trait FrameQueueProvider { - /// An item that can be converted into a byte array. - type Item: Into; - - /// Retrieves the next data item from the L1 retrieval stage. - /// If there is data, it pushes it into the next stage. - /// If there is no data, it returns an error. - async fn next_data(&mut self) -> PipelineResult; -} - /// The [FrameQueue] stage of the derivation pipeline. /// This stage takes the output of the [L1Retrieval] stage and parses it into frames. /// @@ -40,6 +31,8 @@ where queue: VecDeque, /// The rollup config. rollup_config: Arc, + /// Metrics collector. + metrics: PipelineMetrics, } impl

<P> FrameQueue<P>

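For context, a minimal counter-backed implementation of the `FrameQueueMetrics` trait introduced by this patch could look like the sketch below. It is an illustration only, assuming the trait keeps the shape shown in this series (including `record_error`) and stays re-exported from `kona_derive::traits`; the patch itself only wires up the no-op implementation.

use core::sync::atomic::{AtomicUsize, Ordering};
use kona_derive::{errors::PipelineErrorKind, traits::FrameQueueMetrics};

/// Tracks frame-queue activity with plain atomic counters.
#[derive(Debug, Default)]
pub struct CountingFrameQueueMetrics {
    decoded: AtomicUsize,
    dropped: AtomicUsize,
    queued: AtomicUsize,
    load_attempts: AtomicUsize,
    errors: AtomicUsize,
}

impl FrameQueueMetrics for CountingFrameQueueMetrics {
    fn record_frames_decoded(&self, count: usize) {
        // Total number of frames parsed out of batcher data.
        self.decoded.fetch_add(count, Ordering::Relaxed);
    }
    fn record_frames_dropped(&self, count: usize) {
        // Frames discarded while pruning the queue.
        self.dropped.fetch_add(count, Ordering::Relaxed);
    }
    fn record_frames_queued(&self, count: usize) {
        self.queued.fetch_add(count, Ordering::Relaxed);
    }
    fn record_load_frames_attempt(&self) {
        self.load_attempts.fetch_add(1, Ordering::Relaxed);
    }
    fn record_error(&self, _error: &PipelineErrorKind) {
        self.errors.fetch_add(1, Ordering::Relaxed);
    }
}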
@@ -49,8 +42,8 @@ where /// Create a new [FrameQueue] stage with the given previous [L1Retrieval] stage. /// /// [L1Retrieval]: crate::stages::L1Retrieval - pub const fn new(prev: P, cfg: Arc) -> Self { - Self { prev, queue: VecDeque::new(), rollup_config: cfg } + pub const fn new(prev: P, cfg: Arc, metrics: PipelineMetrics) -> Self { + Self { prev, queue: VecDeque::new(), rollup_config: cfg, metrics } } /// Returns if holocene is active. @@ -64,6 +57,8 @@ where return; } + let initial_len = self.queue.len(); + let mut i = 0; while i < self.queue.len() - 1 { let prev_frame = &self.queue[i]; @@ -105,6 +100,8 @@ where i += 1; } + + self.metrics.record_frames_dropped(initial_len - self.queue.len()); } /// Loads more frames into the [FrameQueue]. @@ -114,6 +111,8 @@ where return Ok(()); } + self.metrics.record_load_frames_attempt(); + let data = match self.prev.next_data().await { Ok(data) => data, Err(e) => { @@ -130,6 +129,8 @@ where return Ok(()); }; + self.metrics.record_frames_decoded(frames.len()); + // Optimistically extend the queue with the new frames. self.queue.extend(frames); @@ -195,11 +196,12 @@ pub(crate) mod tests { use super::*; use crate::{test_utils::TestFrameQueueProvider, traits::ResetSignal}; use alloc::vec; + use alloy_primitives::Bytes; #[tokio::test] async fn test_frame_queue_reset() { let mock = TestFrameQueueProvider::new(vec![]); - let mut frame_queue = FrameQueue::new(mock, Default::default()); + let mut frame_queue = FrameQueue::new(mock, Default::default(), PipelineMetrics::no_op()); assert!(!frame_queue.prev.reset); frame_queue.signal(ResetSignal::default().signal()).await.unwrap(); assert_eq!(frame_queue.queue.len(), 0); @@ -211,7 +213,7 @@ pub(crate) mod tests { let data = vec![Ok(Bytes::from(vec![0x00]))]; let mut mock = TestFrameQueueProvider::new(data); mock.set_origin(BlockInfo::default()); - let mut frame_queue = FrameQueue::new(mock, Default::default()); + let mut frame_queue = FrameQueue::new(mock, Default::default(), PipelineMetrics::no_op()); assert!(!frame_queue.is_holocene_active(BlockInfo::default())); let err = frame_queue.next_frame().await.unwrap_err(); assert_eq!(err, PipelineError::NotEnoughData.temp()); @@ -222,7 +224,7 @@ pub(crate) mod tests { let data = vec![Err(PipelineError::Eof.temp()), Ok(Bytes::default())]; let mut mock = TestFrameQueueProvider::new(data); mock.set_origin(BlockInfo::default()); - let mut frame_queue = FrameQueue::new(mock, Default::default()); + let mut frame_queue = FrameQueue::new(mock, Default::default(), PipelineMetrics::no_op()); assert!(!frame_queue.is_holocene_active(BlockInfo::default())); let err = frame_queue.next_frame().await.unwrap_err(); assert_eq!(err, PipelineError::NotEnoughData.temp()); diff --git a/crates/derive/src/stages/l1_retrieval.rs b/crates/derive/src/stages/l1_retrieval.rs index 7fc7c298c..9ee7351ab 100644 --- a/crates/derive/src/stages/l1_retrieval.rs +++ b/crates/derive/src/stages/l1_retrieval.rs @@ -2,33 +2,17 @@ use crate::{ errors::{PipelineError, PipelineErrorKind, PipelineResult}, - stages::FrameQueueProvider, + metrics::PipelineMetrics, traits::{ - ActivationSignal, AsyncIterator, DataAvailabilityProvider, OriginAdvancer, OriginProvider, - ResetSignal, Signal, SignalReceiver, + ActivationSignal, AsyncIterator, DataAvailabilityProvider, FrameQueueProvider, + L1RetrievalMetrics, L1RetrievalProvider, OriginAdvancer, OriginProvider, ResetSignal, + Signal, SignalReceiver, }, }; use alloc::boxed::Box; -use alloy_primitives::Address; use async_trait::async_trait; use 
op_alloy_protocol::BlockInfo; -/// Provides L1 blocks for the [L1Retrieval] stage. -/// This is the previous stage in the pipeline. -#[async_trait] -pub trait L1RetrievalProvider { - /// Returns the next L1 [BlockInfo] in the [L1Traversal] stage, if the stage is not complete. - /// This function can only be called once while the stage is in progress, and will return - /// [`None`] on subsequent calls unless the stage is reset or complete. If the stage is - /// complete and the [BlockInfo] has been consumed, an [PipelineError::Eof] error is returned. - /// - /// [L1Traversal]: crate::stages::L1Traversal - async fn next_l1_block(&mut self) -> PipelineResult>; - - /// Returns the batcher [Address] from the [op_alloy_genesis::SystemConfig]. - fn batcher_addr(&self) -> Address; -} - /// The [L1Retrieval] stage of the derivation pipeline. /// /// For each L1 [BlockInfo] pulled from the [L1Traversal] stage, [L1Retrieval] fetches the @@ -49,6 +33,8 @@ where pub provider: DAP, /// The current data iterator. pub(crate) data: Option, + /// Metrics collector. + metrics: PipelineMetrics, } impl L1Retrieval @@ -60,8 +46,8 @@ where /// [DataAvailabilityProvider]. /// /// [L1Traversal]: crate::stages::L1Traversal - pub const fn new(prev: P, provider: DAP) -> Self { - Self { prev, provider, data: None } + pub const fn new(prev: P, provider: DAP, metrics: PipelineMetrics) -> Self { + Self { prev, provider, data: None, metrics } } } @@ -91,7 +77,9 @@ where .next_l1_block() .await? // SAFETY: This question mark bubbles up the Eof error. .ok_or(PipelineError::MissingL1Data.temp())?; + self.metrics.record_data_fetch_attempt(next.number); self.data = Some(self.provider.open_data(&next).await?); + self.metrics.record_data_fetch_success(next.number); } match self.data.as_mut().expect("Cannot be None").next().await { @@ -143,13 +131,13 @@ mod tests { test_utils::{TestDAP, TestIter}, }; use alloc::vec; - use alloy_primitives::Bytes; + use alloy_primitives::{Address, Bytes}; #[tokio::test] async fn test_l1_retrieval_flush_channel() { let traversal = new_populated_test_traversal(); let dap = TestDAP { results: vec![], batch_inbox_address: Address::default() }; - let mut retrieval = L1Retrieval::new(traversal, dap); + let mut retrieval = L1Retrieval::new(traversal, dap, PipelineMetrics::no_op()); retrieval.prev.block = None; assert!(retrieval.prev.block.is_none()); retrieval.data = None; @@ -162,7 +150,7 @@ mod tests { async fn test_l1_retrieval_activation_signal() { let traversal = new_populated_test_traversal(); let dap = TestDAP { results: vec![], batch_inbox_address: Address::default() }; - let mut retrieval = L1Retrieval::new(traversal, dap); + let mut retrieval = L1Retrieval::new(traversal, dap, PipelineMetrics::no_op()); retrieval.prev.block = None; assert!(retrieval.prev.block.is_none()); retrieval.data = None; @@ -181,7 +169,7 @@ mod tests { async fn test_l1_retrieval_reset_signal() { let traversal = new_populated_test_traversal(); let dap = TestDAP { results: vec![], batch_inbox_address: Address::default() }; - let mut retrieval = L1Retrieval::new(traversal, dap); + let mut retrieval = L1Retrieval::new(traversal, dap, PipelineMetrics::no_op()); retrieval.prev.block = None; assert!(retrieval.prev.block.is_none()); retrieval.data = None; @@ -200,7 +188,7 @@ mod tests { async fn test_l1_retrieval_origin() { let traversal = new_populated_test_traversal(); let dap = TestDAP { results: vec![], batch_inbox_address: Address::default() }; - let retrieval = L1Retrieval::new(traversal, dap); + let retrieval = 
L1Retrieval::new(traversal, dap, PipelineMetrics::no_op()); let expected = BlockInfo::default(); assert_eq!(retrieval.origin(), Some(expected)); } @@ -210,7 +198,7 @@ mod tests { let traversal = new_populated_test_traversal(); let results = vec![Err(PipelineError::Eof.temp()), Ok(Bytes::default())]; let dap = TestDAP { results, batch_inbox_address: Address::default() }; - let mut retrieval = L1Retrieval::new(traversal, dap); + let mut retrieval = L1Retrieval::new(traversal, dap, PipelineMetrics::no_op()); assert_eq!(retrieval.data, None); let data = retrieval.next_data().await.unwrap(); assert_eq!(data, Bytes::default()); @@ -236,7 +224,12 @@ mod tests { // (traversal) is called in the retrieval stage. let traversal = new_test_traversal(vec![], vec![]); let dap = TestDAP { results: vec![], batch_inbox_address: Address::default() }; - let mut retrieval = L1Retrieval { prev: traversal, provider: dap, data: Some(data) }; + let mut retrieval = L1Retrieval { + prev: traversal, + provider: dap, + data: Some(data), + metrics: PipelineMetrics::no_op(), + }; let data = retrieval.next_data().await.unwrap(); assert_eq!(data, Bytes::default()); assert!(retrieval.data.is_some()); @@ -252,7 +245,12 @@ mod tests { }; let traversal = new_populated_test_traversal(); let dap = TestDAP { results: vec![], batch_inbox_address: Address::default() }; - let mut retrieval = L1Retrieval { prev: traversal, provider: dap, data: Some(data) }; + let mut retrieval = L1Retrieval { + prev: traversal, + provider: dap, + data: Some(data), + metrics: PipelineMetrics::no_op(), + }; let data = retrieval.next_data().await.unwrap_err(); assert_eq!(data, PipelineError::Eof.temp()); assert!(retrieval.data.is_none()); diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs index f43e3cbb9..244f23ae7 100644 --- a/crates/derive/src/stages/l1_traversal.rs +++ b/crates/derive/src/stages/l1_traversal.rs @@ -2,10 +2,10 @@ use crate::{ errors::{PipelineError, PipelineResult, ResetError}, - stages::L1RetrievalProvider, + metrics::PipelineMetrics, traits::{ - ActivationSignal, ChainProvider, OriginAdvancer, OriginProvider, ResetSignal, Signal, - SignalReceiver, + ActivationSignal, ChainProvider, L1RetrievalProvider, L1TraversalMetrics, OriginAdvancer, + OriginProvider, ResetSignal, Signal, SignalReceiver, }, }; use alloc::{boxed::Box, string::ToString, sync::Arc}; @@ -34,6 +34,8 @@ pub struct L1Traversal { pub system_config: SystemConfig, /// A reference to the rollup config. pub rollup_config: Arc, + /// Metrics collector. + metrics: PipelineMetrics, } #[async_trait] @@ -54,13 +56,14 @@ impl L1RetrievalProvider for L1Traversal { impl L1Traversal { /// Creates a new [L1Traversal] instance. 
- pub fn new(data_source: F, cfg: Arc) -> Self { + pub fn new(data_source: F, cfg: Arc, metrics: PipelineMetrics) -> Self { Self { block: Some(BlockInfo::default()), data_source, done: false, system_config: SystemConfig::default(), rollup_config: cfg, + metrics, } } } @@ -76,24 +79,34 @@ impl OriginAdvancer for L1Traversal { let block = match self.block { Some(block) => block, None => { + self.metrics.record_error(&PipelineError::Eof.temp()); warn!(target: "l1-traversal", "Missing current block, can't advance origin with no reference."); return Err(PipelineError::Eof.temp()); } }; let next_l1_origin = match self.data_source.block_info_by_number(block.number + 1).await { Ok(block) => block, - Err(e) => return Err(PipelineError::Provider(e.to_string()).temp()), + Err(e) => { + let error = PipelineError::Provider(e.to_string()).temp(); + self.metrics.record_error(&error); + return Err(error); + } }; // Check block hashes for reorgs. if block.hash != next_l1_origin.parent_hash { + self.metrics.record_reorg_detected(); return Err(ResetError::ReorgDetected(block.hash, next_l1_origin.parent_hash).into()); } // Fetch receipts for the next l1 block and update the system config. let receipts = match self.data_source.receipts_by_hash(next_l1_origin.hash).await { Ok(receipts) => receipts, - Err(e) => return Err(PipelineError::Provider(e.to_string()).temp()), + Err(e) => { + let error = PipelineError::Provider(e.to_string()).temp(); + self.metrics.record_error(&error); + return Err(error); + } }; if let Err(e) = self.system_config.update_with_receipts( @@ -101,9 +114,13 @@ impl OriginAdvancer for L1Traversal { self.rollup_config.l1_system_config_address, self.rollup_config.is_ecotone_active(next_l1_origin.timestamp), ) { - return Err(PipelineError::SystemConfigUpdate(e).crit()); + let error = PipelineError::SystemConfigUpdate(e).crit(); + self.metrics.record_error(&error); + return Err(error); } + self.metrics.record_system_config_update(); + let prev_block_holocene = self.rollup_config.is_holocene_active(block.timestamp); let next_block_holocene = self.rollup_config.is_holocene_active(next_l1_origin.timestamp); @@ -111,9 +128,12 @@ impl OriginAdvancer for L1Traversal { self.block = Some(next_l1_origin); self.done = false; + self.metrics.record_block_processed(next_l1_origin.number); + // If the prev block is not holocene, but the next is, we need to flag this // so the pipeline driver will reset the pipeline for holocene activation. 
if !prev_block_holocene && next_block_holocene { + self.metrics.record_holocene_activation(); return Err(ResetError::HoloceneActivation.reset()); } @@ -198,7 +218,7 @@ pub(crate) mod tests { let hash = blocks.get(i).map(|b| b.hash).unwrap_or_default(); provider.insert_receipts(hash, vec![receipt.clone()]); } - L1Traversal::new(provider, Arc::new(rollup_config)) + L1Traversal::new(provider, Arc::new(rollup_config), PipelineMetrics::no_op()) } pub(crate) fn new_populated_test_traversal() -> L1Traversal { diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs index cec0b6341..5cf71bf6e 100644 --- a/crates/derive/src/stages/mod.rs +++ b/crates/derive/src/stages/mod.rs @@ -19,10 +19,10 @@ mod l1_traversal; pub use l1_traversal::L1Traversal; mod l1_retrieval; -pub use l1_retrieval::{L1Retrieval, L1RetrievalProvider}; +pub use l1_retrieval::L1Retrieval; mod frame_queue; -pub use frame_queue::{FrameQueue, FrameQueueProvider}; +pub use frame_queue::FrameQueue; mod channel; pub use channel::{ diff --git a/crates/derive/src/test_utils/frame_queue.rs b/crates/derive/src/test_utils/frame_queue.rs index db23a6a65..26c71f77f 100644 --- a/crates/derive/src/test_utils/frame_queue.rs +++ b/crates/derive/src/test_utils/frame_queue.rs @@ -2,8 +2,7 @@ use crate::{ errors::{PipelineError, PipelineResult}, - stages::FrameQueueProvider, - traits::{OriginAdvancer, OriginProvider, Signal, SignalReceiver}, + traits::{FrameQueueProvider, OriginAdvancer, OriginProvider, Signal, SignalReceiver}, }; use alloc::{boxed::Box, vec::Vec}; use alloy_primitives::Bytes; diff --git a/crates/derive/src/test_utils/frames.rs b/crates/derive/src/test_utils/frames.rs index 9c4a8eb0a..e59142dc3 100644 --- a/crates/derive/src/test_utils/frames.rs +++ b/crates/derive/src/test_utils/frames.rs @@ -2,6 +2,7 @@ use crate::{ errors::{PipelineError, PipelineErrorKind}, + metrics::PipelineMetrics, stages::{FrameQueue, NextFrameProvider}, test_utils::TestFrameQueueProvider, traits::OriginProvider, @@ -84,7 +85,11 @@ impl FrameQueueBuilder { let config = self.config.unwrap_or_default(); let config = Arc::new(config); let err = self.expected_err.unwrap_or_else(|| PipelineError::Eof.temp()); - FrameQueueAsserter::new(FrameQueue::new(mock, config), self.expected_frames, err) + FrameQueueAsserter::new( + FrameQueue::new(mock, config, PipelineMetrics::no_op()), + self.expected_frames, + err, + ) } } diff --git a/crates/derive/src/traits/attributes.rs b/crates/derive/src/traits/attributes.rs index 4f9f103c9..070ac724e 100644 --- a/crates/derive/src/traits/attributes.rs +++ b/crates/derive/src/traits/attributes.rs @@ -47,9 +47,3 @@ pub trait AttributesBuilder { epoch: BlockNumHash, ) -> PipelineResult; } - -/// Metrics trait for `AttributesQueue` stage. -pub trait AttributesQueueMetrics { - /// Records something. - fn record_some_metric(&self); -} diff --git a/crates/derive/src/traits/frame_queue.rs b/crates/derive/src/traits/frame_queue.rs new file mode 100644 index 000000000..2a409e723 --- /dev/null +++ b/crates/derive/src/traits/frame_queue.rs @@ -0,0 +1,30 @@ +use crate::errors::{PipelineErrorKind, PipelineResult}; +use alloc::boxed::Box; +use alloy_primitives::Bytes; +use async_trait::async_trait; + +/// Provides data frames for the [FrameQueue] stage. +#[async_trait] +pub trait FrameQueueProvider { + /// An item that can be converted into a byte array. + type Item: Into; + + /// Retrieves the next data item from the L1 retrieval stage. + /// If there is data, it pushes it into the next stage. 
+ /// If there is no data, it returns an error. + async fn next_data(&mut self) -> PipelineResult; +} + +/// Metrics trait for `FrameQueue`. +pub trait FrameQueueMetrics { + /// Records the number of frames decoded. + fn record_frames_decoded(&self, count: usize); + /// Records the number of frames dropped. + fn record_frames_dropped(&self, count: usize); + /// Records the number of frames queued. + fn record_frames_queued(&self, count: usize); + /// Records the number of frames loaded. + fn record_load_frames_attempt(&self); + /// Records error loading frames. + fn record_error(&self, error: &PipelineErrorKind); +} diff --git a/crates/derive/src/traits/l1_retrieval.rs b/crates/derive/src/traits/l1_retrieval.rs new file mode 100644 index 000000000..2944fb639 --- /dev/null +++ b/crates/derive/src/traits/l1_retrieval.rs @@ -0,0 +1,35 @@ +use crate::errors::{PipelineErrorKind, PipelineResult}; +use alloc::boxed::Box; +use alloy_primitives::Address; +use async_trait::async_trait; +use op_alloy_protocol::BlockInfo; + +/// Provides L1 blocks for the [L1Retrieval] stage. +/// This is the previous stage in the pipeline. +#[async_trait] +pub trait L1RetrievalProvider { + /// Returns the next L1 [BlockInfo] in the [L1Traversal] stage, if the stage is not complete. + /// This function can only be called once while the stage is in progress, and will return + /// [`None`] on subsequent calls unless the stage is reset or complete. If the stage is + /// complete and the [BlockInfo] has been consumed, an [PipelineError::Eof] error is returned. + /// + /// [L1Traversal]: crate::stages::L1Traversal + async fn next_l1_block(&mut self) -> PipelineResult>; + + /// Returns the batcher [Address] from the [op_alloy_genesis::SystemConfig]. + fn batcher_addr(&self) -> Address; +} + +/// Metrics trait for `L1Retrieval`. +pub trait L1RetrievalMetrics: Send + Sync { + /// Records the number of data fetch attempts. + fn record_data_fetch_attempt(&self, block_number: u64); + /// Records successful data fetches. + fn record_data_fetch_success(&self, block_number: u64); + /// Records failed data fetches. + fn record_data_fetch_failure(&self, block_number: u64, error: &PipelineErrorKind); + /// Records the number of blocks processed. + fn record_block_processed(&self, block_number: u64); + /// Records errors. + fn record_error(&self, error: &PipelineErrorKind); +} diff --git a/crates/derive/src/traits/l1_traversal.rs b/crates/derive/src/traits/l1_traversal.rs new file mode 100644 index 000000000..32d412088 --- /dev/null +++ b/crates/derive/src/traits/l1_traversal.rs @@ -0,0 +1,15 @@ +use crate::errors::PipelineErrorKind; + +/// Metrics trait for `L1Traversal`. +pub trait L1TraversalMetrics: Send + Sync { + /// Records the block number of the last processed block. + fn record_block_processed(&self, block_number: u64); + /// Records system config update. + fn record_system_config_update(&self); + /// Records reorg detection. + fn record_reorg_detected(&self); + /// Records Holocene activation. + fn record_holocene_activation(&self); + /// Records the block number of the last processed block. 
+ fn record_error(&self, error: &PipelineErrorKind); +} diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index 783b5c788..63f3ee91f 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -10,9 +10,7 @@ mod providers; pub use providers::{ChainProvider, L2ChainProvider}; mod attributes; -pub use attributes::{ - AttributesBuilder, AttributesProvider, AttributesQueueMetrics, NextAttributes, -}; +pub use attributes::{AttributesBuilder, AttributesProvider, NextAttributes}; mod data_sources; pub use data_sources::{AsyncIterator, BlobProvider, DataAvailabilityProvider}; @@ -22,3 +20,12 @@ pub use reset::ResetProvider; mod stages; pub use stages::{OriginAdvancer, OriginProvider, SignalReceiver}; + +mod l1_traversal; +pub use l1_traversal::L1TraversalMetrics; + +mod l1_retrieval; +pub use l1_retrieval::{L1RetrievalMetrics, L1RetrievalProvider}; + +mod frame_queue; +pub use frame_queue::{FrameQueueMetrics, FrameQueueProvider}; From 4a0d6af632e5d387bca7dd8723670ec1dea85017 Mon Sep 17 00:00:00 2001 From: Steph Sinyakov Date: Thu, 7 Nov 2024 15:08:55 +0100 Subject: [PATCH 09/11] feat: add no-op metrics for `ChannelProvider` stage --- crates/derive/src/metrics/mod.rs | 23 ++++---- crates/derive/src/metrics/noop.rs | 23 ++++---- crates/derive/src/pipeline/builder.rs | 3 +- .../src/stages/channel/channel_provider.rs | 52 +++++++++++++------ crates/derive/src/stages/l1_traversal.rs | 13 ++--- crates/derive/src/traits/channel_provider.rs | 7 +++ crates/derive/src/traits/frame_queue.rs | 6 +-- crates/derive/src/traits/l1_retrieval.rs | 2 - crates/derive/src/traits/l1_traversal.rs | 4 -- crates/derive/src/traits/mod.rs | 3 ++ 10 files changed, 77 insertions(+), 59 deletions(-) create mode 100644 crates/derive/src/traits/channel_provider.rs diff --git a/crates/derive/src/metrics/mod.rs b/crates/derive/src/metrics/mod.rs index 530107f22..85c1d19a8 100644 --- a/crates/derive/src/metrics/mod.rs +++ b/crates/derive/src/metrics/mod.rs @@ -6,8 +6,8 @@ use crate::{ errors::PipelineErrorKind, pipeline::Signal, traits::{ - DerivationPipelineMetrics, FrameQueueMetrics, L1RetrievalMetrics, L1TraversalMetrics, - StepResult, + ChannelProviderMetrics, DerivationPipelineMetrics, FrameQueueMetrics, L1RetrievalMetrics, + L1TraversalMetrics, StepResult, }, }; use alloc::sync::Arc; @@ -20,6 +20,7 @@ pub struct PipelineMetrics { pub(crate) l1_traversal_metrics: Arc, pub(crate) l1_retrieval_metrics: Arc, pub(crate) frame_queue_metrics: Arc, + pub(crate) channel_provider_metrics: Arc, // todo: add more metrics here for each stage } @@ -55,10 +56,6 @@ impl L1TraversalMetrics for PipelineMetrics { fn record_holocene_activation(&self) { self.l1_traversal_metrics.record_holocene_activation() } - - fn record_error(&self, error: &PipelineErrorKind) { - self.l1_traversal_metrics.record_error(error) - } } impl L1RetrievalMetrics for PipelineMetrics { @@ -77,10 +74,6 @@ impl L1RetrievalMetrics for PipelineMetrics { fn record_block_processed(&self, block_number: u64) { self.l1_retrieval_metrics.record_block_processed(block_number) } - - fn record_error(&self, error: &PipelineErrorKind) { - self.l1_retrieval_metrics.record_error(error) - } } impl FrameQueueMetrics for PipelineMetrics { @@ -99,8 +92,14 @@ impl FrameQueueMetrics for PipelineMetrics { fn record_load_frames_attempt(&self) { self.frame_queue_metrics.record_load_frames_attempt() } +} + +impl ChannelProviderMetrics for PipelineMetrics { + fn record_stage_transition(&self, from: &str, to: &str) { + 
self.channel_provider_metrics.record_stage_transition(from, to) + } - fn record_error(&self, error: &PipelineErrorKind) { - self.frame_queue_metrics.record_error(error) + fn record_data_item_provided(&self) { + self.channel_provider_metrics.record_data_item_provided() } } diff --git a/crates/derive/src/metrics/noop.rs b/crates/derive/src/metrics/noop.rs index 10b86e77e..84a0fe43b 100644 --- a/crates/derive/src/metrics/noop.rs +++ b/crates/derive/src/metrics/noop.rs @@ -3,7 +3,8 @@ use crate::{ metrics::PipelineMetrics, pipeline::{Signal, StepResult}, traits::{ - DerivationPipelineMetrics, FrameQueueMetrics, L1RetrievalMetrics, L1TraversalMetrics, + ChannelProviderMetrics, DerivationPipelineMetrics, FrameQueueMetrics, L1RetrievalMetrics, + L1TraversalMetrics, }, }; use alloc::sync::Arc; @@ -16,6 +17,7 @@ impl PipelineMetrics { l1_traversal_metrics: Arc::new(NoopL1TraversalMetrics), l1_retrieval_metrics: Arc::new(NoopL1RetrievalMetrics), frame_queue_metrics: Arc::new(NoopFrameQueueMetrics), + channel_provider_metrics: Arc::new(NoopChannelProviderMetrics), // todo: add more metrics here for each stage } } @@ -55,10 +57,6 @@ impl L1TraversalMetrics for NoopL1TraversalMetrics { fn record_holocene_activation(&self) { // No-op } - - fn record_error(&self, _error: &PipelineErrorKind) { - // No-op - } } /// No-op implementation of `L1RetrievalMetrics`. @@ -81,10 +79,6 @@ impl L1RetrievalMetrics for NoopL1RetrievalMetrics { fn record_block_processed(&self, _block_number: u64) { // No-op } - - fn record_error(&self, _error: &PipelineErrorKind) { - // No-op - } } /// No-op implementation of `FrameQueueMetrics`. @@ -107,8 +101,17 @@ impl FrameQueueMetrics for NoopFrameQueueMetrics { fn record_load_frames_attempt(&self) { // No-op } +} + +#[derive(Debug)] +struct NoopChannelProviderMetrics; + +impl ChannelProviderMetrics for NoopChannelProviderMetrics { + fn record_stage_transition(&self, _from: &str, _to: &str) { + // No-op + } - fn record_error(&self, _error: &PipelineErrorKind) { + fn record_data_item_provided(&self) { // No-op } } diff --git a/crates/derive/src/pipeline/builder.rs b/crates/derive/src/pipeline/builder.rs index ad3405f3e..dc64c89b0 100644 --- a/crates/derive/src/pipeline/builder.rs +++ b/crates/derive/src/pipeline/builder.rs @@ -146,7 +146,8 @@ where let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source, metrics.clone()); let frame_queue = FrameQueue::new(l1_retrieval, Arc::clone(&rollup_config), metrics.clone()); - let channel_provider = ChannelProvider::new(Arc::clone(&rollup_config), frame_queue); + let channel_provider = + ChannelProvider::new(Arc::clone(&rollup_config), frame_queue, metrics.clone()); let channel_reader = ChannelReader::new(channel_provider, Arc::clone(&rollup_config)); let batch_stream = BatchStream::new(channel_reader, rollup_config.clone(), l2_chain_provider.clone()); diff --git a/crates/derive/src/stages/channel/channel_provider.rs b/crates/derive/src/stages/channel/channel_provider.rs index 1dc868ac1..d32d3341d 100644 --- a/crates/derive/src/stages/channel/channel_provider.rs +++ b/crates/derive/src/stages/channel/channel_provider.rs @@ -3,7 +3,8 @@ use super::{ChannelAssembler, ChannelBank, ChannelReaderProvider, NextFrameProvider}; use crate::{ errors::{PipelineError, PipelineResult}, - traits::{OriginAdvancer, OriginProvider, Signal, SignalReceiver}, + metrics::PipelineMetrics, + traits::{ChannelProviderMetrics, OriginAdvancer, OriginProvider, Signal, SignalReceiver}, }; use alloc::{boxed::Box, sync::Arc}; use alloy_primitives::Bytes; @@ -39,6 +40,8 
@@ where /// /// Must be [None] if `prev` or `channel_bank` is [Some]. channel_assembler: Option>, + /// Metrics collector. + metrics: PipelineMetrics, } impl

<P> ChannelProvider<P>

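As a point of reference only: a custom `ChannelProviderMetrics` implementation that logs mux transitions and counts yielded data items might look like this. The sketch assumes the trait is re-exported from `kona_derive::traits` as in the `traits/mod.rs` hunk below, and that the `tracing` crate is available to the caller.

use core::sync::atomic::{AtomicUsize, Ordering};
use kona_derive::traits::ChannelProviderMetrics;

/// Logs every ChannelBank/ChannelAssembler transition and counts data items.
#[derive(Debug, Default)]
pub struct TracingChannelProviderMetrics {
    items_provided: AtomicUsize,
}

impl ChannelProviderMetrics for TracingChannelProviderMetrics {
    fn record_stage_transition(&self, from: &str, to: &str) {
        // Emit a debug log for each mux transition, e.g. "ChannelBank -> ChannelAssembler".
        tracing::debug!(target: "channel-provider", "stage transition: {} -> {}", from, to);
    }

    fn record_data_item_provided(&self) {
        self.items_provided.fetch_add(1, Ordering::Relaxed);
    }
}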
@@ -46,8 +49,8 @@ where P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { /// Creates a new [ChannelProvider] with the given configuration and previous stage. - pub const fn new(cfg: Arc, prev: P) -> Self { - Self { cfg, prev: Some(prev), channel_bank: None, channel_assembler: None } + pub const fn new(cfg: Arc, prev: P, metrics: PipelineMetrics) -> Self { + Self { cfg, prev: Some(prev), channel_bank: None, channel_assembler: None, metrics } } /// Attempts to update the active stage of the mux. @@ -57,13 +60,16 @@ where // On the first call to `attempt_update`, we need to determine the active stage to // initialize the mux with. if self.cfg.is_holocene_active(origin.timestamp) { + self.metrics.record_stage_transition("ChannelProvider", "ChannelAssembler"); self.channel_assembler = Some(ChannelAssembler::new(self.cfg.clone(), prev)); } else { + self.metrics.record_stage_transition("ChannelProvider", "ChannelBank"); self.channel_bank = Some(ChannelBank::new(self.cfg.clone(), prev)); } } else if self.channel_bank.is_some() && self.cfg.is_holocene_active(origin.timestamp) { // If the channel bank is active and Holocene is also active, transition to the channel // assembler. + self.metrics.record_stage_transition("ChannelBank", "ChannelAssembler"); let channel_bank = self.channel_bank.take().expect("Must have channel bank"); self.channel_assembler = Some(ChannelAssembler::new(self.cfg.clone(), channel_bank.prev)); @@ -72,6 +78,7 @@ where // If the channel assembler is active, and Holocene is not active, it indicates an L1 // reorg around Holocene activation. Transition back to the channel bank // until Holocene re-activates. + self.metrics.record_stage_transition("ChannelAssembler", "ChannelBank"); let channel_assembler = self.channel_assembler.take().expect("Must have channel assembler"); self.channel_bank = Some(ChannelBank::new(self.cfg.clone(), channel_assembler.prev)); @@ -141,13 +148,23 @@ where async fn next_data(&mut self) -> PipelineResult> { self.attempt_update()?; - if let Some(channel_assembler) = self.channel_assembler.as_mut() { - channel_assembler.next_data().await + let data = if let Some(channel_assembler) = self.channel_assembler.as_mut() { + let data = channel_assembler.next_data().await?; + if data.is_some() { + self.metrics.record_data_item_provided(); + } + data } else if let Some(channel_bank) = self.channel_bank.as_mut() { - channel_bank.next_data().await + let data = channel_bank.next_data().await?; + if data.is_some() { + self.metrics.record_data_item_provided(); + } + data } else { - Err(PipelineError::NotEnoughData.temp()) - } + return Err(PipelineError::NotEnoughData.temp()); + }; + + Ok(data) } } @@ -155,6 +172,7 @@ where mod test { use super::ChannelProvider; use crate::{ + metrics::PipelineMetrics, prelude::{OriginProvider, PipelineError}, stages::ChannelReaderProvider, test_utils::TestNextFrameProvider, @@ -168,7 +186,7 @@ mod test { fn test_channel_provider_assembler_active() { let provider = TestNextFrameProvider::new(vec![]); let cfg = Arc::new(RollupConfig { holocene_time: Some(0), ..Default::default() }); - let mut channel_provider = ChannelProvider::new(cfg, provider); + let mut channel_provider = ChannelProvider::new(cfg, provider, PipelineMetrics::no_op()); assert!(channel_provider.attempt_update().is_ok()); assert!(channel_provider.prev.is_none()); @@ -180,7 +198,7 @@ mod test { fn test_channel_provider_bank_active() { let provider = TestNextFrameProvider::new(vec![]); let cfg = Arc::new(RollupConfig::default()); - let 
mut channel_provider = ChannelProvider::new(cfg, provider); + let mut channel_provider = ChannelProvider::new(cfg, provider, PipelineMetrics::no_op()); assert!(channel_provider.attempt_update().is_ok()); assert!(channel_provider.prev.is_none()); @@ -192,7 +210,7 @@ mod test { fn test_channel_provider_retain_current_bank() { let provider = TestNextFrameProvider::new(vec![]); let cfg = Arc::new(RollupConfig::default()); - let mut channel_provider = ChannelProvider::new(cfg, provider); + let mut channel_provider = ChannelProvider::new(cfg, provider, PipelineMetrics::no_op()); // Assert the multiplexer hasn't been initialized. assert!(channel_provider.channel_bank.is_none()); @@ -215,7 +233,7 @@ mod test { fn test_channel_provider_retain_current_assembler() { let provider = TestNextFrameProvider::new(vec![]); let cfg = Arc::new(RollupConfig { holocene_time: Some(0), ..Default::default() }); - let mut channel_provider = ChannelProvider::new(cfg, provider); + let mut channel_provider = ChannelProvider::new(cfg, provider, PipelineMetrics::no_op()); // Assert the multiplexer hasn't been initialized. assert!(channel_provider.channel_bank.is_none()); @@ -238,7 +256,7 @@ mod test { fn test_channel_provider_transition_stage() { let provider = TestNextFrameProvider::new(vec![]); let cfg = Arc::new(RollupConfig { holocene_time: Some(2), ..Default::default() }); - let mut channel_provider = ChannelProvider::new(cfg, provider); + let mut channel_provider = ChannelProvider::new(cfg, provider, PipelineMetrics::no_op()); channel_provider.attempt_update().unwrap(); @@ -260,7 +278,7 @@ mod test { fn test_channel_provider_transition_stage_backwards() { let provider = TestNextFrameProvider::new(vec![]); let cfg = Arc::new(RollupConfig { holocene_time: Some(2), ..Default::default() }); - let mut channel_provider = ChannelProvider::new(cfg, provider); + let mut channel_provider = ChannelProvider::new(cfg, provider, PipelineMetrics::no_op()); channel_provider.attempt_update().unwrap(); @@ -294,7 +312,8 @@ mod test { ]; let provider = TestNextFrameProvider::new(frames.into_iter().rev().map(Ok).collect()); let cfg = Arc::new(RollupConfig::default()); - let mut channel_provider = ChannelProvider::new(cfg.clone(), provider); + let mut channel_provider = + ChannelProvider::new(cfg.clone(), provider, PipelineMetrics::no_op()); // Load in the first frame. assert_eq!( @@ -325,7 +344,8 @@ mod test { ]; let provider = TestNextFrameProvider::new(frames.into_iter().rev().map(Ok).collect()); let cfg = Arc::new(RollupConfig { holocene_time: Some(0), ..Default::default() }); - let mut channel_provider = ChannelProvider::new(cfg.clone(), provider); + let mut channel_provider = + ChannelProvider::new(cfg.clone(), provider, PipelineMetrics::no_op()); // Load in the first frame. 
assert_eq!( diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs index 244f23ae7..729fe4ce7 100644 --- a/crates/derive/src/stages/l1_traversal.rs +++ b/crates/derive/src/stages/l1_traversal.rs @@ -79,7 +79,6 @@ impl OriginAdvancer for L1Traversal { let block = match self.block { Some(block) => block, None => { - self.metrics.record_error(&PipelineError::Eof.temp()); warn!(target: "l1-traversal", "Missing current block, can't advance origin with no reference."); return Err(PipelineError::Eof.temp()); } @@ -87,9 +86,7 @@ impl OriginAdvancer for L1Traversal { let next_l1_origin = match self.data_source.block_info_by_number(block.number + 1).await { Ok(block) => block, Err(e) => { - let error = PipelineError::Provider(e.to_string()).temp(); - self.metrics.record_error(&error); - return Err(error); + return Err(PipelineError::Provider(e.to_string()).temp()); } }; @@ -103,9 +100,7 @@ impl OriginAdvancer for L1Traversal { let receipts = match self.data_source.receipts_by_hash(next_l1_origin.hash).await { Ok(receipts) => receipts, Err(e) => { - let error = PipelineError::Provider(e.to_string()).temp(); - self.metrics.record_error(&error); - return Err(error); + return Err(PipelineError::Provider(e.to_string()).temp()); } }; @@ -114,9 +109,7 @@ impl OriginAdvancer for L1Traversal { self.rollup_config.l1_system_config_address, self.rollup_config.is_ecotone_active(next_l1_origin.timestamp), ) { - let error = PipelineError::SystemConfigUpdate(e).crit(); - self.metrics.record_error(&error); - return Err(error); + return Err(PipelineError::SystemConfigUpdate(e).crit()); } self.metrics.record_system_config_update(); diff --git a/crates/derive/src/traits/channel_provider.rs b/crates/derive/src/traits/channel_provider.rs new file mode 100644 index 000000000..2ab859189 --- /dev/null +++ b/crates/derive/src/traits/channel_provider.rs @@ -0,0 +1,7 @@ +/// Metrics trait for `ChannelProvider`. +pub trait ChannelProviderMetrics: Send + Sync { + /// Records the number of data items consumed and what type of data was consumed. + fn record_stage_transition(&self, from: &str, to: &str); + /// Records the number of data items provided. + fn record_data_item_provided(&self); +} diff --git a/crates/derive/src/traits/frame_queue.rs b/crates/derive/src/traits/frame_queue.rs index 2a409e723..f480ad4d1 100644 --- a/crates/derive/src/traits/frame_queue.rs +++ b/crates/derive/src/traits/frame_queue.rs @@ -1,4 +1,4 @@ -use crate::errors::{PipelineErrorKind, PipelineResult}; +use crate::errors::PipelineResult; use alloc::boxed::Box; use alloy_primitives::Bytes; use async_trait::async_trait; @@ -16,7 +16,7 @@ pub trait FrameQueueProvider { } /// Metrics trait for `FrameQueue`. -pub trait FrameQueueMetrics { +pub trait FrameQueueMetrics: Send + Sync { /// Records the number of frames decoded. fn record_frames_decoded(&self, count: usize); /// Records the number of frames dropped. @@ -25,6 +25,4 @@ pub trait FrameQueueMetrics { fn record_frames_queued(&self, count: usize); /// Records the number of frames loaded. fn record_load_frames_attempt(&self); - /// Records error loading frames. 
- fn record_error(&self, error: &PipelineErrorKind); } diff --git a/crates/derive/src/traits/l1_retrieval.rs b/crates/derive/src/traits/l1_retrieval.rs index 2944fb639..d38663454 100644 --- a/crates/derive/src/traits/l1_retrieval.rs +++ b/crates/derive/src/traits/l1_retrieval.rs @@ -30,6 +30,4 @@ pub trait L1RetrievalMetrics: Send + Sync { fn record_data_fetch_failure(&self, block_number: u64, error: &PipelineErrorKind); /// Records the number of blocks processed. fn record_block_processed(&self, block_number: u64); - /// Records errors. - fn record_error(&self, error: &PipelineErrorKind); } diff --git a/crates/derive/src/traits/l1_traversal.rs b/crates/derive/src/traits/l1_traversal.rs index 32d412088..6f4001276 100644 --- a/crates/derive/src/traits/l1_traversal.rs +++ b/crates/derive/src/traits/l1_traversal.rs @@ -1,5 +1,3 @@ -use crate::errors::PipelineErrorKind; - /// Metrics trait for `L1Traversal`. pub trait L1TraversalMetrics: Send + Sync { /// Records the block number of the last processed block. @@ -10,6 +8,4 @@ pub trait L1TraversalMetrics: Send + Sync { fn record_reorg_detected(&self); /// Records Holocene activation. fn record_holocene_activation(&self); - /// Records the block number of the last processed block. - fn record_error(&self, error: &PipelineErrorKind); } diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index 63f3ee91f..0e65d8d40 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -29,3 +29,6 @@ pub use l1_retrieval::{L1RetrievalMetrics, L1RetrievalProvider}; mod frame_queue; pub use frame_queue::{FrameQueueMetrics, FrameQueueProvider}; + +mod channel_provider; +pub use channel_provider::ChannelProviderMetrics; From 5ca014c970f64918215ff895f4e590664a5b4cee Mon Sep 17 00:00:00 2001 From: Steph Sinyakov Date: Thu, 7 Nov 2024 16:15:21 +0100 Subject: [PATCH 10/11] feat: add no-op metrics for `ChannelReader`, `BatchStream`, `BatchQueue` stages --- crates/derive/src/metrics/mod.rs | 50 ++++++++++++++- crates/derive/src/metrics/noop.rs | 58 ++++++++++++++++- crates/derive/src/pipeline/builder.rs | 19 ++++-- .../derive/src/stages/batch/batch_provider.rs | 47 ++++++++++---- crates/derive/src/stages/batch/batch_queue.rs | 53 ++++++++------- .../derive/src/stages/batch/batch_stream.rs | 64 ++++++++++++++++--- .../src/stages/channel/channel_reader.rs | 35 ++++++---- crates/derive/src/traits/batch_queue.rs | 9 +++ crates/derive/src/traits/batch_stream.rs | 11 ++++ crates/derive/src/traits/channel_reader.rs | 7 ++ crates/derive/src/traits/mod.rs | 9 +++ 11 files changed, 297 insertions(+), 65 deletions(-) create mode 100644 crates/derive/src/traits/batch_queue.rs create mode 100644 crates/derive/src/traits/batch_stream.rs create mode 100644 crates/derive/src/traits/channel_reader.rs diff --git a/crates/derive/src/metrics/mod.rs b/crates/derive/src/metrics/mod.rs index 85c1d19a8..d6e3c098b 100644 --- a/crates/derive/src/metrics/mod.rs +++ b/crates/derive/src/metrics/mod.rs @@ -6,8 +6,9 @@ use crate::{ errors::PipelineErrorKind, pipeline::Signal, traits::{ - ChannelProviderMetrics, DerivationPipelineMetrics, FrameQueueMetrics, L1RetrievalMetrics, - L1TraversalMetrics, StepResult, + BatchQueueMetrics, BatchStreamMetrics, ChannelProviderMetrics, ChannelReaderMetrics, + DerivationPipelineMetrics, FrameQueueMetrics, L1RetrievalMetrics, L1TraversalMetrics, + StepResult, }, }; use alloc::sync::Arc; @@ -21,6 +22,9 @@ pub struct PipelineMetrics { pub(crate) l1_retrieval_metrics: Arc, pub(crate) frame_queue_metrics: Arc, 
pub(crate) channel_provider_metrics: Arc, + pub(crate) channel_reader_metrics: Arc, + pub(crate) batch_stream_metrics: Arc, + pub(crate) batch_queue_metrics: Arc, // todo: add more metrics here for each stage } @@ -103,3 +107,45 @@ impl ChannelProviderMetrics for PipelineMetrics { self.channel_provider_metrics.record_data_item_provided() } } + +impl ChannelReaderMetrics for PipelineMetrics { + fn record_batch_read(&self) { + self.channel_reader_metrics.record_batch_read() + } + + fn record_channel_flushed(&self) { + self.channel_reader_metrics.record_channel_flushed() + } +} + +impl BatchStreamMetrics for PipelineMetrics { + fn record_batch_processed(&self) { + self.batch_stream_metrics.record_batch_processed() + } + + fn record_span_batch_accepted(&self) { + self.batch_stream_metrics.record_span_batch_accepted() + } + + fn record_span_batch_dropped(&self) { + self.batch_stream_metrics.record_span_batch_dropped() + } + + fn record_buffer_size(&self, size: usize) { + self.batch_stream_metrics.record_buffer_size(size) + } +} + +impl BatchQueueMetrics for PipelineMetrics { + fn record_batches_queued(&self, count: usize) { + self.batch_queue_metrics.record_batches_queued(count) + } + + fn record_batch_dropped(&self) { + self.batch_queue_metrics.record_batch_dropped() + } + + fn record_epoch_advanced(&self, epoch: u64) { + self.batch_queue_metrics.record_epoch_advanced(epoch) + } +} diff --git a/crates/derive/src/metrics/noop.rs b/crates/derive/src/metrics/noop.rs index 84a0fe43b..b938f5289 100644 --- a/crates/derive/src/metrics/noop.rs +++ b/crates/derive/src/metrics/noop.rs @@ -3,8 +3,8 @@ use crate::{ metrics::PipelineMetrics, pipeline::{Signal, StepResult}, traits::{ - ChannelProviderMetrics, DerivationPipelineMetrics, FrameQueueMetrics, L1RetrievalMetrics, - L1TraversalMetrics, + BatchQueueMetrics, BatchStreamMetrics, ChannelProviderMetrics, ChannelReaderMetrics, + DerivationPipelineMetrics, FrameQueueMetrics, L1RetrievalMetrics, L1TraversalMetrics, }, }; use alloc::sync::Arc; @@ -18,6 +18,9 @@ impl PipelineMetrics { l1_retrieval_metrics: Arc::new(NoopL1RetrievalMetrics), frame_queue_metrics: Arc::new(NoopFrameQueueMetrics), channel_provider_metrics: Arc::new(NoopChannelProviderMetrics), + channel_reader_metrics: Arc::new(NoopChannelReaderMetrics), + batch_stream_metrics: Arc::new(NoopBatchStreamMetrics), + batch_queue_metrics: Arc::new(NoopBatchQueueMetrics), // todo: add more metrics here for each stage } } @@ -115,3 +118,54 @@ impl ChannelProviderMetrics for NoopChannelProviderMetrics { // No-op } } + +#[derive(Debug)] +struct NoopChannelReaderMetrics; + +impl ChannelReaderMetrics for NoopChannelReaderMetrics { + fn record_batch_read(&self) { + // No-op + } + + fn record_channel_flushed(&self) { + // No-op + } +} + +#[derive(Debug)] +struct NoopBatchStreamMetrics; + +impl BatchStreamMetrics for NoopBatchStreamMetrics { + fn record_batch_processed(&self) { + // No-op + } + + fn record_span_batch_accepted(&self) { + // No-op + } + + fn record_span_batch_dropped(&self) { + // No-op + } + + fn record_buffer_size(&self, _size: usize) { + // No-op + } +} + +#[derive(Debug)] +struct NoopBatchQueueMetrics; + +impl BatchQueueMetrics for NoopBatchQueueMetrics { + fn record_batches_queued(&self, _count: usize) { + // No-op + } + + fn record_batch_dropped(&self) { + // No-op + } + + fn record_epoch_advanced(&self, _epoch: u64) { + // No-op + } +} diff --git a/crates/derive/src/pipeline/builder.rs b/crates/derive/src/pipeline/builder.rs index dc64c89b0..2a9f54dac 100644 --- 
a/crates/derive/src/pipeline/builder.rs +++ b/crates/derive/src/pipeline/builder.rs @@ -148,11 +148,20 @@ where FrameQueue::new(l1_retrieval, Arc::clone(&rollup_config), metrics.clone()); let channel_provider = ChannelProvider::new(Arc::clone(&rollup_config), frame_queue, metrics.clone()); - let channel_reader = ChannelReader::new(channel_provider, Arc::clone(&rollup_config)); - let batch_stream = - BatchStream::new(channel_reader, rollup_config.clone(), l2_chain_provider.clone()); - let batch_provider = - BatchProvider::new(rollup_config.clone(), batch_stream, l2_chain_provider.clone()); + let channel_reader = + ChannelReader::new(channel_provider, Arc::clone(&rollup_config), metrics.clone()); + let batch_stream = BatchStream::new( + channel_reader, + rollup_config.clone(), + l2_chain_provider.clone(), + metrics.clone(), + ); + let batch_provider = BatchProvider::new( + rollup_config.clone(), + batch_stream, + l2_chain_provider.clone(), + metrics.clone(), + ); let attributes = AttributesQueue::new(rollup_config.clone(), batch_provider, attributes_builder); diff --git a/crates/derive/src/stages/batch/batch_provider.rs b/crates/derive/src/stages/batch/batch_provider.rs index 315aa74af..7ba515800 100644 --- a/crates/derive/src/stages/batch/batch_provider.rs +++ b/crates/derive/src/stages/batch/batch_provider.rs @@ -3,6 +3,7 @@ use super::NextBatchProvider; use crate::{ errors::{PipelineError, PipelineResult}, + metrics::PipelineMetrics, stages::{BatchQueue, BatchValidator}, traits::{ AttributesProvider, L2ChainProvider, OriginAdvancer, OriginProvider, Signal, SignalReceiver, @@ -47,6 +48,8 @@ where /// /// Must be [None] if `prev` or `batch_queue` is [Some]. batch_validator: Option>, + /// Metrics collector. + metrics: PipelineMetrics, } impl BatchProvider @@ -55,8 +58,13 @@ where F: L2ChainProvider + Clone + Debug, { /// Creates a new [BatchProvider] with the given configuration and previous stage. - pub const fn new(cfg: Arc, prev: P, provider: F) -> Self { - Self { cfg, provider, prev: Some(prev), batch_queue: None, batch_validator: None } + pub const fn new( + cfg: Arc, + prev: P, + provider: F, + metrics: PipelineMetrics, + ) -> Self { + Self { cfg, provider, prev: Some(prev), batch_queue: None, batch_validator: None, metrics } } /// Attempts to update the active stage of the mux. @@ -68,8 +76,12 @@ where if self.cfg.is_holocene_active(origin.timestamp) { self.batch_validator = Some(BatchValidator::new(self.cfg.clone(), prev)); } else { - self.batch_queue = - Some(BatchQueue::new(self.cfg.clone(), prev, self.provider.clone())); + self.batch_queue = Some(BatchQueue::new( + self.cfg.clone(), + prev, + self.provider.clone(), + self.metrics.clone(), + )); } } else if self.batch_queue.is_some() && self.cfg.is_holocene_active(origin.timestamp) { // If the batch queue is active and Holocene is also active, transition to the batch @@ -83,8 +95,12 @@ where // reorg around Holocene activation. Transition back to the batch queue // until Holocene re-activates. 
let batch_validator = self.batch_validator.take().expect("Must have batch validator"); - let mut bq = - BatchQueue::new(self.cfg.clone(), batch_validator.prev, self.provider.clone()); + let mut bq = BatchQueue::new( + self.cfg.clone(), + batch_validator.prev, + self.provider.clone(), + self.metrics.clone(), + ); bq.l1_blocks = batch_validator.l1_blocks; self.batch_queue = Some(bq); } @@ -178,6 +194,7 @@ where mod test { use super::BatchProvider; use crate::{ + metrics::PipelineMetrics, test_utils::{TestL2ChainProvider, TestNextBatchProvider}, traits::{OriginProvider, ResetSignal, SignalReceiver}, }; @@ -190,7 +207,8 @@ mod test { let provider = TestNextBatchProvider::new(vec![]); let l2_provider = TestL2ChainProvider::default(); let cfg = Arc::new(RollupConfig { holocene_time: Some(0), ..Default::default() }); - let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + let mut batch_provider = + BatchProvider::new(cfg, provider, l2_provider, PipelineMetrics::no_op()); assert!(batch_provider.attempt_update().is_ok()); assert!(batch_provider.prev.is_none()); @@ -203,7 +221,8 @@ mod test { let provider = TestNextBatchProvider::new(vec![]); let l2_provider = TestL2ChainProvider::default(); let cfg = Arc::new(RollupConfig::default()); - let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + let mut batch_provider = + BatchProvider::new(cfg, provider, l2_provider, PipelineMetrics::no_op()); assert!(batch_provider.attempt_update().is_ok()); assert!(batch_provider.prev.is_none()); @@ -216,7 +235,8 @@ mod test { let provider = TestNextBatchProvider::new(vec![]); let l2_provider = TestL2ChainProvider::default(); let cfg = Arc::new(RollupConfig { holocene_time: Some(2), ..Default::default() }); - let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + let mut batch_provider = + BatchProvider::new(cfg, provider, l2_provider, PipelineMetrics::no_op()); batch_provider.attempt_update().unwrap(); @@ -239,7 +259,8 @@ mod test { let provider = TestNextBatchProvider::new(vec![]); let l2_provider = TestL2ChainProvider::default(); let cfg = Arc::new(RollupConfig { holocene_time: Some(2), ..Default::default() }); - let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + let mut batch_provider = + BatchProvider::new(cfg, provider, l2_provider, PipelineMetrics::no_op()); batch_provider.attempt_update().unwrap(); @@ -270,7 +291,8 @@ mod test { let provider = TestNextBatchProvider::new(vec![]); let l2_provider = TestL2ChainProvider::default(); let cfg = Arc::new(RollupConfig::default()); - let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + let mut batch_provider = + BatchProvider::new(cfg, provider, l2_provider, PipelineMetrics::no_op()); // Reset the batch provider. batch_provider.signal(ResetSignal::default().signal()).await.unwrap(); @@ -286,7 +308,8 @@ mod test { let provider = TestNextBatchProvider::new(vec![]); let l2_provider = TestL2ChainProvider::default(); let cfg = Arc::new(RollupConfig { holocene_time: Some(0), ..Default::default() }); - let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + let mut batch_provider = + BatchProvider::new(cfg, provider, l2_provider, PipelineMetrics::no_op()); // Reset the batch provider. 
batch_provider.signal(ResetSignal::default().signal()).await.unwrap(); diff --git a/crates/derive/src/stages/batch/batch_queue.rs b/crates/derive/src/stages/batch/batch_queue.rs index dc8ba759f..47fd19441 100644 --- a/crates/derive/src/stages/batch/batch_queue.rs +++ b/crates/derive/src/stages/batch/batch_queue.rs @@ -3,9 +3,10 @@ use super::NextBatchProvider; use crate::{ errors::{PipelineEncodingError, PipelineError, PipelineErrorKind, PipelineResult, ResetError}, + metrics::PipelineMetrics, traits::{ - AttributesProvider, L2ChainProvider, OriginAdvancer, OriginProvider, ResetSignal, Signal, - SignalReceiver, + AttributesProvider, BatchQueueMetrics, L2ChainProvider, OriginAdvancer, OriginProvider, + ResetSignal, Signal, SignalReceiver, }, }; use alloc::{boxed::Box, sync::Arc, vec::Vec}; @@ -58,6 +59,8 @@ where pub(crate) next_spans: Vec, /// Used to validate the batches. pub(crate) fetcher: BF, + /// Metrics collector. + metrics: PipelineMetrics, } impl BatchQueue @@ -67,7 +70,7 @@ where { /// Creates a new [BatchQueue] stage. #[allow(clippy::missing_const_for_fn)] - pub fn new(cfg: Arc, prev: P, fetcher: BF) -> Self { + pub fn new(cfg: Arc, prev: P, fetcher: BF, metrics: PipelineMetrics) -> Self { Self { cfg, prev, @@ -76,6 +79,7 @@ where batches: Default::default(), next_spans: Default::default(), fetcher, + metrics, } } @@ -232,6 +236,7 @@ where "Advancing to next epoch: {}, timestamp: {}, epoch timestamp: {}", next_epoch.number, next_timestamp, next_epoch.timestamp ); + self.metrics.record_epoch_advanced(next_epoch.number); self.l1_blocks.remove(0); Err(PipelineError::Eof.temp()) } @@ -252,12 +257,14 @@ where (self.cfg.is_holocene_active(origin.timestamp) && validity.is_future()); if drop { self.prev.flush(); + self.metrics.record_batch_dropped(); return Ok(()); } else if validity.is_outdated() { // If the batch is outdated, we drop it without flushing the previous stage. 
return Ok(()); } self.batches.push(data); + self.metrics.record_batches_queued(self.batches.len()); Ok(()) } } @@ -492,7 +499,7 @@ mod tests { let cfg = Arc::new(RollupConfig::default()); let mock = TestNextBatchProvider::new(vec![]); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); let parent = L2BlockInfo::default(); let sb = SingleBatch::default(); bq.next_spans.push(sb.clone()); @@ -506,7 +513,7 @@ mod tests { let cfg = Arc::new(RollupConfig::default()); let mock = TestNextBatchProvider::new(vec![]); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher); + let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher, PipelineMetrics::no_op()); bq.l1_blocks.push(BlockInfo::default()); bq.next_spans.push(SingleBatch::default()); bq.batches.push(BatchWithInclusionBlock { @@ -527,7 +534,7 @@ mod tests { let cfg = Arc::new(RollupConfig::default()); let mock = TestNextBatchProvider::new(vec![]); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher); + let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher, PipelineMetrics::no_op()); bq.l1_blocks.push(BlockInfo::default()); bq.next_spans.push(SingleBatch::default()); bq.batches.push(BatchWithInclusionBlock { @@ -569,7 +576,7 @@ mod tests { let fetcher = TestL2ChainProvider::default(); // Configure batch queue - let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher); + let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher, PipelineMetrics::no_op()); bq.origin = Some(BlockInfo::default()); // Set the origin bq.l1_blocks.push(BlockInfo::default()); // Push the origin into the l1 blocks bq.l1_blocks.push(BlockInfo::default()); // Push the next origin into the bq @@ -600,7 +607,7 @@ mod tests { let fetcher = TestL2ChainProvider::default(); // Configure batch queue - let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher); + let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher, PipelineMetrics::no_op()); bq.origin = Some(BlockInfo::default()); // Set the origin bq.l1_blocks.push(BlockInfo::default()); // Push the origin into the l1 blocks bq.l1_blocks.push(BlockInfo::default()); // Push the next origin into the bq @@ -634,7 +641,7 @@ mod tests { let fetcher = TestL2ChainProvider::default(); // Configure batch queue - let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher); + let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher, PipelineMetrics::no_op()); bq.origin = Some(BlockInfo::default()); // Set the origin bq.l1_blocks.push(BlockInfo::default()); // Push the origin into the l1 blocks bq.l1_blocks.push(BlockInfo::default()); // Push the next origin into the bq @@ -669,7 +676,7 @@ mod tests { let fetcher = TestL2ChainProvider::default(); // Configure batch queue - let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher); + let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher, PipelineMetrics::no_op()); bq.origin = Some(BlockInfo::default()); // Set the origin bq.l1_blocks.push(BlockInfo::default()); // Push the origin into the l1 blocks bq.l1_blocks.push(BlockInfo::default()); // Push the next origin into the bq @@ -685,7 +692,7 @@ mod tests { let cfg = Arc::new(RollupConfig::default()); let mock = TestNextBatchProvider::new(data); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); let 
parent = L2BlockInfo::default(); let result = bq.derive_next_batch(false, parent).await.unwrap_err(); assert_eq!(result, PipelineError::MissingOrigin.crit()); @@ -702,7 +709,7 @@ mod tests { let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); let parent = L2BlockInfo { l1_origin: BlockNumHash { number: 10, ..Default::default() }, ..Default::default() @@ -723,7 +730,7 @@ mod tests { let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); bq.origin = Some(BlockInfo::default()); bq.l1_blocks.push(BlockInfo::default()); @@ -747,7 +754,7 @@ mod tests { let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); bq.origin = Some(BlockInfo::default()); bq.l1_blocks.push(BlockInfo::default()); @@ -771,7 +778,7 @@ mod tests { let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); bq.origin = Some(BlockInfo::default()); bq.l1_blocks.push(BlockInfo::default()); bq.l1_blocks.push(BlockInfo::default()); @@ -806,7 +813,7 @@ mod tests { let fetcher = TestL2ChainProvider::default(); // Configure batch queue - let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher); + let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher, PipelineMetrics::no_op()); bq.origin = Some(BlockInfo::default()); // Set the origin bq.l1_blocks.push(BlockInfo::default()); // Push the origin into the l1 blocks bq.l1_blocks.push(BlockInfo::default()); // Push the next origin into the bq @@ -848,7 +855,7 @@ mod tests { let fetcher = TestL2ChainProvider::default(); // Configure batch queue - let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher); + let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher, PipelineMetrics::no_op()); bq.origin = Some(BlockInfo::default()); // Set the origin bq.l1_blocks.push(BlockInfo::default()); // Push the origin into the l1 blocks bq.l1_blocks.push(BlockInfo::default()); // Push the next origin into the bq @@ -885,7 +892,7 @@ mod tests { let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); let sb = SingleBatch::default(); bq.next_spans.push(sb.clone()); let next = bq.next_batch(L2BlockInfo::default()).await.unwrap(); @@ -904,7 +911,7 @@ mod tests { let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); let sb = SingleBatch::default(); bq.next_spans.push(sb.clone()); let res = 
bq.next_batch(L2BlockInfo::default()).await.unwrap_err(); @@ -919,7 +926,7 @@ mod tests { let batch = reader.next_batch(cfg.as_ref()).unwrap(); let mock = TestNextBatchProvider::new(vec![Ok(batch)]); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); let res = bq.next_batch(L2BlockInfo::default()).await.unwrap_err(); assert_eq!(res, PipelineError::NotEnoughData.temp()); assert!(bq.is_last_in_span()); @@ -936,7 +943,7 @@ mod tests { let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); let parent = L2BlockInfo { l1_origin: BlockNumHash { number: 10, ..Default::default() }, ..Default::default() @@ -1064,7 +1071,7 @@ mod tests { op_blocks: vec![block, second], ..Default::default() }; - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); let parent = L2BlockInfo { block_info: BlockInfo { number: 9, @@ -1093,7 +1100,7 @@ mod tests { let cfg = Arc::new(RollupConfig::default()); let mock = TestNextBatchProvider::new(data); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); let parent = L2BlockInfo::default(); let batch = bq.next_batch(parent).await.unwrap(); assert_eq!(batch, SingleBatch::default()); diff --git a/crates/derive/src/stages/batch/batch_stream.rs b/crates/derive/src/stages/batch/batch_stream.rs index b932420f1..854d20f2d 100644 --- a/crates/derive/src/stages/batch/batch_stream.rs +++ b/crates/derive/src/stages/batch/batch_stream.rs @@ -2,8 +2,11 @@ use crate::{ errors::{PipelineEncodingError, PipelineError, PipelineResult}, + metrics::PipelineMetrics, stages::NextBatchProvider, - traits::{L2ChainProvider, OriginAdvancer, OriginProvider, Signal, SignalReceiver}, + traits::{ + BatchStreamMetrics, L2ChainProvider, OriginAdvancer, OriginProvider, Signal, SignalReceiver, + }, }; use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; use async_trait::async_trait; @@ -50,6 +53,8 @@ where config: Arc, /// Used to validate the batches. fetcher: BF, + /// Metrics collector. + metrics: PipelineMetrics, } impl BatchStream @@ -58,8 +63,13 @@ where BF: L2ChainProvider + Debug, { /// Create a new [BatchStream] stage. - pub const fn new(prev: P, config: Arc, fetcher: BF) -> Self { - Self { prev, span: None, buffer: VecDeque::new(), config, fetcher } + pub const fn new( + prev: P, + config: Arc, + fetcher: BF, + metrics: PipelineMetrics, + ) -> Self { + Self { prev, span: None, buffer: VecDeque::new(), config, fetcher, metrics } } /// Returns if the [BatchStream] stage is active based on the @@ -154,8 +164,14 @@ where .await; match validity { - BatchValidity::Accept => self.span = Some(b), + BatchValidity::Accept => { + self.metrics.record_span_batch_accepted(); + self.metrics.record_batch_processed(); + self.span = Some(b) + } BatchValidity::Drop => { + self.metrics.record_span_batch_dropped(); + // Flush the stage. self.flush(); @@ -177,6 +193,9 @@ where } } + self.metrics.record_buffer_size(self.buffer.len()); + self.metrics.record_batch_processed(); + // Attempt to pull a SingleBatch out of the SpanBatch. 
self.get_single_batch(parent, l1_origins).map(Batch::Single) } @@ -232,7 +251,12 @@ mod test { async fn test_batch_stream_flush() { let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() }); let prev = TestBatchStreamProvider::new(vec![]); - let mut stream = BatchStream::new(prev, config, TestL2ChainProvider::default()); + let mut stream = BatchStream::new( + prev, + config, + TestL2ChainProvider::default(), + PipelineMetrics::no_op(), + ); stream.buffer.push_back(SingleBatch::default()); stream.span = Some(SpanBatch::default()); assert!(!stream.buffer.is_empty()); @@ -246,7 +270,12 @@ mod test { async fn test_batch_stream_reset() { let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() }); let prev = TestBatchStreamProvider::new(vec![]); - let mut stream = BatchStream::new(prev, config.clone(), TestL2ChainProvider::default()); + let mut stream = BatchStream::new( + prev, + config.clone(), + TestL2ChainProvider::default(), + PipelineMetrics::no_op(), + ); stream.buffer.push_back(SingleBatch::default()); stream.span = Some(SpanBatch::default()); assert!(!stream.prev.reset); @@ -260,7 +289,12 @@ mod test { async fn test_batch_stream_flush_channel() { let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() }); let prev = TestBatchStreamProvider::new(vec![]); - let mut stream = BatchStream::new(prev, config.clone(), TestL2ChainProvider::default()); + let mut stream = BatchStream::new( + prev, + config.clone(), + TestL2ChainProvider::default(), + PipelineMetrics::no_op(), + ); stream.buffer.push_back(SingleBatch::default()); stream.span = Some(SpanBatch::default()); assert!(!stream.prev.flushed); @@ -279,7 +313,12 @@ mod test { let data = vec![Ok(Batch::Single(SingleBatch::default()))]; let config = Arc::new(RollupConfig { holocene_time: Some(100), ..RollupConfig::default() }); let prev = TestBatchStreamProvider::new(data); - let mut stream = BatchStream::new(prev, config.clone(), TestL2ChainProvider::default()); + let mut stream = BatchStream::new( + prev, + config.clone(), + TestL2ChainProvider::default(), + PipelineMetrics::no_op(), + ); // The stage should not be active. assert!(!stream.is_active().unwrap()); @@ -313,7 +352,7 @@ mod test { }); let prev = TestBatchStreamProvider::new(data); let provider = TestL2ChainProvider::default(); - let mut stream = BatchStream::new(prev, config.clone(), provider); + let mut stream = BatchStream::new(prev, config.clone(), provider, PipelineMetrics::no_op()); // The stage should be active. assert!(stream.is_active().unwrap()); @@ -371,7 +410,12 @@ mod test { let data = vec![Ok(Batch::Single(SingleBatch::default()))]; let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() }); let prev = TestBatchStreamProvider::new(data); - let mut stream = BatchStream::new(prev, config.clone(), TestL2ChainProvider::default()); + let mut stream = BatchStream::new( + prev, + config.clone(), + TestL2ChainProvider::default(), + PipelineMetrics::no_op(), + ); // The stage should be active. 
assert!(stream.is_active().unwrap()); diff --git a/crates/derive/src/stages/channel/channel_reader.rs b/crates/derive/src/stages/channel/channel_reader.rs index 3f4f0a6f1..32f51ea86 100644 --- a/crates/derive/src/stages/channel/channel_reader.rs +++ b/crates/derive/src/stages/channel/channel_reader.rs @@ -2,8 +2,9 @@ use crate::{ errors::{PipelineError, PipelineResult}, + metrics::PipelineMetrics, stages::{decompress_brotli, BatchStreamProvider}, - traits::{OriginAdvancer, OriginProvider, Signal, SignalReceiver}, + traits::{ChannelReaderMetrics, OriginAdvancer, OriginProvider, Signal, SignalReceiver}, }; use alloc::{boxed::Box, sync::Arc, vec::Vec}; use alloy_primitives::Bytes; @@ -56,6 +57,8 @@ where next_batch: Option, /// The rollup coonfiguration. cfg: Arc, + /// Metrics collector. + metrics: PipelineMetrics, } impl
<P> ChannelReader<P>
@@ -63,8 +66,8 @@ where P: ChannelReaderProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { /// Create a new [ChannelReader] stage. - pub const fn new(prev: P, cfg: Arc) -> Self { - Self { prev, next_batch: None, cfg } + pub const fn new(prev: P, cfg: Arc, metrics: PipelineMetrics) -> Self { + Self { prev, next_batch: None, cfg, metrics } } /// Creates the batch reader from available channel data. @@ -116,6 +119,7 @@ where /// SAFETY: Only called post-holocene activation. fn flush(&mut self) { debug!(target: "channel-reader", "[POST-HOLOCENE] Flushing channel"); + self.metrics.record_channel_flushed(); self.next_channel(); } @@ -132,7 +136,10 @@ where .next_batch(self.cfg.as_ref()) .ok_or(PipelineError::NotEnoughData.temp()) { - Ok(batch) => Ok(batch), + Ok(batch) => { + self.metrics.record_batch_read(); + Ok(batch) + } Err(e) => { self.next_channel(); Err(e) @@ -272,7 +279,8 @@ mod test { #[tokio::test] async fn test_flush_channel_reader() { let mock = TestChannelReaderProvider::new(vec![Ok(Some(new_compressed_batch_data()))]); - let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); + let mut reader = + ChannelReader::new(mock, Arc::new(RollupConfig::default()), PipelineMetrics::no_op()); reader.next_batch = Some(BatchReader::new( new_compressed_batch_data(), MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize, @@ -284,7 +292,8 @@ mod test { #[tokio::test] async fn test_reset_channel_reader() { let mock = TestChannelReaderProvider::new(vec![Ok(None)]); - let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); + let mut reader = + ChannelReader::new(mock, Arc::new(RollupConfig::default()), PipelineMetrics::no_op()); reader.next_batch = Some(BatchReader::new( vec![0x00, 0x01, 0x02], MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize, @@ -298,7 +307,8 @@ mod test { #[tokio::test] async fn test_next_batch_batch_reader_set_fails() { let mock = TestChannelReaderProvider::new(vec![Err(PipelineError::Eof.temp())]); - let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); + let mut reader = + ChannelReader::new(mock, Arc::new(RollupConfig::default()), PipelineMetrics::no_op()); assert_eq!(reader.next_batch().await, Err(PipelineError::Eof.temp())); assert!(reader.next_batch.is_none()); } @@ -306,7 +316,8 @@ mod test { #[tokio::test] async fn test_next_batch_batch_reader_no_data() { let mock = TestChannelReaderProvider::new(vec![Ok(None)]); - let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); + let mut reader = + ChannelReader::new(mock, Arc::new(RollupConfig::default()), PipelineMetrics::no_op()); assert!(matches!( reader.next_batch().await.unwrap_err(), PipelineErrorKind::Temporary(PipelineError::ChannelReaderEmpty) @@ -319,7 +330,8 @@ mod test { let mut first = new_compressed_batch_data(); let second = first.split_to(first.len() / 2); let mock = TestChannelReaderProvider::new(vec![Ok(Some(first)), Ok(Some(second))]); - let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); + let mut reader = + ChannelReader::new(mock, Arc::new(RollupConfig::default()), PipelineMetrics::no_op()); assert_eq!(reader.next_batch().await, Err(PipelineError::NotEnoughData.temp())); assert!(reader.next_batch.is_none()); } @@ -328,7 +340,8 @@ mod test { async fn test_next_batch_succeeds() { let raw = new_compressed_batch_data(); let mock = TestChannelReaderProvider::new(vec![Ok(Some(raw))]); - let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); + let mut reader = + 
ChannelReader::new(mock, Arc::new(RollupConfig::default()), PipelineMetrics::no_op()); let res = reader.next_batch().await.unwrap(); matches!(res, Batch::Span(_)); assert!(reader.next_batch.is_some()); @@ -357,7 +370,7 @@ mod test { let raw = new_compressed_batch_data(); let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() }); let mock = TestChannelReaderProvider::new(vec![Ok(Some(raw))]); - let mut reader = ChannelReader::new(mock, config); + let mut reader = ChannelReader::new(mock, config, PipelineMetrics::no_op()); let res = reader.next_batch().await.unwrap(); matches!(res, Batch::Span(_)); assert!(reader.next_batch.is_some()); diff --git a/crates/derive/src/traits/batch_queue.rs b/crates/derive/src/traits/batch_queue.rs new file mode 100644 index 000000000..912cd7506 --- /dev/null +++ b/crates/derive/src/traits/batch_queue.rs @@ -0,0 +1,9 @@ +/// Metrics for a `BatchQueue` stage. +pub trait BatchQueueMetrics: Send + Sync { + /// Records the number of batches currently queued. + fn record_batches_queued(&self, count: usize); + /// Records a batch dropped. + fn record_batch_dropped(&self); + /// Records that the epoch was advanced. + fn record_epoch_advanced(&self, epoch: u64); +} diff --git a/crates/derive/src/traits/batch_stream.rs b/crates/derive/src/traits/batch_stream.rs new file mode 100644 index 000000000..c5dc78d34 --- /dev/null +++ b/crates/derive/src/traits/batch_stream.rs @@ -0,0 +1,11 @@ +/// Metrics for a `BatchStream` stage. +pub trait BatchStreamMetrics: Send + Sync { + /// Records a batch processed. + fn record_batch_processed(&self); + /// Records a span batch accepted. + fn record_span_batch_accepted(&self); + /// Records a span batch dropped. + fn record_span_batch_dropped(&self); + /// Records the buffer size. + fn record_buffer_size(&self, size: usize); +} diff --git a/crates/derive/src/traits/channel_reader.rs b/crates/derive/src/traits/channel_reader.rs new file mode 100644 index 000000000..ce9c7ea50 --- /dev/null +++ b/crates/derive/src/traits/channel_reader.rs @@ -0,0 +1,7 @@ +/// Metrics for a `ChannelReader`. +pub trait ChannelReaderMetrics: Send + Sync { + /// Records that a batch was read from the channel. + fn record_batch_read(&self); + /// Records the channel being flushed.
+ fn record_channel_flushed(&self); +} diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index 0e65d8d40..3736ee9a6 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -32,3 +32,12 @@ pub use frame_queue::{FrameQueueMetrics, FrameQueueProvider}; mod channel_provider; pub use channel_provider::ChannelProviderMetrics; + +mod channel_reader; +pub use channel_reader::ChannelReaderMetrics; + +mod batch_stream; +pub use batch_stream::BatchStreamMetrics; + +mod batch_queue; +pub use batch_queue::BatchQueueMetrics; From 25c54ced3052e651480d91f2729d7a35412b0b26 Mon Sep 17 00:00:00 2001 From: Steph Sinyakov Date: Thu, 7 Nov 2024 16:49:30 +0100 Subject: [PATCH 11/11] feat: add no-op metrics for `AttributesQueue` stage --- crates/derive/src/metrics/mod.rs | 22 +++++++++--- crates/derive/src/metrics/noop.rs | 24 +++++++++++-- crates/derive/src/pipeline/builder.rs | 9 +++-- crates/derive/src/stages/attributes_queue.rs | 38 +++++++++++++++----- crates/derive/src/traits/attributes.rs | 10 ++++++ crates/derive/src/traits/mod.rs | 4 ++- 6 files changed, 88 insertions(+), 19 deletions(-) diff --git a/crates/derive/src/metrics/mod.rs b/crates/derive/src/metrics/mod.rs index d6e3c098b..291170342 100644 --- a/crates/derive/src/metrics/mod.rs +++ b/crates/derive/src/metrics/mod.rs @@ -6,9 +6,9 @@ use crate::{ errors::PipelineErrorKind, pipeline::Signal, traits::{ - BatchQueueMetrics, BatchStreamMetrics, ChannelProviderMetrics, ChannelReaderMetrics, - DerivationPipelineMetrics, FrameQueueMetrics, L1RetrievalMetrics, L1TraversalMetrics, - StepResult, + AttributesQueueMetrics, BatchQueueMetrics, BatchStreamMetrics, ChannelProviderMetrics, + ChannelReaderMetrics, DerivationPipelineMetrics, FrameQueueMetrics, L1RetrievalMetrics, + L1TraversalMetrics, StepResult, }, }; use alloc::sync::Arc; @@ -25,7 +25,7 @@ pub struct PipelineMetrics { pub(crate) channel_reader_metrics: Arc, pub(crate) batch_stream_metrics: Arc, pub(crate) batch_queue_metrics: Arc, - // todo: add more metrics here for each stage + pub(crate) atrirbutes_queue_metrics: Arc, } impl Debug for PipelineMetrics { @@ -149,3 +149,17 @@ impl BatchQueueMetrics for PipelineMetrics { self.batch_queue_metrics.record_epoch_advanced(epoch) } } + +impl AttributesQueueMetrics for PipelineMetrics { + fn record_attributes_created(&self) { + self.atrirbutes_queue_metrics.record_attributes_created() + } + + fn record_batch_loaded(&self) { + self.atrirbutes_queue_metrics.record_batch_loaded() + } + + fn record_attributes_creation_failure(&self) { + self.atrirbutes_queue_metrics.record_attributes_creation_failure() + } +} diff --git a/crates/derive/src/metrics/noop.rs b/crates/derive/src/metrics/noop.rs index b938f5289..83476d80e 100644 --- a/crates/derive/src/metrics/noop.rs +++ b/crates/derive/src/metrics/noop.rs @@ -3,8 +3,9 @@ use crate::{ metrics::PipelineMetrics, pipeline::{Signal, StepResult}, traits::{ - BatchQueueMetrics, BatchStreamMetrics, ChannelProviderMetrics, ChannelReaderMetrics, - DerivationPipelineMetrics, FrameQueueMetrics, L1RetrievalMetrics, L1TraversalMetrics, + AttributesQueueMetrics, BatchQueueMetrics, BatchStreamMetrics, ChannelProviderMetrics, + ChannelReaderMetrics, DerivationPipelineMetrics, FrameQueueMetrics, L1RetrievalMetrics, + L1TraversalMetrics, }, }; use alloc::sync::Arc; @@ -21,7 +22,7 @@ impl PipelineMetrics { channel_reader_metrics: Arc::new(NoopChannelReaderMetrics), batch_stream_metrics: Arc::new(NoopBatchStreamMetrics), batch_queue_metrics: 
Arc::new(NoopBatchQueueMetrics), - // todo: add more metrics here for each stage + atrirbutes_queue_metrics: Arc::new(NoopAttributesQueueMetrics), } } } @@ -169,3 +170,20 @@ impl BatchQueueMetrics for NoopBatchQueueMetrics { // No-op } } + +#[derive(Debug)] +struct NoopAttributesQueueMetrics; + +impl AttributesQueueMetrics for NoopAttributesQueueMetrics { + fn record_attributes_created(&self) { + // No-op + } + + fn record_batch_loaded(&self) { + // No-op + } + + fn record_attributes_creation_failure(&self) { + // No-op + } +} diff --git a/crates/derive/src/pipeline/builder.rs b/crates/derive/src/pipeline/builder.rs index 2a9f54dac..a6d506bda 100644 --- a/crates/derive/src/pipeline/builder.rs +++ b/crates/derive/src/pipeline/builder.rs @@ -138,7 +138,6 @@ where let attributes_builder = builder.builder.expect("builder must be set"); let metrics = builder.metrics.expect("metrics must be set"); - // todo: add metrics to the stages // Compose the stage stack. let mut l1_traversal = L1Traversal::new(chain_provider, Arc::clone(&rollup_config), metrics.clone()); @@ -162,8 +161,12 @@ where l2_chain_provider.clone(), metrics.clone(), ); - let attributes = - AttributesQueue::new(rollup_config.clone(), batch_provider, attributes_builder); + let attributes = AttributesQueue::new( + rollup_config.clone(), + batch_provider, + attributes_builder, + metrics.clone(), + ); // Create the pipeline. Self::new(attributes, rollup_config, l2_chain_provider, metrics) diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index f746d365a..987fa5e19 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -2,9 +2,10 @@ use crate::{ errors::{PipelineError, PipelineResult, ResetError}, + metrics::PipelineMetrics, traits::{ - AttributesBuilder, AttributesProvider, NextAttributes, OriginAdvancer, OriginProvider, - Signal, SignalReceiver, + AttributesBuilder, AttributesProvider, AttributesQueueMetrics, NextAttributes, + OriginAdvancer, OriginProvider, Signal, SignalReceiver, }, }; use alloc::{boxed::Box, sync::Arc}; @@ -44,6 +45,8 @@ where batch: Option, /// The attributes builder. builder: AB, + /// Metrics collector. + metrics: PipelineMetrics, } impl AttributesQueue @@ -52,8 +55,13 @@ where AB: AttributesBuilder + Debug, { /// Create a new [AttributesQueue] stage. - pub const fn new(cfg: Arc, prev: P, builder: AB) -> Self { - Self { cfg, prev, is_last_in_span: false, batch: None, builder } + pub const fn new( + cfg: Arc, + prev: P, + builder: AB, + metrics: PipelineMetrics, + ) -> Self { + Self { cfg, prev, is_last_in_span: false, batch: None, builder, metrics } } /// Loads a [SingleBatch] from the [AttributesProvider] if needed. @@ -62,6 +70,7 @@ where let batch = self.prev.next_batch(parent).await?; self.batch = Some(batch); self.is_last_in_span = self.prev.is_last_in_span(); + self.metrics.record_batch_loaded(); } self.batch.as_ref().cloned().ok_or(PipelineError::Eof.temp()) } @@ -82,6 +91,7 @@ where let attributes = match self.create_next_attributes(batch, parent).await { Ok(attributes) => attributes, Err(e) => { + self.metrics.record_attributes_creation_failure(); return Err(e); } }; @@ -91,6 +101,10 @@ where // Clear out the local state once payload attributes are prepared. self.batch = None; self.is_last_in_span = false; + + // Record that attributes were successfully created. 
+ self.metrics.record_attributes_created(); + Ok(populated_attributes) } @@ -228,7 +242,12 @@ mod tests { let cfg = cfg.unwrap_or_default(); let mock_batch_queue = new_test_attributes_provider(origin, batches); let mock_attributes_builder = TestAttributesBuilder::default(); - AttributesQueue::new(Arc::new(cfg), mock_batch_queue, mock_attributes_builder) + AttributesQueue::new( + Arc::new(cfg), + mock_batch_queue, + mock_attributes_builder, + PipelineMetrics::no_op(), + ) } #[tokio::test] @@ -246,7 +265,8 @@ mod tests { let cfg = RollupConfig::default(); let mock = new_test_attributes_provider(None, vec![]); let mock_builder = TestAttributesBuilder::default(); - let mut aq = AttributesQueue::new(Arc::new(cfg), mock, mock_builder); + let mut aq = + AttributesQueue::new(Arc::new(cfg), mock, mock_builder, PipelineMetrics::no_op()); aq.batch = Some(SingleBatch::default()); assert!(!aq.prev.reset); aq.signal(ResetSignal::default().signal()).await.unwrap(); @@ -340,7 +360,8 @@ mod tests { let mut payload_attributes = default_optimism_payload_attributes(); let mock_builder = TestAttributesBuilder { attributes: vec![Ok(payload_attributes.clone())] }; - let mut aq = AttributesQueue::new(Arc::new(cfg), mock, mock_builder); + let mut aq = + AttributesQueue::new(Arc::new(cfg), mock, mock_builder, PipelineMetrics::no_op()); let parent = L2BlockInfo::default(); let txs = vec![Bytes::default(), Bytes::default()]; let batch = SingleBatch { transactions: txs.clone(), ..Default::default() }; @@ -368,7 +389,8 @@ mod tests { let mock = new_test_attributes_provider(None, vec![Ok(Default::default())]); let mut pa = default_optimism_payload_attributes(); let mock_builder = TestAttributesBuilder { attributes: vec![Ok(pa.clone())] }; - let mut aq = AttributesQueue::new(Arc::new(cfg), mock, mock_builder); + let mut aq = + AttributesQueue::new(Arc::new(cfg), mock, mock_builder, PipelineMetrics::no_op()); // If we load the batch, we should get the last in span. // But it won't take it so it will be available in the next_attributes call. let _ = aq.load_batch(L2BlockInfo::default()).await.unwrap(); diff --git a/crates/derive/src/traits/attributes.rs b/crates/derive/src/traits/attributes.rs index 070ac724e..ba988fa4c 100644 --- a/crates/derive/src/traits/attributes.rs +++ b/crates/derive/src/traits/attributes.rs @@ -47,3 +47,13 @@ pub trait AttributesBuilder { epoch: BlockNumHash, ) -> PipelineResult; } + +/// Metrics trait for `AttributesQueue`. +pub trait AttributesQueueMetrics: Send + Sync { + /// Records the creation of a new batch of attributes. + fn record_attributes_created(&self); + /// Records that a new batch was loaded. + fn record_batch_loaded(&self); + /// Records a failure to create a new batch of attributes. + fn record_attributes_creation_failure(&self); +} diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index 3736ee9a6..940318a48 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -10,7 +10,9 @@ mod providers; pub use providers::{ChainProvider, L2ChainProvider}; mod attributes; -pub use attributes::{AttributesBuilder, AttributesProvider, NextAttributes}; +pub use attributes::{ + AttributesBuilder, AttributesProvider, AttributesQueueMetrics, NextAttributes, +}; mod data_sources; pub use data_sources::{AsyncIterator, BlobProvider, DataAvailabilityProvider};
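Note on usage: the patches above only wire up the no-op collector, and the only constructor shown in this series is `PipelineMetrics::no_op()`; the struct's fields are `pub(crate)`, so a real collector would still need a dedicated public constructor in a follow-up. As a purely illustrative sketch, not part of this series, a consumer could back the new `AttributesQueueMetrics` trait with plain atomic counters. The type name `CountingAttributesQueueMetrics` below is hypothetical:

    use core::sync::atomic::{AtomicU64, Ordering};
    use kona_derive::traits::AttributesQueueMetrics;

    /// Hypothetical in-memory collector for `AttributesQueue` events.
    #[derive(Debug, Default)]
    pub struct CountingAttributesQueueMetrics {
        attributes_created: AtomicU64,
        batches_loaded: AtomicU64,
        creation_failures: AtomicU64,
    }

    impl AttributesQueueMetrics for CountingAttributesQueueMetrics {
        fn record_attributes_created(&self) {
            // Count each successfully prepared set of payload attributes.
            self.attributes_created.fetch_add(1, Ordering::Relaxed);
        }

        fn record_batch_loaded(&self) {
            // Count each batch pulled from the previous stage.
            self.batches_loaded.fetch_add(1, Ordering::Relaxed);
        }

        fn record_attributes_creation_failure(&self) {
            // Count attribute-building failures separately from successes.
            self.creation_failures.fetch_add(1, Ordering::Relaxed);
        }
    }

Because the counters are atomics, such a type satisfies the `Send + Sync` bound and could be shared behind an `Arc` the same way the no-op collectors are; plugging it into `PipelineMetrics` is left to whatever constructor a later change introduces.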