Skip to content

Commit 8adc754

Browse files
authored
Fix internal failure on atomic upload (#1733)
Improve handling of errors on `CreateMultiPartUpload` in the atomic upload code path. Similarly to the change in #1728, the issue only manifests when attempting to further write or complete an upload after an error and it does not affect Mountpoint file system users, since that's already prevented at that level. ### Does this change impact existing behavior? No, user-visible behavior not impacted. ### Does this change need a changelog entry? Does it require a version change? No. --- By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the [Developer Certificate of Origin (DCO)](https://developercertificate.org/). Signed-off-by: Alessandro Passaro <[email protected]>
1 parent 7520e72 commit 8adc754

File tree

2 files changed

+31
-4
lines changed

2 files changed

+31
-4
lines changed

mountpoint-s3-client/src/failure_client.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -435,8 +435,10 @@ pub fn countdown_failure_client<Client: ObjectClient>(
435435
},
436436
result_fn: |state| {
437437
state.count += 1;
438-
if state.count >= state.fail_count {
439-
Err(state.error.take().unwrap())
438+
if state.count >= state.fail_count
439+
&& let Some(error) = state.error.take()
440+
{
441+
Err(error)
440442
} else {
441443
Ok(())
442444
}

mountpoint-s3-fs/src/upload/atomic.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,12 @@ where
109109
}
110110

111111
self.hasher.update(data);
112-
self.request.get_mut().await?.unwrap().write(data).await?;
112+
self.request
113+
.get_mut()
114+
.await?
115+
.ok_or(UploadError::UploadAlreadyTerminated)?
116+
.write(data)
117+
.await?;
113118

114119
self.next_request_offset += data.len() as u64;
115120
Ok(data.len())
@@ -122,7 +127,7 @@ where
122127
.request
123128
.into_inner()
124129
.await?
125-
.unwrap()
130+
.ok_or(UploadError::UploadAlreadyTerminated)?
126131
.review_and_complete(move |review| verify_checksums(review, size, checksum))
127132
.await?;
128133
if let Err(err) = self
@@ -306,6 +311,7 @@ mod tests {
306311
let mut put_failures = HashMap::new();
307312
put_failures.insert(1, Ok((1, MockClientError("error".to_owned().into()))));
308313
put_failures.insert(2, Ok((2, MockClientError("error".to_owned().into()))));
314+
put_failures.insert(3, Err(MockClientError("error".to_owned().into()).into()));
309315

310316
let failure_client = Arc::new(countdown_failure_client(
311317
client.clone(),
@@ -338,6 +344,25 @@ mod tests {
338344
}
339345
assert!(!client.is_upload_in_progress(key));
340346
assert!(!client.contains_key(key));
347+
348+
// Third request fails on first write (because CreateMPU returns an error).
349+
{
350+
let mut request = uploader.start_atomic_upload(bucket.to_owned(), key.to_owned()).unwrap();
351+
352+
let data = b"foo";
353+
request.write(0, data).await.expect_err("first write should fail");
354+
355+
let err = request
356+
.write(0, data)
357+
.await
358+
.expect_err("subsequent writes should also fail");
359+
assert!(matches!(err, UploadError::UploadAlreadyTerminated));
360+
361+
let err = request.complete().await.expect_err("complete should also fail");
362+
assert!(matches!(err, UploadError::UploadAlreadyTerminated));
363+
}
364+
assert!(!client.is_upload_in_progress(key));
365+
assert!(!client.contains_key(key));
341366
}
342367

343368
#[test_case(8000; "divisible by max size")]

0 commit comments

Comments
 (0)