Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion client/blockchain-service/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use sp_api::ApiError;

use pallet_file_system_runtime_api::{
IsStorageRequestOpenToVolunteersError, QueryBspConfirmChunksToProveForFileError,
QueryFileEarliestVolunteerTickError, QueryMspConfirmChunksToProveForFileError,
QueryBspsVolunteeredForFileError, QueryFileEarliestVolunteerTickError,
QueryMspConfirmChunksToProveForFileError,
};
use pallet_payment_streams_runtime_api::GetUsersWithDebtOverThresholdError;
use pallet_proofs_dealer_runtime_api::{
Expand Down Expand Up @@ -92,6 +93,11 @@ pub enum BlockchainServiceCommand<Runtime: StorageEnableRuntime> {
msp_id: ProofsDealerProviderId<Runtime>,
file_key: Runtime::Hash,
},
#[command(success_type = bool, error_type = QueryBspsVolunteeredForFileError)]
QueryBspVolunteeredForFile {
bsp_id: BackupStorageProviderId<Runtime>,
file_key: Runtime::Hash,
},
#[command(success_type = Vec<Multiaddr>, error_type = QueryProviderMultiaddressesError)]
QueryProviderMultiaddresses { provider_id: ProviderId<Runtime> },
QueueSubmitProofRequest {
Expand Down
27 changes: 26 additions & 1 deletion client/blockchain-service/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use sp_runtime::{traits::Header, SaturatedConversion, Saturating};

use pallet_file_system_runtime_api::{
FileSystemApi, IsStorageRequestOpenToVolunteersError, QueryBspConfirmChunksToProveForFileError,
QueryFileEarliestVolunteerTickError, QueryMspConfirmChunksToProveForFileError,
QueryBspsVolunteeredForFileError, QueryFileEarliestVolunteerTickError,
QueryMspConfirmChunksToProveForFileError,
};
use pallet_payment_streams_runtime_api::{GetUsersWithDebtOverThresholdError, PaymentStreamsApi};
use pallet_proofs_dealer_runtime_api::{
Expand Down Expand Up @@ -617,6 +618,30 @@ where
}
}
}
BlockchainServiceCommand::QueryBspVolunteeredForFile {
bsp_id,
file_key,
callback,
} => {
let current_block_hash = self.client.info().best_hash;

let bsps_volunteered = self
.client
.runtime_api()
.query_bsps_volunteered_for_file(current_block_hash, file_key)
.unwrap_or_else(|_| Err(QueryBspsVolunteeredForFileError::InternalError));

let volunteered = bsps_volunteered.map(|bsps| bsps.contains(&bsp_id));

match callback.send(volunteered) {
Ok(_) => {
trace!(target: LOG_TARGET, "BSP volunteered status sent successfully");
}
Err(e) => {
error!(target: LOG_TARGET, "Failed to send BSP volunteered status: {:?}", e);
}
}
}
BlockchainServiceCommand::QueryProviderMultiaddresses {
provider_id,
callback,
Expand Down
87 changes: 63 additions & 24 deletions client/src/tasks/bsp_upload_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -671,8 +671,8 @@ where
}
.into();

// Send extrinsic and wait for it to be included in the block.
let result = self
// Try to send the volunteer extrinsic
let mut volunteer_result = self
.storage_hub_handler
.blockchain
.send_extrinsic(
Expand All @@ -692,50 +692,87 @@ where
.watch_for_success(&self.storage_hub_handler.blockchain)
.await;

if let Err(e) = result {
// If the first attempt failed, check if we've already volunteered and retry if needed
if let Err(ref e) = volunteer_result {
error!(
target: LOG_TARGET,
"Failed to volunteer for file {:?}: {:?}",
"Failed to volunteer for file {:x} on first attempt: {:?}",
file_key,
e
);

// If the initial call errored out, it could mean the chain was spammed so the tick did not advance.
// Wait until the actual earliest volunteer tick to occur and retry volunteering.
info!(
target: LOG_TARGET,
"Retrying volunteer for file {:x} at tick {:?}",
file_key,
earliest_volunteer_tick
);

self.storage_hub_handler
.blockchain
.wait_for_tick(earliest_volunteer_tick)
.await?;

// Send extrinsic and wait for it to be included in the block.
let result = self
// Check if we've already successfully volunteered despite the error.
// This is to avoid unvolunteering for the file if we've already successfully volunteered
// and registered it in the file transfer service.
let already_volunteered = self
.storage_hub_handler
.blockchain
.send_extrinsic(
call,
SendExtrinsicOptions::new(
Duration::from_secs(
self.storage_hub_handler
.provider_config
.blockchain_service
.extrinsic_retry_timeout,
.query_bsp_volunteered_for_file(own_bsp_id, file_key.into())
.await
.unwrap_or(false);

// If we have already successfully volunteered, skip retrying.
if already_volunteered {
info!(
target: LOG_TARGET,
"BSP already volunteered for file {:x} successfully",
file_key
);
volunteer_result = Ok(());
} else {
// If we haven't successfully volunteered yet, retry sending the extrinsic.
volunteer_result = self
.storage_hub_handler
.blockchain
.send_extrinsic(
call,
SendExtrinsicOptions::new(
Duration::from_secs(
self.storage_hub_handler
.provider_config
.blockchain_service
.extrinsic_retry_timeout,
),
Some("fileSystem".to_string()),
Some("bspVolunteer".to_string()),
),
Some("fileSystem".to_string()),
Some("bspVolunteer".to_string()),
),
)
.await?
.watch_for_success(&self.storage_hub_handler.blockchain)
.await;
)
.await?
.watch_for_success(&self.storage_hub_handler.blockchain)
.await;
}
}

if let Err(e) = result {
// Handle the volunteer result.
match volunteer_result {
Ok(_) => {
info!(
target: LOG_TARGET,
"BSP volunteered for file {:x} successfully",
file_key
);
}
Err(e) => {
error!(
target: LOG_TARGET,
"Failed to volunteer for file {:?} after retry in volunteer tick: {:?}",
"Failed to volunteer for file {:x} after all attempts: {:?}",
file_key,
e
);

self.unvolunteer_file(file_key.into()).await;
}
}
Expand All @@ -750,6 +787,8 @@ where
&mut self,
event: RemoteUploadRequest<Runtime>,
) -> anyhow::Result<bool> {
debug!(target: LOG_TARGET, "Handling remote upload request for file key {:x}", event.file_key);

let file_key = event.file_key.into();
let mut write_file_storage = self.storage_hub_handler.file_storage.write().await;

Expand Down
49 changes: 42 additions & 7 deletions client/src/tasks/msp_upload_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,16 +358,51 @@ where
.watch_for_success(&self.storage_hub_handler.blockchain)
.await?;

// Remove the files that were rejected from the File Storage.
// Log accepted and rejected files, and remove rejected files from File Storage.
// Accepted files will be added to the Bucket's Forest Storage by the BlockchainService.
for storage_request_msp_bucket_response in storage_request_msp_response {
let mut fs = self.storage_hub_handler.file_storage.write().await;
// Log accepted file keys
if let Some(ref accepted) = storage_request_msp_bucket_response.accept {
let accepted_file_keys: Vec<_> = accepted
.file_keys_and_proofs
.iter()
.map(|fk| fk.file_key)
.collect();

if !accepted_file_keys.is_empty() {
info!(
target: LOG_TARGET,
"Accepted {} file(s) for bucket {:?}: {:?}",
accepted_file_keys.len(),
storage_request_msp_bucket_response.bucket_id,
accepted_file_keys
);
}
}

for RejectedStorageRequest { file_key, .. } in
&storage_request_msp_bucket_response.reject
{
if let Err(e) = fs.delete_file(&file_key) {
error!(target: LOG_TARGET, "Failed to delete file {:?}: {:?}", file_key, e);
// Log and delete rejected file keys
if !storage_request_msp_bucket_response.reject.is_empty() {
let rejected_file_keys: Vec<_> = storage_request_msp_bucket_response
.reject
.iter()
.map(|r| (r.file_key, &r.reason))
.collect();

info!(
target: LOG_TARGET,
"Rejected {} file(s) for bucket {:?}: {:?}",
rejected_file_keys.len(),
storage_request_msp_bucket_response.bucket_id,
rejected_file_keys
);

let mut fs = self.storage_hub_handler.file_storage.write().await;
for RejectedStorageRequest { file_key, .. } in
&storage_request_msp_bucket_response.reject
{
if let Err(e) = fs.delete_file(&file_key) {
error!(target: LOG_TARGET, "Failed to delete file {:?}: {:?}", file_key, e);
}
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions pallets/file-system/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ where
}
};

// Get the current tick number
let current_tick =
<T::ProofDealer as shp_traits::ProofsDealerInterface>::get_current_tick();

// Get the threshold for the BSP to be able to volunteer for the storage request.
// The current eligibility value of this storage request for this BSP has to be greater than
// this for the BSP to be able to volunteer.
Expand All @@ -167,8 +171,6 @@ where
Some(diff) => diff,
None => {
// The BSP's threshold is less than the eligibility current value, which means the BSP is already eligible to volunteer.
let current_tick =
<T::ProofDealer as shp_traits::ProofsDealerInterface>::get_current_tick();
return Ok(current_tick);
}
};
Expand All @@ -183,7 +185,7 @@ where
};

// Compute the earliest tick number at which the BSP can send the volunteer request.
let earliest_volunteer_tick = storage_request_tick.saturating_add(
let earliest_volunteer_tick = current_tick.saturating_add(
T::ThresholdTypeToTickNumber::convert(min_ticks_to_wait_to_volunteer),
);

Expand Down
Loading
Loading