@@ -380,12 +380,12 @@ impl DeltaClientStream {
380380 ) -> Result < ( Self , tonic:: Streaming < DeltaDiscoveryResponse > , Endpoint ) > {
381381 crate :: metrics:: actions_total ( KIND_CLIENT , "connect" ) . inc ( ) ;
382382 if let Ok ( ( mut client, ep) ) = MdsClient :: connect_with_backoff ( endpoints) . await {
383- let ( req_tx , requests_rx) = tokio :: sync :: mpsc :: channel ( REQUEST_BUFFER_SIZE ) ;
383+ let ( dcs , requests_rx) = Self :: new ( ) ;
384384
385385 // Since we are doing exploratory requests to see if the remote endpoint supports delta streams, we unfortunately
386386 // need to actually send something before the full roundtrip occurs. This can be removed once delta discovery
387387 // is fully rolled out
388- req_tx
388+ dcs . req_tx
389389 . send ( DeltaDiscoveryRequest {
390390 node : Some ( Node {
391391 id : identifier. clone ( ) ,
@@ -402,18 +402,18 @@ impl DeltaClientStream {
402402 . in_current_span ( )
403403 . await
404404 {
405- return Ok ( ( Self { req_tx } , stream. into_inner ( ) , ep) ) ;
405+ return Ok ( ( dcs , stream. into_inner ( ) , ep) ) ;
406406 }
407407 }
408408
409409 let ( mut client, ep) = AdsClient :: connect_with_backoff ( endpoints) . await ?;
410410
411- let ( req_tx , requests_rx) = tokio :: sync :: mpsc :: channel ( REQUEST_BUFFER_SIZE ) ;
411+ let ( dcs , requests_rx) = Self :: new ( ) ;
412412
413413 // Since we are doing exploratory requests to see if the remote endpoint supports delta streams, we unfortunately
414414 // need to actually send something before the full roundtrip occurs. This can be removed once delta discovery
415415 // is fully rolled out
416- req_tx
416+ dcs . req_tx
417417 . send ( DeltaDiscoveryRequest {
418418 node : Some ( Node {
419419 id : identifier,
@@ -429,7 +429,7 @@ impl DeltaClientStream {
429429 . delta_aggregated_resources ( tokio_stream:: wrappers:: ReceiverStream :: new ( requests_rx) )
430430 . in_current_span ( )
431431 . await ?;
432- Ok ( ( Self { req_tx } , stream. into_inner ( ) , ep) )
432+ Ok ( ( dcs , stream. into_inner ( ) , ep) )
433433 }
434434
435435 pub ( crate ) fn new ( ) -> ( Self , tokio:: sync:: mpsc:: Receiver < DeltaDiscoveryRequest > ) {
@@ -438,7 +438,7 @@ impl DeltaClientStream {
438438 }
439439
440440 #[ inline]
441- pub ( crate ) async fn refresh (
441+ pub ( crate ) fn refresh (
442442 & self ,
443443 identifier : & str ,
444444 subs : Vec < ( & ' static str , Vec < String > ) > ,
@@ -447,31 +447,40 @@ impl DeltaClientStream {
447447 crate :: metrics:: actions_total ( KIND_CLIENT , "refresh" ) . inc ( ) ;
448448 for ( rt, names) in subs {
449449 let initial_resource_versions = local. get ( rt) . clone ( ) ;
450- self . req_tx
451- . send ( DeltaDiscoveryRequest {
452- node : Some ( Node {
453- id : identifier. to_owned ( ) ,
454- user_agent_name : "quilkin" . into ( ) ,
455- ..Node :: default ( )
456- } ) ,
457- type_url : ( * rt) . to_owned ( ) ,
458- resource_names_subscribe : names. clone ( ) ,
459- initial_resource_versions,
460- // We (currently) never unsubscribe from resources, since we
461- // never actually subscribe to particular ones in the first place
462- resource_names_unsubscribe : Vec :: new ( ) ,
463- response_nonce : "" . into ( ) ,
464- error_detail : None ,
465- } )
466- . await ?;
450+ let refresh_request = DeltaDiscoveryRequest {
451+ node : Some ( Node {
452+ id : identifier. to_owned ( ) ,
453+ user_agent_name : "quilkin" . into ( ) ,
454+ ..Node :: default ( )
455+ } ) ,
456+ type_url : ( * rt) . to_owned ( ) ,
457+ resource_names_subscribe : names. clone ( ) ,
458+ initial_resource_versions,
459+ // We (currently) never unsubscribe from resources, since we
460+ // never actually subscribe to particular ones in the first place
461+ resource_names_unsubscribe : Vec :: new ( ) ,
462+ response_nonce : "" . into ( ) ,
463+ error_detail : None ,
464+ } ;
465+ self . send_request ( refresh_request) ?;
467466 }
468467
469468 Ok ( ( ) )
470469 }
471470
472471 /// Sends an n/ack "request" in response to the remote response
473472 #[ inline]
474- pub ( crate ) fn send_request ( & self , request : DeltaDiscoveryRequest ) -> Result < ( ) > {
473+ pub ( crate ) fn ack_response ( & self , ack_request : DeltaDiscoveryRequest ) -> Result < ( ) > {
474+ // Save 10% or at least 10 spots of the request buffer for refresh requests
475+ if self . req_tx . capacity ( ) < ( REQUEST_BUFFER_SIZE / 10 ) . max ( 10 ) {
476+ Err ( eyre:: eyre!( "request buffer congested, dropping ACK/NACK" ) )
477+ } else {
478+ self . send_request ( ack_request)
479+ }
480+ }
481+
482+ #[ inline]
483+ fn send_request ( & self , request : DeltaDiscoveryRequest ) -> Result < ( ) > {
475484 crate :: metrics:: actions_total ( KIND_CLIENT , "send_request" ) . inc ( ) ;
476485 self . req_tx . try_send ( request) ?;
477486 Ok ( ( ) )
@@ -601,10 +610,7 @@ pub async fn delta_subscribe<C: crate::config::Configuration>(
601610 let local = Arc :: new ( crate :: config:: LocalVersions :: new (
602611 resource_subscriptions. iter ( ) . map ( |( s, _) | * s) ,
603612 ) ) ;
604- if let Err ( err) = client
605- . refresh ( & identifier, resource_subscriptions. to_vec ( ) , & local)
606- . await
607- {
613+ if let Err ( err) = client. refresh ( & identifier, resource_subscriptions. to_vec ( ) , & local) {
608614 crate :: metrics:: errors_total ( KIND_CLIENT , "request_failed" ) . inc ( ) ;
609615 tracing:: error!( error = ?err, "failed to send initial resource requests" ) ;
610616 return Err ( err) ;
@@ -642,7 +648,7 @@ pub async fn delta_subscribe<C: crate::config::Configuration>(
642648 "unknown" . into ( )
643649 } ;
644650 tracing:: trace!( %node_id, "received delta response" ) ;
645- if let Err ( error) = client. send_request ( ack_request) {
651+ if let Err ( error) = client. ack_response ( ack_request) {
646652 crate :: metrics:: errors_total ( KIND_CLIENT , "ack_failed" ) . inc ( ) ;
647653 tracing:: error!( %error, %node_id, "failed to ack delta response" ) ;
648654 }
@@ -670,12 +676,11 @@ pub async fn delta_subscribe<C: crate::config::Configuration>(
670676 }
671677 Err ( _) => {
672678 tracing:: debug!( "exceeded idle request interval sending new requests" ) ;
673- if let Err ( error) = client
674- . refresh ( & identifier, resource_subscriptions. to_vec ( ) , & local)
675- . await
679+ if let Err ( error) =
680+ client. refresh ( & identifier, resource_subscriptions. to_vec ( ) , & local)
676681 {
677682 crate :: metrics:: errors_total ( KIND_CLIENT , "refresh" ) . inc ( ) ;
678- return Err ( error. wrap_err ( "refresh failed" ) ) ;
683+ tracing :: error! ( ?error , "refresh failed" ) ;
679684 }
680685 }
681686 }
@@ -709,9 +714,8 @@ pub async fn delta_subscribe<C: crate::config::Configuration>(
709714 }
710715 }
711716
712- if let Err ( error) = client
713- . refresh ( & identifier, resource_subscriptions. to_vec ( ) , & local)
714- . await
717+ if let Err ( error) =
718+ client. refresh ( & identifier, resource_subscriptions. to_vec ( ) , & local)
715719 {
716720 crate :: metrics:: errors_total ( KIND_CLIENT , "refresh" ) . inc ( ) ;
717721 return Err ( error. wrap_err ( "refresh failed" ) ) ;
0 commit comments