@@ -5,8 +5,6 @@ use std::collections::VecDeque;
55use std:: fs:: File ;
66use std:: io;
77use std:: io:: { Read , Write } ;
8- use std:: marker:: Sync ;
9- use std:: ops:: DerefMut ;
108use std:: path:: PathBuf ;
119use std:: sync:: Mutex ;
1210
@@ -104,6 +102,10 @@ fn main() -> Result<()> {
104102 . value_name ( "nucleotide_file" )
105103 . takes_value ( true )
106104 . help ( "Output predicted genes to this file (supersedes -o)." ) )
105+ . arg ( Arg :: with_name ( "unordered" )
106+ . short ( "u" )
107+ . long ( "unordered" )
108+ . help ( "Do not preserve record order in output (faster)." ) )
107109 . get_matches ( ) ;
108110
109111 let ( global, locals) = hmm:: get_train_from_file (
@@ -116,7 +118,7 @@ fn main() -> Result<()> {
116118 filename => Box :: new ( File :: open ( filename) ?) ,
117119 } ;
118120
119- let mut aastream: Option < Box < dyn Write + Send + Sync > > = match (
121+ let mut aastream: Option < Box < dyn Write + Send > > = match (
120122 matches. value_of ( "aa-file" ) ,
121123 matches. value_of ( "output-prefix" ) ,
122124 ) {
@@ -126,52 +128,66 @@ fn main() -> Result<()> {
126128 ( None , None ) => None ,
127129 } ;
128130
129- let metastream: Option < File > = match (
131+ let metastream: Option < Box < dyn Write + Send > > = match (
130132 matches. value_of ( "meta-file" ) ,
131133 matches. value_of ( "output-prefix" ) ,
132134 ) {
133- ( Some ( filename) , _) => Some ( File :: create ( filename) ?) ,
135+ ( Some ( filename) , _) => Some ( Box :: new ( File :: create ( filename) ?) ) ,
134136 ( None , Some ( "stdout" ) ) => None ,
135- ( None , Some ( filename) ) => Some ( File :: create ( filename. to_owned ( ) + ".out" ) ?) ,
137+ ( None , Some ( filename) ) => Some ( Box :: new ( File :: create ( filename. to_owned ( ) + ".out" ) ?) ) ,
136138 ( None , None ) => None ,
137139 } ;
138140
139- let dnastream: Option < File > = match (
141+ let dnastream: Option < Box < dyn Write + Send > > = match (
140142 matches. value_of ( "nucleotide-file" ) ,
141143 matches. value_of ( "output-prefix" ) ,
142144 ) {
143- ( Some ( filename) , _) => Some ( File :: create ( filename) ?) ,
145+ ( Some ( filename) , _) => Some ( Box :: new ( File :: create ( filename) ?) ) ,
144146 ( None , Some ( "stdout" ) ) => None ,
145- ( None , Some ( filename) ) => Some ( File :: create ( filename. to_owned ( ) + ".ffn" ) ?) ,
147+ ( None , Some ( filename) ) => Some ( Box :: new ( File :: create ( filename. to_owned ( ) + ".ffn" ) ?) ) ,
146148 ( None , None ) => None ,
147149 } ;
148150
149151 if aastream. is_none ( ) && metastream. is_none ( ) && dnastream. is_none ( ) {
150152 aastream = Some ( Box :: new ( io:: stdout ( ) ) ) ;
151153 }
152154
153- run (
154- global,
155- locals,
156- inputseqs,
157- aastream,
158- metastream,
159- dnastream,
160- matches. value_of ( "complete" ) . unwrap ( ) == "1" ,
161- matches. is_present ( "formatted" ) ,
162- usize:: from_str_radix ( matches. value_of ( "thread-num" ) . unwrap ( ) , 10 ) ?,
163- ) ?;
155+ if matches. is_present ( "unordered" ) {
156+ run (
157+ global,
158+ locals,
159+ inputseqs,
160+ aastream. map ( UnbufferingBuffer :: new) ,
161+ metastream. map ( UnbufferingBuffer :: new) ,
162+ dnastream. map ( UnbufferingBuffer :: new) ,
163+ matches. value_of ( "complete" ) . unwrap ( ) == "1" ,
164+ matches. is_present ( "formatted" ) ,
165+ usize:: from_str_radix ( matches. value_of ( "thread-num" ) . unwrap ( ) , 10 ) ?,
166+ ) ?;
167+ } else {
168+ run (
169+ global,
170+ locals,
171+ inputseqs,
172+ aastream. map ( SortingBuffer :: new) ,
173+ metastream. map ( SortingBuffer :: new) ,
174+ dnastream. map ( SortingBuffer :: new) ,
175+ matches. value_of ( "complete" ) . unwrap ( ) == "1" ,
176+ matches. is_present ( "formatted" ) ,
177+ usize:: from_str_radix ( matches. value_of ( "thread-num" ) . unwrap ( ) , 10 ) ?,
178+ ) ?;
179+ }
164180
165181 Ok ( ( ) )
166182}
167183
168- fn run < R : Read + Send , W : Write + Send + Sync > (
184+ fn run < R : Read + Send , W : WritingBuffer + Send > (
169185 global : Box < hmm:: Global > ,
170186 locals : Vec < hmm:: Local > ,
171187 inputseqs : R ,
172- aastream : Option < W > ,
173- metastream : Option < File > ,
174- dnastream : Option < File > ,
188+ aa_buffer : Option < W > ,
189+ meta_buffer : Option < W > ,
190+ dna_buffer : Option < W > ,
175191 whole_genome : bool ,
176192 formatted : bool ,
177193 thread_num : usize ,
@@ -180,10 +196,9 @@ fn run<R: Read + Send, W: Write + Send + Sync>(
180196 . num_threads ( thread_num)
181197 . build_global ( ) ?;
182198
183- let hasmeta = metastream. is_some ( ) ;
184- let hasdna = metastream. is_some ( ) ;
185- let hasaa = metastream. is_some ( ) ;
186- let output = Mutex :: new ( OutputBuffer :: new ( aastream, metastream, dnastream) ) ;
199+ let meta_buffer = meta_buffer. map ( Mutex :: new) ;
200+ let dna_buffer = dna_buffer. map ( Mutex :: new) ;
201+ let aa_buffer = aa_buffer. map ( Mutex :: new) ;
187202
188203 Chunked :: new ( 100 , fasta:: Reader :: new ( inputseqs) . into_records ( ) )
189204 . enumerate ( )
@@ -203,29 +218,24 @@ fn run<R: Read + Send, W: Write + Send + Sync>(
203218 nseq,
204219 whole_genome,
205220 ) ;
206- if hasmeta {
221+ if meta_buffer . is_some ( ) {
207222 read_prediction. meta ( & mut metabuf) ?;
208223 }
209- if hasdna {
224+ if dna_buffer . is_some ( ) {
210225 read_prediction. dna ( & mut dnabuf, formatted) ?;
211226 }
212- if hasaa {
227+ if aa_buffer . is_some ( ) {
213228 read_prediction. protein ( & mut aabuf, whole_genome) ?;
214229 }
215230 }
216- let mut buffer = output. lock ( ) . unwrap ( ) ;
217- buffer. set ( index, ( metabuf, dnabuf, aabuf) ) ;
218- let bufs = buffer. deref_mut ( ) . collect :: < Vec < ( Vec < u8 > , Vec < u8 > , Vec < u8 > ) > > ( ) ;
219- for ( metabuf, dnabuf, aabuf) in bufs {
220- if let Some ( metastream) = & mut buffer. metastream {
221- & mut metastream. write_all ( & metabuf) ?;
222- }
223- if let Some ( dnastream) = & mut buffer. dnastream {
224- & mut dnastream. write_all ( & dnabuf) ?;
225- }
226- if let Some ( aastream) = & mut buffer. aastream {
227- & mut aastream. write_all ( & aabuf) ?;
228- }
231+ if let Some ( buffer) = & meta_buffer {
232+ buffer. lock ( ) . unwrap ( ) . add ( index, metabuf) ?;
233+ }
234+ if let Some ( buffer) = & dna_buffer {
235+ buffer. lock ( ) . unwrap ( ) . add ( index, dnabuf) ?;
236+ }
237+ if let Some ( buffer) = & aa_buffer {
238+ buffer. lock ( ) . unwrap ( ) . add ( index, aabuf) ?;
229239 }
230240 Ok ( ( ) )
231241 } )
@@ -263,43 +273,55 @@ impl<I: Iterator> Iterator for Chunked<I> {
263273 }
264274}
265275
266- struct OutputBuffer < I , W : Write + Send > {
276+ trait WritingBuffer {
277+ fn add ( & mut self , index : usize , item : Vec < u8 > ) -> Result < ( ) > ;
278+ }
279+
280+ struct SortingBuffer < W : Write + Send > {
267281 next : usize ,
268- queue : VecDeque < Option < I > > ,
269- aastream : Option < W > ,
270- metastream : Option < File > ,
271- dnastream : Option < File > ,
282+ queue : VecDeque < Option < Vec < u8 > > > ,
283+ stream : W ,
272284}
273285
274- impl < I , W : Write + Send > OutputBuffer < I , W > {
275- fn new ( aastream : Option < W > , metastream : Option < File > , dnastream : Option < File > ) -> Self {
276- OutputBuffer {
286+ impl < W : Write + Send > SortingBuffer < W > {
287+ fn new ( stream : W ) -> Self {
288+ SortingBuffer {
277289 next : 0 ,
278290 queue : VecDeque :: new ( ) ,
279- aastream : aastream,
280- metastream : metastream,
281- dnastream : dnastream,
291+ stream : stream,
282292 }
283293 }
294+ }
284295
285- fn set ( & mut self , index : usize , item : I ) {
296+ impl < W : Write + Send > WritingBuffer for SortingBuffer < W > {
297+ fn add ( & mut self , index : usize , item : Vec < u8 > ) -> Result < ( ) > {
286298 while self . next + self . queue . len ( ) <= index {
287299 self . queue . push_back ( None ) ;
288300 }
289301 self . queue [ index - self . next ] = Some ( item) ;
290- }
291- }
292-
293- impl < I , W : Write + Send > Iterator for OutputBuffer < I , W > {
294- type Item = I ;
295302
296- fn next ( & mut self ) -> Option < I > {
297- if self . queue . front ( ) . map ( Option :: is_some) . unwrap_or ( false ) {
303+ while self . queue . front ( ) . map ( Option :: is_some) . unwrap_or ( false ) {
298304 let item = self . queue . pop_front ( ) . unwrap ( ) . unwrap ( ) ;
299305 self . next += 1 ;
300- Some ( item)
301- } else {
302- None
306+ self . stream . write_all ( & item) ?;
303307 }
308+ Ok ( ( ) )
309+ }
310+ }
311+
312+ struct UnbufferingBuffer < W : Write + Send > {
313+ stream : W ,
314+ }
315+
316+ impl < W : Write + Send > UnbufferingBuffer < W > {
317+ fn new ( stream : W ) -> Self {
318+ UnbufferingBuffer { stream }
319+ }
320+ }
321+
322+ impl < W : Write + Send > WritingBuffer for UnbufferingBuffer < W > {
323+ fn add ( & mut self , _: usize , item : Vec < u8 > ) -> Result < ( ) > {
324+ self . stream . write_all ( & item) ?;
325+ Ok ( ( ) )
304326 }
305327}
0 commit comments