Skip to content

Commit

Permalink
fix: fix shutdown and propose_conf_change send to follower
Browse files Browse the repository at this point in the history
Signed-off-by: themanforfree <[email protected]>
  • Loading branch information
themanforfree committed Sep 17, 2023
1 parent 6fbc51e commit 460e10c
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 69 deletions.
20 changes: 12 additions & 8 deletions curp/proto/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ syntax = "proto3";
package messagepb;

import "command.proto";
import "error.proto";
import "google/protobuf/empty.proto";

message FetchLeaderRequest {
}
Expand Down Expand Up @@ -85,8 +87,7 @@ message ShutdownRequest {
message ShutdownResponse {
optional uint64 leader_id = 1;
uint64 term = 2;
// The original type is ProposeError
optional bytes error = 3;
optional errorpb.ProposeError error = 3;
}

message ProposeConfChangeRequest {
Expand All @@ -113,13 +114,16 @@ message Member {
};

message ProposeConfChangeResponse {
enum ConfChangeError {
InvalidConfig = 0;
NodeNotExists = 1;
NodeAlreadyExists = 2;

oneof error {
google.protobuf.Empty InvalidConfig = 1;
google.protobuf.Empty NodeNotExists = 2;
google.protobuf.Empty NodeAlreadyExists = 3;
errorpb.ProposeError propose = 4;
}
optional ConfChangeError error = 1;
repeated Member members = 2;
repeated Member members = 5;
optional uint64 leader_id = 6;
uint64 term = 7;
}


Expand Down
46 changes: 31 additions & 15 deletions curp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use tokio::time::timeout;
use tracing::{debug, instrument, warn};
use utils::config::ClientConfig;

use crate::rpc::connect::ConnectApiWrapper;
use crate::{
cmd::{Command, ProposeId},
error::{
Expand All @@ -22,10 +21,12 @@ use crate::{
},
members::ServerId,
rpc::{
self, connect::ConnectApi, protocol_client::ProtocolClient, FetchClusterRequest,
FetchClusterResponse, FetchLeaderRequest, FetchReadStateRequest, Member,
ProposeConfChangeRequest, ProposeRequest, ReadState as PbReadState, ShutdownRequest,
SyncResult, WaitSyncedRequest,
self,
connect::{ConnectApi, ConnectApiWrapper},
protocol_client::ProtocolClient,
FetchClusterRequest, FetchClusterResponse, FetchLeaderRequest, FetchReadStateRequest,
Member, ProposeConfChangeRequest, ProposeRequest, ReadState as PbReadState,
ShutdownRequest, SyncResult, WaitSyncedRequest,
},
ConfChangeError, LogIndex,
};
Expand Down Expand Up @@ -65,14 +66,15 @@ impl<C: Command> Builder<C> {
pub async fn build_from_all_members(
&self,
all_members: HashMap<ServerId, String>,
leader_id: Option<ServerId>,
) -> Result<Client<C>, ClientBuildError> {
let Some(config) = self.config else {
return Err(ClientBuildError::invalid_arguments("timeout is required"));
};
let connects = rpc::connect(all_members).await.collect();
let client = Client::<C> {
local_server_id: self.local_server_id,
state: RwLock::new(State::new(None, 0)),
state: RwLock::new(State::new(leader_id, 0)),
config,
connects,
phantom: PhantomData,
Expand Down Expand Up @@ -403,8 +405,12 @@ where
self.state
.write()
.check_and_update(resp.leader_id, resp.term);

match resp.error {
Some(e) => return Err(bincode::deserialize(&e)?),
Some(e) => {
warn!("shutdown error: {:?}", e);
continue;
}
None => return Ok(()),
}
}
Expand Down Expand Up @@ -656,11 +662,14 @@ where
let retry_timeout = *self.config.retry_timeout();
let retry_count = *self.config.retry_count();
for _ in 0..retry_count {
// fetch leader id
let leader_id = self.get_leader_id().await?;

let leader_id = match self.get_leader_id().await {
Ok(leader_id) => leader_id,
Err(e) => {
warn!("failed to fetch leader, {e}");
continue;
}
};
debug!("propose_conf_change request sent to {}", leader_id);

let resp = match self
.get_connect(leader_id)
.unwrap_or_else(|| unreachable!("leader {leader_id} not found"))
Expand All @@ -674,11 +683,18 @@ where
continue;
}
};
self.state
.write()
.check_and_update(resp.leader_id, resp.term);
return match resp.error {
Some(e) => {
let error = ConfChangeError::from_i32(e)
.unwrap_or_else(|| unreachable!("error code from rpc must be valid"));
Ok(Err(error))
warn!("propose conf change error: {:?}", e);
if let ConfChangeError::Propose(ref nl) = e {
if nl.propose_error == Some(ProposeError::NotLeader.into()) {
continue;
}
}
Ok(Err(e))
}
None => Ok(Ok(resp.members)),
};
Expand Down Expand Up @@ -807,7 +823,7 @@ mod tests {
async fn client_builder_should_return_err_when_arguments_invalid() {
let res = Client::<TestCommand>::builder()
.config(ClientConfig::default())
.build_from_all_members(HashMap::from([(123, "addr".to_owned())]))
.build_from_all_members(HashMap::from([(123, "addr".to_owned())]), None)
.await;
assert!(res.is_ok());

Expand Down
25 changes: 24 additions & 1 deletion curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub use self::proto::{
commandpb::{cmd_result::Result as CmdResultInner, CmdResult, ProposeRequest, ProposeResponse},
messagepb::{
propose_conf_change_request::{ConfChange, ConfChangeType},
propose_conf_change_response::ConfChangeError,
propose_conf_change_response::Error as ConfChangeError,
protocol_client,
protocol_server::ProtocolServer,
FetchLeaderRequest, FetchLeaderResponse, Member, ProposeConfChangeRequest,
Expand Down Expand Up @@ -538,3 +538,26 @@ impl From<ProposeConfChangeRequest> for ConfChangeEntry {
}
}
}

impl ShutdownResponse {
/// Create a new shutdown response
pub(crate) fn new(leader_id: Option<ServerId>, term: u64, error: Option<ProposeError>) -> Self {
let error = error.map(|e| PbProposeErrorOuter {
propose_error: Some(e.into()),
});
Self {
leader_id,
term,
error,
}
}
}

impl ConfChangeError {
/// Create a new `ConfChangeError` with `ProposeError`
pub(crate) fn new_propose(error: ProposeError) -> Self {
Self::Propose(PbProposeErrorOuter {
propose_error: Some(error.into()),
})
}
}
2 changes: 1 addition & 1 deletion curp/src/server/cmd_board.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl<C: Command> CommandBoard<C> {
id: &ProposeId,
) -> Result<bool, ConfChangeError> {
loop {
if let Some(er) = cb.map_read(|cb_r| cb_r.conf_buffer.get(id).copied()) {
if let Some(er) = cb.map_read(|cb_r| cb_r.conf_buffer.get(id).cloned()) {
return er;
}
let listener = cb.write().conf_listener(id);
Expand Down
3 changes: 2 additions & 1 deletion curp/src/server/cmd_worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,12 @@ async fn worker_as<
EntryData::ConfChange(ref conf_change) => {
let changes = conf_change.changes().to_owned();
let res = curp.apply_conf_change(changes).await;
let shutdown_self = res.as_ref().is_ok_and(|r| *r);
if let Err(e) = ce.set_last_applied(entry.index) {
error!("failed to set last_applied, {e}");
}
cb.write().insert_conf(entry.id(), res);
if res.is_ok_and(|r| r) {
if shutdown_self {
curp.shutdown_trigger().self_shutdown();
}
true
Expand Down
33 changes: 18 additions & 15 deletions curp/src/server/curp_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,13 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
let propose_id = ProposeId::new(request.id);
let ((leader_id, term), result) = self.curp.handle_shutdown(propose_id);
let error = match result {
Ok(()) => None,
Err(err) => Some(bincode::serialize(&err)?),
};
let resp = ShutdownResponse {
leader_id,
term,
error,
Ok(()) => {
CommandBoard::wait_for_shutdown_synced(&self.cmd_board).await;
None
}
Err(err) => Some(err),
};
CommandBoard::wait_for_shutdown_synced(&self.cmd_board).await;
let resp = ShutdownResponse::new(leader_id, term, error);
Ok(resp)
}

Expand All @@ -170,16 +168,21 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
req: ProposeConfChangeRequest,
) -> Result<ProposeConfChangeResponse, CurpError> {
let id = ProposeId::new(req.id.clone());
let ((_leader_id, _term), _result) = self.curp.handle_propose_conf_change(req.into());
let result = CommandBoard::wait_for_conf(&self.cmd_board, &id).await;
// ConfChangeError is so small that it won't exceed the range of i32 type.
#[allow(clippy::as_conversions)]
let ((leader_id, term), result) = self.curp.handle_propose_conf_change(req.into());
let error = match result {
Ok(_) => None,
Err(e) => Some(e as i32),
Ok(()) => {
let apply_result = CommandBoard::wait_for_conf(&self.cmd_board, &id).await;
apply_result.err()
}
Err(err) => Some(err),
};
let members = self.curp.cluster().members();
Ok(ProposeConfChangeResponse { error, members })
Ok(ProposeConfChangeResponse {
members,
leader_id,
term,
error,
})
}

/// Handle `AppendEntries` requests
Expand Down
23 changes: 15 additions & 8 deletions curp/src/server/raw_curp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
pub(super) fn handle_propose_conf_change(
&self,
conf_change: ConfChangeEntry,
) -> ((Option<ServerId>, u64), Result<(), ProposeError>) {
) -> ((Option<ServerId>, u64), Result<(), ConfChangeError>) {
debug!(
"{} gets conf change for with id {}",
self.id(),
Expand All @@ -359,15 +359,21 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {

// Non-leader doesn't need to sync or execute
if st_r.role != Role::Leader {
return (info, Ok(()));
return (
info,
Err(ConfChangeError::new_propose(ProposeError::NotLeader)),
);
}

if !self
.ctx
.cb
.map_write(|mut cb_w| cb_w.sync.insert(conf_change.id().clone()))
{
return (info, Err(ProposeError::Duplicated));
return (
info,
Err(ConfChangeError::new_propose(ProposeError::Duplicated)),
);
}
let mut log_w = self.log.write();
let entry = match log_w.push_conf_change(st_r.term, conf_change) {
Expand All @@ -376,7 +382,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
entry
}
Err(e) => {
return (info, Err(e.into()));
return (info, Err(ConfChangeError::new_propose(e.into())));
}
};

Expand Down Expand Up @@ -755,6 +761,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
};
if is_leader {
let mut st_w = raw_curp.st.write();
st_w.term = 1;
raw_curp.become_leader(&mut st_w);
}
raw_curp
Expand Down Expand Up @@ -1225,25 +1232,25 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
match conf_change_type {
ConfChangeType::Add => {
if !statuses_ids.insert(node_id) || !config.insert(node_id) {
return Err(ConfChangeError::NodeAlreadyExists);
return Err(ConfChangeError::NodeAlreadyExists(()));
}
}
ConfChangeType::Remove => {
if !statuses_ids.remove(&node_id) || !config.remove(&node_id) {
return Err(ConfChangeError::NodeNotExists);
return Err(ConfChangeError::NodeNotExists(()));
}
}
ConfChangeType::Update => {
if statuses_ids.get(&node_id).is_none() || !config.contains(&node_id) {
return Err(ConfChangeError::NodeNotExists);
return Err(ConfChangeError::NodeNotExists(()));
}
}
ConfChangeType::AddLearner => {
unimplemented!("learner node is not supported yet");
}
}
if statuses_ids.len() < 3 || config.voters() != &statuses_ids {
return Err(ConfChangeError::InvalidConfig);
return Err(ConfChangeError::InvalidConfig(()));
}
Ok(())
}
Expand Down
Loading

0 comments on commit 460e10c

Please sign in to comment.