@@ -20,6 +20,7 @@ use tracing::{trace, warn};
2020
2121use crate :: checksums:: IntegrityError ;
2222use crate :: data_cache:: DataCacheError ;
23+ use crate :: memory:: { BufferKind , PagedPool } ;
2324use crate :: object:: ObjectId ;
2425use crate :: sync:: Mutex ;
2526
@@ -34,6 +35,7 @@ const HASHED_DIR_SPLIT_INDEX: usize = 2;
3435/// On-disk implementation of [DataCache].
3536pub struct DiskDataCache {
3637 config : DiskDataCacheConfig ,
38+ pool : PagedPool ,
3739 /// Tracks blocks usage. `None` when no cache limit was set.
3840 usage : Option < Mutex < UsageInfo < DiskBlockKey > > > ,
3941}
@@ -246,20 +248,22 @@ impl DiskBlock {
246248 }
247249
248250 /// Deserialize an instance from `reader`.
249- fn read ( reader : & mut impl Read , block_size : u64 ) -> Result < Self , DiskBlockReadWriteError > {
251+ fn read ( reader : & mut impl Read , block_size : u64 , pool : & PagedPool ) -> Result < Self , DiskBlockReadWriteError > {
250252 let header: DiskBlockHeader = bincode:: decode_from_std_read ( reader, BINCODE_CONFIG ) ?;
251253
252254 if header. block_len > block_size {
253255 return Err ( DiskBlockReadWriteError :: InvalidBlockLength ( header. block_len ) ) ;
254256 }
255257
256- let mut buffer = vec ! [ 0u8 ; header. block_len as usize ] ;
257- reader. read_exact ( & mut buffer) ?;
258+ let size = header. block_len as usize ;
259+ let data = {
260+ let mut buffer = pool. get_buffer_mut ( size, BufferKind :: DiskCache ) ;
261+ buffer. set_len_uninit ( size) ;
262+ reader. read_exact ( buffer. as_mut ( ) ) ?;
263+ buffer. into_bytes ( )
264+ } ;
258265
259- Ok ( Self {
260- header,
261- data : buffer. into ( ) ,
262- } )
266+ Ok ( Self { header, data } )
263267 }
264268
265269 /// Serialize this instance to `writer` and return the number of bytes written on success.
@@ -305,12 +309,12 @@ impl From<DiskBlockReadWriteError> for DataCacheError {
305309
306310impl DiskDataCache {
307311 /// Create a new instance of an [DiskDataCache] with the specified configuration.
308- pub fn new ( config : DiskDataCacheConfig ) -> Self {
309- let usage = match config. limit {
312+ pub fn new ( config : DiskDataCacheConfig , pool : PagedPool ) -> Self {
313+ let usage = match & config. limit {
310314 CacheLimit :: Unbounded => None ,
311315 CacheLimit :: TotalSize { .. } | CacheLimit :: AvailableSpace { .. } => Some ( Mutex :: new ( UsageInfo :: new ( ) ) ) ,
312316 } ;
313- DiskDataCache { config, usage }
317+ DiskDataCache { config, pool , usage }
314318 }
315319
316320 /// Get the relative path for the given block.
@@ -349,7 +353,7 @@ impl DiskDataCache {
349353 return Err ( DataCacheError :: InvalidBlockContent ) ;
350354 }
351355
352- let block = DiskBlock :: read ( & mut file, self . block_size ( ) )
356+ let block = DiskBlock :: read ( & mut file, self . block_size ( ) , & self . pool )
353357 . inspect_err ( |e| warn ! ( path = ?path. as_ref( ) , "block could not be deserialized: {:?}" , e) ) ?;
354358 let bytes = block
355359 . data ( cache_key, block_idx, block_offset)
@@ -659,11 +663,15 @@ mod tests {
659663 #[ test]
660664 fn get_path_for_block_key ( ) {
661665 let cache_dir = PathBuf :: from ( "mountpoint-cache/" ) ;
662- let data_cache = DiskDataCache :: new ( DiskDataCacheConfig {
663- cache_directory : cache_dir,
664- block_size : 1024 ,
665- limit : CacheLimit :: Unbounded ,
666- } ) ;
666+ let pool = PagedPool :: new ( [ 1024 ] ) ;
667+ let data_cache = DiskDataCache :: new (
668+ DiskDataCacheConfig {
669+ cache_directory : cache_dir,
670+ block_size : 1024 ,
671+ limit : CacheLimit :: Unbounded ,
672+ } ,
673+ pool,
674+ ) ;
667675
668676 let s3_key = "a" . repeat ( 266 ) ;
669677 let etag = ETag :: for_tests ( ) ;
@@ -687,11 +695,15 @@ mod tests {
687695 #[ test]
688696 fn get_path_for_block_key_huge_block_index ( ) {
689697 let cache_dir = PathBuf :: from ( "mountpoint-cache/" ) ;
690- let data_cache = DiskDataCache :: new ( DiskDataCacheConfig {
691- cache_directory : cache_dir,
692- block_size : 1024 ,
693- limit : CacheLimit :: Unbounded ,
694- } ) ;
698+ let pool = PagedPool :: new ( [ 1024 ] ) ;
699+ let data_cache = DiskDataCache :: new (
700+ DiskDataCacheConfig {
701+ cache_directory : cache_dir,
702+ block_size : 1024 ,
703+ limit : CacheLimit :: Unbounded ,
704+ } ,
705+ pool,
706+ ) ;
695707
696708 let s3_key = "a" . repeat ( 266 ) ;
697709 let etag = ETag :: for_tests ( ) ;
@@ -723,11 +735,15 @@ mod tests {
723735
724736 let block_size = 8 * 1024 * 1024 ;
725737 let cache_directory = tempfile:: tempdir ( ) . unwrap ( ) ;
726- let cache = DiskDataCache :: new ( DiskDataCacheConfig {
727- cache_directory : cache_directory. path ( ) . to_path_buf ( ) ,
728- block_size,
729- limit : CacheLimit :: Unbounded ,
730- } ) ;
738+ let pool = PagedPool :: new ( [ block_size as usize ] ) ;
739+ let cache = DiskDataCache :: new (
740+ DiskDataCacheConfig {
741+ cache_directory : cache_directory. path ( ) . to_path_buf ( ) ,
742+ block_size,
743+ limit : CacheLimit :: Unbounded ,
744+ } ,
745+ pool,
746+ ) ;
731747 let cache_key_1 = ObjectId :: new ( "a" . into ( ) , ETag :: for_tests ( ) ) ;
732748 let cache_key_2 = ObjectId :: new (
733749 "long-key_" . repeat ( 100 ) , // at least 900 chars, exceeding easily 255 chars (UNIX filename limit)
@@ -806,11 +822,15 @@ mod tests {
806822 let slice = data. slice ( 1 ..5 ) ;
807823
808824 let cache_directory = tempfile:: tempdir ( ) . unwrap ( ) ;
809- let cache = DiskDataCache :: new ( DiskDataCacheConfig {
810- cache_directory : cache_directory. path ( ) . to_path_buf ( ) ,
811- block_size : 8 * 1024 * 1024 ,
812- limit : CacheLimit :: Unbounded ,
813- } ) ;
825+ let pool = PagedPool :: new ( [ 8 * 1024 * 1024 ] ) ;
826+ let cache = DiskDataCache :: new (
827+ DiskDataCacheConfig {
828+ cache_directory : cache_directory. path ( ) . to_path_buf ( ) ,
829+ block_size : 8 * 1024 * 1024 ,
830+ limit : CacheLimit :: Unbounded ,
831+ } ,
832+ pool,
833+ ) ;
814834 let cache_key = ObjectId :: new ( "a" . into ( ) , ETag :: for_tests ( ) ) ;
815835
816836 cache
@@ -884,11 +904,15 @@ mod tests {
884904 let small_object_key = ObjectId :: new ( "small" . into ( ) , ETag :: for_tests ( ) ) ;
885905
886906 let cache_directory = tempfile:: tempdir ( ) . unwrap ( ) ;
887- let cache = DiskDataCache :: new ( DiskDataCacheConfig {
888- cache_directory : cache_directory. path ( ) . to_path_buf ( ) ,
889- block_size : BLOCK_SIZE as u64 ,
890- limit : CacheLimit :: TotalSize { max_size : CACHE_LIMIT } ,
891- } ) ;
907+ let pool = PagedPool :: new ( [ BLOCK_SIZE ] ) ;
908+ let cache = DiskDataCache :: new (
909+ DiskDataCacheConfig {
910+ cache_directory : cache_directory. path ( ) . to_path_buf ( ) ,
911+ block_size : BLOCK_SIZE as u64 ,
912+ limit : CacheLimit :: TotalSize { max_size : CACHE_LIMIT } ,
913+ } ,
914+ pool,
915+ ) ;
892916
893917 // Put all of large_object
894918 for ( block_idx, bytes) in large_object_blocks. iter ( ) . enumerate ( ) {
@@ -1063,7 +1087,8 @@ mod tests {
10631087 // "Corrupt" the serialized value with an invalid length.
10641088 replace_u64_at ( & mut buf, offset, u64:: MAX ) ;
10651089
1066- let err = DiskBlock :: read ( & mut Cursor :: new ( buf) , MAX_LENGTH ) . expect_err ( "deserialization should fail" ) ;
1090+ let pool = PagedPool :: new ( [ MAX_LENGTH as usize ] ) ;
1091+ let err = DiskBlock :: read ( & mut Cursor :: new ( buf) , MAX_LENGTH , & pool) . expect_err ( "deserialization should fail" ) ;
10671092 match length_to_corrupt {
10681093 "key" | "etag" => assert ! ( matches!(
10691094 err,
@@ -1078,11 +1103,15 @@ mod tests {
10781103 fn test_concurrent_access ( ) {
10791104 let block_size = 1024 * 1024 ;
10801105 let cache_directory = tempfile:: tempdir ( ) . unwrap ( ) ;
1081- let data_cache = DiskDataCache :: new ( DiskDataCacheConfig {
1082- cache_directory : cache_directory. path ( ) . to_path_buf ( ) ,
1083- block_size : block_size as u64 ,
1084- limit : CacheLimit :: Unbounded ,
1085- } ) ;
1106+ let pool = PagedPool :: new ( [ block_size] ) ;
1107+ let data_cache = DiskDataCache :: new (
1108+ DiskDataCacheConfig {
1109+ cache_directory : cache_directory. path ( ) . to_path_buf ( ) ,
1110+ block_size : block_size as u64 ,
1111+ limit : CacheLimit :: Unbounded ,
1112+ } ,
1113+ pool,
1114+ ) ;
10861115 let data_cache = Arc :: new ( data_cache) ;
10871116
10881117 let cache_key = ObjectId :: new ( "foo" . to_owned ( ) , ETag :: for_tests ( ) ) ;
0 commit comments