Skip to content

Commit c15079d

Browse files
authored
Minor fixes to client and prefetch benchmarks for consistency (#1518)
This change makes prefetch and client benchmarks consistent, simplifying the automation. ### Does this change impact existing behavior? No, client and prefetch benchmarks only. ### Does this change need a changelog entry? Does it require a version change? No, client and prefetch benchmarks only. --- By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the [Developer Certificate of Origin (DCO)](https://developercertificate.org/). --------- Signed-off-by: Sahitya Damera <[email protected]>
1 parent 1623edb commit c15079d

File tree

2 files changed

+52
-69
lines changed

2 files changed

+52
-69
lines changed

mountpoint-s3-client/examples/client_benchmark.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ use tracing_subscriber::EnvFilter;
1818
use tracing_subscriber::fmt::Subscriber;
1919
use tracing_subscriber::util::SubscriberInitExt;
2020

21+
const SECONDS_PER_DAY: u64 = 86400;
22+
2123
/// Like `tracing_subscriber::fmt::init` but sends logs to stderr
2224
fn init_tracing_subscriber() {
2325
RustLogAdapter::try_init().expect("unable to install CRT log adapter");
@@ -44,9 +46,8 @@ fn run_benchmark(
4446
let total_start = Instant::now();
4547
let mut iter_results = Vec::new();
4648
let mut iteration = 0;
47-
let timeout: Instant = total_start
48-
.checked_add(max_duration.unwrap_or(Duration::from_secs(86400)))
49-
.expect("Duration overflow error");
49+
let duration = max_duration.unwrap_or(Duration::from_secs(SECONDS_PER_DAY));
50+
let timeout: Instant = total_start.checked_add(duration).expect("Duration overflow error");
5051

5152
while iteration < num_iterations && Instant::now() < timeout {
5253
let iter_start = Instant::now();
@@ -146,7 +147,7 @@ fn run_benchmark(
146147
"summary": {
147148
"total_bytes": total_bytes,
148149
"total_elapsed_seconds": total_elapsed.as_secs_f64(),
149-
"max_duration_seconds": max_duration,
150+
"max_duration_seconds": duration,
150151
"iterations": iter_results.len(),
151152
},
152153
"iterations": iter_results

mountpoint-s3-fs/examples/prefetch_benchmark.rs

Lines changed: 47 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::error::Error;
22
use std::path::PathBuf;
33
use std::sync::Arc;
4-
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
4+
use std::sync::atomic::{AtomicU64, Ordering};
55
use std::thread;
66
use std::time::{Duration, Instant};
77

@@ -21,6 +21,8 @@ use tracing_subscriber::EnvFilter;
2121
use tracing_subscriber::fmt::Subscriber;
2222
use tracing_subscriber::util::SubscriberInitExt;
2323

24+
const SECONDS_PER_DAY: u64 = 86400;
25+
2426
/// Like `tracing_subscriber::fmt::init` but sends logs to stderr
2527
fn init_tracing_subscriber() {
2628
RustLogAdapter::try_init().expect("should succeed as first and only adapter init call");
@@ -98,28 +100,28 @@ pub struct CliArgs {
98100

99101
#[arg(
100102
long,
101-
help = "Maximum runtime in seconds (overrides iterations if specified)",
102-
value_name = "SECONDS"
103+
help = "Maximum duration in seconds (overrides iterations if specified)",
104+
value_name = "SECONDS",
105+
value_parser = parse_duration,
103106
)]
104-
runtime: Option<u64>,
107+
max_duration: Option<Duration>,
105108

106109
#[arg(
107-
long,
108-
help = "Number of concurrent downloads per object",
109-
default_value_t = 1,
110-
value_name = "N"
111-
)]
112-
downloads_per_object: usize,
113-
114-
#[clap(
115110
long,
116111
help = "One or more network interfaces to use when accessing S3. Requires Linux 5.7+ or running as root.",
117-
value_name = "NETWORK_INTERFACE"
112+
value_name = "NETWORK_INTERFACE",
113+
value_delimiter = ','
118114
)]
119-
pub bind: Option<Vec<String>>,
115+
bind: Option<Vec<String>>,
120116

121117
#[clap(long, help = "Output file to write the results to", value_name = "OUTPUT_FILE")]
122-
pub output_file: Option<PathBuf>,
118+
output_file: Option<PathBuf>,
119+
}
120+
121+
fn parse_duration(arg: &str) -> Result<Duration, String> {
122+
arg.parse::<u64>()
123+
.map(Duration::from_secs)
124+
.map_err(|e| format!("Invalid duration: {e}"))
123125
}
124126

125127
fn create_memory_limiter(args: &CliArgs, client: &S3CrtClient) -> Arc<MemoryLimiter<S3CrtClient>> {
@@ -155,21 +157,13 @@ fn main() -> anyhow::Result<()> {
155157
})
156158
.collect::<anyhow::Result<Vec<_>>>()?;
157159

158-
let runtime_exceeded = Arc::new(AtomicBool::new(false));
159160
let total_start = Instant::now();
160-
if let Some(runtime) = args.runtime {
161-
thread::spawn({
162-
let runtime_exceeded = runtime_exceeded.clone();
163-
move || {
164-
thread::sleep(Duration::from_secs(runtime));
165-
runtime_exceeded.store(true, Ordering::SeqCst);
166-
}
167-
});
168-
}
169161
let mut iteration = 0;
170162
let mut total_bytes = 0;
171163
let mut iter_results = Vec::new();
172-
while iteration < args.iterations && !runtime_exceeded.load(Ordering::SeqCst) {
164+
let max_duration = args.max_duration.unwrap_or(Duration::from_secs(SECONDS_PER_DAY));
165+
let timeout: Instant = total_start.checked_add(max_duration).expect("Duration overflow error");
166+
while iteration < args.iterations && Instant::now() < timeout {
173167
let received_bytes = Arc::new(AtomicU64::new(0));
174168
let start = Instant::now();
175169
let manager = Prefetcher::default_builder(client.clone()).build(
@@ -182,34 +176,26 @@ fn main() -> anyhow::Result<()> {
182176
let mut download_tasks = Vec::new();
183177

184178
for (object_id, size) in &object_metadata {
185-
for _ in 0..args.downloads_per_object {
186-
let received_bytes = received_bytes.clone();
187-
let object_id = object_id.clone();
188-
let request = manager.prefetch(bucket.to_string(), object_id.clone(), *size);
189-
let read_size = args.read_size;
190-
let runtime_exceeded_clone = runtime_exceeded.clone();
191-
192-
let task = scope.spawn(move || {
193-
let result = block_on(wait_for_download(
194-
request,
195-
*size,
196-
read_size as u64,
197-
runtime_exceeded_clone,
198-
));
199-
if let Ok(bytes_read) = result {
200-
received_bytes.fetch_add(bytes_read, Ordering::SeqCst);
201-
} else {
202-
// As object download failures can produce
203-
// misleading results, exit the benchmarks
204-
// to avoid confusion.
205-
eprintln!("Download failed: {:?}", result.err());
206-
eprintln!("Exiting benchmarks due to download failure");
207-
std::process::exit(1);
208-
}
209-
});
210-
211-
download_tasks.push(task);
212-
}
179+
let received_bytes = received_bytes.clone();
180+
let object_id = object_id.clone();
181+
let request = manager.prefetch(bucket.to_string(), object_id.clone(), *size);
182+
let read_size = args.read_size;
183+
184+
let task = scope.spawn(move || {
185+
let result = block_on(wait_for_download(request, *size, read_size as u64, timeout));
186+
if let Ok(bytes_read) = result {
187+
received_bytes.fetch_add(bytes_read, Ordering::SeqCst);
188+
} else {
189+
// As object download failures can produce
190+
// misleading results, exit the benchmarks
191+
// to avoid confusion.
192+
eprintln!("Download failed: {:?}", result.err());
193+
eprintln!("Exiting benchmarks due to download failure");
194+
std::process::exit(1);
195+
}
196+
});
197+
198+
download_tasks.push(task);
213199
}
214200

215201
for task in download_tasks {
@@ -228,7 +214,7 @@ fn main() -> anyhow::Result<()> {
228214
iter_results.push(json!({
229215
"iteration": iteration,
230216
"bytes": received_size,
231-
"duration_seconds": elapsed.as_secs_f64(),
217+
"elapsed_seconds": elapsed.as_secs_f64(),
232218
}));
233219
iteration += 1;
234220
}
@@ -244,7 +230,8 @@ fn main() -> anyhow::Result<()> {
244230
let results = json!({
245231
"summary": {
246232
"total_bytes": total_bytes,
247-
"duration_seconds": total_elapsed.as_secs_f64(),
233+
"total_elapsed_seconds": total_elapsed.as_secs_f64(),
234+
"max_duration_seconds": max_duration,
248235
"iterations": iteration,
249236
},
250237
"iterations": iter_results
@@ -259,11 +246,11 @@ async fn wait_for_download(
259246
mut request: PrefetchGetObject<S3CrtClient>,
260247
size: u64,
261248
read_size: u64,
262-
runtime_exceeded: Arc<AtomicBool>,
249+
timeout: Instant,
263250
) -> Result<u64, Box<dyn Error>> {
264251
let mut offset = 0;
265252
let mut total_bytes_read = 0;
266-
while offset < size && !runtime_exceeded.load(Ordering::SeqCst) {
253+
while offset < size && Instant::now() < timeout {
267254
let bytes = request.read(offset, read_size as usize).await?;
268255
let bytes_read = bytes.len() as u64;
269256
offset += bytes_read;
@@ -287,13 +274,8 @@ fn make_s3_client_from_args(args: &CliArgs) -> anyhow::Result<S3CrtClient> {
287274
if let Some(part_size) = args.part_size {
288275
client_config = client_config.part_size(part_size as usize);
289276
}
290-
if let Some(interfaces) = &args.bind {
291-
let nics: Vec<String> = interfaces
292-
.iter()
293-
.flat_map(|iface| iface.split(',').map(|s| s.trim().to_string()))
294-
.filter(|s| !s.is_empty())
295-
.collect();
296-
client_config = client_config.network_interface_names(nics);
277+
if let Some(nics) = &args.bind {
278+
client_config = client_config.network_interface_names(nics.to_vec());
297279
}
298280
Ok(S3CrtClient::new(client_config)?)
299281
}

0 commit comments

Comments
 (0)