Skip to content

Commit 4cc6fc1

Browse files
committed
feat(xds): add metrics
1 parent e693d1d commit 4cc6fc1

File tree

3 files changed

+135
-20
lines changed

3 files changed

+135
-20
lines changed

crates/xds/src/client.rs

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use crate::{
3838
aggregated_discovery_service_client::AggregatedDiscoveryServiceClient,
3939
},
4040
generated::quilkin::relay::v1alpha1::aggregated_control_plane_discovery_service_client::AggregatedControlPlaneDiscoveryServiceClient,
41+
metrics,
4142
};
4243

4344
type AdsGrpcClient = AggregatedDiscoveryServiceClient<TonicChannel>;
@@ -110,7 +111,7 @@ impl ServiceClient for MdsGrpcClient {
110111
&mut self,
111112
stream: S,
112113
) -> tonic::Result<tonic::Response<tonic::Streaming<Self::Response>>> {
113-
self.stream_aggregated_resources(stream).await
114+
self.stream_aggregated_resources(stream).inspect_err(|error| metrics::errors_total(&error)).await
114115
}
115116
}
116117

@@ -157,6 +158,8 @@ impl<C: ServiceClient> Client<C> {
157158
rand::rng().random_range(0..BACKOFF_MAX_JITTER.as_millis() as _),
158159
);
159160

161+
metrics::client_connect_attempt_backoff_millis(delay);
162+
160163
match error {
161164
RpcSessionError::InvalidEndpoint(error) => {
162165
tracing::error!(?error, "Error creating endpoint");
@@ -177,37 +180,37 @@ impl<C: ServiceClient> Client<C> {
177180
let mut addresses = management_servers.iter().cycle();
178181
let connect_to_server = tryhard::retry_fn(|| {
179182
let address = addresses.next();
180-
async move {
183+
let fut = async move {
181184
match address {
182-
None => Err(RpcSessionError::Receive(tonic::Status::internal(
183-
"Failed initial connection",
184-
))),
185+
None => {
186+
let error = RpcSessionError::Receive(tonic::Status::internal(
187+
"Failed initial connection",
188+
));
189+
Err(error)
190+
}
185191
Some(endpoint) => {
186192
tracing::info!("attempting to connect to `{}`", endpoint.uri());
187193
let cendpoint = endpoint.clone();
188194
let endpoint = endpoint.clone().connect_timeout(CONNECTION_TIMEOUT);
189195

190196
// make sure that we have everything we will need in our URI
191197
if endpoint.uri().scheme().is_none() {
192-
return Err(RpcSessionError::InvalidEndpoint(
193-
"No scheme provided".into(),
194-
));
198+
return Err(RpcSessionError::InvalidEndpoint("No scheme provided".into()));
195199
} else if endpoint.uri().host().is_none() {
196-
return Err(RpcSessionError::InvalidEndpoint(
197-
"No host provided".into(),
198-
));
200+
return Err(RpcSessionError::InvalidEndpoint("No host provided".into()));
199201
}
200202

203+
metrics::client_connect_attempts_total(endpoint.uri());
201204
C::connect_to_endpoint(endpoint)
202-
.instrument(tracing::debug_span!(
203-
"AggregatedDiscoveryServiceClient::connect_to_endpoint"
204-
))
205+
.instrument(tracing::debug_span!("C::connect_to_endpoint"))
205206
.await
206207
.map_err(RpcSessionError::InitialConnect)
207208
.map(|client| (client, cendpoint))
208209
}
209210
}
210-
}
211+
};
212+
213+
fut.inspect_err(|error| metrics::client_errors_total(&error))
211214
})
212215
.with_config(retry_config);
213216

@@ -259,18 +262,29 @@ impl MdsClient {
259262

260263
let mut stream = control_plane.delta_aggregated_resources(stream).await?;
261264
is_healthy.store(true, Ordering::SeqCst);
265+
metrics::client_active(true);
262266

263267
while let Some(result) = stream.next().await {
264-
let response = result?;
268+
let response = match result {
269+
Ok(response) => response,
270+
Err(error) => {
271+
metrics::client_errors_total(&error);
272+
break;
273+
}
274+
};
265275
tracing::trace!("received delta discovery response");
266-
ds.send_response(response).await?;
276+
if let Err(error) = ds.send_response(response).await {
277+
metrics::client_errors_total(&error);
278+
break;
279+
}
267280
}
268281

269282
change_watcher.abort();
270283
let _unused = change_watcher.await;
271284
}
272285

273286
is_healthy.store(false, Ordering::SeqCst);
287+
metrics::client_active(false);
274288

275289
//tracing::warn!("lost connection to relay server, retrying");
276290
let new_client = MdsClient::connect_with_backoff(&self.management_servers)

crates/xds/src/metrics.rs

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616

1717
use once_cell::sync::Lazy;
18-
use prometheus::{IntCounterVec, IntGaugeVec, Registry};
18+
use prometheus::{Histogram, IntCounterVec, IntGauge, IntGaugeVec, Registry};
1919

2020
pub(crate) const NODE_LABEL: &str = "node";
2121
pub(crate) const CONTROL_PLANE_LABEL: &str = "control_plane";
@@ -38,7 +38,7 @@ pub fn registry() -> &'static Registry {
3838
unsafe { REGISTRY }.expect("set_registry must be called")
3939
}
4040

41-
pub(crate) fn active_control_planes(control_plane: &str) -> prometheus::IntGauge {
41+
pub(crate) fn active_control_planes(control_plane: &str) -> IntGauge {
4242
static ACTIVE_CONTROL_PLANES: Lazy<IntGaugeVec> = Lazy::new(|| {
4343
prometheus::register_int_gauge_vec_with_registry! {
4444
prometheus::opts! {
@@ -54,6 +54,104 @@ pub(crate) fn active_control_planes(control_plane: &str) -> prometheus::IntGauge
5454
ACTIVE_CONTROL_PLANES.with_label_values(&[control_plane])
5555
}
5656

57+
pub(crate) fn client_active(active: bool) {
58+
static METRIC: Lazy<IntGauge> = Lazy::new(|| {
59+
prometheus::register_int_gauge_with_registry! {
60+
prometheus::opts! {
61+
"provider_grpc_active",
62+
"Whether the gRPC configuration provider is active or not (either 1 or 0).",
63+
},
64+
registry(),
65+
}
66+
.unwrap()
67+
});
68+
69+
METRIC.set(active as _);
70+
}
71+
72+
pub(crate) fn client_connect_attempts_total(address: &impl std::fmt::Display) {
73+
static METRIC: Lazy<IntCounterVec> = Lazy::new(|| {
74+
prometheus::register_int_counter_vec_with_registry! {
75+
prometheus::opts! {
76+
"provider_grpc_connect_attempts_total",
77+
"total number of attempts the gRPC provider has made to connect to `address`.",
78+
},
79+
&["address"],
80+
registry(),
81+
}
82+
.unwrap()
83+
});
84+
85+
METRIC.with_label_values(&[&address.to_string()]).inc();
86+
}
87+
88+
pub(crate) fn client_errors_total(reason: &impl std::fmt::Display) {
89+
static METRIC: Lazy<IntCounterVec> = Lazy::new(|| {
90+
prometheus::register_int_counter_vec_with_registry! {
91+
prometheus::opts! {
92+
"provider_grpc_errors_total",
93+
"total number of errors the gRPC provider has encountered",
94+
},
95+
&["reason"],
96+
registry(),
97+
}
98+
.unwrap()
99+
});
100+
101+
METRIC.with_label_values(&[&reason.to_string()]).inc();
102+
}
103+
104+
pub fn client_connect_attempt_backoff_millis(delay: std::time::Duration) {
105+
pub(crate) const BUCKET_START: f64 = 0.001;
106+
pub(crate) const BUCKET_FACTOR: f64 = 2.0;
107+
pub(crate) const BUCKET_COUNT: usize = 13;
108+
109+
static METRIC: Lazy<Histogram> = Lazy::new(|| {
110+
prometheus::register_histogram_with_registry! {
111+
prometheus::histogram_opts! {
112+
"provider_grpc_connect_attempt_backoff_seconds",
113+
"The backoff duration made when attempting reconnect to a gRPC provider",
114+
prometheus::exponential_buckets(BUCKET_START, BUCKET_FACTOR, BUCKET_COUNT).unwrap(),
115+
},
116+
registry(),
117+
}
118+
.unwrap()
119+
});
120+
121+
METRIC.observe(delay.as_secs_f64());
122+
}
123+
124+
pub(crate) fn server_active(active: bool) {
125+
static METRIC: Lazy<IntGauge> = Lazy::new(|| {
126+
prometheus::register_int_gauge_with_registry! {
127+
prometheus::opts! {
128+
"service_grpc_active",
129+
"Whether the gRPC service is active or not (either 1 or 0).",
130+
},
131+
registry(),
132+
}
133+
.unwrap()
134+
});
135+
136+
METRIC.set(active as _);
137+
}
138+
139+
pub(crate) fn server_resource_updates_total(resource: &str) {
140+
static METRIC: Lazy<IntCounterVec> = Lazy::new(|| {
141+
prometheus::register_int_counter_vec_with_registry! {
142+
prometheus::opts! {
143+
"service_grpc_resource_updates_total",
144+
"total number of updates to a `resource` being sent to gRPC clients.",
145+
},
146+
&["address"],
147+
registry(),
148+
}
149+
.unwrap()
150+
});
151+
152+
METRIC.with_label_values(&[resource]).inc();
153+
}
154+
57155
pub(crate) fn delta_discovery_requests(node: &str, type_url: &str) -> prometheus::IntCounter {
58156
static DELTA_DISCOVERY_REQUESTS: Lazy<IntCounterVec> = Lazy::new(|| {
59157
prometheus::register_int_counter_vec_with_registry! {

crates/xds/src/server.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,11 @@ impl<C: crate::config::Configuration> ControlPlane<C> {
159159

160160
let server = builder.add_service(server);
161161
tracing::info!("serving management server on port `{}`", listener.port());
162+
metrics::server_active(true);
162163
Ok(server
163164
.serve_with_incoming(listener.into_stream()?)
164-
.map_err(From::from))
165+
.map_err(From::from)
166+
.inspect_err(|_| metrics::server_active(false)))
165167
}
166168

167169
pub fn relay_server(
@@ -200,6 +202,7 @@ impl<C: crate::config::Configuration> ControlPlane<C> {
200202
is_relay = self.is_relay,
201203
"pushing update"
202204
);
205+
metrics::server_resource_updates_total(resource_type);
203206
if self.tx.send(resource_type).is_err() {
204207
tracing::debug!("no client connections currently subscribed");
205208
}

0 commit comments

Comments
 (0)