
Adding quinine QC functions, and quinine_pipelines (resolves #380) #381

Open · wants to merge 16 commits into base: master
6 changes: 3 additions & 3 deletions Makefile
@@ -33,8 +33,8 @@ help:
@echo "$$help"


python=python2.7
pip=pip2.7
python=python
pip=pip
tests=src
extras=

@@ -76,7 +76,7 @@ clean: clean_develop clean_sdist clean_pypi

check_venv:
@$(python) -c 'import sys; sys.exit( int( not hasattr(sys, "real_prefix") ) )' \
|| ( echo "$(red)A virtualenv must be active.$(normal)" ; false )
|| ( echo "$(red)A virtualenv must be active.$(normal)" ; true )


check_clean_working_copy:
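For context, the `check_venv` target detects an active virtualenv through the `sys.real_prefix` attribute that the `virtualenv` tool injects into the interpreter. Below is a minimal standalone sketch of that same check; the script name and messages are illustrative, and `venv`-style environments (which set `sys.base_prefix` instead) are not covered:

```python
# check_venv_sketch.py -- illustrative standalone version of the Makefile's venv check.
import sys


def in_virtualenv():
    # virtualenv replaces sys.prefix and records the original interpreter
    # location in sys.real_prefix; outside a virtualenv the attribute is absent.
    return hasattr(sys, "real_prefix")


if __name__ == "__main__":
    if not in_virtualenv():
        sys.stderr.write("A virtualenv must be active.\n")
        sys.exit(1)
    print("virtualenv active: %s" % sys.prefix)
```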
4 changes: 2 additions & 2 deletions jenkins.sh
@@ -11,8 +11,8 @@ export PATH=$PATH:${PWD}/bin
# Create Toil venv
virtualenv venv
. venv/bin/activate
pip install toil==3.1.6
pip install bd2k-python-lib==1.10.dev6
pip install toil==3.3.0
pip install bd2k-python-lib==1.14a1.dev29
make develop
make test
make clean
10 changes: 5 additions & 5 deletions setup.py
@@ -21,9 +21,9 @@
from version import version
from pkg_resources import parse_version, require, DistributionNotFound

toil_min_version = '3.1.6'
toil_max_version = '3.2.0'
bpl_min_version = '1.10.dev6'
toil_min_version = '3.3.0'
toil_max_version = '3.5.0'
bpl_min_version = '1.14a1.dev29'

# Toil version check -- Raise warning instead of using install_requires to avoid virtualenv conflicts
try:
@@ -33,9 +33,9 @@
'http://toil.readthedocs.io/en/latest/installation.html'.format(toil_min_version))

if not parse_version(str(toil_min_version)) <= parse_version(toil_version) < parse_version(toil_max_version):
raise RuntimeError('Need Toil version within range [{},{}). Read about installing Toil at: '
raise RuntimeError('Need Toil version within range [{},{}). Currently installed version: {}. Read about installing Toil at: '
'http://toil.readthedocs.io/en/latest/installation.html'
''.format(toil_min_version, toil_max_version))
''.format(toil_min_version, toil_max_version, toil_version))

# bd2k-python-lib check -- Raise warning instead of install_requires to avoid version conflicts with Toil
try:
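The version guard above enforces a half-open window [min, max) using `pkg_resources.parse_version`. A minimal sketch of that comparison, with the bounds copied from this diff and the sample version strings chosen purely for illustration:

```python
# Sketch of the half-open [min, max) version check used in setup.py.
from pkg_resources import parse_version

toil_min_version = '3.3.0'
toil_max_version = '3.5.0'


def toil_version_ok(installed):
    # Accept any installed version v with min <= v < max.
    return (parse_version(toil_min_version)
            <= parse_version(installed)
            < parse_version(toil_max_version))


assert toil_version_ok('3.3.0')          # lower bound is inclusive
assert toil_version_ok('3.4.1')
assert not toil_version_ok('3.5.0')      # upper bound is exclusive
assert not toil_version_ok('3.1.6')      # the previously pinned release is now rejected
```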
5 changes: 2 additions & 3 deletions src/toil_scripts/adam_gatk_pipeline/align_and_call.py
@@ -120,6 +120,8 @@
import yaml
# import toil features
from toil.job import Job
# these don't seem necessary! but, must be imported here due to a serialization issue
from toil.lib.spark import spawn_spark_cluster

# import job steps from other toil pipelines
from toil_scripts.adam_pipeline.adam_preprocessing import * #static_adam_preprocessing_dag
@@ -128,9 +130,6 @@
from toil_scripts.gatk_processing.gatk_preprocessing import * #download_gatk_files
from toil_scripts.rnaseq_cgl.rnaseq_cgl_pipeline import generate_file

# these don't seem necessary! but, must be imported here due to a serialization issue
from toil_scripts.spark_utils.spawn_cluster import *

from toil_scripts.lib.programs import mock_mode

# import autoscaling tools
94 changes: 66 additions & 28 deletions src/toil_scripts/adam_pipeline/adam_preprocessing.py
@@ -31,16 +31,19 @@
import logging
import multiprocessing
import os
import sys
import textwrap
from subprocess import check_call, check_output

import yaml
from toil.job import Job
from toil.lib.spark import spawn_spark_cluster

from toil_scripts.adam_uberscript.automated_scaling import SparkMasterAddress
from toil_scripts.lib import require
from toil_scripts.lib.files import copy_files, move_files
from toil_scripts.lib.programs import docker_call, mock_mode
from toil_scripts.rnaseq_cgl.rnaseq_cgl_pipeline import generate_file
from toil_scripts.spark_utils.spawn_cluster import start_spark_hdfs_cluster

SPARK_MASTER_PORT = "7077"
HDFS_MASTER_PORT = "8020"
@@ -109,19 +112,34 @@ def call_adam(master_ip, inputs, arguments):

:type masterIP: MasterAddress
"""
default_params = ["--master",
("spark://%s:%s" % (master_ip, SPARK_MASTER_PORT)),
if inputs.run_local:
master = ["--master", "local[*]"]
work_dir = inputs.local_dir
else:
master = ["--master",
("spark://%s:%s" % (master_ip, SPARK_MASTER_PORT)),
"--conf", ("spark.hadoop.fs.default.name=hdfs://%s:%s" % (master_ip, HDFS_MASTER_PORT)),]
work_dir = '.'

default_params = master + [
"--conf", ("spark.driver.memory=%sg" % inputs.memory),
"--conf", ("spark.executor.memory=%sg" % inputs.memory),
"--conf", ("spark.hadoop.fs.default.name=hdfs://%s:%s" % (master_ip, HDFS_MASTER_PORT)),
"--conf", "spark.driver.maxResultSize=0",
# set max result size to unlimited, see #177
"--"]
docker_call(rm = False,
tool = "quay.io/ucsc_cgl/adam:962-ehf--6e7085f8cac4b9a927dc9fb06b48007957256b80",
docker_parameters = master_ip.docker_parameters(["--net=host"]),
parameters = default_params + arguments,
mock=False)

# are we running adam via docker, or do we have a native path?
if inputs.native_adam_path is None:
docker_call(rm=False,
tool="quay.io/ucsc_cgl/adam:962-ehf--6e7085f8cac4b9a927dc9fb06b48007957256b80",
docker_parameters=master_ip.docker_parameters(["--net=host"]),
parameters=(default_params + arguments),
work_dir=work_dir,
mock=False)
else:
check_call(["%s/bin/adam-submit" % inputs.native_adam_path] +
default_params +
arguments)


def remove_file(master_ip, filename, spark_on_toil):
@@ -264,6 +282,7 @@ def upload_data(master_ip, inputs, hdfs_name, upload_name, spark_on_toil):

log.info("Uploading output BAM %s to %s.", hdfs_name, upload_name)
call_conductor(master_ip, inputs, hdfs_name, upload_name)
remove_file(master_ip, hdfs_name, spark_on_toil)


def download_run_and_upload(job, master_ip, inputs, spark_on_toil):
@@ -275,28 +294,45 @@ def download_run_and_upload(job, master_ip, inputs, spark_on_toil):

bam_name = inputs.sample.split('://')[-1].split('/')[-1]
sample_name = ".".join(os.path.splitext(bam_name)[:-1])

hdfs_subdir = sample_name + "-dir"
hdfs_dir = "hdfs://{0}:{1}/{2}".format(master_ip, HDFS_MASTER_PORT, hdfs_subdir)

if inputs.run_local:
inputs.local_dir = job.fileStore.getLocalTempDir()
if inputs.native_adam_path is None:
hdfs_dir = "/data/"
else:
hdfs_dir = inputs.local_dir
else:
hdfs_dir = "hdfs://{0}:{1}/{2}".format(master_ip, HDFS_MASTER_PORT, hdfs_subdir)

try:
hdfs_prefix = hdfs_dir + "/" + sample_name
hdfs_bam = hdfs_dir + "/" + bam_name

hdfs_snps = hdfs_dir + "/" + inputs.dbsnp.split('://')[-1].split('/')[-1]

download_data(master_ip, inputs, inputs.dbsnp, inputs.sample, hdfs_snps, hdfs_bam)
if not inputs.run_local:
download_data(master_ip, inputs, inputs.dbsnp, inputs.sample, hdfs_snps, hdfs_bam)
else:
copy_files([inputs.sample, inputs.dbsnp], inputs.local_dir)

adam_input = hdfs_prefix + ".adam"
adam_snps = hdfs_dir + "/snps.var.adam"
adam_convert(master_ip, inputs, hdfs_bam, hdfs_snps, adam_input, adam_snps, spark_on_toil)

adam_output = hdfs_prefix + ".processed.adam"
adam_output = hdfs_prefix + ".processed.bam"
adam_transform(master_ip, inputs, adam_input, adam_snps, hdfs_dir, adam_output, spark_on_toil)

out_file = inputs.output_dir + "/" + sample_name + inputs.suffix + ".bam"

upload_data(master_ip, inputs, adam_output, out_file, spark_on_toil)
if not inputs.run_local:
upload_data(master_ip, inputs, adam_output, out_file, spark_on_toil)
else:
local_adam_output = "%s/%s.processed.bam" % (inputs.local_dir, sample_name)
move_files([local_adam_output], inputs.output_dir)

remove_file(master_ip, hdfs_subdir, spark_on_toil)
except:
remove_file(master_ip, hdfs_subdir, spark_on_toil)
raise
@@ -310,8 +346,8 @@ def static_adam_preprocessing_dag(job, inputs, sample, output_dir, suffix=''):
inputs.output_dir = output_dir
inputs.suffix = suffix

if inputs.master_ip:
if inputs.master_ip == 'auto':
if inputs.master_ip is not None or inputs.run_local:
if not inputs.run_local and inputs.master_ip == 'auto':
# Static, standalone Spark cluster managed by uberscript
spark_on_toil = False
scale_up = job.wrapJobFn(scale_external_spark_cluster, 1)
@@ -331,15 +367,14 @@
# Dynamic subclusters, i.e. Spark-on-Toil
spark_on_toil = True
cores = multiprocessing.cpu_count()
start_cluster = job.wrapJobFn(start_spark_hdfs_cluster,
inputs.num_nodes-1,
inputs.memory,
download_run_and_upload,
jArgs=(inputs, spark_on_toil),
jCores=cores,
jMemory="%s G" %
inputs.memory).encapsulate()
job.addChild(start_cluster)
master_ip = spawn_spark_cluster(job,
False, # Sudo
inputs.num_nodes-1,
cores=cores,
memory=inputs.memory)
spark_work = job.wrapJobFn(download_run_and_upload,
master_ip, inputs, spark_on_toil)
job.addChild(spark_work)


def scale_external_spark_cluster(num_samples=1):
Expand Down Expand Up @@ -368,6 +403,9 @@ def generate_config():
dbsnp: # Required: The full s3 url of a VCF file of known snps
memory: # Required: Amount of memory to allocate for Spark Driver and executor.
# This should be equal to the available memory on each worker node.
run-local: # Optional: If true, runs ADAM locally and doesn't connect to a cluster.
local-dir: # Required if run-local is true. Sets the local directory to use for input.
native-adam-path: # Optional: If set, runs ADAM using the local build of ADAM at this path.
"""[1:])


@@ -382,8 +420,8 @@ def main():
parser_run.add_argument('--config', default='adam_preprocessing.config', type=str,
help='Path to the (filled in) config file, generated with "generate-config". '
'\nDefault value: "%(default)s"')
parser_run.add_argument('--sample', help='The full s3 url of the input SAM or BAM file')
parser_run.add_argument('--output-dir', default=None,
parser_run.add_argument('--sample', help='The full s3 url/path to the input SAM or BAM file')
parser_run.add_argument('--output-dir', required=True, default=None,
help='full path where final results will be output')
parser_run.add_argument('-s', '--suffix', default='',
help='Additional suffix to add to the names of the output files')
@@ -412,8 +450,8 @@ def main():
for arg in [inputs.dbsnp, inputs.memory]:
require(arg, 'Required argument {} missing from config'.format(arg))

Job.Runner.startToil(Job.wrapJobFn(static_adam_preprocessing_dag, inputs,
args.sample, args.output_dir), args)
Job.Runner.startToil(Job.wrapJobFn(static_adam_preprocessing_dag, inputs,
args.sample, args.output_dir), args)

if __name__ == "__main__":
main()
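As a rough illustration of how the new `run-local` options fit together, the sketch below builds the `inputs` namespace by hand instead of reading it from the YAML config. The attribute names mirror the config keys from `generate_config()` with dashes replaced by underscores, and every path and value shown is hypothetical:

```python
# Illustrative only: drive static_adam_preprocessing_dag in run-local mode.
import argparse

from toil.job import Job
from toil_scripts.adam_pipeline.adam_preprocessing import static_adam_preprocessing_dag

options = Job.Runner.getDefaultOptions('./adam-jobstore')   # hypothetical job store

inputs = argparse.Namespace(
    run_local=True,                  # skip HDFS upload/download and cluster provisioning
    local_dir=None,                  # filled in by the pipeline via getLocalTempDir()
    native_adam_path='/opt/adam',    # assumed local ADAM build; None would use the Docker image
    master_ip=None,                  # no external Spark master in local mode
    num_nodes=0,                     # only relevant to the Spark-on-Toil branch
    memory=8,                        # GB for the Spark driver and executor
    dbsnp='/data/dbsnp.vcf')         # known-sites VCF, hypothetical path

root = Job.wrapJobFn(static_adam_preprocessing_dag, inputs,
                     '/data/sample.bam', '/data/output')
Job.Runner.startToil(root, options)
```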
30 changes: 25 additions & 5 deletions src/toil_scripts/lib/files.py
@@ -1,7 +1,7 @@
from contextlib import closing
import os
import tarfile
import shutil
import tarfile


def tarball_files(tar_name, file_paths, output_dir='.', prefix=''):
@@ -21,18 +21,38 @@ def tarball_files(tar_name, file_paths, output_dir='.', prefix=''):
f_out.add(file_path, arcname=arcname)


def move_files(file_paths, output_dir):
def __forall_files(file_paths, output_dir, op):
"""
Moves files from the working directory to the output directory.
Applies a function to a set of files and an output directory.

:param str output_dir: Output directory
:param list[str] file_paths: Absolute file paths to move
"""
for file_path in file_paths:
if not file_path.startswith('/'):
raise ValueError('Path provided is relative not absolute.')
raise ValueError('Path provided (%s) is relative not absolute.' % file_path)
dest = os.path.join(output_dir, os.path.basename(file_path))
shutil.move(file_path, dest)
op(file_path, dest)


def move_files(file_paths, output_dir):
"""
Moves files from the working directory to the output directory.

:param str output_dir: Output directory
:param list[str] file_paths: Absolute file paths to move
"""
__forall_files(file_paths, output_dir, shutil.move)


def copy_files(file_paths, output_dir):
"""
Copies files from the working directory to the output directory.

:param str output_dir: Output directory
:param list[str] file_paths: Absolute file paths to copy
"""
__forall_files(file_paths, output_dir, shutil.copy)


def consolidate_tarballs_job(job, fname_to_id):
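A short usage sketch of the refactored helpers; the directory and file names are hypothetical. `copy_files` leaves sources in place, `move_files` relocates them, and both reject relative paths via `__forall_files`:

```python
import os

from toil_scripts.lib.files import copy_files, move_files

work_dir = os.path.abspath('scratch')
output_dir = os.path.abspath('results')
for d in (work_dir, output_dir):
    if not os.path.exists(d):
        os.makedirs(d)

# Stand-ins for real pipeline outputs.
bam = os.path.join(work_dir, 'sample.processed.bam')
bai = os.path.join(work_dir, 'sample.processed.bam.bai')
for path in (bam, bai):
    open(path, 'w').close()

copy_files([bam], output_dir)    # source stays in work_dir
move_files([bai], output_dir)    # source is removed from work_dir

# Relative paths are rejected outright.
try:
    copy_files(['sample.processed.bam'], output_dir)
except ValueError as exc:
    print(exc)
```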
9 changes: 6 additions & 3 deletions src/toil_scripts/lib/test/test_jobs.py
@@ -1,12 +1,15 @@
import os
from toil.job import Job
from toil_scripts.lib import get_work_directory


def test_sample_batcher(tmpdir):
def test_map_job(tmpdir):
from toil_scripts.lib.jobs import map_job
options = Job.Runner.getDefaultOptions(os.path.join(str(tmpdir), 'test_store'))
work_dir = get_work_directory()
options = Job.Runner.getDefaultOptions(os.path.join(work_dir, 'test_store'))
options.workDir = work_dir
samples = [x for x in xrange(200)]
j = Job.wrapJobFn(map_job, _test_batch, samples, 'a', 'b', 'c')
j = Job.wrapJobFn(map_job, _test_batch, samples, 'a', 'b', 'c', disk='1K')
Job.Runner.startToil(j, options)


57 changes: 57 additions & 0 deletions src/toil_scripts/quinine_pipelines/adam_helpers.py
@@ -0,0 +1,57 @@
from subprocess import check_call

from toil_scripts.adam_pipeline.adam_preprocessing import call_adam

def __call_adam_native(cmd, memory, native_path):
'''
Calls ADAM running in Spark local mode, where ADAM is not in a docker container.

:param list<str> cmd: ADAM command line arguments
:param int memory: Amount of memory in GB to allocate.
:param str native_path: String path to the ADAM install directory.
'''

check_call(['%s/bin/adam-submit' % native_path,
'--master', 'local[*]',
'--conf', 'spark.driver.memory=%dg' % memory,
'--'] + cmd)


def bam_to_adam_native(bam, parquet, memory, native_path):
'''
Converts a BAM file into an ADAM AlignmentRecord Parquet file.

:param str bam: Path to input SAM/BAM file.
:param str parquet: Path to save Parquet file at.
:param int memory: Amount of memory in GB to allocate.
:param str native_path: String path to the ADAM install directory.
'''

__call_adam_native(['transform', bam, parquet], memory, native_path)


def feature_to_adam_native(feature, parquet, memory, native_path):
'''
Converts a feature file (e.g., BED, GTF, GFF) into an ADAM Feature Parquet
file.

:param str feature: Path to input BED/GTF/GFF/NarrowPeak file.
:param str parquet: Path to save Parquet file at.
:param int memory: Amount of memory in GB to allocate.
:param str native_path: String path to the ADAM install directory.
'''

__call_adam_native(['features2adam', feature, parquet], memory, native_path)


def vcf_to_adam_native(vcf, parquet, memory, native_path):
'''
Converts a VCF file into an ADAM Genotype Parquet file.

:param str vcf: Path to input VCF file.
:param str parquet: Path to save Parquet file at.
:param int memory: Amount of memory in GB to allocate.
:param str native_path: String path to the ADAM install directory.
'''

__call_adam_native(['vcf2adam', vcf, parquet], memory, native_path)
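A hedged usage example for the new native helpers: it assumes ADAM has been built locally at `native_path` (so that `bin/adam-submit` exists there) and that the input files are available; all paths below are hypothetical:

```python
from toil_scripts.quinine_pipelines.adam_helpers import (bam_to_adam_native,
                                                         feature_to_adam_native,
                                                         vcf_to_adam_native)

native_path = '/opt/adam'   # directory containing bin/adam-submit
memory = 8                  # GB for the local-mode Spark driver

# Convert reads, features, and variants to their ADAM Parquet representations.
bam_to_adam_native('/data/sample.bam', '/data/sample.alignments.adam', memory, native_path)
feature_to_adam_native('/data/targets.bed', '/data/targets.features.adam', memory, native_path)
vcf_to_adam_native('/data/calls.vcf', '/data/calls.gt.adam', memory, native_path)
```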