35 changes: 26 additions & 9 deletions crates/xds/src/client.rs
@@ -21,7 +21,7 @@ use std::{
};

use eyre::ContextCompat;
-use futures::StreamExt;
+use futures::{StreamExt, TryFutureExt};
use rand::Rng;
use tonic::transport::{Endpoint, Error as TonicError, channel::Channel as TonicChannel};
use tracing::Instrument;
@@ -38,6 +38,7 @@ use crate::{
aggregated_discovery_service_client::AggregatedDiscoveryServiceClient,
},
generated::quilkin::relay::v1alpha1::aggregated_control_plane_discovery_service_client::AggregatedControlPlaneDiscoveryServiceClient,
metrics,
};

type AdsGrpcClient = AggregatedDiscoveryServiceClient<TonicChannel>;
@@ -110,7 +111,9 @@ impl ServiceClient for MdsGrpcClient {
&mut self,
stream: S,
) -> tonic::Result<tonic::Response<tonic::Streaming<Self::Response>>> {
-        self.stream_aggregated_resources(stream).await
+        self.stream_aggregated_resources(stream)
+            .inspect_err(|error| metrics::client_errors_total(&error))
+            .await
}
}

@@ -157,6 +160,8 @@ impl<C: ServiceClient> Client<C> {
rand::rng().random_range(0..BACKOFF_MAX_JITTER.as_millis() as _),
);

metrics::client_connect_attempt_backoff_millis(delay);

match error {
RpcSessionError::InvalidEndpoint(error) => {
tracing::error!(?error, "Error creating endpoint");
@@ -177,7 +182,7 @@
let mut addresses = management_servers.iter().cycle();
let connect_to_server = tryhard::retry_fn(|| {
let address = addresses.next();
-            async move {
+            let fut = async move {
match address {
None => Err(RpcSessionError::Receive(tonic::Status::internal(
"Failed initial connection",
@@ -198,16 +203,17 @@
));
}

metrics::client_connect_attempts_total(endpoint.uri());
C::connect_to_endpoint(endpoint)
-                            .instrument(tracing::debug_span!(
-                                "AggregatedDiscoveryServiceClient::connect_to_endpoint"
-                            ))
+                            .instrument(tracing::debug_span!("C::connect_to_endpoint"))
.await
.map_err(RpcSessionError::InitialConnect)
.map(|client| (client, cendpoint))
}
}
-            }
+            };
+
+            fut.inspect_err(|error| metrics::client_errors_total(&error))
})
.with_config(retry_config);

@@ -259,18 +265,29 @@ impl MdsClient {

let mut stream = control_plane.delta_aggregated_resources(stream).await?;
is_healthy.store(true, Ordering::SeqCst);
metrics::client_active(true);

while let Some(result) = stream.next().await {
-                let response = result?;
+                let response = match result {
+                    Ok(response) => response,
+                    Err(error) => {
+                        metrics::client_errors_total(&error);
+                        break;
+                    }
+                };
tracing::trace!("received delta discovery response");
-                ds.send_response(response).await?;
+                if let Err(error) = ds.send_response(response).await {
+                    metrics::client_errors_total(&error);
+                    break;
+                }
}

change_watcher.abort();
let _unused = change_watcher.await;
}

is_healthy.store(false, Ordering::SeqCst);
metrics::client_active(false);

//tracing::warn!("lost connection to relay server, retrying");
let new_client = MdsClient::connect_with_backoff(&self.management_servers)
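
The client-side changes above funnel every failure path through `metrics::client_errors_total` by tapping fallible futures with `inspect_err` from `futures::TryFutureExt` (hence the new import). A minimal sketch of that pattern outside quilkin — `record_error` and `connect` are hypothetical stand-ins, not quilkin APIs:

```rust
use futures::TryFutureExt as _; // provides `inspect_err` on fallible futures

// Hypothetical stand-in for `metrics::client_errors_total`.
fn record_error(reason: &impl std::fmt::Display) {
    eprintln!("grpc client error: {reason}");
}

// Hypothetical connect stub that always fails, to exercise the error path.
async fn connect() -> Result<(), tonic::Status> {
    Err(tonic::Status::unavailable("relay offline"))
}

#[tokio::main]
async fn main() {
    // `inspect_err` observes the error by reference and passes the original
    // `Result` through unchanged, so callers still see the failure.
    let result = connect()
        .inspect_err(|error| record_error(error))
        .await;
    assert!(result.is_err());
}
```

The same shape appears twice in the diff — around `stream_aggregated_resources` and around the retry future in the connect loop — while the MDS receive loop uses an explicit `match`/`if let` instead so it can also `break` out of the stream.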
102 changes: 100 additions & 2 deletions crates/xds/src/metrics.rs
@@ -15,7 +15,7 @@
*/

use once_cell::sync::Lazy;
-use prometheus::{IntCounterVec, IntGaugeVec, Registry};
+use prometheus::{Histogram, IntCounterVec, IntGauge, IntGaugeVec, Registry};

pub(crate) const NODE_LABEL: &str = "node";
pub(crate) const CONTROL_PLANE_LABEL: &str = "control_plane";
@@ -38,7 +38,7 @@ pub fn registry() -> &'static Registry {
unsafe { REGISTRY }.expect("set_registry must be called")
}

-pub(crate) fn active_control_planes(control_plane: &str) -> prometheus::IntGauge {
+pub(crate) fn active_control_planes(control_plane: &str) -> IntGauge {
static ACTIVE_CONTROL_PLANES: Lazy<IntGaugeVec> = Lazy::new(|| {
prometheus::register_int_gauge_vec_with_registry! {
prometheus::opts! {
@@ -54,6 +54,104 @@ pub(crate) fn active_control_planes(control_plane: &str) -> prometheus::IntGauge
ACTIVE_CONTROL_PLANES.with_label_values(&[control_plane])
}

pub(crate) fn client_active(active: bool) {
static METRIC: Lazy<IntGauge> = Lazy::new(|| {
prometheus::register_int_gauge_with_registry! {
prometheus::opts! {
"provider_grpc_active",
"Whether the gRPC configuration provider is active or not (either 1 or 0).",
},
registry(),
}
.unwrap()
});

METRIC.set(active as _);
}

pub(crate) fn client_connect_attempts_total(address: &impl std::fmt::Display) {
static METRIC: Lazy<IntCounterVec> = Lazy::new(|| {
prometheus::register_int_counter_vec_with_registry! {
prometheus::opts! {
"provider_grpc_connect_attempts_total",
"total number of attempts the gRPC provider has made to connect to `address`.",
},
&["address"],
registry(),
}
.unwrap()
});

METRIC.with_label_values(&[&address.to_string()]).inc();
}

pub(crate) fn client_errors_total(reason: &impl std::fmt::Display) {
static METRIC: Lazy<IntCounterVec> = Lazy::new(|| {
prometheus::register_int_counter_vec_with_registry! {
prometheus::opts! {
"provider_grpc_errors_total",
"total number of errors the gRPC provider has encountered",
},
&["reason"],
registry(),
}
.unwrap()
});

METRIC.with_label_values(&[&reason.to_string()]).inc();
}

pub fn client_connect_attempt_backoff_millis(delay: std::time::Duration) {
pub(crate) const BUCKET_START: f64 = 0.001;
pub(crate) const BUCKET_FACTOR: f64 = 2.0;
pub(crate) const BUCKET_COUNT: usize = 13;

static METRIC: Lazy<Histogram> = Lazy::new(|| {
prometheus::register_histogram_with_registry! {
prometheus::histogram_opts! {
"provider_grpc_connect_attempt_backoff_seconds",
"The backoff duration made when attempting reconnect to a gRPC provider",
prometheus::exponential_buckets(BUCKET_START, BUCKET_FACTOR, BUCKET_COUNT).unwrap(),
},
registry(),
}
.unwrap()
});

METRIC.observe(delay.as_secs_f64());
}
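
`exponential_buckets(0.001, 2.0, 13)` yields upper bounds of 1ms, 2ms, 4ms, …, 4.096s, which comfortably brackets jittered reconnect delays. (The Rust identifier keeps a `_millis` suffix from how the delay is computed, but the histogram records seconds via `as_secs_f64`, matching the `_seconds` metric name.) A quick standalone sketch, using the same `prometheus` crate, to print the boundaries:

```rust
// Prints the 13 bucket upper bounds used by
// provider_grpc_connect_attempt_backoff_seconds: 0.001, 0.002, ..., 4.096.
fn main() {
    let buckets = prometheus::exponential_buckets(0.001, 2.0, 13).unwrap();
    for (i, upper) in buckets.iter().enumerate() {
        println!("bucket {i}: <= {upper}s");
    }
}
```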

pub(crate) fn server_active(active: bool) {
static METRIC: Lazy<IntGauge> = Lazy::new(|| {
prometheus::register_int_gauge_with_registry! {
prometheus::opts! {
"service_grpc_active",
"Whether the gRPC service is active or not (either 1 or 0).",
},
registry(),
}
.unwrap()
});

METRIC.set(active as _);
}

pub(crate) fn server_resource_updates_total(resource: &str) {
static METRIC: Lazy<IntCounterVec> = Lazy::new(|| {
prometheus::register_int_counter_vec_with_registry! {
prometheus::opts! {
"service_grpc_resource_updates_total",
"total number of updates to a `resource` being sent to gRPC clients.",
},
&["address"],
registry(),
}
.unwrap()
});

METRIC.with_label_values(&[resource]).inc();
}

pub(crate) fn delta_discovery_requests(node: &str, type_url: &str) -> prometheus::IntCounter {
static DELTA_DISCOVERY_REQUESTS: Lazy<IntCounterVec> = Lazy::new(|| {
prometheus::register_int_counter_vec_with_registry! {
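
All of these counters and gauges land in the registry installed via `set_registry`, so anything that gathers that registry exposes them. A self-contained sketch of what a scrape would see — it builds its own `Registry` rather than the crate's global one, and reuses one metric name from the diff:

```rust
use prometheus::{Encoder, IntCounterVec, Opts, Registry, TextEncoder};

fn main() -> prometheus::Result<()> {
    // Standalone registry; in quilkin these metrics go through registry(),
    // which set_registry() must populate first.
    let registry = Registry::new();
    let errors = IntCounterVec::new(
        Opts::new(
            "provider_grpc_errors_total",
            "total number of errors the gRPC provider has encountered",
        ),
        &["reason"],
    )?;
    registry.register(Box::new(errors.clone()))?;

    errors.with_label_values(&["connection refused"]).inc();

    // Render in the Prometheus text exposition format, as a scrape would.
    let mut buf = Vec::new();
    TextEncoder::new().encode(&registry.gather(), &mut buf)?;
    println!("{}", String::from_utf8(buf).unwrap());
    Ok(())
}
```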
5 changes: 4 additions & 1 deletion crates/xds/src/server.rs
@@ -159,9 +159,11 @@ impl<C: crate::config::Configuration> ControlPlane<C> {

let server = builder.add_service(server);
tracing::info!("serving management server on port `{}`", listener.port());
metrics::server_active(true);
Ok(server
.serve_with_incoming(listener.into_stream()?)
-            .map_err(From::from))
+            .map_err(From::from)
+            .inspect_err(|_| metrics::server_active(false)))
}

pub fn relay_server(
@@ -200,6 +202,7 @@ impl<C: crate::config::Configuration> ControlPlane<C> {
is_relay = self.is_relay,
"pushing update"
);
metrics::server_resource_updates_total(resource_type);
if self.tx.send(resource_type).is_err() {
tracing::debug!("no client connections currently subscribed");
}
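
On the server side, the gauge is set just before serving and cleared only through `inspect_err`, i.e. only when the serve future resolves with an error. A minimal sketch of that lifecycle — `server_active` and `serve` are hypothetical stand-ins for `metrics::server_active` and the tonic serve future:

```rust
use futures::TryFutureExt as _;

// Hypothetical stand-in for metrics::server_active.
fn server_active(active: bool) {
    println!("service_grpc_active = {}", active as i64);
}

// Hypothetical stand-in for the tonic serve future, failing to show the path.
async fn serve() -> Result<(), std::io::Error> {
    Err(std::io::Error::other("listener closed"))
}

#[tokio::main]
async fn main() {
    server_active(true);
    // The gauge is cleared only on the error path; a clean shutdown would
    // need a separate reset, which the diff above does not add.
    let _ = serve().inspect_err(|_| server_active(false)).await;
}
```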