@@ -398,6 +398,7 @@ where
398398 pub async fn publish_metric ( & self , kind : & str ) {
399399 let weak_rc = Arc :: downgrade ( & self . inner ) ;
400400 let kind = kind. to_string ( ) ;
401+ let receiver_id = format ! ( "{}" , rand:: random:: <u8 >( ) ) ;
401402 // Temp
402403 tokio:: spawn ( async move {
403404 let mut interval = tokio:: time:: interval ( std:: time:: Duration :: from_secs ( 5 ) ) ;
@@ -406,9 +407,10 @@ where
406407 _ = interval. tick( ) => {
407408 if let Some ( arc) = weak_rc. upgrade( ) {
408409 let guard = arc. lock( ) . await ;
409- receiver_buffer_len( kind. as_str( ) ) . set( guard. len( ) as i64 ) ;
410+ receiver_buffer_len( kind. as_str( ) , receiver_id . as_str ( ) ) . set( guard. len( ) as i64 ) ;
410411 } else {
411412 tracing:: info!( "receiver dropped" ) ;
413+ receiver_buffer_len( kind. as_str( ) , receiver_id. as_str( ) ) . set( 0 ) ;
412414 return ;
413415 }
414416 }
@@ -450,7 +452,7 @@ impl<T> tokio_stream::Stream for ReceiverStream<T> {
450452// }
451453// }
452454
453- pub ( crate ) fn receiver_buffer_len ( kind : & str ) -> prometheus:: IntGauge {
455+ pub ( crate ) fn receiver_buffer_len ( kind : & str , receiver_id : & str ) -> prometheus:: IntGauge {
454456 use once_cell:: sync:: Lazy ;
455457 use prometheus:: IntGaugeVec ;
456458 static RECEIVER_BUFFER_SIZE : Lazy < IntGaugeVec > = Lazy :: new ( || {
@@ -459,13 +461,13 @@ pub(crate) fn receiver_buffer_len(kind: &str) -> prometheus::IntGauge {
459461 "temp_xds_receiver_buffer_len" ,
460462 "receiver channel buffer length" ,
461463 } ,
462- & [ "kind" ] ,
464+ & [ "kind" , "receiver_id" ] ,
463465 crate :: metrics:: registry( ) ,
464466 }
465467 . unwrap ( )
466468 } ) ;
467469
468- RECEIVER_BUFFER_SIZE . with_label_values ( & [ kind] )
470+ RECEIVER_BUFFER_SIZE . with_label_values ( & [ kind, receiver_id ] )
469471}
470472
471473impl DeltaClientStream {
0 commit comments