Skip to content

Commit 67c898b

Browse files
committed
merge debug branch and try_send branch
2 parents f0b9c97 + d241a05 commit 67c898b

File tree

5 files changed

+137
-2
lines changed

5 files changed

+137
-2
lines changed

crates/xds/src/client.rs

Lines changed: 106 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,104 @@ pub(crate) struct DeltaClientStream {
372372
req_tx: tokio::sync::mpsc::Sender<DeltaDiscoveryRequest>,
373373
}
374374

375+
/// TEMP custom receiver stream to be able to monitor receiver len
376+
#[derive(Debug)]
377+
pub struct ReceiverStream<T> {
378+
inner: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<T>>>,
379+
}
380+
381+
impl<T> ReceiverStream<T> {
382+
pub fn new(recv: tokio::sync::mpsc::Receiver<T>) -> Self {
383+
Self {
384+
inner: Arc::new(tokio::sync::Mutex::new(recv)),
385+
}
386+
}
387+
388+
pub fn close(&mut self) {
389+
let mut guard = self.inner.blocking_lock();
390+
guard.close();
391+
}
392+
}
393+
394+
impl<T> ReceiverStream<T>
395+
where
396+
T: Send + 'static,
397+
{
398+
pub async fn publish_metric(&self, kind: &str) {
399+
let weak_rc = Arc::downgrade(&self.inner);
400+
let kind = kind.to_string();
401+
let receiver_id = format!("{}", rand::random::<u8>());
402+
// Temp
403+
tokio::spawn(async move {
404+
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
405+
loop {
406+
tokio::select! {
407+
_ = interval.tick() => {
408+
if let Some(arc) = weak_rc.upgrade() {
409+
let guard = arc.lock().await;
410+
receiver_buffer_len(kind.as_str(), receiver_id.as_str()).set(guard.len() as i64);
411+
} else {
412+
tracing::info!("receiver dropped");
413+
receiver_buffer_len(kind.as_str(), receiver_id.as_str()).set(0);
414+
return;
415+
}
416+
}
417+
}
418+
}
419+
});
420+
}
421+
}
422+
423+
impl<T> tokio_stream::Stream for ReceiverStream<T> {
424+
type Item = T;
425+
426+
fn poll_next(
427+
self: std::pin::Pin<&mut Self>,
428+
cx: &mut std::task::Context<'_>,
429+
) -> std::task::Poll<Option<Self::Item>> {
430+
match self.inner.try_lock() {
431+
Ok(mut guard) => guard.poll_recv(cx),
432+
Err(_) => std::task::Poll::Pending,
433+
}
434+
}
435+
}
436+
437+
// impl<T> AsRef<tokio::sync::mpsc::Receiver<T>> for ReceiverStream<T> {
438+
// fn as_ref(&self) -> &tokio::sync::mpsc::Receiver<T> {
439+
// &self.inner
440+
// }
441+
// }
442+
//
443+
// impl<T> AsMut<tokio::sync::mpsc::Receiver<T>> for ReceiverStream<T> {
444+
// fn as_mut(&mut self) -> &mut tokio::sync::mpsc::Receiver<T> {
445+
// &mut self.inner
446+
// }
447+
// }
448+
449+
// impl<T> From<tokio::sync::mpsc::Receiver<T>> for ReceiverStream<T> {
450+
// fn from(recv: tokio::sync::mpsc::Receiver<T>) -> Self {
451+
// Self::new(recv)
452+
// }
453+
// }
454+
455+
pub(crate) fn receiver_buffer_len(kind: &str, receiver_id: &str) -> prometheus::IntGauge {
456+
use once_cell::sync::Lazy;
457+
use prometheus::IntGaugeVec;
458+
static RECEIVER_BUFFER_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
459+
prometheus::register_int_gauge_vec_with_registry! {
460+
prometheus::opts! {
461+
"temp_xds_receiver_buffer_len",
462+
"receiver channel buffer length",
463+
},
464+
&["kind", "receiver_id"],
465+
crate::metrics::registry(),
466+
}
467+
.unwrap()
468+
});
469+
470+
RECEIVER_BUFFER_SIZE.with_label_values(&[kind, receiver_id])
471+
}
472+
375473
impl DeltaClientStream {
376474
#[inline]
377475
async fn connect(
@@ -382,6 +480,9 @@ impl DeltaClientStream {
382480
if let Ok((mut client, ep)) = MdsClient::connect_with_backoff(endpoints).await {
383481
let (req_tx, requests_rx) = tokio::sync::mpsc::channel(REQUEST_BUFFER_SIZE);
384482

483+
let receiver_stream = ReceiverStream::new(requests_rx);
484+
receiver_stream.publish_metric("mds_client").await;
485+
385486
// Since we are doing exploratory requests to see if the remote endpoint supports delta streams, we unfortunately
386487
// need to actually send something before the full roundtrip occurs. This can be removed once delta discovery
387488
// is fully rolled out
@@ -398,7 +499,7 @@ impl DeltaClientStream {
398499
.await?;
399500

400501
if let Ok(stream) = client
401-
.subscribe_delta_resources(tokio_stream::wrappers::ReceiverStream::new(requests_rx))
502+
.subscribe_delta_resources(receiver_stream)
402503
.in_current_span()
403504
.await
404505
{
@@ -410,6 +511,9 @@ impl DeltaClientStream {
410511

411512
let (req_tx, requests_rx) = tokio::sync::mpsc::channel(REQUEST_BUFFER_SIZE);
412513

514+
let receiver_stream = ReceiverStream::new(requests_rx);
515+
receiver_stream.publish_metric("ads_client").await;
516+
413517
// Since we are doing exploratory requests to see if the remote endpoint supports delta streams, we unfortunately
414518
// need to actually send something before the full roundtrip occurs. This can be removed once delta discovery
415519
// is fully rolled out
@@ -426,7 +530,7 @@ impl DeltaClientStream {
426530
.await?;
427531

428532
let stream = client
429-
.delta_aggregated_resources(tokio_stream::wrappers::ReceiverStream::new(requests_rx))
533+
.delta_aggregated_resources(receiver_stream)
430534
.in_current_span()
431535
.await?;
432536
Ok((Self { req_tx }, stream.into_inner(), ep))

crates/xds/src/config.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ impl LocalVersions {
4141

4242
#[inline]
4343
pub fn get(&self, ty: &str) -> parking_lot::MutexGuard<'_, VersionMap> {
44+
tracing::trace!("LocalVersions::get");
4445
let g = self
4546
.versions
4647
.iter()
@@ -304,6 +305,7 @@ pub fn handle_delta_discovery_responses<C: Configuration>(
304305
None
305306
};
306307

308+
tracing::trace!(kind = type_url, nonce = %response.nonce, "yielding request");
307309
yield DeltaDiscoveryRequest {
308310
type_url,
309311
response_nonce: response.nonce,

crates/xds/src/server.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,7 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for
714714
cs
715715
} else {
716716
let Some(cs) = client_tracker.get_state(type_url) else {
717+
tracing::trace!(type_url, "no client state");
717718
return Ok(None);
718719
};
719720

@@ -726,6 +727,7 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for
726727
.map_err(|error| tonic::Status::internal(error.to_string()))?;
727728

728729
if req.resources.is_empty() && req.removed.is_empty() {
730+
tracing::trace!(type_url, "no resources and nothing removed");
729731
return Ok(None);
730732
}
731733

@@ -841,6 +843,7 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for
841843
};
842844

843845
if client_request.type_url == "ignore-me" {
846+
tracing::trace!("ignore-me received, continuing");
844847
continue;
845848
}
846849

@@ -868,6 +871,7 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for
868871
let type_url = client_request.type_url.clone();
869872

870873
let Some(response) = responder(Some(client_request), &type_url, &mut client_tracker).unwrap() else { continue; };
874+
tracing::trace!(kind = type_url, nonce = response.nonce, "yielding response");
871875
yield response;
872876
}
873877
_ = shutdown.changed() => {

src/config/datacenter.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,21 @@ pub struct DatacenterMap {
1616
impl DatacenterMap {
1717
#[inline]
1818
pub fn insert(&self, ip: IpAddr, datacenter: Datacenter) -> Option<Datacenter> {
19+
tracing::trace!("DatacenterMap::insert");
1920
let old = self.map.insert(ip, datacenter);
2021
self.version.fetch_add(1, Relaxed);
2122
old
2223
}
2324

2425
#[inline]
2526
pub fn len(&self) -> usize {
27+
tracing::trace!("DatacenterMap::len");
2628
self.map.len()
2729
}
2830

2931
#[inline]
3032
pub fn is_empty(&self) -> bool {
33+
tracing::trace!("DatacenterMap::is_empty");
3134
self.map.is_empty()
3235
}
3336

@@ -38,16 +41,19 @@ impl DatacenterMap {
3841

3942
#[inline]
4043
pub fn get(&self, key: &IpAddr) -> Option<dashmap::mapref::one::Ref<'_, IpAddr, Datacenter>> {
44+
tracing::trace!("DatacenterMap::get");
4145
self.map.get(key)
4246
}
4347

4448
#[inline]
4549
pub fn iter(&self) -> dashmap::iter::Iter<'_, IpAddr, Datacenter> {
50+
tracing::trace!("DatacenterMap::iter");
4651
self.map.iter()
4752
}
4853

4954
#[inline]
5055
pub fn remove(&self, ip: IpAddr) {
56+
tracing::trace!("DatacenterMap::remove");
5157
let mut lock = self.removed.lock();
5258
let mut version = 0;
5359

@@ -63,6 +69,7 @@ impl DatacenterMap {
6369

6470
#[inline]
6571
pub fn removed(&self) -> Vec<SocketAddr> {
72+
tracing::trace!("DatacenterMap::removed");
6673
std::mem::take(&mut self.removed.lock())
6774
}
6875
}

src/net/cluster.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,7 @@ where
302302
locality: Option<Locality>,
303303
cluster: BTreeSet<Endpoint>,
304304
) {
305+
tracing::trace!("ClusterMap::insert");
305306
let _res = self.apply(remote_addr, locality, EndpointSet::new(cluster));
306307
}
307308

@@ -311,6 +312,7 @@ where
311312
locality: Option<Locality>,
312313
cluster: EndpointSet,
313314
) -> crate::Result<()> {
315+
tracing::trace!("ClusterMap::apply");
314316
if let Some(raddr) = self.localities.get(&locality) {
315317
if *raddr != remote_addr {
316318
eyre::bail!(
@@ -358,26 +360,31 @@ where
358360

359361
#[inline]
360362
pub fn len(&self) -> usize {
363+
tracing::trace!("ClusterMap::len");
361364
self.map.len()
362365
}
363366

364367
#[inline]
365368
pub fn is_empty(&self) -> bool {
369+
tracing::trace!("ClusterMap::is_empty");
366370
self.map.is_empty()
367371
}
368372

369373
#[inline]
370374
pub fn get(&self, key: &Option<Locality>) -> Option<DashMapRef<'_>> {
375+
tracing::trace!("ClusterMap::get");
371376
self.map.get(key)
372377
}
373378

374379
#[inline]
375380
pub fn insert_default(&self, endpoints: BTreeSet<Endpoint>) {
381+
tracing::trace!("ClusterMap::insert_default");
376382
self.insert(None, None, endpoints);
377383
}
378384

379385
#[inline]
380386
pub fn remove_endpoint(&self, needle: &Endpoint) -> bool {
387+
tracing::trace!("ClusterMap::remove_endpoint");
381388
let locality = 'l: {
382389
for mut entry in self.map.iter_mut() {
383390
let set = entry.value_mut();
@@ -403,6 +410,7 @@ where
403410

404411
#[inline]
405412
pub fn remove_endpoint_if(&self, closure: impl Fn(&Endpoint) -> bool) -> bool {
413+
tracing::trace!("ClusterMap::remove_endpoint_if");
406414
let locality = 'l: {
407415
for mut entry in self.map.iter_mut() {
408416
let set = entry.value_mut();
@@ -432,6 +440,7 @@ where
432440

433441
#[inline]
434442
pub fn iter(&self) -> dashmap::iter::Iter<'_, Option<Locality>, EndpointSet, S> {
443+
tracing::trace!("ClusterMap::iter");
435444
self.map.iter()
436445
}
437446

@@ -442,6 +451,7 @@ where
442451
locality: Option<Locality>,
443452
endpoint: Endpoint,
444453
) -> Option<Endpoint> {
454+
tracing::trace!("ClusterMap::replace");
445455
if let Some(raddr) = self.localities.get(&locality) {
446456
if *raddr != remote_addr {
447457
tracing::trace!("not replacing locality endpoints");
@@ -467,6 +477,7 @@ where
467477

468478
#[inline]
469479
pub fn endpoints(&self) -> Vec<Endpoint> {
480+
tracing::trace!("ClusterMap::endpoints");
470481
let mut endpoints = Vec::with_capacity(self.num_of_endpoints());
471482

472483
for set in self.map.iter() {
@@ -477,6 +488,7 @@ where
477488
}
478489

479490
pub fn nth_endpoint(&self, mut index: usize) -> Option<Endpoint> {
491+
tracing::trace!("ClusterMap::nth_endpoint");
480492
for set in self.iter() {
481493
let set = &set.value().endpoints;
482494
if index < set.len() {
@@ -490,6 +502,7 @@ where
490502
}
491503

492504
pub fn filter_endpoints(&self, f: impl Fn(&Endpoint) -> bool) -> Vec<Endpoint> {
505+
tracing::trace!("ClusterMap::filter_endpoints");
493506
let mut endpoints = Vec::new();
494507

495508
for set in self.iter() {
@@ -517,6 +530,7 @@ where
517530
remote_addr: Option<std::net::IpAddr>,
518531
locality: Locality,
519532
) {
533+
tracing::trace!("ClusterMap::update_unlocated_endpoints");
520534
if let Some(raddr) = self.localities.get(&None) {
521535
if *raddr != remote_addr {
522536
tracing::trace!("not updating locality");
@@ -537,6 +551,7 @@ where
537551

538552
#[inline]
539553
fn do_remove_locality(&self, locality: &Option<Locality>) -> Option<EndpointSet> {
554+
tracing::trace!("ClusterMap::do_remove_locality");
540555
self.localities.remove(locality);
541556

542557
let ret = self.map.remove(locality).map(|(_k, v)| v);
@@ -549,6 +564,7 @@ where
549564

550565
#[inline]
551566
pub fn remove_contributor(&self, remote_addr: Option<std::net::IpAddr>) {
567+
tracing::trace!("ClusterMap::remove_contributor");
552568
self.localities.retain(|k, v| {
553569
let keep = *v != remote_addr;
554570
if !keep {
@@ -564,6 +580,7 @@ where
564580
remote_addr: Option<std::net::IpAddr>,
565581
locality: &Option<Locality>,
566582
) -> Option<EndpointSet> {
583+
tracing::trace!("ClusterMap::remove_locality");
567584
{
568585
if let Some(raddr) = self.localities.get(locality) {
569586
if *raddr != remote_addr {
@@ -577,6 +594,7 @@ where
577594
}
578595

579596
pub fn addresses_for_token(&self, token: Token, addrs: &mut Vec<EndpointAddress>) {
597+
tracing::trace!("ClusterMap::addresses_for_token");
580598
if let Some(ma) = self.token_map.get(&token.0) {
581599
addrs.extend(ma.value().iter().cloned());
582600
}

0 commit comments

Comments
 (0)