Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add apply_conf_change method for RawCurp #433

Merged
merged 4 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions curp/proto/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,46 @@ message ShutdownResponse {
optional bytes error = 3;
}

message ProposeConfChangeRequest {
enum ConfChangeType {
Add = 0;
AddLearner = 1;
Remove = 2;
Update = 3;
}
message ConfChange {
ConfChangeType change_type = 1;
uint64 node_id = 2;
repeated string address = 3;
}
string id = 1;
repeated ConfChange changes = 2;
Phoenix500526 marked this conversation as resolved.
Show resolved Hide resolved
}

message Member {
uint64 id = 1;
string name = 2;
string addrs = 3;
bool is_learner = 4;
};

message ProposeConfChangeResponse {
enum ConfChangeError {
InvalidConfig = 0;
NodeNotExists = 1;
NodeAlreadyExists = 2;
}
optional ConfChangeError error = 1;
repeated Member members = 2;
}


service Protocol {
rpc Propose(commandpb.ProposeRequest) returns (commandpb.ProposeResponse);
rpc WaitSynced(commandpb.WaitSyncedRequest)
returns (commandpb.WaitSyncedResponse);
rpc Shutdown(ShutdownRequest) returns (ShutdownResponse);
rpc ProposeConfChange (ProposeConfChangeRequest) returns (ProposeConfChangeResponse);
rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse);
rpc Vote(VoteRequest) returns (VoteResponse);
rpc InstallSnapshot(stream InstallSnapshotRequest)
Expand Down
16 changes: 16 additions & 0 deletions curp/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,22 @@ impl<C: Command> From<WaitSyncError> for CommandSyncError<C> {
}
}

/// Error type of `apply_conf_change`
#[derive(Error, Debug, Clone, Copy)]
#[allow(clippy::module_name_repetitions)] // this-error generate code false-positive
#[non_exhaustive]
pub enum ApplyConfChangeError {
/// Config is invalid after applying the conf change
#[error("invalid config")]
InvalidConfig,
/// The node to be removed does not exist
#[error("node {0} does not exist")]
NodeNotExists(ServerId),
/// The node to be added already exists
#[error("node {0} already exists")]
NodeAlreadyExists(ServerId),
}

#[cfg(test)]
mod test {
use curp_test_utils::test_cmd::{ExecuteError, TestCommand};
Expand Down
110 changes: 83 additions & 27 deletions curp/src/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ use std::{
hash::Hasher,
};

use dashmap::{mapref::one::Ref, DashMap};
use itertools::Itertools;

/// Server Id
pub type ServerId = u64;

/// Cluster member
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Member {
/// Server id of the member
id: ServerId,
Expand All @@ -19,6 +20,26 @@ pub struct Member {
address: String,
}

/// Cluster member
impl Member {
/// Create a new `Member`
#[inline]
pub fn new(id: ServerId, name: impl Into<String>, address: impl Into<String>) -> Self {
Self {
id,
name: name.into(),
address: address.into(),
}
}

/// Get member id
#[must_use]
#[inline]
pub fn id(&self) -> ServerId {
self.id
}
}

/// cluster members information
#[derive(Debug)]
pub struct ClusterInfo {
Expand All @@ -27,7 +48,7 @@ pub struct ClusterInfo {
/// current server id
member_id: ServerId,
/// all members information
members: HashMap<ServerId, Member>,
members: DashMap<ServerId, Member>,
}

impl ClusterInfo {
Expand All @@ -36,10 +57,10 @@ impl ClusterInfo {
/// panic if `all_members` is empty
#[inline]
#[must_use]
pub fn new(all_members: HashMap<String, String>, self_name: &str) -> Self {
pub fn new(all_members_addrs: HashMap<String, String>, self_name: &str) -> Self {
let mut member_id = 0;
let mut members = HashMap::new();
for (name, address) in all_members {
let members = DashMap::new();
for (name, address) in all_members_addrs {
let id = Self::calculate_member_id(&address, "", None);
if name == self_name {
member_id = id;
Expand All @@ -57,47 +78,82 @@ impl ClusterInfo {
cluster_info
}

/// Get server address via server id
/// Get all members
#[must_use]
#[inline]
pub fn address(&self, id: ServerId) -> Option<&str> {
pub fn all_members(&self) -> HashMap<ServerId, Member> {
self.members
.iter()
.map(|t| (t.id, t.value().clone()))
.collect()
}

/// Insert a member
#[inline]
pub fn insert(&self, member: Member) {
_ = self.members.insert(member.id, member);
}

/// Remove a member
#[inline]
pub fn remove(&self, id: &ServerId) {
_ = self.members.remove(id);
}

/// Update a member
#[inline]
pub fn update(&self, id: &ServerId, address: impl Into<String>) {
self.members
.values()
.find(|t| t.id == id)
.map(|t| t.address.as_str())
.get_mut(id)
.unwrap_or_else(|| unreachable!("member {} not found", id))
.address = address.into();
}

/// Get server address via server id
#[must_use]
#[inline]
pub fn address(&self, id: ServerId) -> Option<String> {
self.members.get(&id).map(|t| t.address.clone())
}

/// Get the current member
#[allow(clippy::indexing_slicing)] // self member id must be in members
fn self_member(&self) -> &Member {
&self.members[&self.member_id]
#[allow(clippy::unwrap_used)] // self member id must be in members
fn self_member(&self) -> Ref<'_, u64, Member> {
self.members.get(&self.member_id).unwrap()
}

/// Get the current server address
#[must_use]
#[inline]
pub fn self_address(&self) -> &str {
&self.self_member().address
pub fn self_address(&self) -> String {
self.self_member().address.clone()
}

/// Get the current server id
#[must_use]
#[inline]
pub fn self_name(&self) -> &str {
&self.self_member().name
pub fn self_name(&self) -> String {
self.self_member().name.clone()
}

/// Get peers id
/// Get peers ids
#[must_use]
#[inline]
pub fn peers_ids(&self) -> Vec<ServerId> {
self.members
.values()
.iter()
.filter(|t| t.id != self.member_id)
.map(|t| t.id)
.collect()
}

/// Get all ids
#[must_use]
#[inline]
pub fn all_ids(&self) -> Vec<ServerId> {
self.members.iter().map(|t| t.id).collect()
}

/// Calculate the member id
fn calculate_member_id(address: &str, cluster_name: &str, timestamp: Option<u64>) -> ServerId {
let mut hasher = DefaultHasher::new();
Expand All @@ -112,8 +168,8 @@ impl ClusterInfo {
/// Calculate the cluster id
fn gen_cluster_id(&mut self) {
let mut hasher = DefaultHasher::new();
for id in self.members.keys().sorted() {
hasher.write_u64(*id);
for id in self.members.iter().map(|t| t.id).sorted() {
hasher.write_u64(id);
}
self.cluster_id = hasher.finish();
}
Expand All @@ -135,9 +191,9 @@ impl ClusterInfo {
/// Get peers
#[must_use]
#[inline]
pub fn peers(&self) -> HashMap<ServerId, String> {
pub fn peers_addrs(&self) -> HashMap<ServerId, String> {
self.members
.values()
.iter()
.filter(|t| t.id != self.member_id)
.map(|t| (t.id, t.address.clone()))
.collect()
Expand All @@ -146,9 +202,9 @@ impl ClusterInfo {
/// Get all members
#[must_use]
#[inline]
pub fn all_members(&self) -> HashMap<ServerId, String> {
pub fn all_members_addrs(&self) -> HashMap<ServerId, String> {
self.members
.values()
.iter()
.map(|t| (t.id, t.address.clone()))
.collect()
}
Expand All @@ -167,7 +223,7 @@ impl ClusterInfo {
pub fn get_id_by_name(&self, name: &str) -> Option<ServerId> {
self.members
.iter()
.find_map(|(_, m)| (m.name == name).then_some(m.id))
.find_map(|m| (m.name == name).then_some(m.id))
}
}

Expand Down Expand Up @@ -206,7 +262,7 @@ mod tests {
]);

let node1 = ClusterInfo::new(all_members, "S1");
let peers = node1.peers();
let peers = node1.peers_addrs();
let node1_id = node1.self_id();
let node1_url = node1.self_address();
assert!(!peers.contains_key(&node1_id));
Expand Down
49 changes: 46 additions & 3 deletions curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ pub(crate) use self::proto::{
RedirectData, WaitSyncError as PbWaitSyncErrorOuter,
},
messagepb::{
fetch_read_state_response::ReadState, protocol_server::Protocol, AppendEntriesRequest,
AppendEntriesResponse, FetchClusterRequest, FetchClusterResponse, FetchReadStateRequest,
FetchReadStateResponse, IdSet, InstallSnapshotRequest, InstallSnapshotResponse,
fetch_read_state_response::ReadState,
propose_conf_change_request::{ConfChange, ConfChangeType},
protocol_server::Protocol,
AppendEntriesRequest, AppendEntriesResponse, FetchClusterRequest, FetchClusterResponse,
FetchReadStateRequest, FetchReadStateResponse, IdSet, InstallSnapshotRequest,
InstallSnapshotResponse, ProposeConfChangeRequest, ProposeConfChangeResponse,
ShutdownRequest, ShutdownResponse, VoteRequest, VoteResponse,
},
};
Expand Down Expand Up @@ -441,3 +444,43 @@ impl ShutdownRequest {
Self { id: id.into() }
}
}

#[allow(dead_code)] // TODO: remove this when we implement conf change
#[allow(clippy::as_conversions)] // ConfChangeType is so small that it won't exceed the range of i32 type.
impl ConfChange {
/// Create a new `ConfChange` to add a node
pub(crate) fn add(node_id: ServerId, address: String) -> Self {
Self {
change_type: ConfChangeType::Add as i32,
node_id,
address: vec![address],
}
}

/// Create a new `ConfChange` to remove a node
pub(crate) fn remove(node_id: ServerId) -> Self {
Self {
change_type: ConfChangeType::Remove as i32,
node_id,
address: vec![],
}
}

/// Create a new `ConfChange` to update a node
pub(crate) fn update(node_id: ServerId, address: String) -> Self {
Self {
change_type: ConfChangeType::Update as i32,
node_id,
address: vec![address],
}
}

/// Create a new `ConfChange` to add a learner node
pub(crate) fn add_learner(node_id: ServerId, address: String) -> Self {
Self {
change_type: ConfChangeType::AddLearner as i32,
node_id,
address: vec![address],
}
}
}
Loading