diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs
index dc9fd02f..74b09a5a 100644
--- a/harness/tests/integration_cases/test_raft.rs
+++ b/harness/tests/integration_cases/test_raft.rs
@@ -4333,6 +4333,58 @@ fn test_prevote_with_split_vote() {
     assert_eq!(network.peers[&3].state, StateRole::Follower, "peer 3 state",);
 }
 
+/// TestPreVoteWithJudgeSplitPreVote verifies that during a split vote, the cluster can still
+/// finish the campaign by letting the raft node with the larger hash value win the election.
+#[test]
+fn test_prevote_with_judge_split_prevote() {
+    let l = default_logger();
+    for cnt in &[3, 5] {
+        let peers = (1..=*cnt).map(|id| {
+            let mut config = new_test_config(id, 10, 1);
+            config.pre_vote = true;
+            config.judge_split_prevote = true;
+            let storage = new_storage();
+            storage.initialize_with_conf_state(((1..=*cnt).collect::<Vec<u64>>(), vec![]));
+            let mut raft = new_test_raft_with_config(&config, storage, &l);
+            raft.become_follower(1, INVALID_ID);
+            Some(raft)
+        });
+        let mut network = Network::new(peers.collect(), &l);
+        network.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);
+
+        // Simulate the leader going down; the followers start a split vote.
+        network.isolate(1);
+        let msgs = (2..=*cnt)
+            .map(|id| new_message(id, id, MessageType::MsgHup, 0))
+            .collect();
+        network.send(msgs);
+
+        // Check that the term values are as expected.
+        for id in 2..=*cnt {
+            assert_eq!(network.peers[&id].term, 3, "[{}] peer {} term", cnt, id);
+        }
+
+        let mut hashes: Vec<_> = (2..=*cnt).map(|id| (fxhash::hash64(&id), id)).collect();
+        hashes.sort_unstable();
+        for (_, id) in &hashes[..hashes.len() - 1] {
+            assert_eq!(
+                network.peers[id].state,
+                StateRole::Follower,
+                "[{}] peer {} state",
+                cnt,
+                id
+            );
+        }
+        assert_eq!(
+            network.peers[&hashes[hashes.len() - 1].1].state,
+            StateRole::Leader,
+            "[{}] peer {} state",
+            cnt,
+            hashes[hashes.len() - 1].1
+        );
+    }
+}
+
 // ensure that after a node become pre-candidate, it will checkQuorum correctly.
 #[test]
 fn test_prevote_with_check_quorum() {
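A note on the expectations above: the winner is derived from the same deterministic hash that the voting code uses, so every peer independently agrees on which campaigner survives the split vote. A minimal standalone sketch of that derivation (`expected_winner` is a hypothetical helper for illustration, assuming the `fxhash` crate the patch depends on):

    /// Among the live peers 2..=cnt, the peer whose (hash, id) pair sorts
    /// last is the only one allowed to win the split pre-vote.
    fn expected_winner(cnt: u64) -> u64 {
        let mut hashes: Vec<(u64, u64)> =
            (2..=cnt).map(|id| (fxhash::hash64(&id), id)).collect();
        hashes.sort_unstable(); // orders by hash, then id, matching the tie-break
        hashes.last().unwrap().1
    }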
@@ -5292,6 +5344,14 @@ fn test_group_commit_consistent() {
 /// of the election with both priority and log.
 #[test]
 fn test_election_with_priority_log() {
+    let entry_set = vec![
+        (
+            "large term",
+            vec![new_entry(2, 1, SOME_DATA)],
+            vec![new_entry(1, 1, SOME_DATA)],
+        ),
+        ("large index", vec![new_entry(1, 1, SOME_DATA)], vec![]),
+    ];
     let tests = vec![
         // log is up to date or not 1..3, priority 1..3, id, state
         (true, false, false, 3, 1, 1, 1, StateRole::Leader),
@@ -5307,30 +5367,37 @@ fn test_election_with_priority_log() {
         (false, false, true, 1, 1, 3, 1, StateRole::Leader),
     ];
 
-    for (_i, &(l1, l2, l3, p1, p2, p3, id, state)) in tests.iter().enumerate() {
-        let l = default_logger();
-        let mut n1 = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), &l);
-        let mut n2 = new_test_raft(2, vec![1, 2, 3], 10, 1, new_storage(), &l);
-        let mut n3 = new_test_raft(3, vec![1, 2, 3], 10, 1, new_storage(), &l);
-        n1.set_priority(p1);
-        n2.set_priority(p2);
-        n3.set_priority(p3);
-        let entries = vec![new_entry(1, 1, SOME_DATA), new_entry(1, 1, SOME_DATA)];
-        if l1 {
-            n1.raft_log.append(&entries);
-        }
-        if l2 {
-            n2.raft_log.append(&entries);
-        }
-        if l3 {
-            n3.raft_log.append(&entries);
-        }
+    for (tag, latest_entries, outdated_entries) in entry_set {
+        for (_i, &(l1, l2, l3, p1, p2, p3, id, state)) in tests.iter().enumerate() {
+            let l = default_logger();
+            let mut n1 = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), &l);
+            let mut n2 = new_test_raft(2, vec![1, 2, 3], 10, 1, new_storage(), &l);
+            let mut n3 = new_test_raft(3, vec![1, 2, 3], 10, 1, new_storage(), &l);
+            n1.set_priority(p1);
+            n2.set_priority(p2);
+            n3.set_priority(p3);
+            if l1 {
+                n1.raft_log.append(&latest_entries);
+            } else {
+                n1.raft_log.append(&outdated_entries);
+            }
+            if l2 {
+                n2.raft_log.append(&latest_entries);
+            } else {
+                n2.raft_log.append(&outdated_entries);
+            }
+            if l3 {
+                n3.raft_log.append(&latest_entries);
+            } else {
+                n3.raft_log.append(&outdated_entries);
+            }
 
-        let mut network = Network::new(vec![Some(n1), Some(n2), Some(n3)], &l);
+            let mut network = Network::new(vec![Some(n1), Some(n2), Some(n3)], &l);
 
-        network.send(vec![new_message(id, id, MessageType::MsgHup, 0)]);
+            network.send(vec![new_message(id, id, MessageType::MsgHup, 0)]);
 
-        assert_eq!(network.peers[&id].state, state);
+            assert_eq!(network.peers[&id].state, state, "{}", tag);
+        }
     }
 }
diff --git a/src/config.rs b/src/config.rs
index 392540db..f56c701a 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -67,6 +67,10 @@ pub struct Config {
     /// rejoins the cluster.
     pub pre_vote: bool,
 
+    /// If a split vote happens, vote only for the peer with the larger hash (ties
+    /// broken by the larger id). This can save a round of campaigning with `pre_vote`.
+    pub judge_split_prevote: bool,
+
     /// The range of election timeout. In some cases, we hope some nodes has less possibility
     /// to become leader. This configuration ensures that the randomized election_timeout
     /// will always be suit in [min_election_tick, max_election_tick).
@@ -112,6 +116,7 @@ impl Default for Config {
             max_inflight_msgs: 256,
             check_quorum: false,
             pre_vote: false,
+            judge_split_prevote: false,
             min_election_tick: 0,
             max_election_tick: 0,
             read_only_option: ReadOnlyOption::Safe,
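For callers, the new flag is set like any other `Config` field; it only takes effect together with `pre_vote`, since the tie-break runs during the pre-vote phase. A hedged usage sketch (tick values are illustrative):

    use raft::Config;

    let config = Config {
        id: 1,
        election_tick: 10,
        heartbeat_tick: 1,
        pre_vote: true,
        // Break split pre-votes deterministically instead of re-campaigning.
        judge_split_prevote: true,
        ..Default::default()
    };
    config.validate().unwrap();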
diff --git a/src/raft.rs b/src/raft.rs
index fe397ed1..cbfc5710 100644
--- a/src/raft.rs
+++ b/src/raft.rs
@@ -14,7 +14,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::cmp;
+use std::cmp::{self, Ordering};
 use std::ops::{Deref, DerefMut};
 
 use crate::eraftpb::{
@@ -238,6 +238,7 @@ pub struct RaftCore<T: Storage> {
     skip_bcast_commit: bool,
     batch_append: bool,
+    judge_split_prevote: bool,
 
     heartbeat_timeout: usize,
     election_timeout: usize,
@@ -337,6 +338,7 @@
             promotable: false,
             check_quorum: c.check_quorum,
             pre_vote: c.pre_vote,
+            judge_split_prevote: c.judge_split_prevote,
             read_only: ReadOnly::new(c.read_only_option),
             heartbeat_timeout: c.heartbeat_tick,
             election_timeout: c.election_tick,
@@ -1344,6 +1346,18 @@
                 "msg type" => ?m.get_msg_type(),
             );
 
+            // When judge_split_prevote is enabled, reject explicitly to make the
+            // candidate exit PreCandidate early, so it can vote for another peer later.
+            if self.judge_split_prevote
+                && m.get_msg_type() == MessageType::MsgRequestPreVote
+            {
+                let mut to_send =
+                    new_message(m.from, MessageType::MsgRequestPreVoteResponse, None);
+                to_send.term = m.term;
+                to_send.reject = true;
+                self.r.send(to_send, &mut self.msgs);
+            }
+
             return Ok(());
         }
     }
@@ -1455,10 +1469,7 @@
             // ...or this is a PreVote for a future term...
             (m.get_msg_type() == MessageType::MsgRequestPreVote && m.term > self.term);
         // ...and we believe the candidate is up to date.
-        if can_vote
-            && self.raft_log.is_up_to_date(m.index, m.log_term)
-            && (m.index > self.raft_log.last_index() || self.priority <= m.priority)
-        {
+        if can_vote && self.may_vote(&m) {
             // When responding to Msg{Pre,}Vote messages we include the term
             // from the message, not the local term. To see why consider the
             // case where a single node was previously partitioned away and
@@ -1479,6 +1490,10 @@
                 self.election_elapsed = 0;
                 self.vote = m.from;
             }
+            // Granting a vote while pre-campaigning means a split vote; give up the election.
+            if self.judge_split_prevote && self.state == StateRole::PreCandidate {
+                self.become_follower(self.term, INVALID_ID);
+            }
         } else {
             self.log_vote_reject(&m);
             let mut to_send =
@@ -1501,6 +1516,40 @@
         Ok(())
     }
 
+    /// Checks whether this node may vote for the request. It may vote when the
+    /// requesting node's log is at least as up-to-date as its own. When the logs
+    /// are equal, priority is considered. If priorities are also equal and this
+    /// node is campaigning too, a split vote has happened; if `judge_split_prevote`
+    /// and `pre_vote` are enabled, it then votes only for a greater (hash, id) pair.
+    fn may_vote(&self, m: &Message) -> bool {
+        match self.raft_log.is_up_to_date(m.index, m.log_term) {
+            Ordering::Greater => true,
+            Ordering::Equal => {
+                match self.priority.cmp(&m.priority) {
+                    Ordering::Greater => false,
+                    Ordering::Equal => {
+                        // Judging split votes breaks the symmetry of campaigning,
+                        // but as it only kicks in during a split vote, the impact
+                        // should not be significant. Leader transfer skips prevote,
+                        // so the two features do not affect each other.
+                        !self.judge_split_prevote
+                            || self.state != StateRole::PreCandidate
+                            || m.get_msg_type() != MessageType::MsgRequestPreVote
+                            || {
+                                let from_id = m.from;
+                                let (my_h, from_h) =
+                                    (fxhash::hash64(&self.id), fxhash::hash64(&from_id));
+                                my_h < from_h || (my_h == from_h && self.id < from_id)
+                            }
+                    }
+                    Ordering::Less => true,
+                }
+            }
+            Ordering::Less => false,
+        }
+    }
+
     fn hup(&mut self, transfer_leader: bool) {
         if self.state == StateRole::Leader {
             debug!(
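The final disjunct in `may_vote` is just a lexicographic comparison of (hash, id) pairs, which makes the yield rule a total order over peers: for any split-vote pair, exactly one side gives way. A small standalone sketch of that equivalence (hypothetical free function, assuming the same `fxhash` crate):

    use std::cmp::Ordering;

    /// True when this node should yield to the requester. Equivalent to
    /// `my_h < from_h || (my_h == from_h && my_id < from_id)` above.
    fn yields_to(my_id: u64, from_id: u64) -> bool {
        let (my_h, from_h) = (fxhash::hash64(&my_id), fxhash::hash64(&from_id));
        (my_h, my_id).cmp(&(from_h, from_id)) == Ordering::Less
    }

Because ids are unique, the tuple comparison never returns `Equal` for two distinct peers, so the rule cannot end with both sides yielding or both sides insisting.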
diff --git a/src/raft_log.rs b/src/raft_log.rs
index 67047af1..f2e88127 100644
--- a/src/raft_log.rs
+++ b/src/raft_log.rs
@@ -14,7 +14,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::cmp;
+use std::cmp::{self, Ordering};
 
 use slog::warn;
 use slog::Logger;
@@ -415,8 +415,11 @@ impl<T: Storage> RaftLog<T> {
     /// later term is more up-to-date. If the logs end with the same term, then
     /// whichever log has the larger last_index is more up-to-date. If the logs are
     /// the same, the given log is up-to-date.
-    pub fn is_up_to_date(&self, last_index: u64, term: u64) -> bool {
-        term > self.last_term() || (term == self.last_term() && last_index >= self.last_index())
-    }
+    /// Returns `Ordering::Greater` if the given log is more up-to-date,
+    /// `Ordering::Equal` if the logs are equally up-to-date, and `Ordering::Less`
+    /// if the existing log is more up-to-date.
+    pub fn is_up_to_date(&self, last_index: u64, term: u64) -> Ordering {
+        (term, last_index).cmp(&(self.last_term(), self.last_index()))
+    }
 
     /// Returns committed and persisted entries since max(`since_idx` + 1, first_index).
@@ -663,7 +666,7 @@ impl<T: Storage> RaftLog<T> {
 #[cfg(test)]
 mod test {
     use std::{
-        cmp,
+        cmp::{self, Ordering},
         panic::{self, AssertUnwindSafe},
     };
@@ -756,22 +759,25 @@
         raft_log.append(&previous_ents);
         let tests = vec![
             // greater term, ignore lastIndex
-            (raft_log.last_index() - 1, 4, true),
-            (raft_log.last_index(), 4, true),
-            (raft_log.last_index() + 1, 4, true),
+            (raft_log.last_index() - 1, 4, Ordering::Greater),
+            (raft_log.last_index(), 4, Ordering::Greater),
+            (raft_log.last_index() + 1, 4, Ordering::Greater),
             // smaller term, ignore lastIndex
-            (raft_log.last_index() - 1, 2, false),
-            (raft_log.last_index(), 2, false),
-            (raft_log.last_index() + 1, 2, false),
+            (raft_log.last_index() - 1, 2, Ordering::Less),
+            (raft_log.last_index(), 2, Ordering::Less),
+            (raft_log.last_index() + 1, 2, Ordering::Less),
             // equal term, lager lastIndex wins
-            (raft_log.last_index() - 1, 3, false),
-            (raft_log.last_index(), 3, true),
-            (raft_log.last_index() + 1, 3, true),
+            (raft_log.last_index() - 1, 3, Ordering::Less),
+            (raft_log.last_index(), 3, Ordering::Equal),
+            (raft_log.last_index() + 1, 3, Ordering::Greater),
         ];
         for (i, &(last_index, term, up_to_date)) in tests.iter().enumerate() {
             let g_up_to_date = raft_log.is_up_to_date(last_index, term);
             if g_up_to_date != up_to_date {
-                panic!("#{}: uptodate = {}, want {}", i, g_up_to_date, up_to_date);
+                panic!(
+                    "#{}: uptodate = {:?}, want {:?}",
+                    i, g_up_to_date, up_to_date
+                );
             }
         }
     }
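The reworked `is_up_to_date` maps the classic Raft freshness rule (compare last term, then last index) onto `Ordering` through a tuple comparison. A standalone sketch of the contract (free function mirroring the method, for illustration only):

    use std::cmp::Ordering;

    /// Compares `(their_term, their_index)` against `(our_term, our_index)`
    /// lexicographically, exactly like the tuple `cmp` in the patch.
    fn is_up_to_date(ours: (u64, u64), theirs: (u64, u64)) -> Ordering {
        theirs.cmp(&ours)
    }

    fn main() {
        assert_eq!(is_up_to_date((3, 5), (4, 1)), Ordering::Greater); // later term wins
        assert_eq!(is_up_to_date((3, 5), (3, 5)), Ordering::Equal); // identical logs
        assert_eq!(is_up_to_date((3, 5), (3, 4)), Ordering::Less); // same term, shorter log
    }

The old boolean form collapsed `Greater` and `Equal` into `true`; keeping them distinct is what lets `may_vote` fall through to the priority and split-vote tie-breaks only when the logs are exactly equal.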