Skip to content

Commit dd50b2a

Browse files
committed
merge debug branch and try_send branch
2 parents f18d2fe + b1c1309 commit dd50b2a

File tree

3 files changed

+75
-75
lines changed

3 files changed

+75
-75
lines changed

crates/xds/src/client.rs

Lines changed: 69 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ pub type MdsClient = Client<MdsGrpcClient>;
4949

5050
pub(crate) const IDLE_REQUEST_INTERVAL: Duration = Duration::from_secs(30);
5151

52+
const REQUEST_BUFFER_SIZE: usize = 100;
53+
5254
#[tonic::async_trait]
5355
pub trait ServiceClient: Clone + Sized + Send + 'static {
5456
type Request: Clone + Send + Sync + Sized + 'static + std::fmt::Debug;
@@ -476,16 +478,15 @@ impl DeltaClientStream {
476478
) -> Result<(Self, tonic::Streaming<DeltaDiscoveryResponse>, Endpoint)> {
477479
crate::metrics::actions_total(KIND_CLIENT, "connect").inc();
478480
if let Ok((mut client, ep)) = MdsClient::connect_with_backoff(endpoints).await {
479-
let (req_tx, requests_rx) =
480-
tokio::sync::mpsc::channel(100 /*ResourceType::VARIANTS.len()*/);
481+
let (dcs, requests_rx) = Self::new();
481482

482483
let receiver_stream = ReceiverStream::new(requests_rx);
483484
receiver_stream.publish_metric("mds_client").await;
484485

485486
// Since we are doing exploratory requests to see if the remote endpoint supports delta streams, we unfortunately
486487
// need to actually send something before the full roundtrip occurs. This can be removed once delta discovery
487488
// is fully rolled out
488-
req_tx
489+
dcs.req_tx
489490
.send(DeltaDiscoveryRequest {
490491
node: Some(Node {
491492
id: identifier.clone(),
@@ -502,22 +503,21 @@ impl DeltaClientStream {
502503
.in_current_span()
503504
.await
504505
{
505-
return Ok((Self { req_tx }, stream.into_inner(), ep));
506+
return Ok((dcs, stream.into_inner(), ep));
506507
}
507508
}
508509

509510
let (mut client, ep) = AdsClient::connect_with_backoff(endpoints).await?;
510511

511-
let (req_tx, requests_rx) =
512-
tokio::sync::mpsc::channel(100 /*ResourceType::VARIANTS.len()*/);
512+
let (dcs, requests_rx) = Self::new();
513513

514514
let receiver_stream = ReceiverStream::new(requests_rx);
515515
receiver_stream.publish_metric("ads_client").await;
516516

517517
// Since we are doing exploratory requests to see if the remote endpoint supports delta streams, we unfortunately
518518
// need to actually send something before the full roundtrip occurs. This can be removed once delta discovery
519519
// is fully rolled out
520-
req_tx
520+
dcs.req_tx
521521
.send(DeltaDiscoveryRequest {
522522
node: Some(Node {
523523
id: identifier,
@@ -533,16 +533,16 @@ impl DeltaClientStream {
533533
.delta_aggregated_resources(receiver_stream)
534534
.in_current_span()
535535
.await?;
536-
Ok((Self { req_tx }, stream.into_inner(), ep))
536+
Ok((dcs, stream.into_inner(), ep))
537537
}
538538

539539
pub(crate) fn new() -> (Self, tokio::sync::mpsc::Receiver<DeltaDiscoveryRequest>) {
540-
let (req_tx, requests_rx) = tokio::sync::mpsc::channel(1);
540+
let (req_tx, requests_rx) = tokio::sync::mpsc::channel(REQUEST_BUFFER_SIZE);
541541
(Self { req_tx }, requests_rx)
542542
}
543543

544544
#[inline]
545-
pub(crate) async fn refresh(
545+
pub(crate) fn refresh(
546546
&self,
547547
identifier: &str,
548548
subs: Vec<(&'static str, Vec<String>)>,
@@ -551,33 +551,42 @@ impl DeltaClientStream {
551551
crate::metrics::actions_total(KIND_CLIENT, "refresh").inc();
552552
for (rt, names) in subs {
553553
let initial_resource_versions = local.get(rt).clone();
554-
self.req_tx
555-
.send(DeltaDiscoveryRequest {
556-
node: Some(Node {
557-
id: identifier.to_owned(),
558-
user_agent_name: "quilkin".into(),
559-
..Node::default()
560-
}),
561-
type_url: (*rt).to_owned(),
562-
resource_names_subscribe: names.clone(),
563-
initial_resource_versions,
564-
// We (currently) never unsubscribe from resources, since we
565-
// never actually subscribe to particular ones in the first place
566-
resource_names_unsubscribe: Vec::new(),
567-
response_nonce: "".into(),
568-
error_detail: None,
569-
})
570-
.await?;
554+
let refresh_request = DeltaDiscoveryRequest {
555+
node: Some(Node {
556+
id: identifier.to_owned(),
557+
user_agent_name: "quilkin".into(),
558+
..Node::default()
559+
}),
560+
type_url: (*rt).to_owned(),
561+
resource_names_subscribe: names.clone(),
562+
initial_resource_versions,
563+
// We (currently) never unsubscribe from resources, since we
564+
// never actually subscribe to particular ones in the first place
565+
resource_names_unsubscribe: Vec::new(),
566+
response_nonce: "".into(),
567+
error_detail: None,
568+
};
569+
self.send_request(refresh_request)?;
571570
}
572571

573572
Ok(())
574573
}
575574

576-
/// Sends an n/ack "response" in response to the remote response
575+
/// Sends an n/ack "request" in response to the remote response
577576
#[inline]
578-
pub(crate) async fn send_response(&self, response: DeltaDiscoveryRequest) -> Result<()> {
579-
crate::metrics::actions_total(KIND_CLIENT, "respond").inc();
580-
self.req_tx.send(response).await?;
577+
pub(crate) fn ack_response(&self, ack_request: DeltaDiscoveryRequest) -> Result<()> {
578+
// Save 10% or at least 10 spots of the request buffer for refresh requests
579+
if self.req_tx.capacity() < (REQUEST_BUFFER_SIZE / 10).max(10) {
580+
Err(eyre::eyre!("request buffer congested, dropping ACK/NACK"))
581+
} else {
582+
self.send_request(ack_request)
583+
}
584+
}
585+
586+
#[inline]
587+
fn send_request(&self, request: DeltaDiscoveryRequest) -> Result<()> {
588+
crate::metrics::actions_total(KIND_CLIENT, "send_request").inc();
589+
self.req_tx.try_send(request)?;
581590
Ok(())
582591
}
583592
}
@@ -593,7 +602,7 @@ impl DeltaServerStream {
593602
identifier: String,
594603
) -> Result<(Self, tonic::Streaming<DeltaDiscoveryRequest>)> {
595604
crate::metrics::actions_total(KIND_SERVER, "connect").inc();
596-
let (res_tx, responses_rx) = tokio::sync::mpsc::channel(100);
605+
let (res_tx, responses_rx) = tokio::sync::mpsc::channel(REQUEST_BUFFER_SIZE);
597606

598607
res_tx
599608
.send(DeltaDiscoveryResponse {
@@ -630,19 +639,16 @@ pub async fn delta_subscribe<C: crate::config::Configuration>(
630639
notifier: Option<tokio::sync::mpsc::UnboundedSender<String>>,
631640
resources: &'static [(&'static str, &'static [(&'static str, Vec<String>)])],
632641
) -> eyre::Result<tokio::task::JoinHandle<Result<()>>> {
633-
let (mut ds, mut stream, mut connected_endpoint) = match DeltaClientStream::connect(
634-
&endpoints,
635-
identifier.clone(),
636-
)
637-
.await
638-
{
639-
Ok(ds) => ds,
640-
Err(err) => {
641-
crate::metrics::errors_total(KIND_CLIENT, "connect").inc();
642-
tracing::error!(error = ?err, "failed to acquire aggregated delta stream from management server");
643-
return Err(err);
644-
}
645-
};
642+
let (mut client, mut response_stream, mut connected_endpoint) =
643+
DeltaClientStream::connect(&endpoints, identifier.clone())
644+
.await
645+
.inspect_err(|error| {
646+
crate::metrics::errors_total(KIND_CLIENT, "connect").inc();
647+
tracing::error!(
648+
?error,
649+
"failed to acquire aggregated delta stream from management server"
650+
);
651+
})?;
646652

647653
async fn handle_first_response(
648654
stream: &mut tonic::Streaming<DeltaDiscoveryResponse>,
@@ -688,7 +694,7 @@ pub async fn delta_subscribe<C: crate::config::Configuration>(
688694
}
689695

690696
let (mut control_plane, resource_subscriptions) = match handle_first_response(
691-
&mut stream,
697+
&mut response_stream,
692698
resources,
693699
)
694700
.await
@@ -708,10 +714,7 @@ pub async fn delta_subscribe<C: crate::config::Configuration>(
708714
let local = Arc::new(crate::config::LocalVersions::new(
709715
resource_subscriptions.iter().map(|(s, _)| *s),
710716
));
711-
if let Err(err) = ds
712-
.refresh(&identifier, resource_subscriptions.to_vec(), &local)
713-
.await
714-
{
717+
if let Err(err) = client.refresh(&identifier, resource_subscriptions.to_vec(), &local) {
715718
crate::metrics::errors_total(KIND_CLIENT, "request_failed").inc();
716719
tracing::error!(error = ?err, "failed to send initial resource requests");
717720
return Err(err);
@@ -721,15 +724,15 @@ pub async fn delta_subscribe<C: crate::config::Configuration>(
721724
let handle = tokio::task::spawn(
722725
async move {
723726
tracing::trace!("starting xDS delta stream task");
724-
let mut stream = stream;
727+
let mut response_stream = response_stream;
725728
let mut resource_subscriptions = resource_subscriptions;
726729

727730
is_healthy.store(true, Ordering::SeqCst);
728731
loop {
729732
tracing::info!(%control_plane, "creating discovery response handler");
730-
let mut response_stream = crate::config::handle_delta_discovery_responses(
733+
let mut ack_request_stream = crate::config::handle_delta_discovery_responses(
731734
control_plane.clone(),
732-
stream,
735+
response_stream,
733736
config.clone(),
734737
local.clone(),
735738
None,
@@ -738,18 +741,18 @@ pub async fn delta_subscribe<C: crate::config::Configuration>(
738741

739742
tracing::info!(%control_plane, "entering xDS stream loop");
740743
loop {
741-
let next_response =
742-
tokio::time::timeout(IDLE_REQUEST_INTERVAL, response_stream.next());
744+
let next_ack_request =
745+
tokio::time::timeout(IDLE_REQUEST_INTERVAL, ack_request_stream.next());
743746

744-
match next_response.await {
745-
Ok(Some(Ok(response))) => {
746-
let node_id = if let Some(node) = &response.node {
747+
match next_ack_request.await {
748+
Ok(Some(Ok(ack_request))) => {
749+
let node_id = if let Some(node) = &ack_request.node {
747750
node.id.clone()
748751
} else {
749752
"unknown".into()
750753
};
751754
tracing::trace!(%node_id, "received delta response");
752-
if let Err(error) = ds.send_response(response).await {
755+
if let Err(error) = client.ack_response(ack_request) {
753756
crate::metrics::errors_total(KIND_CLIENT, "ack_failed").inc();
754757
tracing::error!(%error, %node_id, "failed to ack delta response");
755758
}
@@ -777,12 +780,11 @@ pub async fn delta_subscribe<C: crate::config::Configuration>(
777780
}
778781
Err(_) => {
779782
tracing::debug!("exceeded idle request interval sending new requests");
780-
if let Err(error) = ds
781-
.refresh(&identifier, resource_subscriptions.to_vec(), &local)
782-
.await
783+
if let Err(error) =
784+
client.refresh(&identifier, resource_subscriptions.to_vec(), &local)
783785
{
784786
crate::metrics::errors_total(KIND_CLIENT, "refresh").inc();
785-
return Err(error.wrap_err("refresh failed"));
787+
tracing::error!(?error, "refresh failed");
786788
}
787789
}
788790
}
@@ -794,7 +796,7 @@ pub async fn delta_subscribe<C: crate::config::Configuration>(
794796
tracing::info!(%control_plane, "Lost connection to xDS, retrying");
795797
match DeltaClientStream::connect(&endpoints, identifier.clone()).await {
796798
Ok(res) => {
797-
(ds, stream, connected_endpoint) = res;
799+
(client, response_stream, connected_endpoint) = res;
798800
}
799801
Err(error) => {
800802
crate::metrics::errors_total(KIND_CLIENT, "connect").inc();
@@ -803,7 +805,7 @@ pub async fn delta_subscribe<C: crate::config::Configuration>(
803805
}
804806
}
805807

806-
match handle_first_response(&mut stream, resources).await {
808+
match handle_first_response(&mut response_stream, resources).await {
807809
Ok((id, rs)) => {
808810
control_plane = id;
809811
resource_subscriptions = rs;
@@ -816,9 +818,8 @@ pub async fn delta_subscribe<C: crate::config::Configuration>(
816818
}
817819
}
818820

819-
if let Err(error) = ds
820-
.refresh(&identifier, resource_subscriptions.to_vec(), &local)
821-
.await
821+
if let Err(error) =
822+
client.refresh(&identifier, resource_subscriptions.to_vec(), &local)
822823
{
823824
crate::metrics::errors_total(KIND_CLIENT, "refresh").inc();
824825
return Err(error.wrap_err("refresh failed"));

crates/xds/src/metrics.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
use once_cell::sync::Lazy;
1818
use prometheus::{IntCounterVec, IntGaugeVec, Registry};
1919

20-
pub(crate) const NODE_LABEL: &str = "node";
20+
pub(crate) const CLIENT_ID_LABEL: &str = "client_id";
2121
pub(crate) const CONTROL_PLANE_LABEL: &str = "control_plane";
2222
pub(crate) const TYPE_LABEL: &str = "type";
2323
pub(crate) const KIND_CLIENT: &str = "client";
@@ -57,20 +57,20 @@ pub(crate) fn active_control_planes(control_plane: &str) -> prometheus::IntGauge
5757
ACTIVE_CONTROL_PLANES.with_label_values(&[control_plane])
5858
}
5959

60-
pub(crate) fn delta_discovery_requests(node: &str, type_url: &str) -> prometheus::IntCounter {
60+
pub(crate) fn delta_discovery_requests(client_id: &str, type_url: &str) -> prometheus::IntCounter {
6161
static DELTA_DISCOVERY_REQUESTS: Lazy<IntCounterVec> = Lazy::new(|| {
6262
prometheus::register_int_counter_vec_with_registry! {
6363
prometheus::opts! {
6464
"delta_discovery_requests",
6565
"Total number of xDS delta discovery requests",
6666
},
67-
&[NODE_LABEL, TYPE_LABEL],
67+
&[CLIENT_ID_LABEL, TYPE_LABEL],
6868
crate::metrics::registry(),
6969
}
7070
.unwrap()
7171
});
7272

73-
DELTA_DISCOVERY_REQUESTS.with_label_values(&[node, type_url])
73+
DELTA_DISCOVERY_REQUESTS.with_label_values(&[client_id, type_url])
7474
}
7575

7676
pub(crate) fn delta_discovery_responses(

crates/xds/src/server.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,6 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for
628628
config.interested_resources(&server_version).collect(),
629629
&local,
630630
)
631-
.await
632631
.map_err(|error| tonic::Status::internal(error.to_string()))?;
633632

634633
let mut response_stream = crate::config::handle_delta_discovery_responses(
@@ -649,7 +648,8 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for
649648
match nr {
650649
Ok(Some(Ok(ack))) => {
651650
tracing::trace!("sending ack request");
652-
ds.send_response(ack).await.map_err(|_err| {
651+
ds.ack_response(ack).map_err(|_err| {
652+
crate::metrics::errors_total(KIND_SERVER, "ack_failed").inc();
653653
tonic::Status::internal("this should not be reachable")
654654
})?;
655655
}
@@ -670,7 +670,6 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for
670670
config.interested_resources(&server_version).collect(),
671671
&local,
672672
)
673-
.await
674673
.map_err(|error| tonic::Status::internal(error.to_string()))?;
675674
}
676675
}

0 commit comments

Comments
 (0)