Skip to content

Commit 7581848

Browse files
authored
fix(storage): Fix SocketTimeoutException when executing a long multi-part upload (#2973)
1 parent 3e568e7 commit 7581848

File tree

11 files changed

+288
-156
lines changed

11 files changed

+288
-156
lines changed

aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/AbortMultiPartUploadWorker.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ internal class AbortMultiPartUploadWorker(
3333
private val transferStatusUpdater: TransferStatusUpdater,
3434
context: Context,
3535
workerParameters: WorkerParameters
36-
) : BaseTransferWorker(transferStatusUpdater, transferDB, context, workerParameters) {
36+
) : SuspendingTransferWorker(transferStatusUpdater, transferDB, context, workerParameters) {
3737

3838
override suspend fun performWork(): Result {
3939
val s3: S3Client = clientProvider.getStorageTransferClient(transferRecord.region, transferRecord.bucketName)

aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/BaseTransferWorker.kt

Lines changed: 3 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,10 @@
1515

1616
package com.amplifyframework.storage.s3.transfer.worker
1717

18-
import android.app.NotificationChannel
19-
import android.app.NotificationManager
2018
import android.content.Context
2119
import android.net.ConnectivityManager
2220
import android.net.NetworkCapabilities
2321
import android.os.Build
24-
import android.util.Log
25-
import androidx.annotation.RequiresApi
26-
import androidx.core.app.NotificationCompat
27-
import androidx.work.CoroutineWorker
28-
import androidx.work.Data
29-
import androidx.work.ForegroundInfo
30-
import androidx.work.WorkerParameters
31-
import androidx.work.workDataOf
3222
import aws.sdk.kotlin.services.s3.model.ObjectCannedAcl
3323
import aws.sdk.kotlin.services.s3.model.PutObjectRequest
3424
import aws.sdk.kotlin.services.s3.model.RequestPayer
@@ -37,40 +27,15 @@ import aws.sdk.kotlin.services.s3.model.StorageClass
3727
import aws.smithy.kotlin.runtime.content.ByteStream
3828
import aws.smithy.kotlin.runtime.content.fromFile
3929
import aws.smithy.kotlin.runtime.time.Instant
40-
import com.amplifyframework.core.Amplify
41-
import com.amplifyframework.core.category.CategoryType
4230
import com.amplifyframework.storage.ObjectMetadata
43-
import com.amplifyframework.storage.TransferState
44-
import com.amplifyframework.storage.s3.AWSS3StoragePlugin
45-
import com.amplifyframework.storage.s3.R
4631
import com.amplifyframework.storage.s3.transfer.ProgressListener
47-
import com.amplifyframework.storage.s3.transfer.TransferDB
4832
import com.amplifyframework.storage.s3.transfer.TransferRecord
49-
import com.amplifyframework.storage.s3.transfer.TransferStatusUpdater
5033
import java.io.File
51-
import java.lang.Exception
52-
import java.net.SocketException
53-
import kotlinx.coroutines.CancellationException
54-
import kotlinx.coroutines.currentCoroutineContext
55-
import kotlinx.coroutines.isActive
5634

5735
/**
5836
* Base worker to perform transfer file task.
5937
*/
60-
internal abstract class BaseTransferWorker(
61-
private val transferStatusUpdater: TransferStatusUpdater,
62-
private val transferDB: TransferDB,
63-
context: Context,
64-
workerParameters: WorkerParameters
65-
) : CoroutineWorker(context, workerParameters) {
66-
67-
internal lateinit var transferRecord: TransferRecord
68-
internal lateinit var outputData: Data
69-
private val logger =
70-
Amplify.Logging.logger(
71-
CategoryType.STORAGE,
72-
AWSS3StoragePlugin.AWS_S3_STORAGE_LOG_NAMESPACE.format(this::class.java.simpleName)
73-
)
38+
internal interface BaseTransferWorker {
7439

7540
companion object {
7641
internal const val PART_RECORD_ID = "PART_RECORD_ID"
@@ -86,91 +51,7 @@ internal abstract class BaseTransferWorker(
8651
internal const val MULTIPART_UPLOAD: String = "MULTIPART_UPLOAD"
8752
}
8853

89-
override suspend fun doWork(): Result {
90-
// Foreground task is disabled until the foreground notification behavior and the recent customer feedback,
91-
// it will be enabled in future based on the customer request.
92-
val isForegroundTask: Boolean = (inputData.keyValueMap[RUN_AS_FOREGROUND_TASK] ?: false) as Boolean
93-
if (isForegroundTask) {
94-
setForegroundAsync(getForegroundInfo())
95-
}
96-
val result = runCatching {
97-
val transferRecordId =
98-
inputData.keyValueMap[PART_RECORD_ID] as? Int ?: inputData.keyValueMap[TRANSFER_RECORD_ID] as Int
99-
outputData = workDataOf(OUTPUT_TRANSFER_RECORD_ID to inputData.keyValueMap[TRANSFER_RECORD_ID] as Int)
100-
transferDB.getTransferRecordById(transferRecordId)?.let { tr ->
101-
transferRecord = tr
102-
performWork()
103-
} ?: return run {
104-
Result.failure(outputData)
105-
}
106-
}
107-
108-
return when {
109-
result.isSuccess -> {
110-
result.getOrThrow()
111-
}
112-
else -> {
113-
val ex = result.exceptionOrNull()
114-
if (currentCoroutineContext().isActive) {
115-
logger.error("${this.javaClass.simpleName} failed with exception: ${Log.getStackTraceString(ex)}")
116-
}
117-
if (!currentCoroutineContext().isActive && isRetryableError(ex)) {
118-
Result.retry()
119-
} else {
120-
transferStatusUpdater.updateOnError(transferRecord.id, Exception(ex))
121-
transferStatusUpdater.updateTransferState(
122-
transferRecord.id,
123-
TransferState.FAILED
124-
)
125-
Result.failure(outputData)
126-
}
127-
}
128-
}
129-
}
130-
131-
abstract suspend fun performWork(): Result
132-
133-
internal open var maxRetryCount = 0
134-
135-
override suspend fun getForegroundInfo(): ForegroundInfo {
136-
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
137-
createChannel()
138-
}
139-
val appIcon = R.drawable.amplify_storage_transfer_notification_icon
140-
return ForegroundInfo(
141-
1,
142-
NotificationCompat.Builder(
143-
applicationContext,
144-
applicationContext.getString(R.string.amplify_storage_notification_channel_id)
145-
)
146-
.setSmallIcon(appIcon)
147-
.setContentTitle(applicationContext.getString(R.string.amplify_storage_notification_title))
148-
.build()
149-
)
150-
}
151-
152-
private fun isRetryableError(e: Throwable?): Boolean {
153-
return !isNetworkAvailable(applicationContext) ||
154-
runAttemptCount < maxRetryCount ||
155-
e is CancellationException ||
156-
// SocketException is thrown when download is terminated due to network disconnection.
157-
e is SocketException
158-
}
159-
160-
@RequiresApi(Build.VERSION_CODES.O)
161-
private fun createChannel() {
162-
val notificationManager =
163-
applicationContext.getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager
164-
notificationManager.createNotificationChannel(
165-
NotificationChannel(
166-
applicationContext.getString(R.string.amplify_storage_notification_channel_id),
167-
applicationContext.getString(R.string.amplify_storage_notification_channel_name),
168-
NotificationManager.IMPORTANCE_DEFAULT
169-
)
170-
)
171-
}
172-
173-
private fun isNetworkAvailable(context: Context): Boolean {
54+
fun isNetworkAvailable(context: Context): Boolean {
17455
val connectivityManager =
17556
context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
17657
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q) {
@@ -198,7 +79,7 @@ internal abstract class BaseTransferWorker(
19879
return false
19980
}
20081

201-
internal fun createPutObjectRequest(
82+
fun createPutObjectRequest(
20283
transferRecord: TransferRecord,
20384
progressListener: ProgressListener?
20485
): PutObjectRequest {
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.amplifyframework.storage.s3.transfer.worker
17+
18+
import android.content.Context
19+
import android.util.Log
20+
import androidx.work.Data
21+
import androidx.work.Worker
22+
import androidx.work.WorkerParameters
23+
import androidx.work.workDataOf
24+
import com.amplifyframework.core.Amplify
25+
import com.amplifyframework.core.category.CategoryType
26+
import com.amplifyframework.storage.TransferState
27+
import com.amplifyframework.storage.s3.AWSS3StoragePlugin
28+
import com.amplifyframework.storage.s3.transfer.TransferDB
29+
import com.amplifyframework.storage.s3.transfer.TransferRecord
30+
import com.amplifyframework.storage.s3.transfer.TransferStatusUpdater
31+
import com.amplifyframework.storage.s3.transfer.worker.BaseTransferWorker.Companion.OUTPUT_TRANSFER_RECORD_ID
32+
import com.amplifyframework.storage.s3.transfer.worker.BaseTransferWorker.Companion.PART_RECORD_ID
33+
import com.amplifyframework.storage.s3.transfer.worker.BaseTransferWorker.Companion.TRANSFER_RECORD_ID
34+
import java.lang.Exception
35+
import java.net.SocketException
36+
37+
/**
38+
* Base worker to perform transfer file task.
39+
*/
40+
internal abstract class BlockingTransferWorker(
41+
private val transferStatusUpdater: TransferStatusUpdater,
42+
private val transferDB: TransferDB,
43+
context: Context,
44+
workerParameters: WorkerParameters
45+
) : Worker(context, workerParameters), BaseTransferWorker {
46+
47+
internal lateinit var transferRecord: TransferRecord
48+
internal lateinit var outputData: Data
49+
50+
private val logger =
51+
Amplify.Logging.logger(
52+
CategoryType.STORAGE,
53+
AWSS3StoragePlugin.AWS_S3_STORAGE_LOG_NAMESPACE.format(this::class.java.simpleName)
54+
)
55+
56+
override fun doWork(): Result {
57+
val result = runCatching {
58+
val transferRecordId =
59+
inputData.keyValueMap[PART_RECORD_ID] as? Int ?: inputData.keyValueMap[TRANSFER_RECORD_ID] as Int
60+
outputData = workDataOf(OUTPUT_TRANSFER_RECORD_ID to inputData.keyValueMap[TRANSFER_RECORD_ID] as Int)
61+
transferDB.getTransferRecordById(transferRecordId)?.let { tr ->
62+
transferRecord = tr
63+
performWork()
64+
} ?: return run {
65+
Result.failure(outputData)
66+
}
67+
}
68+
69+
return when {
70+
result.isSuccess -> {
71+
result.getOrThrow()
72+
}
73+
else -> {
74+
val ex = result.exceptionOrNull()
75+
logger.error("${this.javaClass.simpleName} failed with exception: ${Log.getStackTraceString(ex)}")
76+
if (isRetryableError(ex)) {
77+
Result.retry()
78+
} else {
79+
transferStatusUpdater.updateOnError(transferRecord.id, Exception(ex))
80+
transferStatusUpdater.updateTransferState(
81+
transferRecord.id,
82+
TransferState.FAILED
83+
)
84+
Result.failure(outputData)
85+
}
86+
}
87+
}
88+
}
89+
90+
abstract fun performWork(): Result
91+
92+
internal open var maxRetryCount = 0
93+
94+
private fun isRetryableError(e: Throwable?): Boolean {
95+
return !isNetworkAvailable(applicationContext) ||
96+
runAttemptCount < maxRetryCount ||
97+
// SocketException is thrown when download is terminated due to network disconnection.
98+
e is SocketException
99+
}
100+
}

aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/CompleteMultiPartUploadWorker.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ internal class CompleteMultiPartUploadWorker(
3333
private val transferStatusUpdater: TransferStatusUpdater,
3434
context: Context,
3535
workerParameters: WorkerParameters
36-
) : BaseTransferWorker(transferStatusUpdater, transferDB, context, workerParameters) {
36+
) : SuspendingTransferWorker(transferStatusUpdater, transferDB, context, workerParameters) {
3737

3838
override suspend fun performWork(): Result {
3939
val completedParts = transferDB.queryPartETagsOfUpload(transferRecord.id)

aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/DownloadWorker.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ internal class DownloadWorker(
4646
private val transferStatusUpdater: TransferStatusUpdater,
4747
context: Context,
4848
workerParameters: WorkerParameters
49-
) : BaseTransferWorker(transferStatusUpdater, transferDB, context, workerParameters) {
49+
) : SuspendingTransferWorker(transferStatusUpdater, transferDB, context, workerParameters) {
5050

5151
private lateinit var downloadProgressListener: DownloadProgressListener
5252
private val defaultBufferSize = 8192L

aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/InitiateMultiPartUploadTransferWorker.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import com.amplifyframework.storage.TransferState
2424
import com.amplifyframework.storage.s3.transfer.StorageTransferClientProvider
2525
import com.amplifyframework.storage.s3.transfer.TransferDB
2626
import com.amplifyframework.storage.s3.transfer.TransferStatusUpdater
27+
import com.amplifyframework.storage.s3.transfer.worker.BaseTransferWorker.Companion.MULTI_PART_UPLOAD_ID
28+
import com.amplifyframework.storage.s3.transfer.worker.BaseTransferWorker.Companion.TRANSFER_RECORD_ID
2729

2830
/**
2931
* Worker to initiate multipart upload
@@ -34,7 +36,7 @@ internal class InitiateMultiPartUploadTransferWorker(
3436
private val transferStatusUpdater: TransferStatusUpdater,
3537
context: Context,
3638
workerParameters: WorkerParameters
37-
) : BaseTransferWorker(transferStatusUpdater, transferDB, context, workerParameters) {
39+
) : SuspendingTransferWorker(transferStatusUpdater, transferDB, context, workerParameters) {
3840

3941
override suspend fun performWork(): Result {
4042
val s3: S3Client = clientProvider.getStorageTransferClient(transferRecord.region, transferRecord.bucketName)

aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/PartUploadTransferWorker.kt

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ import com.amplifyframework.storage.s3.transfer.StorageTransferClientProvider
2626
import com.amplifyframework.storage.s3.transfer.TransferDB
2727
import com.amplifyframework.storage.s3.transfer.TransferStatusUpdater
2828
import com.amplifyframework.storage.s3.transfer.UploadProgressListenerInterceptor
29+
import com.amplifyframework.storage.s3.transfer.worker.BaseTransferWorker.Companion.MULTI_PART_UPLOAD_ID
2930
import java.io.File
30-
import kotlinx.coroutines.currentCoroutineContext
31-
import kotlinx.coroutines.isActive
31+
import kotlinx.coroutines.runBlocking
3232

3333
/**
3434
* Worker to upload a part for multipart upload
@@ -39,41 +39,39 @@ internal class PartUploadTransferWorker(
3939
private val transferStatusUpdater: TransferStatusUpdater,
4040
context: Context,
4141
workerParameters: WorkerParameters
42-
) : BaseTransferWorker(transferStatusUpdater, transferDB, context, workerParameters) {
42+
) : BlockingTransferWorker(transferStatusUpdater, transferDB, context, workerParameters) {
4343

4444
private lateinit var multiPartUploadId: String
4545
private lateinit var partUploadProgressListener: PartUploadProgressListener
4646
override var maxRetryCount = 3
4747

48-
override suspend fun performWork(): Result {
49-
if (!currentCoroutineContext().isActive) {
50-
return Result.retry()
51-
}
48+
override fun performWork(): Result {
5249
transferStatusUpdater.updateTransferState(transferRecord.mainUploadId, TransferState.IN_PROGRESS)
5350
multiPartUploadId = inputData.keyValueMap[MULTI_PART_UPLOAD_ID] as String
5451
partUploadProgressListener = PartUploadProgressListener(transferRecord, transferStatusUpdater)
5552
val s3: S3Client = clientProvider.getStorageTransferClient(transferRecord.region, transferRecord.bucketName)
56-
return s3.withConfig {
57-
interceptors += UploadProgressListenerInterceptor(partUploadProgressListener)
58-
enableAccelerate = transferRecord.useAccelerateEndpoint == 1
59-
}.uploadPart {
60-
bucket = transferRecord.bucketName
61-
key = transferRecord.key
62-
uploadId = multiPartUploadId
63-
body = File(transferRecord.file).asByteStream(
64-
start = transferRecord.fileOffset,
65-
transferRecord.fileOffset + transferRecord.bytesTotal - 1
66-
)
67-
partNumber = transferRecord.partNumber
68-
}.let { response ->
69-
response.eTag?.let { tag ->
70-
transferDB.updateETag(transferRecord.id, tag)
71-
transferDB.updateState(transferRecord.id, TransferState.PART_COMPLETED)
72-
updateProgress()
73-
Result.success(outputData)
74-
} ?: run {
75-
throw IllegalStateException("Etag is empty")
53+
54+
return runBlocking {
55+
s3.withConfig {
56+
interceptors += UploadProgressListenerInterceptor(partUploadProgressListener)
57+
enableAccelerate = transferRecord.useAccelerateEndpoint == 1
58+
}.uploadPart {
59+
bucket = transferRecord.bucketName
60+
key = transferRecord.key
61+
uploadId = multiPartUploadId
62+
body = File(transferRecord.file).asByteStream(
63+
start = transferRecord.fileOffset,
64+
transferRecord.fileOffset + transferRecord.bytesTotal - 1
65+
)
66+
partNumber = transferRecord.partNumber
7667
}
68+
}.eTag?.let { tag ->
69+
transferDB.updateETag(transferRecord.id, tag)
70+
transferDB.updateState(transferRecord.id, TransferState.PART_COMPLETED)
71+
updateProgress()
72+
return Result.success(outputData)
73+
} ?: run {
74+
throw IllegalStateException("Etag is empty")
7775
}
7876
}
7977

aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/RouterWorker.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ internal class RouterWorker(
4343
?: throw IllegalArgumentException("Worker class name is missing")
4444
private val workerId = parameter.inputData.getString(BaseTransferWorker.WORKER_ID)
4545

46-
private var delegateWorker: BaseTransferWorker? = null
46+
private var delegateWorker: ListenableWorker? = null
4747

4848
companion object {
4949
internal const val WORKER_CLASS_NAME = "WORKER_CLASS_NAME"

0 commit comments

Comments
 (0)