@@ -99,6 +99,48 @@ impl TlsIdentity {
9999 }
100100}
101101
/// Period of the timer that drives propagation of buffered resource changes to a connected
/// client: the streaming loops flush their change buffer once per tick of this interval.
const RESPONSE_PROPAGATION_INTERVAL: Duration = Duration::from_millis(100);
103+
104+ /// Buffers response broadcasts so they can be propagated at a set interval instead of every time
105+ /// something has changed.
106+ #[ derive( Debug , Default ) ]
107+ struct ResponseBroadcastPropagationBuffer {
108+ changed_resources : std:: sync:: Mutex < std:: collections:: HashSet < String > > ,
109+ }
110+
111+ impl ResponseBroadcastPropagationBuffer {
112+ /// Ingest a resource broadcast message
113+ fn ingest (
114+ & self ,
115+ result : Result < & str , tokio:: sync:: broadcast:: error:: RecvError > ,
116+ ) -> Result < ( ) , tokio:: sync:: broadcast:: error:: RecvError > {
117+ result. map ( |resource| {
118+ let mut guard = match self . changed_resources . lock ( ) {
119+ Ok ( guard) => guard,
120+ Err ( poisoned) => {
121+ let guard = poisoned. into_inner ( ) ;
122+ tracing:: warn!( "recovered from poisoned mutex" ) ;
123+ guard
124+ }
125+ } ;
126+ guard. insert ( resource. into ( ) ) ;
127+ } )
128+ }
129+
130+ /// Flush all changed resources and reset the buffer
131+ fn flush ( & self ) -> Vec < String > {
132+ let mut guard = match self . changed_resources . lock ( ) {
133+ Ok ( guard) => guard,
134+ Err ( poisoned) => {
135+ let guard = poisoned. into_inner ( ) ;
136+ tracing:: warn!( "recovered from poisoned mutex" ) ;
137+ guard
138+ }
139+ } ;
140+ guard. drain ( ) . collect ( )
141+ }
142+ }
143+
102144const VERSION_INFO : & str = "9" ;
103145
104146pub struct ControlPlane < C > {
@@ -371,38 +413,45 @@ impl<C: crate::config::Configuration> ControlPlane<C> {
371413 let stream = async_stream:: try_stream! {
372414 yield response;
373415
416+ // Buffer changes so we only propagate at a set and controlled interval. This reduces
417+ // the network load when we have a high rate of change due to high cluster load.
418+ let buffer = ResponseBroadcastPropagationBuffer :: default ( ) ;
419+ let mut lag_amount: u64 = 0 ;
420+ let mut propagation_interval = tokio:: time:: interval( RESPONSE_PROPAGATION_INTERVAL ) ;
421+
374422 loop {
375423 tokio:: select! {
376- // The resource(s) have changed, inform the connected client , but only
424+ // Inform the connected client if any of the resources have changed , but only
377425 // send the changed resources that the client doesn't already have
378- res = rx. recv( ) => {
379- match res {
380- Ok ( rt) => {
381- match responder( None , rt, & mut client_tracker) {
382- Ok ( Some ( res) ) => yield res,
383- Ok ( None ) => { }
384- Err ( error) => {
385- crate :: metrics:: errors_total( KIND_SERVER , "respond" ) . inc( ) ;
386- tracing:: error!( %error, "responder failed to generate response" ) ;
387- continue ;
388- } ,
426+ _ = propagation_interval. tick( ) => {
427+ // Fetch the changed resources
428+ let mut resources = buffer. flush( ) ;
429+ // If we've been lagging on updates, collect everything instead
430+ if lag_amount > 0 {
431+ tracing:: warn!( lag_amount, "lagged while receiving response broadcasts" ) ;
432+ resources = client_tracker. tracked_resources( ) . collect( ) ;
433+ }
434+ lag_amount = 0 ;
435+ for rt in resources {
436+ match responder( None , & rt, & mut client_tracker) {
437+ Ok ( Some ( res) ) => yield res,
438+ Ok ( None ) => { } ,
439+ Err ( error) => {
440+ crate :: metrics:: errors_total( KIND_SERVER , "respond" ) . inc( ) ;
441+ tracing:: error!( %error, "responder failed to generate response" ) ;
442+ continue ;
389443 }
390444 }
445+ }
446+ }
447+ // A resource has changed, buffer it for propagation
448+ res = rx. recv( ) => {
449+ match buffer. ingest( res) {
450+ Ok ( _) => { } ,
391451 Err ( tokio:: sync:: broadcast:: error:: RecvError :: Closed ) => break ,
392- Err ( tokio:: sync:: broadcast:: error:: RecvError :: Lagged ( _) ) => {
393- let tracked_resources: Vec <_> = client_tracker. tracked_resources( ) . collect( ) ;
394- for rt in tracked_resources {
395- match responder( None , & rt, & mut client_tracker) {
396- Ok ( Some ( res) ) => yield res,
397- Ok ( None ) => { } ,
398- Err ( error) => {
399- crate :: metrics:: errors_total( KIND_SERVER , "respond" ) . inc( ) ;
400- tracing:: error!( %error, "responder failed to generate response" ) ;
401- continue ;
402- }
403- }
404- }
405- }
452+ Err ( tokio:: sync:: broadcast:: error:: RecvError :: Lagged ( amount) ) => {
453+ lag_amount += amount;
454+ } ,
406455 }
407456 }
408457 client_request = streaming. next( ) => {
@@ -797,36 +846,45 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for
797846 let stream = async_stream:: try_stream! {
798847 yield response;
799848
849+ // Buffer changes so we only propagate at a set and controlled interval. This reduces
850+ // the network load when we have a high rate of change due to high cluster load.
851+ let buffer = ResponseBroadcastPropagationBuffer :: default ( ) ;
852+ let mut lag_amount: u64 = 0 ;
853+ let mut propagation_interval = tokio:: time:: interval( RESPONSE_PROPAGATION_INTERVAL ) ;
854+
800855 loop {
801856 tokio:: select! {
802- // The resource(s) have changed, inform the connected client , but only
857+ // Inform the connected client if any of the resources have changed , but only
803858 // send the changed resources that the client doesn't already have
804- res = rx. recv( ) => {
805- match res {
806- Ok ( rt) => {
807- match responder( None , rt, & mut client_tracker) {
808- Ok ( Some ( res) ) => yield res,
809- Ok ( None ) => { }
810- Err ( error) => {
811- tracing:: error!( %error, "responder failed to generate response" ) ;
812- continue ;
813- } ,
859+ _ = propagation_interval. tick( ) => {
860+ // Fetch the changed resources
861+ let mut resources = buffer. flush( ) ;
862+ // If we've been lagging on updates, collect everything instead
863+ if lag_amount > 0 {
864+ tracing:: warn!( lag_amount, "lagged while receiving response broadcasts" ) ;
865+ resources = client_tracker. tracked_resources( ) . collect( ) ;
866+ }
867+ lag_amount = 0 ;
868+ for rt in resources {
869+ match responder( None , & rt, & mut client_tracker) {
870+ Ok ( Some ( res) ) => yield res,
871+ Ok ( None ) => { } ,
872+ Err ( error) => {
873+ crate :: metrics:: errors_total( KIND_SERVER , "respond" ) . inc( ) ;
874+ tracing:: error!( %error, "responder failed to generate response" ) ;
875+ continue ;
814876 }
815877 }
878+ }
879+ }
880+ // A resource has changed, buffer it for propagation
881+ res = rx. recv( ) => {
882+ match buffer. ingest( res) {
883+ Ok ( _) => { } ,
816884 Err ( tokio:: sync:: broadcast:: error:: RecvError :: Closed ) => break ,
817- Err ( tokio:: sync:: broadcast:: error:: RecvError :: Lagged ( _) ) => {
818- let tracked_resources: Vec <_> = client_tracker. tracked_resources( ) . collect( ) ;
819- for rt in tracked_resources {
820- match responder( None , & rt, & mut client_tracker) {
821- Ok ( Some ( res) ) => yield res,
822- Ok ( None ) => { } ,
823- Err ( error) => {
824- tracing:: error!( %error, "responder failed to generate response" ) ;
825- continue ;
826- }
827- }
828- }
829- }
885+ Err ( tokio:: sync:: broadcast:: error:: RecvError :: Lagged ( amount) ) => {
886+ lag_amount += amount;
887+ } ,
830888 }
831889 }
832890 client_request = requests. next( ) => {
0 commit comments