@@ -396,6 +396,7 @@ where
396396 pub async fn publish_metric ( & self , kind : & str ) {
397397 let weak_rc = Arc :: downgrade ( & self . inner ) ;
398398 let kind = kind. to_string ( ) ;
399+ let receiver_id = format ! ( "{}" , rand:: random:: <u8 >( ) ) ;
399400 // Temp
400401 tokio:: spawn ( async move {
401402 let mut interval = tokio:: time:: interval ( std:: time:: Duration :: from_secs ( 5 ) ) ;
@@ -404,9 +405,10 @@ where
404405 _ = interval. tick( ) => {
405406 if let Some ( arc) = weak_rc. upgrade( ) {
406407 let guard = arc. lock( ) . await ;
407- receiver_buffer_len( kind. as_str( ) ) . set( guard. len( ) as i64 ) ;
408+ receiver_buffer_len( kind. as_str( ) , receiver_id . as_str ( ) ) . set( guard. len( ) as i64 ) ;
408409 } else {
409410 tracing:: info!( "receiver dropped" ) ;
411+ receiver_buffer_len( kind. as_str( ) , receiver_id. as_str( ) ) . set( 0 ) ;
410412 return ;
411413 }
412414 }
@@ -448,7 +450,7 @@ impl<T> tokio_stream::Stream for ReceiverStream<T> {
448450// }
449451// }
450452
451- pub ( crate ) fn receiver_buffer_len ( kind : & str ) -> prometheus:: IntGauge {
453+ pub ( crate ) fn receiver_buffer_len ( kind : & str , receiver_id : & str ) -> prometheus:: IntGauge {
452454 use once_cell:: sync:: Lazy ;
453455 use prometheus:: IntGaugeVec ;
454456 static RECEIVER_BUFFER_SIZE : Lazy < IntGaugeVec > = Lazy :: new ( || {
@@ -457,13 +459,13 @@ pub(crate) fn receiver_buffer_len(kind: &str) -> prometheus::IntGauge {
457459 "temp_xds_receiver_buffer_len" ,
458460 "receiver channel buffer length" ,
459461 } ,
460- & [ "kind" ] ,
462+ & [ "kind" , "receiver_id" ] ,
461463 crate :: metrics:: registry( ) ,
462464 }
463465 . unwrap ( )
464466 } ) ;
465467
466- RECEIVER_BUFFER_SIZE . with_label_values ( & [ kind] )
468+ RECEIVER_BUFFER_SIZE . with_label_values ( & [ kind, receiver_id ] )
467469}
468470
469471impl DeltaClientStream {
0 commit comments