diff --git a/.gitignore b/.gitignore index 7df26869d..cdb74debd 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,6 @@ monorepo/ # Environment Variables .env + +# JetBrains +**/.idea/ diff --git a/bin/client/src/kona.rs b/bin/client/src/kona.rs index e1d825a37..fc7d912bd 100644 --- a/bin/client/src/kona.rs +++ b/bin/client/src/kona.rs @@ -17,6 +17,7 @@ use kona_common_proc::client_entry; pub(crate) mod fault; use fault::{fpvm_handle_register, HINT_WRITER, ORACLE_READER}; +use kona_derive::metrics::PipelineMetrics; /// The size of the LRU cache in the oracle. const ORACLE_LRU_SIZE: usize = 1024; @@ -42,6 +43,7 @@ fn main() -> Result<()> { let l1_provider = OracleL1ChainProvider::new(boot.clone(), oracle.clone()); let l2_provider = OracleL2ChainProvider::new(boot.clone(), oracle.clone()); let beacon = OracleBlobProvider::new(oracle.clone()); + let metrics = PipelineMetrics::no_op(); //////////////////////////////////////////////////////////////// // DERIVATION & EXECUTION // @@ -54,6 +56,7 @@ fn main() -> Result<()> { beacon, l1_provider, l2_provider.clone(), + metrics, ) .await?; diff --git a/bin/client/src/l1/driver.rs b/bin/client/src/l1/driver.rs index 96da16245..464ed4442 100644 --- a/bin/client/src/l1/driver.rs +++ b/bin/client/src/l1/driver.rs @@ -13,6 +13,7 @@ use core::fmt::Debug; use kona_derive::{ attributes::StatefulAttributesBuilder, errors::{PipelineErrorKind, ResetError}, + metrics::PipelineMetrics, pipeline::{DerivationPipeline, Pipeline, PipelineBuilder, StepResult}, sources::EthereumDataSource, stages::{ @@ -121,6 +122,7 @@ where blob_provider: B, mut chain_provider: OracleL1ChainProvider, mut l2_chain_provider: OracleL2ChainProvider, + metrics: PipelineMetrics, ) -> Result { let cfg = Arc::new(boot_info.rollup_config.clone()); @@ -158,6 +160,7 @@ where .chain_provider(chain_provider) .builder(attributes) .origin(l1_origin) + .metrics(metrics) .build(); Ok(Self { l2_safe_head, l2_safe_head_header, pipeline }) diff --git a/crates/derive-alloy/src/pipeline.rs b/crates/derive-alloy/src/pipeline.rs index d284b0a83..e21a95fd3 100644 --- a/crates/derive-alloy/src/pipeline.rs +++ b/crates/derive-alloy/src/pipeline.rs @@ -1,7 +1,11 @@ //! Helper to construct a [DerivationPipeline] using online types. +use crate::{ + AlloyChainProvider, AlloyL2ChainProvider, OnlineBeaconClient, OnlineBlobProviderWithFallback, +}; use kona_derive::{ attributes::StatefulAttributesBuilder, + metrics::PipelineMetrics, pipeline::{DerivationPipeline, PipelineBuilder}, sources::EthereumDataSource, stages::{ @@ -13,10 +17,6 @@ use op_alloy_genesis::RollupConfig; use op_alloy_protocol::BlockInfo; use std::sync::Arc; -use crate::{ - AlloyChainProvider, AlloyL2ChainProvider, OnlineBeaconClient, OnlineBlobProviderWithFallback, -}; - /// An online derivation pipeline. pub type OnlinePipeline = DerivationPipeline, AlloyL2ChainProvider>; @@ -63,6 +63,7 @@ pub fn new_online_pipeline( .chain_provider(chain_provider) .builder(builder) .origin(origin) + .metrics(PipelineMetrics::no_op()) .build() } diff --git a/crates/derive/src/lib.rs b/crates/derive/src/lib.rs index bee907787..3b955bbf9 100644 --- a/crates/derive/src/lib.rs +++ b/crates/derive/src/lib.rs @@ -22,6 +22,7 @@ pub mod prelude { pub mod attributes; pub mod errors; +pub mod metrics; pub mod pipeline; pub mod sources; pub mod stages; diff --git a/crates/derive/src/metrics/mod.rs b/crates/derive/src/metrics/mod.rs new file mode 100644 index 000000000..291170342 --- /dev/null +++ b/crates/derive/src/metrics/mod.rs @@ -0,0 +1,165 @@ +//! 
Metrics for the derivation pipeline. + +mod noop; + +use crate::{ + errors::PipelineErrorKind, + pipeline::Signal, + traits::{ + AttributesQueueMetrics, BatchQueueMetrics, BatchStreamMetrics, ChannelProviderMetrics, + ChannelReaderMetrics, DerivationPipelineMetrics, FrameQueueMetrics, L1RetrievalMetrics, + L1TraversalMetrics, StepResult, + }, +}; +use alloc::sync::Arc; +use core::fmt::Debug; + +/// Composite metrics struct containing metrics for all stages. +#[derive(Clone)] +pub struct PipelineMetrics { + pub(crate) derivation_pipeline_metrics: Arc, + pub(crate) l1_traversal_metrics: Arc, + pub(crate) l1_retrieval_metrics: Arc, + pub(crate) frame_queue_metrics: Arc, + pub(crate) channel_provider_metrics: Arc, + pub(crate) channel_reader_metrics: Arc, + pub(crate) batch_stream_metrics: Arc, + pub(crate) batch_queue_metrics: Arc, + pub(crate) atrirbutes_queue_metrics: Arc, +} + +impl Debug for PipelineMetrics { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("PipelineMetrics").finish() + } +} + +impl DerivationPipelineMetrics for PipelineMetrics { + fn record_step_result(&self, result: &StepResult) { + self.derivation_pipeline_metrics.record_step_result(result) + } + + fn record_signal(&self, signal: &Signal) { + self.derivation_pipeline_metrics.record_signal(signal) + } +} + +impl L1TraversalMetrics for PipelineMetrics { + fn record_block_processed(&self, block_number: u64) { + self.l1_traversal_metrics.record_block_processed(block_number) + } + + fn record_system_config_update(&self) { + self.l1_traversal_metrics.record_system_config_update() + } + + fn record_reorg_detected(&self) { + self.l1_traversal_metrics.record_reorg_detected() + } + + fn record_holocene_activation(&self) { + self.l1_traversal_metrics.record_holocene_activation() + } +} + +impl L1RetrievalMetrics for PipelineMetrics { + fn record_data_fetch_attempt(&self, block_number: u64) { + self.l1_retrieval_metrics.record_data_fetch_attempt(block_number) + } + + fn record_data_fetch_success(&self, block_number: u64) { + self.l1_retrieval_metrics.record_data_fetch_success(block_number) + } + + fn record_data_fetch_failure(&self, block_number: u64, error: &PipelineErrorKind) { + self.l1_retrieval_metrics.record_data_fetch_failure(block_number, error) + } + + fn record_block_processed(&self, block_number: u64) { + self.l1_retrieval_metrics.record_block_processed(block_number) + } +} + +impl FrameQueueMetrics for PipelineMetrics { + fn record_frames_decoded(&self, count: usize) { + self.frame_queue_metrics.record_frames_decoded(count) + } + + fn record_frames_dropped(&self, count: usize) { + self.frame_queue_metrics.record_frames_dropped(count) + } + + fn record_frames_queued(&self, count: usize) { + self.frame_queue_metrics.record_frames_queued(count) + } + + fn record_load_frames_attempt(&self) { + self.frame_queue_metrics.record_load_frames_attempt() + } +} + +impl ChannelProviderMetrics for PipelineMetrics { + fn record_stage_transition(&self, from: &str, to: &str) { + self.channel_provider_metrics.record_stage_transition(from, to) + } + + fn record_data_item_provided(&self) { + self.channel_provider_metrics.record_data_item_provided() + } +} + +impl ChannelReaderMetrics for PipelineMetrics { + fn record_batch_read(&self) { + self.channel_reader_metrics.record_batch_read() + } + + fn record_channel_flushed(&self) { + self.channel_reader_metrics.record_channel_flushed() + } +} + +impl BatchStreamMetrics for PipelineMetrics { + fn record_batch_processed(&self) { + 
self.batch_stream_metrics.record_batch_processed() + } + + fn record_span_batch_accepted(&self) { + self.batch_stream_metrics.record_span_batch_accepted() + } + + fn record_span_batch_dropped(&self) { + self.batch_stream_metrics.record_span_batch_dropped() + } + + fn record_buffer_size(&self, size: usize) { + self.batch_stream_metrics.record_buffer_size(size) + } +} + +impl BatchQueueMetrics for PipelineMetrics { + fn record_batches_queued(&self, count: usize) { + self.batch_queue_metrics.record_batches_queued(count) + } + + fn record_batch_dropped(&self) { + self.batch_queue_metrics.record_batch_dropped() + } + + fn record_epoch_advanced(&self, epoch: u64) { + self.batch_queue_metrics.record_epoch_advanced(epoch) + } +} + +impl AttributesQueueMetrics for PipelineMetrics { + fn record_attributes_created(&self) { + self.atrirbutes_queue_metrics.record_attributes_created() + } + + fn record_batch_loaded(&self) { + self.atrirbutes_queue_metrics.record_batch_loaded() + } + + fn record_attributes_creation_failure(&self) { + self.atrirbutes_queue_metrics.record_attributes_creation_failure() + } +} diff --git a/crates/derive/src/metrics/noop.rs b/crates/derive/src/metrics/noop.rs new file mode 100644 index 000000000..83476d80e --- /dev/null +++ b/crates/derive/src/metrics/noop.rs @@ -0,0 +1,189 @@ +use crate::{ + errors::PipelineErrorKind, + metrics::PipelineMetrics, + pipeline::{Signal, StepResult}, + traits::{ + AttributesQueueMetrics, BatchQueueMetrics, BatchStreamMetrics, ChannelProviderMetrics, + ChannelReaderMetrics, DerivationPipelineMetrics, FrameQueueMetrics, L1RetrievalMetrics, + L1TraversalMetrics, + }, +}; +use alloc::sync::Arc; + +impl PipelineMetrics { + /// No-op implementation for `PipelineMetrics`. + pub fn no_op() -> Self { + Self { + derivation_pipeline_metrics: Arc::new(NoopDerivationPipelineMetrics), + l1_traversal_metrics: Arc::new(NoopL1TraversalMetrics), + l1_retrieval_metrics: Arc::new(NoopL1RetrievalMetrics), + frame_queue_metrics: Arc::new(NoopFrameQueueMetrics), + channel_provider_metrics: Arc::new(NoopChannelProviderMetrics), + channel_reader_metrics: Arc::new(NoopChannelReaderMetrics), + batch_stream_metrics: Arc::new(NoopBatchStreamMetrics), + batch_queue_metrics: Arc::new(NoopBatchQueueMetrics), + atrirbutes_queue_metrics: Arc::new(NoopAttributesQueueMetrics), + } + } +} + +/// No-op implementation of `DerivationPipelineMetrics`. +#[derive(Debug)] +struct NoopDerivationPipelineMetrics; + +impl DerivationPipelineMetrics for NoopDerivationPipelineMetrics { + fn record_step_result(&self, _result: &StepResult) { + // No-op + } + + fn record_signal(&self, _signal: &Signal) { + // No-op + } +} + +/// No-op implementation of `L1TraversalMetrics`. +#[derive(Debug)] +struct NoopL1TraversalMetrics; + +impl L1TraversalMetrics for NoopL1TraversalMetrics { + fn record_block_processed(&self, _block_number: u64) { + // No-op + } + + fn record_system_config_update(&self) { + // No-op + } + + fn record_reorg_detected(&self) { + // No-op + } + + fn record_holocene_activation(&self) { + // No-op + } +} + +/// No-op implementation of `L1RetrievalMetrics`. 
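// The no-op structs in this module show the minimal surface a metrics backend has to
// cover. A real collector only needs to implement the same trait; below is a minimal
// sketch of a counting `L1RetrievalMetrics`, where the `CountingL1RetrievalMetrics`
// name and the atomic-counter storage are illustrative assumptions, not part of this
// change.
use core::sync::atomic::{AtomicU64, Ordering};

/// Counts retrieval attempts, successes, and failures (illustrative sketch).
#[derive(Debug, Default)]
struct CountingL1RetrievalMetrics {
    attempts: AtomicU64,
    successes: AtomicU64,
    failures: AtomicU64,
}

impl L1RetrievalMetrics for CountingL1RetrievalMetrics {
    fn record_data_fetch_attempt(&self, _block_number: u64) {
        self.attempts.fetch_add(1, Ordering::Relaxed);
    }
    fn record_data_fetch_success(&self, _block_number: u64) {
        self.successes.fetch_add(1, Ordering::Relaxed);
    }
    fn record_data_fetch_failure(&self, _block_number: u64, _error: &PipelineErrorKind) {
        self.failures.fetch_add(1, Ordering::Relaxed);
    }
    fn record_block_processed(&self, _block_number: u64) {
        // Block numbers are ignored here; a richer backend could track them.
    }
}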
+#[derive(Debug)] +struct NoopL1RetrievalMetrics; + +impl L1RetrievalMetrics for NoopL1RetrievalMetrics { + fn record_data_fetch_attempt(&self, _block_number: u64) { + // No-op + } + + fn record_data_fetch_success(&self, _block_number: u64) { + // No-op + } + + fn record_data_fetch_failure(&self, _block_number: u64, _error: &PipelineErrorKind) { + // No-op + } + + fn record_block_processed(&self, _block_number: u64) { + // No-op + } +} + +/// No-op implementation of `FrameQueueMetrics`. +#[derive(Debug)] +struct NoopFrameQueueMetrics; + +impl FrameQueueMetrics for NoopFrameQueueMetrics { + fn record_frames_decoded(&self, _count: usize) { + // No-op + } + + fn record_frames_dropped(&self, _count: usize) { + // No-op + } + + fn record_frames_queued(&self, _count: usize) { + // No-op + } + + fn record_load_frames_attempt(&self) { + // No-op + } +} + +#[derive(Debug)] +struct NoopChannelProviderMetrics; + +impl ChannelProviderMetrics for NoopChannelProviderMetrics { + fn record_stage_transition(&self, _from: &str, _to: &str) { + // No-op + } + + fn record_data_item_provided(&self) { + // No-op + } +} + +#[derive(Debug)] +struct NoopChannelReaderMetrics; + +impl ChannelReaderMetrics for NoopChannelReaderMetrics { + fn record_batch_read(&self) { + // No-op + } + + fn record_channel_flushed(&self) { + // No-op + } +} + +#[derive(Debug)] +struct NoopBatchStreamMetrics; + +impl BatchStreamMetrics for NoopBatchStreamMetrics { + fn record_batch_processed(&self) { + // No-op + } + + fn record_span_batch_accepted(&self) { + // No-op + } + + fn record_span_batch_dropped(&self) { + // No-op + } + + fn record_buffer_size(&self, _size: usize) { + // No-op + } +} + +#[derive(Debug)] +struct NoopBatchQueueMetrics; + +impl BatchQueueMetrics for NoopBatchQueueMetrics { + fn record_batches_queued(&self, _count: usize) { + // No-op + } + + fn record_batch_dropped(&self) { + // No-op + } + + fn record_epoch_advanced(&self, _epoch: u64) { + // No-op + } +} + +#[derive(Debug)] +struct NoopAttributesQueueMetrics; + +impl AttributesQueueMetrics for NoopAttributesQueueMetrics { + fn record_attributes_created(&self) { + // No-op + } + + fn record_batch_loaded(&self) { + // No-op + } + + fn record_attributes_creation_failure(&self) { + // No-op + } +} diff --git a/crates/derive/src/pipeline/builder.rs b/crates/derive/src/pipeline/builder.rs index 8dc995412..a6d506bda 100644 --- a/crates/derive/src/pipeline/builder.rs +++ b/crates/derive/src/pipeline/builder.rs @@ -2,6 +2,7 @@ use super::{AttributesBuilder, DataAvailabilityProvider, DerivationPipeline}; use crate::{ + metrics::PipelineMetrics, stages::{ AttributesQueue, BatchProvider, BatchStream, ChannelProvider, ChannelReader, FrameQueue, L1Retrieval, L1Traversal, @@ -37,6 +38,7 @@ where builder: Option, origin: Option, rollup_config: Option>, + metrics: Option, } impl Default for PipelineBuilder @@ -54,6 +56,7 @@ where builder: None, origin: None, rollup_config: None, + metrics: None, } } } @@ -106,6 +109,12 @@ where self } + /// Sets the metrics implementation for the pipeline. + pub fn metrics(mut self, metrics: PipelineMetrics) -> Self { + self.metrics = Some(metrics); + self + } + /// Builds the pipeline. 
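// With the setter above, `metrics` becomes a required builder input: the conversion
// invoked by `build()` unwraps it with `expect("metrics must be set")` further down.
// A minimal sketch of the call-site wiring inside a caller's setup code, mirroring
// `new_online_pipeline`; the provider bindings (`chain_provider`, `l2_chain_provider`,
// `dap_source`, `attributes_builder`, `l1_origin`) and the earlier setter names are
// assumed placeholders for whatever the caller already constructs.
let pipeline = PipelineBuilder::new()
    .rollup_config(rollup_config.clone())
    .dap_source(dap_source)
    .l2_chain_provider(l2_chain_provider)
    .chain_provider(chain_provider)
    .builder(attributes_builder)
    .origin(l1_origin)
    .metrics(PipelineMetrics::no_op())
    .build();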
pub fn build(self) -> DerivationPipeline, T> { self.into() @@ -127,22 +136,39 @@ where let l2_chain_provider = builder.l2_chain_provider.expect("chain_provider must be set"); let dap_source = builder.dap_source.expect("dap_source must be set"); let attributes_builder = builder.builder.expect("builder must be set"); + let metrics = builder.metrics.expect("metrics must be set"); // Compose the stage stack. - let mut l1_traversal = L1Traversal::new(chain_provider, Arc::clone(&rollup_config)); + let mut l1_traversal = + L1Traversal::new(chain_provider, Arc::clone(&rollup_config), metrics.clone()); l1_traversal.block = Some(builder.origin.expect("origin must be set")); - let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source); - let frame_queue = FrameQueue::new(l1_retrieval, Arc::clone(&rollup_config)); - let channel_provider = ChannelProvider::new(Arc::clone(&rollup_config), frame_queue); - let channel_reader = ChannelReader::new(channel_provider, Arc::clone(&rollup_config)); - let batch_stream = - BatchStream::new(channel_reader, rollup_config.clone(), l2_chain_provider.clone()); - let batch_provider = - BatchProvider::new(rollup_config.clone(), batch_stream, l2_chain_provider.clone()); - let attributes = - AttributesQueue::new(rollup_config.clone(), batch_provider, attributes_builder); + let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source, metrics.clone()); + let frame_queue = + FrameQueue::new(l1_retrieval, Arc::clone(&rollup_config), metrics.clone()); + let channel_provider = + ChannelProvider::new(Arc::clone(&rollup_config), frame_queue, metrics.clone()); + let channel_reader = + ChannelReader::new(channel_provider, Arc::clone(&rollup_config), metrics.clone()); + let batch_stream = BatchStream::new( + channel_reader, + rollup_config.clone(), + l2_chain_provider.clone(), + metrics.clone(), + ); + let batch_provider = BatchProvider::new( + rollup_config.clone(), + batch_stream, + l2_chain_provider.clone(), + metrics.clone(), + ); + let attributes = AttributesQueue::new( + rollup_config.clone(), + batch_provider, + attributes_builder, + metrics.clone(), + ); // Create the pipeline. - Self::new(attributes, rollup_config, l2_chain_provider) + Self::new(attributes, rollup_config, l2_chain_provider, metrics) } } diff --git a/crates/derive/src/pipeline/core.rs b/crates/derive/src/pipeline/core.rs index 6d3f09f09..544acbb32 100644 --- a/crates/derive/src/pipeline/core.rs +++ b/crates/derive/src/pipeline/core.rs @@ -6,7 +6,11 @@ use super::{ }; use crate::{ errors::PipelineErrorKind, - traits::{ActivationSignal, L2ChainProvider, ResetSignal, Signal, SignalReceiver}, + metrics::PipelineMetrics, + traits::{ + ActivationSignal, DerivationPipelineMetrics, L2ChainProvider, ResetSignal, Signal, + SignalReceiver, + }, }; use alloc::{boxed::Box, collections::VecDeque, string::ToString, sync::Arc}; use async_trait::async_trait; @@ -33,6 +37,8 @@ where pub rollup_config: Arc, /// The L2 Chain Provider used to fetch the system config on reset. pub l2_chain_provider: P, + /// Metrics collector. + pub metrics: PipelineMetrics, } impl DerivationPipeline @@ -45,8 +51,9 @@ where attributes: S, rollup_config: Arc, l2_chain_provider: P, + metrics: PipelineMetrics, ) -> Self { - Self { attributes, prepared: VecDeque::new(), rollup_config, l2_chain_provider } + Self { attributes, prepared: VecDeque::new(), rollup_config, l2_chain_provider, metrics } } } @@ -95,6 +102,8 @@ where match signal { s @ Signal::Reset(ResetSignal { l2_safe_head, .. 
}) | s @ Signal::Activation(ActivationSignal { l2_safe_head, .. }) => { + self.metrics.record_signal(&s); + let system_config = self .l2_chain_provider .system_config_by_number( @@ -117,6 +126,8 @@ where } } Signal::FlushChannel => { + self.metrics.record_signal(&signal); + self.attributes.signal(signal).await?; } } @@ -148,7 +159,7 @@ where /// /// [PipelineError]: crate::errors::PipelineError async fn step(&mut self, cursor: L2BlockInfo) -> StepResult { - match self.attributes.next_attributes(cursor).await { + let result = match self.attributes.next_attributes(cursor).await { Ok(a) => { trace!(target: "pipeline", "Prepared L2 attributes: {:?}", a); self.prepared.push_back(a); @@ -167,13 +178,18 @@ where StepResult::StepFailed(err) } }, - } + }; + + self.metrics.record_step_result(&result); + + result } } #[cfg(test)] mod tests { use crate::{ + metrics::PipelineMetrics, pipeline::{DerivationPipeline, PipelineError, StepResult}, test_utils::{TestL2ChainProvider, *}, traits::{ActivationSignal, Pipeline, ResetSignal, Signal, SignalReceiver}, @@ -243,7 +259,9 @@ mod tests { let l2_chain_provider = TestL2ChainProvider::default(); let expected = default_test_payload_attributes(); let attributes = TestNextAttributes { next_attributes: Some(expected) }; - let mut pipeline = DerivationPipeline::new(attributes, rollup_config, l2_chain_provider); + let metrics = PipelineMetrics::no_op(); + let mut pipeline = + DerivationPipeline::new(attributes, rollup_config, l2_chain_provider, metrics); // Step on the pipeline and expect the result. let cursor = L2BlockInfo::default(); @@ -256,7 +274,9 @@ mod tests { let rollup_config = Arc::new(RollupConfig::default()); let l2_chain_provider = TestL2ChainProvider::default(); let attributes = TestNextAttributes::default(); - let mut pipeline = DerivationPipeline::new(attributes, rollup_config, l2_chain_provider); + let metrics = PipelineMetrics::no_op(); + let mut pipeline = + DerivationPipeline::new(attributes, rollup_config, l2_chain_provider, metrics); // Step on the pipeline and expect the result. let cursor = L2BlockInfo::default(); @@ -270,7 +290,9 @@ mod tests { let mut l2_chain_provider = TestL2ChainProvider::default(); l2_chain_provider.system_configs.insert(0, SystemConfig::default()); let attributes = TestNextAttributes::default(); - let mut pipeline = DerivationPipeline::new(attributes, rollup_config, l2_chain_provider); + let metrics = PipelineMetrics::no_op(); + let mut pipeline = + DerivationPipeline::new(attributes, rollup_config, l2_chain_provider, metrics); // Signal the pipeline to reset. let result = pipeline.signal(ActivationSignal::default().signal()).await; @@ -282,7 +304,9 @@ mod tests { let rollup_config = Arc::new(RollupConfig::default()); let l2_chain_provider = TestL2ChainProvider::default(); let attributes = TestNextAttributes::default(); - let mut pipeline = DerivationPipeline::new(attributes, rollup_config, l2_chain_provider); + let metrics = PipelineMetrics::no_op(); + let mut pipeline = + DerivationPipeline::new(attributes, rollup_config, l2_chain_provider, metrics); // Signal the pipeline to reset. 
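// The instrumentation added to `DerivationPipeline` funnels through two hooks:
// `record_step_result` on every `step` and `record_signal` on every `signal`. A
// minimal sketch of a tallying backend; the `StepCounter` type and its atomic-counter
// storage are illustrative assumptions.
use core::sync::atomic::{AtomicU64, Ordering};

/// Tallies pipeline steps, failed steps, and signals (illustrative sketch).
#[derive(Debug, Default)]
struct StepCounter {
    steps: AtomicU64,
    failed_steps: AtomicU64,
    signals: AtomicU64,
}

impl DerivationPipelineMetrics for StepCounter {
    fn record_step_result(&self, result: &StepResult) {
        self.steps.fetch_add(1, Ordering::Relaxed);
        if matches!(result, StepResult::StepFailed(_)) {
            self.failed_steps.fetch_add(1, Ordering::Relaxed);
        }
    }

    fn record_signal(&self, _signal: &Signal) {
        self.signals.fetch_add(1, Ordering::Relaxed);
    }
}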
let result = pipeline.signal(Signal::FlushChannel).await; @@ -294,7 +318,9 @@ mod tests { let rollup_config = Arc::new(RollupConfig::default()); let l2_chain_provider = TestL2ChainProvider::default(); let attributes = TestNextAttributes::default(); - let mut pipeline = DerivationPipeline::new(attributes, rollup_config, l2_chain_provider); + let metrics = PipelineMetrics::no_op(); + let mut pipeline = + DerivationPipeline::new(attributes, rollup_config, l2_chain_provider, metrics); // Signal the pipeline to reset. let result = pipeline.signal(ResetSignal::default().signal()).await.unwrap_err(); @@ -307,7 +333,9 @@ mod tests { let mut l2_chain_provider = TestL2ChainProvider::default(); l2_chain_provider.system_configs.insert(0, SystemConfig::default()); let attributes = TestNextAttributes::default(); - let mut pipeline = DerivationPipeline::new(attributes, rollup_config, l2_chain_provider); + let metrics = PipelineMetrics::no_op(); + let mut pipeline = + DerivationPipeline::new(attributes, rollup_config, l2_chain_provider, metrics); // Signal the pipeline to reset. let result = pipeline.signal(ResetSignal::default().signal()).await; diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index f746d365a..987fa5e19 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -2,9 +2,10 @@ use crate::{ errors::{PipelineError, PipelineResult, ResetError}, + metrics::PipelineMetrics, traits::{ - AttributesBuilder, AttributesProvider, NextAttributes, OriginAdvancer, OriginProvider, - Signal, SignalReceiver, + AttributesBuilder, AttributesProvider, AttributesQueueMetrics, NextAttributes, + OriginAdvancer, OriginProvider, Signal, SignalReceiver, }, }; use alloc::{boxed::Box, sync::Arc}; @@ -44,6 +45,8 @@ where batch: Option, /// The attributes builder. builder: AB, + /// Metrics collector. + metrics: PipelineMetrics, } impl AttributesQueue @@ -52,8 +55,13 @@ where AB: AttributesBuilder + Debug, { /// Create a new [AttributesQueue] stage. - pub const fn new(cfg: Arc, prev: P, builder: AB) -> Self { - Self { cfg, prev, is_last_in_span: false, batch: None, builder } + pub const fn new( + cfg: Arc, + prev: P, + builder: AB, + metrics: PipelineMetrics, + ) -> Self { + Self { cfg, prev, is_last_in_span: false, batch: None, builder, metrics } } /// Loads a [SingleBatch] from the [AttributesProvider] if needed. @@ -62,6 +70,7 @@ where let batch = self.prev.next_batch(parent).await?; self.batch = Some(batch); self.is_last_in_span = self.prev.is_last_in_span(); + self.metrics.record_batch_loaded(); } self.batch.as_ref().cloned().ok_or(PipelineError::Eof.temp()) } @@ -82,6 +91,7 @@ where let attributes = match self.create_next_attributes(batch, parent).await { Ok(attributes) => attributes, Err(e) => { + self.metrics.record_attributes_creation_failure(); return Err(e); } }; @@ -91,6 +101,10 @@ where // Clear out the local state once payload attributes are prepared. self.batch = None; self.is_last_in_span = false; + + // Record that attributes were successfully created. 
+ self.metrics.record_attributes_created(); + Ok(populated_attributes) } @@ -228,7 +242,12 @@ mod tests { let cfg = cfg.unwrap_or_default(); let mock_batch_queue = new_test_attributes_provider(origin, batches); let mock_attributes_builder = TestAttributesBuilder::default(); - AttributesQueue::new(Arc::new(cfg), mock_batch_queue, mock_attributes_builder) + AttributesQueue::new( + Arc::new(cfg), + mock_batch_queue, + mock_attributes_builder, + PipelineMetrics::no_op(), + ) } #[tokio::test] @@ -246,7 +265,8 @@ mod tests { let cfg = RollupConfig::default(); let mock = new_test_attributes_provider(None, vec![]); let mock_builder = TestAttributesBuilder::default(); - let mut aq = AttributesQueue::new(Arc::new(cfg), mock, mock_builder); + let mut aq = + AttributesQueue::new(Arc::new(cfg), mock, mock_builder, PipelineMetrics::no_op()); aq.batch = Some(SingleBatch::default()); assert!(!aq.prev.reset); aq.signal(ResetSignal::default().signal()).await.unwrap(); @@ -340,7 +360,8 @@ mod tests { let mut payload_attributes = default_optimism_payload_attributes(); let mock_builder = TestAttributesBuilder { attributes: vec![Ok(payload_attributes.clone())] }; - let mut aq = AttributesQueue::new(Arc::new(cfg), mock, mock_builder); + let mut aq = + AttributesQueue::new(Arc::new(cfg), mock, mock_builder, PipelineMetrics::no_op()); let parent = L2BlockInfo::default(); let txs = vec![Bytes::default(), Bytes::default()]; let batch = SingleBatch { transactions: txs.clone(), ..Default::default() }; @@ -368,7 +389,8 @@ mod tests { let mock = new_test_attributes_provider(None, vec![Ok(Default::default())]); let mut pa = default_optimism_payload_attributes(); let mock_builder = TestAttributesBuilder { attributes: vec![Ok(pa.clone())] }; - let mut aq = AttributesQueue::new(Arc::new(cfg), mock, mock_builder); + let mut aq = + AttributesQueue::new(Arc::new(cfg), mock, mock_builder, PipelineMetrics::no_op()); // If we load the batch, we should get the last in span. // But it won't take it so it will be available in the next_attributes call. let _ = aq.load_batch(L2BlockInfo::default()).await.unwrap(); diff --git a/crates/derive/src/stages/batch/batch_provider.rs b/crates/derive/src/stages/batch/batch_provider.rs index 315aa74af..7ba515800 100644 --- a/crates/derive/src/stages/batch/batch_provider.rs +++ b/crates/derive/src/stages/batch/batch_provider.rs @@ -3,6 +3,7 @@ use super::NextBatchProvider; use crate::{ errors::{PipelineError, PipelineResult}, + metrics::PipelineMetrics, stages::{BatchQueue, BatchValidator}, traits::{ AttributesProvider, L2ChainProvider, OriginAdvancer, OriginProvider, Signal, SignalReceiver, @@ -47,6 +48,8 @@ where /// /// Must be [None] if `prev` or `batch_queue` is [Some]. batch_validator: Option>, + /// Metrics collector. + metrics: PipelineMetrics, } impl BatchProvider @@ -55,8 +58,13 @@ where F: L2ChainProvider + Clone + Debug, { /// Creates a new [BatchProvider] with the given configuration and previous stage. - pub const fn new(cfg: Arc, prev: P, provider: F) -> Self { - Self { cfg, provider, prev: Some(prev), batch_queue: None, batch_validator: None } + pub const fn new( + cfg: Arc, + prev: P, + provider: F, + metrics: PipelineMetrics, + ) -> Self { + Self { cfg, provider, prev: Some(prev), batch_queue: None, batch_validator: None, metrics } } /// Attempts to update the active stage of the mux. 
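// Because the `PipelineMetrics` fields are `pub(crate)`, code inside `kona-derive`
// (for example a test) can swap a single slot for a real collector and leave every
// other stage on the no-op backend. A minimal sketch; `CountingBatchQueueMetrics` and
// `counting_metrics` are hypothetical names, not part of this change.
use alloc::sync::Arc;
use core::sync::atomic::{AtomicU64, Ordering};

/// Counts queued and dropped batches (illustrative sketch).
#[derive(Debug, Default)]
struct CountingBatchQueueMetrics {
    queued: AtomicU64,
    dropped: AtomicU64,
}

impl BatchQueueMetrics for CountingBatchQueueMetrics {
    fn record_batches_queued(&self, _count: usize) {
        self.queued.fetch_add(1, Ordering::Relaxed);
    }
    fn record_batch_dropped(&self) {
        self.dropped.fetch_add(1, Ordering::Relaxed);
    }
    fn record_epoch_advanced(&self, _epoch: u64) {}
}

/// Builds a composite that only instruments the batch queue; the caller keeps the
/// same `Arc` so it can read the counters after the pipeline runs, e.g.
/// `let counter = Arc::new(CountingBatchQueueMetrics::default());`
/// `let metrics = counting_metrics(counter.clone());`
fn counting_metrics(counter: Arc<CountingBatchQueueMetrics>) -> PipelineMetrics {
    PipelineMetrics { batch_queue_metrics: counter, ..PipelineMetrics::no_op() }
}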
@@ -68,8 +76,12 @@ where if self.cfg.is_holocene_active(origin.timestamp) { self.batch_validator = Some(BatchValidator::new(self.cfg.clone(), prev)); } else { - self.batch_queue = - Some(BatchQueue::new(self.cfg.clone(), prev, self.provider.clone())); + self.batch_queue = Some(BatchQueue::new( + self.cfg.clone(), + prev, + self.provider.clone(), + self.metrics.clone(), + )); } } else if self.batch_queue.is_some() && self.cfg.is_holocene_active(origin.timestamp) { // If the batch queue is active and Holocene is also active, transition to the batch @@ -83,8 +95,12 @@ where // reorg around Holocene activation. Transition back to the batch queue // until Holocene re-activates. let batch_validator = self.batch_validator.take().expect("Must have batch validator"); - let mut bq = - BatchQueue::new(self.cfg.clone(), batch_validator.prev, self.provider.clone()); + let mut bq = BatchQueue::new( + self.cfg.clone(), + batch_validator.prev, + self.provider.clone(), + self.metrics.clone(), + ); bq.l1_blocks = batch_validator.l1_blocks; self.batch_queue = Some(bq); } @@ -178,6 +194,7 @@ where mod test { use super::BatchProvider; use crate::{ + metrics::PipelineMetrics, test_utils::{TestL2ChainProvider, TestNextBatchProvider}, traits::{OriginProvider, ResetSignal, SignalReceiver}, }; @@ -190,7 +207,8 @@ mod test { let provider = TestNextBatchProvider::new(vec![]); let l2_provider = TestL2ChainProvider::default(); let cfg = Arc::new(RollupConfig { holocene_time: Some(0), ..Default::default() }); - let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + let mut batch_provider = + BatchProvider::new(cfg, provider, l2_provider, PipelineMetrics::no_op()); assert!(batch_provider.attempt_update().is_ok()); assert!(batch_provider.prev.is_none()); @@ -203,7 +221,8 @@ mod test { let provider = TestNextBatchProvider::new(vec![]); let l2_provider = TestL2ChainProvider::default(); let cfg = Arc::new(RollupConfig::default()); - let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + let mut batch_provider = + BatchProvider::new(cfg, provider, l2_provider, PipelineMetrics::no_op()); assert!(batch_provider.attempt_update().is_ok()); assert!(batch_provider.prev.is_none()); @@ -216,7 +235,8 @@ mod test { let provider = TestNextBatchProvider::new(vec![]); let l2_provider = TestL2ChainProvider::default(); let cfg = Arc::new(RollupConfig { holocene_time: Some(2), ..Default::default() }); - let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + let mut batch_provider = + BatchProvider::new(cfg, provider, l2_provider, PipelineMetrics::no_op()); batch_provider.attempt_update().unwrap(); @@ -239,7 +259,8 @@ mod test { let provider = TestNextBatchProvider::new(vec![]); let l2_provider = TestL2ChainProvider::default(); let cfg = Arc::new(RollupConfig { holocene_time: Some(2), ..Default::default() }); - let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + let mut batch_provider = + BatchProvider::new(cfg, provider, l2_provider, PipelineMetrics::no_op()); batch_provider.attempt_update().unwrap(); @@ -270,7 +291,8 @@ mod test { let provider = TestNextBatchProvider::new(vec![]); let l2_provider = TestL2ChainProvider::default(); let cfg = Arc::new(RollupConfig::default()); - let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + let mut batch_provider = + BatchProvider::new(cfg, provider, l2_provider, PipelineMetrics::no_op()); // Reset the batch provider. 
batch_provider.signal(ResetSignal::default().signal()).await.unwrap(); @@ -286,7 +308,8 @@ mod test { let provider = TestNextBatchProvider::new(vec![]); let l2_provider = TestL2ChainProvider::default(); let cfg = Arc::new(RollupConfig { holocene_time: Some(0), ..Default::default() }); - let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + let mut batch_provider = + BatchProvider::new(cfg, provider, l2_provider, PipelineMetrics::no_op()); // Reset the batch provider. batch_provider.signal(ResetSignal::default().signal()).await.unwrap(); diff --git a/crates/derive/src/stages/batch/batch_queue.rs b/crates/derive/src/stages/batch/batch_queue.rs index dc8ba759f..47fd19441 100644 --- a/crates/derive/src/stages/batch/batch_queue.rs +++ b/crates/derive/src/stages/batch/batch_queue.rs @@ -3,9 +3,10 @@ use super::NextBatchProvider; use crate::{ errors::{PipelineEncodingError, PipelineError, PipelineErrorKind, PipelineResult, ResetError}, + metrics::PipelineMetrics, traits::{ - AttributesProvider, L2ChainProvider, OriginAdvancer, OriginProvider, ResetSignal, Signal, - SignalReceiver, + AttributesProvider, BatchQueueMetrics, L2ChainProvider, OriginAdvancer, OriginProvider, + ResetSignal, Signal, SignalReceiver, }, }; use alloc::{boxed::Box, sync::Arc, vec::Vec}; @@ -58,6 +59,8 @@ where pub(crate) next_spans: Vec, /// Used to validate the batches. pub(crate) fetcher: BF, + /// Metrics collector. + metrics: PipelineMetrics, } impl BatchQueue @@ -67,7 +70,7 @@ where { /// Creates a new [BatchQueue] stage. #[allow(clippy::missing_const_for_fn)] - pub fn new(cfg: Arc, prev: P, fetcher: BF) -> Self { + pub fn new(cfg: Arc, prev: P, fetcher: BF, metrics: PipelineMetrics) -> Self { Self { cfg, prev, @@ -76,6 +79,7 @@ where batches: Default::default(), next_spans: Default::default(), fetcher, + metrics, } } @@ -232,6 +236,7 @@ where "Advancing to next epoch: {}, timestamp: {}, epoch timestamp: {}", next_epoch.number, next_timestamp, next_epoch.timestamp ); + self.metrics.record_epoch_advanced(next_epoch.number); self.l1_blocks.remove(0); Err(PipelineError::Eof.temp()) } @@ -252,12 +257,14 @@ where (self.cfg.is_holocene_active(origin.timestamp) && validity.is_future()); if drop { self.prev.flush(); + self.metrics.record_batch_dropped(); return Ok(()); } else if validity.is_outdated() { // If the batch is outdated, we drop it without flushing the previous stage. 
return Ok(()); } self.batches.push(data); + self.metrics.record_batches_queued(self.batches.len()); Ok(()) } } @@ -492,7 +499,7 @@ mod tests { let cfg = Arc::new(RollupConfig::default()); let mock = TestNextBatchProvider::new(vec![]); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); let parent = L2BlockInfo::default(); let sb = SingleBatch::default(); bq.next_spans.push(sb.clone()); @@ -506,7 +513,7 @@ mod tests { let cfg = Arc::new(RollupConfig::default()); let mock = TestNextBatchProvider::new(vec![]); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher); + let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher, PipelineMetrics::no_op()); bq.l1_blocks.push(BlockInfo::default()); bq.next_spans.push(SingleBatch::default()); bq.batches.push(BatchWithInclusionBlock { @@ -527,7 +534,7 @@ mod tests { let cfg = Arc::new(RollupConfig::default()); let mock = TestNextBatchProvider::new(vec![]); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher); + let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher, PipelineMetrics::no_op()); bq.l1_blocks.push(BlockInfo::default()); bq.next_spans.push(SingleBatch::default()); bq.batches.push(BatchWithInclusionBlock { @@ -569,7 +576,7 @@ mod tests { let fetcher = TestL2ChainProvider::default(); // Configure batch queue - let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher); + let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher, PipelineMetrics::no_op()); bq.origin = Some(BlockInfo::default()); // Set the origin bq.l1_blocks.push(BlockInfo::default()); // Push the origin into the l1 blocks bq.l1_blocks.push(BlockInfo::default()); // Push the next origin into the bq @@ -600,7 +607,7 @@ mod tests { let fetcher = TestL2ChainProvider::default(); // Configure batch queue - let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher); + let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher, PipelineMetrics::no_op()); bq.origin = Some(BlockInfo::default()); // Set the origin bq.l1_blocks.push(BlockInfo::default()); // Push the origin into the l1 blocks bq.l1_blocks.push(BlockInfo::default()); // Push the next origin into the bq @@ -634,7 +641,7 @@ mod tests { let fetcher = TestL2ChainProvider::default(); // Configure batch queue - let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher); + let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher, PipelineMetrics::no_op()); bq.origin = Some(BlockInfo::default()); // Set the origin bq.l1_blocks.push(BlockInfo::default()); // Push the origin into the l1 blocks bq.l1_blocks.push(BlockInfo::default()); // Push the next origin into the bq @@ -669,7 +676,7 @@ mod tests { let fetcher = TestL2ChainProvider::default(); // Configure batch queue - let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher); + let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher, PipelineMetrics::no_op()); bq.origin = Some(BlockInfo::default()); // Set the origin bq.l1_blocks.push(BlockInfo::default()); // Push the origin into the l1 blocks bq.l1_blocks.push(BlockInfo::default()); // Push the next origin into the bq @@ -685,7 +692,7 @@ mod tests { let cfg = Arc::new(RollupConfig::default()); let mock = TestNextBatchProvider::new(data); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); let 
parent = L2BlockInfo::default(); let result = bq.derive_next_batch(false, parent).await.unwrap_err(); assert_eq!(result, PipelineError::MissingOrigin.crit()); @@ -702,7 +709,7 @@ mod tests { let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); let parent = L2BlockInfo { l1_origin: BlockNumHash { number: 10, ..Default::default() }, ..Default::default() @@ -723,7 +730,7 @@ mod tests { let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); bq.origin = Some(BlockInfo::default()); bq.l1_blocks.push(BlockInfo::default()); @@ -747,7 +754,7 @@ mod tests { let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); bq.origin = Some(BlockInfo::default()); bq.l1_blocks.push(BlockInfo::default()); @@ -771,7 +778,7 @@ mod tests { let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); bq.origin = Some(BlockInfo::default()); bq.l1_blocks.push(BlockInfo::default()); bq.l1_blocks.push(BlockInfo::default()); @@ -806,7 +813,7 @@ mod tests { let fetcher = TestL2ChainProvider::default(); // Configure batch queue - let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher); + let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher, PipelineMetrics::no_op()); bq.origin = Some(BlockInfo::default()); // Set the origin bq.l1_blocks.push(BlockInfo::default()); // Push the origin into the l1 blocks bq.l1_blocks.push(BlockInfo::default()); // Push the next origin into the bq @@ -848,7 +855,7 @@ mod tests { let fetcher = TestL2ChainProvider::default(); // Configure batch queue - let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher); + let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher, PipelineMetrics::no_op()); bq.origin = Some(BlockInfo::default()); // Set the origin bq.l1_blocks.push(BlockInfo::default()); // Push the origin into the l1 blocks bq.l1_blocks.push(BlockInfo::default()); // Push the next origin into the bq @@ -885,7 +892,7 @@ mod tests { let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); let sb = SingleBatch::default(); bq.next_spans.push(sb.clone()); let next = bq.next_batch(L2BlockInfo::default()).await.unwrap(); @@ -904,7 +911,7 @@ mod tests { let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); let sb = SingleBatch::default(); bq.next_spans.push(sb.clone()); let res = 
bq.next_batch(L2BlockInfo::default()).await.unwrap_err(); @@ -919,7 +926,7 @@ mod tests { let batch = reader.next_batch(cfg.as_ref()).unwrap(); let mock = TestNextBatchProvider::new(vec![Ok(batch)]); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); let res = bq.next_batch(L2BlockInfo::default()).await.unwrap_err(); assert_eq!(res, PipelineError::NotEnoughData.temp()); assert!(bq.is_last_in_span()); @@ -936,7 +943,7 @@ mod tests { let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); let parent = L2BlockInfo { l1_origin: BlockNumHash { number: 10, ..Default::default() }, ..Default::default() @@ -1064,7 +1071,7 @@ mod tests { op_blocks: vec![block, second], ..Default::default() }; - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); let parent = L2BlockInfo { block_info: BlockInfo { number: 9, @@ -1093,7 +1100,7 @@ mod tests { let cfg = Arc::new(RollupConfig::default()); let mock = TestNextBatchProvider::new(data); let fetcher = TestL2ChainProvider::default(); - let mut bq = BatchQueue::new(cfg, mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher, PipelineMetrics::no_op()); let parent = L2BlockInfo::default(); let batch = bq.next_batch(parent).await.unwrap(); assert_eq!(batch, SingleBatch::default()); diff --git a/crates/derive/src/stages/batch/batch_stream.rs b/crates/derive/src/stages/batch/batch_stream.rs index b932420f1..854d20f2d 100644 --- a/crates/derive/src/stages/batch/batch_stream.rs +++ b/crates/derive/src/stages/batch/batch_stream.rs @@ -2,8 +2,11 @@ use crate::{ errors::{PipelineEncodingError, PipelineError, PipelineResult}, + metrics::PipelineMetrics, stages::NextBatchProvider, - traits::{L2ChainProvider, OriginAdvancer, OriginProvider, Signal, SignalReceiver}, + traits::{ + BatchStreamMetrics, L2ChainProvider, OriginAdvancer, OriginProvider, Signal, SignalReceiver, + }, }; use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; use async_trait::async_trait; @@ -50,6 +53,8 @@ where config: Arc, /// Used to validate the batches. fetcher: BF, + /// Metrics collector. + metrics: PipelineMetrics, } impl BatchStream @@ -58,8 +63,13 @@ where BF: L2ChainProvider + Debug, { /// Create a new [BatchStream] stage. - pub const fn new(prev: P, config: Arc, fetcher: BF) -> Self { - Self { prev, span: None, buffer: VecDeque::new(), config, fetcher } + pub const fn new( + prev: P, + config: Arc, + fetcher: BF, + metrics: PipelineMetrics, + ) -> Self { + Self { prev, span: None, buffer: VecDeque::new(), config, fetcher, metrics } } /// Returns if the [BatchStream] stage is active based on the @@ -154,8 +164,14 @@ where .await; match validity { - BatchValidity::Accept => self.span = Some(b), + BatchValidity::Accept => { + self.metrics.record_span_batch_accepted(); + self.metrics.record_batch_processed(); + self.span = Some(b) + } BatchValidity::Drop => { + self.metrics.record_span_batch_dropped(); + // Flush the stage. self.flush(); @@ -177,6 +193,9 @@ where } } + self.metrics.record_buffer_size(self.buffer.len()); + self.metrics.record_batch_processed(); + // Attempt to pull a SingleBatch out of the SpanBatch. 
self.get_single_batch(parent, l1_origins).map(Batch::Single) } @@ -232,7 +251,12 @@ mod test { async fn test_batch_stream_flush() { let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() }); let prev = TestBatchStreamProvider::new(vec![]); - let mut stream = BatchStream::new(prev, config, TestL2ChainProvider::default()); + let mut stream = BatchStream::new( + prev, + config, + TestL2ChainProvider::default(), + PipelineMetrics::no_op(), + ); stream.buffer.push_back(SingleBatch::default()); stream.span = Some(SpanBatch::default()); assert!(!stream.buffer.is_empty()); @@ -246,7 +270,12 @@ mod test { async fn test_batch_stream_reset() { let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() }); let prev = TestBatchStreamProvider::new(vec![]); - let mut stream = BatchStream::new(prev, config.clone(), TestL2ChainProvider::default()); + let mut stream = BatchStream::new( + prev, + config.clone(), + TestL2ChainProvider::default(), + PipelineMetrics::no_op(), + ); stream.buffer.push_back(SingleBatch::default()); stream.span = Some(SpanBatch::default()); assert!(!stream.prev.reset); @@ -260,7 +289,12 @@ mod test { async fn test_batch_stream_flush_channel() { let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() }); let prev = TestBatchStreamProvider::new(vec![]); - let mut stream = BatchStream::new(prev, config.clone(), TestL2ChainProvider::default()); + let mut stream = BatchStream::new( + prev, + config.clone(), + TestL2ChainProvider::default(), + PipelineMetrics::no_op(), + ); stream.buffer.push_back(SingleBatch::default()); stream.span = Some(SpanBatch::default()); assert!(!stream.prev.flushed); @@ -279,7 +313,12 @@ mod test { let data = vec![Ok(Batch::Single(SingleBatch::default()))]; let config = Arc::new(RollupConfig { holocene_time: Some(100), ..RollupConfig::default() }); let prev = TestBatchStreamProvider::new(data); - let mut stream = BatchStream::new(prev, config.clone(), TestL2ChainProvider::default()); + let mut stream = BatchStream::new( + prev, + config.clone(), + TestL2ChainProvider::default(), + PipelineMetrics::no_op(), + ); // The stage should not be active. assert!(!stream.is_active().unwrap()); @@ -313,7 +352,7 @@ mod test { }); let prev = TestBatchStreamProvider::new(data); let provider = TestL2ChainProvider::default(); - let mut stream = BatchStream::new(prev, config.clone(), provider); + let mut stream = BatchStream::new(prev, config.clone(), provider, PipelineMetrics::no_op()); // The stage should be active. assert!(stream.is_active().unwrap()); @@ -371,7 +410,12 @@ mod test { let data = vec![Ok(Batch::Single(SingleBatch::default()))]; let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() }); let prev = TestBatchStreamProvider::new(data); - let mut stream = BatchStream::new(prev, config.clone(), TestL2ChainProvider::default()); + let mut stream = BatchStream::new( + prev, + config.clone(), + TestL2ChainProvider::default(), + PipelineMetrics::no_op(), + ); // The stage should be active. 
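// In `next_batch` above, `record_buffer_size` receives the current length of the
// single-batch buffer, so a backend can treat it as a gauge rather than a monotonic
// counter. A minimal sketch; the `BufferGauge` type is a hypothetical name.
use core::sync::atomic::{AtomicUsize, Ordering};

/// Stores the most recently observed buffer depth (illustrative sketch).
#[derive(Debug, Default)]
struct BufferGauge {
    last_buffer_len: AtomicUsize,
}

impl BatchStreamMetrics for BufferGauge {
    fn record_batch_processed(&self) {}
    fn record_span_batch_accepted(&self) {}
    fn record_span_batch_dropped(&self) {}
    fn record_buffer_size(&self, size: usize) {
        self.last_buffer_len.store(size, Ordering::Relaxed);
    }
}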
assert!(stream.is_active().unwrap()); diff --git a/crates/derive/src/stages/channel/channel_provider.rs b/crates/derive/src/stages/channel/channel_provider.rs index 1dc868ac1..d32d3341d 100644 --- a/crates/derive/src/stages/channel/channel_provider.rs +++ b/crates/derive/src/stages/channel/channel_provider.rs @@ -3,7 +3,8 @@ use super::{ChannelAssembler, ChannelBank, ChannelReaderProvider, NextFrameProvider}; use crate::{ errors::{PipelineError, PipelineResult}, - traits::{OriginAdvancer, OriginProvider, Signal, SignalReceiver}, + metrics::PipelineMetrics, + traits::{ChannelProviderMetrics, OriginAdvancer, OriginProvider, Signal, SignalReceiver}, }; use alloc::{boxed::Box, sync::Arc}; use alloy_primitives::Bytes; @@ -39,6 +40,8 @@ where /// /// Must be [None] if `prev` or `channel_bank` is [Some]. channel_assembler: Option>, + /// Metrics collector. + metrics: PipelineMetrics, } impl

<P> ChannelProvider<P>

@@ -46,8 +49,8 @@ where P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { /// Creates a new [ChannelProvider] with the given configuration and previous stage. - pub const fn new(cfg: Arc, prev: P) -> Self { - Self { cfg, prev: Some(prev), channel_bank: None, channel_assembler: None } + pub const fn new(cfg: Arc, prev: P, metrics: PipelineMetrics) -> Self { + Self { cfg, prev: Some(prev), channel_bank: None, channel_assembler: None, metrics } } /// Attempts to update the active stage of the mux. @@ -57,13 +60,16 @@ where // On the first call to `attempt_update`, we need to determine the active stage to // initialize the mux with. if self.cfg.is_holocene_active(origin.timestamp) { + self.metrics.record_stage_transition("ChannelProvider", "ChannelAssembler"); self.channel_assembler = Some(ChannelAssembler::new(self.cfg.clone(), prev)); } else { + self.metrics.record_stage_transition("ChannelProvider", "ChannelBank"); self.channel_bank = Some(ChannelBank::new(self.cfg.clone(), prev)); } } else if self.channel_bank.is_some() && self.cfg.is_holocene_active(origin.timestamp) { // If the channel bank is active and Holocene is also active, transition to the channel // assembler. + self.metrics.record_stage_transition("ChannelBank", "ChannelAssembler"); let channel_bank = self.channel_bank.take().expect("Must have channel bank"); self.channel_assembler = Some(ChannelAssembler::new(self.cfg.clone(), channel_bank.prev)); @@ -72,6 +78,7 @@ where // If the channel assembler is active, and Holocene is not active, it indicates an L1 // reorg around Holocene activation. Transition back to the channel bank // until Holocene re-activates. + self.metrics.record_stage_transition("ChannelAssembler", "ChannelBank"); let channel_assembler = self.channel_assembler.take().expect("Must have channel assembler"); self.channel_bank = Some(ChannelBank::new(self.cfg.clone(), channel_assembler.prev)); @@ -141,13 +148,23 @@ where async fn next_data(&mut self) -> PipelineResult> { self.attempt_update()?; - if let Some(channel_assembler) = self.channel_assembler.as_mut() { - channel_assembler.next_data().await + let data = if let Some(channel_assembler) = self.channel_assembler.as_mut() { + let data = channel_assembler.next_data().await?; + if data.is_some() { + self.metrics.record_data_item_provided(); + } + data } else if let Some(channel_bank) = self.channel_bank.as_mut() { - channel_bank.next_data().await + let data = channel_bank.next_data().await?; + if data.is_some() { + self.metrics.record_data_item_provided(); + } + data } else { - Err(PipelineError::NotEnoughData.temp()) - } + return Err(PipelineError::NotEnoughData.temp()); + }; + + Ok(data) } } @@ -155,6 +172,7 @@ where mod test { use super::ChannelProvider; use crate::{ + metrics::PipelineMetrics, prelude::{OriginProvider, PipelineError}, stages::ChannelReaderProvider, test_utils::TestNextFrameProvider, @@ -168,7 +186,7 @@ mod test { fn test_channel_provider_assembler_active() { let provider = TestNextFrameProvider::new(vec![]); let cfg = Arc::new(RollupConfig { holocene_time: Some(0), ..Default::default() }); - let mut channel_provider = ChannelProvider::new(cfg, provider); + let mut channel_provider = ChannelProvider::new(cfg, provider, PipelineMetrics::no_op()); assert!(channel_provider.attempt_update().is_ok()); assert!(channel_provider.prev.is_none()); @@ -180,7 +198,7 @@ mod test { fn test_channel_provider_bank_active() { let provider = TestNextFrameProvider::new(vec![]); let cfg = Arc::new(RollupConfig::default()); - let 
mut channel_provider = ChannelProvider::new(cfg, provider); + let mut channel_provider = ChannelProvider::new(cfg, provider, PipelineMetrics::no_op()); assert!(channel_provider.attempt_update().is_ok()); assert!(channel_provider.prev.is_none()); @@ -192,7 +210,7 @@ mod test { fn test_channel_provider_retain_current_bank() { let provider = TestNextFrameProvider::new(vec![]); let cfg = Arc::new(RollupConfig::default()); - let mut channel_provider = ChannelProvider::new(cfg, provider); + let mut channel_provider = ChannelProvider::new(cfg, provider, PipelineMetrics::no_op()); // Assert the multiplexer hasn't been initialized. assert!(channel_provider.channel_bank.is_none()); @@ -215,7 +233,7 @@ mod test { fn test_channel_provider_retain_current_assembler() { let provider = TestNextFrameProvider::new(vec![]); let cfg = Arc::new(RollupConfig { holocene_time: Some(0), ..Default::default() }); - let mut channel_provider = ChannelProvider::new(cfg, provider); + let mut channel_provider = ChannelProvider::new(cfg, provider, PipelineMetrics::no_op()); // Assert the multiplexer hasn't been initialized. assert!(channel_provider.channel_bank.is_none()); @@ -238,7 +256,7 @@ mod test { fn test_channel_provider_transition_stage() { let provider = TestNextFrameProvider::new(vec![]); let cfg = Arc::new(RollupConfig { holocene_time: Some(2), ..Default::default() }); - let mut channel_provider = ChannelProvider::new(cfg, provider); + let mut channel_provider = ChannelProvider::new(cfg, provider, PipelineMetrics::no_op()); channel_provider.attempt_update().unwrap(); @@ -260,7 +278,7 @@ mod test { fn test_channel_provider_transition_stage_backwards() { let provider = TestNextFrameProvider::new(vec![]); let cfg = Arc::new(RollupConfig { holocene_time: Some(2), ..Default::default() }); - let mut channel_provider = ChannelProvider::new(cfg, provider); + let mut channel_provider = ChannelProvider::new(cfg, provider, PipelineMetrics::no_op()); channel_provider.attempt_update().unwrap(); @@ -294,7 +312,8 @@ mod test { ]; let provider = TestNextFrameProvider::new(frames.into_iter().rev().map(Ok).collect()); let cfg = Arc::new(RollupConfig::default()); - let mut channel_provider = ChannelProvider::new(cfg.clone(), provider); + let mut channel_provider = + ChannelProvider::new(cfg.clone(), provider, PipelineMetrics::no_op()); // Load in the first frame. assert_eq!( @@ -325,7 +344,8 @@ mod test { ]; let provider = TestNextFrameProvider::new(frames.into_iter().rev().map(Ok).collect()); let cfg = Arc::new(RollupConfig { holocene_time: Some(0), ..Default::default() }); - let mut channel_provider = ChannelProvider::new(cfg.clone(), provider); + let mut channel_provider = + ChannelProvider::new(cfg.clone(), provider, PipelineMetrics::no_op()); // Load in the first frame. assert_eq!( diff --git a/crates/derive/src/stages/channel/channel_reader.rs b/crates/derive/src/stages/channel/channel_reader.rs index 3f4f0a6f1..32f51ea86 100644 --- a/crates/derive/src/stages/channel/channel_reader.rs +++ b/crates/derive/src/stages/channel/channel_reader.rs @@ -2,8 +2,9 @@ use crate::{ errors::{PipelineError, PipelineResult}, + metrics::PipelineMetrics, stages::{decompress_brotli, BatchStreamProvider}, - traits::{OriginAdvancer, OriginProvider, Signal, SignalReceiver}, + traits::{ChannelReaderMetrics, OriginAdvancer, OriginProvider, Signal, SignalReceiver}, }; use alloc::{boxed::Box, sync::Arc, vec::Vec}; use alloy_primitives::Bytes; @@ -56,6 +57,8 @@ where next_batch: Option, /// The rollup coonfiguration. 
cfg: Arc<RollupConfig>, + /// Metrics collector. + metrics: PipelineMetrics, } impl

<P> ChannelReader<P>

@@ -63,8 +66,8 @@ where P: ChannelReaderProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { /// Create a new [ChannelReader] stage. - pub const fn new(prev: P, cfg: Arc) -> Self { - Self { prev, next_batch: None, cfg } + pub const fn new(prev: P, cfg: Arc, metrics: PipelineMetrics) -> Self { + Self { prev, next_batch: None, cfg, metrics } } /// Creates the batch reader from available channel data. @@ -116,6 +119,7 @@ where /// SAFETY: Only called post-holocene activation. fn flush(&mut self) { debug!(target: "channel-reader", "[POST-HOLOCENE] Flushing channel"); + self.metrics.record_channel_flushed(); self.next_channel(); } @@ -132,7 +136,10 @@ where .next_batch(self.cfg.as_ref()) .ok_or(PipelineError::NotEnoughData.temp()) { - Ok(batch) => Ok(batch), + Ok(batch) => { + self.metrics.record_batch_read(); + Ok(batch) + } Err(e) => { self.next_channel(); Err(e) @@ -272,7 +279,8 @@ mod test { #[tokio::test] async fn test_flush_channel_reader() { let mock = TestChannelReaderProvider::new(vec![Ok(Some(new_compressed_batch_data()))]); - let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); + let mut reader = + ChannelReader::new(mock, Arc::new(RollupConfig::default()), PipelineMetrics::no_op()); reader.next_batch = Some(BatchReader::new( new_compressed_batch_data(), MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize, @@ -284,7 +292,8 @@ mod test { #[tokio::test] async fn test_reset_channel_reader() { let mock = TestChannelReaderProvider::new(vec![Ok(None)]); - let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); + let mut reader = + ChannelReader::new(mock, Arc::new(RollupConfig::default()), PipelineMetrics::no_op()); reader.next_batch = Some(BatchReader::new( vec![0x00, 0x01, 0x02], MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize, @@ -298,7 +307,8 @@ mod test { #[tokio::test] async fn test_next_batch_batch_reader_set_fails() { let mock = TestChannelReaderProvider::new(vec![Err(PipelineError::Eof.temp())]); - let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); + let mut reader = + ChannelReader::new(mock, Arc::new(RollupConfig::default()), PipelineMetrics::no_op()); assert_eq!(reader.next_batch().await, Err(PipelineError::Eof.temp())); assert!(reader.next_batch.is_none()); } @@ -306,7 +316,8 @@ mod test { #[tokio::test] async fn test_next_batch_batch_reader_no_data() { let mock = TestChannelReaderProvider::new(vec![Ok(None)]); - let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); + let mut reader = + ChannelReader::new(mock, Arc::new(RollupConfig::default()), PipelineMetrics::no_op()); assert!(matches!( reader.next_batch().await.unwrap_err(), PipelineErrorKind::Temporary(PipelineError::ChannelReaderEmpty) @@ -319,7 +330,8 @@ mod test { let mut first = new_compressed_batch_data(); let second = first.split_to(first.len() / 2); let mock = TestChannelReaderProvider::new(vec![Ok(Some(first)), Ok(Some(second))]); - let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); + let mut reader = + ChannelReader::new(mock, Arc::new(RollupConfig::default()), PipelineMetrics::no_op()); assert_eq!(reader.next_batch().await, Err(PipelineError::NotEnoughData.temp())); assert!(reader.next_batch.is_none()); } @@ -328,7 +340,8 @@ mod test { async fn test_next_batch_succeeds() { let raw = new_compressed_batch_data(); let mock = TestChannelReaderProvider::new(vec![Ok(Some(raw))]); - let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); + let mut reader = + 
ChannelReader::new(mock, Arc::new(RollupConfig::default()), PipelineMetrics::no_op()); let res = reader.next_batch().await.unwrap(); matches!(res, Batch::Span(_)); assert!(reader.next_batch.is_some()); @@ -357,7 +370,7 @@ mod test { let raw = new_compressed_batch_data(); let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() }); let mock = TestChannelReaderProvider::new(vec![Ok(Some(raw))]); - let mut reader = ChannelReader::new(mock, config); + let mut reader = ChannelReader::new(mock, config, PipelineMetrics::no_op()); let res = reader.next_batch().await.unwrap(); matches!(res, Batch::Span(_)); assert!(reader.next_batch.is_some()); diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index 6e103785c..1efa1c4b5 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -2,29 +2,20 @@ use crate::{ errors::{PipelineError, PipelineResult}, + metrics::PipelineMetrics, stages::NextFrameProvider, - traits::{OriginAdvancer, OriginProvider, Signal, SignalReceiver}, + traits::{ + FrameQueueMetrics, FrameQueueProvider, OriginAdvancer, OriginProvider, Signal, + SignalReceiver, + }, }; use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; -use alloy_primitives::Bytes; use async_trait::async_trait; use core::fmt::Debug; use op_alloy_genesis::RollupConfig; use op_alloy_protocol::{BlockInfo, Frame}; use tracing::{debug, error, trace}; -/// Provides data frames for the [FrameQueue] stage. -#[async_trait] -pub trait FrameQueueProvider { - /// An item that can be converted into a byte array. - type Item: Into; - - /// Retrieves the next data item from the L1 retrieval stage. - /// If there is data, it pushes it into the next stage. - /// If there is no data, it returns an error. - async fn next_data(&mut self) -> PipelineResult; -} - /// The [FrameQueue] stage of the derivation pipeline. /// This stage takes the output of the [L1Retrieval] stage and parses it into frames. /// @@ -40,6 +31,8 @@ where queue: VecDeque, /// The rollup config. rollup_config: Arc, + /// Metrics collector. + metrics: PipelineMetrics, } impl

FrameQueue<P>
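// Illustrative sketch — not part of the diff itself. The `FrameQueue` hunks below record how many
// frames are decoded, dropped, and queued via the `FrameQueueMetrics` trait added later in this
// change set. Unlike the per-event hooks on the channel stages, these hooks carry counts, so an
// implementation accumulates them with `fetch_add`. The struct name and import path are
// assumptions based on the re-exports added in this diff.
use core::sync::atomic::{AtomicU64, Ordering};

use kona_derive::traits::FrameQueueMetrics;

/// Accumulates frame-level counters for the `FrameQueue` stage.
#[derive(Debug, Default)]
pub struct CountingFrameQueueMetrics {
    frames_decoded: AtomicU64,
    frames_dropped: AtomicU64,
    frames_queued: AtomicU64,
    load_attempts: AtomicU64,
}

impl FrameQueueMetrics for CountingFrameQueueMetrics {
    fn record_frames_decoded(&self, count: usize) {
        self.frames_decoded.fetch_add(count as u64, Ordering::Relaxed);
    }

    fn record_frames_dropped(&self, count: usize) {
        self.frames_dropped.fetch_add(count as u64, Ordering::Relaxed);
    }

    fn record_frames_queued(&self, count: usize) {
        self.frames_queued.fetch_add(count as u64, Ordering::Relaxed);
    }

    fn record_load_frames_attempt(&self) {
        self.load_attempts.fetch_add(1, Ordering::Relaxed);
    }
}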

@@ -49,8 +42,8 @@ where /// Create a new [FrameQueue] stage with the given previous [L1Retrieval] stage. /// /// [L1Retrieval]: crate::stages::L1Retrieval - pub const fn new(prev: P, cfg: Arc) -> Self { - Self { prev, queue: VecDeque::new(), rollup_config: cfg } + pub const fn new(prev: P, cfg: Arc, metrics: PipelineMetrics) -> Self { + Self { prev, queue: VecDeque::new(), rollup_config: cfg, metrics } } /// Returns if holocene is active. @@ -64,6 +57,8 @@ where return; } + let initial_len = self.queue.len(); + let mut i = 0; while i < self.queue.len() - 1 { let prev_frame = &self.queue[i]; @@ -105,6 +100,8 @@ where i += 1; } + + self.metrics.record_frames_dropped(initial_len - self.queue.len()); } /// Loads more frames into the [FrameQueue]. @@ -114,6 +111,8 @@ where return Ok(()); } + self.metrics.record_load_frames_attempt(); + let data = match self.prev.next_data().await { Ok(data) => data, Err(e) => { @@ -130,6 +129,8 @@ where return Ok(()); }; + self.metrics.record_frames_decoded(frames.len()); + // Optimistically extend the queue with the new frames. self.queue.extend(frames); @@ -195,11 +196,12 @@ pub(crate) mod tests { use super::*; use crate::{test_utils::TestFrameQueueProvider, traits::ResetSignal}; use alloc::vec; + use alloy_primitives::Bytes; #[tokio::test] async fn test_frame_queue_reset() { let mock = TestFrameQueueProvider::new(vec![]); - let mut frame_queue = FrameQueue::new(mock, Default::default()); + let mut frame_queue = FrameQueue::new(mock, Default::default(), PipelineMetrics::no_op()); assert!(!frame_queue.prev.reset); frame_queue.signal(ResetSignal::default().signal()).await.unwrap(); assert_eq!(frame_queue.queue.len(), 0); @@ -211,7 +213,7 @@ pub(crate) mod tests { let data = vec![Ok(Bytes::from(vec![0x00]))]; let mut mock = TestFrameQueueProvider::new(data); mock.set_origin(BlockInfo::default()); - let mut frame_queue = FrameQueue::new(mock, Default::default()); + let mut frame_queue = FrameQueue::new(mock, Default::default(), PipelineMetrics::no_op()); assert!(!frame_queue.is_holocene_active(BlockInfo::default())); let err = frame_queue.next_frame().await.unwrap_err(); assert_eq!(err, PipelineError::NotEnoughData.temp()); @@ -222,7 +224,7 @@ pub(crate) mod tests { let data = vec![Err(PipelineError::Eof.temp()), Ok(Bytes::default())]; let mut mock = TestFrameQueueProvider::new(data); mock.set_origin(BlockInfo::default()); - let mut frame_queue = FrameQueue::new(mock, Default::default()); + let mut frame_queue = FrameQueue::new(mock, Default::default(), PipelineMetrics::no_op()); assert!(!frame_queue.is_holocene_active(BlockInfo::default())); let err = frame_queue.next_frame().await.unwrap_err(); assert_eq!(err, PipelineError::NotEnoughData.temp()); diff --git a/crates/derive/src/stages/l1_retrieval.rs b/crates/derive/src/stages/l1_retrieval.rs index 7fc7c298c..9ee7351ab 100644 --- a/crates/derive/src/stages/l1_retrieval.rs +++ b/crates/derive/src/stages/l1_retrieval.rs @@ -2,33 +2,17 @@ use crate::{ errors::{PipelineError, PipelineErrorKind, PipelineResult}, - stages::FrameQueueProvider, + metrics::PipelineMetrics, traits::{ - ActivationSignal, AsyncIterator, DataAvailabilityProvider, OriginAdvancer, OriginProvider, - ResetSignal, Signal, SignalReceiver, + ActivationSignal, AsyncIterator, DataAvailabilityProvider, FrameQueueProvider, + L1RetrievalMetrics, L1RetrievalProvider, OriginAdvancer, OriginProvider, ResetSignal, + Signal, SignalReceiver, }, }; use alloc::boxed::Box; -use alloy_primitives::Address; use async_trait::async_trait; use 
op_alloy_protocol::BlockInfo; -/// Provides L1 blocks for the [L1Retrieval] stage. -/// This is the previous stage in the pipeline. -#[async_trait] -pub trait L1RetrievalProvider { - /// Returns the next L1 [BlockInfo] in the [L1Traversal] stage, if the stage is not complete. - /// This function can only be called once while the stage is in progress, and will return - /// [`None`] on subsequent calls unless the stage is reset or complete. If the stage is - /// complete and the [BlockInfo] has been consumed, an [PipelineError::Eof] error is returned. - /// - /// [L1Traversal]: crate::stages::L1Traversal - async fn next_l1_block(&mut self) -> PipelineResult>; - - /// Returns the batcher [Address] from the [op_alloy_genesis::SystemConfig]. - fn batcher_addr(&self) -> Address; -} - /// The [L1Retrieval] stage of the derivation pipeline. /// /// For each L1 [BlockInfo] pulled from the [L1Traversal] stage, [L1Retrieval] fetches the @@ -49,6 +33,8 @@ where pub provider: DAP, /// The current data iterator. pub(crate) data: Option, + /// Metrics collector. + metrics: PipelineMetrics, } impl L1Retrieval @@ -60,8 +46,8 @@ where /// [DataAvailabilityProvider]. /// /// [L1Traversal]: crate::stages::L1Traversal - pub const fn new(prev: P, provider: DAP) -> Self { - Self { prev, provider, data: None } + pub const fn new(prev: P, provider: DAP, metrics: PipelineMetrics) -> Self { + Self { prev, provider, data: None, metrics } } } @@ -91,7 +77,9 @@ where .next_l1_block() .await? // SAFETY: This question mark bubbles up the Eof error. .ok_or(PipelineError::MissingL1Data.temp())?; + self.metrics.record_data_fetch_attempt(next.number); self.data = Some(self.provider.open_data(&next).await?); + self.metrics.record_data_fetch_success(next.number); } match self.data.as_mut().expect("Cannot be None").next().await { @@ -143,13 +131,13 @@ mod tests { test_utils::{TestDAP, TestIter}, }; use alloc::vec; - use alloy_primitives::Bytes; + use alloy_primitives::{Address, Bytes}; #[tokio::test] async fn test_l1_retrieval_flush_channel() { let traversal = new_populated_test_traversal(); let dap = TestDAP { results: vec![], batch_inbox_address: Address::default() }; - let mut retrieval = L1Retrieval::new(traversal, dap); + let mut retrieval = L1Retrieval::new(traversal, dap, PipelineMetrics::no_op()); retrieval.prev.block = None; assert!(retrieval.prev.block.is_none()); retrieval.data = None; @@ -162,7 +150,7 @@ mod tests { async fn test_l1_retrieval_activation_signal() { let traversal = new_populated_test_traversal(); let dap = TestDAP { results: vec![], batch_inbox_address: Address::default() }; - let mut retrieval = L1Retrieval::new(traversal, dap); + let mut retrieval = L1Retrieval::new(traversal, dap, PipelineMetrics::no_op()); retrieval.prev.block = None; assert!(retrieval.prev.block.is_none()); retrieval.data = None; @@ -181,7 +169,7 @@ mod tests { async fn test_l1_retrieval_reset_signal() { let traversal = new_populated_test_traversal(); let dap = TestDAP { results: vec![], batch_inbox_address: Address::default() }; - let mut retrieval = L1Retrieval::new(traversal, dap); + let mut retrieval = L1Retrieval::new(traversal, dap, PipelineMetrics::no_op()); retrieval.prev.block = None; assert!(retrieval.prev.block.is_none()); retrieval.data = None; @@ -200,7 +188,7 @@ mod tests { async fn test_l1_retrieval_origin() { let traversal = new_populated_test_traversal(); let dap = TestDAP { results: vec![], batch_inbox_address: Address::default() }; - let retrieval = L1Retrieval::new(traversal, dap); + let retrieval = 
L1Retrieval::new(traversal, dap, PipelineMetrics::no_op()); let expected = BlockInfo::default(); assert_eq!(retrieval.origin(), Some(expected)); } @@ -210,7 +198,7 @@ mod tests { let traversal = new_populated_test_traversal(); let results = vec![Err(PipelineError::Eof.temp()), Ok(Bytes::default())]; let dap = TestDAP { results, batch_inbox_address: Address::default() }; - let mut retrieval = L1Retrieval::new(traversal, dap); + let mut retrieval = L1Retrieval::new(traversal, dap, PipelineMetrics::no_op()); assert_eq!(retrieval.data, None); let data = retrieval.next_data().await.unwrap(); assert_eq!(data, Bytes::default()); @@ -236,7 +224,12 @@ mod tests { // (traversal) is called in the retrieval stage. let traversal = new_test_traversal(vec![], vec![]); let dap = TestDAP { results: vec![], batch_inbox_address: Address::default() }; - let mut retrieval = L1Retrieval { prev: traversal, provider: dap, data: Some(data) }; + let mut retrieval = L1Retrieval { + prev: traversal, + provider: dap, + data: Some(data), + metrics: PipelineMetrics::no_op(), + }; let data = retrieval.next_data().await.unwrap(); assert_eq!(data, Bytes::default()); assert!(retrieval.data.is_some()); @@ -252,7 +245,12 @@ mod tests { }; let traversal = new_populated_test_traversal(); let dap = TestDAP { results: vec![], batch_inbox_address: Address::default() }; - let mut retrieval = L1Retrieval { prev: traversal, provider: dap, data: Some(data) }; + let mut retrieval = L1Retrieval { + prev: traversal, + provider: dap, + data: Some(data), + metrics: PipelineMetrics::no_op(), + }; let data = retrieval.next_data().await.unwrap_err(); assert_eq!(data, PipelineError::Eof.temp()); assert!(retrieval.data.is_none()); diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs index f43e3cbb9..729fe4ce7 100644 --- a/crates/derive/src/stages/l1_traversal.rs +++ b/crates/derive/src/stages/l1_traversal.rs @@ -2,10 +2,10 @@ use crate::{ errors::{PipelineError, PipelineResult, ResetError}, - stages::L1RetrievalProvider, + metrics::PipelineMetrics, traits::{ - ActivationSignal, ChainProvider, OriginAdvancer, OriginProvider, ResetSignal, Signal, - SignalReceiver, + ActivationSignal, ChainProvider, L1RetrievalProvider, L1TraversalMetrics, OriginAdvancer, + OriginProvider, ResetSignal, Signal, SignalReceiver, }, }; use alloc::{boxed::Box, string::ToString, sync::Arc}; @@ -34,6 +34,8 @@ pub struct L1Traversal { pub system_config: SystemConfig, /// A reference to the rollup config. pub rollup_config: Arc, + /// Metrics collector. + metrics: PipelineMetrics, } #[async_trait] @@ -54,13 +56,14 @@ impl L1RetrievalProvider for L1Traversal { impl L1Traversal { /// Creates a new [L1Traversal] instance. - pub fn new(data_source: F, cfg: Arc) -> Self { + pub fn new(data_source: F, cfg: Arc, metrics: PipelineMetrics) -> Self { Self { block: Some(BlockInfo::default()), data_source, done: false, system_config: SystemConfig::default(), rollup_config: cfg, + metrics, } } } @@ -82,18 +85,23 @@ impl OriginAdvancer for L1Traversal { }; let next_l1_origin = match self.data_source.block_info_by_number(block.number + 1).await { Ok(block) => block, - Err(e) => return Err(PipelineError::Provider(e.to_string()).temp()), + Err(e) => { + return Err(PipelineError::Provider(e.to_string()).temp()); + } }; // Check block hashes for reorgs. 
if block.hash != next_l1_origin.parent_hash { + self.metrics.record_reorg_detected(); return Err(ResetError::ReorgDetected(block.hash, next_l1_origin.parent_hash).into()); } // Fetch receipts for the next l1 block and update the system config. let receipts = match self.data_source.receipts_by_hash(next_l1_origin.hash).await { Ok(receipts) => receipts, - Err(e) => return Err(PipelineError::Provider(e.to_string()).temp()), + Err(e) => { + return Err(PipelineError::Provider(e.to_string()).temp()); + } }; if let Err(e) = self.system_config.update_with_receipts( @@ -104,6 +112,8 @@ impl OriginAdvancer for L1Traversal { return Err(PipelineError::SystemConfigUpdate(e).crit()); } + self.metrics.record_system_config_update(); + let prev_block_holocene = self.rollup_config.is_holocene_active(block.timestamp); let next_block_holocene = self.rollup_config.is_holocene_active(next_l1_origin.timestamp); @@ -111,9 +121,12 @@ impl OriginAdvancer for L1Traversal { self.block = Some(next_l1_origin); self.done = false; + self.metrics.record_block_processed(next_l1_origin.number); + // If the prev block is not holocene, but the next is, we need to flag this // so the pipeline driver will reset the pipeline for holocene activation. if !prev_block_holocene && next_block_holocene { + self.metrics.record_holocene_activation(); return Err(ResetError::HoloceneActivation.reset()); } @@ -198,7 +211,7 @@ pub(crate) mod tests { let hash = blocks.get(i).map(|b| b.hash).unwrap_or_default(); provider.insert_receipts(hash, vec![receipt.clone()]); } - L1Traversal::new(provider, Arc::new(rollup_config)) + L1Traversal::new(provider, Arc::new(rollup_config), PipelineMetrics::no_op()) } pub(crate) fn new_populated_test_traversal() -> L1Traversal { diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs index cec0b6341..5cf71bf6e 100644 --- a/crates/derive/src/stages/mod.rs +++ b/crates/derive/src/stages/mod.rs @@ -19,10 +19,10 @@ mod l1_traversal; pub use l1_traversal::L1Traversal; mod l1_retrieval; -pub use l1_retrieval::{L1Retrieval, L1RetrievalProvider}; +pub use l1_retrieval::L1Retrieval; mod frame_queue; -pub use frame_queue::{FrameQueue, FrameQueueProvider}; +pub use frame_queue::FrameQueue; mod channel; pub use channel::{ diff --git a/crates/derive/src/test_utils/frame_queue.rs b/crates/derive/src/test_utils/frame_queue.rs index db23a6a65..26c71f77f 100644 --- a/crates/derive/src/test_utils/frame_queue.rs +++ b/crates/derive/src/test_utils/frame_queue.rs @@ -2,8 +2,7 @@ use crate::{ errors::{PipelineError, PipelineResult}, - stages::FrameQueueProvider, - traits::{OriginAdvancer, OriginProvider, Signal, SignalReceiver}, + traits::{FrameQueueProvider, OriginAdvancer, OriginProvider, Signal, SignalReceiver}, }; use alloc::{boxed::Box, vec::Vec}; use alloy_primitives::Bytes; diff --git a/crates/derive/src/test_utils/frames.rs b/crates/derive/src/test_utils/frames.rs index 9c4a8eb0a..e59142dc3 100644 --- a/crates/derive/src/test_utils/frames.rs +++ b/crates/derive/src/test_utils/frames.rs @@ -2,6 +2,7 @@ use crate::{ errors::{PipelineError, PipelineErrorKind}, + metrics::PipelineMetrics, stages::{FrameQueue, NextFrameProvider}, test_utils::TestFrameQueueProvider, traits::OriginProvider, @@ -84,7 +85,11 @@ impl FrameQueueBuilder { let config = self.config.unwrap_or_default(); let config = Arc::new(config); let err = self.expected_err.unwrap_or_else(|| PipelineError::Eof.temp()); - FrameQueueAsserter::new(FrameQueue::new(mock, config), self.expected_frames, err) + FrameQueueAsserter::new( + 
+                FrameQueue::new(mock, config, PipelineMetrics::no_op()),
+                self.expected_frames,
+                err,
+            )
     }
 }
diff --git a/crates/derive/src/test_utils/pipeline.rs b/crates/derive/src/test_utils/pipeline.rs
index 8e721a30c..03ee9ab86 100644
--- a/crates/derive/src/test_utils/pipeline.rs
+++ b/crates/derive/src/test_utils/pipeline.rs
@@ -13,6 +13,7 @@ use op_alloy_rpc_types_engine::OpAttributesWithParent;
 // Re-export these types used internally to the test pipeline.
 use crate::{
     errors::PipelineError,
+    metrics::PipelineMetrics,
     pipeline::{DerivationPipeline, PipelineBuilder, PipelineResult},
     stages::{
         AttributesQueue, BatchStream, ChannelProvider, ChannelReader, FrameQueue, L1Retrieval,
@@ -97,5 +98,6 @@ pub fn new_test_pipeline() -> TestPipeline {
         .builder(TestAttributesBuilder::default())
         .chain_provider(TestChainProvider::default())
         .l2_chain_provider(TestL2ChainProvider::default())
+        .metrics(PipelineMetrics::no_op())
         .build()
 }
diff --git a/crates/derive/src/traits/attributes.rs b/crates/derive/src/traits/attributes.rs
index 070ac724e..ba988fa4c 100644
--- a/crates/derive/src/traits/attributes.rs
+++ b/crates/derive/src/traits/attributes.rs
@@ -47,3 +47,13 @@ pub trait AttributesBuilder {
         epoch: BlockNumHash,
     ) -> PipelineResult<OpPayloadAttributes>;
 }
+
+/// Metrics trait for `AttributesQueue`.
+pub trait AttributesQueueMetrics: Send + Sync {
+    /// Records the creation of a new batch of attributes.
+    fn record_attributes_created(&self);
+    /// Records the loading of a new batch of attributes.
+    fn record_batch_loaded(&self);
+    /// Records a failure to create a new batch of attributes.
+    fn record_attributes_creation_failure(&self);
+}
diff --git a/crates/derive/src/traits/batch_queue.rs b/crates/derive/src/traits/batch_queue.rs
new file mode 100644
index 000000000..912cd7506
--- /dev/null
+++ b/crates/derive/src/traits/batch_queue.rs
@@ -0,0 +1,9 @@
+/// Metrics for a `BatchQueue` stage.
+pub trait BatchQueueMetrics: Send + Sync {
+    /// Records the number of batches queued.
+    fn record_batches_queued(&self, count: usize);
+    /// Records a batch being dropped.
+    fn record_batch_dropped(&self);
+    /// Records the epoch being advanced.
+    fn record_epoch_advanced(&self, epoch: u64);
+}
diff --git a/crates/derive/src/traits/batch_stream.rs b/crates/derive/src/traits/batch_stream.rs
new file mode 100644
index 000000000..c5dc78d34
--- /dev/null
+++ b/crates/derive/src/traits/batch_stream.rs
@@ -0,0 +1,11 @@
+/// Metrics for a `BatchStream` stage.
+pub trait BatchStreamMetrics: Send + Sync {
+    /// Records a batch processed.
+    fn record_batch_processed(&self);
+    /// Records a span batch accepted.
+    fn record_span_batch_accepted(&self);
+    /// Records a span batch dropped.
+    fn record_span_batch_dropped(&self);
+    /// Records the buffer size.
+    fn record_buffer_size(&self, size: usize);
+}
diff --git a/crates/derive/src/traits/channel_provider.rs b/crates/derive/src/traits/channel_provider.rs
new file mode 100644
index 000000000..2ab859189
--- /dev/null
+++ b/crates/derive/src/traits/channel_provider.rs
@@ -0,0 +1,7 @@
+/// Metrics trait for `ChannelProvider`.
+pub trait ChannelProviderMetrics: Send + Sync {
+    /// Records a transition of the active stage, from `from` to `to`.
+    fn record_stage_transition(&self, from: &str, to: &str);
+    /// Records a data item being provided to the next stage.
+    fn record_data_item_provided(&self);
+}
diff --git a/crates/derive/src/traits/channel_reader.rs b/crates/derive/src/traits/channel_reader.rs
new file mode 100644
index 000000000..ce9c7ea50
--- /dev/null
+++ b/crates/derive/src/traits/channel_reader.rs
@@ -0,0 +1,7 @@
+/// Metrics for a `ChannelReader`.
+pub trait ChannelReaderMetrics: Send + Sync {
+    /// Records a batch being read from the channel.
+    fn record_batch_read(&self);
+    /// Records the channel being flushed.
+    fn record_channel_flushed(&self);
+}
diff --git a/crates/derive/src/traits/frame_queue.rs b/crates/derive/src/traits/frame_queue.rs
new file mode 100644
index 000000000..f480ad4d1
--- /dev/null
+++ b/crates/derive/src/traits/frame_queue.rs
@@ -0,0 +1,28 @@
+use crate::errors::PipelineResult;
+use alloc::boxed::Box;
+use alloy_primitives::Bytes;
+use async_trait::async_trait;
+
+/// Provides data frames for the [FrameQueue] stage.
+#[async_trait]
+pub trait FrameQueueProvider {
+    /// An item that can be converted into a byte array.
+    type Item: Into<Bytes>;
+
+    /// Retrieves the next data item from the L1 retrieval stage.
+    /// If there is data, it pushes it into the next stage.
+    /// If there is no data, it returns an error.
+    async fn next_data(&mut self) -> PipelineResult<Self::Item>;
+}
+
+/// Metrics trait for `FrameQueue`.
+pub trait FrameQueueMetrics: Send + Sync {
+    /// Records the number of frames decoded.
+    fn record_frames_decoded(&self, count: usize);
+    /// Records the number of frames dropped.
+    fn record_frames_dropped(&self, count: usize);
+    /// Records the number of frames queued.
+    fn record_frames_queued(&self, count: usize);
+    /// Records an attempt to load frames.
+    fn record_load_frames_attempt(&self);
+}
diff --git a/crates/derive/src/traits/l1_retrieval.rs b/crates/derive/src/traits/l1_retrieval.rs
new file mode 100644
index 000000000..d38663454
--- /dev/null
+++ b/crates/derive/src/traits/l1_retrieval.rs
@@ -0,0 +1,33 @@
+use crate::errors::{PipelineErrorKind, PipelineResult};
+use alloc::boxed::Box;
+use alloy_primitives::Address;
+use async_trait::async_trait;
+use op_alloy_protocol::BlockInfo;
+
+/// Provides L1 blocks for the [L1Retrieval] stage.
+/// This is the previous stage in the pipeline.
+#[async_trait]
+pub trait L1RetrievalProvider {
+    /// Returns the next L1 [BlockInfo] in the [L1Traversal] stage, if the stage is not complete.
+    /// This function can only be called once while the stage is in progress, and will return
+    /// [`None`] on subsequent calls unless the stage is reset or complete. If the stage is
+    /// complete and the [BlockInfo] has been consumed, a [PipelineError::Eof] error is returned.
+    ///
+    /// [L1Traversal]: crate::stages::L1Traversal
+    async fn next_l1_block(&mut self) -> PipelineResult<Option<BlockInfo>>;
+
+    /// Returns the batcher [Address] from the [op_alloy_genesis::SystemConfig].
+    fn batcher_addr(&self) -> Address;
+}
+
+/// Metrics trait for `L1Retrieval`.
+pub trait L1RetrievalMetrics: Send + Sync {
+    /// Records the number of data fetch attempts.
+    fn record_data_fetch_attempt(&self, block_number: u64);
+    /// Records successful data fetches.
+    fn record_data_fetch_success(&self, block_number: u64);
+    /// Records failed data fetches.
+    fn record_data_fetch_failure(&self, block_number: u64, error: &PipelineErrorKind);
+    /// Records the number of blocks processed.
+    fn record_block_processed(&self, block_number: u64);
+}
diff --git a/crates/derive/src/traits/l1_traversal.rs b/crates/derive/src/traits/l1_traversal.rs
new file mode 100644
index 000000000..6f4001276
--- /dev/null
+++ b/crates/derive/src/traits/l1_traversal.rs
@@ -0,0 +1,11 @@
+/// Metrics trait for `L1Traversal`.
+pub trait L1TraversalMetrics: Send + Sync {
+    /// Records the block number of the last processed block.
+    fn record_block_processed(&self, block_number: u64);
+    /// Records system config update.
+    fn record_system_config_update(&self);
+    /// Records reorg detection.
+    fn record_reorg_detected(&self);
+    /// Records Holocene activation.
+    fn record_holocene_activation(&self);
+}
diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs
index 170d6325b..940318a48 100644
--- a/crates/derive/src/traits/mod.rs
+++ b/crates/derive/src/traits/mod.rs
@@ -2,13 +2,17 @@
 //! pipeline.
 
 mod pipeline;
-pub use pipeline::{ActivationSignal, Pipeline, ResetSignal, Signal, StepResult};
+pub use pipeline::{
+    ActivationSignal, DerivationPipelineMetrics, Pipeline, ResetSignal, Signal, StepResult,
+};
 
 mod providers;
 pub use providers::{ChainProvider, L2ChainProvider};
 
 mod attributes;
-pub use attributes::{AttributesBuilder, AttributesProvider, NextAttributes};
+pub use attributes::{
+    AttributesBuilder, AttributesProvider, AttributesQueueMetrics, NextAttributes,
+};
 
 mod data_sources;
 pub use data_sources::{AsyncIterator, BlobProvider, DataAvailabilityProvider};
@@ -18,3 +22,24 @@ pub use reset::ResetProvider;
 
 mod stages;
 pub use stages::{OriginAdvancer, OriginProvider, SignalReceiver};
+
+mod l1_traversal;
+pub use l1_traversal::L1TraversalMetrics;
+
+mod l1_retrieval;
+pub use l1_retrieval::{L1RetrievalMetrics, L1RetrievalProvider};
+
+mod frame_queue;
+pub use frame_queue::{FrameQueueMetrics, FrameQueueProvider};
+
+mod channel_provider;
+pub use channel_provider::ChannelProviderMetrics;
+
+mod channel_reader;
+pub use channel_reader::ChannelReaderMetrics;
+
+mod batch_stream;
+pub use batch_stream::BatchStreamMetrics;
+
+mod batch_queue;
+pub use batch_queue::BatchQueueMetrics;
diff --git a/crates/derive/src/traits/pipeline.rs b/crates/derive/src/traits/pipeline.rs
index 3b1f28d55..d261a59f8 100644
--- a/crates/derive/src/traits/pipeline.rs
+++ b/crates/derive/src/traits/pipeline.rs
@@ -101,6 +101,15 @@ pub trait Pipeline: OriginProvider + Iterator<Item = OpAttributesWithParent> {
     async fn step(&mut self, cursor: L2BlockInfo) -> StepResult;
 }
 
+/// Metrics trait for `DerivationPipeline`.
+pub trait DerivationPipelineMetrics {
+    /// Records the result of a step in the pipeline.
+    fn record_step_result(&self, result: &StepResult);
+
+    /// Records a signal event in the pipeline.
+    fn record_signal(&self, signal: &Signal);
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
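// Illustrative sketch — not part of the diff itself. `DerivationPipelineMetrics` is the top-level
// hook the derivation pipeline records for each step result and signal (wired in via the
// `.metrics(...)` builder call shown earlier). A minimal implementation can simply tally both.
// Note that this change set only exposes `PipelineMetrics::no_op()`; composing custom per-stage
// implementations into a `PipelineMetrics` value would need an additional constructor that is
// not shown here. The struct name and import path are assumptions.
use core::sync::atomic::{AtomicU64, Ordering};

use kona_derive::traits::{DerivationPipelineMetrics, Signal, StepResult};

/// Tallies how many times the pipeline was stepped and signalled.
#[derive(Debug, Default)]
pub struct CountingPipelineMetrics {
    steps: AtomicU64,
    signals: AtomicU64,
}

impl DerivationPipelineMetrics for CountingPipelineMetrics {
    fn record_step_result(&self, _result: &StepResult) {
        // A richer implementation could match on the `StepResult` variants here.
        self.steps.fetch_add(1, Ordering::Relaxed);
    }

    fn record_signal(&self, _signal: &Signal) {
        self.signals.fetch_add(1, Ordering::Relaxed);
    }
}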