1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions client/Cargo.toml
@@ -73,6 +73,7 @@ sc-network = { workspace = true }
sc-network-types = { workspace = true }
sc-service = { workspace = true }
sc-tracing = { workspace = true }
substrate-prometheus-endpoint = { workspace = true }

sp-core = { workspace = true, default-features = true }
sp-keystore = { workspace = true }
18 changes: 18 additions & 0 deletions client/src/builder.rs
@@ -8,8 +8,11 @@ use shc_indexer_db::DbPool;
use sp_keystore::KeystorePtr;
use sp_runtime::traits::SaturatedConversion;
use std::{path::PathBuf, sync::Arc};
use substrate_prometheus_endpoint::Registry;
use tokio::sync::RwLock;

use crate::metrics::MetricsLink;

use shc_actors_framework::actor::{ActorHandle, TaskSpawner};
use shc_blockchain_service::{
capacity_manager::CapacityConfig, handler::BlockchainServiceConfig, spawn_blockchain_service,
@@ -70,6 +73,7 @@ where
bsp_submit_proof_config: Option<BspSubmitProofConfig>,
blockchain_service_config: Option<BlockchainServiceConfig<Runtime>>,
peer_manager: Option<Arc<BspPeerManager>>,
metrics: MetricsLink,
}

/// Common components to build for any given configuration of [`ShRole`] and [`ShStorageLayer`].
@@ -98,6 +102,7 @@
bsp_submit_proof_config: None,
blockchain_service_config: None,
peer_manager: None,
metrics: MetricsLink::default(),
}
}

@@ -380,6 +385,15 @@ where
self.blockchain_service_config = Some(blockchain_service_config);
self
}

/// Set the Prometheus metrics registry.
///
/// If the registry is provided, metrics will be registered and available for tasks.
/// If `None`, metrics will be disabled (no-op).
pub fn with_metrics(&mut self, registry: Option<&Registry>) -> &mut Self {
self.metrics = MetricsLink::new(registry);
self
}
}

/// Abstraction trait to build the Storage Layer of a [`ShNodeType`].
@@ -552,6 +566,7 @@ where
self.indexer_db_pool.clone(),
self.peer_manager.expect("Peer Manager not set"),
None,
self.metrics.clone(),
)
}
}
@@ -598,6 +613,7 @@ where
self.indexer_db_pool.clone(),
self.peer_manager.expect("Peer Manager not set"),
None,
self.metrics.clone(),
)
}
}
@@ -645,6 +661,7 @@ where
self.indexer_db_pool.clone(),
self.peer_manager.expect("Peer Manager not set"),
None,
self.metrics.clone(),
)
}
}
@@ -700,6 +717,7 @@ where
// Not needed by the fisherman service
self.peer_manager.expect("Peer Manager not set"),
self.fisherman,
self.metrics.clone(),
)
}
}
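The new `with_metrics` builder method takes the node's Prometheus registry as `Option<&Registry>`. In a Substrate-based service this is typically available from the service configuration, so the wiring could look like the sketch below (the `config` variable and the surrounding builder call site are assumptions for illustration, not exact code from this repository):

    // `config` is assumed to be an `sc_service::Configuration`; its
    // `prometheus_registry()` returns `Option<&Registry>`, which is `None`
    // when the node runs with `--no-prometheus`. In that case
    // `MetricsLink::new(None)` makes every metrics call a no-op, so the
    // builder chain is identical either way.
    builder.with_metrics(config.prometheus_registry());

Because the builder stores a `MetricsLink` rather than the raw registry, downstream components never need to branch on whether metrics are enabled.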
110 changes: 89 additions & 21 deletions client/src/file_download_manager.rs
@@ -29,7 +29,13 @@ use shc_file_transfer_service::{
};
use shp_file_metadata::{Chunk, ChunkId};

use crate::{bsp_peer_manager::BspPeerManager, download_state_store::DownloadStateStore};
use crate::{
bsp_peer_manager::BspPeerManager,
download_state_store::DownloadStateStore,
inc_counter_by,
metrics::{MetricsLink, STATUS_FAILURE, STATUS_SUCCESS},
observe_histogram,
};

const LOG_TARGET: &str = "file_download_manager";

@@ -43,7 +49,8 @@ const BEST_PEERS_TO_SELECT: usize = 2;
const RANDOM_PEERS_TO_SELECT: usize = 3;
const MAX_CONCURRENT_BUCKET_DOWNLOADS: usize = 2;

/// Configuration for file download limits and parallelism settings
/// Configuration for file download limits and parallelism settings.
#[derive(Clone, Copy)]
pub struct FileDownloadLimits {
/// Maximum number of files to download in parallel
pub max_concurrent_file_downloads: usize,
@@ -78,21 +85,6 @@ impl Default for FileDownloadLimits {
}
}

impl Clone for FileDownloadLimits {
fn clone(&self) -> Self {
Self {
max_concurrent_file_downloads: self.max_concurrent_file_downloads,
max_concurrent_chunks_per_file: self.max_concurrent_chunks_per_file,
max_chunks_per_request: self.max_chunks_per_request,
chunk_request_peer_retry_attempts: self.chunk_request_peer_retry_attempts,
download_retry_attempts: self.download_retry_attempts,
best_peers_to_select: self.best_peers_to_select,
random_peers_to_select: self.random_peers_to_select,
max_concurrent_bucket_downloads: self.max_concurrent_bucket_downloads,
}
}
}

/// A bucket lock with metadata about its active status
struct BucketLockInfo {
/// Whether this lock is currently actively downloading (has acquired the mutex)
@@ -164,26 +156,42 @@ pub struct FileDownloadManager<Runtime: StorageEnableRuntime> {
peer_manager: Arc<BspPeerManager>,
/// Download state store for persistence
download_state_store: Arc<DownloadStateStore<Runtime>>,
/// Prometheus metrics for tracking download throughput
metrics: MetricsLink,
}

impl<Runtime: StorageEnableRuntime> FileDownloadManager<Runtime> {
/// Create a new FileDownloadManager with default limits
/// Create a new [`FileDownloadManager`] with default limits.
///
/// # Arguments
/// * `peer_manager` - The peer manager to use for peer selection and tracking
pub fn new(peer_manager: Arc<BspPeerManager>, data_dir: PathBuf) -> Result<Self> {
Self::with_limits(FileDownloadLimits::default(), peer_manager, data_dir)
/// * `data_dir` - The directory to store download state
/// * `metrics` - The Prometheus metrics link for tracking download throughput
pub fn new(
peer_manager: Arc<BspPeerManager>,
data_dir: PathBuf,
metrics: MetricsLink,
) -> Result<Self> {
Self::with_limits(
FileDownloadLimits::default(),
peer_manager,
data_dir,
metrics,
)
}

/// Create a new FileDownloadManager with specified limits
/// Create a new [`FileDownloadManager`] with specified limits.
///
/// # Arguments
/// * `limits` - The download limits to use
/// * `peer_manager` - The peer manager to use for peer selection and tracking
/// * `data_dir` - The directory to store download state
/// * `metrics` - The Prometheus metrics link for tracking download throughput
pub fn with_limits(
limits: FileDownloadLimits,
peer_manager: Arc<BspPeerManager>,
data_dir: PathBuf,
metrics: MetricsLink,
) -> Result<Self> {
// Create a new download state store
let download_state_store = Arc::new(DownloadStateStore::new(data_dir)?);
@@ -195,6 +203,7 @@ impl<Runtime: StorageEnableRuntime> FileDownloadManager<Runtime> {
limits,
peer_manager,
download_state_store,
metrics,
})
}

@@ -380,6 +389,21 @@ impl<Runtime: StorageEnableRuntime> FileDownloadManager<Runtime> {
self.peer_manager
.record_success(peer_id, total_bytes as u64, elapsed.as_millis() as u64)
.await;

// Record successful download throughput metrics
inc_counter_by!(
metrics: self.metrics.as_ref(),
bytes_downloaded_total,
STATUS_SUCCESS,
total_bytes as u64
);
inc_counter_by!(
metrics: self.metrics.as_ref(),
chunks_downloaded_total,
STATUS_SUCCESS,
processed_chunks as u64
);

Ok(true)
}

@@ -448,6 +472,31 @@ impl<Runtime: StorageEnableRuntime> FileDownloadManager<Runtime> {

if attempt == self.limits.download_retry_attempts {
self.peer_manager.record_failure(peer_id).await;

// Track failed download metrics
let expected_bytes: u64 = chunk_batch
.iter()
.filter_map(|chunk_id| {
file_metadata
.chunk_size_at(chunk_id.as_u64())
.ok()
.map(|size| size as u64)
})
.sum();

inc_counter_by!(
metrics: self.metrics.as_ref(),
bytes_downloaded_total,
STATUS_FAILURE,
expected_bytes
);
inc_counter_by!(
metrics: self.metrics.as_ref(),
chunks_downloaded_total,
STATUS_FAILURE,
chunk_batch.len() as u64
);

return Err(anyhow!(
"Failed to download after {} attempts: {:?}",
attempt + 1,
@@ -489,6 +538,9 @@ impl<Runtime: StorageEnableRuntime> FileDownloadManager<Runtime> {
.await
.map_err(|e| anyhow!("Failed to acquire file semaphore: {:?}", e))?;

// Track file download start time for metrics
let download_start = std::time::Instant::now();

let file_key = file_metadata.file_key::<HashT<StorageProofsMerkleTrieLayout>>();
let chunks_count = file_metadata.chunks_count();

@@ -650,12 +702,27 @@ impl<Runtime: StorageEnableRuntime> FileDownloadManager<Runtime> {
}

if !errors.is_empty() && !is_complete {
// Record failed file download duration in histogram
observe_histogram!(
metrics: self.metrics.as_ref(),
file_download_seconds,
STATUS_FAILURE,
download_start.elapsed().as_secs_f64()
);
Err(anyhow!(
"Failed to download file {:?}: {}",
file_key,
errors.join(", ")
))
} else {
// Record successful file download duration in histogram
observe_histogram!(
metrics: self.metrics.as_ref(),
file_download_seconds,
STATUS_SUCCESS,
download_start.elapsed().as_secs_f64()
);

info!(
target: LOG_TARGET,
"Successfully downloaded all chunks for file {:?}", file_key
@@ -836,6 +903,7 @@ impl<Runtime: StorageEnableRuntime> Clone for FileDownloadManager<Runtime> {
bucket_locks: Arc::clone(&self.bucket_locks),
peer_manager: Arc::clone(&self.peer_manager),
download_state_store: Arc::clone(&self.download_state_store),
metrics: self.metrics.clone(),
}
}
}
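The download paths above record throughput through `inc_counter_by!` and `observe_histogram!`, both gated on `self.metrics.as_ref()`. The new `client/src/metrics.rs` module itself is not part of this diff, so the following is only a sketch of the shape those calls imply, built on `substrate-prometheus-endpoint`; all names, fields, and the macro expansion are assumptions:

    use std::sync::Arc;
    use substrate_prometheus_endpoint::{register, CounterVec, Opts, PrometheusError, Registry, U64};

    pub const STATUS_SUCCESS: &str = "success";
    pub const STATUS_FAILURE: &str = "failure";

    /// Cloneable handle that is a no-op when no registry was provided.
    #[derive(Clone, Default)]
    pub struct MetricsLink(Option<Arc<StorageHubMetrics>>);

    impl MetricsLink {
        pub fn new(registry: Option<&Registry>) -> Self {
            Self(registry.and_then(|r| StorageHubMetrics::register(r).ok().map(Arc::new)))
        }

        pub fn as_ref(&self) -> Option<&StorageHubMetrics> {
            self.0.as_deref()
        }
    }

    pub struct StorageHubMetrics {
        /// Bytes downloaded from peers, labelled "success" / "failure".
        pub bytes_downloaded_total: CounterVec<U64>,
        // ... chunks_downloaded_total, file_download_seconds, etc.
    }

    impl StorageHubMetrics {
        fn register(registry: &Registry) -> Result<Self, PrometheusError> {
            Ok(Self {
                bytes_downloaded_total: register(
                    CounterVec::new(
                        Opts::new("storagehub_bytes_downloaded_total", "Bytes downloaded, by status"),
                        &["status"],
                    )?,
                    registry,
                )?,
            })
        }
    }

    /// One plausible expansion of `inc_counter_by!`: touch the counter only
    /// when metrics are enabled, otherwise do nothing.
    #[macro_export]
    macro_rules! inc_counter_by {
        (metrics: $metrics:expr, $name:ident, $status:expr, $by:expr) => {
            if let Some(m) = $metrics {
                m.$name.with_label_values(&[$status]).inc_by($by);
            }
        };
    }

With this shape, every call site stays a single expression and compiles down to an `Option` check when metrics are disabled.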
16 changes: 15 additions & 1 deletion client/src/handler.rs
@@ -5,6 +5,8 @@ use std::{
use tokio::sync::RwLock;

use shc_actors_derive::{subscribe_actor_event, subscribe_actor_event_map};

use crate::metrics::{MetricsLink, StorageHubMetrics};
use shc_actors_framework::{
actor::{ActorHandle, TaskSpawner},
event_bus::EventHandler,
@@ -115,6 +117,8 @@
pub file_download_manager: Arc<FileDownloadManager<Runtime>>,
/// The fisherman service handle (only used for FishermanRole)
pub fisherman: Option<ActorHandle<shc_fisherman_service::FishermanService<Runtime>>>,
/// The Prometheus metrics for this client.
metrics: MetricsLink,
}

impl<NT, Runtime> Debug for StorageHubHandler<NT, Runtime>
@@ -146,6 +150,7 @@
peer_manager: self.peer_manager.clone(),
file_download_manager: self.file_download_manager.clone(),
fisherman: self.fisherman.clone(),
metrics: self.metrics.clone(),
}
}
}
@@ -165,14 +170,15 @@
indexer_db_pool: Option<DbPool>,
peer_manager: Arc<BspPeerManager>,
fisherman: Option<ActorHandle<shc_fisherman_service::FishermanService<Runtime>>>,
metrics: MetricsLink,
) -> Self {
// Get the data directory path from the peer manager's directory
// This assumes the peer manager stores data in a similar location to where we want our download state
let data_dir = std::env::temp_dir().join("storagehub");

// Create a FileDownloadManager with the peer manager already initialized
let file_download_manager = Arc::new(
FileDownloadManager::new(Arc::clone(&peer_manager), data_dir)
FileDownloadManager::new(Arc::clone(&peer_manager), data_dir, metrics.clone())
.expect("Failed to initialize FileDownloadManager"),
);

@@ -187,8 +193,16 @@
peer_manager,
file_download_manager,
fisherman,
metrics,
}
}

/// Get a reference to the Prometheus metrics.
///
/// Returns `None` if metrics are disabled (no Prometheus registry provided).
pub fn metrics(&self) -> Option<&StorageHubMetrics> {
self.metrics.as_ref()
}
}

/// Abstraction trait to run the [`StorageHubHandler`] tasks, according to the set configuration and role.
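The new `metrics()` accessor lets individual tasks record their own observations through the shared handler rather than threading the `MetricsLink` around separately. A hypothetical task-side use, assuming the field and label names from the metrics sketch earlier:

    // `handler` is a `StorageHubHandler<NT, Runtime>`; when the node runs
    // without a Prometheus endpoint, `metrics()` returns `None` and the
    // whole block is skipped.
    if let Some(metrics) = handler.metrics() {
        metrics
            .bytes_downloaded_total
            .with_label_values(&[STATUS_SUCCESS])
            .inc_by(bytes_served);
    }

`bytes_served` here is a placeholder for whatever quantity the task actually measured.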
1 change: 1 addition & 0 deletions client/src/lib.rs
@@ -4,5 +4,6 @@ pub mod download_state_store;
pub mod file_download_manager;
pub mod forest_storage;
pub mod handler;
pub mod metrics;
pub mod tasks;
pub mod types;