Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a couple of bottlenecks in the pipeline #159

Merged
merged 9 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [#145](https://github.com/nf-core/rnavar/issues/145) - Converted `star_index` and `gtf` emit channels from queue to value channels in `PREPARE_GENOME` subworkflow
- [#149](https://github.com/nf-core/rnavar/pull/149) - Updated ch_gtf and ch_fasta_fai channels emitted by main.nf
- [#158](https://github.com/nf-core/rnavar/pull/158) - Fixed language server errors and warnings
- [#159](https://github.com/nf-core/rnavar/pull/159) - Fixed a couple of bottlenecks in the pipeline

### Dependencies

Expand Down
14 changes: 6 additions & 8 deletions subworkflows/local/recalibrate/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,14 @@ workflow RECALIBRATE {
ch_versions = ch_versions.mix(APPLYBQSR.out.versions.first())

SAMTOOLS_INDEX(bam_recalibrated)
ch_versions = ch_versions.mix(SAMTOOLS_INDEX.out.versions.first())

def bam_recalibrated_index = bam_recalibrated
.join(SAMTOOLS_INDEX.out.bai, by: [0], remainder: true)
.join(SAMTOOLS_INDEX.out.csi, by: [0], remainder: true) // TODO fix this bottleneck
.map{meta, bam_, bai, csi ->
if (bai) [meta, bam_, bai]
else [meta, bam_, csi]
}
def bam_indices = SAMTOOLS_INDEX.out.bai
.mix(SAMTOOLS_INDEX.out.csi)
.mix(SAMTOOLS_INDEX.out.crai)

ch_versions = ch_versions.mix(SAMTOOLS_INDEX.out.versions.first())
def bam_recalibrated_index = bam_recalibrated
.join(bam_indices, failOnMismatch: true, failOnDuplicate: true)

def bam_reports = Channel.empty()

Expand Down
42 changes: 28 additions & 14 deletions subworkflows/local/splitncigar/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,50 @@ workflow SPLITNCIGAR {
main:
def ch_versions = Channel.empty()

def bam_interval = bam.combine(intervals).map{ meta, bam_, bai, intervals_ -> [ meta + [sample:meta.id], bam_, bai, intervals_ ] }
def bam_interval = bam
.combine(intervals)
.map { meta, bam_, bai, intervals_ ->
[ meta + [interval_count:intervals_ instanceof List ? intervals_.size() : 1], bam_, bai, intervals_ ]
}
.transpose(by:3)
.map { meta, bam_, bai, interval ->
def new_meta = meta + [id:"${meta.id}_${interval.baseName}", sample: meta.id]
nvnieuwk marked this conversation as resolved.
Show resolved Hide resolved
[ meta + [id:"${meta.id}_${interval.baseName}", sample: meta.id], bam_, bai, interval ]
}

GATK4_SPLITNCIGARREADS(bam_interval,
fasta,
fai.map{ fai_ -> [[id:'genome'], fai_] },
dict)

bam_splitncigar = GATK4_SPLITNCIGARREADS.out.bam
def bam_splitncigar = GATK4_SPLITNCIGARREADS.out.bam
ch_versions = ch_versions.mix(GATK4_SPLITNCIGARREADS.out.versions)

bam_splitncigar_interval = bam_splitncigar.map{ meta, bam_ -> [ meta + [id:meta.sample] - meta.subMap('sample'), bam_ ] }.groupTuple()
def bam_splitncigar_interval = bam_splitncigar
.map{ meta, bam_ ->
def new_meta = meta + [id:meta.sample] - meta.subMap('sample') - meta.subMap('interval_count')
[ groupKey(new_meta, meta.interval_count), bam_ ]
}
.groupTuple()

SAMTOOLS_MERGE(bam_splitncigar_interval,
SAMTOOLS_MERGE(
bam_splitncigar_interval,
fasta,
fai.map{ fai_ -> [[id:fai_.baseName], fai_] })
fai.map{ fai_ -> [[id:fai_.baseName], fai_] }
)

splitncigar_bam = SAMTOOLS_MERGE.out.bam
def splitncigar_bam = SAMTOOLS_MERGE.out.bam
ch_versions = ch_versions.mix(SAMTOOLS_MERGE.out.versions)

SAMTOOLS_INDEX(splitncigar_bam)
ch_versions = ch_versions.mix(SAMTOOLS_INDEX.out.versions)

splitncigar_bam_bai = splitncigar_bam
.join(SAMTOOLS_INDEX.out.bai, remainder: true)
.join(SAMTOOLS_INDEX.out.csi, remainder: true) // TODO fix this bottleneck
.map{meta, bam_, bai, csi ->
if (bai) [meta, bam_, bai]
else [meta, bam_, csi]
}
def splitncigar_bam_indices = SAMTOOLS_INDEX.out.bai
.mix(SAMTOOLS_INDEX.out.csi)
.mix(SAMTOOLS_INDEX.out.crai)

ch_versions = ch_versions.mix(SAMTOOLS_INDEX.out.versions)
def splitncigar_bam_bai = splitncigar_bam
.join(splitncigar_bam_indices, failOnDuplicate: true, failOnMismatch: true)

emit:
bam_bai = splitncigar_bam_bai
Expand Down
50 changes: 26 additions & 24 deletions workflows/rnavar/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ workflow RNAVAR {
def interval_list_split = Channel.empty()
if (!params.skip_intervallisttools) {
GATK4_INTERVALLISTTOOLS(interval_list)
interval_list_split = GATK4_INTERVALLISTTOOLS.out.interval_list.map{ _meta, bed -> [bed] }.flatten()
interval_list_split = GATK4_INTERVALLISTTOOLS.out.interval_list.map{ _meta, bed -> [bed] }.collect()
}
else {
interval_list_split = interval_list.map { _meta, bed -> bed }
Expand Down Expand Up @@ -160,13 +160,12 @@ workflow RNAVAR {
fasta,
fasta_fai)

def markduplicate_indices = BAM_MARKDUPLICATES_PICARD.out.bai
.mix(BAM_MARKDUPLICATES_PICARD.out.csi)
.mix(BAM_MARKDUPLICATES_PICARD.out.crai)

def genome_bam_bai = BAM_MARKDUPLICATES_PICARD.out.bam
.join(BAM_MARKDUPLICATES_PICARD.out.bai, remainder: true)
.join(BAM_MARKDUPLICATES_PICARD.out.csi, remainder: true) // TODO fix this bottleneck
.map{meta, bam, bai, csi ->
if (bai) [meta, bam, bai]
else [meta, bam, csi]
}
.join(markduplicate_indices, failOnDuplicate:true, failOnMismatch:true)
.mix(PREPARE_ALIGNMENT.out.bam)

//Gather QC ch_reports
Expand Down Expand Up @@ -259,7 +258,12 @@ workflow RNAVAR {
dbsnp_for_haplotypecaller_tbi = dbsnp_tbi.map{ tbi -> [[id:'dbsnp'], tbi] }
}

haplotypecaller_interval_bam = bam_variant_calling.combine(interval_list_split)
def haplotypecaller_interval_bam = bam_variant_calling.combine(interval_list_split)
.map { meta, bam, bai, interval_lists ->
def new_meta = meta + [interval_count: interval_lists instanceof List ? interval_lists.size() : 1]
[ new_meta, bam, bai, interval_lists ]
}
.transpose(by:3)
.map{ meta, bam, bai, interval_list_ ->
[ meta + [ id:meta.id + "_" + interval_list_.baseName, sample:meta.id, variantcaller:'haplotypecaller' ], bam, bai, interval_list_, [] ]
}
Expand All @@ -278,7 +282,13 @@ workflow RNAVAR {
dbsnp_for_haplotypecaller_tbi
)

def haplotypecaller_raw = GATK4_HAPLOTYPECALLER.out.vcf.map{ meta, vcf -> [ meta + [id:meta.sample] - meta.subMap('sample'), vcf ] }.groupTuple() // TODO fix this bottleneck
def haplotypecaller_out = GATK4_HAPLOTYPECALLER.out.vcf
.join(GATK4_HAPLOTYPECALLER.out.tbi, failOnMismatch:true, failOnDuplicate:true)
.map{ meta, vcf, tbi ->
def new_meta = meta + [id:meta.sample] - meta.subMap('sample') - meta.subMap("interval_count")
nvnieuwk marked this conversation as resolved.
Show resolved Hide resolved
[ groupKey(new_meta, meta.interval_count), vcf, tbi ]
}
.groupTuple()

ch_versions = ch_versions.mix(GATK4_HAPLOTYPECALLER.out.versions)

Expand All @@ -288,6 +298,7 @@ workflow RNAVAR {
// MODULE: MergeVCFS from GATK4
// Merge multiple VCF files into one VCF
//
def haplotypecaller_raw = haplotypecaller_out.map { meta, vcfs, _tbis -> [ meta, vcfs ]}
GATK4_MERGEVCFS(
haplotypecaller_raw,
dict
Expand All @@ -301,16 +312,14 @@ workflow RNAVAR {
TABIX(
haplotypecaller_vcf
)
ch_versions = ch_versions.mix(TABIX.out.versions)

def haplotypecaller_indices = TABIX.out.tbi
.mix(TABIX.out.csi)

def haplotypecaller_vcf_tbi = haplotypecaller_vcf
.join(TABIX.out.tbi, by: [0], remainder: true)
.join(TABIX.out.csi, by: [0], remainder: true) // TODO fix this bottleneck
.map{meta, vcf, tbi, csi ->
if (tbi) [meta, vcf, tbi]
else [meta, vcf, csi]
}
.join(haplotypecaller_indices, failOnDuplicate:true, failOnMismatch: true)

ch_versions = ch_versions.mix(TABIX.out.versions)
def final_vcf = Channel.empty()

//
Expand Down Expand Up @@ -363,20 +372,13 @@ workflow RNAVAR {
}

} else {
def combinegvcfs_input = GATK4_HAPLOTYPECALLER.out.vcf
.join(GATK4_HAPLOTYPECALLER.out.tbi, failOnMismatch:true, failOnDuplicate:true)
.map{ meta, vcf, tbi ->
def new_meta = meta + [id:meta.sample]
[new_meta, vcf, tbi]
}
.groupTuple() // TODO fix this bottleneck

//
// MODULE: CombineGVCFS from GATK4
// Merge multiple GVCF files into one GVCF
//
GATK4_COMBINEGVCFS(
combinegvcfs_input,
haplotypecaller_out,
fasta.map { _meta, fasta_ -> fasta_ },
fasta_fai.map { _meta, fai -> fai },
dict.map { _meta, dict_ -> dict_ }
Expand Down
Loading