diff --git a/crates/walrus-service/src/node.rs b/crates/walrus-service/src/node.rs index a07ebbe42..d9482b7f0 100644 --- a/crates/walrus-service/src/node.rs +++ b/crates/walrus-service/src/node.rs @@ -1290,20 +1290,38 @@ impl StorageNode { { event_handle.mark_as_complete(); + self.inner + .metrics + .process_certified_event_duration_milliseconds + .with_label_values(&["CheckExistanceSkip"]) + .observe(start.elapsed().as_millis() as f64); + walrus_utils::with_label!(histogram_set, metrics::STATUS_SKIPPED) .observe(start.elapsed().as_secs_f64()); return Ok(()); } + self.inner + .metrics + .process_certified_event_duration_milliseconds + .with_label_values(&["CheckExistanceIssue"]) + .observe(start.elapsed().as_millis() as f64); + // Slivers and (possibly) metadata are not stored, so initiate blob sync. self.blob_sync_handler - .start_sync(event.blob_id, event.epoch, Some(event_handle)) + .start_sync(event.blob_id, event.epoch, Some(event_handle), Some(start)) .await?; walrus_utils::with_label!(histogram_set, metrics::STATUS_QUEUED) .observe(start.elapsed().as_secs_f64()); + self.inner + .metrics + .process_certified_event_duration_milliseconds + .with_label_values(&["Queued"]) + .observe(start.elapsed().as_millis() as f64); + Ok(()) } @@ -3933,7 +3951,7 @@ mod tests { cluster.nodes[0] .storage_node .blob_sync_handler - .start_sync(*blob.blob_id(), 1, None) + .start_sync(*blob.blob_id(), 1, None, None) .await .unwrap(); diff --git a/crates/walrus-service/src/node/blob_sync.rs b/crates/walrus-service/src/node/blob_sync.rs index b0dd30ab5..6350e0efe 100644 --- a/crates/walrus-service/src/node/blob_sync.rs +++ b/crates/walrus-service/src/node/blob_sync.rs @@ -20,6 +20,7 @@ use rayon::prelude::*; use tokio::{ sync::{Notify, Semaphore}, task::{JoinHandle, JoinSet}, + time::Instant, }; use tokio_metrics::TaskMonitor; use tokio_util::sync::CancellationToken; @@ -241,15 +242,32 @@ impl BlobSyncHandler { blob_id: BlobId, certified_epoch: Epoch, event_handle: Option, + start_time: Option, ) -> Result, TypedStoreError> { let mut in_progress = self .blob_syncs_in_progress .lock() .expect("should be able to acquire lock"); + if start_time.is_some() { + self.node + .metrics + .process_certified_event_duration_milliseconds + .with_label_values(&["AfterLock"]) + .observe(start_time.unwrap().elapsed().as_millis() as f64); + } + let finish_notify = Arc::new(Notify::new()); match in_progress.entry(blob_id) { Entry::Vacant(entry) => { + if start_time.is_some() { + self.node + .metrics + .process_certified_event_duration_milliseconds + .with_label_values(&["AfterEntryLookup"]) + .observe(start_time.unwrap().elapsed().as_millis() as f64); + } + let spawned_trace = tracing::info_span!( parent: &Span::current(), "blob_sync", @@ -277,6 +295,14 @@ impl BlobSyncHandler { cancel_token.clone(), ); + if start_time.is_some() { + self.node + .metrics + .process_certified_event_duration_milliseconds + .with_label_values(&["AfterCreateTaskObj"]) + .observe(start_time.unwrap().elapsed().as_millis() as f64); + } + let notify_clone = finish_notify.clone(); let blob_sync_handler_clone = self.clone(); let permits_clone = self.permits.clone(); @@ -291,10 +317,26 @@ impl BlobSyncHandler { result })); + if start_time.is_some() { + self.node + .metrics + .process_certified_event_duration_milliseconds + .with_label_values(&["AfterSpawnTask"]) + .observe(start_time.unwrap().elapsed().as_millis() as f64); + } + entry.insert(InProgressSyncHandle { cancel_token, blob_sync_handle: Some(sync_handle), }); + + if start_time.is_some() { + self.node + .metrics + .process_certified_event_duration_milliseconds + .with_label_values(&["AfterInsertingTask"]) + .observe(start_time.unwrap().elapsed().as_millis() as f64); + } } Entry::Occupied(_) => { // A blob sync with a lower sequence number is already in progress. We can safely diff --git a/crates/walrus-service/src/node/metrics.rs b/crates/walrus-service/src/node/metrics.rs index 5b317b23a..6c77a5f8e 100644 --- a/crates/walrus-service/src/node/metrics.rs +++ b/crates/walrus-service/src/node/metrics.rs @@ -169,6 +169,12 @@ walrus_utils::metrics::define_metric_set! { #[help = "The number of ongoing blob syncs during node recovery."] node_recovery_ongoing_blob_syncs: IntGauge[], + + #[help = "Time (in milliseconds) spent processing certified events"] + process_certified_event_duration_milliseconds: HistogramVec { + labels: ["step"], + buckets: vec![0.001, 0.01, 0.1, 1.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0, 100000.0], + }, } } diff --git a/crates/walrus-service/src/node/node_recovery.rs b/crates/walrus-service/src/node/node_recovery.rs index a6a77e3de..4cb132773 100644 --- a/crates/walrus-service/src/node/node_recovery.rs +++ b/crates/walrus-service/src/node/node_recovery.rs @@ -148,6 +148,7 @@ impl NodeRecoveryHandler { "certified blob should have an initial certified epoch set", ), None, + None, ) .await; match start_sync_result {