Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: introduce judge_split_prevote #420

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
109 changes: 88 additions & 21 deletions harness/tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4331,6 +4331,58 @@ fn test_prevote_with_split_vote() {
assert_eq!(network.peers[&3].state, StateRole::Follower, "peer 3 state",);
}

/// TestPreVoteWithJudgeSplitPreVote verifies that during split vote, cluster can finish campaign
/// by letting raft node with larger hash value to finish campaign.
#[test]
fn test_prevote_with_judge_split_prevote() {
let l = default_logger();
for cnt in &[3, 5] {
let peers = (1..=*cnt).map(|id| {
let mut config = new_test_config(id, 10, 1);
config.pre_vote = true;
config.judge_split_prevote = true;
let storage = new_storage();
storage.initialize_with_conf_state(((1..=*cnt).collect::<Vec<_>>(), vec![]));
let mut raft = new_test_raft_with_config(&config, storage, &l);
raft.become_follower(1, INVALID_ID);
Some(raft)
});
let mut network = Network::new(peers.collect(), &l);
network.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

// simulate leader down. followers start split vote.
network.isolate(1);
let msgs = (2..=*cnt)
.map(|id| new_message(id, id, MessageType::MsgHup, 0))
.collect();
network.send(msgs);

// check whether the term values are expected
for id in 2..=*cnt {
assert_eq!(network.peers[&id].term, 3, "[{}] peer {} term", cnt, id);
}

let mut hashes: Vec<_> = (2..=*cnt).map(|id| (fxhash::hash64(&id), id)).collect();
hashes.sort();
for (_, id) in &hashes[..hashes.len() - 1] {
assert_eq!(
network.peers[&id].state,
StateRole::Follower,
"[{}] peer {} state",
cnt,
id
);
}
assert_eq!(
network.peers[&hashes[hashes.len() - 1].1].state,
StateRole::Leader,
"[{}] peer {} state",
cnt,
cnt
);
}
}

// ensure that after a node become pre-candidate, it will checkQuorum correctly.
#[test]
fn test_prevote_with_check_quorum() {
Expand Down Expand Up @@ -5290,6 +5342,14 @@ fn test_group_commit_consistent() {
/// of the election with both priority and log.
#[test]
fn test_election_with_priority_log() {
let entry_set = vec![
(
"large term",
vec![new_entry(2, 1, SOME_DATA)],
vec![new_entry(1, 1, SOME_DATA)],
),
("large index", vec![new_entry(1, 1, SOME_DATA)], vec![]),
];
let tests = vec![
// log is up to date or not 1..3, priority 1..3, id, state
(true, false, false, 3, 1, 1, 1, StateRole::Leader),
Expand All @@ -5305,30 +5365,37 @@ fn test_election_with_priority_log() {
(false, false, true, 1, 1, 3, 1, StateRole::Leader),
];

for (_i, &(l1, l2, l3, p1, p2, p3, id, state)) in tests.iter().enumerate() {
let l = default_logger();
let mut n1 = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), &l);
let mut n2 = new_test_raft(2, vec![1, 2, 3], 10, 1, new_storage(), &l);
let mut n3 = new_test_raft(3, vec![1, 2, 3], 10, 1, new_storage(), &l);
n1.set_priority(p1);
n2.set_priority(p2);
n3.set_priority(p3);
let entries = vec![new_entry(1, 1, SOME_DATA), new_entry(1, 1, SOME_DATA)];
if l1 {
n1.raft_log.append(&entries);
}
if l2 {
n2.raft_log.append(&entries);
}
if l3 {
n3.raft_log.append(&entries);
}
for (tag, latest_entries, outdated_entries) in entry_set {
for (_i, &(l1, l2, l3, p1, p2, p3, id, state)) in tests.iter().enumerate() {
let l = default_logger();
let mut n1 = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), &l);
let mut n2 = new_test_raft(2, vec![1, 2, 3], 10, 1, new_storage(), &l);
let mut n3 = new_test_raft(3, vec![1, 2, 3], 10, 1, new_storage(), &l);
n1.set_priority(p1);
n2.set_priority(p2);
n3.set_priority(p3);
if l1 {
n1.raft_log.append(&latest_entries);
} else {
n1.raft_log.append(&outdated_entries);
}
if l2 {
n2.raft_log.append(&latest_entries);
} else {
n2.raft_log.append(&outdated_entries);
}
if l3 {
n3.raft_log.append(&latest_entries);
} else {
n3.raft_log.append(&outdated_entries);
}

let mut network = Network::new(vec![Some(n1), Some(n2), Some(n3)], &l);
let mut network = Network::new(vec![Some(n1), Some(n2), Some(n3)], &l);

network.send(vec![new_message(id, id, MessageType::MsgHup, 0)]);
network.send(vec![new_message(id, id, MessageType::MsgHup, 0)]);

assert_eq!(network.peers[&id].state, state);
assert_eq!(network.peers[&id].state, state, "{}", tag);
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ pub struct Config {
/// rejoins the cluster.
pub pre_vote: bool,

/// If split vote happens, only vote for the large id. This can save a round of campaign
/// when combined with `pre_vote`.
pub judge_split_prevote: bool,

/// The range of election timeout. In some cases, we hope some nodes has less possibility
/// to become leader. This configuration ensures that the randomized election_timeout
/// will always be suit in [min_election_tick, max_election_tick).
Expand Down Expand Up @@ -112,6 +116,7 @@ impl Default for Config {
max_inflight_msgs: 256,
check_quorum: false,
pre_vote: false,
judge_split_prevote: false,
min_election_tick: 0,
max_election_tick: 0,
read_only_option: ReadOnlyOption::Safe,
Expand Down
56 changes: 51 additions & 5 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp;
use std::cmp::{self, Ordering};
use std::ops::{Deref, DerefMut};

use crate::eraftpb::{
Expand Down Expand Up @@ -238,6 +238,7 @@ pub struct RaftCore<T: Storage> {

skip_bcast_commit: bool,
batch_append: bool,
judge_split_prevote: bool,

heartbeat_timeout: usize,
election_timeout: usize,
Expand Down Expand Up @@ -342,6 +343,7 @@ impl<T: Storage> Raft<T> {
promotable: false,
check_quorum: c.check_quorum,
pre_vote: c.pre_vote,
judge_split_prevote: c.judge_split_prevote,
read_only: ReadOnly::new(c.read_only_option),
heartbeat_timeout: c.heartbeat_tick,
election_timeout: c.election_tick,
Expand Down Expand Up @@ -1312,6 +1314,18 @@ impl<T: Storage> Raft<T> {
"msg type" => ?m.get_msg_type(),
);

// When judge_split_prevote, reject explicitly to make candidate exit PreCandiate early
// so it will vote for other peer later.
if self.judge_split_prevote
&& m.get_msg_type() == MessageType::MsgRequestPreVote
{
Comment on lines +1349 to +1353
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems Follower and PreCandidate are not different, both of them can (pre)vote to other peers.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follower will ignore all PreVoteResponse. For example, A and B split pre-votes, and C can vote for both A and B. If B is chosen, and A doesn't step down to follower, it will still start campaign when C's prevote is received.

let mut to_send =
new_message(m.from, MessageType::MsgRequestPreVoteResponse, None);
to_send.term = m.term;
to_send.reject = true;
self.r.send(to_send, &mut self.msgs);
}

return Ok(());
}
}
Expand Down Expand Up @@ -1423,10 +1437,7 @@ impl<T: Storage> Raft<T> {
// ...or this is a PreVote for a future term...
(m.get_msg_type() == MessageType::MsgRequestPreVote && m.term > self.term);
// ...and we believe the candidate is up to date.
if can_vote
&& self.raft_log.is_up_to_date(m.index, m.log_term)
&& (m.index > self.raft_log.last_index() || self.priority <= m.priority)
{
if can_vote && self.may_vote(&m) {
// When responding to Msg{Pre,}Vote messages we include the term
// from the message, not the local term. To see why consider the
// case where a single node was previously partitioned away and
Expand All @@ -1447,6 +1458,10 @@ impl<T: Storage> Raft<T> {
self.election_elapsed = 0;
self.vote = m.from;
}
// This means it's in split vote, give up election.
if self.judge_split_prevote && self.state == StateRole::PreCandidate {
gengliqi marked this conversation as resolved.
Show resolved Hide resolved
self.become_follower(self.term, INVALID_ID);
}
} else {
self.log_vote_reject(&m);
let mut to_send =
Expand All @@ -1469,6 +1484,37 @@ impl<T: Storage> Raft<T> {
Ok(())
}

/// Checks if this node may vote for the request. It may vote when from node
/// contains the same logs or more logs. When they contains same logs, priority
/// is considered. If priorities are still the same, and this node is also
/// starting campaign, then split vote happens. In this case, if `judge_split_prevote`
/// and `pre_vote` are enabled, it will vote only when from node has greater ID.
fn may_vote(&self, m: &Message) -> bool {
match self.raft_log.is_up_to_date(m.index, m.log_term) {
Ordering::Greater => true,
Ordering::Equal => {
match self.priority.cmp(&m.priority) {
Ordering::Greater => false,
Ordering::Equal => {
// judge split vote can break symmetry of campaign, but as
// it only happens during split vote, the impact should not
// be significant.
Comment on lines +1531 to +1533
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can we tell if a campaign will end up split vote?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could cause some raft nodes with lower ID impossible to become the leader even we want to transfer leadership to it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the configuration size is an odd number and leader is down, split vote can probably happen If two nodes are in PreCandidate state. judge_split_prevote only works on prevote, transfering leader skips prevote, so they won't have impact on the other.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

judge_split_prevote only works on prevote

Besides transfer leader, a node needs to pass pre-campaign before start the actual campaign, so judge_split_prevote will impact the whole election process (pre-vote should consider enabled as this is when judge_split_prevote work).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so judge_split_prevote will impact the whole election process...

Indeed, it depends on whether nodes are working slowly. But in my tests, when one node is down, and after elections are finished, the leader count on each nodes don't have much differences (I remove balance leader scheduler before shutdown a node). And even it leads to more leaders on some node, it should not be a problem with the help of PD to reach a eventually balance.

!self.judge_split_prevote
|| self.state != StateRole::PreCandidate
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some comments for transfer leader?

|| m.get_msg_type() != MessageType::MsgRequestPreVote
|| {
let (my_h, from_h) =
(fxhash::hash64(&self.id), fxhash::hash64(&m.from));
my_h < from_h || (my_h == from_h && self.id < m.from)
}
}
Ordering::Less => true,
}
}
Ordering::Less => false,
}
}

fn hup(&mut self, transfer_leader: bool) {
if self.state == StateRole::Leader {
debug!(
Expand Down
34 changes: 20 additions & 14 deletions src/raft_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp;
use std::cmp::{self, Ordering};

use slog::warn;
use slog::Logger;
Expand Down Expand Up @@ -409,8 +409,11 @@ impl<T: Storage> RaftLog<T> {
/// later term is more up-to-date. If the logs end with the same term, then
/// whichever log has the larger last_index is more up-to-date. If the logs are
/// the same, the given log is up-to-date.
pub fn is_up_to_date(&self, last_index: u64, term: u64) -> bool {
term > self.last_term() || (term == self.last_term() && last_index >= self.last_index())
/// Returns `Ordering::Greater` means given log is more up-to-date, `Ordering::Equal`
/// means they have the same logs, `Ordering::Less` means existing logs is more
/// up-to-date.
pub fn is_up_to_date(&self, last_index: u64, term: u64) -> Ordering {
(term, last_index).cmp(&(self.last_term(), self.last_index()))
}

/// Returns committed and persisted entries since max(`since_idx` + 1, first_index).
Expand Down Expand Up @@ -650,7 +653,7 @@ impl<T: Storage> RaftLog<T> {
#[cfg(test)]
mod test {
use std::{
cmp,
cmp::{self, Ordering},
panic::{self, AssertUnwindSafe},
};

Expand Down Expand Up @@ -743,22 +746,25 @@ mod test {
raft_log.append(&previous_ents);
let tests = vec![
// greater term, ignore lastIndex
(raft_log.last_index() - 1, 4, true),
(raft_log.last_index(), 4, true),
(raft_log.last_index() + 1, 4, true),
(raft_log.last_index() - 1, 4, Ordering::Greater),
(raft_log.last_index(), 4, Ordering::Greater),
(raft_log.last_index() + 1, 4, Ordering::Greater),
// smaller term, ignore lastIndex
(raft_log.last_index() - 1, 2, false),
(raft_log.last_index(), 2, false),
(raft_log.last_index() + 1, 2, false),
(raft_log.last_index() - 1, 2, Ordering::Less),
(raft_log.last_index(), 2, Ordering::Less),
(raft_log.last_index() + 1, 2, Ordering::Less),
// equal term, lager lastIndex wins
(raft_log.last_index() - 1, 3, false),
(raft_log.last_index(), 3, true),
(raft_log.last_index() + 1, 3, true),
(raft_log.last_index() - 1, 3, Ordering::Less),
(raft_log.last_index(), 3, Ordering::Equal),
(raft_log.last_index() + 1, 3, Ordering::Greater),
];
for (i, &(last_index, term, up_to_date)) in tests.iter().enumerate() {
let g_up_to_date = raft_log.is_up_to_date(last_index, term);
if g_up_to_date != up_to_date {
panic!("#{}: uptodate = {}, want {}", i, g_up_to_date, up_to_date);
panic!(
"#{}: uptodate = {:?}, want {:?}",
i, g_up_to_date, up_to_date
);
}
}
}
Expand Down