Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 106 additions & 2 deletions crates/xds/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,104 @@ pub(crate) struct DeltaClientStream {
req_tx: tokio::sync::mpsc::Sender<DeltaDiscoveryRequest>,
}

/// TEMP custom receiver stream to be able to monitor receiver len
#[derive(Debug)]
pub struct ReceiverStream<T> {
inner: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<T>>>,
}

impl<T> ReceiverStream<T> {
pub fn new(recv: tokio::sync::mpsc::Receiver<T>) -> Self {
Self {
inner: Arc::new(tokio::sync::Mutex::new(recv)),
}
}

pub fn close(&mut self) {
let mut guard = self.inner.blocking_lock();
guard.close();
}
}

impl<T> ReceiverStream<T>
where
T: Send + 'static,
{
pub async fn publish_metric(&self, kind: &str) {
let weak_rc = Arc::downgrade(&self.inner);
let kind = kind.to_string();
let receiver_id = format!("{}", rand::random::<u8>());
// Temp
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
loop {
tokio::select! {
_ = interval.tick() => {
if let Some(arc) = weak_rc.upgrade() {
let guard = arc.lock().await;
receiver_buffer_len(kind.as_str(), receiver_id.as_str()).set(guard.len() as i64);
} else {
tracing::info!("receiver dropped");
receiver_buffer_len(kind.as_str(), receiver_id.as_str()).set(0);
return;
}
}
}
}
});
}
}

impl<T> tokio_stream::Stream for ReceiverStream<T> {
type Item = T;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match self.inner.try_lock() {
Ok(mut guard) => guard.poll_recv(cx),
Err(_) => std::task::Poll::Pending,
}
}
}

// impl<T> AsRef<tokio::sync::mpsc::Receiver<T>> for ReceiverStream<T> {
// fn as_ref(&self) -> &tokio::sync::mpsc::Receiver<T> {
// &self.inner
// }
// }
//
// impl<T> AsMut<tokio::sync::mpsc::Receiver<T>> for ReceiverStream<T> {
// fn as_mut(&mut self) -> &mut tokio::sync::mpsc::Receiver<T> {
// &mut self.inner
// }
// }

// impl<T> From<tokio::sync::mpsc::Receiver<T>> for ReceiverStream<T> {
// fn from(recv: tokio::sync::mpsc::Receiver<T>) -> Self {
// Self::new(recv)
// }
// }

pub(crate) fn receiver_buffer_len(kind: &str, receiver_id: &str) -> prometheus::IntGauge {
use once_cell::sync::Lazy;
use prometheus::IntGaugeVec;
static RECEIVER_BUFFER_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
prometheus::register_int_gauge_vec_with_registry! {
prometheus::opts! {
"temp_xds_receiver_buffer_len",
"receiver channel buffer length",
},
&["kind", "receiver_id"],
crate::metrics::registry(),
}
.unwrap()
});

RECEIVER_BUFFER_SIZE.with_label_values(&[kind, receiver_id])
}

impl DeltaClientStream {
#[inline]
async fn connect(
Expand All @@ -382,6 +480,9 @@ impl DeltaClientStream {
if let Ok((mut client, ep)) = MdsClient::connect_with_backoff(endpoints).await {
let (dcs, requests_rx) = Self::new();

let receiver_stream = ReceiverStream::new(requests_rx);
receiver_stream.publish_metric("mds_client").await;

// Since we are doing exploratory requests to see if the remote endpoint supports delta streams, we unfortunately
// need to actually send something before the full roundtrip occurs. This can be removed once delta discovery
// is fully rolled out
Expand All @@ -398,7 +499,7 @@ impl DeltaClientStream {
.await?;

if let Ok(stream) = client
.subscribe_delta_resources(tokio_stream::wrappers::ReceiverStream::new(requests_rx))
.subscribe_delta_resources(receiver_stream)
.in_current_span()
.await
{
Expand All @@ -410,6 +511,9 @@ impl DeltaClientStream {

let (dcs, requests_rx) = Self::new();

let receiver_stream = ReceiverStream::new(requests_rx);
receiver_stream.publish_metric("ads_client").await;

// Since we are doing exploratory requests to see if the remote endpoint supports delta streams, we unfortunately
// need to actually send something before the full roundtrip occurs. This can be removed once delta discovery
// is fully rolled out
Expand All @@ -426,7 +530,7 @@ impl DeltaClientStream {
.await?;

let stream = client
.delta_aggregated_resources(tokio_stream::wrappers::ReceiverStream::new(requests_rx))
.delta_aggregated_resources(receiver_stream)
.in_current_span()
.await?;
Ok((dcs, stream.into_inner(), ep))
Expand Down
2 changes: 2 additions & 0 deletions crates/xds/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ impl LocalVersions {

#[inline]
pub fn get(&self, ty: &str) -> parking_lot::MutexGuard<'_, VersionMap> {
tracing::trace!("LocalVersions::get");
let g = self
.versions
.iter()
Expand Down Expand Up @@ -304,6 +305,7 @@ pub fn handle_delta_discovery_responses<C: Configuration>(
None
};

tracing::trace!(kind = type_url, nonce = %response.nonce, "yielding request");
yield DeltaDiscoveryRequest {
type_url,
response_nonce: response.nonce,
Expand Down
4 changes: 4 additions & 0 deletions crates/xds/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,7 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for
cs
} else {
let Some(cs) = client_tracker.get_state(type_url) else {
tracing::trace!(type_url, "no client state");
return Ok(None);
};

Expand All @@ -773,6 +774,7 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for
.map_err(|error| tonic::Status::internal(error.to_string()))?;

if req.resources.is_empty() && req.removed.is_empty() {
tracing::trace!(type_url, "no resources and nothing removed");
return Ok(None);
}

Expand Down Expand Up @@ -897,6 +899,7 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for
};

if client_request.type_url == "ignore-me" {
tracing::trace!("ignore-me received, continuing");
continue;
}

Expand Down Expand Up @@ -924,6 +927,7 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for
let type_url = client_request.type_url.clone();

let Some(response) = responder(Some(client_request), &type_url, &mut client_tracker).unwrap() else { continue; };
tracing::trace!(kind = type_url, nonce = response.nonce, "yielding response");
yield response;
}
_ = shutdown.changed() => {
Expand Down
7 changes: 7 additions & 0 deletions src/config/datacenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@ pub struct DatacenterMap {
impl DatacenterMap {
#[inline]
pub fn insert(&self, ip: IpAddr, datacenter: Datacenter) -> Option<Datacenter> {
tracing::trace!("DatacenterMap::insert");
let old = self.map.insert(ip, datacenter);
self.version.fetch_add(1, Relaxed);
old
}

#[inline]
pub fn len(&self) -> usize {
tracing::trace!("DatacenterMap::len");
self.map.len()
}

#[inline]
pub fn is_empty(&self) -> bool {
tracing::trace!("DatacenterMap::is_empty");
self.map.is_empty()
}

Expand All @@ -38,16 +41,19 @@ impl DatacenterMap {

#[inline]
pub fn get(&self, key: &IpAddr) -> Option<dashmap::mapref::one::Ref<'_, IpAddr, Datacenter>> {
tracing::trace!("DatacenterMap::get");
self.map.get(key)
}

#[inline]
pub fn iter(&self) -> dashmap::iter::Iter<'_, IpAddr, Datacenter> {
tracing::trace!("DatacenterMap::iter");
self.map.iter()
}

#[inline]
pub fn remove(&self, ip: IpAddr) {
tracing::trace!("DatacenterMap::remove");
let mut lock = self.removed.lock();
let mut version = 0;

Expand All @@ -63,6 +69,7 @@ impl DatacenterMap {

#[inline]
pub fn removed(&self) -> Vec<SocketAddr> {
tracing::trace!("DatacenterMap::removed");
std::mem::take(&mut self.removed.lock())
}
}
Expand Down
18 changes: 18 additions & 0 deletions src/net/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ where
locality: Option<Locality>,
cluster: BTreeSet<Endpoint>,
) {
tracing::trace!("ClusterMap::insert");
let _res = self.apply(remote_addr, locality, EndpointSet::new(cluster));
}

Expand All @@ -311,6 +312,7 @@ where
locality: Option<Locality>,
cluster: EndpointSet,
) -> crate::Result<()> {
tracing::trace!("ClusterMap::apply");
if let Some(raddr) = self.localities.get(&locality) {
if *raddr != remote_addr {
eyre::bail!(
Expand Down Expand Up @@ -358,26 +360,31 @@ where

#[inline]
pub fn len(&self) -> usize {
tracing::trace!("ClusterMap::len");
self.map.len()
}

#[inline]
pub fn is_empty(&self) -> bool {
tracing::trace!("ClusterMap::is_empty");
self.map.is_empty()
}

#[inline]
pub fn get(&self, key: &Option<Locality>) -> Option<DashMapRef<'_>> {
tracing::trace!("ClusterMap::get");
self.map.get(key)
}

#[inline]
pub fn insert_default(&self, endpoints: BTreeSet<Endpoint>) {
tracing::trace!("ClusterMap::insert_default");
self.insert(None, None, endpoints);
}

#[inline]
pub fn remove_endpoint(&self, needle: &Endpoint) -> bool {
tracing::trace!("ClusterMap::remove_endpoint");
let locality = 'l: {
for mut entry in self.map.iter_mut() {
let set = entry.value_mut();
Expand All @@ -403,6 +410,7 @@ where

#[inline]
pub fn remove_endpoint_if(&self, closure: impl Fn(&Endpoint) -> bool) -> bool {
tracing::trace!("ClusterMap::remove_endpoint_if");
let locality = 'l: {
for mut entry in self.map.iter_mut() {
let set = entry.value_mut();
Expand Down Expand Up @@ -432,6 +440,7 @@ where

#[inline]
pub fn iter(&self) -> dashmap::iter::Iter<'_, Option<Locality>, EndpointSet, S> {
tracing::trace!("ClusterMap::iter");
self.map.iter()
}

Expand All @@ -442,6 +451,7 @@ where
locality: Option<Locality>,
endpoint: Endpoint,
) -> Option<Endpoint> {
tracing::trace!("ClusterMap::replace");
if let Some(raddr) = self.localities.get(&locality) {
if *raddr != remote_addr {
tracing::trace!("not replacing locality endpoints");
Expand All @@ -467,6 +477,7 @@ where

#[inline]
pub fn endpoints(&self) -> Vec<Endpoint> {
tracing::trace!("ClusterMap::endpoints");
let mut endpoints = Vec::with_capacity(self.num_of_endpoints());

for set in self.map.iter() {
Expand All @@ -477,6 +488,7 @@ where
}

pub fn nth_endpoint(&self, mut index: usize) -> Option<Endpoint> {
tracing::trace!("ClusterMap::nth_endpoint");
for set in self.iter() {
let set = &set.value().endpoints;
if index < set.len() {
Expand All @@ -490,6 +502,7 @@ where
}

pub fn filter_endpoints(&self, f: impl Fn(&Endpoint) -> bool) -> Vec<Endpoint> {
tracing::trace!("ClusterMap::filter_endpoints");
let mut endpoints = Vec::new();

for set in self.iter() {
Expand Down Expand Up @@ -517,6 +530,7 @@ where
remote_addr: Option<std::net::IpAddr>,
locality: Locality,
) {
tracing::trace!("ClusterMap::update_unlocated_endpoints");
if let Some(raddr) = self.localities.get(&None) {
if *raddr != remote_addr {
tracing::trace!("not updating locality");
Expand All @@ -537,6 +551,7 @@ where

#[inline]
fn do_remove_locality(&self, locality: &Option<Locality>) -> Option<EndpointSet> {
tracing::trace!("ClusterMap::do_remove_locality");
self.localities.remove(locality);

let ret = self.map.remove(locality).map(|(_k, v)| v);
Expand All @@ -549,6 +564,7 @@ where

#[inline]
pub fn remove_contributor(&self, remote_addr: Option<std::net::IpAddr>) {
tracing::trace!("ClusterMap::remove_contributor");
self.localities.retain(|k, v| {
let keep = *v != remote_addr;
if !keep {
Expand All @@ -564,6 +580,7 @@ where
remote_addr: Option<std::net::IpAddr>,
locality: &Option<Locality>,
) -> Option<EndpointSet> {
tracing::trace!("ClusterMap::remove_locality");
{
if let Some(raddr) = self.localities.get(locality) {
if *raddr != remote_addr {
Expand All @@ -577,6 +594,7 @@ where
}

pub fn addresses_for_token(&self, token: Token, addrs: &mut Vec<EndpointAddress>) {
tracing::trace!("ClusterMap::addresses_for_token");
if let Some(ma) = self.token_map.get(&token.0) {
addrs.extend(ma.value().iter().cloned());
}
Expand Down
Loading