Skip to content

Commit bb38181

Browse files
committed
feat(xds): add metrics
1 parent e693d1d commit bb38181

File tree

3 files changed

+138
-12
lines changed

3 files changed

+138
-12
lines changed

crates/xds/src/client.rs

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use tryhard::{
3232

3333
use crate::{
3434
Result,
35+
metrics,
3536
core::Node,
3637
discovery::{
3738
DeltaDiscoveryRequest, DeltaDiscoveryResponse, DiscoveryRequest, DiscoveryResponse,
@@ -157,6 +158,8 @@ impl<C: ServiceClient> Client<C> {
157158
rand::rng().random_range(0..BACKOFF_MAX_JITTER.as_millis() as _),
158159
);
159160

161+
metrics::client_connect_attempt_backoff_millis(delay);
162+
160163
match error {
161164
RpcSessionError::InvalidEndpoint(error) => {
162165
tracing::error!(?error, "Error creating endpoint");
@@ -179,32 +182,42 @@ impl<C: ServiceClient> Client<C> {
179182
let address = addresses.next();
180183
async move {
181184
match address {
182-
None => Err(RpcSessionError::Receive(tonic::Status::internal(
185+
None => {
186+
let error = RpcSessionError::Receive(tonic::Status::internal(
183187
"Failed initial connection",
184-
))),
188+
));
189+
metrics::client_errors_total(&error);
190+
Err(error)
191+
},
185192
Some(endpoint) => {
186193
tracing::info!("attempting to connect to `{}`", endpoint.uri());
187194
let cendpoint = endpoint.clone();
188195
let endpoint = endpoint.clone().connect_timeout(CONNECTION_TIMEOUT);
189196

190197
// make sure that we have everything we will need in our URI
191198
if endpoint.uri().scheme().is_none() {
192-
return Err(RpcSessionError::InvalidEndpoint(
199+
let error = RpcSessionError::InvalidEndpoint(
193200
"No scheme provided".into(),
194-
));
201+
);
202+
metrics::client_errors_total(&error);
203+
return Err(error);
195204
} else if endpoint.uri().host().is_none() {
196-
return Err(RpcSessionError::InvalidEndpoint(
205+
let error = RpcSessionError::InvalidEndpoint(
197206
"No host provided".into(),
198-
));
207+
);
208+
metrics::client_errors_total(&error);
209+
return Err(error);
199210
}
200211

212+
metrics::client_connect_attempts_total(endpoint.uri());
201213
C::connect_to_endpoint(endpoint)
202214
.instrument(tracing::debug_span!(
203-
"AggregatedDiscoveryServiceClient::connect_to_endpoint"
215+
"C::connect_to_endpoint"
204216
))
205217
.await
206218
.map_err(RpcSessionError::InitialConnect)
207219
.map(|client| (client, cendpoint))
220+
.inspect_err(|error| metrics::client_errors_total(&error))
208221
}
209222
}
210223
}
@@ -235,6 +248,7 @@ impl MdsClient {
235248
}
236249
Err(err) => {
237250
tracing::error!(error = ?err, "failed to acquire aggregated delta stream");
251+
metrics::client_errors_total(&err);
238252
return Err(self);
239253
}
240254
};
@@ -259,18 +273,29 @@ impl MdsClient {
259273

260274
let mut stream = control_plane.delta_aggregated_resources(stream).await?;
261275
is_healthy.store(true, Ordering::SeqCst);
276+
metrics::client_active(true);
262277

263278
while let Some(result) = stream.next().await {
264-
let response = result?;
279+
let response = match result {
280+
Ok(response) => response,
281+
Err(error) => {
282+
metrics::client_errors_total(&error);
283+
break;
284+
}
285+
};
265286
tracing::trace!("received delta discovery response");
266-
ds.send_response(response).await?;
287+
if let Err(error) = ds.send_response(response).await {
288+
metrics::client_errors_total(&error);
289+
break;
290+
}
267291
}
268292

269293
change_watcher.abort();
270294
let _unused = change_watcher.await;
271295
}
272296

273297
is_healthy.store(false, Ordering::SeqCst);
298+
metrics::client_active(false);
274299

275300
//tracing::warn!("lost connection to relay server, retrying");
276301
let new_client = MdsClient::connect_with_backoff(&self.management_servers)

crates/xds/src/metrics.rs

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616

1717
use once_cell::sync::Lazy;
18-
use prometheus::{IntCounterVec, IntGaugeVec, Registry};
18+
use prometheus::{IntGauge, IntCounterVec, IntGaugeVec, Histogram, Registry};
1919

2020
pub(crate) const NODE_LABEL: &str = "node";
2121
pub(crate) const CONTROL_PLANE_LABEL: &str = "control_plane";
@@ -38,7 +38,7 @@ pub fn registry() -> &'static Registry {
3838
unsafe { REGISTRY }.expect("set_registry must be called")
3939
}
4040

41-
pub(crate) fn active_control_planes(control_plane: &str) -> prometheus::IntGauge {
41+
pub(crate) fn active_control_planes(control_plane: &str) -> IntGauge {
4242
static ACTIVE_CONTROL_PLANES: Lazy<IntGaugeVec> = Lazy::new(|| {
4343
prometheus::register_int_gauge_vec_with_registry! {
4444
prometheus::opts! {
@@ -54,6 +54,104 @@ pub(crate) fn active_control_planes(control_plane: &str) -> prometheus::IntGauge
5454
ACTIVE_CONTROL_PLANES.with_label_values(&[control_plane])
5555
}
5656

57+
pub(crate) fn client_active(active: bool) {
58+
static METRIC: Lazy<IntGauge> = Lazy::new(|| {
59+
prometheus::register_int_gauge_with_registry! {
60+
prometheus::opts! {
61+
"provider_grpc_active",
62+
"Whether the gRPC configuration provider is active or not (either 1 or 0).",
63+
},
64+
registry(),
65+
}
66+
.unwrap()
67+
});
68+
69+
METRIC.set(active as _);
70+
}
71+
72+
pub(crate) fn client_connect_attempts_total(address: &impl std::fmt::Display) {
73+
static METRIC: Lazy<IntCounterVec> = Lazy::new(|| {
74+
prometheus::register_int_counter_vec_with_registry! {
75+
prometheus::opts! {
76+
"provider_grpc_connect_attempts_total",
77+
"total number of attempts the gRPC provider has made to connect to `address`.",
78+
},
79+
&["address"],
80+
registry(),
81+
}
82+
.unwrap()
83+
});
84+
85+
METRIC.with_label_values(&[&address.to_string()]).inc();
86+
}
87+
88+
pub(crate) fn client_errors_total(reason: &impl std::fmt::Display) {
89+
static METRIC: Lazy<IntCounterVec> = Lazy::new(|| {
90+
prometheus::register_int_counter_vec_with_registry! {
91+
prometheus::opts! {
92+
"provider_grpc_errors_total",
93+
"total number of errors the gRPC provider has encountered",
94+
},
95+
&["reason"],
96+
registry(),
97+
}
98+
.unwrap()
99+
});
100+
101+
METRIC.with_label_values(&[&reason.to_string()]).inc();
102+
}
103+
104+
pub fn client_connect_attempt_backoff_millis(delay: std::time::Duration) {
105+
pub(crate) const BUCKET_START: f64 = 0.001;
106+
pub(crate) const BUCKET_FACTOR: f64 = 2.0;
107+
pub(crate) const BUCKET_COUNT: usize = 13;
108+
109+
static METRIC: Lazy<Histogram> = Lazy::new(|| {
110+
prometheus::register_histogram_with_registry! {
111+
prometheus::histogram_opts! {
112+
"provider_grpc_connect_attempt_backoff_seconds",
113+
"The backoff duration made when attempting reconnect to a gRPC provider",
114+
prometheus::exponential_buckets(BUCKET_START, BUCKET_FACTOR, BUCKET_COUNT).unwrap(),
115+
},
116+
registry(),
117+
}
118+
.unwrap()
119+
});
120+
121+
METRIC.observe(delay.as_secs_f64());
122+
}
123+
124+
pub(crate) fn server_active(active: bool) {
125+
static METRIC: Lazy<IntGauge> = Lazy::new(|| {
126+
prometheus::register_int_gauge_with_registry! {
127+
prometheus::opts! {
128+
"service_grpc_active",
129+
"Whether the gRPC service is active or not (either 1 or 0).",
130+
},
131+
registry(),
132+
}
133+
.unwrap()
134+
});
135+
136+
METRIC.set(active as _);
137+
}
138+
139+
pub(crate) fn server_resource_updates_total(resource: &str) {
140+
static METRIC: Lazy<IntCounterVec> = Lazy::new(|| {
141+
prometheus::register_int_counter_vec_with_registry! {
142+
prometheus::opts! {
143+
"service_grpc_resource_updates_total",
144+
"total number of updates to a `resource` being sent to gRPC clients.",
145+
},
146+
&["address"],
147+
registry(),
148+
}
149+
.unwrap()
150+
});
151+
152+
METRIC.with_label_values(&[resource]).inc();
153+
}
154+
57155
pub(crate) fn delta_discovery_requests(node: &str, type_url: &str) -> prometheus::IntCounter {
58156
static DELTA_DISCOVERY_REQUESTS: Lazy<IntCounterVec> = Lazy::new(|| {
59157
prometheus::register_int_counter_vec_with_registry! {

crates/xds/src/server.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,11 @@ impl<C: crate::config::Configuration> ControlPlane<C> {
159159

160160
let server = builder.add_service(server);
161161
tracing::info!("serving management server on port `{}`", listener.port());
162+
metrics::server_active(true);
162163
Ok(server
163164
.serve_with_incoming(listener.into_stream()?)
164-
.map_err(From::from))
165+
.map_err(From::from)
166+
.inspect_err(|_| metrics::server_active(false)))
165167
}
166168

167169
pub fn relay_server(
@@ -200,6 +202,7 @@ impl<C: crate::config::Configuration> ControlPlane<C> {
200202
is_relay = self.is_relay,
201203
"pushing update"
202204
);
205+
metrics::server_resource_updates_total(resource_type);
203206
if self.tx.send(resource_type).is_err() {
204207
tracing::debug!("no client connections currently subscribed");
205208
}

0 commit comments

Comments
 (0)