diff --git a/crates/xds/src/client.rs b/crates/xds/src/client.rs index b3f0d35a1f..a3b4e40bc5 100644 --- a/crates/xds/src/client.rs +++ b/crates/xds/src/client.rs @@ -372,6 +372,104 @@ pub(crate) struct DeltaClientStream { req_tx: tokio::sync::mpsc::Sender, } +/// TEMP custom receiver stream to be able to monitor receiver len +#[derive(Debug)] +pub struct ReceiverStream { + inner: Arc>>, +} + +impl ReceiverStream { + pub fn new(recv: tokio::sync::mpsc::Receiver) -> Self { + Self { + inner: Arc::new(tokio::sync::Mutex::new(recv)), + } + } + + pub fn close(&mut self) { + let mut guard = self.inner.blocking_lock(); + guard.close(); + } +} + +impl ReceiverStream +where + T: Send + 'static, +{ + pub async fn publish_metric(&self, kind: &str) { + let weak_rc = Arc::downgrade(&self.inner); + let kind = kind.to_string(); + let receiver_id = format!("{}", rand::random::()); + // Temp + tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(5)); + loop { + tokio::select! { + _ = interval.tick() => { + if let Some(arc) = weak_rc.upgrade() { + let guard = arc.lock().await; + receiver_buffer_len(kind.as_str(), receiver_id.as_str()).set(guard.len() as i64); + } else { + tracing::info!("receiver dropped"); + receiver_buffer_len(kind.as_str(), receiver_id.as_str()).set(0); + return; + } + } + } + } + }); + } +} + +impl tokio_stream::Stream for ReceiverStream { + type Item = T; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match self.inner.try_lock() { + Ok(mut guard) => guard.poll_recv(cx), + Err(_) => std::task::Poll::Pending, + } + } +} + +// impl AsRef> for ReceiverStream { +// fn as_ref(&self) -> &tokio::sync::mpsc::Receiver { +// &self.inner +// } +// } +// +// impl AsMut> for ReceiverStream { +// fn as_mut(&mut self) -> &mut tokio::sync::mpsc::Receiver { +// &mut self.inner +// } +// } + +// impl From> for ReceiverStream { +// fn from(recv: tokio::sync::mpsc::Receiver) -> Self { +// Self::new(recv) +// } +// } + +pub(crate) fn receiver_buffer_len(kind: &str, receiver_id: &str) -> prometheus::IntGauge { + use once_cell::sync::Lazy; + use prometheus::IntGaugeVec; + static RECEIVER_BUFFER_SIZE: Lazy = Lazy::new(|| { + prometheus::register_int_gauge_vec_with_registry! { + prometheus::opts! { + "temp_xds_receiver_buffer_len", + "receiver channel buffer length", + }, + &["kind", "receiver_id"], + crate::metrics::registry(), + } + .unwrap() + }); + + RECEIVER_BUFFER_SIZE.with_label_values(&[kind, receiver_id]) +} + impl DeltaClientStream { #[inline] async fn connect( @@ -382,6 +480,9 @@ impl DeltaClientStream { if let Ok((mut client, ep)) = MdsClient::connect_with_backoff(endpoints).await { let (dcs, requests_rx) = Self::new(); + let receiver_stream = ReceiverStream::new(requests_rx); + receiver_stream.publish_metric("mds_client").await; + // Since we are doing exploratory requests to see if the remote endpoint supports delta streams, we unfortunately // need to actually send something before the full roundtrip occurs. This can be removed once delta discovery // is fully rolled out @@ -398,7 +499,7 @@ impl DeltaClientStream { .await?; if let Ok(stream) = client - .subscribe_delta_resources(tokio_stream::wrappers::ReceiverStream::new(requests_rx)) + .subscribe_delta_resources(receiver_stream) .in_current_span() .await { @@ -410,6 +511,9 @@ impl DeltaClientStream { let (dcs, requests_rx) = Self::new(); + let receiver_stream = ReceiverStream::new(requests_rx); + receiver_stream.publish_metric("ads_client").await; + // Since we are doing exploratory requests to see if the remote endpoint supports delta streams, we unfortunately // need to actually send something before the full roundtrip occurs. This can be removed once delta discovery // is fully rolled out @@ -426,7 +530,7 @@ impl DeltaClientStream { .await?; let stream = client - .delta_aggregated_resources(tokio_stream::wrappers::ReceiverStream::new(requests_rx)) + .delta_aggregated_resources(receiver_stream) .in_current_span() .await?; Ok((dcs, stream.into_inner(), ep)) diff --git a/crates/xds/src/config.rs b/crates/xds/src/config.rs index e1b467a437..359d610901 100644 --- a/crates/xds/src/config.rs +++ b/crates/xds/src/config.rs @@ -41,6 +41,7 @@ impl LocalVersions { #[inline] pub fn get(&self, ty: &str) -> parking_lot::MutexGuard<'_, VersionMap> { + tracing::trace!("LocalVersions::get"); let g = self .versions .iter() @@ -304,6 +305,7 @@ pub fn handle_delta_discovery_responses( None }; + tracing::trace!(kind = type_url, nonce = %response.nonce, "yielding request"); yield DeltaDiscoveryRequest { type_url, response_nonce: response.nonce, diff --git a/crates/xds/src/server.rs b/crates/xds/src/server.rs index 3fcffa277d..dd0312bf64 100644 --- a/crates/xds/src/server.rs +++ b/crates/xds/src/server.rs @@ -761,6 +761,7 @@ impl AggregatedControlPlaneDiscoveryService for cs } else { let Some(cs) = client_tracker.get_state(type_url) else { + tracing::trace!(type_url, "no client state"); return Ok(None); }; @@ -773,6 +774,7 @@ impl AggregatedControlPlaneDiscoveryService for .map_err(|error| tonic::Status::internal(error.to_string()))?; if req.resources.is_empty() && req.removed.is_empty() { + tracing::trace!(type_url, "no resources and nothing removed"); return Ok(None); } @@ -897,6 +899,7 @@ impl AggregatedControlPlaneDiscoveryService for }; if client_request.type_url == "ignore-me" { + tracing::trace!("ignore-me received, continuing"); continue; } @@ -924,6 +927,7 @@ impl AggregatedControlPlaneDiscoveryService for let type_url = client_request.type_url.clone(); let Some(response) = responder(Some(client_request), &type_url, &mut client_tracker).unwrap() else { continue; }; + tracing::trace!(kind = type_url, nonce = response.nonce, "yielding response"); yield response; } _ = shutdown.changed() => { diff --git a/src/config/datacenter.rs b/src/config/datacenter.rs index a5d3501cab..6a024238c1 100644 --- a/src/config/datacenter.rs +++ b/src/config/datacenter.rs @@ -16,6 +16,7 @@ pub struct DatacenterMap { impl DatacenterMap { #[inline] pub fn insert(&self, ip: IpAddr, datacenter: Datacenter) -> Option { + tracing::trace!("DatacenterMap::insert"); let old = self.map.insert(ip, datacenter); self.version.fetch_add(1, Relaxed); old @@ -23,11 +24,13 @@ impl DatacenterMap { #[inline] pub fn len(&self) -> usize { + tracing::trace!("DatacenterMap::len"); self.map.len() } #[inline] pub fn is_empty(&self) -> bool { + tracing::trace!("DatacenterMap::is_empty"); self.map.is_empty() } @@ -38,16 +41,19 @@ impl DatacenterMap { #[inline] pub fn get(&self, key: &IpAddr) -> Option> { + tracing::trace!("DatacenterMap::get"); self.map.get(key) } #[inline] pub fn iter(&self) -> dashmap::iter::Iter<'_, IpAddr, Datacenter> { + tracing::trace!("DatacenterMap::iter"); self.map.iter() } #[inline] pub fn remove(&self, ip: IpAddr) { + tracing::trace!("DatacenterMap::remove"); let mut lock = self.removed.lock(); let mut version = 0; @@ -63,6 +69,7 @@ impl DatacenterMap { #[inline] pub fn removed(&self) -> Vec { + tracing::trace!("DatacenterMap::removed"); std::mem::take(&mut self.removed.lock()) } } diff --git a/src/net/cluster.rs b/src/net/cluster.rs index 6c12e80197..d33ffa4c43 100644 --- a/src/net/cluster.rs +++ b/src/net/cluster.rs @@ -302,6 +302,7 @@ where locality: Option, cluster: BTreeSet, ) { + tracing::trace!("ClusterMap::insert"); let _res = self.apply(remote_addr, locality, EndpointSet::new(cluster)); } @@ -311,6 +312,7 @@ where locality: Option, cluster: EndpointSet, ) -> crate::Result<()> { + tracing::trace!("ClusterMap::apply"); if let Some(raddr) = self.localities.get(&locality) { if *raddr != remote_addr { eyre::bail!( @@ -358,26 +360,31 @@ where #[inline] pub fn len(&self) -> usize { + tracing::trace!("ClusterMap::len"); self.map.len() } #[inline] pub fn is_empty(&self) -> bool { + tracing::trace!("ClusterMap::is_empty"); self.map.is_empty() } #[inline] pub fn get(&self, key: &Option) -> Option> { + tracing::trace!("ClusterMap::get"); self.map.get(key) } #[inline] pub fn insert_default(&self, endpoints: BTreeSet) { + tracing::trace!("ClusterMap::insert_default"); self.insert(None, None, endpoints); } #[inline] pub fn remove_endpoint(&self, needle: &Endpoint) -> bool { + tracing::trace!("ClusterMap::remove_endpoint"); let locality = 'l: { for mut entry in self.map.iter_mut() { let set = entry.value_mut(); @@ -403,6 +410,7 @@ where #[inline] pub fn remove_endpoint_if(&self, closure: impl Fn(&Endpoint) -> bool) -> bool { + tracing::trace!("ClusterMap::remove_endpoint_if"); let locality = 'l: { for mut entry in self.map.iter_mut() { let set = entry.value_mut(); @@ -432,6 +440,7 @@ where #[inline] pub fn iter(&self) -> dashmap::iter::Iter<'_, Option, EndpointSet, S> { + tracing::trace!("ClusterMap::iter"); self.map.iter() } @@ -442,6 +451,7 @@ where locality: Option, endpoint: Endpoint, ) -> Option { + tracing::trace!("ClusterMap::replace"); if let Some(raddr) = self.localities.get(&locality) { if *raddr != remote_addr { tracing::trace!("not replacing locality endpoints"); @@ -467,6 +477,7 @@ where #[inline] pub fn endpoints(&self) -> Vec { + tracing::trace!("ClusterMap::endpoints"); let mut endpoints = Vec::with_capacity(self.num_of_endpoints()); for set in self.map.iter() { @@ -477,6 +488,7 @@ where } pub fn nth_endpoint(&self, mut index: usize) -> Option { + tracing::trace!("ClusterMap::nth_endpoint"); for set in self.iter() { let set = &set.value().endpoints; if index < set.len() { @@ -490,6 +502,7 @@ where } pub fn filter_endpoints(&self, f: impl Fn(&Endpoint) -> bool) -> Vec { + tracing::trace!("ClusterMap::filter_endpoints"); let mut endpoints = Vec::new(); for set in self.iter() { @@ -517,6 +530,7 @@ where remote_addr: Option, locality: Locality, ) { + tracing::trace!("ClusterMap::update_unlocated_endpoints"); if let Some(raddr) = self.localities.get(&None) { if *raddr != remote_addr { tracing::trace!("not updating locality"); @@ -537,6 +551,7 @@ where #[inline] fn do_remove_locality(&self, locality: &Option) -> Option { + tracing::trace!("ClusterMap::do_remove_locality"); self.localities.remove(locality); let ret = self.map.remove(locality).map(|(_k, v)| v); @@ -549,6 +564,7 @@ where #[inline] pub fn remove_contributor(&self, remote_addr: Option) { + tracing::trace!("ClusterMap::remove_contributor"); self.localities.retain(|k, v| { let keep = *v != remote_addr; if !keep { @@ -564,6 +580,7 @@ where remote_addr: Option, locality: &Option, ) -> Option { + tracing::trace!("ClusterMap::remove_locality"); { if let Some(raddr) = self.localities.get(locality) { if *raddr != remote_addr { @@ -577,6 +594,7 @@ where } pub fn addresses_for_token(&self, token: Token, addrs: &mut Vec) { + tracing::trace!("ClusterMap::addresses_for_token"); if let Some(ma) = self.token_map.get(&token.0) { addrs.extend(ma.value().iter().cloned()); }