diff --git a/crates/walrus-core/src/encoding/quilt_encoding.rs b/crates/walrus-core/src/encoding/quilt_encoding.rs index 5e8de5ff7..aa9fa558d 100644 --- a/crates/walrus-core/src/encoding/quilt_encoding.rs +++ b/crates/walrus-core/src/encoding/quilt_encoding.rs @@ -125,7 +125,7 @@ pub trait QuiltIndexApi: Clone + Into { -> Result<&V::QuiltPatch, QuiltError>; /// Returns the sliver indices of the quilt patches stored in. - fn get_sliver_indices_by_identifiers( + fn get_sliver_indices_for_identifiers( &self, identifiers: &[&str], ) -> Result, QuiltError>; @@ -1878,7 +1878,7 @@ mod tests { ); let required_indices = quilt_index_v1 - .get_sliver_indices_by_identifiers(&[identifier]) + .get_sliver_indices_for_identifiers(&[identifier]) .expect("Should get sliver indices by identifiers"); assert_eq!(required_indices, missing_indices); diff --git a/crates/walrus-core/src/metadata.rs b/crates/walrus-core/src/metadata.rs index 23d9f4cb5..f441237ec 100644 --- a/crates/walrus-core/src/metadata.rs +++ b/crates/walrus-core/src/metadata.rs @@ -124,13 +124,13 @@ pub enum QuiltIndex { impl QuiltIndex { /// Returns the sliver indices of the quilt patch with the given identifiers. - pub fn get_sliver_indices_by_identifiers( + pub fn get_sliver_indices_for_identifiers( &self, identifiers: &[&str], ) -> Result, QuiltError> { match self { QuiltIndex::V1(quilt_index) => { - quilt_index.get_sliver_indices_by_identifiers(identifiers) + quilt_index.get_sliver_indices_for_identifiers(identifiers) } } } @@ -228,7 +228,7 @@ impl QuiltIndexApi for QuiltIndexV1 { } /// If the quilt contains duplicate identifiers, all matching patches are returned. 
- fn get_sliver_indices_by_identifiers( + fn get_sliver_indices_for_identifiers( &self, identifiers: &[&str], ) -> Result, QuiltError> { diff --git a/crates/walrus-e2e-tests/tests/test_client.rs b/crates/walrus-e2e-tests/tests/test_client.rs index 853c5cba3..4ae905a5a 100644 --- a/crates/walrus-e2e-tests/tests/test_client.rs +++ b/crates/walrus-e2e-tests/tests/test_client.rs @@ -19,7 +19,7 @@ use std::{ }; use indicatif::MultiProgress; -use rand::random; +use rand::{Rng, random, seq::SliceRandom, thread_rng}; #[cfg(msim)] use sui_macros::{clear_fail_point, register_fail_point_if}; use sui_types::base_types::{SUI_ADDRESS_LENGTH, SuiAddress}; @@ -32,14 +32,25 @@ use walrus_core::{ EpochCount, ShardIndex, SliverPairIndex, - encoding::{EncodingConfigTrait as _, Primary}, + encoding::{ + EncodingConfigTrait as _, + Primary, + quilt_encoding::{QuiltApi, QuiltStoreBlob, QuiltStoreBlobOwned, QuiltVersionV1}, + }, merkle::Node, messages::BlobPersistenceType, - metadata::VerifiedBlobMetadataWithId, + metadata::{QuiltMetadata, VerifiedBlobMetadataWithId}, }; use walrus_proc_macros::walrus_simtest; use walrus_sdk::{ - client::{Blocklist, Client, WalrusStoreBlob, WalrusStoreBlobApi, responses::BlobStoreResult}, + client::{ + Blocklist, + Client, + WalrusStoreBlob, + WalrusStoreBlobApi, + quilt_client::QuiltClientConfig, + responses::{BlobStoreResult, QuiltStoreResult}, + }, error::{ ClientError, ClientErrorKind::{ @@ -977,6 +988,117 @@ async fn test_storage_nodes_delete_data_for_deleted_blobs() -> TestResult { Ok(()) } +fn group_identifiers_randomly<'a>(identifiers: &'a mut [&str]) -> Vec> { + identifiers.shuffle(&mut thread_rng()); + + let mut groups = Vec::new(); + let mut current_pos = 0; + + while current_pos < identifiers.len() { + let end_index = thread_rng().gen_range(current_pos..=identifiers.len()); + let group = identifiers[current_pos..end_index].to_vec(); + groups.push(group); + current_pos = end_index; + } + + groups +} + +async_param_test! 
{
+    #[ignore = "ignore E2E tests by default"]
+    #[walrus_simtest]
+    test_store_quilt -> TestResult : [
+        one_blob: (1),
+        two_blobs: (2),
+        ten_blobs: (10),
+    ]
+}
+/// Tests that a quilt can be stored.
+async fn test_store_quilt(blobs_to_create: u32) -> TestResult {
+    telemetry_subscribers::init_for_testing();
+
+    let test_nodes_config = TestNodesConfig {
+        node_weights: vec![7, 7, 7, 7, 7],
+        ..Default::default()
+    };
+    let test_cluster_builder =
+        test_cluster::E2eTestSetupBuilder::new().with_test_nodes_config(test_nodes_config);
+    let (_sui_cluster_handle, _cluster, client, _) = test_cluster_builder.build().await?;
+    let client = client.as_ref();
+    let blobs = walrus_test_utils::random_data_list(314, blobs_to_create as usize);
+    let encoding_type = DEFAULT_ENCODING;
+    let quilt_store_blobs = blobs
+        .iter()
+        .enumerate()
+        .map(|(i, blob)| QuiltStoreBlob::new(blob, format!("test-blob-{}", i + 1)))
+        .collect::<Vec<_>>();
+
+    // Store the quilt.
+    let quilt_client = client.quilt_client(QuiltClientConfig::new(4, Duration::from_secs(30)));
+    let quilt = quilt_client
+        .construct_quilt::<QuiltVersionV1>(&quilt_store_blobs, encoding_type)
+        .await?;
+    let store_operation_result = quilt_client
+        .reserve_and_store_quilt::<QuiltVersionV1>(
+            &quilt,
+            encoding_type,
+            2,
+            StoreWhen::Always,
+            BlobPersistence::Permanent,
+            PostStoreAction::Keep,
+        )
+        .await?;
+
+    let QuiltStoreResult {
+        blob_store_result,
+        stored_quilt_blobs,
+    } = store_operation_result;
+    let blob_object = match blob_store_result {
+        BlobStoreResult::NewlyCreated { blob_object, .. } => blob_object,
+        _ => panic!("Expected NewlyCreated, got {:?}", blob_store_result),
+    };
+
+    // Read the blobs in the quilt.
+ let id_blob_map = quilt_store_blobs + .iter() + .map(|b| (b.identifier(), b)) + .collect::>(); + + let blob_id = blob_object.blob_id; + let quilt_metadata = quilt_client.get_quilt_metadata(&blob_id).await?; + let QuiltMetadata::V1(metadata_v1) = quilt_metadata; + assert_eq!(&metadata_v1.index, quilt.quilt_index()?); + + let mut identifiers = stored_quilt_blobs + .iter() + .map(|b| b.identifier.as_str()) + .collect::>(); + let groups = group_identifiers_randomly(&mut identifiers); + + tracing::info!(groups = ?groups, "test retrieving quilts by groups"); + + for group in groups { + let retrieved_quilt_blobs: Vec = quilt_client + .get_blobs_by_identifiers(&blob_id, &group) + .await?; + + assert_eq!( + retrieved_quilt_blobs.len(), + group.len(), + "Mismatch in number of blobs retrieved from quilt" + ); + + for retrieved_quilt_blob in &retrieved_quilt_blobs { + let original_blob = id_blob_map + .get(retrieved_quilt_blob.identifier()) + .expect("identifier should be present"); + assert_eq!(&retrieved_quilt_blob, original_blob); + } + } + + Ok(()) +} + #[ignore = "ignore E2E tests by default"] #[walrus_simtest] async fn test_blocklist() -> TestResult { diff --git a/crates/walrus-sdk/src/client.rs b/crates/walrus-sdk/src/client.rs index 465315881..dcd1e19fc 100644 --- a/crates/walrus-sdk/src/client.rs +++ b/crates/walrus-sdk/src/client.rs @@ -75,6 +75,7 @@ use self::{ pub(crate) use crate::utils::{CompletedReasonWeight, WeightedFutures}; use crate::{ active_committees::ActiveCommittees, + client::quilt_client::{QuiltClient, QuiltClientConfig}, config::CommunicationLimits, error::{ClientError, ClientErrorKind, ClientResult}, store_when::StoreWhen, @@ -88,6 +89,7 @@ pub use crate::{ pub mod client_types; pub mod communication; pub mod metrics; +pub mod quilt_client; pub mod refresh; pub mod resource; pub mod responses; @@ -1445,6 +1447,11 @@ impl Client { self } + /// Returns a [`QuiltClient`] for storing and retrieving quilts. 
+ pub fn quilt_client(&self, config: QuiltClientConfig) -> QuiltClient<'_, T> { + QuiltClient::new(self, config) + } + /// Stores the already-encoded metadata and sliver pairs for a blob into Walrus, by sending /// sliver pairs to at least 2f+1 shards. /// diff --git a/crates/walrus-sdk/src/client/client_types.rs b/crates/walrus-sdk/src/client/client_types.rs index 9998cc16a..60d12381e 100644 --- a/crates/walrus-sdk/src/client/client_types.rs +++ b/crates/walrus-sdk/src/client/client_types.rs @@ -6,11 +6,13 @@ use std::{fmt::Debug, sync::Arc}; use enum_dispatch::enum_dispatch; +use serde::{Deserialize, Serialize}; use sui_types::base_types::ObjectID; use tracing::{Level, Span, field}; use walrus_core::{ BlobId, - encoding::SliverPair, + QuiltPatchId, + encoding::{SliverPair, quilt_encoding::QuiltPatchInternalIdApi}, messages::ConfirmationCertificate, metadata::{BlobMetadataApi as _, VerifiedBlobMetadataWithId}, }; @@ -27,6 +29,25 @@ use super::{ /// The log level for all WalrusStoreBlob spans. pub(crate) const BLOB_SPAN_LEVEL: Level = Level::DEBUG; +/// Identifies a stored quilt patch. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StoredQuiltPatch { + /// The identifier of the quilt patch. + pub identifier: String, + /// The quilt patch id. + pub quilt_patch_id: String, +} + +impl StoredQuiltPatch { + /// Create a new stored quilt patch. + pub fn new(blob_id: BlobId, identifier: &str, patch_id: T) -> Self { + Self { + identifier: identifier.to_string(), + quilt_patch_id: QuiltPatchId::new(blob_id, patch_id.to_bytes()).to_string(), + } + } +} + /// API for a blob that is being stored to Walrus. 
#[enum_dispatch] pub trait WalrusStoreBlobApi<'a, T: Debug + Clone + Send + Sync> { diff --git a/crates/walrus-sdk/src/client/quilt_client.rs b/crates/walrus-sdk/src/client/quilt_client.rs new file mode 100644 index 000000000..fbb626f56 --- /dev/null +++ b/crates/walrus-sdk/src/client/quilt_client.rs @@ -0,0 +1,581 @@ +// Copyright (c) Walrus Foundation +// SPDX-License-Identifier: Apache-2.0 + +//! Client for storing and retrieving quilts. + +use std::{ + collections::HashSet, + fs, + path::{Path, PathBuf}, + time::Duration, +}; + +use walrus_core::{ + BlobId, + EncodingType, + Epoch, + EpochCount, + Sliver, + SliverIndex, + encoding::{Primary, QuiltError, Secondary, SliverData, quilt_encoding::*}, + metadata::{QuiltIndex, QuiltMetadata, QuiltMetadataV1, VerifiedBlobMetadataWithId}, +}; +use walrus_sui::client::{BlobPersistence, PostStoreAction, ReadClient, SuiContractClient}; +use walrus_utils::read_blob_from_file; + +use crate::{ + client::{Client, client_types::StoredQuiltPatch, responses::QuiltStoreResult}, + error::{ClientError, ClientErrorKind, ClientResult}, + store_when::StoreWhen, +}; + +/// Reads all files recursively from a given path and returns them as path-content pairs. +/// +/// If the path is a file, it's read directly. +/// If the path is a directory, its files are read recursively. +/// Returns error if path doesn't exist or is not accessible. +pub fn read_blobs_from_paths>(paths: &[P]) -> ClientResult)>> { + if paths.is_empty() { + return Ok(Vec::new()); + } + + let mut collected_files: HashSet = HashSet::new(); + for path in paths { + let path = path.as_ref(); + + // Validate path existence and accessibility. 
+ if !path.exists() { + return Err(ClientError::from(ClientErrorKind::Other( + format!("Path '{}' does not exist.", path.display()).into(), + ))); + } + + collected_files.extend(get_all_files_from_path(path)?); + } + + let mut collected_files_with_content = Vec::with_capacity(collected_files.len()); + for file_path in collected_files { + let content = read_blob_from_file(&file_path) + .map_err(|e| ClientError::from(ClientErrorKind::Other(e.to_string().into())))?; + collected_files_with_content.push((file_path, content)); + } + + Ok(collected_files_with_content) +} + +/// Get all file paths from a directory recursively. +fn get_all_files_from_path>(path: P) -> ClientResult> { + let path = path.as_ref(); + let mut collected_files = HashSet::new(); + + if path.is_file() { + collected_files.insert(path.to_owned()); + } else if path.is_dir() { + for entry in fs::read_dir(path).map_err(ClientError::other)? { + let current_entry_path = entry.map_err(ClientError::other)?.path(); + collected_files.extend(get_all_files_from_path(¤t_entry_path)?); + } + } + + Ok(collected_files) +} + +/// Configuration for the QuiltClient. +#[derive(Debug, Clone)] +pub struct QuiltClientConfig { + /// The maximum number of attempts to retrieve slivers. + pub max_retrieve_slivers_attempts: usize, + /// The timeout duration for retrieving slivers. + pub timeout_duration: Duration, +} + +impl QuiltClientConfig { + /// Creates a new QuiltClientConfig. + pub fn new(max_retrieve_slivers_attempts: usize, timeout_duration: Duration) -> Self { + Self { + max_retrieve_slivers_attempts, + timeout_duration, + } + } +} + +impl Default for QuiltClientConfig { + fn default() -> Self { + Self { + max_retrieve_slivers_attempts: 2, + timeout_duration: Duration::from_secs(10), + } + } +} + +/// A facade for interacting with Walrus quilt. 
+#[derive(Debug, Clone)] +pub struct QuiltClient<'a, T> { + client: &'a Client, + config: QuiltClientConfig, +} + +impl<'a, T> QuiltClient<'a, T> { + /// Creates a new QuiltClient. + pub fn new(client: &'a Client, config: QuiltClientConfig) -> Self { + Self { client, config } + } +} + +impl QuiltClient<'_, T> { + /// Retrieves the [`QuiltMetadata`]. + /// + /// If not enough slivers can be retrieved for the quilt index, the entire blob will be read. + pub async fn get_quilt_metadata(&self, quilt_id: &BlobId) -> ClientResult { + self.client.check_blob_id(quilt_id)?; + let (certified_epoch, _) = self + .client + .get_blob_status_and_certified_epoch(quilt_id, None) + .await?; + let metadata = self + .client + .retrieve_metadata(certified_epoch, quilt_id) + .await?; + + // Try to retrieve the quilt index from the slivers. + let quilt_index = + if let Ok(quilt_index) = self.retrieve_quilt_index(&metadata, certified_epoch).await { + quilt_index + } else { + // If the quilt index cannot be retrieved from the slivers, try to retrieve the + // quilt. + tracing::debug!( + "failed to retrieve index slivers, trying to get quilt instead {}", + quilt_id + ); + // TODO(WAL-879): Cache the quilt. + self.get_full_quilt(&metadata, certified_epoch) + .await? + .get_quilt_index()? + }; + + let quilt_metadata = match quilt_index { + QuiltIndex::V1(quilt_index) => QuiltMetadata::V1(QuiltMetadataV1 { + quilt_blob_id: *quilt_id, + metadata: metadata.metadata().clone(), + index: quilt_index.clone(), + }), + }; + + Ok(quilt_metadata) + } + + /// Retrieves the necessary slivers and decodes the quilt index. + /// + /// Returns error if not enough slivers can be retrieved for the quilt index. + async fn retrieve_quilt_index( + &self, + metadata: &VerifiedBlobMetadataWithId, + certified_epoch: Epoch, + ) -> ClientResult { + // Get the first sliver to determine the quilt version. 
+ // + // Since the quilt version is stored as the first byte of the Quilt, it doesn't matter + // whether we get the first primary sliver or the first secondary sliver. + // For now since we only support QuiltV1, we use the first secondary sliver. + let slivers = self + .client + .retrieve_slivers_with_retry::( + metadata, + &[SliverIndex::new(0)], + certified_epoch, + self.config.max_retrieve_slivers_attempts, + self.config.timeout_duration, + ) + .await?; + + let first_sliver = slivers.first().expect("the first sliver should exist"); + let quilt_version = QuiltVersionEnum::new_from_sliver(first_sliver.symbols.data())?; + + let quilt_index = match quilt_version { + QuiltVersionEnum::V1 => { + self.retrieve_quilt_index_internal::( + metadata, + certified_epoch, + first_sliver, + ) + .await? + } + }; + + Ok(quilt_index) + } + + async fn retrieve_quilt_index_internal( + &self, + metadata: &VerifiedBlobMetadataWithId, + certified_epoch: Epoch, + first_sliver: &SliverData, + ) -> ClientResult + where + SliverData: TryFrom, + { + let mut all_slivers = Vec::new(); + let mut refs = vec![first_sliver]; + let first_sliver_refs = [first_sliver].to_vec(); + let mut decoder = V::QuiltConfig::get_decoder(&first_sliver_refs); + + let quilt_index = match decoder.get_or_decode_quilt_index() { + Ok(quilt_index) => quilt_index, + Err(QuiltError::MissingSlivers(indices)) => { + all_slivers.extend( + self.client + .retrieve_slivers_with_retry::( + metadata, + &indices, + certified_epoch, + self.config.max_retrieve_slivers_attempts, + self.config.timeout_duration, + ) + .await?, + ); + refs.extend(all_slivers.iter()); + decoder.add_slivers(&refs); + decoder.get_or_decode_quilt_index()? + } + Err(e) => return Err(e.into()), + }; + + Ok(quilt_index) + } + + /// Retrieves the quilt patches of the given identifiers from the quilt. 
+ pub async fn get_blobs_by_identifiers( + &self, + quilt_id: &BlobId, + identifiers: &[&str], + ) -> ClientResult> { + let metadata = self.get_quilt_metadata(quilt_id).await?; + + let blobs = match metadata { + QuiltMetadata::V1(metadata) => { + self.get_blobs_by_identifiers_impl::( + &metadata.get_verified_metadata(), + &metadata.index.into(), + identifiers, + ) + .await? + } + }; + + Ok(blobs) + } + + /// Retrieves blobs from quilt by identifiers. + async fn get_blobs_by_identifiers_impl( + &self, + metadata: &VerifiedBlobMetadataWithId, + index: &QuiltIndex, + identifiers: &[&str], + ) -> ClientResult> + where + SliverData: TryFrom, + { + // Retrieve slivers for the given identifiers. + let sliver_indices = index.get_sliver_indices_for_identifiers(identifiers)?; + let (certified_epoch, _) = self + .client + .get_blob_status_and_certified_epoch(metadata.blob_id(), None) + .await?; + let retrieved_slivers = self + .client + .retrieve_slivers_with_retry::( + metadata, + &sliver_indices, + certified_epoch, + self.config.max_retrieve_slivers_attempts, + self.config.timeout_duration, + ) + .await; + + if let Ok(slivers) = retrieved_slivers { + let sliver_refs: Vec<_> = slivers.iter().collect(); + let decoder = V::QuiltConfig::get_decoder_with_quilt_index(&sliver_refs, index); + identifiers + .iter() + .map(|identifier| { + decoder + .get_blob_by_identifier(identifier) + .map_err(ClientError::other) + }) + .collect::, _>>() + } else { + let quilt = self.get_full_quilt(metadata, certified_epoch).await?; + identifiers + .iter() + .map(|identifier| { + quilt + .get_blob_by_identifier(identifier) + .map_err(ClientError::other) + }) + .collect::, _>>() + } + } + + /// Retrieves the quilt from Walrus. 
+ async fn get_full_quilt( + &self, + metadata: &VerifiedBlobMetadataWithId, + certified_epoch: Epoch, + ) -> ClientResult { + let quilt = self + .client + .request_slivers_and_decode::(certified_epoch, metadata) + .await?; + let encoding_config_enum = self + .client + .encoding_config() + .get_for_type(metadata.metadata().encoding_type()); + + QuiltEnum::new(quilt, &encoding_config_enum).map_err(ClientError::other) + } +} + +/// Stores quilts. +impl QuiltClient<'_, SuiContractClient> { + /// Constructs a quilt from a list of blobs. + pub async fn construct_quilt( + &self, + blobs: &[QuiltStoreBlob<'_>], + encoding_type: EncodingType, + ) -> ClientResult { + let encoder = V::QuiltConfig::get_encoder( + self.client.encoding_config().get_for_type(encoding_type), + blobs, + ); + + encoder.construct_quilt().map_err(ClientError::other) + } + + /// Converts a list of blobs with paths to a list of [`QuiltStoreBlob`]s. + /// + /// The on-disk file names are used as identifiers for the quilt patches. + /// If the file name is not valid UTF-8, it will be replaced with "unnamed-blob-". + // + // TODO(WAL-887): Use relative paths to deduplicate the identifiers. + fn assign_identifiers_with_paths( + blobs_with_paths: &[(PathBuf, Vec)], + ) -> Vec { + blobs_with_paths + .iter() + .enumerate() + .map(|(i, (path, blob))| { + QuiltStoreBlob::new( + blob, + path.file_name() + .and_then(|file_name| file_name.to_str()) + .map(String::from) + .unwrap_or_else(|| format!("unnamed-blob-{}", i)), + ) + }) + .collect() + } + + /// Constructs a quilt from a list of paths. + /// + /// The paths can be files or directories; if they are directories, their files are read + /// recursively. + // + /// The on-disk file names are used as identifiers for the quilt patches. + /// If the file name is not valid UTF-8, it will be replaced with "unnamed-blob-index". 
+ pub async fn construct_quilt_from_paths>( + &self, + paths: &[P], + encoding_type: EncodingType, + ) -> ClientResult { + let blobs_with_paths = read_blobs_from_paths(paths)?; + if blobs_with_paths.is_empty() { + return Err(ClientError::from(ClientErrorKind::Other( + "No valid files found in the specified folder".into(), + ))); + } + + let quilt_store_blobs: Vec<_> = Self::assign_identifiers_with_paths(&blobs_with_paths); + + self.construct_quilt::(&quilt_store_blobs, encoding_type) + .await + } + + /// Stores all blobs from a list of paths as a quilt. + #[tracing::instrument(skip_all)] + pub async fn reserve_and_store_quilt_from_paths>( + &self, + paths: &[P], + encoding_type: EncodingType, + epochs_ahead: EpochCount, + store_when: StoreWhen, + persistence: BlobPersistence, + post_store: PostStoreAction, + ) -> ClientResult { + let quilt = self + .construct_quilt_from_paths::(paths, encoding_type) + .await?; + let result = self + .reserve_and_store_quilt::( + &quilt, + encoding_type, + epochs_ahead, + store_when, + persistence, + post_store, + ) + .await?; + + Ok(result) + } + + /// Encodes the blobs to a quilt and stores it to Walrus. + #[tracing::instrument(skip_all, fields(blob_id))] + pub async fn reserve_and_store_quilt( + &self, + quilt: &V::Quilt, + encoding_type: EncodingType, + epochs_ahead: EpochCount, + store_when: StoreWhen, + persistence: BlobPersistence, + post_store: PostStoreAction, + ) -> ClientResult { + let result = self + .client + .reserve_and_store_blobs_retry_committees( + &[quilt.data()], + encoding_type, + epochs_ahead, + store_when, + persistence, + post_store, + None, + ) + .await?; + + let blob_store_result = result.first().expect("the first blob should exist").clone(); + let blob_id = blob_store_result + .blob_id() + .expect("the blob should have an id"); + let stored_quilt_blobs = quilt + .quilt_index()? 
+ .patches() + .iter() + .map(|patch| { + StoredQuiltPatch::new(blob_id, patch.identifier(), patch.quilt_patch_internal_id()) + }) + .collect::>(); + + Ok(QuiltStoreResult { + blob_store_result, + stored_quilt_blobs, + }) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use rand::{Rng, thread_rng}; + use tempfile::TempDir; + + use super::*; + + fn create_random_file(dir: &Path, name: &str, size: usize) -> std::io::Result> { + let mut rng = thread_rng(); + let mut content = vec![0u8; size]; + rng.fill(&mut content[..]); + fs::write(dir.join(name), &content)?; + Ok(content) + } + + fn create_random_dir_structure( + base_dir: &Path, + num_files: usize, + max_depth: usize, + current_depth: usize, + ) -> std::io::Result>> { + let mut rng = thread_rng(); + let mut file_contents = HashMap::new(); + + // Create some random subdirectories if we haven't reached max depth. + if current_depth < max_depth && rng.gen_bool(0.3) { + let num_subdirs = rng.gen_range(1..=3); + for i in 0..num_subdirs { + let subdir_name = format!("subdir_{}", i); + let subdir_path = base_dir.join(&subdir_name); + fs::create_dir_all(&subdir_path)?; + + // Recursively create files in subdirectory. + let subdir_contents = create_random_dir_structure( + &subdir_path, + num_files / (current_depth + 1), + max_depth, + current_depth + 1, + )?; + file_contents.extend(subdir_contents); + } + } + + // Create some random files in current directory. + let files_in_dir = if current_depth == max_depth { + num_files + } else { + rng.gen_range(1..=num_files / 2) + }; + + for i in 0..files_in_dir { + let file_name = format!("file_{}.dat", i); + let file_size = rng.gen_range(100..=1000); + let content = create_random_file(base_dir, &file_name, file_size)?; + file_contents.insert(base_dir.join(&file_name), content); + } + + Ok(file_contents) + } + + #[test] + fn test_read_blobs_from_paths_complex() -> ClientResult<()> { + // Create a temporary directory. 
+ let temp_dir = TempDir::new().map_err(ClientError::other)?; + let base_path = temp_dir.path(); + + // Create a complex directory structure with random files. + let expected_files = + create_random_dir_structure(base_path, 20, 3, 0).map_err(ClientError::other)?; + + // Read all files using read_blobs_from_paths. + let read_files = read_blobs_from_paths(&[base_path])?; + + // Convert read files to HashMap for easy comparison. + let read_files_map: HashMap<_, _> = read_files.into_iter().collect(); + + // Verify all expected files were read with correct content. + assert_eq!( + read_files_map.len(), + expected_files.len(), + "Number of files read doesn't match expected." + ); + + for (path, expected_content) in &expected_files { + let actual_content = read_files_map.get(path).expect("File should exist"); + assert_eq!( + actual_content, expected_content, + "Content mismatch for file: {:?}.", + path + ); + } + + // Test with empty paths. + let empty_result = read_blobs_from_paths::<&Path>(&[])?; + assert!(empty_result.is_empty()); + + // Test with non-existent path. + let non_existent = base_path.join("non_existent"); + let result = read_blobs_from_paths(&[&non_existent]); + assert!(result.is_err()); + + Ok(()) + } +} diff --git a/crates/walrus-sdk/src/client/responses.rs b/crates/walrus-sdk/src/client/responses.rs index 3c4c14c24..9589dce84 100644 --- a/crates/walrus-sdk/src/client/responses.rs +++ b/crates/walrus-sdk/src/client/responses.rs @@ -12,7 +12,7 @@ use utoipa::ToSchema; use walrus_core::{BlobId, Epoch}; use walrus_sui::{EventIdSchema, ObjectIdSchema, types::move_structs::Blob}; -use super::resource::RegisterBlobOp; +use super::{client_types::StoredQuiltPatch, resource::RegisterBlobOp}; /// Either an event ID or an object ID. #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] @@ -140,3 +140,13 @@ impl BlobStoreResult { matches!(self, Self::MarkedInvalid { .. } | Self::Error { .. }) } } + +/// Result when attempting to store a quilt. 
+#[derive(Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct QuiltStoreResult { + /// The result of storing the quilt data as a blob. + pub blob_store_result: BlobStoreResult, + /// The structure of the quilt. + pub stored_quilt_blobs: Vec, +} diff --git a/crates/walrus-sdk/src/error.rs b/crates/walrus-sdk/src/error.rs index 11dff6f63..f6598e145 100644 --- a/crates/walrus-sdk/src/error.rs +++ b/crates/walrus-sdk/src/error.rs @@ -3,7 +3,7 @@ //! The errors for the storage client and the communication with storage nodes. -use walrus_core::{BlobId, EncodingType, Epoch, SliverPairIndex, SliverType}; +use walrus_core::{BlobId, EncodingType, Epoch, SliverPairIndex, SliverType, encoding::QuiltError}; use walrus_storage_node_client::error::{ClientBuildError, NodeError}; use walrus_sui::client::{MIN_STAKING_THRESHOLD, SuiClientError}; @@ -108,6 +108,14 @@ impl ClientError { } } +impl From for ClientError { + fn from(value: QuiltError) -> Self { + ClientError { + kind: Box::new(ClientErrorKind::QuiltError(value.to_string())), + } + } +} + impl From for ClientError { fn from(kind: ClientErrorKind) -> Self { Box::new(kind).into() @@ -210,4 +218,7 @@ pub enum ClientErrorKind { /// An internal error occurred while storing a blob, usually indicating a bug. #[error("store blob internal error: {0}")] StoreBlobInternal(String), + /// An error when storing/retrieving a quilt. 
+ #[error("quilt error: {0}")] + QuiltError(String), } diff --git a/crates/walrus-service/src/client/cli.rs b/crates/walrus-service/src/client/cli.rs index 0780f1b48..8eeb09cd2 100644 --- a/crates/walrus-service/src/client/cli.rs +++ b/crates/walrus-service/src/client/cli.rs @@ -5,8 +5,7 @@ use std::{ fmt::{self, Display}, - fs, - path::{Path, PathBuf}, + path::PathBuf, str::FromStr, }; @@ -391,14 +390,6 @@ fn thousands_separator_float(num: f64, digits: u32) -> String { format!("{}.{:0digits$}", thousands_separator(integer), decimal) } -/// Reads a blob from the filesystem or returns a helpful error message. -pub fn read_blob_from_file(path: impl AsRef) -> anyhow::Result> { - fs::read(&path).context(format!( - "unable to read blob from '{}'", - path.as_ref().display() - )) -} - /// Error type distinguishing between a decimal value that corresponds to a valid blob ID and any /// other parse error. #[derive(Debug, thiserror::Error)] diff --git a/crates/walrus-service/src/client/cli/args.rs b/crates/walrus-service/src/client/cli/args.rs index 83cb840f6..a27ae0d66 100644 --- a/crates/walrus-service/src/client/cli/args.rs +++ b/crates/walrus-service/src/client/cli/args.rs @@ -29,8 +29,9 @@ use walrus_sui::{ types::{StorageNode, move_structs::Authorized}, utils::SuiNetwork, }; +use walrus_utils::read_blob_from_file; -use super::{BlobIdDecimal, HumanReadableBytes, parse_blob_id, read_blob_from_file}; +use super::{BlobIdDecimal, HumanReadableBytes, parse_blob_id}; use crate::client::{config::AuthConfig, daemon::CacheConfig}; /// The command-line arguments for the Walrus client. 
diff --git a/crates/walrus-service/src/client/cli/runner.rs b/crates/walrus-service/src/client/cli/runner.rs index a9cd713d0..b52e9c389 100644 --- a/crates/walrus-service/src/client/cli/runner.rs +++ b/crates/walrus-service/src/client/cli/runner.rs @@ -56,7 +56,7 @@ use walrus_sdk::{ }; use walrus_storage_node_client::api::BlobStatus; use walrus_sui::wallet::Wallet; -use walrus_utils::metrics::Registry; +use walrus_utils::{metrics::Registry, read_blob_from_file}; use super::args::{ AggregatorArgs, @@ -89,7 +89,6 @@ use crate::{ get_contract_client, get_read_client, get_sui_read_client_from_rpc_node_or_wallet, - read_blob_from_file, success, warning, }, diff --git a/crates/walrus-service/src/node/metrics.rs b/crates/walrus-service/src/node/metrics.rs index dfc9a74fa..f91120be3 100644 --- a/crates/walrus-service/src/node/metrics.rs +++ b/crates/walrus-service/src/node/metrics.rs @@ -309,6 +309,7 @@ impl TelemetryLabel for ClientErrorKind { ClientErrorKind::FailedToLoadCerts(_) => "failed-to-load-certs", ClientErrorKind::Other(_) => "unknown", ClientErrorKind::StoreBlobInternal(_) => "store-blob-internal", + ClientErrorKind::QuiltError(_) => "quilt-error", } } } diff --git a/crates/walrus-utils/src/lib.rs b/crates/walrus-utils/src/lib.rs index 3660bbcca..2e925b1f9 100644 --- a/crates/walrus-utils/src/lib.rs +++ b/crates/walrus-utils/src/lib.rs @@ -1,8 +1,9 @@ // Copyright (c) Walrus Foundation // SPDX-License-Identifier: Apache-2.0 -use std::path::Path; +use std::{fs, path::Path}; +use anyhow::Context; use serde::de::DeserializeOwned; #[cfg(feature = "backoff")] @@ -38,8 +39,6 @@ pub mod tests { /// Load the config from a YAML file located at the provided path. 
pub fn load_from_yaml, T: DeserializeOwned>(path: P) -> anyhow::Result { - use anyhow::Context; - let path = path.as_ref(); tracing::debug!(path = %path.display(), "[load_from_yaml] reading from file"); @@ -53,6 +52,14 @@ pub fn load_from_yaml, T: DeserializeOwned>(path: P) -> anyhow::R Ok(serde_yaml::from_reader(reader)?) } +/// Reads a blob from the filesystem or returns a helpful error message. +pub fn read_blob_from_file(path: impl AsRef) -> anyhow::Result> { + fs::read(&path).context(format!( + "unable to read blob from '{}'", + path.as_ref().display() + )) +} + /// A macro to print a crumb of information to the console. This is useful for debugging. #[macro_export] macro_rules! crumb {