2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

11 changes: 11 additions & 0 deletions client/blockchain-service/src/events.rs
@@ -385,6 +385,17 @@ pub struct DistributeFileToBsp<Runtime: StorageEnableRuntime> {
pub bsp_id: BackupStorageProviderId<Runtime>,
}

/// Request for the MSP to check local file storage for a bucket.
///
/// This event is intended to be handled by MSP-specific tasks that validate or reconcile
/// local file storage state for the given bucket. This change covers the event definition,
/// wiring and emission; the handling behaviour is implemented separately.
#[derive(Debug, Clone, ActorEvent)]
#[actor(actor = "blockchain_service")]
pub struct CheckBucketFileStorage<Runtime: StorageEnableRuntime> {
pub bucket_id: BucketId<Runtime>,
}
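
A rough sketch of the kind of MSP-side handling this event is meant to trigger. The trait and type aliases below are placeholders for illustration only — they are not the real StorageHub task, forest-storage, or file-storage interfaces; the actual handler would use the forest storage's file listing and the file storage's chunk checks.

```rust
// Illustrative sketch only: `BucketForestView` and the 32-byte id aliases are
// stand-ins, not real StorageHub types.
type BucketId = [u8; 32];
type FileKey = [u8; 32];

/// Minimal view of what a `CheckBucketFileStorage` handler needs from local storage.
trait BucketForestView {
    /// File keys recorded in the bucket's forest (what we are expected to store).
    fn forest_file_keys(&self, bucket_id: &BucketId) -> Vec<FileKey>;
    /// Whether the complete file data is present in the local file storage.
    fn has_complete_data(&self, file_key: &FileKey) -> bool;
}

/// Returns the file keys whose data is missing or incomplete and should be recovered.
fn files_needing_recovery(view: &impl BucketForestView, bucket_id: &BucketId) -> Vec<FileKey> {
    view.forest_file_keys(bucket_id)
        .into_iter()
        .filter(|key| !view.has_complete_data(key))
        .collect()
}
```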

/// The event bus provider for the BlockchainService actor.
///
/// It holds the event buses for the different events that the BlockchainService actor
16 changes: 10 additions & 6 deletions client/blockchain-service/src/handler_msp.rs
@@ -26,12 +26,12 @@ use shc_forest_manager::traits::{ForestStorage, ForestStorageHandler};

use crate::{
events::{
- DistributeFileToBsp, FinalisedBucketMovedAway, FinalisedBucketMutationsApplied,
- FinalisedMspStopStoringBucketInsolventUser, FinalisedMspStoppedStoringBucket,
- FinalisedStorageRequestRejected, ForestWriteLockTaskData, MoveBucketRequestedForMsp,
- NewStorageRequest, ProcessMspRespondStoringRequest, ProcessMspRespondStoringRequestData,
- ProcessStopStoringForInsolventUserRequest, ProcessStopStoringForInsolventUserRequestData,
- StartMovedBucketDownload,
+ CheckBucketFileStorage, DistributeFileToBsp, FinalisedBucketMovedAway,
+ FinalisedBucketMutationsApplied, FinalisedMspStopStoringBucketInsolventUser,
+ FinalisedMspStoppedStoringBucket, FinalisedStorageRequestRejected, ForestWriteLockTaskData,
+ MoveBucketRequestedForMsp, NewStorageRequest, ProcessMspRespondStoringRequest,
+ ProcessMspRespondStoringRequestData, ProcessStopStoringForInsolventUserRequest,
+ ProcessStopStoringForInsolventUserRequestData, StartMovedBucketDownload,
},
handler::LOG_TARGET,
types::{FileDistributionInfo, FileKeyStatus, ManagedProvider, MultiInstancesNodeRole},
@@ -1066,6 +1066,10 @@ where
Some(_) => {
trace!(target: LOG_TARGET, "Bucket [0x{:x}] root verified: [0x{:x}]", bucket_id, onchain_root);
verified += 1;

// In this case, we emit a `CheckBucketFileStorage` event to the MSP task to check that the files in
// this bucket's forest have their full data in the file storage, and recover them if necessary.
self.emit(CheckBucketFileStorage { bucket_id });
}
// Success Case: Local forest does not exist and the on-chain root is the default root (bucket is empty).
None => {
2 changes: 2 additions & 0 deletions client/file-transfer-service/Cargo.toml
@@ -34,13 +34,15 @@ sc-network = { workspace = true }
sc-network-types = { workspace = true }
sc-utils = { workspace = true }
sc-tracing = { workspace = true }
sp-api = { workspace = true }

# Local
shc-actors-framework = { workspace = true }
shc-actors-derive = { workspace = true }
shc-common = { workspace = true }
shp-file-key-verifier = { workspace = true }
shp-file-metadata = { workspace = true }
pallet-storage-providers-runtime-api = { workspace = true }

[build-dependencies]
prost-build = { workspace = true }
126 changes: 121 additions & 5 deletions client/file-transfer-service/src/handler.rs
@@ -31,20 +31,24 @@ use std::{
};
use tokio::time::{interval, Duration};

use sc_client_api::HeaderBackend;
use sc_network::{
request_responses::{IncomingRequest, OutgoingResponse, RequestFailure},
service::traits::NetworkService,
IfDisconnected, NetworkPeers, NetworkRequest, ProtocolName, ReputationChange,
};
use sc_network_types::PeerId;
use sc_tracing::tracing::{debug, error, info, trace, warn};
use sp_api::ProvideRuntimeApi;

use pallet_storage_providers_runtime_api::StorageProvidersApi;
use shc_actors_framework::actor::{Actor, ActorEventLoop};
use shc_common::{
blockchain_utils::convert_raw_multiaddresses_to_multiaddr,
traits::StorageEnableRuntime,
types::{
- BucketId, DownloadRequestId, FileKey, FileKeyProof, UploadRequestId,
- BATCH_CHUNK_FILE_TRANSFER_MAX_SIZE, FILE_CHUNK_SIZE,
+ BucketId, DownloadRequestId, FileKey, FileKeyProof, ProviderId, StorageHubClient,
+ UploadRequestId, BATCH_CHUNK_FILE_TRANSFER_MAX_SIZE, FILE_CHUNK_SIZE,
},
};
use shp_file_metadata::ChunkId;
@@ -106,6 +110,12 @@ pub struct FileTransferService<Runtime: StorageEnableRuntime> {
request_receiver: async_channel::Receiver<IncomingRequest>,
/// Substrate network service that gives access to p2p operations.
network: Arc<dyn NetworkService>,
/// StorageHub client used to query runtime APIs for authorisation checks.
client: Arc<StorageHubClient<Runtime::RuntimeApi>>,
/// Trusted MSP IDs (on-chain IDs) configured for this BSP.
trusted_msps: Vec<ProviderId<Runtime>>,
/// Mapping from trusted MSP peer IDs to their on-chain MSP IDs.
trusted_msp_peers: HashMap<PeerId, ProviderId<Runtime>>,
/// Registry of (peer, file key) pairs for which we accept requests.
peer_file_allow_list: HashSet<(PeerId, FileKey)>,
/// Registry of peers by file key, used for cleanup.
@@ -559,6 +569,8 @@ impl<Runtime: StorageEnableRuntime> ActorEventLoop<FileTransferService<Runtime>>
async fn run(mut self) {
info!(target: LOG_TARGET, "💾 StorageHub's File Transfer Service starting up!");

self.actor.initialise_trusted_msps().await;

let ticker = interval(Duration::from_secs(1));
let ticker_stream = stream::unfold(ticker, |mut interval| {
Box::pin(async move {
@@ -616,11 +628,16 @@ impl<Runtime: StorageEnableRuntime> FileTransferService<Runtime> {
protocol_name: ProtocolName,
request_receiver: async_channel::Receiver<IncomingRequest>,
network: Arc<dyn NetworkService>,
client: Arc<StorageHubClient<Runtime::RuntimeApi>>,
trusted_msps: Vec<ProviderId<Runtime>>,
) -> Self {
Self {
protocol_name,
request_receiver,
network,
client,
trusted_msps,
trusted_msp_peers: HashMap::new(),
peer_file_allow_list: HashSet::new(),
peers_by_file: HashMap::new(),
peer_bucket_allow_list: HashSet::new(),
@@ -635,6 +652,72 @@ impl<Runtime: StorageEnableRuntime> FileTransferService<Runtime> {
}
}

async fn initialise_trusted_msps(&mut self) {
if self.trusted_msps.is_empty() {
return;
}

let current_block_hash = self.client.info().best_hash;
let mut total_peers = 0usize;

for msp_id in self.trusted_msps.clone() {
let multiaddresses = match self
.client
.runtime_api()
.query_provider_multiaddresses(current_block_hash, &msp_id)
{
Ok(Ok(multiaddresses)) => multiaddresses,
Ok(Err(e)) => {
warn!(
target: LOG_TARGET,
"Failed to query provider multiaddresses for trusted MSP [0x{:x}]: {:?}",
msp_id,
e
);
continue;
}
Err(e) => {
warn!(
target: LOG_TARGET,
"Runtime API error querying provider multiaddresses for trusted MSP [0x{:x}]: {:?}",
msp_id,
e
);
continue;
}
};

let parsed_multiaddresses =
convert_raw_multiaddresses_to_multiaddr::<Runtime>(multiaddresses);

let mut msp_peers = 0usize;
for multiaddr in parsed_multiaddresses {
if let Some(peer_id) = PeerId::try_from_multiaddr(&multiaddr) {
self.network.add_known_address(peer_id.into(), multiaddr);
self.trusted_msp_peers.insert(peer_id, msp_id);
msp_peers += 1;
}
}

if msp_peers == 0 {
warn!(
target: LOG_TARGET,
"Trusted MSP [0x{:x}] resolved to 0 peer IDs from on-chain multiaddresses",
msp_id
);
}

total_peers += msp_peers;
}

info!(
target: LOG_TARGET,
"Configured {} trusted MSP(s) with {} resolved peer ID(s)",
self.trusted_msps.len(),
total_peers
);
}

async fn handle_request(
&mut self,
peer: PeerId,
@@ -820,10 +903,43 @@ impl<Runtime: StorageEnableRuntime> FileTransferService<Runtime> {
}

if let Some(bucket_id) = bucket_id {
- self.peer_bucket_allow_list.contains(&(peer, bucket_id))
- } else {
- false
+ if self.peer_bucket_allow_list.contains(&(peer, bucket_id)) {
+ return true;
+ }
+
+ // If this peer belongs to a trusted MSP, validate it is the current MSP for the bucket.
+ if let Some(trusted_msp_id) = self.trusted_msp_peers.get(&peer) {
+ let current_block_hash = self.client.info().best_hash;
+ return match self
+ .client
+ .runtime_api()
+ .query_msp_id_of_bucket_id(current_block_hash, &bucket_id)
+ {
+ Ok(Ok(Some(current_msp))) => current_msp == *trusted_msp_id,
+ Ok(Ok(None)) => false,
+ Ok(Err(e)) => {
+ warn!(
+ target: LOG_TARGET,
+ "Error querying MSP for bucket [0x{:x}] during download authorisation: {:?}",
+ bucket_id,
+ e
+ );
+ false
+ }
+ Err(e) => {
+ warn!(
+ target: LOG_TARGET,
+ "Runtime API error querying MSP for bucket [0x{:x}] during download authorisation: {:?}",
+ bucket_id,
+ e
+ );
+ false
+ }
+ };
+ }
+ }
+
+ false
}
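
Read as a whole, the authorisation above boils down to a small decision rule. The sketch below distils it with simplified stand-in types (`u64` ids instead of the on-chain `PeerId`/`BucketId`/`ProviderId`, and a closure in place of the `query_msp_id_of_bucket_id` runtime API call, whose errors the real code logs and treats as a denial):

```rust
use std::collections::{HashMap, HashSet};

// Simplified stand-ins for the real peer/bucket/provider id types.
type PeerId = u64;
type BucketId = u64;
type MspId = u64;

/// Distilled download-authorisation rule:
/// 1. an explicit (peer, bucket) allow-list entry always wins;
/// 2. otherwise, a peer of a trusted MSP is allowed only if that MSP is the bucket's
///    *current* MSP on chain (no MSP, or a failed lookup, means deny).
fn is_download_allowed(
    peer: PeerId,
    bucket_id: BucketId,
    allow_list: &HashSet<(PeerId, BucketId)>,
    trusted_msp_peers: &HashMap<PeerId, MspId>,
    current_msp_of_bucket: impl Fn(BucketId) -> Option<MspId>,
) -> bool {
    if allow_list.contains(&(peer, bucket_id)) {
        return true;
    }
    match trusted_msp_peers.get(&peer) {
        Some(trusted_msp) => current_msp_of_bucket(bucket_id) == Some(*trusted_msp),
        None => false,
    }
}

fn main() {
    let allow_list = HashSet::new();
    let trusted = HashMap::from([(7, 42)]);
    // Peer 7 belongs to trusted MSP 42, which currently holds bucket 1 on chain.
    assert!(is_download_allowed(7, 1, &allow_list, &trusted, |_| Some(42)));
    // The same peer is denied for a bucket whose current MSP is someone else.
    assert!(!is_download_allowed(7, 2, &allow_list, &trusted, |_| Some(99)));
}
```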

fn handle_bad_request(
15 changes: 12 additions & 3 deletions client/file-transfer-service/src/lib.rs
@@ -8,7 +8,9 @@ use sc_network::{
use shc_actors_framework::actor::{ActorHandle, ActorSpawner, TaskSpawner};
use shc_common::{
traits::StorageEnableRuntime,
- types::{BlockHash, OpaqueBlock, BATCH_CHUNK_FILE_TRANSFER_MAX_SIZE},
+ types::{
+ BlockHash, OpaqueBlock, ProviderId, StorageHubClient, BATCH_CHUNK_FILE_TRANSFER_MAX_SIZE,
+ },
};

pub use self::handler::FileTransferService;
@@ -91,13 +93,20 @@ pub async fn spawn_file_transfer_service<Runtime: StorageEnableRuntime>(
request_receiver: async_channel::Receiver<IncomingRequest>,
protocol_name: ProtocolName,
network: Arc<dyn NetworkService>,
client: Arc<StorageHubClient<Runtime::RuntimeApi>>,
trusted_msps: Vec<ProviderId<Runtime>>,
) -> ActorHandle<FileTransferService<Runtime>> {
let task_spawner = task_spawner
.with_name("file-transfer-service")
.with_group("network");

- let file_transfer_service =
- FileTransferService::<Runtime>::new(protocol_name, request_receiver, network);
+ let file_transfer_service = FileTransferService::<Runtime>::new(
+ protocol_name,
+ request_receiver,
+ network,
+ client,
+ trusted_msps,
+ );

let file_transfer_service_handle = task_spawner.spawn_actor(file_transfer_service);
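
Callers of `spawn_file_transfer_service` now also need the client handle and the list of trusted MSP IDs. As one hedged example of where `trusted_msps` could come from, a node might parse hex-encoded IDs out of its configuration; the helper below is hypothetical (not part of this PR) and assumes `ProviderId<Runtime>` is a 32-byte, `H256`-like hash:

```rust
// Hypothetical config helper, assuming ProviderId is a 32-byte H256-like hash.
// Depends on the `sp-core` and `hex` crates.
use sp_core::H256;

fn parse_trusted_msp_ids(raw: &[String]) -> Result<Vec<H256>, String> {
    raw.iter()
        .map(|s| {
            let hex_str = s.trim_start_matches("0x");
            let bytes = hex::decode(hex_str).map_err(|e| format!("invalid hex '{s}': {e}"))?;
            if bytes.len() != 32 {
                return Err(format!("expected 32 bytes in '{s}', got {}", bytes.len()));
            }
            Ok(H256::from_slice(&bytes))
        })
        .collect()
}
```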

56 changes: 56 additions & 0 deletions client/forest-manager/src/in_memory.rs
@@ -66,6 +66,22 @@ where
}
}

fn get_all_files(&self) -> Result<Vec<(HasherOutT<T>, FileMetadata)>, ErrorT<T>> {
let trie = TrieDBBuilder::<T>::new(&self.memdb, &self.root).build();
let mut files = Vec::new();
let mut trie_iter = trie
.iter()
.map_err(|_| ForestStorageError::FailedToCreateTrieIterator)?;

while let Some((_, value)) = trie_iter.next().transpose()? {
let metadata = FileMetadata::decode(&mut &value[..])?;
let file_key = metadata.file_key::<T::Hash>();
files.push((file_key, metadata));
}

Ok(files)
}
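
The `FileMetadata::decode(&mut &value[..])?` call above follows the usual SCALE-codec convention of decoding from a mutable reference to a byte slice, which acts as a cursor over the raw trie value. A minimal standalone illustration of that pattern (toy struct, not the real `FileMetadata`):

```rust
// Requires `parity-scale-codec` with the `derive` feature.
use parity_scale_codec::{Decode, Encode};

#[derive(Encode, Decode, Debug, PartialEq)]
struct ToyMetadata {
    owner: Vec<u8>,
    size: u64,
}

fn main() {
    let original = ToyMetadata { owner: b"Alice".to_vec(), size: 100 };
    let raw = original.encode();
    // `&raw[..]` is a `&[u8]`; passing `&mut` to it lets `decode` advance the cursor.
    let decoded = ToyMetadata::decode(&mut &raw[..]).expect("valid SCALE bytes");
    assert_eq!(decoded, original);
}
```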

fn generate_proof(
&self,
challenged_file_keys: Vec<HasherOutT<T>>,
@@ -174,6 +190,46 @@ mod tests {
use shc_common::types::{Fingerprint, Proven, StorageProofsMerkleTrieLayout};
use sp_core::H256;

#[test]
fn test_get_all_files() {
let mut forest_storage = InMemoryForestStorage::<StorageProofsMerkleTrieLayout>::new();

let metadata_1 = FileMetadata::new(
"Alice".as_bytes().to_vec(),
"bucket".as_bytes().to_vec(),
"location_1".as_bytes().to_vec(),
100,
Fingerprint::default(),
)
.unwrap();
let metadata_2 = FileMetadata::new(
"Bob".as_bytes().to_vec(),
"bucket".as_bytes().to_vec(),
"location_2".as_bytes().to_vec(),
200,
Fingerprint::default(),
)
.unwrap();

let keys = ForestStorage::<StorageProofsMerkleTrieLayout, sh_parachain_runtime::Runtime>::insert_files_metadata(
&mut forest_storage,
&[metadata_1.clone(), metadata_2.clone()],
)
.unwrap();
assert_eq!(keys.len(), 2);

let mut all_files = ForestStorage::<
StorageProofsMerkleTrieLayout,
sh_parachain_runtime::Runtime,
>::get_all_files(&forest_storage)
.unwrap();
all_files.sort_by_key(|(k, _)| *k);

assert_eq!(all_files.len(), 2);
assert_eq!(all_files[0].0, keys[0].min(keys[1]));
assert_eq!(all_files[1].0, keys[0].max(keys[1]));
}

#[test]
fn test_initialization_with_no_existing_root() {
let forest_storage = InMemoryForestStorage::<StorageProofsMerkleTrieLayout>::new();