@@ -5,6 +5,7 @@ use std::collections::VecDeque;
55use std:: fs:: File ;
66use std:: io;
77use std:: io:: { Read , Write } ;
8+ use std:: marker:: Sync ;
89use std:: ops:: DerefMut ;
910use std:: path:: PathBuf ;
1011use std:: sync:: Mutex ;
@@ -115,7 +116,7 @@ fn main() -> Result<()> {
115116 filename => Box :: new ( File :: open ( filename) ?) ,
116117 } ;
117118
118- let mut aastream: Option < Box < dyn Write + Send > > = match (
119+ let mut aastream: Option < Box < dyn Write + Send + Sync > > = match (
119120 matches. value_of ( "aa-file" ) ,
120121 matches. value_of ( "output-prefix" ) ,
121122 ) {
@@ -164,7 +165,7 @@ fn main() -> Result<()> {
164165 Ok ( ( ) )
165166}
166167
167- fn run < R : Read + Send , W : Write + Send > (
168+ fn run < R : Read + Send , W : Write + Send + Sync > (
168169 global : Box < hmm:: Global > ,
169170 locals : Vec < hmm:: Local > ,
170171 inputseqs : R ,
@@ -179,10 +180,11 @@ fn run<R: Read + Send, W: Write + Send>(
179180 . num_threads ( thread_num)
180181 . build_global ( ) ?;
181182
182- let buffer = Mutex :: new ( OutputBuffer :: new ( ) ) ;
183- let aastream = aastream. map ( Mutex :: new) ;
184- let metastream = metastream. map ( Mutex :: new) ;
185- let dnastream = dnastream. map ( Mutex :: new) ;
183+ let hasmeta = metastream. is_some ( ) ;
184+ let hasdna = metastream. is_some ( ) ;
185+ let hasaa = metastream. is_some ( ) ;
186+ let output = Mutex :: new ( OutputBuffer :: new ( aastream, metastream, dnastream) ) ;
187+
186188 Chunked :: new ( 100 , fasta:: Reader :: new ( inputseqs) . into_records ( ) )
187189 . enumerate ( )
188190 . par_bridge ( )
@@ -201,27 +203,28 @@ fn run<R: Read + Send, W: Write + Send>(
201203 nseq,
202204 whole_genome,
203205 ) ;
204- if metastream . is_some ( ) {
206+ if hasmeta {
205207 read_prediction. meta ( & mut metabuf) ?;
206208 }
207- if dnastream . is_some ( ) {
209+ if hasdna {
208210 read_prediction. dna ( & mut dnabuf, formatted) ?;
209211 }
210- if aastream . is_some ( ) {
212+ if hasaa {
211213 read_prediction. protein ( & mut aabuf, whole_genome) ?;
212214 }
213215 }
214- let mut locked_buffer = buffer. lock ( ) . unwrap ( ) ;
215- locked_buffer. set ( index, ( metabuf, dnabuf, aabuf) ) ;
216- for ( metabuf, dnabuf, aabuf) in locked_buffer. deref_mut ( ) {
217- if let Some ( metastream) = & metastream {
218- metastream. lock ( ) . unwrap ( ) . write_all ( & metabuf) ?;
216+ let mut buffer = output. lock ( ) . unwrap ( ) ;
217+ buffer. set ( index, ( metabuf, dnabuf, aabuf) ) ;
218+ let bufs = buffer. deref_mut ( ) . collect :: < Vec < ( Vec < u8 > , Vec < u8 > , Vec < u8 > ) > > ( ) ;
219+ for ( metabuf, dnabuf, aabuf) in bufs {
220+ if let Some ( metastream) = & mut buffer. metastream {
221+ & mut metastream. write_all ( & metabuf) ?;
219222 }
220- if let Some ( dnastream) = & dnastream {
221- dnastream. lock ( ) . unwrap ( ) . write_all ( & dnabuf) ?;
223+ if let Some ( dnastream) = & mut buffer . dnastream {
224+ & mut dnastream. write_all ( & dnabuf) ?;
222225 }
223- if let Some ( aastream) = & aastream {
224- aastream. lock ( ) . unwrap ( ) . write_all ( & aabuf) ?;
226+ if let Some ( aastream) = & mut buffer . aastream {
227+ & mut aastream. write_all ( & aabuf) ?;
225228 }
226229 }
227230 Ok ( ( ) )
@@ -260,16 +263,22 @@ impl<I: Iterator> Iterator for Chunked<I> {
260263 }
261264}
262265
263- struct OutputBuffer < I > {
266+ struct OutputBuffer < I , W : Write + Send > {
264267 next : usize ,
265268 queue : VecDeque < Option < I > > ,
269+ aastream : Option < W > ,
270+ metastream : Option < File > ,
271+ dnastream : Option < File > ,
266272}
267273
268- impl < I > OutputBuffer < I > {
269- fn new ( ) -> Self {
274+ impl < I , W : Write + Send > OutputBuffer < I , W > {
275+ fn new ( aastream : Option < W > , metastream : Option < File > , dnastream : Option < File > ) -> Self {
270276 OutputBuffer {
271277 next : 0 ,
272278 queue : VecDeque :: new ( ) ,
279+ aastream : aastream,
280+ metastream : metastream,
281+ dnastream : dnastream,
273282 }
274283 }
275284
@@ -281,7 +290,7 @@ impl<I> OutputBuffer<I> {
281290 }
282291}
283292
284- impl < I > Iterator for OutputBuffer < I > {
293+ impl < I , W : Write + Send > Iterator for OutputBuffer < I , W > {
285294 type Item = I ;
286295
287296 fn next ( & mut self ) -> Option < I > {
0 commit comments