Skip to content

Commit 57e1d23

Browse files
authored
s3io_benchmark: Add test object generator (#1767)
**What changed and why?** Currently, `s3io_benchmark` requires the S3 object to exist in the bucket (for `read` jobs), which is inconvenient. - Added the ability to automatically generate new test objects during initialization for read jobs. - Uses a separate S3 CRT/Uploader client stack so that it does not influence benchmark jobs. - Didn't use the Rust AWS SDK as it's slower than the CRT - Didn't use the Rust Transfer Manager as it's not GA yet - Added a new config flag `generate_object` with a default value of `true`. - Manually tested generating a 100GB object. - Automatically overwrite the write part size if it would exceed S3's 10,000-part MPU limit - Do not perform the upload if an object with the correct size already exists ### Does this change impact existing behavior? Only the benchmark, which is currently not used anywhere. ### Does this change need a changelog entry? Does it require a version change? No - benchmark script only. --- By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the [Developer Certificate of Origin (DCO)](https://developercertificate.org/). --------- Signed-off-by: Yerzhan Mazhkenov <[email protected]>
1 parent 6976976 commit 57e1d23

File tree

4 files changed

+152
-5
lines changed

4 files changed

+152
-5
lines changed

mountpoint-s3-fs/examples/s3io_benchmark/config.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ use std::path::Path;
44
use std::time::Duration;
55
use thiserror::Error;
66

7+
use crate::test_object_generator::generate_test_objects;
8+
79
/// Top-level configuration structure
810
#[derive(Debug, Clone, Deserialize, PartialEq)]
911
pub struct Config {
@@ -58,6 +60,9 @@ pub struct JobConfig {
5860
/// If specified then job is time-based instead of reading until total bytes equal to file size.
5961
#[serde(default, with = "humantime_serde")]
6062
pub iteration_duration: Option<Duration>,
63+
/// Whether to generate test objects before running read benchmarks.
64+
/// Only applies to read workloads. Default: true.
65+
pub generate_object: Option<bool>,
6166
}
6267

6368
/// Configuration for a single job execution
@@ -77,6 +82,7 @@ pub struct ResolvedJobConfig {
7782
pub iterations: usize,
7883
pub max_duration: Option<Duration>,
7984
pub iteration_duration: Option<Duration>,
85+
pub generate_object: bool,
8086
}
8187

8288
/// Workload type: read or write
@@ -144,7 +150,7 @@ pub fn parse_config_string(content: &str) -> Result<Config, ConfigError> {
144150
}
145151

146152
/// Prepare jobs by resolving configuration inheritance and validating
147-
pub fn prepare_jobs(config: Config) -> Result<Vec<ResolvedJobConfig>, ConfigError> {
153+
pub async fn prepare_jobs(config: Config) -> Result<Vec<ResolvedJobConfig>, ConfigError> {
148154
// Validate global network interfaces if specified
149155
if let Some(bind) = &config.global.bind {
150156
if bind.is_empty() {
@@ -194,7 +200,9 @@ pub fn prepare_jobs(config: Config) -> Result<Vec<ResolvedJobConfig>, ConfigErro
194200
}
195201
}
196202

197-
// TODO: Generate/upload test objects for read workloads
203+
generate_test_objects(&resolved_jobs, &config.global)
204+
.await
205+
.map_err(|e| ConfigError::Validation(format!("Object generation failed: {}", e)))?;
198206

199207
Ok(resolved_jobs)
200208
}
@@ -254,6 +262,12 @@ fn merge_and_resolve(job_name: &str, job: &JobConfig, global: &GlobalConfig) ->
254262
// iteration_duration: Optional, no default (random read only)
255263
let iteration_duration = job.iteration_duration.or(global.job_defaults.iteration_duration);
256264

265+
// generate_object: Optional with default of true
266+
let generate_object = job
267+
.generate_object
268+
.or(global.job_defaults.generate_object)
269+
.unwrap_or(true);
270+
257271
Ok(ResolvedJobConfig {
258272
name: job_name.to_string(),
259273
workload_type,
@@ -268,5 +282,6 @@ fn merge_and_resolve(job_name: &str, job: &JobConfig, global: &GlobalConfig) ->
268282
iterations,
269283
max_duration,
270284
iteration_duration,
285+
generate_object,
271286
})
272287
}

mountpoint-s3-fs/examples/s3io_benchmark/executor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ pub enum ExecutionError {
3535
}
3636

3737
/// Client stack shared by benchmark jobs: a CRT-based S3 client plus the
/// uploader and prefetcher built on top of it.
pub struct Executor {
    /// Underlying CRT S3 client. Public so the test object generator can
    /// issue HeadObject requests through the same stack.
    pub client: S3CrtClient,
    /// Uploader built on `client`. Public so the test object generator can
    /// create missing test objects before read jobs run.
    pub uploader: Uploader<S3CrtClient>,
    prefetcher: Prefetcher<S3CrtClient>,
}
4242

mountpoint-s3-fs/examples/s3io_benchmark/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ mod config;
22
mod executor;
33
mod monitoring;
44
mod results;
5+
mod test_object_generator;
56

67
use anyhow::{Context, Result};
78
use clap::Parser;
@@ -45,7 +46,7 @@ async fn run_benchmark() -> Result<()> {
4546
let config = parse_config_file(&cli.config_file).context("Failed to load configuration file")?;
4647

4748
eprintln!("Preparing and validating jobs...");
48-
let resolved_jobs = prepare_jobs(config.clone()).context("Failed to prepare jobs")?;
49+
let resolved_jobs = prepare_jobs(config.clone()).await.context("Failed to prepare jobs")?;
4950

5051
eprintln!("Found {} job(s) to execute", resolved_jobs.len());
5152

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
use mountpoint_s3_client::ObjectClient;
2+
use mountpoint_s3_client::types::HeadObjectParams;
3+
use thiserror::Error;
4+
5+
use crate::config::{GlobalConfig, ResolvedJobConfig, WorkloadType};
6+
use crate::executor::Executor;
7+
8+
/// S3 multipart upload has a hard limit of 10,000 parts per upload
/// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
const MAX_PARTS: u64 = 10_000;

/// Errors that can occur while generating benchmark test objects.
#[derive(Debug, Error)]
pub enum ObjectGenerationError {
    /// Building the dedicated S3 client/uploader stack failed.
    #[error("Setup failed: {0}")]
    Setup(String),
    /// Uploading a specific test object failed; `reason` describes which
    /// phase of the upload (start, write, or complete) went wrong.
    #[error("Upload failed for '{key}': {reason}")]
    Upload { key: String, reason: String },
}
20+
// Note: This intentionally creates a separate Executor instance to ensure
// test object generation doesn't influence benchmark jobs
/// Ensure a test object exists in S3 for every read job that has
/// `generate_object` enabled, uploading one if it is missing or has the
/// wrong size.
///
/// If any job's `object_size` would require more than [`MAX_PARTS`] parts
/// at the configured `write_part_size`, the part size is bumped (on a clone
/// of `global`, so the caller's config is untouched) before building the
/// uploader.
///
/// # Errors
///
/// Returns [`ObjectGenerationError::Setup`] if the executor cannot be
/// created, or [`ObjectGenerationError::Upload`] if an upload fails.
pub async fn generate_test_objects(
    jobs: &[ResolvedJobConfig],
    global: &GlobalConfig,
) -> Result<(), ObjectGenerationError> {
    // Only read jobs that opted into generation need objects; write jobs
    // create their own data.
    let jobs_requiring_generation: Vec<&ResolvedJobConfig> = jobs
        .iter()
        .filter(|job| job.workload_type == WorkloadType::Read && job.generate_object)
        .collect();

    if jobs_requiring_generation.is_empty() {
        return Ok(());
    }

    // Override write_part_size if any job would exceed S3's 10,000 part limit
    let mut adjusted_global = global.clone();
    let max_object_size = jobs_requiring_generation
        .iter()
        .map(|job| job.object_size)
        .max()
        .unwrap_or(0);
    // 8 MiB fallback when no write_part_size is configured.
    let default_write_part_size = global.write_part_size.unwrap_or(8 * 1024 * 1024) as u64;
    // Smallest part size that fits max_object_size in MAX_PARTS parts.
    let min_required_part_size = max_object_size.div_ceil(MAX_PARTS);
    if min_required_part_size > default_write_part_size {
        adjusted_global.write_part_size = Some(min_required_part_size as usize);
        eprintln!(
            "Test Object Generator: Adjusted write_part_size from {} to {} bytes to stay within S3's 10,000 part limit for object size {} bytes",
            default_write_part_size, min_required_part_size, max_object_size
        );
    }

    let executor = Executor::new(&adjusted_global).map_err(|e| ObjectGenerationError::Setup(e.to_string()))?;
    for job in jobs_requiring_generation {
        // Skip generation if object already exists with correct size
        match executor
            .client
            .head_object(&job.bucket, &job.object_key, &HeadObjectParams::new())
            .await
        {
            Ok(head_result) => {
                if head_result.size == job.object_size {
                    eprintln!(
                        "Test object for job '{}' already exists with correct size: key={}, size={} bytes",
                        job.name, job.object_key, job.object_size
                    );
                    continue;
                } else {
                    eprintln!(
                        "Test object for job '{}' exists but has wrong size (expected: {}, actual: {}), re-uploading: key={}",
                        job.name, job.object_size, head_result.size, job.object_key
                    );
                }
            }
            // NOTE(review): any HeadObject error (not just 404 — e.g. access
            // denied or throttling) is treated as "object missing" and
            // triggers an upload attempt; confirm this is intended.
            Err(_) => {
                eprintln!(
                    "Test object for job '{}' does not exist, uploading: key={}",
                    job.name, job.object_key
                );
            }
        }

        upload_test_object(&executor, &job.bucket, &job.object_key, job.object_size, job.write_size).await?;
        eprintln!(
            "Generated test object for job '{}': key={}, size={} bytes",
            job.name, job.object_key, job.object_size
        );
    }

    Ok(())
}
91+
92+
async fn upload_test_object(
93+
executor: &Executor,
94+
bucket: &str,
95+
key: &str,
96+
size: u64,
97+
write_size: usize,
98+
) -> Result<(), ObjectGenerationError> {
99+
let mut request = executor
100+
.uploader
101+
.start_atomic_upload(bucket.to_string(), key.to_string())
102+
.map_err(|e| ObjectGenerationError::Upload {
103+
key: key.to_string(),
104+
reason: format!("Failed to start upload: {}", e),
105+
})?;
106+
107+
let contents = vec![0xab; write_size];
108+
let mut offset = 0u64;
109+
110+
while offset < size {
111+
let remaining = size - offset;
112+
let chunk_size = remaining.min(write_size as u64) as usize;
113+
114+
let bytes_written = request
115+
.write(offset as i64, &contents[..chunk_size])
116+
.await
117+
.map_err(|e| ObjectGenerationError::Upload {
118+
key: key.to_string(),
119+
reason: format!("Write failed at offset {}: {}", offset, e),
120+
})?;
121+
122+
offset += bytes_written as u64;
123+
}
124+
125+
request.complete().await.map_err(|e| ObjectGenerationError::Upload {
126+
key: key.to_string(),
127+
reason: format!("Failed to complete upload: {}", e),
128+
})?;
129+
130+
Ok(())
131+
}

0 commit comments

Comments
 (0)