1
1
use std:: {
2
+ future:: Future ,
2
3
ptr:: { copy, copy_nonoverlapping} ,
3
4
rc:: Rc ,
4
5
sync:: Arc ,
@@ -10,6 +11,7 @@ use monoio::{
10
11
buf:: IoBufMut ,
11
12
io:: { AsyncReadRent , AsyncReadRentExt , AsyncWriteRent , AsyncWriteRentExt , Splitable } ,
12
13
net:: { TcpConnectOpts , TcpStream } ,
14
+ BufResult ,
13
15
} ;
14
16
use monoio_rustls_fork_shadow_tls:: TlsConnector ;
15
17
use rand:: { prelude:: Distribution , seq:: SliceRandom , Rng } ;
@@ -159,11 +161,11 @@ impl ShadowTlsClient {
159
161
v3 : V3Mode ,
160
162
) -> anyhow:: Result < Self > {
161
163
let mut root_store = RootCertStore :: empty ( ) ;
162
- root_store. add_server_trust_anchors ( webpki_roots:: TLS_SERVER_ROOTS . 0 . iter ( ) . map ( |ta| {
164
+ root_store. add_server_trust_anchors ( webpki_roots:: TLS_SERVER_ROOTS . iter ( ) . map ( |ta| {
163
165
OwnedTrustAnchor :: from_subject_spki_name_constraints (
164
- ta. subject ,
165
- ta. spki ,
166
- ta. name_constraints ,
166
+ ta. subject . as_ref ( ) ,
167
+ ta. subject_public_key_info . as_ref ( ) ,
168
+ ta. name_constraints . as_ref ( ) . map ( |n| n . as_ref ( ) ) ,
167
169
)
168
170
} ) ) ;
169
171
// TLS 1.2 and TLS 1.3 is enabled.
@@ -217,12 +219,12 @@ impl ShadowTlsClient {
217
219
}
218
220
219
221
/// Main relay for V2 protocol.
220
- async fn relay_v2 ( & self , mut in_stream : TcpStream ) -> anyhow:: Result < ( ) > {
221
- let ( mut out_stream, hash, session) = self . connect_v2 ( ) . await ?;
222
+ async fn relay_v2 ( & self , in_stream : TcpStream ) -> anyhow:: Result < ( ) > {
223
+ let ( out_stream, hash, session) = self . connect_v2 ( ) . await ?;
222
224
let mut hash_8b = [ 0 ; 8 ] ;
223
225
unsafe { std:: ptr:: copy_nonoverlapping ( hash. as_ptr ( ) , hash_8b. as_mut_ptr ( ) , 8 ) } ;
224
- let ( out_r, mut out_w) = out_stream. split ( ) ;
225
- let ( mut in_r, mut in_w) = in_stream. split ( ) ;
226
+ let ( out_r, mut out_w) = out_stream. into_split ( ) ;
227
+ let ( mut in_r, mut in_w) = in_stream. into_split ( ) ;
226
228
let mut session_filtered_out_r = crate :: helper_v2:: SessionFilterStream :: new ( session, out_r) ;
227
229
let ( a, b) = monoio:: join!(
228
230
copy_without_application_data( & mut session_filtered_out_r, & mut in_w) ,
@@ -469,78 +471,72 @@ impl<S: AsyncReadRent> StreamWrapper<S> {
469
471
}
470
472
471
473
impl < S : AsyncWriteRent > AsyncWriteRent for StreamWrapper < S > {
472
- type WriteFuture < ' a , T > = S :: WriteFuture < ' a , T > where
473
- T : monoio:: buf:: IoBuf + ' a , Self : ' a ;
474
- type WritevFuture < ' a , T > = S :: WritevFuture < ' a , T > where
475
- T : monoio:: buf:: IoVecBuf + ' a , Self : ' a ;
476
- type FlushFuture < ' a > = S :: FlushFuture < ' a > where Self : ' a ;
477
- type ShutdownFuture < ' a > = S :: ShutdownFuture < ' a > where Self : ' a ;
478
-
479
- fn write < T : monoio:: buf:: IoBuf > ( & mut self , buf : T ) -> Self :: WriteFuture < ' _ , T > {
474
+ #[ inline]
475
+ fn write < T : monoio:: buf:: IoBuf > (
476
+ & mut self ,
477
+ buf : T ,
478
+ ) -> impl Future < Output = BufResult < usize , T > > {
480
479
self . raw . write ( buf)
481
480
}
482
- fn writev < T : monoio:: buf:: IoVecBuf > ( & mut self , buf_vec : T ) -> Self :: WritevFuture < ' _ , T > {
481
+ #[ inline]
482
+ fn writev < T : monoio:: buf:: IoVecBuf > (
483
+ & mut self ,
484
+ buf_vec : T ,
485
+ ) -> impl Future < Output = BufResult < usize , T > > {
483
486
self . raw . writev ( buf_vec)
484
487
}
485
- fn flush ( & mut self ) -> Self :: FlushFuture < ' _ > {
488
+ #[ inline]
489
+ fn flush ( & mut self ) -> impl Future < Output = std:: io:: Result < ( ) > > {
486
490
self . raw . flush ( )
487
491
}
488
- fn shutdown ( & mut self ) -> Self :: ShutdownFuture < ' _ > {
492
+ #[ inline]
493
+ fn shutdown ( & mut self ) -> impl Future < Output = std:: io:: Result < ( ) > > {
489
494
self . raw . shutdown ( )
490
495
}
491
496
}
492
497
493
498
impl < S : AsyncReadRent > AsyncReadRent for StreamWrapper < S > {
494
- type ReadFuture < ' a , B > = impl std:: future:: Future < Output = monoio:: BufResult < usize , B > > +' a where
495
- B : monoio:: buf:: IoBufMut + ' a , S : ' a ;
496
- type ReadvFuture < ' a , B > = impl std:: future:: Future < Output = monoio:: BufResult < usize , B > > +' a where
497
- B : monoio:: buf:: IoVecBufMut + ' a , S : ' a ;
498
-
499
499
// uncancelable
500
- fn read < T : monoio:: buf:: IoBufMut > ( & mut self , mut buf : T ) -> Self :: ReadFuture < ' _ , T > {
501
- async move {
502
- loop {
503
- let owned_buf = self . read_buf . as_mut ( ) . unwrap ( ) ;
504
- let data_len = owned_buf. len ( ) - self . read_pos ;
505
- // there is enough data to copy
506
- if data_len > 0 {
507
- let to_copy = buf. bytes_total ( ) . min ( data_len) ;
508
- unsafe {
509
- copy_nonoverlapping (
510
- owned_buf. as_ptr ( ) . add ( self . read_pos ) ,
511
- buf. write_ptr ( ) ,
512
- to_copy,
513
- ) ;
514
- buf. set_init ( to_copy) ;
515
- } ;
516
- self . read_pos += to_copy;
517
- return ( Ok ( to_copy) , buf) ;
518
- }
500
+ async fn read < T : monoio:: buf:: IoBufMut > ( & mut self , mut buf : T ) -> BufResult < usize , T > {
501
+ loop {
502
+ let owned_buf = self . read_buf . as_mut ( ) . unwrap ( ) ;
503
+ let data_len = owned_buf. len ( ) - self . read_pos ;
504
+ // there is enough data to copy
505
+ if data_len > 0 {
506
+ let to_copy = buf. bytes_total ( ) . min ( data_len) ;
507
+ unsafe {
508
+ copy_nonoverlapping (
509
+ owned_buf. as_ptr ( ) . add ( self . read_pos ) ,
510
+ buf. write_ptr ( ) ,
511
+ to_copy,
512
+ ) ;
513
+ buf. set_init ( to_copy) ;
514
+ } ;
515
+ self . read_pos += to_copy;
516
+ return ( Ok ( to_copy) , buf) ;
517
+ }
519
518
520
- // no data now
521
- match self . feed_data ( ) . await {
522
- Ok ( 0 ) => return ( Ok ( 0 ) , buf) ,
523
- Ok ( _) => continue ,
524
- Err ( e) => return ( Err ( e) , buf) ,
525
- }
519
+ // no data now
520
+ match self . feed_data ( ) . await {
521
+ Ok ( 0 ) => return ( Ok ( 0 ) , buf) ,
522
+ Ok ( _) => continue ,
523
+ Err ( e) => return ( Err ( e) , buf) ,
526
524
}
527
525
}
528
526
}
529
527
530
- fn readv < T : monoio:: buf:: IoVecBufMut > ( & mut self , mut buf : T ) -> Self :: ReadvFuture < ' _ , T > {
531
- async move {
532
- let slice = match monoio:: buf:: IoVecWrapperMut :: new ( buf) {
533
- Ok ( slice) => slice,
534
- Err ( buf) => return ( Ok ( 0 ) , buf) ,
535
- } ;
536
-
537
- let ( result, slice) = self . read ( slice) . await ;
538
- buf = slice. into_inner ( ) ;
539
- if let Ok ( n) = result {
540
- unsafe { buf. set_init ( n) } ;
541
- }
542
- ( result, buf)
528
+ async fn readv < T : monoio:: buf:: IoVecBufMut > ( & mut self , mut buf : T ) -> BufResult < usize , T > {
529
+ let slice = match monoio:: buf:: IoVecWrapperMut :: new ( buf) {
530
+ Ok ( slice) => slice,
531
+ Err ( buf) => return ( Ok ( 0 ) , buf) ,
532
+ } ;
533
+
534
+ let ( result, slice) = self . read ( slice) . await ;
535
+ buf = slice. into_inner ( ) ;
536
+ if let Ok ( n) = result {
537
+ unsafe { buf. set_init ( n) } ;
543
538
}
539
+ ( result, buf)
544
540
}
545
541
}
546
542
0 commit comments