When calling .shutdown() on a CRTTransferManager instance, transfer failures are not raised. After some digging into this repo, I saw the code below (referenced from here):
def _shutdown(self, cancel=False):
    if cancel:
        self._cancel_transfers()
    try:
        self._finish_transfers()
    except KeyboardInterrupt:
        self._cancel_transfers()
    except Exception:
        pass
    finally:
        self._wait_transfers_done()
The pass statement should probably re-raise the exception, so that errors are actually surfaced to the caller. What do you think?
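Something along these lines is what I have in mind (just a sketch of the idea, not a tested patch):

def _shutdown(self, cancel=False):
    if cancel:
        self._cancel_transfers()
    try:
        self._finish_transfers()
    except KeyboardInterrupt:
        self._cancel_transfers()
    except Exception:
        # re-raise instead of swallowing, so callers of shutdown() see the failure
        raise
    finally:
        self._wait_transfers_done()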
For the record, my code was this:
from io import BytesIO
from typing import List

import boto3
import botocore.config
# assumption: "s3transfer" here refers to boto3's transfer module, which exposes
# TransferConfig and create_transfer_manager (and a CRT-backed manager when CRT is enabled)
from boto3.s3 import transfer as s3transfer

def parallel_upload(bucket: str, data: List[dict], workers=100):
    """Performs parallel upload of records.

    Args:
        bucket (str): the bucket
        data (List[dict]): records to process
            ex. [
                {
                    "key": "path/file.type",
                    "data": "data"
                }
            ]
        workers (int, optional): number of parallel workers. Defaults to 100.
    """
    botocore_config = botocore.config.Config(
        max_pool_connections=workers,
        tcp_keepalive=True
    )
    s3client = boto3.Session().client('s3', config=botocore_config)
    transfer_config = s3transfer.TransferConfig(max_concurrency=workers)
    s3t = s3transfer.create_transfer_manager(s3client, transfer_config)
    upload_bytes = []
    for item in data:
        # IMPORTANT! we need to retain the reference to BytesIO outside of this loop,
        # otherwise the call to shutdown will fail all transfers as there is no longer
        # a reference to the bytes being transferred - hence the use of a list to
        # maintain them
        upload_bytes.append(BytesIO(item['data'].encode()))
        s3t.upload(
            upload_bytes[-1], bucket, item['key']
        )
    # wait for transfers to complete
    s3t.shutdown()
    print(f"\t - {len(data)} objects uploaded to {bucket} successfully.")
As a work-around, I have manually handled the collection of results from s3t.upload, like below:
def parallel_upload(bucket: str, data: List[dict], workers=100):
    """Performs parallel upload of records.

    Args:
        bucket (str): the bucket
        data (List[dict]): records to process
            ex. [
                {
                    "key": "path/file.type",
                    "data": "data"
                }
            ]
        workers (int, optional): number of parallel workers. Defaults to 100.
    """
    botocore_config = botocore.config.Config(
        max_pool_connections=workers,
        tcp_keepalive=True
    )
    s3client = boto3.Session().client('s3', config=botocore_config)
    transfer_config = s3transfer.TransferConfig(max_concurrency=workers)
    s3t = s3transfer.create_transfer_manager(s3client, transfer_config)
    upload_bytes = []
    futures = []
    for item in data:
        # IMPORTANT! we need to retain the reference to BytesIO outside of this loop,
        # otherwise the call to shutdown will fail all transfers as there is no longer
        # a reference to the bytes being transferred - hence the use of a list to
        # maintain them
        upload_bytes.append(BytesIO(item['data'].encode()))
        # keep track of all submissions for validation of successful uploads
        futures.append(
            s3t.upload(
                upload_bytes[-1], bucket, item['key']
            )
        )
    # ensure no result threw an exception.
    for f in futures:
        f.result()  # if an upload failed, the exception is raised from this call
    print(f"\t - {len(data)} objects uploaded to {bucket} successfully.")
I simply iterate over the futures after queueing the uploads and call .result() on each, which works as expected, allowing me to upload concurrently while still detecting errors.
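For completeness, the same hypothetical repro against the workaround version now fails loudly:

# with the workaround, .result() raises the underlying transfer error
try:
    parallel_upload(
        "bucket-that-does-not-exist",
        [{"key": "path/file.txt", "data": "hello"}],
    )
except Exception as exc:
    print(f"upload failed as expected: {exc}")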