diff --git a/curp/proto/message.proto b/curp/proto/message.proto index f87962979..efa075f22 100644 --- a/curp/proto/message.proto +++ b/curp/proto/message.proto @@ -89,11 +89,46 @@ message ShutdownResponse { optional bytes error = 3; } +message ProposeConfChangeRequest { + enum ConfChangeType { + Add = 0; + AddLearner = 1; + Remove = 2; + Update = 3; + } + message ConfChange { + ConfChangeType change_type = 1; + uint64 node_id = 2; + repeated string address = 3; + } + string id = 1; + repeated ConfChange changes = 2; +} + +message Member { + uint64 id = 1; + string name = 2; + string addrs = 3; + bool is_learner = 4; +}; + +message ProposeConfChangeResponse { + enum ConfChangeError { + InvalidConfig = 0; + NodeNotExists = 1; + NodeAlreadyExists = 2; + } + optional ConfChangeError error = 1; + repeated Member members = 2; +} + + service Protocol { rpc Propose(commandpb.ProposeRequest) returns (commandpb.ProposeResponse); rpc WaitSynced(commandpb.WaitSyncedRequest) returns (commandpb.WaitSyncedResponse); rpc Shutdown(ShutdownRequest) returns (ShutdownResponse); + rpc ProposeConfChange (ProposeConfChangeRequest) returns (ProposeConfChangeResponse); rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse); rpc Vote(VoteRequest) returns (VoteResponse); rpc InstallSnapshot(stream InstallSnapshotRequest) diff --git a/curp/src/error.rs b/curp/src/error.rs index e49c22aad..c921919f1 100644 --- a/curp/src/error.rs +++ b/curp/src/error.rs @@ -304,6 +304,22 @@ impl From for CommandSyncError { } } +/// Error type of `apply_conf_change` +#[derive(Error, Debug, Clone, Copy)] +#[allow(clippy::module_name_repetitions)] // this-error generate code false-positive +#[non_exhaustive] +pub enum ApplyConfChangeError { + /// Config is invalid after applying the conf change + #[error("invalid config")] + InvalidConfig, + /// The node to be removed does not exist + #[error("node {0} does not exist")] + NodeNotExists(ServerId), + /// The node to be added already exists + #[error("node {0} already exists")] + NodeAlreadyExists(ServerId), +} + #[cfg(test)] mod test { use curp_test_utils::test_cmd::{ExecuteError, TestCommand}; diff --git a/curp/src/members.rs b/curp/src/members.rs index 956d99d94..c1978ab53 100644 --- a/curp/src/members.rs +++ b/curp/src/members.rs @@ -3,13 +3,14 @@ use std::{ hash::Hasher, }; +use dashmap::{mapref::one::Ref, DashMap}; use itertools::Itertools; /// Server Id pub type ServerId = u64; /// Cluster member -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Member { /// Server id of the member id: ServerId, @@ -19,6 +20,26 @@ pub struct Member { address: String, } +/// Cluster member +impl Member { + /// Create a new `Member` + #[inline] + pub fn new(id: ServerId, name: impl Into, address: impl Into) -> Self { + Self { + id, + name: name.into(), + address: address.into(), + } + } + + /// Get member id + #[must_use] + #[inline] + pub fn id(&self) -> ServerId { + self.id + } +} + /// cluster members information #[derive(Debug)] pub struct ClusterInfo { @@ -27,7 +48,7 @@ pub struct ClusterInfo { /// current server id member_id: ServerId, /// all members information - members: HashMap, + members: DashMap, } impl ClusterInfo { @@ -36,10 +57,10 @@ impl ClusterInfo { /// panic if `all_members` is empty #[inline] #[must_use] - pub fn new(all_members: HashMap, self_name: &str) -> Self { + pub fn new(all_members_addrs: HashMap, self_name: &str) -> Self { let mut member_id = 0; - let mut members = HashMap::new(); - for (name, address) in all_members { + let members = DashMap::new(); + for (name, address) in all_members_addrs { let id = Self::calculate_member_id(&address, "", None); if name == self_name { member_id = id; @@ -57,47 +78,82 @@ impl ClusterInfo { cluster_info } - /// Get server address via server id + /// Get all members #[must_use] #[inline] - pub fn address(&self, id: ServerId) -> Option<&str> { + pub fn all_members(&self) -> HashMap { + self.members + .iter() + .map(|t| (t.id, t.value().clone())) + .collect() + } + + /// Insert a member + #[inline] + pub fn insert(&self, member: Member) { + _ = self.members.insert(member.id, member); + } + + /// Remove a member + #[inline] + pub fn remove(&self, id: &ServerId) { + _ = self.members.remove(id); + } + + /// Update a member + #[inline] + pub fn update(&self, id: &ServerId, address: impl Into) { self.members - .values() - .find(|t| t.id == id) - .map(|t| t.address.as_str()) + .get_mut(id) + .unwrap_or_else(|| unreachable!("member {} not found", id)) + .address = address.into(); + } + + /// Get server address via server id + #[must_use] + #[inline] + pub fn address(&self, id: ServerId) -> Option { + self.members.get(&id).map(|t| t.address.clone()) } /// Get the current member - #[allow(clippy::indexing_slicing)] // self member id must be in members - fn self_member(&self) -> &Member { - &self.members[&self.member_id] + #[allow(clippy::unwrap_used)] // self member id must be in members + fn self_member(&self) -> Ref<'_, u64, Member> { + self.members.get(&self.member_id).unwrap() } /// Get the current server address #[must_use] #[inline] - pub fn self_address(&self) -> &str { - &self.self_member().address + pub fn self_address(&self) -> String { + self.self_member().address.clone() } /// Get the current server id #[must_use] #[inline] - pub fn self_name(&self) -> &str { - &self.self_member().name + pub fn self_name(&self) -> String { + self.self_member().name.clone() } - /// Get peers id + /// Get peers ids #[must_use] #[inline] pub fn peers_ids(&self) -> Vec { self.members - .values() + .iter() .filter(|t| t.id != self.member_id) .map(|t| t.id) .collect() } + /// Get all ids + #[must_use] + #[inline] + pub fn all_ids(&self) -> Vec { + self.members.iter().map(|t| t.id).collect() + } + /// Calculate the member id fn calculate_member_id(address: &str, cluster_name: &str, timestamp: Option) -> ServerId { let mut hasher = DefaultHasher::new(); @@ -112,8 +168,8 @@ impl ClusterInfo { /// Calculate the cluster id fn gen_cluster_id(&mut self) { let mut hasher = DefaultHasher::new(); - for id in self.members.keys().sorted() { - hasher.write_u64(*id); + for id in self.members.iter().map(|t| t.id).sorted() { + hasher.write_u64(id); } self.cluster_id = hasher.finish(); } @@ -135,9 +191,9 @@ impl ClusterInfo { /// Get peers #[must_use] #[inline] - pub fn peers(&self) -> HashMap { + pub fn peers_addrs(&self) -> HashMap { self.members - .values() + .iter() .filter(|t| t.id != self.member_id) .map(|t| (t.id, t.address.clone())) .collect() @@ -146,9 +202,9 @@ impl ClusterInfo { /// Get all members #[must_use] #[inline] - pub fn all_members(&self) -> HashMap { + pub fn all_members_addrs(&self) -> HashMap { self.members - .values() + .iter() .map(|t| (t.id, t.address.clone())) .collect() } @@ -167,7 +223,7 @@ impl ClusterInfo { pub fn get_id_by_name(&self, name: &str) -> Option { self.members .iter() - .find_map(|(_, m)| (m.name == name).then_some(m.id)) + .find_map(|m| (m.name == name).then_some(m.id)) } } @@ -206,7 +262,7 @@ mod tests { ]); let node1 = ClusterInfo::new(all_members, "S1"); - let peers = node1.peers(); + let peers = node1.peers_addrs(); let node1_id = node1.self_id(); let node1_url = node1.self_address(); assert!(!peers.contains_key(&node1_id)); diff --git a/curp/src/rpc/mod.rs b/curp/src/rpc/mod.rs index e0a33ca9a..3f31c3eb7 100644 --- a/curp/src/rpc/mod.rs +++ b/curp/src/rpc/mod.rs @@ -18,9 +18,12 @@ pub(crate) use self::proto::{ RedirectData, WaitSyncError as PbWaitSyncErrorOuter, }, messagepb::{ - fetch_read_state_response::ReadState, protocol_server::Protocol, AppendEntriesRequest, - AppendEntriesResponse, FetchClusterRequest, FetchClusterResponse, FetchReadStateRequest, - FetchReadStateResponse, IdSet, InstallSnapshotRequest, InstallSnapshotResponse, + fetch_read_state_response::ReadState, + propose_conf_change_request::{ConfChange, ConfChangeType}, + protocol_server::Protocol, + AppendEntriesRequest, AppendEntriesResponse, FetchClusterRequest, FetchClusterResponse, + FetchReadStateRequest, FetchReadStateResponse, IdSet, InstallSnapshotRequest, + InstallSnapshotResponse, ProposeConfChangeRequest, ProposeConfChangeResponse, ShutdownRequest, ShutdownResponse, VoteRequest, VoteResponse, }, }; @@ -441,3 +444,43 @@ impl ShutdownRequest { Self { id: id.into() } } } + +#[allow(dead_code)] // TODO: remove this when we implement conf change +#[allow(clippy::as_conversions)] // ConfChangeType is so small that it won't exceed the range of i32 type. +impl ConfChange { + /// Create a new `ConfChange` to add a node + pub(crate) fn add(node_id: ServerId, address: String) -> Self { + Self { + change_type: ConfChangeType::Add as i32, + node_id, + address: vec![address], + } + } + + /// Create a new `ConfChange` to remove a node + pub(crate) fn remove(node_id: ServerId) -> Self { + Self { + change_type: ConfChangeType::Remove as i32, + node_id, + address: vec![], + } + } + + /// Create a new `ConfChange` to update a node + pub(crate) fn update(node_id: ServerId, address: String) -> Self { + Self { + change_type: ConfChangeType::Update as i32, + node_id, + address: vec![address], + } + } + + /// Create a new `ConfChange` to add a learner node + pub(crate) fn add_learner(node_id: ServerId, address: String) -> Self { + Self { + change_type: ConfChangeType::AddLearner as i32, + node_id, + address: vec![address], + } + } +} diff --git a/curp/src/server/curp_node.rs b/curp/src/server/curp_node.rs index e1755ebda..a9324560f 100644 --- a/curp/src/server/curp_node.rs +++ b/curp/src/server/curp_node.rs @@ -36,8 +36,9 @@ use crate::{ self, connect::ConnectApi, AppendEntriesRequest, AppendEntriesResponse, FetchClusterRequest, FetchClusterResponse, FetchLeaderRequest, FetchLeaderResponse, FetchReadStateRequest, FetchReadStateResponse, InstallSnapshotRequest, - InstallSnapshotResponse, ProposeRequest, ProposeResponse, ShutdownRequest, - ShutdownResponse, VoteRequest, VoteResponse, WaitSyncedRequest, WaitSyncedResponse, + InstallSnapshotResponse, ProposeConfChangeRequest, ProposeConfChangeResponse, + ProposeRequest, ProposeResponse, ShutdownRequest, ShutdownResponse, VoteRequest, + VoteResponse, WaitSyncedRequest, WaitSyncedResponse, }, server::{cmd_worker::CEEventTxApi, raw_curp::SyncAction, storage::db::DB}, snapshot::{Snapshot, SnapshotMeta}, @@ -120,7 +121,7 @@ pub(super) struct CurpNode { // handlers impl CurpNode { - /// Handle "propose" requests + /// Handle `Propose` requests pub(super) async fn propose(&self, req: ProposeRequest) -> Result { if self.curp.is_shutdown() { return Ok(ProposeResponse::new_error(None, 0, ProposeError::Shutdown)); @@ -141,7 +142,7 @@ impl CurpNode { Ok(resp) } - /// Handle `propose_shutdown` requests + /// Handle `Shutdown` requests pub(super) async fn shutdown( &self, request: ShutdownRequest, @@ -161,6 +162,15 @@ impl CurpNode { Ok(resp) } + /// Handle `ProposeConfChange` requests + #[allow(clippy::todo, clippy::unused_async)] // TODO: this method will be implemented in #437 + pub(super) async fn propose_conf_change( + &self, + _req: ProposeConfChangeRequest, + ) -> Result { + todo!("propose_conf_change") + } + /// Handle `AppendEntries` requests pub(super) fn append_entries( &self, @@ -203,7 +213,7 @@ impl CurpNode { Ok(resp) } - /// handle "wait synced" request + /// handle `WaitSynced` requests pub(super) async fn wait_synced( &self, req: WaitSyncedRequest, @@ -223,7 +233,7 @@ impl CurpNode { Ok(resp) } - /// Handle fetch leader requests + /// Handle `FetchLeader` requests #[allow(clippy::unnecessary_wraps, clippy::needless_pass_by_value)] // To keep type consistent with other request handlers pub(super) fn fetch_leader( &self, @@ -233,18 +243,18 @@ impl CurpNode { Ok(FetchLeaderResponse::new(leader_id, term)) } - /// Handle fetch cluster requests + /// Handle `FetchCluster` requests #[allow(clippy::unnecessary_wraps, clippy::needless_pass_by_value)] // To keep type consistent with other request handlers pub(super) fn fetch_cluster( &self, _req: FetchClusterRequest, ) -> Result { let (leader_id, term) = self.curp.leader(); - let all_members = self.curp.cluster().all_members(); + let all_members = self.curp.cluster().all_members_addrs(); Ok(FetchClusterResponse::new(leader_id, all_members, term)) } - /// Install snapshot + /// Handle `InstallSnapshot` stream #[allow(clippy::integer_arithmetic)] // can't overflow pub(super) async fn install_snapshot( &self, @@ -310,7 +320,7 @@ impl CurpNode { )) } - /// Handle fetch read state requests + /// Handle `FetchReadState` requests #[allow(clippy::needless_pass_by_value)] // To keep type consistent with other request handlers pub(super) fn fetch_read_state( &self, @@ -615,7 +625,7 @@ impl CurpNode { log_rx: tokio::sync::mpsc::UnboundedReceiver>>, ) { let shutdown_listener = shutdown_trigger.subscribe(); - let connects = rpc::connect(cluster_info.peers()) + let connects = rpc::connect(cluster_info.peers_addrs()) .await .collect::>(); let _election_task = tokio::spawn(Self::election_task( @@ -623,7 +633,7 @@ impl CurpNode { connects.clone(), shutdown_trigger.subscribe(), )); - let _big_daemon = tokio::spawn(Self::sync_followers_daemon( + let _sync_followers_daemon = tokio::spawn(Self::sync_followers_daemon( Arc::clone(&curp), connects, shutdown_trigger, diff --git a/curp/src/server/mod.rs b/curp/src/server/mod.rs index 87b773edc..f664189e3 100644 --- a/curp/src/server/mod.rs +++ b/curp/src/server/mod.rs @@ -21,9 +21,10 @@ use crate::{ rpc::{ AppendEntriesRequest, AppendEntriesResponse, FetchClusterRequest, FetchClusterResponse, FetchLeaderRequest, FetchLeaderResponse, FetchReadStateRequest, FetchReadStateResponse, - InstallSnapshotRequest, InstallSnapshotResponse, ProposeRequest, ProposeResponse, - ProtocolServer, ShutdownRequest, ShutdownResponse, VoteRequest, VoteResponse, - WaitSyncedRequest, WaitSyncedResponse, + InstallSnapshotRequest, InstallSnapshotResponse, ProposeConfChangeRequest, + ProposeConfChangeResponse, ProposeRequest, ProposeResponse, ProtocolServer, + ShutdownRequest, ShutdownResponse, VoteRequest, VoteResponse, WaitSyncedRequest, + WaitSyncedResponse, }, }; @@ -84,6 +85,17 @@ impl crate::rpc::Protocol for Rp )) } + #[instrument(skip_all, name = "curp_propose_conf_change")] + async fn propose_conf_change( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + request.metadata().extract_span(); + Ok(tonic::Response::new( + self.inner.propose_conf_change(request.into_inner()).await?, + )) + } + #[instrument(skip_all, name = "curp_wait_synced")] async fn wait_synced( &self, diff --git a/curp/src/server/raw_curp/mod.rs b/curp/src/server/raw_curp/mod.rs index 4c3529449..3d4041c13 100644 --- a/curp/src/server/raw_curp/mod.rs +++ b/curp/src/server/raw_curp/mod.rs @@ -11,7 +11,7 @@ use std::{ cmp::min, - collections::HashMap, + collections::{HashMap, HashSet}, fmt::Debug, sync::{ atomic::{AtomicU8, Ordering}, @@ -20,6 +20,7 @@ use std::{ }; use clippy_utilities::NumericCast; +use dashmap::DashMap; use event_listener::Event; use itertools::Itertools; use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; @@ -41,11 +42,11 @@ use self::{ use super::cmd_worker::CEEventTxApi; use crate::{ cmd::{Command, ProposeId}, - error::ProposeError, + error::{ApplyConfChangeError, ProposeError}, log_entry::LogEntry, - members::{ClusterInfo, ServerId}, + members::{ClusterInfo, Member, ServerId}, role_change::RoleChange, - rpc::{IdSet, ReadState}, + rpc::{ConfChange, ConfChangeType, IdSet, ReadState}, server::{cmd_board::CmdBoardRef, raw_curp::state::VoteResult, spec_pool::SpecPoolRef}, snapshot::{Snapshot, SnapshotMeta}, LogIndex, @@ -147,22 +148,27 @@ struct Context { /// Tx to send cmds to execute and do after sync cmd_tx: Arc>, /// Followers sync event trigger - sync_events: HashMap>, + sync_events: DashMap>, /// Become leader event leader_event: Arc, /// Leader change callback role_change: RC, } -impl Debug for Context { +impl Debug for Context { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Context") - .field("id", &self.cluster_info) - .field("config", &self.cfg) + .field("cluster_info", &self.cluster_info) + .field("cfg", &self.cfg) .field("cb", &self.cb) .field("sp", &self.sp) + .field("ucp", &self.ucp) .field("leader_tx", &self.leader_tx) .field("election_tick", &self.election_tick) + .field("cmd_tx", &"CEEventTxApi") + .field("sync_events", &self.sync_events) + .field("leader_event", &self.leader_event) + .field("role_change", &self.role_change) .finish() } } @@ -258,10 +264,10 @@ impl RawCurp { self.ctx.cmd_tx.send_sp_exe(entry); } - self.ctx.sync_events.iter().for_each(|(id, event)| { - let next = self.lst.get_next_index(*id); + self.ctx.sync_events.iter().for_each(|e| { + let next = self.lst.get_next_index(*e.key()); if next > log_w.base_index && log_w.has_next_batch(next) { - event.notify(1); + e.notify(1); } }); @@ -307,10 +313,10 @@ impl RawCurp { }; let index = entry.index; - self.ctx.sync_events.iter().for_each(|(id, event)| { - let next = self.lst.get_next_index(*id); + self.ctx.sync_events.iter().for_each(|pair| { + let next = self.lst.get_next_index(*pair.key()); if next > log_w.base_index && log_w.has_next_batch(next) { - event.notify(1); + pair.notify(1); } }); @@ -554,7 +560,7 @@ impl RawCurp { // if some entries are recovered, sync with followers immediately self.ctx .sync_events - .values() + .iter() .for_each(|event| event.notify(1)); } @@ -635,7 +641,7 @@ impl RawCurp { uncommitted_pool: UncommittedPoolRef, cfg: Arc, cmd_tx: Arc>, - sync_events: HashMap>, + sync_events: DashMap>, log_tx: mpsc::UnboundedSender>>, role_change: RC, shutdown_trigger: shutdown::Trigger, @@ -650,12 +656,7 @@ impl RawCurp { cfg.candidate_timeout_ticks, )), lst: LeaderState::new(&cluster_info.peers_ids()), - cst: Mutex::new(CandidateState::new( - cluster_info - .peers_ids() - .into_iter() - .chain([cluster_info.self_id()]), - )), + cst: Mutex::new(CandidateState::new(cluster_info.all_ids().into_iter())), log: RwLock::new(Log::new(log_tx, cfg.batch_max_size, cfg.log_entries_cap)), ctx: Context { cluster_info, @@ -690,7 +691,7 @@ impl RawCurp { uncommitted_pool: UncommittedPoolRef, cfg: &Arc, cmd_tx: Arc>, - sync_event: HashMap>, + sync_event: DashMap>, log_tx: mpsc::UnboundedSender>>, voted_for: Option<(u64, ServerId)>, entries: Vec>, @@ -840,7 +841,8 @@ impl RawCurp { self.ctx .sync_events .get(&id) - .unwrap_or_else(|| unreachable!("server id {id} not found")), + .unwrap_or_else(|| unreachable!("server id {id} not found")) + .value(), ) } @@ -866,6 +868,103 @@ impl RawCurp { let leader_commit_index = log_r.commit_index; self.lst.check_all(|f| f.match_index == leader_commit_index) } + + /// Apply conf changes and return true if self node is removed + #[allow(unused)] // TODO: remove this when we implement conf change + pub(super) fn apply_conf_change( + &self, + changes: Vec, + ) -> Result { + assert_eq!(changes.len(), 1, "Joint consensus is not supported yet"); + let Some(conf_change) = changes.into_iter().next() else { + unreachable!("conf change is empty"); + }; + + self.check_new_config(&conf_change)?; + + Ok(self.switch_config(conf_change)) + } + + /// Check if the new config is valid + #[allow(clippy::unimplemented)] // TODO: remove this when we implement conf change + fn check_new_config(&self, conf_change: &ConfChange) -> Result<(), ApplyConfChangeError> { + let mut statuses_ids = self + .lst + .get_all_statuses() + .keys() + .copied() + .chain([self.id()]) + .collect::>(); + let mut config = self.cst.map_lock(|cst_l| cst_l.config.clone()); + let conf_change_type = + ConfChangeType::from_i32(conf_change.change_type).unwrap_or_else(|| { + unreachable!("conf change type {} should valid", conf_change.change_type) + }); + let node_id = conf_change.node_id; + match conf_change_type { + ConfChangeType::Add => { + if !statuses_ids.insert(node_id) || !config.insert(node_id) { + return Err(ApplyConfChangeError::NodeAlreadyExists(node_id)); + } + } + ConfChangeType::Remove => { + if !statuses_ids.remove(&node_id) || !config.remove(node_id) { + return Err(ApplyConfChangeError::NodeNotExists(node_id)); + } + } + ConfChangeType::Update => { + if statuses_ids.get(&node_id).is_none() || !config.contains(node_id) { + return Err(ApplyConfChangeError::NodeNotExists(node_id)); + } + } + ConfChangeType::AddLearner => { + unimplemented!("learner node is not supported yet"); + } + } + if statuses_ids.len() < 3 || config.voters() != &statuses_ids { + return Err(ApplyConfChangeError::InvalidConfig); + } + Ok(()) + } + + /// Switch to a new config and return true if self node is removed + #[allow(clippy::unimplemented)] // TODO: remove unimplemented when learner node is supported + #[allow(clippy::unwrap_used)] // TODO: refactor this when multi-address is supported + fn switch_config(&self, mut conf_change: ConfChange) -> bool { + let node_id = conf_change.node_id; + let conf_change_type = + ConfChangeType::from_i32(conf_change.change_type).unwrap_or_else(|| { + unreachable!("conf change type {} should valid", conf_change.change_type) + }); + match conf_change_type { + ConfChangeType::Add => { + let member = Member::new(node_id, "", conf_change.address.pop().unwrap()); + self.cst + .map_lock(|mut cst_l| _ = cst_l.config.insert(node_id)); + self.lst.insert(node_id); + _ = self.ctx.sync_events.insert(node_id, Arc::new(Event::new())); + self.ctx.cluster_info.insert(member); + false + } + ConfChangeType::Remove => { + self.cst + .map_lock(|mut cst_l| _ = cst_l.config.remove(node_id)); + self.lst.remove(node_id); + _ = self.ctx.sync_events.remove(&node_id); + self.ctx.cluster_info.remove(&node_id); + node_id == self.id() + } + ConfChangeType::Update => { + self.ctx + .cluster_info + .update(&node_id, conf_change.address.pop().unwrap()); + false + } + ConfChangeType::AddLearner => { + unimplemented!("learner node is not supported yet"); + } + } + } } // Utils @@ -1078,11 +1177,8 @@ impl RawCurp { /// When leader retires, it should reset state fn leader_retires(&self) { debug!("leader {} retires", self.id()); - - let mut cb_w = self.ctx.cb.write(); - cb_w.clear(); - let mut ucp_l = self.ctx.ucp.lock(); - ucp_l.clear(); + self.ctx.cb.write().clear(); + self.ctx.ucp.lock().clear(); } } diff --git a/curp/src/server/raw_curp/state.rs b/curp/src/server/raw_curp/state.rs index cd8cecb29..3c651bfb8 100644 --- a/curp/src/server/raw_curp/state.rs +++ b/curp/src/server/raw_curp/state.rs @@ -51,7 +51,7 @@ pub(super) struct CandidateState { } /// Status of a follower -#[derive(Debug)] +#[derive(Debug, Copy, Clone)] pub(super) struct FollowerStatus { /// Index of the next log entry to send to that follower pub(super) next_index: LogIndex, @@ -59,6 +59,15 @@ pub(super) struct FollowerStatus { pub(super) match_index: LogIndex, } +impl Default for FollowerStatus { + fn default() -> Self { + Self { + next_index: 1, + match_index: 0, + } + } +} + /// Additional state for the leader, all volatile #[derive(Debug)] pub(super) struct LeaderState { @@ -106,19 +115,29 @@ impl LeaderState { Self { statuses: others .iter() - .map(|id| { - ( - *id, - FollowerStatus { - next_index: 1, - match_index: 0, - }, - ) - }) + .map(|o| (*o, FollowerStatus::default())) .collect(), } } + /// Get statuses for all servers + pub(super) fn get_all_statuses(&self) -> HashMap { + self.statuses + .iter() + .map(|e| (*e.key(), *e.value())) + .collect() + } + + /// insert new status for id + pub(super) fn insert(&self, id: ServerId) { + _ = self.statuses.insert(id, FollowerStatus::default()); + } + + /// Remove a status + pub(super) fn remove(&self, id: ServerId) { + _ = self.statuses.remove(&id); + } + /// Get status for a server fn get_status(&self, id: ServerId) -> Ref<'_, u64, FollowerStatus> { self.statuses @@ -188,7 +207,7 @@ trait ClusterConfig { } /// `MajorityConfig` is a set of IDs that uses majority quorums to make decisions. -#[derive(Debug)] +#[derive(Debug, Clone)] pub(super) struct MajorityConfig { /// The voters in the cluster voters: HashSet, @@ -202,6 +221,26 @@ impl MajorityConfig { } } + /// Get voters set + pub(super) fn voters(&self) -> &HashSet { + &self.voters + } + + /// Insert a voter + pub(super) fn insert(&mut self, id: ServerId) -> bool { + self.voters.insert(id) + } + + /// Remove a voter + pub(super) fn remove(&mut self, id: ServerId) -> bool { + self.voters.remove(&id) + } + + /// Check if a voter exists + pub(super) fn contains(&self, id: ServerId) -> bool { + self.voters.contains(&id) + } + /// Get quorum: the smallest number of servers who must be online for the cluster to work pub(super) fn quorum(&self) -> usize { self.voters.len() / 2 + 1 diff --git a/curp/src/server/raw_curp/tests.rs b/curp/src/server/raw_curp/tests.rs index 0d9633b17..2e65e1adf 100644 --- a/curp/src/server/raw_curp/tests.rs +++ b/curp/src/server/raw_curp/tests.rs @@ -29,6 +29,13 @@ impl RawCurp { self.st.read().role } + fn contains(&self, id: ServerId) -> bool { + self.cluster().all_members().contains_key(&id) + && self.ctx.sync_events.contains_key(&id) + && self.lst.get_all_statuses().contains_key(&id) + && self.cst.lock().config.voters().contains(&id) + } + pub(crate) fn commit_index(&self) -> LogIndex { self.log.read().commit_index } @@ -670,3 +677,105 @@ fn is_synced_should_return_true_when_followers_caught_up_with_leader() { curp.lst.update_match_index(s2_id, 3); assert!(curp.is_synced()); } + +#[traced_test] +#[test] +fn add_node_should_add_new_node_to_curp() { + let curp = { + let exe_tx = MockCEEventTxApi::::default(); + Arc::new(RawCurp::new_test(3, exe_tx, mock_role_change())) + }; + let changes = vec![ConfChange::add(1, "http://127.0.0.1:4567".to_owned())]; + assert!(curp.apply_conf_change(changes).is_ok()); + assert!(curp.contains(1)); +} + +#[traced_test] +#[test] +fn add_exists_node_should_return_node_already_exists_error() { + let curp = { + let exe_tx = MockCEEventTxApi::::default(); + Arc::new(RawCurp::new_test(3, exe_tx, mock_role_change())) + }; + let exists_node_id = curp.cluster().get_id_by_name("S1").unwrap(); + let changes = vec![ConfChange::add( + exists_node_id, + "http://127.0.0.1:4567".to_owned(), + )]; + let resp = curp.apply_conf_change(changes); + let error_match = matches!(resp, Err(ApplyConfChangeError::NodeAlreadyExists(_))); + assert!(error_match); +} + +#[traced_test] +#[test] +fn remove_node_should_remove_node_from_curp() { + let curp = { + let exe_tx = MockCEEventTxApi::::default(); + Arc::new(RawCurp::new_test(5, exe_tx, mock_role_change())) + }; + let follower_id = curp.cluster().get_id_by_name("S1").unwrap(); + let changes = vec![ConfChange::remove(follower_id)]; + let resp = curp.apply_conf_change(changes); + assert!(resp.is_ok()); + assert!(!curp.contains(follower_id)); +} + +#[traced_test] +#[test] +fn apply_conf_change_shoulde_return_true_when_remove_self_node() { + let curp = { + let exe_tx = MockCEEventTxApi::::default(); + Arc::new(RawCurp::new_test(5, exe_tx, mock_role_change())) + }; + let self_id = curp.id(); + let changes = vec![ConfChange::remove(self_id)]; + let resp = curp.apply_conf_change(changes); + assert!(resp.is_ok_and(|b| b)); +} + +#[traced_test] +#[test] +fn remove_non_exists_node_should_return_node_not_exists_error() { + let curp = { + let exe_tx = MockCEEventTxApi::::default(); + Arc::new(RawCurp::new_test(5, exe_tx, mock_role_change())) + }; + let changes = vec![ConfChange::remove(1)]; + let resp = curp.apply_conf_change(changes); + assert!(matches!(resp, Err(ApplyConfChangeError::NodeNotExists(_)))); +} + +#[traced_test] +#[test] +fn remove_node_should_return_invalid_config_error_when_nodes_count_less_than_3() { + let curp = { + let exe_tx = MockCEEventTxApi::::default(); + Arc::new(RawCurp::new_test(3, exe_tx, mock_role_change())) + }; + let follower_id = curp.cluster().get_id_by_name("S1").unwrap(); + let changes = vec![ConfChange::remove(follower_id)]; + let resp = curp.apply_conf_change(changes); + assert!(matches!(resp, Err(ApplyConfChangeError::InvalidConfig))); +} + +#[traced_test] +#[test] +fn update_node_should_update_the_address_of_node() { + let curp = { + let exe_tx = MockCEEventTxApi::::default(); + Arc::new(RawCurp::new_test(3, exe_tx, mock_role_change())) + }; + let follower_id = curp.cluster().get_id_by_name("S1").unwrap(); + assert_eq!(curp.cluster().address(follower_id), Some("S1".to_owned())); + let changes = vec![ConfChange::update( + follower_id, + "http://127.0.0.1:4567".to_owned(), + )]; + let resp = curp.apply_conf_change(changes); + assert!(resp.is_ok()); + assert_eq!( + curp.cluster().address(follower_id), + Some("http://127.0.0.1:4567".to_owned()) + ); +} diff --git a/curp/tests/common/curp_group.rs b/curp/tests/common/curp_group.rs index efa296032..6ed5bf03f 100644 --- a/curp/tests/common/curp_group.rs +++ b/curp/tests/common/curp_group.rs @@ -118,7 +118,7 @@ impl CurpGroup { let ce = TestCE::new(name.clone(), exe_tx, as_tx, xline_storage_config); let cluster_info = Arc::new(ClusterInfo::new(all_members.clone(), &name)); - all = cluster_info.all_members(); + all = cluster_info.all_members_addrs(); let id = cluster_info.self_id(); let role_change_cb = TestRoleChange::default(); diff --git a/simulation/src/curp_group.rs b/simulation/src/curp_group.rs index 45ccc27bc..7f13982a1 100644 --- a/simulation/src/curp_group.rs +++ b/simulation/src/curp_group.rs @@ -75,7 +75,7 @@ impl CurpGroup { let store = Arc::new(Mutex::new(None)); let cluster_info = Arc::new(ClusterInfo::new(all.clone(), &name)); - all_members = cluster_info.all_members(); + all_members = cluster_info.all_members_addrs(); let id = cluster_info.self_id(); let storage_cfg = StorageConfig::RocksDB(storage_path.clone()); let store_c = Arc::clone(&store); diff --git a/xline/src/server/lease_server.rs b/xline/src/server/lease_server.rs index 44b9e3c8c..68e9e7df6 100644 --- a/xline/src/server/lease_server.rs +++ b/xline/src/server/lease_server.rs @@ -246,7 +246,7 @@ where ) }); break self - .follower_keep_alive(request_stream, leader_addr) + .follower_keep_alive(request_stream, &leader_addr) .await?; } }; diff --git a/xline/src/server/xline_server.rs b/xline/src/server/xline_server.rs index 2e4fd88f8..c6da6e79e 100644 --- a/xline/src/server/xline_server.rs +++ b/xline/src/server/xline_server.rs @@ -326,7 +326,7 @@ impl XlineServer { CurpClient::builder() .local_server_id(self.cluster_info.self_id()) .config(self.client_config) - .build_from_all_members(self.cluster_info.all_members()) + .build_from_all_members(self.cluster_info.all_members_addrs()) .await?, ); @@ -366,13 +366,13 @@ impl XlineServer { id_barrier, *self.server_timeout.range_retry_timeout(), Arc::clone(&client), - self.cluster_info.self_name().to_owned(), + self.cluster_info.self_name(), ), LockServer::new( Arc::clone(&client), Arc::clone(&id_gen), - self.cluster_info.self_name().to_owned(), - self.cluster_info.self_address().to_owned(), + self.cluster_info.self_name(), + self.cluster_info.self_address(), ), LeaseServer::new( lease_storage, @@ -381,11 +381,7 @@ impl XlineServer { id_gen, Arc::clone(&self.cluster_info), ), - AuthServer::new( - auth_storage, - client, - self.cluster_info.self_name().to_owned(), - ), + AuthServer::new(auth_storage, client, self.cluster_info.self_name()), WatchServer::new( watcher, Arc::clone(&header_gen),