@@ -4,6 +4,7 @@ use chacha20poly1305::{
44 aead:: { generic_array:: GenericArray , AeadInPlace , NewAead } ,
55 ChaChaPoly1305 ,
66} ;
7+ use lazy_static:: lazy_static;
78use pin_project:: pin_project;
89use secrecy:: { ExposeSecret , SecretVec } ;
910use std:: cmp;
@@ -24,6 +25,11 @@ const CHUNK_SIZE: usize = 64 * 1024;
2425const TAG_SIZE : usize = 16 ;
2526const ENCRYPTED_CHUNK_SIZE : usize = CHUNK_SIZE + TAG_SIZE ;
2627
28+ lazy_static ! {
29+ static ref CHUNKS_SIZE : usize = num_cpus:: get( ) * CHUNK_SIZE ;
30+ static ref ENCRYPTED_CHUNKS_SIZE : usize = num_cpus:: get( ) * ENCRYPTED_CHUNK_SIZE ;
31+ }
32+
2733pub ( crate ) struct PayloadKey (
2834 pub ( crate ) GenericArray < u8 , <ChaChaPoly1305 < c2_chacha:: Ietf > as NewAead >:: KeySize > ,
2935) ;
@@ -112,7 +118,7 @@ impl Stream {
112118 StreamWriter {
113119 stream : Self :: new ( key) ,
114120 inner,
115- chunks : Vec :: with_capacity ( CHUNK_SIZE ) ,
121+ chunks : Vec :: with_capacity ( * CHUNKS_SIZE ) ,
116122 #[ cfg( feature = "async" ) ]
117123 encrypted_chunks : None ,
118124 }
@@ -130,7 +136,7 @@ impl Stream {
130136 StreamWriter {
131137 stream : Self :: new ( key) ,
132138 inner,
133- chunks : Vec :: with_capacity ( CHUNK_SIZE ) ,
139+ chunks : Vec :: with_capacity ( * CHUNKS_SIZE ) ,
134140 encrypted_chunks : None ,
135141 }
136142 }
@@ -146,7 +152,7 @@ impl Stream {
146152 StreamReader {
147153 stream : Self :: new ( key) ,
148154 inner,
149- encrypted_chunks : vec ! [ 0 ; ENCRYPTED_CHUNK_SIZE ] ,
155+ encrypted_chunks : vec ! [ 0 ; * ENCRYPTED_CHUNKS_SIZE ] ,
150156 encrypted_pos : 0 ,
151157 start : StartPos :: Implicit ( 0 ) ,
152158 cur_plaintext_pos : 0 ,
@@ -166,7 +172,7 @@ impl Stream {
166172 StreamReader {
167173 stream : Self :: new ( key) ,
168174 inner,
169- encrypted_chunks : vec ! [ 0 ; ENCRYPTED_CHUNK_SIZE ] ,
175+ encrypted_chunks : vec ! [ 0 ; * ENCRYPTED_CHUNKS_SIZE ] ,
170176 encrypted_pos : 0 ,
171177 start : StartPos :: Implicit ( 0 ) ,
172178 cur_plaintext_pos : 0 ,
@@ -282,13 +288,13 @@ impl<W: Write> Write for StreamWriter<W> {
282288 let mut bytes_written = 0 ;
283289
284290 while !buf. is_empty ( ) {
285- let to_write = cmp:: min ( CHUNK_SIZE - self . chunks . len ( ) , buf. len ( ) ) ;
291+ let to_write = cmp:: min ( * CHUNKS_SIZE - self . chunks . len ( ) , buf. len ( ) ) ;
286292 self . chunks . extend_from_slice ( & buf[ ..to_write] ) ;
287293 bytes_written += to_write;
288294 buf = & buf[ to_write..] ;
289295
290- // At this point, either buf is empty, or we have a full chunk .
291- assert ! ( buf. is_empty( ) || self . chunks. len( ) == CHUNK_SIZE ) ;
296+ // At this point, either buf is empty, or we have a full set of chunks .
297+ assert ! ( buf. is_empty( ) || self . chunks. len( ) == * CHUNKS_SIZE ) ;
292298
293299 // Only encrypt the chunk if we have more data to write, as the last
294300 // chunk must be written in finish().
@@ -340,16 +346,16 @@ impl<W: AsyncWrite> AsyncWrite for StreamWriter<W> {
340346 ) -> Poll < io:: Result < usize > > {
341347 ready ! ( self . as_mut( ) . poll_flush_chunk( cx) ) ?;
342348
343- let to_write = cmp:: min ( CHUNK_SIZE - self . chunks . len ( ) , buf. len ( ) ) ;
349+ let to_write = cmp:: min ( * CHUNKS_SIZE - self . chunks . len ( ) , buf. len ( ) ) ;
344350
345351 self . as_mut ( )
346352 . project ( )
347353 . chunks
348354 . extend_from_slice ( & buf[ ..to_write] ) ;
349355 buf = & buf[ to_write..] ;
350356
351- // At this point, either buf is empty, or we have a full chunk .
352- assert ! ( buf. is_empty( ) || self . chunks. len( ) == CHUNK_SIZE ) ;
357+ // At this point, either buf is empty, or we have a full set of chunks .
358+ assert ! ( buf. is_empty( ) || self . chunks. len( ) == * CHUNKS_SIZE ) ;
353359
354360 // Only encrypt the chunk if we have more data to write, as the last
355361 // chunk must be written in poll_close().
@@ -442,7 +448,7 @@ impl<R> StreamReader<R> {
442448 // multiple of the chunk size. In that case, we try decrypting twice on a
443449 // decryption failure.
444450 // TODO: Generalise to multiple chunks.
445- let last = chunks. len ( ) < ENCRYPTED_CHUNK_SIZE ;
451+ let last = chunks. len ( ) < * ENCRYPTED_CHUNKS_SIZE ;
446452
447453 self . chunks = match ( self . stream . decrypt_chunks ( chunks, last) , last) {
448454 ( Ok ( chunk) , _) => Some ( chunk) ,
@@ -462,16 +468,16 @@ impl<R> StreamReader<R> {
462468 return 0 ;
463469 }
464470
465- // TODO: Generalise to multiple chunks.
466- let chunk = self . chunks . as_ref ( ) . unwrap ( ) ;
467- let cur_chunk_offset = self . cur_plaintext_pos as usize % CHUNK_SIZE ;
471+ let chunks = self . chunks . as_ref ( ) . unwrap ( ) ;
472+ let cur_chunks_offset = self . cur_plaintext_pos as usize % * CHUNKS_SIZE ;
468473
469- let to_read = cmp:: min ( chunk . expose_secret ( ) . len ( ) - cur_chunk_offset , buf. len ( ) ) ;
474+ let to_read = cmp:: min ( chunks . expose_secret ( ) . len ( ) - cur_chunks_offset , buf. len ( ) ) ;
470475
471- buf[ ..to_read]
472- . copy_from_slice ( & chunk. expose_secret ( ) [ cur_chunk_offset..cur_chunk_offset + to_read] ) ;
476+ buf[ ..to_read] . copy_from_slice (
477+ & chunks. expose_secret ( ) [ cur_chunks_offset..cur_chunks_offset + to_read] ,
478+ ) ;
473479 self . cur_plaintext_pos += to_read as u64 ;
474- if self . cur_plaintext_pos % CHUNK_SIZE as u64 == 0 {
480+ if self . cur_plaintext_pos % * CHUNKS_SIZE as u64 == 0 {
475481 // We've finished with the current chunks.
476482 self . chunks = None ;
477483 }
@@ -483,7 +489,7 @@ impl<R> StreamReader<R> {
483489impl < R : Read > Read for StreamReader < R > {
484490 fn read ( & mut self , buf : & mut [ u8 ] ) -> io:: Result < usize > {
485491 if self . chunks . is_none ( ) {
486- while self . encrypted_pos < ENCRYPTED_CHUNK_SIZE {
492+ while self . encrypted_pos < * ENCRYPTED_CHUNKS_SIZE {
487493 match self
488494 . inner
489495 . read ( & mut self . encrypted_chunks [ self . encrypted_pos ..] )
@@ -511,7 +517,7 @@ impl<R: AsyncRead + Unpin> AsyncRead for StreamReader<R> {
511517 buf : & mut [ u8 ] ,
512518 ) -> Poll < Result < usize , Error > > {
513519 if self . chunks . is_none ( ) {
514- while self . encrypted_pos < ENCRYPTED_CHUNK_SIZE {
520+ while self . encrypted_pos < * ENCRYPTED_CHUNKS_SIZE {
515521 let this = self . as_mut ( ) . project ( ) ;
516522 match ready ! ( this
517523 . inner
@@ -587,12 +593,10 @@ impl<R: Read + Seek> Seek for StreamReader<R> {
587593 }
588594 } ;
589595
590- // TODO: Generalise to multiple chunks.
591-
592- let cur_chunk_index = self . cur_plaintext_pos / CHUNK_SIZE as u64 ;
596+ let cur_chunk_index = self . cur_plaintext_pos / * CHUNKS_SIZE as u64 ;
593597
594- let target_chunk_index = target_pos / CHUNK_SIZE as u64 ;
595- let target_chunk_offset = target_pos % CHUNK_SIZE as u64 ;
598+ let target_chunk_index = target_pos / * CHUNKS_SIZE as u64 ;
599+ let target_chunk_offset = target_pos % * CHUNKS_SIZE as u64 ;
596600
597601 if target_chunk_index == cur_chunk_index {
598602 // We just need to reposition ourselves within the current chunk.
@@ -603,10 +607,10 @@ impl<R: Read + Seek> Seek for StreamReader<R> {
603607
604608 // Seek to the beginning of the target chunk
605609 self . inner . seek ( SeekFrom :: Start (
606- start + ( target_chunk_index * ENCRYPTED_CHUNK_SIZE as u64 ) ,
610+ start + ( target_chunk_index * * ENCRYPTED_CHUNKS_SIZE as u64 ) ,
607611 ) ) ?;
608612 self . stream . nonce . set_counter ( target_chunk_index) ;
609- self . cur_plaintext_pos = target_chunk_index * CHUNK_SIZE as u64 ;
613+ self . cur_plaintext_pos = target_chunk_index * * CHUNKS_SIZE as u64 ;
610614
611615 // Read and drop bytes from the chunk to reach the target position.
612616 if target_chunk_offset > 0 {
0 commit comments