Skip to content

test: wip #2164

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions crates/walrus-service/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1290,20 +1290,38 @@ impl StorageNode {
{
event_handle.mark_as_complete();

self.inner
.metrics
.process_certified_event_duration_milliseconds
.with_label_values(&["CheckExistanceSkip"])
.observe(start.elapsed().as_millis() as f64);

walrus_utils::with_label!(histogram_set, metrics::STATUS_SKIPPED)
.observe(start.elapsed().as_secs_f64());

return Ok(());
}

self.inner
.metrics
.process_certified_event_duration_milliseconds
.with_label_values(&["CheckExistanceIssue"])
.observe(start.elapsed().as_millis() as f64);

// Slivers and (possibly) metadata are not stored, so initiate blob sync.
self.blob_sync_handler
.start_sync(event.blob_id, event.epoch, Some(event_handle))
.start_sync(event.blob_id, event.epoch, Some(event_handle), Some(start))
.await?;

walrus_utils::with_label!(histogram_set, metrics::STATUS_QUEUED)
.observe(start.elapsed().as_secs_f64());

self.inner
.metrics
.process_certified_event_duration_milliseconds
.with_label_values(&["Queued"])
.observe(start.elapsed().as_millis() as f64);

Ok(())
}

Expand Down Expand Up @@ -3933,7 +3951,7 @@ mod tests {
cluster.nodes[0]
.storage_node
.blob_sync_handler
.start_sync(*blob.blob_id(), 1, None)
.start_sync(*blob.blob_id(), 1, None, None)
.await
.unwrap();

Expand Down
42 changes: 42 additions & 0 deletions crates/walrus-service/src/node/blob_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use rayon::prelude::*;
use tokio::{
sync::{Notify, Semaphore},
task::{JoinHandle, JoinSet},
time::Instant,
};
use tokio_metrics::TaskMonitor;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -241,15 +242,32 @@ impl BlobSyncHandler {
blob_id: BlobId,
certified_epoch: Epoch,
event_handle: Option<EventHandle>,
start_time: Option<Instant>,
) -> Result<Arc<Notify>, TypedStoreError> {
let mut in_progress = self
.blob_syncs_in_progress
.lock()
.expect("should be able to acquire lock");

if start_time.is_some() {
self.node
.metrics
.process_certified_event_duration_milliseconds
.with_label_values(&["AfterLock"])
.observe(start_time.unwrap().elapsed().as_millis() as f64);
}

let finish_notify = Arc::new(Notify::new());
match in_progress.entry(blob_id) {
Entry::Vacant(entry) => {
if start_time.is_some() {
self.node
.metrics
.process_certified_event_duration_milliseconds
.with_label_values(&["AfterEntryLookup"])
.observe(start_time.unwrap().elapsed().as_millis() as f64);
}

let spawned_trace = tracing::info_span!(
parent: &Span::current(),
"blob_sync",
Expand Down Expand Up @@ -277,6 +295,14 @@ impl BlobSyncHandler {
cancel_token.clone(),
);

if start_time.is_some() {
self.node
.metrics
.process_certified_event_duration_milliseconds
.with_label_values(&["AfterCreateTaskObj"])
.observe(start_time.unwrap().elapsed().as_millis() as f64);
}

let notify_clone = finish_notify.clone();
let blob_sync_handler_clone = self.clone();
let permits_clone = self.permits.clone();
Expand All @@ -291,10 +317,26 @@ impl BlobSyncHandler {
result
}));

if start_time.is_some() {
self.node
.metrics
.process_certified_event_duration_milliseconds
.with_label_values(&["AfterSpawnTask"])
.observe(start_time.unwrap().elapsed().as_millis() as f64);
}

entry.insert(InProgressSyncHandle {
cancel_token,
blob_sync_handle: Some(sync_handle),
});

if start_time.is_some() {
self.node
.metrics
.process_certified_event_duration_milliseconds
.with_label_values(&["AfterInsertingTask"])
.observe(start_time.unwrap().elapsed().as_millis() as f64);
}
}
Entry::Occupied(_) => {
// A blob sync with a lower sequence number is already in progress. We can safely
Expand Down
6 changes: 6 additions & 0 deletions crates/walrus-service/src/node/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ walrus_utils::metrics::define_metric_set! {

#[help = "The number of ongoing blob syncs during node recovery."]
node_recovery_ongoing_blob_syncs: IntGauge[],

#[help = "Time (in milliseconds) spent processing certified events"]
process_certified_event_duration_milliseconds: HistogramVec {
labels: ["step"],
buckets: vec![0.001, 0.01, 0.1, 1.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0, 100000.0],
},
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/walrus-service/src/node/node_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ impl NodeRecoveryHandler {
"certified blob should have an initial certified epoch set",
),
None,
None,
)
.await;
match start_sync_result {
Expand Down
Loading