#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""RNA-seek: an highly reproducible and portable RNA-seq pipeline
About:
This is the main entry for the RNA-seek pipeline.
USAGE:
$ rna-seek <run|build|unlock|cache> [OPTIONS]
Example:
$ rna-seek run --input .tests/*.R?.fastq.gz --output /data/$USER/RNA_hg38 --genome hg38_30 --mode slurm
"""
# Python standard library
from __future__ import print_function
from shutil import copy, copytree
import sys, os, subprocess, re, json, textwrap
# 3rd party imports from pypi
import argparse # part of the standard library since python/2.7 and python/3.2
# Pipeline Metadata and globals
__author__ = 'Skyler Kuhn'
__version__ = 'v1.9.5'
__email__ = '[email protected]'
__home__ = os.path.dirname(os.path.abspath(__file__))
_name = os.path.basename(sys.argv[0])
_description = 'a highly-reproducible RNA-seq pipeline'
class Colors():
"""Class encoding for ANSI escape sequeces for styling terminal text.
Any string that is formatting with these styles must be terminated with
the escape sequence, i.e. `Colors.end`.
"""
# Escape sequence
end = '\33[0m'
# Formatting options
bold = '\33[1m'
italic = '\33[3m'
url = '\33[4m'
blink = '\33[5m'
highlighted = '\33[7m'
higlighted = highlighted  # retain misspelled alias for any downstream references
# Text Colors
black = '\33[30m'
red = '\33[31m'
green = '\33[32m'
yellow = '\33[33m'
blue = '\33[34m'
pink = '\33[35m'
cyan = '\33[96m'
white = '\33[37m'
# Background fill colors
bg_black = '\33[40m'
bg_red = '\33[41m'
bg_green = '\33[42m'
bg_yellow = '\33[43m'
bg_blue = '\33[44m'
bg_pink = '\33[45m'
bg_cyan = '\33[46m'
bg_white = '\33[47m'
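# A minimal usage sketch (illustrative only, not called here): any styled
# string must be terminated with `Colors.end` to reset the terminal, e.g.
#   print('{}{}Fatal error!{}'.format(Colors.bg_red, Colors.white, Colors.end))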
def err(*message, **kwargs):
"""Prints any provided args to standard error.
kwargs can be provided to modify the print
function's behavior.
@param message <any>:
Values printed to standard error
@params kwargs <print()>
Key words to modify print function behavior
"""
print(*message, file=sys.stderr, **kwargs)
def fatal(*message, **kwargs):
"""Prints any provided args to standard error
and exits with an exit code of 1.
@param message <any>:
Values printed to standard error
@params kwargs <print()>
Key words to modify print function behavior
"""
err(*message, **kwargs)
sys.exit(1)
def exists(testpath):
"""Checks if file exists on the local filesystem.
@param testpath <str>:
Name of file/directory to check
@return does_exist <boolean>:
True when file/directory exists, False when file/directory does not exist
"""
does_exist = True
if not os.path.exists(testpath):
does_exist = False # File or directory does not exist on the filesystem
return does_exist
def exe_in_path(cmd, path=None):
"""Checks if an executable is in $PATH
@param cmd <str>:
Name of executable to check
@param path <list>:
Optional list of PATHs to check [default: $PATH]
@return <boolean>:
True if exe in PATH, False if not in PATH
"""
if path is None:
path = os.environ["PATH"].split(os.pathsep)
for prefix in path:
filename = os.path.join(prefix, cmd)
executable = os.access(filename, os.X_OK)
is_not_directory = os.path.isfile(filename)
if executable and is_not_directory:
return True
return False
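# Illustrative usage sketch (results depend on the host's $PATH):
#   exe_in_path('snakemake')          # True only if snakemake is installed
#   exe_in_path('ls', path=['/bin'])  # True on most unix-like systems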
def require(cmds, suggestions, path=None):
"""Enforces an executable is in $PATH
@param cmds list[<str>]:
List of executable names to check
@param suggestions list[<str>]:
Name of module to suggest loading for a given index
in param cmds.
@param path list[<str>]:
Optional list of PATHs to check [default: $PATH]
@return None
"""
error = False
for i in range(len(cmds)):
available = exe_in_path(cmds[i])
if not available:
c = Colors
error = True
err(
"\n{0}{1}Fatal: {2} is not in $PATH and is required during runtime!{3}".format(c.bg_red, c.white, cmds[i], c.end),
"\n └── Possible solution: please 'module load {0}' and run again!".format(suggestions[i])
)
if error: fatal()
return
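# Illustrative usage sketch: suggestions is index-matched to cmds, so a
# missing executable prompts the user to 'module load' the matching module:
#   require(['snakemake', 'singularity'], ['snakemake', 'singularity'])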
def permissions(parser, filename, *args, **kwargs):
"""Checks permissions using os.access() to see the user is authorized to access
a file/directory. Checks for existence, readability, writability and executability via:
os.F_OK (tests existence), os.R_OK (tests read), os.W_OK (tests write), os.X_OK (tests exec).
@param parser <argparse.ArgumentParser() object>:
Argparse parser object
@param filename <str>:
Name of file to check
@return filename <str>:
If file exists and user can read from file
"""
if not exists(filename):
parser.error("File '{}' does not exists! Failed to provide vaild input.".format(filename))
if not os.access(filename, *args, **kwargs):
parser.error("File '{}' exists, but cannot read file due to permissions!".format(filename))
return filename
def check_cache(parser, cache, *args, **kwargs):
"""Check if provided SINGULARITY_CACHE is valid. Singularity caches cannot be
shared across users (and must be owned by the user). Singularity strictly enforces
0700 user permission on on the cache directory and will return a non-zero exitcode.
@param parser <argparse.ArgumentParser() object>:
Argparse parser object
@param cache <str>:
Singularity cache directory
@return cache <str>:
If singularity cache dir is valid
"""
if not exists(cache):
# Cache directory does not exist on filesystem
os.makedirs(cache)
elif os.path.isfile(cache):
# Cache directory exists as file, raise error
parser.error("""\n\t\x1b[6;37;41mFatal: Failed to provided a valid singularity cache!\x1b[0m
The provided --singularity-cache already exists on the filesystem as a file.
Please run {} again with a different --singularity-cache location.
""".format(sys.argv[0]))
elif os.path.isdir(cache):
# Provided cache exists as a directory
# Check that the user owns the child cache directory
# May revert to os.getuid() if user id is not sufficient
if exists(os.path.join(cache, 'cache')) and os.stat(os.path.join(cache, 'cache')).st_uid != os.getuid():
# User does NOT own the cache directory, raise error
parser.error("""\n\t\x1b[6;37;41mFatal: Failed to provided a valid singularity cache!\x1b[0m
The provided --singularity-cache already exists on the filesystem with a different owner.
Singularity strictly enforces that the cache directory is not shared across users.
Please run {} again with a different --singularity-cache location.
""".format(sys.argv[0]))
return cache
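# Illustrative usage sketch (hypothetical path): creates the directory when
# missing; errors out if it exists as a file or is owned by another user:
#   cache = check_cache(parser, '/data/user123/.singularity')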
def _cp_r_safe_(source, target, resources = []):
"""Private function: Given a list paths it will recursively copy each to the
target location. If a target path already exists, it will NOT over-write the
existing paths data.
@param resources <list[str]>:
List of paths to copy over to target location
@params source <str>:
Add a prefix PATH to each resource
@param target <str>:
Target path to copy templates and required resources
"""
for resource in resources:
destination = os.path.join(target, resource)
if not exists(destination):
# Required resources do not exist
copytree(os.path.join(source, resource), destination)
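# Illustrative usage sketch (hypothetical paths), mirroring how initialize()
# copies pipeline resources into the output directory:
#   _cp_r_safe_(source='/path/to/repo', target='/path/to/output',
#               resources=['workflow', 'resources', 'config'])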
def rename(filename):
"""Dynamically renames FastQ file to have one of the following extensions: *.R1.fastq.gz, *.R2.fastq.gz
To automatically rename the fastq files, a few assumptions are made. If the extension of the
FastQ file cannot be infered, an exception is raised telling the user to fix the filename
of the fastq files.
@param filename <str>:
Original name of file to be renamed
@return filename <str>:
A renamed FastQ filename
"""
# Covers common extensions from SF, SRA, EBI, TCGA, and external sequencing providers
# key = regex to match string and value = how it will be renamed
extensions = {
# Matches: _R[12]_fastq.gz, _R[12].fastq.gz, _R[12]_fq.gz, etc.
".R1.f(ast)?q.gz$": ".R1.fastq.gz",
".R2.f(ast)?q.gz$": ".R2.fastq.gz",
# Matches: _R[12]_001_fastq_gz, _R[12].001.fastq.gz, _R[12]_001.fq.gz, etc.
# Capture lane information as named group
".R1.(?P<lane>...).f(ast)?q.gz$": ".R1.fastq.gz",
".R2.(?P<lane>...).f(ast)?q.gz$": ".R2.fastq.gz",
# Matches: _[12].fastq.gz, _[12].fq.gz, _[12]_fastq_gz, etc.
"_1.f(ast)?q.gz$": ".R1.fastq.gz",
"_2.f(ast)?q.gz$": ".R2.fastq.gz"
}
if filename.endswith('.R1.fastq.gz') or filename.endswith('.R2.fastq.gz'):
# Filename is already in the correct format
return filename
converted = False
for regex, new_ext in extensions.items():
matched = re.search(regex, filename)
if matched:
# regex matches with a pattern in extensions
converted = True
# Try to get substring for named group lane, retain this in new file extension
# TODO: revisit later, I am not sure if this is necessary;
# that string may be static (i.e. always the same)
# https://support.illumina.com/help/BaseSpace_OLH_009008/Content/Source/Informatics/BS/NamingConvention_FASTQ-files-swBS.htm#
try: new_ext = "_{}{}".format(matched.group('lane'), new_ext)
except IndexError: pass # Does not contain the named group lane
filename = re.sub(regex, new_ext, filename)
break # only rename once
if not converted:
raise NameError("""\n\tFatal: Failed to rename provided input '{}'!
Cannot determine the extension of the user provided input file.
Please rename the file list above before trying again.
Here is example of acceptable input file extensions:
sampleName.R1.fastq.gz sampleName.R2.fastq.gz
sampleName_R1_001.fastq.gz sampleName_R2_001.fastq.gz
sampleName_1.fastq.gz sampleName_2.fastq.gz
Please also check that your input files are gzipped?
If they are not, please gzip them before proceeding again.
""".format(filename, sys.argv[0])
)
return filename
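# Illustrative examples (assuming the regex patterns above behave as
# described; lane information, when present, is kept in the new name):
#   rename('sample_1.fastq.gz')       # -> 'sample.R1.fastq.gz'
#   rename('sample_R2_001.fastq.gz')  # -> 'sample_001.R2.fastq.gz'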
def _sym_safe_(input_data, target):
"""Creates re-named symlinks for each FastQ file provided
as input. If a symlink already exists, it will not try to create a new symlink.
If relative source PATH is provided, it will be converted to an absolute PATH.
@param input_data <list[<str>]>:
List of input files to symlink to target location
@param target <str>:
Target path to copy templates and required resources
@return input_fastqs list[<str>]:
List of renamed input FastQs
"""
input_fastqs = [] # store renamed fastq file names
for file in input_data:
filename = os.path.basename(file)
renamed = os.path.join(target, rename(filename))
input_fastqs.append(renamed)
if not exists(renamed):
# Create a symlink if it does not already exist
# Follow source symlinks to resolve any binding issues
os.symlink(os.path.abspath(os.path.realpath(file)), renamed)
return input_fastqs
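# Illustrative usage sketch (hypothetical paths): creates renamed symlinks
# inside the output directory and returns the new file names:
#   _sym_safe_(input_data=['/data/raw/sample_1.fastq.gz'], target='/out')
#   # -> ['/out/sample.R1.fastq.gz']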
def initialize(sub_args, repo_path, output_path):
"""Initialize the output directory and copy over required pipeline resources.
If user provides an output directory path that already exists on the filesystem
as a file (small chance of happening but possible), an OSError is raised. If the
output directory PATH already EXISTS, it will not try to create the directory.
If a resource also already exists in the output directory (i.e. output/workflow),
it will not try to copy over that directory. In the future, it may be worth adding
an optional cli arg called --force that can modify this behavior. Returns a list
of renamed FastQ files (i.e. renamed symlinks).
@param sub_args <parser.parse_args() object>:
Parsed arguments for run sub-command
@param repo_path <str>:
Path to RNA-seek source code and its templates
@param output_path <str>:
Pipeline output path, created if it does not exist
@return inputs list[<str>]:
List of pipeline's input FastQ files
"""
if not exists(output_path):
# Pipeline output directory does not exist on filesystem
os.makedirs(output_path)
elif exists(output_path) and os.path.isfile(output_path):
# Provided Path for pipeline output directory exists as file
raise OSError("""\n\tFatal: Failed to create provided pipeline output directory!
User provided --output PATH already exists on the filesystem as a file.
Please run {} again with a different --output PATH.
""".format(sys.argv[0])
)
# Copy over templates and other required resources
required_resources = ['workflow', 'resources', 'config']
_cp_r_safe_(source = repo_path, target = output_path, resources = required_resources)
# Create renamed symlinks to rawdata
inputs = _sym_safe_(input_data = sub_args.input, target = output_path)
return inputs
def join_jsons(templates):
"""Joins multiple JSON files to into one data structure
Used to join multiple template JSON files to create a global config dictionary.
@params templates <list[str]>:
List of template JSON files to join together
@return aggregated <dict>:
Dictionary containing the contents of all the input JSON files
"""
# Get absolute PATH to templates in rna-seek git repo
repo_path = os.path.dirname(os.path.abspath(__file__))
aggregated = {}
for file in templates:
with open(os.path.join(repo_path, file), 'r') as fh:
aggregated.update(json.load(fh))
return aggregated
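# Illustrative usage sketch (template paths relative to the rna-seek repo):
# later files overwrite shared keys, so the order of templates matters:
#   config = join_jsons(['config/templates/project.json',
#                        'config/templates/tools.json'])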
def add_user_information(config):
"""Adds username and user's home directory to config.
@params config <dict>:
Config dictionary containing metadata to run pipeline
@return config <dict>:
Updated config dictionary containing user information (username and home directory)
"""
# Get PATH to user's home directory
# Method is portable across unix-like OS and Windows
home = os.path.expanduser("~")
# Get username from home directory PATH
username = os.path.split(home)[-1]
# Update config with home directory and username
config['project']['userhome'] = home
config['project']['username'] = username
return config
def get_nends(ifiles):
"""Determines whether the dataset is paired-end or single-end.
If paired-end data, checks to see if both mates (R1 and R2) are present for each sample.
If single-end, nends is set to 1. Else if paired-end, nends is set to 2.
@params ifiles list[<str>]:
List containing pipeline input files (renamed symlinks)
@return nends_status <int>:
Integer reflecting nends status: 1 = se, 2 = pe
"""
# Determine if dataset contains paired-end data
paired_end = False
nends_status = 1
for file in ifiles:
if file.endswith('.R2.fastq.gz'):
paired_end = True
nends_status = 2
break # dataset is paired-end
# Check to see if both mates (R1 and R2) are present for paired-end data
if paired_end:
nends = {} # keep count of R1 and R2 for each sample
for file in ifiles:
# Split sample name on file extension
sample = re.split(r'\.R[12]\.fastq\.gz', os.path.basename(file))[0]
if sample not in nends:
nends[sample] = 0
nends[sample] += 1
# Check if samples contain both read mates
missing_mates = [sample for sample, count in nends.items() if count == 1]
if missing_mates:
# Missing an R1 or R2 for a provided input sample
raise NameError("""\n\tFatal: Detected pair-end data but user failed to provide
both mates (R1 and R2) for the following samples:\n\t\t{}\n
Please check that the basename for each sample is consistent across mates.
Here is an example of a consistent basename across mates:
consistent_basename.R1.fastq.gz
consistent_basename.R2.fastq.gz
Please do not run the pipeline with a mixture of single-end and paired-end
samples. This feature is currently not supported within {}, and it is
not recommended either. If this is a priority for your project, please run
paired-end samples and single-end samples separately (in two separate output directories).
If you feel like this functionality should exist, feel free to open an issue on Github.
""".format(missing_mates, sys.argv[0])
)
return nends_status
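# Illustrative examples (renamed symlink basenames assumed):
#   get_nends(['s1.R1.fastq.gz', 's1.R2.fastq.gz'])  # -> 2 (paired-end)
#   get_nends(['s1.R1.fastq.gz'])                    # -> 1 (single-end)
# A lone mate, e.g. ['s1.R1.fastq.gz', 's2.R1.fastq.gz', 's2.R2.fastq.gz'],
# raises a NameError because s1 is missing its R2 mate.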
def get_fastq_screen_paths(fastq_screen_confs, match = 'DATABASE', file_index = -1):
"""Parses fastq_screen.conf files to get the paths of each fastq_screen database.
This path contains bowtie2 indices for reference genome to screen against.
The paths are added as singularity bind points.
@param fastq_screen_confs list[<str>]:
Name of fastq_screen config files to parse
@param match <string>:
Keyword to indicate a line match [default: 'DATABASE']
@param file_index <int>:
Index of the whitespace-delimited field in a matching line that contains
the fastq_screen database path [default: -1, the last field]
@return list[<str>]:
Returns a list of fastq_screen database paths
"""
databases = []
for file in fastq_screen_confs:
with open(file, 'r') as fh:
for line in fh:
if line.startswith(match):
db_path = line.strip().split()[file_index]
databases.append(db_path)
return databases
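# Illustrative sketch (hypothetical conf contents): given a line such as
#   DATABASE  Human  /fdb/fastq_screen/hg19
# the last whitespace-separated field ('/fdb/fastq_screen/hg19') is
# collected so it can later be added as a singularity bind path.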
def get_rawdata_bind_paths(input_files):
"""
Gets rawdata bind paths of user provided fastq files.
@params input_files list[<str>]:
List containing user-provided input fastq files
@return bindpaths <set>:
Set of rawdata bind paths
"""
bindpaths = []
for file in input_files:
# Get directory of input file
rawdata_src_path = os.path.dirname(os.path.abspath(os.path.realpath(file)))
if rawdata_src_path not in bindpaths:
bindpaths.append(rawdata_src_path)
return bindpaths
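# Illustrative example (hypothetical paths): symlinks are resolved first so
# the real parent directories get mounted inside the container:
#   get_rawdata_bind_paths(['/data/raw/a.R1.fastq.gz'])  # -> ['/data/raw']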
def add_sample_metadata(input_files, config, group=None):
"""Adds sample metadata such as sample basename, label, and group information.
If sample sheet is provided, it will default to using information in that file.
If no sample sheet is provided, it will only add sample basenames and labels.
@params input_files list[<str>]:
List containing pipeline input fastq files
@params config <dict>:
Config dictionary containing metadata to run pipeline
@params group <str>:
Sample sheet containing basename, group, and label for each sample
@return config <dict>:
Updated config with basenames, labels, and groups (if provided)
"""
# TODO: Add functionality for base case when user has a samplesheet
added = []
for file in input_files:
# Split sample name on file extension
sample = re.split(r'\.R[12]\.fastq\.gz', os.path.basename(file))[0]
if sample not in added:
# Only add PE sample information once
added.append(sample)
config['project']['groups']['rsamps'].append(sample)
config['project']['groups']['rlabels'].append(sample)
return config
def add_rawdata_information(sub_args, config, ifiles):
"""Adds information about rawdata provided to pipeline.
Determines whether the dataset is paired-end or single-end and finds the set of all
rawdata directories (needed for -B option when running singularity). If a user provides
paired-end data, checks to see if both mates (R1 and R2) are present for each sample.
@param sub_args <parser.parse_args() object>:
Parsed arguments for run sub-command
@params ifiles list[<str>]:
List containing pipeline input files (renamed symlinks)
@params config <dict>:
Config dictionary containing metadata to run pipeline
@return config <dict>:
Updated config dictionary containing rawdata information (nends, datapath, and sample metadata)
"""
# Determine whether dataset is paired-end or single-end
# Updates config['project']['nends']: 1 = single-end, 2 = paired-end
nends = get_nends(ifiles) # Checks PE data for both mates (R1 and R2)
config['project']['nends'] = nends
# Finds the set of rawdata directories to bind
rawdata_paths = get_rawdata_bind_paths(input_files = sub_args.input)
config['project']['datapath'] = ','.join(rawdata_paths)
# Add each sample's basename, label and group info
config = add_sample_metadata(input_files = ifiles, config = config)
return config
def image_cache(sub_args, config):
"""Adds Docker Image URIs, or SIF paths to config if singularity cache option is provided.
If singularity cache option is provided and a local SIF does not exist, a warning is
displayed and the image will be pulled from URI in 'config/containers/images.json'.
@param sub_args <parser.parse_args() object>:
Parsed arguments for run sub-command
@params config <dict>:
Config dictionary containing metadata to run pipeline
@return config <dict>:
Updated config dictionary containing Docker Image URIs or local SIF paths
"""
# Get absolute PATH to templates in rna-seek git repo
repo_path = os.path.dirname(os.path.abspath(__file__))
images = os.path.join(sub_args.output, 'config','containers', 'images.json')
# Read in config for docker image uris
with open(images, 'r') as fh:
data = json.load(fh)
# Check if local sif exists
for image, uri in data['images'].items():
if sub_args.sif_cache:
sif = os.path.join(sub_args.sif_cache, '{}.sif'.format(os.path.basename(uri).replace(':', '_')))
if not exists(sif):
# If local sif does not exist in the cache, print warning
# and default to pulling from URI in config/containers/images.json
print('Warning: Local image "{}" does not exist in singularity cache'.format(sif), file=sys.stderr)
else:
# Change pointer to image from Registry URI to local SIF
data['images'][image] = sif
config.update(data)
return config
def get_repo_git_commit_hash(repo_path):
"""Gets the git commit hash of the RNA-seek repo.
@param repo_path <str>:
Path to RNA-seek git repo
@return githash <str>:
Latest git commit hash
"""
try:
githash = subprocess.check_output(['git', 'rev-parse', 'HEAD'], stderr=subprocess.STDOUT, cwd = repo_path).strip().decode('utf-8')
# Typecast to fix python3 TypeError (Object of type bytes is not JSON serializable)
# subprocess.check_output() returns a byte string
githash = str(githash)
except Exception as e:
# Github releases are missing the .git directory,
# meaning a commit hash cannot be determined; set the
# commit hash to indicate it's from a GH release
githash = 'github_release'
return githash
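# Illustrative note: this is equivalent to running 'git rev-parse HEAD'
# inside the repo; tarball installs (no .git/ directory) fall back to the
# literal string 'github_release'.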
def setup(sub_args, ifiles, repo_path, output_path):
"""Setup the pipeline for execution and creates config file from templates
@param sub_args <parser.parse_args() object>:
Parsed arguments for run sub-command
@param repo_path <str>:
Path to RNA-seek source code and its templates
@param output_path <str>:
Pipeline output path, created if it does not exist
@return config <dict>:
Config dictionary containing metadata to run the pipeline
"""
# Resolves PATH to template for genomic reference files to select from a
# bundled reference genome or a user generated reference genome built via
# rna-seek build subcommand
genome_config = os.path.join(output_path, 'config','genomes', sub_args.genome + '.json')
if sub_args.genome.endswith('.json'):
# Provided a custom reference genome generated by rna-seek build
genome_config = os.path.abspath(sub_args.genome)
required = {
# Template for project-level information
"project": os.path.join(output_path, 'config','templates', 'project.json'),
# Template for genomic reference files
# User provided argument --genome is used to select the template
"genome": genome_config,
# Template for tool information
"tools": os.path.join(output_path, 'config','templates', 'tools.json'),
}
# Global config file for pipeline, config.json
config = join_jsons(required.values()) # uses templates in the rna-seek repo
config = add_user_information(config)
config = add_rawdata_information(sub_args, config, ifiles)
# Resolves if an image needs to be pulled from an OCI registry or
# a local SIF generated from the rna-seek cache subcommand exists
config = image_cache(sub_args, config)
# Add other cli collected info
config['project']['annotation'] = sub_args.genome
config['project']['version'] = __version__
config['project']['workpath'] = os.path.abspath(sub_args.output)
# Add optional cli workflow steps
config['options'] = {}
config['options']['star_2_pass_basic'] = sub_args.star_2_pass_basic
config['options']['small_rna'] = sub_args.small_rna
config['options']['tmp_dir'] = sub_args.tmp_dir
config['options']['shared_resources'] = sub_args.shared_resources
config['options']['prokaryote'] = sub_args.prokaryote
# Get latest git commit hash
git_hash = get_repo_git_commit_hash(repo_path)
config['project']['git_commit_hash'] = git_hash
if sub_args.shared_resources:
# Update paths to shared resources directory
config['bin']['rnaseq']['tool_parameters']['FASTQ_SCREEN_CONFIG'] = os.path.join(sub_args.shared_resources, "fastq_screen_db", "fastq_screen_p1.conf")
config['bin']['rnaseq']['tool_parameters']['FASTQ_SCREEN_CONFIG2'] = os.path.join(sub_args.shared_resources, "fastq_screen_db", "fastq_screen_p2.conf")
config['bin']['rnaseq']['tool_parameters']['KRAKENBACDB'] = os.path.join(sub_args.shared_resources, "20180907_standard_kraken2")
# Save config to output directory
print("\nGenerating config file in '{}'... ".format(os.path.join(output_path, 'config.json')), end = "")
# print(json.dumps(config, indent = 4, sort_keys=True))
with open(os.path.join(output_path, 'config.json'), 'w') as fh:
json.dump(config, fh, indent = 4, sort_keys = True)
print("Done!")
return config
def dryrun(outdir, config='config.json', snakefile=os.path.join('workflow', 'Snakefile')):
"""Dryruns the pipeline to ensure there are no errors prior to runnning.
@param outdir <str>:
Pipeline output PATH
@return dryrun_output <str>:
Byte string output of the dryrun command
"""
try:
dryrun_output = subprocess.check_output([
'snakemake', '-npr',
'-s', str(snakefile),
'--use-singularity',
'--rerun-incomplete',
'--cores', '4',
'--configfile={}'.format(config)
], cwd = outdir,
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
# Snakemake exited non-zero, e.g. when singularity is NOT in $PATH;
# tell user to load both main dependencies to avoid the OSError below
print('Are singularity and snakemake in your PATH? Please check before proceeding again!')
sys.exit("{}\n{}".format(e, e.output.decode("utf-8")))
except OSError as e:
# Catch: OSError: [Errno 2] No such file or directory
# Occurs when the snakemake executable itself cannot be found
if e.errno == 2 and not exe_in_path('snakemake'):
# Failure caused because snakemake is NOT in $PATH
print('\x1b[6;37;41m\nError: Are snakemake AND singularity in your $PATH?\nPlease check before proceeding again!\x1b[0m', file=sys.stderr)
sys.exit("{}".format(e))
else:
# Failure caused by unknown cause, raise error
raise e
return dryrun_output
def orchestrate(
mode, outdir, additional_bind_paths, alt_cache, threads=2,
submission_script='runner', masterjob='pl:rna-seek',
tmp_dir = '/lscratch/$SLURM_JOBID/'
):
"""Runs RNA-seek pipeline via selected executor: local or slurm.
If 'local' is selected, the pipeline is executed locally on a compute node/instance.
If 'slurm' is selected, jobs will be submitted to the cluster using the SLURM job scheduler.
Support for additional job schedulers (i.e. PBS, SGE, LSF) may be added in the future.
@param outdir <str>:
Pipeline output PATH
@param mode <str>:
Execution method or mode:
local runs serially on a compute instance without submitting to the cluster.
slurm will submit jobs to the cluster using the SLURM job scheduler.
@param additional_bind_paths <str>:
Additional paths to bind to container filesystem (i.e. input file paths)
@param alt_cache <str>:
Alternative singularity cache location
@param threads <str>:
Number of threads to use for local execution method
@param submission_script <str>:
Path to master jobs submission script:
rna-seek run = /path/to/output/resources/runner
rna-seek build = /path/to/output/resources/builder
@param masterjob <str>:
Name of the master job
@return masterjob <subprocess.Popen() object>:
Handle to the master job's subprocess
"""
# Add additional singularity bind PATHs
# to mount the local filesystem to the
# container's filesystem, NOTE: these
# PATHs must be absolute PATHs
outdir = os.path.abspath(outdir)
# Add any default PATHs to bind to
# the container's filesystem, like
# tmp directories, /lscratch
addpaths = []
temp = os.path.dirname(tmp_dir.rstrip('/'))
if temp == os.sep:
temp = tmp_dir.rstrip('/')
if outdir not in additional_bind_paths.split(','):
addpaths.append(outdir)
if temp not in additional_bind_paths.split(','):
addpaths.append(temp)
bindpaths = ','.join(addpaths)
# Set ENV variable 'SINGULARITY_CACHEDIR'
# to output directory
my_env = {}; my_env.update(os.environ)
cache = os.path.join(outdir, ".singularity")
my_env['SINGULARITY_CACHEDIR'] = cache
if alt_cache:
# Override the pipeline's default cache location
my_env['SINGULARITY_CACHEDIR'] = alt_cache
cache = alt_cache
if additional_bind_paths:
# Add Bind PATHs for outdir and tmp dir
if bindpaths:
bindpaths = ",{}".format(bindpaths)
bindpaths = "{}{}".format(additional_bind_paths,bindpaths)
if not exists(os.path.join(outdir, 'logfiles')):
# Create directory for logfiles
os.makedirs(os.path.join(outdir, 'logfiles'))
# Create .singularity directory for installations of snakemake
# without setuid which create a sandbox in the SINGULARITY_CACHEDIR
if not exists(cache):
# Create directory for sandbox and image layers
os.makedirs(cache)
# Run on compute node or instance without submitting jobs to a scheduler
if mode == 'local':
# Run RNA-seek: instantiate main/master process
# Look into later: it may be worth replacing the Popen subprocess with a direct
# snakemake API call: https://snakemake.readthedocs.io/en/stable/api_reference/snakemake.html
# Create log file for pipeline
logfh = open(os.path.join(outdir, 'logfiles', 'snakemake.log'), 'w')
masterjob = subprocess.Popen([
'snakemake', '-pr',
'--use-singularity',
'--singularity-args', "'-B {}'".format(bindpaths),
'--cores', str(threads),
'--configfile=config.json'
], cwd = outdir, env=my_env)
# Submitting jobs to cluster via SLURM's job scheduler
elif mode == 'slurm':
# Run RNA-seek: instantiate main/master process
# Look into later: it may be worth replacing the Popen subprocess with a direct
# snakemake API call: https://snakemake.readthedocs.io/en/stable/api_reference/snakemake.html
# snakemake --latency-wait 120 -s $R/Snakefile -d $R --printshellcmds
# --cluster-config $R/cluster.json --keep-going --restart-times 3
# --cluster "sbatch --gres {cluster.gres} --cpus-per-task {cluster.threads} -p {cluster.partition} -t {cluster.time} --mem {cluster.mem} --job-name={params.rname}"
# -j 500 --rerun-incomplete --stats $R/Reports/initialqc.stats -T
# 2>&1| tee -a $R/Reports/snakemake.log
# Create log file for master job information
logfh = open(os.path.join(outdir, 'logfiles', 'master.log'), 'w')
# submission_script for rna-seek run is /path/to/output/resources/runner
# submission_script for rna-seek build is /path/to/output/resources/builder
masterjob = subprocess.Popen([
str(os.path.join(outdir, 'resources', str(submission_script))), mode,
'-j', str(masterjob), '-b', str(bindpaths),
'-o', str(outdir), '-c', str(cache), '-t', str(tmp_dir)
], cwd = outdir, stderr=subprocess.STDOUT, stdout=logfh, env=my_env)
return masterjob
def resolve_additional_bind_paths(search_paths):
"""Finds additional singularity bind paths from a list of random paths. Paths are
indexed with a compostite key containing the first two directories of an absolute
file path to avoid issues related to shared names across the /gpfs shared network
filesystem. For each indexed list of file paths, a common path is found. Assumes
that the paths provided are absolute paths, the rna-seek build sub command creates
resource file index with absolute filenames.
@param search_paths list[<str>]:
List of absolute file paths to find common bind paths from
@return common_paths list[<str>]:
Returns a list of common shared file paths to create additional singularity bind paths
"""
common_paths = []
indexed_paths = {}
for ref in search_paths:
# Skip over resources with remote URI and
# skip over strings that are not file PATHS as
# RNA-seek build creates absolute resource PATHS
if ref.lower().startswith('sftp://') or \
ref.lower().startswith('s3://') or \
ref.lower().startswith('gs://') or \
not ref.lower().startswith(os.sep):
continue
# Break up path into directory tokens
path_list = os.path.abspath(ref).split(os.sep)
try: # Create composite index from first two directories
# Avoids issues created by shared /gpfs/ PATHS
index = path_list[1:3]
index = tuple(index)
except IndexError:
index = path_list[1] # ref startswith /
if index not in indexed_paths:
indexed_paths[index] = []
# Create an INDEX to find common PATHS for each root child directory
# like /scratch or /data. This prevents issues when trying to find the
# common path between these two different directories (resolves to /)
indexed_paths[index].append(str(os.sep).join(path_list))
for index, paths in indexed_paths.items():
# Find common paths for each path index
common_paths.append(os.path.dirname(os.path.commonprefix(paths)))
return list(set(common_paths))
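# Illustrative example (hypothetical paths): paths indexed under the same
# first two directories collapse to the directory of their common prefix:
#   resolve_additional_bind_paths([
#       '/data/refs/hg38/genome.fa',
#       '/data/refs/hg38/genes.gtf',
#       '/scratch/db/kraken/standard'])
#   # -> ['/data/refs/hg38', '/scratch/db/kraken']  (order not guaranteed)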
def run(sub_args):
"""Initialize, setup, and run the RNA-seek pipeline.
Calls initialize() to create output directory and copy over pipeline resources,
setup() to create the pipeline config file, dryrun() to ensure there are no issues
before running the pipeline, and finally run() to execute the Snakemake workflow.
@param sub_args <parser.parse_args() object>:
Parsed arguments for run sub-command
"""
# Check for required dependencies
# The pipeline has only two requirements:
# snakemake and singularity
require(['snakemake', 'singularity'], ['snakemake', 'singularity'])
# Get PATH to RNA-seek git repository for copying over pipeline resources
git_repo = os.path.dirname(os.path.abspath(__file__))
# Initialize working directory, copy over required pipeline resources
input_files = initialize(sub_args, repo_path = git_repo, output_path = sub_args.output)
# Setup pipeline for execution, create config.json config file from templates
config = setup(sub_args, ifiles = input_files, repo_path = git_repo, output_path = sub_args.output)
# Optional Step: Dry-run pipeline
if sub_args.dry_run:
dryrun_output = dryrun(outdir = sub_args.output) # python3 returns byte-string representation
print("\nDry-running RNA-seek pipeline:\n{}".format(dryrun_output.decode("utf-8")))
sys.exit(0)
# Resolve all Singularity Bindpaths
rawdata_bind_paths = config['project']['datapath']
# Get FastQ Screen Database paths
# and other reference genome file paths
fqscreen_cfg1=config['bin']['rnaseq']['tool_parameters']['FASTQ_SCREEN_CONFIG']
fqscreen_cfg2=config['bin']['rnaseq']['tool_parameters']['FASTQ_SCREEN_CONFIG2']
fq_screen_paths = get_fastq_screen_paths([os.path.join(sub_args.output,fqscreen_cfg1), os.path.join(sub_args.output,fqscreen_cfg2)])
kraken_db_path = [config['bin']['rnaseq']['tool_parameters']['KRAKENBACDB']]
genome_bind_paths = resolve_additional_bind_paths(list(config['references']['rnaseq'].values()) + fq_screen_paths + kraken_db_path)
all_bind_paths = "{},{}".format(",".join(genome_bind_paths), rawdata_bind_paths)
# Run pipeline
masterjob = orchestrate(
mode = sub_args.mode,
outdir = sub_args.output,
additional_bind_paths = all_bind_paths,
alt_cache = sub_args.singularity_cache,
threads = sub_args.threads,
tmp_dir = sub_args.tmp_dir
)
# Wait for subprocess to complete,
# this is blocking
masterjob.wait()
# Relay information about submission
# of the master job or the exit code of the
# pipeline that ran in local mode
if sub_args.mode == 'local':
if int(masterjob.returncode) == 0:
print('{} pipeline has successfully completed'.format(_name))
else:
fatal('{} pipeline failed. Please see standard output for more information.'.format(_name))
elif sub_args.mode == 'slurm':
jobid = open(os.path.join(sub_args.output, 'logfiles', 'mjobid.log')).read().strip()
if int(masterjob.returncode) == 0:
print('Successfully submitted master job: ', end="")
else:
fatal('Error occurred when submitting the master job.')
print(jobid)
def unlock(sub_args):
"""Unlocks a previous runs output directory. If snakemake fails ungracefully,
it maybe required to unlock the working directory before proceeding again.
This is rare but it does occasionally happen. Maybe worth add a --force
option to delete the '.snakemake/' directory in the future.
@param sub_args <parser.parse_args() object>:
Parsed arguments for unlock sub-command
"""
print("Unlocking the pipeline's output directory...")
outdir = sub_args.output
# Check for required dependencies: snakemake
require(['snakemake'], ['snakemake'])
try:
unlock_output = subprocess.check_output([
'snakemake', '--unlock',
'--cores', '1',
'--configfile=config.json'
], cwd = outdir,
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
# Unlocking process returned a non-zero exit code
sys.exit("{}\n{}".format(e, e.output))
print("Successfully unlocked the pipeline's working directory!")
def _sym_refs(input_data, target, make_copy = False):
"""Creates symlinks for each reference file provided
as input. If a symlink already exists, it will not try to create a new symlink.
If relative source PATH is provided, it will be converted to an absolute PATH.
If a symlink is provided in input_data, the canonical path of the symlink is found
to avoid any future singularity binding issues. A physical copy of the data can
be created by setting make_copy to True (default behavior is to create a symlink).
@param input_data <list[<str>]>:
List of input files to symlink to target location
@param target <str>:
Target path to create symlink (output directory)
@param make_copy <boolean>:
Create a physical copy of data (Default: False)
@return canonical_input_paths list[<str>]:
List of canonical paths for the list of input files (added to singularity bindpaths)
"""
# Find canonical paths of input files for adding to singularity bindpaths
canonical_input_paths = []
for file in input_data:
target_name = os.path.join(os.path.abspath(target), os.path.basename(file))
source_name = os.path.abspath(os.path.realpath(file))
canonical_input_paths.append(os.path.dirname(source_name))
if not exists(target_name):
if not make_copy:
# Create a symlink if it does not already exist
# Follow source symlinks to resolve any binding issues
os.symlink(source_name, target_name)
else:
# Create a physical copy if it does not already exist
copy(file, os.path.abspath(target))
return list(set(canonical_input_paths))
def _configure(sub_args, filename, git_repo):
"""Private function for configure_build() that creates the build.yml