Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: PipelineMetrics common scheme, no-op version #760

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
3 changes: 3 additions & 0 deletions bin/client/src/kona.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use kona_common_proc::client_entry;

pub(crate) mod fault;
use fault::{fpvm_handle_register, HINT_WRITER, ORACLE_READER};
use kona_derive::metrics::PipelineMetrics;

/// The size of the LRU cache in the oracle.
const ORACLE_LRU_SIZE: usize = 1024;
Expand All @@ -42,6 +43,7 @@ fn main() -> Result<()> {
let l1_provider = OracleL1ChainProvider::new(boot.clone(), oracle.clone());
let l2_provider = OracleL2ChainProvider::new(boot.clone(), oracle.clone());
let beacon = OracleBlobProvider::new(oracle.clone());
let metrics = PipelineMetrics::no_op();

////////////////////////////////////////////////////////////////
// DERIVATION & EXECUTION //
Expand All @@ -54,6 +56,7 @@ fn main() -> Result<()> {
beacon,
l1_provider,
l2_provider.clone(),
metrics,
)
.await?;

Expand Down
4 changes: 4 additions & 0 deletions bin/client/src/l1/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use core::fmt::Debug;
use kona_derive::{
attributes::StatefulAttributesBuilder,
errors::{PipelineErrorKind, ResetError},
metrics::PipelineMetrics,
pipeline::{DerivationPipeline, Pipeline, PipelineBuilder, StepResult},
sources::EthereumDataSource,
stages::{
Expand All @@ -37,6 +38,7 @@ use tracing::{error, info, warn};
pub type OraclePipeline<O, B> = DerivationPipeline<
OracleAttributesQueue<OracleDataProvider<O, B>, O>,
OracleL2ChainProvider<O>,
PipelineMetrics,
>;

/// An oracle-backed Ethereum data source.
Expand Down Expand Up @@ -121,6 +123,7 @@ where
blob_provider: B,
mut chain_provider: OracleL1ChainProvider<O>,
mut l2_chain_provider: OracleL2ChainProvider<O>,
metrics: PipelineMetrics,
) -> Result<Self> {
let cfg = Arc::new(boot_info.rollup_config.clone());

Expand Down Expand Up @@ -158,6 +161,7 @@ where
.chain_provider(chain_provider)
.builder(attributes)
.origin(l1_origin)
.metrics(metrics)
.build();

Ok(Self { l2_safe_head, l2_safe_head_header, pipeline })
Expand Down
16 changes: 10 additions & 6 deletions crates/derive-alloy/src/pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
//! Helper to construct a [DerivationPipeline] using online types.

use crate::{
AlloyChainProvider, AlloyL2ChainProvider, OnlineBeaconClient, OnlineBlobProviderWithFallback,
};
use kona_derive::{
attributes::StatefulAttributesBuilder,
metrics::PipelineMetrics,
pipeline::{DerivationPipeline, PipelineBuilder},
sources::EthereumDataSource,
stages::{
Expand All @@ -13,13 +17,12 @@ use op_alloy_genesis::RollupConfig;
use op_alloy_protocol::BlockInfo;
use std::sync::Arc;

use crate::{
AlloyChainProvider, AlloyL2ChainProvider, OnlineBeaconClient, OnlineBlobProviderWithFallback,
};

/// An online derivation pipeline.
pub type OnlinePipeline =
DerivationPipeline<OnlineAttributesQueue<OnlineDataProvider>, AlloyL2ChainProvider>;
pub type OnlinePipeline = DerivationPipeline<
OnlineAttributesQueue<OnlineDataProvider>,
AlloyL2ChainProvider,
PipelineMetrics,
>;

/// An `online` Ethereum data source.
pub type OnlineDataProvider = EthereumDataSource<
Expand Down Expand Up @@ -63,6 +66,7 @@ pub fn new_online_pipeline(
.chain_provider(chain_provider)
.builder(builder)
.origin(origin)
.metrics(PipelineMetrics::no_op())
.build()
}

Expand Down
6 changes: 6 additions & 0 deletions crates/derive/.idea/.gitignore
refcell marked this conversation as resolved.
Show resolved Hide resolved

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions crates/derive/.idea/derive.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions crates/derive/.idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions crates/derive/.idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod prelude {

pub mod attributes;
pub mod errors;
pub mod metrics;
pub mod pipeline;
pub mod sources;
pub mod stages;
Expand Down
34 changes: 34 additions & 0 deletions crates/derive/src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//! Metrics for the derivation pipeline.

mod noop;

use alloc::sync::Arc;
use core::fmt::Debug;

use crate::traits::{DerivationPipelineMetrics, StepResult};

/// Composite metrics struct containing metrics for all stages.
pub struct PipelineMetrics {
pub(crate) derivation_pipeline_metrics: Arc<dyn DerivationPipelineMetrics + Send + Sync>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove the Arc here - it can be placed around the PipelineMetrics.

If we do this, we can then make a PipelineMetrics::NOOP const. We should use a generic here bound to DerivationPipelineMetrics + Send + Sync.

e.g.

impl<M> PipelineMetrics<M> where M: DerivationPipelineMetrics + Send + Sync {
    /// A no-op implementation for derivation `PipelineMetrics`.
    pub const NOOP: Self<M> = Self { 
        inner: NoopDerivationPipelineMetrics,
    };
}

Copy link
Contributor Author

@steph-rs steph-rs Oct 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case we have to use generics for each stage like

impl<M, AM, ...> PipelineMetrics<M, AM, ...>
where
M: DerivationPipelineMetrics + Send + Sync,
AM: AttributesQueueMetrics + Send + Sync,
...

I was trying to avoid this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd love to do that btw, but how should other metrics be implemented?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you're saying... Let me think about this a little bit and get back to you. What you're currently proposing is to place the various stage metrics inside PipelineMetrics and then each stage holds a PipelineMetrics which implements any metrics trait required through the internal field for that stage? Is that right?

e.g.

struct AttributesQueue {
    metrics: PipelineMetrics,
    ...
}
PipelineMetrics {
    attributes_queue_metrics: Arc<dyn AttributesQueueMetrics + Send + Sync>,
    ...
}

impl AttributesQueueMetrics for PipelineMetrics {
   fn some_metrics_method() {
      self.attributes_queue_metrics.some_metrics_method();
   }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, exactly
but I think there's a typo in first block:

pub struct AttributesQueue<P, AB, M>
where
    M: AttributesQueueMetrics + Send + Sync,
{
    ...
    metrics: M,
}

and then we could use self.metrics.some_metrics_method()

I'm not sure about this idea, but it can help us easily add other versions as no-op, or prometheus. I thought this was a problem we wanted to solve.

Does it make sense?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I thought you're intention of placing the various metric types inside PipelineMetrics was to be able to clone PipelineMetrics (doing Arc::clone for each internal Arc<dyn ..> field) and then passing it as a concrete type to each stage. This way, you could just avoid having another generic on each stage, and instead just import the trait that is implemented on the PipelineMetrics type.

This is why in my first block above I specific PipelineMetrics explicitly. And yes like you said - the problem to solve is having different metrics. since PipelineMetrics itself has dynamic fields, you can swap in a prometheus, or no-op, or whatever individual metrics easily, without any api changes.

Let me know if this makes sense. Happy to open a draft PR into this branch to show you a diff of what I'm getting at here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @refcell!
Could you please check the fixes I just pushed and the way I work with AttributesQueueMetrics. If it's okay with you, I'll add other no-op metrics.
But since I don't really see the whole picture of what we actually need in metrics, I will ask you to define the exact methods we want. It will be much appreciated.

// todo: add more metrics here for each stage
refcell marked this conversation as resolved.
Show resolved Hide resolved
}

impl Debug for PipelineMetrics {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("PipelineMetrics").finish()
}

Check warning on line 19 in crates/derive/src/metrics/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/derive/src/metrics/mod.rs#L17-L19

Added lines #L17 - L19 were not covered by tests
}

impl DerivationPipelineMetrics for PipelineMetrics {
fn record_step_result(&self, result: &StepResult) {
self.derivation_pipeline_metrics.record_step_result(result)
}

fn inc_reset_signals(&self) {
self.derivation_pipeline_metrics.inc_reset_signals()
}

fn inc_flush_channel_signals(&self) {
self.derivation_pipeline_metrics.inc_flush_channel_signals()
}
}
28 changes: 28 additions & 0 deletions crates/derive/src/metrics/noop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use crate::{metrics::PipelineMetrics, pipeline::StepResult, traits::DerivationPipelineMetrics};
use alloc::sync::Arc;

impl PipelineMetrics {
/// No-op implementation for `PipelineMetrics`.
pub fn no_op() -> Self {
Self {
derivation_pipeline_metrics: Arc::new(NoopDerivationPipelineMetrics),
// todo: add more metrics here for each stage
refcell marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

/// No-op implementation of `DerivationPipelineMetrics`.
#[derive(Debug)]
struct NoopDerivationPipelineMetrics;

impl DerivationPipelineMetrics for NoopDerivationPipelineMetrics {
fn record_step_result(&self, _result: &StepResult) {
// No-op
}
fn inc_reset_signals(&self) {
// No-op
}
fn inc_flush_channel_signals(&self) {
// No-op
}
}
17 changes: 14 additions & 3 deletions crates/derive/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use super::{AttributesBuilder, DataAvailabilityProvider, DerivationPipeline};
use crate::{
metrics::PipelineMetrics,
stages::{
AttributesQueue, BatchProvider, BatchStream, ChannelProvider, ChannelReader, FrameQueue,
L1Retrieval, L1Traversal,
Expand Down Expand Up @@ -37,6 +38,7 @@ where
builder: Option<B>,
origin: Option<BlockInfo>,
rollup_config: Option<Arc<RollupConfig>>,
metrics: Option<PipelineMetrics>,
}

impl<B, P, T, D> Default for PipelineBuilder<B, P, T, D>
Expand All @@ -54,6 +56,7 @@ where
builder: None,
origin: None,
rollup_config: None,
metrics: None,
}
}
}
Expand Down Expand Up @@ -106,14 +109,20 @@ where
self
}

/// Sets the metrics implementation for the pipeline.
pub fn metrics(mut self, metrics: PipelineMetrics) -> Self {
self.metrics = Some(metrics);
self
}

/// Builds the pipeline.
pub fn build(self) -> DerivationPipeline<AttributesQueueStage<D, P, T, B>, T> {
pub fn build(self) -> DerivationPipeline<AttributesQueueStage<D, P, T, B>, T, PipelineMetrics> {
self.into()
}
}

impl<B, P, T, D> From<PipelineBuilder<B, P, T, D>>
for DerivationPipeline<AttributesQueueStage<D, P, T, B>, T>
for DerivationPipeline<AttributesQueueStage<D, P, T, B>, T, PipelineMetrics>
where
B: AttributesBuilder + Send + Debug,
P: ChainProvider + Send + Sync + Debug,
Expand All @@ -127,7 +136,9 @@ where
let l2_chain_provider = builder.l2_chain_provider.expect("chain_provider must be set");
let dap_source = builder.dap_source.expect("dap_source must be set");
let attributes_builder = builder.builder.expect("builder must be set");
let metrics = builder.metrics.expect("metrics must be set");

// todo: add metrics to the stages
refcell marked this conversation as resolved.
Show resolved Hide resolved
// Compose the stage stack.
let mut l1_traversal = L1Traversal::new(chain_provider, Arc::clone(&rollup_config));
l1_traversal.block = Some(builder.origin.expect("origin must be set"));
Expand All @@ -143,6 +154,6 @@ where
AttributesQueue::new(rollup_config.clone(), batch_provider, attributes_builder);

// Create the pipeline.
Self::new(attributes, rollup_config, l2_chain_provider)
Self::new(attributes, rollup_config, l2_chain_provider, metrics)
}
}
Loading
Loading