@@ -60,6 +60,7 @@ impl ChannelContext {
60
60
up_traffic_meter,
61
61
down_traffic_meter,
62
62
default_interface,
63
+ default_route_key : AtomicCell :: default ( ) ,
63
64
} ;
64
65
Self {
65
66
inner : Arc :: new ( inner) ,
@@ -86,7 +87,7 @@ pub struct ContextInner {
86
87
// 对称网络增加的udp socket
87
88
sub_udp_socket : RwLock < Vec < UdpSocket > > ,
88
89
// tcp数据发送器
89
- pub ( crate ) packet_map : RwLock < FnvHashMap < SocketAddr , PacketSender > > ,
90
+ pub ( crate ) packet_map : RwLock < FnvHashMap < RouteKey , PacketSender > > ,
90
91
// 路由信息
91
92
pub route_table : RouteTable ,
92
93
// 使用什么协议连接服务器
@@ -98,6 +99,7 @@ pub struct ContextInner {
98
99
pub ( crate ) up_traffic_meter : Option < TrafficMeterMultiAddress > ,
99
100
pub ( crate ) down_traffic_meter : Option < TrafficMeterMultiAddress > ,
100
101
default_interface : LocalInterface ,
102
+ default_route_key : AtomicCell < Option < RouteKey > > ,
101
103
}
102
104
103
105
impl ContextInner {
@@ -107,6 +109,9 @@ impl ContextInner {
107
109
pub fn default_interface ( & self ) -> & LocalInterface {
108
110
& self . default_interface
109
111
}
112
+ pub fn set_default_route_key ( & self , route_key : RouteKey ) {
113
+ self . default_route_key . store ( Some ( route_key) ) ;
114
+ }
110
115
/// 通过sub_udp_socket是否为空来判断是否为锥形网络
111
116
pub fn is_cone ( & self ) -> bool {
112
117
self . sub_udp_socket . read ( ) . is_empty ( )
@@ -175,11 +180,14 @@ impl ContextInner {
175
180
}
176
181
Ok ( ports)
177
182
}
178
- pub fn send_tcp ( & self , buf : & [ u8 ] , addr : SocketAddr ) -> io:: Result < ( ) > {
179
- if let Some ( tcp) = self . packet_map . read ( ) . get ( & addr ) {
183
+ pub fn send_tcp ( & self , buf : & [ u8 ] , route_key : & RouteKey ) -> io:: Result < ( ) > {
184
+ if let Some ( tcp) = self . packet_map . read ( ) . get ( route_key ) {
180
185
tcp. try_send ( buf)
181
186
} else {
182
- Err ( io:: Error :: from ( io:: ErrorKind :: NotFound ) )
187
+ Err ( io:: Error :: new (
188
+ io:: ErrorKind :: NotFound ,
189
+ format ! ( "dest={:?}" , route_key) ,
190
+ ) )
183
191
}
184
192
}
185
193
pub fn send_main_udp ( & self , index : usize , buf : & [ u8 ] , addr : SocketAddr ) -> io:: Result < ( ) > {
@@ -203,7 +211,14 @@ impl ContextInner {
203
211
self . send_main_udp ( self . v4_len , buf. buffer ( ) , addr) ?
204
212
}
205
213
} else {
206
- self . send_tcp ( buf. buffer ( ) , addr) ?
214
+ if let Some ( key) = self . default_route_key . load ( ) {
215
+ self . send_tcp ( buf. buffer ( ) , & key) ?
216
+ } else {
217
+ return Err ( io:: Error :: new (
218
+ io:: ErrorKind :: NotFound ,
219
+ format ! ( "dest={:?}" , addr) ,
220
+ ) ) ;
221
+ }
207
222
}
208
223
if let Some ( up_traffic_meter) = & self . up_traffic_meter {
209
224
up_traffic_meter. add_traffic ( buf. destination ( ) , buf. data_len ( ) ) ;
@@ -300,7 +315,7 @@ impl ContextInner {
300
315
}
301
316
}
302
317
ConnectProtocol :: TCP | ConnectProtocol :: WS | ConnectProtocol :: WSS => {
303
- self . send_tcp ( buf. buffer ( ) , route_key. addr ) ?
318
+ self . send_tcp ( buf. buffer ( ) , & route_key) ?
304
319
}
305
320
}
306
321
if let Some ( up_traffic_meter) = & self . up_traffic_meter {
@@ -376,19 +391,11 @@ impl RouteTable {
376
391
let key = route. route_key ( ) ;
377
392
if only_if_absent {
378
393
if let Some ( ( _, list) ) = self . route_table . read ( ) . get ( & id) {
379
- let mut p2p_num = 0 ;
380
394
for ( x, _) in list {
381
- if x. is_p2p ( ) {
382
- p2p_num += 1 ;
383
- }
384
395
if x. route_key ( ) == key {
385
396
return true ;
386
397
}
387
398
}
388
- if !self . first_latency && p2p_num >= self . channel_num {
389
- // 非优先延迟的情况下,通道满了则不用再添加
390
- return false ;
391
- }
392
399
}
393
400
}
394
401
let mut route_table = self . route_table . write ( ) ;
@@ -413,61 +420,42 @@ impl RouteTable {
413
420
}
414
421
}
415
422
if exist {
416
- // 这个排序还有待优化,因为后加入的大概率排最后,被直接淘汰的概率也大,可能导致更好的通道被移除了
417
423
list. sort_by_key ( |( k, _) | k. rt ) ;
418
- //如果延迟都稳定了,则去除多余通道
419
- for ( route, _) in list. iter ( ) {
420
- if route. rt == DEFAULT_RT {
421
- return true ;
422
- }
423
- }
424
- //延迟优先模式需要更多的通道探测延迟最低的路线
425
- let limit_len = if self . first_latency {
426
- self . channel_num + 2
427
- } else {
428
- self . channel_num
429
- } ;
430
- self . truncate_ ( list, limit_len) ;
431
424
} else {
432
425
if !self . first_latency {
433
426
if route. is_p2p ( ) {
434
427
//非优先延迟的情况下 添加了直连的则排除非直连的
435
428
list. retain ( |( k, _) | k. is_p2p ( ) ) ;
436
429
}
437
- if self . channel_num <= list. len ( ) {
438
- return false ;
439
- }
440
430
} ;
441
- //增加路由表容量,避免波动
442
- let limit_len = self . channel_num * 2 ;
443
431
list. sort_by_key ( |( k, _) | k. rt ) ;
444
- self . truncate_ ( list, limit_len) ;
445
432
list. push ( ( route, AtomicCell :: new ( Instant :: now ( ) ) ) ) ;
446
433
}
447
434
return true ;
448
435
}
449
- fn truncate_ ( & self , list : & mut Vec < ( Route , AtomicCell < Instant > ) > , len : usize ) {
450
- if list. len ( ) <= len {
451
- return ;
452
- }
453
- if self . first_latency {
454
- //找到第一个p2p通道
455
- if let Some ( index) =
456
- list. iter ( )
457
- . enumerate ( )
458
- . find_map ( |( index, ( route, _) ) | if route. is_p2p ( ) { Some ( index) } else { None } )
459
- {
460
- if index >= len {
461
- //保留第一个p2p通道
462
- let route = list. remove ( index) ;
463
- list. truncate ( len - 1 ) ;
464
- list. push ( route) ;
465
- return ;
466
- }
467
- }
468
- }
469
- list. truncate ( len) ;
470
- }
436
+ // 直接移除会导致通道不稳定,所以废弃这个方法,后面改用多余通道不发心跳包,从而让通道自动过期
437
+ // fn truncate_(&self, list: &mut Vec<(Route, AtomicCell<Instant>)>, len: usize) {
438
+ // if list.len() <= len {
439
+ // return;
440
+ // }
441
+ // if self.first_latency {
442
+ // //找到第一个p2p通道
443
+ // if let Some(index) =
444
+ // list.iter()
445
+ // .enumerate()
446
+ // .find_map(|(index, (route, _))| if route.is_p2p() { Some(index) } else { None })
447
+ // {
448
+ // if index >= len {
449
+ // //保留第一个p2p通道
450
+ // let route = list.remove(index);
451
+ // list.truncate(len - 1);
452
+ // list.push(route);
453
+ // return;
454
+ // }
455
+ // }
456
+ // }
457
+ // list.truncate(len);
458
+ // }
471
459
pub fn route ( & self , id : & Ipv4Addr ) -> Option < Vec < Route > > {
472
460
if let Some ( ( _, v) ) = self . route_table . read ( ) . get ( id) {
473
461
Some ( v. iter ( ) . map ( |( i, _) | * i) . collect ( ) )
0 commit comments