From 48da7c24bbf6112eb01c97e8c0e3ac937894d3f1 Mon Sep 17 00:00:00 2001 From: Audrey Musselman-Brown Date: Thu, 23 Jun 2016 21:50:13 +0000 Subject: [PATCH 01/16] Convert ADAM pipeline to using Toil Spark support (resolves #325) --- .../adam_gatk_pipeline/align_and_call.py | 5 +- .../adam_pipeline/adam_preprocessing.py | 23 +- src/toil_scripts/spark_utils/__init__.py | 0 src/toil_scripts/spark_utils/spawn_cluster.py | 264 ------------------ 4 files changed, 14 insertions(+), 278 deletions(-) delete mode 100644 src/toil_scripts/spark_utils/__init__.py delete mode 100644 src/toil_scripts/spark_utils/spawn_cluster.py diff --git a/src/toil_scripts/adam_gatk_pipeline/align_and_call.py b/src/toil_scripts/adam_gatk_pipeline/align_and_call.py index bebe20bc..31a29932 100644 --- a/src/toil_scripts/adam_gatk_pipeline/align_and_call.py +++ b/src/toil_scripts/adam_gatk_pipeline/align_and_call.py @@ -120,6 +120,8 @@ import yaml # import toil features from toil.job import Job +# these don't seem necessary! but, must be imported here due to a serialization issue +from toil.lib.spark import spawn_spark_cluster # import job steps from other toil pipelines from toil_scripts.adam_pipeline.adam_preprocessing import * #static_adam_preprocessing_dag @@ -128,9 +130,6 @@ from toil_scripts.gatk_processing.gatk_preprocessing import * #download_gatk_files from toil_scripts.rnaseq_cgl.rnaseq_cgl_pipeline import generate_file -# these don't seem necessary! but, must be imported here due to a serialization issue -from toil_scripts.spark_utils.spawn_cluster import * - from toil_scripts.lib.programs import mock_mode # import autoscaling tools diff --git a/src/toil_scripts/adam_pipeline/adam_preprocessing.py b/src/toil_scripts/adam_pipeline/adam_preprocessing.py index 1681a40c..224a16b3 100644 --- a/src/toil_scripts/adam_pipeline/adam_preprocessing.py +++ b/src/toil_scripts/adam_pipeline/adam_preprocessing.py @@ -36,11 +36,12 @@ import yaml from toil.job import Job +from toil.lib.spark import spawn_spark_cluster + from toil_scripts.adam_uberscript.automated_scaling import SparkMasterAddress from toil_scripts.lib import require from toil_scripts.lib.programs import docker_call, mock_mode from toil_scripts.rnaseq_cgl.rnaseq_cgl_pipeline import generate_file -from toil_scripts.spark_utils.spawn_cluster import start_spark_hdfs_cluster SPARK_MASTER_PORT = "7077" HDFS_MASTER_PORT = "8020" @@ -264,6 +265,7 @@ def upload_data(master_ip, inputs, hdfs_name, upload_name, spark_on_toil): log.info("Uploading output BAM %s to %s.", hdfs_name, upload_name) call_conductor(master_ip, inputs, hdfs_name, upload_name) + remove_file(master_ip, hdfs_name, spark_on_toil) def download_run_and_upload(job, master_ip, inputs, spark_on_toil): @@ -296,7 +298,7 @@ def download_run_and_upload(job, master_ip, inputs, spark_on_toil): out_file = inputs.output_dir + "/" + sample_name + inputs.suffix + ".bam" upload_data(master_ip, inputs, adam_output, out_file, spark_on_toil) - + remove_file(master_ip, hdfs_subdir, spark_on_toil) except: remove_file(master_ip, hdfs_subdir, spark_on_toil) raise @@ -331,15 +333,14 @@ def static_adam_preprocessing_dag(job, inputs, sample, output_dir, suffix=''): # Dynamic subclusters, i.e. 
Spark-on-Toil spark_on_toil = True cores = multiprocessing.cpu_count() - start_cluster = job.wrapJobFn(start_spark_hdfs_cluster, - inputs.num_nodes-1, - inputs.memory, - download_run_and_upload, - jArgs=(inputs, spark_on_toil), - jCores=cores, - jMemory="%s G" % - inputs.memory).encapsulate() - job.addChild(start_cluster) + master_ip = spawn_spark_cluster(job, + False, # Sudo + inputs.num_nodes-1, + cores=cores, + memory=inputs.memory) + spark_work = job.wrapJobFn(download_run_and_upload, + master_ip, inputs, spark_on_toil) + job.addChild(spark_work) def scale_external_spark_cluster(num_samples=1): diff --git a/src/toil_scripts/spark_utils/__init__.py b/src/toil_scripts/spark_utils/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/toil_scripts/spark_utils/spawn_cluster.py b/src/toil_scripts/spark_utils/spawn_cluster.py deleted file mode 100644 index 8c0a6692..00000000 --- a/src/toil_scripts/spark_utils/spawn_cluster.py +++ /dev/null @@ -1,264 +0,0 @@ -""" -Spawns a Spark cluster backed by HDFS. - -@author Audrey Musselman-Brown, almussel@ucsc.edu -@author Frank Austin Nothaft, fnothaft@berkeley.edu -""" - -import logging -import multiprocessing -import os -from subprocess import call, check_call, check_output -import time - -from toil.job import Job -from toil_scripts.lib.programs import docker_call - -_log = logging.getLogger(__name__) - -def start_spark_hdfs_cluster(job, - numWorkers, - executorMemory, - jFn, - jArgs = [], - jCores = None, - jMemory = None, - jDisk = None): - - # build job requirement dictionary - jReqs = {} - if jCores: - jReqs['cores'] = jCores - if jMemory: - jReqs['memory'] = jMemory - if jDisk: - jReqs['disk'] = jDisk - - masterIP = start_spark_hdfs_master(job, executorMemory) - job.addChildJobFn(start_spark_hdfs_workers, - masterIP, - numWorkers, - executorMemory, - jFn, - jArgs, - jReqs) - - -def start_spark_hdfs_master(job, executorMemory): - """ - Starts the master service. - """ - - _log.info("Starting Spark master and HDFS namenode.") - - masterIP = job.addService(MasterService("%s G" % executorMemory)) - - _log.info("Spark Master and HDFS Namenode started.") - - return masterIP - -def start_spark_hdfs_workers(job, masterIP, numWorkers, executorMemory, jFn, jArgs, jReqs): - """ - Starts the worker services. 
- """ - _log.info("Starting %d Spark workers and HDFS Datanodes.", numWorkers) - - for i in range(numWorkers): - job.addService(WorkerService(masterIP, "%s G" % executorMemory)) - - job.addChildJobFn(jFn, masterIP, *jArgs, **jReqs) - -class MasterService(Job.Service): - - def __init__(self, memory): - - self.memory = memory - self.cores = multiprocessing.cpu_count() - Job.Service.__init__(self, memory = self.memory, cores = self.cores) - - def start(self, fileStore): - """ - Start spark and hdfs master containers - - fileStore: Unused - """ - - self.IP = check_output(["hostname", "-f",])[:-1] - - _log.info("Started Spark master container.") - self.sparkContainerID = docker_call(tool = "quay.io/ucsc_cgl/apache-spark-master:1.5.2", - docker_parameters = ["--net=host", - "-d", - "-v", "/mnt/ephemeral/:/ephemeral/:rw", - "-e", "SPARK_MASTER_IP="+self.IP, - "-e", "SPARK_LOCAL_DIRS=/ephemeral/spark/local", - "-e", "SPARK_WORKER_DIR=/ephemeral/spark/work"], - rm=False, - check_output = True, - mock = False)[:-1] - _log.info("Started HDFS Datanode.") - self.hdfsContainerID = docker_call(tool = "quay.io/ucsc_cgl/apache-hadoop-master:2.6.2", - docker_parameters = ["--net=host", - "-d"], - parameters = [self.IP], - rm=False, - check_output = True, - mock = False)[:-1] - return self.IP - - - def stop(self, fileStore): - """ - Stop and remove spark and hdfs master containers - - fileStore: Unused - """ - - call(["docker", "exec", self.sparkContainerID, "rm", "-r", "/ephemeral/spark"]) - call(["docker", "stop", self.sparkContainerID]) - call(["docker", "rm", self.sparkContainerID]) - _log.info("Stopped Spark master.") - - call(["docker", "stop", self.hdfsContainerID]) - call(["docker", "rm", self.hdfsContainerID]) - _log.info("Stopped HDFS namenode.") - - return - - - def check(self): - """ - Checks to see if Spark master and HDFS namenode are still running. 
- """ - - containers = check_output(["docker", "ps", "-q"]) - - return ((self.sparkContainerID in containers) and - (self.hdfsContainerID in containers)) - - -SPARK_MASTER_PORT = "7077" - -class WorkerService(Job.Service): - - def __init__(self, masterIP, memory): - self.masterIP = masterIP - self.memory = memory - self.cores = multiprocessing.cpu_count() - Job.Service.__init__(self, memory = self.memory, cores = self.cores) - - def start(self, fileStore): - """ - Start spark and hdfs worker containers - - fileStore: Unused - """ - # start spark and our datanode - self.sparkContainerID = docker_call(tool = "quay.io/ucsc_cgl/apache-spark-worker:1.5.2", - docker_parameters = ["--net=host", - "-d", - "-v", "/mnt/ephemeral/:/ephemeral/:rw", - "-e", "\"SPARK_MASTER_IP="+self.masterIP+":"+SPARK_MASTER_PORT+"\"", - "-e", "SPARK_LOCAL_DIRS=/ephemeral/spark/local", - "-e", "SPARK_WORKER_DIR=/ephemeral/spark/work"], - parameters = [self.masterIP+":"+SPARK_MASTER_PORT], - rm=False, - check_output = True, - mock = False)[:-1] - self.__start_datanode() - - # fake do/while to check if HDFS is up - hdfs_down = True - retries = 0 - while hdfs_down and (retries < 5): - - _log.info("Sleeping 30 seconds before checking HDFS startup.") - time.sleep(30) - clusterID = "" - try: - clusterID = check_output(["docker", - "exec", - self.hdfsContainerID, - "grep", - "clusterID", - "-R", - "/opt/apache-hadoop/logs"]) - except: - # grep returns a non-zero exit code if the pattern is not found - # we expect to not find the pattern, so a non-zero code is OK - pass - - if "Incompatible" in clusterID: - _log.warning("Hadoop Datanode failed to start with: %s", clusterID) - _log.warning("Retrying container startup, retry #%d.", retries) - retries += 1 - - _log.warning("Removing ephemeral hdfs directory.") - check_call(["docker", - "exec", - self.hdfsContainerID, - "rm", - "-rf", - "/ephemeral/hdfs"]) - - _log.warning("Killing container %s.", self.hdfsContainerID) - check_call(["docker", - "kill", - self.hdfsContainerID]) - - # todo: this is copied code. clean up! - _log.info("Restarting datanode.") - self.__start_datanode() - - else: - _log.info("HDFS datanode started up OK!") - hdfs_down = False - - if retries >= 5: - raise RuntimeError("Failed %d times trying to start HDFS datanode." % retries) - - return - - def __start_datanode(self): - """ - Launches the Hadoop datanode. - """ - self.hdfsContainerID = docker_call(tool = "quay.io/ucsc_cgl/apache-hadoop-worker:2.6.2", - docker_parameters = ["--net=host", - "-d", - "-v", "/mnt/ephemeral/:/ephemeral/:rw"], - parameters = [self.masterIP], - rm=False, - check_output = True, - mock = False)[:-1] - - - def stop(self, fileStore): - """ - Stop spark and hdfs worker containers - - fileStore: Unused - """ - - call(["docker", "exec", self.sparkContainerID, "rm", "-r", "/ephemeral/spark"]) - call(["docker", "stop", self.sparkContainerID]) - call(["docker", "rm", self.sparkContainerID]) - _log.info("Stopped Spark worker.") - - call(["docker", "exec", self.hdfsContainerID, "rm", "-r", "/ephemeral/hdfs"]) - call(["docker", "stop", self.hdfsContainerID]) - call(["docker", "rm", self.hdfsContainerID]) - _log.info("Stopped HDFS datanode.") - - return - - - def check(self): - """ - Checks to see if Spark master and HDFS namenode are still running. 
- """ - - containers = check_output(["docker", "ps", "-q"]) - - return ((self.sparkContainerID in containers) and - (self.hdfsContainerID in containers)) From 412b2c0544953c286aeb8e0260e14158f56a15e2 Mon Sep 17 00:00:00 2001 From: Audrey Musselman-Brown Date: Tue, 28 Jun 2016 18:46:27 +0000 Subject: [PATCH 02/16] Misc debugging --- src/toil_scripts/adam_pipeline/adam_preprocessing.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/toil_scripts/adam_pipeline/adam_preprocessing.py b/src/toil_scripts/adam_pipeline/adam_preprocessing.py index 224a16b3..436f34f7 100644 --- a/src/toil_scripts/adam_pipeline/adam_preprocessing.py +++ b/src/toil_scripts/adam_pipeline/adam_preprocessing.py @@ -312,7 +312,7 @@ def static_adam_preprocessing_dag(job, inputs, sample, output_dir, suffix=''): inputs.output_dir = output_dir inputs.suffix = suffix - if inputs.master_ip: + if inputs.master_ip is not None: if inputs.master_ip == 'auto': # Static, standalone Spark cluster managed by uberscript spark_on_toil = False @@ -384,7 +384,7 @@ def main(): help='Path to the (filled in) config file, generated with "generate-config". ' '\nDefault value: "%(default)s"') parser_run.add_argument('--sample', help='The full s3 url of the input SAM or BAM file') - parser_run.add_argument('--output-dir', default=None, + parser_run.add_argument('--output-dir', required=True, default=None, help='full path where final results will be output') parser_run.add_argument('-s', '--suffix', default='', help='Additional suffix to add to the names of the output files') From e68197969661a99778daef0e953e1714cfdc3f96 Mon Sep 17 00:00:00 2001 From: Audrey Musselman-Brown Date: Tue, 28 Jun 2016 21:09:23 +0000 Subject: [PATCH 03/16] Bump Toil version --- jenkins.sh | 4 ++-- setup.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/jenkins.sh b/jenkins.sh index 3365b152..513e17a5 100755 --- a/jenkins.sh +++ b/jenkins.sh @@ -11,8 +11,8 @@ export PATH=$PATH:${PWD}/bin # Create Toil venv virtualenv venv . 
venv/bin/activate -pip install toil==3.1.6 -pip install bd2k-python-lib==1.10.dev6 +pip install toil==3.2.1 +pip install bd2k-python-lib==1.13.dev21 make develop make test make clean diff --git a/setup.py b/setup.py index 42bfa657..a897fa45 100644 --- a/setup.py +++ b/setup.py @@ -21,9 +21,9 @@ from version import version from pkg_resources import parse_version, require, DistributionNotFound -toil_min_version = '3.1.6' -toil_max_version = '3.2.0' -bpl_min_version = '1.10.dev6' +toil_min_version = '3.2.1' +toil_max_version = '3.3.0' +bpl_min_version = '1.13.dev21' # Toil version check -- Raise warning instead of using intall_requires to avoid virtualenv conflicts try: From 89875eec5172dbd302eacf303ef30be17b079b52 Mon Sep 17 00:00:00 2001 From: John Vivian Date: Wed, 29 Jun 2016 14:26:12 -0700 Subject: [PATCH 04/16] Address 3.2.1 fixes --- src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py b/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py index 856ba997..240fc36e 100644 --- a/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py +++ b/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py @@ -40,24 +40,25 @@ def download_sample(job, sample, config): config.file_type, config.paired, config.uuid, config.url = sample config.paired = True if config.paired == 'paired' else False config.cores = min(config.maxCores, multiprocessing.cpu_count()) + disk = '2G' if config.ci_test else '20G' job.fileStore.logToMaster('UUID: {}\nURL: {}\nPaired: {}\nFile Type: {}\nCores: {}\nCIMode: {}'.format( config.uuid, config.url, config.paired, config.file_type, config.cores, config.ci_test)) # Download or locate local file and place in the jobStore tar_id, r1_id, r2_id = None, None, None if config.file_type == 'tar': tar_id = job.addChildJobFn(download_url_job, config.url, cghub_key_path=config.gtkey, - s3_key_path=config.ssec, disk='20G').rv() + s3_key_path=config.ssec, disk=disk).rv() else: if config.paired: require(len(config.url.split(',')) == 2, 'Fastq pairs must have 2 URLS separated by comma') r1_url, r2_url = config.url.split(',') r1_id = job.addChildJobFn(download_url_job, r1_url, cghub_key_path=config.gtkey, - s3_key_path=config.ssec, disk='20G').rv() + s3_key_path=config.ssec, disk=disk).rv() r2_id = job.addChildJobFn(download_url_job, r2_url, cghub_key_path=config.gtkey, - s3_key_path=config.ssec, disk='20G').rv() + s3_key_path=config.ssec, disk=disk).rv() else: r1_id = job.addChildJobFn(download_url_job, config.url, cghub_key_path=config.gtkey, - s3_key_path=config.ssec, disk='20G').rv() + s3_key_path=config.ssec, disk=disk).rv() job.addFollowOnJobFn(preprocessing_declaration, config, tar_id, r1_id, r2_id) @@ -176,7 +177,7 @@ def process_sample_tar(job, config, tar_id): tar_path = os.path.join(work_dir, 'sample.tar') # Untar File and concat subprocess.check_call(['tar', '-xvf', tar_path, '-C', work_dir], stderr=PIPE, stdout=PIPE) - os.remove(os.path.join(work_dir, 'sample.tar')) + job.fileStore.deleteLocalFile(os.path.join(work_dir, 'sample.tar')) fastqs = [] for root, subdir, files in os.walk(work_dir): fastqs.extend([os.path.join(root, x) for x in files]) From 00f782bc8be324c424c057183e94b0b9f181b987 Mon Sep 17 00:00:00 2001 From: John Vivian Date: Wed, 29 Jun 2016 14:49:19 -0700 Subject: [PATCH 05/16] Add mutable=True --- src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py b/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py index 240fc36e..777803b7 100644 --- a/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py +++ b/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py @@ -173,7 +173,7 @@ def process_sample_tar(job, config, tar_id): work_dir = job.fileStore.getLocalTempDir() r1_id, r2_id = None, None # I/O - job.fileStore.readGlobalFile(tar_id, os.path.join(work_dir, 'sample.tar')) + job.fileStore.readGlobalFile(tar_id, os.path.join(work_dir, 'sample.tar'), mutable=True) tar_path = os.path.join(work_dir, 'sample.tar') # Untar File and concat subprocess.check_call(['tar', '-xvf', tar_path, '-C', work_dir], stderr=PIPE, stdout=PIPE) From efed17cccb61eed2576219c5ab745e1859187d6b Mon Sep 17 00:00:00 2001 From: John Vivian Date: Wed, 29 Jun 2016 15:32:30 -0700 Subject: [PATCH 06/16] Omit deleteLocalFile --- src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py b/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py index 777803b7..edc73b0e 100644 --- a/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py +++ b/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py @@ -173,11 +173,10 @@ def process_sample_tar(job, config, tar_id): work_dir = job.fileStore.getLocalTempDir() r1_id, r2_id = None, None # I/O - job.fileStore.readGlobalFile(tar_id, os.path.join(work_dir, 'sample.tar'), mutable=True) + job.fileStore.readGlobalFile(tar_id, os.path.join(work_dir, 'sample.tar')) tar_path = os.path.join(work_dir, 'sample.tar') # Untar File and concat subprocess.check_call(['tar', '-xvf', tar_path, '-C', work_dir], stderr=PIPE, stdout=PIPE) - job.fileStore.deleteLocalFile(os.path.join(work_dir, 'sample.tar')) fastqs = [] for root, subdir, files in os.walk(work_dir): fastqs.extend([os.path.join(root, x) for x in files]) From 5e72b7cc91b46744bf48ccb6468516c274d68679 Mon Sep 17 00:00:00 2001 From: John Vivian Date: Wed, 29 Jun 2016 17:20:01 -0700 Subject: [PATCH 07/16] Add mutable flag for renamed files --- src/toil_scripts/tools/quantifiers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/toil_scripts/tools/quantifiers.py b/src/toil_scripts/tools/quantifiers.py index 4691dd2a..b1c5e295 100644 --- a/src/toil_scripts/tools/quantifiers.py +++ b/src/toil_scripts/tools/quantifiers.py @@ -107,8 +107,8 @@ def run_rsem_postprocess(job, uuid, rsem_gene_id, rsem_isoform_id): """ work_dir = job.fileStore.getLocalTempDir() # I/O - job.fileStore.readGlobalFile(rsem_gene_id, os.path.join(work_dir, 'rsem_gene.tab')) - job.fileStore.readGlobalFile(rsem_isoform_id, os.path.join(work_dir, 'rsem_isoform.tab')) + job.fileStore.readGlobalFile(rsem_gene_id, os.path.join(work_dir, 'rsem_gene.tab'), mutable=True) + job.fileStore.readGlobalFile(rsem_isoform_id, os.path.join(work_dir, 'rsem_isoform.tab'), mutable=True) # Convert RSEM files into individual .tab files. 
docker_call(tool='jvivian/rsem_postprocess', parameters=[uuid], work_dir=work_dir) os.rename(os.path.join(work_dir, 'rsem_gene.tab'), os.path.join(work_dir, 'rsem_genes.results')) From 9ade2e6077e9d8ce043eeb77e16c0cefe7f09147 Mon Sep 17 00:00:00 2001 From: John Vivian Date: Wed, 29 Jun 2016 17:35:28 -0700 Subject: [PATCH 08/16] Add disk to test_map_job --- src/toil_scripts/lib/test/test_jobs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/toil_scripts/lib/test/test_jobs.py b/src/toil_scripts/lib/test/test_jobs.py index c0c66fc8..763a17bd 100644 --- a/src/toil_scripts/lib/test/test_jobs.py +++ b/src/toil_scripts/lib/test/test_jobs.py @@ -2,11 +2,11 @@ from toil.job import Job -def test_sample_batcher(tmpdir): +def test_map_job(tmpdir): from toil_scripts.lib.jobs import map_job options = Job.Runner.getDefaultOptions(os.path.join(str(tmpdir), 'test_store')) samples = [x for x in xrange(200)] - j = Job.wrapJobFn(map_job, _test_batch, samples, 'a', 'b', 'c') + j = Job.wrapJobFn(map_job, _test_batch, samples, 'a', 'b', 'c', disk='1K') Job.Runner.startToil(j, options) From 405f5341cc3ce897081b1b85dffaec32fe1948ec Mon Sep 17 00:00:00 2001 From: John Vivian Date: Thu, 30 Jun 2016 10:27:32 -0700 Subject: [PATCH 09/16] Use get_work_directory for test_map_job --- src/toil_scripts/lib/test/test_jobs.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/toil_scripts/lib/test/test_jobs.py b/src/toil_scripts/lib/test/test_jobs.py index 763a17bd..c06653a6 100644 --- a/src/toil_scripts/lib/test/test_jobs.py +++ b/src/toil_scripts/lib/test/test_jobs.py @@ -1,10 +1,13 @@ import os from toil.job import Job +from toil_scripts.lib import get_work_directory def test_map_job(tmpdir): from toil_scripts.lib.jobs import map_job - options = Job.Runner.getDefaultOptions(os.path.join(str(tmpdir), 'test_store')) + work_dir = get_work_directory() + options = Job.Runner.getDefaultOptions(os.path.join(work_dir, 'test_store')) + options.workDir = work_dir samples = [x for x in xrange(200)] j = Job.wrapJobFn(map_job, _test_batch, samples, 'a', 'b', 'c', disk='1K') Job.Runner.startToil(j, options) From 54b897b64a700cccae01caf1266f9ee8a600ea21 Mon Sep 17 00:00:00 2001 From: Frank Austin Nothaft Date: Wed, 13 Jul 2016 11:31:00 -0700 Subject: [PATCH 10/16] If toil version is out of range, print the installed version. --- Makefile | 6 +++--- setup.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Makefile b/Makefile index 8f0652c2..b499aa85 100644 --- a/Makefile +++ b/Makefile @@ -33,8 +33,8 @@ help: @echo "$$help" -python=python2.7 -pip=pip2.7 +python=python +pip=pip tests=src extras= @@ -76,7 +76,7 @@ clean: clean_develop clean_sdist clean_pypi check_venv: @$(python) -c 'import sys; sys.exit( int( not hasattr(sys, "real_prefix") ) )' \ - || ( echo "$(red)A virtualenv must be active.$(normal)" ; false ) + || ( echo "$(red)A virtualenv must be active.$(normal)" ; true ) check_clean_working_copy: diff --git a/setup.py b/setup.py index a897fa45..4e225982 100644 --- a/setup.py +++ b/setup.py @@ -33,9 +33,9 @@ 'http://toil.readthedocs.io/en/latest/installation.html'.format(toil_min_version)) if not parse_version(str(toil_min_version)) <= parse_version(toil_version) < parse_version(toil_max_version): - raise RuntimeError('Need Toil version within range [{},{}). Read about installing Toil at: ' + raise RuntimeError('Need Toil version within range [{},{}) Currrent installed version: {}. 
Read about installing Toil at: ' 'http://toil.readthedocs.io/en/latest/installation.html' - ''.format(toil_min_version, toil_max_version)) + ''.format(toil_min_version, toil_max_version, toil_version)) # bd2k-python-lib check -- Raise warning instead of install_requires to avoid version conflicts with Toil try: From 58e564caf3d4f36689bcf92b8bf96364ff83e053 Mon Sep 17 00:00:00 2001 From: Frank Austin Nothaft Date: Tue, 12 Jul 2016 14:34:24 -0700 Subject: [PATCH 11/16] Modifies ADAM pipeline to support single machine mode (resolves #210) Adds a local mode to the ADAM pipeline where Spark runs in local mode with the maximum number of cores on the machine. This requires a local-dir switch too, for pointing ADAM to the data directory. --- .../adam_pipeline/adam_preprocessing.py | 54 ++++++++++++++----- src/toil_scripts/lib/files.py | 30 +++++++++-- 2 files changed, 65 insertions(+), 19 deletions(-) diff --git a/src/toil_scripts/adam_pipeline/adam_preprocessing.py b/src/toil_scripts/adam_pipeline/adam_preprocessing.py index 436f34f7..58f6de90 100644 --- a/src/toil_scripts/adam_pipeline/adam_preprocessing.py +++ b/src/toil_scripts/adam_pipeline/adam_preprocessing.py @@ -31,6 +31,7 @@ import logging import multiprocessing import os +import sys import textwrap from subprocess import check_call, check_output @@ -40,6 +41,7 @@ from toil_scripts.adam_uberscript.automated_scaling import SparkMasterAddress from toil_scripts.lib import require +from toil_scripts.lib.files import copy_files, move_files from toil_scripts.lib.programs import docker_call, mock_mode from toil_scripts.rnaseq_cgl.rnaseq_cgl_pipeline import generate_file @@ -110,18 +112,26 @@ def call_adam(master_ip, inputs, arguments): :type masterIP: MasterAddress """ - default_params = ["--master", - ("spark://%s:%s" % (master_ip, SPARK_MASTER_PORT)), + if inputs.run_local: + master = ["--master", "local[*]"] + work_dir = inputs.local_dir + else: + master = ["--master", + ("spark://%s:%s" % (master_ip, SPARK_MASTER_PORT)), + "--conf", ("spark.hadoop.fs.default.name=hdfs://%s:%s" % (master_ip, HDFS_MASTER_PORT)),] + work_dir = '.' 
+ + default_params = master + [ "--conf", ("spark.driver.memory=%sg" % inputs.memory), "--conf", ("spark.executor.memory=%sg" % inputs.memory), - "--conf", ("spark.hadoop.fs.default.name=hdfs://%s:%s" % (master_ip, HDFS_MASTER_PORT)), "--conf", "spark.driver.maxResultSize=0", # set max result size to unlimited, see #177 "--"] - docker_call(rm = False, - tool = "quay.io/ucsc_cgl/adam:962-ehf--6e7085f8cac4b9a927dc9fb06b48007957256b80", - docker_parameters = master_ip.docker_parameters(["--net=host"]), - parameters = default_params + arguments, + docker_call(rm=False, + tool="quay.io/ucsc_cgl/adam:962-ehf--6e7085f8cac4b9a927dc9fb06b48007957256b80", + docker_parameters=master_ip.docker_parameters(["--net=host"]), + parameters=(default_params + arguments), + work_dir=work_dir, mock=False) @@ -277,8 +287,14 @@ def download_run_and_upload(job, master_ip, inputs, spark_on_toil): bam_name = inputs.sample.split('://')[-1].split('/')[-1] sample_name = ".".join(os.path.splitext(bam_name)[:-1]) + hdfs_subdir = sample_name + "-dir" - hdfs_dir = "hdfs://{0}:{1}/{2}".format(master_ip, HDFS_MASTER_PORT, hdfs_subdir) + + if inputs.run_local: + inputs.local_dir = job.fileStore.getLocalTempDir() + hdfs_dir = "/data/" + else: + hdfs_dir = "hdfs://{0}:{1}/{2}".format(master_ip, HDFS_MASTER_PORT, hdfs_subdir) try: hdfs_prefix = hdfs_dir + "/" + sample_name @@ -286,18 +302,26 @@ def download_run_and_upload(job, master_ip, inputs, spark_on_toil): hdfs_snps = hdfs_dir + "/" + inputs.dbsnp.split('://')[-1].split('/')[-1] - download_data(master_ip, inputs, inputs.dbsnp, inputs.sample, hdfs_snps, hdfs_bam) + if not inputs.run_local: + download_data(master_ip, inputs, inputs.dbsnp, inputs.sample, hdfs_snps, hdfs_bam) + else: + copy_files([inputs.sample, inputs.dbsnp], inputs.local_dir) adam_input = hdfs_prefix + ".adam" adam_snps = hdfs_dir + "/snps.var.adam" adam_convert(master_ip, inputs, hdfs_bam, hdfs_snps, adam_input, adam_snps, spark_on_toil) - adam_output = hdfs_prefix + ".processed.adam" + adam_output = hdfs_prefix + ".processed.bam" adam_transform(master_ip, inputs, adam_input, adam_snps, hdfs_dir, adam_output, spark_on_toil) out_file = inputs.output_dir + "/" + sample_name + inputs.suffix + ".bam" - upload_data(master_ip, inputs, adam_output, out_file, spark_on_toil) + if not inputs.run_local: + upload_data(master_ip, inputs, adam_output, out_file, spark_on_toil) + else: + local_adam_output = "%s/%s.processed.bam" % (inputs.local_dir, sample_name) + move_files([local_adam_output], inputs.output_dir) + remove_file(master_ip, hdfs_subdir, spark_on_toil) except: remove_file(master_ip, hdfs_subdir, spark_on_toil) @@ -312,8 +336,8 @@ def static_adam_preprocessing_dag(job, inputs, sample, output_dir, suffix=''): inputs.output_dir = output_dir inputs.suffix = suffix - if inputs.master_ip is not None: - if inputs.master_ip == 'auto': + if inputs.master_ip is not None or inputs.run_local: + if not inputs.run_local and inputs.master_ip == 'auto': # Static, standalone Spark cluster managed by uberscript spark_on_toil = False scale_up = job.wrapJobFn(scale_external_spark_cluster, 1) @@ -369,6 +393,8 @@ def generate_config(): dbsnp: # Required: The full s3 url of a VCF file of known snps memory: # Required: Amount of memory to allocate for Spark Driver and executor. # This should be equal to the available memory on each worker node. + run-local: # Optional: If true, runs ADAM locally and doesn't connect to a cluster. + local-dir: # Required if run-local is true. Sets the local directory to use for input. 
"""[1:]) @@ -383,7 +409,7 @@ def main(): parser_run.add_argument('--config', default='adam_preprocessing.config', type=str, help='Path to the (filled in) config file, generated with "generate-config". ' '\nDefault value: "%(default)s"') - parser_run.add_argument('--sample', help='The full s3 url of the input SAM or BAM file') + parser_run.add_argument('--sample', help='The full s3 url/path to the input SAM or BAM file') parser_run.add_argument('--output-dir', required=True, default=None, help='full path where final results will be output') parser_run.add_argument('-s', '--suffix', default='', diff --git a/src/toil_scripts/lib/files.py b/src/toil_scripts/lib/files.py index 1e9d2954..3b30f1a7 100644 --- a/src/toil_scripts/lib/files.py +++ b/src/toil_scripts/lib/files.py @@ -1,7 +1,7 @@ from contextlib import closing import os -import tarfile import shutil +import tarfile def tarball_files(tar_name, file_paths, output_dir='.', prefix=''): @@ -21,18 +21,38 @@ def tarball_files(tar_name, file_paths, output_dir='.', prefix=''): f_out.add(file_path, arcname=arcname) -def move_files(file_paths, output_dir): +def __forall_files(file_paths, output_dir, op): """ - Moves files from the working directory to the output directory. + Applies a function to a set of files and an output directory. :param str output_dir: Output directory :param list[str] file_paths: Absolute file paths to move """ for file_path in file_paths: if not file_path.startswith('/'): - raise ValueError('Path provided is relative not absolute.') + raise ValueError('Path provided (%s) is relative not absolute.' % file_path) dest = os.path.join(output_dir, os.path.basename(file_path)) - shutil.move(file_path, dest) + op(file_path, dest) + + +def move_files(file_paths, output_dir): + """ + Moves files from the working directory to the output directory. + + :param str output_dir: Output directory + :param list[str] file_paths: Absolute file paths to move + """ + __forall_files(file_paths, output_dir, shutil.move) + + +def copy_files(file_paths, output_dir): + """ + Moves files from the working directory to the output directory. + + :param str output_dir: Output directory + :param list[str] file_paths: Absolute file paths to move + """ + __forall_files(file_paths, output_dir, shutil.copy) def consolidate_tarballs_job(job, fname_to_id): From 272dad65204e01fc35d8c902e20ea882a186cd51 Mon Sep 17 00:00:00 2001 From: Frank Austin Nothaft Date: Wed, 13 Jul 2016 22:03:49 +0000 Subject: [PATCH 12/16] ADAM pipeline was being ran in a loop (resolves #329). 
--- src/toil_scripts/adam_pipeline/adam_preprocessing.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/toil_scripts/adam_pipeline/adam_preprocessing.py b/src/toil_scripts/adam_pipeline/adam_preprocessing.py index 58f6de90..cdf2ade0 100644 --- a/src/toil_scripts/adam_pipeline/adam_preprocessing.py +++ b/src/toil_scripts/adam_pipeline/adam_preprocessing.py @@ -439,8 +439,8 @@ def main(): for arg in [inputs.dbsnp, inputs.memory]: require(arg, 'Required argument {} missing from config'.format(arg)) - Job.Runner.startToil(Job.wrapJobFn(static_adam_preprocessing_dag, inputs, - args.sample, args.output_dir), args) + Job.Runner.startToil(Job.wrapJobFn(static_adam_preprocessing_dag, inputs, + args.sample, args.output_dir), args) if __name__ == "__main__": main() From 647e588b4073328811fecb10506c5ffe0c300be4 Mon Sep 17 00:00:00 2001 From: John Vivian Date: Wed, 13 Jul 2016 11:03:15 -0700 Subject: [PATCH 13/16] Bump Toil/BPL version and expand upper limit --- jenkins.sh | 4 ++-- setup.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/jenkins.sh b/jenkins.sh index 513e17a5..61228a33 100755 --- a/jenkins.sh +++ b/jenkins.sh @@ -11,8 +11,8 @@ export PATH=$PATH:${PWD}/bin # Create Toil venv virtualenv venv . venv/bin/activate -pip install toil==3.2.1 -pip install bd2k-python-lib==1.13.dev21 +pip install toil==3.3.0 +pip install bd2k-python-lib==1.14a1.dev29 make develop make test make clean diff --git a/setup.py b/setup.py index 4e225982..89d812a1 100644 --- a/setup.py +++ b/setup.py @@ -21,9 +21,9 @@ from version import version from pkg_resources import parse_version, require, DistributionNotFound -toil_min_version = '3.2.1' -toil_max_version = '3.3.0' -bpl_min_version = '1.13.dev21' +toil_min_version = '3.3.0' +toil_max_version = '3.5.0' +bpl_min_version = '1.14a1.dev29' # Toil version check -- Raise warning instead of using intall_requires to avoid virtualenv conflicts try: From f7512d5ac3bcaa40fa2682236c2f1b44020a2861 Mon Sep 17 00:00:00 2001 From: Frank Austin Nothaft Date: Wed, 20 Jul 2016 20:50:07 -0700 Subject: [PATCH 14/16] Allow ADAM to be run outside of Docker (resolves #371) Adds native-adam-path option to adam-pipeline. --- .../adam_pipeline/adam_preprocessing.py | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/src/toil_scripts/adam_pipeline/adam_preprocessing.py b/src/toil_scripts/adam_pipeline/adam_preprocessing.py index cdf2ade0..97880065 100644 --- a/src/toil_scripts/adam_pipeline/adam_preprocessing.py +++ b/src/toil_scripts/adam_pipeline/adam_preprocessing.py @@ -127,12 +127,19 @@ def call_adam(master_ip, inputs, arguments): "--conf", "spark.driver.maxResultSize=0", # set max result size to unlimited, see #177 "--"] - docker_call(rm=False, - tool="quay.io/ucsc_cgl/adam:962-ehf--6e7085f8cac4b9a927dc9fb06b48007957256b80", - docker_parameters=master_ip.docker_parameters(["--net=host"]), - parameters=(default_params + arguments), - work_dir=work_dir, - mock=False) + + # are we running adam via docker, or do we have a native path? 
+ if inputs.native_adam_path is None: + docker_call(rm=False, + tool="quay.io/ucsc_cgl/adam:962-ehf--6e7085f8cac4b9a927dc9fb06b48007957256b80", + docker_parameters=master_ip.docker_parameters(["--net=host"]), + parameters=(default_params + arguments), + work_dir=work_dir, + mock=False) + else: + check_call(["%s/bin/adam-submit" % inputs.native_adam_path] + + default_params + + arguments) def remove_file(master_ip, filename, spark_on_toil): @@ -292,7 +299,10 @@ def download_run_and_upload(job, master_ip, inputs, spark_on_toil): if inputs.run_local: inputs.local_dir = job.fileStore.getLocalTempDir() - hdfs_dir = "/data/" + if inputs.native_adam_path is None: + hdfs_dir = "/data/" + else: + hdfs_dir = inputs.local_dir else: hdfs_dir = "hdfs://{0}:{1}/{2}".format(master_ip, HDFS_MASTER_PORT, hdfs_subdir) @@ -395,6 +405,7 @@ def generate_config(): # This should be equal to the available memory on each worker node. run-local: # Optional: If true, runs ADAM locally and doesn't connect to a cluster. local-dir: # Required if run-local is true. Sets the local directory to use for input. + native-adam-path: # Optional: If set, runs ADAM using the local build of ADAM at this path. """[1:]) From 3f2d49706029da76e91f0a856a8b4fa560559511 Mon Sep 17 00:00:00 2001 From: Frank Austin Nothaft Date: Fri, 22 Jul 2016 16:23:36 -0700 Subject: [PATCH 15/16] Rename toil_scripts.tools.QC --> toil_scripts.tools.qc --- src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py | 2 +- src/toil_scripts/tools/{QC.py => qc.py} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename src/toil_scripts/tools/{QC.py => qc.py} (100%) diff --git a/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py b/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py index edc73b0e..ae4a0b67 100644 --- a/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py +++ b/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py @@ -20,7 +20,7 @@ from toil_scripts.lib.files import move_files from toil_scripts.lib.jobs import map_job from toil_scripts.lib.urls import download_url_job, s3am_upload -from toil_scripts.tools.QC import run_fastqc +from toil_scripts.tools.qc import run_fastqc from toil_scripts.tools.aligners import run_star from toil_scripts.tools.preprocessing import cutadapt from toil_scripts.tools.quantifiers import run_kallisto, run_rsem, run_rsem_postprocess diff --git a/src/toil_scripts/tools/QC.py b/src/toil_scripts/tools/qc.py similarity index 100% rename from src/toil_scripts/tools/QC.py rename to src/toil_scripts/tools/qc.py From eed1ef884ae6121f0aac62ca845665aad11cf398 Mon Sep 17 00:00:00 2001 From: Frank Austin Nothaft Date: Fri, 22 Jul 2016 20:07:07 +0000 Subject: [PATCH 16/16] Adding quinine QC functions, and quinine_pipelines (resolves #380) * Adds the quinine tool (https://github.com/bigdatagenomics/quinine) along with helper functions for running QC on targeted sequencing panels, RNA-seq data, and for estimating sample contamination. * Instantiates these as command line tools in `toil_scripts.quinine_pipelines.metrics`. 
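
For context before the diff itself: the `targeted` subcommand added below reduces to a driver along these lines. This is a hypothetical sketch, not part of the patch; the BAM/BED paths and the job store location are placeholders, and the native install paths simply mirror the CLI defaults.

from toil.job import Job

from toil_scripts.quinine_pipelines.metrics import run_targeted_qc

# Set up a throwaway Toil job store and wrap the targeted-panel QC job.
options = Job.Runner.getDefaultOptions('./quinine-qc-jobstore')
root = Job.wrapJobFn(run_targeted_qc,
                     '/data/sample.bam',         # aligned reads
                     '/data/panel_bait.bed',     # baited regions
                     '/data/panel_targets.bed',  # regions targeted by the panel
                     '/data/sample.qc.txt',      # where the statistics are written
                     8,                          # GB of memory for Spark
                     '/opt/cgl-docker-lib/adam',
                     '/opt/cgl-docker-lib/quinine')
Job.Runner.startToil(root, options)
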
--- .../quinine_pipelines/__init__.py | 0 .../quinine_pipelines/adam_helpers.py | 57 ++++ src/toil_scripts/quinine_pipelines/metrics.py | 252 ++++++++++++++++++ src/toil_scripts/tools/qc.py | 167 ++++++++++++ 4 files changed, 476 insertions(+) create mode 100644 src/toil_scripts/quinine_pipelines/__init__.py create mode 100644 src/toil_scripts/quinine_pipelines/adam_helpers.py create mode 100644 src/toil_scripts/quinine_pipelines/metrics.py diff --git a/src/toil_scripts/quinine_pipelines/__init__.py b/src/toil_scripts/quinine_pipelines/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/toil_scripts/quinine_pipelines/adam_helpers.py b/src/toil_scripts/quinine_pipelines/adam_helpers.py new file mode 100644 index 00000000..5602d290 --- /dev/null +++ b/src/toil_scripts/quinine_pipelines/adam_helpers.py @@ -0,0 +1,57 @@ +from subprocess import check_call + +from toil_scripts.adam_pipeline.adam_preprocessing import call_adam + +def __call_adam_native(cmd, memory, native_path): + ''' + Calls ADAM running in Spark local mode, where ADAM is not in a docker container. + + :param list cmd: ADAM command line arguments + :param int memory: Amount of memory in GB to allocate. + :param str native_path: String path to the ADAM install directory. + ''' + + check_call(['%s/bin/adam-submit' % native_path, + '--master', 'local[*]', + '--conf', 'spark.driver.memory=%dg' % memory, + '--'] + cmd) + + +def bam_to_adam_native(bam, parquet, memory, native_path): + ''' + Converts a BAM file into an ADAM AlignmentRecord Parquet file. + + :param str bam: Path to input SAM/BAM file. + :param str parquet: Path to save Parquet file at. + :param int memory: Amount of memory in GB to allocate. + :param str native_path: String path to the ADAM install directory. + ''' + + __call_adam_native(['transform', bam, parquet], memory, native_path) + + +def feature_to_adam_native(feature, parquet, memory, native_path): + ''' + Converts a feature file (e.g., BED, GTF, GFF) into an ADAM Feature Parquet + file. + + :param str feature: Path to input BED/GTF/GFF/NarrowPeak file. + :param str parquet: Path to save Parquet file at. + :param int memory: Amount of memory in GB to allocate. + :param str native_path: String path to the ADAM install directory. + ''' + + __call_adam_native(['features2adam', feature, parquet], memory, native_path) + + +def vcf_to_adam_native(vcf, parquet, memory, native_path): + ''' + Converts a VCF file into an ADAM Genotype Parquet file. + + :param str bam: Path to input VCF file. + :param str parquet: Path to save Parquet file at. + :param int memory: Amount of memory in GB to allocate. + :param str native_path: String path to the ADAM install directory. 
+ ''' + + __call_adam_native(['vcf2adam', vcf, parquet], memory, native_path) diff --git a/src/toil_scripts/quinine_pipelines/metrics.py b/src/toil_scripts/quinine_pipelines/metrics.py new file mode 100644 index 00000000..e1cd1729 --- /dev/null +++ b/src/toil_scripts/quinine_pipelines/metrics.py @@ -0,0 +1,252 @@ +#!/usr/bin/env python2.7 + +import argparse +import os + +from toil.job import Job + +from toil_scripts.tools.qc import ( call_quinine_af_native, + call_quinine_contamination_native, + call_quinine_hs_native, + call_quinine_rna_native ) +from toil_scripts.quinine_pipelines.adam_helpers import ( bam_to_adam_native, + feature_to_adam_native, + vcf_to_adam_native ) + +def run_rna_qc(job, + reads, transcriptome, output_path, + memory, + adam_native_path, quinine_native_path): + ''' + Runs a job that computes various RNA-seq quality control statistics. + + :param toil.job.Job job: The toil job running this function. + :param str reads: Path to the input reads in SAM/BAM format. + :param str transcriptome: Path to the transcriptome definition (GTF/GFF). + :param str output_path: Path to write the statistics at. + :param int memory: GB of memory to allocate. + :param str adam_native_path: The path where ADAM is installed. + :param str quinine_native_path: The path where Quinine is installed. + ''' + + # get a temp work directory + local_dir = job.fileStore.getLocalTempDir() + + # convert the reads to ADAM format + adam_reads = os.path.join(local_dir, 'reads.adam') + bam_to_adam_native(reads, adam_reads, memory, adam_native_path) + + # convert the features to ADAM format + adam_features = os.path.join(local_dir, 'transcriptome.adam') + feature_to_adam_native(transcriptome, adam_features, memory, adam_native_path) + + # run the qc job + call_quinine_rna_native(adam_reads, adam_features, output_path, + local_dir, + memory, quinine_native_path) + + +def run_targeted_qc(job, + reads, bait, targets, output_path, + memory, + adam_native_path, quinine_native_path): + ''' + Runs a job that computes various quality control statistics for reads + sequenced using a hybrid-selection panel that requires targeting. + + :param toil.job.Job job: The toil job running this function. + :param str reads: Path to the input reads in SAM/BAM format. + :param str bait: Path to the description of the baited regions. + :param str targets: Path to the description of the regions targeted. + :param str output_path: Path to write the statistics at. + :param int memory: GB of memory to allocate. + :param str adam_native_path: The path where ADAM is installed. + :param str quinine_native_path: The path where Quinine is installed. 
+ ''' + + # get a temp work directory + local_dir = job.fileStore.getLocalTempDir() + + # convert the reads to ADAM format + adam_reads = os.path.join(local_dir, 'reads.adam') + bam_to_adam_native(reads, adam_reads, memory, adam_native_path) + + # convert the bait features to ADAM format + adam_bait = os.path.join(local_dir, 'bait.adam') + feature_to_adam_native(bait, adam_bait, memory, adam_native_path) + + # convert the target features to ADAM format + adam_targets = os.path.join(local_dir, 'targets.adam') + feature_to_adam_native(targets, adam_targets, memory, adam_native_path) + + # run the metrics + call_quinine_hs_native(adam_reads, adam_targets, adam_bait, output_path, + local_dir, + memory, quinine_native_path) + + +def run_contamination_estimation(job, + reads, population, sample_vcf, output_path, + memory, + adam_native_path, quinine_native_path): + ''' + Runs a job that computes various quality control statistics for reads + sequenced using a hybrid-selection panel that requires targeting. + + :param toil.job.Job job: The toil job running this function. + :param str reads: Path to the input reads in SAM/BAM format. + :param str bait: Path to the description of the baited regions. + :param str targets: Path to the description of the regions targeted. + :param str output_path: Path to write the statistics at. + :param int memory: GB of memory to allocate. + :param str adam_native_path: The path where ADAM is installed. + :param str quinine_native_path: The path where Quinine is installed. + ''' + + # get a temp work directory + local_dir = job.fileStore.getLocalTempDir() + + # convert the reads to ADAM format + adam_reads = os.path.join(local_dir, 'reads.adam') + bam_to_adam_native(reads, adam_reads, memory, adam_native_path) + + # convert the sample vcf to ADAM format + adam_calls = os.path.join(local_dir, 'sample.adam') + vcf_to_adam_native(sample_vcf, adam_calls, memory, adam_native_path) + + # compute MAF's + maf_annotations = os.path.join(local_dir, 'mafs.adam') + call_quinine_af_native(population, maf_annotations, + local_dir, + memory, + quinine_native_path) + + # estimate contamination + call_quinine_contamination_native(adam_reads, + adam_calls, + maf_annotations, + output_path, + local_dir, + memory, + quinine_native_path) + + +def __add_common_args(parser): + ''' + Adds commonly used arguments to a subparser. + + :param argparse.Subparser parser: A subparser to add arguments to. + ''' + + parser.add_argument('--output', + help='Location to write outputs to.', + required=True) + parser.add_argument('--memory', + help='The amount of memory to allocate, in GB. Defaults to 1.', + type=int, + default=1) + parser.add_argument('--adam_native_path', + help='The native path where ADAM is installed.' + 'Defaults to /opt/cgl-docker-lib/adam', + default='/opt/cgl-docker-lib/adam', + type=str) + parser.add_argument('--quinine_native_path', + help='The native path where Quinine is installed.' + 'Defaults to /opt/cgl-docker-lib/quinine', + default='/opt/cgl-docker-lib/quinine', + type=str) + Job.Runner.addToilOptions(parser) + + +def main(): + ''' + Parses arguments and starts the job. 
+ ''' + + # build the argument parser + parser = argparse.ArgumentParser() + + # we run three different commands: hs, cont, rna + subparsers = parser.add_subparsers(dest='command') + parser_rna = subparsers.add_parser('rna', help='Runs the RNA QC metrics.') + parser_hs = subparsers.add_parser('targeted', + help='Runs the QC metrics for a targeted sequencing protocol.') + parser_cont = subparsers.add_parser('contamination', + help='Runs the contamination estimator.') + + # add arguments to the rna panel + parser_rna.add_argument('--reads', + help='The RNA-seq reads.', + type=str, + required=True) + parser_rna.add_argument('--transcriptome', + help='The transcriptome description (e.g., a GENCODE GTF)', + type=str, + required=True) + __add_common_args(parser_rna) + + # add arguments to the hs panel + parser_hs.add_argument('--reads', + help='The aligned reads.', + type=str, + required=True) + parser_hs.add_argument('--bait', + help='The bait used for capturing this panel.', + type=str, + required=True) + parser_hs.add_argument('--targets', + help='The regions covered by this panel.', + type=str, + required=True) + __add_common_args(parser_hs) + + # add arguments for contaimination estimation + parser_cont.add_argument('--reads', + help='The aligned reads.', + type=str, + required=True) + parser_cont.add_argument('--population', + help='The VCF to derive allele frequencies from.', + type=str, + required=True) + parser_cont.add_argument('--sample-vcf', + help='A VCF containing known genotypes for the sample.', + type=str, + required=True) + __add_common_args(parser_cont) + + # parse the arguments + args = parser.parse_args() + + # check which command got called, and set up and run + if args.command == 'rna': + Job.Runner.startToil(Job.wrapJobFn(run_rna_qc, + args.reads, + args.transcriptome, + args.output, + args.memory, + args.adam_native_path, + args.quinine_native_path), args) + elif args.command == 'targeted': + Job.Runner.startToil(Job.wrapJobFn(run_targeted_qc, + args.reads, + args.bait, + args.targets, + args.output, + args.memory, + args.adam_native_path, + args.quinine_native_path), args) + elif args.command == 'contamination': + Job.Runner.startToil(Job.wrapJobFn(run_contamination_estimation, + args.reads, + args.population, + args.sample_vcf, + args.output, + args.memory, + args.adam_native_path, + args.quinine_native_path), args) + else: + raise ValueError('Unknown command: %s' % args.command) + +if __name__ == '__main__': + main() diff --git a/src/toil_scripts/tools/qc.py b/src/toil_scripts/tools/qc.py index c4a05413..6bc18c30 100644 --- a/src/toil_scripts/tools/qc.py +++ b/src/toil_scripts/tools/qc.py @@ -1,5 +1,7 @@ import os +from subprocess import check_call +from toil_scripts.lib import require from toil_scripts.lib.files import tarball_files from toil_scripts.lib.programs import docker_call @@ -27,3 +29,168 @@ def run_fastqc(job, r1_id, r2_id): output_files = [os.path.join(work_dir, x) for x in output_names] tarball_files(tar_name='fastqc.tar.gz', file_paths=output_files, output_dir=work_dir) return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'fastqc.tar.gz')) + + +def __call_quinine(arguments, memory, + run_local=False, local_dir=None, + native_path=None, + leader_ip=None): + ''' + Run Quinine (https://github.com/bigdatagenomics/quinine). + + :param list arguments: Arguments to pass to Quinine. + :param int memory: Amount of memory in GiB to allocate per node. + :param bool run_local: If true, runs quinine in local mode. Default is false. 
+ :param None or string local_dir: If provided, path to a local working + directory. Must be provided if run_local is true. + :param None or string native_path: If set, the path to quinine. If unset, + runs quinine via Docker. Default is None (unset). + :param None or SparkMasterAddress leader_ip: If provided, IP of the Spark + leader node. Must be provided if run_local is false. + ''' + + # TODO: factor this out into a common lib + SPARK_MASTER_PORT=7077 + HDFS_MASTER_PORT=8020 + + # validate input arguments + require((run_local and leader_ip is None) or + (not run_local and leader_ip is not None), + "Either run_local ({0}) must be set or leader_ip ({1}) must not be None.".format(run_local, + leader_ip)) + require(not run_local or local_dir is not None, + "If run_local is set, local_dir must be set.") + + # are we running locally, or not? set up master configuration + if run_local: + master = ["--master", "local[*]"] + work_dir = local_dir + else: + master = ["--master", + ("spark://%s:%s" % (master_ip, SPARK_MASTER_PORT)), + "--conf", ("spark.hadoop.fs.default.name=hdfs://%s:%s" % (master_ip, HDFS_MASTER_PORT))] + work_dir = '.' + + # set default parameters + default_params = master + [ + "--conf", ("spark.driver.memory=%dg" % memory), + "--conf", ("spark.executor.memory=%dg" % memory), + "--conf", "spark.driver.maxResultSize=0", + # set max result size to unlimited, see #177 + "--"] + + # are we running via docker or natively? + if native_path is None: + docker_call(rm=False, + tool="quay.io/ucsc_cgl/quinine", + docker_parameters=master_ip.docker_parameters(['--net=host']), + parameters=(default_params + arguments), + work_dir=work_dir, + mock=False) + else: + check_call(["%s/bin/quinine-submit" % native_path] + + default_params + + arguments) + + +def call_quinine_rna_native(reads, + transcriptome, + output, + work_dir, + memory, + native_path): + ''' + Runs quinine to compute RNA-seq QC stats. + + :param str reads: Local path to input reads. + :param str transcriptome: Local path to transcriptome definition. + :param str output: Path to write stats to. + :param str work_dir: Local path to temp working directory. + :param int memory: Amount of memory in GiB to allocate per node. + :param str native_path: Local path to quinine. + ''' + + __call_quinine(['rnaMetrics', + reads, transcriptome, + '-statPath', output], + memory, + run_local=True, local_dir=work_dir, + native_path=native_path) + + +def call_quinine_hs_native(reads, + panel, + bait, + output, + work_dir, + memory, + native_path): + ''' + Runs quinine to compute stats for a hybrid-selection targeted sequencing panel. + + :param str reads: Local path to input reads. + :param str panel: Local path to definition of regions targeted by panel. + :param str bait: Local path to definition of regions tiled by bait. + :param str output: Path to write stats to. + :param str work_dir: Local path to temp working directory. + :param int memory: Amount of memory in GiB to allocate per node. + :param str native_path: Local path to quinine. + ''' + + __call_quinine(['panelMetrics', + reads, panel, bait, + '-statPath', output + ], + memory, + run_local=True, local_dir=work_dir, + native_path=native_path) + + +def call_quinine_contamination_native(reads, + genotypes, + annotations, + output, + work_dir, + memory, + native_path): + ''' + Runs quinine to estimate inter-sample contamination. + + :param str reads: Local path to input reads. + :param str genotypes: Local path to genotypes. + :param str annotations: Local path to annotations. 
+ :param str output: Path to write stats to. + :param str work_dir: Local path to temp working directory. + :param int memory: Amount of memory in GiB to allocate per node. + :param str native_path: Local path to quinine. + ''' + + __call_quinine(['estimateContamination', + reads, genotypes, annotations, + '-statPath', output], + memory, + run_local=True, local_dir=work_dir, + native_path=native_path) + + +def call_quinine_af_native(vcf, + annotations, + work_dir, + memory, + native_path): + ''' + Runs quinine to estimate inter-sample contamination. + + :param str reads: Local path to input reads. + :param str vcf: Local path to input VCF. + :param str annotations: Local path to annotations. + :param str work_dir: Local path to temp working directory. + :param int memory: Amount of memory in GiB to allocate per node. + :param str native_path: Local path to quinine. + ''' + + __call_quinine(['loadAlleleFrequency', + vcf, annotations], + memory, + run_local=True, local_dir=work_dir, + native_path=native_path)
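
Stepping back from the individual patches, the structural change anchoring this series is patch 01: the ADAM pipeline now asks Toil itself for a Spark/HDFS cluster instead of spawning containers through the deleted toil_scripts.spark_utils module. The condensed sketch below restates that wiring in one place; it uses only names the patches above already import, and inputs stands for the pipeline's parsed config namespace.

import multiprocessing

from toil.lib.spark import spawn_spark_cluster

from toil_scripts.adam_pipeline.adam_preprocessing import download_run_and_upload


def adam_spark_on_toil(job, inputs):
    """Spawn a Spark cluster via Toil services and hand its master to the worker job."""
    cores = multiprocessing.cpu_count()
    master_ip = spawn_spark_cluster(job,
                                    False,                 # sudo
                                    inputs.num_nodes - 1,  # number of worker nodes
                                    cores=cores,
                                    memory=inputs.memory)
    # The downstream job downloads the sample, runs ADAM, and uploads results,
    # as download_run_and_upload does in adam_preprocessing.py.
    spark_work = job.wrapJobFn(download_run_and_upload,
                               master_ip, inputs, True)    # spark_on_toil=True
    job.addChild(spark_work)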