Skip to content

Commit

Permalink
Refactor force upload. Added --cache-dir arg
Browse files Browse the repository at this point in the history
  • Loading branch information
pcstout committed Oct 9, 2020
1 parent 5e55696 commit a073014
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 24 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Change Log

## Version 0.0.4 (2020-10-09)
### Added
- Refactor force upload.
- Added `--cache-dir` flag.

## Version 0.0.3 (2020-09-21)
### Added
- Added `--force-upload` flag.
Expand Down
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ SYNAPSE_PASSWORD=your-synapse-password
```text
usage: synapse-uploader [-h] [--version] [-r REMOTE_FOLDER_PATH] [-d DEPTH]
[-t THREADS] [-u USERNAME] [-p PASSWORD]
[-ll LOG_LEVEL] [-ld LOG_DIR] [-f]
[-ll LOG_LEVEL] [-ld LOG_DIR] [-f] [-cd CACHE_DIR]
entity-id local-path
positional arguments:
Expand All @@ -57,19 +57,22 @@ optional arguments:
Set the directory where the log file will be written.
-f, --force-upload Force files to be re-uploaded. This will clear the
local Synapse cache and increment each file's version.
-cd CACHE_DIR, --cache-dir CACHE_DIR
Set the directory where the Synapse cache will be
stored.
```

## Examples

Upload all the folders and files in `~/my_study` to your Project ID `syn123456`:

- Linux: `synapse_uploader syn123456 ~/my_study`
- Windows: `synapse_uploader syn123456 %USERPROFILE%\my_study`
- Linux: `synapse-uploader syn123456 ~/my_study`
- Windows: `synapse-uploader syn123456 %USERPROFILE%\my_study`

Upload all the folders and files in `~/my_study` to your Project ID `syn123456` in the `drafts/my_study` folder:

- Linux: `synapse_uploader syn123456 ~/my_study -r drafts/my_study`
- Windows: `synapse_uploader syn123456 %USERPROFILE%\my_study -r drafts\my_study`
- Linux: `synapse-uploader syn123456 ~/my_study -r drafts/my_study`
- Windows: `synapse-uploader syn123456 %USERPROFILE%\my_study -r drafts\my_study`

> Note: The correct path separator (`\` for Windows and `/` for Linux) must be used in both the `local-folder-path` and the `remote-folder-path`.
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@
]
},
install_requires=[
"synapseclient>=2.1.0"
"synapseclient>=2.1.0,<3.0.0"
]
)
2 changes: 1 addition & 1 deletion src/synapse_uploader/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.0.3'
__version__ = '0.0.4'
5 changes: 4 additions & 1 deletion src/synapse_uploader/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ def main():
help='Force files to be re-uploaded. This will clear the local Synapse cache and increment each file\'s version.',
default=False,
action='store_true')
parser.add_argument('-cd', '--cache-dir',
help='Set the directory where the Synapse cache will be stored.')

args = parser.parse_args()

Expand Down Expand Up @@ -108,7 +110,8 @@ def main():
max_threads=args.threads,
username=args.username,
password=args.password,
force_upload=args.force_upload
force_upload=args.force_upload,
cache_dir=args.cache_dir
).execute()

print('Output logged to: {0}'.format(log_filename))
Expand Down
68 changes: 56 additions & 12 deletions src/synapse_uploader/synapse_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import concurrent.futures
import threading
import logging
from datetime import datetime, timedelta
import functools
from datetime import datetime
import synapseclient as syn
from .utils import Utils

Expand All @@ -26,7 +27,8 @@ def __init__(self,
username=None,
password=None,
synapse_client=None,
force_upload=False):
force_upload=False,
cache_dir=None):

self._synapse_entity_id = synapse_entity_id
self._local_path = Utils.expand_path(local_path)
Expand All @@ -37,6 +39,7 @@ def __init__(self,
self._password = password
self._synapse_client = synapse_client
self._force_upload = force_upload
self._cache_dir = cache_dir

self.start_time = None
self.end_time = None
Expand Down Expand Up @@ -65,11 +68,7 @@ def execute(self):
return

if self._force_upload:
# NOTE: The cache must be purged in order to force the file to re-upload.
print('Forcing upload. Cache will be purged. Entity versions will be incremented.')
# Set the purge date way in the future to account for local time slop.
purge_count = self._synapse_client.cache.purge(datetime.today() + timedelta(weeks=52))
print('{0} files purged from cache.'.format(purge_count))
print('Forcing upload. Entity versions will be incremented.')

remote_entity = self._synapse_client.get(self._synapse_entity_id, downloadFile=False)
remote_entity_is_file = False
Expand Down Expand Up @@ -157,6 +156,8 @@ def _synapse_login(self):
logging.info('Logging into Synapse as: {0}'.format(self._username))
try:
self._synapse_client = syn.Synapse(skip_checks=True)
if self._cache_dir:
self._synapse_client.cache.cache_root_dir = os.path.join(self._cache_dir, '.synapseCache')
self._synapse_client.login(self._username, self._password, silent=True)
except Exception as ex:
self._synapse_client = None
Expand Down Expand Up @@ -243,7 +244,8 @@ def _upload_file_to_synapse(self, local_file, synapse_parent):
return synapse_file

# Skip empty files since these will error when uploading via the synapseclient.
if os.path.getsize(local_file) < 1:
local_file_size = os.path.getsize(local_file)
if local_file_size < 1:
logging.info('Skipping empty file: {0}'.format(local_file))
return synapse_file

Expand All @@ -253,14 +255,29 @@ def _upload_file_to_synapse(self, local_file, synapse_parent):
max_attempts = 5
attempt_number = 0
exception = None
log_success_prefix = 'File'

while attempt_number < max_attempts and not synapse_file:
try:
attempt_number += 1
exception = None
synapse_file = self._synapse_client.store(
syn.File(path=local_file, name=file_name, parent=synapse_parent),
forceVersion=self._force_upload)
needs_upload = True

file_obj = self._find_synapse_file(synapse_parent['id'], local_file)
if file_obj:
file_obj.path = local_file
if self._force_upload:
self._synapse_client.cache.remove(file_obj)
else:
if file_obj['_file_handle']['contentSize'] == local_file_size and \
file_obj['_file_handle']['contentMd5'] == Utils.get_md5(local_file):
needs_upload = False
log_success_prefix = 'File is Current'
else:
file_obj = syn.File(path=local_file, name=file_name, parent=synapse_parent)

if needs_upload or self._force_upload:
synapse_file = self._synapse_client.store(file_obj, forceVersion=self._force_upload)
except Exception as ex:
exception = ex
logging.error('[File ERROR] {0} -> {1} : {2}'.format(local_file, full_synapse_path, str(ex)))
Expand All @@ -273,10 +290,37 @@ def _upload_file_to_synapse(self, local_file, synapse_parent):
self.has_errors = True
logging.error('[File FAILED] {0} -> {1} : {2}'.format(local_file, full_synapse_path, str(exception)))
else:
logging.info('[File] {0} -> {1}'.format(local_file, full_synapse_path))
logging.info('[{0}] {1} -> {2}'.format(log_success_prefix, local_file, full_synapse_path))

return synapse_file

def _find_synapse_file(self, synapse_parent_id, local_file_path):
"""Finds a Synapse file by its parent and local_file name."""
children = self._get_synapse_children(synapse_parent_id)

for child in children:
if child['name'] == os.path.basename(local_file_path):
syn_file = self._synapse_client.get(child['id'], downloadFile=False)
# Synapse can store a file with two names: 1) The entity name 2) the actual filename.
# Check that the actual filename matches the local file name to ensure we have the same file.
if syn_file['_file_handle']['fileName'] != os.path.basename(local_file_path):
for find_child in children:
if find_child == child:
continue
syn_child = self._synapse_client.get(find_child['id'], downloadFile=False)
if syn_child['_file_handle']['fileName'] == os.path.basename(local_file_path):
return syn_child
return syn_file

return None

LRU_MAXSIZE = (os.cpu_count() or 1) * 5

@functools.lru_cache(maxsize=LRU_MAXSIZE, typed=True)
def _get_synapse_children(self, synapse_parent_id):
"""Gets the child files metadata for a parent Synapse container."""
return list(self._synapse_client.getChildren(synapse_parent_id, includeTypes=["file"]))

def _set_synapse_parent(self, parent):
with self._thread_lock:
self._synapse_parents[parent.id] = parent
Expand Down
15 changes: 15 additions & 0 deletions src/synapse_uploader/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import hashlib
import os
import pathlib


class Utils:
KB = 1024
MB = KB * KB
CHUNK_SIZE = 10 * MB

@staticmethod
def app_dir():
Expand Down Expand Up @@ -40,3 +44,14 @@ def ensure_dirs(local_path):
"""
if not os.path.isdir(local_path):
os.makedirs(local_path)

@staticmethod
def get_md5(local_path):
md5 = hashlib.md5()
with open(local_path, mode='rb') as fd:
while True:
chunk = fd.read(Utils.CHUNK_SIZE)
if not chunk:
break
md5.update(chunk)
return md5.hexdigest()
6 changes: 4 additions & 2 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@


def test_cli(mocker):
args = ['', 'syn123', '/tmp', '-r', '10', '-d', '20', '-t', '30', '-u', '40', '-p', '50', '-ll', 'debug', '-f']
args = ['', 'syn123', '/tmp', '-r', '10', '-d', '20', '-t', '30', '-u', '40', '-p', '50', '-ll', 'debug', '-f',
'-cd', '/tmp/cache']
mocker.patch('sys.argv', args)
mocker.patch('src.synapse_uploader.synapse_uploader.SynapseUploader.execute', return_value=None)
mock_init = mocker.patch.object(SynapseUploader, '__init__', return_value=None)
Expand All @@ -18,5 +19,6 @@ def test_cli(mocker):
max_threads=30,
username='40',
password='50',
force_upload=True
force_upload=True,
cache_dir='/tmp/cache'
)
18 changes: 16 additions & 2 deletions tests/test_synapse_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def test_password_value():
assert syn_uploader._password == password


def test_password_value():
def test_force_upload_value():
for b_value in [True, False]:
syn_uploader = SynapseUploader('None', 'None', force_upload=b_value)
assert syn_uploader._force_upload == b_value
Expand All @@ -106,6 +106,14 @@ def test_synapse_client_value():
assert syn_uploader._synapse_client == client


def test_cache_dir(new_temp_dir):
syn_uploader = SynapseUploader('None', 'None', cache_dir=new_temp_dir)
syn_uploader._synapse_login() is True
full_cache_dir = os.path.join(new_temp_dir, '.synapseCache')
assert syn_uploader._synapse_client.cache.cache_root_dir == full_cache_dir
assert os.path.isdir(full_cache_dir)


def test_login(syn_client, monkeypatch, mocker):
# Uses ENV
syn_uploader = SynapseUploader('None', 'None')
Expand Down Expand Up @@ -312,17 +320,23 @@ def test_upload_file(syn_client, syn_test_helper, new_syn_project, new_temp_file


def test_force_upload(syn_client, syn_test_helper, new_syn_project, new_temp_file):
annotations = {"one": ["two"]}

file_name = os.path.basename(new_temp_file)
syn_file = syn_test_helper.create_file(name=file_name, path=new_temp_file, parent=new_syn_project)
syn_file = syn_test_helper.create_file(name=file_name, path=new_temp_file, parent=new_syn_project,
annotations=annotations)
assert syn_file.versionNumber == 1
assert syn_file.annotations == annotations

SynapseUploader(syn_file.id, new_temp_file, synapse_client=syn_client).execute()
syn_file = syn_client.get(syn_file.id)
assert syn_file.versionNumber == 1
assert syn_file.annotations == annotations

SynapseUploader(syn_file.id, new_temp_file, force_upload=True, synapse_client=syn_client).execute()
syn_file = syn_client.get(syn_file.id)
assert syn_file.versionNumber == 2
assert syn_file.annotations == annotations


def test_upload_max_depth(syn_client, new_syn_project, new_temp_dir):
Expand Down

0 comments on commit a073014

Please sign in to comment.