Skip to content

Commit 7ce3dcc

Browse files
committed
Parallelize interval-tree creation.
1 parent dc6eda0 commit 7ce3dcc

File tree

8 files changed

+125
-53
lines changed

8 files changed

+125
-53
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ venv3.6/
88
binaries/
99
src/
1010
poa/
11+
old/
1112

1213
*.pyc
1314

caller.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ void Caller::run() {
7979
<< "\t" << c.seq << "\t"
8080
<< "*" << endl ;
8181
}
82+
83+
std::sort(extender.svs.begin(), extender.svs.end()) ;
8284
for (const SV& sv: extender.svs) {
8385
ovcf << sv << endl ;
8486
}

cluster.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,12 @@ struct Cluster {
5959
struct ExtCluster {
6060
std::string chrom ;
6161
std::vector<ExtSFS> seqs ;
62+
63+
ExtCluster() {} ;
6264

63-
ExtCluster(const std::string& _chrom, const std::vector<ExtSFS>& _seqs) {
64-
chrom = _chrom ;
65+
ExtCluster(const std::vector<ExtSFS>& _seqs) {
6566
seqs = _seqs ;
67+
chrom = seqs[0].chrom ;
6668
}
6769
};
6870

extender.cpp

Lines changed: 106 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#include <ctime>
2+
13
#include "extender.hpp"
24

35
using namespace std ;
@@ -22,25 +24,28 @@ void Extender::run(int _threads) {
2224
for (int i = 0; i < threads; i++) {
2325
//extended_sfs.insert(extended_sfs.begin(), _p_extended_sfs[i].begin(), _p_extended_sfs[i].end()) ;
2426
for (const auto& extsfs: _p_extended_sfs[i]) {
25-
extended_sfs[extsfs.chrom].push_back(extsfs) ;
27+
extended_sfs.push_back(extsfs) ;
2628
}
2729
clips.insert(clips.begin(), _p_clips[i].begin(), _p_clips[i].end()) ;
2830
}
29-
cout << "Extension complete." << endl ;
31+
lprint({"Extension complete."}) ;
3032
// build a separate interval tree for each chromosome
31-
_p_tree.resize(chromosomes.size()) ;
32-
_p_sfs_clusters.resize(chromosomes.size()) ;
33-
#pragma omp parallel for num_threads(threads) schedule(static,1)
34-
for(int i = 0; i < chromosomes.size(); i++) {
35-
int t = i % threads ;
36-
cluster_interval_tree(chromosomes[i], i) ;
37-
}
33+
cluster_interval_tree() ;
3834
// put all clusters in a single vector
39-
for(int i = 0; i < chromosomes.size(); i++) {
35+
lprint({"Flattening interval clusters.."}) ;
36+
map<pair<int, int>, ExtCluster> _ext_clusters ;
37+
for(int i = 0; i < threads; i++) {
4038
for (const auto& cluster: _p_sfs_clusters[i]) {
41-
ext_clusters.push_back(ExtCluster(chromosomes[i], cluster.second)) ;
39+
if (_ext_clusters.find(cluster.first) == _ext_clusters.end()) {
40+
_ext_clusters.insert(make_pair(cluster.first, ExtCluster(cluster.second))) ;
41+
} else {
42+
_ext_clusters[cluster.first].seqs.insert(_ext_clusters[cluster.first].seqs.begin(), cluster.second.begin(), cluster.second.end()) ;
43+
}
4244
}
4345
}
46+
for (const auto& cluster: _ext_clusters) {
47+
ext_clusters.push_back(cluster.second) ;
48+
}
4449
// process each cluster separately
4550
_p_clusters.resize(threads) ;
4651
cluster() ;
@@ -377,50 +382,94 @@ void Extender::extend_alignment(bam1_t* aln, int index) {
377382
}
378383
}
379384

380-
//void Extender::extend() {
381-
// bam1_t *aln = bam_init1() ;
382-
// hts_itr_t *itr = sam_itr_querys(bam_index, bam_header, chrom.c_str()) ;
383-
// uint n_al = 0 ;
384-
// while (sam_itr_next(bam_file, itr, aln) > 0) {
385-
// if (aln->core.flag & BAM_FUNMAP || aln->core.flag & BAM_FSUPPLEMENTARY || aln->core.flag & BAM_FSECONDARY) {
386-
// continue ;
387-
// }
388-
// char *qname = bam_get_qname(aln);
389-
// if (SFSs->find(qname) == SFSs->end()) {
390-
// continue ;
391-
// }
392-
// extend_alignment(aln, 0) ;
393-
// ++n_al ;
394-
// if (n_al % 10000 == 0) {
395-
// cerr << "Parsed " << n_al << " alignments." << endl;
396-
// }
397-
// }
398-
//}
399-
400-
void Extender::cluster_interval_tree(string chrom, int index) {
401-
lprint({"Clustering", to_string(extended_sfs[chrom].size()), "extended SFS on", chrom + ".."});
402-
for (const ExtSFS &sfs: extended_sfs[chrom]) {
385+
// TODO: this is our performance bottleneck because we can't use multiple threads per chromosome
386+
void Extender::cluster_interval_tree() {
387+
lprint({"Clustering ", to_string(extended_sfs.size()), " extended SFS.."});
388+
// have a tree per chromosome for each thread
389+
_p_tree.resize(threads) ;
390+
time_t f ;
391+
time(&f) ;
392+
time_t s ;
393+
time(&s) ;
394+
time_t u ;
395+
#pragma omp parallel for num_threads(threads) schedule(static,1)
396+
for (int i = 0; i < extended_sfs.size(); i++) {
397+
int t = omp_get_thread_num() ;
398+
const ExtSFS& sfs = extended_sfs[i] ;
403399
vector<pair<int, int>> overlaps ;
404-
_p_tree[index].overlap_find_all({sfs.s, sfs.e}, [&overlaps](auto iter) {
400+
_p_tree[t][sfs.chrom].overlap_find_all({sfs.s, sfs.e}, [&overlaps](auto iter) {
405401
overlaps.push_back(make_pair(iter->low(), iter->high()));
406402
return true;
407403
});
408404
if (overlaps.empty()) {
409-
_p_tree[index].insert({sfs.s, sfs.e});
405+
_p_tree[t][sfs.chrom].insert({sfs.s, sfs.e});
410406
} else {
411407
int mins = sfs.s;
412408
int maxe = sfs.e;
413409
for (const pair<int, int> overlap : overlaps) {
414410
mins = min(mins, overlap.first);
415411
maxe = max(maxe, overlap.second);
416412
}
417-
_p_tree[index].insert({mins, maxe});
413+
_p_tree[t][sfs.chrom].insert({mins, maxe});
414+
}
415+
if (t == 0) {
416+
time(&u) ;
417+
if (u - s > 30) {
418+
cerr << "[I] Processed " << i << " SFS so far. SFS per second: " << std::setw(8) << i / (u - f) << ". Time: " << std::setw(8) << std::fixed << u - f << "\n" ;
419+
time(&s) ;
420+
}
418421
}
419422
}
420-
_p_tree[index].deoverlap();
421-
for (const ExtSFS&sfs: extended_sfs[chrom]) {
422-
auto overlap = _p_tree[index].overlap_find({sfs.s, sfs.e});
423-
_p_sfs_clusters[index][make_pair(overlap->low(), overlap->high())].push_back(sfs);
423+
// deoverlap each tree
424+
lprint({"Compressing interval trees.."}) ;
425+
#pragma omp parallel for num_threads(threads) schedule(static,1)
426+
for (int j = 0; j < threads; j++) {
427+
for (int i = 0; i < chromosomes.size(); i++) {
428+
const auto& chrom = chromosomes[i] ;
429+
_p_tree[j][chrom].deoverlap();
430+
}
431+
}
432+
lprint({"Merging interval trees.."}) ;
433+
_p_sfs_clusters.resize(threads) ;
434+
time(&f) ;
435+
time(&s) ;
436+
#pragma omp parallel for num_threads(threads) schedule(static,1)
437+
for (int i = 0; i < extended_sfs.size(); i++) {
438+
const ExtSFS& sfs = extended_sfs[i] ;
439+
const auto& chrom = sfs.chrom ;
440+
int t = omp_get_thread_num() ;
441+
// this finds the largest interval across all trees that contains this SFS. Any other SFS in any of those
442+
// intervals should map to the exact same interval in the end
443+
int low = sfs.s ;
444+
int high = sfs.e ;
445+
unordered_map<int, bool> checked ;
446+
int size = checked.size() ;
447+
// Parsoa: I might be overthinking this..
448+
while (true) {
449+
for (int j = 0; j < threads; j++) {
450+
if (checked.find(j) == checked.end()) {
451+
auto overlap = _p_tree[j][chrom].overlap_find({low, high});
452+
if (overlap != _p_tree[j][chrom].end()) {
453+
low = min(low, overlap->low()) ;
454+
high = max(high, overlap->high()) ;
455+
checked[j] = true ;
456+
}
457+
}
458+
}
459+
if (checked.size() == size) {
460+
break ;
461+
}
462+
size = checked.size() ;
463+
}
464+
// Same pair will be made by all other threads that have some SFS in this interval
465+
_p_sfs_clusters[t][make_pair(low, high)].push_back(sfs);
466+
if (t == 0) {
467+
time(&u) ;
468+
if (u - s > 30) {
469+
cerr << "[I] Processed " << i << " SFS so far. SFS per second: " << std::setw(8) << i / (u - f) << ". Time: " << std::setw(8) << std::fixed << u - f << "\n" ;
470+
time(&s) ;
471+
}
472+
}
424473
}
425474
}
426475

@@ -438,7 +487,13 @@ void Extender::cluster() {
438487
_p_bam_file[i] = hts_open(config->bam.c_str(), "r") ;
439488
_p_bam_index[i] = sam_index_load(_p_bam_file[i], config->bam.c_str()) ;
440489
_p_bam_header[i] = sam_hdr_read(_p_bam_file[i]) ;
490+
bgzf_mt(_p_bam_file[i]->fp.bgzf, 8, 1) ;
441491
}
492+
time_t f ;
493+
time(&f) ;
494+
time_t s ;
495+
time(&s) ;
496+
time_t u ;
442497
#pragma omp parallel for num_threads(threads) schedule(static,1)
443498
for (int i = 0; i < ext_clusters.size(); i++) {
444499
int t = i % threads ;
@@ -464,7 +519,6 @@ void Extender::cluster() {
464519
uint cov = 0;
465520
string region = chrom + ":" + to_string(cluster_s) + "-" + to_string(cluster_e);
466521
hts_itr_t *itr = sam_itr_querys(_p_bam_index[t], _p_bam_header[t], region.c_str());
467-
// add second level of parallelization inside Extender
468522
while (sam_itr_next(_p_bam_file[t], itr, _p_aln[t]) > 0) {
469523
bam1_t* aln = _p_aln[t] ;
470524
if (aln->core.flag & BAM_FUNMAP || aln->core.flag & BAM_FSUPPLEMENTARY || aln->core.flag & BAM_FSECONDARY) {
@@ -485,9 +539,9 @@ void Extender::cluster() {
485539
len[t] = l ;
486540
seq[t] = (char*) malloc(l + 1) ;
487541
}
488-
uint8_t *q = bam_get_seq(aln) ; //quality string
542+
uint8_t *q = bam_get_seq(aln) ;
489543
for (int i = 0; i < l; i++){
490-
seq[t][i] = seq_nt16_str[bam_seqi(q, i)]; //gets nucleotide id and converts them into IUPAC id.
544+
seq[t][i] = seq_nt16_str[bam_seqi(q, i)];
491545
}
492546
seq[t][l] = '\0';
493547

@@ -534,6 +588,13 @@ void Extender::cluster() {
534588
} else {
535589
++small_extcl ;
536590
}
591+
if (t == 0) {
592+
time(&u) ;
593+
if (u - s > 30) {
594+
cerr << "[I] Processed " << i << " clusters so far. Clusters per second: " << std::setw(8) << i / (u - f) << ". Time: " << std::setw(8) << std::fixed << u - f << "\n" ;
595+
time(&s) ;
596+
}
597+
}
537598
}
538599
for (int i = 0; i < threads; i++) {
539600
free(seq[i]) ;
@@ -584,7 +645,7 @@ vector<pair<uint, char>> Extender::parse_cigar(string cigar) {
584645
}
585646

586647
void Extender::call() {
587-
cout << "Calling SVs from " << clusters.size() << " clusters.." << endl ;
648+
lprint({"Calling SVs from", to_string(clusters.size()), "clusters.."}) ;
588649
#pragma omp parallel for num_threads(threads) schedule(static,1)
589650
for (int _ = 0; _ < clusters.size(); _++) {
590651
int t = _ % threads ;

extender.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class Extender {
4949
std::vector<Cluster> clusters ;
5050
std::vector<ExtCluster> ext_clusters ;
5151
std::unordered_map<std::string, std::vector<SFS>>* SFSs ;
52-
std::unordered_map<std::string, std::vector<ExtSFS>> extended_sfs ;
52+
std::vector<ExtSFS> extended_sfs ;
5353

5454
void extend_parallel() ;
5555
void extend_alignment(bam1_t* aln, int index) ;
@@ -58,7 +58,7 @@ class Extender {
5858
std::pair<int, int> get_unique_kmers(const std::vector<std::pair<int, int>> &alpairs, const uint k, const bool from_end, std::string chrom) ;
5959

6060
void cluster() ;
61-
void cluster_interval_tree(std::string chrom, int index) ;
61+
void cluster_interval_tree() ;
6262

6363
void call() ;
6464
std::vector<std::pair<uint, char>> parse_cigar(std::string) ;
@@ -73,8 +73,8 @@ class Extender {
7373
std::vector<std::vector<Cluster>> _p_clusters ;
7474
std::vector<std::vector<Consensus>> _p_alignments ;
7575
std::vector<std::vector<std::vector<bam1_t*>>> bam_entries ;
76+
std::vector<std::unordered_map<string, interval_tree_t<int>>> _p_tree ;
7677
std::vector<std::map<std::pair<int, int>, std::vector<ExtSFS>>> _p_sfs_clusters ;
77-
std::vector<interval_tree_t<int>> _p_tree ;
7878

7979
public:
8080
Extender(std::unordered_map<std::string, std::vector<SFS>>*) ;

ping_pong.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ int PingPong::search() {
259259
lprint({"BAM input.."});
260260
bam_file = hts_open(config->bam.c_str(), "r") ;
261261
bam_header = sam_hdr_read(bam_file) ; //read header
262+
bgzf_mt(bam_file->fp.bgzf, 8, 1) ;
262263
mode = 1 ;
263264
} else {
264265
lprint({"No input file provided, aborting.."}, 2);
@@ -272,7 +273,7 @@ int PingPong::search() {
272273
fastq_entries.push_back(vector<vector<fastq_entry_t>>(config->threads)) ; // current and next output
273274
}
274275
int p = 0 ;
275-
int batch_size = 10000 ;
276+
int batch_size = (10000 / config->threads) * config->threads ;
276277
lprint({"Extracting SFS strings on", to_string(config->threads), "threads.."});
277278
lprint({"Loading first batch.."});
278279
if (mode == 0) {

reconstructor.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ void Reconstructor::run() {
242242
// parse arguments
243243
bam_file = sam_open(config->bam.c_str(), "r") ;
244244
bam_header = sam_hdr_read(bam_file) ; //read header
245+
bgzf_mt(bam_file->fp.bgzf, 8, 1) ;
245246
auto out_bam_path = config->workdir + (config->selective ? "/reconstructed.selective.bam" : "/reconstructed.bam") ;
246247
out_bam_file = sam_open(out_bam_path.c_str(), "wb") ;
247248
int r = sam_hdr_write(out_bam_file, bam_header) ;
@@ -254,7 +255,7 @@ void Reconstructor::run() {
254255
}
255256
int p = 0 ;
256257
int b = 0 ;
257-
int batch_size = 10000 ;
258+
int batch_size = (10000 / config->threads) * config->threads ;
258259
lprint({"Loading first batch.."});
259260
for (int i = 0; i < 2; i++) {
260261
for (int j = 0; j < config->threads; j++) {

sv.hpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,11 @@ class SV {
2929
void genotype();
3030

3131
bool operator<(const SV& c) const {
32-
return s < c.s;
32+
if (chrom == c.chrom) {
33+
return s < c.s ;
34+
} else {
35+
return chrom < c.chrom ;
36+
}
3337
}
3438

3539
bool operator==(const SV& c) const {

0 commit comments

Comments
 (0)