Skip to content

Commit

Permalink
CLI option to take input from stdin
Browse files: browse the repository at this point in the history
Signed-off-by: qjerome <[email protected]>
  • Loading branch information
qjerome committed Feb 19, 2024
1 parent e7a1bf7 commit 7be93a4
Showing 1 changed file with 40 additions and 22 deletions.
62 changes: 40 additions & 22 deletions src/bin/poppy.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
use std::{cmp::max, fmt::Display, fs::File, io::BufRead, mem::size_of, sync::Arc, thread};
use std::{
cmp::max,
fmt::Display,
fs::File,
io::{self, BufRead},
mem::size_of,
sync::Arc,
thread,
};

use anyhow::anyhow;
use clap::Parser;
Expand Down Expand Up @@ -81,6 +89,9 @@ struct Insert {
/// Show progress information
#[clap(short, long)]
progress: bool,
/// Reads input from stdin
#[clap(long)]
stdin: bool,
/// The number of jobs to use to insert into the bloom filter. The original
/// filter is copied into the memory of each job so you can expect the memory
/// of the whole process to be N times the size of the (uncompressed) bloom filter.
Expand Down Expand Up @@ -125,22 +136,30 @@ fn main() -> Result<(), anyhow::Error> {
let bloom_file = File::open(&o.file)?;
let bf = Arc::new(std::sync::Mutex::new(BloomFilter::from_reader(bloom_file)?));

let mut handles = vec![];
let files = o.inputs.clone();
// insert entries piped in via stdin, one per line
if o.stdin {
let mut bf = bf.lock().unwrap();
for line in std::io::BufReader::new(io::stdin()).lines() {
bf.insert(line?)
}
}

let batches = files.chunks(max(files.len() / o.jobs, 1));
// process the input files, if any
if !o.inputs.is_empty() {
let mut handles = vec![];
let files = o.inputs.clone();

for batch in batches {
let shared = Arc::clone(&bf);
let batch: Vec<String> = batch.to_vec();
let mut copy = shared
.lock()
.map_err(|e| anyhow!("failed to lock mutex: {}", e))?
.clone();
let batches = files.chunks(max(files.len() / o.jobs, 1));

let h = thread::spawn(move || {
{
//println!("Processing batch of {} files", batch.len());
for batch in batches {
let shared = Arc::clone(&bf);
let batch: Vec<String> = batch.to_vec();
let mut copy = shared
.lock()
.map_err(|e| anyhow!("failed to lock mutex: {}", e))?
.clone();

let h = thread::spawn(move || {
for input in batch {
if o.progress {
println!("processing file: {input}");
Expand All @@ -158,18 +177,17 @@ fn main() -> Result<(), anyhow::Error> {
shared.union(&copy)?;

Ok::<(), anyhow::Error>(())
}
});
handles.push(h)
}
});
handles.push(h)
}

for h in handles {
h.join().expect("failed to join thread")?;
for h in handles {
h.join().expect("failed to join thread")?;
}
}

let mut output = File::create(o.file)?;
let b = bf.lock().unwrap();
b.write(&mut output)?;
bf.lock().unwrap().write(&mut output)?;
}
Command::Check(o) => {
let bloom_file = File::open(&o.file)?;
Expand Down

0 comments on commit 7be93a4

Please sign in to comment.