Skip to content
This repository was archived by the owner on Dec 27, 2023. It is now read-only.

Commit fad4c63

Browse files
authored
Merge pull request #104 from olegmoz/87-s3-multipart-abort
#87 - Aborting multipart upload if failed to upload content
2 parents 93673be + 978091c commit fad4c63

File tree

3 files changed

+75
-3
lines changed

3 files changed

+75
-3
lines changed

src/main/java/com/artipie/asto/s3/MultipartUpload.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.reactivestreams.Publisher;
3939
import software.amazon.awssdk.core.async.AsyncRequestBody;
4040
import software.amazon.awssdk.services.s3.S3AsyncClient;
41+
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
4142
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
4243
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
4344
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
@@ -151,6 +152,21 @@ public CompletionStage<Void> complete() {
151152
).thenApply(ignored -> null);
152153
}
153154

155+
/**
156+
* Aborts the upload.
157+
*
158+
* @return Completion stage which is completed when success response received from S3.
159+
*/
160+
public CompletionStage<Void> abort() {
161+
return this.client.abortMultipartUpload(
162+
AbortMultipartUploadRequest.builder()
163+
.bucket(this.bucket)
164+
.key(this.key.string())
165+
.uploadId(this.id)
166+
.build()
167+
).thenApply(ignored -> null);
168+
}
169+
154170
/**
155171
* Uploads part.
156172
*

src/main/java/com/artipie/asto/s3/S3Storage.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.List;
3333
import java.util.Optional;
3434
import java.util.concurrent.CompletableFuture;
35+
import java.util.concurrent.CompletionStage;
3536
import java.util.stream.Collectors;
3637
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
3738
import software.amazon.awssdk.core.async.SdkPublisher;
@@ -50,6 +51,12 @@
5051
* Storage that holds data in S3 storage.
5152
*
5253
* @since 0.15
54+
* @todo #87:60min Do not await abort to complete if save() failed.
55+
* In case uploading content fails inside {@link S3Storage#save(Key, Content)} method
56+
* we are doing abort() for multipart upload.
57+
* Also whole operation does not complete until abort() is complete.
58+
* It would be better to finish save() operation right away and do abort() in background,
59+
* but it makes testing the method difficult.
5360
*/
5461
public final class S3Storage implements Storage {
5562

@@ -123,9 +130,22 @@ public CompletableFuture<Void> save(final Key key, final Content content) {
123130
).thenApply(
124131
created -> new MultipartUpload(this.client, this.bucket, key, created.uploadId())
125132
).thenCompose(
126-
upload -> upload.upload(content)
127-
.thenCompose(ignored -> upload.complete())
128-
).thenApply(response -> null);
133+
upload -> upload.upload(content).handle(
134+
(ignored, throwable) -> {
135+
final CompletionStage<Void> finished;
136+
if (throwable == null) {
137+
finished = upload.complete();
138+
} else {
139+
final CompletableFuture<Void> promise = new CompletableFuture<>();
140+
finished = promise;
141+
upload.abort().whenComplete(
142+
(result, ex) -> promise.completeExceptionally(throwable)
143+
);
144+
}
145+
return finished;
146+
}
147+
).thenCompose(self -> self)
148+
);
129149
}
130150

131151
@Override

src/test/java/com/artipie/asto/S3StorageTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,26 @@
2525

2626
import com.adobe.testing.s3mock.junit5.S3MockExtension;
2727
import com.amazonaws.services.s3.AmazonS3;
28+
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
29+
import com.amazonaws.services.s3.model.MultipartUpload;
2830
import com.amazonaws.services.s3.model.ObjectMetadata;
2931
import com.amazonaws.services.s3.model.S3Object;
3032
import com.artipie.asto.blocking.BlockingStorage;
3133
import com.artipie.asto.s3.S3Storage;
3234
import com.google.common.io.ByteStreams;
35+
import io.reactivex.Flowable;
3336
import java.io.ByteArrayInputStream;
3437
import java.net.URI;
3538
import java.util.Arrays;
3639
import java.util.Collection;
40+
import java.util.List;
3741
import java.util.UUID;
3842
import java.util.stream.Collectors;
3943
import org.hamcrest.MatcherAssert;
4044
import org.hamcrest.Matchers;
45+
import org.hamcrest.collection.IsEmptyIterable;
4146
import org.hamcrest.core.IsEqual;
47+
import org.junit.jupiter.api.Assertions;
4248
import org.junit.jupiter.api.Test;
4349
import org.junit.jupiter.api.extension.RegisterExtension;
4450
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
@@ -50,6 +56,7 @@
5056
* Tests for {@link S3Storage}.
5157
*
5258
* @since 0.15
59+
* @checkstyle ClassDataAbstractionCouplingCheck (2 lines)
5360
*/
5461
class S3StorageTest {
5562

@@ -75,6 +82,35 @@ void shouldUploadObjectWhenSave(final AmazonS3 client) throws Exception {
7582
MatcherAssert.assertThat(downloaded, Matchers.equalTo(data));
7683
}
7784

85+
@Test
86+
void shouldFailToSaveMultipartUploadWhenFailedToReadContent(final AmazonS3 client) {
87+
final String bucket = UUID.randomUUID().toString();
88+
client.createBucket(bucket);
89+
final String key = "fail";
90+
Assertions.assertThrows(
91+
Exception.class,
92+
() -> this.storage(bucket).save(
93+
new Key.From(key),
94+
new Content.From(Flowable.error(new IllegalStateException()))
95+
).join()
96+
);
97+
}
98+
99+
@Test
100+
void shouldAbortMultipartUploadWhenFailedToReadContent(final AmazonS3 client) {
101+
final String bucket = UUID.randomUUID().toString();
102+
client.createBucket(bucket);
103+
final String key = "abort";
104+
this.storage(bucket).save(
105+
new Key.From(key),
106+
new Content.From(Flowable.error(new IllegalStateException()))
107+
).exceptionally(ignore -> null).join();
108+
final List<MultipartUpload> uploads = client.listMultipartUploads(
109+
new ListMultipartUploadsRequest(bucket)
110+
).getMultipartUploads();
111+
MatcherAssert.assertThat(uploads, new IsEmptyIterable<>());
112+
}
113+
78114
@Test
79115
void shouldExistForSavedObject(final AmazonS3 client) {
80116
final String bucket = UUID.randomUUID().toString();

0 commit comments

Comments
 (0)