@@ -38,6 +38,7 @@ use crate::{
3838 aggregated_discovery_service_client:: AggregatedDiscoveryServiceClient ,
3939 } ,
4040 generated:: quilkin:: relay:: v1alpha1:: aggregated_control_plane_discovery_service_client:: AggregatedControlPlaneDiscoveryServiceClient ,
41+ metrics,
4142} ;
4243
4344type AdsGrpcClient = AggregatedDiscoveryServiceClient < TonicChannel > ;
@@ -157,6 +158,8 @@ impl<C: ServiceClient> Client<C> {
157158 rand:: rng ( ) . random_range ( 0 ..BACKOFF_MAX_JITTER . as_millis ( ) as _ ) ,
158159 ) ;
159160
161+ metrics:: client_connect_attempt_backoff_millis ( delay) ;
162+
160163 match error {
161164 RpcSessionError :: InvalidEndpoint ( error) => {
162165 tracing:: error!( ?error, "Error creating endpoint" ) ;
@@ -179,32 +182,37 @@ impl<C: ServiceClient> Client<C> {
179182 let address = addresses. next ( ) ;
180183 async move {
181184 match address {
182- None => Err ( RpcSessionError :: Receive ( tonic:: Status :: internal (
183- "Failed initial connection" ,
184- ) ) ) ,
185+ None => {
186+ let error = RpcSessionError :: Receive ( tonic:: Status :: internal (
187+ "Failed initial connection" ,
188+ ) ) ;
189+ metrics:: client_errors_total ( & error) ;
190+ Err ( error)
191+ }
185192 Some ( endpoint) => {
186193 tracing:: info!( "attempting to connect to `{}`" , endpoint. uri( ) ) ;
187194 let cendpoint = endpoint. clone ( ) ;
188195 let endpoint = endpoint. clone ( ) . connect_timeout ( CONNECTION_TIMEOUT ) ;
189196
190197 // make sure that we have everything we will need in our URI
191198 if endpoint. uri ( ) . scheme ( ) . is_none ( ) {
192- return Err ( RpcSessionError :: InvalidEndpoint (
193- "No scheme provided" . into ( ) ,
194- ) ) ;
199+ let error =
200+ RpcSessionError :: InvalidEndpoint ( "No scheme provided" . into ( ) ) ;
201+ metrics:: client_errors_total ( & error) ;
202+ return Err ( error) ;
195203 } else if endpoint. uri ( ) . host ( ) . is_none ( ) {
196- return Err ( RpcSessionError :: InvalidEndpoint (
197- "No host provided" . into ( ) ,
198- ) ) ;
204+ let error = RpcSessionError :: InvalidEndpoint ( "No host provided" . into ( ) ) ;
205+ metrics :: client_errors_total ( & error ) ;
206+ return Err ( error ) ;
199207 }
200208
209+ metrics:: client_connect_attempts_total ( endpoint. uri ( ) ) ;
201210 C :: connect_to_endpoint ( endpoint)
202- . instrument ( tracing:: debug_span!(
203- "AggregatedDiscoveryServiceClient::connect_to_endpoint"
204- ) )
211+ . instrument ( tracing:: debug_span!( "C::connect_to_endpoint" ) )
205212 . await
206213 . map_err ( RpcSessionError :: InitialConnect )
207214 . map ( |client| ( client, cendpoint) )
215+ . inspect_err ( |error| metrics:: client_errors_total ( & error) )
208216 }
209217 }
210218 }
@@ -235,6 +243,7 @@ impl MdsClient {
235243 }
236244 Err ( err) => {
237245 tracing:: error!( error = ?err, "failed to acquire aggregated delta stream" ) ;
246+ metrics:: client_errors_total ( & err) ;
238247 return Err ( self ) ;
239248 }
240249 } ;
@@ -259,18 +268,29 @@ impl MdsClient {
259268
260269 let mut stream = control_plane. delta_aggregated_resources ( stream) . await ?;
261270 is_healthy. store ( true , Ordering :: SeqCst ) ;
271+ metrics:: client_active ( true ) ;
262272
263273 while let Some ( result) = stream. next ( ) . await {
264- let response = result?;
274+ let response = match result {
275+ Ok ( response) => response,
276+ Err ( error) => {
277+ metrics:: client_errors_total ( & error) ;
278+ break ;
279+ }
280+ } ;
265281 tracing:: trace!( "received delta discovery response" ) ;
266- ds. send_response ( response) . await ?;
282+ if let Err ( error) = ds. send_response ( response) . await {
283+ metrics:: client_errors_total ( & error) ;
284+ break ;
285+ }
267286 }
268287
269288 change_watcher. abort ( ) ;
270289 let _unused = change_watcher. await ;
271290 }
272291
273292 is_healthy. store ( false , Ordering :: SeqCst ) ;
293+ metrics:: client_active ( false ) ;
274294
275295 //tracing::warn!("lost connection to relay server, retrying");
276296 let new_client = MdsClient :: connect_with_backoff ( & self . management_servers )
0 commit comments