diff --git a/Cargo.lock b/Cargo.lock index dddd2d5..4d87f23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -521,9 +521,10 @@ dependencies = [ "hyper 1.3.1", "k8s-openapi", "kube", + "lasso", + "measured", "opentelemetry", "opentelemetry-otlp", - "prometheus", "schemars", "serde", "serde_json", @@ -648,6 +649,19 @@ dependencies = [ "syn 2.0.60", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "deranged" version = "0.3.11" @@ -905,6 +919,15 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" +dependencies = [ + "ahash", +] + [[package]] name = "hashbrown" version = "0.14.5" @@ -915,6 +938,12 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.3.9" @@ -1359,6 +1388,16 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4345964bb142484797b161f473a503a434de77149dd8c7427788c6e13379388" +[[package]] +name = "lasso" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4644821e1c3d7a560fe13d842d13f587c07348a1a05d3a797152d41c90c56df2" +dependencies = [ + "dashmap", + "hashbrown 0.13.2", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -1419,6 +1458,36 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "measured" +version = "0.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "652bc741286361c06de8cb4d89b21a6437f120c508c51713663589eeb9928ac5" +dependencies = [ + "bytes", + "crossbeam-utils", + "hashbrown 0.14.5", + "itoa", + "lasso", + "measured-derive", + "memchr", + "parking_lot", + "rustc-hash", + "ryu", +] + +[[package]] +name = "measured-derive" +version = "0.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ea497f33e1e856a376c32ad916f69a0bd3c597db1f912a399f842b01a4a685d" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.60", +] + [[package]] name = "memchr" version = "2.7.2" @@ -1776,21 +1845,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "prometheus" -version = "0.13.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c" -dependencies = [ - "cfg-if", - "fnv", - "lazy_static", - "memchr", - "parking_lot", - "protobuf", - "thiserror", -] - [[package]] name = "prost" version = "0.11.9" @@ -1814,12 +1868,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "protobuf" -version = "2.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" - [[package]] name = "quote" version = "1.0.36" @@ -1933,6 +1981,12 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc_version" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 8b6676c..3281ad1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,6 @@ schemars = { version = "0.8.12", features = ["chrono"] } serde = { version = "1.0.185", features = ["derive"] } serde_json = "1.0.105" serde_yaml = "0.9.25" -prometheus = "0.13.3" chrono = { version = "0.4.26", features = ["serde"] } tracing = "0.1.37" tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter"] } @@ -44,6 +43,8 @@ opentelemetry-otlp = { version = "0.13.0", features = ["tokio"], optional = true tonic = { version = "0.9", optional = true } thiserror = "1.0.47" anyhow = "1.0.75" +measured = { version = "0.0.21", features = ["lasso"] } +lasso = "0.7.2" [dev-dependencies] assert-json-diff = "2.0.2" diff --git a/src/controller.rs b/src/controller.rs index 88e8972..c021faf 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -53,14 +53,14 @@ pub struct Context { /// Diagnostics read by the web server pub diagnostics: Arc>, /// Prometheus metrics - pub metrics: Metrics, + pub metrics: Arc, } #[instrument(skip(ctx, doc), fields(trace_id))] async fn reconcile(doc: Arc, ctx: Arc) -> Result { let trace_id = telemetry::get_trace_id(); Span::current().record("trace_id", &field::display(&trace_id)); - let _timer = ctx.metrics.count_and_measure(); + let _timer = ctx.metrics.app.reconcile.count_and_measure(); ctx.diagnostics.write().await.last_event = Utc::now(); let ns = doc.namespace().unwrap(); // doc is namespace scoped let docs: Api = Api::namespaced(ctx.client.clone(), &ns); @@ -78,7 +78,7 @@ async fn reconcile(doc: Arc, ctx: Arc) -> Result { fn error_policy(doc: Arc, error: &Error, ctx: Arc) -> Action { warn!("reconcile failed: {:?}", error); - ctx.metrics.reconcile_failure(&doc, error); + ctx.metrics.app.reconcile.set_failure(&doc, error); Action::requeue(Duration::from_secs(5 * 60)) } @@ -171,15 +171,15 @@ impl Diagnostics { pub struct State { /// Diagnostics populated by the reconciler diagnostics: Arc>, - /// Metrics registry - registry: prometheus::Registry, + /// Metrics and encoder + metrics: Arc, } /// State wrapper around the controller outputs for the web server impl State { /// Metrics getter - pub fn metrics(&self) -> Vec { - self.registry.gather() + pub fn metrics(&self) -> Arc { + self.metrics.clone() } /// State getter @@ -191,7 +191,7 @@ impl State { pub fn to_context(&self, client: Client) -> Arc { Arc::new(Context { client, - metrics: Metrics::default().register(&self.registry).unwrap(), + metrics: self.metrics.clone(), diagnostics: self.diagnostics.clone(), }) } @@ -223,7 +223,7 @@ mod test { #[tokio::test] async fn documents_without_finalizer_gets_a_finalizer() { - let (testctx, fakeserver, _) = Context::test(); + let (testctx, fakeserver) = Context::test(); let doc = Document::test(); let mocksrv = fakeserver.run(Scenario::FinalizerCreation(doc.clone())); reconcile(Arc::new(doc), testctx).await.expect("reconciler"); @@ -232,7 +232,7 @@ mod test { #[tokio::test] async fn finalized_doc_causes_status_patch() { - let (testctx, fakeserver, _) = Context::test(); + let (testctx, fakeserver) = Context::test(); let doc = Document::test().finalized(); let mocksrv = fakeserver.run(Scenario::StatusPatch(doc.clone())); reconcile(Arc::new(doc), testctx).await.expect("reconciler"); @@ -241,7 +241,7 @@ mod test { #[tokio::test] async fn finalized_doc_with_hide_causes_event_and_hide_patch() { - let (testctx, fakeserver, _) = Context::test(); + let (testctx, fakeserver) = Context::test(); let doc = Document::test().finalized().needs_hide(); let scenario = Scenario::EventPublishThenStatusPatch("HideRequested".into(), doc.clone()); let mocksrv = fakeserver.run(scenario); @@ -251,7 +251,7 @@ mod test { #[tokio::test] async fn finalized_doc_with_delete_timestamp_causes_delete() { - let (testctx, fakeserver, _) = Context::test(); + let (testctx, fakeserver) = Context::test(); let doc = Document::test().finalized().needs_delete(); let mocksrv = fakeserver.run(Scenario::Cleanup("DeleteRequested".into(), doc.clone())); reconcile(Arc::new(doc), testctx).await.expect("reconciler"); @@ -260,7 +260,7 @@ mod test { #[tokio::test] async fn illegal_doc_reconcile_errors_which_bumps_failure_metric() { - let (testctx, fakeserver, _registry) = Context::test(); + let (testctx, fakeserver) = Context::test(); let doc = Arc::new(Document::illegal().finalized()); let mocksrv = fakeserver.run(Scenario::RadioSilence); let res = reconcile(doc.clone(), testctx.clone()).await; @@ -270,12 +270,8 @@ mod test { assert!(err.to_string().contains("IllegalDocument")); // calling error policy with the reconciler error should cause the correct metric to be set error_policy(doc.clone(), &err, testctx.clone()); - //dbg!("actual metrics: {}", registry.gather()); - let failures = testctx - .metrics - .failures - .with_label_values(&["illegal", "finalizererror(applyfailed(illegaldocument))"]) - .get(); + let metrics = &testctx.metrics.app.reconcile; + let failures = metrics.get_failures("illegal", "finalizererror(applyfailed(illegaldocument))"); assert_eq!(failures, 1); } diff --git a/src/fixtures.rs b/src/fixtures.rs index afe3630..9e99085 100644 --- a/src/fixtures.rs +++ b/src/fixtures.rs @@ -1,9 +1,8 @@ //! Helper methods only available for tests -use crate::{Context, Document, DocumentSpec, DocumentStatus, Metrics, Result, DOCUMENT_FINALIZER}; +use crate::{Context, Document, DocumentSpec, DocumentStatus, Result, DOCUMENT_FINALIZER}; use assert_json_diff::assert_json_include; use http::{Request, Response}; use kube::{client::Body, Client, Resource, ResourceExt}; -use prometheus::Registry; use std::sync::Arc; impl Document { @@ -208,15 +207,14 @@ impl ApiServerVerifier { impl Context { // Create a test context with a mocked kube client, locally registered metrics and default diagnostics - pub fn test() -> (Arc, ApiServerVerifier, Registry) { + pub fn test() -> (Arc, ApiServerVerifier) { let (mock_service, handle) = tower_test::mock::pair::, Response>(); let mock_client = Client::new(mock_service, "default"); - let registry = Registry::default(); let ctx = Self { client: mock_client, - metrics: Metrics::default().register(®istry).unwrap(), + metrics: Arc::default(), diagnostics: Arc::default(), }; - (Arc::new(ctx), ApiServerVerifier(handle), registry) + (Arc::new(ctx), ApiServerVerifier(handle)) } } diff --git a/src/main.rs b/src/main.rs index 65f3c62..e270d92 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,15 @@ #![allow(unused_imports, unused_variables)] use actix_web::{get, middleware, web::Data, App, HttpRequest, HttpResponse, HttpServer, Responder}; -pub use controller::{self, telemetry, State}; -use prometheus::{Encoder, TextEncoder}; +pub use controller::{self, telemetry, Metrics, State}; #[get("/metrics")] async fn metrics(c: Data, _req: HttpRequest) -> impl Responder { - let metrics = c.metrics(); - let encoder = TextEncoder::new(); - let mut buffer = vec![]; - encoder.encode(&metrics, &mut buffer).unwrap(); - HttpResponse::Ok().body(buffer) + use measured::metric::group::MetricGroup; + let Metrics { encoder, app } = &*c.metrics(); + let mut encoder = encoder.lock().await; + app.collect_group_into(&mut *encoder).unwrap(); + let bytes = encoder.finish(); + HttpResponse::Ok().body(bytes) } #[get("/health")] diff --git a/src/metrics.rs b/src/metrics.rs index 8e9223a..ad91efb 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,80 +1,69 @@ use crate::{Document, Error}; use kube::ResourceExt; -use prometheus::{histogram_opts, opts, HistogramVec, IntCounter, IntCounterVec, Registry}; -use tokio::time::Instant; +use measured::{ + metric::histogram::{HistogramTimer, Thresholds}, + text::BufferedTextEncoder, + Counter, CounterVec, Histogram, LabelGroup, MetricGroup, +}; +use tokio::sync::Mutex; -#[derive(Clone)] +/// Metrics with handler +#[derive(Default)] pub struct Metrics { - pub reconciliations: IntCounter, - pub failures: IntCounterVec, - pub reconcile_duration: HistogramVec, + pub encoder: Mutex, + pub app: AppMetrics, } -impl Default for Metrics { - fn default() -> Self { - let reconcile_duration = HistogramVec::new( - histogram_opts!( - "doc_controller_reconcile_duration_seconds", - "The duration of reconcile to complete in seconds" - ) - .buckets(vec![0.01, 0.1, 0.25, 0.5, 1., 5., 15., 60.]), - &[], - ) - .unwrap(); - let failures = IntCounterVec::new( - opts!( - "doc_controller_reconciliation_errors_total", - "reconciliation errors", - ), - &["instance", "error"], - ) - .unwrap(); - let reconciliations = - IntCounter::new("doc_controller_reconciliations_total", "reconciliations").unwrap(); - Metrics { - reconciliations, - failures, - reconcile_duration, - } - } +/// All metrics +#[derive(MetricGroup, Default)] +pub struct AppMetrics { + #[metric(namespace = "doc_ctrl_reconcile")] + pub reconcile: ReconcileMetrics, } -impl Metrics { - /// Register API metrics to start tracking them. - pub fn register(self, registry: &Registry) -> Result { - registry.register(Box::new(self.reconcile_duration.clone()))?; - registry.register(Box::new(self.failures.clone()))?; - registry.register(Box::new(self.reconciliations.clone()))?; - Ok(self) - } +/// Metrics related to the reconciler +#[derive(MetricGroup)] +#[metric(new())] +pub struct ReconcileMetrics { + pub runs: Counter, + pub failures: CounterVec, + #[metric(metadata = Thresholds::with_buckets([0.01, 0.1, 0.25, 0.5, 1., 5., 15., 60.]), rename = "duration_seconds")] + pub duration: Histogram<8>, +} - pub fn reconcile_failure(&self, doc: &Document, e: &Error) { - self.failures - .with_label_values(&[doc.name_any().as_ref(), e.metric_label().as_ref()]) - .inc() - } +#[derive(LabelGroup)] +#[label(set = ErrorLabelSet)] +pub struct ErrorLabels<'a> { + #[label(dynamic_with = lasso::ThreadedRodeo, default)] + instance: &'a str, + #[label(dynamic_with = lasso::ThreadedRodeo, default)] + error: &'a str, +} - pub fn count_and_measure(&self) -> ReconcileMeasurer { - self.reconciliations.inc(); - ReconcileMeasurer { - start: Instant::now(), - metric: self.reconcile_duration.clone(), - } +impl Default for ReconcileMetrics { + fn default() -> Self { + ReconcileMetrics::new() } } -/// Smart function duration measurer -/// -/// Relies on Drop to calculate duration and register the observation in the histogram -pub struct ReconcileMeasurer { - start: Instant, - metric: HistogramVec, -} +impl ReconcileMetrics { + pub fn set_failure(&self, doc: &Document, e: &Error) { + self.failures.inc(ErrorLabels { + instance: doc.name_any().as_ref(), + error: e.metric_label().as_ref(), + }) + } + + pub fn count_and_measure(&self) -> HistogramTimer<'_, 8> { + self.runs.inc(); + self.duration.start_timer() + } -impl Drop for ReconcileMeasurer { - fn drop(&mut self) { - #[allow(clippy::cast_precision_loss)] - let duration = self.start.elapsed().as_millis() as f64 / 1000.0; - self.metric.with_label_values(&[]).observe(duration); + #[cfg(test)] + pub fn get_failures(&self, instance: &str, error: &str) -> u64 { + let labels = ErrorLabels { instance, error }; + // awkward, but it gets the job done for tests + let metric = self.failures.get_metric(self.failures.with_labels(labels)); + metric.count.load(std::sync::atomic::Ordering::Relaxed) } }