diff --git a/crates/subspace-farmer/src/cluster/cache.rs b/crates/subspace-farmer/src/cluster/cache.rs index fe8f5dc40f..3cda8aa79c 100644 --- a/crates/subspace-farmer/src/cluster/cache.rs +++ b/crates/subspace-farmer/src/cluster/cache.rs @@ -11,7 +11,7 @@ use crate::cluster::controller::ClusterControllerCacheIdentifyBroadcast; use crate::cluster::nats_client::{ GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient, }; -use crate::farm::{FarmError, PieceCache, PieceCacheId, PieceCacheOffset}; +use crate::farm::{CacheId, FarmError, PieceCache, PieceCacheId, PieceCacheOffset}; use anyhow::anyhow; use async_trait::async_trait; use futures::stream::FuturesUnordered; @@ -27,18 +27,35 @@ use tracing::{debug, error, info, info_span, trace, warn, Instrument}; const MIN_CACHE_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1); +/// Request cache details from cache +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterCacheDetailsRequest; + +impl GenericStreamRequest for ClusterCacheDetailsRequest { + /// `*` here stands for cache ID + const SUBJECT: &'static str = "subspace.cache.*.details"; + type Response = ClusterPieceCacheDetails; +} + +/// Cache details +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterPieceCacheDetails { + /// Piece Cache ID + pub piece_cache_id: PieceCacheId, + /// Max number of elements in this cache + pub max_num_elements: u32, +} + /// Broadcast with identification details by caches #[derive(Debug, Clone, Encode, Decode)] pub struct ClusterCacheIdentifyBroadcast { /// Cache ID - pub cache_id: PieceCacheId, - /// Max number of elements in this cache - pub max_num_elements: u32, + pub cache_id: CacheId, } impl GenericBroadcast for ClusterCacheIdentifyBroadcast { /// `*` here stands for cache group - const SUBJECT: &'static str = "subspace.cache.*.identify"; + const SUBJECT: &'static str = "subspace.cache.*.cache-identify"; } /// Write piece into cache @@ -50,6 +67,7 @@ struct ClusterCacheWritePieceRequest { } 
impl GenericRequest for ClusterCacheWritePieceRequest { + /// `*` here stands for cache ID const SUBJECT: &'static str = "subspace.cache.*.write-piece"; type Response = Result<(), String>; } @@ -61,6 +79,7 @@ struct ClusterCacheReadPieceIndexRequest { } impl GenericRequest for ClusterCacheReadPieceIndexRequest { + /// `*` here stands for cache ID const SUBJECT: &'static str = "subspace.cache.*.read-piece-index"; type Response = Result, String>; } @@ -72,6 +91,7 @@ pub(super) struct ClusterCacheReadPieceRequest { } impl GenericRequest for ClusterCacheReadPieceRequest { + /// `*` here stands for cache ID const SUBJECT: &'static str = "subspace.cache.*.read-piece"; type Response = Result, String>; } @@ -83,6 +103,7 @@ pub(super) struct ClusterCacheReadPiecesRequest { } impl GenericStreamRequest for ClusterCacheReadPiecesRequest { + /// `*` here stands for cache ID const SUBJECT: &'static str = "subspace.cache.*.read-pieces"; type Response = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), String>; } @@ -92,6 +113,7 @@ impl GenericStreamRequest for ClusterCacheReadPiecesRequest { struct ClusterCacheContentsRequest; impl GenericStreamRequest for ClusterCacheContentsRequest { + /// `*` here stands for cache ID const SUBJECT: &'static str = "subspace.cache.*.contents"; type Response = Result<(PieceCacheOffset, Option), String>; } @@ -99,8 +121,8 @@ impl GenericStreamRequest for ClusterCacheContentsRequest { /// Cluster cache implementation #[derive(Debug)] pub struct ClusterPieceCache { - cache_id: PieceCacheId, - cache_id_string: String, + piece_cache_id: PieceCacheId, + piece_cache_id_string: String, max_num_elements: u32, nats_client: NatsClient, } @@ -108,7 +130,7 @@ pub struct ClusterPieceCache { #[async_trait] impl PieceCache for ClusterPieceCache { fn id(&self) -> &PieceCacheId { - &self.cache_id + &self.piece_cache_id } #[inline] @@ -129,7 +151,10 @@ impl PieceCache for ClusterPieceCache { > { Ok(Box::new( self.nats_client - 
.stream_request(&ClusterCacheContentsRequest, Some(&self.cache_id_string)) + .stream_request( + &ClusterCacheContentsRequest, + Some(&self.piece_cache_id_string), + ) .await? .map(|response| response.map_err(FarmError::from)), )) @@ -149,7 +174,7 @@ impl PieceCache for ClusterPieceCache { piece_index, piece: piece.clone(), }, - Some(&self.cache_id_string), + Some(&self.piece_cache_id_string), ) .await??) } @@ -162,7 +187,7 @@ impl PieceCache for ClusterPieceCache { .nats_client .request( &ClusterCacheReadPieceIndexRequest { offset }, - Some(&self.cache_id_string), + Some(&self.piece_cache_id_string), ) .await??) } @@ -175,7 +200,7 @@ impl PieceCache for ClusterPieceCache { .nats_client .request( &ClusterCacheReadPieceRequest { offset }, - Some(&self.cache_id_string), + Some(&self.piece_cache_id_string), ) .await??) } @@ -198,7 +223,7 @@ impl PieceCache for ClusterPieceCache { .nats_client .stream_request( &ClusterCacheReadPiecesRequest { offsets }, - Some(&self.cache_id_string), + Some(&self.piece_cache_id_string), ) .await? 
.map(|response| response.map_err(FarmError::from)) @@ -235,13 +260,13 @@ impl ClusterPieceCache { /// [`ClusterCacheIdentifyBroadcast`] #[inline] pub fn new( - cache_id: PieceCacheId, + piece_cache_id: PieceCacheId, max_num_elements: u32, nats_client: NatsClient, ) -> ClusterPieceCache { Self { - cache_id, - cache_id_string: cache_id.to_string(), + piece_cache_id, + piece_cache_id_string: piece_cache_id.to_string(), max_num_elements, nats_client, } @@ -250,11 +275,23 @@ impl ClusterPieceCache { #[derive(Debug)] struct CacheDetails<'a, C> { - cache_id: PieceCacheId, - cache_id_string: String, + piece_cache_id: PieceCacheId, + piece_cache_id_string: String, cache: &'a C, } +impl CacheDetails<'_, C> +where + C: PieceCache, +{ + fn derive_identification(&self) -> ClusterPieceCacheDetails { + ClusterPieceCacheDetails { + piece_cache_id: self.piece_cache_id, + max_num_elements: self.cache.max_num_elements(), + } + } +} + /// Create cache service for specified caches that will be processing incoming requests and send /// periodic identify notifications pub async fn cache_service( @@ -267,18 +304,20 @@ pub async fn cache_service( where C: PieceCache, { + let cache_id = CacheId::new(); + let cache_id_string = cache_id.to_string(); + let caches_details = caches .iter() .map(|cache| { - let cache_id = *cache.id(); - + let piece_cache_id = *cache.id(); if primary_instance { - info!(%cache_id, max_num_elements = %cache.max_num_elements(), "Created cache"); + info!(%piece_cache_id, max_num_elements = %cache.max_num_elements(), "Created piece cache"); } CacheDetails { - cache_id, - cache_id_string: cache_id.to_string(), + piece_cache_id, + piece_cache_id_string: piece_cache_id.to_string(), cache, } }) @@ -286,7 +325,19 @@ where if primary_instance { select! 
{ - result = identify_responder(&nats_client, &caches_details, cache_group, identification_broadcast_interval).fuse() => { + result = identify_responder( + &nats_client, + cache_id, + cache_group, + identification_broadcast_interval + ).fuse() => { + result + }, + result = caches_details_responder( + &nats_client, + &cache_id_string, + &caches_details + ).fuse() => { result }, result = write_piece_responder(&nats_client, &caches_details).fuse() => { @@ -331,15 +382,12 @@ where /// /// Implementation is using concurrency with multiple tokio tasks, but can be started multiple times /// per controller instance in order to parallelize more work across threads if needed. -async fn identify_responder( +async fn identify_responder( nats_client: &NatsClient, - caches_details: &[CacheDetails<'_, C>], + cache_id: CacheId, cache_group: &str, identification_broadcast_interval: Duration, -) -> anyhow::Result<()> -where - C: PieceCache, -{ +) -> anyhow::Result<()> { let mut subscription = nats_client .subscribe_to_broadcasts::(Some(cache_group), None) .await @@ -370,14 +418,14 @@ where } last_identification = Instant::now(); - send_identify_broadcast(nats_client, caches_details, cache_group).await; + send_identify_broadcast(nats_client, cache_id, cache_group).await; interval.reset(); } _ = interval.tick().fuse() => { last_identification = Instant::now(); trace!("Cache self-identification"); - send_identify_broadcast(nats_client, caches_details, cache_group).await; + send_identify_broadcast(nats_client, cache_id, cache_group).await; } } } @@ -385,36 +433,36 @@ where Ok(()) } -async fn send_identify_broadcast( +async fn send_identify_broadcast(nats_client: &NatsClient, cache_id: CacheId, cache_group: &str) { + if let Err(error) = nats_client + .broadcast(&ClusterCacheIdentifyBroadcast { cache_id }, cache_group) + .await + { + warn!(%cache_id, %error, "Failed to send cache identify notification"); + } +} + +async fn caches_details_responder( nats_client: &NatsClient, + 
cache_id_string: &str, caches_details: &[CacheDetails<'_, C>], - cache_group: &str, -) where +) -> anyhow::Result<()> +where C: PieceCache, { - caches_details - .iter() - .map(|cache| async move { - if let Err(error) = nats_client - .broadcast( - &ClusterCacheIdentifyBroadcast { - cache_id: cache.cache_id, - max_num_elements: cache.cache.max_num_elements(), - }, - cache_group, - ) - .await - { - warn!( - cache_id = %cache.cache_id, - %error, - "Failed to send cache identify notification" - ); - } - }) - .collect::>() - .collect::>() - .await; + nats_client + .stream_request_responder( + Some(cache_id_string), + Some(cache_id_string.to_string()), + |_request: ClusterCacheDetailsRequest| async { + Some(stream::iter( + caches_details + .iter() + .map(CacheDetails::derive_identification), + )) + }, + ) + .await } async fn write_piece_responder( @@ -429,8 +477,8 @@ where .map(|cache_details| async move { nats_client .request_responder( - Some(cache_details.cache_id_string.as_str()), - Some(cache_details.cache_id_string.clone()), + Some(cache_details.piece_cache_id_string.as_str()), + Some(cache_details.piece_cache_id_string.clone()), |request: ClusterCacheWritePieceRequest| async move { Some( cache_details @@ -441,7 +489,7 @@ where ) }, ) - .instrument(info_span!("", cache_id = %cache_details.cache_id)) + .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id)) .await }) .collect::>() @@ -462,8 +510,8 @@ where .map(|cache_details| async move { nats_client .request_responder( - Some(cache_details.cache_id_string.as_str()), - Some(cache_details.cache_id_string.clone()), + Some(cache_details.piece_cache_id_string.as_str()), + Some(cache_details.piece_cache_id_string.clone()), |request: ClusterCacheReadPieceIndexRequest| async move { Some( cache_details @@ -474,7 +522,7 @@ where ) }, ) - .instrument(info_span!("", cache_id = %cache_details.cache_id)) + .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id)) .await }) .collect::>() @@ 
-495,8 +543,8 @@ where .map(|cache_details| async move { nats_client .request_responder( - Some(cache_details.cache_id_string.as_str()), - Some(cache_details.cache_id_string.clone()), + Some(cache_details.piece_cache_id_string.as_str()), + Some(cache_details.piece_cache_id_string.clone()), |request: ClusterCacheReadPieceRequest| async move { Some( cache_details @@ -507,7 +555,7 @@ where ) }, ) - .instrument(info_span!("", cache_id = %cache_details.cache_id)) + .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id)) .await }) .collect::>() @@ -528,8 +576,8 @@ where .map(|cache_details| async move { nats_client .stream_request_responder::<_, _, Pin + Send>>, _>( - Some(cache_details.cache_id_string.as_str()), - Some(cache_details.cache_id_string.clone()), + Some(cache_details.piece_cache_id_string.as_str()), + Some(cache_details.piece_cache_id_string.clone()), |ClusterCacheReadPiecesRequest { offsets }| async move { Some( match cache_details @@ -551,7 +599,7 @@ where ) }, ) - .instrument(info_span!("", cache_id = %cache_details.cache_id)) + .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id)) .await }) .collect::>() @@ -572,8 +620,8 @@ where .map(|cache_details| async move { nats_client .stream_request_responder::<_, _, Pin + Send>>, _>( - Some(cache_details.cache_id_string.as_str()), - Some(cache_details.cache_id_string.clone()), + Some(cache_details.piece_cache_id_string.as_str()), + Some(cache_details.piece_cache_id_string.clone()), |_request: ClusterCacheContentsRequest| async move { Some(match cache_details.cache.contents().await { Ok(contents) => Box::pin(contents.map(|maybe_cache_element| { @@ -589,7 +637,7 @@ where }) }, ) - .instrument(info_span!("", cache_id = %cache_details.cache_id)) + .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id)) .await }) .collect::>() diff --git a/crates/subspace-farmer/src/cluster/controller/caches.rs b/crates/subspace-farmer/src/cluster/controller/caches.rs 
index 5eecf861b2..a4a679b376 100644 --- a/crates/subspace-farmer/src/cluster/controller/caches.rs +++ b/crates/subspace-farmer/src/cluster/controller/caches.rs @@ -5,14 +5,18 @@ //! cache addition and removal, tries to reduce number of reinitializations that result in potential //! piece cache sync, etc. -use crate::cluster::cache::{ClusterCacheIdentifyBroadcast, ClusterPieceCache}; +use crate::cluster::cache::{ + ClusterCacheDetailsRequest, ClusterCacheIdentifyBroadcast, ClusterPieceCache, + ClusterPieceCacheDetails, +}; use crate::cluster::controller::ClusterControllerCacheIdentifyBroadcast; use crate::cluster::nats_client::NatsClient; -use crate::farm::{PieceCache, PieceCacheId}; +use crate::farm::{CacheId, PieceCache}; use crate::farmer_cache::FarmerCache; use anyhow::anyhow; use futures::channel::oneshot; use futures::future::FusedFuture; +use futures::stream::FuturesUnordered; use futures::{select, FutureExt, StreamExt}; use parking_lot::Mutex; use std::future::{ready, Future}; @@ -20,15 +24,16 @@ use std::pin::{pin, Pin}; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::time::MissedTickBehavior; -use tracing::{info, trace, warn}; +use tracing::{debug, info, warn}; +type CollectPieceCachesFuture = Pin>>>; const SCHEDULE_REINITIALIZATION_DELAY: Duration = Duration::from_secs(3); #[derive(Debug)] struct KnownCache { - cache_id: PieceCacheId, + cache_id: CacheId, last_identification: Instant, - piece_cache: Arc, + piece_caches: Vec>, } #[derive(Debug)] @@ -48,17 +53,22 @@ impl KnownCaches { fn get_all(&self) -> Vec> { self.known_caches .iter() - .map(|known_cache| Arc::clone(&known_cache.piece_cache) as Arc<_>) + .flat_map(|known_cache| { + known_cache + .piece_caches + .iter() + .map(|piece_cache| Arc::clone(piece_cache) as Arc<_>) + }) .collect() } - /// Return `true` if farmer cache reinitialization is required - fn update( + /// Update cache's last identification with given `cache_id` or add it if it doesn't exist + fn update_cache( &mut 
self, - cache_id: PieceCacheId, - max_num_elements: u32, + cache_id: CacheId, nats_client: &NatsClient, - ) -> bool { + piece_caches_to_add: &mut FuturesUnordered, + ) { if self.known_caches.iter_mut().any(|known_cache| { if known_cache.cache_id == cache_id { known_cache.last_identification = Instant::now(); @@ -67,20 +77,10 @@ impl KnownCaches { false } }) { - return false; + return; } - let piece_cache = Arc::new(ClusterPieceCache::new( - cache_id, - max_num_elements, - nats_client.clone(), - )); - self.known_caches.push(KnownCache { - cache_id, - last_identification: Instant::now(), - piece_cache, - }); - true + piece_caches_to_add.push(collect_new_cache(cache_id, nats_client)); } fn remove_expired(&mut self) -> impl Iterator + '_ { @@ -99,8 +99,9 @@ pub async fn maintain_caches( ) -> anyhow::Result<()> { let mut known_caches = KnownCaches::new(identification_broadcast_interval); + let mut piece_caches_to_add = FuturesUnordered::new(); + let mut scheduled_reinitialization_for = None; - // Farm that is being added/removed right now (if any) let mut cache_reinitialization = (Box::pin(ready(())) as Pin>>).fuse(); @@ -164,24 +165,24 @@ pub async fn maintain_caches( return Err(anyhow!("Cache identify stream ended")); }; - let ClusterCacheIdentifyBroadcast { + let ClusterCacheIdentifyBroadcast { cache_id } = identify_message; + + known_caches.update_cache( cache_id, - max_num_elements, - } = identify_message; - if known_caches.update(cache_id, max_num_elements, nats_client) { - info!( - %cache_id, - "New cache discovered, scheduling reinitialization" - ); - scheduled_reinitialization_for.replace( - Instant::now() + SCHEDULE_REINITIALIZATION_DELAY, - ); - } else { - trace!( - %cache_id, - "Received identification for already known cache" - ); - } + nats_client, + &mut piece_caches_to_add, + ) + } + maybe_new_cache = piece_caches_to_add.select_next_some() => { + let Ok(new_cache) = maybe_new_cache else { + // Collecting new cache failed, continue + continue; + }; + + 
info!(cache_id = %new_cache.cache_id, "New cache discovered, scheduling reinitialization"); + scheduled_reinitialization_for.replace(Instant::now() + SCHEDULE_REINITIALIZATION_DELAY); + + known_caches.known_caches.push(new_cache); } _ = cache_pruning_interval.tick().fuse() => { let mut reinit = false; @@ -206,3 +207,42 @@ pub async fn maintain_caches( } } } + +/// Collect piece caches from the cache and convert them to `ClusterPieceCache` by sending a stream request, +/// then construct a `KnownCache` instance. +fn collect_new_cache(cache_id: CacheId, nats_client: &NatsClient) -> CollectPieceCachesFuture { + let nats_client = nats_client.clone(); + Box::pin(async move { + let piece_caches = nats_client + .stream_request(&ClusterCacheDetailsRequest, Some(&cache_id.to_string())) + .await + .inspect_err(|error| { + warn!( + %error, + %cache_id, + "Failed to request farmer farm details" + ) + })? + .map( + |ClusterPieceCacheDetails { + piece_cache_id, + max_num_elements, + }| { + debug!(%cache_id, %piece_cache_id, %max_num_elements, "Discovered new piece cache"); + Arc::new(ClusterPieceCache::new( + piece_cache_id, + max_num_elements, + nats_client.clone(), + )) + }, + ) + .collect() + .await; + + Ok(KnownCache { + cache_id, + last_identification: Instant::now(), + piece_caches, + }) + }) +} diff --git a/crates/subspace-farmer/src/cluster/controller/farms.rs b/crates/subspace-farmer/src/cluster/controller/farms.rs index 4c725e3d49..43502b421a 100644 --- a/crates/subspace-farmer/src/cluster/controller/farms.rs +++ b/crates/subspace-farmer/src/cluster/controller/farms.rs @@ -5,10 +5,13 @@ //! automatically handles dynamic farm addition and removal, etc. 
use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast; -use crate::cluster::farmer::{ClusterFarm, ClusterFarmerIdentifyFarmBroadcast}; +use crate::cluster::farmer::{ + ClusterFarm, ClusterFarmerFarmDetails, ClusterFarmerFarmDetailsRequest, + ClusterFarmerIdentifyBroadcast, +}; use crate::cluster::nats_client::NatsClient; use crate::farm::plotted_pieces::PlottedPieces; -use crate::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate}; +use crate::farm::{Farm, FarmId, FarmerId, SectorPlottingDetails, SectorUpdate}; use anyhow::anyhow; use async_lock::RwLock as AsyncRwLock; use futures::channel::oneshot; @@ -16,8 +19,7 @@ use futures::future::FusedFuture; use futures::stream::FuturesUnordered; use futures::{select, FutureExt, StreamExt}; use parking_lot::Mutex; -use std::collections::hash_map::Entry; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::future::{ready, Future}; use std::mem; use std::pin::{pin, Pin}; @@ -27,110 +29,350 @@ use subspace_core_primitives::hashes::Blake3Hash; use subspace_core_primitives::sectors::SectorIndex; use tokio::task; use tokio::time::MissedTickBehavior; -use tracing::{error, info, trace, warn}; - +use tracing::{debug, error, info, warn}; + +type CollectFarmerFarmsFuture = Pin< + Box< + dyn Future< + Output = anyhow::Result<(KnownFarmerInsertResult, Vec)>, + >, + >, +>; type AddRemoveFuture<'a> = Pin, ClusterFarm)>> + 'a>>; /// Number of farms in a cluster is currently limited to 2^16 pub type FarmIndex = u16; +#[derive(Debug)] +struct KnownFarmer { + farmer_id: FarmerId, + fingerprint: Blake3Hash, + last_identification: Instant, + known_farms: HashMap, +} + #[derive(Debug)] struct KnownFarm { farm_id: FarmId, fingerprint: Blake3Hash, - last_identification: Instant, expired_sender: oneshot::Sender<()>, } -enum KnownFarmInsertResult { +enum KnownFarmerInsertResult { Inserted { - farm_index: FarmIndex, - expired_receiver: oneshot::Receiver<()>, + farmer_id: 
FarmerId, + fingerprint: Blake3Hash, }, FingerprintUpdated { - farm_index: FarmIndex, - expired_receiver: oneshot::Receiver<()>, + farmer_id: FarmerId, + old_farms: HashMap, }, NotInserted, } +impl KnownFarmerInsertResult { + fn process( + self, + farms: Vec, + known_farmers: &mut KnownFarmers, + ) -> Vec { + let farm_indices = known_farmers.pick_farmer_index(farms.len()); + + match self { + KnownFarmerInsertResult::Inserted { + farmer_id, + fingerprint, + } => { + let mut known_farmer = KnownFarmer { + farmer_id, + fingerprint, + last_identification: Instant::now(), + known_farms: HashMap::new(), + }; + + let res = farm_indices + .into_iter() + .zip(farms) + .map(|(farm_index, farm_details)| { + let ClusterFarmerFarmDetails { + farm_id, + total_sectors_count, + fingerprint, + } = farm_details; + let (expired_sender, expired_receiver) = oneshot::channel(); + known_farmer.known_farms.insert( + farm_index, + KnownFarm { + farm_id, + fingerprint, + expired_sender, + }, + ); + info!(%farmer_id, %farm_id, %total_sectors_count, "Discovered new farm"); + KnownFarmInsertResult { + farm_index, + farm_id, + total_sectors_count, + expired_receiver, + add: true, + remove: false, + } + }) + .collect::>(); + known_farmers.known_farmers.push(known_farmer); + res + } + KnownFarmerInsertResult::FingerprintUpdated { + farmer_id, + mut old_farms, + } => { + farm_indices + .into_iter() + .zip(farms) + .filter_map(|(farm_index, farm_details)| { + let ClusterFarmerFarmDetails { + farm_id, + total_sectors_count, + fingerprint, + } = farm_details; + if let Some((farm_index, mut known_farm)) = old_farms.remove(&farm_id) { + let known_farmer = known_farmers + .get_known_farmer(farmer_id) + .expect("Farmer should be available"); + if known_farm.fingerprint == fingerprint { + // Do nothing if farm is already known + known_farmer.known_farms.insert(farm_index, known_farm); + None + } else { + // Update fingerprint + let (expired_sender, expired_receiver) = oneshot::channel(); + 
known_farm.expired_sender = expired_sender; + known_farmer.known_farms.insert(farm_index, known_farm); + Some(KnownFarmInsertResult { + farm_index, + farm_id, + total_sectors_count, + expired_receiver, + add: true, + remove: true, + }) + } + } else { + // Add new farm + let (expired_sender, expired_receiver) = oneshot::channel(); + + known_farmers + .get_known_farmer(farmer_id) + .expect("Farmer should be available") + .known_farms + .insert( + farm_index, + KnownFarm { + farm_id, + fingerprint, + expired_sender, + }, + ); + Some(KnownFarmInsertResult { + farm_index, + farm_id, + total_sectors_count, + expired_receiver, + add: true, + remove: false, + }) + } + }) + .collect::>() + } + KnownFarmerInsertResult::NotInserted => { + unreachable!("KnownFarmerInsertResult::NotInserted should be handled above") + } + } + } +} + +struct KnownFarmInsertResult { + farm_index: FarmIndex, + farm_id: FarmId, + total_sectors_count: u16, + expired_receiver: oneshot::Receiver<()>, + add: bool, + remove: bool, +} + +impl KnownFarmInsertResult { + fn process<'a>( + self, + nats_client: &'a NatsClient, + farms_to_add_remove: &mut VecDeque>, + plotted_pieces: Arc>>, + ) { + let KnownFarmInsertResult { + farm_index, + farm_id, + total_sectors_count, + expired_receiver, + add, + remove, + } = self; + + if remove { + remove_farm( + farm_id, + farm_index, + farms_to_add_remove, + plotted_pieces.clone(), + ); + } + + if add { + farms_to_add_remove.push_back(Box::pin(async move { + match initialize_farm( + farm_index, + farm_id, + total_sectors_count, + plotted_pieces.clone(), + nats_client, + ) + .await + { + Ok(farm) => { + if remove { + info!( + %farm_index, + %farm_id, + "Farm re-initialized successfully" + ); + } else { + info!( + %farm_index, + %farm_id, + "Farm initialized successfully" + ); + } + + Some((farm_index, expired_receiver, farm)) + } + Err(error) => { + warn!( + %error, + "Failed to initialize farm {farm_id}" + ); + None + } + } + })); + } + } +} + #[derive(Debug)] -struct 
KnownFarms { +struct KnownFarmers { identification_broadcast_interval: Duration, - known_farms: HashMap, + known_farmers: Vec, } -impl KnownFarms { +impl KnownFarmers { fn new(identification_broadcast_interval: Duration) -> Self { Self { identification_broadcast_interval, - known_farms: HashMap::new(), + known_farmers: Vec::new(), } } - fn insert_or_update( + fn insert_or_update_farmer( &mut self, - farm_id: FarmId, + farmer_id: FarmerId, fingerprint: Blake3Hash, - ) -> KnownFarmInsertResult { - if let Some(existing_result) = - self.known_farms - .iter_mut() - .find_map(|(&farm_index, known_farm)| { - if known_farm.farm_id == farm_id { - if known_farm.fingerprint == fingerprint { - known_farm.last_identification = Instant::now(); - Some(KnownFarmInsertResult::NotInserted) - } else { - let (expired_sender, expired_receiver) = oneshot::channel(); - - known_farm.fingerprint = fingerprint; - known_farm.expired_sender = expired_sender; - - Some(KnownFarmInsertResult::FingerprintUpdated { - farm_index, - expired_receiver, + nats_client: &NatsClient, + farms_in_farmer_collector: &mut FuturesUnordered, + ) { + let result = self + .known_farmers + .iter_mut() + .find_map(|known_farmer| { + let check_farmer_id = known_farmer.farmer_id == farmer_id; + let check_fingerprint = known_farmer.fingerprint == fingerprint; + match (check_farmer_id, check_fingerprint) { + (true, true) => { + debug!(%farmer_id,"Updating last identification for farmer"); + known_farmer.last_identification = Instant::now(); + Some(KnownFarmerInsertResult::NotInserted) + } + (true, false) => { + let old_farms = known_farmer + .known_farms + .drain() + .map(|(farm_index, know_farm)| { + (know_farm.farm_id, (farm_index, know_farm)) }) - } - } else { - None + .collect(); + known_farmer.fingerprint = fingerprint; + known_farmer.last_identification = Instant::now(); + Some(KnownFarmerInsertResult::FingerprintUpdated { + farmer_id, + old_farms, + }) } - }) - { - return existing_result; + (false, _) => None, + } 
+ }) + .unwrap_or(KnownFarmerInsertResult::Inserted { + farmer_id, + fingerprint, + }); + + if let KnownFarmerInsertResult::NotInserted = result { + return; } - for farm_index in FarmIndex::MIN..=FarmIndex::MAX { - if let Entry::Vacant(entry) = self.known_farms.entry(farm_index) { - let (expired_sender, expired_receiver) = oneshot::channel(); + farms_in_farmer_collector.push(collect_farmer_farms(farmer_id, result, nats_client)); + } - entry.insert(KnownFarm { - farm_id, - fingerprint, - last_identification: Instant::now(), - expired_sender, - }); + fn get_known_farmer(&mut self, farmer_id: FarmerId) -> Option<&mut KnownFarmer> { + self.known_farmers + .iter_mut() + .find(|known_farmer| known_farmer.farmer_id == farmer_id) + } - return KnownFarmInsertResult::Inserted { - farm_index, - expired_receiver, - }; + fn pick_farmer_index(&self, len: usize) -> Vec { + let used_indices = self + .known_farmers + .iter() + .flat_map(|known_farmer| known_farmer.known_farms.keys()) + .collect::>(); + + let mut available_indices = Vec::with_capacity(len); + + for farm_index in FarmIndex::MIN..=FarmIndex::MAX { + if !used_indices.contains(&farm_index) { + if available_indices.len() < len { + available_indices.push(farm_index); + } else { + return available_indices; + } } } - warn!(%farm_id, max_supported_farm_index = %FarmIndex::MAX, "Too many farms, ignoring"); - KnownFarmInsertResult::NotInserted + warn!(max_supported_farm_index = %FarmIndex::MAX, "Too many farms"); + available_indices } fn remove_expired(&mut self) -> impl Iterator + '_ { - self.known_farms.extract_if(|_farm_index, known_farm| { - known_farm.last_identification.elapsed() > self.identification_broadcast_interval * 2 - }) + self.known_farmers + .extract_if(.., |known_farmer| { + known_farmer.last_identification.elapsed() + > self.identification_broadcast_interval * 2 + }) + .flat_map(|known_farmer| known_farmer.known_farms) } - fn remove(&mut self, farm_index: FarmIndex) { - self.known_farms.remove(&farm_index); + 
fn remove_farm(&mut self, farm_index: FarmIndex) { + self.known_farmers.iter_mut().for_each(|known_farmer| { + known_farmer.known_farms.remove(&farm_index); + }); } } @@ -141,8 +383,9 @@ pub async fn maintain_farms( plotted_pieces: &Arc>>, identification_broadcast_interval: Duration, ) -> anyhow::Result<()> { - let mut known_farms = KnownFarms::new(identification_broadcast_interval); + let mut known_farmers = KnownFarmers::new(identification_broadcast_interval); + let mut farms_in_farmer_collector = FuturesUnordered::::new(); // Futures that need to be processed sequentially in order to add/remove farms, if farm was // added, future will resolve with `Some`, `None` if removed let mut farms_to_add_remove = VecDeque::>::new(); @@ -152,11 +395,9 @@ pub async fn maintain_farms( let mut farms = FuturesUnordered::new(); let farmer_identify_subscription = pin!(nats_client - .subscribe_to_broadcasts::(None, None) + .subscribe_to_broadcasts::(None, None) .await - .map_err(|error| anyhow!( - "Failed to subscribe to farmer identify farm broadcast: {error}" - ))?); + .map_err(|error| anyhow!("Failed to subscribe to farmer identify broadcast: {error}"))?); // Request farmer to identify themselves if let Err(error) = nats_client @@ -182,7 +423,7 @@ pub async fn maintain_farms( select! 
{ (farm_index, result) = farms.select_next_some() => { - known_farms.remove(farm_index); + known_farmers.remove_farm(farm_index); farms_to_add_remove.push_back(Box::pin(async move { let plotted_pieces = Arc::clone(plotted_pieces); @@ -213,17 +454,31 @@ pub async fn maintain_farms( let Some(identify_message) = maybe_identify_message else { return Err(anyhow!("Farmer identify stream ended")); }; + let ClusterFarmerIdentifyBroadcast { + farmer_id, + fingerprint, + } = identify_message; - process_farm_identify_message( - identify_message, + known_farmers.insert_or_update_farmer( + farmer_id, + fingerprint, nats_client, - &mut known_farms, - &mut farms_to_add_remove, - plotted_pieces, + &mut farms_in_farmer_collector, ); } + maybe_new_farmer_farms = farms_in_farmer_collector.select_next_some() => { + let Ok((farmer_insert_result, farms)) = maybe_new_farmer_farms else { + // Collecting farmer farms failed, continue + continue; + }; + + for farm_insert_result in farmer_insert_result.process(farms, &mut known_farmers) + { + farm_insert_result.process(nats_client, &mut farms_to_add_remove, Arc::clone(plotted_pieces)); + } + } _ = farm_pruning_interval.tick().fuse() => { - for (farm_index, removed_farm) in known_farms.remove_expired() { + for (farm_index, removed_farm) in known_farmers.remove_expired() { let farm_id = removed_farm.farm_id; if removed_farm.expired_sender.send(()).is_ok() { @@ -278,112 +533,56 @@ pub async fn maintain_farms( } } -fn process_farm_identify_message<'a>( - identify_message: ClusterFarmerIdentifyFarmBroadcast, - nats_client: &'a NatsClient, - known_farms: &mut KnownFarms, - farms_to_add_remove: &mut VecDeque>, - plotted_pieces: &'a Arc>>, -) { - let ClusterFarmerIdentifyFarmBroadcast { - farm_id, - total_sectors_count, - fingerprint, - } = identify_message; - let (farm_index, expired_receiver, add, remove) = - match known_farms.insert_or_update(farm_id, fingerprint) { - KnownFarmInsertResult::Inserted { - farm_index, - expired_receiver, - } => { - 
info!( - %farm_index, - %farm_id, - "Discovered new farm, initializing" - ); - - (farm_index, expired_receiver, true, false) - } - KnownFarmInsertResult::FingerprintUpdated { - farm_index, - expired_receiver, - } => { - info!( - %farm_index, - %farm_id, - "Farm fingerprint updated, re-initializing" - ); - - (farm_index, expired_receiver, true, true) - } - KnownFarmInsertResult::NotInserted => { - trace!( - %farm_id, - "Received identification for already known farm" - ); - // Nothing to do here - return; - } - }; - - if remove { - farms_to_add_remove.push_back(Box::pin(async move { - let plotted_pieces = Arc::clone(plotted_pieces); - - let delete_farm_fut = task::spawn_blocking(move || { - plotted_pieces.write_blocking().delete_farm(farm_index); - }); - if let Err(error) = delete_farm_fut.await { - error!( - %farm_index, - %farm_id, - %error, - "Failed to delete farm that was replaced" - ); - } - - None - })); - } - - if add { - farms_to_add_remove.push_back(Box::pin(async move { - match initialize_farm( - farm_index, - farm_id, - total_sectors_count, - Arc::clone(plotted_pieces), - nats_client, - ) - .await - { - Ok(farm) => { - if remove { - info!( - %farm_index, - %farm_id, - "Farm re-initialized successfully" - ); - } else { - info!( - %farm_index, - %farm_id, - "Farm initialized successfully" - ); - } - - Some((farm_index, expired_receiver, farm)) - } - Err(error) => { +/// Collect `ClusterFarmerFarmDetails` from the farmer by sending a stream request +fn collect_farmer_farms( + farmer_id: FarmerId, + result: KnownFarmerInsertResult, + nats_client: &NatsClient, +) -> CollectFarmerFarmsFuture { + let nats_client = nats_client.clone(); + Box::pin(async move { + Ok(( + result, + nats_client + .stream_request( + &ClusterFarmerFarmDetailsRequest, + Some(&farmer_id.to_string()), + ) + .await + .inspect_err(|error| { warn!( %error, - "Failed to initialize farm {farm_id}" - ); - None - } - } - })); - } + %farmer_id, + "Failed to request farmer farm details" + ) + })? 
+ .collect() + .await, + )) + }) +} + +fn remove_farm( + farm_id: FarmId, + farm_index: FarmIndex, + farms_to_add_remove: &mut VecDeque>, + plotted_pieces: Arc>>, +) { + farms_to_add_remove.push_back(Box::pin(async move { + let delete_farm_fut = task::spawn_blocking(move || { + plotted_pieces.write_blocking().delete_farm(farm_index); + }); + if let Err(error) = delete_farm_fut.await { + error!( + %farm_index, + %farm_id, + %error, + "Failed to delete farm that was replaced", + ); + } + + None + })); } async fn initialize_farm( diff --git a/crates/subspace-farmer/src/cluster/farmer.rs b/crates/subspace-farmer/src/cluster/farmer.rs index d6afd67b74..7ea7c7e5df 100644 --- a/crates/subspace-farmer/src/cluster/farmer.rs +++ b/crates/subspace-farmer/src/cluster/farmer.rs @@ -12,7 +12,7 @@ use crate::cluster::nats_client::{ GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient, }; use crate::farm::{ - Farm, FarmError, FarmId, FarmingNotification, HandlerFn, HandlerId, PieceReader, + Farm, FarmError, FarmId, FarmerId, FarmingNotification, HandlerFn, HandlerId, PieceReader, PlottedSectors, SectorUpdate, }; use crate::utils::AsyncJoinOnDrop; @@ -40,9 +40,33 @@ const MIN_FARMER_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1); type Handler = Bag, A>; -/// Broadcast with identification details by farmers +/// Broadcast with cluster farmer id for identification #[derive(Debug, Clone, Encode, Decode)] -pub struct ClusterFarmerIdentifyFarmBroadcast { +pub struct ClusterFarmerIdentifyBroadcast { + /// Cluster farmer ID + pub farmer_id: FarmerId, + /// Farmer fingerprint changes when something about internal farm changes (like allocated space) + pub fingerprint: Blake3Hash, +} + +impl GenericBroadcast for ClusterFarmerIdentifyBroadcast { + /// `*` here stands for cluster farmer ID + const SUBJECT: &'static str = "subspace.farmer.*.farmer-identify"; +} + +/// Request farm details from farmer +#[derive(Debug, Clone, Encode, Decode)] +pub struct 
ClusterFarmerFarmDetailsRequest; + +impl GenericStreamRequest for ClusterFarmerFarmDetailsRequest { + /// `*` here stands for cluster farmer ID + const SUBJECT: &'static str = "subspace.farmer.*.farm.details"; + type Response = ClusterFarmerFarmDetails; +} + +/// Farm details +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterFarmerFarmDetails { /// Farm ID pub farm_id: FarmId, /// Total number of sectors in the farm @@ -51,10 +75,6 @@ pub struct ClusterFarmerIdentifyFarmBroadcast { pub fingerprint: Blake3Hash, } -impl GenericBroadcast for ClusterFarmerIdentifyFarmBroadcast { - const SUBJECT: &'static str = "subspace.farmer.*.identify"; -} - /// Broadcast with sector updates by farmers #[derive(Debug, Clone, Encode, Decode)] struct ClusterFarmerSectorUpdateBroadcast { @@ -67,6 +87,7 @@ struct ClusterFarmerSectorUpdateBroadcast { } impl GenericBroadcast for ClusterFarmerSectorUpdateBroadcast { + /// `*` here stands for single farm ID const SUBJECT: &'static str = "subspace.farmer.*.sector-update"; } @@ -80,6 +101,7 @@ struct ClusterFarmerFarmingNotificationBroadcast { } impl GenericBroadcast for ClusterFarmerFarmingNotificationBroadcast { + /// `*` here stands for single farm ID const SUBJECT: &'static str = "subspace.farmer.*.farming-notification"; } @@ -93,6 +115,7 @@ struct ClusterFarmerSolutionBroadcast { } impl GenericBroadcast for ClusterFarmerSolutionBroadcast { + /// `*` here stands for single farm ID const SUBJECT: &'static str = "subspace.farmer.*.solution-response"; } @@ -104,6 +127,7 @@ struct ClusterFarmerReadPieceRequest { } impl GenericRequest for ClusterFarmerReadPieceRequest { + /// `*` here stands for single farm ID const SUBJECT: &'static str = "subspace.farmer.*.farm.read-piece"; type Response = Result, String>; } @@ -113,6 +137,7 @@ impl GenericRequest for ClusterFarmerReadPieceRequest { struct ClusterFarmerPlottedSectorsRequest; impl GenericStreamRequest for ClusterFarmerPlottedSectorsRequest { + /// `*` here stands for single farm ID 
const SUBJECT: &'static str = "subspace.farmer.*.farm.plotted-sectors"; type Response = Result; } @@ -236,7 +261,7 @@ impl Farm for ClusterFarm { impl ClusterFarm { /// Create new instance using information from previously received - /// [`ClusterFarmerIdentifyFarmBroadcast`] + /// [`ClusterFarmerIdentifyBroadcast`] pub async fn new( farm_id: FarmId, total_sectors_count: SectorIndex, @@ -344,6 +369,23 @@ struct FarmDetails { _background_tasks: Option>, } +impl FarmDetails { + fn derive_identification(&self) -> ClusterFarmerFarmDetails { + ClusterFarmerFarmDetails { + farm_id: self.farm_id, + total_sectors_count: self.total_sectors_count, + fingerprint: self.derive_fingerprint(), + } + } + + fn derive_fingerprint(&self) -> Blake3Hash { + blake3_hash_list(&[ + &self.farm_id.encode(), + &self.total_sectors_count.to_le_bytes(), + ]) + } +} + /// Create farmer service for specified farms that will be processing incoming requests and send /// periodic identify notifications. /// @@ -358,6 +400,9 @@ pub fn farmer_service( where F: Farm, { + let farmer_id = FarmerId::new(); + let farmer_id_string = farmer_id.to_string(); + // For each farm start forwarding notifications as broadcast messages and create farm details // that can be used to respond to incoming requests let farms_details = farms @@ -479,7 +524,20 @@ where async move { if primary_instance { select! 
{ - result = identify_responder(&nats_client, &farms_details, identification_broadcast_interval).fuse() => { + result = identify_responder( + &nats_client, + farmer_id, + &farmer_id_string, + &farms_details, + identification_broadcast_interval + ).fuse() => { + result + }, + result = farms_details_responder( + &nats_client, + &farmer_id_string, + &farms_details + ).fuse() => { result }, result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => { @@ -506,6 +564,8 @@ where /// broadcast in response, also send periodic notifications reminding that farm exists async fn identify_responder( nats_client: &NatsClient, + farmer_id: FarmerId, + farmer_id_string: &str, farms_details: &[FarmDetails], identification_broadcast_interval: Duration, ) -> anyhow::Result<()> { @@ -539,14 +599,14 @@ async fn identify_responder( } last_identification = Instant::now(); - send_identify_broadcast(nats_client, farms_details).await; + send_identify_broadcast(nats_client, farmer_id, farmer_id_string, farms_details).await; interval.reset(); } _ = interval.tick().fuse() => { last_identification = Instant::now(); trace!("Farmer self-identification"); - send_identify_broadcast(nats_client, farms_details).await; + send_identify_broadcast(nats_client, farmer_id, farmer_id_string, farms_details).await; } } } @@ -554,34 +614,61 @@ async fn identify_responder( Ok(()) } -async fn send_identify_broadcast(nats_client: &NatsClient, farms_details: &[FarmDetails]) { - farms_details +async fn send_identify_broadcast( + nats_client: &NatsClient, + farmer_id: FarmerId, + farmer_id_string: &str, + farms_details: &[FarmDetails], +) { + if let Err(error) = nats_client + .broadcast( + &new_identify_message(farmer_id, farms_details), + farmer_id_string, + ) + .await + { + warn!(%farmer_id, %error, "Failed to send farmer identify notification"); + } +} + +fn new_identify_message( + farmer_id: FarmerId, + farms_details: &[FarmDetails], +) -> ClusterFarmerIdentifyBroadcast { + let farmer_id_bytes = 
farmer_id.encode(); + let farms_sectors_counts = farms_details .iter() - .map(|farm_details| async move { - if let Err(error) = nats_client - .broadcast( - &ClusterFarmerIdentifyFarmBroadcast { - farm_id: farm_details.farm_id, - total_sectors_count: farm_details.total_sectors_count, - fingerprint: blake3_hash_list(&[ - &farm_details.farm_id.encode(), - &farm_details.total_sectors_count.to_le_bytes(), - ]), - }, - &farm_details.farm_id_string, - ) - .await - { - warn!( - farm_id = %farm_details.farm_id, - %error, - "Failed to send farmer identify notification" - ); - } - }) - .collect::>() - .collect::>() - .await; + .map(|farm_details| farm_details.total_sectors_count.to_le_bytes()) + .collect::>(); + let mut farms_sectors_counts = farms_sectors_counts + .iter() + .map(AsRef::as_ref) + .collect::>(); + farms_sectors_counts.push(farmer_id_bytes.as_slice()); + let fingerprint = blake3_hash_list(farms_sectors_counts.as_slice()); + + ClusterFarmerIdentifyBroadcast { + farmer_id, + fingerprint, + } +} + +async fn farms_details_responder( + nats_client: &NatsClient, + farmer_id_string: &str, + farms_details: &[FarmDetails], +) -> anyhow::Result<()> { + nats_client + .stream_request_responder( + Some(farmer_id_string), + Some(farmer_id_string.to_string()), + |_request: ClusterFarmerFarmDetailsRequest| async { + Some(stream::iter( + farms_details.iter().map(FarmDetails::derive_identification), + )) + }, + ) + .await } async fn plotted_sectors_responder( diff --git a/crates/subspace-farmer/src/farm.rs b/crates/subspace-farmer/src/farm.rs index 6d6571f475..28377d8c70 100644 --- a/crates/subspace-farmer/src/farm.rs +++ b/crates/subspace-farmer/src/farm.rs @@ -52,11 +52,67 @@ pub trait PlottedSectors: Send + Sync + fmt::Debug { Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From, )] #[serde(untagged)] -pub enum PieceCacheId { +pub enum CacheId { /// Cache ID Ulid(Ulid), } +impl Encode for CacheId { + #[inline] + fn size_hint(&self) 
-> usize { + 1_usize + + match self { + CacheId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)), + } + } + + #[inline] + fn encode_to(&self, output: &mut O) { + match self { + CacheId::Ulid(ulid) => { + output.push_byte(0); + Encode::encode_to(&ulid.0, output); + } + } + } +} + +impl EncodeLike for CacheId {} + +impl Decode for CacheId { + #[inline] + fn decode(input: &mut I) -> Result { + match input + .read_byte() + .map_err(|e| e.chain("Could not decode `CacheId`, failed to read variant byte"))? + { + 0 => u128::decode(input) + .map(|ulid| CacheId::Ulid(Ulid(ulid))) + .map_err(|e| e.chain("Could not decode `CacheId::Ulid.0`")), + _ => Err("Could not decode `CacheId`, variant doesn't exist".into()), + } + } +} + +#[allow(clippy::new_without_default)] +impl CacheId { + /// Creates new ID + #[inline] + pub fn new() -> Self { + Self::Ulid(Ulid::new()) + } +} + +/// An identifier for a piece cache, can be used in logs, thread names, etc. +#[derive( + Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From, +)] +#[serde(untagged)] +pub enum PieceCacheId { + /// Piece Cache ID + Ulid(Ulid), +} + impl Encode for PieceCacheId { #[inline] fn size_hint(&self) -> usize { @@ -84,12 +140,12 @@ impl Decode for PieceCacheId { fn decode(input: &mut I) -> Result { match input .read_byte() - .map_err(|e| e.chain("Could not decode `CacheId`, failed to read variant byte"))? + .map_err(|e| e.chain("Could not decode `PieceCacheId`, failed to read variant byte"))? 
{ 0 => u128::decode(input) .map(|ulid| PieceCacheId::Ulid(Ulid(ulid))) - .map_err(|e| e.chain("Could not decode `CacheId::Ulid.0`")), - _ => Err("Could not decode `CacheId`, variant doesn't exist".into()), + .map_err(|e| e.chain("Could not decode `PieceCacheId::Ulid.0`")), + _ => Err("Could not decode `PieceCacheId`, variant doesn't exist".into()), } } } @@ -466,6 +522,62 @@ impl HandlerId for event_listener_primitives::HandlerId { } } +/// An identifier for a farmer, can be used in logs, thread names, etc. +#[derive( + Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From, +)] +#[serde(untagged)] +pub enum FarmerId { + /// Farmer ID + Ulid(Ulid), +} + +impl Encode for FarmerId { + #[inline] + fn size_hint(&self) -> usize { + 1_usize + + match self { + FarmerId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)), + } + } + + #[inline] + fn encode_to(&self, output: &mut O) { + match self { + FarmerId::Ulid(ulid) => { + output.push_byte(0); + Encode::encode_to(&ulid.0, output); + } + } + } +} + +impl EncodeLike for FarmerId {} + +impl Decode for FarmerId { + #[inline] + fn decode(input: &mut I) -> Result { + match input + .read_byte() + .map_err(|e| e.chain("Could not decode `FarmerId`, failed to read variant byte"))? + { + 0 => u128::decode(input) + .map(|ulid| FarmerId::Ulid(Ulid(ulid))) + .map_err(|e| e.chain("Could not decode `FarmerId::Ulid.0`")), + _ => Err("Could not decode `FarmerId`, variant doesn't exist".into()), + } + } +} + +#[allow(clippy::new_without_default)] +impl FarmerId { + /// Creates new ID + #[inline] + pub fn new() -> Self { + Self::Ulid(Ulid::new()) + } +} + /// An identifier for a farm, can be used for in logs, thread names, etc. #[derive( Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From,