@@ -372,6 +372,102 @@ pub(crate) struct DeltaClientStream {
372372 req_tx : tokio:: sync:: mpsc:: Sender < DeltaDiscoveryRequest > ,
373373}
374374
375+ /// TEMP custom receiver stream to be able to monitor receiver len
376+ #[ derive( Debug ) ]
377+ pub struct ReceiverStream < T > {
378+ inner : Arc < tokio:: sync:: Mutex < tokio:: sync:: mpsc:: Receiver < T > > > ,
379+ }
380+
381+ impl < T > ReceiverStream < T > {
382+ pub fn new ( recv : tokio:: sync:: mpsc:: Receiver < T > ) -> Self {
383+ Self {
384+ inner : Arc :: new ( tokio:: sync:: Mutex :: new ( recv) ) ,
385+ }
386+ }
387+
388+ pub fn close ( & mut self ) {
389+ let mut guard = self . inner . blocking_lock ( ) ;
390+ guard. close ( ) ;
391+ }
392+ }
393+
394+ impl < T > ReceiverStream < T >
395+ where
396+ T : Send + ' static ,
397+ {
398+ pub async fn publish_metric ( & self , kind : & str ) {
399+ let weak_rc = Arc :: downgrade ( & self . inner ) ;
400+ let kind = kind. to_string ( ) ;
401+ // Temp
402+ tokio:: spawn ( async move {
403+ let mut interval = tokio:: time:: interval ( std:: time:: Duration :: from_secs ( 5 ) ) ;
404+ loop {
405+ tokio:: select! {
406+ _ = interval. tick( ) => {
407+ if let Some ( arc) = weak_rc. upgrade( ) {
408+ let guard = arc. lock( ) . await ;
409+ receiver_buffer_len( kind. as_str( ) ) . set( guard. len( ) as i64 ) ;
410+ } else {
411+ tracing:: info!( "receiver dropped" ) ;
412+ return ;
413+ }
414+ }
415+ }
416+ }
417+ } ) ;
418+ }
419+ }
420+
421+ impl < T > tokio_stream:: Stream for ReceiverStream < T > {
422+ type Item = T ;
423+
424+ fn poll_next (
425+ self : std:: pin:: Pin < & mut Self > ,
426+ cx : & mut std:: task:: Context < ' _ > ,
427+ ) -> std:: task:: Poll < Option < Self :: Item > > {
428+ match self . inner . try_lock ( ) {
429+ Ok ( mut guard) => guard. poll_recv ( cx) ,
430+ Err ( _) => std:: task:: Poll :: Pending ,
431+ }
432+ }
433+ }
434+
435+ // impl<T> AsRef<tokio::sync::mpsc::Receiver<T>> for ReceiverStream<T> {
436+ // fn as_ref(&self) -> &tokio::sync::mpsc::Receiver<T> {
437+ // &self.inner
438+ // }
439+ // }
440+ //
441+ // impl<T> AsMut<tokio::sync::mpsc::Receiver<T>> for ReceiverStream<T> {
442+ // fn as_mut(&mut self) -> &mut tokio::sync::mpsc::Receiver<T> {
443+ // &mut self.inner
444+ // }
445+ // }
446+
447+ // impl<T> From<tokio::sync::mpsc::Receiver<T>> for ReceiverStream<T> {
448+ // fn from(recv: tokio::sync::mpsc::Receiver<T>) -> Self {
449+ // Self::new(recv)
450+ // }
451+ // }
452+
453+ pub ( crate ) fn receiver_buffer_len ( kind : & str ) -> prometheus:: IntGauge {
454+ use once_cell:: sync:: Lazy ;
455+ use prometheus:: IntGaugeVec ;
456+ static RECEIVER_BUFFER_SIZE : Lazy < IntGaugeVec > = Lazy :: new ( || {
457+ prometheus:: register_int_gauge_vec_with_registry! {
458+ prometheus:: opts! {
459+ "temp_xds_receiver_buffer_len" ,
460+ "receiver channel buffer length" ,
461+ } ,
462+ & [ "kind" ] ,
463+ crate :: metrics:: registry( ) ,
464+ }
465+ . unwrap ( )
466+ } ) ;
467+
468+ RECEIVER_BUFFER_SIZE . with_label_values ( & [ kind] )
469+ }
470+
375471impl DeltaClientStream {
376472 #[ inline]
377473 async fn connect (
@@ -382,6 +478,9 @@ impl DeltaClientStream {
382478 if let Ok ( ( mut client, ep) ) = MdsClient :: connect_with_backoff ( endpoints) . await {
383479 let ( dcs, requests_rx) = Self :: new ( ) ;
384480
481+ let receiver_stream = ReceiverStream :: new ( requests_rx) ;
482+ receiver_stream. publish_metric ( "mds_client" ) . await ;
483+
385484 // Since we are doing exploratory requests to see if the remote endpoint supports delta streams, we unfortunately
386485 // need to actually send something before the full roundtrip occurs. This can be removed once delta discovery
387486 // is fully rolled out
@@ -398,7 +497,7 @@ impl DeltaClientStream {
398497 . await ?;
399498
400499 if let Ok ( stream) = client
401- . subscribe_delta_resources ( tokio_stream :: wrappers :: ReceiverStream :: new ( requests_rx ) )
500+ . subscribe_delta_resources ( receiver_stream )
402501 . in_current_span ( )
403502 . await
404503 {
@@ -410,6 +509,9 @@ impl DeltaClientStream {
410509
411510 let ( dcs, requests_rx) = Self :: new ( ) ;
412511
512+ let receiver_stream = ReceiverStream :: new ( requests_rx) ;
513+ receiver_stream. publish_metric ( "ads_client" ) . await ;
514+
413515 // Since we are doing exploratory requests to see if the remote endpoint supports delta streams, we unfortunately
414516 // need to actually send something before the full roundtrip occurs. This can be removed once delta discovery
415517 // is fully rolled out
@@ -426,7 +528,7 @@ impl DeltaClientStream {
426528 . await ?;
427529
428530 let stream = client
429- . delta_aggregated_resources ( tokio_stream :: wrappers :: ReceiverStream :: new ( requests_rx ) )
531+ . delta_aggregated_resources ( receiver_stream )
430532 . in_current_span ( )
431533 . await ?;
432534 Ok ( ( dcs, stream. into_inner ( ) , ep) )
0 commit comments