
Replace simpledb. #3569

Draft

DailyDreaming wants to merge 81 commits into master from issues/964-replace-sdb-with-dynamodb

Changes from 1 commit

Commits (81)
d1a8eca
Save progress.
DailyDreaming Mar 5, 2021
c13e238
Save progress.
DailyDreaming Mar 5, 2021
8a5ed4f
.
DailyDreaming Mar 19, 2021
eb20bc9
Progress... ?
DailyDreaming Mar 26, 2021
2785d22
Progress... ?
DailyDreaming Mar 26, 2021
a08ebf0
Update.
DailyDreaming Mar 26, 2021
b9f7653
Rebase.
DailyDreaming Apr 7, 2021
f7555c7
Progress.
DailyDreaming Apr 7, 2021
758ad72
Merge branch 'master' of https://github.com/DataBiosphere/toil into i…
DailyDreaming Apr 13, 2021
87c8b19
Update.
DailyDreaming Apr 14, 2021
4ea1107
Update.
DailyDreaming Apr 14, 2021
623f7ec
More changes.
DailyDreaming Apr 14, 2021
d509d19
More changes.
DailyDreaming Apr 14, 2021
2ecad7c
Rework credentials.
DailyDreaming Apr 14, 2021
5bfaaad
Merge branch 'master' into issues/964-replace-sdb-with-dynamodb
DailyDreaming Apr 14, 2021
9897812
Rework credentials.
DailyDreaming Apr 14, 2021
ce18e3b
Rework credentials.
DailyDreaming Apr 14, 2021
b647571
Merge branch 'issues/964-replace-sdb-with-dynamodb' of https://github…
DailyDreaming Apr 14, 2021
f8079b6
Typing.
DailyDreaming Apr 14, 2021
320172b
Refactor.
DailyDreaming Apr 14, 2021
cfec71f
Refactor.
DailyDreaming Apr 14, 2021
4b22df0
Refactor.
DailyDreaming Apr 14, 2021
7964a9e
Update mypy.
DailyDreaming Apr 14, 2021
5ab613c
Update mypy.
DailyDreaming Apr 14, 2021
b7dbaab
Hmmm...
DailyDreaming Apr 15, 2021
24250fa
Save progress.
DailyDreaming May 8, 2021
b383dd1
Updates.
DailyDreaming May 9, 2021
ce3feb9
Rebase with master.
DailyDreaming May 9, 2021
ffcac1e
Update imports.
DailyDreaming May 9, 2021
d8d1c83
Update imports.
DailyDreaming May 9, 2021
1afd2f6
Update tests.
DailyDreaming May 9, 2021
9161b5f
Linting.
DailyDreaming May 9, 2021
2b9db9b
Linting.
DailyDreaming May 9, 2021
9b59b2e
Updates.
DailyDreaming May 9, 2021
3941b38
Use create_bucket.
DailyDreaming May 9, 2021
f342e66
Remove/stub batch.
DailyDreaming May 9, 2021
f16d926
Remove/stub batch.
DailyDreaming May 9, 2021
2a226dd
Remove/stub batch.
DailyDreaming May 9, 2021
93c528b
More fixes.
DailyDreaming May 10, 2021
555c998
Update.
DailyDreaming May 10, 2021
c77e726
Update.
DailyDreaming May 10, 2021
912da73
Rebase.
DailyDreaming May 10, 2021
32bdaa5
.
DailyDreaming May 10, 2021
58c2112
testFileDeletion
DailyDreaming May 10, 2021
3946645
Update.
DailyDreaming May 11, 2021
fddb0b5
Tests seem to be passing.
DailyDreaming May 12, 2021
522390f
Merge branch 'master' into issues/964-replace-sdb-with-dynamodb
DailyDreaming May 12, 2021
f2d7a89
Fix shared file.
DailyDreaming May 12, 2021
2f89eef
Merge branch 'issues/964-replace-sdb-with-dynamodb' of https://github…
DailyDreaming May 12, 2021
ba9e31d
Dont encrypt shared files.
DailyDreaming Jun 13, 2021
7dceb66
Update.
DailyDreaming Aug 25, 2021
6dd2d6e
Large rebase.
DailyDreaming Aug 25, 2021
d2e833b
Check tests.
DailyDreaming Aug 31, 2021
b873668
Merge branch 'master' of https://github.com/DataBiosphere/toil into i…
DailyDreaming Aug 31, 2021
07c9a3a
Tests.
DailyDreaming Aug 31, 2021
d01da36
Update .gitlab-ci.yml
DailyDreaming Sep 1, 2021
c3a6f77
Update.
DailyDreaming Sep 1, 2021
ebd3eef
Merge branch 'issues/964-replace-sdb-with-dynamodb' of https://github…
DailyDreaming Sep 1, 2021
da68b39
AWS exec preservation.
DailyDreaming Sep 1, 2021
e1a9af4
Update.
DailyDreaming Sep 13, 2021
0ccfa5b
Rebase.
DailyDreaming Sep 14, 2021
d3a6615
Cruft.
DailyDreaming Sep 14, 2021
a3ddb96
Cruft.
DailyDreaming Sep 14, 2021
5551149
Cruft.
DailyDreaming Sep 14, 2021
ba90b83
Consolidate functions.
DailyDreaming Sep 14, 2021
7054c3b
Update.
DailyDreaming Sep 14, 2021
df3121e
Update.
DailyDreaming Sep 14, 2021
6f29bc1
Cruft.
DailyDreaming Sep 14, 2021
3265b15
Cruft.
DailyDreaming Sep 14, 2021
5104ae7
Cruft.
DailyDreaming Sep 14, 2021
28a2861
Cruft.
DailyDreaming Sep 14, 2021
33ea71d
Cruft.
DailyDreaming Sep 14, 2021
a398b0b
Cruft.
DailyDreaming Sep 14, 2021
735ca38
Test bucket deletion.
DailyDreaming Sep 14, 2021
b4416cc
Cruft.
DailyDreaming Sep 14, 2021
aafc5a7
Cruft.
DailyDreaming Sep 14, 2021
6d352ac
Specify region.
DailyDreaming Sep 14, 2021
428a624
Cruft.
DailyDreaming Sep 15, 2021
0341be2
Cruft.
DailyDreaming Sep 15, 2021
ff9dc82
Correct exception.
DailyDreaming Sep 15, 2021
db7fecf
Cruft.
DailyDreaming Sep 15, 2021
Progress.
DailyDreaming committed Apr 7, 2021
commit f7555c745cf8db6424122f2e8c0089f3ae2b4844
16 changes: 16 additions & 0 deletions src/toil/jobStores/aws/file_info.py
@@ -15,7 +15,9 @@
import logging
import reprlib
import uuid
from collections import namedtuple
from contextlib import contextmanager
from typing import Optional
from toil.jobStores.aws.utils import uploadFromPath, copyKeyMultipart
from toil.lib.pipes import ReadablePipe, ReadableTransformingPipe
from toil.lib.checksum import compute_checksum_for_file
@@ -34,6 +36,20 @@ class ChecksumError(Exception):
"""Raised when a download from AWS does not contain the correct data."""


class AWSFileMetadata:
    def __init__(self,
                 file_id: str,
                 owner_id: str,
                 number_of_chunks: int = 0,
                 checksum: Optional[str] = None,
                 sse_key_path: Optional[str] = None):
        self.file_id = file_id
        self.owner_id = owner_id
        self.number_of_chunks = number_of_chunks
        self.checksum = checksum
        self.sse_key_path = sse_key_path


class AWSFile:
    def __init__(self,
                 fileID,
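As an aside, AWSFileMetadata above is a plain value holder; a minimal sketch of how such a record could be flattened to and from a dict for the DynamoDB put_item/get_item helpers used later in this branch (the metadata_to_item/metadata_from_item helper names are hypothetical, not part of the diff):

    from typing import Any, Dict
    from toil.jobStores.aws.file_info import AWSFileMetadata

    def metadata_to_item(meta: AWSFileMetadata) -> Dict[str, Any]:
        # Flatten the value object into a plain dict suitable for storage as a table item.
        return {'file_id': meta.file_id,
                'owner_id': meta.owner_id,
                'number_of_chunks': meta.number_of_chunks,
                'checksum': meta.checksum,
                'sse_key_path': meta.sse_key_path}

    def metadata_from_item(item: Dict[str, Any]) -> AWSFileMetadata:
        # Rebuild the value object; optional fields fall back to the constructor defaults.
        return AWSFileMetadata(file_id=item['file_id'],
                               owner_id=item['owner_id'],
                               number_of_chunks=item.get('number_of_chunks', 0),
                               checksum=item.get('checksum'),
                               sse_key_path=item.get('sse_key_path'))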
126 changes: 71 additions & 55 deletions src/toil/jobStores/aws/jobStore.py
@@ -37,7 +37,7 @@
NoSuchJobException,
NoSuchJobStoreException)
from toil.jobStores.aws.utils import uploadFile
-from toil.jobStores.aws.file_info import AWSFile
+from toil.jobStores.aws.file_info import AWSFile, AWSFileMetadata
from toil.lib.compatibility import compat_bytes
from toil.lib.ec2 import establish_boto3_session
from toil.lib.aws.dynamodb import (put_item,
@@ -49,7 +49,10 @@
from toil.lib.aws.s3 import (create_bucket,
                             delete_bucket,
                             bucket_exists,
-                            bucket_is_registered_with_toil)
+                            bucket_is_registered_with_toil,
+                            copy_s3_to_s3,
+                            boto_args,
+                            parse_s3_uri)
from toil.lib.ec2nodes import EC2Regions
from toil.lib.checksum import compute_checksum_for_content, compute_checksum_for_file
from toil.lib.retry import retry
@@ -108,7 +111,7 @@ def __init__(self, locator: str, part_size: int = DEFAULT_AWS_PART_SIZE):
        self.sse_key = None
        self.encryption_args = {}
        if self.config.sseKey:
-            with open(self.config.sseKey, 'rb') as f:
+            with open(self.config.sseKey, 'r') as f:
                self.sse_key = f.read()
            assert len(self.sse_key) == 32
            self.encryption_args = {'SSECustomerAlgorithm': 'AES256', 'SSECustomerKey': self.sse_key}
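As an aside, SSECustomerAlgorithm and SSECustomerKey are boto3's standard SSE-C parameters; a hypothetical standalone sketch of how arguments like the encryption_args built above are applied to individual S3 calls (bucket, key, and key material are placeholders, and this is not code from the branch):

    import boto3

    s3_client = boto3.client('s3')
    encryption_args = {'SSECustomerAlgorithm': 'AES256', 'SSECustomerKey': 'k' * 32}

    # SSE-C requires the same customer key on every write and read of the object.
    s3_client.put_object(Bucket='toil-jobstore-bucket', Key='some-file-id',
                         Body=b'payload', **encryption_args)
    response = s3_client.get_object(Bucket='toil-jobstore-bucket', Key='some-file-id',
                                    **encryption_args)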
@@ -118,7 +121,7 @@ def __init__(self, locator: str, part_size: int = DEFAULT_AWS_PART_SIZE):
        self.bucket = None

        boto3_session = establish_boto3_session()
-        self.s3_resource = boto3_session.resource('s3', region_name=self.region, **self.boto_args())
+        self.s3_resource = boto3_session.resource('s3', region_name=self.region, **boto_args())
        self.s3_client = self.s3_resource.meta.client

        self._batchedUpdates = []
@@ -184,16 +187,16 @@ def jobs(self):
        for job_id in get_primary_key_items(table=self.table, key='jobs', return_key='sort_key'):
            yield self.unpickle_job(job_id)

-    def load_job(self, file_id):
-        job = self.unpickle_job(file_id)
+    def load_job(self, job_id: str):
+        job = self.unpickle_job(job_id)
        if job is None:
-            raise NoSuchJobException(file_id)
+            raise NoSuchJobException(job_id)
        return job

    def update_job(self, jobDescription):
        self.pickle_job(jobDescription)

-    def delete_job(self, jobStoreID):
+    def delete_job(self, jobStoreID: str):
        logger.debug("Deleting job %s", jobStoreID)
        associated_files = get_item(table=self.table, hash_key='jobs', sort_key=jobStoreID)
        delete_item(table=self.table, hash_key='jobs', sort_key=jobStoreID)
@@ -219,39 +222,47 @@ def getEmptyFileStoreID(self, jobStoreID=None, cleanup=False, basename=None):
        assert 0 == response.get('ContentLength', None)
        return new_file_id

-    def _importFile(self, otherCls, url, sharedFileName=None, hardlink=False, symlink=False):
+    def _importFile(self, otherCls, url, sharedFileName=None, hardlink=False, symlink=False) -> FileID:
+        """
+        Upload a file into the s3 bucket jobstore from the source uri and store the file's
+        metadata inside of dynamodb.
+
+        This db entry's existence should always be in sync with the file's existence (when one exists,
+        so must the other).
+        """
+        # we are uploading from s3 to s3
        if issubclass(otherCls, AWSJobStore):
-            srcObj = self._getObjectForUrl(url, existing=True)
-            size = srcObj.content_length
-            if sharedFileName is None:
-                info = AWSFile.create(srcObj.key)
-            else:
-                self._requireValidSharedFileName(sharedFileName)
-                jobStoreFileID = self._sharedFileID(sharedFileName)
-                info = AWSFile.loadOrCreate(jobStoreFileID=jobStoreFileID,
-                                            ownerID=self.sharedFileOwnerID,
-                                            encrypted=None)
-            info.copyFrom(srcObj)
-            info.save()
-            return FileID(info.fileID, size) if sharedFileName is None else None
+            # make sure the object exists and provide access to s3_blob.content_length
+            s3_blob = self._getObjectForUrl(url, existing=True)
+
+            file_id = str(uuid.uuid4()) if not sharedFileName else self._sharedFileID(sharedFileName)
+            owner_id = s3_blob.key if not sharedFileName else self.sharedFileOwnerID
+
+            # stow file's metadata in db
+            metadata = json.loads(dict(file_id=file_id, owner_id=owner_id, sse_key_path=self.sse_key))
+            put_item(table=self.table, hash_key='files', sort_key=file_id, value=metadata)
+
+            # upload the uri to our s3 jobstore bucket
+            # , sse_key=self.sse_key
+            copy_s3_to_s3(src_bucket=s3_blob.bucket_name, src_key=s3_blob.key,
+                          dst_bucket=self.bucket_name, dst_key=file_id)
+            return FileID(fileStoreID=file_id, size=s3_blob.content_length) if sharedFileName is None else None
        else:
            return super(AWSJobStore, self)._importFile(otherCls, url, sharedFileName=sharedFileName)

-    def _exportFile(self, otherCls, jobStoreFileID, url):
+    def _exportFile(self, otherCls, jobStoreFileID, url) -> None:
        if issubclass(otherCls, AWSJobStore):
-            dstObj = self._getObjectForUrl(url)
-            info = AWSFile.loadOrFail(jobStoreFileID)
-            info.copyTo(dstObj)
+            s3_blob = self._getObjectForUrl(url)
+            # fail if db fetch does not exist
+            # needed?
+            get_item(table=self.table, hash_key='files', sort_key=jobStoreFileID)
+            # upload the uri to our s3 jobstore bucket
+            # , sse_key=self.sse_key
+            copy_s3_to_s3(src_bucket=self.bucket_name, src_key=jobStoreFileID,
+                          dst_bucket=s3_blob.bucket_name, dst_key=s3_blob.key)
        else:
            super(AWSJobStore, self)._defaultExportFile(otherCls, jobStoreFileID, url)

-    @classmethod
-    def getSize(cls, url: str):
-        try:
-            return cls._getObjectForUrl(url, existing=True).content_length
-        except AWSKeyNotFoundError:
-            return 0
-
    @classmethod
    def _readFromUrl(cls, url, writable):
        srcObj = cls._getObjectForUrl(url, existing=True)
@@ -267,22 +278,6 @@ def _writeToUrl(self, readable, url, executable=False):
                   fileID=dstObj.key,
                   partSize=DEFAULT_AWS_PART_SIZE)

-    def boto_args(self):
-        host = os.environ.get('TOIL_S3_HOST', None)
-        port = os.environ.get('TOIL_S3_PORT', None)
-        protocol = 'https'
-        if os.environ.get('TOIL_S3_USE_SSL', True) == 'False':
-            protocol = 'http'
-        if host:
-            return {'endpoint_url': f'{protocol}://{host}' + f':{port}' if port else ''}
-        return {}
-
-    def load_or_create(self, file_id, owner, encrypt):
-        pass
-
-    def load_or_fail(self, file_id):
-        pass
-
    def _getObjectForUrl(self, uri: str, existing: Optional[bool] = None):
        """
        Extracts a key (object) from a given s3:// URL.
@@ -292,9 +287,7 @@ def _getObjectForUrl(self, uri: str, existing: Optional[bool] = None):

        :rtype: S3.Object
        """
-        if uri.startswith('s3://'):
-            raise ValueError(f'Invalid schema. Expecting s3 prefix, not: {uri}')
-        bucket_name, key_name = uri[len('s3://'):].split('/', 1)
+        bucket_name, key_name = parse_s3_uri(uri)
        obj = self.s3_resource.Object(bucket_name, key_name)

        try:
@@ -319,7 +312,26 @@ def _supportsUrl(cls, url, export=False):
        return url.scheme.lower() == 's3'

    def writeFile(self, localFilePath, jobStoreID=None, cleanup=False):
-        info = AWSFile.create(jobStoreID if cleanup else None)
+        file_id = jobStoreID if cleanup else None
+        copy_local_to_s3(localFilePath, dst_bucket, dst_key)
+        # file_size, file_time = fileSizeAndTime(localFilePath)
+        # if file_size <= self.maxInlinedSize():
+        #     with open(localFilePath, 'rb') as f:
+        #         self.content = f.read()
+        #     # Clear out any old checksum in case of overwrite
+        #     self.checksum = ''
+        # else:
+        #     headerArgs = self._s3EncryptionArgs()
+        #     # Create a new Resource in case it needs to be on its own thread
+        #     resource = boto3_session.resource('s3', region_name=self.outer.region)
+        #
+        #     self.checksum = self._get_file_checksum(localFilePath) if calculateChecksum else None
+        #     self.version = uploadFromPath(localFilePath,
+        #                                   resource=resource,
+        #                                   bucketName=self.outer.filesBucket.name,
+        #                                   fileID=compat_bytes(self.fileID),
+        #                                   headerArgs=headerArgs,
+        #                                   partSize=self.outer.partSize)
        info.upload(localFilePath, not self.config.disableJobStoreChecksumVerification)
        info.save()
        return info.fileID
@@ -356,8 +368,12 @@ def updateFileStream(self, jobStoreFileID, encoding=None, errors=None):
    def fileExists(self, jobStoreFileID):
        return AWSFile.exists(jobStoreFileID)

-    def getFileSize(self, jobStoreFileID):
-        return self.getSize(f's3://{self.bucket_name}/{jobStoreFileID}')
+    def getFileSize(self, jobStoreFileID: str) -> int:
+        url = f's3://{self.bucket_name}/{jobStoreFileID}'
+        try:
+            return self._getObjectForUrl(url, existing=True).content_length
+        except AWSKeyNotFoundError:
+            return 0

    def readFile(self, jobStoreFileID, localFilePath, symlink=False):
        self.load_or_fail(jobStoreFileID)
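A rough sketch of the metadata round trip described in the _importFile docstring above, using the put_item/get_item keyword signature that appears in this diff (table name and values are placeholders, and passing a plain dict as value is an assumption about the helpers):

    import uuid
    from toil.lib.aws.dynamodb import put_item, get_item

    file_id = str(uuid.uuid4())
    metadata = {'file_id': file_id, 'owner_id': 'shared-files', 'sse_key_path': None}

    # Store the entry under the 'files' hash key, keyed by the file id.
    put_item(table='toil-jobstore-table', hash_key='files', sort_key=file_id, value=metadata)

    # A later fetch with the same keys should return the entry; per the docstring,
    # its absence should mean the backing S3 object does not exist either.
    fetched = get_item(table='toil-jobstore-table', hash_key='files', sort_key=file_id)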
4 changes: 3 additions & 1 deletion src/toil/jobStores/aws/utils.py
@@ -13,6 +13,8 @@
# limitations under the License.
import logging
import os
import socket
import errno
from typing import Optional

from boto.exception import (
@@ -24,7 +26,7 @@
from botocore.exceptions import ClientError

from toil.lib.compatibility import compat_bytes
-from toil.lib.retry import retry, ErrorCondition
+from toil.lib.retry import retry, ErrorCondition, old_retry

logger = logging.getLogger(__name__)

2 changes: 1 addition & 1 deletion src/toil/lib/aws/dynamodb.py
@@ -214,5 +214,5 @@ def table_exists(table: str) -> bool:
    table = db.Table(table)
    try:
        is_table_existing = table.table_status in ("CREATING", "UPDATING", "DELETING", "ACTIVE")
-    except ClientError:
+    except db.ClientError:
        return False
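For reference, a standalone sketch of the same kind of existence check written directly against boto3, assuming botocore's ClientError is what a missing table raises (an illustration, not the helper from this branch):

    import boto3
    from botocore.exceptions import ClientError

    def dynamodb_table_exists(table_name: str) -> bool:
        # describe_table raises ClientError (ResourceNotFoundException) for unknown tables.
        client = boto3.client('dynamodb')
        try:
            client.describe_table(TableName=table_name)
            return True
        except ClientError:
            return False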
37 changes: 35 additions & 2 deletions src/toil/lib/aws/s3.py
@@ -15,6 +15,7 @@
import hashlib
import itertools
import time
import os
from io import BytesIO
from typing import Optional, Tuple, Union

@@ -92,9 +93,23 @@ def bucket_exists(bucket: str) -> Union[bool, s3_boto3_resource.Bucket]:

# TODO: Determine specific retries
@retry()
-def bucket_versioning_enabled(self, bucket: str):
+def copy_s3_to_s3(src_bucket, src_key, dst_bucket, dst_key):
+    source = {'Bucket': src_bucket, 'Key': src_key}
+    dest = s3_boto3_resource.Bucket(dst_bucket)
+    dest.copy(source, dst_key)
+
+
+# TODO: Determine specific retries
+@retry()
+def copy_local_to_s3(local_file_path, dst_bucket, dst_key):
+    s3_boto3_client.upload_file(local_file_path, dst_bucket, dst_key)
+
+
+# TODO: Determine specific retries
+@retry()
+def bucket_versioning_enabled(bucket: str):
    versionings = dict(Enabled=True, Disabled=False, Suspended=None)
-    status = self.s3_resource.BucketVersioning(bucket).status
+    status = s3_boto3_resource.BucketVersioning(bucket).status
    return versionings.get(status) if status else False
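A hypothetical usage sketch for the two copy helpers added above (bucket names, keys, and paths are placeholders; the import path follows this branch's layout):

    from toil.lib.aws.s3 import copy_s3_to_s3, copy_local_to_s3

    # Server-side copy between buckets; the data never passes through the client.
    copy_s3_to_s3(src_bucket='source-bucket', src_key='inputs/sample.bam',
                  dst_bucket='toil-jobstore-bucket', dst_key='some-file-id')

    # Plain upload of a local file into the jobstore bucket.
    copy_local_to_s3(local_file_path='/tmp/sample.bam',
                     dst_bucket='toil-jobstore-bucket', dst_key='some-file-id')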


@@ -159,3 +174,21 @@ def readFrom(self, readable):
Key=self.file_id,
UploadId=upload_id,
MultipartUpload={"Parts": parts})
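The context lines above end with the complete_multipart_upload call; a minimal standalone sketch of the boto3 multipart flow that such a call finishes (bucket, key, file path, and part size are placeholders):

    import boto3

    s3 = boto3.client('s3')
    bucket, key = 'toil-jobstore-bucket', 'some-file-id'

    upload_id = s3.create_multipart_upload(Bucket=bucket, Key=key)['UploadId']

    parts = []
    with open('/tmp/sample.bam', 'rb') as f:
        # Read fixed-size chunks and record each part's ETag for the final assembly.
        for part_number, chunk in enumerate(iter(lambda: f.read(50 * 1024 * 1024), b''), start=1):
            resp = s3.upload_part(Bucket=bucket, Key=key, PartNumber=part_number,
                                  UploadId=upload_id, Body=chunk)
            parts.append({'ETag': resp['ETag'], 'PartNumber': part_number})

    s3.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id,
                                 MultipartUpload={'Parts': parts})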


def parse_s3_uri(uri: str) -> Tuple[str, str]:
    if not uri.startswith('s3://'):
        raise ValueError(f'Invalid schema. Expecting s3 prefix, not: {uri}')
    bucket_name, key_name = uri[len('s3://'):].split('/', 1)
    return bucket_name, key_name


def boto_args():
    host = os.environ.get('TOIL_S3_HOST', None)
    port = os.environ.get('TOIL_S3_PORT', None)
    protocol = 'https'
    if os.environ.get('TOIL_S3_USE_SSL', True) == 'False':
        protocol = 'http'
    if host:
        return {'endpoint_url': f'{protocol}://{host}' + f':{port}' if port else ''}
    return {}
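A hypothetical usage sketch of the two helpers above, assuming an S3-compatible endpoint is configured through the TOIL_S3_* variables (host and port values are examples):

    import os

    import boto3

    from toil.lib.aws.s3 import boto_args, parse_s3_uri

    os.environ['TOIL_S3_HOST'] = 'localhost'
    os.environ['TOIL_S3_PORT'] = '9000'
    os.environ['TOIL_S3_USE_SSL'] = 'False'

    # With these values boto_args() returns {'endpoint_url': 'http://localhost:9000'},
    # which jobStore.py splats into boto3_session.resource('s3', ..., **boto_args()).
    s3 = boto3.resource('s3', **boto_args())

    bucket_name, key_name = parse_s3_uri('s3://toil-jobstore-bucket/some-file-id')
    obj = s3.Object(bucket_name, key_name)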