
Commit fc96ac2

merge debug branch and try_send branch

3 parents: 4bfecda + d241a05 + ff4aa6e

5 files changed: +245 -52 lines

crates/xds/src/client.rs

Lines changed: 106 additions & 2 deletions
@@ -372,6 +372,104 @@ pub(crate) struct DeltaClientStream {
     req_tx: tokio::sync::mpsc::Sender<DeltaDiscoveryRequest>,
 }
 
+/// TEMP custom receiver stream to be able to monitor receiver len
+#[derive(Debug)]
+pub struct ReceiverStream<T> {
+    inner: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<T>>>,
+}
+
+impl<T> ReceiverStream<T> {
+    pub fn new(recv: tokio::sync::mpsc::Receiver<T>) -> Self {
+        Self {
+            inner: Arc::new(tokio::sync::Mutex::new(recv)),
+        }
+    }
+
+    pub fn close(&mut self) {
+        let mut guard = self.inner.blocking_lock();
+        guard.close();
+    }
+}
+
+impl<T> ReceiverStream<T>
+where
+    T: Send + 'static,
+{
+    pub async fn publish_metric(&self, kind: &str) {
+        let weak_rc = Arc::downgrade(&self.inner);
+        let kind = kind.to_string();
+        let receiver_id = format!("{}", rand::random::<u8>());
+        // Temp
+        tokio::spawn(async move {
+            let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
+            loop {
+                tokio::select! {
+                    _ = interval.tick() => {
+                        if let Some(arc) = weak_rc.upgrade() {
+                            let guard = arc.lock().await;
+                            receiver_buffer_len(kind.as_str(), receiver_id.as_str()).set(guard.len() as i64);
+                        } else {
+                            tracing::info!("receiver dropped");
+                            receiver_buffer_len(kind.as_str(), receiver_id.as_str()).set(0);
+                            return;
+                        }
+                    }
+                }
+            }
+        });
+    }
+}
+
+impl<T> tokio_stream::Stream for ReceiverStream<T> {
+    type Item = T;
+
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        match self.inner.try_lock() {
+            Ok(mut guard) => guard.poll_recv(cx),
+            Err(_) => std::task::Poll::Pending,
+        }
+    }
+}
+
+// impl<T> AsRef<tokio::sync::mpsc::Receiver<T>> for ReceiverStream<T> {
+//     fn as_ref(&self) -> &tokio::sync::mpsc::Receiver<T> {
+//         &self.inner
+//     }
+// }
+//
+// impl<T> AsMut<tokio::sync::mpsc::Receiver<T>> for ReceiverStream<T> {
+//     fn as_mut(&mut self) -> &mut tokio::sync::mpsc::Receiver<T> {
+//         &mut self.inner
+//     }
+// }
+
+// impl<T> From<tokio::sync::mpsc::Receiver<T>> for ReceiverStream<T> {
+//     fn from(recv: tokio::sync::mpsc::Receiver<T>) -> Self {
+//         Self::new(recv)
+//     }
+// }
+
+pub(crate) fn receiver_buffer_len(kind: &str, receiver_id: &str) -> prometheus::IntGauge {
+    use once_cell::sync::Lazy;
+    use prometheus::IntGaugeVec;
+    static RECEIVER_BUFFER_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
+        prometheus::register_int_gauge_vec_with_registry! {
+            prometheus::opts! {
+                "temp_xds_receiver_buffer_len",
+                "receiver channel buffer length",
+            },
+            &["kind", "receiver_id"],
+            crate::metrics::registry(),
+        }
+        .unwrap()
+    });
+
+    RECEIVER_BUFFER_SIZE.with_label_values(&[kind, receiver_id])
+}
+
 impl DeltaClientStream {
     #[inline]
     async fn connect(
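
Note: a minimal sketch of how this wrapper could be exercised on its own, assuming only the ReceiverStream and receiver_buffer_len added above; the channel capacity and the "demo" kind label are illustrative, not taken from this commit:

    use tokio_stream::StreamExt;

    async fn demo() {
        let (tx, rx) = tokio::sync::mpsc::channel::<u32>(16);
        let mut stream = ReceiverStream::new(rx);
        // Spawns the 5-second sampling task, which sets the gauge to the
        // current queue depth until the receiver is dropped.
        stream.publish_metric("demo").await;

        tx.send(1).await.unwrap();
        drop(tx); // dropping the sender ends the stream once it is drained

        while let Some(item) = stream.next().await {
            println!("received {item}");
        }
        // Once `stream` is dropped, the Weak upgrade in the sampling task
        // fails, the gauge is reset to 0, and the task exits.
    }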
@@ -382,6 +480,9 @@ impl DeltaClientStream {
         if let Ok((mut client, ep)) = MdsClient::connect_with_backoff(endpoints).await {
             let (dcs, requests_rx) = Self::new();
 
+            let receiver_stream = ReceiverStream::new(requests_rx);
+            receiver_stream.publish_metric("mds_client").await;
+
             // Since we are doing exploratory requests to see if the remote endpoint supports delta streams, we unfortunately
             // need to actually send something before the full roundtrip occurs. This can be removed once delta discovery
             // is fully rolled out
@@ -398,7 +499,7 @@ impl DeltaClientStream {
                 .await?;
 
             if let Ok(stream) = client
-                .subscribe_delta_resources(tokio_stream::wrappers::ReceiverStream::new(requests_rx))
+                .subscribe_delta_resources(receiver_stream)
                 .in_current_span()
                 .await
             {
@@ -410,6 +511,9 @@ impl DeltaClientStream {
 
         let (dcs, requests_rx) = Self::new();
 
+        let receiver_stream = ReceiverStream::new(requests_rx);
+        receiver_stream.publish_metric("ads_client").await;
+
         // Since we are doing exploratory requests to see if the remote endpoint supports delta streams, we unfortunately
         // need to actually send something before the full roundtrip occurs. This can be removed once delta discovery
         // is fully rolled out
@@ -426,7 +530,7 @@ impl DeltaClientStream {
             .await?;
 
         let stream = client
-            .delta_aggregated_resources(tokio_stream::wrappers::ReceiverStream::new(requests_rx))
+            .delta_aggregated_resources(receiver_stream)
             .in_current_span()
             .await?;
         Ok((dcs, stream.into_inner(), ep))
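
Note: both call sites swap tokio_stream::wrappers::ReceiverStream for the custom wrapper, trading a lock around every poll for the ability to sample queue depth from a separate task. One caveat worth flagging: poll_next returns Poll::Pending on a contended lock without registering the waker, so the stream relies on some other wakeup arriving before it is polled again. A hedged sketch of a variant that avoids the lost wakeup; this is a suggestion, not code from the commit:

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<T>> {
        match self.inner.try_lock() {
            Ok(mut guard) => guard.poll_recv(cx),
            // If the metrics task holds the lock, schedule an immediate
            // re-poll rather than parking with no registered waker.
            Err(_) => {
                cx.waker().wake_by_ref();
                std::task::Poll::Pending
            }
        }
    }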

crates/xds/src/config.rs

Lines changed: 2 additions & 0 deletions
@@ -41,6 +41,7 @@ impl LocalVersions {
 
     #[inline]
     pub fn get(&self, ty: &str) -> parking_lot::MutexGuard<'_, VersionMap> {
+        tracing::trace!("LocalVersions::get");
         let g = self
             .versions
             .iter()
@@ -304,6 +305,7 @@ pub fn handle_delta_discovery_responses<C: Configuration>(
                 None
             };
 
+            tracing::trace!(kind = type_url, nonce = %response.nonce, "yielding request");
             yield DeltaDiscoveryRequest {
                 type_url,
                 response_nonce: response.nonce,
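
Note: in these trace events a bare field such as `kind = type_url` captures the value directly, while the `%` sigil records the field using its Display implementation. A minimal illustration, with arbitrary values:

    let type_url = "type.googleapis.com/example";
    let nonce = "abc123";
    // `kind` is captured as a value; `%nonce` is formatted via Display.
    tracing::trace!(kind = type_url, nonce = %nonce, "yielding request");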

crates/xds/src/server.rs

Lines changed: 112 additions & 50 deletions
@@ -99,6 +99,48 @@ impl TlsIdentity {
     }
 }
 
+const RESPONSE_PROPAGATION_INTERVAL: Duration = Duration::from_millis(100);
+
+/// Buffers response broadcasts so they can be propagated at a set interval instead of every time
+/// something has changed.
+#[derive(Debug, Default)]
+struct ResponseBroadcastPropagationBuffer {
+    changed_resources: std::sync::Mutex<std::collections::HashSet<String>>,
+}
+
+impl ResponseBroadcastPropagationBuffer {
+    /// Ingest a resource broadcast message
+    fn ingest(
+        &self,
+        result: Result<&str, tokio::sync::broadcast::error::RecvError>,
+    ) -> Result<(), tokio::sync::broadcast::error::RecvError> {
+        result.map(|resource| {
+            let mut guard = match self.changed_resources.lock() {
+                Ok(guard) => guard,
+                Err(poisoned) => {
+                    let guard = poisoned.into_inner();
+                    tracing::warn!("recovered from poisoned mutex");
+                    guard
+                }
+            };
+            guard.insert(resource.into());
+        })
+    }
+
+    /// Flush all changed resources and reset the buffer
+    fn flush(&self) -> Vec<String> {
+        let mut guard = match self.changed_resources.lock() {
+            Ok(guard) => guard,
+            Err(poisoned) => {
+                let guard = poisoned.into_inner();
+                tracing::warn!("recovered from poisoned mutex");
+                guard
+            }
+        };
+        guard.drain().collect()
+    }
+}
+
 const VERSION_INFO: &str = "9";
 
 pub struct ControlPlane<C> {
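
Note: a small sketch of the intended coalescing behaviour, assuming only the type added above; the resource names are illustrative:

    let buffer = ResponseBroadcastPropagationBuffer::default();

    // Three broadcasts, two distinct resources...
    buffer.ingest(Ok("Cluster")).unwrap();
    buffer.ingest(Ok("Cluster")).unwrap();
    buffer.ingest(Ok("Listener")).unwrap();

    // ...flush to exactly two entries (order unspecified: HashSet-backed),
    // and the buffer is empty again afterwards.
    let changed = buffer.flush();
    assert_eq!(changed.len(), 2);
    assert!(buffer.flush().is_empty());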
@@ -371,38 +413,45 @@ impl<C: crate::config::Configuration> ControlPlane<C> {
         let stream = async_stream::try_stream! {
             yield response;
 
+            // Buffer changes so we only propagate at a set and controlled interval. This reduces
+            // the network load when we have a high rate of change due to high cluster load.
+            let buffer = ResponseBroadcastPropagationBuffer::default();
+            let mut lag_amount: u64 = 0;
+            let mut propagation_interval = tokio::time::interval(RESPONSE_PROPAGATION_INTERVAL);
+
             loop {
                 tokio::select! {
-                    // The resource(s) have changed, inform the connected client, but only
+                    // Inform the connected client if any of the resources have changed, but only
                     // send the changed resources that the client doesn't already have
-                    res = rx.recv() => {
-                        match res {
-                            Ok(rt) => {
-                                match responder(None, rt, &mut client_tracker) {
-                                    Ok(Some(res)) => yield res,
-                                    Ok(None) => {}
-                                    Err(error) => {
-                                        crate::metrics::errors_total(KIND_SERVER, "respond").inc();
-                                        tracing::error!(%error, "responder failed to generate response");
-                                        continue;
-                                    },
+                    _ = propagation_interval.tick() => {
+                        // Fetch the changed resources
+                        let mut resources = buffer.flush();
+                        // If we've been lagging on updates, collect everything instead
+                        if lag_amount > 0 {
+                            tracing::warn!(lag_amount, "lagged while receiving response broadcasts");
+                            resources = client_tracker.tracked_resources().collect();
+                        }
+                        lag_amount = 0;
+                        for rt in resources {
+                            match responder(None, &rt, &mut client_tracker) {
+                                Ok(Some(res)) => yield res,
+                                Ok(None) => {},
+                                Err(error) => {
+                                    crate::metrics::errors_total(KIND_SERVER, "respond").inc();
+                                    tracing::error!(%error, "responder failed to generate response");
+                                    continue;
                                 }
                             }
+                        }
+                    }
+                    // A resource has changed, buffer it for propagation
+                    res = rx.recv() => {
+                        match buffer.ingest(res) {
+                            Ok(_) => {},
                             Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
-                            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
-                                let tracked_resources: Vec<_> = client_tracker.tracked_resources().collect();
-                                for rt in tracked_resources {
-                                    match responder(None, &rt, &mut client_tracker) {
-                                        Ok(Some(res)) => yield res,
-                                        Ok(None) => {},
-                                        Err(error) => {
-                                            crate::metrics::errors_total(KIND_SERVER, "respond").inc();
-                                            tracing::error!(%error, "responder failed to generate response");
-                                            continue;
-                                        }
-                                    }
-                                }
-                            }
+                            Err(tokio::sync::broadcast::error::RecvError::Lagged(amount)) => {
+                                lag_amount += amount;
+                            },
                         }
                     }
                     client_request = streaming.next() => {
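
Note: the rewritten loop changes when responses are produced. A broadcast no longer triggers a response immediately; it only marks the resource dirty via ingest, and every RESPONSE_PROPAGATION_INTERVAL tick drains the dirty set in one pass. Lag is accumulated and answered by resending every tracked resource on the next tick. A stripped-down, self-contained reduction of that pattern, assuming a broadcast channel of &'static str type URLs as in the surrounding code and using log lines as stand-ins for the real responder:

    use std::time::Duration;
    use tokio::sync::broadcast::{self, error::RecvError};

    async fn propagate(mut rx: broadcast::Receiver<&'static str>) {
        let buffer = ResponseBroadcastPropagationBuffer::default();
        let mut lagged: u64 = 0;
        let mut interval = tokio::time::interval(Duration::from_millis(100));

        loop {
            tokio::select! {
                _ = interval.tick() => {
                    if lagged > 0 {
                        // The real loop resends everything the client tracks here.
                        tracing::warn!(lagged, "broadcast receiver lagged");
                    }
                    lagged = 0;
                    for resource in buffer.flush() {
                        // Stand-in for `responder(...)` in the real code.
                        tracing::debug!(%resource, "would send response");
                    }
                }
                res = rx.recv() => match buffer.ingest(res) {
                    Ok(()) => {}
                    Err(RecvError::Closed) => break,
                    Err(RecvError::Lagged(n)) => lagged += n,
                },
            }
        }
    }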
@@ -712,6 +761,7 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for
             cs
         } else {
             let Some(cs) = client_tracker.get_state(type_url) else {
+                tracing::trace!(type_url, "no client state");
                 return Ok(None);
             };
 
@@ -724,6 +774,7 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for
             .map_err(|error| tonic::Status::internal(error.to_string()))?;
 
         if req.resources.is_empty() && req.removed.is_empty() {
+            tracing::trace!(type_url, "no resources and nothing removed");
             return Ok(None);
         }
 
@@ -796,36 +847,45 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for
         let stream = async_stream::try_stream! {
             yield response;
 
+            // Buffer changes so we only propagate at a set and controlled interval. This reduces
+            // the network load when we have a high rate of change due to high cluster load.
+            let buffer = ResponseBroadcastPropagationBuffer::default();
+            let mut lag_amount: u64 = 0;
+            let mut propagation_interval = tokio::time::interval(RESPONSE_PROPAGATION_INTERVAL);
+
             loop {
                 tokio::select! {
-                    // The resource(s) have changed, inform the connected client, but only
+                    // Inform the connected client if any of the resources have changed, but only
                    // send the changed resources that the client doesn't already have
-                    res = rx.recv() => {
-                        match res {
-                            Ok(rt) => {
-                                match responder(None, rt, &mut client_tracker) {
-                                    Ok(Some(res)) => yield res,
-                                    Ok(None) => {}
-                                    Err(error) => {
-                                        tracing::error!(%error, "responder failed to generate response");
-                                        continue;
-                                    },
+                    _ = propagation_interval.tick() => {
+                        // Fetch the changed resources
+                        let mut resources = buffer.flush();
+                        // If we've been lagging on updates, collect everything instead
+                        if lag_amount > 0 {
+                            tracing::warn!(lag_amount, "lagged while receiving response broadcasts");
+                            resources = client_tracker.tracked_resources().collect();
+                        }
+                        lag_amount = 0;
+                        for rt in resources {
+                            match responder(None, &rt, &mut client_tracker) {
+                                Ok(Some(res)) => yield res,
+                                Ok(None) => {},
+                                Err(error) => {
+                                    crate::metrics::errors_total(KIND_SERVER, "respond").inc();
+                                    tracing::error!(%error, "responder failed to generate response");
+                                    continue;
                                 }
                             }
+                        }
+                    }
+                    // A resource has changed, buffer it for propagation
+                    res = rx.recv() => {
+                        match buffer.ingest(res) {
+                            Ok(_) => {},
                             Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
-                            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
-                                let tracked_resources: Vec<_> = client_tracker.tracked_resources().collect();
-                                for rt in tracked_resources {
-                                    match responder(None, &rt, &mut client_tracker) {
-                                        Ok(Some(res)) => yield res,
-                                        Ok(None) => {},
-                                        Err(error) => {
-                                            tracing::error!(%error, "responder failed to generate response");
-                                            continue;
-                                        }
-                                    }
-                                }
-                            }
+                            Err(tokio::sync::broadcast::error::RecvError::Lagged(amount)) => {
+                                lag_amount += amount;
+                            },
                         }
                     }
                     client_request = requests.next() => {
@@ -839,6 +899,7 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for
         };
 
         if client_request.type_url == "ignore-me" {
+            tracing::trace!("ignore-me received, continuing");
            continue;
        }
 
@@ -866,6 +927,7 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for
         let type_url = client_request.type_url.clone();
 
         let Some(response) = responder(Some(client_request), &type_url, &mut client_tracker).unwrap() else { continue; };
+        tracing::trace!(kind = type_url, nonce = response.nonce, "yielding response");
         yield response;
     }
     _ = shutdown.changed() => {
