Skip to content

Commit

Permalink
loader: create S3 bucket objects (which need credentials) on demand
Browse files Browse the repository at this point in the history
The write VFS (that produces replication data) sometimes needs a
loader, but usually doesn't.  Before this commit, creating a loader
could perform (local) network I/O to get credentials.  We now
wait until we actually have to make an S3 API call (to fetch a
chunk) before getting the credentials.

Sample debug printer output:

```
$ cargo test loader -- --nocapture
   Compiling verneuil v0.6.4 (/home/pkhuong/open-sauce/verneuil)
    Finished `test` profile [unoptimized + debuginfo] target(s) in 3.28s
     Running unittests src/lib.rs (target/debug/deps/verneuil-106bf643236b6261)

running 1 test
Delayed Loader: Loader { cache: Cache { write_side: None, auto_sync: true, read_side: ReadOnlyCache { stack: [] } }, remote_sources: None, known_chunks: {} }
Resolved Loader: Loader { cache: Cache { write_side: None, auto_sync: true, read_side: ReadOnlyCache { stack: [] } }, remote_sources: [RedactedBucket { name: "test-bucket", region: UsEast1 }], known_chunks: {} }
test loader::tests::test_loader_no_credential ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 44 filtered out; finished in 0.00s
```
  • Loading branch information
pkhuong committed Oct 6, 2024
1 parent d2fecae commit c21e44e
Showing 1 changed file with 50 additions and 13 deletions.
63 changes: 50 additions & 13 deletions src/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,10 @@ impl std::fmt::Debug for LazyVecBucket {
}

impl LazyVecBucket {
fn new_from_buckets(buckets: Vec<Bucket>) -> Self {
fn new(builder: Box<VecBucketBuilder>) -> Self {
LazyVecBucket {
builder: Default::default(),
buckets: Box::new(Ok(buckets)).into(),
builder: Mutex::new(Some(builder)),
buckets: Default::default(),
}
}

Expand Down Expand Up @@ -376,19 +376,30 @@ impl Loader {
mut cache_builder: kismet_cache::CacheBuilder,
remote: &[ReplicationTarget],
) -> Result<Loader> {
let mut remote_sources = Vec::new();
// Precompute S3 target specs for `bucket_builder`.
let s3_specs = remote
.iter()
.filter(|x| matches!(x, ReplicationTarget::S3(_)))
.cloned()
.collect::<Vec<_>>();

let bucket_builder = move || {
if s3_specs.is_empty() {
return Ok(Vec::new());
}

// We only care about remote S3 sources.
if remote.iter().any(|x| matches!(x, ReplicationTarget::S3(_))) {
let creds =
Credentials::default().map_err(|e| chain_error!(e, "failed to get credentials"))?;

for source in remote {
if let Some(bucket) = create_source(source, &creds, |s3| &s3.chunk_bucket)? {
let mut remote_sources = Vec::new();
for source in s3_specs {
if let Some(bucket) = create_source(&source, &creds, |s3| &s3.chunk_bucket)? {
remote_sources.push(bucket);
}
}
}

Ok(remote_sources)
};

cache_builder = apply_cache_replication_targets(cache_builder, remote);

Expand All @@ -404,7 +415,7 @@ impl Loader {

Ok(Loader {
cache: cache_builder.build(),
remote_sources: LazyVecBucket::new_from_buckets(remote_sources),
remote_sources: LazyVecBucket::new(Box::new(bucket_builder)),
known_chunks: well_known_chunks()
.into_iter()
.map(|chunk| (chunk.fprint(), chunk))
Expand Down Expand Up @@ -735,14 +746,40 @@ mod tests {

#[test]
fn test_loader_no_credential() {
let bucket = Bucket::new_public("test-bucket", awsregion::Region::UsEast1).unwrap();
let called_flag = Arc::new(Mutex::new(false));

let builder_flag = called_flag.clone();
let builder = move || {
*builder_flag.lock().unwrap() = true;
Ok(vec![Bucket::new_public(
"test-bucket",
awsregion::Region::UsEast1,
)
.unwrap()])
};

let loader = Loader {
cache: kismet_cache::CacheBuilder::new().build(),
remote_sources: LazyVecBucket::new_from_buckets(vec![bucket]),
remote_sources: LazyVecBucket::new(Box::new(builder)),
known_chunks: HashMap::new(),
};

println!("Loader: {:?}", loader);
println!("Delayed Loader: {:?}", loader);
let debug_output = format!("{:?}", loader);
assert!(!debug_output.to_lowercase().contains("credential"));
assert!(!debug_output.to_lowercase().contains("key"));
assert!(!debug_output.to_lowercase().contains("token"));

// Should still be delayed
assert!(!*called_flag.lock().unwrap());

// Force computation
let _ = loader.remote_sources.buckets();

// Should have called the builder
assert!(*called_flag.lock().unwrap());

println!("Resolved Loader: {:?}", loader);
let debug_output = format!("{:?}", loader);
assert!(!debug_output.to_lowercase().contains("credential"));
assert!(!debug_output.to_lowercase().contains("key"));
Expand Down

0 comments on commit c21e44e

Please sign in to comment.