Skip to content

Commit 8acc622

Browse files
committed
feat(xds): add metrics
1 parent e693d1d commit 8acc622

File tree

3 files changed

+130
-12
lines changed

3 files changed

+130
-12
lines changed

crates/xds/src/client.rs

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::{
2121
};
2222

2323
use eyre::ContextCompat;
24-
use futures::StreamExt;
24+
use futures::{StreamExt, TryFutureExt};
2525
use rand::Rng;
2626
use tonic::transport::{Endpoint, Error as TonicError, channel::Channel as TonicChannel};
2727
use tracing::Instrument;
@@ -38,6 +38,7 @@ use crate::{
3838
aggregated_discovery_service_client::AggregatedDiscoveryServiceClient,
3939
},
4040
generated::quilkin::relay::v1alpha1::aggregated_control_plane_discovery_service_client::AggregatedControlPlaneDiscoveryServiceClient,
41+
metrics,
4142
};
4243

4344
type AdsGrpcClient = AggregatedDiscoveryServiceClient<TonicChannel>;
@@ -110,7 +111,9 @@ impl ServiceClient for MdsGrpcClient {
110111
&mut self,
111112
stream: S,
112113
) -> tonic::Result<tonic::Response<tonic::Streaming<Self::Response>>> {
113-
self.stream_aggregated_resources(stream).await
114+
self.stream_aggregated_resources(stream)
115+
.inspect_err(|error| metrics::client_errors_total(&error))
116+
.await
114117
}
115118
}
116119

@@ -157,6 +160,8 @@ impl<C: ServiceClient> Client<C> {
157160
rand::rng().random_range(0..BACKOFF_MAX_JITTER.as_millis() as _),
158161
);
159162

163+
metrics::client_connect_attempt_backoff_millis(delay);
164+
160165
match error {
161166
RpcSessionError::InvalidEndpoint(error) => {
162167
tracing::error!(?error, "Error creating endpoint");
@@ -177,7 +182,7 @@ impl<C: ServiceClient> Client<C> {
177182
let mut addresses = management_servers.iter().cycle();
178183
let connect_to_server = tryhard::retry_fn(|| {
179184
let address = addresses.next();
180-
async move {
185+
let fut = async move {
181186
match address {
182187
None => Err(RpcSessionError::Receive(tonic::Status::internal(
183188
"Failed initial connection",
@@ -198,16 +203,17 @@ impl<C: ServiceClient> Client<C> {
198203
));
199204
}
200205

206+
metrics::client_connect_attempts_total(endpoint.uri());
201207
C::connect_to_endpoint(endpoint)
202-
.instrument(tracing::debug_span!(
203-
"AggregatedDiscoveryServiceClient::connect_to_endpoint"
204-
))
208+
.instrument(tracing::debug_span!("C::connect_to_endpoint"))
205209
.await
206210
.map_err(RpcSessionError::InitialConnect)
207211
.map(|client| (client, cendpoint))
208212
}
209213
}
210-
}
214+
};
215+
216+
fut.inspect_err(|error| metrics::client_errors_total(&error))
211217
})
212218
.with_config(retry_config);
213219

@@ -259,18 +265,29 @@ impl MdsClient {
259265

260266
let mut stream = control_plane.delta_aggregated_resources(stream).await?;
261267
is_healthy.store(true, Ordering::SeqCst);
268+
metrics::client_active(true);
262269

263270
while let Some(result) = stream.next().await {
264-
let response = result?;
271+
let response = match result {
272+
Ok(response) => response,
273+
Err(error) => {
274+
metrics::client_errors_total(&error);
275+
break;
276+
}
277+
};
265278
tracing::trace!("received delta discovery response");
266-
ds.send_response(response).await?;
279+
if let Err(error) = ds.send_response(response).await {
280+
metrics::client_errors_total(&error);
281+
break;
282+
}
267283
}
268284

269285
change_watcher.abort();
270286
let _unused = change_watcher.await;
271287
}
272288

273289
is_healthy.store(false, Ordering::SeqCst);
290+
metrics::client_active(false);
274291

275292
//tracing::warn!("lost connection to relay server, retrying");
276293
let new_client = MdsClient::connect_with_backoff(&self.management_servers)

crates/xds/src/metrics.rs

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616

1717
use once_cell::sync::Lazy;
18-
use prometheus::{IntCounterVec, IntGaugeVec, Registry};
18+
use prometheus::{Histogram, IntCounterVec, IntGauge, IntGaugeVec, Registry};
1919

2020
pub(crate) const NODE_LABEL: &str = "node";
2121
pub(crate) const CONTROL_PLANE_LABEL: &str = "control_plane";
@@ -38,7 +38,7 @@ pub fn registry() -> &'static Registry {
3838
unsafe { REGISTRY }.expect("set_registry must be called")
3939
}
4040

41-
pub(crate) fn active_control_planes(control_plane: &str) -> prometheus::IntGauge {
41+
pub(crate) fn active_control_planes(control_plane: &str) -> IntGauge {
4242
static ACTIVE_CONTROL_PLANES: Lazy<IntGaugeVec> = Lazy::new(|| {
4343
prometheus::register_int_gauge_vec_with_registry! {
4444
prometheus::opts! {
@@ -54,6 +54,104 @@ pub(crate) fn active_control_planes(control_plane: &str) -> prometheus::IntGauge
5454
ACTIVE_CONTROL_PLANES.with_label_values(&[control_plane])
5555
}
5656

57+
pub(crate) fn client_active(active: bool) {
58+
static METRIC: Lazy<IntGauge> = Lazy::new(|| {
59+
prometheus::register_int_gauge_with_registry! {
60+
prometheus::opts! {
61+
"provider_grpc_active",
62+
"Whether the gRPC configuration provider is active or not (either 1 or 0).",
63+
},
64+
registry(),
65+
}
66+
.unwrap()
67+
});
68+
69+
METRIC.set(active as _);
70+
}
71+
72+
pub(crate) fn client_connect_attempts_total(address: &impl std::fmt::Display) {
73+
static METRIC: Lazy<IntCounterVec> = Lazy::new(|| {
74+
prometheus::register_int_counter_vec_with_registry! {
75+
prometheus::opts! {
76+
"provider_grpc_connect_attempts_total",
77+
"total number of attempts the gRPC provider has made to connect to `address`.",
78+
},
79+
&["address"],
80+
registry(),
81+
}
82+
.unwrap()
83+
});
84+
85+
METRIC.with_label_values(&[&address.to_string()]).inc();
86+
}
87+
88+
pub(crate) fn client_errors_total(reason: &impl std::fmt::Display) {
89+
static METRIC: Lazy<IntCounterVec> = Lazy::new(|| {
90+
prometheus::register_int_counter_vec_with_registry! {
91+
prometheus::opts! {
92+
"provider_grpc_errors_total",
93+
"total number of errors the gRPC provider has encountered",
94+
},
95+
&["reason"],
96+
registry(),
97+
}
98+
.unwrap()
99+
});
100+
101+
METRIC.with_label_values(&[&reason.to_string()]).inc();
102+
}
103+
104+
pub fn client_connect_attempt_backoff_millis(delay: std::time::Duration) {
105+
pub(crate) const BUCKET_START: f64 = 0.001;
106+
pub(crate) const BUCKET_FACTOR: f64 = 2.0;
107+
pub(crate) const BUCKET_COUNT: usize = 13;
108+
109+
static METRIC: Lazy<Histogram> = Lazy::new(|| {
110+
prometheus::register_histogram_with_registry! {
111+
prometheus::histogram_opts! {
112+
"provider_grpc_connect_attempt_backoff_seconds",
113+
"The backoff duration made when attempting reconnect to a gRPC provider",
114+
prometheus::exponential_buckets(BUCKET_START, BUCKET_FACTOR, BUCKET_COUNT).unwrap(),
115+
},
116+
registry(),
117+
}
118+
.unwrap()
119+
});
120+
121+
METRIC.observe(delay.as_secs_f64());
122+
}
123+
124+
pub(crate) fn server_active(active: bool) {
125+
static METRIC: Lazy<IntGauge> = Lazy::new(|| {
126+
prometheus::register_int_gauge_with_registry! {
127+
prometheus::opts! {
128+
"service_grpc_active",
129+
"Whether the gRPC service is active or not (either 1 or 0).",
130+
},
131+
registry(),
132+
}
133+
.unwrap()
134+
});
135+
136+
METRIC.set(active as _);
137+
}
138+
139+
pub(crate) fn server_resource_updates_total(resource: &str) {
140+
static METRIC: Lazy<IntCounterVec> = Lazy::new(|| {
141+
prometheus::register_int_counter_vec_with_registry! {
142+
prometheus::opts! {
143+
"service_grpc_resource_updates_total",
144+
"total number of updates to a `resource` being sent to gRPC clients.",
145+
},
146+
&["address"],
147+
registry(),
148+
}
149+
.unwrap()
150+
});
151+
152+
METRIC.with_label_values(&[resource]).inc();
153+
}
154+
57155
pub(crate) fn delta_discovery_requests(node: &str, type_url: &str) -> prometheus::IntCounter {
58156
static DELTA_DISCOVERY_REQUESTS: Lazy<IntCounterVec> = Lazy::new(|| {
59157
prometheus::register_int_counter_vec_with_registry! {

crates/xds/src/server.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,11 @@ impl<C: crate::config::Configuration> ControlPlane<C> {
159159

160160
let server = builder.add_service(server);
161161
tracing::info!("serving management server on port `{}`", listener.port());
162+
metrics::server_active(true);
162163
Ok(server
163164
.serve_with_incoming(listener.into_stream()?)
164-
.map_err(From::from))
165+
.map_err(From::from)
166+
.inspect_err(|_| metrics::server_active(false)))
165167
}
166168

167169
pub fn relay_server(
@@ -200,6 +202,7 @@ impl<C: crate::config::Configuration> ControlPlane<C> {
200202
is_relay = self.is_relay,
201203
"pushing update"
202204
);
205+
metrics::server_resource_updates_total(resource_type);
203206
if self.tx.send(resource_type).is_err() {
204207
tracing::debug!("no client connections currently subscribed");
205208
}

0 commit comments

Comments
 (0)