2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -214,7 +214,7 @@ tower-http = { version = "0.6", features = ["compression-full", "sensitive-heade
tracing = "0.1"
tracing-appender = "0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
unsigned-varint = { version = "0.8", features = ["codec"] }
unsigned-varint = { version = "0.8", features = ["codec", "futures"] }
url = { version = "2", features = ["serde"] }
uuid = { version = "1", features = ["v4", "serde"] }
walkdir = "2"
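Note: the new `futures` feature gates the `unsigned_varint::aio` async readers used by the `car_stream.rs` change further down. A minimal sketch of that pattern, with an illustrative helper name (`read_frame_len` is not in the diff):

use tokio_util::compat::TokioAsyncReadCompatExt as _;

// Adapt a tokio reader to `futures::io::AsyncRead`, then read a frame's
// varint length prefix, mirroring the F3-skip code in car_stream.rs.
async fn read_frame_len<R: tokio::io::AsyncRead + Unpin>(reader: R) -> std::io::Result<usize> {
    let mut compat = reader.compat();
    unsigned_varint::aio::read_usize(&mut compat)
        .await
        .map_err(std::io::Error::other)
}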
1 change: 1 addition & 0 deletions docs/docs/users/reference/cli.sh
@@ -130,6 +130,7 @@ generate_markdown_section "forest-tool" "archive sync-bucket"
generate_markdown_section "forest-tool" "db"
generate_markdown_section "forest-tool" "db stats"
generate_markdown_section "forest-tool" "db destroy"
generate_markdown_section "forest-tool" "db import"

generate_markdown_section "forest-tool" "car"
generate_markdown_section "forest-tool" "car concat"
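A sample invocation of the newly documented subcommand, with flags taken from the clap definition in `db_cmd.rs` below (paths are placeholders):

forest-tool db import --chain calibnet snapshot-1.car.zst snapshot-2.forest.car.zst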
5 changes: 1 addition & 4 deletions src/daemon/bundle.rs
@@ -48,10 +48,7 @@ pub async fn load_actor_bundles_from_path(
"Bundle file not found at {}",
bundle_path.as_ref().display()
);
let mut car_stream = CarStream::new(tokio::io::BufReader::new(
tokio::fs::File::open(bundle_path.as_ref()).await?,
))
.await?;
let mut car_stream = CarStream::new_from_path(bundle_path.as_ref()).await?;

// Validate the bundle
let roots = HashSet::from_iter(car_stream.header_v1.roots.iter());
5 changes: 1 addition & 4 deletions src/daemon/db_util.rs
@@ -305,10 +305,7 @@ async fn transcode_into_forest_car(from: &Path, to: &Path) -> anyhow::Result<()>
to = %to.display(),
"transcoding into forest car"
);
let car_stream = CarStream::new(tokio::io::BufReader::new(
tokio::fs::File::open(from).await?,
))
.await?;
let car_stream = CarStream::new_from_path(from).await?;
let roots = car_stream.header_v1.roots.clone();

let mut writer = tokio::io::BufWriter::new(tokio::fs::File::create(to).await?);
2 changes: 1 addition & 1 deletion src/db/car/mod.rs
@@ -33,7 +33,7 @@ pub type CacheKey = u64;
type FrameOffset = u64;

/// According to FRC-0108, v2 snapshots have exactly one root pointing to metadata
const V2_SNAPSHOT_ROOT_COUNT: usize = 1;
pub const V2_SNAPSHOT_ROOT_COUNT: usize = 1;

pub static ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE: LazyLock<usize> = LazyLock::new(|| {
const ENV_KEY: &str = "FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE";
6 changes: 1 addition & 5 deletions src/tool/subcommands/archive_cmd.rs
@@ -678,11 +678,7 @@ async fn merge_f3_snapshot(filecoin: PathBuf, f3: PathBuf, output: PathBuf) -> a
let mut f3_data = File::open(f3)?;
let f3_cid = crate::f3::snapshot::get_f3_snapshot_cid(&mut f3_data)?;

let car_stream = CarStream::new(tokio::io::BufReader::new(
tokio::fs::File::open(&filecoin).await?,
))
.await?;

let car_stream = CarStream::new_from_path(&filecoin).await?;
let chain_head = car_stream.header_v1.roots.clone();

println!("f3 snapshot cid: {f3_cid}");
62 changes: 59 additions & 3 deletions src/tool/subcommands/db_cmd.rs
@@ -5,9 +5,14 @@ use std::path::PathBuf;

use crate::cli::subcommands::prompt_confirm;
use crate::cli_shared::{chain_path, read_config};
use crate::db::db_engine::db_root;
use crate::db::BlockstoreWithWriteBuffer;
use crate::db::db_engine::{db_root, open_db};
use crate::networks::NetworkChain;
use crate::utils::db::car_stream::CarStream;
use clap::Subcommand;
use fvm_ipld_blockstore::Blockstore;
use indicatif::{ProgressBar, ProgressStyle};
use tokio_stream::StreamExt;
use tracing::error;

#[derive(Debug, Subcommand)]
@@ -33,13 +38,25 @@ pub enum DBCommands {
#[arg(long)]
chain: Option<NetworkChain>,
},
/// Import CAR files into the key-value store
Import {
/// Snapshot input paths. Supports `.car`, `.car.zst`, and `.forest.car.zst`.
#[arg(num_args = 1.., required = true)]
snapshot_files: Vec<PathBuf>,
/// Filecoin network chain
#[arg(long, required = true)]
chain: NetworkChain,
/// Optional path to the database folder that powers a Forest node
#[arg(long)]
db: Option<PathBuf>,
},
}

impl DBCommands {
pub async fn run(&self) -> anyhow::Result<()> {
pub async fn run(self) -> anyhow::Result<()> {
match self {
Self::Stats { config, chain } => {
use human_repr::HumanCount;
use human_repr::HumanCount as _;

let (_, config) = read_config(config.as_ref(), chain.clone())?;

@@ -80,6 +97,45 @@ impl DBCommands {
}
}
}
Self::Import {
snapshot_files,
chain,
db,
} => {
const DB_WRITE_BUFFER_CAPACITY: usize = 10000;

let db_root = if let Some(db) = db {
db
} else {
let (_, config) = read_config(None, Some(chain.clone()))?;
db_root(&chain_path(&config))?
};
println!("Opening parity-db at {}", db_root.display());
let db_writer = BlockstoreWithWriteBuffer::new_with_capacity(
open_db(db_root, &Default::default())?,
DB_WRITE_BUFFER_CAPACITY,
);

let pb = ProgressBar::new_spinner().with_style(
ProgressStyle::with_template("{spinner} {msg}")
.expect("indicatif template must be valid"),
);
pb.enable_steady_tick(std::time::Duration::from_millis(100));

let mut total = 0;
for snap in snapshot_files {
let mut car = CarStream::new_from_path(&snap).await?;
while let Some(b) = car.try_next().await? {
db_writer.put_keyed(&b.cid, &b.data)?;
total += 1;
let text = format!("{total} blocks imported");
pb.set_message(text);
}
}
drop(db_writer);
pb.finish();
Ok(())
}
}
}
}
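The import arm above buffers writes through `BlockstoreWithWriteBuffer` and relies on dropping `db_writer` to flush whatever remains in the buffer before the spinner finishes. A sketch of the buffering idea, assuming internals that this diff does not show (only the type name and the capacity of 10000 come from the diff):

use cid::Cid;
use std::sync::Mutex;

struct BufferedPuts<DB> {
    inner: DB,
    cap: usize,
    pending: Mutex<Vec<(Cid, Vec<u8>)>>,
}

impl<DB: fvm_ipld_blockstore::Blockstore> BufferedPuts<DB> {
    fn put_keyed(&self, k: &Cid, data: &[u8]) -> anyhow::Result<()> {
        let mut pending = self.pending.lock().unwrap();
        pending.push((*k, data.to_vec()));
        if pending.len() >= self.cap {
            // One batched write instead of `cap` individual ones;
            // `put_many_keyed` is provided by the `Blockstore` trait.
            self.inner.put_many_keyed(pending.drain(..))?;
        }
        Ok(())
    }
    // The real type must also flush the remainder, e.g. on drop, which is
    // why the import arm drops `db_writer` before calling `pb.finish()`.
}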
46 changes: 43 additions & 3 deletions src/utils/db/car_stream.rs
@@ -1,7 +1,9 @@
// Copyright 2019-2025 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::chain::FilecoinSnapshotMetadata;
use crate::db::car::plain::read_v2_header;
use crate::utils::io::skip_bytes;
use crate::utils::multihash::prelude::*;
use async_compression::tokio::bufread::ZstdDecoder;
use bytes::{Buf, BufMut, Bytes, BytesMut};
@@ -14,6 +16,7 @@ use nunny::Vec as NonEmpty;
use pin_project_lite::pin_project;
use serde::{Deserialize, Serialize};
use std::io::{self, Cursor, Read, SeekFrom, Write};
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{
@@ -22,6 +25,7 @@ use tokio::io::{
};
use tokio_util::codec::Encoder;
use tokio_util::codec::FramedRead;
use tokio_util::compat::TokioAsyncReadCompatExt as _;
use tokio_util::either::Either;
use unsigned_varint::codec::UviBytes;

@@ -164,8 +168,7 @@ impl<ReaderT: AsyncBufRead + Unpin> CarStream<ReaderT> {

// Skip v2 header bytes
if let Some(header_v2) = &header_v2 {
let mut to_skip = vec![0; header_v2.data_offset as usize];
reader.read_exact(&mut to_skip).await?;
reader = skip_bytes(reader, header_v2.data_offset as _).await?;
}

let max_car_v1_bytes = header_v2
@@ -187,11 +190,39 @@ impl<ReaderT: AsyncBufRead + Unpin> CarStream<ReaderT> {
"invalid first block",
));
}

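// FRC-0108: a v2 snapshot has exactly one root, and it points at the
// metadata block (hence the `V2_SNAPSHOT_ROOT_COUNT` check below).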
let first_block = if header_v1.roots.len() == crate::db::car::V2_SNAPSHOT_ROOT_COUNT {
let maybe_metadata_cid = header_v1.roots.first();
if maybe_metadata_cid == &block.cid
&& let Ok(metadata) =
fvm_ipld_encoding::from_slice::<FilecoinSnapshotMetadata>(&block.data)
{
// Skip the F3 block in the block stream
if metadata.f3_data.is_some() {
// manipulate the inner reader directly because `reader.next()` is slow for skipping the large F3 block
let mut inner_reader_compat = reader.into_inner().compat();
let len = unsigned_varint::aio::read_usize(&mut inner_reader_compat)
.await
.map_err(io::Error::other)?;
let inner_reader =
skip_bytes(inner_reader_compat.into_inner(), len as _).await?;
reader = FramedRead::new(inner_reader, uvi_bytes());
}

// Skip the metadata block in the block stream
None
} else {
Some(block)
}
} else {
Some(block)
};

Ok(CarStream {
reader,
header_v1,
header_v2,
first_block: Some(block),
first_block,
})
} else {
Ok(CarStream {
@@ -257,6 +288,15 @@ impl<ReaderT: AsyncBufRead + AsyncSeek + Unpin> CarStream<ReaderT> {
}
}

impl CarStream<tokio::io::BufReader<tokio::fs::File>> {
pub async fn new_from_path(path: impl AsRef<Path>) -> io::Result<Self> {
Self::new(tokio::io::BufReader::new(
tokio::fs::File::open(path.as_ref()).await?,
))
.await
}
}

impl<ReaderT: AsyncBufRead> Stream for CarStream<ReaderT> {
type Item = io::Result<CarBlock>;

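With the new `new_from_path` constructor above, each call site collapses to a one-liner. A usage sketch (the `cid` and `data` fields on `CarBlock` match the import loop in `db_cmd.rs`; the function itself is illustrative):

use tokio_stream::StreamExt as _; // for `try_next`, as in db_cmd.rs

async fn count_blocks(path: &std::path::Path) -> anyhow::Result<u64> {
    let mut stream = CarStream::new_from_path(path).await?;
    let mut n = 0;
    while let Some(block) = stream.try_next().await? {
        // Each `block` exposes `block.cid` and `block.data`.
        let _ = (&block.cid, &block.data);
        n += 1;
    }
    Ok(n)
}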
8 changes: 8 additions & 0 deletions src/utils/io/mod.rs
@@ -44,6 +44,14 @@ pub fn create_new_sensitive_file(path: &Path) -> Result<File> {
Ok(file)
}

/// Skips `n` bytes from the reader.
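/// Returns the same reader once the bytes are consumed. Note that, like
/// `tokio::io::copy`, this stops at EOF, so fewer than `n` bytes are
/// skipped if the stream ends early.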
pub async fn skip_bytes<T: tokio::io::AsyncRead + Unpin>(reader: T, n: u64) -> std::io::Result<T> {
use tokio::io::AsyncReadExt as _;
let mut take = reader.take(n);
tokio::io::copy(&mut take, &mut tokio::io::sink()).await?;
Ok(take.into_inner())
}

/// Converts a TOML file represented as a string to `S`
///
/// # Example