@@ -57,44 +57,6 @@ use tokio::{
5757} ;
5858use tracing:: { debug, info, warn} ;
5959
60- async fn handle_internal_connection_static (
61- listener_name : String ,
62- connection_pair : crate :: transport:: InternalConnectionPair ,
63- filter_chains : Arc < HashMap < FilterChainMatch , FilterchainType > > ,
64- ) -> Result < ( ) > {
65- use crate :: listeners:: filter_state:: DownstreamConnectionMetadata ;
66-
67- debug ! ( "Handling new internal connection for listener '{}'" , listener_name) ;
68-
69- let downstream_metadata = DownstreamConnectionMetadata :: FromInternal {
70- listener_name : listener_name. clone ( ) ,
71- endpoint_id : connection_pair. downstream . metadata ( ) . endpoint_id . clone ( ) ,
72- } ;
73-
74- let filter_chain = match Listener :: select_filterchain ( & filter_chains, & downstream_metadata, None ) ? {
75- Some ( fc) => fc,
76- None => {
77- warn ! ( "No matching filter chain found for internal connection" ) ;
78- return Err ( crate :: Error :: new ( "No matching filter chain" ) ) ;
79- } ,
80- } ;
81-
82- let _downstream_stream = connection_pair. downstream ;
83-
84- match & filter_chain. handler {
85- crate :: listeners:: filterchain:: ConnectionHandler :: Http ( _http_manager) => {
86- info ! ( "Processing internal connection through HTTP filter chain" ) ;
87- tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( 1 ) ) . await ;
88- Ok ( ( ) )
89- } ,
90- crate :: listeners:: filterchain:: ConnectionHandler :: Tcp ( _tcp_proxy) => {
91- info ! ( "Processing internal connection through TCP filter chain" ) ;
92- tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( 1 ) ) . await ;
93- Ok ( ( ) )
94- } ,
95- }
96- }
97-
9860#[ derive( Debug ) ]
9961struct InternalConnectionWorkerPool {
10062 senders : Vec < mpsc:: UnboundedSender < InternalConnectionTask > > ,
@@ -402,7 +364,7 @@ impl Listener {
402364 maybe_route_update = route_updates_receiver. recv( ) => {
403365 //todo: add context to the error here once orion-error lands
404366 match maybe_route_update {
405- Ok ( route_update) => { Self :: process_route_update( & name, & filter_chains, route_update) } ,
367+ Ok ( route_update) => { Self :: process_route_update( name, & filter_chains, route_update) } ,
406368 Err ( e) => { return e. into( ) ; }
407369 }
408370 } ,
@@ -411,7 +373,7 @@ impl Listener {
411373 Ok ( secret_update) => {
412374 // todo: possibly expensive clone - may need to rethink this structure
413375 let mut filter_chains_clone = filter_chains. as_ref( ) . clone( ) ;
414- Self :: process_secret_update( & name, & mut filter_chains_clone, secret_update) ;
376+ Self :: process_secret_update( name, & mut filter_chains_clone, secret_update) ;
415377 filter_chains = Arc :: new( filter_chains_clone) ;
416378 }
417379 Err ( e) => { return e. into( ) ; }
@@ -436,7 +398,7 @@ impl Listener {
436398 let filter_chains = Arc :: new ( filter_chains) ;
437399 let factory = global_internal_connection_factory ( ) ;
438400
439- let ( _handle, mut connection_receiver, _listener_ref) = match factory. register_listener ( name. to_string ( ) ) . await
401+ let ( _handle, mut connection_receiver, _listener_ref) = match factory. register_listener ( name. to_owned ( ) ) . await
440402 {
441403 Ok ( result) => result,
442404 Err ( e) => {
@@ -450,35 +412,32 @@ impl Listener {
450412 loop {
451413 tokio:: select! {
452414 maybe_connection = connection_receiver. recv( ) => {
453- match maybe_connection {
454- Some ( connection_pair) => {
455- debug!( "Internal listener '{}' received new connection" , name) ;
456-
457- let filter_chains_clone = filter_chains. clone( ) ;
458- let listener_name = name. to_string( ) ;
459-
460- // Use worker pool instead of tokio::spawn for better performance
461- // with large numbers of short connections
462- let task = InternalConnectionTask {
463- listener_name,
464- connection_pair,
465- filter_chains: filter_chains_clone,
466- } ;
467-
468- if let Err ( e) = get_internal_worker_pool( ) . submit_task( task) {
469- warn!( "Failed to submit internal connection task: {}" , e) ;
470- }
471- }
472- None => {
473- warn!( "Internal listener '{}' connection channel closed" , name) ;
474- break ;
415+ if let Some ( connection_pair) = maybe_connection {
416+ debug!( "Internal listener '{}' received new connection" , name) ;
417+
418+ let filter_chains_clone = filter_chains. clone( ) ;
419+ let listener_name = name. to_owned( ) ;
420+
421+ // Use worker pool instead of tokio::spawn for better performance
422+ // with large numbers of short connections
423+ let task = InternalConnectionTask {
424+ listener_name,
425+ connection_pair,
426+ filter_chains: filter_chains_clone,
427+ } ;
428+
429+ if let Err ( e) = get_internal_worker_pool( ) . submit_task( task) {
430+ warn!( "Failed to submit internal connection task: {}" , e) ;
475431 }
432+ } else {
433+ warn!( "Internal listener '{}' connection channel closed" , name) ;
434+ break ;
476435 }
477436 } ,
478437 maybe_route_update = route_updates_receiver. recv( ) => {
479438 match maybe_route_update {
480439 Ok ( route_update) => {
481- Self :: process_route_update( & name, & filter_chains, route_update) ;
440+ Self :: process_route_update( name, & filter_chains, route_update) ;
482441 }
483442 Err ( e) => {
484443 error!( "Route update error for internal listener '{}': {}" , name, e) ;
@@ -490,7 +449,7 @@ impl Listener {
490449 match maybe_secret_update {
491450 Ok ( secret_update) => {
492451 let mut filter_chains_clone = filter_chains. as_ref( ) . clone( ) ;
493- Self :: process_secret_update( & name, & mut filter_chains_clone, secret_update) ;
452+ Self :: process_secret_update( name, & mut filter_chains_clone, secret_update) ;
494453 // TODO: Update the shared filter chains state for active connections
495454 }
496455 Err ( e) => {
@@ -616,8 +575,8 @@ impl Listener {
616575
617576 let ssl = AtomicBool :: new ( false ) ;
618577 defer ! {
619- with_metric!( listeners:: DOWNSTREAM_CX_DESTROY , add, 1 , shard_id, & [ KeyValue :: new( "listener" , listener_name. to_string ( ) ) ] ) ;
620- with_metric!( listeners:: DOWNSTREAM_CX_ACTIVE , sub, 1 , shard_id, & [ KeyValue :: new( "listener" , listener_name. to_string ( ) ) ] ) ;
578+ with_metric!( listeners:: DOWNSTREAM_CX_DESTROY , add, 1 , shard_id, & [ KeyValue :: new( "listener" , listener_name. to_owned ( ) ) ] ) ;
579+ with_metric!( listeners:: DOWNSTREAM_CX_ACTIVE , sub, 1 , shard_id, & [ KeyValue :: new( "listener" , listener_name. to_owned ( ) ) ] ) ;
621580 if ssl. load( Ordering :: Relaxed ) {
622581 with_metric!( http:: DOWNSTREAM_CX_SSL_ACTIVE , add, 1 , shard_id, & [ KeyValue :: new( "listener" , listener_name) ] ) ;
623582 }
@@ -656,7 +615,7 @@ impl Listener {
656615 add,
657616 1 ,
658617 shard_id,
659- & [ KeyValue :: new( "listener" , listener_name. to_string ( ) ) ]
618+ & [ KeyValue :: new( "listener" , listener_name. to_owned ( ) ) ]
660619 ) ;
661620 with_metric ! (
662621 http:: DOWNSTREAM_CX_SSL_ACTIVE ,
@@ -1014,3 +973,38 @@ filter_chains:
1014973 assert_eq ! ( Listener :: select_filterchain( & m, & metadata, Some ( "hello.world" ) ) . unwrap( ) . copied( ) , Some ( 3 ) ) ;
1015974 }
1016975}
976+
977+ async fn handle_internal_connection_static (
978+ listener_name : String ,
979+ connection_pair : crate :: transport:: InternalConnectionPair ,
980+ filter_chains : Arc < HashMap < FilterChainMatch , FilterchainType > > ,
981+ ) -> Result < ( ) > {
982+ use crate :: listeners:: filter_state:: DownstreamConnectionMetadata ;
983+
984+ debug ! ( "Handling new internal connection for listener '{}'" , listener_name) ;
985+
986+ let downstream_metadata = DownstreamConnectionMetadata :: FromInternal {
987+ listener_name : listener_name. clone ( ) ,
988+ endpoint_id : connection_pair. downstream . metadata ( ) . endpoint_id . clone ( ) ,
989+ } ;
990+
991+ let filter_chain = if let Some ( fc) = Listener :: select_filterchain ( & filter_chains, & downstream_metadata, None ) ? { fc } else {
992+ warn ! ( "No matching filter chain found for internal connection" ) ;
993+ return Err ( crate :: Error :: new ( "No matching filter chain" ) ) ;
994+ } ;
995+
996+ let _downstream_stream = connection_pair. downstream ;
997+
998+ match & filter_chain. handler {
999+ crate :: listeners:: filterchain:: ConnectionHandler :: Http ( _http_manager) => {
1000+ info ! ( "Processing internal connection through HTTP filter chain" ) ;
1001+ tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( 1 ) ) . await ;
1002+ Ok ( ( ) )
1003+ } ,
1004+ crate :: listeners:: filterchain:: ConnectionHandler :: Tcp ( _tcp_proxy) => {
1005+ info ! ( "Processing internal connection through TCP filter chain" ) ;
1006+ tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( 1 ) ) . await ;
1007+ Ok ( ( ) )
1008+ } ,
1009+ }
1010+ }
0 commit comments