diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs index 6f40a5be03..751fbdef6c 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs @@ -2,7 +2,7 @@ mod caches; mod farms; use crate::commands::cluster::controller::caches::maintain_caches; -use crate::commands::cluster::controller::farms::{maintain_farms, FarmIndex}; +use crate::commands::cluster::controller::farms::maintain_farms; use crate::commands::shared::derive_libp2p_keypair; use crate::commands::shared::network::{configure_network, NetworkArgs}; use anyhow::anyhow; @@ -20,6 +20,7 @@ use std::sync::Arc; use std::time::Duration; use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; use subspace_farmer::cluster::controller::controller_service; +use subspace_farmer::cluster::farmer::FarmIndex; use subspace_farmer::cluster::nats_client::NatsClient; use subspace_farmer::farm::plotted_pieces::PlottedPieces; use subspace_farmer::farmer_cache::FarmerCache; diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs index a57a71e6e6..a94a2971c7 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs @@ -9,21 +9,23 @@ use crate::commands::cluster::cache::CACHE_IDENTIFICATION_BROADCAST_INTERVAL; use anyhow::anyhow; use futures::channel::oneshot; use futures::future::FusedFuture; -use futures::{select, FutureExt, StreamExt}; +use futures::{select, stream, FutureExt, StreamExt}; use parking_lot::Mutex; use std::future::{ready, Future}; use std::pin::{pin, Pin}; use std::sync::Arc; use std::time::{Duration, Instant}; use subspace_farmer::cluster::cache::{ - ClusterCacheIdentifyBroadcast, ClusterCacheIndex, ClusterPieceCache, + ClusterCacheDetailsRequest, ClusterCacheIdentifyBroadcast, + ClusterCacheIdentifySignalCacheBroadcast, ClusterCacheIndex, ClusterPieceCache, + ClusterSingleCacheDetails, }; use subspace_farmer::cluster::controller::ClusterControllerCacheIdentifyBroadcast; use subspace_farmer::cluster::nats_client::NatsClient; use subspace_farmer::farm::{PieceCache, PieceCacheId}; use subspace_farmer::farmer_cache::FarmerCache; use tokio::time::MissedTickBehavior; -use tracing::{info, trace, warn}; +use tracing::{debug, info, trace, warn}; const SCHEDULE_REINITIALIZATION_DELAY: Duration = Duration::from_secs(3); @@ -31,32 +33,41 @@ const SCHEDULE_REINITIALIZATION_DELAY: Duration = Duration::from_secs(3); struct KnownCache { cache_id: PieceCacheId, last_identification: Instant, +} + +#[derive(Debug)] +struct KnownSingleCache { + single_cache_id: PieceCacheId, + last_identification: Instant, piece_cache: Arc, } #[derive(Debug, Default)] struct KnownCaches { known_caches: Vec, + known_single_caches: Vec, } impl KnownCaches { fn get_all(&self) -> Vec> { - self.known_caches + self.known_single_caches .iter() .map(|known_cache| Arc::clone(&known_cache.piece_cache) as Arc<_>) .collect() } /// Return `true` if farmer cache reinitialization is required - fn update( - &mut self, - cache_id: PieceCacheId, - max_num_elements: u32, - nats_client: &NatsClient, - ) -> bool { + fn update(&mut self, cache_id: PieceCacheId) -> bool { + let last_identification = Instant::now(); if self.known_caches.iter_mut().any(|known_cache| { if known_cache.cache_id == cache_id { - known_cache.last_identification = Instant::now(); + known_cache.last_identification = last_identification; + self.known_single_caches + .iter_mut() + .for_each(|known_single_cache| { + debug!(single_cache_id = %known_single_cache.single_cache_id, "Updating last identification for single cache"); + known_single_cache.last_identification = last_identification; + }); true } else { false @@ -65,23 +76,58 @@ impl KnownCaches { return false; } - let piece_cache = Arc::new(ClusterPieceCache::new( + self.known_caches.push(KnownCache { cache_id, + last_identification, + }); + + true + } + + /// Return `true` if farmer cache reinitialization is required + fn update_single( + &mut self, + single_cache_id: PieceCacheId, + max_num_elements: u32, + nats_client: &NatsClient, + ) -> bool { + if self + .known_single_caches + .iter_mut() + .any(|known_single_cache| { + if known_single_cache.single_cache_id == single_cache_id { + known_single_cache.last_identification = Instant::now(); + true + } else { + false + } + }) + { + return false; + } + + let piece_cache = Arc::new(ClusterPieceCache::new( + single_cache_id, max_num_elements, nats_client.clone(), )); - self.known_caches.push(KnownCache { - cache_id, + self.known_single_caches.push(KnownSingleCache { + single_cache_id, last_identification: Instant::now(), piece_cache, }); true } - fn remove_expired(&mut self) -> impl Iterator + '_ { - self.known_caches.extract_if(|known_cache| { - known_cache.last_identification.elapsed() > CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2 - }) + fn remove_expired(&mut self) -> impl Iterator + '_ { + let elapsed = CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2; + self.known_caches + .retain(move |known_cache| known_cache.last_identification.elapsed() <= elapsed); + + self.known_single_caches + .extract_if(move |known_single_cache| { + known_single_cache.last_identification.elapsed() > elapsed + }) } } @@ -98,10 +144,20 @@ pub(super) async fn maintain_caches( (Box::pin(ready(())) as Pin>>).fuse(); let cache_identify_subscription = pin!(nats_client - .subscribe_to_broadcasts::(Some(cache_group), None) + .subscribe_to_broadcasts::(None, None) .await .map_err(|error| anyhow!("Failed to subscribe to cache identify broadcast: {error}"))?); + let single_cache_identify_subscription = pin!(nats_client + .subscribe_to_broadcasts::( + Some(cache_group), + None + ) + .await + .map_err(|error| anyhow!( + "Failed to subscribe to single cache identify broadcast: {error}" + ))?); + // Request cache to identify themselves if let Err(error) = nats_client .broadcast(&ClusterControllerCacheIdentifyBroadcast, cache_group) @@ -111,6 +167,7 @@ pub(super) async fn maintain_caches( } let mut cache_identify_subscription = cache_identify_subscription.fuse(); + let mut single_cache_identify_subscription = single_cache_identify_subscription.fuse(); let mut cache_pruning_interval = tokio::time::interval_at( (Instant::now() + CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2).into(), CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2, @@ -152,29 +209,64 @@ pub(super) async fn maintain_caches( } select! { - maybe_identify_message = cache_identify_subscription.next() => { + maybe_identify_message = single_cache_identify_subscription.next() => { let Some(identify_message) = maybe_identify_message else { return Err(anyhow!("Cache identify stream ended")); }; - let ClusterCacheIdentifyBroadcast { + let ClusterCacheIdentifySignalCacheBroadcast { cache_id, max_num_elements, } = identify_message; - if known_caches.update(cache_id, max_num_elements, nats_client) { - info!( - %cache_id, - "New cache discovered, scheduling reinitialization" - ); - scheduled_reinitialization_for.replace( - Instant::now() + SCHEDULE_REINITIALIZATION_DELAY, - ); - } else { - trace!( - %cache_id, - "Received identification for already known cache" - ); + process_cache_identify_message( + nats_client, + cache_id, + max_num_elements, + &mut known_caches, + &mut scheduled_reinitialization_for, + ) + } + maybe_identify_message = cache_identify_subscription.next() => { + let Some(identify_message) = maybe_identify_message else { + return Err(anyhow!("Cache identify stream ended")); + }; + + let ClusterCacheIdentifyBroadcast { cache_id, cache_count } = identify_message; + + if !known_caches.update(cache_id) { + // Cache already known, nothing to do + debug!(%cache_id, "Cache already known, nothing to do"); + continue + } + + let cache_ids = cache_id.derive_sub_ids(cache_count.into()); + match nats_client + .stream_request( + ClusterCacheDetailsRequest, + Some(&cache_id.to_string()), + ) + .await + { + Ok(caches_details) => { + let mut caches_details = caches_details.zip(stream::iter(cache_ids)); + while let Some((cache_details, single_cache_id)) = caches_details.next().await { + let ClusterSingleCacheDetails { max_num_elements } = cache_details; + + process_cache_identify_message( + nats_client, + single_cache_id, + max_num_elements, + &mut known_caches, + &mut scheduled_reinitialization_for, + ) + } } + Err(error) => warn!( + %error, + %cache_id, + "Failed to request farmer farm details" + ), + } } _ = cache_pruning_interval.tick().fuse() => { let mut reinit = false; @@ -182,7 +274,7 @@ pub(super) async fn maintain_caches( reinit = true; warn!( - cache_id = %removed_cache.cache_id, + cache_id = %removed_cache.single_cache_id, "Cache expired and removed, scheduling reinitialization" ); } @@ -198,4 +290,26 @@ pub(super) async fn maintain_caches( } } } + + fn process_cache_identify_message( + nats_client: &NatsClient, + cache_id: PieceCacheId, + max_num_elements: u32, + known_caches: &mut KnownCaches, + scheduled_reinitialization_for: &mut Option, + ) { + if known_caches.update_single(cache_id, max_num_elements, nats_client) { + info!( + %cache_id, + "New cache discovered, scheduling reinitialization" + ); + scheduled_reinitialization_for + .replace(Instant::now() + SCHEDULE_REINITIALIZATION_DELAY); + } else { + trace!( + %cache_id, + "Received identification for already known cache" + ); + } + } } diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs index e8efec6895..85c9b0719a 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs @@ -9,7 +9,7 @@ use anyhow::anyhow; use async_lock::RwLock as AsyncRwLock; use futures::channel::oneshot; use futures::future::FusedFuture; -use futures::stream::FuturesUnordered; +use futures::stream::{self, FuturesUnordered}; use futures::{select, FutureExt, StreamExt}; use parking_lot::Mutex; use std::collections::hash_map::Entry; @@ -21,18 +21,26 @@ use std::sync::Arc; use std::time::Instant; use subspace_core_primitives::{Blake3Hash, SectorIndex}; use subspace_farmer::cluster::controller::ClusterControllerFarmerIdentifyBroadcast; -use subspace_farmer::cluster::farmer::{ClusterFarm, ClusterFarmerIdentifyFarmBroadcast}; +use subspace_farmer::cluster::farmer::{ + ClusterFarm, ClusterFarmerFarmDetailsRequest, ClusterFarmerIdentifyBroadcast, + ClusterFarmerIdentifyFarmBroadcast, FarmIndex, +}; use subspace_farmer::cluster::nats_client::NatsClient; use subspace_farmer::farm::plotted_pieces::PlottedPieces; use subspace_farmer::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate}; use tokio::task; use tokio::time::MissedTickBehavior; -use tracing::{error, info, trace, warn}; +use tracing::{debug, error, info, trace, warn}; type AddRemoveFuture<'a> = Pin, ClusterFarm)>> + 'a>>; -pub(super) type FarmIndex = u16; +#[derive(Debug)] +struct KnownFarmer { + farmer_id: FarmId, + fingerprint: Blake3Hash, + last_identification: Instant, +} #[derive(Debug)] struct KnownFarm { @@ -56,10 +64,39 @@ enum KnownFarmInsertResult { #[derive(Debug, Default)] struct KnownFarms { + known_farmers: Vec, known_farms: HashMap, } impl KnownFarms { + fn update_farmer(&mut self, farmer_id: FarmId, fingerprint: Blake3Hash) -> bool { + let last_identification = Instant::now(); + if self.known_farmers.iter_mut().any(|known_farmer| { + if known_farmer.farmer_id == farmer_id && known_farmer.fingerprint == fingerprint { + known_farmer.last_identification = last_identification; + self.known_farms.iter_mut().for_each(|(_, known_farm)| { + // All farms in farmer use the same fingerprint + if known_farm.fingerprint == fingerprint { + debug!(farm_id = %known_farm.farm_id, "Updating last identification for farm"); + known_farm.last_identification = last_identification; + } + }); + true + } else { + false + } + }) { + return false; + } + + self.known_farmers.push(KnownFarmer { + farmer_id, + fingerprint, + last_identification, + }); + true + } + fn insert_or_update( &mut self, farm_id: FarmId, @@ -115,8 +152,12 @@ impl KnownFarms { } fn remove_expired(&mut self) -> impl Iterator + '_ { - self.known_farms.extract_if(|_farm_index, known_farm| { - known_farm.last_identification.elapsed() > FARMER_IDENTIFICATION_BROADCAST_INTERVAL * 2 + let elapsed = FARMER_IDENTIFICATION_BROADCAST_INTERVAL * 2; + self.known_farmers + .retain(move |known_farmer| known_farmer.last_identification.elapsed() <= elapsed); + + self.known_farms.extract_if(move |_farm_index, known_farm| { + known_farm.last_identification.elapsed() > elapsed }) } @@ -143,6 +184,11 @@ pub(super) async fn maintain_farms( ]); let farmer_identify_subscription = pin!(nats_client + .subscribe_to_broadcasts::(None, None) + .await + .map_err(|error| anyhow!("Failed to subscribe to farmer identify broadcast: {error}"))?); + + let farm_identify_subscription = pin!(nats_client .subscribe_to_broadcasts::(None, None) .await .map_err(|error| anyhow!( @@ -157,6 +203,7 @@ pub(super) async fn maintain_farms( warn!(%error, "Failed to send farmer identification broadcast"); } + let mut farm_identify_subscription = farm_identify_subscription.fuse(); let mut farmer_identify_subscription = farmer_identify_subscription.fuse(); let mut farm_pruning_interval = tokio::time::interval_at( (Instant::now() + FARMER_IDENTIFICATION_BROADCAST_INTERVAL * 2).into(), @@ -200,9 +247,9 @@ pub(super) async fn maintain_farms( } } } - maybe_identify_message = farmer_identify_subscription.next() => { + maybe_identify_message = farm_identify_subscription.next() => { let Some(identify_message) = maybe_identify_message else { - return Err(anyhow!("Farmer identify stream ended")); + return Err(anyhow!("Farm identify stream ended")); }; process_farm_identify_message( @@ -213,6 +260,19 @@ pub(super) async fn maintain_farms( plotted_pieces, ); } + maybe_identify_message = farmer_identify_subscription.next() => { + let Some(identify_message) = maybe_identify_message else { + return Err(anyhow!("Farmer identify stream ended")); + }; + + process_farmer_identify_message( + identify_message, + nats_client, + &mut known_farms, + &mut farms_to_add_remove, + plotted_pieces, + ).await; + } _ = farm_pruning_interval.tick().fuse() => { for (farm_index, removed_farm) in known_farms.remove_expired() { let farm_id = removed_farm.farm_id; @@ -269,6 +329,59 @@ pub(super) async fn maintain_farms( } } +async fn process_farmer_identify_message<'a>( + identify_message: ClusterFarmerIdentifyBroadcast, + nats_client: &'a NatsClient, + known_farms: &mut KnownFarms, + farms_to_add_remove: &mut VecDeque>, + plotted_pieces: &'a Arc>>, +) { + let ClusterFarmerIdentifyBroadcast { + farmer_id, + farms_count, + fingerprint, + } = identify_message; + + if !known_farms.update_farmer(farmer_id, fingerprint) { + // Farmer already known, nothing to do + debug!(%farmer_id, "Farmer already known, nothing to do"); + return; + }; + + let farm_ids = farmer_id.derive_sub_ids(farms_count.into()); + match nats_client + .stream_request( + ClusterFarmerFarmDetailsRequest, + Some(&farmer_id.to_string()), + ) + .await + { + Ok(farms_details) => { + let mut farms_details = farms_details.zip(stream::iter(farm_ids)); + while let Some((farm_details, farm_id)) = farms_details.next().await { + let farm_identify_message = ClusterFarmerIdentifyFarmBroadcast { + farm_id, + total_sectors_count: farm_details.total_sectors_count, + fingerprint, + }; + + process_farm_identify_message( + farm_identify_message, + nats_client, + known_farms, + farms_to_add_remove, + plotted_pieces, + ) + } + } + Err(error) => warn!( + %error, + %farmer_id, + "Failed to request farmer farm details" + ), + } +} + fn process_farm_identify_message<'a>( identify_message: ClusterFarmerIdentifyFarmBroadcast, nats_client: &'a NatsClient, diff --git a/crates/subspace-farmer/src/cluster/cache.rs b/crates/subspace-farmer/src/cluster/cache.rs index edd624de5a..0a5d66326b 100644 --- a/crates/subspace-farmer/src/cluster/cache.rs +++ b/crates/subspace-farmer/src/cluster/cache.rs @@ -29,16 +29,46 @@ const MIN_CACHE_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1); /// Type alias for cache index used by cluster. pub type ClusterCacheIndex = u16; +/// Request cache details from cache +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterCacheDetailsRequest; + +impl GenericStreamRequest for ClusterCacheDetailsRequest { + const SUBJECT: &'static str = "subspace.cache.*.details"; + type Response = ClusterSingleCacheDetails; +} + +/// Cache details +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterSingleCacheDetails { + /// Max number of elements in this cache + pub max_num_elements: u32, +} + /// Broadcast with identification details by caches #[derive(Debug, Clone, Encode, Decode)] pub struct ClusterCacheIdentifyBroadcast { + /// Cache ID + pub cache_id: PieceCacheId, + /// Number of caches + pub cache_count: ClusterCacheIndex, +} + +impl GenericBroadcast for ClusterCacheIdentifyBroadcast { + /// `*` here stands for cache group + const SUBJECT: &'static str = "subspace.cache.*.cache-identify"; +} + +/// Broadcast with identification details by caches +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterCacheIdentifySignalCacheBroadcast { /// Cache ID pub cache_id: PieceCacheId, /// Max number of elements in this cache pub max_num_elements: u32, } -impl GenericBroadcast for ClusterCacheIdentifyBroadcast { +impl GenericBroadcast for ClusterCacheIdentifySignalCacheBroadcast { /// `*` here stands for cache group const SUBJECT: &'static str = "subspace.cache.*.identify"; } @@ -209,11 +239,14 @@ pub async fn cache_service( where C: PieceCache, { + let cache_id = PieceCacheId::new(); + let cache_id_string = cache_id.to_string(); + let cache_ids = cache_id.derive_sub_ids(caches.len()); + let caches_details = caches .iter() - .map(|cache| { - let cache_id = *cache.id(); - + .zip(cache_ids) + .map(|(cache, cache_id)| { if primary_instance { info!(%cache_id, max_num_elements = %cache.max_num_elements(), "Created cache"); } @@ -228,7 +261,21 @@ where if primary_instance { select! { - result = identify_responder(&nats_client, &caches_details, cache_group, identification_broadcast_interval).fuse() => { + result = identify_responder( + &nats_client, + cache_id, + &caches_details, + cache_group, + identification_broadcast_interval + ).fuse() => { + result + }, + result = caches_details_responder( + &nats_client, + cache_id, + &cache_id_string, + &caches_details, + ).fuse() => { result }, result = write_piece_responder(&nats_client, &caches_details).fuse() => { @@ -269,6 +316,7 @@ where /// per controller instance in order to parallelize more work across threads if needed. async fn identify_responder( nats_client: &NatsClient, + cache_id: PieceCacheId, caches_details: &[CacheDetails<'_, C>], cache_group: &str, identification_broadcast_interval: Duration, @@ -306,14 +354,24 @@ where } last_identification = Instant::now(); - send_identify_broadcast(nats_client, caches_details, cache_group).await; + send_identify_broadcast( + nats_client, + cache_id, + caches_details, + cache_group + ).await; interval.reset(); } _ = interval.tick().fuse() => { last_identification = Instant::now(); trace!("Cache self-identification"); - send_identify_broadcast(nats_client, caches_details, cache_group).await; + send_identify_broadcast( + nats_client, + cache_id, + caches_details, + cache_group + ).await; } } } @@ -323,33 +381,96 @@ where async fn send_identify_broadcast( nats_client: &NatsClient, + cache_id: PieceCacheId, caches_details: &[CacheDetails<'_, C>], cache_group: &str, ) where C: PieceCache, { - caches_details - .iter() - .map(|cache| async move { - if let Err(error) = nats_client - .broadcast( - &ClusterCacheIdentifyBroadcast { - cache_id: cache.cache_id, - max_num_elements: cache.cache.max_num_elements(), - }, - cache_group, - ) - .await - { - warn!( - cache_id = %cache.cache_id, - %error, - "Failed to send cache identify notification" - ); + if caches_details.is_empty() { + warn!("No cache, skip sending cache identify notification"); + return; + } + + if let Err(error) = nats_client + .broadcast( + &ClusterCacheIdentifyBroadcast { + cache_id, + cache_count: caches_details.len() as ClusterCacheIndex, + }, + cache_group, + ) + .await + { + warn!(%cache_id, %error, "Failed to send farmer identify notification"); + } +} + +async fn caches_details_responder( + nats_client: &NatsClient, + cache_id: PieceCacheId, + cache_id_string: &str, + caches_details: &[CacheDetails<'_, C>], +) -> anyhow::Result<()> +where + C: PieceCache, +{ + // Initialize with pending future so it never ends + let mut processing = FuturesUnordered::from_iter([ + Box::pin(pending()) as Pin + Send>> + ]); + let mut subscription = nats_client + .subscribe_to_stream_requests(Some(cache_id_string), Some(cache_id_string.to_string())) + .await + .map_err(|error| { + anyhow!( + "Failed to subscribe to farms details {}: {}", + cache_id, + error + ) + })? + .fuse(); + + loop { + select! { + maybe_message = subscription.next() => { + let Some(message) = maybe_message else { + break; + }; + + // Create background task for concurrent processing + processing.push(Box::pin(process_caches_details_request( + nats_client, + caches_details, + message, + ))); } - }) - .collect::>() - .collect::>() + _ = processing.next() => { + // Nothing to do here + } + } + } + + Ok(()) +} + +async fn process_caches_details_request( + nats_client: &NatsClient, + caches_details: &[CacheDetails<'_, C>], + request: StreamRequest, +) where + C: PieceCache, +{ + trace!(?request, "Caches details request"); + + let stream = Box::new(stream::iter(caches_details.iter().map(|cache_details| { + ClusterSingleCacheDetails { + max_num_elements: cache_details.cache.max_num_elements(), + } + }))); + + nats_client + .stream_response::(request.response_subject, stream) .await; } diff --git a/crates/subspace-farmer/src/cluster/farmer.rs b/crates/subspace-farmer/src/cluster/farmer.rs index 483f12413e..a2cfdd8b39 100644 --- a/crates/subspace-farmer/src/cluster/farmer.rs +++ b/crates/subspace-farmer/src/cluster/farmer.rs @@ -37,8 +37,41 @@ use tracing::{debug, error, trace, warn}; const BROADCAST_NOTIFICATIONS_BUFFER: usize = 1000; const MIN_FARMER_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1); +/// Type alias for farm index used by cluster. +pub type FarmIndex = u16; type Handler = Bag, A>; +/// Broadcast with farmer id for identification +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterFarmerIdentifyBroadcast { + /// Farmer ID + pub farmer_id: FarmId, + /// Number of farms + pub farms_count: FarmIndex, + /// Farmer fingerprint changes when something about internal farm changes (like allocated space) + pub fingerprint: Blake3Hash, +} + +impl GenericBroadcast for ClusterFarmerIdentifyBroadcast { + const SUBJECT: &'static str = "subspace.farmer.*.farmer-identify"; +} + +/// Request farm details from farmer +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterFarmerFarmDetailsRequest; + +impl GenericStreamRequest for ClusterFarmerFarmDetailsRequest { + const SUBJECT: &'static str = "subspace.farmer.*.farm.details"; + type Response = ClusterFarmerFarmDetails; +} + +/// Farm details +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterFarmerFarmDetails { + /// Total number of sectors in the farm + pub total_sectors_count: SectorIndex, +} + /// Broadcast with identification details by farmers #[derive(Debug, Clone, Encode, Decode)] pub struct ClusterFarmerIdentifyFarmBroadcast { @@ -357,12 +390,16 @@ pub fn farmer_service( where F: Farm, { + let farmer_id = FarmId::new(); + let farmer_id_string = farmer_id.to_string(); + let farm_ids = farmer_id.derive_sub_ids(farms.len()); + // For each farm start forwarding notifications as broadcast messages and create farm details // that can be used to respond to incoming requests let farms_details = farms .iter() - .map(|farm| { - let farm_id = *farm.id(); + .zip(farm_ids) + .map(|(farm, farm_id)| { let nats_client = nats_client.clone(); let background_tasks = if primary_instance { @@ -478,7 +515,21 @@ where async move { if primary_instance { select! { - result = identify_responder(&nats_client, &farms_details, identification_broadcast_interval).fuse() => { + result = identify_responder( + &nats_client, + farmer_id, + &farmer_id_string, + &farms_details, + identification_broadcast_interval, + ).fuse() => { + result + }, + result = farms_details_responder( + &nats_client, + farmer_id, + &farmer_id_string, + &farms_details, + ).fuse() => { result }, result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => { @@ -505,6 +556,8 @@ where /// broadcast in response, also send periodic notifications reminding that farm exists async fn identify_responder( nats_client: &NatsClient, + farmer_id: FarmId, + farmer_id_string: &str, farms_details: &[FarmDetails], identification_broadcast_interval: Duration, ) -> anyhow::Result<()> { @@ -538,14 +591,24 @@ async fn identify_responder( } last_identification = Instant::now(); - send_identify_broadcast(nats_client, farms_details).await; + send_identify_broadcast( + nats_client, + farmer_id, + farmer_id_string, + farms_details, + ).await; interval.reset(); } _ = interval.tick().fuse() => { last_identification = Instant::now(); trace!("Farmer self-identification"); - send_identify_broadcast(nats_client, farms_details).await; + send_identify_broadcast( + nats_client, + farmer_id, + farmer_id_string, + farms_details, + ).await; } } } @@ -553,33 +616,111 @@ async fn identify_responder( Ok(()) } -async fn send_identify_broadcast(nats_client: &NatsClient, farms_details: &[FarmDetails]) { - farms_details +async fn send_identify_broadcast( + nats_client: &NatsClient, + farmer_id: FarmId, + farmer_id_string: &str, + farms_details: &[FarmDetails], +) { + if farms_details.is_empty() { + warn!("No farm, skip sending farmer identify notification"); + return; + } + + if let Err(error) = nats_client + .broadcast( + &new_identify_message(farmer_id, farms_details), + farmer_id_string, + ) + .await + { + warn!(%farmer_id, %error, "Failed to send farmer identify notification"); + } +} + +fn new_identify_message( + farmer_id: FarmId, + farms_details: &[FarmDetails], +) -> ClusterFarmerIdentifyBroadcast { + let farmer_id_bytes = farmer_id.encode(); + let farms_sectors_counts = farms_details .iter() - .map(|farm_details| async move { - if let Err(error) = nats_client - .broadcast( - &ClusterFarmerIdentifyFarmBroadcast { - farm_id: farm_details.farm_id, - total_sectors_count: farm_details.total_sectors_count, - fingerprint: blake3_hash_list(&[ - &farm_details.farm_id.encode(), - &farm_details.total_sectors_count.to_le_bytes(), - ]), - }, - &farm_details.farm_id_string, - ) - .await - { - warn!( - farm_id = %farm_details.farm_id, - %error, - "Failed to send farmer identify notification" - ); + .map(|farm_details| farm_details.total_sectors_count.to_le_bytes()) + .collect::>(); + let mut farms_sectors_counts = farms_sectors_counts + .iter() + .map(AsRef::as_ref) + .collect::>(); + farms_sectors_counts.push(farmer_id_bytes.as_slice()); + let fingerprint = blake3_hash_list(farms_sectors_counts.as_slice()); + + ClusterFarmerIdentifyBroadcast { + farmer_id, + farms_count: farms_details.len() as u16, + fingerprint, + } +} + +async fn farms_details_responder( + nats_client: &NatsClient, + farmer_id: FarmId, + farmer_id_string: &str, + farms_details: &[FarmDetails], +) -> anyhow::Result<()> { + // Initialize with pending future so it never ends + let mut processing = FuturesUnordered::from_iter([ + Box::pin(pending()) as Pin + Send>> + ]); + let mut subscription = nats_client + .subscribe_to_stream_requests(Some(farmer_id_string), Some(farmer_id_string.to_string())) + .await + .map_err(|error| { + anyhow!( + "Failed to subscribe to farms details {}: {}", + farmer_id, + error + ) + })? + .fuse(); + + loop { + select! { + maybe_message = subscription.next() => { + let Some(message) = maybe_message else { + break; + }; + + // Create background task for concurrent processing + processing.push(Box::pin(process_farms_details_request( + nats_client, + farms_details, + message, + ))); } - }) - .collect::>() - .collect::>() + _ = processing.next() => { + // Nothing to do here + } + } + } + + Ok(()) +} + +async fn process_farms_details_request( + nats_client: &NatsClient, + farms_details: &[FarmDetails], + request: StreamRequest, +) { + trace!(?request, "Farms details request"); + + let stream = Box::new(stream::iter(farms_details.iter().map(|farm_details| { + ClusterFarmerFarmDetails { + total_sectors_count: farm_details.total_sectors_count, + } + }))); + + nats_client + .stream_response::(request.response_subject, stream) .await; } diff --git a/crates/subspace-farmer/src/farm.rs b/crates/subspace-farmer/src/farm.rs index c755c2d343..58a81afb15 100644 --- a/crates/subspace-farmer/src/farm.rs +++ b/crates/subspace-farmer/src/farm.rs @@ -28,6 +28,8 @@ use thiserror::Error; use ulid::Ulid; pub mod plotted_pieces; +#[cfg(test)] +mod tests; /// Erased error type pub type FarmError = Box; @@ -100,6 +102,19 @@ impl PieceCacheId { pub fn new() -> Self { Self::Ulid(Ulid::new()) } + + /// Derive sub IDs + #[inline] + pub fn derive_sub_ids(&self, n: usize) -> Vec { + match self { + PieceCacheId::Ulid(ulid) => { + let ulid = ulid.0; + (0..n as u128) + .map(|i| PieceCacheId::Ulid(Ulid(ulid + i))) + .collect() + } + } + } } /// Offset wrapper for pieces in [`PieceCache`] @@ -499,6 +514,19 @@ impl FarmId { pub fn new() -> Self { Self::Ulid(Ulid::new()) } + + /// Derive sub IDs + #[inline] + pub fn derive_sub_ids(&self, n: usize) -> Vec { + match self { + FarmId::Ulid(ulid) => { + let ulid = ulid.0; + (0..n as u128) + .map(|i| FarmId::Ulid(Ulid(ulid + i))) + .collect() + } + } + } } /// Abstract farm implementation diff --git a/crates/subspace-farmer/src/farm/tests.rs b/crates/subspace-farmer/src/farm/tests.rs new file mode 100644 index 0000000000..0a7d645e02 --- /dev/null +++ b/crates/subspace-farmer/src/farm/tests.rs @@ -0,0 +1,37 @@ +use crate::farm::{FarmId, PieceCacheId}; + +#[test] +fn derive_sub_farm_ids_test() { + let id = FarmId::new(); + let sub_ids = id.derive_sub_ids(128); + assert_eq!(sub_ids.len(), 128); + + match id { + FarmId::Ulid(id) => { + let id: u128 = id.into(); + sub_ids.into_iter().zip(0..128u128).for_each(|(sub_id, i)| { + let FarmId::Ulid(sub_id) = sub_id; + let sub_id: u128 = sub_id.into(); + assert_eq!(sub_id, id + i); + }); + } + }; +} + +#[test] +fn derive_sub_cache_ids_test() { + let id = PieceCacheId::new(); + let sub_ids = id.derive_sub_ids(128); + assert_eq!(sub_ids.len(), 128); + + match id { + PieceCacheId::Ulid(id) => { + let id: u128 = id.into(); + sub_ids.into_iter().zip(0..128u128).for_each(|(sub_id, i)| { + let PieceCacheId::Ulid(sub_id) = sub_id; + let sub_id: u128 = sub_id.into(); + assert_eq!(sub_id, id + i); + }); + } + }; +}