
feat: PipelineMetrics common scheme, no-op version #760

Open. Wants to merge 11 commits into base: main
3 changes: 3 additions & 0 deletions .gitignore
@@ -9,3 +9,6 @@ monorepo/

# Environment Variables
.env

# JetBrains
**/.idea/
3 changes: 3 additions & 0 deletions bin/client/src/kona.rs
@@ -17,6 +17,7 @@ use kona_common_proc::client_entry;

pub(crate) mod fault;
use fault::{fpvm_handle_register, HINT_WRITER, ORACLE_READER};
use kona_derive::metrics::PipelineMetrics;

/// The size of the LRU cache in the oracle.
const ORACLE_LRU_SIZE: usize = 1024;
@@ -42,6 +43,7 @@ fn main() -> Result<()> {
let l1_provider = OracleL1ChainProvider::new(boot.clone(), oracle.clone());
let l2_provider = OracleL2ChainProvider::new(boot.clone(), oracle.clone());
let beacon = OracleBlobProvider::new(oracle.clone());
let metrics = PipelineMetrics::no_op();

////////////////////////////////////////////////////////////////
// DERIVATION & EXECUTION //
@@ -54,6 +56,7 @@ fn main() -> Result<()> {
beacon,
l1_provider,
l2_provider.clone(),
metrics,
)
.await?;

3 changes: 3 additions & 0 deletions bin/client/src/l1/driver.rs
@@ -13,6 +13,7 @@ use core::fmt::Debug;
use kona_derive::{
attributes::StatefulAttributesBuilder,
errors::{PipelineErrorKind, ResetError},
metrics::PipelineMetrics,
pipeline::{DerivationPipeline, Pipeline, PipelineBuilder, StepResult},
sources::EthereumDataSource,
stages::{
@@ -121,6 +122,7 @@ where
blob_provider: B,
mut chain_provider: OracleL1ChainProvider<O>,
mut l2_chain_provider: OracleL2ChainProvider<O>,
metrics: PipelineMetrics,
) -> Result<Self> {
let cfg = Arc::new(boot_info.rollup_config.clone());

@@ -158,6 +160,7 @@
.chain_provider(chain_provider)
.builder(attributes)
.origin(l1_origin)
.metrics(metrics)
.build();

Ok(Self { l2_safe_head, l2_safe_head_header, pipeline })
9 changes: 5 additions & 4 deletions crates/derive-alloy/src/pipeline.rs
@@ -1,7 +1,11 @@
//! Helper to construct a [DerivationPipeline] using online types.

use crate::{
AlloyChainProvider, AlloyL2ChainProvider, OnlineBeaconClient, OnlineBlobProviderWithFallback,
};
use kona_derive::{
attributes::StatefulAttributesBuilder,
metrics::PipelineMetrics,
pipeline::{DerivationPipeline, PipelineBuilder},
sources::EthereumDataSource,
stages::{
@@ -13,10 +17,6 @@ use op_alloy_genesis::RollupConfig;
use op_alloy_protocol::BlockInfo;
use std::sync::Arc;

use crate::{
AlloyChainProvider, AlloyL2ChainProvider, OnlineBeaconClient, OnlineBlobProviderWithFallback,
};

/// An online derivation pipeline.
pub type OnlinePipeline =
DerivationPipeline<OnlineAttributesQueue<OnlineDataProvider>, AlloyL2ChainProvider>;
@@ -63,6 +63,7 @@ pub fn new_online_pipeline(
.chain_provider(chain_provider)
.builder(builder)
.origin(origin)
.metrics(PipelineMetrics::no_op())
.build()
}

1 change: 1 addition & 0 deletions crates/derive/src/lib.rs
@@ -22,6 +22,7 @@ pub mod prelude {

pub mod attributes;
pub mod errors;
pub mod metrics;
pub mod pipeline;
pub mod sources;
pub mod stages;
165 changes: 165 additions & 0 deletions crates/derive/src/metrics/mod.rs
@@ -0,0 +1,165 @@
//! Metrics for the derivation pipeline.

mod noop;

use crate::{
errors::PipelineErrorKind,
pipeline::Signal,
traits::{
AttributesQueueMetrics, BatchQueueMetrics, BatchStreamMetrics, ChannelProviderMetrics,
ChannelReaderMetrics, DerivationPipelineMetrics, FrameQueueMetrics, L1RetrievalMetrics,
L1TraversalMetrics, StepResult,
},
};
use alloc::sync::Arc;
use core::fmt::Debug;

/// Composite metrics struct containing metrics for all stages.
#[derive(Clone)]
pub struct PipelineMetrics {
pub(crate) derivation_pipeline_metrics: Arc<dyn DerivationPipelineMetrics + Send + Sync>,
Collaborator:

I think we can remove the Arc here - it can be placed around the PipelineMetrics.

If we do this, we can then make a PipelineMetrics::NOOP const. We should use a generic here bound to DerivationPipelineMetrics + Send + Sync.

e.g.

```rust
impl<M> PipelineMetrics<M> where M: DerivationPipelineMetrics + Send + Sync {
    /// A no-op implementation for derivation `PipelineMetrics`.
    pub const NOOP: Self<M> = Self {
        inner: NoopDerivationPipelineMetrics,
    };
}
```

Contributor Author (@steph-rs), Oct 30, 2024:

In this case we have to use generics for each stage like

```rust
impl<M, AM, ...> PipelineMetrics<M, AM, ...>
where
    M: DerivationPipelineMetrics + Send + Sync,
    AM: AttributesQueueMetrics + Send + Sync,
    ...
```

I was trying to avoid this.

Contributor Author:

I'd love to do that btw, but how should other metrics be implemented?

Collaborator:

I see what you're saying... Let me think about this a little bit and get back to you. What you're currently proposing is that the various stage metrics live inside PipelineMetrics, each stage holds a PipelineMetrics, and PipelineMetrics implements whatever metrics trait a stage requires by delegating to the internal field for that stage. Is that right?

e.g.

```rust
struct AttributesQueue {
    metrics: PipelineMetrics,
    ...
}
PipelineMetrics {
    attributes_queue_metrics: Arc<dyn AttributesQueueMetrics + Send + Sync>,
    ...
}

impl AttributesQueueMetrics for PipelineMetrics {
   fn some_metrics_method() {
      self.attributes_queue_metrics.some_metrics_method();
   }
}
```

Contributor Author:

Yeah, exactly, but I think there's a typo in the first block:

```rust
pub struct AttributesQueue<P, AB, M>
where
    M: AttributesQueueMetrics + Send + Sync,
{
    ...
    metrics: M,
}
```

and then we could use `self.metrics.some_metrics_method()`.

I'm not sure about this idea, but it would let us easily add other versions, such as no-op or Prometheus. I thought this was the problem we wanted to solve.

Does it make sense?

Collaborator:

Ok, I thought your intention in placing the various metric types inside PipelineMetrics was to be able to clone PipelineMetrics (doing Arc::clone for each internal Arc<dyn ..> field) and then pass it as a concrete type to each stage. That way you avoid another generic on each stage, and instead just import the trait that is implemented on the PipelineMetrics type.

This is why, in my first block above, I specified PipelineMetrics explicitly. And yes, like you said, the problem to solve is having different metrics: since PipelineMetrics itself has dynamic fields, you can swap in Prometheus, no-op, or any other individual metrics easily, without any API changes.

Let me know if this makes sense. Happy to open a draft PR into this branch to show you a diff of what I'm getting at here.

Contributor Author:

Hey @refcell!
Could you please check the fixes I just pushed and the way I work with AttributesQueueMetrics? If it's okay with you, I'll add the other no-op metrics.
But since I don't really see the whole picture of what we actually need in metrics, I'll ask you to define the exact methods we want. It would be much appreciated.

pub(crate) l1_traversal_metrics: Arc<dyn L1TraversalMetrics + Send + Sync>,
pub(crate) l1_retrieval_metrics: Arc<dyn L1RetrievalMetrics + Send + Sync>,
pub(crate) frame_queue_metrics: Arc<dyn FrameQueueMetrics + Send + Sync>,
pub(crate) channel_provider_metrics: Arc<dyn ChannelProviderMetrics + Send + Sync>,
pub(crate) channel_reader_metrics: Arc<dyn ChannelReaderMetrics + Send + Sync>,
pub(crate) batch_stream_metrics: Arc<dyn BatchStreamMetrics + Send + Sync>,
pub(crate) batch_queue_metrics: Arc<dyn BatchQueueMetrics + Send + Sync>,
pub(crate) attributes_queue_metrics: Arc<dyn AttributesQueueMetrics + Send + Sync>,
}

impl Debug for PipelineMetrics {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("PipelineMetrics").finish()
}
}

impl DerivationPipelineMetrics for PipelineMetrics {
fn record_step_result(&self, result: &StepResult) {
self.derivation_pipeline_metrics.record_step_result(result)
}

fn record_signal(&self, signal: &Signal) {
self.derivation_pipeline_metrics.record_signal(signal)
}
}

impl L1TraversalMetrics for PipelineMetrics {
fn record_block_processed(&self, block_number: u64) {
self.l1_traversal_metrics.record_block_processed(block_number)
}

fn record_system_config_update(&self) {
self.l1_traversal_metrics.record_system_config_update()
}

fn record_reorg_detected(&self) {
self.l1_traversal_metrics.record_reorg_detected()
}

fn record_holocene_activation(&self) {
self.l1_traversal_metrics.record_holocene_activation()
}
}

impl L1RetrievalMetrics for PipelineMetrics {
fn record_data_fetch_attempt(&self, block_number: u64) {
self.l1_retrieval_metrics.record_data_fetch_attempt(block_number)
}

fn record_data_fetch_success(&self, block_number: u64) {
self.l1_retrieval_metrics.record_data_fetch_success(block_number)
}

fn record_data_fetch_failure(&self, block_number: u64, error: &PipelineErrorKind) {
self.l1_retrieval_metrics.record_data_fetch_failure(block_number, error)
}

fn record_block_processed(&self, block_number: u64) {
self.l1_retrieval_metrics.record_block_processed(block_number)
}
}

impl FrameQueueMetrics for PipelineMetrics {
fn record_frames_decoded(&self, count: usize) {
self.frame_queue_metrics.record_frames_decoded(count)
}

fn record_frames_dropped(&self, count: usize) {
self.frame_queue_metrics.record_frames_dropped(count)
}

fn record_frames_queued(&self, count: usize) {
self.frame_queue_metrics.record_frames_queued(count)
}

fn record_load_frames_attempt(&self) {
self.frame_queue_metrics.record_load_frames_attempt()
}
}

impl ChannelProviderMetrics for PipelineMetrics {
fn record_stage_transition(&self, from: &str, to: &str) {
self.channel_provider_metrics.record_stage_transition(from, to)
}

fn record_data_item_provided(&self) {
self.channel_provider_metrics.record_data_item_provided()
}
}

impl ChannelReaderMetrics for PipelineMetrics {
fn record_batch_read(&self) {
self.channel_reader_metrics.record_batch_read()
}

fn record_channel_flushed(&self) {
self.channel_reader_metrics.record_channel_flushed()
}
}

impl BatchStreamMetrics for PipelineMetrics {
fn record_batch_processed(&self) {
self.batch_stream_metrics.record_batch_processed()
}

fn record_span_batch_accepted(&self) {
self.batch_stream_metrics.record_span_batch_accepted()
}

fn record_span_batch_dropped(&self) {
self.batch_stream_metrics.record_span_batch_dropped()
}

fn record_buffer_size(&self, size: usize) {
self.batch_stream_metrics.record_buffer_size(size)
}
}

impl BatchQueueMetrics for PipelineMetrics {
fn record_batches_queued(&self, count: usize) {
self.batch_queue_metrics.record_batches_queued(count)
}

fn record_batch_dropped(&self) {
self.batch_queue_metrics.record_batch_dropped()
}

fn record_epoch_advanced(&self, epoch: u64) {
self.batch_queue_metrics.record_epoch_advanced(epoch)
}
}

impl AttributesQueueMetrics for PipelineMetrics {
fn record_attributes_created(&self) {
self.attributes_queue_metrics.record_attributes_created()
}

fn record_batch_loaded(&self) {
self.attributes_queue_metrics.record_batch_loaded()
}

fn record_attributes_creation_failure(&self) {
self.attributes_queue_metrics.record_attributes_creation_failure()
}
}