Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
2e05ac3
feat(msp): msp batch respond storage requests per block
snowmead Nov 18, 2025
dae7b1a
Merge branch 'main' into feat/msp-batch-respond-storage-requests
snowmead Nov 19, 2025
497440b
refactor(tests): use batch storage request api
snowmead Nov 19, 2025
18f8b1f
fix(backend): :bug: validate file upload against bucket owner instead…
santikaplan Nov 19, 2025
bb23fb3
refactor(backend): do not get peer id in upload file (#573)
elfedy Nov 20, 2025
dcac303
feat(client): ✨ Persist pending extrinsics in new Blockchain Service …
ffarall Nov 21, 2025
99d1a6d
Merge branch 'main' into feat/msp-batch-respond-storage-requests
snowmead Nov 24, 2025
595731b
Merge branch 'main' into feat/msp-batch-respond-storage-requests
snowmead Nov 24, 2025
cd77396
refactor(msp): optimize batch storage request processing
snowmead Nov 25, 2025
bfd7900
fix: fmt
snowmead Nov 25, 2025
d04b42f
Merge branch 'main' into feat/msp-batch-respond-storage-requests
snowmead Nov 25, 2025
832d0a3
fix(msp): do not emit storage request event for msp on startup
snowmead Nov 25, 2025
fd41631
fix: compilation
snowmead Nov 25, 2025
a158c0c
fix: fmt
snowmead Nov 25, 2025
3382bdd
fix(test): check if acceptance and confirmations reached in batch sto…
snowmead Nov 25, 2025
deb57da
use batch storage requests for all tests and add logging
snowmead Nov 25, 2025
79d115d
fix lint
snowmead Nov 25, 2025
1ac1b00
fix: skip accepting the storage request and increasing capacity for f…
snowmead Nov 25, 2025
fc7404c
feat(file-system): add msp field to NewStorageRequest event
snowmead Nov 26, 2025
5e4117b
fix: bump api augment version
snowmead Nov 26, 2025
b55bce9
fix(msp): filter query pending storage requests when accepting
snowmead Nov 26, 2025
e09d281
ignore msp already accepted extrinisc errors in batch storage request…
snowmead Nov 26, 2025
85dfbec
Revert "feat(file-system): add msp field to NewStorageRequest event"
snowmead Nov 26, 2025
1efb707
fix(msp): deduplicate file keys in storage request queue
snowmead Nov 26, 2025
a353f2c
revert: bump api augment version
snowmead Nov 26, 2025
58b973f
Merge branch 'main' into feat/msp-batch-respond-storage-requests
snowmead Nov 26, 2025
8f45483
rust idioms
snowmead Nov 26, 2025
351fe31
refactor(msp): per-file event-driven storage request processing with …
snowmead Nov 28, 2025
c4e6ce7
Merge branch 'main' into feat/msp-batch-respond-storage-requests
snowmead Nov 28, 2025
fef62d8
Merge branch 'main' into feat/msp-batch-respond-storage-requests
snowmead Nov 28, 2025
5560a11
fix(blockchain-service): keep deprecated column families for RocksDB …
snowmead Nov 28, 2025
ca7b87e
fix deprecated marcation
snowmead Nov 28, 2025
9347049
refactor(msp): batch process storage requests with retrying
snowmead Dec 1, 2025
9bc490f
fix(msp-tasks): use single instance for concurrent reference to same …
snowmead Dec 1, 2025
82ddfe2
do not trigger retry from this runtime error "KeyProofVerificationFai…
snowmead Dec 1, 2025
2929a58
update docs
snowmead Dec 1, 2025
36aee6a
organize impls
snowmead Dec 1, 2025
87f6500
Remove .mcp.json from version control
snowmead Dec 1, 2025
f95d02a
use debug logs and refactor non idiomatic rust code
snowmead Dec 1, 2025
99b1372
Merge branch 'main' into feat/msp-batch-respond-storage-requests
snowmead Dec 1, 2025
82e2cee
Merge branch 'main' into feat/msp-batch-respond-storage-requests
snowmead Dec 1, 2025
6e8636c
Merge branch 'main' into feat/msp-batch-respond-storage-requests
snowmead Dec 1, 2025
2690371
Merge branch 'main' into feat/msp-batch-respond-storage-requests
snowmead Dec 1, 2025
213ec43
add additional verifications and waits for stability purposes
snowmead Dec 1, 2025
2b890ba
fix(test): use finalized block number in waitForIndexing and add warm…
snowmead Dec 2, 2025
4435ee7
refactor: 🚚 Move release and config files to inner directories (#584)
ffarall Dec 1, 2025
fe5f68f
fix: 🚑 avoid deadlocks during the MSP upload process (#582)
TDemeco Dec 2, 2025
805feb8
fix(backend): :bug: avoid concurrent uploads for the same file key (#…
TDemeco Dec 2, 2025
2e685a2
fix: :bug: update column order of migration in indexer DB to match sc…
TDemeco Dec 2, 2025
5c9b045
docs: :memo: Add missing parameters to config files (#588)
ffarall Dec 2, 2025
931759e
Merge remote-tracking branch 'origin/main' into feat/msp-batch-respon…
snowmead Dec 2, 2025
a5c8599
Merge branch 'main' into feat/msp-batch-respond-storage-requests
snowmead Dec 2, 2025
8f68c4f
Merge branch 'main' into feat/msp-batch-respond-storage-requests
snowmead Dec 2, 2025
7fb3423
Merge branch 'main' into feat/msp-batch-respond-storage-requests
snowmead Dec 2, 2025
f5c7719
Merge branch 'main' into feat/msp-batch-respond-storage-requests
snowmead Dec 3, 2025
85e97a7
fix: fmt
snowmead Dec 3, 2025
7f060ee
feat: add RocksDB column family migration system
snowmead Dec 4, 2025
60abed1
chore: remove unnecessary #[non_exhaustive] from MigrationError
snowmead Dec 4, 2025
5416741
fix: fmt
snowmead Dec 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,6 @@ config.toml
.cursorindexingignore
.specstory/
.claude/

# MCP configuration
.mcp.json
11 changes: 11 additions & 0 deletions client/blockchain-service/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use shc_forest_manager::traits::ForestStorageHandler;

use crate::{
capacity_manager::CapacityRequestData,
events::NewStorageRequest,
handler::BlockchainService,
transaction_manager::wait_for_transaction_status,
types::{
Expand Down Expand Up @@ -106,6 +107,7 @@ pub enum BlockchainServiceCommand<Runtime: StorageEnableRuntime> {
QueueConfirmBspRequest {
request: ConfirmStoringRequest<Runtime>,
},
#[command(mode = "FireAndForget")]
QueueMspRespondStorageRequest {
request: RespondStorageRequest<Runtime>,
},
Expand Down Expand Up @@ -188,6 +190,15 @@ pub enum BlockchainServiceCommand<Runtime: StorageEnableRuntime> {
},
#[command(success_type = Vec<BucketId<Runtime>>)]
QueryBucketsForMsp { msp_id: ProviderId<Runtime> },
/// Query pending storage requests for the MSP.
/// If `file_keys` is provided, only query those specific storage requests from storage.
/// If `file_keys` is None, returns all pending storage requests via runtime API.
#[command(success_type = Vec<NewStorageRequest<Runtime>>)]
QueryPendingStorageRequests { file_keys: Option<Vec<FileKey>> },
/// Preprocess a storage request by emitting a NewStorageRequest event.
/// Called by MspUploadFileTask's BatchProcessStorageRequests handler for each pending request.
#[command(mode = "FireAndForget")]
PreprocessStorageRequest { request: NewStorageRequest<Runtime> },
}

/// Interface for interacting with the BlockchainService actor.
Expand Down
13 changes: 13 additions & 0 deletions client/blockchain-service/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,19 @@ pub struct DistributeFileToBsp<Runtime: StorageEnableRuntime> {
#[actor(actor = "blockchain_service")]
pub struct VerifyMspBucketForests {}

/// Event triggered periodically to process batched storage requests for MSPs.
///
/// The semaphore permit is automatically released when the event handler completes or fails,
/// ensuring only one batch processing cycle runs at a time.
#[derive(Debug, Clone, ActorEvent)]
#[actor(actor = "blockchain_service")]
pub struct BatchProcessStorageRequests {
/// Semaphore permit wrapped in Arc to satisfy Clone requirement for events.
/// The permit is held by the event handler for its lifetime,
/// automatically releasing when the handler completes or fails.
pub permit: Arc<tokio::sync::OwnedSemaphorePermit>,
}

/// The event bus provider for the BlockchainService actor.
///
/// It holds the event buses for the different events that the BlockchainService actor
Expand Down
137 changes: 124 additions & 13 deletions client/blockchain-service/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use shc_forest_manager::traits::ForestStorageHandler;
use crate::{
capacity_manager::{CapacityRequest, CapacityRequestQueue},
commands::BlockchainServiceCommand,
events::BlockchainServiceEventBusProvider,
events::{BlockchainServiceEventBusProvider, NewStorageRequest},
state::{BlockchainServiceStateStore, LastProcessedBlockNumberCf},
transaction_manager::{TransactionManager, TransactionManagerConfig},
types::{
Expand Down Expand Up @@ -945,19 +945,51 @@ where
}
}
}
BlockchainServiceCommand::QueueMspRespondStorageRequest { request, callback } => {
let state_store_context = self.persistent_state.open_rw_context_with_overlay();
state_store_context
.pending_msp_respond_storage_request_deque()
.push_back(request);
state_store_context.commit();
// We check right away if we can process the request so we don't waste time.
self.msp_assign_forest_root_write_lock();
match callback.send(Ok(())) {
Ok(_) => {}
Err(e) => {
error!(target: LOG_TARGET, "Failed to send receiver: {:?}", e);
BlockchainServiceCommand::QueueMspRespondStorageRequest { request } => {
if let Some(ManagedProvider::Msp(msp_handler)) =
&mut self.maybe_managed_provider
{
let file_key = request.file_key;

trace!(
target: LOG_TARGET,
"QueueMspRespondStorageRequest received for file key {:?}",
file_key
);

// Check if file key is already pending (O(1) deduplication).
// `insert` returns true if the key was not present (i.e., we should queue).
if msp_handler
.pending_respond_storage_request_file_keys
.insert(file_key)
{
msp_handler
.pending_respond_storage_requests
.push_back(request);

trace!(
target: LOG_TARGET,
"File key {:?} added to pending queue (size: {})",
file_key,
msp_handler.pending_respond_storage_requests.len()
);

// We check right away if we can process the request so we don't waste time.
self.msp_assign_forest_root_write_lock();
} else {
trace!(
target: LOG_TARGET,
"File key {:?} already pending, skipping",
file_key
);
}
} else {
// Log the invariant violation but don't fail - this is fire-and-forget
error!(
target: LOG_TARGET,
"QueueMspRespondStorageRequest received while not managing an MSP. \
This is an invariant violation - please report to StorageHub team."
);
}
}
BlockchainServiceCommand::QueueSubmitProofRequest { request, callback } => {
Expand Down Expand Up @@ -1283,6 +1315,84 @@ where
}
}
}
BlockchainServiceCommand::QueryPendingStorageRequests {
file_keys,
callback,
} => {
let managed_msp_id = match &self.maybe_managed_provider {
Some(ManagedProvider::Msp(msp_handler)) => msp_handler.msp_id.clone(),
_ => {
error!(target: LOG_TARGET, "`QueryPendingStorageRequests` should only be called if the node is managing a MSP. Found [{:?}] instead.", self.maybe_managed_provider);
match callback.send(Err(anyhow!("Node is not managing an MSP"))) {
Ok(_) => {}
Err(e) => {
error!(target: LOG_TARGET, "Failed to send error: {:?}", e);
}
}
return;
}
};

let current_block_hash = self.client.info().best_hash;

// Query pending storage requests (not yet accepted by MSP)
let storage_requests = match self
.client
.runtime_api()
.pending_storage_requests_by_msp(current_block_hash, managed_msp_id)
{
Ok(mut sr) => {
// If specific file keys provided, look them up directly
match file_keys {
Some(keys) => keys
.into_iter()
.filter_map(|k| {
let file_key = sp_core::H256::from_slice(k.as_ref());
sr.remove(&file_key).map(|metadata| (file_key, metadata))
})
.collect(),
None => sr,
}
}
Err(_) => {
warn!(target: LOG_TARGET, "Failed to get pending storage requests");
match callback.send(Ok(Vec::new())) {
Ok(_) => {}
Err(e) => {
error!(target: LOG_TARGET, "Failed to send empty result: {:?}", e);
}
}
return;
}
};

let new_storage_requests: Vec<NewStorageRequest<Runtime>> = storage_requests
.into_iter()
.map(|(file_key, sr)| NewStorageRequest {
who: sr.owner,
file_key: file_key.into(),
bucket_id: sr.bucket_id,
location: sr.location,
fingerprint: sr.fingerprint.as_ref().into(),
size: sr.size,
user_peer_ids: sr.user_peer_ids,
expires_at: sr.expires_at,
})
.collect();

match callback.send(Ok(new_storage_requests)) {
Ok(_) => {}
Err(e) => {
error!(target: LOG_TARGET, "Failed to send pending storage requests: {:?}", e);
}
}
}
BlockchainServiceCommand::PreprocessStorageRequest { request } => {
// Emit the NewStorageRequest event for this storage request.
// This is called by MspUploadFileTask's BatchProcessStorageRequests handler
// for each pending storage request to trigger per-file processing.
self.emit(request);
}
BlockchainServiceCommand::ReleaseForestRootWriteLock {
forest_root_write_tx,
callback,
Expand Down Expand Up @@ -1592,6 +1702,7 @@ where
.await;
}
Some(ManagedProvider::Msp(_)) => {
self.msp_monitor_block();
self.msp_end_block_processing(block_hash, block_number, tree_route)
.await;
}
Expand Down
Loading
Loading