Replace simpledb. #3569

Draft: DailyDreaming wants to merge 81 commits into master from issues/964-replace-sdb-with-dynamodb

Changes from 1 commit. Showing all 81 commits:
d1a8eca  Save progress. (DailyDreaming, Mar 5, 2021)
c13e238  Save progress. (DailyDreaming, Mar 5, 2021)
8a5ed4f  . (DailyDreaming, Mar 19, 2021)
eb20bc9  Progress... ? (DailyDreaming, Mar 26, 2021)
2785d22  Progress... ? (DailyDreaming, Mar 26, 2021)
a08ebf0  Update. (DailyDreaming, Mar 26, 2021)
b9f7653  Rebase. (DailyDreaming, Apr 7, 2021)
f7555c7  Progress. (DailyDreaming, Apr 7, 2021)
758ad72  Merge branch 'master' of https://github.com/DataBiosphere/toil into i… (DailyDreaming, Apr 13, 2021)
87c8b19  Update. (DailyDreaming, Apr 14, 2021)
4ea1107  Update. (DailyDreaming, Apr 14, 2021)
623f7ec  More changes. (DailyDreaming, Apr 14, 2021)
d509d19  More changes. (DailyDreaming, Apr 14, 2021)
2ecad7c  Rework credentials. (DailyDreaming, Apr 14, 2021)
5bfaaad  Merge branch 'master' into issues/964-replace-sdb-with-dynamodb (DailyDreaming, Apr 14, 2021)
9897812  Rework credentials. (DailyDreaming, Apr 14, 2021)
ce18e3b  Rework credentials. (DailyDreaming, Apr 14, 2021)
b647571  Merge branch 'issues/964-replace-sdb-with-dynamodb' of https://github… (DailyDreaming, Apr 14, 2021)
f8079b6  Typing. (DailyDreaming, Apr 14, 2021)
320172b  Refactor. (DailyDreaming, Apr 14, 2021)
cfec71f  Refactor. (DailyDreaming, Apr 14, 2021)
4b22df0  Refactor. (DailyDreaming, Apr 14, 2021)
7964a9e  Update mypy. (DailyDreaming, Apr 14, 2021)
5ab613c  Update mypy. (DailyDreaming, Apr 14, 2021)
b7dbaab  Hmmm... (DailyDreaming, Apr 15, 2021)
24250fa  Save progress. (DailyDreaming, May 8, 2021)
b383dd1  Updates. (DailyDreaming, May 9, 2021)
ce3feb9  Rebase with master. (DailyDreaming, May 9, 2021)
ffcac1e  Update imports. (DailyDreaming, May 9, 2021)
d8d1c83  Update imports. (DailyDreaming, May 9, 2021)
1afd2f6  Update tests. (DailyDreaming, May 9, 2021)
9161b5f  Linting. (DailyDreaming, May 9, 2021)
2b9db9b  Linting. (DailyDreaming, May 9, 2021)
9b59b2e  Updates. (DailyDreaming, May 9, 2021)
3941b38  Use create_bucket. (DailyDreaming, May 9, 2021)
f342e66  Remove/stub batch. (DailyDreaming, May 9, 2021)
f16d926  Remove/stub batch. (DailyDreaming, May 9, 2021)
2a226dd  Remove/stub batch. (DailyDreaming, May 9, 2021)
93c528b  More fixes. (DailyDreaming, May 10, 2021)
555c998  Update. (DailyDreaming, May 10, 2021)
c77e726  Update. (DailyDreaming, May 10, 2021)
912da73  Rebase. (DailyDreaming, May 10, 2021)
32bdaa5  . (DailyDreaming, May 10, 2021)
58c2112  testFileDeletion (DailyDreaming, May 10, 2021)
3946645  Update. (DailyDreaming, May 11, 2021)
fddb0b5  Tests seem to be passing. (DailyDreaming, May 12, 2021)
522390f  Merge branch 'master' into issues/964-replace-sdb-with-dynamodb (DailyDreaming, May 12, 2021)
f2d7a89  Fix shared file. (DailyDreaming, May 12, 2021)
2f89eef  Merge branch 'issues/964-replace-sdb-with-dynamodb' of https://github… (DailyDreaming, May 12, 2021)
ba9e31d  Dont encrypt shared files. (DailyDreaming, Jun 13, 2021)
7dceb66  Update. (DailyDreaming, Aug 25, 2021)
6dd2d6e  Large rebase. (DailyDreaming, Aug 25, 2021)
d2e833b  Check tests. (DailyDreaming, Aug 31, 2021)
b873668  Merge branch 'master' of https://github.com/DataBiosphere/toil into i… (DailyDreaming, Aug 31, 2021)
07c9a3a  Tests. (DailyDreaming, Aug 31, 2021)
d01da36  Update .gitlab-ci.yml (DailyDreaming, Sep 1, 2021)
c3a6f77  Update. (DailyDreaming, Sep 1, 2021)
ebd3eef  Merge branch 'issues/964-replace-sdb-with-dynamodb' of https://github… (DailyDreaming, Sep 1, 2021)
da68b39  AWS exec preservation. (DailyDreaming, Sep 1, 2021)
e1a9af4  Update. (DailyDreaming, Sep 13, 2021)
0ccfa5b  Rebase. (DailyDreaming, Sep 14, 2021)
d3a6615  Cruft. (DailyDreaming, Sep 14, 2021)
a3ddb96  Cruft. (DailyDreaming, Sep 14, 2021)
5551149  Cruft. (DailyDreaming, Sep 14, 2021)
ba90b83  Consolidate functions. (DailyDreaming, Sep 14, 2021)
7054c3b  Update. (DailyDreaming, Sep 14, 2021)
df3121e  Update. (DailyDreaming, Sep 14, 2021)
6f29bc1  Cruft. (DailyDreaming, Sep 14, 2021)
3265b15  Cruft. (DailyDreaming, Sep 14, 2021)
5104ae7  Cruft. (DailyDreaming, Sep 14, 2021)
28a2861  Cruft. (DailyDreaming, Sep 14, 2021)
33ea71d  Cruft. (DailyDreaming, Sep 14, 2021)
a398b0b  Cruft. (DailyDreaming, Sep 14, 2021)
735ca38  Test bucket deletion. (DailyDreaming, Sep 14, 2021)
b4416cc  Cruft. (DailyDreaming, Sep 14, 2021)
aafc5a7  Cruft. (DailyDreaming, Sep 14, 2021)
6d352ac  Specify region. (DailyDreaming, Sep 14, 2021)
428a624  Cruft. (DailyDreaming, Sep 15, 2021)
0341be2  Cruft. (DailyDreaming, Sep 15, 2021)
ff9dc82  Correct exception. (DailyDreaming, Sep 15, 2021)
db7fecf  Cruft. (DailyDreaming, Sep 15, 2021)
Changes from commit 8a5ed4fd7120a7e6cafadd4a0abb0c3734410417 (".")
DailyDreaming committed Mar 19, 2021
src/toil/fileStores/cachingFileStore.py (2 changes: 1 addition & 1 deletion)

@@ -1768,7 +1768,7 @@ def startCommitThread(self, jobState):
 # Complete the job
 self.jobStore.update(self.jobDesc)
 # Delete any remnant jobs
-list(map(self.jobStore.delete, self.jobsToDelete))
+list(map(self.jobStore.delete_job, self.jobsToDelete))
 # Delete any remnant files
 list(map(self.jobStore.deleteFile, self.filesToDelete))
 # Remove the files to delete list, having successfully removed the files
src/toil/fileStores/nonCachingFileStore.py (2 changes: 1 addition & 1 deletion)

@@ -147,7 +147,7 @@ def startCommit(self, jobState=False):
 # Complete the job
 self.jobStore.update(self.jobDesc)
 # Delete any remnant jobs
-list(map(self.jobStore.delete, self.jobsToDelete))
+list(map(self.jobStore.delete_job, self.jobsToDelete))
 # Delete any remnant files
 list(map(self.jobStore.deleteFile, self.filesToDelete))
 # Remove the files to delete list, having successfully removed the files
src/toil/job.py (4 changes: 2 additions & 2 deletions)

@@ -916,8 +916,8 @@ def restartCheckpoint(self, jobStore):
 def recursiveDelete(jobDesc):
 # Recursive walk the stack to delete all remaining jobs
 for otherJobID in jobDesc.successorsAndServiceHosts():
-if jobStore.exists(otherJobID):
-recursiveDelete(jobStore.load(otherJobID))
+if jobStore.job_exists(otherJobID):
+recursiveDelete(jobStore.load_job(otherJobID))
 else:
 logger.debug("Job %s has already been deleted", otherJobID)
 if jobDesc.jobStoreID != self.jobStoreID:
src/toil/jobStores/abstractJobStore.py (24 changes: 12 additions & 12 deletions)
@@ -200,10 +200,10 @@ def loadRootJob(self):
 rootJobStoreID = f.read().decode('utf-8')
 except NoSuchFileException:
 raise JobException('No job has been set as the root in this job store')
-if not self.exists(rootJobStoreID):
+if not self.job_exists(rootJobStoreID):
 raise JobException("The root job '%s' doesn't exist. Either the Toil workflow "
 "is finished or has never been started" % rootJobStoreID)
-return self.load(rootJobStoreID)
+return self.load_job(rootJobStoreID)

 # FIXME: This is only used in tests, why do we have it?

@@ -493,29 +493,29 @@ def getJobDescription(jobId):
 try:
 return jobCache[jobId]
 except KeyError:
-return self.load(jobId)
+return self.load_job(jobId)
 else:
-return self.load(jobId)
+return self.load_job(jobId)

 def haveJob(jobId):
 assert len(jobId) > 1, "Job ID {} too short; is a string being used as a list?".format(jobId)
 if jobCache is not None:
 if jobId in jobCache:
 return True
 else:
-return self.exists(jobId)
+return self.job_exists(jobId)
 else:
-return self.exists(jobId)
+return self.job_exists(jobId)

 def deleteJob(jobId):
 if jobCache is not None:
 if jobId in jobCache:
 del jobCache[jobId]
-self.delete(jobId)
+self.delete_job(jobId)

 def updateJobDescription(jobDescription):
 jobCache[jobDescription.jobStoreID] = jobDescription
-self.update(jobDescription)
+self.update_job(jobDescription)

 def getJobDescriptions():
 if jobCache is not None:
@@ -749,7 +749,7 @@ def create(self, jobDescription):
 raise NotImplementedError()

 @abstractmethod
-def exists(self, jobStoreID):
+def job_exists(self, jobStoreID):
 """
 Indicates whether a description of the job with the specified jobStoreID exists in the job store

@@ -794,7 +794,7 @@ def getSharedPublicUrl(self, sharedFileName):
 raise NotImplementedError()

 @abstractmethod
-def load(self, jobStoreID):
+def load_job(self, jobStoreID):
 """
 Loads the description of the job referenced by the given ID, assigns it
 the job store's config, and returns it.
@@ -812,7 +812,7 @@ def load(self, jobStoreID):
 raise NotImplementedError()

 @abstractmethod
-def update(self, jobDescription):
+def update_job(self, jobDescription):
 """
 Persists changes to the state of the given JobDescription in this store atomically.

@@ -821,7 +821,7 @@ def update(self, jobDescription):
 raise NotImplementedError()

 @abstractmethod
-def delete(self, jobStoreID):
+def delete_job(self, jobStoreID):
 """
 Removes the JobDescription from the store atomically. You may not then
 subsequently call load(), write(), update(), etc. with the same
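
For orientation, the hunks above rename the job store's generic CRUD methods (exists, load, update, delete) to job-specific names (job_exists, load_job, update_job, delete_job). A minimal sketch of how a caller is updated under this rename; the cleanup_job helper below is hypothetical and not code from this PR, it only illustrates the new method names against any AbstractJobStore instance:

def cleanup_job(job_store, job_id):
    # Hypothetical caller showing the renamed AbstractJobStore job API.
    if job_store.job_exists(job_id):        # previously: job_store.exists(job_id)
        desc = job_store.load_job(job_id)   # previously: job_store.load(job_id)
        # ... mutate the JobDescription here as needed ...
        job_store.update_job(desc)          # previously: job_store.update(desc)
        job_store.delete_job(job_id)        # previously: job_store.delete(job_id)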
src/toil/jobStores/aws/file_info.py (64 changes: 15 additions & 49 deletions)
@@ -50,6 +50,7 @@
 from toil.lib.pipes import (ReadablePipe,
 ReadableTransformingPipe,
 WritablePipe)
+from toil.lib.checksum import compute_checksum_for_file
 from toil.lib.compatibility import compat_bytes
 from toil.lib.ec2 import establish_boto3_session
 from toil.lib.aws.s3 import MultiPartPipe, SinglePartPipe
@@ -200,40 +201,14 @@ def upload(self, localFilePath, calculateChecksum=True):
 # Create a new Resource in case it needs to be on its own thread
 resource = boto3_session.resource('s3', region_name=self.outer.region)

-self.checksum = self._get_file_checksum(localFilePath) if calculateChecksum else None
+self.checksum = compute_checksum_for_file(localFilePath) if calculateChecksum else None
 self.version = uploadFromPath(localFilePath,
 resource=resource,
 bucketName=self.outer.filesBucket.name,
 fileID=compat_bytes(self.fileID),
 headerArgs=headerArgs,
 partSize=self.outer.partSize)

-def _start_checksum(self, to_match=None, algorithm='sha1'):
-"""
-Get a hasher that can be used with _update_checksum and
-_finish_checksum.
-
-If to_match is set, it is a precomputed checksum which we expect
-the result to match.
-
-The right way to compare checksums is to feed in the checksum to be
-matched, so we can see its algorithm, instead of getting a new one
-and comparing. If a checksum to match is fed in, _finish_checksum()
-will raise a ChecksumError if it isn't matched.
-"""
-
-# If we have an expexted result it will go here.
-expected = None
-
-if to_match is not None:
-parts = to_match.split('$')
-algorithm = parts[0]
-expected = parts[1]
-
-wrapped = getattr(hashlib, algorithm)()
-logger.debug(f'Starting {algorithm} checksum to match {expected}')
-return algorithm, wrapped, expected
-
 def _update_checksum(self, checksum_in_progress, data):
 """
 Update a checksum in progress from _start_checksum with new data.
@@ -257,15 +232,6 @@ def _finish_checksum(self, checksum_in_progress):

 return '$'.join([checksum_in_progress[0], result_hash])

-def _get_file_checksum(self, localFilePath, to_match=None):
-with open(localFilePath, 'rb') as f:
-hasher = self._start_checksum(to_match=to_match)
-contents = f.read(1024 * 1024)
-while contents != b'':
-self._update_checksum(hasher, contents)
-contents = f.read(1024 * 1024)
-return self._finish_checksum(hasher)
-
 @contextmanager
 def uploadStream(self, multipart=True, allowInlining=True):
 """
@@ -281,8 +247,6 @@ def uploadStream(self, multipart=True, allowInlining=True):
 info = self
 store = self.outer

-
-
 pipe = MultiPartPipe() if multipart else SinglePartPipe()
 with pipe as writable:
 yield writable
@@ -360,14 +324,13 @@ def download(self, localFilePath, verifyChecksum=True):
 obj.download_file(Filename=tmpPath, ExtraArgs={'VersionId': self.version, **headerArgs})

 if verifyChecksum and self.checksum:
-try:
-# This automatically compares the result and matches the algorithm.
-self._get_file_checksum(localFilePath, self.checksum)
-except ChecksumError as e:
-# Annotate checksum mismatches with file name
-raise ChecksumError('Checksums do not match for file %s.' % localFilePath) from e
-# The error will get caught and result in a retry of the download until we run out of retries.
-# TODO: handle obviously truncated downloads by resuming instead.
+algorithm, expected_checksum = self.checksum.split('$')
+computed = compute_checksum_for_file(localFilePath, algorithm=algorithm)
+if self.checksum != computed:
+raise ChecksumError(f'Checksum mismatch for file {localFilePath}. '
+f'Expected: {self.checksum} Actual: {computed}')
+# The error will get caught and result in a retry of the download until we run out of retries.
+# TODO: handle obviously truncated downloads by resuming instead.
 else:
 assert False

@@ -396,10 +359,11 @@ class HashingPipe(ReadableTransformingPipe):
 """

 def transform(self, readable, writable):
-hasher = info._start_checksum(to_match=info.checksum)
+algorithm, _ = info.checksum.split('$')
+hasher = getattr(hashlib, algorithm)()
 contents = readable.read(1024 * 1024)
 while contents != b'':
-info._update_checksum(hasher, contents)
+hasher.update(contents)
 try:
 writable.write(contents)
 except BrokenPipeError:
@@ -409,7 +373,9 @@ def transform(self, readable, writable):
 contents = readable.read(1024 * 1024)
 # We reached EOF in the input.
 # Finish checksumming and verify.
-info._finish_checksum(hasher)
+result_hash = hasher.hexdigest()
+if f'{algorithm}${result_hash}' != info.checksum:
+raise ChecksumError('')
 # Now stop so EOF happens in the output.

 with DownloadPipe() as readable:
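
The new upload, download, and HashingPipe code all rely on checksum strings of the form '<algorithm>$<hexdigest>' (for example, a 'sha1$...' value) produced by compute_checksum_for_file from toil.lib.checksum. A minimal sketch of what such a helper does, assuming only the call signature visible in the hunks above; this is an illustration, not the actual toil.lib.checksum implementation:

import hashlib

def compute_checksum_for_file(local_file_path, algorithm='sha1'):
    # Sketch: hash the file in 1 MiB chunks and return '<algorithm>$<hexdigest>',
    # the same string format the download/transform code above compares against.
    hasher = getattr(hashlib, algorithm)()
    with open(local_file_path, 'rb') as f:
        contents = f.read(1024 * 1024)
        while contents != b'':
            hasher.update(contents)
            contents = f.read(1024 * 1024)
    return f'{algorithm}${hasher.hexdigest()}'

# Verification on download then reduces to recomputing and comparing strings:
#   algorithm, _ = stored_checksum.split('$')
#   if compute_checksum_for_file(path, algorithm=algorithm) != stored_checksum:
#       raise ChecksumError(...)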