Skip to content

Commit

Permalink
refactor:removed node auto shutdown by election
Browse files Browse the repository at this point in the history
Signed-off-by: themanforfree <[email protected]>
  • Loading branch information
themanforfree committed Oct 30, 2023
1 parent f4aba73 commit d564794
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 67 deletions.
1 change: 1 addition & 0 deletions curp/proto/inner_message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ message VoteResponse {
uint64 term = 1;
bool vote_granted = 2;
repeated bytes spec_pool = 3;
bool shutdown_candidate = 4;
}

message InstallSnapshotRequest {
Expand Down
5 changes: 5 additions & 0 deletions curp/src/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,11 @@ impl ClusterInfo {
s.is_learner = true;
}
}

/// Check if cluster contains a node
pub(crate) fn contains(&self, node_id: ServerId) -> bool {
self.members.contains_key(&node_id)
}
}

#[cfg(test)]
Expand Down
12 changes: 12 additions & 0 deletions curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ impl VoteResponse {
.into_iter()
.map(|c| bincode::serialize(&c))
.collect::<bincode::Result<Vec<Vec<u8>>>>()?,
shutdown_candidate: false,
})
}

Expand All @@ -406,6 +407,17 @@ impl VoteResponse {
term,
vote_granted: false,
spec_pool: vec![],
shutdown_candidate: false,
}
}

/// Create a new shutdown vote response
pub(crate) fn new_shutdown() -> Self {
Self {
term: 0,
vote_granted: false,
spec_pool: vec![],
shutdown_candidate: true,
}
}

Expand Down
16 changes: 7 additions & 9 deletions curp/src/server/curp_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,8 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
}
VoteResponse::new_accept(term, sp)?
}
Err(term) => VoteResponse::new_reject(term),
Err(Some(term)) => VoteResponse::new_reject(term),
Err(None) => VoteResponse::new_shutdown(),
};

Ok(resp)
Expand Down Expand Up @@ -664,7 +665,6 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
let id = connect.id();
let batch_timeout = curp.cfg().batch_timeout;
let mut is_shutdown_state = false;
let mut is_remove_state = false;

#[allow(clippy::integer_arithmetic)] // tokio select internal triggered
let leader_retired = loop {
Expand All @@ -684,7 +684,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
}
}
_ = remove_event.listen() => {
is_remove_state = true;
break false;
}
_now = ticker.tick() => {
hb_opt = false;
Expand All @@ -696,8 +696,6 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
}
}

let can_remove_node = is_remove_state && curp.can_remove_follower_after_hb(id);

let Some(sync_action) = curp.sync(id) else {
break true;
};
Expand All @@ -722,10 +720,6 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
if is_shutdown_state && is_empty && curp.is_synced() {
break false;
}
if can_remove_node {
curp.remove_node_status(id);
break false;
}
}
}
SyncAction::Snapshot(rx) => match rx.await {
Expand Down Expand Up @@ -922,6 +916,10 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
pin_mut!(resps);
while let Some((id, resp)) = resps.next().await {
if vote.is_pre_vote {
if resp.shutdown_candidate {
curp.shutdown_trigger().self_shutdown();
return None;
}
let result = curp.handle_pre_vote_resp(id, resp.term, resp.vote_granted);
match result {
Ok(None) => {}
Expand Down
82 changes: 43 additions & 39 deletions curp/src/server/raw_curp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tokio::sync::{broadcast, mpsc, oneshot};
use tracing::{
debug, error,
log::{log_enabled, Level},
trace,
trace, warn,
};
use utils::{
config::CurpConfig,
Expand Down Expand Up @@ -475,14 +475,15 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {

/// Handle `vote`
/// Return `Ok(term, spec_pool)` if the vote is granted
/// Return `Err(term)` if the vote is rejected
/// Return `Err(Some(term))` if the vote is rejected
/// The `Err(None)` will never be returned here, just to keep the return type consistent with the `handle_pre_vote`
pub(super) fn handle_vote(
&self,
term: u64,
candidate_id: ServerId,
last_log_index: LogIndex,
last_log_term: u64,
) -> Result<(u64, Vec<PoolEntry<C>>), u64> {
) -> Result<(u64, Vec<PoolEntry<C>>), Option<u64>> {
debug!(
"{} received vote: term({}), last_log_index({}), last_log_term({}), id({})",
self.id(),
Expand All @@ -497,15 +498,15 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {

// calibrate term
if term < st_w.term {
return Err(st_w.term);
return Err(Some(st_w.term));
}
if term > st_w.term {
self.update_to_term_and_become_follower(&mut st_w, term);
}

// check self role
if !matches!(st_w.role, Role::Follower | Role::PreCandidate) {
return Err(st_w.term);
return Err(Some(st_w.term));
}

// check if voted before
Expand All @@ -514,12 +515,12 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
.as_ref()
.map_or(false, |id| id != &candidate_id)
{
return Err(st_w.term);
return Err(Some(st_w.term));
}

// check if the candidate's log is up-to-date
if !log_r.log_up_to_date(last_log_term, last_log_index) {
return Err(st_w.term);
return Err(Some(st_w.term));
}

// grant the vote
Expand All @@ -532,14 +533,15 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {

/// Handle `pre_vote`
/// Return `Ok(term, spec_pool)` if the vote is granted
/// Return `Err(term)` if the vote is rejected
/// Return `Err(Some(term))` if the vote is rejected
/// Return `Err(None)` if the candidate is removed from the cluster
pub(super) fn handle_pre_vote(
&self,
term: u64,
candidate_id: ServerId,
last_log_index: LogIndex,
last_log_term: u64,
) -> Result<(u64, Vec<PoolEntry<C>>), u64> {
) -> Result<(u64, Vec<PoolEntry<C>>), Option<u64>> {
debug!(
"{} received pre vote: term({}), last_log_index({}), last_log_term({}), id({})",
self.id(),
Expand All @@ -551,22 +553,38 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {

let st_r = self.st.read();
let log_r = self.log.read();
let contains_candidate = self.cluster().contains(candidate_id);
let remove_candidate_is_not_committed =
log_r
.fallback_contexts
.iter()
.any(|(_, ctx)| match ctx.origin_entry.entry_data {
EntryData::ConfChange(ref cc) => cc.changes().iter().any(|c| {
matches!(c.change_type(), ConfChangeType::Remove)
&& c.node_id == candidate_id
}),
EntryData::Empty(_) | EntryData::Command(_) | EntryData::Shutdown(_) => false,
});
// extra check to shutdown removed node
if !contains_candidate && !remove_candidate_is_not_committed {
return Err(None);
}

// calibrate term
if term < st_r.term {
return Err(st_r.term);
return Err(Some(st_r.term));
}
if term > st_r.term {
let timeout = st_r.follower_timeout_ticks;
if st_r.leader_id.is_some() && self.ctx.election_tick.load(Ordering::Acquire) < timeout
{
return Err(st_r.term);
return Err(Some(st_r.term));
}
}

// check if the candidate's log is up-to-date
if !log_r.log_up_to_date(last_log_term, last_log_index) {
return Err(st_r.term);
return Err(Some(st_r.term));
}

// grant the vote
Expand Down Expand Up @@ -867,24 +885,6 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
self.ctx.leader_tx.subscribe()
}

/// Check on the leader that the remove node conf change entry has been synced to the removed follower
pub(super) fn can_remove_follower_after_hb(&self, follower_id: ServerId) -> bool {
if self.ctx.connects.contains_key(&follower_id) {
return false;
}
let match_index = self.lst.get_match_index(follower_id);
let last_conf_change_idx = self.ctx.last_conf_change_idx.load(Ordering::Acquire);
if match_index >= last_conf_change_idx {
return true;
}
false
}

/// Remove a follower's status
pub(super) fn remove_node_status(&self, follower_id: ServerId) {
self.lst.remove(follower_id);
}

/// Get `append_entries` request for `follower_id` that contains the latest log entries
pub(super) fn sync(&self, follower_id: ServerId) -> Option<SyncAction<C>> {
let term = {
Expand All @@ -895,7 +895,10 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
lst_r.term
};

let next_index = self.lst.get_next_index(follower_id);
let Some(next_index) = self.lst.get_next_index(follower_id) else {
warn!("follower {} is not found, it maybe has been removed", follower_id);
return None;
};
let log_r = self.log.read();
if next_index <= log_r.base_index {
// the log has already been compacted
Expand Down Expand Up @@ -1050,7 +1053,10 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
if statuses_ids.get(&node_id).is_none() || !config.contains(node_id) {
return Err(ConfChangeError::NodeNotExists(()));
}
let learner_index = self.lst.get_match_index(node_id);
let learner_index = self
.lst
.get_match_index(node_id)
.unwrap_or_else(|| unreachable!("learner should exist here"));
let leader_index = self.log.read().last_log_index();
if leader_index.overflow_sub(learner_index) > MAX_PROMOTE_GAP {
return Err(ConfChangeError::LearnerNotCatchUp(()));
Expand Down Expand Up @@ -1481,12 +1487,9 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
(vec![], String::new(), is_learner)
}
ConfChangeType::Remove => {
// the removed follower need to commit the conf change entry, so when leader applies
// the conf change entry, it will not remove the follower status, and after the log
// entry is committed on the removed follower, leader will remove the follower status
// and stop the sync_follower_task.
self.cst
.map_lock(|mut cst_l| _ = cst_l.config.remove(node_id));
self.lst.remove(node_id);
let m = self.ctx.cluster_info.remove(&node_id);
_ = self.ctx.sync_events.remove(&node_id);
_ = self.ctx.connects.remove(&node_id);
Expand Down Expand Up @@ -1538,9 +1541,10 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
self.ctx.cmd_tx.send_sp_exe(entry);
}
self.ctx.sync_events.iter().for_each(|e| {
let next = self.lst.get_next_index(*e.key());
if next > log_w.base_index && log_w.has_next_batch(next) {
e.notify(1);
if let Some(next) = self.lst.get_next_index(*e.key()) {
if next > log_w.base_index && log_w.has_next_batch(next) {
e.notify(1);
}
}
});

Expand Down
33 changes: 18 additions & 15 deletions curp/src/server/raw_curp/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use dashmap::{
DashMap,
};
use madsim::rand::{thread_rng, Rng};
use tracing::debug;
use tracing::{debug, warn};

use super::Role;
use crate::{members::ServerId, server::PoolEntry, LogIndex};
Expand Down Expand Up @@ -155,17 +155,13 @@ impl LeaderState {
}

/// Get status for a server
fn get_status(&self, id: ServerId) -> Ref<'_, u64, FollowerStatus> {
self.statuses
.get(&id)
.unwrap_or_else(|| unreachable!("no status for {id}"))
fn get_status(&self, id: ServerId) -> Option<Ref<'_, u64, FollowerStatus>> {
self.statuses.get(&id)
}

/// Get status for a server
fn get_status_mut(&self, id: ServerId) -> RefMut<'_, u64, FollowerStatus> {
self.statuses
.get_mut(&id)
.unwrap_or_else(|| unreachable!("no status for {id}"))
fn get_status_mut(&self, id: ServerId) -> Option<RefMut<'_, u64, FollowerStatus>> {
self.statuses.get_mut(&id)
}

/// Check all followers by `f`
Expand All @@ -174,23 +170,30 @@ impl LeaderState {
}

/// Get `next_index` for server
pub(super) fn get_next_index(&self, id: ServerId) -> LogIndex {
self.get_status(id).next_index
pub(super) fn get_next_index(&self, id: ServerId) -> Option<LogIndex> {
self.get_status(id).map(|s| s.next_index)
}

/// Get `match_index` for server
pub(super) fn get_match_index(&self, id: ServerId) -> LogIndex {
self.get_status(id).match_index
pub(super) fn get_match_index(&self, id: ServerId) -> Option<LogIndex> {
self.get_status(id).map(|s| s.match_index)
}

/// Update `next_index` for server
pub(super) fn update_next_index(&self, id: ServerId, index: LogIndex) {
self.get_status_mut(id).next_index = index;
let Some(mut status) = self.get_status_mut(id) else {
warn!("follower {} is not found, it maybe has been removed", id);
return;
};
status.next_index = index;
}

/// Update `match_index` for server, will update `next_index` if possible
pub(super) fn update_match_index(&self, id: ServerId, index: LogIndex) {
let mut status = self.get_status_mut(id);
let Some(mut status) = self.get_status_mut(id) else {
warn!("follower {} is not found, it maybe has been removed", id);
return;
};
if status.match_index >= index {
return;
}
Expand Down
6 changes: 3 additions & 3 deletions curp/src/server/raw_curp/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ fn heartbeat_will_calibrate_next_index() {

let st_r = curp.st.read();
assert_eq!(st_r.term, 1);
assert_eq!(curp.lst.get_next_index(s1_id), 1);
assert_eq!(curp.lst.get_next_index(s1_id), Some(1));
}

#[traced_test]
Expand Down Expand Up @@ -463,7 +463,7 @@ fn handle_vote_will_reject_smaller_term() {

let s1_id = curp.cluster().get_id_by_name("S1").unwrap();
let result = curp.handle_vote(1, s1_id, 0, 0);
assert_eq!(result.unwrap_err(), 2);
assert_eq!(result.unwrap_err(), Some(2));
}

// #[traced_test]
Expand All @@ -489,7 +489,7 @@ fn handle_vote_will_reject_outdated_candidate() {
curp.st.write().leader_id = None;
let s1_id = curp.cluster().get_id_by_name("S1").unwrap();
let result = curp.handle_vote(3, s1_id, 0, 0);
assert_eq!(result.unwrap_err(), 3);
assert_eq!(result.unwrap_err(), Some(3));
}

#[traced_test]
Expand Down
Loading

0 comments on commit d564794

Please sign in to comment.