Skip to content

Commit 29bdd9d

Browse files
authored
Refactor ClientBuilder into a trait and remove use of CliArgs (#1513)
We use a generic parameter in the `run` and `mount` functions to create an S3 client instance (and associated runtime), so they can be used with the actual S3 client and the mock one. This PR changes 2 things: * Replaces the `FnOnce` with a trait, to make it simpler to pass around and extend in the future, * Removes the `CliArgs` argument in favor of `ClientConfig` and other required settings. ### Does this change impact existing behavior? No. ### Does this change need a changelog entry? Does it require a version change? No. --- By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the [Developer Certificate of Origin (DCO)](https://developercertificate.org/). Signed-off-by: Alessandro Passaro <[email protected]>
1 parent fa62033 commit 29bdd9d

File tree

5 files changed

+115
-68
lines changed

5 files changed

+115
-68
lines changed

mountpoint-s3-fs/examples/mount_from_config.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use mountpoint_s3_fs::{
2121
manifest::{Manifest, ingest_manifest},
2222
metrics::{self, MetricsSinkHandle},
2323
prefix::Prefix,
24-
s3::config::{ClientConfig, PartConfig, Region, S3Path},
24+
s3::config::{ClientConfig, PartConfig, Region, S3Path, TargetThroughputSetting},
2525
};
2626
use nix::sys::signal::{self, Signal};
2727
use nix::unistd::Pid;
@@ -121,17 +121,17 @@ impl ConfigOptions {
121121
Some(prefix) => format!("{prefix}/mp-exmpl"),
122122
None => "mountpoint-s3-example/mp-exmpl".to_string(),
123123
};
124-
let target_throughput = self.determine_throughput()?;
124+
let throughput_target = self.determine_throughput()?;
125125
Ok(ClientConfig {
126126
region: Region::new_user_specified(self.region.clone()),
127127
endpoint_url: self.endpoint_url.clone(),
128128
addressing_style: AddressingStyle::Automatic,
129129
dual_stack: false,
130130
transfer_acceleration: false,
131-
auth_config: mountpoint_s3_client::config::S3ClientAuthConfig::Default,
131+
auth_config: Default::default(),
132132
requester_pays: false,
133133
expected_bucket_owner: self.expected_bucket_owner.clone(),
134-
throughput_target_gbps: target_throughput,
134+
throughput_target,
135135
bind: None,
136136
part_config: PartConfig::with_part_size(self.part_size.unwrap_or(8388608)),
137137
user_agent: UserAgent::new(Some(user_agent_string)),
@@ -157,23 +157,28 @@ impl ConfigOptions {
157157
DataCacheConfig::default()
158158
}
159159

160-
fn determine_throughput(&self) -> Result<f64> {
160+
fn determine_throughput(&self) -> Result<TargetThroughputSetting> {
161161
match &self.throughput_config {
162162
// TODO(chagem): Remove some code duplication, by moving this logic into fs crate.
163-
ThroughputConfig::Explicit { throughput } => Ok(*throughput),
163+
ThroughputConfig::Explicit { throughput } => Ok(TargetThroughputSetting::User { gbps: *throughput }),
164164
ThroughputConfig::IMDSAutoConfigure => {
165-
const DEFAULT_THROUGHPUT: f64 = 10.0;
166165
let instance_info = InstanceInfo::new();
167166
match autoconfigure::network_throughput(&instance_info) {
168-
Ok(throughput) => Ok(throughput),
167+
Ok(throughput) => Ok(TargetThroughputSetting::Instance { gbps: throughput }),
169168
Err(e) => {
170-
tracing::warn!("Failed to detect network throughput. Using {DEFAULT_THROUGHPUT} gbps: {e:?}");
171-
Ok(DEFAULT_THROUGHPUT)
169+
tracing::warn!(
170+
"Failed to detect network throughput. Using {} gbps: {:?}",
171+
TargetThroughputSetting::DEFAULT_TARGET_THROUGHPUT_GBPS,
172+
e
173+
);
174+
Ok(TargetThroughputSetting::Default)
172175
}
173176
}
174177
}
175178
ThroughputConfig::IMDSLookUp { ec2_instance_type } => {
176-
autoconfigure::get_maximum_network_throughput(ec2_instance_type).context("Unrecognized instance ID")
179+
let target = autoconfigure::get_maximum_network_throughput(ec2_instance_type)
180+
.context("Unrecognized instance ID")?;
181+
Ok(TargetThroughputSetting::Instance { gbps: target })
177182
}
178183
}
179184
}
@@ -228,8 +233,8 @@ fn mount_filesystem(
228233
let s3_path = config.build_s3_path()?;
229234

230235
// Create the client and runtime
231-
let client = config
232-
.build_client_config()?
236+
let client_config = config.build_client_config()?;
237+
let client = client_config
233238
.create_client(None)
234239
.context("Failed to create S3 client")?;
235240
let runtime = Runtime::new(client.event_loop_group());

mountpoint-s3-fs/src/s3/config.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ pub struct ClientConfig {
4141
pub expected_bucket_owner: Option<String>,
4242

4343
/// Target throughput in Gbps
44-
pub throughput_target_gbps: f64,
44+
pub throughput_target: TargetThroughputSetting,
4545

4646
/// One or more network interfaces to use when accessing S3
4747
pub bind: Option<Vec<String>>,
@@ -182,10 +182,10 @@ fn matches_bucket_regex(bucket_name: &str) -> bool {
182182
#[derive(Debug)]
183183
pub struct PartConfig {
184184
/// Part size for GET in bytes
185-
read_size_bytes: usize,
185+
pub read_size_bytes: usize,
186186

187187
/// Part size for multi-part PUT in bytes
188-
write_size_bytes: usize,
188+
pub write_size_bytes: usize,
189189
}
190190

191191
impl PartConfig {
@@ -238,6 +238,26 @@ impl Display for Region {
238238
}
239239
}
240240

241+
/// Target throughput setting.
242+
#[derive(Debug, Clone, Copy)]
243+
pub enum TargetThroughputSetting {
244+
Default,
245+
User { gbps: f64 },
246+
Instance { gbps: f64 },
247+
}
248+
249+
impl TargetThroughputSetting {
250+
pub const DEFAULT_TARGET_THROUGHPUT_GBPS: f64 = 10.0;
251+
252+
pub fn value(&self) -> f64 {
253+
match self {
254+
TargetThroughputSetting::Default => Self::DEFAULT_TARGET_THROUGHPUT_GBPS,
255+
TargetThroughputSetting::User { gbps } => *gbps,
256+
TargetThroughputSetting::Instance { gbps } => *gbps,
257+
}
258+
}
259+
}
260+
241261
// This is a weird looking number! We really want our first request size to be 1MiB,
242262
// which is a common IO size. But Linux's readahead will try to read an extra 128k on on
243263
// top of a 1MiB read, which we'd have to wait for a second request to service. Because
@@ -256,7 +276,7 @@ impl ClientConfig {
256276
pub fn create_client(self, validate_on_s3_path: Option<&S3Path>) -> anyhow::Result<S3CrtClient> {
257277
let mut client_config = S3ClientConfig::new()
258278
.auth_config(self.auth_config)
259-
.throughput_target_gbps(self.throughput_target_gbps)
279+
.throughput_target_gbps(self.throughput_target.value())
260280
.read_part_size(self.part_config.read_size_bytes)
261281
.write_part_size(self.part_config.write_size_bytes)
262282
.read_backpressure(true)

mountpoint-s3/src/bin/mock-mount-s3.rs

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,55 +20,52 @@ use mountpoint_s3_client::mock_client::{MockClient, MockObject};
2020
use mountpoint_s3_client::types::ETag;
2121
use mountpoint_s3_fs::Runtime;
2222
use mountpoint_s3_fs::s3::S3Personality;
23-
use mountpoint_s3_fs::s3::config::BucketNameOrS3Uri;
23+
use mountpoint_s3_fs::s3::config::{ClientConfig, S3Path, TargetThroughputSetting};
2424

2525
fn main() -> anyhow::Result<()> {
2626
let cli_args = CliArgs::parse();
2727
mountpoint_s3::run(create_mock_client, cli_args)
2828
}
2929

30-
fn create_mock_client(args: &CliArgs) -> anyhow::Result<(Arc<ThroughputMockClient>, Runtime, S3Personality)> {
31-
let bucket_name: String = match &args.bucket_name {
32-
BucketNameOrS3Uri::BucketName(bucket_name) => bucket_name.clone().into(),
33-
BucketNameOrS3Uri::S3Uri(_) => panic!("mock-mount-s3 bucket names do not support S3 URIs"),
34-
};
35-
30+
pub fn create_mock_client(
31+
client_config: ClientConfig,
32+
s3_path: &S3Path,
33+
personality: Option<S3Personality>,
34+
) -> anyhow::Result<(Arc<ThroughputMockClient>, Runtime, S3Personality)> {
3635
// An extra little safety thing to make sure we can distinguish the real mount-s3 binary and
3736
// this one. Buckets starting with "sthree-" are always invalid against real S3:
3837
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
3938
anyhow::ensure!(
40-
&bucket_name.starts_with("sthree-"),
39+
&s3_path.bucket_name.starts_with("sthree-"),
4140
"mock-mount-s3 bucket names must start with `sthree-`"
4241
);
4342

44-
tracing::warn!("using mock client");
45-
4643
// TODO: Actually update the mock client to support different part sizes
4744
let part_size = {
48-
if args.read_part_size.is_some() || args.write_part_size.is_some() {
45+
if client_config.part_config.read_size_bytes != client_config.part_config.write_size_bytes {
4946
tracing::warn!("mock client does not support separate part sizes for reading and writing, ignoring");
5047
}
51-
args.part_size
48+
client_config.part_config.read_size_bytes
5249
};
5350

54-
let s3_personality = if let Some(bucket_type) = &args.bucket_type {
55-
bucket_type.to_personality()
56-
} else {
57-
S3Personality::Standard
58-
};
51+
let s3_personality = personality.unwrap_or(S3Personality::Standard);
5952

6053
let config = MockClient::config()
61-
.bucket(&bucket_name)
54+
.bucket(&s3_path.bucket_name)
6255
.part_size(part_size as usize)
6356
.unordered_list_seed(None)
6457
.enable_backpressure(true)
6558
.initial_read_window_size(1024 * 1024 + 128 * 1024) // matching real MP
6659
.enable_rename(s3_personality.supports_rename_object());
6760

68-
let client = if let Some(max_throughput_gbps) = args.maximum_throughput_gbps {
61+
let client = if let TargetThroughputSetting::User {
62+
gbps: max_throughput_gbps,
63+
} = client_config.throughput_target
64+
{
6965
tracing::info!("mock client limited to {max_throughput_gbps} Gb/s download throughput");
70-
ThroughputMockClient::new(config, max_throughput_gbps as f64)
66+
ThroughputMockClient::new(config, max_throughput_gbps)
7167
} else {
68+
tracing::info!("mock client with no throughput limit");
7269
ThroughputMockClient::new_unlimited_throughput(config)
7370
};
7471

mountpoint-s3/src/cli.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use mountpoint_s3_fs::logging::{LoggingConfig, prepare_log_file_name};
1414
use mountpoint_s3_fs::mem_limiter::MINIMUM_MEM_LIMIT;
1515
use mountpoint_s3_fs::prefix::Prefix;
1616
use mountpoint_s3_fs::s3::S3Personality;
17-
use mountpoint_s3_fs::s3::config::{BucketNameOrS3Uri, ClientConfig, PartConfig, S3Path};
17+
use mountpoint_s3_fs::s3::config::{BucketNameOrS3Uri, ClientConfig, PartConfig, S3Path, TargetThroughputSetting};
1818
use mountpoint_s3_fs::{S3FilesystemConfig, autoconfigure, metrics};
1919
use sysinfo::{RefreshKind, System};
2020

@@ -720,22 +720,22 @@ impl CliArgs {
720720
user_agent
721721
}
722722

723-
fn throughput_target_gbps(&self, instance_info: &InstanceInfo) -> f64 {
724-
const DEFAULT_TARGET_THROUGHPUT: f64 = 10.0;
725-
726-
let throughput_target_gbps = self.maximum_throughput_gbps.map(|t| t as f64).unwrap_or_else(|| {
723+
fn throughput_target_gbps(&self, instance_info: &InstanceInfo) -> TargetThroughputSetting {
724+
let throughput_target_gbps = self.maximum_throughput_gbps.map(|t| TargetThroughputSetting::User { gbps: t as f64 }).unwrap_or_else(|| {
727725
match autoconfigure::network_throughput(instance_info) {
728-
Ok(throughput) => throughput,
726+
Ok(throughput) => TargetThroughputSetting::Instance { gbps: throughput },
729727
Err(e) => {
730728
tracing::warn!(
731-
"failed to detect network throughput. Using {DEFAULT_TARGET_THROUGHPUT} gbps as throughput. \
732-
Use --maximum-throughput-gbps CLI flag to configure a target throughput appropriate for the instance. Detection failed due to: {e:?}",
729+
"failed to detect network throughput. Using {} gbps as throughput. \
730+
Use --maximum-throughput-gbps CLI flag to configure a target throughput appropriate for the instance. Detection failed due to: {:?}",
731+
TargetThroughputSetting::DEFAULT_TARGET_THROUGHPUT_GBPS,
732+
e,
733733
);
734-
DEFAULT_TARGET_THROUGHPUT
734+
TargetThroughputSetting::Default
735735
}
736736
}
737737
});
738-
tracing::info!("target network throughput {throughput_target_gbps} Gbps");
738+
tracing::info!("target network throughput {} Gbps", throughput_target_gbps.value());
739739
throughput_target_gbps
740740
}
741741

@@ -763,7 +763,7 @@ impl CliArgs {
763763
pub fn client_config(&self, version: &str) -> ClientConfig {
764764
let instance_info = InstanceInfo::new();
765765
let user_agent = self.user_agent(&instance_info, version);
766-
let throughput_target_gbps = self.throughput_target_gbps(&instance_info);
766+
let throughput_target = self.throughput_target_gbps(&instance_info);
767767
let region = autoconfigure::get_region(&instance_info, self.region.clone());
768768

769769
ClientConfig {
@@ -775,7 +775,7 @@ impl CliArgs {
775775
auth_config: self.auth_config(),
776776
requester_pays: self.requester_pays,
777777
expected_bucket_owner: self.expected_bucket_owner.clone(),
778-
throughput_target_gbps,
778+
throughput_target,
779779
bind: self.bind.clone(),
780780
part_config: self.part_config(),
781781
user_agent,

mountpoint-s3/src/run.rs

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use mountpoint_s3_fs::data_cache::{DataCacheConfig, ManagedCacheDir};
1111
use mountpoint_s3_fs::fuse::session::FuseSession;
1212
use mountpoint_s3_fs::logging::init_logging;
1313
use mountpoint_s3_fs::s3::S3Personality;
14+
use mountpoint_s3_fs::s3::config::{ClientConfig, S3Path};
1415
use mountpoint_s3_fs::{MountpointConfig, Runtime, metrics};
1516
use nix::sys::signal::Signal;
1617
use nix::unistd::ForkResult;
@@ -19,11 +20,7 @@ use crate::cli::CliArgs;
1920
use crate::{build_info, parse_cli_args};
2021

2122
/// Run Mountpoint with the given [CliArgs].
22-
pub fn run<ClientBuilder, Client>(client_builder: ClientBuilder, args: CliArgs) -> anyhow::Result<()>
23-
where
24-
ClientBuilder: FnOnce(&CliArgs) -> anyhow::Result<(Client, Runtime, S3Personality)>,
25-
Client: ObjectClient + Clone + Send + Sync + 'static,
26-
{
23+
pub fn run(client_builder: impl ClientBuilder, args: CliArgs) -> anyhow::Result<()> {
2724
let successful_mount_msg = format!(
2825
"{} is mounted at {}",
2926
args.bucket_description()?,
@@ -168,23 +165,21 @@ where
168165
Ok(())
169166
}
170167

171-
fn mount<ClientBuilder, Client>(args: CliArgs, client_builder: ClientBuilder) -> anyhow::Result<FuseSession>
172-
where
173-
ClientBuilder: FnOnce(&CliArgs) -> anyhow::Result<(Client, Runtime, S3Personality)>,
174-
Client: ObjectClient + Clone + Send + Sync + 'static,
175-
{
168+
fn mount(args: CliArgs, client_builder: impl ClientBuilder) -> anyhow::Result<FuseSession> {
176169
tracing::info!("mount-s3 {}", build_info::FULL_VERSION);
177170
tracing::debug!("{:?}", args);
178171

179172
let fuse_session_config = args.fuse_session_config()?;
180173
let sse = args.server_side_encryption()?;
181174

182-
let (client, runtime, s3_personality) = client_builder(&args)?;
175+
let client_config = args.client_config(build_info::FULL_VERSION);
176+
177+
let s3_path = args.s3_path()?;
178+
let (client, runtime, s3_personality) = client_builder.build(client_config, &s3_path, args.personality())?;
183179

184180
let bucket_description = args.bucket_description()?;
185181
tracing::debug!("using S3 personality {s3_personality:?} for {bucket_description}");
186182

187-
let s3_path = args.s3_path()?;
188183
let filesystem_config = args.filesystem_config(sse.clone(), s3_personality);
189184
let mut data_cache_config = args.data_cache_config(sse)?;
190185

@@ -205,18 +200,48 @@ where
205200
Ok(fuse_session)
206201
}
207202

208-
/// Create a real S3 client
209-
pub fn create_s3_client(args: &CliArgs) -> anyhow::Result<(S3CrtClient, Runtime, S3Personality)> {
210-
let client_config = args.client_config(build_info::FULL_VERSION);
203+
/// Builder for [ObjectClient] implementations.
204+
pub trait ClientBuilder {
205+
type Client: ObjectClient + Clone + Send + Sync + 'static;
206+
207+
/// Build a new client instance.
208+
fn build(
209+
self,
210+
client_config: ClientConfig,
211+
s3_path: &S3Path,
212+
personality: Option<S3Personality>,
213+
) -> anyhow::Result<(Self::Client, Runtime, S3Personality)>;
214+
}
211215

212-
let s3_path = args.s3_path()?;
216+
impl<F, C> ClientBuilder for F
217+
where
218+
F: FnOnce(ClientConfig, &S3Path, Option<S3Personality>) -> anyhow::Result<(C, Runtime, S3Personality)>,
219+
C: ObjectClient + Clone + Send + Sync + 'static,
220+
{
221+
type Client = C;
222+
223+
fn build(
224+
self,
225+
client_config: ClientConfig,
226+
s3_path: &S3Path,
227+
personality: Option<S3Personality>,
228+
) -> anyhow::Result<(Self::Client, Runtime, S3Personality)> {
229+
self(client_config, s3_path, personality)
230+
}
231+
}
232+
233+
// Create a real S3 client
234+
pub fn create_s3_client(
235+
client_config: ClientConfig,
236+
s3_path: &S3Path,
237+
personality: Option<S3Personality>,
238+
) -> anyhow::Result<(S3CrtClient, Runtime, S3Personality)> {
213239
let client = client_config
214-
.create_client(Some(&s3_path))
240+
.create_client(Some(s3_path))
215241
.context("Failed to create S3 client")?;
216242

217243
let runtime = Runtime::new(client.event_loop_group());
218-
let s3_personality = args
219-
.personality()
244+
let s3_personality = personality
220245
.unwrap_or_else(|| S3Personality::infer_from_bucket(&s3_path.bucket_name, &client.endpoint_config()));
221246

222247
Ok((client, runtime, s3_personality))

0 commit comments

Comments
 (0)