Commits (42 total; the file changes below are from 28 of them)
- c0a9065 Add new subcommands (elmattic, Oct 3, 2025)
- ece8b82 Merge branch 'main' into elmattic/snapshot-export-qol (elmattic, Oct 3, 2025)
- 92c7841 Add cancelation for v2 exports (elmattic, Oct 6, 2025)
- e5a6b74 Fix message (elmattic, Oct 6, 2025)
- 8175449 Return false if no export task is running (elmattic, Oct 6, 2025)
- 5be4c76 Typo (elmattic, Oct 6, 2025)
- 56a9c8d Don't persist if export is cancelled (elmattic, Oct 6, 2025)
- 64a3c48 Add progress bar (elmattic, Oct 7, 2025)
- 7accd2f Dont display pb if not export task is running (elmattic, Oct 7, 2025)
- 2837ea3 Better support for cancel (elmattic, Oct 7, 2025)
- c452a6e Refactor crate usage (elmattic, Oct 7, 2025)
- c43ae0a Fix persist (elmattic, Oct 7, 2025)
- d0c4160 Compute progress on node side (elmattic, Oct 7, 2025)
- 86c7631 Fix lint errors (elmattic, Oct 8, 2025)
- f683b3f Merge branch 'main' into elmattic/snapshot-export-qol (elmattic, Oct 8, 2025)
- 365ae1f Fix elapsed time (elmattic, Oct 8, 2025)
- 1acb32a Merge branch 'main' into elmattic/snapshot-export-qol (elmattic, Oct 8, 2025)
- c837c8a Update CHANGELOG (elmattic, Oct 8, 2025)
- 7760b22 Fix lint error (elmattic, Oct 8, 2025)
- 36847bd Merge branch 'main' into elmattic/snapshot-export-qol (elmattic, Oct 9, 2025)
- e7e2148 Merge branch 'main' into elmattic/snapshot-export-qol (elmattic, Oct 9, 2025)
- 36faa47 Fix CI tests (elmattic, Oct 9, 2025)
- f0755f9 Sort list (elmattic, Oct 9, 2025)
- b3fcffd Merge branch 'main' into elmattic/snapshot-export-qol (elmattic, Oct 9, 2025)
- c9dab4a Remove max duration (elmattic, Oct 9, 2025)
- 741918f Remove max duration (elmattic, Oct 9, 2025)
- 76c390a Revert (elmattic, Oct 9, 2025)
- ee3bb9c Merge branch 'main' into elmattic/snapshot-export-qol (elmattic, Oct 9, 2025)
- 820bcd5 Apply coderabbitai suggestion (elmattic, Oct 10, 2025)
- 19147cd Avoid division by zero (elmattic, Oct 10, 2025)
- a317d85 Apply coderabbitai suggestion (elmattic, Oct 10, 2025)
- 04e0a25 Add json formatting (elmattic, Oct 10, 2025)
- 8dc3cb6 Clamp with zero (elmattic, Oct 10, 2025)
- 4cfe6f3 Lower polling (elmattic, Oct 10, 2025)
- 302020b Address ai comments (elmattic, Oct 10, 2025)
- 37b8964 Merge branch 'main' into elmattic/snapshot-export-qol (elmattic, Oct 10, 2025)
- e0d3505 Move to tokio CancellationToken (elmattic, Oct 10, 2025)
- 969831e Merge branch 'main' into elmattic/snapshot-export-qol (elmattic, Oct 10, 2025)
- d8c06b0 Refactor to use a single global (elmattic, Oct 10, 2025)
- 119823f Apply coderabbitai suggestion (elmattic, Oct 10, 2025)
- 2c9c0a7 Merge branch 'main' into elmattic/snapshot-export-qol (elmattic, Oct 10, 2025)
- c8a3e1c Merge branch 'main' into elmattic/snapshot-export-qol (elmattic, Oct 13, 2025)
Files changed (5 files)
CHANGELOG.md (2 additions, 0 deletions)
@@ -29,6 +29,8 @@
 
 ### Added
 
+- [#6082](https://github.com/ChainSafe/forest/issues/6082) Added `forest-cli snapshot export-status` and `forest-cli snapshot export-cancel` subcommands to monitor or cancel an export, respectively.
+
 ### Changed
 
 ### Removed
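The new subcommands documented above are used against a running node alongside the existing `forest-cli snapshot export`. A minimal illustrative session (hedged sketch: the subcommand names and the `--wait` flag come from this diff; the plain `export-status` output is whatever formatting of the RPC response the CLI prints):

```console
# Terminal 1: start a long-running snapshot export
forest-cli snapshot export

# Terminal 2: print the current export status once
forest-cli snapshot export-status

# ...or keep a progress bar open until the export finishes
forest-cli snapshot export-status --wait

# Abort the running export
forest-cli snapshot export-cancel
```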
src/blocks/header.rs (1 addition, 1 deletion)
@@ -236,7 +236,7 @@ impl GetSize for RawBlockHeader {
 #[cfg_attr(test, derive(Default))]
 #[derive(Debug, GetSize)]
 pub struct CachingBlockHeader {
-    uncached: RawBlockHeader,
+    pub uncached: RawBlockHeader,
     #[get_size(ignore)]
     cid: OnceLock<Cid>,
     has_ever_been_verified_against_any_signature: AtomicBool,
src/chain/mod.rs (6 additions, 2 deletions)
@@ -28,8 +28,11 @@ use multihash_derive::MultihashDigest as _;
 use nunny::Vec as NonEmpty;
 use std::fs::File;
 use std::io::{Seek as _, SeekFrom};
-use std::sync::Arc;
+use std::sync::{Arc, LazyLock};
 use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter};
+use tokio::sync::Notify;
+
+pub static CANCEL_EXPORT: LazyLock<Arc<Notify>> = LazyLock::new(|| Arc::new(Notify::new()));
 
 #[derive(Debug, Clone, Default)]
 pub struct ExportOptions {
@@ -161,7 +164,8 @@ async fn export_to_forest_car<D: Digest>(
             tipset.clone().chain_owned(Arc::clone(db)),
             stateroot_lookup_limit,
         )
-        .with_seen(seen),
+        .with_seen(seen)
+        .track_progress(true),
     );
 
     // Encode Ipld key-value pairs in zstd frames
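This hunk only introduces the `CANCEL_EXPORT` notifier; the code that listens for it is outside the excerpt. A minimal sketch of how an export task could observe such a `tokio::sync::Notify` while doing its work (illustrative only; `run_cancellable` is a made-up helper, and a later commit in this PR mentions moving to tokio's `CancellationToken`):

```rust
use std::sync::{Arc, LazyLock};
use tokio::sync::Notify;

// Same shape as the global added in the hunk above.
static CANCEL_EXPORT: LazyLock<Arc<Notify>> = LazyLock::new(|| Arc::new(Notify::new()));

/// Drive `work` to completion unless cancellation is signalled first.
/// Returns `true` if the work finished, `false` if it was cancelled.
async fn run_cancellable<F: std::future::Future<Output = ()>>(work: F) -> bool {
    tokio::select! {
        // Woken by `CANCEL_EXPORT.notify_waiters()` from a cancel handler.
        _ = CANCEL_EXPORT.notified() => false,
        _ = work => true,
    }
}

#[tokio::main]
async fn main() {
    // Simulate cancelling a long export shortly after it starts.
    tokio::spawn(async {
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        CANCEL_EXPORT.notify_waiters();
    });
    let finished = run_cancellable(async {
        tokio::time::sleep(std::time::Duration::from_secs(10)).await;
    })
    .await;
    println!("export finished: {finished}");
}
```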
src/cli/subcommands/snapshot_cmd.rs (90 additions, 4 deletions)
@@ -7,6 +7,7 @@ use crate::cli_shared::snapshot::{self, TrustedVendor};
 use crate::db::car::forest::new_forest_car_temp_path_in;
 use crate::networks::calibnet;
 use crate::rpc::chain::ForestChainExportDiffParams;
+use crate::rpc::types::ApiExportResult;
 use crate::rpc::{self, chain::ForestChainExportParams, prelude::*};
 use crate::shim::policy::policy_constants::CHAIN_FINALITY;
 use anyhow::Context as _;
@@ -42,6 +43,14 @@
         #[arg(long, value_enum, default_value_t = FilecoinSnapshotVersion::V1)]
         format: FilecoinSnapshotVersion,
     },
+    /// Show status of the current export.
+    ExportStatus {
+        /// Wait until it completes and print progress.
+        #[arg(long)]
+        wait: bool,
+    },
+    /// Cancel the current export.
+    ExportCancel {},
     /// Export a diff snapshot between `from` and `to` epochs to `<output_path>`
     ExportDiff {
         /// `./forest_snapshot_diff_{chain}_{from}_{to}+{depth}.car.zst`.
@@ -152,22 +161,99 @@ impl SnapshotCommands {
 
                 // Manually construct RpcRequest because snapshot export could
                 // take a few hours on mainnet
-                let hash_result = client
+                let export_result = client
                     .call(ForestChainExport::request((params,))?.with_timeout(Duration::MAX))
                     .await?;
 
                 handle.abort();
                 pb.finish();
                 _ = handle.await;
 
-                if !dry_run {
-                    if let Some(hash) = hash_result {
+                if !dry_run && let ApiExportResult::Done(hash_opt) = export_result.clone() {
+                    if let Some(hash) = hash_opt {
                         save_checksum(&output_path, hash).await?;
                     }
 
                     temp_path.persist(output_path)?;
                 }
 
-                println!("Export completed.");
+                match export_result {
+                    ApiExportResult::Done(_) => {
+                        println!("Export completed.");
+                    }
+                    ApiExportResult::Cancelled => {
+                        println!("Export cancelled.");
+                    }
+                }
                 Ok(())
             }
+            Self::ExportStatus { wait } => {
+                if wait {
+                    let result = client
+                        .call(
+                            ForestChainExportStatus::request(())?
+                                .with_timeout(Duration::from_secs(30)),
+                        )
+                        .await?;
+                    let elapsed = chrono::Utc::now()
+                        .signed_duration_since(result.start_time)
+                        .to_std()?;
+                    let pb = ProgressBar::new(10000)
+                        .with_elapsed(elapsed)
+                        .with_message("Exporting");
+                    pb.set_style(
+                        ProgressStyle::with_template(
+                            "[{elapsed_precise}] [{wide_bar}] {percent}% {msg} ",
+                        )
+                        .expect("indicatif template must be valid")
+                        .progress_chars("#>-"),
+                    );
+                    loop {
+                        let result = client
+                            .call(
+                                ForestChainExportStatus::request(())?
+                                    .with_timeout(Duration::from_secs(30)),
+                            )
+                            .await?;
+                        if result.cancelled {
+                            pb.set_message("Export cancelled");
+                            pb.abandon();
+
+                            return Ok(());
+                        }
+                        if !result.exporting {
+                            return Ok(());
+                        }
+                        let position = (result.progress * 10000.0).trunc() as u64;
+                        pb.set_position(position);
+
+                        if position == 10000 {
+                            break;
+                        }
+                        tokio::time::sleep(Duration::from_millis(10)).await;
+                    }
+                    pb.finish_with_message("Export completed");
+
+                    return Ok(());
+                }
+                let result = client
+                    .call(
+                        ForestChainExportStatus::request(())?.with_timeout(Duration::from_secs(30)),
+                    )
+                    .await?;
+                println!("{:?}", result);
+
+                Ok(())
+            }
+            Self::ExportCancel {} => {
+                let result = client
+                    .call(
+                        ForestChainExportCancel::request(())?.with_timeout(Duration::from_secs(30)),
+                    )
+                    .await?;
+                if result {
+                    println!("Export cancelled.");
+                }
+                Ok(())
+            }
             Self::ExportDiff {
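One small design note on the `--wait` loop above: the progress bar is sized at 10000 steps rather than 100, so the fractional `progress` value returned by the status RPC keeps basis-point resolution while indicatif's `{percent}` placeholder still renders a whole percentage. A standalone sketch of that mapping (not the PR's code; the clamp here is this sketch's own guard):

```rust
/// Map a fractional progress value onto a 10000-step progress bar.
fn to_position(progress: f64) -> u64 {
    (progress.clamp(0.0, 1.0) * 10000.0).trunc() as u64
}

fn main() {
    assert_eq!(to_position(0.0), 0);
    assert_eq!(to_position(0.25), 2500); // rendered as 25%
    assert_eq!(to_position(1.0), 10000); // bar full, the wait loop exits
    println!("ok");
}
```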
src/ipld/util.rs (56 additions, 0 deletions)
@@ -8,15 +8,58 @@ use crate::shim::clock::ChainEpoch;
 use crate::utils::db::car_stream::CarBlock;
 use crate::utils::encoding::extract_cids;
 use crate::utils::multihash::prelude::*;
+use chrono::{DateTime, Utc};
 use cid::Cid;
 use futures::Stream;
 use fvm_ipld_blockstore::Blockstore;
+use parking_lot::Mutex;
 use pin_project_lite::pin_project;
 use std::borrow::Borrow;
 use std::collections::VecDeque;
 use std::pin::Pin;
+use std::sync::LazyLock;
 use std::task::{Context, Poll};
 
+#[derive(Default)]
+pub struct ExportStatus {
+    pub epoch: i64,
+    pub initial_epoch: i64,
+    pub exporting: bool,
+    pub cancelled: bool,
+    pub start_time: DateTime<Utc>,
+}
+
+pub static CHAIN_EXPORT_STATUS: LazyLock<Mutex<ExportStatus>> =
+    LazyLock::new(|| ExportStatus::default().into());
+
+fn update_epoch(new_value: i64) {
+    let mut mutex = CHAIN_EXPORT_STATUS.lock();
+    mutex.epoch = new_value;
+    if mutex.initial_epoch == 0 {
+        mutex.initial_epoch = new_value;
+    }
+}
+
+pub fn start_export() {
+    let mut mutex = CHAIN_EXPORT_STATUS.lock();
+    mutex.epoch = 0;
+    mutex.initial_epoch = 0;
+    mutex.exporting = true;
+    mutex.cancelled = false;
+    mutex.start_time = Utc::now();
+}
+
+pub fn end_export() {
+    let mut mutex = CHAIN_EXPORT_STATUS.lock();
+    mutex.exporting = false;
+}
+
+pub fn cancel_export() {
+    let mut mutex = CHAIN_EXPORT_STATUS.lock();
+    mutex.exporting = false;
+    mutex.cancelled = true;
+}
+
 fn should_save_block_to_snapshot(cid: Cid) -> bool {
     // Don't include identity CIDs.
     // We only include raw and dagcbor, for now.
@@ -112,6 +155,7 @@ pin_project! {
         seen: CidHashSet,
         stateroot_limit_exclusive: ChainEpoch,
         fail_on_dead_links: bool,
+        track_progress: bool,
     }
 }
 
@@ -126,6 +170,11 @@ impl<DB, T> ChainStream<DB, T> {
         self
     }
 
+    pub fn track_progress(mut self, track_progress: bool) -> Self {
+        self.track_progress = track_progress;
+        self
+    }
+
     #[allow(dead_code)]
     pub fn into_seen(self) -> CidHashSet {
         self.seen
@@ -155,6 +204,7 @@ pub fn stream_chain<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T>
         seen: CidHashSet::default(),
         stateroot_limit_exclusive,
         fail_on_dead_links: true,
+        track_progress: false,
     }
 }
 
@@ -197,6 +247,9 @@ impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin> Stream
                 }
             }
             Iterate(epoch, block_cid, _type, cid_vec) => {
+                if *this.track_progress {
+                    update_epoch(*epoch);
+                }
                 while let Some(cid) = cid_vec.pop_front() {
                     // The link traversal implementation assumes there are three types of encoding:
                     // 1. DAG_CBOR: needs to be reachable, so we add it to the queue and load.
@@ -242,6 +295,9 @@
                 for block in tipset.borrow().block_headers() {
                     let (cid, data) = block.car_block()?;
                     if this.seen.insert(cid) {
+                        if *this.track_progress {
+                            update_epoch(block.uncached.epoch);
+                        }
                         // Make sure we always yield a block otherwise.
                         this.dfs.push_back(Emit(cid, Some(data)));
 
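The hunks above only record the epoch counters while the stream walks from the head tipset back toward genesis; the status RPC that turns them into the fractional `progress` consumed by the CLI is not part of this excerpt. A plausible sketch of that derivation, assuming progress is the share of epochs already traversed (the "Avoid division by zero" and "Clamp with zero" commits hint at guards of this kind, but the PR's exact formula may differ):

```rust
/// Derive a progress fraction in [0.0, 1.0] from the epochs recorded by the
/// export stream. `initial_epoch` is the head epoch where the walk started and
/// `epoch` is the epoch currently being visited (the walk moves toward 0).
fn export_progress(initial_epoch: i64, epoch: i64) -> f64 {
    if initial_epoch <= 0 {
        // Nothing recorded yet (or degenerate chain): avoid dividing by zero.
        return 0.0;
    }
    let done = (initial_epoch - epoch).max(0) as f64; // clamp with zero
    (done / initial_epoch as f64).clamp(0.0, 1.0)
}

fn main() {
    assert_eq!(export_progress(0, 0), 0.0);
    assert_eq!(export_progress(1000, 1000), 0.0); // just started
    assert_eq!(export_progress(1000, 500), 0.5); // halfway down the chain
    assert_eq!(export_progress(1000, 0), 1.0); // reached genesis
    println!("ok");
}
```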