Skip to content

Commit

Permalink
[load] handle gzip cases (#10)
Browse files Browse the repository at this point in the history
* handle gzip and exceptions more efficiently

* wip

* unzip archives on the fly, and patch know manifest errors in archive

* fmt

* fix

* patch scan

* clippy

* add fixtures for gz bug

* patch manifest location

* display

* patch unzip paths

* patch glob pattern

* clean

* patch scan dirs test

---------

Co-authored-by: Reginald Tempo <[email protected]>
Co-authored-by: Reginald Fitz Forte <[email protected]>
Co-authored-by: Beauregard MacDiminuendo <[email protected]>
Co-authored-by: Vale LeBrock <[email protected]>
Co-authored-by: Gabi O'Bittern <gabi_o'[email protected]>
Co-authored-by: Lucietta O'Hind <lucietta_o'[email protected]>
Co-authored-by: Beauregard Polecat <[email protected]>
Co-authored-by: Nella Fitz Mezzo <[email protected]>
Co-authored-by: xyz <xyz>
Co-authored-by: Gianna Saint Hart <[email protected]>
Co-authored-by: Rupert O'Lento <rupert_o'[email protected]>
Co-authored-by: Mariella Ritardando <[email protected]>
Co-authored-by: Isa Sforzando <[email protected]>
Co-authored-by: Sandra De Staccato <[email protected]>
  • Loading branch information
14 people authored Jan 19, 2025
1 parent d1fa113 commit d212934
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 33 deletions.
13 changes: 9 additions & 4 deletions src/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
load_tx_cypher,
queue::{self, clear_queue, push_queue_from_archive_map},
scan::{ArchiveMap, ManifestInfo},
unzip_temp,
};

use anyhow::{bail, Context, Result};
Expand Down Expand Up @@ -69,6 +70,9 @@ pub async fn try_load_one_archive(
pool: &Graph,
batch_size: usize,
) -> Result<BatchTxReturn> {
info!("checking if we need to decompress");
let (archive_path, temp) = unzip_temp::maybe_handle_gz(&man.archive_dir)?;

let mut all_results = BatchTxReturn::new();
match man.contents {
crate::scan::BundleContent::Unknown => todo!(),
Expand All @@ -78,23 +82,24 @@ pub async fn try_load_one_archive(
error!("no framework version detected");
bail!("could not load archive from manifest");
}
crate::scan::FrameworkVersion::V5 => extract_v5_snapshot(&man.archive_dir).await?,
crate::scan::FrameworkVersion::V5 => extract_v5_snapshot(&archive_path).await?,
crate::scan::FrameworkVersion::V6 => {
extract_current_snapshot(&man.archive_dir).await?
extract_current_snapshot(&archive_path).await?
}
crate::scan::FrameworkVersion::V7 => {
extract_current_snapshot(&man.archive_dir).await?
extract_current_snapshot(&archive_path).await?
}
};
snapshot_batch(&snaps, pool, batch_size, &man.archive_id).await?;
}
crate::scan::BundleContent::Transaction => {
let (txs, _) = extract_current_transactions(&man.archive_dir).await?;
let (txs, _) = extract_current_transactions(&archive_path).await?;
let batch_res =
load_tx_cypher::tx_batch(&txs, pool, batch_size, &man.archive_id).await?;
all_results.increment(&batch_res);
}
crate::scan::BundleContent::EpochEnding => todo!(),
}
drop(temp);
Ok(all_results)
}
11 changes: 3 additions & 8 deletions src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ impl ManifestInfo {
BundleContent::Unknown => return FrameworkVersion::Unknown,
BundleContent::StateSnapshot => {
let man_path = self.archive_dir.join(self.contents.filename());
dbg!(&man_path);

// first check if the v7 manifest will parse
if let Ok(_bak) = load_snapshot_manifest(&man_path) {
self.version = FrameworkVersion::V7;
Expand Down Expand Up @@ -100,28 +98,25 @@ pub fn scan_dir_archive(
content_opt: Option<BundleContent>,
) -> Result<ArchiveMap> {
let path = parent_dir.canonicalize()?;
// filenames may be in .gz format
let filename = content_opt.unwrap_or(BundleContent::Unknown).filename();
dbg!(&filename);
let pattern = format!(
"{}/**/{}",
"{}/**/{}*", // also try .gz
path.to_str().context("cannot parse starting dir")?,
filename,
);

dbg!(&pattern);

let mut archive = BTreeMap::new();

for entry in glob(&pattern)? {
dbg!(&entry);
match entry {
Ok(manifest_path) => {
let dir = manifest_path
.parent()
.context("no parent dir found")?
.to_owned();
let contents = test_content(&manifest_path);
dbg!(&contents);

let archive_id = dir.file_name().unwrap().to_str().unwrap().to_owned();
let mut m = ManifestInfo {
archive_dir: dir.clone(),
Expand Down
97 changes: 80 additions & 17 deletions src/unzip_temp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,15 @@ use anyhow::{Context, Result};
use diem_temppath::TempPath;
use flate2::read::GzDecoder;
use glob::glob;
use libra_storage::read_tx_chunk::load_tx_chunk_manifest;
use log::{debug, info, warn};
use std::{
fs::File,
io::copy,
path::{Path, PathBuf},
};
use tar::Archive;

// TODO: decompress the files on demand, and don't take up the disk space

// take a single archive file, and get the temp location of the unzipped file
// NOTE: you must return the TempPath to the caller so otherwise when it
// drops out of scope the files will be deleted, this is intentional.
pub fn make_temp_unzipped(archive_file: &Path, tar_opt: bool) -> Result<(PathBuf, TempPath)> {
let temp_dir = TempPath::new();
temp_dir.create_as_dir()?;

let path = decompress_file(archive_file, temp_dir.path(), tar_opt)?;

Ok((path, temp_dir))
}

/// Decompresses a gzip-compressed file at `src_path` and saves the decompressed contents
/// to `dst_dir` with the same file name, but without the `.gz` extension.
fn decompress_file(src_path: &Path, dst_dir: &Path, tar_opt: bool) -> Result<PathBuf> {
Expand Down Expand Up @@ -78,8 +66,9 @@ pub fn decompress_tar_archive(src_path: &Path, dst_dir: &Path) -> Result<()> {
}

/// Unzip all .gz files into the same directory
/// Warning: this will take up a lot of disk space, should not be used in production
pub fn decompress_all_gz(parent_dir: &Path) -> Result<()> {
/// Warning: this will take up a lot of disk space, should not be used in production for all files. Use for on the fly decompression.
/// NOTE: Not for tarballs
pub fn decompress_all_gz(parent_dir: &Path, dst_dir: &Path) -> Result<()> {
let path = parent_dir.canonicalize()?;

let pattern = format!(
Expand All @@ -90,7 +79,7 @@ pub fn decompress_all_gz(parent_dir: &Path) -> Result<()> {
for entry in glob(&pattern)? {
match entry {
Ok(src_path) => {
let _ = decompress_file(&src_path, src_path.parent().unwrap(), false);
let _ = decompress_file(&src_path, dst_dir, false);
}
Err(e) => {
println!("{:?}", e);
Expand All @@ -99,3 +88,77 @@ pub fn decompress_all_gz(parent_dir: &Path) -> Result<()> {
}
Ok(())
}

// The manifest file might have written as .gz, when then should not be.
// TODO: Deprecate when archives sources fixed (currently some epochs in V7 broken for epochs in Jan 2025)
fn maybe_fix_manifest(archive_path: &Path) -> Result<()> {
let pattern = format!("{}/**/*.manifest", archive_path.display());
for manifest_path in glob(&pattern)?.flatten() {
let mut manifest = load_tx_chunk_manifest(&manifest_path)?;
debug!("old manifest:\n{:#}", &serde_json::to_string(&manifest)?);

manifest.chunks.iter_mut().for_each(|e| {
if e.proof.contains(".gz") {
e.proof = e.proof.trim_end_matches(".gz").to_string();
}
if e.transactions.contains(".gz") {
e.transactions = e.transactions.trim_end_matches(".gz").to_string();
}
});
let literal = serde_json::to_string(&manifest)?;

warn!(
"rewriting .manifest file to remove .gz paths, {}, {:#}",
manifest_path.display(),
&literal
);
std::fs::write(&manifest_path, literal.as_bytes())?;
}
Ok(())
}

/// If we are using this tool with .gz files, we will unzip on the fly
/// If the user prefers to not do on the fly, then they need to update
/// their workflow to `gunzip -r` before starting this.
pub fn maybe_handle_gz(archive_path: &Path) -> Result<(PathBuf, Option<TempPath>)> {
// maybe stuff isn't unzipped yet
let pattern = format!("{}/*.*.gz", archive_path.display());
if glob(&pattern)?.count() > 0 {
info!("Decompressing a temp folder. If you do not want to decompress files on the fly (which are not saved), then you workflow to do a `gunzip -r` before starting this.");
let temp_dir = TempPath::new();
temp_dir.create_as_dir()?;
// need to preserve the parent dir name in temp, since the manifest files reference it.
let dir_name = archive_path.file_name().unwrap().to_str().unwrap();
let new_archive_path = temp_dir.path().join(dir_name);
std::fs::create_dir_all(&new_archive_path)?;
decompress_all_gz(archive_path, &new_archive_path)?;
// fix the manifest in the TEMP path
maybe_fix_manifest(temp_dir.path())?;
return Ok((new_archive_path, Some(temp_dir)));
}
// maybe the user unzipped the files

let pattern = format!("{}/*.chunk", archive_path.display());
assert!(
glob(&pattern)?.count() > 0,
"are you sure you decompressed everything here?"
);
maybe_fix_manifest(archive_path)?;

Ok((archive_path.to_path_buf(), None))
}

// take a single archive file, and get the temp location of the unzipped file
// NOTE: you must return the TempPath to the caller so otherwise when it
// drops out of scope the files will be deleted, this is intentional.
pub fn test_helper_temp_unzipped(
archive_file: &Path,
tar_opt: bool,
) -> Result<(PathBuf, TempPath)> {
let temp_dir = TempPath::new();
temp_dir.create_as_dir()?;

let path = decompress_file(archive_file, temp_dir.path(), tar_opt)?;

Ok((path, temp_dir))
}
Binary file not shown.
Binary file not shown.
1 change: 1 addition & 0 deletions tests/fixtures/v7/transaction_95700001-.46cf/readme.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
has .gz in manifest bug
Binary file not shown.
7 changes: 4 additions & 3 deletions tests/test_scan_dirs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod support;

use anyhow::Result;
use libra_forensic_db::{scan::scan_dir_archive, unzip_temp::make_temp_unzipped};
use libra_forensic_db::{scan::scan_dir_archive, unzip_temp::test_helper_temp_unzipped};
use support::fixtures;

#[test]
Expand All @@ -24,7 +24,8 @@ fn test_scan_dir_for_v7_manifests() -> Result<()> {
let s = scan_dir_archive(&start_here, None)?;

let archives = s.0;
assert!(archives.len() == 3);
// TODO: clean up test fixtures. There are cases of .gz and decompressed data.
assert!(archives.len() == 7);

Ok(())
}
Expand All @@ -40,7 +41,7 @@ fn test_scan_dir_for_compressed_v7_manifests() -> Result<()> {
assert!(archives.0.iter().len() == 0);

// This time the scan should find readable files
let (_, unzipped_dir) = make_temp_unzipped(&start_here, false)?;
let (_, unzipped_dir) = test_helper_temp_unzipped(&start_here, false)?;

let archives = scan_dir_archive(unzipped_dir.path(), None)?;
assert!(archives.0.iter().len() > 0);
Expand Down
24 changes: 23 additions & 1 deletion tests/test_unzip.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,37 @@
mod support;
use libra_forensic_db::unzip_temp;
use libra_storage::read_tx_chunk::load_tx_chunk_manifest;

#[ignore]
#[test]
fn test_unzip() {
let archive_path = support::fixtures::v7_tx_manifest_fixtures_path();
let (_, temp_unzipped_dir) = unzip_temp::make_temp_unzipped(&archive_path, false).unwrap();
let (_, temp_unzipped_dir) =
unzip_temp::test_helper_temp_unzipped(&archive_path, false).unwrap();

assert!(temp_unzipped_dir.path().exists());
assert!(temp_unzipped_dir
.path()
.join("transaction.manifest")
.exists())
}

#[tokio::test]
async fn test_extract_tx_with_gz_bug_from_archive() -> anyhow::Result<()> {
let fixture_path = support::fixtures::v7_tx_manifest_fixtures_path();
let fixture_path = fixture_path.parent().unwrap();

let (archive_path, temppath_opt) =
unzip_temp::maybe_handle_gz(&fixture_path.join("transaction_95700001-.46cf"))?;

let temp_unzipped = temppath_opt.unwrap();
assert!(temp_unzipped.path().exists());

let manifest = load_tx_chunk_manifest(&archive_path.join("transaction.manifest"))?;

let chunk_path = temp_unzipped.path().join(&manifest.chunks[0].transactions);

assert!(chunk_path.exists());

Ok(())
}

0 comments on commit d212934

Please sign in to comment.