feat(backfill): implement archival blob backfill #2175

Draft · wants to merge 25 commits into base: main
74 changes: 73 additions & 1 deletion crates/walrus-sdk/src/client.rs
@@ -76,7 +76,7 @@ pub(crate) use crate::utils::{CompletedReasonWeight, WeightedFutures};
use crate::{
active_committees::ActiveCommittees,
config::CommunicationLimits,
error::{ClientError, ClientErrorKind, ClientResult},
error::{ClientError, ClientErrorKind, ClientResult, StoreError},
store_when::StoreWhen,
utils::{WeightedResult, styled_progress_bar, styled_spinner},
};
@@ -680,6 +680,78 @@ impl<T: ReadClient> Client<T> {

Ok(slivers)
}

/// Encodes the blob and sends metadata and slivers to the selected nodes.
///
/// The function optionally takes an expected blob ID, which is checked against the blob ID
/// resulting from the encoding. This operation is intended for backfills and does not
/// request a certificate from the storage nodes.
///
/// Returns a vector containing the results of the store operations on each node.
pub async fn backfill_blob_to_nodes(
&self,
blob: &[u8],
node_ids: impl IntoIterator<Item = ObjectID>,
encoding_type: EncodingType,
expected_blob_id: Option<BlobId>,
) -> ClientResult<Vec<NodeResult<(), StoreError>>> {
tracing::info!(
?expected_blob_id,
blob_size = blob.len(),
"attempting to backfill blob to nodes"
);
let committees = self.get_committees().await?;
let (pairs, metadata) = self
.encoding_config
.get_for_type(encoding_type)
.encode_with_metadata(blob)
.map_err(ClientError::other)?;

if let Some(expected) = expected_blob_id {
ensure!(
expected == *metadata.blob_id(),
ClientError::store_blob_internal(format!(
"the expected blob ID ({}) does not match the encoded blob ID ({})",
expected,
metadata.blob_id()
))
)
}

let mut pairs_per_node = self
.pairs_per_node(metadata.blob_id(), &pairs, &committees)
.await;

let sliver_write_limit = self
.communication_limits
.max_concurrent_sliver_writes_for_blob_size(
metadata.metadata().unencoded_length(),
&self.encoding_config,
metadata.metadata().encoding_type(),
);
let comms = self.communication_factory.node_write_communications_by_id(
&committees,
Arc::new(Semaphore::new(sliver_write_limit)),
node_ids,
)?;

let store_operations: Vec<_> = comms
.iter()
.map(|nc| {
nc.store_metadata_and_pairs_without_confirmation(
&metadata,
pairs_per_node
.remove(&nc.node_index)
.expect("there are shards for each node"),
)
})
.collect();

// Await on all store operations concurrently.
let results = futures::future::join_all(store_operations).await;

Ok(results)
}
}

impl Client<SuiContractClient> {
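For context, a minimal caller-side sketch of the new entry point (not part of this diff): it reads a staged blob from disk, derives the expected blob ID from the file name, and pushes metadata and slivers to the listed nodes. The import paths, the ability to parse a `BlobId` from its file-name string, and using `ClientError::other` for the I/O error are assumptions here, not verified against the crates.

```rust
use std::path::Path;

use sui_types::base_types::ObjectID;
use walrus_core::{BlobId, EncodingType};
// Imports of Client, ReadClient, ClientError, ClientResult, NodeResult, and StoreError
// from walrus_sdk are omitted; their exact paths are assumed.

/// Hypothetical helper: backfill a single staged blob file to the given nodes.
async fn backfill_file<T: ReadClient>(
    client: &Client<T>,
    path: &Path,
    node_ids: Vec<ObjectID>,
    encoding_type: EncodingType,
) -> ClientResult<Vec<NodeResult<(), StoreError>>> {
    // Blobs staged by `pull-archive-blobs` are named after their blob ID, so the file name
    // doubles as the expected blob ID and lets the SDK verify the re-encoding.
    let expected_blob_id = path
        .file_name()
        .and_then(|name| name.to_str())
        .and_then(|name| name.parse::<BlobId>().ok());
    let blob = std::fs::read(path).map_err(ClientError::other)?;
    // Sends metadata and slivers to exactly the listed nodes; no confirmations are requested.
    client
        .backfill_blob_to_nodes(&blob, node_ids, encoding_type, expected_blob_id)
        .await
}
```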
46 changes: 46 additions & 0 deletions crates/walrus-sdk/src/client/communication/factory.rs
@@ -13,6 +13,7 @@ use rand::{seq::SliceRandom, thread_rng};
use reqwest::Client as ReqwestClient;
use rustls::pki_types::CertificateDer;
use rustls_native_certs::CertificateResult;
use sui_types::base_types::ObjectID;
use tokio::sync::Semaphore;
use walrus_core::{Epoch, NetworkPublicKey, encoding::EncodingConfig};
use walrus_storage_node_client::{ClientBuildError, StorageNodeClient, StorageNodeClientBuilder};
@@ -137,6 +138,51 @@ impl NodeCommunicationFactory {
})
}

/// Returns a vector of [`NodeWriteCommunication`] objects matching the specified node IDs.
pub(crate) fn node_write_communications_by_id<'a>(
&'a self,
committees: &'a ActiveCommittees,
sliver_write_limit: Arc<Semaphore>,
node_ids: impl IntoIterator<Item = ObjectID>,
) -> ClientResult<Vec<NodeWriteCommunication<'a>>> {
self.remove_old_cached_clients(
committees,
&mut self
.client_cache
.lock()
.expect("other threads should not panic"),
);

let write_committee = committees.write_committee();
let node_ids: Vec<_> = node_ids.into_iter().collect();

let comms = write_committee
.members()
.iter()
.enumerate()
.filter_map(|(idx, node)| {
if node_ids.contains(&node.node_id) {
self.create_write_communication(
write_committee,
idx,
sliver_write_limit.clone(),
)
.transpose()
} else {
None
}
})
.collect::<Result<Vec<_>, ClientBuildError>>()
.map_err(|error| {
ClientError::store_blob_internal(format!(
"cannot communicate with one or more of the storage nodes: {}",
error
))
})?;

Ok(comms)
}

/// Builds a [`NodeCommunication`] object for the identified storage node within the
/// committee.
///
39 changes: 31 additions & 8 deletions crates/walrus-sdk/src/client/communication/node.rs
@@ -316,6 +316,36 @@ impl NodeWriteCommunication<'_> {
pairs: impl IntoIterator<Item = &SliverPair>,
blob_persistence_type: &BlobPersistenceType,
) -> NodeResult<SignedStorageConfirmation, StoreError> {
let result = async {
self.store_metadata_and_pairs_without_confirmation(metadata, pairs)
.await
.take_inner_result()?;

self.get_confirmation_with_retries_inner(
metadata.blob_id(),
self.committee_epoch,
blob_persistence_type,
)
.await
.map_err(StoreError::Confirmation)
}
.await;
tracing::debug!(
blob_id = %metadata.blob_id(),
node = %self.node.public_key,
?result,
"retrieved storage confirmation"
);
self.to_node_result_with_n_shards(result)
}

/// Stores metadata and sliver pairs on a node, but does _not_ request a storage confirmation.
#[tracing::instrument(level = Level::TRACE, parent = &self.span, skip_all)]
pub async fn store_metadata_and_pairs_without_confirmation(
&self,
metadata: &VerifiedBlobMetadataWithId,
pairs: impl IntoIterator<Item = &SliverPair>,
) -> NodeResult<(), StoreError> {
tracing::debug!(blob_id = %metadata.blob_id(), "storing metadata and sliver pairs");
let result = async {
let metadata_status = self
@@ -336,14 +366,7 @@ impl NodeWriteCommunication<'_> {
n_stored_slivers,
blob_id = %metadata.blob_id(),
"finished storing slivers on node");

self.get_confirmation_with_retries_inner(
metadata.blob_id(),
self.committee_epoch,
blob_persistence_type,
)
.await
.map_err(StoreError::Confirmation)
Ok(())
}
.await;
tracing::debug!(
1 change: 1 addition & 0 deletions crates/walrus-service/Cargo.toml
@@ -38,6 +38,7 @@ backup = [
]
client = [
"dep:colored",
"dep:object_store",
"dep:prettytable",
]
default = ["client", "deploy", "node"]
2 changes: 1 addition & 1 deletion crates/walrus-service/src/backup/service.rs
@@ -19,7 +19,7 @@ use diesel_async::{
};
use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
use futures::{StreamExt, stream};
use object_store::{ObjectStore, gcp::GoogleCloudStorageBuilder, local::LocalFileSystem};
use object_store::{ObjectStore, gcp::GoogleCloudStorageBuilder};
use prometheus::core::{AtomicU64, GenericCounter};
use sha2::Digest;
use sui_types::event::EventID;
2 changes: 2 additions & 0 deletions crates/walrus-service/src/client/cli.rs
@@ -24,8 +24,10 @@ use walrus_sdk::{
use walrus_sui::wallet::Wallet;

mod args;
mod backfill;
mod cli_output;
mod runner;

pub use args::{
AggregatorArgs,
App,
34 changes: 34 additions & 0 deletions crates/walrus-service/src/client/cli/args.rs
@@ -553,6 +553,40 @@ pub enum CliCommands {
#[command(subcommand)]
command: NodeAdminCommands,
},
/// Pull all blobs (optionally filtered by an object-name prefix) from Google Cloud Storage
/// into the specified backfill_dir.
PullArchiveBlobs {
/// The Google Cloud Storage bucket to pull from.
#[arg(long)]
gcs_bucket: String,
/// Optional object name prefix filter.
#[arg(long)]
prefix: Option<String>,
/// The directory to pull into.
#[arg(long)]
backfill_dir: String,
/// Durable list of objects already pulled. This is loaded at startup and used to avoid
/// pulling objects redundantly. Note that checking the backfill_dir is not sufficient on
/// its own, because disk space must be reclaimed after the archival blobs have been pushed
/// to the network. The `pulled_state` file therefore ensures that a stopped and restarted
/// backfill does not have to start from the beginning.
#[arg(long)]
pulled_state: PathBuf,
},
/// Upload blob slivers and metadata from a specified directory to the listed storage nodes.
BlobBackfill {
/// The directory where blob-backfill can find blobs. Blobs in this directory must be named
/// with their blob IDs; any files whose names are not valid blob IDs will be skipped.
#[arg(long)]
backfill_dir: PathBuf,
/// The filename where successfully pushed blob IDs will be stored.
#[arg(long)]
pushed_state: PathBuf,
/// The nodes to backfill with slivers and blob metadata.
node_ids: Vec<ObjectID>,
},
}

/// Subcommands for the `info` command.
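Taken together, the two new subcommands form a staged pipeline: `PullArchiveBlobs` downloads archival blobs from GCS into the backfill directory and records completed objects in `pulled_state`, while `BlobBackfill` re-encodes each staged blob and pushes its slivers and metadata to the listed nodes, recording completed blob IDs in `pushed_state`. Below is a rough sketch of the resume bookkeeping those state files describe, assuming a simple newline-delimited format (the actual format is not specified in this diff):

```rust
use std::{collections::HashSet, fs::OpenOptions, io::Write, path::Path};

/// Loads the set of already-processed entries from a newline-delimited state file.
fn load_state(path: &Path) -> std::io::Result<HashSet<String>> {
    match std::fs::read_to_string(path) {
        Ok(contents) => Ok(contents.lines().map(str::to_owned).collect()),
        // A missing state file simply means nothing has been processed yet.
        Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(HashSet::new()),
        Err(error) => Err(error),
    }
}

/// Appends a completed entry so that a restarted run skips it.
fn record_done(path: &Path, entry: &str) -> std::io::Result<()> {
    let mut file = OpenOptions::new().create(true).append(true).open(path)?;
    writeln!(file, "{entry}")
}
```

On restart, entries found in the state file are skipped, so reclaiming disk space in the backfill directory does not lose progress.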