From 749abf64cb578303d6069faea910e43f77844aab Mon Sep 17 00:00:00 2001 From: tediou5 Date: Wed, 11 Dec 2024 16:15:05 +0800 Subject: [PATCH 1/8] chore: rename cache_id => piece_cache_id --- crates/subspace-farmer/src/cluster/cache.rs | 71 +++++++++++---------- 1 file changed, 37 insertions(+), 34 deletions(-) diff --git a/crates/subspace-farmer/src/cluster/cache.rs b/crates/subspace-farmer/src/cluster/cache.rs index fe8f5dc40f..e973db1d37 100644 --- a/crates/subspace-farmer/src/cluster/cache.rs +++ b/crates/subspace-farmer/src/cluster/cache.rs @@ -99,8 +99,8 @@ impl GenericStreamRequest for ClusterCacheContentsRequest { /// Cluster cache implementation #[derive(Debug)] pub struct ClusterPieceCache { - cache_id: PieceCacheId, - cache_id_string: String, + piece_cache_id: PieceCacheId, + piece_cache_id_string: String, max_num_elements: u32, nats_client: NatsClient, } @@ -108,7 +108,7 @@ pub struct ClusterPieceCache { #[async_trait] impl PieceCache for ClusterPieceCache { fn id(&self) -> &PieceCacheId { - &self.cache_id + &self.piece_cache_id } #[inline] @@ -129,7 +129,10 @@ impl PieceCache for ClusterPieceCache { > { Ok(Box::new( self.nats_client - .stream_request(&ClusterCacheContentsRequest, Some(&self.cache_id_string)) + .stream_request( + &ClusterCacheContentsRequest, + Some(&self.piece_cache_id_string), + ) .await? .map(|response| response.map_err(FarmError::from)), )) @@ -149,7 +152,7 @@ impl PieceCache for ClusterPieceCache { piece_index, piece: piece.clone(), }, - Some(&self.cache_id_string), + Some(&self.piece_cache_id_string), ) .await??) } @@ -162,7 +165,7 @@ impl PieceCache for ClusterPieceCache { .nats_client .request( &ClusterCacheReadPieceIndexRequest { offset }, - Some(&self.cache_id_string), + Some(&self.piece_cache_id_string), ) .await??) } @@ -175,7 +178,7 @@ impl PieceCache for ClusterPieceCache { .nats_client .request( &ClusterCacheReadPieceRequest { offset }, - Some(&self.cache_id_string), + Some(&self.piece_cache_id_string), ) .await??) } @@ -198,7 +201,7 @@ impl PieceCache for ClusterPieceCache { .nats_client .stream_request( &ClusterCacheReadPiecesRequest { offsets }, - Some(&self.cache_id_string), + Some(&self.piece_cache_id_string), ) .await? .map(|response| response.map_err(FarmError::from)) @@ -235,13 +238,13 @@ impl ClusterPieceCache { /// [`ClusterCacheIdentifyBroadcast`] #[inline] pub fn new( - cache_id: PieceCacheId, + piece_cache_id: PieceCacheId, max_num_elements: u32, nats_client: NatsClient, ) -> ClusterPieceCache { Self { - cache_id, - cache_id_string: cache_id.to_string(), + piece_cache_id, + piece_cache_id_string: piece_cache_id.to_string(), max_num_elements, nats_client, } @@ -250,8 +253,8 @@ impl ClusterPieceCache { #[derive(Debug)] struct CacheDetails<'a, C> { - cache_id: PieceCacheId, - cache_id_string: String, + piece_cache_id: PieceCacheId, + piece_cache_id_string: String, cache: &'a C, } @@ -270,15 +273,15 @@ where let caches_details = caches .iter() .map(|cache| { - let cache_id = *cache.id(); + let piece_cache_id = *cache.id(); if primary_instance { - info!(%cache_id, max_num_elements = %cache.max_num_elements(), "Created cache"); + info!(%piece_cache_id, max_num_elements = %cache.max_num_elements(), "Created cache"); } CacheDetails { - cache_id, - cache_id_string: cache_id.to_string(), + piece_cache_id, + piece_cache_id_string: piece_cache_id.to_string(), cache, } }) @@ -398,7 +401,7 @@ async fn send_identify_broadcast( if let Err(error) = nats_client .broadcast( &ClusterCacheIdentifyBroadcast { - cache_id: cache.cache_id, + cache_id: cache.piece_cache_id, max_num_elements: cache.cache.max_num_elements(), }, cache_group, @@ -406,7 +409,7 @@ async fn send_identify_broadcast( .await { warn!( - cache_id = %cache.cache_id, + piece_cache_id = %cache.piece_cache_id, %error, "Failed to send cache identify notification" ); @@ -429,8 +432,8 @@ where .map(|cache_details| async move { nats_client .request_responder( - Some(cache_details.cache_id_string.as_str()), - Some(cache_details.cache_id_string.clone()), + Some(cache_details.piece_cache_id_string.as_str()), + Some(cache_details.piece_cache_id_string.clone()), |request: ClusterCacheWritePieceRequest| async move { Some( cache_details @@ -441,7 +444,7 @@ where ) }, ) - .instrument(info_span!("", cache_id = %cache_details.cache_id)) + .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id)) .await }) .collect::>() @@ -462,8 +465,8 @@ where .map(|cache_details| async move { nats_client .request_responder( - Some(cache_details.cache_id_string.as_str()), - Some(cache_details.cache_id_string.clone()), + Some(cache_details.piece_cache_id_string.as_str()), + Some(cache_details.piece_cache_id_string.clone()), |request: ClusterCacheReadPieceIndexRequest| async move { Some( cache_details @@ -474,7 +477,7 @@ where ) }, ) - .instrument(info_span!("", cache_id = %cache_details.cache_id)) + .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id)) .await }) .collect::>() @@ -495,8 +498,8 @@ where .map(|cache_details| async move { nats_client .request_responder( - Some(cache_details.cache_id_string.as_str()), - Some(cache_details.cache_id_string.clone()), + Some(cache_details.piece_cache_id_string.as_str()), + Some(cache_details.piece_cache_id_string.clone()), |request: ClusterCacheReadPieceRequest| async move { Some( cache_details @@ -507,7 +510,7 @@ where ) }, ) - .instrument(info_span!("", cache_id = %cache_details.cache_id)) + .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id)) .await }) .collect::>() @@ -528,8 +531,8 @@ where .map(|cache_details| async move { nats_client .stream_request_responder::<_, _, Pin + Send>>, _>( - Some(cache_details.cache_id_string.as_str()), - Some(cache_details.cache_id_string.clone()), + Some(cache_details.piece_cache_id_string.as_str()), + Some(cache_details.piece_cache_id_string.clone()), |ClusterCacheReadPiecesRequest { offsets }| async move { Some( match cache_details @@ -551,7 +554,7 @@ where ) }, ) - .instrument(info_span!("", cache_id = %cache_details.cache_id)) + .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id)) .await }) .collect::>() @@ -572,8 +575,8 @@ where .map(|cache_details| async move { nats_client .stream_request_responder::<_, _, Pin + Send>>, _>( - Some(cache_details.cache_id_string.as_str()), - Some(cache_details.cache_id_string.clone()), + Some(cache_details.piece_cache_id_string.as_str()), + Some(cache_details.piece_cache_id_string.clone()), |_request: ClusterCacheContentsRequest| async move { Some(match cache_details.cache.contents().await { Ok(contents) => Box::pin(contents.map(|maybe_cache_element| { @@ -589,7 +592,7 @@ where }) }, ) - .instrument(info_span!("", cache_id = %cache_details.cache_id)) + .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id)) .await }) .collect::>() From 6b814c40b1c06f48adba301a6c0443e37479fed7 Mon Sep 17 00:00:00 2001 From: tediou5 Date: Wed, 11 Dec 2024 12:28:24 +0800 Subject: [PATCH 2/8] feat: optimize cache identification --- crates/subspace-farmer/src/cluster/cache.rs | 123 +++++++++++++----- .../src/cluster/controller/caches.rs | 105 +++++++++------ crates/subspace-farmer/src/farm.rs | 64 ++++++++- 3 files changed, 218 insertions(+), 74 deletions(-) diff --git a/crates/subspace-farmer/src/cluster/cache.rs b/crates/subspace-farmer/src/cluster/cache.rs index e973db1d37..63eb8a4d6e 100644 --- a/crates/subspace-farmer/src/cluster/cache.rs +++ b/crates/subspace-farmer/src/cluster/cache.rs @@ -11,7 +11,7 @@ use crate::cluster::controller::ClusterControllerCacheIdentifyBroadcast; use crate::cluster::nats_client::{ GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient, }; -use crate::farm::{FarmError, PieceCache, PieceCacheId, PieceCacheOffset}; +use crate::farm::{CacheId, FarmError, PieceCache, PieceCacheId, PieceCacheOffset}; use anyhow::anyhow; use async_trait::async_trait; use futures::stream::FuturesUnordered; @@ -27,18 +27,35 @@ use tracing::{debug, error, info, info_span, trace, warn, Instrument}; const MIN_CACHE_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1); +/// Request cache details from cache +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterCacheDetailsRequest; + +impl GenericStreamRequest for ClusterCacheDetailsRequest { + /// `*` here stands for cache group + const SUBJECT: &'static str = "subspace.cache.*.details"; + type Response = ClusterPieceCacheDetails; +} + +/// Cache details +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterPieceCacheDetails { + /// Piece Cache ID + pub piece_cache_id: PieceCacheId, + /// Max number of elements in this cache + pub max_num_elements: u32, +} + /// Broadcast with identification details by caches #[derive(Debug, Clone, Encode, Decode)] pub struct ClusterCacheIdentifyBroadcast { /// Cache ID - pub cache_id: PieceCacheId, - /// Max number of elements in this cache - pub max_num_elements: u32, + pub cache_id: CacheId, } impl GenericBroadcast for ClusterCacheIdentifyBroadcast { /// `*` here stands for cache group - const SUBJECT: &'static str = "subspace.cache.*.identify"; + const SUBJECT: &'static str = "subspace.cache.*.cache-identify"; } /// Write piece into cache @@ -258,6 +275,18 @@ struct CacheDetails<'a, C> { cache: &'a C, } +impl CacheDetails<'_, C> +where + C: PieceCache, +{ + fn derive_identification(&self) -> ClusterPieceCacheDetails { + ClusterPieceCacheDetails { + piece_cache_id: self.piece_cache_id, + max_num_elements: self.cache.max_num_elements(), + } + } +} + /// Create cache service for specified caches that will be processing incoming requests and send /// periodic identify notifications pub async fn cache_service( @@ -270,13 +299,15 @@ pub async fn cache_service( where C: PieceCache, { + let cache_id = CacheId::new(); + let cache_id_string = cache_id.to_string(); + let caches_details = caches .iter() .map(|cache| { let piece_cache_id = *cache.id(); - if primary_instance { - info!(%piece_cache_id, max_num_elements = %cache.max_num_elements(), "Created cache"); + info!(%piece_cache_id, max_num_elements = %cache.max_num_elements(), "Created piece cache"); } CacheDetails { @@ -289,7 +320,20 @@ where if primary_instance { select! { - result = identify_responder(&nats_client, &caches_details, cache_group, identification_broadcast_interval).fuse() => { + result = identify_responder( + &nats_client, + cache_id, + &caches_details, + cache_group, + identification_broadcast_interval + ).fuse() => { + result + }, + result = caches_details_responder( + &nats_client, + &cache_id_string, + &caches_details + ).fuse() => { result }, result = write_piece_responder(&nats_client, &caches_details).fuse() => { @@ -336,6 +380,7 @@ where /// per controller instance in order to parallelize more work across threads if needed. async fn identify_responder( nats_client: &NatsClient, + cache_id: CacheId, caches_details: &[CacheDetails<'_, C>], cache_group: &str, identification_broadcast_interval: Duration, @@ -373,14 +418,14 @@ where } last_identification = Instant::now(); - send_identify_broadcast(nats_client, caches_details, cache_group).await; + send_identify_broadcast(nats_client, cache_id, caches_details, cache_group).await; interval.reset(); } _ = interval.tick().fuse() => { last_identification = Instant::now(); trace!("Cache self-identification"); - send_identify_broadcast(nats_client, caches_details, cache_group).await; + send_identify_broadcast(nats_client, cache_id, caches_details, cache_group).await; } } } @@ -390,34 +435,46 @@ where async fn send_identify_broadcast( nats_client: &NatsClient, + cache_id: CacheId, caches_details: &[CacheDetails<'_, C>], cache_group: &str, ) where C: PieceCache, { - caches_details - .iter() - .map(|cache| async move { - if let Err(error) = nats_client - .broadcast( - &ClusterCacheIdentifyBroadcast { - cache_id: cache.piece_cache_id, - max_num_elements: cache.cache.max_num_elements(), - }, - cache_group, - ) - .await - { - warn!( - piece_cache_id = %cache.piece_cache_id, - %error, - "Failed to send cache identify notification" - ); - } - }) - .collect::>() - .collect::>() - .await; + if caches_details.is_empty() { + warn!("No cache, skip sending cache identify notification"); + return; + } + + if let Err(error) = nats_client + .broadcast(&ClusterCacheIdentifyBroadcast { cache_id }, cache_group) + .await + { + warn!(%cache_id, %error, "Failed to send cache identify notification"); + } +} + +async fn caches_details_responder( + nats_client: &NatsClient, + cache_id_string: &str, + caches_details: &[CacheDetails<'_, C>], +) -> anyhow::Result<()> +where + C: PieceCache, +{ + nats_client + .stream_request_responder( + Some(cache_id_string), + Some(cache_id_string.to_string()), + |_request: ClusterCacheDetailsRequest| async { + Some(stream::iter( + caches_details + .iter() + .map(CacheDetails::derive_identification), + )) + }, + ) + .await } async fn write_piece_responder( diff --git a/crates/subspace-farmer/src/cluster/controller/caches.rs b/crates/subspace-farmer/src/cluster/controller/caches.rs index 5eecf861b2..236da67df0 100644 --- a/crates/subspace-farmer/src/cluster/controller/caches.rs +++ b/crates/subspace-farmer/src/cluster/controller/caches.rs @@ -5,30 +5,33 @@ //! cache addition and removal, tries to reduce number of reinitializations that result in potential //! piece cache sync, etc. -use crate::cluster::cache::{ClusterCacheIdentifyBroadcast, ClusterPieceCache}; +use crate::cluster::cache::{ + ClusterCacheDetailsRequest, ClusterCacheIdentifyBroadcast, ClusterPieceCache, + ClusterPieceCacheDetails, +}; use crate::cluster::controller::ClusterControllerCacheIdentifyBroadcast; use crate::cluster::nats_client::NatsClient; -use crate::farm::{PieceCache, PieceCacheId}; +use crate::farm::{CacheId, PieceCache}; use crate::farmer_cache::FarmerCache; use anyhow::anyhow; use futures::channel::oneshot; use futures::future::FusedFuture; -use futures::{select, FutureExt, StreamExt}; +use futures::{select, FutureExt, Stream, StreamExt}; use parking_lot::Mutex; use std::future::{ready, Future}; use std::pin::{pin, Pin}; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::time::MissedTickBehavior; -use tracing::{info, trace, warn}; +use tracing::{debug, info, warn}; const SCHEDULE_REINITIALIZATION_DELAY: Duration = Duration::from_secs(3); #[derive(Debug)] struct KnownCache { - cache_id: PieceCacheId, + cache_id: CacheId, last_identification: Instant, - piece_cache: Arc, + piece_caches: Vec>, } #[derive(Debug)] @@ -48,17 +51,26 @@ impl KnownCaches { fn get_all(&self) -> Vec> { self.known_caches .iter() - .map(|known_cache| Arc::clone(&known_cache.piece_cache) as Arc<_>) + .flat_map(|known_cache| { + known_cache + .piece_caches + .iter() + .map(|piece_cache| Arc::clone(piece_cache) as Arc<_>) + }) .collect() } /// Return `true` if farmer cache reinitialization is required - fn update( + async fn update_cache( &mut self, - cache_id: PieceCacheId, - max_num_elements: u32, + cache_id: CacheId, + scheduled_reinitialization_for: &mut Option, nats_client: &NatsClient, - ) -> bool { + piece_cache_stream: Fut, + ) where + Fut: Future>, + S: Stream + Unpin, + { if self.known_caches.iter_mut().any(|known_cache| { if known_cache.cache_id == cache_id { known_cache.last_identification = Instant::now(); @@ -67,20 +79,37 @@ impl KnownCaches { false } }) { - return false; + return; } - let piece_cache = Arc::new(ClusterPieceCache::new( - cache_id, - max_num_elements, - nats_client.clone(), - )); + let Some(piece_caches_stream) = piece_cache_stream.await else { + return; + }; + let piece_caches = piece_caches_stream + .map( + |ClusterPieceCacheDetails { + piece_cache_id, + max_num_elements, + }| { + debug!(%cache_id, %piece_cache_id, %max_num_elements, "Discovered new piece cache"); + Arc::new(ClusterPieceCache::new( + piece_cache_id, + max_num_elements, + nats_client.clone(), + )) + }, + ) + .collect() + .await; + + info!(%cache_id, "New cache discovered, scheduling reinitialization"); + scheduled_reinitialization_for.replace(Instant::now() + SCHEDULE_REINITIALIZATION_DELAY); + self.known_caches.push(KnownCache { cache_id, last_identification: Instant::now(), - piece_cache, + piece_caches, }); - true } fn remove_expired(&mut self) -> impl Iterator + '_ { @@ -100,7 +129,6 @@ pub async fn maintain_caches( let mut known_caches = KnownCaches::new(identification_broadcast_interval); let mut scheduled_reinitialization_for = None; - // Farm that is being added/removed right now (if any) let mut cache_reinitialization = (Box::pin(ready(())) as Pin>>).fuse(); @@ -164,24 +192,27 @@ pub async fn maintain_caches( return Err(anyhow!("Cache identify stream ended")); }; - let ClusterCacheIdentifyBroadcast { + let ClusterCacheIdentifyBroadcast { cache_id } = identify_message; + + known_caches.update_cache( cache_id, - max_num_elements, - } = identify_message; - if known_caches.update(cache_id, max_num_elements, nats_client) { - info!( - %cache_id, - "New cache discovered, scheduling reinitialization" - ); - scheduled_reinitialization_for.replace( - Instant::now() + SCHEDULE_REINITIALIZATION_DELAY, - ); - } else { - trace!( - %cache_id, - "Received identification for already known cache" - ); - } + &mut scheduled_reinitialization_for, + nats_client, + async { + nats_client + .stream_request( + &ClusterCacheDetailsRequest, + Some(&cache_id.to_string()), + ) + .await + .inspect_err(|error| warn!( + %error, + %cache_id, + "Failed to request farmer farm details" + )) + .ok() + }, + ).await } _ = cache_pruning_interval.tick().fuse() => { let mut reinit = false; diff --git a/crates/subspace-farmer/src/farm.rs b/crates/subspace-farmer/src/farm.rs index 6d6571f475..32d506549e 100644 --- a/crates/subspace-farmer/src/farm.rs +++ b/crates/subspace-farmer/src/farm.rs @@ -52,11 +52,67 @@ pub trait PlottedSectors: Send + Sync + fmt::Debug { Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From, )] #[serde(untagged)] -pub enum PieceCacheId { +pub enum CacheId { /// Cache ID Ulid(Ulid), } +impl Encode for CacheId { + #[inline] + fn size_hint(&self) -> usize { + 1_usize + + match self { + CacheId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)), + } + } + + #[inline] + fn encode_to(&self, output: &mut O) { + match self { + CacheId::Ulid(ulid) => { + output.push_byte(0); + Encode::encode_to(&ulid.0, output); + } + } + } +} + +impl EncodeLike for CacheId {} + +impl Decode for CacheId { + #[inline] + fn decode(input: &mut I) -> Result { + match input + .read_byte() + .map_err(|e| e.chain("Could not decode `CacheId`, failed to read variant byte"))? + { + 0 => u128::decode(input) + .map(|ulid| CacheId::Ulid(Ulid(ulid))) + .map_err(|e| e.chain("Could not decode `CacheId::Ulid.0`")), + _ => Err("Could not decode `CacheId`, variant doesn't exist".into()), + } + } +} + +#[allow(clippy::new_without_default)] +impl CacheId { + /// Creates new ID + #[inline] + pub fn new() -> Self { + Self::Ulid(Ulid::new()) + } +} + +/// An identifier for a piece cache, can be used for in logs, thread names, etc. +#[derive( + Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From, +)] +#[serde(untagged)] +pub enum PieceCacheId { + /// Piece Cache ID + Ulid(Ulid), +} + impl Encode for PieceCacheId { #[inline] fn size_hint(&self) -> usize { @@ -84,12 +140,12 @@ impl Decode for PieceCacheId { fn decode(input: &mut I) -> Result { match input .read_byte() - .map_err(|e| e.chain("Could not decode `CacheId`, failed to read variant byte"))? + .map_err(|e| e.chain("Could not decode `PieceCacheId`, failed to read variant byte"))? { 0 => u128::decode(input) .map(|ulid| PieceCacheId::Ulid(Ulid(ulid))) - .map_err(|e| e.chain("Could not decode `CacheId::Ulid.0`")), - _ => Err("Could not decode `CacheId`, variant doesn't exist".into()), + .map_err(|e| e.chain("Could not decode `PieceCacheId::Ulid.0`")), + _ => Err("Could not decode `PieceCacheId`, variant doesn't exist".into()), } } } From 91b3dd4f553e48f8afffbf2dd2384bbaef33098a Mon Sep 17 00:00:00 2001 From: tediou5 Date: Mon, 20 Jan 2025 13:27:34 +0800 Subject: [PATCH 3/8] opt: collect piece caches details stream request in the background --- .../src/cluster/controller/caches.rs | 115 ++++++++++-------- 1 file changed, 62 insertions(+), 53 deletions(-) diff --git a/crates/subspace-farmer/src/cluster/controller/caches.rs b/crates/subspace-farmer/src/cluster/controller/caches.rs index 236da67df0..a4a679b376 100644 --- a/crates/subspace-farmer/src/cluster/controller/caches.rs +++ b/crates/subspace-farmer/src/cluster/controller/caches.rs @@ -16,7 +16,8 @@ use crate::farmer_cache::FarmerCache; use anyhow::anyhow; use futures::channel::oneshot; use futures::future::FusedFuture; -use futures::{select, FutureExt, Stream, StreamExt}; +use futures::stream::FuturesUnordered; +use futures::{select, FutureExt, StreamExt}; use parking_lot::Mutex; use std::future::{ready, Future}; use std::pin::{pin, Pin}; @@ -25,6 +26,7 @@ use std::time::{Duration, Instant}; use tokio::time::MissedTickBehavior; use tracing::{debug, info, warn}; +type CollectPieceCachesFuture = Pin>>>; const SCHEDULE_REINITIALIZATION_DELAY: Duration = Duration::from_secs(3); #[derive(Debug)] @@ -60,17 +62,13 @@ impl KnownCaches { .collect() } - /// Return `true` if farmer cache reinitialization is required - async fn update_cache( + /// Update cache's last identification with given `cache_id` or add it if it doesn't exist + fn update_cache( &mut self, cache_id: CacheId, - scheduled_reinitialization_for: &mut Option, nats_client: &NatsClient, - piece_cache_stream: Fut, - ) where - Fut: Future>, - S: Stream + Unpin, - { + piece_caches_to_add: &mut FuturesUnordered, + ) { if self.known_caches.iter_mut().any(|known_cache| { if known_cache.cache_id == cache_id { known_cache.last_identification = Instant::now(); @@ -82,34 +80,7 @@ impl KnownCaches { return; } - let Some(piece_caches_stream) = piece_cache_stream.await else { - return; - }; - let piece_caches = piece_caches_stream - .map( - |ClusterPieceCacheDetails { - piece_cache_id, - max_num_elements, - }| { - debug!(%cache_id, %piece_cache_id, %max_num_elements, "Discovered new piece cache"); - Arc::new(ClusterPieceCache::new( - piece_cache_id, - max_num_elements, - nats_client.clone(), - )) - }, - ) - .collect() - .await; - - info!(%cache_id, "New cache discovered, scheduling reinitialization"); - scheduled_reinitialization_for.replace(Instant::now() + SCHEDULE_REINITIALIZATION_DELAY); - - self.known_caches.push(KnownCache { - cache_id, - last_identification: Instant::now(), - piece_caches, - }); + piece_caches_to_add.push(collect_new_cache(cache_id, nats_client)); } fn remove_expired(&mut self) -> impl Iterator + '_ { @@ -128,6 +99,8 @@ pub async fn maintain_caches( ) -> anyhow::Result<()> { let mut known_caches = KnownCaches::new(identification_broadcast_interval); + let mut piece_caches_to_add = FuturesUnordered::new(); + let mut scheduled_reinitialization_for = None; let mut cache_reinitialization = (Box::pin(ready(())) as Pin>>).fuse(); @@ -196,23 +169,20 @@ pub async fn maintain_caches( known_caches.update_cache( cache_id, - &mut scheduled_reinitialization_for, nats_client, - async { - nats_client - .stream_request( - &ClusterCacheDetailsRequest, - Some(&cache_id.to_string()), - ) - .await - .inspect_err(|error| warn!( - %error, - %cache_id, - "Failed to request farmer farm details" - )) - .ok() - }, - ).await + &mut piece_caches_to_add, + ) + } + maybe_new_cache = piece_caches_to_add.select_next_some() => { + let Ok(new_cache) = maybe_new_cache else { + // Collecting new cache failed, continue + continue; + }; + + info!(cache_id = %new_cache.cache_id, "New cache discovered, scheduling reinitialization"); + scheduled_reinitialization_for.replace(Instant::now() + SCHEDULE_REINITIALIZATION_DELAY); + + known_caches.known_caches.push(new_cache); } _ = cache_pruning_interval.tick().fuse() => { let mut reinit = false; @@ -237,3 +207,42 @@ pub async fn maintain_caches( } } } + +/// Collect piece caches from the cache and convert them to `ClusterPieceCache` by sending a stream request, +/// then construct a `KnownCache` instance. +fn collect_new_cache(cache_id: CacheId, nats_client: &NatsClient) -> CollectPieceCachesFuture { + let nats_client = nats_client.clone(); + Box::pin(async move { + let piece_caches = nats_client + .stream_request(&ClusterCacheDetailsRequest, Some(&cache_id.to_string())) + .await + .inspect_err(|error| { + warn!( + %error, + %cache_id, + "Failed to request farmer farm details" + ) + })? + .map( + |ClusterPieceCacheDetails { + piece_cache_id, + max_num_elements, + }| { + debug!(%cache_id, %piece_cache_id, %max_num_elements, "Discovered new piece cache"); + Arc::new(ClusterPieceCache::new( + piece_cache_id, + max_num_elements, + nats_client.clone(), + )) + }, + ) + .collect() + .await; + + Ok(KnownCache { + cache_id, + last_identification: Instant::now(), + piece_caches, + }) + }) +} From 3cdc6715e2e5eee2fcc492bc5d90027ef37f1d9f Mon Sep 17 00:00:00 2001 From: tediou5 Date: Wed, 11 Dec 2024 12:28:37 +0800 Subject: [PATCH 4/8] feat: optimize farmer identification --- .../src/cluster/controller/farms.rs | 479 ++++++++++++------ crates/subspace-farmer/src/cluster/farmer.rs | 161 ++++-- crates/subspace-farmer/src/farm.rs | 56 ++ 3 files changed, 499 insertions(+), 197 deletions(-) diff --git a/crates/subspace-farmer/src/cluster/controller/farms.rs b/crates/subspace-farmer/src/cluster/controller/farms.rs index 4c725e3d49..98a2f491fe 100644 --- a/crates/subspace-farmer/src/cluster/controller/farms.rs +++ b/crates/subspace-farmer/src/cluster/controller/farms.rs @@ -5,19 +5,21 @@ //! automatically handles dynamic farm addition and removal, etc. use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast; -use crate::cluster::farmer::{ClusterFarm, ClusterFarmerIdentifyFarmBroadcast}; +use crate::cluster::farmer::{ + ClusterFarm, ClusterFarmerFarmDetails, ClusterFarmerFarmDetailsRequest, + ClusterFarmerIdentifyBroadcast, +}; use crate::cluster::nats_client::NatsClient; use crate::farm::plotted_pieces::PlottedPieces; -use crate::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate}; +use crate::farm::{Farm, FarmId, FarmerId, SectorPlottingDetails, SectorUpdate}; use anyhow::anyhow; use async_lock::RwLock as AsyncRwLock; use futures::channel::oneshot; use futures::future::FusedFuture; use futures::stream::FuturesUnordered; -use futures::{select, FutureExt, StreamExt}; +use futures::{select, FutureExt, Stream, StreamExt}; use parking_lot::Mutex; -use std::collections::hash_map::Entry; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::future::{ready, Future}; use std::mem; use std::pin::{pin, Pin}; @@ -27,7 +29,7 @@ use subspace_core_primitives::hashes::Blake3Hash; use subspace_core_primitives::sectors::SectorIndex; use tokio::task; use tokio::time::MissedTickBehavior; -use tracing::{error, info, trace, warn}; +use tracing::{debug, error, info, warn}; type AddRemoveFuture<'a> = Pin, ClusterFarm)>> + 'a>>; @@ -35,102 +37,253 @@ type AddRemoveFuture<'a> = /// Number of farms in a cluster is currently limited to 2^16 pub type FarmIndex = u16; +#[derive(Debug)] +struct KnownFarmer { + farmer_id: FarmerId, + fingerprint: Blake3Hash, + last_identification: Instant, + known_farms: HashMap, +} + #[derive(Debug)] struct KnownFarm { farm_id: FarmId, fingerprint: Blake3Hash, - last_identification: Instant, expired_sender: oneshot::Sender<()>, } -enum KnownFarmInsertResult { - Inserted { - farm_index: FarmIndex, - expired_receiver: oneshot::Receiver<()>, - }, +enum KnownFarmerInsertResult { + Inserted, FingerprintUpdated { - farm_index: FarmIndex, - expired_receiver: oneshot::Receiver<()>, + old_farms: HashMap, }, NotInserted, } +struct KnownFarmInsertResult { + farm_index: FarmIndex, + farm_id: FarmId, + total_sectors_count: u16, + expired_receiver: oneshot::Receiver<()>, + add: bool, + remove: bool, +} + #[derive(Debug)] -struct KnownFarms { +struct KnownFarmers { identification_broadcast_interval: Duration, - known_farms: HashMap, + known_farmers: Vec, } -impl KnownFarms { +impl KnownFarmers { fn new(identification_broadcast_interval: Duration) -> Self { Self { identification_broadcast_interval, - known_farms: HashMap::new(), + known_farmers: Vec::new(), } } - fn insert_or_update( + async fn insert_or_update_farmer( &mut self, - farm_id: FarmId, + farmer_id: FarmerId, fingerprint: Blake3Hash, - ) -> KnownFarmInsertResult { - if let Some(existing_result) = - self.known_farms - .iter_mut() - .find_map(|(&farm_index, known_farm)| { - if known_farm.farm_id == farm_id { - if known_farm.fingerprint == fingerprint { - known_farm.last_identification = Instant::now(); - Some(KnownFarmInsertResult::NotInserted) + farms_stream: Fut, + ) -> Vec + where + Fut: Future>, + S: Stream + Unpin, + { + let result = self + .known_farmers + .iter_mut() + .find_map(|known_farmer| { + let check_farmer_id = known_farmer.farmer_id == farmer_id; + let check_fingerprint = known_farmer.fingerprint == fingerprint; + match (check_farmer_id, check_fingerprint) { + (true, true) => { + debug!(%farmer_id,"Updating last identification for farmer"); + known_farmer.last_identification = Instant::now(); + Some(KnownFarmerInsertResult::NotInserted) + } + (true, false) => { + let old_farms = known_farmer + .known_farms + .drain() + .map(|(farm_index, know_farm)| { + (know_farm.farm_id, (farm_index, know_farm)) + }) + .collect(); + known_farmer.fingerprint = fingerprint; + known_farmer.last_identification = Instant::now(); + Some(KnownFarmerInsertResult::FingerprintUpdated { old_farms }) + } + (false, _) => None, + } + }) + .unwrap_or(KnownFarmerInsertResult::Inserted); + + if let KnownFarmerInsertResult::NotInserted = result { + return vec![]; + } + + let Some(farms_stream) = farms_stream.await else { + return vec![]; + }; + let farms = farms_stream.collect::>().await; + let farm_indices = self.pick_farmer_index(farms.len()); + + match result { + KnownFarmerInsertResult::Inserted => { + let mut known_farmer = KnownFarmer { + farmer_id, + fingerprint, + last_identification: Instant::now(), + known_farms: HashMap::new(), + }; + + let res = farm_indices + .into_iter() + .zip(farms) + .map(|(farm_index, farm_details)| { + let ClusterFarmerFarmDetails { + farm_id, + total_sectors_count, + fingerprint, + } = farm_details; + let (expired_sender, expired_receiver) = oneshot::channel(); + known_farmer.known_farms.insert( + farm_index, + KnownFarm { + farm_id, + fingerprint, + expired_sender, + }, + ); + info!(%farmer_id, %farm_id, %total_sectors_count, "Discovered new farm"); + KnownFarmInsertResult { + farm_index, + farm_id, + total_sectors_count, + expired_receiver, + add: true, + remove: false, + } + }) + .collect::>(); + self.known_farmers.push(known_farmer); + res + } + KnownFarmerInsertResult::FingerprintUpdated { mut old_farms } => { + farm_indices + .into_iter() + .zip(farms) + .filter_map(|(farm_index, farm_details)| { + let ClusterFarmerFarmDetails { + farm_id, + total_sectors_count, + fingerprint, + } = farm_details; + if let Some((farm_index, mut known_farm)) = old_farms.remove(&farm_id) { + if known_farm.farm_id == farm_id { + let known_farmer = self + .get_known_farmer(farmer_id) + .expect("Farmer should be available"); + if known_farm.fingerprint == fingerprint { + // Do nothing if farm is already known + known_farmer.known_farms.insert(farm_index, known_farm); + None + } else { + // Update fingerprint + let (expired_sender, expired_receiver) = oneshot::channel(); + known_farm.expired_sender = expired_sender; + known_farmer.known_farms.insert(farm_index, known_farm); + Some(KnownFarmInsertResult { + farm_index, + farm_id, + total_sectors_count, + expired_receiver, + add: true, + remove: true, + }) + } + } else { + None + } } else { + // Add new farm let (expired_sender, expired_receiver) = oneshot::channel(); - known_farm.fingerprint = fingerprint; - known_farm.expired_sender = expired_sender; - - Some(KnownFarmInsertResult::FingerprintUpdated { + self.get_known_farmer(farmer_id) + .expect("Farmer should be available") + .known_farms + .insert( + farm_index, + KnownFarm { + farm_id, + fingerprint, + expired_sender, + }, + ); + Some(KnownFarmInsertResult { farm_index, + farm_id, + total_sectors_count, expired_receiver, + add: true, + remove: false, }) } - } else { - None - } - }) - { - return existing_result; + }) + .collect::>() + } + KnownFarmerInsertResult::NotInserted => { + unreachable!("KnownFarmerInsertResult::NotInserted should be handled above") + } } + } - for farm_index in FarmIndex::MIN..=FarmIndex::MAX { - if let Entry::Vacant(entry) = self.known_farms.entry(farm_index) { - let (expired_sender, expired_receiver) = oneshot::channel(); + fn get_known_farmer(&mut self, farmer_id: FarmerId) -> Option<&mut KnownFarmer> { + self.known_farmers + .iter_mut() + .find(|known_farmer| known_farmer.farmer_id == farmer_id) + } - entry.insert(KnownFarm { - farm_id, - fingerprint, - last_identification: Instant::now(), - expired_sender, - }); + fn pick_farmer_index(&self, len: usize) -> Vec { + let used_indices = self + .known_farmers + .iter() + .flat_map(|known_farmer| known_farmer.known_farms.keys()) + .collect::>(); - return KnownFarmInsertResult::Inserted { - farm_index, - expired_receiver, - }; + let mut available_indices = Vec::with_capacity(len); + + for farm_index in FarmIndex::MIN..=FarmIndex::MAX { + if !used_indices.contains(&farm_index) { + if available_indices.len() < len { + available_indices.push(farm_index); + } else { + return available_indices; + } } } - warn!(%farm_id, max_supported_farm_index = %FarmIndex::MAX, "Too many farms, ignoring"); - KnownFarmInsertResult::NotInserted + warn!(max_supported_farm_index = %FarmIndex::MAX, "Too many farms"); + available_indices } fn remove_expired(&mut self) -> impl Iterator + '_ { - self.known_farms.extract_if(|_farm_index, known_farm| { - known_farm.last_identification.elapsed() > self.identification_broadcast_interval * 2 - }) + self.known_farmers + .extract_if(.., |known_farmer| { + known_farmer.last_identification.elapsed() + > self.identification_broadcast_interval * 2 + }) + .flat_map(|known_farmer| known_farmer.known_farms) } fn remove(&mut self, farm_index: FarmIndex) { - self.known_farms.remove(&farm_index); + self.known_farmers.iter_mut().for_each(|known_farmer| { + known_farmer.known_farms.remove(&farm_index); + }); } } @@ -141,7 +294,7 @@ pub async fn maintain_farms( plotted_pieces: &Arc>>, identification_broadcast_interval: Duration, ) -> anyhow::Result<()> { - let mut known_farms = KnownFarms::new(identification_broadcast_interval); + let mut known_farms = KnownFarmers::new(identification_broadcast_interval); // Futures that need to be processed sequentially in order to add/remove farms, if farm was // added, future will resolve with `Some`, `None` if removed @@ -152,11 +305,9 @@ pub async fn maintain_farms( let mut farms = FuturesUnordered::new(); let farmer_identify_subscription = pin!(nats_client - .subscribe_to_broadcasts::(None, None) + .subscribe_to_broadcasts::(None, None) .await - .map_err(|error| anyhow!( - "Failed to subscribe to farmer identify farm broadcast: {error}" - ))?); + .map_err(|error| anyhow!("Failed to subscribe to farmer identify broadcast: {error}"))?); // Request farmer to identify themselves if let Err(error) = nats_client @@ -213,14 +364,31 @@ pub async fn maintain_farms( let Some(identify_message) = maybe_identify_message else { return Err(anyhow!("Farmer identify stream ended")); }; + let farmer_id = identify_message.farmer_id; - process_farm_identify_message( + process_farmer_identify_message( identify_message, nats_client, &mut known_farms, &mut farms_to_add_remove, plotted_pieces, - ); + async { + nats_client + .stream_request( + &ClusterFarmerFarmDetailsRequest, + Some(&farmer_id.to_string()), + ) + .await + .inspect_err(|error| { + warn!( + %error, + %farmer_id, + "Failed to request farmer farm details" + ) + }) + .ok() + }, + ).await; } _ = farm_pruning_interval.tick().fuse() => { for (farm_index, removed_farm) in known_farms.remove_expired() { @@ -278,114 +446,107 @@ pub async fn maintain_farms( } } -fn process_farm_identify_message<'a>( - identify_message: ClusterFarmerIdentifyFarmBroadcast, +async fn process_farmer_identify_message<'a, Fut, S>( + identify_message: ClusterFarmerIdentifyBroadcast, nats_client: &'a NatsClient, - known_farms: &mut KnownFarms, + known_farms: &mut KnownFarmers, farms_to_add_remove: &mut VecDeque>, plotted_pieces: &'a Arc>>, -) { - let ClusterFarmerIdentifyFarmBroadcast { - farm_id, - total_sectors_count, + farms_stream: Fut, +) where + Fut: Future>, + S: Stream + Unpin, +{ + let ClusterFarmerIdentifyBroadcast { + farmer_id, fingerprint, } = identify_message; - let (farm_index, expired_receiver, add, remove) = - match known_farms.insert_or_update(farm_id, fingerprint) { - KnownFarmInsertResult::Inserted { - farm_index, - expired_receiver, - } => { - info!( - %farm_index, - %farm_id, - "Discovered new farm, initializing" - ); - - (farm_index, expired_receiver, true, false) - } - KnownFarmInsertResult::FingerprintUpdated { - farm_index, - expired_receiver, - } => { - info!( - %farm_index, - %farm_id, - "Farm fingerprint updated, re-initializing" - ); - - (farm_index, expired_receiver, true, true) - } - KnownFarmInsertResult::NotInserted => { - trace!( - %farm_id, - "Received identification for already known farm" - ); - // Nothing to do here - return; - } - }; - - if remove { - farms_to_add_remove.push_back(Box::pin(async move { - let plotted_pieces = Arc::clone(plotted_pieces); - - let delete_farm_fut = task::spawn_blocking(move || { - plotted_pieces.write_blocking().delete_farm(farm_index); - }); - if let Err(error) = delete_farm_fut.await { - error!( - %farm_index, - %farm_id, - %error, - "Failed to delete farm that was replaced" - ); - } - - None - })); - } - if add { - farms_to_add_remove.push_back(Box::pin(async move { - match initialize_farm( - farm_index, + for KnownFarmInsertResult { + farm_index, + farm_id, + total_sectors_count, + expired_receiver, + add, + remove, + } in known_farms + .insert_or_update_farmer(farmer_id, fingerprint, farms_stream) + .await + { + if remove { + remove_farm( farm_id, - total_sectors_count, - Arc::clone(plotted_pieces), - nats_client, + farm_index, + farms_to_add_remove, + plotted_pieces.clone(), ) - .await - { - Ok(farm) => { - if remove { - info!( - %farm_index, - %farm_id, - "Farm re-initialized successfully" - ); - } else { - info!( - %farm_index, - %farm_id, - "Farm initialized successfully" + .await; + } + + if add { + farms_to_add_remove.push_back(Box::pin(async move { + match initialize_farm( + farm_index, + farm_id, + total_sectors_count, + plotted_pieces.clone(), + nats_client, + ) + .await + { + Ok(farm) => { + if remove { + info!( + %farm_index, + %farm_id, + "Farm re-initialized successfully" + ); + } else { + info!( + %farm_index, + %farm_id, + "Farm initialized successfully" + ); + } + + Some((farm_index, expired_receiver, farm)) + } + Err(error) => { + warn!( + %error, + "Failed to initialize farm {farm_id}" ); + None } - - Some((farm_index, expired_receiver, farm)) } - Err(error) => { - warn!( - %error, - "Failed to initialize farm {farm_id}" - ); - None - } - } - })); + })); + } } } +async fn remove_farm( + farm_id: FarmId, + farm_index: FarmIndex, + farms_to_add_remove: &mut VecDeque>, + plotted_pieces: Arc>>, +) { + farms_to_add_remove.push_back(Box::pin(async move { + let delete_farm_fut = task::spawn_blocking(move || { + plotted_pieces.write_blocking().delete_farm(farm_index); + }); + if let Err(error) = delete_farm_fut.await { + error!( + %farm_index, + %farm_id, + %error, + "Failed to delete farm that was replaced", + ); + } + + None + })); +} + async fn initialize_farm( farm_index: FarmIndex, farm_id: FarmId, diff --git a/crates/subspace-farmer/src/cluster/farmer.rs b/crates/subspace-farmer/src/cluster/farmer.rs index d6afd67b74..6ea0c505b9 100644 --- a/crates/subspace-farmer/src/cluster/farmer.rs +++ b/crates/subspace-farmer/src/cluster/farmer.rs @@ -12,7 +12,7 @@ use crate::cluster::nats_client::{ GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient, }; use crate::farm::{ - Farm, FarmError, FarmId, FarmingNotification, HandlerFn, HandlerId, PieceReader, + Farm, FarmError, FarmId, FarmerId, FarmingNotification, HandlerFn, HandlerId, PieceReader, PlottedSectors, SectorUpdate, }; use crate::utils::AsyncJoinOnDrop; @@ -40,9 +40,31 @@ const MIN_FARMER_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1); type Handler = Bag, A>; -/// Broadcast with identification details by farmers +/// Broadcast with farmer id for identification #[derive(Debug, Clone, Encode, Decode)] -pub struct ClusterFarmerIdentifyFarmBroadcast { +pub struct ClusterFarmerIdentifyBroadcast { + /// Farmer ID + pub farmer_id: FarmerId, + /// Farmer fingerprint changes when something about internal farm changes (like allocated space) + pub fingerprint: Blake3Hash, +} + +impl GenericBroadcast for ClusterFarmerIdentifyBroadcast { + const SUBJECT: &'static str = "subspace.farmer.*.farmer-identify"; +} + +/// Request farm details from farmer +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterFarmerFarmDetailsRequest; + +impl GenericStreamRequest for ClusterFarmerFarmDetailsRequest { + const SUBJECT: &'static str = "subspace.farmer.*.farm.details"; + type Response = ClusterFarmerFarmDetails; +} + +/// Farm details +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterFarmerFarmDetails { /// Farm ID pub farm_id: FarmId, /// Total number of sectors in the farm @@ -51,10 +73,6 @@ pub struct ClusterFarmerIdentifyFarmBroadcast { pub fingerprint: Blake3Hash, } -impl GenericBroadcast for ClusterFarmerIdentifyFarmBroadcast { - const SUBJECT: &'static str = "subspace.farmer.*.identify"; -} - /// Broadcast with sector updates by farmers #[derive(Debug, Clone, Encode, Decode)] struct ClusterFarmerSectorUpdateBroadcast { @@ -236,7 +254,7 @@ impl Farm for ClusterFarm { impl ClusterFarm { /// Create new instance using information from previously received - /// [`ClusterFarmerIdentifyFarmBroadcast`] + /// [`ClusterFarmerIdentifyBroadcast`] pub async fn new( farm_id: FarmId, total_sectors_count: SectorIndex, @@ -344,6 +362,23 @@ struct FarmDetails { _background_tasks: Option>, } +impl FarmDetails { + fn derive_identification(&self) -> ClusterFarmerFarmDetails { + ClusterFarmerFarmDetails { + farm_id: self.farm_id, + total_sectors_count: self.total_sectors_count, + fingerprint: self.derive_fingerprint(), + } + } + + fn derive_fingerprint(&self) -> Blake3Hash { + blake3_hash_list(&[ + &self.farm_id.encode(), + &self.total_sectors_count.to_le_bytes(), + ]) + } +} + /// Create farmer service for specified farms that will be processing incoming requests and send /// periodic identify notifications. /// @@ -358,6 +393,9 @@ pub fn farmer_service( where F: Farm, { + let farmer_id = FarmerId::new(); + let farmer_id_string = farmer_id.to_string(); + // For each farm start forwarding notifications as broadcast messages and create farm details // that can be used to respond to incoming requests let farms_details = farms @@ -479,7 +517,20 @@ where async move { if primary_instance { select! { - result = identify_responder(&nats_client, &farms_details, identification_broadcast_interval).fuse() => { + result = identify_responder( + &nats_client, + farmer_id, + &farmer_id_string, + &farms_details, + identification_broadcast_interval + ).fuse() => { + result + }, + result = farms_details_responder( + &nats_client, + &farmer_id_string, + &farms_details + ).fuse() => { result }, result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => { @@ -506,6 +557,8 @@ where /// broadcast in response, also send periodic notifications reminding that farm exists async fn identify_responder( nats_client: &NatsClient, + farmer_id: FarmerId, + farmer_id_string: &str, farms_details: &[FarmDetails], identification_broadcast_interval: Duration, ) -> anyhow::Result<()> { @@ -539,14 +592,14 @@ async fn identify_responder( } last_identification = Instant::now(); - send_identify_broadcast(nats_client, farms_details).await; + send_identify_broadcast(nats_client, farmer_id, farmer_id_string, farms_details).await; interval.reset(); } _ = interval.tick().fuse() => { last_identification = Instant::now(); trace!("Farmer self-identification"); - send_identify_broadcast(nats_client, farms_details).await; + send_identify_broadcast(nats_client, farmer_id, farmer_id_string, farms_details).await; } } } @@ -554,34 +607,66 @@ async fn identify_responder( Ok(()) } -async fn send_identify_broadcast(nats_client: &NatsClient, farms_details: &[FarmDetails]) { - farms_details +async fn send_identify_broadcast( + nats_client: &NatsClient, + farmer_id: FarmerId, + farmer_id_string: &str, + farms_details: &[FarmDetails], +) { + if farms_details.is_empty() { + warn!("No farm, skip sending farmer identify notification"); + return; + } + + if let Err(error) = nats_client + .broadcast( + &new_identify_message(farmer_id, farms_details), + farmer_id_string, + ) + .await + { + warn!(%farmer_id, %error, "Failed to send farmer identify notification"); + } +} + +fn new_identify_message( + farmer_id: FarmerId, + farms_details: &[FarmDetails], +) -> ClusterFarmerIdentifyBroadcast { + let farmer_id_bytes = farmer_id.encode(); + let farms_sectors_counts = farms_details .iter() - .map(|farm_details| async move { - if let Err(error) = nats_client - .broadcast( - &ClusterFarmerIdentifyFarmBroadcast { - farm_id: farm_details.farm_id, - total_sectors_count: farm_details.total_sectors_count, - fingerprint: blake3_hash_list(&[ - &farm_details.farm_id.encode(), - &farm_details.total_sectors_count.to_le_bytes(), - ]), - }, - &farm_details.farm_id_string, - ) - .await - { - warn!( - farm_id = %farm_details.farm_id, - %error, - "Failed to send farmer identify notification" - ); - } - }) - .collect::>() - .collect::>() - .await; + .map(|farm_details| farm_details.total_sectors_count.to_le_bytes()) + .collect::>(); + let mut farms_sectors_counts = farms_sectors_counts + .iter() + .map(AsRef::as_ref) + .collect::>(); + farms_sectors_counts.push(farmer_id_bytes.as_slice()); + let fingerprint = blake3_hash_list(farms_sectors_counts.as_slice()); + + ClusterFarmerIdentifyBroadcast { + farmer_id, + fingerprint, + } +} + +async fn farms_details_responder( + nats_client: &NatsClient, + farmer_id_string: &str, + farms_details: &[FarmDetails], +) -> anyhow::Result<()> { + nats_client + .stream_request_responder( + Some(farmer_id_string), + Some(farmer_id_string.to_string()), + |_request: ClusterFarmerFarmDetailsRequest| async { + Some(stream::iter( + farms_details.iter().map(FarmDetails::derive_identification), + )) + }, + ) + .await } async fn plotted_sectors_responder( diff --git a/crates/subspace-farmer/src/farm.rs b/crates/subspace-farmer/src/farm.rs index 32d506549e..28377d8c70 100644 --- a/crates/subspace-farmer/src/farm.rs +++ b/crates/subspace-farmer/src/farm.rs @@ -522,6 +522,62 @@ impl HandlerId for event_listener_primitives::HandlerId { } } +/// An identifier for a farmer, can be used for in logs, thread names, etc. +#[derive( + Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From, +)] +#[serde(untagged)] +pub enum FarmerId { + /// Farmer ID + Ulid(Ulid), +} + +impl Encode for FarmerId { + #[inline] + fn size_hint(&self) -> usize { + 1_usize + + match self { + FarmerId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)), + } + } + + #[inline] + fn encode_to(&self, output: &mut O) { + match self { + FarmerId::Ulid(ulid) => { + output.push_byte(0); + Encode::encode_to(&ulid.0, output); + } + } + } +} + +impl EncodeLike for FarmerId {} + +impl Decode for FarmerId { + #[inline] + fn decode(input: &mut I) -> Result { + match input + .read_byte() + .map_err(|e| e.chain("Could not decode `FarmerId`, failed to read variant byte"))? + { + 0 => u128::decode(input) + .map(|ulid| FarmerId::Ulid(Ulid(ulid))) + .map_err(|e| e.chain("Could not decode `FarmerId::Ulid.0`")), + _ => Err("Could not decode `FarmerId`, variant doesn't exist".into()), + } + } +} + +#[allow(clippy::new_without_default)] +impl FarmerId { + /// Creates new ID + #[inline] + pub fn new() -> Self { + Self::Ulid(Ulid::new()) + } +} + /// An identifier for a farm, can be used for in logs, thread names, etc. #[derive( Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From, From 1704ed576865a34b6aea4157d43a3131c69071e6 Mon Sep 17 00:00:00 2001 From: tediou5 Date: Mon, 20 Jan 2025 18:00:29 +0800 Subject: [PATCH 5/8] chore: refactor and moving for cluster::controller::farms --- .../src/cluster/controller/farms.rs | 427 +++++++++--------- 1 file changed, 222 insertions(+), 205 deletions(-) diff --git a/crates/subspace-farmer/src/cluster/controller/farms.rs b/crates/subspace-farmer/src/cluster/controller/farms.rs index 98a2f491fe..c199b264f0 100644 --- a/crates/subspace-farmer/src/cluster/controller/farms.rs +++ b/crates/subspace-farmer/src/cluster/controller/farms.rs @@ -17,7 +17,7 @@ use async_lock::RwLock as AsyncRwLock; use futures::channel::oneshot; use futures::future::FusedFuture; use futures::stream::FuturesUnordered; -use futures::{select, FutureExt, Stream, StreamExt}; +use futures::{select, FutureExt, StreamExt}; use parking_lot::Mutex; use std::collections::{HashMap, HashSet, VecDeque}; use std::future::{ready, Future}; @@ -31,6 +31,8 @@ use tokio::task; use tokio::time::MissedTickBehavior; use tracing::{debug, error, info, warn}; +type CollectFarmerFarmsFuture = + Pin>>>>; type AddRemoveFuture<'a> = Pin, ClusterFarm)>> + 'a>>; @@ -53,87 +55,30 @@ struct KnownFarm { } enum KnownFarmerInsertResult { - Inserted, + Inserted { + farmer_id: FarmerId, + fingerprint: Blake3Hash, + }, FingerprintUpdated { + farmer_id: FarmerId, old_farms: HashMap, }, NotInserted, } -struct KnownFarmInsertResult { - farm_index: FarmIndex, - farm_id: FarmId, - total_sectors_count: u16, - expired_receiver: oneshot::Receiver<()>, - add: bool, - remove: bool, -} - -#[derive(Debug)] -struct KnownFarmers { - identification_broadcast_interval: Duration, - known_farmers: Vec, -} - -impl KnownFarmers { - fn new(identification_broadcast_interval: Duration) -> Self { - Self { - identification_broadcast_interval, - known_farmers: Vec::new(), - } - } - - async fn insert_or_update_farmer( - &mut self, - farmer_id: FarmerId, - fingerprint: Blake3Hash, - farms_stream: Fut, - ) -> Vec - where - Fut: Future>, - S: Stream + Unpin, - { - let result = self - .known_farmers - .iter_mut() - .find_map(|known_farmer| { - let check_farmer_id = known_farmer.farmer_id == farmer_id; - let check_fingerprint = known_farmer.fingerprint == fingerprint; - match (check_farmer_id, check_fingerprint) { - (true, true) => { - debug!(%farmer_id,"Updating last identification for farmer"); - known_farmer.last_identification = Instant::now(); - Some(KnownFarmerInsertResult::NotInserted) - } - (true, false) => { - let old_farms = known_farmer - .known_farms - .drain() - .map(|(farm_index, know_farm)| { - (know_farm.farm_id, (farm_index, know_farm)) - }) - .collect(); - known_farmer.fingerprint = fingerprint; - known_farmer.last_identification = Instant::now(); - Some(KnownFarmerInsertResult::FingerprintUpdated { old_farms }) - } - (false, _) => None, - } - }) - .unwrap_or(KnownFarmerInsertResult::Inserted); - - if let KnownFarmerInsertResult::NotInserted = result { - return vec![]; - } - - let Some(farms_stream) = farms_stream.await else { - return vec![]; - }; - let farms = farms_stream.collect::>().await; - let farm_indices = self.pick_farmer_index(farms.len()); - - match result { - KnownFarmerInsertResult::Inserted => { +impl KnownFarmerInsertResult { + fn process( + self, + farms: Vec, + known_farmers: &mut KnownFarmers, + ) -> Vec { + let farm_indices = known_farmers.pick_farmer_index(farms.len()); + + match self { + KnownFarmerInsertResult::Inserted { + farmer_id, + fingerprint, + } => { let mut known_farmer = KnownFarmer { farmer_id, fingerprint, @@ -170,10 +115,13 @@ impl KnownFarmers { } }) .collect::>(); - self.known_farmers.push(known_farmer); + known_farmers.known_farmers.push(known_farmer); res } - KnownFarmerInsertResult::FingerprintUpdated { mut old_farms } => { + KnownFarmerInsertResult::FingerprintUpdated { + farmer_id, + mut old_farms, + } => { farm_indices .into_iter() .zip(farms) @@ -184,36 +132,33 @@ impl KnownFarmers { fingerprint, } = farm_details; if let Some((farm_index, mut known_farm)) = old_farms.remove(&farm_id) { - if known_farm.farm_id == farm_id { - let known_farmer = self - .get_known_farmer(farmer_id) - .expect("Farmer should be available"); - if known_farm.fingerprint == fingerprint { - // Do nothing if farm is already known - known_farmer.known_farms.insert(farm_index, known_farm); - None - } else { - // Update fingerprint - let (expired_sender, expired_receiver) = oneshot::channel(); - known_farm.expired_sender = expired_sender; - known_farmer.known_farms.insert(farm_index, known_farm); - Some(KnownFarmInsertResult { - farm_index, - farm_id, - total_sectors_count, - expired_receiver, - add: true, - remove: true, - }) - } - } else { + let known_farmer = known_farmers + .get_known_farmer(farmer_id) + .expect("Farmer should be available"); + if known_farm.fingerprint == fingerprint { + // Do nothing if farm is already known + known_farmer.known_farms.insert(farm_index, known_farm); None + } else { + // Update fingerprint + let (expired_sender, expired_receiver) = oneshot::channel(); + known_farm.expired_sender = expired_sender; + known_farmer.known_farms.insert(farm_index, known_farm); + Some(KnownFarmInsertResult { + farm_index, + farm_id, + total_sectors_count, + expired_receiver, + add: true, + remove: true, + }) } } else { // Add new farm let (expired_sender, expired_receiver) = oneshot::channel(); - self.get_known_farmer(farmer_id) + known_farmers + .get_known_farmer(farmer_id) .expect("Farmer should be available") .known_farms .insert( @@ -241,6 +186,148 @@ impl KnownFarmers { } } } +} + +struct KnownFarmInsertResult { + farm_index: FarmIndex, + farm_id: FarmId, + total_sectors_count: u16, + expired_receiver: oneshot::Receiver<()>, + add: bool, + remove: bool, +} + +impl KnownFarmInsertResult { + fn process<'a>( + self, + nats_client: &'a NatsClient, + farms_to_add_remove: &mut VecDeque>, + plotted_pieces: Arc>>, + ) { + let KnownFarmInsertResult { + farm_index, + farm_id, + total_sectors_count, + expired_receiver, + add, + remove, + } = self; + + if remove { + remove_farm( + farm_id, + farm_index, + farms_to_add_remove, + plotted_pieces.clone(), + ); + } + + if add { + farms_to_add_remove.push_back(Box::pin(async move { + match initialize_farm( + farm_index, + farm_id, + total_sectors_count, + plotted_pieces.clone(), + nats_client, + ) + .await + { + Ok(farm) => { + if remove { + info!( + %farm_index, + %farm_id, + "Farm re-initialized successfully" + ); + } else { + info!( + %farm_index, + %farm_id, + "Farm initialized successfully" + ); + } + + Some((farm_index, expired_receiver, farm)) + } + Err(error) => { + warn!( + %error, + "Failed to initialize farm {farm_id}" + ); + None + } + } + })); + } + } +} + +#[derive(Debug)] +struct KnownFarmers { + identification_broadcast_interval: Duration, + known_farmers: Vec, +} + +impl KnownFarmers { + fn new(identification_broadcast_interval: Duration) -> Self { + Self { + identification_broadcast_interval, + known_farmers: Vec::new(), + } + } + + async fn insert_or_update_farmer( + &mut self, + farmer_id: FarmerId, + fingerprint: Blake3Hash, + nats_client: &NatsClient, + ) -> Vec { + let result = self + .known_farmers + .iter_mut() + .find_map(|known_farmer| { + let check_farmer_id = known_farmer.farmer_id == farmer_id; + let check_fingerprint = known_farmer.fingerprint == fingerprint; + match (check_farmer_id, check_fingerprint) { + (true, true) => { + debug!(%farmer_id,"Updating last identification for farmer"); + known_farmer.last_identification = Instant::now(); + Some(KnownFarmerInsertResult::NotInserted) + } + (true, false) => { + let old_farms = known_farmer + .known_farms + .drain() + .map(|(farm_index, know_farm)| { + (know_farm.farm_id, (farm_index, know_farm)) + }) + .collect(); + known_farmer.fingerprint = fingerprint; + known_farmer.last_identification = Instant::now(); + Some(KnownFarmerInsertResult::FingerprintUpdated { + farmer_id, + old_farms, + }) + } + (false, _) => None, + } + }) + .unwrap_or(KnownFarmerInsertResult::Inserted { + farmer_id, + fingerprint, + }); + + if let KnownFarmerInsertResult::NotInserted = result { + return vec![]; + } + + let Ok(farms) = collect_farmer_farms(farmer_id, nats_client).await else { + return vec![]; + }; + + result.process(farms, self) + } fn get_known_farmer(&mut self, farmer_id: FarmerId) -> Option<&mut KnownFarmer> { self.known_farmers @@ -280,7 +367,7 @@ impl KnownFarmers { .flat_map(|known_farmer| known_farmer.known_farms) } - fn remove(&mut self, farm_index: FarmIndex) { + fn remove_farm(&mut self, farm_index: FarmIndex) { self.known_farmers.iter_mut().for_each(|known_farmer| { known_farmer.known_farms.remove(&farm_index); }); @@ -294,7 +381,7 @@ pub async fn maintain_farms( plotted_pieces: &Arc>>, identification_broadcast_interval: Duration, ) -> anyhow::Result<()> { - let mut known_farms = KnownFarmers::new(identification_broadcast_interval); + let mut known_farmers = KnownFarmers::new(identification_broadcast_interval); // Futures that need to be processed sequentially in order to add/remove farms, if farm was // added, future will resolve with `Some`, `None` if removed @@ -333,7 +420,7 @@ pub async fn maintain_farms( select! { (farm_index, result) = farms.select_next_some() => { - known_farms.remove(farm_index); + known_farmers.remove_farm(farm_index); farms_to_add_remove.push_back(Box::pin(async move { let plotted_pieces = Arc::clone(plotted_pieces); @@ -364,34 +451,20 @@ pub async fn maintain_farms( let Some(identify_message) = maybe_identify_message else { return Err(anyhow!("Farmer identify stream ended")); }; - let farmer_id = identify_message.farmer_id; + let ClusterFarmerIdentifyBroadcast { + farmer_id, + fingerprint, + } = identify_message; - process_farmer_identify_message( - identify_message, - nats_client, - &mut known_farms, - &mut farms_to_add_remove, - plotted_pieces, - async { - nats_client - .stream_request( - &ClusterFarmerFarmDetailsRequest, - Some(&farmer_id.to_string()), - ) - .await - .inspect_err(|error| { - warn!( - %error, - %farmer_id, - "Failed to request farmer farm details" - ) - }) - .ok() - }, - ).await; + for farm_insert_result in known_farmers + .insert_or_update_farmer(farmer_id, fingerprint, nats_client) + .await + { + farm_insert_result.process(nats_client, &mut farms_to_add_remove, Arc::clone(plotted_pieces)); + } } _ = farm_pruning_interval.tick().fuse() => { - for (farm_index, removed_farm) in known_farms.remove_expired() { + for (farm_index, removed_farm) in known_farmers.remove_expired() { let farm_id = removed_farm.farm_id; if removed_farm.expired_sender.send(()).is_ok() { @@ -446,85 +519,29 @@ pub async fn maintain_farms( } } -async fn process_farmer_identify_message<'a, Fut, S>( - identify_message: ClusterFarmerIdentifyBroadcast, - nats_client: &'a NatsClient, - known_farms: &mut KnownFarmers, - farms_to_add_remove: &mut VecDeque>, - plotted_pieces: &'a Arc>>, - farms_stream: Fut, -) where - Fut: Future>, - S: Stream + Unpin, -{ - let ClusterFarmerIdentifyBroadcast { - farmer_id, - fingerprint, - } = identify_message; - - for KnownFarmInsertResult { - farm_index, - farm_id, - total_sectors_count, - expired_receiver, - add, - remove, - } in known_farms - .insert_or_update_farmer(farmer_id, fingerprint, farms_stream) - .await - { - if remove { - remove_farm( - farm_id, - farm_index, - farms_to_add_remove, - plotted_pieces.clone(), +/// Collect `ClusterFarmerFarmDetails` from the farmer by sending a stream request +fn collect_farmer_farms(farmer_id: FarmerId, nats_client: &NatsClient) -> CollectFarmerFarmsFuture { + let nats_client = nats_client.clone(); + Box::pin(async move { + Ok(nats_client + .stream_request( + &ClusterFarmerFarmDetailsRequest, + Some(&farmer_id.to_string()), ) - .await; - } - - if add { - farms_to_add_remove.push_back(Box::pin(async move { - match initialize_farm( - farm_index, - farm_id, - total_sectors_count, - plotted_pieces.clone(), - nats_client, + .await + .inspect_err(|error| { + warn!( + %error, + %farmer_id, + "Failed to request farmer farm details" ) - .await - { - Ok(farm) => { - if remove { - info!( - %farm_index, - %farm_id, - "Farm re-initialized successfully" - ); - } else { - info!( - %farm_index, - %farm_id, - "Farm initialized successfully" - ); - } - - Some((farm_index, expired_receiver, farm)) - } - Err(error) => { - warn!( - %error, - "Failed to initialize farm {farm_id}" - ); - None - } - } - })); - } - } + })? + .collect() + .await) + }) } -async fn remove_farm( +fn remove_farm( farm_id: FarmId, farm_index: FarmIndex, farms_to_add_remove: &mut VecDeque>, From 75ddfe6451c705da1faaa3f7bc75c3de537ef4ce Mon Sep 17 00:00:00 2001 From: tediou5 Date: Mon, 20 Jan 2025 17:34:32 +0800 Subject: [PATCH 6/8] opt: collect farms details stream request in the background --- .../src/cluster/controller/farms.rs | 77 ++++++++++++------- 1 file changed, 49 insertions(+), 28 deletions(-) diff --git a/crates/subspace-farmer/src/cluster/controller/farms.rs b/crates/subspace-farmer/src/cluster/controller/farms.rs index c199b264f0..43502b421a 100644 --- a/crates/subspace-farmer/src/cluster/controller/farms.rs +++ b/crates/subspace-farmer/src/cluster/controller/farms.rs @@ -31,8 +31,13 @@ use tokio::task; use tokio::time::MissedTickBehavior; use tracing::{debug, error, info, warn}; -type CollectFarmerFarmsFuture = - Pin>>>>; +type CollectFarmerFarmsFuture = Pin< + Box< + dyn Future< + Output = anyhow::Result<(KnownFarmerInsertResult, Vec)>, + >, + >, +>; type AddRemoveFuture<'a> = Pin, ClusterFarm)>> + 'a>>; @@ -277,12 +282,13 @@ impl KnownFarmers { } } - async fn insert_or_update_farmer( + fn insert_or_update_farmer( &mut self, farmer_id: FarmerId, fingerprint: Blake3Hash, nats_client: &NatsClient, - ) -> Vec { + farms_in_farmer_collector: &mut FuturesUnordered, + ) { let result = self .known_farmers .iter_mut() @@ -319,14 +325,10 @@ impl KnownFarmers { }); if let KnownFarmerInsertResult::NotInserted = result { - return vec![]; + return; } - let Ok(farms) = collect_farmer_farms(farmer_id, nats_client).await else { - return vec![]; - }; - - result.process(farms, self) + farms_in_farmer_collector.push(collect_farmer_farms(farmer_id, result, nats_client)); } fn get_known_farmer(&mut self, farmer_id: FarmerId) -> Option<&mut KnownFarmer> { @@ -383,6 +385,7 @@ pub async fn maintain_farms( ) -> anyhow::Result<()> { let mut known_farmers = KnownFarmers::new(identification_broadcast_interval); + let mut farms_in_farmer_collector = FuturesUnordered::::new(); // Futures that need to be processed sequentially in order to add/remove farms, if farm was // added, future will resolve with `Some`, `None` if removed let mut farms_to_add_remove = VecDeque::>::new(); @@ -456,9 +459,20 @@ pub async fn maintain_farms( fingerprint, } = identify_message; - for farm_insert_result in known_farmers - .insert_or_update_farmer(farmer_id, fingerprint, nats_client) - .await + known_farmers.insert_or_update_farmer( + farmer_id, + fingerprint, + nats_client, + &mut farms_in_farmer_collector, + ); + } + maybe_new_farmer_farms = farms_in_farmer_collector.select_next_some() => { + let Ok((farmer_insert_result, farms)) = maybe_new_farmer_farms else { + // Collecting farmer farms failed, continue + continue; + }; + + for farm_insert_result in farmer_insert_result.process(farms, &mut known_farmers) { farm_insert_result.process(nats_client, &mut farms_to_add_remove, Arc::clone(plotted_pieces)); } @@ -520,24 +534,31 @@ pub async fn maintain_farms( } /// Collect `ClusterFarmerFarmDetails` from the farmer by sending a stream request -fn collect_farmer_farms(farmer_id: FarmerId, nats_client: &NatsClient) -> CollectFarmerFarmsFuture { +fn collect_farmer_farms( + farmer_id: FarmerId, + result: KnownFarmerInsertResult, + nats_client: &NatsClient, +) -> CollectFarmerFarmsFuture { let nats_client = nats_client.clone(); Box::pin(async move { - Ok(nats_client - .stream_request( - &ClusterFarmerFarmDetailsRequest, - Some(&farmer_id.to_string()), - ) - .await - .inspect_err(|error| { - warn!( - %error, - %farmer_id, - "Failed to request farmer farm details" + Ok(( + result, + nats_client + .stream_request( + &ClusterFarmerFarmDetailsRequest, + Some(&farmer_id.to_string()), ) - })? - .collect() - .await) + .await + .inspect_err(|error| { + warn!( + %error, + %farmer_id, + "Failed to request farmer farm details" + ) + })? + .collect() + .await, + )) }) } From 03d3e381104290a406b35efa53980ad76edae566 Mon Sep 17 00:00:00 2001 From: tediou5 Date: Fri, 17 Jan 2025 11:25:35 +0800 Subject: [PATCH 7/8] chore: remove unnecessary checks when sending identification broadcast --- crates/subspace-farmer/src/cluster/cache.rs | 27 ++++---------------- crates/subspace-farmer/src/cluster/farmer.rs | 5 ---- 2 files changed, 5 insertions(+), 27 deletions(-) diff --git a/crates/subspace-farmer/src/cluster/cache.rs b/crates/subspace-farmer/src/cluster/cache.rs index 63eb8a4d6e..407f980520 100644 --- a/crates/subspace-farmer/src/cluster/cache.rs +++ b/crates/subspace-farmer/src/cluster/cache.rs @@ -323,7 +323,6 @@ where result = identify_responder( &nats_client, cache_id, - &caches_details, cache_group, identification_broadcast_interval ).fuse() => { @@ -378,16 +377,12 @@ where /// /// Implementation is using concurrency with multiple tokio tasks, but can be started multiple times /// per controller instance in order to parallelize more work across threads if needed. -async fn identify_responder( +async fn identify_responder( nats_client: &NatsClient, cache_id: CacheId, - caches_details: &[CacheDetails<'_, C>], cache_group: &str, identification_broadcast_interval: Duration, -) -> anyhow::Result<()> -where - C: PieceCache, -{ +) -> anyhow::Result<()> { let mut subscription = nats_client .subscribe_to_broadcasts::(Some(cache_group), None) .await @@ -418,14 +413,14 @@ where } last_identification = Instant::now(); - send_identify_broadcast(nats_client, cache_id, caches_details, cache_group).await; + send_identify_broadcast(nats_client, cache_id, cache_group).await; interval.reset(); } _ = interval.tick().fuse() => { last_identification = Instant::now(); trace!("Cache self-identification"); - send_identify_broadcast(nats_client, cache_id, caches_details, cache_group).await; + send_identify_broadcast(nats_client, cache_id, cache_group).await; } } } @@ -433,19 +428,7 @@ where Ok(()) } -async fn send_identify_broadcast( - nats_client: &NatsClient, - cache_id: CacheId, - caches_details: &[CacheDetails<'_, C>], - cache_group: &str, -) where - C: PieceCache, -{ - if caches_details.is_empty() { - warn!("No cache, skip sending cache identify notification"); - return; - } - +async fn send_identify_broadcast(nats_client: &NatsClient, cache_id: CacheId, cache_group: &str) { if let Err(error) = nats_client .broadcast(&ClusterCacheIdentifyBroadcast { cache_id }, cache_group) .await diff --git a/crates/subspace-farmer/src/cluster/farmer.rs b/crates/subspace-farmer/src/cluster/farmer.rs index 6ea0c505b9..f81e93a719 100644 --- a/crates/subspace-farmer/src/cluster/farmer.rs +++ b/crates/subspace-farmer/src/cluster/farmer.rs @@ -613,11 +613,6 @@ async fn send_identify_broadcast( farmer_id_string: &str, farms_details: &[FarmDetails], ) { - if farms_details.is_empty() { - warn!("No farm, skip sending farmer identify notification"); - return; - } - if let Err(error) = nats_client .broadcast( &new_identify_message(farmer_id, farms_details), From 75b0755051853f331273c5694e9d872c84c5f358 Mon Sep 17 00:00:00 2001 From: tediou5 Date: Fri, 17 Jan 2025 11:10:14 +0800 Subject: [PATCH 8/8] chore: improve comments on cache and farmer's SUBJECT --- crates/subspace-farmer/src/cluster/cache.rs | 7 ++++++- crates/subspace-farmer/src/cluster/farmer.rs | 11 +++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/crates/subspace-farmer/src/cluster/cache.rs b/crates/subspace-farmer/src/cluster/cache.rs index 407f980520..3cda8aa79c 100644 --- a/crates/subspace-farmer/src/cluster/cache.rs +++ b/crates/subspace-farmer/src/cluster/cache.rs @@ -32,7 +32,7 @@ const MIN_CACHE_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1); pub struct ClusterCacheDetailsRequest; impl GenericStreamRequest for ClusterCacheDetailsRequest { - /// `*` here stands for cache group + /// `*` here stands for cache ID const SUBJECT: &'static str = "subspace.cache.*.details"; type Response = ClusterPieceCacheDetails; } @@ -67,6 +67,7 @@ struct ClusterCacheWritePieceRequest { } impl GenericRequest for ClusterCacheWritePieceRequest { + /// `*` here stands for cache ID const SUBJECT: &'static str = "subspace.cache.*.write-piece"; type Response = Result<(), String>; } @@ -78,6 +79,7 @@ struct ClusterCacheReadPieceIndexRequest { } impl GenericRequest for ClusterCacheReadPieceIndexRequest { + /// `*` here stands for cache ID const SUBJECT: &'static str = "subspace.cache.*.read-piece-index"; type Response = Result, String>; } @@ -89,6 +91,7 @@ pub(super) struct ClusterCacheReadPieceRequest { } impl GenericRequest for ClusterCacheReadPieceRequest { + /// `*` here stands for cache ID const SUBJECT: &'static str = "subspace.cache.*.read-piece"; type Response = Result, String>; } @@ -100,6 +103,7 @@ pub(super) struct ClusterCacheReadPiecesRequest { } impl GenericStreamRequest for ClusterCacheReadPiecesRequest { + /// `*` here stands for cache ID const SUBJECT: &'static str = "subspace.cache.*.read-pieces"; type Response = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), String>; } @@ -109,6 +113,7 @@ impl GenericStreamRequest for ClusterCacheReadPiecesRequest { struct ClusterCacheContentsRequest; impl GenericStreamRequest for ClusterCacheContentsRequest { + /// `*` here stands for cache ID const SUBJECT: &'static str = "subspace.cache.*.contents"; type Response = Result<(PieceCacheOffset, Option), String>; } diff --git a/crates/subspace-farmer/src/cluster/farmer.rs b/crates/subspace-farmer/src/cluster/farmer.rs index f81e93a719..7ea7c7e5df 100644 --- a/crates/subspace-farmer/src/cluster/farmer.rs +++ b/crates/subspace-farmer/src/cluster/farmer.rs @@ -40,16 +40,17 @@ const MIN_FARMER_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1); type Handler = Bag, A>; -/// Broadcast with farmer id for identification +/// Broadcast with cluster farmer id for identification #[derive(Debug, Clone, Encode, Decode)] pub struct ClusterFarmerIdentifyBroadcast { - /// Farmer ID + /// Cluster farmer ID pub farmer_id: FarmerId, /// Farmer fingerprint changes when something about internal farm changes (like allocated space) pub fingerprint: Blake3Hash, } impl GenericBroadcast for ClusterFarmerIdentifyBroadcast { + /// `*` here stands for cluster farmer ID const SUBJECT: &'static str = "subspace.farmer.*.farmer-identify"; } @@ -58,6 +59,7 @@ impl GenericBroadcast for ClusterFarmerIdentifyBroadcast { pub struct ClusterFarmerFarmDetailsRequest; impl GenericStreamRequest for ClusterFarmerFarmDetailsRequest { + /// `*` here stands for cluster farmer ID const SUBJECT: &'static str = "subspace.farmer.*.farm.details"; type Response = ClusterFarmerFarmDetails; } @@ -85,6 +87,7 @@ struct ClusterFarmerSectorUpdateBroadcast { } impl GenericBroadcast for ClusterFarmerSectorUpdateBroadcast { + /// `*` here stands for single farm ID const SUBJECT: &'static str = "subspace.farmer.*.sector-update"; } @@ -98,6 +101,7 @@ struct ClusterFarmerFarmingNotificationBroadcast { } impl GenericBroadcast for ClusterFarmerFarmingNotificationBroadcast { + /// `*` here stands for single farm ID const SUBJECT: &'static str = "subspace.farmer.*.farming-notification"; } @@ -111,6 +115,7 @@ struct ClusterFarmerSolutionBroadcast { } impl GenericBroadcast for ClusterFarmerSolutionBroadcast { + /// `*` here stands for single farm ID const SUBJECT: &'static str = "subspace.farmer.*.solution-response"; } @@ -122,6 +127,7 @@ struct ClusterFarmerReadPieceRequest { } impl GenericRequest for ClusterFarmerReadPieceRequest { + /// `*` here stands for single farm ID const SUBJECT: &'static str = "subspace.farmer.*.farm.read-piece"; type Response = Result, String>; } @@ -131,6 +137,7 @@ impl GenericRequest for ClusterFarmerReadPieceRequest { struct ClusterFarmerPlottedSectorsRequest; impl GenericStreamRequest for ClusterFarmerPlottedSectorsRequest { + /// `*` here stands for single farm ID const SUBJECT: &'static str = "subspace.farmer.*.farm.plotted-sectors"; type Response = Result; }