
Commit f7a4502

Use pool in cache and uploader and fix memory limiter
Signed-off-by: Alessandro Passaro <[email protected]>
1 parent 48ae275 · commit f7a4502

27 files changed, +347 -187 lines
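
Taken together, these changes thread a single PagedPool through every component that allocates data buffers: the CRT client is pointed at the pool via S3ClientConfig::memory_pool, DiskDataCache::new and Uploader::new now take the pool as an argument, and MemoryLimiter::new is constructed from the pool rather than from the client. A rough sketch of the resulting wiring, assembled from the hunks below (the enclosing wire_up function, buffer sizes, and region are illustrative and not part of this commit):

use std::sync::Arc;

use mountpoint_s3_client::S3CrtClient;
use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig};
use mountpoint_s3_fs::Runtime;
use mountpoint_s3_fs::data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig};
use mountpoint_s3_fs::mem_limiter::MemoryLimiter;
use mountpoint_s3_fs::memory::PagedPool;
use mountpoint_s3_fs::upload::{Uploader, UploaderConfig};

// Illustrative sizes only.
const PART_SIZE: usize = 8 * 1024 * 1024;
const BLOCK_SIZE: u64 = 1024 * 1024;

fn wire_up(cache_dir: std::path::PathBuf, memory_target: u64) {
    // One pool, sized for every buffer size the mount will use.
    let pool = PagedPool::new([PART_SIZE, BLOCK_SIZE as usize]);

    // The CRT client draws its buffers from the pool...
    let client_config = S3ClientConfig::new()
        .endpoint_config(EndpointConfig::new("us-east-1"))
        .memory_pool(pool.clone());
    let client = S3CrtClient::new(client_config).expect("failed to create S3 CRT client");
    let runtime = Runtime::new(client.event_loop_group());

    // ...and so do the disk cache, the memory limiter, and the uploader.
    let disk_cache = DiskDataCache::new(
        DiskDataCacheConfig {
            cache_directory: cache_dir,
            block_size: BLOCK_SIZE,
            limit: CacheLimit::Unbounded,
        },
        pool.clone(),
    );
    let mem_limiter = Arc::new(MemoryLimiter::new(pool.clone(), memory_target));
    let uploader = Uploader::new(client, runtime, pool, mem_limiter, UploaderConfig::new(PART_SIZE));
    let _ = (disk_cache, uploader);
}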

mountpoint-s3-fs/benches/cache_serialization.rs

Lines changed: 3 additions & 1 deletion
@@ -8,6 +8,7 @@ use criterion::{BenchmarkGroup, Criterion, criterion_group, criterion_main};
 
 use mountpoint_s3_client::types::ETag;
 use mountpoint_s3_fs::data_cache::{ChecksummedBytes, DataCache, DiskDataCache, DiskDataCacheConfig};
+use mountpoint_s3_fs::memory::PagedPool;
 use mountpoint_s3_fs::object::ObjectId;
 use rand::RngCore;
 use tempfile::TempDir;
@@ -39,7 +40,8 @@ fn cache_read_benchmark(group: &mut BenchmarkGroup<'_, WallTime>, dir_path: &Pat
         block_size: BLOCK_SIZE,
         limit: mountpoint_s3_fs::data_cache::CacheLimit::Unbounded,
     };
-    let cache = DiskDataCache::new(config);
+    let pool = PagedPool::new([BLOCK_SIZE as usize]);
+    let cache = DiskDataCache::new(config, pool);
     let cache_key = ObjectId::new("a".into(), ETag::for_tests());
     let bytes = ChecksummedBytes::new(data.to_owned().into());

mountpoint-s3-fs/examples/fs_benchmark.rs

Lines changed: 5 additions & 1 deletion
@@ -8,6 +8,7 @@ use fuser::{BackgroundSession, MountOption, Session};
 use mountpoint_s3_client::S3CrtClient;
 use mountpoint_s3_client::config::{EndpointConfig, RustLogAdapter, S3ClientConfig};
 use mountpoint_s3_fs::fuse::S3FuseFilesystem;
+use mountpoint_s3_fs::memory::PagedPool;
 use mountpoint_s3_fs::prefetch::Prefetcher;
 use mountpoint_s3_fs::{Runtime, S3Filesystem, S3FilesystemConfig};
 use tempfile::tempdir;
@@ -139,11 +140,13 @@ fn mount_file_system(
     region: &str,
     throughput_target_gbps: Option<f64>,
 ) -> BackgroundSession {
+    let pool = PagedPool::new([8 * 1024 * 1024]);
     let mut config = S3ClientConfig::new().endpoint_config(EndpointConfig::new(region));
     let initial_read_window_size = 1024 * 1024 + 128 * 1024;
     config = config
         .read_backpressure(true)
-        .initial_read_window(initial_read_window_size);
+        .initial_read_window(initial_read_window_size)
+        .memory_pool(pool.clone());
     if let Some(throughput_target_gbps) = throughput_target_gbps {
         config = config.throughput_target_gbps(throughput_target_gbps);
     }
@@ -164,6 +167,7 @@ fn mount_file_system(
     let fs = S3Filesystem::new(
         client,
         prefetcher_builder,
+        pool,
         runtime,
         bucket_name,
         &Default::default(),

mountpoint-s3-fs/examples/mount_from_config.rs

Lines changed: 1 addition & 1 deletion
@@ -243,7 +243,7 @@ fn mount_filesystem(
 
     // Create and run the FUSE session
     let fuse_session = mp_config
-        .create_fuse_session(s3_path, client, runtime)
+        .create_fuse_session(s3_path, client, runtime, pool)
        .context("Failed to create FUSE session")?;
 
     Ok(fuse_session)

mountpoint-s3-fs/examples/prefetch_benchmark.rs

Lines changed: 4 additions & 2 deletions
@@ -13,6 +13,7 @@ use mountpoint_s3_client::types::HeadObjectParams;
 use mountpoint_s3_client::{ObjectClient, S3CrtClient};
 use mountpoint_s3_fs::Runtime;
 use mountpoint_s3_fs::mem_limiter::MemoryLimiter;
+use mountpoint_s3_fs::memory::PagedPool;
 use mountpoint_s3_fs::object::ObjectId;
 use mountpoint_s3_fs::prefetch::{PrefetchGetObject, Prefetcher, PrefetcherConfig};
 use serde_json::{json, to_writer};
@@ -164,9 +165,10 @@ fn main() -> anyhow::Result<()> {
     let args = CliArgs::parse();
 
     let bucket = args.bucket.as_str();
-    let client_config = args.s3_client_config();
+    let pool = PagedPool::new([args.part_size.unwrap_or(8 * 1024 * 1024) as usize]);
+    let client_config = args.s3_client_config().memory_pool(pool.clone());
     let client = S3CrtClient::new(client_config).context("failed to create S3 CRT client")?;
-    let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), args.memory_target()));
+    let mem_limiter = Arc::new(MemoryLimiter::new(pool, args.memory_target()));
     let runtime = Runtime::new(client.event_loop_group());
 
     // Verify if all objects exist and collect metadata

mountpoint-s3-fs/examples/upload_benchmark.rs

Lines changed: 6 additions & 2 deletions
@@ -6,6 +6,7 @@ use mountpoint_s3_client::config::{Allocator, EndpointConfig, RustLogAdapter, S3
 use mountpoint_s3_client::types::ChecksumAlgorithm;
 use mountpoint_s3_client::{ObjectClient, S3CrtClient};
 use mountpoint_s3_fs::mem_limiter::MemoryLimiter;
+use mountpoint_s3_fs::memory::PagedPool;
 use mountpoint_s3_fs::upload::{Uploader, UploaderConfig};
 use mountpoint_s3_fs::{Runtime, ServerSideEncryption};
 use sysinfo::{RefreshKind, System};
@@ -100,10 +101,12 @@ fn main() {
         let endpoint_uri = Uri::new_from_str(&Allocator::default(), url).expect("Failed to parse endpoint URL");
         endpoint_config = endpoint_config.endpoint(endpoint_uri);
     }
+    let pool = PagedPool::new([args.write_part_size]);
     let mut config = S3ClientConfig::new()
         .endpoint_config(endpoint_config)
         .throughput_target_gbps(args.throughput_target_gbps as f64)
-        .write_part_size(args.write_part_size);
+        .write_part_size(args.write_part_size)
+        .memory_pool(pool.clone());
     if let Some(crt_mem_limit_gib) = args.crt_memory_limit_gib {
         config = config.memory_limit_in_bytes(crt_mem_limit_gib * 1024 * 1024 * 1024);
     }
@@ -118,7 +121,7 @@ fn main() {
         let sys = System::new_with_specifics(RefreshKind::everything());
         (sys.total_memory() as f64 * 0.95) as u64
     };
-    let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), max_memory_target));
+    let mem_limiter = Arc::new(MemoryLimiter::new(pool.clone(), max_memory_target));
 
     let buffer_size = args.write_part_size;
     let server_side_encryption = ServerSideEncryption::new(args.sse.clone(), args.sse_kms_key_id.clone());
@@ -134,6 +137,7 @@ fn main() {
     let uploader = Uploader::new(
         client.clone(),
         runtime.clone(),
+        pool.clone(),
         mem_limiter,
         UploaderConfig::new(buffer_size)
             .server_side_encryption(server_side_encryption)

mountpoint-s3-fs/src/config.rs

Lines changed: 9 additions & 2 deletions
@@ -6,6 +6,7 @@ use crate::data_cache::{DataCacheConfig, DiskDataCache, ExpressDataCache, Multil
 use crate::fuse::config::FuseSessionConfig;
 use crate::fuse::session::FuseSession;
 use crate::fuse::{ErrorLogger, S3FuseFilesystem};
+use crate::memory::PagedPool;
 use crate::prefetch::{Prefetcher, PrefetcherBuilder};
 use crate::s3::config::S3Path;
 use crate::sync::Arc;
@@ -46,15 +47,18 @@ impl MountpointConfig {
         s3_path: S3Path,
         client: Client,
         runtime: Runtime,
+        memory_pool: PagedPool,
     ) -> anyhow::Result<FuseSession>
     where
         Client: ObjectClient + Clone + Send + Sync + 'static,
     {
-        let prefetcher_builder = create_prefetcher_builder(self.data_cache_config, &client, &runtime)?;
+        let prefetcher_builder =
+            create_prefetcher_builder(self.data_cache_config, &client, &runtime, memory_pool.clone())?;
         tracing::trace!(filesystem_config=?self.filesystem_config, "creating file system");
         let fs = S3Filesystem::new(
             client,
             prefetcher_builder,
+            memory_pool,
             runtime,
             &s3_path.bucket_name,
             &s3_path.prefix,
@@ -72,11 +76,14 @@ fn create_prefetcher_builder<Client>(
     data_cache_config: DataCacheConfig,
     client: &Client,
     runtime: &Runtime,
+    memory_pool: PagedPool,
 ) -> anyhow::Result<PrefetcherBuilder<Client>>
 where
     Client: ObjectClient + Clone + Send + Sync + 'static,
 {
-    let disk_cache = data_cache_config.disk_cache_config.map(DiskDataCache::new);
+    let disk_cache = data_cache_config
+        .disk_cache_config
+        .map(|config| DiskDataCache::new(config, memory_pool));
     let express_cache = match data_cache_config.express_cache_config {
         None => None,
         Some(config) => {

mountpoint-s3-fs/src/data_cache/disk_data_cache.rs

Lines changed: 71 additions & 42 deletions
@@ -20,6 +20,7 @@ use tracing::{trace, warn};
 
 use crate::checksums::IntegrityError;
 use crate::data_cache::DataCacheError;
+use crate::memory::{BufferKind, PagedPool};
 use crate::object::ObjectId;
 use crate::sync::Mutex;
 
@@ -34,6 +35,7 @@ const HASHED_DIR_SPLIT_INDEX: usize = 2;
 /// On-disk implementation of [DataCache].
 pub struct DiskDataCache {
     config: DiskDataCacheConfig,
+    pool: PagedPool,
     /// Tracks blocks usage. `None` when no cache limit was set.
     usage: Option<Mutex<UsageInfo<DiskBlockKey>>>,
 }
@@ -246,20 +248,22 @@ impl DiskBlock {
     }
 
     /// Deserialize an instance from `reader`.
-    fn read(reader: &mut impl Read, block_size: u64) -> Result<Self, DiskBlockReadWriteError> {
+    fn read(reader: &mut impl Read, block_size: u64, pool: &PagedPool) -> Result<Self, DiskBlockReadWriteError> {
         let header: DiskBlockHeader = bincode::decode_from_std_read(reader, BINCODE_CONFIG)?;
 
         if header.block_len > block_size {
             return Err(DiskBlockReadWriteError::InvalidBlockLength(header.block_len));
         }
 
-        let mut buffer = vec![0u8; header.block_len as usize];
-        reader.read_exact(&mut buffer)?;
+        let size = header.block_len as usize;
+        let data = {
+            let mut buffer = pool.get_buffer_mut(size, BufferKind::DiskCache);
+            buffer.set_len_uninit(size);
+            reader.read_exact(buffer.as_mut())?;
+            buffer.into_bytes()
+        };
 
-        Ok(Self {
-            header,
-            data: buffer.into(),
-        })
+        Ok(Self { header, data })
     }
 
     /// Serialize this instance to `writer` and return the number of bytes written on success.
@@ -305,12 +309,12 @@ impl From<DiskBlockReadWriteError> for DataCacheError {
 
 impl DiskDataCache {
     /// Create a new instance of an [DiskDataCache] with the specified configuration.
-    pub fn new(config: DiskDataCacheConfig) -> Self {
-        let usage = match config.limit {
+    pub fn new(config: DiskDataCacheConfig, pool: PagedPool) -> Self {
+        let usage = match &config.limit {
             CacheLimit::Unbounded => None,
             CacheLimit::TotalSize { .. } | CacheLimit::AvailableSpace { .. } => Some(Mutex::new(UsageInfo::new())),
         };
-        DiskDataCache { config, usage }
+        DiskDataCache { config, pool, usage }
     }
 
     /// Get the relative path for the given block.
@@ -349,7 +353,7 @@ impl DiskDataCache {
             return Err(DataCacheError::InvalidBlockContent);
         }
 
-        let block = DiskBlock::read(&mut file, self.block_size())
+        let block = DiskBlock::read(&mut file, self.block_size(), &self.pool)
             .inspect_err(|e| warn!(path = ?path.as_ref(), "block could not be deserialized: {:?}", e))?;
         let bytes = block
             .data(cache_key, block_idx, block_offset)
@@ -659,11 +663,15 @@ mod tests {
     #[test]
     fn get_path_for_block_key() {
         let cache_dir = PathBuf::from("mountpoint-cache/");
-        let data_cache = DiskDataCache::new(DiskDataCacheConfig {
-            cache_directory: cache_dir,
-            block_size: 1024,
-            limit: CacheLimit::Unbounded,
-        });
+        let pool = PagedPool::new([1024]);
+        let data_cache = DiskDataCache::new(
+            DiskDataCacheConfig {
+                cache_directory: cache_dir,
+                block_size: 1024,
+                limit: CacheLimit::Unbounded,
+            },
+            pool,
+        );
 
         let s3_key = "a".repeat(266);
         let etag = ETag::for_tests();
@@ -687,11 +695,15 @@ mod tests {
     #[test]
     fn get_path_for_block_key_huge_block_index() {
         let cache_dir = PathBuf::from("mountpoint-cache/");
-        let data_cache = DiskDataCache::new(DiskDataCacheConfig {
-            cache_directory: cache_dir,
-            block_size: 1024,
-            limit: CacheLimit::Unbounded,
-        });
+        let pool = PagedPool::new([1024]);
+        let data_cache = DiskDataCache::new(
+            DiskDataCacheConfig {
+                cache_directory: cache_dir,
+                block_size: 1024,
+                limit: CacheLimit::Unbounded,
+            },
+            pool,
+        );
 
         let s3_key = "a".repeat(266);
         let etag = ETag::for_tests();
@@ -723,11 +735,15 @@ mod tests {
 
         let block_size = 8 * 1024 * 1024;
         let cache_directory = tempfile::tempdir().unwrap();
-        let cache = DiskDataCache::new(DiskDataCacheConfig {
-            cache_directory: cache_directory.path().to_path_buf(),
-            block_size,
-            limit: CacheLimit::Unbounded,
-        });
+        let pool = PagedPool::new([block_size as usize]);
+        let cache = DiskDataCache::new(
+            DiskDataCacheConfig {
+                cache_directory: cache_directory.path().to_path_buf(),
+                block_size,
+                limit: CacheLimit::Unbounded,
+            },
+            pool,
+        );
         let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
         let cache_key_2 = ObjectId::new(
             "long-key_".repeat(100), // at least 900 chars, exceeding easily 255 chars (UNIX filename limit)
@@ -806,11 +822,15 @@ mod tests {
         let slice = data.slice(1..5);
 
         let cache_directory = tempfile::tempdir().unwrap();
-        let cache = DiskDataCache::new(DiskDataCacheConfig {
-            cache_directory: cache_directory.path().to_path_buf(),
-            block_size: 8 * 1024 * 1024,
-            limit: CacheLimit::Unbounded,
-        });
+        let pool = PagedPool::new([8 * 1024 * 1024]);
+        let cache = DiskDataCache::new(
+            DiskDataCacheConfig {
+                cache_directory: cache_directory.path().to_path_buf(),
+                block_size: 8 * 1024 * 1024,
+                limit: CacheLimit::Unbounded,
+            },
+            pool,
+        );
         let cache_key = ObjectId::new("a".into(), ETag::for_tests());
 
         cache
@@ -884,11 +904,15 @@ mod tests {
         let small_object_key = ObjectId::new("small".into(), ETag::for_tests());
 
         let cache_directory = tempfile::tempdir().unwrap();
-        let cache = DiskDataCache::new(DiskDataCacheConfig {
-            cache_directory: cache_directory.path().to_path_buf(),
-            block_size: BLOCK_SIZE as u64,
-            limit: CacheLimit::TotalSize { max_size: CACHE_LIMIT },
-        });
+        let pool = PagedPool::new([BLOCK_SIZE]);
+        let cache = DiskDataCache::new(
+            DiskDataCacheConfig {
+                cache_directory: cache_directory.path().to_path_buf(),
+                block_size: BLOCK_SIZE as u64,
+                limit: CacheLimit::TotalSize { max_size: CACHE_LIMIT },
+            },
+            pool,
+        );
 
         // Put all of large_object
        for (block_idx, bytes) in large_object_blocks.iter().enumerate() {
@@ -1063,7 +1087,8 @@ mod tests {
         // "Corrupt" the serialized value with an invalid length.
         replace_u64_at(&mut buf, offset, u64::MAX);
 
-        let err = DiskBlock::read(&mut Cursor::new(buf), MAX_LENGTH).expect_err("deserialization should fail");
+        let pool = PagedPool::new([MAX_LENGTH as usize]);
+        let err = DiskBlock::read(&mut Cursor::new(buf), MAX_LENGTH, &pool).expect_err("deserialization should fail");
         match length_to_corrupt {
             "key" | "etag" => assert!(matches!(
                 err,
@@ -1078,11 +1103,15 @@ mod tests {
     fn test_concurrent_access() {
         let block_size = 1024 * 1024;
         let cache_directory = tempfile::tempdir().unwrap();
-        let data_cache = DiskDataCache::new(DiskDataCacheConfig {
-            cache_directory: cache_directory.path().to_path_buf(),
-            block_size: block_size as u64,
-            limit: CacheLimit::Unbounded,
-        });
+        let pool = PagedPool::new([block_size]);
+        let data_cache = DiskDataCache::new(
+            DiskDataCacheConfig {
+                cache_directory: cache_directory.path().to_path_buf(),
+                block_size: block_size as u64,
+                limit: CacheLimit::Unbounded,
+            },
+            pool,
+        );
         let data_cache = Arc::new(data_cache);
 
         let cache_key = ObjectId::new("foo".to_owned(), ETag::for_tests());

mountpoint-s3-fs/src/data_cache/multilevel_cache.rs

Lines changed: 10 additions & 5 deletions
@@ -116,6 +116,7 @@ mod tests {
     use super::*;
     use crate::checksums::ChecksummedBytes;
     use crate::data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig, ExpressDataCache, ExpressDataCacheConfig};
+    use crate::memory::PagedPool;
 
     use futures::executor::ThreadPool;
     use mountpoint_s3_client::mock_client::MockClient;
@@ -128,11 +129,15 @@ mod tests {
 
     fn create_disk_cache() -> (TempDir, Arc<DiskDataCache>) {
         let cache_directory = tempfile::tempdir().unwrap();
-        let cache = DiskDataCache::new(DiskDataCacheConfig {
-            cache_directory: cache_directory.path().to_path_buf(),
-            block_size: BLOCK_SIZE,
-            limit: CacheLimit::Unbounded,
-        });
+        let pool = PagedPool::new([BLOCK_SIZE as usize, PART_SIZE]);
+        let cache = DiskDataCache::new(
+            DiskDataCacheConfig {
+                cache_directory: cache_directory.path().to_path_buf(),
+                block_size: BLOCK_SIZE,
+                limit: CacheLimit::Unbounded,
+            },
+            pool,
+        );
         (cache_directory, Arc::new(cache))
     }
 