Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add zstd support #9

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ keywords = ["utilities"]
[dependencies]
thiserror = "^1"

# For auto-gzip handing of files
# For auto-gzip handling of files
flate2 = "^1"

# For auto-zstd handling of files
zstd = "0.12.4"

# For auto-serialization of structs to csv/tsv
csv = "^1"
serde = { version = "^1.0.123", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.58.1"
channel = "1.71.1"
components = ["rustfmt", "clippy"]
138 changes: 118 additions & 20 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! I/O activities, such a slurping a file by lines, or writing a collection of `Serializable`
//! objects to a path.
//!
//! The two core parts of this module are teh [`Io`] and [`DelimFile`] structs. These structs provide
//! The two core parts of this module are the [`Io`] and [`DelimFile`] structs. These structs provide
//! methods for reading and writing to files that transparently handle compression based on the
//! file extension of the path given to the methods.
//!
Expand Down Expand Up @@ -51,13 +51,16 @@ use flate2::bufread::MultiGzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use serde::{de::DeserializeOwned, Serialize};

/// The set of file extensions to treat as GZIPPED
const GZIP_EXTENSIONS: [&str; 2] = ["gz", "bgz"];
use zstd::stream::{Decoder as ZstdDecoder, Encoder as ZstdEncoder};

/// The default buffer size when creating buffered readers/writers
const BUFFER_SIZE: usize = 64 * 1024;

/// The set of file extensions to treat as FASTQ, GZIPPED, or ZSTD
const FASTQ_EXTENSIONS: [&str; 2] = ["fastq", "fq"];
const GZIP_EXTENSIONS: [&str; 2] = ["gz", "bgz"];
const ZSTD_EXTENSIONS: [&str; 1] = ["zst"];

/// Unit-struct that contains associated functions for reading and writing Structs to/from
/// unstructured files.
pub struct Io {
Expand All @@ -78,20 +81,7 @@ impl Io {
Io { compression: flate2::Compression::new(compression), buffer_size }
}

/// Returns true if the path ends with a recognized GZIP file extension
fn is_gzip_path<P: AsRef<Path>>(p: &P) -> bool {
if let Some(ext) = p.as_ref().extension() {
match ext.to_str() {
Some(x) => GZIP_EXTENSIONS.contains(&x),
None => false,
}
} else {
false
}
}

/// Opens a file for reading. Transparently handles reading gzipped files based
/// extension.
/// Opens a file for reading. Transparently handles decoding gzip and zstd files.
pub fn new_reader<P>(&self, p: &P) -> Result<Box<dyn BufRead + Send>>
where
P: AsRef<Path>,
Expand All @@ -101,20 +91,26 @@ impl Io {

if Self::is_gzip_path(p) {
Ok(Box::new(BufReader::with_capacity(self.buffer_size, MultiGzDecoder::new(buf))))
} else if Self::is_zstd_path(p) {
Ok(Box::new(BufReader::with_capacity(
self.buffer_size,
ZstdDecoder::new(buf).map_err(FgError::IoError)?,
)))
} else {
Ok(Box::new(buf))
}
}

/// Opens a file for writing. Transparently handles writing GZIP'd data if the file
/// ends with a recognized GZIP extension.
/// Opens a file for writing. Transparently handles encoding data in gzip and zstd formats.
pub fn new_writer<P>(&self, p: &P) -> Result<BufWriter<Box<dyn Write + Send>>>
where
P: AsRef<Path>,
{
let file = File::create(p).map_err(FgError::IoError)?;
let write: Box<dyn Write + Send> = if Io::is_gzip_path(p) {
Box::new(GzEncoder::new(file, self.compression))
} else if Io::is_zstd_path(p) {
Box::new(ZstdEncoder::new(file, 0).map_err(FgError::IoError)?.auto_finish())
} else {
Box::new(file)
};
Expand Down Expand Up @@ -150,6 +146,36 @@ impl Io {

out.flush().map_err(FgError::IoError)
}

/// Returns true if the path ends with a recognized file extension
fn is_path_with_extension<P: AsRef<Path>, const N: usize>(
p: &P,
extensions: [&str; N],
) -> bool {
if let Some(ext) = p.as_ref().extension() {
match ext.to_str() {
Some(x) => extensions.contains(&x),
None => false,
}
} else {
false
}
}

/// Returns true if the path ends with a recognized FASTQ file extension
pub fn is_fastq_path<P: AsRef<Path>>(p: &P) -> bool {
Self::is_path_with_extension(p, FASTQ_EXTENSIONS)
}

/// Returns true if the path ends with a recognized GZIP file extension
pub fn is_gzip_path<P: AsRef<Path>>(p: &P) -> bool {
Self::is_path_with_extension(p, GZIP_EXTENSIONS)
}

/// Returns true if the path ends with a recognized ZSTD file extension
pub fn is_zstd_path<P: AsRef<Path>>(p: &P) -> bool {
Self::is_path_with_extension(p, ZSTD_EXTENSIONS)
}
}

/// Unit-struct that contains associated functions for reading and writing Structs to/from
Expand Down Expand Up @@ -261,6 +287,7 @@ impl DelimFile {
#[cfg(test)]
mod tests {
use crate::io::{DelimFile, Io};
use rstest::rstest;
use serde::{Deserialize, Serialize};
use tempfile::TempDir;

Expand Down Expand Up @@ -313,6 +340,30 @@ mod tests {
assert_ne!(text.metadata().unwrap().len(), gzipped.metadata().unwrap().len());
}

#[test]
fn test_reading_and_writing_zstd_files() {
let lines = vec!["foo", "bar", "baz"];
let tempdir = TempDir::new().unwrap();
let text = tempdir.path().join("text.txt");
let zstd_compressed = tempdir.path().join("zstd_compressed.txt.zst");
kockan marked this conversation as resolved.
Show resolved Hide resolved

assert_eq!(Io::is_zstd_path(&text), false);
assert_eq!(Io::is_zstd_path(&zstd_compressed), true);

let io = Io::default();
io.write_lines(&text, &mut lines.iter()).unwrap();
io.write_lines(&zstd_compressed, &mut lines.iter()).unwrap();

let r1 = io.read_lines(&text).unwrap();
let r2 = io.read_lines(&zstd_compressed).unwrap();

assert_eq!(r1, lines);
assert_eq!(r2, lines);

// Check whether the two files are different
assert_ne!(text.metadata().unwrap().len(), zstd_compressed.metadata().unwrap().len());
kockan marked this conversation as resolved.
Show resolved Hide resolved
}

#[test]
fn test_reading_and_writing_empty_delim_file() {
let recs: Vec<Rec> = vec![];
Expand Down Expand Up @@ -349,4 +400,51 @@ mod tests {
assert_eq!(from_csv, recs);
assert_eq!(from_tsv, recs);
}

// ############################################################################################
// Tests is_gzip_path()
// ############################################################################################

#[rstest]
#[case("test_fastq.fq.gz", true)] // .fq.gz is valid gzip
#[case("test_fastq.fq.bgz", true)] // .fq.bgz is valid gzip
#[case("test_fastq.fq.tar", false)] // .fq.tar is invalid gzip
fn test_is_gzip_path(#[case] file_name: &str, #[case] expected: bool) {
let dir = TempDir::new().unwrap();
let file_path = dir.path().join(file_name);
let result = Io::is_gzip_path(&file_path);
assert_eq!(result, expected);
}

// ############################################################################################
// Tests is_zstd_path()
// ############################################################################################

#[rstest]
#[case("test_fastq.fq", false)] // .fq is invalid zstd
#[case("test_fastq.fq.gz", false)] // .fq.gz is invalid zstd
#[case("test_fastq.fq.bgz", false)] // .fq.bgz is invalid zstd
#[case("test_fastq.fq.tar", false)] // .fq.tar is invalid zstd
#[case("test_fastq.fq.zst", true)] // .fq.zst is valid zstd
fn test_is_zstd_path(#[case] file_name: &str, #[case] expected: bool) {
let dir = TempDir::new().unwrap();
let file_path = dir.path().join(file_name);
let result = Io::is_zstd_path(&file_path);
assert_eq!(result, expected);
}

// ############################################################################################
// Tests is_fastq_path()
// ############################################################################################

#[rstest]
#[case("test_fastq.fq", true)] // .fq is valid fastq
#[case("test_fastq.fastq", true)] // .fastq is valid fastq
#[case("test_fastq.sam", false)] // .sam is invalid fastq
fn test_is_fastq_path(#[case] file_name: &str, #[case] expected: bool) {
let dir = TempDir::new().unwrap();
let file_path = dir.path().join(file_name);
let result = Io::is_fastq_path(&file_path);
assert_eq!(result, expected);
}
}