From 75cb96fbcbab0f0d14362fc9eb6273fcc8421e21 Mon Sep 17 00:00:00 2001
From: LintianShi
Date: Tue, 26 Jul 2022 22:05:50 +0800
Subject: [PATCH] Add new message type MsgGroupBroadcast and corresponding
 handler

Signed-off-by: LintianShi
---
 proto/proto/eraftpb.proto | 13 ++++++
 src/config.rs             |  5 +++
 src/raft.rs               | 92 ++++++++++++++++++++++++++++++++++++---
 3 files changed, 103 insertions(+), 7 deletions(-)

diff --git a/proto/proto/eraftpb.proto b/proto/proto/eraftpb.proto
index 1f7c71b2..c1f275d1 100644
--- a/proto/proto/eraftpb.proto
+++ b/proto/proto/eraftpb.proto
@@ -46,6 +46,17 @@ message Snapshot {
     SnapshotMetadata metadata = 2;
 }
 
+// Forward tells the agent how to forward the MsgGroupBroadcast from the leader.
+//
+// Field to is the destination of the forwarding.
+// Fields log_term and index identify the entry that immediately precedes the
+// log entries to be forwarded, i.e. the forwarded entries are in the range
+// (index, last_index].
+message Forward {
+    uint64 to = 1;
+    uint64 log_term = 2;
+    uint64 index = 3;
+}
+
 enum MessageType {
     MsgHup = 0;
     MsgBeat = 1;
@@ -66,6 +77,7 @@ enum MessageType {
     MsgReadIndexResp = 16;
     MsgRequestPreVote = 17;
     MsgRequestPreVoteResponse = 18;
+    MsgGroupBroadcast = 19;
 }
 
 message Message {
@@ -89,6 +101,7 @@ message Message {
     uint64 reject_hint = 11;
     bytes context = 12;
     uint64 priority = 14;
+    repeated Forward forwards = 16;
 }
 
 message HardState {
diff --git a/src/config.rs b/src/config.rs
index 392540db..103639f0 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -67,6 +67,10 @@ pub struct Config {
     /// rejoins the cluster.
     pub pre_vote: bool,
 
+    /// Enables follower replication.
+    /// This reduces cross-AZ traffic in cloud deployments.
+    pub follower_repl: bool,
+
     /// The range of election timeout. In some cases, we hope some nodes has less possibility
     /// to become leader. This configuration ensures that the randomized election_timeout
     /// will always be suit in [min_election_tick, max_election_tick).
@@ -112,6 +116,7 @@ impl Default for Config {
             max_inflight_msgs: 256,
             check_quorum: false,
             pre_vote: false,
+            follower_repl: false,
             min_election_tick: 0,
             max_election_tick: 0,
             read_only_option: ReadOnlyOption::Safe,
diff --git a/src/raft.rs b/src/raft.rs
index fe397ed1..c5527c02 100644
--- a/src/raft.rs
+++ b/src/raft.rs
@@ -236,6 +236,13 @@ pub struct RaftCore<T: Storage> {
     /// Enable this if greater cluster stability is preferred over faster elections.
     pub pre_vote: bool,
 
+    /// Enable follower replication.
+    ///
+    /// This enables data replication from a follower to other servers in the
+    /// same availability zone.
+    ///
+    /// Enable this to reduce cross-AZ traffic in cloud deployments.
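+    ///
+    /// With this enabled, a peer that receives a MsgGroupBroadcast appends the
+    /// leader's entries to its own log and then relays them as ordinary
+    /// MsgAppend messages to the peers listed in the message's forwards field.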
+    pub follower_repl: bool,
+
     skip_bcast_commit: bool,
     batch_append: bool,
 
@@ -337,6 +344,7 @@ impl<T: Storage> Raft<T> {
             promotable: false,
             check_quorum: c.check_quorum,
             pre_vote: c.pre_vote,
+            follower_repl: c.follower_repl,
             read_only: ReadOnly::new(c.read_only_option),
             heartbeat_timeout: c.heartbeat_tick,
             election_timeout: c.election_tick,
@@ -1372,6 +1380,7 @@ impl<T: Storage> Raft<T> {
             if m.get_msg_type() == MessageType::MsgAppend
                 || m.get_msg_type() == MessageType::MsgHeartbeat
                 || m.get_msg_type() == MessageType::MsgSnapshot
+                || m.get_msg_type() == MessageType::MsgGroupBroadcast && self.follower_repl
             {
                 self.become_follower(m.term, m.from);
             } else {
@@ -1381,7 +1390,8 @@ impl<T: Storage> Raft<T> {
         } else if m.term < self.term {
             if (self.check_quorum || self.pre_vote)
                 && (m.get_msg_type() == MessageType::MsgHeartbeat
-                    || m.get_msg_type() == MessageType::MsgAppend)
+                    || m.get_msg_type() == MessageType::MsgAppend
+                    || m.get_msg_type() == MessageType::MsgGroupBroadcast && self.follower_repl)
             {
                 // We have received messages from a leader at a lower term. It is possible
                 // that these messages were simply delayed in the network, but this could
@@ -2314,6 +2324,11 @@ impl<T: Storage> Raft<T> {
                 self.election_elapsed = 0;
                 self.leader_id = m.from;
                 self.handle_append_entries(&m);
             }
+            MessageType::MsgGroupBroadcast => {
+                self.election_elapsed = 0;
+                self.leader_id = m.from;
+                self.handle_group_broadcast(&m);
+            }
             MessageType::MsgHeartbeat => {
                 self.election_elapsed = 0;
                 self.leader_id = m.from;
@@ -2425,13 +2440,14 @@ impl<T: Storage> Raft<T> {
         Err(Error::RequestSnapshotDropped)
     }
 
-    // TODO: revoke pub when there is a better way to test.
-    /// For a given message, append the entries to the log.
-    pub fn handle_append_entries(&mut self, m: &Message) {
+    /// Try to append the entries in the message to the log.
+    /// Returns true only if the entries have been appended successfully.
+    pub fn try_append_entries(&mut self, m: &Message) -> bool {
         if self.pending_request_snapshot != INVALID_INDEX {
             self.send_request_snapshot();
-            return;
+            return false;
         }
+
         if m.index < self.raft_log.committed {
             debug!(
                 self.logger,
                 "got message with lower index than committed"
             );
             let mut to_send = Message::default();
             to_send.to = m.from;
             to_send.set_msg_type(MessageType::MsgAppendResponse);
             to_send.index = self.raft_log.committed;
             to_send.commit = self.raft_log.committed;
             self.r.send(to_send, &mut self.msgs);
-            return;
+            return false;
         }
 
         let mut to_send = Message::default();
         to_send.to = m.from;
         to_send.set_msg_type(MessageType::MsgAppendResponse);
 
+        let mut success = true;
         if let Some((_, last_idx)) = self
             .raft_log
             .maybe_append(m.index, m.log_term, m.commit, &m.entries)
         {
             to_send.index = last_idx;
         } else {
             debug!(
                 self.logger,
-                "rejected msgApp [logterm: {msg_log_term}, index: {msg_index}] \
+                "Reject append [logterm: {msg_log_term}, index: {msg_index}] \
                 from {from}",
                 msg_log_term = m.log_term,
                 msg_index = m.index,
                 from = m.from
             );
 
             let hint_index = cmp::min(m.index, self.raft_log.last_index());
             let (hint_index, hint_term) = self
                 .raft_log
                 .find_conflict_by_term(hint_index, m.log_term);
 
             to_send.index = m.index;
             to_send.reject = true;
             to_send.reject_hint = hint_index;
             to_send.log_term = hint_term.unwrap();
+            success = false;
         }
         to_send.set_commit(self.raft_log.committed);
         self.r.send(to_send, &mut self.msgs);
+        success
+    }
+
+    // TODO: revoke pub when there is a better way to test.
+    /// For a given message, append the entries to the log.
+    pub fn handle_append_entries(&mut self, m: &Message) {
+        self.try_append_entries(m);
+    }
+
+    /// For a broadcast, append entries to own log and forward MsgAppend to the
+    /// other destinations.
+    pub fn handle_group_broadcast(&mut self, m: &Message) {
+        // If the agent fails to append entries from the leader,
+        // the agent cannot forward MsgAppend.
+        if self.try_append_entries(m) {
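+            // Each Forward carries a destination plus the (log_term, index)
+            // pair identifying the entry that precedes the entries the
+            // destination still needs; the agent rebuilds a MsgAppend for that
+            // destination from its own log based on this pair.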
+            let agent_id = m.get_to();
+            for forward in m.get_forwards() {
+                // The destination of the forwarding should be in the cluster.
+                if self.prs().get(forward.get_to()).is_some() {
+                    // Fetch the log entries in (forward.index, last_index]
+                    // from the agent's own log.
+                    let ents = self.raft_log.entries(
+                        forward.get_index() + 1,
+                        self.max_msg_size,
+                        GetEntriesContext(GetEntriesFor::SendAppend {
+                            to: forward.get_to(),
+                            term: m.term,
+                            aggressively: false,
+                        }),
+                    );
+                    if self
+                        .raft_log
+                        .match_term(forward.get_index(), forward.get_log_term())
+                    {
+                        let mut m_append = Message::default();
+                        m_append.to = forward.get_to();
+                        m_append.from = m.get_from();
+                        m_append.set_msg_type(MessageType::MsgAppend);
+                        m_append.index = forward.get_index();
+                        m_append.log_term = forward.get_log_term();
+                        m_append.set_entries(ents.unwrap().into());
+                        m_append.commit = m.get_commit();
+                        m_append.commit_term = m.get_commit_term();
+                        debug!(
+                            self.logger,
+                            "Peer {} forwards MsgAppend from {} to {}",
+                            agent_id,
+                            m_append.from,
+                            m_append.to
+                        );
+                        self.r.send(m_append, &mut self.msgs);
+                    } else {
+                        warn!(
+                            self.logger,
+                            "The agent's log does not match the entry (index {}, log term {}) in the forward message",
+                            forward.get_index(),
+                            forward.get_log_term()
+                        );
+                    }
+                }
+            }
+        }
+    }
 
     // TODO: revoke pub when there is a better way to test.
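
Note: the leader-side send path is not included in this patch, so the sketch
below is illustrative only. It shows how a sender might build a
MsgGroupBroadcast for an agent. The helper name build_group_broadcast is
hypothetical, and the set_forwards/set_entries accessors are assumed to be
generated from the eraftpb.proto change above.

    use raft::eraftpb::{Entry, Forward, Message, MessageType};

    // Hypothetical leader-side helper: ask `agent` to append `entries` itself
    // and relay them to `peers` in its zone. `prev_index`/`prev_term` identify
    // the entry preceding `entries`, exactly as in a regular MsgAppend. Every
    // Forward reuses that pair here, assuming all peers need the same range; a
    // real sender would consult its per-peer Progress instead.
    fn build_group_broadcast(
        leader: u64,
        agent: u64,
        peers: &[u64],
        prev_index: u64,
        prev_term: u64,
        entries: Vec<Entry>,
        committed: u64,
    ) -> Message {
        let mut m = Message::default();
        m.set_msg_type(MessageType::MsgGroupBroadcast);
        m.from = leader;
        m.to = agent;
        m.index = prev_index;
        m.log_term = prev_term;
        m.commit = committed;
        m.set_entries(entries.into());
        let forwards: Vec<Forward> = peers
            .iter()
            .map(|&to| {
                let mut f = Forward::default();
                f.to = to;
                f.log_term = prev_term;
                f.index = prev_index;
                f
            })
            .collect();
        m.set_forwards(forwards.into());
        m
    }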