 import argparse
 import multiprocessing as mp
 from pathlib import Path
-import pickle
 from queue import Empty
 from uuid import UUID

@@ -33,9 +32,7 @@ def process_read(get_columns, read, read_ids, extracted_columns):
         col.append(getattr(read, c))


-def do_batch_bulk_work(
-    filename, batches, select_read_ids, get_columns, c_api, result_q
-):
+def do_batch_work(filename, batches, get_columns, c_api, result_q):
     read_ids = []
     extracted_columns = {"read_id": read_ids}
@@ -47,19 +44,13 @@ def do_batch_bulk_work(
     result_q.put(pd.DataFrame(extracted_columns))


-def do_batch_search_work(
-    filename, batches, select_read_ids_pickled, get_columns, c_api, result_q
-):
+def do_search_work(files, select_read_ids, get_columns, c_api, result_q):
     read_ids = []
     extracted_columns = {"read_id": read_ids}
+    for file in files:
+        file = mkr_format.open_combined_file(file, use_c_api=c_api)

-    select_read_ids = pickle.loads(select_read_ids_pickled)
-
-    file = mkr_format.open_combined_file(filename, use_c_api=c_api)
-    for batch in batches:
-        for read in filter(
-            lambda x: x.read_id in select_read_ids, file.get_batch(batch).reads()
-        ):
+        for read in file.select_reads(UUID(s) for s in select_read_ids):
             process_read(get_columns, read, read_ids, extracted_columns)

     result_q.put(pd.DataFrame(extracted_columns))
@@ -82,36 +73,34 @@ def run(input_dir, output, select_read_ids=None, get_columns=[], c_api=False):
     files = list(input_dir.glob("*.mkr"))
     print(f"Searching for read ids in {[str(f) for f in files]}")

-    fn_to_call = do_batch_bulk_work
-    if select_read_ids is not None:
-        fn_to_call = do_batch_search_work
-
-    select_read_ids = pickle.dumps(
-        set(UUID(s) for s in select_read_ids) if select_read_ids is not None else None
-    )
-
     processes = []
-    for filename in files:
-        file = mkr_format.open_combined_file(filename, use_c_api=c_api)
-        batches = list(range(file.batch_count))
-        approx_chunk_size = max(1, len(batches) // runners)
+    if select_read_ids is not None:
+        approx_chunk_size = max(1, len(select_read_ids) // runners)
         start_index = 0
-        while start_index < len(batches):
-            select_batches = batches[start_index : start_index + approx_chunk_size]
+        while start_index < len(select_read_ids):
+            select_ids = select_read_ids[start_index : start_index + approx_chunk_size]
             p = mp.Process(
-                target=fn_to_call,
-                args=(
-                    filename,
-                    select_batches,
-                    select_read_ids,
-                    get_columns,
-                    c_api,
-                    result_queue,
-                ),
+                target=do_search_work,
+                args=(files, select_ids, get_columns, c_api, result_queue),
             )
             p.start()
             processes.append(p)
-            start_index += len(select_batches)
+            start_index += len(select_ids)
+    else:
+        for filename in files:
+            file = mkr_format.open_combined_file(filename, use_c_api=c_api)
+            batches = list(range(file.batch_count))
+            approx_chunk_size = max(1, len(batches) // runners)
+            start_index = 0
+            while start_index < len(batches):
+                select_batches = batches[start_index : start_index + approx_chunk_size]
+                p = mp.Process(
+                    target=do_batch_work,
+                    args=(filename, select_batches, get_columns, c_api, result_queue),
+                )
+                p.start()
+                processes.append(p)
+                start_index += len(select_batches)

     print("Wait for processes...")
     items = []
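
For reference, both branches of the new run() fan-out use the same pattern: slice a work list (the selected read ids, or one file's batch indices) into chunks of roughly len(items) // runners and start one mp.Process per chunk. A minimal standalone sketch of that slicing follows; the chunks helper and the example values are illustrative only, not part of this change.

# Hypothetical helper mirroring the chunking logic in run() above.
def chunks(items, runners):
    approx_chunk_size = max(1, len(items) // runners)
    start_index = 0
    while start_index < len(items):
        yield items[start_index : start_index + approx_chunk_size]
        start_index += approx_chunk_size


if __name__ == "__main__":
    # 10 read ids over 4 runners -> floor(10 / 4) = 2 per chunk, i.e. 5 chunks,
    # so slightly more worker processes than "runners" can be started.
    print(list(chunks(list(range(10)), 4)))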