4 changes: 4 additions & 0 deletions client/blockchain-service/src/commands.rs
@@ -31,6 +31,7 @@ use shc_common::types::{
};
use shc_forest_manager::traits::ForestStorageHandler;

use crate::forest_write_lock_manager::ForestWriteLockGuard;
use crate::{
capacity_manager::CapacityRequestData,
handler::BlockchainService,
@@ -163,6 +164,9 @@ pub enum BlockchainServiceCommand<Runtime: StorageEnableRuntime> {
QueryMspIdOfBucketId { bucket_id: BucketId<Runtime> },
ReleaseForestRootWriteLock {
forest_root_write_tx: tokio::sync::oneshot::Sender<()>,
// Optional guard to keep the lock alive until this command is processed.
// TODO: Replace the forest root write tx with ForestWriteLockGuard entirely
forest_write_lock_guard: Option<ForestWriteLockGuard<Runtime>>,
},
QueueFileDeletionRequest {
request: FileDeletionRequest<Runtime>,
9 changes: 5 additions & 4 deletions client/blockchain-service/src/events.rs
@@ -4,6 +4,10 @@ use codec::{Decode, Encode};
use sc_network::Multiaddr;
use tokio::sync::{oneshot, Mutex};

use crate::forest_write_lock_manager::ForestWriteLockManager;
use crate::types::{
ConfirmStoringRequest, FileDeletionRequest as FileDeletionRequestType, RespondStorageRequest,
};
use shc_actors_derive::{ActorEvent, ActorEventBus};
use shc_common::{
traits::StorageEnableRuntime,
@@ -14,10 +18,6 @@ use shc_common::{
},
};

use crate::types::{
ConfirmStoringRequest, FileDeletionRequest as FileDeletionRequestType, RespondStorageRequest,
};

// TODO: Add the events from the `pallet-cr-randomness` here to process them in the BlockchainService.

/// Multiple new challenge seeds that have to be responded in order.
@@ -174,6 +174,7 @@ pub struct ProcessMspRespondStoringRequestData<Runtime: StorageEnableRuntime> {
pub struct ProcessMspRespondStoringRequest<Runtime: StorageEnableRuntime> {
pub data: ProcessMspRespondStoringRequestData<Runtime>,
pub forest_root_write_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
pub forest_write_lock_manager: Arc<Mutex<ForestWriteLockManager<Runtime>>>,
}

#[derive(Debug, Clone, Encode, Decode)]
204 changes: 204 additions & 0 deletions client/blockchain-service/src/forest_write_lock_manager.rs
@@ -0,0 +1,204 @@
use {
crate::{events::ForestWriteLockTaskData, handler::LOG_TARGET},
log::{error, trace, warn},
shc_common::traits::StorageEnableRuntime,
std::{
collections::VecDeque,
sync::{Arc, Mutex},
},
tokio::sync::{Notify, Semaphore},
};

/// Priority value for forest write lock tickets
#[derive(Clone, Debug)]
pub enum ForestWriteLockPriority {
/// High priority request
High = 0,
/// Medium priority request
Medium = 1,
/// Low priority request
Low = 2,
}

/// Ticket structure for managing forest write lock requests
#[derive(Clone, Debug)]
pub struct ForestWriteLockTicket<Runtime: StorageEnableRuntime> {
/// Identifier for the ticket
pub id: String,
/// Priority of the lock request
pub priority: ForestWriteLockPriority,
/// Data associated with the lock request ticket
pub data: ForestWriteLockTaskData<Runtime>,
}

impl<Runtime: StorageEnableRuntime> ForestWriteLockTicket<Runtime> {
/// Creates a new ForestWriteLockTicket with the given data and determines its priority
/// Since MSPs and BSPs will have separate queues, we can assign high priority to both types of requests
pub fn new(data: ForestWriteLockTaskData<Runtime>) -> Self {
Self {
id: format!("{:?}", data),
priority: match data {
ForestWriteLockTaskData::SubmitProofRequest(_) => ForestWriteLockPriority::High,
ForestWriteLockTaskData::MspRespondStorageRequest(_) => {
ForestWriteLockPriority::High
}
ForestWriteLockTaskData::ConfirmStoringRequest(_) => {
ForestWriteLockPriority::Medium
}
ForestWriteLockTaskData::StopStoringForInsolventUserRequest(_) => {
ForestWriteLockPriority::Low
}
},
data,
}
}
}

#[derive(Clone, Debug)]
pub struct ForestWriteLockManager<Runtime: StorageEnableRuntime> {
/// Semaphore to limit concurrent write operations
pub lock: Arc<Semaphore>,
/// Queue to manage pending write requests
pub queue: Arc<Mutex<VecDeque<ForestWriteLockTicket<Runtime>>>>,
/// Current holder of the write lock
pub current_holder: Arc<Mutex<Option<ForestWriteLockTicket<Runtime>>>>,
/// Notify mechanism to signal when the lock becomes available
pub notifier: Arc<Notify>,
}

impl<Runtime: StorageEnableRuntime> ForestWriteLockManager<Runtime> {
/// Creates a new ForestWriteLockManager instance
/// - The lock is initialized as a semaphore with a single permit
// TODO: Allow the queue to be pre-populated with existing tickets if necessary
pub fn new() -> Self {
Self {
lock: Arc::new(Semaphore::const_new(1)),
queue: Arc::new(Mutex::new(VecDeque::new())),
current_holder: Arc::new(Mutex::new(None)),
notifier: Arc::new(Notify::new()),
}
}

/// Checks if the forest write lock is currently available
pub fn is_available(&self) -> bool {
self.lock.available_permits() > 0
}

/// Attempts to acquire the forest write lock or enqueues the request if the lock is not available
/// - If the lock is available and the queue is empty, it is granted immediately
/// - If the lock is unavailable, the request is enqueued based on its priority
/// - If the lock is available but there are pending requests in the queue, the request is enqueued
/// and the next ticket in the queue is granted the lock
/// Returns a ForestWriteLockGuard that manages the lifetime of the acquired lock
pub async fn acquire(
&self,
data: ForestWriteLockTaskData<Runtime>,
) -> ForestWriteLockGuard<Runtime> {
let ticket = ForestWriteLockTicket::new(data);

// Add the ticket to the priority queue first
// TODO: Handle the edge case where the acquire request is already in the queue
{
self.enqueue(ticket.clone());
}

// If the ticket is at the front of the queue, try to acquire the lock
// Otherwise, wait until it's our turn
loop {
if let Ok(permit) = self.lock.acquire().await {
if self.should_acquire_lock(&ticket) {
if let Ok(mut current_holder) = self.current_holder.try_lock() {
*current_holder = Some(ticket.clone());
permit.forget(); // Will be released manually when the guard is dropped
break;
} else {
warn!(target: LOG_TARGET, "TicketId {}: Failed to acquire lock on the current holder when acquiring forest write lock", ticket.id);
drop(permit);
}
} else {
drop(permit);
}
} else {
warn!(target: LOG_TARGET, "TicketId {}: Failed to acquire forest write lock semaphore", ticket.id);
}
// Not our turn, wait for notification
self.notifier.notified().await;
}

ForestWriteLockGuard {
manager: Arc::new(self.clone()),
ticket_id: ticket.id,
}
}

fn should_acquire_lock(&self, ticket: &ForestWriteLockTicket<Runtime>) -> bool {
if let Ok(mut queue) = self.queue.try_lock() {
if let Some(next_task) = queue.front() {
if next_task.id == ticket.id {
queue.pop_front();
true
} else {
false
}
} else {
false
}
} else {
warn!(target: LOG_TARGET, "TicketId {}: Failed to acquire lock on the queue to check next ticket", ticket.id);
false
}
}

/// Enqueues a ticket based on its priority
/// - High priority tickets are placed before any medium and low priority tickets
/// - Medium priority tickets are placed before any low priority tickets
/// - Low priority tickets are placed at the back of the queue
fn enqueue(&self, ticket: ForestWriteLockTicket<Runtime>) {
if let Ok(mut queue) = self.queue.try_lock() {
match ticket.priority {
ForestWriteLockPriority::Low => queue.push_back(ticket),
ForestWriteLockPriority::Medium => {
let insert_pos = queue
.iter()
.position(|item| matches!(item.priority, ForestWriteLockPriority::Low))
.unwrap_or(queue.len());
queue.insert(insert_pos, ticket);
}
ForestWriteLockPriority::High => {
let insert_pos = queue
.iter()
.position(|item| {
matches!(
item.priority,
ForestWriteLockPriority::Medium | ForestWriteLockPriority::Low
)
})
.unwrap_or(queue.len());
queue.insert(insert_pos, ticket);
}
}
} else {
error!(target: LOG_TARGET, "TicketId {}: Failed to acquire lock on the queue to enqueue ticket", ticket.id);
}
}
}

#[derive(Debug)]
pub struct ForestWriteLockGuard<Runtime: StorageEnableRuntime> {
manager: Arc<ForestWriteLockManager<Runtime>>,
ticket_id: String,
}

impl<Runtime: StorageEnableRuntime> Drop for ForestWriteLockGuard<Runtime> {
/// Releases the semaphore permit when the guard is dropped and notifies the next ticket in the queue, if any
fn drop(&mut self) {
// Release the permit back to the semaphore
self.manager.lock.add_permits(1);
// Clear the current holder
if let Ok(mut holder) = self.manager.current_holder.try_lock() {
*holder = None;
};
// Notify next ticket in the queue
self.manager.notifier.notify_one();
trace!(target: LOG_TARGET, "TicketId {}: Forest root write lock released by ticket", self.ticket_id);
}
}
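For context on how the pieces above are meant to fit together, here is a simplified, standalone sketch of the same pattern: a single-permit `Semaphore`, a priority-ordered queue, a `Notify` for hand-off, and a guard that releases on `Drop`. It is not the crate's API: `LockManager`, `Ticket`, and `Guard` are illustrative stand-ins, a `u8` replaces `ForestWriteLockPriority`, a plain string replaces `ForestWriteLockTaskData`, and it assumes tokio with the `sync`, `rt-multi-thread`, and `macros` features.

```rust
use std::{
    collections::VecDeque,
    sync::{Arc, Mutex},
};
use tokio::sync::{Notify, Semaphore};

#[derive(Clone)]
struct Ticket {
    id: String,
    // Lower number = higher priority, mirroring ForestWriteLockPriority's discriminants.
    priority: u8,
}

#[derive(Clone)]
struct LockManager {
    lock: Arc<Semaphore>,
    queue: Arc<Mutex<VecDeque<Ticket>>>,
    notifier: Arc<Notify>,
}

struct Guard {
    manager: LockManager,
}

impl Drop for Guard {
    fn drop(&mut self) {
        // Return the single permit and wake the next queued waiter.
        self.manager.lock.add_permits(1);
        self.manager.notifier.notify_one();
    }
}

impl LockManager {
    fn new() -> Self {
        Self {
            lock: Arc::new(Semaphore::new(1)),
            queue: Arc::new(Mutex::new(VecDeque::new())),
            notifier: Arc::new(Notify::new()),
        }
    }

    async fn acquire(&self, id: &str, priority: u8) -> Guard {
        let ticket = Ticket { id: id.to_owned(), priority };
        {
            // Insert before the first strictly lower-priority ticket (higher number).
            let mut queue = self.queue.lock().expect("queue mutex poisoned");
            let pos = queue
                .iter()
                .position(|t| t.priority > ticket.priority)
                .unwrap_or(queue.len());
            queue.insert(pos, ticket.clone());
        }
        loop {
            let permit = self.lock.acquire().await.expect("semaphore never closed");
            let our_turn = {
                let mut queue = self.queue.lock().expect("queue mutex poisoned");
                if queue.front().map(|t| t.id == ticket.id).unwrap_or(false) {
                    queue.pop_front();
                    true
                } else {
                    false
                }
            };
            if our_turn {
                // Keep the permit until the guard is dropped.
                permit.forget();
                return Guard { manager: self.clone() };
            }
            // Not our turn: give the permit back and wait to be notified.
            drop(permit);
            self.notifier.notified().await;
        }
    }
}

#[tokio::main]
async fn main() {
    let manager = LockManager::new();
    // A task acquires the lock, does its Forest-root-mutating work, then drops the guard.
    let guard = manager.acquire("msp-respond-storage-request", 0).await;
    // ... submit extrinsic, apply Forest mutations ...
    drop(guard); // releases the permit and notifies the next queued ticket
}
```

The sketch also makes the central design choice visible: the permit is forgotten (`permit.forget()`) when a ticket reaches the front of the queue and only re-added when the guard is dropped, so the semaphore never hands the lock to a waiter that is not next in priority order.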
7 changes: 7 additions & 0 deletions client/blockchain-service/src/handler.rs
@@ -1240,6 +1240,7 @@ where
}
BlockchainServiceCommand::ReleaseForestRootWriteLock {
forest_root_write_tx,
forest_write_lock_guard,
callback,
} => {
if let Some(managed_bsp_or_msp) = &self.maybe_managed_provider {
Expand All @@ -1254,6 +1255,12 @@ where
if forest_root_write_result.is_ok() {
match managed_bsp_or_msp {
ManagedProvider::Msp(_) => {
// This is a temporary check due to the current support for two different lock mechanisms
// TODO: Remove this when the old lock mechanism is removed
if let Some(guard) = forest_write_lock_guard {
    drop(guard);
}
self.msp_assign_forest_root_write_lock();
}
ManagedProvider::Bsp(_) => {
35 changes: 29 additions & 6 deletions client/blockchain-service/src/handler_msp.rs
@@ -305,13 +305,25 @@ where
}
};

// TODO: Update to use ForestWriteLockManager
// This is done in a closure to avoid borrowing `self` immutably and then mutably.
// Inside of this closure, we borrow `self` mutably when taking ownership of the lock.
{
let forest_root_write_lock = match &mut self.maybe_managed_provider {
Some(ManagedProvider::Msp(msp_handler)) => &mut msp_handler.forest_root_write_lock,
_ => unreachable!("We just checked this is a MSP"),
};
let (forest_root_write_lock, forest_write_lock_manager) =
match &mut self.maybe_managed_provider {
Some(ManagedProvider::Msp(msp_handler)) => (
&mut msp_handler.forest_root_write_lock,
&mut msp_handler.forest_write_lock_manager,
),
_ => unreachable!("We just checked this is a MSP"),
};

if !forest_write_lock_manager.is_available() {
trace!(target: LOG_TARGET, "Waiting for current Forest root write task to finish");
return;
} else {
trace!(target: LOG_TARGET, "Forest root write task finished, lock is released!");
}

if let Some(mut rx) = forest_root_write_lock.take() {
// Note: tasks that get ownership of the lock are responsible for sending a message back when done processing.
@@ -462,8 +474,13 @@ where

fn msp_emit_forest_write_event(&mut self, data: impl Into<ForestWriteLockTaskData<Runtime>>) {
// Get the MSP's Forest root write lock.
let forest_root_write_lock = match &mut self.maybe_managed_provider {
Some(ManagedProvider::Msp(msp_handler)) => &mut msp_handler.forest_root_write_lock,
let (forest_root_write_lock, forest_root_write_lock_manager) = match &mut self
.maybe_managed_provider
{
Some(ManagedProvider::Msp(msp_handler)) => (
&mut msp_handler.forest_root_write_lock,
&mut msp_handler.forest_write_lock_manager,
),
_ => {
error!(target: LOG_TARGET, "`msp_emit_forest_write_event` should only be called if the node is managing a MSP. Found [{:?}] instead.", self.maybe_managed_provider);
return;
@@ -479,11 +496,17 @@
// event. Clone is required because there is no constraint on the number of listeners that can
// subscribe to the event (and each is guaranteed to receive all emitted events).
let forest_root_write_tx = Arc::new(Mutex::new(Some(tx)));

let forest_write_lock_manager =
Arc::new(Mutex::new(forest_root_write_lock_manager.clone()));

// TODO: Update to use ForestWriteLockManager
match data.into() {
ForestWriteLockTaskData::MspRespondStorageRequest(data) => {
self.emit(ProcessMspRespondStoringRequest {
data,
forest_root_write_tx,
forest_write_lock_manager,
});
}
ForestWriteLockTaskData::StopStoringForInsolventUserRequest(data) => {
1 change: 1 addition & 0 deletions client/blockchain-service/src/lib.rs
@@ -1,6 +1,7 @@
pub mod capacity_manager;
pub mod commands;
pub mod events;
pub mod forest_write_lock_manager;
pub mod handler;
pub mod handler_bsp;
pub mod handler_msp;
9 changes: 8 additions & 1 deletion client/blockchain-service/src/types.rs
@@ -27,7 +27,8 @@ use sp_runtime::{
};

use crate::{
commands::BlockchainServiceCommandInterfaceExt, handler::LOG_TARGET,
commands::BlockchainServiceCommandInterfaceExt,
forest_write_lock_manager::ForestWriteLockManager, handler::LOG_TARGET,
transaction_manager::wait_for_transaction_status,
};

@@ -765,6 +766,8 @@ pub struct BspHandler<Runtime: StorageEnableRuntime> {
/// thread (Blockchain Service) and unlock it at the end of the spawned task. The alternative
/// would be to send a [`MutexGuard`].
pub(crate) forest_root_write_lock: Option<tokio::sync::oneshot::Receiver<()>>,
// TODO: Use the Forest write lock manager to prevent multiple tasks from writing to the runtime forest root (i.e. sending transactions) concurrently.
//pub(crate) forest_write_lock_manager: ForestWriteLockManager<Runtime>,
/// A set of Forest Storage snapshots, ordered by block number and block hash.
///
/// A BSP can have multiple Forest Storage snapshots.
@@ -778,6 +781,7 @@ impl<Runtime: StorageEnableRuntime> BspHandler<Runtime> {
Self {
bsp_id,
pending_submit_proof_requests: BTreeSet::new(),
//forest_write_lock_manager: ForestWriteLockManager::new(),
forest_root_write_lock: None,
forest_root_snapshots: BTreeSet::new(),
}
@@ -798,6 +802,8 @@ pub struct MspHandler<Runtime: StorageEnableRuntime> {
/// thread (Blockchain Service) and unlock it at the end of the spawned task. The alternative
/// would be to send a [`MutexGuard`].
pub(crate) forest_root_write_lock: Option<tokio::sync::oneshot::Receiver<()>>,
/// Forest write lock manager, used to prevent multiple tasks from writing to the runtime forest root (i.e. sending transactions) concurrently.
pub(crate) forest_write_lock_manager: ForestWriteLockManager<Runtime>,
/// A map of [`BucketId`] to the Forest Storage snapshots.
///
/// Forest Storage snapshots are stored in a BTreeSet, ordered by block number and block hash.
@@ -818,6 +824,7 @@ impl<Runtime: StorageEnableRuntime> MspHandler<Runtime> {
Self {
msp_id,
forest_root_write_lock: None,
forest_write_lock_manager: ForestWriteLockManager::new(),
forest_root_snapshots: BTreeMap::new(),
files_to_distribute: HashMap::new(),
}
2 changes: 1 addition & 1 deletion client/src/tasks/bsp_charge_fees.rs
@@ -403,7 +403,7 @@ where
// Release the forest root write "lock" and finish the task.
self.storage_hub_handler
.blockchain
.release_forest_root_write_lock(forest_root_write_tx)
.release_forest_root_write_lock(forest_root_write_tx, None)
.await
}
}
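The BSP task above passes `None` because `BspHandler` does not yet carry a `ForestWriteLockManager` (its field is still commented out in `types.rs`). For an MSP-side task that does hold a guard, the release call would presumably look like the hypothetical fragment below; the `guard` binding and where it comes from are assumptions, not code from this PR.

```rust
// Hypothetical MSP task fragment: `guard` is assumed to be a ForestWriteLockGuard<Runtime>
// obtained earlier from ForestWriteLockManager::acquire and held for the whole task.
self.storage_hub_handler
    .blockchain
    .release_forest_root_write_lock(forest_root_write_tx, Some(guard))
    .await
```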