Skip to content

Commit e07e626

Browse files
author
Felix Van der Jeugt
committed
implement OutputBuffer for in-order output
1 parent 76e2bc3 commit e07e626

File tree

1 file changed

+52
-9
lines changed

1 file changed

+52
-9
lines changed

src/bin/FragGeneScanRs.rs

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
//! FragGeneScanRs executable
22
#![allow(non_snake_case)]
33

4+
use std::collections::VecDeque;
45
use std::fs::File;
56
use std::io;
67
use std::io::{Read, Write};
8+
use std::ops::DerefMut;
79
use std::path::PathBuf;
810
use std::sync::Mutex;
911

@@ -177,12 +179,14 @@ fn run<R: Read + Send, W: Write + Send>(
177179
.num_threads(thread_num)
178180
.build_global()?;
179181

182+
let buffer = Mutex::new(OutputBuffer::new());
180183
let aastream = aastream.map(Mutex::new);
181184
let metastream = metastream.map(Mutex::new);
182185
let dnastream = dnastream.map(Mutex::new);
183186
Chunked::new(100, fasta::Reader::new(inputseqs).into_records())
187+
.enumerate()
184188
.par_bridge()
185-
.map(|recordvec| {
189+
.map(|(index, recordvec)| {
186190
let mut metabuf = Vec::new();
187191
let mut dnabuf = Vec::new();
188192
let mut aabuf = Vec::new();
@@ -207,14 +211,18 @@ fn run<R: Read + Send, W: Write + Send>(
207211
read_prediction.protein(&mut aabuf, whole_genome)?;
208212
}
209213
}
210-
if let Some(metastream) = &metastream {
211-
metastream.lock().unwrap().write_all(&metabuf)?;
212-
}
213-
if let Some(dnastream) = &dnastream {
214-
dnastream.lock().unwrap().write_all(&dnabuf)?;
215-
}
216-
if let Some(aastream) = &aastream {
217-
aastream.lock().unwrap().write_all(&aabuf)?;
214+
let mut locked_buffer = buffer.lock().unwrap();
215+
locked_buffer.set(index, (metabuf, dnabuf, aabuf));
216+
for (metabuf, dnabuf, aabuf) in locked_buffer.deref_mut() {
217+
if let Some(metastream) = &metastream {
218+
metastream.lock().unwrap().write_all(&metabuf)?;
219+
}
220+
if let Some(dnastream) = &dnastream {
221+
dnastream.lock().unwrap().write_all(&dnabuf)?;
222+
}
223+
if let Some(aastream) = &aastream {
224+
aastream.lock().unwrap().write_all(&aabuf)?;
225+
}
218226
}
219227
Ok(())
220228
})
@@ -251,3 +259,38 @@ impl<I: Iterator> Iterator for Chunked<I> {
251259
}
252260
}
253261
}
262+
263+
struct OutputBuffer<I> {
264+
next: usize,
265+
queue: VecDeque<Option<I>>,
266+
}
267+
268+
impl<I> OutputBuffer<I> {
269+
fn new() -> Self {
270+
OutputBuffer {
271+
next: 0,
272+
queue: VecDeque::new(),
273+
}
274+
}
275+
276+
fn set(&mut self, index: usize, item: I) {
277+
while self.next + self.queue.len() <= index {
278+
self.queue.push_back(None);
279+
}
280+
self.queue[index - self.next] = Some(item);
281+
}
282+
}
283+
284+
impl<I> Iterator for OutputBuffer<I> {
285+
type Item = I;
286+
287+
fn next(&mut self) -> Option<I> {
288+
if self.queue.front().map(Option::is_some).unwrap_or(false) {
289+
let item = self.queue.pop_front().unwrap().unwrap();
290+
self.next += 1;
291+
Some(item)
292+
} else {
293+
None
294+
}
295+
}
296+
}

0 commit comments

Comments
 (0)