Skip to content

Commit 1efbff6

Browse files
committed
Use the paged memory pool + ClientBuilder changes
Signed-off-by: Alessandro Passaro <[email protected]>
1 parent 21eeb2f commit 1efbff6

File tree

7 files changed

+165
-81
lines changed

7 files changed

+165
-81
lines changed

mountpoint-s3-fs/examples/mount_from_config.rs

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ use mountpoint_s3_fs::{
1919
},
2020
logging::{LoggingConfig, LoggingHandle, error_logger::FileErrorLogger, init_logging},
2121
manifest::{Manifest, ingest_manifest},
22+
memory::PagedPool,
2223
metrics::{self, MetricsSinkHandle},
2324
prefix::Prefix,
24-
s3::config::{ClientConfig, PartConfig, Region, S3Path},
25+
s3::config::{ClientConfig, PartConfig, Region, S3Path, TargetThroughputSetting},
2526
};
2627
use nix::sys::signal::{self, Signal};
2728
use nix::unistd::Pid;
@@ -121,17 +122,17 @@ impl ConfigOptions {
121122
Some(prefix) => format!("{prefix}/mp-exmpl"),
122123
None => "mountpoint-s3-example/mp-exmpl".to_string(),
123124
};
124-
let target_throughput = self.determine_throughput()?;
125+
let throughput_target = self.determine_throughput()?;
125126
Ok(ClientConfig {
126127
region: Region::new_user_specified(self.region.clone()),
127128
endpoint_url: self.endpoint_url.clone(),
128129
addressing_style: AddressingStyle::Automatic,
129130
dual_stack: false,
130131
transfer_acceleration: false,
131-
auth_config: mountpoint_s3_client::config::S3ClientAuthConfig::Default,
132+
auth_config: Default::default(),
132133
requester_pays: false,
133134
expected_bucket_owner: self.expected_bucket_owner.clone(),
134-
throughput_target_gbps: target_throughput,
135+
throughput_target,
135136
bind: None,
136137
part_config: PartConfig::with_part_size(self.part_size.unwrap_or(8388608)),
137138
user_agent: UserAgent::new(Some(user_agent_string)),
@@ -157,23 +158,28 @@ impl ConfigOptions {
157158
DataCacheConfig::default()
158159
}
159160

160-
fn determine_throughput(&self) -> Result<f64> {
161+
fn determine_throughput(&self) -> Result<TargetThroughputSetting> {
161162
match &self.throughput_config {
162163
// TODO(chagem): Remove some code duplication, by moving this logic into fs crate.
163-
ThroughputConfig::Explicit { throughput } => Ok(*throughput),
164+
ThroughputConfig::Explicit { throughput } => Ok(TargetThroughputSetting::User { gbps: *throughput }),
164165
ThroughputConfig::IMDSAutoConfigure => {
165-
const DEFAULT_THROUGHPUT: f64 = 10.0;
166166
let instance_info = InstanceInfo::new();
167167
match autoconfigure::network_throughput(&instance_info) {
168-
Ok(throughput) => Ok(throughput),
168+
Ok(throughput) => Ok(TargetThroughputSetting::Instance { gbps: throughput }),
169169
Err(e) => {
170-
tracing::warn!("Failed to detect network throughput. Using {DEFAULT_THROUGHPUT} gbps: {e:?}");
171-
Ok(DEFAULT_THROUGHPUT)
170+
tracing::warn!(
171+
"Failed to detect network throughput. Using {} gbps: {:?}",
172+
TargetThroughputSetting::DEFAULT_TARGET_THROUGHPUT,
173+
e
174+
);
175+
Ok(TargetThroughputSetting::Default)
172176
}
173177
}
174178
}
175179
ThroughputConfig::IMDSLookUp { ec2_instance_type } => {
176-
autoconfigure::get_maximum_network_throughput(ec2_instance_type).context("Unrecognized instance ID")
180+
let target = autoconfigure::get_maximum_network_throughput(ec2_instance_type)
181+
.context("Unrecognized instance ID")?;
182+
Ok(TargetThroughputSetting::Instance { gbps: target })
177183
}
178184
}
179185
}
@@ -228,15 +234,16 @@ fn mount_filesystem(
228234
let s3_path = config.build_s3_path()?;
229235

230236
// Create the client and runtime
231-
let client = config
232-
.build_client_config()?
233-
.create_client(None)
237+
let client_config = config.build_client_config()?;
238+
let pool = PagedPool::new([client_config.part_config.read_size_bytes]);
239+
let client = client_config
240+
.create_client(pool.clone(), None)
234241
.context("Failed to create S3 client")?;
235242
let runtime = Runtime::new(client.event_loop_group());
236243

237244
// Create and run the FUSE session
238245
let fuse_session = mp_config
239-
.create_fuse_session(s3_path, client, runtime)
246+
.create_fuse_session(s3_path, client, runtime, Some(pool))
240247
.context("Failed to create FUSE session")?;
241248

242249
Ok(fuse_session)

mountpoint-s3-fs/src/config.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::data_cache::{DataCacheConfig, DiskDataCache, ExpressDataCache, Multil
66
use crate::fuse::config::FuseSessionConfig;
77
use crate::fuse::session::FuseSession;
88
use crate::fuse::{ErrorLogger, S3FuseFilesystem};
9+
use crate::memory::PagedPool;
910
use crate::prefetch::{Prefetcher, PrefetcherBuilder};
1011
use crate::s3::config::S3Path;
1112
use crate::sync::Arc;
@@ -46,11 +47,12 @@ impl MountpointConfig {
4647
s3_path: S3Path,
4748
client: Client,
4849
runtime: Runtime,
50+
memory_pool: Option<PagedPool>,
4951
) -> anyhow::Result<FuseSession>
5052
where
5153
Client: ObjectClient + Clone + Send + Sync + 'static,
5254
{
53-
let prefetcher_builder = create_prefetcher_builder(self.data_cache_config, &client, &runtime)?;
55+
let prefetcher_builder = create_prefetcher_builder(self.data_cache_config, &client, &runtime, memory_pool)?;
5456
tracing::trace!(filesystem_config=?self.filesystem_config, "creating file system");
5557
let fs = S3Filesystem::new(
5658
client,
@@ -72,11 +74,14 @@ fn create_prefetcher_builder<Client>(
7274
data_cache_config: DataCacheConfig,
7375
client: &Client,
7476
runtime: &Runtime,
77+
memory_pool: Option<PagedPool>,
7578
) -> anyhow::Result<PrefetcherBuilder<Client>>
7679
where
7780
Client: ObjectClient + Clone + Send + Sync + 'static,
7881
{
79-
let disk_cache = data_cache_config.disk_cache_config.map(DiskDataCache::new);
82+
let disk_cache = data_cache_config
83+
.disk_cache_config
84+
.map(|config| DiskDataCache::new_with_pool(config, memory_pool));
8085
let express_cache = match data_cache_config.express_cache_config {
8186
None => None,
8287
Some(config) => {

mountpoint-s3-fs/src/data_cache/disk_data_cache.rs

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use tracing::{trace, warn};
2020

2121
use crate::checksums::IntegrityError;
2222
use crate::data_cache::DataCacheError;
23+
use crate::memory::PagedPool;
2324
use crate::object::ObjectId;
2425
use crate::sync::Mutex;
2526

@@ -36,6 +37,7 @@ pub struct DiskDataCache {
3637
config: DiskDataCacheConfig,
3738
/// Tracks blocks usage. `None` when no cache limit was set.
3839
usage: Option<Mutex<UsageInfo<DiskBlockKey>>>,
40+
pool: Option<PagedPool>,
3941
}
4042

4143
/// Configuration for a [DiskDataCache].
@@ -246,20 +248,27 @@ impl DiskBlock {
246248
}
247249

248250
/// Deserialize an instance from `reader`.
249-
fn read(reader: &mut impl Read, block_size: u64) -> Result<Self, DiskBlockReadWriteError> {
251+
fn read(
252+
reader: &mut impl Read,
253+
block_size: u64,
254+
pool: Option<&PagedPool>,
255+
) -> Result<Self, DiskBlockReadWriteError> {
250256
let header: DiskBlockHeader = bincode::decode_from_std_read(reader, BINCODE_CONFIG)?;
251257

252258
if header.block_len > block_size {
253259
return Err(DiskBlockReadWriteError::InvalidBlockLength(header.block_len));
254260
}
255261

256-
let mut buffer = vec![0u8; header.block_len as usize];
257-
reader.read_exact(&mut buffer)?;
262+
let size = header.block_len as usize;
263+
let data = if let Some(pool) = pool {
264+
pool.read_exact(reader, size)?
265+
} else {
266+
let mut buffer = vec![0u8; size];
267+
reader.read_exact(&mut buffer)?;
268+
buffer.into()
269+
};
258270

259-
Ok(Self {
260-
header,
261-
data: buffer.into(),
262-
})
271+
Ok(Self { header, data })
263272
}
264273

265274
/// Serialize this instance to `writer` and return the number of bytes written on success.
@@ -303,14 +312,28 @@ impl From<DiskBlockReadWriteError> for DataCacheError {
303312
}
304313
}
305314

315+
fn usage_info(limit: &CacheLimit) -> Option<Mutex<UsageInfo<DiskBlockKey>>> {
316+
match limit {
317+
CacheLimit::Unbounded => None,
318+
CacheLimit::TotalSize { .. } | CacheLimit::AvailableSpace { .. } => Some(Mutex::new(UsageInfo::new())),
319+
}
320+
}
321+
306322
impl DiskDataCache {
307323
/// Create a new instance of an [DiskDataCache] with the specified configuration.
308324
pub fn new(config: DiskDataCacheConfig) -> Self {
309-
let usage = match config.limit {
310-
CacheLimit::Unbounded => None,
311-
CacheLimit::TotalSize { .. } | CacheLimit::AvailableSpace { .. } => Some(Mutex::new(UsageInfo::new())),
312-
};
313-
DiskDataCache { config, usage }
325+
let usage = usage_info(&config.limit);
326+
DiskDataCache {
327+
config,
328+
usage,
329+
pool: None,
330+
}
331+
}
332+
333+
    /// Create a new instance of a [DiskDataCache] with the specified configuration and an optional memory pool.
334+
pub fn new_with_pool(config: DiskDataCacheConfig, pool: Option<PagedPool>) -> Self {
335+
let usage = usage_info(&config.limit);
336+
DiskDataCache { config, usage, pool }
314337
}
315338

316339
/// Get the relative path for the given block.
@@ -349,7 +372,7 @@ impl DiskDataCache {
349372
return Err(DataCacheError::InvalidBlockContent);
350373
}
351374

352-
let block = DiskBlock::read(&mut file, self.block_size())
375+
let block = DiskBlock::read(&mut file, self.block_size(), self.pool.as_ref())
353376
.inspect_err(|e| warn!(path = ?path.as_ref(), "block could not be deserialized: {:?}", e))?;
354377
let bytes = block
355378
.data(cache_key, block_idx, block_offset)
@@ -1063,7 +1086,7 @@ mod tests {
10631086
// "Corrupt" the serialized value with an invalid length.
10641087
replace_u64_at(&mut buf, offset, u64::MAX);
10651088

1066-
let err = DiskBlock::read(&mut Cursor::new(buf), MAX_LENGTH).expect_err("deserialization should fail");
1089+
let err = DiskBlock::read(&mut Cursor::new(buf), MAX_LENGTH, None).expect_err("deserialization should fail");
10671090
match length_to_corrupt {
10681091
"key" | "etag" => assert!(matches!(
10691092
err,

mountpoint-s3-fs/src/s3/config.rs

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use mountpoint_s3_client::{ObjectClient, S3CrtClient, S3RequestError};
1111
use regex::Regex;
1212
use thiserror::Error;
1313

14+
use crate::memory::PagedPool;
1415
use crate::prefix::{Prefix, PrefixError};
1516

1617
/// Configuration for the S3 Client to use in Mountpoint.
@@ -41,7 +42,7 @@ pub struct ClientConfig {
4142
pub expected_bucket_owner: Option<String>,
4243

4344
/// Target throughput in Gbps
44-
pub throughput_target_gbps: f64,
45+
pub throughput_target: TargetThroughputSetting,
4546

4647
/// One or more network interfaces to use when accessing S3
4748
pub bind: Option<Vec<String>>,
@@ -182,10 +183,10 @@ fn matches_bucket_regex(bucket_name: &str) -> bool {
182183
#[derive(Debug)]
183184
pub struct PartConfig {
184185
/// Part size for GET in bytes
185-
read_size_bytes: usize,
186+
pub read_size_bytes: usize,
186187

187188
/// Part size for multi-part PUT in bytes
188-
write_size_bytes: usize,
189+
pub write_size_bytes: usize,
189190
}
190191

191192
impl PartConfig {
@@ -238,6 +239,26 @@ impl Display for Region {
238239
}
239240
}
240241

242+
/// Target throughput setting.
243+
#[derive(Debug, Clone, Copy)]
244+
pub enum TargetThroughputSetting {
245+
Default,
246+
User { gbps: f64 },
247+
Instance { gbps: f64 },
248+
}
249+
250+
impl TargetThroughputSetting {
251+
pub const DEFAULT_TARGET_THROUGHPUT: f64 = 10.0;
252+
253+
pub fn value(&self) -> f64 {
254+
match self {
255+
TargetThroughputSetting::Default => Self::DEFAULT_TARGET_THROUGHPUT,
256+
TargetThroughputSetting::User { gbps } => *gbps,
257+
TargetThroughputSetting::Instance { gbps } => *gbps,
258+
}
259+
}
260+
}
261+
241262
// This is a weird looking number! We really want our first request size to be 1MiB,
242263
// which is a common IO size. But Linux's readahead will try to read an extra 128k on
243264
// top of a 1MiB read, which we'd have to wait for a second request to service. Because
@@ -253,15 +274,20 @@ const INITIAL_READ_WINDOW_SIZE: usize = 1024 * 1024 + 128 * 1024;
253274

254275
impl ClientConfig {
255276
/// Create an [S3CrtClient]
256-
pub fn create_client(self, validate_on_s3_path: Option<&S3Path>) -> anyhow::Result<S3CrtClient> {
277+
pub fn create_client(
278+
self,
279+
memory_pool: PagedPool,
280+
validate_on_s3_path: Option<&S3Path>,
281+
) -> anyhow::Result<S3CrtClient> {
257282
let mut client_config = S3ClientConfig::new()
258283
.auth_config(self.auth_config)
259-
.throughput_target_gbps(self.throughput_target_gbps)
284+
.throughput_target_gbps(self.throughput_target.value())
260285
.read_part_size(self.part_config.read_size_bytes)
261286
.write_part_size(self.part_config.write_size_bytes)
262287
.read_backpressure(true)
263288
.initial_read_window(INITIAL_READ_WINDOW_SIZE)
264-
.user_agent(self.user_agent);
289+
.user_agent(self.user_agent)
290+
.memory_pool(memory_pool);
265291
if let Some(interfaces) = self.bind {
266292
client_config = client_config.network_interface_names(interfaces);
267293
}

mountpoint-s3/src/bin/mock-mount-s3.rs

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,56 +19,55 @@ use mountpoint_s3_client::mock_client::throughput_client::ThroughputMockClient;
1919
use mountpoint_s3_client::mock_client::{MockClient, MockObject};
2020
use mountpoint_s3_client::types::ETag;
2121
use mountpoint_s3_fs::Runtime;
22+
use mountpoint_s3_fs::memory::PagedPool;
2223
use mountpoint_s3_fs::s3::S3Personality;
23-
use mountpoint_s3_fs::s3::config::BucketNameOrS3Uri;
24+
use mountpoint_s3_fs::s3::config::{ClientConfig, S3Path, TargetThroughputSetting};
2425

2526
fn main() -> anyhow::Result<()> {
2627
let cli_args = CliArgs::parse();
2728
mountpoint_s3::run(create_mock_client, cli_args)
2829
}
2930

30-
fn create_mock_client(args: &CliArgs) -> anyhow::Result<(Arc<ThroughputMockClient>, Runtime, S3Personality)> {
31-
let bucket_name: String = match &args.bucket_name {
32-
BucketNameOrS3Uri::BucketName(bucket_name) => bucket_name.clone().into(),
33-
BucketNameOrS3Uri::S3Uri(_) => panic!("mock-mount-s3 bucket names do not support S3 URIs"),
34-
};
35-
31+
pub fn create_mock_client(
32+
client_config: ClientConfig,
33+
_pool: PagedPool,
34+
s3_path: S3Path,
35+
personality: Option<S3Personality>,
36+
) -> anyhow::Result<(Arc<ThroughputMockClient>, Runtime, S3Personality)> {
3637
// An extra little safety thing to make sure we can distinguish the real mount-s3 binary and
3738
// this one. Buckets starting with "sthree-" are always invalid against real S3:
3839
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
3940
anyhow::ensure!(
40-
&bucket_name.starts_with("sthree-"),
41+
&s3_path.bucket_name.starts_with("sthree-"),
4142
"mock-mount-s3 bucket names must start with `sthree-`"
4243
);
4344

44-
tracing::warn!("using mock client");
45-
4645
// TODO: Actually update the mock client to support different part sizes
4746
let part_size = {
48-
if args.read_part_size.is_some() || args.write_part_size.is_some() {
47+
if client_config.part_config.read_size_bytes != client_config.part_config.write_size_bytes {
4948
tracing::warn!("mock client does not support separate part sizes for reading and writing, ignoring");
5049
}
51-
args.part_size
50+
client_config.part_config.read_size_bytes
5251
};
5352

54-
let s3_personality = if let Some(bucket_type) = &args.bucket_type {
55-
bucket_type.to_personality()
56-
} else {
57-
S3Personality::Standard
58-
};
53+
let s3_personality = personality.unwrap_or(S3Personality::Standard);
5954

6055
let config = MockClient::config()
61-
.bucket(&bucket_name)
56+
.bucket(&s3_path.bucket_name)
6257
.part_size(part_size as usize)
6358
.unordered_list_seed(None)
6459
.enable_backpressure(true)
6560
.initial_read_window_size(1024 * 1024 + 128 * 1024) // matching real MP
6661
.enable_rename(s3_personality.supports_rename_object());
6762

68-
let client = if let Some(max_throughput_gbps) = args.maximum_throughput_gbps {
63+
let client = if let TargetThroughputSetting::User {
64+
gbps: max_throughput_gbps,
65+
} = client_config.throughput_target
66+
{
6967
tracing::info!("mock client limited to {max_throughput_gbps} Gb/s download throughput");
70-
ThroughputMockClient::new(config, max_throughput_gbps as f64)
68+
ThroughputMockClient::new(config, max_throughput_gbps)
7169
} else {
70+
tracing::info!("mock client with no throughput limit");
7271
ThroughputMockClient::new_unlimited_throughput(config)
7372
};
7473

0 commit comments

Comments
 (0)