@@ -20,6 +20,8 @@ mod segment_header;
2020#[ cfg( test) ]
2121mod tests;
2222
23+ pub use segment_header:: MAX_BLOCK_LENGTH ;
24+
2325/// The maximum object length the implementation in this module can reliably handle.
2426///
2527/// Currently objects are limited by the largest block size in the consensus chain, which is 5 MB.
@@ -52,6 +54,9 @@ pub fn max_supported_object_length() -> usize {
5254/// The length of the compact encoding of `max_supported_object_length()`.
5355const MAX_ENCODED_LENGTH_SIZE : usize = 4 ;
5456
57+ /// Used to store the last piece downloaded in an object fetcher batch.
58+ pub type LastPieceCache = ( PieceIndex , Piece ) ;
59+
5560/// Object fetching errors.
5661#[ derive( Debug , PartialEq , Eq , thiserror:: Error ) ]
5762pub enum Error {
@@ -238,11 +243,19 @@ where
238243 /// putting the objects' bytes together.
239244 ///
240245 /// Checks the objects' hashes to make sure the correct bytes are returned.
246+ ///
247+ /// For efficiency, objects in a batch should be sorted by increasing piece index. Objects with
248+ /// the same piece index should be sorted by increasing offset. This allows the last piece to
249+ /// be re-used for the next object in the batch.
250+ ///
251+ /// Batches should be split if the gap between object piece indexes is 6 or more. Those objects
252+ /// can't share any pieces, because a maximum-sized object only uses 6 pieces.
241253 pub async fn fetch_objects (
242254 & self ,
243255 mappings : GlobalObjectMapping ,
244256 ) -> Result < Vec < Vec < u8 > > , Error > {
245257 let mut objects = Vec :: with_capacity ( mappings. objects ( ) . len ( ) ) ;
258+ let mut piece_cache = None ;
246259
247260 // TODO:
248261 // - keep the last downloaded piece until it's no longer needed
@@ -292,7 +305,7 @@ where
292305
293306 // All objects can be assembled from individual pieces, we handle segments by checking
294307 // all possible padding, and parsing and discarding segment headers.
295- let data = self . fetch_object ( mapping) . await ?;
308+ let data = self . fetch_object ( mapping, & mut piece_cache ) . await ?;
296309
297310 objects. push ( data) ;
298311 }
@@ -312,7 +325,11 @@ where
312325 /// or fetch more data.
313326 //
314327 // TODO: return last downloaded piece from fetch_object() and pass them to the next fetch_object()
315- async fn fetch_object ( & self , mapping : GlobalObject ) -> Result < Vec < u8 > , Error > {
328+ async fn fetch_object (
329+ & self ,
330+ mapping : GlobalObject ,
331+ piece_cache : & mut Option < LastPieceCache > ,
332+ ) -> Result < Vec < u8 > , Error > {
316333 let GlobalObject {
317334 piece_index,
318335 offset,
@@ -327,7 +344,9 @@ where
327344
328345 // Get pieces until we have enough data to calculate the object's length(s).
329346 // Objects with their length bytes at the end of a piece are a rare edge case.
330- let piece = self . read_piece ( next_source_piece_index, mapping) . await ?;
347+ let piece = self
348+ . read_piece ( next_source_piece_index, mapping, piece_cache)
349+ . await ?;
331350
332351 // Discard piece data before the offset.
333352 // If this is the first piece in a segment, this automatically skips the segment header.
@@ -367,7 +386,9 @@ where
367386 ) ;
368387
369388 // Get the second piece for the object
370- let piece = self . read_piece ( next_source_piece_index, mapping) . await ?;
389+ let piece = self
390+ . read_piece ( next_source_piece_index, mapping, piece_cache)
391+ . await ?;
371392 // We want all the piece data
372393 let piece_data = piece
373394 . record ( )
@@ -427,7 +448,7 @@ where
427448 // TODO: turn this into a concurrent stream, which cancels piece downloads if they aren't
428449 // needed
429450 let pieces = self
430- . read_pieces ( remaining_piece_indexes. clone ( ) , mapping)
451+ . read_pieces ( remaining_piece_indexes. clone ( ) , mapping, piece_cache )
431452 . await ?
432453 . into_iter ( )
433454 . zip ( remaining_piece_indexes. iter ( ) . copied ( ) )
@@ -476,22 +497,31 @@ where
476497 & self ,
477498 piece_indexes : Arc < [ PieceIndex ] > ,
478499 mapping : GlobalObject ,
500+ piece_cache : & mut Option < LastPieceCache > ,
479501 ) -> Result < Vec < Piece > , Error > {
480- download_pieces ( piece_indexes. clone ( ) , & self . piece_getter )
481- . await
482- . map_err ( |source| {
483- debug ! (
484- ?piece_indexes,
485- error = ?source,
486- ?mapping,
487- "Error fetching pieces during object assembling"
488- ) ;
502+ download_pieces (
503+ piece_indexes. clone ( ) ,
504+ & piece_cache. clone ( ) . with_fallback ( self . piece_getter . clone ( ) ) ,
505+ )
506+ . await
507+ . inspect ( |pieces| {
508+ if let ( Some ( piece_index) , Some ( piece) ) = ( piece_indexes. last ( ) , pieces. last ( ) ) {
509+ * piece_cache = Some ( ( * piece_index, piece. clone ( ) ) )
510+ }
511+ } )
512+ . map_err ( |source| {
513+ debug ! (
514+ ?piece_indexes,
515+ error = ?source,
516+ ?mapping,
517+ "Error fetching pieces during object assembling"
518+ ) ;
489519
490- Error :: PieceGetterError {
491- error : format ! ( "{source:?}" ) ,
492- mapping,
493- }
494- } )
520+ Error :: PieceGetterError {
521+ error : format ! ( "{source:?}" ) ,
522+ mapping,
523+ }
524+ } )
495525 }
496526
497527 /// Read and return a single piece.
@@ -501,28 +531,34 @@ where
501531 & self ,
502532 piece_index : PieceIndex ,
503533 mapping : GlobalObject ,
534+ piece_cache : & mut Option < LastPieceCache > ,
504535 ) -> Result < Piece , Error > {
505- download_pieces ( vec ! [ piece_index] . into ( ) , & self . piece_getter )
506- . await
507- . map ( |pieces| {
508- pieces
509- . first ( )
510- . expect ( "download_pieces always returns exact pieces or error" )
511- . clone ( )
512- } )
513- . map_err ( |source| {
514- debug ! (
515- %piece_index,
516- error = ?source,
517- ?mapping,
518- "Error fetching piece during object assembling"
519- ) ;
536+ let piece_indexes = Arc :: < [ PieceIndex ] > :: from ( vec ! [ piece_index] ) ;
537+ download_pieces (
538+ piece_indexes. clone ( ) ,
539+ & piece_cache. clone ( ) . with_fallback ( self . piece_getter . clone ( ) ) ,
540+ )
541+ . await
542+ . inspect ( |pieces| * piece_cache = Some ( ( piece_index, pieces[ 0 ] . clone ( ) ) ) )
543+ . map ( |pieces| {
544+ pieces
545+ . first ( )
546+ . expect ( "download_pieces always returns exact pieces or error" )
547+ . clone ( )
548+ } )
549+ . map_err ( |source| {
550+ debug ! (
551+ %piece_index,
552+ error = ?source,
553+ ?mapping,
554+ "Error fetching piece during object assembling"
555+ ) ;
520556
521- Error :: PieceGetterError {
522- error : format ! ( "{source:?}" ) ,
523- mapping,
524- }
525- } )
557+ Error :: PieceGetterError {
558+ error : format ! ( "{source:?}" ) ,
559+ mapping,
560+ }
561+ } )
526562 }
527563}
528564
0 commit comments