1414 * See the License for the specific language governing permissions and
1515 * limitations under the License.
1616 */
17- use crate :: StdError ;
17+
18+ pub mod random;
19+
1820use futures_core:: future:: BoxFuture ;
21+ use std:: error:: Error ;
22+ use tokio:: time:: Duration ;
1923use tower:: { discover:: ServiceList , ServiceExt } ;
2024use tower_service:: Service ;
25+ use tracing:: debug;
2126
2227use crate :: {
2328 codegen:: RpcInvocation ,
29+ invocation:: Metadata ,
2430 invoker:: { clone_body:: CloneBody , clone_invoker:: CloneInvoker } ,
31+ loadbalancer:: random:: RandomLoadBalancer ,
2532 param:: Param ,
33+ protocol:: triple:: triple_invoker:: TripleInvoker ,
2634 svc:: NewService ,
35+ StdError ,
2736} ;
2837
29- use crate :: protocol:: triple:: triple_invoker:: TripleInvoker ;
30-
3138pub struct NewLoadBalancer < N > {
3239 inner : N ,
3340}
3441
3542#[ derive( Clone ) ]
36- pub struct LoadBalancer < S > {
43+ pub struct LoadBalancerSvc < S > {
3744 inner : S , // Routes service
3845}
3946
@@ -53,17 +60,17 @@ where
5360 // NewRoutes
5461 N : NewService < T > ,
5562{
56- type Service = LoadBalancer < N :: Service > ;
63+ type Service = LoadBalancerSvc < N :: Service > ;
5764
5865 fn new_service ( & self , target : T ) -> Self :: Service {
5966 // Routes service
6067 let svc = self . inner . new_service ( target) ;
6168
62- LoadBalancer { inner : svc }
69+ LoadBalancerSvc { inner : svc }
6370 }
6471}
6572
66- impl < N > Service < http:: Request < CloneBody > > for LoadBalancer < N >
73+ impl < N > Service < http:: Request < CloneBody > > for LoadBalancerSvc < N >
6774where
6875 // Routes service
6976 N : Service < ( ) , Response = Vec < CloneInvoker < TripleInvoker > > > + Clone ,
@@ -94,18 +101,87 @@ where
94101 Ok ( routes) => routes,
95102 } ;
96103
97- let service_list: Vec < _ > = routes
98- . into_iter ( )
99- . map ( |invoker| tower:: load:: Constant :: new ( invoker, 1 ) )
100- . collect ( ) ;
104+ // let service_list: Vec<_> = routes
105+ // .into_iter()
106+ // // .map(|invoker| tower::load::Constant::new(invoker, 1))
107+ // .collect();
101108
102- let service_list = ServiceList :: new ( service_list) ;
109+ // let rdm = RandomLoadBalancer::default();
110+ let metadata = Metadata :: from_headers ( req. headers ( ) . clone ( ) ) ;
111+ // let invks = rdm.select_invokers(service_list, metadata);
112+ // invks.oneshot(req).await
113+ // let service_list = ServiceList::new(service_list);
103114
104- let p2c = tower:: balance:: p2c:: Balance :: new ( service_list) ;
115+ // let p2c = tower::balance::p2c::Balance::new(service_list);
116+ // let p: Box<dyn LoadBalancer<Invoker = BoxService<http::Request<CloneBody>, http::Response<UnsyncBoxBody<bytes::Bytes, status::Status>>, Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>>> + std::marker::Send + std::marker::Sync> = get_loadbalancer("p2c").into();
117+ let p = get_loadbalancer ( "p2c" ) ;
118+ // let ivk = p.select_invokers(invokers, metadata);
119+ let ivk = p. select_invokers ( routes, metadata) ;
105120
106- p2c . oneshot ( req) . await
121+ ivk . oneshot ( req) . await
107122 } ;
108123
109124 Box :: pin ( fut)
110125 }
111126}
127+
128+ type DubboBoxService = tower:: util:: BoxService <
129+ http:: Request < CloneBody > ,
130+ http:: Response < crate :: BoxBody > ,
131+ Box < dyn Error + Send + Sync > ,
132+ > ;
133+
134+ pub trait LoadBalancer {
135+ type Invoker ;
136+
137+ fn select_invokers (
138+ & self ,
139+ invokers : Vec < CloneInvoker < TripleInvoker > > ,
140+ metadata : Metadata ,
141+ ) -> Self :: Invoker ;
142+ }
143+
144+ fn get_loadbalancer (
145+ loadbalancer : & str ,
146+ ) -> Box < dyn LoadBalancer < Invoker = DubboBoxService > + Send + Sync + ' static > {
147+ match loadbalancer {
148+ "random" => {
149+ println ! ( "random!" ) ;
150+ Box :: new ( RandomLoadBalancer :: default ( ) )
151+ }
152+ "p2c" => Box :: new ( P2cBalancer :: default ( ) ) ,
153+ _ => Box :: new ( P2cBalancer :: default ( ) ) ,
154+ }
155+ }
156+ const DEFAULT_RTT : Duration = Duration :: from_millis ( 30 ) ;
157+ #[ derive( Debug , Default ) ]
158+ pub struct P2cBalancer { }
159+
160+ impl LoadBalancer for P2cBalancer {
161+ type Invoker = DubboBoxService ;
162+
163+ fn select_invokers (
164+ & self ,
165+ invokers : Vec < CloneInvoker < TripleInvoker > > ,
166+ _metadata : Metadata ,
167+ ) -> Self :: Invoker {
168+ debug ! ( "p2c load balancer" ) ;
169+ let service_list: Vec < _ > = invokers
170+ . into_iter ( )
171+ . map ( |invoker| tower:: load:: Constant :: new ( invoker, 1 ) )
172+ . collect ( ) ;
173+
174+ let decay = Duration :: from_secs ( 10 ) ;
175+ let service_list = ServiceList :: new ( service_list) ;
176+ let s = tower:: load:: PeakEwmaDiscover :: new (
177+ service_list,
178+ DEFAULT_RTT ,
179+ decay,
180+ tower:: load:: CompleteOnResponse :: default ( ) ,
181+ ) ;
182+
183+ let p = tower:: balance:: p2c:: Balance :: new ( s) ;
184+ let svc = DubboBoxService :: new ( p) ;
185+ svc
186+ }
187+ }
0 commit comments