From eed70bc550f26154b281bae8b17b46e018f69cc2 Mon Sep 17 00:00:00 2001 From: LintianShi Date: Fri, 23 Sep 2022 18:17:21 +0800 Subject: [PATCH] async log entries fetch for forwarding Signed-off-by: LintianShi --- src/lib.rs | 2 +- src/raft.rs | 147 ++++++++++++++++++++++++------------------------ src/raw_node.rs | 13 +++++ src/storage.rs | 59 +++++++++++-------- 4 files changed, 125 insertions(+), 96 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e712ac84..a505995b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -536,7 +536,7 @@ pub use raw_node::is_empty_snap; pub use raw_node::{LightReady, Peer, RawNode, Ready, SnapshotStatus}; pub use read_only::{ReadOnlyOption, ReadState}; pub use status::Status; -pub use storage::{GetEntriesContext, RaftState, Storage}; +pub use storage::{GetEntriesContext, GetEntriesFor, RaftState, Storage}; pub use tracker::{Inflights, Progress, ProgressState, ProgressTracker}; pub use util::majority; diff --git a/src/raft.rs b/src/raft.rs index df0e807e..e9c7d946 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -844,6 +844,63 @@ impl RaftCore { true } + fn send_forward( + &mut self, + from: u64, + commit: u64, + commit_term: u64, + forward: &Forward, + msgs: &mut Vec, + ) { + let mut m = Message::default(); + m.to = forward.to; + m.from = from; + m.commit = commit; + m.commit_term = commit_term; + m.set_msg_type(MessageType::MsgAppend); + // Fetch log entries from the forward.index to the last index of log. + if self + .raft_log + .match_term(forward.get_index(), forward.get_log_term()) + { + let ents = self.raft_log.entries( + forward.get_index() + 1, + self.max_msg_size, + GetEntriesContext(GetEntriesFor::SendForward { + from, + commit, + commit_term, + term: self.term, + forward: forward.clone(), + }), + ); + + match ents { + Ok(ents) => { + m.index = forward.get_index(); + m.log_term = forward.get_log_term(); + m.set_entries(ents.into()); + self.send(m, msgs); + } + Err(Error::Store(StorageError::LogTemporarilyUnavailable)) => {} + _ => { + // Forward MsgAppend with empty entries in order to update commit + // or trigger decrementing next_idx. + m.index = forward.get_index(); + m.log_term = forward.get_log_term(); + self.send(m, msgs); + warn!( + self.logger, + "The agent fails to fetch entries, index {} log term {} in forward message to peer {}.", + forward.get_index(), + forward.get_log_term(), + forward.get_to() + ); + } + } + } + } + // send_heartbeat sends an empty MsgAppend fn send_heartbeat( &mut self, @@ -904,6 +961,12 @@ impl Raft { .for_each(|(id, pr)| core.send_append(*id, pr, msgs)); } + /// Forwards an append RPC from the leader to the given peer. + pub fn send_forward(&mut self, from: u64, commit: u64, commit_term: u64, forward: &Forward) { + self.r + .send_forward(from, commit, commit_term, forward, &mut self.msgs); + } + /// Broadcasts heartbeats to all the followers if it's leader. pub fn ping(&mut self) { if self.state == StateRole::Leader { @@ -2541,59 +2604,20 @@ impl Raft { // If the agent fails to append entries from the leader, // the agent cannot forward MsgAppend. for forward in m.get_forwards() { - // Fetch log entries from the forward.index to the last index of log. - if self - .raft_log - .match_term(forward.get_index(), forward.get_log_term()) - { - let ents = self.raft_log.entries( - forward.get_index() + 1, - self.max_msg_size, - GetEntriesContext(GetEntriesFor::SendAppend { - to: forward.get_to(), - term: m.term, - aggressively: false, - }), - ); - - match ents { - Ok(ents) => { - let mut m_append = Message::default(); - m_append.to = forward.get_to(); - m_append.from = m.get_from(); - m_append.set_msg_type(MessageType::MsgAppend); - m_append.index = forward.get_index(); - m_append.log_term = forward.get_log_term(); - m_append.set_entries(ents.into()); - m_append.commit = m.get_commit(); - m_append.commit_term = m.get_commit_term(); - self.r.send(m_append, &mut self.msgs); - } - Err(_) => { - self.dummy_forward(m, forward); - warn!( - self.logger, - "The agent fails to fetch entries, index {} log term {} in forward message to peer {}.", - forward.get_index(), - forward.get_log_term(), - forward.get_to() - ); - } - } - } else { - self.dummy_forward(m, forward); - warn!( - self.logger, - "The agent's log does not match with index {} log term {} in forward message to peer {}.", - forward.get_index(), - forward.get_log_term(), - forward.get_to() - ); - } + self.r + .send_forward(m.from, m.commit, m.commit_term, forward, &mut self.msgs); } } else { for forward in m.get_forwards() { - self.dummy_forward(m, forward); + let mut m_append = Message::default(); + m_append.to = forward.to; + m_append.from = m.from; + m_append.commit = m.commit; + m_append.commit_term = m.commit_term; + m_append.set_msg_type(MessageType::MsgAppend); + m_append.index = forward.get_index(); + m_append.log_term = forward.get_log_term(); + self.r.send(m_append, &mut self.msgs); } info!( self.logger, @@ -2937,29 +2961,6 @@ impl Raft { self.lead_transferee = None; } - // Forward MsgAppend with empty entries in order to update commit - // or trigger decrementing next_idx. - fn dummy_forward(&mut self, m: &Message, forward: &Forward) { - let mut m_append = Message::default(); - m_append.to = forward.get_to(); - m_append.from = m.get_from(); - m_append.set_msg_type(MessageType::MsgAppend); - m_append.index = forward.get_index(); - m_append.log_term = forward.get_log_term(); - m_append.commit = m.get_commit(); - m_append.commit_term = m.get_commit_term(); - - info!( - self.logger, - "The agent forwards reserved empty log entry [logterm: {msg_log_term}, index: {msg_index}] \ - to peer {id}", - msg_log_term = forward.log_term, - msg_index = forward.index, - id = forward.to; - ); - self.r.send(m_append, &mut self.msgs); - } - fn send_request_snapshot(&mut self) { let mut m = Message::default(); m.set_msg_type(MessageType::MsgAppendResponse); diff --git a/src/raw_node.rs b/src/raw_node.rs index a15a1489..a720bfd0 100644 --- a/src/raw_node.rs +++ b/src/raw_node.rs @@ -439,6 +439,19 @@ impl RawNode { self.raft.send_append(to) } } + GetEntriesFor::SendForward { + from, + commit, + commit_term, + term, + forward, + } => { + if self.raft.term != term { + // term has changed + return; + } + self.raft.send_forward(from, commit, commit_term, &forward); + } GetEntriesFor::Empty(can_async) if can_async => {} _ => panic!("shouldn't call callback on non-async context"), } diff --git a/src/storage.rs b/src/storage.rs index f9b71fab..ebd85bd2 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -56,9 +56,44 @@ impl RaftState { } } +/// Purpose of getting entries. +#[derive(Debug)] +pub enum GetEntriesFor { + /// For sending entries to followers. + SendAppend { + /// The peer id to which the entries are going to send. + to: u64, + /// The term when the request is issued. + term: u64, + /// Whether to exhaust all the entries. + aggressively: bool, + }, + /// For forwarding entries to followers. + SendForward { + /// The peer id from which the entries are forwarded. + from: u64, + /// The commit index in MsgGroupbroadcast. + commit: u64, + /// The commit term in MsgGroupbroadcast. + commit_term: u64, + /// The term when the request is issued. + term: u64, + /// The forward information in MsgGroupbroadcast. + forward: Forward, + }, + /// For getting committed entries in a ready. + GenReady, + /// For getting entries to check pending conf when transferring leader. + TransferLeader, + /// For getting entries to check pending conf when forwarding commit index by vote messages + CommitByVote, + /// It's not called by the raft itself. + Empty(bool), +} + /// Records the context of the caller who calls entries() of Storage trait. #[derive(Debug)] -pub struct GetEntriesContext(pub(crate) GetEntriesFor); +pub struct GetEntriesContext(pub GetEntriesFor); impl GetEntriesContext { /// Used for callers out of raft. Caller can customize if it supports async. @@ -70,33 +105,13 @@ impl GetEntriesContext { pub fn can_async(&self) -> bool { match self.0 { GetEntriesFor::SendAppend { .. } => true, + GetEntriesFor::SendForward { .. } => true, GetEntriesFor::Empty(can_async) => can_async, _ => false, } } } -#[derive(Debug)] -pub(crate) enum GetEntriesFor { - // for sending entries to followers - SendAppend { - /// the peer id to which the entries are going to send - to: u64, - /// the term when the request is issued - term: u64, - /// whether to exhaust all the entries - aggressively: bool, - }, - // for getting committed entries in a ready - GenReady, - // for getting entries to check pending conf when transferring leader - TransferLeader, - // for getting entries to check pending conf when forwarding commit index by vote messages - CommitByVote, - // It's not called by the raft itself - Empty(bool), -} - /// Storage saves all the information about the current Raft implementation, including Raft Log, /// commit index, the leader to vote for, etc. ///