11use std:: {
22 collections:: { HashMap , HashSet } ,
3+ future:: Future ,
4+ pin:: Pin ,
35 str:: FromStr ,
6+ sync:: Arc ,
47 time:: Duration ,
58} ;
69
@@ -16,13 +19,13 @@ use shc_blockchain_service::{
1619 capacity_manager:: CapacityRequestData ,
1720 commands:: { BlockchainServiceCommandInterface , BlockchainServiceCommandInterfaceExt } ,
1821 events:: { NewStorageRequest , ProcessConfirmStoringRequest } ,
19- types:: { ConfirmStoringRequest , RetryStrategy , SendExtrinsicOptions } ,
22+ types:: { ConfirmStoringRequest , RetryStrategy , SendExtrinsicOptions , WatchTransactionError } ,
2023} ;
2124use shc_common:: {
2225 consts:: CURRENT_FOREST_KEY ,
2326 traits:: StorageEnableRuntime ,
2427 types:: {
25- FileKey , FileKeyWithProof , FileMetadata , HashT , StorageProofsMerkleTrieLayout ,
28+ FileKey , FileKeyWithProof , FileMetadata , HashT , ProviderId , StorageProofsMerkleTrieLayout ,
2629 StorageProviderId , BATCH_CHUNK_FILE_TRANSFER_MAX_SIZE ,
2730 } ,
2831} ;
@@ -76,7 +79,7 @@ pub struct BspUploadFileTask<NT, Runtime>
7679where
7780 NT : ShNodeType < Runtime > ,
7881 NT :: FSH : BspForestStorageHandlerT < Runtime > ,
79- Runtime : StorageEnableRuntime ,
82+ Runtime : StorageEnableRuntime + ' static ,
8083{
8184 storage_hub_handler : StorageHubHandler < NT , Runtime > ,
8285 file_key_cleanup : Option < Runtime :: Hash > ,
@@ -101,7 +104,7 @@ where
101104
102105impl < NT , Runtime > BspUploadFileTask < NT , Runtime >
103106where
104- NT : ShNodeType < Runtime > ,
107+ NT : ShNodeType < Runtime > + ' static ,
105108 NT :: FSH : BspForestStorageHandlerT < Runtime > ,
106109 Runtime : StorageEnableRuntime ,
107110{
@@ -428,8 +431,8 @@ where
428431
429432impl < NT , Runtime > BspUploadFileTask < NT , Runtime >
430433where
431- NT : ShNodeType < Runtime > ,
432- NT :: FSH : BspForestStorageHandlerT < Runtime > ,
434+ NT : ShNodeType < Runtime > + ' static ,
435+ NT :: FSH : BspForestStorageHandlerT < Runtime > + ' static ,
433436 Runtime : StorageEnableRuntime ,
434437{
435438 async fn handle_new_storage_request_event (
@@ -671,12 +674,36 @@ where
671674 }
672675 . into ( ) ;
673676
674- // Send extrinsic and wait for it to be included in the block.
675- let result = self
677+ // Clone necessary data for the retry check.
678+ let cloned_sh_handler = Arc :: new ( self . storage_hub_handler . clone ( ) ) ;
679+ let cloned_own_bsp_id = Arc :: new ( own_bsp_id. clone ( ) ) ;
680+ let cloned_file_key: Arc < Runtime :: Hash > = Arc :: new ( file_key. clone ( ) . into ( ) ) ;
681+
682+ let should_retry = move |error| {
683+ let cloned_sh_handler = Arc :: clone ( & cloned_sh_handler) ;
684+ let cloned_own_bsp_id = Arc :: clone ( & cloned_own_bsp_id) ;
685+ let cloned_file_key = Arc :: clone ( & cloned_file_key) ;
686+
687+ // Check:
688+ // - If we've already successfully volunteered for the file.
689+ // - If the storage request is no longer open to volunteers.
690+ // Also waits for the tick to be able to volunteer for the file has actually been reached,
691+ // not the tick before the BSP can volunteer for the file. To make sure the chain wasn't
692+ // spammed just before the BSP could volunteer for the file.
693+ Box :: pin ( Self :: should_retry_volunteer (
694+ cloned_sh_handler,
695+ cloned_own_bsp_id,
696+ cloned_file_key,
697+ error,
698+ ) ) as Pin < Box < dyn Future < Output = bool > + Send > >
699+ } ;
700+
701+ // Try to send the volunteer extrinsic
702+ if let Err ( e) = self
676703 . storage_hub_handler
677704 . blockchain
678- . send_extrinsic (
679- call. clone ( ) . into ( ) ,
705+ . submit_extrinsic_with_retry (
706+ call. clone ( ) ,
680707 SendExtrinsicOptions :: new (
681708 Duration :: from_secs (
682709 self . storage_hub_handler
@@ -687,57 +714,43 @@ where
687714 Some ( "fileSystem" . to_string ( ) ) ,
688715 Some ( "bspVolunteer" . to_string ( ) ) ,
689716 ) ,
717+ RetryStrategy :: default ( )
718+ . with_max_retries ( self . config . max_try_count )
719+ . with_max_tip ( self . config . max_tip . saturated_into ( ) )
720+ . with_should_retry ( Some ( Box :: new ( should_retry) ) ) ,
721+ false ,
690722 )
691- . await ?
692- . watch_for_success ( & self . storage_hub_handler . blockchain )
693- . await ;
723+ . await
724+ {
725+ error ! ( target: LOG_TARGET , "Failed to volunteer for file {:x}: {:?}" , file_key, e) ;
726+ }
694727
695- if let Err ( e) = result {
728+ // Check if the BSP has been registered as a volunteer for the file.
729+ let volunteer_result = self
730+ . storage_hub_handler
731+ . blockchain
732+ . query_bsp_volunteered_for_file ( own_bsp_id, file_key. into ( ) )
733+ . await
734+ . map_err ( |e| anyhow ! ( "Failed to query BSP volunteered for file: {:?}" , e) ) ?;
735+
736+ // Handle the volunteer result.
737+ if volunteer_result {
738+ info ! (
739+ target: LOG_TARGET ,
740+ "🍾 BSP successfully volunteered for file {:x}" ,
741+ file_key
742+ ) ;
743+ } else {
696744 error ! (
697745 target: LOG_TARGET ,
698- "Failed to volunteer for file {:?}: {:?}" ,
699- file_key,
700- e
746+ "BSP not registered as a volunteer for file {:x}" ,
747+ file_key
701748 ) ;
702-
703- // If the initial call errored out, it could mean the chain was spammed so the tick did not advance.
704- // Wait until the actual earliest volunteer tick to occur and retry volunteering.
705- self . storage_hub_handler
706- . blockchain
707- . wait_for_tick ( earliest_volunteer_tick)
708- . await ?;
709-
710- // Send extrinsic and wait for it to be included in the block.
711- let result = self
712- . storage_hub_handler
713- . blockchain
714- . send_extrinsic (
715- call,
716- SendExtrinsicOptions :: new (
717- Duration :: from_secs (
718- self . storage_hub_handler
719- . provider_config
720- . blockchain_service
721- . extrinsic_retry_timeout ,
722- ) ,
723- Some ( "fileSystem" . to_string ( ) ) ,
724- Some ( "bspVolunteer" . to_string ( ) ) ,
725- ) ,
726- )
727- . await ?
728- . watch_for_success ( & self . storage_hub_handler . blockchain )
729- . await ;
730-
731- if let Err ( e) = result {
732- error ! (
733- target: LOG_TARGET ,
734- "Failed to volunteer for file {:?} after retry in volunteer tick: {:?}" ,
735- file_key,
736- e
737- ) ;
738-
739- self . unvolunteer_file ( file_key. into ( ) ) . await ;
740- }
749+ self . unvolunteer_file ( file_key. into ( ) ) . await ;
750+ return Err ( anyhow ! (
751+ "BSP not registered as a volunteer for file {:x}" ,
752+ file_key
753+ ) ) ;
741754 }
742755
743756 Ok ( ( ) )
@@ -750,6 +763,8 @@ where
750763 & mut self ,
751764 event : RemoteUploadRequest < Runtime > ,
752765 ) -> anyhow:: Result < bool > {
766+ debug ! ( target: LOG_TARGET , "Handling remote upload request for file key {:x}" , event. file_key) ;
767+
753768 let file_key = event. file_key . into ( ) ;
754769 let mut write_file_storage = self . storage_hub_handler . file_storage . write ( ) . await ;
755770
@@ -997,6 +1012,85 @@ where
9971012 return Ok ( true ) ;
9981013 }
9991014
1015+ /// Function to determine if a volunteer request should be retried,
1016+ /// sending the same request again.
1017+ ///
1018+ /// This function will return `true` if and only if the following conditions are met:
1019+ /// 1. If the storage request is no longer open to volunteers.
1020+ /// 2. If we've already successfully volunteered for the file.
1021+ ///
1022+ /// Also waits for the tick to be able to volunteer for the file has actually been reached,
1023+ /// not the tick before the BSP can volunteer for the file. To make sure the chain wasn't
1024+ /// spammed just before the BSP could volunteer for the file.
1025+ async fn should_retry_volunteer (
1026+ sh_handler : Arc < StorageHubHandler < NT , Runtime > > ,
1027+ bsp_id : Arc < ProviderId < Runtime > > ,
1028+ file_key : Arc < Runtime :: Hash > ,
1029+ _error : WatchTransactionError ,
1030+ ) -> bool {
1031+ // Wait for the tick to be able to volunteer for the file has actually been reached.
1032+ let earliest_volunteer_tick = match sh_handler
1033+ . blockchain
1034+ . query_file_earliest_volunteer_tick ( * bsp_id, * file_key)
1035+ . await
1036+ {
1037+ Ok ( tick) => tick,
1038+ Err ( e) => {
1039+ error ! ( target: LOG_TARGET , "Failed to query file earliest volunteer block: {:?}" , e) ;
1040+ return false ;
1041+ }
1042+ } ;
1043+ match sh_handler
1044+ . blockchain
1045+ . wait_for_tick ( earliest_volunteer_tick)
1046+ . await
1047+ {
1048+ Ok ( _) => { }
1049+ Err ( e) => {
1050+ error ! ( target: LOG_TARGET , "Failed to wait for tick: {:?}" , e) ;
1051+ return false ;
1052+ }
1053+ }
1054+
1055+ // Check if the storage request is no longer open to volunteers.
1056+ let can_volunteer = match sh_handler
1057+ . blockchain
1058+ . is_storage_request_open_to_volunteers ( * file_key)
1059+ . await
1060+ {
1061+ Ok ( can_volunteer) => can_volunteer,
1062+ Err ( e) => {
1063+ error ! ( target: LOG_TARGET , "Failed to query file can volunteer: {:?}" , e) ;
1064+ return false ;
1065+ }
1066+ } ;
1067+
1068+ if !can_volunteer {
1069+ warn ! ( target: LOG_TARGET , "Storage request is no longer open to volunteers. Stop retrying." ) ;
1070+ return false ;
1071+ }
1072+
1073+ // Check if we've already successfully volunteered for the file.
1074+ let volunteered = match sh_handler
1075+ . blockchain
1076+ . query_bsp_volunteered_for_file ( * bsp_id, * file_key)
1077+ . await
1078+ {
1079+ Ok ( volunteered) => volunteered,
1080+ Err ( e) => {
1081+ error ! ( target: LOG_TARGET , "Failed to query file volunteered: {:?}" , e) ;
1082+ return false ;
1083+ }
1084+ } ;
1085+
1086+ if volunteered {
1087+ info ! ( target: LOG_TARGET , "Already successfully volunteered for the file. Stop retrying." ) ;
1088+ return false ;
1089+ }
1090+
1091+ return true ;
1092+ }
1093+
10001094 async fn unvolunteer_file ( & self , file_key : Runtime :: Hash ) {
10011095 warn ! ( target: LOG_TARGET , "Unvolunteering file {:?}" , file_key) ;
10021096
0 commit comments