Merged
24 changes: 20 additions & 4 deletions benchmark/benchmark.py
@@ -63,22 +63,32 @@ def _mount_mp(

Returns Mountpoint version string.
"""
bucket = cfg['s3_bucket']
stub_mode = str(cfg["stub_mode"]).lower()

if cfg['mountpoint_binary'] is None:
mountpoint_args = [
"cargo",
"run",
"--quiet",
"--release",
"--",
"--features=mock",
]

if stub_mode == "s3_client":
# `mock-mount-s3` requires the bucket name to be prefixed with `sthree-` to verify we're not actually reaching S3
logging.debug("using mock-mount-s3 due to `stub_mode`, bucket will be prefixed with \"sthree-\"")
bucket = f"sthree-{cfg['s3_bucket']}"

mountpoint_args.append("--bin=mock-mount-s3")

# End Cargo command, begin passing arguments to Mountpoint
mountpoint_args.append("--")
else:
mountpoint_args = [cfg['mountpoint_binary']]

os.makedirs(MP_LOGS_DIRECTORY, exist_ok=True)

bucket = cfg['s3_bucket']

mountpoint_version_output = subprocess.check_output([*mountpoint_args, "--version"]).decode("utf-8")
log.info("Mountpoint version: %s", mountpoint_version_output.strip())

@@ -118,6 +128,10 @@ def _mount_mp(
for network_interface in cfg['network']['interface_names']:
subprocess_args.append(f"--bind={network_interface}")
if (max_throughput := cfg['network']['maximum_throughput_gbps']) is not None:
if stub_mode == "s3_client":
raise ValueError(
"should not use `stub_mode=s3_client` with `maximum_throughput_gbps`, throughput will be limited"
)
subprocess_args.append(f"--maximum-throughput-gbps={max_throughput}")

if cfg['mountpoint_max_background'] is not None:
@@ -126,14 +140,16 @@ def _mount_mp(
if cfg['mountpoint_congestion_threshold'] is not None:
subprocess_env["UNSTABLE_MOUNTPOINT_CONGESTION_THRESHOLD"] = str(cfg["mountpoint_congestion_threshold"])

stub_mode = str(cfg["stub_mode"]).lower()
if stub_mode != "off" and cfg["mountpoint_binary"] is not None:
raise ValueError("Cannot use `stub_mode` with `mountpoint_binary`, `stub_mode` requires recompilation")
match stub_mode:
case "off":
pass
case "fs_handler":
subprocess_env["MOUNTPOINT_BUILD_STUB_FS_HANDLER"] = "1"
case "s3_client":
# Already handled when building cargo command
pass
case _:
raise ValueError(f"Unknown stub_mode: {stub_mode}")

2 changes: 1 addition & 1 deletion benchmark/conf/config.yaml
@@ -37,7 +37,7 @@ mountpoint_congestion_threshold: !!null
with_bwm: false

# Works automatically ONLY where this script manages compilation. It has no effect if `mountpoint_binary` is set.
stub_mode: "off" # fs_handler
stub_mode: "off" # fs_handler, s3_client

iterations: 1

27 changes: 21 additions & 6 deletions mountpoint-s3-client/src/mock_client/throughput_client.rs
@@ -30,8 +30,10 @@ use super::MockBackpressureHandle;
/// TODO: make it bi-directional, so that upload throughput can be simulated as well.
pub struct ThroughputMockClient {
inner: MockClient,
/// A throughput rate limiter with one token per byte
rate_limiter: LeakyBucket,
/// A throughput rate limiter with one token per byte.
///
/// If [None], there will be no limit on throughput.
rate_limiter: Option<LeakyBucket>,
}

impl ThroughputMockClient {
@@ -46,7 +48,8 @@ impl ThroughputMockClient {
.refill_amount(bytes_per_interval as u32)
.max(config.part_size as u32)
.tokens(0)
.build();
.build()
.into();
tracing::info!(?rate_limiter, "new client");

Self {
@@ -55,6 +58,16 @@ impl ThroughputMockClient {
}
}

/// Create a new [ThroughputMockClient] with the given configuration and no throughput limits.
///
/// This is effectively the same as a [MockClient], but allows you to use the [ThroughputMockClient] type.
pub fn new_unlimited_throughput(config: MockClientConfig) -> Self {
Self {
inner: MockClient::new(config),
rate_limiter: None,
}
}

/// Add an object to this mock client's bucket
pub fn add_object(&self, key: &str, value: MockObject) {
self.inner.add_object(key, value);
@@ -65,7 +78,7 @@ impl ThroughputMockClient {
pub struct ThroughputGetObjectResponse {
#[pin]
request: MockGetObjectResponse,
rate_limiter: LeakyBucket,
rate_limiter: Option<LeakyBucket>,
}

#[cfg_attr(not(docsrs), async_trait)]
@@ -94,8 +107,10 @@ impl Stream for ThroughputGetObjectResponse {
this.request.poll_next(cx).map(|next| {
next.map(|item| {
item.inspect(|body_part| {
// Acquire enough tokens for the number of bytes we want to deliver
block_on(this.rate_limiter.acquire(body_part.data.len() as u32));
if let Some(rate_limiter) = this.rate_limiter {
// Acquire enough tokens for the number of bytes we want to deliver
block_on(rate_limiter.acquire(body_part.data.len() as u32));
}
})
})
})
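
For reference, a minimal sketch of how the two constructors touched in this file might be used side by side. This is an illustration only: the module paths and the `ETag` import are assumptions, the `MockClientConfig` field set simply mirrors the struct literal used in `mock-mount-s3.rs` below, and the bucket name and sizes are placeholders.

```rust
// Sketch only: paths, feature gating, and config fields may differ from the actual crate.
use mountpoint_s3_client::mock_client::throughput_client::ThroughputMockClient;
use mountpoint_s3_client::mock_client::{MockClientConfig, MockObject};
use mountpoint_s3_client::types::ETag;

fn example_clients() {
    // Build a fresh config per client; fields copied from the construction in this PR.
    let config = || MockClientConfig {
        bucket: "sthree-example-bucket".to_string(),
        part_size: 8 * 1024 * 1024,
        unordered_list_seed: None,
        enable_backpressure: true,
        initial_read_window_size: 1024 * 1024 + 128 * 1024,
    };

    // Download throughput capped at 10 Gb/s by the leaky-bucket rate limiter.
    let limited = ThroughputMockClient::new(config(), 10.0);

    // No rate limiter at all; effectively a plain MockClient behind the same type.
    let unlimited = ThroughputMockClient::new_unlimited_throughput(config());

    // Objects are added the same way regardless of which constructor was used.
    limited.add_object("hello.txt", MockObject::from_bytes(b"hello world", ETag::for_tests()));
    unlimited.add_object("ramp.bin", MockObject::ramp(0x11, 1024 * 1024, ETag::for_tests()));
}
```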
34 changes: 23 additions & 11 deletions mountpoint-s3/src/bin/mock-mount-s3.rs
@@ -1,8 +1,7 @@
//! A version of `mount-s3` that targets an in-memory mock S3 backend rather than the real service.
//!
//! The mock S3 backend supports simulating a target network throughput. The
//! --maximum-throughput-gbps command-line argument can be used to set the target throughput, which
//! defaults to 10Gbps.
//! The mock S3 backend supports simulating a target network throughput.
//! The `--maximum-throughput-gbps` command-line argument can be used to optionally limit download throughput.
//!
//! As a safety measure, this binary works only if the bucket name begins with "sthree-". This makes
//! sure we can't accidentally confuse this binary with a real `mount-s3` in any of our testing or
@@ -12,7 +11,6 @@

use std::sync::Arc;

use anyhow::anyhow;
use clap::Parser;
use futures::executor::ThreadPool;

@@ -45,21 +43,28 @@ fn create_mock_client(args: &CliArgs) -> anyhow::Result<(Arc<ThroughputMockClien

tracing::warn!("using mock client");

let Some(max_throughput_gbps) = args.maximum_throughput_gbps else {
return Err(anyhow!(
"must set --maximum-throughput-gbps when using mock-mount-s3 binary"
));
// TODO: Actually update the mock client to support different part sizes
let part_size = {
if args.read_part_size.is_some() || args.write_part_size.is_some() {
tracing::warn!("mock client does not support separate part sizes for reading and writing, ignoring");
}
args.part_size
};
tracing::info!("mock client target network throughput {max_throughput_gbps} Gbps");

let config = MockClientConfig {
bucket: bucket_name,
part_size: args.part_size as usize,
part_size: part_size as usize,
unordered_list_seed: None,
enable_backpressure: true,
initial_read_window_size: 1024 * 1024 + 128 * 1024, // matching real MP
};
let client = ThroughputMockClient::new(config, max_throughput_gbps as f64);

let client = if let Some(max_throughput_gbps) = args.maximum_throughput_gbps {
tracing::info!("mock client limited to {max_throughput_gbps} Gb/s download throughput");
ThroughputMockClient::new(config, max_throughput_gbps as f64)
} else {
ThroughputMockClient::new_unlimited_throughput(config)
};

let runtime = Runtime::new(ThreadPool::builder().name_prefix("runtime").create()?);

@@ -85,6 +90,13 @@ fn create_mock_client(args: &CliArgs) -> anyhow::Result<(Arc<ThroughputMockClien
};
client.add_object(&key, MockObject::ramp(0x11, size as usize, ETag::for_tests()));
}
// Some objects that are useful for benchmarking
for job_num in 0..1024 {
let size_gib = 100;
let size_bytes = size_gib * 1024u64.pow(3);
let key = format!("j{job_num}_{size_gib}GiB.bin");
client.add_object(&key, MockObject::constant(1u8, size_bytes as usize, ETag::for_tests()));
}
client.add_object("hello.txt", MockObject::from_bytes(b"hello world", ETag::for_tests()));
client.add_object("empty", MockObject::from_bytes(b"", ETag::for_tests()));
client.add_object(