From 01d155966f5b554e7a0088f2dcffeb52e0ac9961 Mon Sep 17 00:00:00 2001 From: jabadji Date: Wed, 21 Sep 2022 17:41:56 +0200 Subject: [PATCH 1/3] feat(zstd): begin impl zstandard compression --- Cargo.lock | 48 ++++++++++++++++++++ Cargo.toml | 8 +++- src/error.rs | 1 + src/impls/oscar_doc/compress.rs | 65 +++++++++++++++++++++++++++ src/impls/oscar_doc/filter_tags.rs | 8 +--- src/impls/oscar_doc/mod.rs | 1 + src/impls/oscar_doc/oscar_doc.rs | 63 ++------------------------ src/ops/checksum.rs | 2 +- src/ops/compress.rs | 71 +++++++++++++++++++++++++----- src/ops/sampling.rs | 2 +- 10 files changed, 187 insertions(+), 82 deletions(-) create mode 100644 src/impls/oscar_doc/compress.rs diff --git a/Cargo.lock b/Cargo.lock index 20239dd..228d94e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -122,6 +122,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +[[package]] +name = "cc" +version = "1.0.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" +dependencies = [ + "jobserver", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -550,6 +559,15 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" +[[package]] +name = "jobserver" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa" +dependencies = [ + "libc", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -726,6 +744,7 @@ dependencies = [ "serde_json", "sha2", "tempfile", + "zstd", ] [[package]] @@ -1388,3 +1407,32 @@ dependencies = [ "syn", "synstructure", ] + +[[package]] +name = "zstd" +version = "0.11.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "5.0.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.1+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fd07cbbc53846d9145dbffdf6dd09a7a0aa52be46741825f5c97bdd4f73f12b" +dependencies = [ + "cc", + "libc", +] diff --git a/Cargo.toml b/Cargo.toml index 02a28b0..37790ce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,10 @@ authors = ["Pedro J. Ortiz "] edition = "2021" name = "oscar-tools" version = "0.1.0" + +[features] +zstd = ["dep:zstd"] + [dependencies] env_logger = "0.9.0" flate2 = "1.0.22" @@ -14,6 +18,7 @@ rayon = "1.5.1" runiq-lib = "1.2.2" serde_json = "1.0.78" sha2 = "0.10.1" +zstd = {version="0.11.2", optional=true} [dependencies.clap] features = ["derive"] @@ -24,4 +29,5 @@ oscar-io = "0.1.3" tempfile = "3.3.0" [profile.release] -debug = true \ No newline at end of file +debug = true + diff --git a/src/error.rs b/src/error.rs index d041f4e..5b5c307 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,5 @@ //! Errors + #[derive(Debug)] #[cfg(not(tarpaulin_include))] pub enum Error { diff --git a/src/impls/oscar_doc/compress.rs b/src/impls/oscar_doc/compress.rs new file mode 100644 index 0000000..5cc4158 --- /dev/null +++ b/src/impls/oscar_doc/compress.rs @@ -0,0 +1,65 @@ +use std::path::PathBuf; + +use clap::{arg, ArgMatches}; + +use crate::{cli::Command, error::Error, ops::Compress}; + +/// internal struct for compression op implementation +pub struct CompressDoc; +impl Compress for CompressDoc {} +impl Command for CompressDoc { + fn subcommand() -> clap::App<'static> + where + Self: Sized, + { + clap::App::new("compress") + .about("Compress provided file and/or files in provided folder, up to a depth of 2.") + .long_about("Compression of corpus files and folders. + +This command can be used to compress a single file (by specifying a source and destination file path) or a set of files (by specifying a source and destination folder path). + +If a file path is specified, oscar-tools will compress the given file and write it in the destination file path. +If a folder is specified, oscar-tools will compress files in subfolders and write the compressed files in the destination folder path. + +Only one thread is used if a file is provided. If a folder is provided, takes all threads available. Use -J to specify a different number of threads. + +Only provide a folder (resp. file) as a destination if a folder (resp. file) has been provided. +") + .arg(arg!([SOURCE] "File/folder to compress. If a folder is provided, keeps arborescence and compresses up to a depth of 2.").required(true)) + .arg(arg!([DESTINATION] "File/folder to write to.").required(true)) + .arg(arg!(--del_src "If set, deletes source files as they are being compressed.").required(false)) + .arg(arg!(--compression "Compression to use (gzip, zstd)").required(false).default_value("zstd")) + .arg(arg!(-J --num_threads "Number of threads to use (iif source is a folder). If 0, take all available").default_value("0").required(false)) + } + + fn run(matches: &ArgMatches) -> Result<(), Error> + where + Self: Sized, + { + let src: PathBuf = matches + .value_of("SOURCE") + .expect("Value of 'SOURCE' is required.") + .into(); + let dst: PathBuf = matches + .value_of("DESTINATION") + .expect("Value of 'DESTINATION' is required.") + .into(); + let del_src = matches.is_present("del_src"); + let compression = matches.value_of("compression").unwrap(); + let num_threads: usize = matches + .value_of("num_threads") + .unwrap() + .parse() + .expect("'num_threads' has to be a number."); + if src.is_file() { + CompressDoc::compress_file(&src, &dst, del_src, compression)?; + } else if src.is_dir() { + CompressDoc::compress_corpus(&src, &dst, del_src, compression, num_threads)?; + } else { + return Err( + std::io::Error::new(std::io::ErrorKind::NotFound, format!("{:?}", src)).into(), + ); + } + Ok(()) + } +} diff --git a/src/impls/oscar_doc/filter_tags.rs b/src/impls/oscar_doc/filter_tags.rs index 571c28a..9689a98 100644 --- a/src/impls/oscar_doc/filter_tags.rs +++ b/src/impls/oscar_doc/filter_tags.rs @@ -1,12 +1,7 @@ /*! The goal is to filter the documents based on the annotation ["short s", "header"] * take a document */ -use std::{ - borrow::Cow, - collections::HashSet, - fs::File, - io::{BufWriter}, -}; +use std::{borrow::Cow, collections::HashSet, fs::File, io::BufWriter}; use oscar_io::oscar_doc::{Document, SplitFolderReader, Writer}; @@ -195,7 +190,6 @@ mod test { lang::Lang, oscar_doc::{Document, Metadata, Reader, Writer}, }; - use super::FilterTagDoc; diff --git a/src/impls/oscar_doc/mod.rs b/src/impls/oscar_doc/mod.rs index becb37d..468fa58 100644 --- a/src/impls/oscar_doc/mod.rs +++ b/src/impls/oscar_doc/mod.rs @@ -1,4 +1,5 @@ /*! OSCAR v2 (22.01) operation implementations!*/ +mod compress; mod filter_tags; mod oscar_doc; pub(crate) use oscar_doc::*; diff --git a/src/impls/oscar_doc/oscar_doc.rs b/src/impls/oscar_doc/oscar_doc.rs index 14663ae..ddc9b2e 100644 --- a/src/impls/oscar_doc/oscar_doc.rs +++ b/src/impls/oscar_doc/oscar_doc.rs @@ -1,6 +1,7 @@ //! OSCAR Schema v2 (See [oscar-corpus.com](https://oscar-corpus.com)) operation implementations. //! //! Implementations mostly use default trait implementations, as the format is simple. +use crate::impls::oscar_doc::compress::CompressDoc; use crate::ops::FilterTags; use crate::{ cli::Command, @@ -228,64 +229,6 @@ if SOURCE is a folder, DESTINATION must be an empty folder. Subfolders will be c } } -/// internal struct for compression op implementation -struct CompressDoc; -impl Compress for CompressDoc {} -impl Command for CompressDoc { - fn subcommand() -> clap::App<'static> - where - Self: Sized, - { - clap::App::new("compress") - .about("Compress provided file and/or files in provided folder, up to a depth of 2.") - .long_about("Compression of corpus files and folders. - -This command can be used to compress a single file (by specifying a source and destination file path) or a set of files (by specifying a source and destination folder path). - -If a file path is specified, oscar-tools will compress the given file and write it in the destination file path. -If a folder is specified, oscar-tools will compress files in subfolders and write the compressed files in the destination folder path. - -Only one thread is used if a file is provided. If a folder is provided, takes all threads available. Use -J to specify a different number of threads. - -Only provide a folder (resp. file) as a destination if a folder (resp. file) has been provided. -") - .arg(arg!([SOURCE] "File/folder to compress. If a folder is provided, keeps arborescence and compresses up to a depth of 2.").required(true)) - .arg(arg!([DESTINATION] "File/folder to write to.").required(true)) - .arg(arg!(--del_src "If set, deletes source files as they are being compressed.").required(false)) - .arg(arg!(-J --num_threads "Number of threads to use (iif source is a folder). If 0, take all available").default_value("0").required(false)) - } - - fn run(matches: &ArgMatches) -> Result<(), Error> - where - Self: Sized, - { - let src: PathBuf = matches - .value_of("SOURCE") - .expect("Value of 'SOURCE' is required.") - .into(); - let dst: PathBuf = matches - .value_of("DESTINATION") - .expect("Value of 'DESTINATION' is required.") - .into(); - let del_src = matches.is_present("del_src"); - let num_threads: usize = matches - .value_of("num_threads") - .unwrap() - .parse() - .expect("'num_threads' has to be a number."); - if src.is_file() { - CompressDoc::compress_file(&src, &dst, del_src)?; - } else if src.is_dir() { - CompressDoc::compress_corpus(&src, &dst, del_src, num_threads)?; - } else { - return Err( - std::io::Error::new(std::io::ErrorKind::NotFound, format!("{:?}", src)).into(), - ); - } - Ok(()) - } -} - /// impl block for helper functions related to [ExtractText]. //TODO: move into a proper op impl OscarDoc { @@ -333,8 +276,8 @@ impl OscarDoc { #[cfg(test)] mod tests { - use super::CompressDoc; use super::SplitDoc; + use crate::impls::oscar_doc::compress::CompressDoc; use crate::ops::Split; use crate::{impls::OscarDoc, ops::Compress}; use std::{ @@ -470,7 +413,7 @@ quux // create destination path and compress let tmpdst = tempfile::tempdir().unwrap(); - CompressDoc::compress_folder(tmpdir.path(), tmpdst.path(), false).unwrap(); + CompressDoc::compress_folder(tmpdir.path(), tmpdst.path(), false, "gzip").unwrap(); println!( "{:?}", diff --git a/src/ops/checksum.rs b/src/ops/checksum.rs index 8ed4a59..2d89f55 100644 --- a/src/ops/checksum.rs +++ b/src/ops/checksum.rs @@ -129,7 +129,7 @@ pub trait Checksum { #[cfg(test)] mod tests { use sha2::Digest; - + use std::fs::File; use std::io::Write; use std::path::PathBuf; diff --git a/src/ops/compress.rs b/src/ops/compress.rs index 440ae4c..f65effe 100644 --- a/src/ops/compress.rs +++ b/src/ops/compress.rs @@ -15,7 +15,12 @@ pub trait Compress { /// If `del_src` is set to `true`, removes the file at `src` upon compression completion. /// /// `src` has to exist and be a file, and `dst` should not exist. - fn compress_file(src: &Path, dst: &Path, del_src: bool) -> Result<(), Error> { + fn compress_file( + src: &Path, + dst: &Path, + del_src: bool, + compression: &str, + ) -> Result<(), Error> { if !src.is_file() { warn!("{:?} is not a file: ignoring", src); return Ok(()); @@ -25,15 +30,23 @@ pub trait Compress { // gen filename let filename = src.file_name().unwrap(); let mut dst: PathBuf = [dst.as_os_str(), filename].iter().collect(); - if let Some(ext) = dst.extension() { //TODO remove unwrapping here let extension = String::from(ext.to_str().unwrap()); - if extension == ".gz" { + if extension == ".gz" || extension == ".zst" { warn!("{:?} is already compressed! Skipping.", dst); return Ok(()); } - dst.set_extension(extension + ".gz"); + + match compression { + "gzip" => dst.set_extension(extension + "gz"), + "zstd" => dst.set_extension(extension + "zst"), + _ => { + return Err(Error::Custom(format!( + "Compression {compression} not supported." + ))) + } + }; } else { warn!("File {0:?} has no extension! Fallback to {0:?}.txt.gz", dst); let extension = "txt.gz"; @@ -50,7 +63,7 @@ pub trait Compress { .into()); } let mut dest_file = File::create(dst)?; - compress(&mut dest_file, src_file)?; + compress(&mut dest_file, src_file, compression)?; if del_src { info!("removing {:?}", src); @@ -64,7 +77,12 @@ pub trait Compress { /// If `del_src` is set to `true`, removes the compressed files at `src` upon compression completion. /// The compression is only done at depth=1. /// `src` has to exist and be a file, and `dst` should not exist. - fn compress_folder(src: &Path, dst: &Path, del_src: bool) -> Result<(), Error> { + fn compress_folder( + src: &Path, + dst: &Path, + del_src: bool, + compression: &str, + ) -> Result<(), Error> { //TODO: read dir // if file, error+ignore // if dir, read dir @@ -83,7 +101,7 @@ pub trait Compress { } // construct vector of errors let errors: Vec = files_to_compress - .filter_map(|filepath| Self::compress_file(&filepath, dst, del_src).err()) + .filter_map(|filepath| Self::compress_file(&filepath, dst, del_src, compression).err()) .collect(); if !errors.is_empty() { @@ -99,6 +117,7 @@ pub trait Compress { src: &Path, dst: &Path, del_src: bool, + compression: &str, num_threads: usize, ) -> Result<(), Error> { if num_threads != 1 { @@ -129,7 +148,7 @@ pub trait Compress { // transform source + language // into dest + language - Self::compress_folder(&language_dir, &dst_folder, del_src) + Self::compress_folder(&language_dir, &dst_folder, del_src, compression) }) .collect(); for result in results.into_iter().filter(|r| r.is_err()) { @@ -142,7 +161,19 @@ pub trait Compress { /// Compress a reader into a writer. /// Consumes the whole reader. // TODO: should it be inside the compress trait? -fn compress(dest_file: &mut impl Write, r: T) -> Result<(), Error> { +// TODO: merge compress_gzip and compress_zstd? +fn compress(dest_file: &mut impl Write, r: T, compression: &str) -> Result<(), Error> { + match compression { + "gzip" => compress_gzip(dest_file, r)?, + #[cfg(feature="zstd")] + "zstd" => compress_zstd(dest_file, r)?, + _ => panic!("Unsupported compression method. If you have selected `zstd`, be sure to have enabled the feature."), + }; + Ok(()) +} + +/// compress using GZip +fn compress_gzip(dest_file: &mut impl Write, r: T) -> Result<(), Error> { let mut b = BufReader::new(r); let mut enc = GzEncoder::new(dest_file, Compression::default()); let mut length = 1; @@ -156,6 +187,22 @@ fn compress(dest_file: &mut impl Write, r: T) -> Result<(), Error> { Ok(()) } +/// compress using zstd +#[cfg(feature = "zstd")] +fn compress_zstd(dest_file: &mut impl Write, r: T) -> Result<(), Error> { + let mut b = BufReader::new(r); + let mut enc = zstd::Encoder::new(dest_file, 0)?; + let mut length = 1; + while length > 0 { + let buffer = b.fill_buf()?; + enc.write_all(buffer)?; + length = buffer.len(); + b.consume(length); + } + enc.do_finish()?; + Ok(()) +} + #[cfg(test)] mod test { use std::{fs::File, io::Read, io::Write}; @@ -171,7 +218,7 @@ mod test { // create content and compress let content = "foo"; let mut compressed = Vec::new(); - compress(&mut compressed, content.as_bytes()).unwrap(); + compress(&mut compressed, content.as_bytes(), "gzip").unwrap(); let mut reader = flate2::read::GzDecoder::new(&*compressed); let mut decompressed = String::with_capacity(content.len()); @@ -187,7 +234,7 @@ mod test { let src = tempfile::NamedTempFile::new().unwrap(); let dst = tempfile::NamedTempFile::new().unwrap(); - match Dummy::compress_file(src.path(), dst.path(), false).err() { + match Dummy::compress_file(src.path(), dst.path(), false, "gzip").err() { None => panic!("Should fail!"), Some(error) => match error { crate::error::Error::Io(_) => { @@ -214,7 +261,7 @@ mod test { dst.set_extension("txt.gz"); File::create(&dst).unwrap(); - match Dummy::compress_file(&src, dir.path(), false).err() { + match Dummy::compress_file(&src, dir.path(), false, "gzip").err() { None => panic!("Should fail!"), Some(error) => match error { crate::error::Error::Io(error) => { diff --git a/src/ops/sampling.rs b/src/ops/sampling.rs index 3486cd2..bbbd162 100644 --- a/src/ops/sampling.rs +++ b/src/ops/sampling.rs @@ -1,4 +1,4 @@ -use std::{path::Path}; +use std::path::Path; use crate::error::Error; From 788fdfdb8de5f35d2fc74205ed32e0a1361960ef Mon Sep 17 00:00:00 2001 From: jabadji Date: Thu, 22 Sep 2022 11:22:53 +0200 Subject: [PATCH 2/3] feat(zstd): feature-gate zstd match also fix a tiny filename bug --- src/ops/compress.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/ops/compress.rs b/src/ops/compress.rs index f65effe..924192d 100644 --- a/src/ops/compress.rs +++ b/src/ops/compress.rs @@ -10,6 +10,8 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator}; use crate::error::Error; +const COMPRESSED_FILE_EXTS: [&'static str; 2] = ["gz", "zst"]; + pub trait Compress { /// Compress a file located at `src` to `dst`. /// If `del_src` is set to `true`, removes the file at `src` upon compression completion. @@ -33,14 +35,15 @@ pub trait Compress { if let Some(ext) = dst.extension() { //TODO remove unwrapping here let extension = String::from(ext.to_str().unwrap()); - if extension == ".gz" || extension == ".zst" { + if COMPRESSED_FILE_EXTS.contains(&extension.as_str()) { warn!("{:?} is already compressed! Skipping.", dst); return Ok(()); } match compression { - "gzip" => dst.set_extension(extension + "gz"), - "zstd" => dst.set_extension(extension + "zst"), + "gzip" => dst.set_extension(extension + ".gz"), + #[cfg(feature = "zstd")] + "zstd" => dst.set_extension(extension + ".zst"), _ => { return Err(Error::Custom(format!( "Compression {compression} not supported." @@ -48,8 +51,11 @@ pub trait Compress { } }; } else { - warn!("File {0:?} has no extension! Fallback to {0:?}.txt.gz", dst); - let extension = "txt.gz"; + warn!( + "File {:?} has no extension! Fallback to {0:?}.txt.{compression}", + dst + ); + let extension = format!("txt.{compression}"); dst.set_extension(extension); } From d26ca68ee4cd847cf1774178e2b2a40bfcca8026 Mon Sep 17 00:00:00 2001 From: jabadji Date: Fri, 23 Sep 2022 16:14:52 +0200 Subject: [PATCH 3/3] test(zstd): add missing test for zstd and add feature to CI --- .github/workflows/rust.yml | 31 +++++++++++++++---------------- src/ops/compress.rs | 20 ++++++++++++++++++-- 2 files changed, 33 insertions(+), 18 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 5f8a58b..0ab0253 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -2,29 +2,28 @@ name: Rust on: push: - branches: [ master, dev ] + branches: [master, dev] pull_request: - branches: [ master, dev ] + branches: [master, dev] env: CARGO_TERM_COLOR: always jobs: build: - runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - name: Build - run: cargo build --verbose - - - name: Run tests - run: cargo test --verbose - - - name: Run cargo-tarpaulin - uses: actions-rs/tarpaulin@v0.1 - continue-on-error: true - - - name: Upload to codecov.io - uses: codecov/codecov-action@v1 + - uses: actions/checkout@v2 + - name: Build + run: cargo build --verbose + + - name: Run tests + run: cargo test --verbose --features zstd + + - name: Run cargo-tarpaulin + uses: actions-rs/tarpaulin@v0.1 + continue-on-error: true + + - name: Upload to codecov.io + uses: codecov/codecov-action@v1 diff --git a/src/ops/compress.rs b/src/ops/compress.rs index 924192d..56dee41 100644 --- a/src/ops/compress.rs +++ b/src/ops/compress.rs @@ -211,11 +211,15 @@ fn compress_zstd(dest_file: &mut impl Write, r: T) -> Result<(), Error> #[cfg(test)] mod test { - use std::{fs::File, io::Read, io::Write}; + use std::{ + fs::File, + io::Write, + io::{Cursor, Read}, + }; use tempfile::tempdir; - use crate::ops::Compress; + use crate::ops::{compress::compress_zstd, Compress}; use super::compress; @@ -232,6 +236,18 @@ mod test { assert_eq!(content, decompressed); } + #[test] + fn test_compress_ztd() { + // create content and compress + let content = "foo"; + let mut compressed = Vec::new(); + compress(&mut compressed, content.as_bytes(), "zstd").unwrap(); + + let compressed_cursor = Cursor::new(compressed); + let decompressed = zstd::decode_all(compressed_cursor).unwrap(); + let decompressed = String::from_utf8(decompressed).unwrap(); + assert_eq!(content, decompressed); + } #[test] fn test_dst_not_directory() { struct Dummy;