Skip to content

Commit 6d951e3

Browse files
committed
Use the paged memory pool in Mountpoint
Signed-off-by: Alessandro Passaro <[email protected]>
1 parent 9d96705 commit 6d951e3

File tree

6 files changed

+76
-25
lines changed

6 files changed

+76
-25
lines changed

mountpoint-s3-fs/examples/mount_from_config.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use mountpoint_s3_fs::{
1919
},
2020
logging::{LoggingConfig, LoggingHandle, error_logger::FileErrorLogger, init_logging},
2121
manifest::{Manifest, ingest_manifest},
22+
memory::PagedPool,
2223
metrics::{self, MetricsSinkHandle},
2324
prefix::Prefix,
2425
s3::config::{ClientConfig, PartConfig, Region, S3Path, TargetThroughputSetting},
@@ -234,14 +235,15 @@ fn mount_filesystem(
234235

235236
// Create the client and runtime
236237
let client_config = config.build_client_config()?;
238+
let pool = PagedPool::new([client_config.part_config.read_size_bytes]);
237239
let client = client_config
238-
.create_client(None)
240+
.create_client(pool.clone(), None)
239241
.context("Failed to create S3 client")?;
240242
let runtime = Runtime::new(client.event_loop_group());
241243

242244
// Create and run the FUSE session
243245
let fuse_session = mp_config
244-
.create_fuse_session(s3_path, client, runtime)
246+
.create_fuse_session(s3_path, client, runtime, Some(pool))
245247
.context("Failed to create FUSE session")?;
246248

247249
Ok(fuse_session)

mountpoint-s3-fs/src/config.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::data_cache::{DataCacheConfig, DiskDataCache, ExpressDataCache, Multil
66
use crate::fuse::config::FuseSessionConfig;
77
use crate::fuse::session::FuseSession;
88
use crate::fuse::{ErrorLogger, S3FuseFilesystem};
9+
use crate::memory::PagedPool;
910
use crate::prefetch::{Prefetcher, PrefetcherBuilder};
1011
use crate::s3::config::S3Path;
1112
use crate::sync::Arc;
@@ -46,11 +47,12 @@ impl MountpointConfig {
4647
s3_path: S3Path,
4748
client: Client,
4849
runtime: Runtime,
50+
memory_pool: Option<PagedPool>,
4951
) -> anyhow::Result<FuseSession>
5052
where
5153
Client: ObjectClient + Clone + Send + Sync + 'static,
5254
{
53-
let prefetcher_builder = create_prefetcher_builder(self.data_cache_config, &client, &runtime)?;
55+
let prefetcher_builder = create_prefetcher_builder(self.data_cache_config, &client, &runtime, memory_pool)?;
5456
tracing::trace!(filesystem_config=?self.filesystem_config, "creating file system");
5557
let fs = S3Filesystem::new(
5658
client,
@@ -72,11 +74,14 @@ fn create_prefetcher_builder<Client>(
7274
data_cache_config: DataCacheConfig,
7375
client: &Client,
7476
runtime: &Runtime,
77+
memory_pool: Option<PagedPool>,
7578
) -> anyhow::Result<PrefetcherBuilder<Client>>
7679
where
7780
Client: ObjectClient + Clone + Send + Sync + 'static,
7881
{
79-
let disk_cache = data_cache_config.disk_cache_config.map(DiskDataCache::new);
82+
let disk_cache = data_cache_config
83+
.disk_cache_config
84+
.map(|config| DiskDataCache::new_with_pool(config, memory_pool));
8085
let express_cache = match data_cache_config.express_cache_config {
8186
None => None,
8287
Some(config) => {

mountpoint-s3-fs/src/data_cache/disk_data_cache.rs

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use tracing::{trace, warn};
2020

2121
use crate::checksums::IntegrityError;
2222
use crate::data_cache::DataCacheError;
23+
use crate::memory::PagedPool;
2324
use crate::object::ObjectId;
2425
use crate::sync::Mutex;
2526

@@ -36,6 +37,7 @@ pub struct DiskDataCache {
3637
config: DiskDataCacheConfig,
3738
/// Tracks blocks usage. `None` when no cache limit was set.
3839
usage: Option<Mutex<UsageInfo<DiskBlockKey>>>,
40+
pool: Option<PagedPool>,
3941
}
4042

4143
/// Configuration for a [DiskDataCache].
@@ -246,20 +248,27 @@ impl DiskBlock {
246248
}
247249

248250
/// Deserialize an instance from `reader`.
249-
fn read(reader: &mut impl Read, block_size: u64) -> Result<Self, DiskBlockReadWriteError> {
251+
fn read(
252+
reader: &mut impl Read,
253+
block_size: u64,
254+
pool: Option<&PagedPool>,
255+
) -> Result<Self, DiskBlockReadWriteError> {
250256
let header: DiskBlockHeader = bincode::decode_from_std_read(reader, BINCODE_CONFIG)?;
251257

252258
if header.block_len > block_size {
253259
return Err(DiskBlockReadWriteError::InvalidBlockLength(header.block_len));
254260
}
255261

256-
let mut buffer = vec![0u8; header.block_len as usize];
257-
reader.read_exact(&mut buffer)?;
262+
let size = header.block_len as usize;
263+
let data = if let Some(pool) = pool {
264+
pool.read_exact(reader, size)?
265+
} else {
266+
let mut buffer = vec![0u8; size];
267+
reader.read_exact(&mut buffer)?;
268+
buffer.into()
269+
};
258270

259-
Ok(Self {
260-
header,
261-
data: buffer.into(),
262-
})
271+
Ok(Self { header, data })
263272
}
264273

265274
/// Serialize this instance to `writer` and return the number of bytes written on success.
@@ -303,14 +312,28 @@ impl From<DiskBlockReadWriteError> for DataCacheError {
303312
}
304313
}
305314

315+
fn usage_info(limit: &CacheLimit) -> Option<Mutex<UsageInfo<DiskBlockKey>>> {
316+
match limit {
317+
CacheLimit::Unbounded => None,
318+
CacheLimit::TotalSize { .. } | CacheLimit::AvailableSpace { .. } => Some(Mutex::new(UsageInfo::new())),
319+
}
320+
}
321+
306322
impl DiskDataCache {
307323
/// Create a new instance of an [DiskDataCache] with the specified configuration.
308324
pub fn new(config: DiskDataCacheConfig) -> Self {
309-
let usage = match config.limit {
310-
CacheLimit::Unbounded => None,
311-
CacheLimit::TotalSize { .. } | CacheLimit::AvailableSpace { .. } => Some(Mutex::new(UsageInfo::new())),
312-
};
313-
DiskDataCache { config, usage }
325+
let usage = usage_info(&config.limit);
326+
DiskDataCache {
327+
config,
328+
usage,
329+
pool: None,
330+
}
331+
}
332+
333+
/// Create a new instance of an [DiskDataCache] with the specified configuration.
334+
pub fn new_with_pool(config: DiskDataCacheConfig, pool: Option<PagedPool>) -> Self {
335+
let usage = usage_info(&config.limit);
336+
DiskDataCache { config, usage, pool }
314337
}
315338

316339
/// Get the relative path for the given block.
@@ -349,7 +372,7 @@ impl DiskDataCache {
349372
return Err(DataCacheError::InvalidBlockContent);
350373
}
351374

352-
let block = DiskBlock::read(&mut file, self.block_size())
375+
let block = DiskBlock::read(&mut file, self.block_size(), self.pool.as_ref())
353376
.inspect_err(|e| warn!(path = ?path.as_ref(), "block could not be deserialized: {:?}", e))?;
354377
let bytes = block
355378
.data(cache_key, block_idx, block_offset)
@@ -1063,7 +1086,7 @@ mod tests {
10631086
// "Corrupt" the serialized value with an invalid length.
10641087
replace_u64_at(&mut buf, offset, u64::MAX);
10651088

1066-
let err = DiskBlock::read(&mut Cursor::new(buf), MAX_LENGTH).expect_err("deserialization should fail");
1089+
let err = DiskBlock::read(&mut Cursor::new(buf), MAX_LENGTH, None).expect_err("deserialization should fail");
10671090
match length_to_corrupt {
10681091
"key" | "etag" => assert!(matches!(
10691092
err,

mountpoint-s3-fs/src/s3/config.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use mountpoint_s3_client::{ObjectClient, S3CrtClient, S3RequestError};
1111
use regex::Regex;
1212
use thiserror::Error;
1313

14+
use crate::memory::PagedPool;
1415
use crate::prefix::{Prefix, PrefixError};
1516

1617
/// Configuration for the S3 Client to use in Mountpoint.
@@ -273,15 +274,20 @@ const INITIAL_READ_WINDOW_SIZE: usize = 1024 * 1024 + 128 * 1024;
273274

274275
impl ClientConfig {
275276
/// Create an [S3CrtClient]
276-
pub fn create_client(self, validate_on_s3_path: Option<&S3Path>) -> anyhow::Result<S3CrtClient> {
277+
pub fn create_client(
278+
self,
279+
memory_pool: PagedPool,
280+
validate_on_s3_path: Option<&S3Path>,
281+
) -> anyhow::Result<S3CrtClient> {
277282
let mut client_config = S3ClientConfig::new()
278283
.auth_config(self.auth_config)
279284
.throughput_target_gbps(self.throughput_target.value())
280285
.read_part_size(self.part_config.read_size_bytes)
281286
.write_part_size(self.part_config.write_size_bytes)
282287
.read_backpressure(true)
283288
.initial_read_window(INITIAL_READ_WINDOW_SIZE)
284-
.user_agent(self.user_agent);
289+
.user_agent(self.user_agent)
290+
.memory_pool(memory_pool);
285291
if let Some(interfaces) = self.bind {
286292
client_config = client_config.network_interface_names(interfaces);
287293
}

mountpoint-s3/src/bin/mock-mount-s3.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use mountpoint_s3_client::mock_client::throughput_client::ThroughputMockClient;
1919
use mountpoint_s3_client::mock_client::{MockClient, MockObject};
2020
use mountpoint_s3_client::types::ETag;
2121
use mountpoint_s3_fs::Runtime;
22+
use mountpoint_s3_fs::memory::PagedPool;
2223
use mountpoint_s3_fs::s3::S3Personality;
2324
use mountpoint_s3_fs::s3::config::{ClientConfig, S3Path, TargetThroughputSetting};
2425

@@ -29,6 +30,7 @@ fn main() -> anyhow::Result<()> {
2930

3031
pub fn create_mock_client(
3132
client_config: ClientConfig,
33+
_pool: PagedPool,
3234
s3_path: &S3Path,
3335
personality: Option<S3Personality>,
3436
) -> anyhow::Result<(Arc<ThroughputMockClient>, Runtime, S3Personality)> {

mountpoint-s3/src/run.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use mountpoint_s3_client::{ObjectClient, S3CrtClient};
1010
use mountpoint_s3_fs::data_cache::{DataCacheConfig, ManagedCacheDir};
1111
use mountpoint_s3_fs::fuse::session::FuseSession;
1212
use mountpoint_s3_fs::logging::init_logging;
13+
use mountpoint_s3_fs::memory::PagedPool;
1314
use mountpoint_s3_fs::s3::S3Personality;
1415
use mountpoint_s3_fs::s3::config::{ClientConfig, S3Path};
1516
use mountpoint_s3_fs::{MountpointConfig, Runtime, metrics};
@@ -174,8 +175,17 @@ fn mount(args: CliArgs, client_builder: impl ClientBuilder) -> anyhow::Result<Fu
174175

175176
let client_config = args.client_config(build_info::FULL_VERSION);
176177

178+
// Set up a paged memory pool
179+
let pool = PagedPool::new([
180+
1024 * 1024,
181+
client_config.part_config.read_size_bytes,
182+
client_config.part_config.write_size_bytes,
183+
]);
184+
pool.schedule_trim(Duration::from_secs(60));
185+
177186
let s3_path = args.s3_path()?;
178-
let (client, runtime, s3_personality) = client_builder.build(client_config, &s3_path, args.personality())?;
187+
let (client, runtime, s3_personality) =
188+
client_builder.build(client_config, pool.clone(), &s3_path, args.personality())?;
179189

180190
let bucket_description = args.bucket_description()?;
181191
tracing::debug!("using S3 personality {s3_personality:?} for {bucket_description}");
@@ -189,7 +199,7 @@ fn mount(args: CliArgs, client_builder: impl ClientBuilder) -> anyhow::Result<Fu
189199
let mount_point_path = format!("{}", fuse_session_config.mount_point());
190200

191201
let mut fuse_session = MountpointConfig::new(fuse_session_config, filesystem_config, data_cache_config)
192-
.create_fuse_session(s3_path, client, runtime)?;
202+
.create_fuse_session(s3_path, client, runtime, Some(pool))?;
193203
tracing::info!("successfully mounted {} at {}", bucket_description, mount_point_path);
194204

195205
if let Some(managed_cache_dir) = managed_cache_dir {
@@ -208,36 +218,39 @@ pub trait ClientBuilder {
208218
fn build(
209219
self,
210220
client_config: ClientConfig,
221+
pool: PagedPool,
211222
s3_path: &S3Path,
212223
personality: Option<S3Personality>,
213224
) -> anyhow::Result<(Self::Client, Runtime, S3Personality)>;
214225
}
215226

216227
impl<F, C> ClientBuilder for F
217228
where
218-
F: FnOnce(ClientConfig, &S3Path, Option<S3Personality>) -> anyhow::Result<(C, Runtime, S3Personality)>,
229+
F: FnOnce(ClientConfig, PagedPool, &S3Path, Option<S3Personality>) -> anyhow::Result<(C, Runtime, S3Personality)>,
219230
C: ObjectClient + Clone + Send + Sync + 'static,
220231
{
221232
type Client = C;
222233

223234
fn build(
224235
self,
225236
client_config: ClientConfig,
237+
pool: PagedPool,
226238
s3_path: &S3Path,
227239
personality: Option<S3Personality>,
228240
) -> anyhow::Result<(Self::Client, Runtime, S3Personality)> {
229-
self(client_config, s3_path, personality)
241+
self(client_config, pool, s3_path, personality)
230242
}
231243
}
232244

233245
// Create a real S3 client
234246
pub fn create_s3_client(
235247
client_config: ClientConfig,
248+
pool: PagedPool,
236249
s3_path: &S3Path,
237250
personality: Option<S3Personality>,
238251
) -> anyhow::Result<(S3CrtClient, Runtime, S3Personality)> {
239252
let client = client_config
240-
.create_client(Some(s3_path))
253+
.create_client(pool, Some(s3_path))
241254
.context("Failed to create S3 client")?;
242255

243256
let runtime = Runtime::new(client.event_loop_group());

0 commit comments

Comments
 (0)