diff --git a/crates/xds/src/client.rs b/crates/xds/src/client.rs
index 5358d292f7..98c05287fa 100644
--- a/crates/xds/src/client.rs
+++ b/crates/xds/src/client.rs
@@ -21,7 +21,7 @@ use std::{
 };
 
 use eyre::ContextCompat;
-use futures::StreamExt;
+use futures::{StreamExt, TryFutureExt};
 use rand::Rng;
 use tonic::transport::{Endpoint, Error as TonicError, channel::Channel as TonicChannel};
 use tracing::Instrument;
@@ -38,6 +38,7 @@ use crate::{
         aggregated_discovery_service_client::AggregatedDiscoveryServiceClient,
     },
     generated::quilkin::relay::v1alpha1::aggregated_control_plane_discovery_service_client::AggregatedControlPlaneDiscoveryServiceClient,
+    metrics,
 };
 
 type AdsGrpcClient = AggregatedDiscoveryServiceClient<TonicChannel>;
@@ -110,7 +111,9 @@ impl ServiceClient for MdsGrpcClient {
         &mut self,
         stream: S,
     ) -> tonic::Result<tonic::Response<tonic::Streaming<Self::Response>>> {
-        self.stream_aggregated_resources(stream).await
+        self.stream_aggregated_resources(stream)
+            .inspect_err(|error| metrics::client_errors_total(&error))
+            .await
     }
 }
 
@@ -157,6 +160,8 @@ impl Client {
             rand::rng().random_range(0..BACKOFF_MAX_JITTER.as_millis() as _),
         );
 
+        metrics::client_connect_attempt_backoff_millis(delay);
+
         match error {
             RpcSessionError::InvalidEndpoint(error) => {
                 tracing::error!(?error, "Error creating endpoint");
@@ -177,7 +182,7 @@ impl Client {
         let mut addresses = management_servers.iter().cycle();
         let connect_to_server = tryhard::retry_fn(|| {
             let address = addresses.next();
-            async move {
+            let fut = async move {
                 match address {
                     None => Err(RpcSessionError::Receive(tonic::Status::internal(
                         "Failed initial connection",
@@ -198,16 +203,17 @@ impl Client {
                         ));
                     }
 
+                    metrics::client_connect_attempts_total(endpoint.uri());
                     C::connect_to_endpoint(endpoint)
-                        .instrument(tracing::debug_span!(
-                            "AggregatedDiscoveryServiceClient::connect_to_endpoint"
-                        ))
+                        .instrument(tracing::debug_span!("C::connect_to_endpoint"))
                        .await
                        .map_err(RpcSessionError::InitialConnect)
                        .map(|client| (client, cendpoint))
                 }
             }
-            }
+            };
+
+            fut.inspect_err(|error| metrics::client_errors_total(&error))
         })
         .with_config(retry_config);
@@ -259,11 +265,21 @@ impl MdsClient {
             let mut stream = control_plane.delta_aggregated_resources(stream).await?;
             is_healthy.store(true, Ordering::SeqCst);
+            metrics::client_active(true);
 
             while let Some(result) = stream.next().await {
-                let response = result?;
+                let response = match result {
+                    Ok(response) => response,
+                    Err(error) => {
+                        metrics::client_errors_total(&error);
+                        break;
+                    }
+                };
                 tracing::trace!("received delta discovery response");
-                ds.send_response(response).await?;
+                if let Err(error) = ds.send_response(response).await {
+                    metrics::client_errors_total(&error);
+                    break;
+                }
             }
 
             change_watcher.abort();
@@ -271,6 +287,7 @@ impl MdsClient {
             }
 
             is_healthy.store(false, Ordering::SeqCst);
+            metrics::client_active(false);
 
             //tracing::warn!("lost connection to relay server, retrying");
             let new_client = MdsClient::connect_with_backoff(&self.management_servers)
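The `TryFutureExt` import exists so errors can be observed on the futures themselves. A minimal standalone sketch of that pattern (illustrative only; `fallible` is a made-up stand-in for the connect and stream calls above):

```rust
use futures::{TryFutureExt, executor::block_on};

// Stand-in for a fallible async call such as `stream_aggregated_resources`.
async fn fallible() -> Result<u32, String> {
    Err("connection refused".into())
}

fn main() {
    // `inspect_err` receives the error by reference and passes the future's
    // output through unchanged, so a metrics counter can be bumped without
    // affecting control flow or error propagation.
    let result = block_on(fallible().inspect_err(|error| eprintln!("observed: {error}")));
    assert!(result.is_err());
}
```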
diff --git a/crates/xds/src/metrics.rs b/crates/xds/src/metrics.rs
index 6e22457b56..ccf0fcbb8b 100644
--- a/crates/xds/src/metrics.rs
+++ b/crates/xds/src/metrics.rs
@@ -15,7 +15,7 @@
  */
 
 use once_cell::sync::Lazy;
-use prometheus::{IntCounterVec, IntGaugeVec, Registry};
+use prometheus::{Histogram, IntCounterVec, IntGauge, IntGaugeVec, Registry};
 
 pub(crate) const NODE_LABEL: &str = "node";
 pub(crate) const CONTROL_PLANE_LABEL: &str = "control_plane";
@@ -38,7 +38,7 @@ pub fn registry() -> &'static Registry {
     unsafe { REGISTRY }.expect("set_registry must be called")
 }
 
-pub(crate) fn active_control_planes(control_plane: &str) -> prometheus::IntGauge {
+pub(crate) fn active_control_planes(control_plane: &str) -> IntGauge {
     static ACTIVE_CONTROL_PLANES: Lazy<IntGaugeVec> = Lazy::new(|| {
         prometheus::register_int_gauge_vec_with_registry! {
             prometheus::opts! {
@@ -54,6 +54,104 @@ pub(crate) fn active_control_planes(control_plane: &str) -> prometheus::IntGauge
     ACTIVE_CONTROL_PLANES.with_label_values(&[control_plane])
 }
 
+pub(crate) fn client_active(active: bool) {
+    static METRIC: Lazy<IntGauge> = Lazy::new(|| {
+        prometheus::register_int_gauge_with_registry! {
+            prometheus::opts! {
+                "provider_grpc_active",
+                "Whether the gRPC configuration provider is active or not (either 1 or 0).",
+            },
+            registry(),
+        }
+        .unwrap()
+    });
+
+    METRIC.set(active as _);
+}
+
+pub(crate) fn client_connect_attempts_total(address: &impl std::fmt::Display) {
+    static METRIC: Lazy<IntCounterVec> = Lazy::new(|| {
+        prometheus::register_int_counter_vec_with_registry! {
+            prometheus::opts! {
+                "provider_grpc_connect_attempts_total",
+                "Total number of attempts the gRPC provider has made to connect to `address`.",
+            },
+            &["address"],
+            registry(),
+        }
+        .unwrap()
+    });
+
+    METRIC.with_label_values(&[&address.to_string()]).inc();
+}
+
+pub(crate) fn client_errors_total(reason: &impl std::fmt::Display) {
+    static METRIC: Lazy<IntCounterVec> = Lazy::new(|| {
+        prometheus::register_int_counter_vec_with_registry! {
+            prometheus::opts! {
+                "provider_grpc_errors_total",
+                "Total number of errors the gRPC provider has encountered.",
+            },
+            &["reason"],
+            registry(),
+        }
+        .unwrap()
+    });
+
+    METRIC.with_label_values(&[&reason.to_string()]).inc();
+}
+
+pub fn client_connect_attempt_backoff_millis(delay: std::time::Duration) {
+    const BUCKET_START: f64 = 0.001;
+    const BUCKET_FACTOR: f64 = 2.0;
+    const BUCKET_COUNT: usize = 13;
+
+    static METRIC: Lazy<Histogram> = Lazy::new(|| {
+        prometheus::register_histogram_with_registry! {
+            prometheus::histogram_opts! {
+                "provider_grpc_connect_attempt_backoff_seconds",
+                "The backoff delay applied before reattempting connection to a gRPC provider.",
+                prometheus::exponential_buckets(BUCKET_START, BUCKET_FACTOR, BUCKET_COUNT).unwrap(),
+            },
+            registry(),
+        }
+        .unwrap()
+    });
+
+    METRIC.observe(delay.as_secs_f64());
+}
+
+pub(crate) fn server_active(active: bool) {
+    static METRIC: Lazy<IntGauge> = Lazy::new(|| {
+        prometheus::register_int_gauge_with_registry! {
+            prometheus::opts! {
+                "service_grpc_active",
+                "Whether the gRPC service is active or not (either 1 or 0).",
+            },
+            registry(),
+        }
+        .unwrap()
+    });
+
+    METRIC.set(active as _);
+}
+
+pub(crate) fn server_resource_updates_total(resource: &str) {
+    static METRIC: Lazy<IntCounterVec> = Lazy::new(|| {
+        prometheus::register_int_counter_vec_with_registry! {
+            prometheus::opts! {
+                "service_grpc_resource_updates_total",
+                "Total number of updates to a `resource` being sent to gRPC clients.",
+            },
+            &["resource"],
+            registry(),
+        }
+        .unwrap()
+    });
+
+    METRIC.with_label_values(&[resource]).inc();
+}
+
 pub(crate) fn delta_discovery_requests(node: &str, type_url: &str) -> prometheus::IntCounter {
     static DELTA_DISCOVERY_REQUESTS: Lazy<IntCounterVec> = Lazy::new(|| {
         prometheus::register_int_counter_vec_with_registry! {
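On the histogram above: `exponential_buckets(0.001, 2.0, 13)` yields upper bounds doubling from 1 ms to 0.001 × 2^12 = 4.096 s, and `observe(delay.as_secs_f64())` records in seconds, matching the `_seconds` metric name. A standalone check of that layout (not part of the diff):

```rust
// Mirrors the bucket constants in `client_connect_attempt_backoff_millis`.
const BUCKET_START: f64 = 0.001;
const BUCKET_FACTOR: f64 = 2.0;
const BUCKET_COUNT: usize = 13;

fn main() {
    let buckets =
        prometheus::exponential_buckets(BUCKET_START, BUCKET_FACTOR, BUCKET_COUNT).unwrap();

    assert_eq!(buckets.len(), BUCKET_COUNT);
    assert_eq!(buckets[0], 0.001); // first upper bound: 1 ms
    // Last upper bound: 0.001 * 2^12 = 4.096 s; longer delays fall into +Inf.
    assert!((buckets[12] - 4.096).abs() < 1e-12);
}
```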
diff --git a/crates/xds/src/server.rs b/crates/xds/src/server.rs
index f91fa22117..6b97f60f7f 100644
--- a/crates/xds/src/server.rs
+++ b/crates/xds/src/server.rs
@@ -159,9 +159,11 @@ impl ControlPlane {
         let server = builder.add_service(server);
         tracing::info!("serving management server on port `{}`", listener.port());
+        metrics::server_active(true);
 
         Ok(server
             .serve_with_incoming(listener.into_stream()?)
-            .map_err(From::from))
+            .map_err(From::from)
+            .inspect_err(|_| metrics::server_active(false)))
     }
 
     pub fn relay_server(
@@ -200,6 +202,7 @@ impl ControlPlane {
             is_relay = self.is_relay,
             "pushing update"
         );
+        metrics::server_resource_updates_total(resource_type);
         if self.tx.send(resource_type).is_err() {
             tracing::debug!("no client connections currently subscribed");
         }
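For reference, the new gauges render as plain Prometheus exposition text. A self-contained sketch (using a throwaway `Registry` rather than the crate's `registry()`, which must be initialized via `set_registry`):

```rust
use prometheus::{Encoder, IntGauge, Registry, TextEncoder};

fn main() {
    let registry = Registry::new();
    let gauge = IntGauge::new(
        "service_grpc_active",
        "Whether the gRPC service is active or not (either 1 or 0).",
    )
    .unwrap();
    registry.register(Box::new(gauge.clone())).unwrap();

    gauge.set(true as _); // mirrors `metrics::server_active(true)`

    let mut buffer = Vec::new();
    TextEncoder::new()
        .encode(&registry.gather(), &mut buffer)
        .unwrap();
    print!("{}", String::from_utf8(buffer).unwrap());
    // # HELP service_grpc_active Whether the gRPC service is active or not (either 1 or 0).
    // # TYPE service_grpc_active gauge
    // service_grpc_active 1
}
```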