diff --git a/curp/proto/common b/curp/proto/common
index e23221bf8..2a4aa2c30 160000
--- a/curp/proto/common
+++ b/curp/proto/common
@@ -1 +1 @@
-Subproject commit e23221bf8af96243f726b3bebe27c500867573c4
+Subproject commit 2a4aa2c308738f4692ac8622d747fd3d644424c9
diff --git a/curp/src/client.rs b/curp/src/client.rs
index 9b5080b1b..65da46600 100644
--- a/curp/src/client.rs
+++ b/curp/src/client.rs
@@ -19,14 +19,15 @@ use utils::config::ClientConfig;
 use crate::{
     cmd::{Command, ProposeId},
-    error::{ClientBuildError, ClientError, ERROR_LABEL},
+    error::{ClientBuildError, ClientError},
     members::ServerId,
     rpc::{
-        self, connect::ConnectApi, protocol_client::ProtocolClient, FetchClusterRequest,
-        FetchClusterResponse, FetchReadStateRequest, ProposeConfChangeRequest, ProposeRequest,
-        ReadState as PbReadState, ShutdownRequest, SyncResult, WaitSyncedRequest,
+        self, connect::ConnectApi, protocol_client::ProtocolClient, ConfChange, CurpError,
+        FetchClusterRequest, FetchClusterResponse, FetchReadStateRequest, Member,
+        ProposeConfChangeRequest, ProposeRequest, PublishRequest, ReadState as PbReadState,
+        Redirect, ShutdownRequest, WaitSyncedRequest,
     },
-    ConfChange, ConfChangeError, LogIndex, Member, ProposeError, PublishRequest,
+    LogIndex,
 };
 
 /// Protocol client
@@ -227,54 +228,7 @@ pub enum ReadState {
     CommitIndex(LogIndex),
 }
 
-/// Unpack `tonic::Status`
-enum UnpackStatus {
-    /// CurpServer is shutting down
-    ShuttingDown,
-    /// Indicate that the client sent a wait synced request to a non-leader
-    Redirect(Option<ServerId>, u64),
-    /// Transport error type
-    Transport,
-    /// Encode decode error type
-    EncodeDecode,
-    /// Storage error type
-    Storage,
-    /// IO error type
-    IO,
-    /// Internal error type
-    Internal,
-    /// Wrong Cluster Version
-    WrongClusterVersion,
-}
-
-/// unpack `tonic::Status` and convert it to `UnpackStatus`
-fn unpack_status(status: &tonic::Status) -> UnpackStatus {
-    let meta = status.metadata();
-    if let Some(label) = meta.get(ERROR_LABEL) {
-        match label.to_str().unwrap_or_else(|err| {
-            unreachable!("error-label should be always able to convert to str: {err:?}")
-        }) {
-            "shutting-down" => UnpackStatus::ShuttingDown,
-            "transport" => UnpackStatus::Transport,
-            "redirect" => {
-                let (leader_id, term) = serde_json::from_slice(status.details()).unwrap_or_else(|err| unreachable!(" deserialize (leader_id, term) from status' detail should always success: {err:?}"));
-                UnpackStatus::Redirect(leader_id, term)
-            }
-            "encode-decode" => UnpackStatus::EncodeDecode,
-            "internal" => UnpackStatus::Internal,
-            "storage" => UnpackStatus::Storage,
-            "io" => UnpackStatus::IO,
-            "wrong-cluster-version" => UnpackStatus::WrongClusterVersion,
-            unsupported_label => {
-                unreachable!("unsupported status label {unsupported_label}")
-            }
-        }
-    } else {
-        // This transport error comes from `tonic` framework
-        UnpackStatus::Transport
-    }
-}
-
+#[allow(clippy::wildcard_enum_match_arm)] // TODO: wait refactoring
 impl<C> Client<C>
 where
     C: Command + 'static,
@@ -321,43 +275,28 @@ where
         while let Some(resp_result) = rpcs.next().await {
             let resp = match resp_result {
                 Ok(resp) => resp.into_inner(),
                 Err(e) => {
-                    warn!("Propose error: {}", e);
-                    match unpack_status(&e) {
-                        UnpackStatus::ShuttingDown => return Err(ClientError::ShuttingDown),
-                        UnpackStatus::WrongClusterVersion => {
+                    warn!("Propose error: {e:?}");
+                    match e {
+                        CurpError::ShuttingDown(_) => return Err(ClientError::ShuttingDown),
+                        CurpError::WrongClusterVersion(_) => {
                             return Err(ClientError::WrongClusterVersion)
                         }
-                        UnpackStatus::Redirect(..)
-                        | UnpackStatus::Transport
-                        | UnpackStatus::EncodeDecode
-                        | UnpackStatus::Storage
-                        | UnpackStatus::IO
-                        | UnpackStatus::Internal => continue,
+                        _ => continue,
                    }
                }
            };
-            self.state
-                .write()
-                .check_and_update(resp.leader_id, resp.term);
-            resp.map_or_else::<_, _, Result<(), ClientError<C>>>(
-                |res| {
-                    if let Some(er) = res
-                        .transpose()
-                        .map_err(|e| ClientError::CommandError::<C>(e))?
-                    {
-                        assert!(execute_result.is_none(), "should not set exe result twice");
-                        execute_result = Some(er);
-                    }
-                    ok_cnt = ok_cnt.wrapping_add(1);
-                    Ok(())
-                },
-                |err| {
-                    warn!("Propose error: {}", err);
-                    Ok(())
-                },
-            )??;
+            resp.map_result::<C, _, Result<(), ClientError<C>>>(|res| {
+                if let Some(er) = res
+                    .transpose()
+                    .map_err(|e| ClientError::CommandError::<C>(e))?
+                {
+                    assert!(execute_result.is_none(), "should not set exe result twice");
+                    execute_result = Some(er);
+                }
+                ok_cnt = ok_cnt.wrapping_add(1);
+                Ok(())
+            })??;
             if (ok_cnt >= superquorum) && execute_result.is_some() {
                 debug!("fast round for cmd({}) succeed", cmd_arc.id());
                 return Ok((execute_result, true));
@@ -397,41 +336,34 @@ where
         {
             Ok(resp) => resp.into_inner(),
             Err(e) => {
-                warn!("wait synced rpc error: {e}");
-                match unpack_status(&e) {
-                    UnpackStatus::ShuttingDown => return Err(ClientError::ShuttingDown),
-                    UnpackStatus::WrongClusterVersion => {
+                warn!("wait synced rpc error: {e:?}");
+                match e {
+                    CurpError::ShuttingDown(_) => return Err(ClientError::ShuttingDown),
+                    CurpError::WrongClusterVersion(_) => {
                         return Err(ClientError::WrongClusterVersion)
                     }
-                    UnpackStatus::Transport => {
+                    CurpError::RpcTransport(_) => {
                         // it's quite likely that the leader has crashed, then we should wait for some time and fetch the leader again
                         tokio::time::sleep(retry_timeout.next_retry()).await;
                         self.resend_propose(Arc::clone(&cmd), None).await?;
                         continue;
                     }
-                    UnpackStatus::Redirect(new_leader, term) => {
+                    CurpError::Redirect(Redirect {
+                        leader_id: new_leader,
+                        term,
+                    }) => {
                        self.state.write().check_and_update(new_leader, term);
                        // resend the propose to the new leader
                        self.resend_propose(Arc::clone(&cmd), new_leader).await?;
                        continue;
                    }
-                    UnpackStatus::EncodeDecode
-                    | UnpackStatus::Storage
-                    | UnpackStatus::IO
-                    | UnpackStatus::Internal => return Err(e.into()),
+                    _ => return Err(ClientError::InternalError(format!("{e:?}"))),
                }
            }
        };
-        match SyncResult::<C>::try_from(resp).map_err(Into::<ClientError<C>>::into)? {
-            SyncResult::Success { er, asr } => {
-                debug!("slow round for cmd({}) succeeded", cmd.id());
-                return Ok((asr, er));
-            }
-            SyncResult::Error(e) => {
-                return Err(ClientError::CommandError(e));
-            }
-        }
+        return resp
+            .map_result::<C, _, _>(|res| res.map_err(|e| ClientError::CommandError(e)))?;
     }
     Err(ClientError::Timeout)
 }
@@ -484,24 +416,23 @@ where
         )
         .await
         {
-            warn!("shutdown rpc error: {e}");
-            match unpack_status(&e) {
-                UnpackStatus::ShuttingDown => return Err(ClientError::ShuttingDown),
-                UnpackStatus::WrongClusterVersion => {
+            warn!("shutdown rpc error: {e:?}");
+            match e {
+                CurpError::ShuttingDown(_) => return Err(ClientError::ShuttingDown),
+                CurpError::WrongClusterVersion(_) => {
                     let cluster = self.fetch_cluster(false).await?;
                     self.set_cluster(cluster).await?;
                     continue;
                 }
-                UnpackStatus::Redirect(new_leader, term) => {
+                CurpError::Redirect(Redirect {
+                    leader_id: new_leader,
+                    term,
+                }) => {
                     self.state.write().check_and_update(new_leader, term);
                     warn!("shutdown: redirect to new leader {new_leader:?}, term is {term}",);
                     continue;
                 }
-                UnpackStatus::Transport
-                | UnpackStatus::EncodeDecode
-                | UnpackStatus::Storage
-                | UnpackStatus::IO
-                | UnpackStatus::Internal => {
+                _ => {
                     tokio::time::sleep(retry_timeout.next_retry()).await;
                     continue;
                 }
@@ -545,60 +476,23 @@ where
         )
         .await;
 
-        let resp = match resp {
-            Ok(resp) => {
-                let resp = resp.into_inner();
-                if let Some(rpc::ExeResult::Error(e)) = resp.exe_result {
-                    let err: ProposeError = e.into();
-                    if matches!(err, ProposeError::Duplicated) {
-                        return Ok(());
-                    }
-                }
-                resp
-            }
-            Err(e) => {
-                // if the propose fails again, need to fetch the leader and try again
-                warn!("failed to resend propose, {e}");
-                if let UnpackStatus::WrongClusterVersion = unpack_status(&e) {
+        if let Err(e) = resp {
+            match e {
+                CurpError::ShuttingDown(_) => return Err(ClientError::ShuttingDown),
+                CurpError::WrongClusterVersion(_) => {
                     return Err(ClientError::WrongClusterVersion);
                 }
-                continue;
-            }
-        };
-
-        let mut state_w = self.state.write();
-
-        match state_w.term.cmp(&resp.term) {
-            Ordering::Less => {
-                // reset term only when the resp has leader id to prevent:
-                // If a server loses contact with its leader, it will update its term for election. Since other servers are all right, the election will not succeed.
-                // But if the client learns about the new term and updates its term to it, it will never get the true leader.
-                if let Some(id) = resp.leader_id {
-                    state_w.update_to_term(resp.term);
-                    let done = id == leader_id;
-                    state_w.set_leader(leader_id);
-                    if done {
-                        return Ok(());
-                    }
-                }
-            }
-            Ordering::Equal => {
-                if let Some(id) = resp.leader_id {
-                    let done = id == leader_id;
-                    debug_assert!(
-                        state_w.leader.as_ref().map_or(true, |leader| leader == &id),
-                        "there should never be two leader in one term"
-                    );
-                    if state_w.leader.is_none() {
-                        state_w.set_leader(id);
-                    }
-                    if done {
-                        return Ok(());
-                    }
+                CurpError::Duplicated(_) => {
+                    return Ok(());
                 }
+                _ => {}
             }
-            Ordering::Greater => {}
+            // if the propose fails again, need to fetch the leader and try again
+            warn!("failed to resend propose, {e:?}");
+            continue;
         }
+
+        break;
     }
     Err(ClientError::Timeout)
 }
@@ -821,7 +715,7 @@ where
         &self,
         propose_id: ProposeId,
         changes: Vec<ConfChange>,
-    ) -> Result<Result<Vec<Member>, ConfChangeError>, ClientError<C>> {
+    ) -> Result<Result<Vec<Member>, CurpError>, ClientError<C>> {
         debug!(
             "propose_conf_change with propose_id({}) started",
             propose_id
@@ -837,7 +731,7 @@ where
             }
         };
         debug!("propose_conf_change request sent to {}", leader_id);
-        let resp = match self
+        match self
             .get_connect(leader_id)
             .unwrap_or_else(|| unreachable!("leader {leader_id} not found"))
             .propose_conf_change(
@@ -850,41 +744,39 @@ where
             )
             .await
         {
-            Ok(resp) => resp.into_inner(),
+            Ok(resp) => return Ok(Ok(resp.into_inner().members)),
             Err(e) => {
-                warn!("propose_conf_change rpc error: {e}");
-                match unpack_status(&e) {
-                    UnpackStatus::ShuttingDown => return Err(ClientError::ShuttingDown),
-                    UnpackStatus::WrongClusterVersion => {
+                warn!("propose_conf_change rpc error: {e:?}");
+                match e {
+                    CurpError::ShuttingDown(_) => return Err(ClientError::ShuttingDown),
+                    CurpError::WrongClusterVersion(_) => {
                         let cluster = self.fetch_cluster(false).await?;
                         self.set_cluster(cluster).await?;
                         continue;
                     }
-                    UnpackStatus::Redirect(new_leader, term) => {
+                    CurpError::Redirect(Redirect {
+                        leader_id: new_leader,
+                        term,
+                    }) => {
                         self.state.write().check_and_update(new_leader, term);
+                        warn!(
+                            "propose_conf_change: redirect to new leader {new_leader:?}, term is {term}",
+                        );
                         continue;
                     }
-                    UnpackStatus::Transport
-                    | UnpackStatus::EncodeDecode
-                    | UnpackStatus::Storage
-                    | UnpackStatus::IO
-                    | UnpackStatus::Internal => {
+                    CurpError::InvalidConfig(_)
+                    | CurpError::LearnerNotCatchUp(_)
+                    | CurpError::NodeAlreadyExists(_)
+                    | CurpError::NodeNotExists(_)
+                    | CurpError::Duplicated(_)
+                    | CurpError::ExpiredClientId(_) => return Ok(Err(e)),
+                    _ => {
                         tokio::time::sleep(retry_timeout.next_retry()).await;
                         continue;
                    }
                }
            }
        };
-        self.state
-            .write()
-            .check_and_update(resp.leader_id, resp.term);
-        return match resp.error {
-            Some(e) => {
-                warn!("propose conf change error: {:?}", e);
-                Ok(Err(e))
-            }
-            None => Ok(Ok(resp.members)),
-        };
     }
     Err(ClientError::Timeout)
 }
@@ -918,23 +810,25 @@ where
         )
         .await
         {
-            warn!("publish rpc error: {e}");
-            match unpack_status(&e) {
-                UnpackStatus::ShuttingDown => return Err(ClientError::ShuttingDown),
-                UnpackStatus::WrongClusterVersion => {
+            warn!("publish rpc error: {e:?}");
+            match e {
+                CurpError::ShuttingDown(_) => return Err(ClientError::ShuttingDown),
+                CurpError::WrongClusterVersion(_) => {
                     let cluster = self.fetch_cluster(false).await?;
                     self.set_cluster(cluster).await?;
                     continue;
                 }
-                UnpackStatus::Redirect(new_leader, term) => {
+                CurpError::Redirect(Redirect {
+                    leader_id: new_leader,
+                    term,
+                }) => {
                     self.state.write().check_and_update(new_leader, term);
+                    warn!(
+                        "publish: redirect to new leader {new_leader:?}, term is {term}",
+                    );
                    continue;
                }
-                UnpackStatus::Transport
-                | UnpackStatus::EncodeDecode
-                | UnpackStatus::Storage
-                | UnpackStatus::IO
-                | UnpackStatus::Internal => {
+                _ => {
                     tokio::time::sleep(retry_timeout.next_retry()).await;
                     continue;
                 }
@@ -971,16 +865,19 @@ where
             .await
         {
             Ok(resp) => resp.into_inner(),
-            Err(e) => {
-                if let UnpackStatus::WrongClusterVersion = unpack_status(&e) {
+            Err(e) => match e {
+                CurpError::ShuttingDown(_) => return Err(ClientError::ShuttingDown),
+                CurpError::WrongClusterVersion(_) => {
                     let cluster = self.fetch_cluster(false).await?;
                     self.set_cluster(cluster).await?;
                     continue;
                 }
-                warn!("fetch read state rpc error: {e}");
-                tokio::time::sleep(retry_timeout.next_retry()).await;
-                continue;
-            }
+                _ => {
+                    warn!("fetch read state rpc error: {e:?}");
+                    tokio::time::sleep(retry_timeout.next_retry()).await;
+                    continue;
+                }
+            },
         };
         let pb_state = resp
             .read_state
@@ -1008,7 +905,8 @@ where
                     FetchClusterRequest::default(),
                     *self.config.initial_retry_timeout(),
                 )
-                .await?
+                .await
+                .map_err(|e| ClientError::InternalError(format!("{e:?}")))?
                 .into_inner();
             Ok(resp)
         } else {
diff --git a/curp/src/error.rs b/curp/src/error.rs
index 9b03d75d5..64c7082ec 100644
--- a/curp/src/error.rs
+++ b/curp/src/error.rs
@@ -3,12 +3,6 @@ use std::io;
 use curp_external_api::cmd::{Command, PbSerializeError};
 use thiserror::Error;
 
-use crate::rpc::ProposeError;
-
-/// Since there are some different statuses with the same code, Xline using "error-label"
-/// to tell apart them.
-pub const ERROR_LABEL: &str = "error-label";
-
 /// Error type of client builder
 #[allow(clippy::module_name_repetitions)] // this-error generate code false-positive
 #[derive(Debug, Error)]
@@ -63,34 +57,6 @@ pub enum ServerError {
     RpcError(#[from] tonic::transport::Error),
 }
 
-impl std::fmt::Display for ProposeError {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        match *self {
-            Self::KeyConflict => write!(f, "ProposeError: Key Conflict"),
-            Self::Duplicated => write!(f, "ProposeError: Duplicated"),
-        }
-    }
-}
-
-impl std::error::Error for ProposeError {
-    fn description(&self) -> &str {
-        match *self {
-            Self::KeyConflict => "key conflict error",
-            Self::Duplicated => "duplicated, the cmd might have already been proposed",
-        }
-    }
-}
-
-impl From<i32> for ProposeError {
-    fn from(value: i32) -> Self {
-        match value {
-            0 => Self::KeyConflict,
-            1 => Self::Duplicated,
-            _ => unreachable!("Unknown ProposeError Type"),
-        }
-    }
-}
-
 /// The union error which includes propose errors and user-defined errors.
 #[derive(Error, Debug)]
 #[allow(clippy::module_name_repetitions)] // this-error generate code false-positive
diff --git a/curp/src/rpc/connect.rs b/curp/src/rpc/connect.rs
index 754c6aa39..2d4c23c8f 100644
--- a/curp/src/rpc/connect.rs
+++ b/curp/src/rpc/connect.rs
@@ -27,11 +27,11 @@ use crate::{
         commandpb::protocol_client::ProtocolClient,
         inner_messagepb::inner_protocol_client::InnerProtocolClient,
     },
-    AppendEntriesRequest, AppendEntriesResponse, FetchClusterRequest, FetchClusterResponse,
-    FetchReadStateRequest, FetchReadStateResponse, InstallSnapshotRequest,
-    InstallSnapshotResponse, ProposeConfChangeRequest, ProposeConfChangeResponse,
-    ProposeRequest, ProposeResponse, VoteRequest, VoteResponse, WaitSyncedRequest,
-    WaitSyncedResponse,
+    AppendEntriesRequest, AppendEntriesResponse, CurpError, FetchClusterRequest,
+    FetchClusterResponse, FetchReadStateRequest, FetchReadStateResponse,
+    InstallSnapshotRequest, InstallSnapshotResponse, ProposeConfChangeRequest,
+    ProposeConfChangeResponse, ProposeRequest, ProposeResponse, VoteRequest, VoteResponse,
+    WaitSyncedRequest, WaitSyncedResponse,
     },
     snapshot::Snapshot,
     PublishRequest, PublishResponse, TriggerShutdownRequest,
@@ -154,49 +154,49 @@ pub(crate) trait ConnectApi: Send + Sync + 'static {
         &self,
         request: ProposeRequest,
         timeout: Duration,
-    ) -> Result<tonic::Response<ProposeResponse>, tonic::Status>;
+    ) -> Result<tonic::Response<ProposeResponse>, CurpError>;
 
     /// Send `ProposeRequest`
     async fn propose_conf_change(
         &self,
         request: ProposeConfChangeRequest,
         timeout: Duration,
-    ) -> Result<tonic::Response<ProposeConfChangeResponse>, tonic::Status>;
+    ) -> Result<tonic::Response<ProposeConfChangeResponse>, CurpError>;
 
     /// Send `PublishRequest`
     async fn publish(
         &self,
         request: PublishRequest,
         timeout: Duration,
-    ) -> Result<tonic::Response<PublishResponse>, tonic::Status>;
+    ) -> Result<tonic::Response<PublishResponse>, CurpError>;
 
     /// Send `WaitSyncedRequest`
     async fn wait_synced(
         &self,
         request: WaitSyncedRequest,
         timeout: Duration,
-    ) -> Result<tonic::Response<WaitSyncedResponse>, tonic::Status>;
+    ) -> Result<tonic::Response<WaitSyncedResponse>, CurpError>;
 
     /// Send `ShutdownRequest`
     async fn shutdown(
         &self,
         request: ShutdownRequest,
         timeout: Duration,
-    ) -> Result<tonic::Response<ShutdownResponse>, tonic::Status>;
+    ) -> Result<tonic::Response<ShutdownResponse>, CurpError>;
 
     /// Send `FetchClusterRequest`
     async fn fetch_cluster(
         &self,
         request: FetchClusterRequest,
         timeout: Duration,
-    ) -> Result<tonic::Response<FetchClusterResponse>, tonic::Status>;
+    ) -> Result<tonic::Response<FetchClusterResponse>, CurpError>;
 
     /// Send `FetchReadStateRequest`
     async fn fetch_read_state(
         &self,
         request: FetchReadStateRequest,
         timeout: Duration,
-    ) -> Result<tonic::Response<FetchReadStateResponse>, tonic::Status>;
+    ) -> Result<tonic::Response<FetchReadStateResponse>, CurpError>;
 }
 
 /// Inner Connect interface among different servers
@@ -324,12 +324,12 @@ impl ConnectApi for Connect<ProtocolClient<Channel>> {
         &self,
         request: ProposeRequest,
         timeout: Duration,
-    ) -> Result<tonic::Response<ProposeResponse>, tonic::Status> {
+    ) -> Result<tonic::Response<ProposeResponse>, CurpError> {
         let mut client = self.rpc_connect.clone();
         let mut req = tonic::Request::new(request);
         req.set_timeout(timeout);
         req.metadata_mut().inject_current();
-        client.propose(req).await
+        client.propose(req).await.map_err(Into::into)
     }
 
     /// Send `ShutdownRequest`
@@ -338,12 +338,12 @@ impl ConnectApi for Connect<ProtocolClient<Channel>> {
         &self,
         request: ShutdownRequest,
         timeout: Duration,
-    ) -> Result<tonic::Response<ShutdownResponse>, tonic::Status> {
+    ) -> Result<tonic::Response<ShutdownResponse>, CurpError> {
         let mut client = self.rpc_connect.clone();
         let mut req = tonic::Request::new(request);
         req.set_timeout(timeout);
         req.metadata_mut().inject_current();
-        client.shutdown(req).await
+        client.shutdown(req).await.map_err(Into::into)
     }
 
     /// Send `ProposeRequest`
@@ -352,12 +352,12 @@ impl ConnectApi for Connect<ProtocolClient<Channel>> {
         &self,
         request: ProposeConfChangeRequest,
         timeout: Duration,
-    ) -> Result<tonic::Response<ProposeConfChangeResponse>, tonic::Status> {
+    ) -> Result<tonic::Response<ProposeConfChangeResponse>, CurpError> {
         let mut client = self.rpc_connect.clone();
         let mut req = tonic::Request::new(request);
         req.set_timeout(timeout);
         req.metadata_mut().inject_current();
-        client.propose_conf_change(req).await
+        client.propose_conf_change(req).await.map_err(Into::into)
     }
 
     /// Send `PublishRequest`
@@ -366,12 +366,12 @@ impl ConnectApi for Connect<ProtocolClient<Channel>> {
         &self,
         request: PublishRequest,
         timeout: Duration,
-    ) -> Result<tonic::Response<PublishResponse>, tonic::Status> {
+    ) -> Result<tonic::Response<PublishResponse>, CurpError> {
         let mut client = self.rpc_connect.clone();
         let mut req = tonic::Request::new(request);
         req.set_timeout(timeout);
         req.metadata_mut().inject_current();
-        client.publish(req).await
+        client.publish(req).await.map_err(Into::into)
     }
 
     /// Send `WaitSyncedRequest`
@@ -380,12 +380,12 @@ impl ConnectApi for Connect<ProtocolClient<Channel>> {
         &self,
         request: WaitSyncedRequest,
         timeout: Duration,
-    ) -> Result<tonic::Response<WaitSyncedResponse>, tonic::Status> {
+    ) -> Result<tonic::Response<WaitSyncedResponse>, CurpError> {
         let mut client = self.rpc_connect.clone();
         let mut req = tonic::Request::new(request);
         req.set_timeout(timeout);
         req.metadata_mut().inject_current();
-        client.wait_synced(req).await
+        client.wait_synced(req).await.map_err(Into::into)
     }
 
     /// Send `FetchClusterRequest`
@@ -393,11 +393,11 @@ impl ConnectApi for Connect<ProtocolClient<Channel>> {
         &self,
         request: FetchClusterRequest,
         timeout: Duration,
-    ) -> Result<tonic::Response<FetchClusterResponse>, tonic::Status> {
+    ) -> Result<tonic::Response<FetchClusterResponse>, CurpError> {
         let mut client = self.rpc_connect.clone();
         let mut req = tonic::Request::new(request);
         req.set_timeout(timeout);
-        client.fetch_cluster(req).await
+        client.fetch_cluster(req).await.map_err(Into::into)
     }
 
     /// Send `FetchReadStateRequest`
@@ -405,11 +405,11 @@ impl ConnectApi for Connect<ProtocolClient<Channel>> {
         &self,
         request: FetchReadStateRequest,
         timeout: Duration,
-    ) -> Result<tonic::Response<FetchReadStateResponse>, tonic::Status> {
+    ) -> Result<tonic::Response<FetchReadStateResponse>, CurpError> {
         let mut client = self.rpc_connect.clone();
         let mut req = tonic::Request::new(request);
         req.set_timeout(timeout);
-        client.fetch_read_state(req).await
+        client.fetch_read_state(req).await.map_err(Into::into)
     }
 }
diff --git a/curp/src/rpc/mod.rs b/curp/src/rpc/mod.rs
index 305ecaa5a..bca004cee 100644
--- a/curp/src/rpc/mod.rs
+++ b/curp/src/rpc/mod.rs
@@ -1,17 +1,26 @@
 use std::{collections::HashMap, sync::Arc};
 
 use curp_external_api::cmd::{PbCodec, PbSerializeError};
+use prost::Message;
 use serde::{de::DeserializeOwned, Deserialize, Serialize};
 
 pub use self::proto::{
     commandpb::{
         cmd_result::Result as CmdResultInner,
+        curp_error::Err as CurpError, // easy for match
+        curp_error::Redirect,
         propose_conf_change_request::{ConfChange, ConfChangeType},
-        propose_conf_change_response::Error as ConfChangeError,
         protocol_client,
         protocol_server::ProtocolServer,
-        CmdResult, FetchClusterRequest, FetchClusterResponse, Member, ProposeConfChangeRequest,
-        ProposeConfChangeResponse, ProposeRequest, ProposeResponse, PublishRequest,
+        CmdResult,
+        FetchClusterRequest,
+        FetchClusterResponse,
+        Member,
+        ProposeConfChangeRequest,
+        ProposeConfChangeResponse,
+        ProposeRequest,
+        ProposeResponse,
+        PublishRequest,
         PublishResponse,
     },
     inner_messagepb::inner_protocol_server::InnerProtocolServer,
 };
 pub(crate) use self::proto::{
     commandpb::{
         fetch_read_state_response::{IdSet, ReadState},
-        propose_response::ExeResult,
         protocol_server::Protocol,
-        FetchReadStateRequest, FetchReadStateResponse, ProposeError, ProposeId as PbProposeId,
-        ShutdownRequest, ShutdownResponse, WaitSyncedRequest, WaitSyncedResponse,
+        CurpError as CurpErrorWrapper, FetchReadStateRequest, FetchReadStateResponse,
+        ProposeId as PbProposeId, ShutdownRequest, ShutdownResponse, WaitSyncedRequest,
+        WaitSyncedResponse,
     },
     inner_messagepb::{
         inner_protocol_server::InnerProtocol, AppendEntriesRequest, AppendEntriesResponse,
@@ -128,70 +137,36 @@ impl ProposeRequest {
 
 impl ProposeResponse {
     /// Create an ok propose response
-    pub(crate) fn new_result<C: Command>(
-        leader_id: Option<ServerId>,
-        term: u64,
-        result: &Result<C::ER, C::Error>,
-    ) -> Self {
-        let exe_result = match *result {
-            Ok(ref er) => Some(ExeResult::Result(CmdResult {
+    pub(crate) fn new_result<C: Command>(result: &Result<C::ER, C::Error>) -> Self {
+        let result = match *result {
+            Ok(ref er) => Some(CmdResult {
                 result: Some(CmdResultInner::Ok(er.encode())),
-            })),
-            Err(ref e) => Some(ExeResult::Result(CmdResult {
+            }),
+            Err(ref e) => Some(CmdResult {
                 result: Some(CmdResultInner::Error(e.encode())),
-            })),
+            }),
         };
-        Self {
-            leader_id,
-            term,
-            exe_result,
-        }
+        Self { result }
     }
 
     /// Create an empty propose response
-    #[allow(clippy::unnecessary_wraps)] // To keep the new functions return the same type
-    pub(crate) fn new_empty(leader_id: Option<ServerId>, term: u64) -> Self {
-        Self {
-            leader_id,
-            term,
-            exe_result: None,
-        }
-    }
-
-    /// Create an error propose response
-    pub(crate) fn new_error(leader_id: Option<ServerId>, term: u64, error: ProposeError) -> Self {
-        Self {
-            leader_id,
-            term,
-            exe_result: Some(ExeResult::Error(error.into())),
-        }
+    pub(crate) fn new_empty() -> Self {
+        Self { result: None }
     }
 
-    /// Map response to functions `success` and `failure`
-    pub(crate) fn map_or_else<C: Command, SF, FF, R>(
-        &self,
-        success: SF,
-        failure: FF,
-    ) -> Result<R, PbSerializeError>
+    /// Deserialize result in response and take a map function
+    pub(crate) fn map_result<C: Command, F, R>(self, f: F) -> Result<R, PbSerializeError>
     where
-        SF: FnOnce(Option<Result<C::ER, C::Error>>) -> R,
-        FF: FnOnce(ProposeError) -> R,
+        F: FnOnce(Option<Result<C::ER, C::Error>>) -> R,
     {
-        match self.exe_result {
-            Some(ExeResult::Result(ref rv)) => {
-                let result = rv.result.as_ref().ok_or(PbSerializeError::EmptyField)?;
-                let cmd_result = match *result {
-                    CmdResultInner::Ok(ref buf) => Ok(<C as Command>::ER::decode(buf)?),
-                    CmdResultInner::Error(ref buf) => Err(<C as Command>::Error::decode(buf)?),
-                };
-                Ok(success(Some(cmd_result)))
-            }
-            Some(ExeResult::Error(err)) => {
-                let propose_error = err.into();
-                Ok(failure(propose_error))
-            }
-            None => Ok(success(None)),
-        }
+        let Some(res) = self.result.and_then(|res| res.result) else {
+            return Ok(f(None));
+        };
+        let res = match res {
+            CmdResultInner::Ok(ref buf) => Ok(<C as Command>::ER::decode(buf)?),
+            CmdResultInner::Error(ref buf) => Err(<C as Command>::Error::decode(buf)?),
+        };
+        Ok(f(Some(res)))
     }
 }
 
@@ -217,7 +192,7 @@ impl WaitSyncedRequest {
 
 impl WaitSyncedResponse {
     /// Create a success response
-    pub(crate) fn new_success<C: Command>(asr: &C::ASR, er: &C::ER) -> Self {
+    fn new_success<C: Command>(asr: &C::ASR, er: &C::ER) -> Self {
         Self {
             after_sync_result: Some(CmdResult {
                 result: Some(CmdResultInner::Ok(asr.encode())),
@@ -229,7 +204,7 @@ impl WaitSyncedResponse {
     }
 
     /// Create an error response which includes an execution error
-    pub(crate) fn new_er_error<C: Command>(er: &C::Error) -> Self {
+    fn new_er_error<C: Command>(er: &C::Error) -> Self {
         Self {
             after_sync_result: None,
             exe_result: Some(CmdResult {
@@ -239,7 +214,7 @@ impl WaitSyncedResponse {
     }
 
     /// Create an error response which includes an `after_sync` error
-    pub(crate) fn new_asr_error<C: Command>(er: &C::ER, asr_err: &C::Error) -> Self {
+    fn new_asr_error<C: Command>(er: &C::ER, asr_err: &C::Error) -> Self {
         Self {
             after_sync_result: Some(CmdResult {
                 result: Some(CmdResultInner::Error(asr_err.encode())),
@@ -264,55 +239,51 @@ impl WaitSyncedResponse {
             (Err(ref err), _) => WaitSyncedResponse::new_er_error::<C>(err),
         }
     }
-}
 
-impl<C: Command> TryFrom<WaitSyncedResponse> for SyncResult<C> {
-    type Error = PbSerializeError;
-    fn try_from(value: WaitSyncedResponse) -> Result<Self, Self::Error> {
-        let res = match (value.exe_result, value.after_sync_result) {
-            (None, _) => unreachable!("WaitSyncedResponse should contain a valid exe_result"),
-            (Some(er), None) => {
-                if let Some(CmdResultInner::Error(buf)) = er.result {
-                    SyncResult::Error(<C as Command>::Error::decode(buf.as_slice())?)
-                } else {
-                    unreachable!("asr should not be None")
-                }
+    /// Similar to `ProposeResponse::map_result`
+    pub(crate) fn map_result<C: Command, F, R>(self, f: F) -> Result<R, PbSerializeError>
+    where
+        F: FnOnce(Result<(C::ASR, C::ER), C::Error>) -> R,
+    {
+        // according to the above methods, we can only get the following response union
+        // ER: Some(OK), ASR: Some(OK)  <- WaitSyncedResponse::new_success
+        // ER: Some(Err), ASR: None     <- WaitSyncedResponse::new_er_error
+        // ER: Some(OK), ASR: Some(Err) <- WaitSyncedResponse::new_asr_error
+        let res = match (self.exe_result, self.after_sync_result) {
+            (
+                Some(CmdResult {
+                    result: Some(CmdResultInner::Ok(ref er)),
+                }),
+                Some(CmdResult {
+                    result: Some(CmdResultInner::Ok(ref asr)),
+                }),
+            ) => {
+                let er = <C as Command>::ER::decode(er)?;
+                let asr = <C as Command>::ASR::decode(asr)?;
+                Ok((asr, er))
             }
-            (Some(er), Some(asr)) => {
-                let er = if let Some(CmdResultInner::Ok(er)) = er.result {
-                    <C as Command>::ER::decode(er.as_slice())?
-                } else {
-                    unreachable!("asr should be None when execute failed")
-                };
-                match asr.result {
-                    Some(CmdResultInner::Ok(asr)) => SyncResult::Success {
-                        asr: <C as Command>::ASR::decode(asr.as_slice())?,
-                        er,
-                    },
-                    Some(CmdResultInner::Error(err)) => {
-                        SyncResult::Error(<C as Command>::Error::decode(err.as_slice())?)
-                    }
-                    None => {
-                        unreachable!("result of asr should not be None")
-                    }
-                }
+            (
+                Some(CmdResult {
+                    result: Some(CmdResultInner::Error(ref buf)),
+                }),
+                None,
+            )
+            | (
+                Some(CmdResult {
+                    result: Some(CmdResultInner::Ok(_)),
+                }),
+                Some(CmdResult {
+                    result: Some(CmdResultInner::Error(ref buf)),
+                }),
+            ) => {
+                let er = <C as Command>::Error::decode(buf.as_slice())?;
+                Err(er)
             }
+            _ => unreachable!("got unexpected WaitSyncedResponse"),
         };
-        Ok(res)
-    }
-}
 
-/// Sync Result
-pub(crate) enum SyncResult<C: Command> {
-    /// If sync succeeds, return asr and er
-    Success {
-        /// After Sync Result
-        asr: C::ASR,
-        /// Execution Result
-        er: C::ER,
-    },
-    /// If sync fails, return `SyncError`
-    Error(C::Error),
+        Ok(f(res))
+    }
 }
 
 impl AppendEntriesRequest {
@@ -625,21 +596,6 @@ impl ShutdownRequest {
     }
 }
 
-impl ConfChangeError {
-    /// Create a new `ConfChangeError` with `ProposeError`
-    pub(crate) fn new_propose(error: ProposeError) -> Self {
-        Self::Propose(error.into())
-    }
-}
-
-impl From<ConfChangeError> for tonic::Status {
-    #[inline]
-    fn from(_err: ConfChangeError) -> Self {
-        // we'd better expose some err messages for client
-        tonic::Status::invalid_argument("")
-    }
-}
-
 impl PublishRequest {
     /// Create a new `PublishRequest`
     #[inline]
@@ -664,3 +620,140 @@ impl PublishRequest {
             .into()
     }
 }
+
+impl CurpError {
+    /// `KeyConflict` error
+    pub(crate) fn key_conflict() -> Self {
+        Self::KeyConflict(())
+    }
+
+    /// `Duplicated` error
+    pub(crate) fn duplicated() -> Self {
+        Self::Duplicated(())
+    }
+
+    /// `ExpiredClientId` error
+    pub(crate) fn expired_client_id() -> Self {
+        Self::ExpiredClientId(())
+    }
+
+    /// `InvalidConfig` error
+    pub(crate) fn invalid_config() -> Self {
+        Self::InvalidConfig(())
+    }
+
+    /// `NodeNotExists` error
+    pub(crate) fn node_not_exist() -> Self {
+        Self::NodeNotExists(())
+    }
+
+    /// `NodeAlreadyExists` error
+    pub(crate) fn node_already_exists() -> Self {
+        Self::NodeAlreadyExists(())
+    }
+
+    /// `LearnerNotCatchUp` error
+    pub(crate) fn learner_not_catch_up() -> Self {
+        Self::LearnerNotCatchUp(())
+    }
+
+    /// `ShuttingDown` error
+    pub(crate) fn shutting_down() -> Self {
+        Self::ShuttingDown(())
+    }
+
+    /// `WrongClusterVersion` error
+    pub(crate) fn wrong_cluster_version() -> Self {
+        Self::WrongClusterVersion(())
+    }
+
+    /// `Redirect` error
+    pub(crate) fn redirect(leader_id: Option<ServerId>, term: u64) -> Self {
+        Self::Redirect(Redirect { leader_id, term })
+    }
+
+    /// `Internal` error
+    pub(crate) fn internal(reason: impl Into<String>) -> Self {
+        Self::Internal(reason.into())
+    }
+}
+
+impl<E: std::error::Error + 'static> From<E> for CurpError {
+    #[inline]
+    fn from(value: E) -> Self {
+        let err: &dyn std::error::Error = &value;
+        if let Some(status) = err.downcast_ref::<tonic::Status>() {
+            if status.code() == tonic::Code::Unavailable {
+                return Self::RpcTransport(());
+            }
+            if !status.details().is_empty() {
+                return match CurpErrorWrapper::decode(status.details()) {
+                    Ok(e) => e
+                        .err
+                        .unwrap_or_else(|| unreachable!("err must be set in CurpErrorWrapper")),
+                    Err(dec_err) => Self::internal(dec_err.to_string()),
+                };
+            }
+        }
+        // Errors that are not created manually by `CurpError::xxx()` are trivial,
+        // and errors that need to be known to the client are best created manually.
+        Self::internal(value.to_string())
+    }
+}
+
+impl From<CurpError> for tonic::Status {
+    #[inline]
+    fn from(err: CurpError) -> Self {
+        let (code, msg) = match err {
+            CurpError::KeyConflict(_) => (
+                tonic::Code::AlreadyExists,
+                "Key conflict error: A key conflict occurred.",
+            ),
+            CurpError::Duplicated(_) => (
+                tonic::Code::AlreadyExists,
+                "Duplicated error: The request already sent.",
+            ),
+            CurpError::ExpiredClientId(_) => (
+                tonic::Code::FailedPrecondition,
+                "Expired client ID error: The client ID has expired, we cannot tell if this request is duplicated.",
+            ),
+            CurpError::InvalidConfig(_) => (
+                tonic::Code::InvalidArgument,
+                "Invalid config error: The provided configuration is invalid.",
+            ),
+            CurpError::NodeNotExists(_) => (
+                tonic::Code::NotFound,
+                "Node not found error: The specified node does not exist.",
+            ),
+            CurpError::NodeAlreadyExists(_) => (
+                tonic::Code::AlreadyExists,
+                "Node already exists error: The node already exists.",
+            ),
+            CurpError::LearnerNotCatchUp(_) => (
+                tonic::Code::FailedPrecondition,
+                "Learner not caught up error: The learner has not caught up.",
+            ),
+            CurpError::ShuttingDown(_) => (
+                tonic::Code::Unavailable,
+                "Shutting down error: The service is currently shutting down.",
+            ),
+            CurpError::WrongClusterVersion(_) => (
+                tonic::Code::FailedPrecondition,
+                "Wrong cluster version error: The cluster version is incorrect.",
+            ),
+            CurpError::Redirect(_) => (
+                tonic::Code::ResourceExhausted,
+                "Redirect error: The request should be redirected to another node.",
+            ),
+            CurpError::Internal(_) => (
+                tonic::Code::Internal,
+                "Internal error: An internal error occurred.",
+            ),
+            CurpError::RpcTransport(_) => (tonic::Code::Cancelled, "Rpc error: Request cancelled"),
+        };
+
+        let details = CurpErrorWrapper { err: Some(err) }.encode_to_vec();
+
+        tonic::Status::with_details(code, msg, details.into())
+    }
+}
diff --git a/curp/src/server/curp_node.rs b/curp/src/server/curp_node.rs
index c53151628..fc80bb2d5 100644
--- a/curp/src/server/curp_node.rs
+++ b/curp/src/server/curp_node.rs
@@ -1,19 +1,16 @@
-use std::{collections::HashMap, fmt::Debug, io, sync::Arc, time::Duration};
+use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Duration};
 
 use clippy_utilities::NumericCast;
-use curp_external_api::cmd::PbSerializeError;
 use engine::{SnapshotAllocator, SnapshotApi};
 use event_listener::Event;
 use futures::{pin_mut, stream::FuturesUnordered, Stream, StreamExt};
 use madsim::rand::{thread_rng, Rng};
 use parking_lot::{Mutex, RwLock};
-use thiserror::Error;
 use tokio::{
     sync::{broadcast, mpsc},
     task::JoinHandle,
     time::MissedTickBehavior,
 };
-use tonic::{metadata::MetadataMap, Code, Status};
 use tracing::{debug, error, info, trace, warn};
 use utils::{
     config::CurpConfig,
@@ -26,214 +23,27 @@ use super::{
     gc::run_gc_tasks,
     raw_curp::{AppendEntries, RawCurp, UncommittedPool, Vote},
     spec_pool::{SpecPoolRef, SpeculativePool},
-    storage::{StorageApi, StorageError},
+    storage::StorageApi,
 };
 use crate::{
     cmd::{Command, CommandExecutor},
-    error::ERROR_LABEL,
     log_entry::{EntryData, LogEntry},
     members::{ClusterInfo, ServerId},
     role_change::RoleChange,
     rpc::{
         self,
         connect::{InnerConnectApi, InnerConnectApiWrapper},
-        AppendEntriesRequest, AppendEntriesResponse, FetchClusterRequest, FetchClusterResponse,
-        FetchReadStateRequest, FetchReadStateResponse, InstallSnapshotRequest,
-        InstallSnapshotResponse, ProposeConfChangeRequest, ProposeConfChangeResponse,
-        ProposeRequest, ProposeResponse, ShutdownRequest, ShutdownResponse, VoteRequest,
-        VoteResponse, WaitSyncedRequest, WaitSyncedResponse,
+        AppendEntriesRequest, AppendEntriesResponse, ConfChangeType, CurpError,
+        FetchClusterRequest, FetchClusterResponse, FetchReadStateRequest, FetchReadStateResponse,
+        InstallSnapshotRequest, InstallSnapshotResponse, ProposeConfChangeRequest,
+        ProposeConfChangeResponse, ProposeRequest, ProposeResponse, PublishRequest,
+        PublishResponse, ShutdownRequest, ShutdownResponse, TriggerShutdownRequest,
+        TriggerShutdownResponse, VoteRequest, VoteResponse, WaitSyncedRequest, WaitSyncedResponse,
     },
     server::{cmd_worker::CEEventTxApi, raw_curp::SyncAction, storage::db::DB},
     snapshot::{Snapshot, SnapshotMeta},
-    ConfChangeType, PublishRequest, PublishResponse, TriggerShutdownRequest,
-    TriggerShutdownResponse,
 };
 
-/// Curp error
-#[derive(Debug, Error)]
-pub(super) enum CurpError {
-    /// Encode or decode error
-    #[error("encode or decode error")]
-    EncodeDecode(String),
-    /// Storage error
-    #[error("storage error, {0}")]
-    Storage(#[from] StorageError),
-    /// Transport error
-    #[error("transport error, {0}")]
-    Transport(String),
-    /// Io error
-    #[error("io error, {0}")]
-    IO(#[from] io::Error),
-    /// Currently, Internal Error includes the following three types of errors:
-    /// 1. failed to allocate a new snapshot
-    /// 2. failed to reset the command executor by snapshot
-    /// 3. failed to get last applied index from command executor.
-    #[error("internal error, {0}")]
-    Internal(String),
-    /// Curp Server is shutting down
-    #[error("cluster shutdown")]
-    ShuttingDown,
-    /// If client sent a wait synced request to a non-leader
-    #[error("redirect to {0:?}, term {1}")]
-    Redirect(Option<ServerId>, u64),
-    /// Wrong cluster version
-    #[error("wrong cluster version")]
-    WrongClusterVersion,
-}
-
-impl From<bincode::Error> for CurpError {
-    fn from(err: bincode::Error) -> Self {
-        Self::EncodeDecode(err.to_string())
-    }
-}
-
-impl From<PbSerializeError> for CurpError {
-    fn from(err: PbSerializeError) -> Self {
-        Self::EncodeDecode(err.to_string())
-    }
-}
-
-/// Generate metadata for `tonic::Status`
-fn gen_metadata(label: &str) -> MetadataMap {
-    let mut meta = MetadataMap::new();
-    _ = meta.insert(
-        ERROR_LABEL,
-        label.parse().unwrap_or_else(|e| {
-            unreachable!("convert a empty string to MetadataValue should always success: {e}")
-        }),
-    );
-    meta
-}
-
-impl From<CurpError> for Status {
-    #[inline]
-    fn from(err: CurpError) -> Self {
-        match err {
-            CurpError::EncodeDecode(msg) => {
-                let metadata = gen_metadata("encode-decode");
-                Status::with_metadata(Code::Cancelled, msg, metadata)
-            }
-            CurpError::Internal(msg) => {
-                let metadata = gen_metadata("internal");
-                Status::with_metadata(Code::Internal, msg, metadata)
-            }
-            CurpError::Storage(err) => {
-                let metadata = gen_metadata("storage");
-                Status::with_metadata(Code::Internal, err.to_string(), metadata)
-            }
-            CurpError::IO(err) => {
-                let metadata = gen_metadata("io");
-                Status::with_metadata(Code::Internal, err.to_string(), metadata)
-            }
-            CurpError::Transport(msg) => {
-                let metadata = gen_metadata("transport");
-                Status::with_metadata(Code::Unavailable, msg, metadata)
-            }
-            CurpError::ShuttingDown => {
-                let metadata = gen_metadata("shutting-down");
-                Status::with_metadata(Code::Unavailable, "CurpServer is shutting down", metadata)
-            }
-            CurpError::Redirect(leader_id, term) => {
-                let metadata = gen_metadata("redirect");
-                let leader_term = (leader_id, term);
-                let mut details: Vec<u8> = Vec::new();
-                serde_json::to_writer(&mut details, &leader_term).unwrap_or_else(|e| {
-                    unreachable!("serialize a tuple (Option<ServerId>, u64) should always succeed: {e}")
-                });
-                Status::with_details_and_metadata(
-                    Code::FailedPrecondition,
-                    "current node is not a leader",
-                    details.into(),
-                    metadata,
-                )
-            }
-            CurpError::WrongClusterVersion => {
-                let metadata = gen_metadata("wrong-cluster-version");
-                Status::with_metadata(Code::FailedPrecondition, "wrong cluster version", metadata)
-            }
-        }
-    }
-}
-
-/// Internal error encountered when sending `append_entries`
-#[derive(Debug, Error)]
-enum SendAEError {
-    /// When self is no longer leader
-    #[error("self is no longer leader")]
-    NotLeader,
-    /// When the follower rejects
-    #[error("follower rejected")]
-    Rejected,
-    /// Transport
-    #[error("transport error, {0}")]
-    Transport(String),
-    /// RpcError
-    #[error("RPC error: {0}")]
-    RpcError(String),
-    /// Encode/Decode error
-    #[error("encode or decode error")]
-    EncodeDecode(String),
-}
-
-impl From<Status> for SendAEError {
-    fn from(status: Status) -> Self {
-        #[allow(clippy::wildcard_enum_match_arm)]
-        // it's ok to do so since only three status can covert to `SendAEError`
-        let metadata = status.metadata();
-        if let Some(label) = metadata.get(ERROR_LABEL) {
-            match label.to_str().unwrap_or_else(|err| {
-                unreachable!("error-label should be always able to convert to str: {err:?}")
-            }) {
-                "transport" => Self::Transport(status.message().to_owned()),
-                "redirect" => Self::NotLeader,
-                "encode-decode" => Self::EncodeDecode(status.message().to_owned()),
-                _ => Self::RpcError(status.message().to_owned()),
-            }
-        } else {
-            Self::Transport(status.message().to_owned())
-        }
-    }
-}
-
-impl From<bincode::Error> for SendAEError {
-    fn from(err: bincode::Error) -> Self {
-        Self::EncodeDecode(err.to_string())
-    }
-}
-
-/// Internal error encountered when sending snapshot
-#[derive(Debug, Error)]
-enum SendSnapshotError {
-    /// When self is no longer leader
-    #[error("self is no longer leader")]
-    NotLeader,
-    /// Transport
-    #[error("transport error, {0}")]
-    Transport(String),
-    /// Rpc error
-    #[error("RPC error: {0}")]
-    RpcError(String),
-}
-
-impl From<Status> for SendSnapshotError {
-    fn from(status: Status) -> Self {
-        #[allow(clippy::wildcard_enum_match_arm)]
-        // it's ok to do so since `SendSnapshotError` only has two variants.
-        let metadata = status.metadata();
-        if let Some(label) = metadata.get(ERROR_LABEL) {
-            match label.to_str().unwrap_or_else(|err| {
-                unreachable!("error-label should be always able to convert to str: {err:?}")
-            }) {
-                "transport" => Self::Transport(status.message().to_owned()),
-                "redirect" => Self::NotLeader,
-                _ => Self::RpcError(status.message().to_owned()),
-            }
-        } else {
-            Self::Transport(status.message().to_owned())
-        }
-    }
-}
-
 /// `CurpNode` represents a single node of curp cluster
 pub(super) struct CurpNode<C: Command, RC: RoleChange> {
     /// `RawCurp` state machine
@@ -250,28 +60,26 @@ pub(super) struct CurpNode<C: Command, RC: RoleChange> {
     snapshot_allocator: Box<dyn SnapshotAllocator>,
 }
 
-// handlers
+/// Handlers for clients
 impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
     /// Handle `Propose` requests
     pub(super) async fn propose(&self, req: ProposeRequest) -> Result<ProposeResponse, CurpError> {
         if self.curp.is_shutdown() {
-            return Err(CurpError::ShuttingDown);
+            return Err(CurpError::shutting_down());
         }
         self.check_cluster_version(req.cluster_version)?;
         let cmd: Arc<C> = Arc::new(req.cmd()?);
 
         // handle proposal
-        let ((leader_id, term), result) = self.curp.handle_propose(Arc::clone(&cmd))?;
-        let resp = match result {
-            Ok(true) => {
-                let er_res = CommandBoard::wait_for_er(&self.cmd_board, cmd.id()).await;
-                ProposeResponse::new_result::<C>(leader_id, term, &er_res)
-            }
-            Ok(false) => ProposeResponse::new_empty(leader_id, term),
-            Err(err) => ProposeResponse::new_error(leader_id, term, err),
-        };
+        let sp_exec = self.curp.handle_propose(Arc::clone(&cmd))?;
 
-        Ok(resp)
+        // if speculatively executed, wait for the result and return
+        if sp_exec {
+            let er_res = CommandBoard::wait_for_er(&self.cmd_board, cmd.id()).await;
+            return Ok(ProposeResponse::new_result::<C>(&er_res));
+        }
+
+        Ok(ProposeResponse::new_empty())
     }
 
     /// Handle `Shutdown` requests
@@ -292,21 +100,65 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
     ) -> Result<ProposeConfChangeResponse, CurpError> {
         self.check_cluster_version(req.cluster_version)?;
         let id = req.id();
-        let ((leader_id, term), result) = self.curp.handle_propose_conf_change(req.into())?;
-        let error = match result {
-            Ok(()) => {
-                CommandBoard::wait_for_conf(&self.cmd_board, id).await;
-                None
-            }
-            Err(err) => Some(err),
-        };
+        self.curp.handle_propose_conf_change(req.into())?;
+        CommandBoard::wait_for_conf(&self.cmd_board, id).await;
         let members = self.curp.cluster().all_members_vec();
-        Ok(ProposeConfChangeResponse {
-            members,
+        Ok(ProposeConfChangeResponse { members })
+    }
+
+    /// handle `WaitSynced` requests
+    pub(super) async fn wait_synced(
+        &self,
+        req: WaitSyncedRequest,
+    ) -> Result<WaitSyncedResponse, CurpError> {
+        if self.curp.is_shutdown() {
+            return Err(CurpError::shutting_down());
+        }
+        self.check_cluster_version(req.cluster_version)?;
+        let id = req.propose_id();
+        debug!("{} get wait synced request for cmd({id})", self.curp.id());
+
+        let (er, asr) = CommandBoard::wait_for_er_asr(&self.cmd_board, id).await;
+        debug!("{} wait synced for cmd({id}) finishes", self.curp.id());
+        Ok(WaitSyncedResponse::new_from_result::<C>(er, asr))
+    }
+
+    /// Handle `FetchCluster` requests
+    #[allow(clippy::unnecessary_wraps, clippy::needless_pass_by_value)] // To keep type consistent with other request handlers
+    pub(super) fn fetch_cluster(
+        &self,
+        req: FetchClusterRequest,
+    ) -> Result<FetchClusterResponse, CurpError> {
+        let (leader_id, term, is_leader) = self.curp.leader();
+        let cluster_id = self.curp.cluster().cluster_id();
+        let members = if is_leader || !req.linearizable {
+            self.curp.cluster().all_members_vec()
+        } else {
+            // if it is a follower and enabled linearizable read, return empty members
+            // the client will ignore empty members and retry until it gets a response from
+            // the leader
+            Vec::new()
+        };
+        let cluster_version = self.curp.cluster().cluster_version();
+        Ok(FetchClusterResponse::new(
             leader_id,
             term,
-            error,
-        })
+            cluster_id,
+            members,
+            cluster_version,
+        ))
+    }
+
+    /// Handle `FetchReadState` requests
+    #[allow(clippy::needless_pass_by_value)] // To keep type consistent with other request handlers
+    pub(super) fn fetch_read_state(
+        &self,
+        req: FetchReadStateRequest,
+    ) -> Result<FetchReadStateResponse, CurpError> {
+        self.check_cluster_version(req.cluster_version)?;
+        let cmd = req.cmd()?;
+        let state = self.curp.handle_fetch_read_state(&cmd);
+        Ok(FetchReadStateResponse::new(state))
     }
 
     /// Handle `Publish` requests
@@ -314,7 +166,10 @@
         self.curp.handle_publish(req)?;
         Ok(PublishResponse::default())
     }
+}
 
+/// Handlers for peers
+impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
     /// Handle `AppendEntries` requests
     pub(super) fn append_entries(
         &self,
@@ -381,54 +236,11 @@
         TriggerShutdownResponse::default()
     }
 
-    /// handle `WaitSynced` requests
-    pub(super) async fn wait_synced(
-        &self,
-        req: WaitSyncedRequest,
-    ) -> Result<WaitSyncedResponse, CurpError> {
-        if self.curp.is_shutdown() {
-            return Err(CurpError::ShuttingDown);
-        }
-        self.check_cluster_version(req.cluster_version)?;
-        let id = req.propose_id();
-        debug!("{} get wait synced request for cmd({id})", self.curp.id());
-
-        let (er, asr) = CommandBoard::wait_for_er_asr(&self.cmd_board, id).await;
-        debug!("{} wait synced for cmd({id}) finishes", self.curp.id());
-        Ok(WaitSyncedResponse::new_from_result::<C>(er, asr))
-    }
-
-    /// Handle `FetchCluster` requests
-    #[allow(clippy::unnecessary_wraps, clippy::needless_pass_by_value)] // To keep type consistent with other request handlers
-    pub(super) fn fetch_cluster(
-        &self,
-        req: FetchClusterRequest,
-    ) -> Result<FetchClusterResponse, CurpError> {
-        let (leader_id, term, is_leader) = self.curp.leader();
-        let cluster_id = self.curp.cluster().cluster_id();
-        let members = if is_leader || !req.linearizable {
-            self.curp.cluster().all_members_vec()
-        } else {
-            // if it is a follower and enabled linearizable read, return empty members
-            // the client will ignore empty members and retry util it gets response from
-            // the leader
-            Vec::new()
-        };
-        let cluster_version = self.curp.cluster().cluster_version();
-        Ok(FetchClusterResponse::new(
-            leader_id,
-            term,
-            cluster_id,
-            members,
-            cluster_version,
-        ))
-    }
-
     /// Handle `InstallSnapshot` stream
     #[allow(clippy::integer_arithmetic)] // can't overflow
-    pub(super) async fn install_snapshot(
+    pub(super) async fn install_snapshot<E: std::error::Error + 'static>(
         &self,
-        req_stream: impl Stream<Item = Result<InstallSnapshotRequest, String>>,
+        req_stream: impl Stream<Item = Result<InstallSnapshotRequest, E>>,
     ) -> Result<InstallSnapshotResponse, CurpError> {
         pin_mut!(req_stream);
         let mut snapshot = self
             .snapshot_allocator
             .allocate_new_snapshot()
             .await
             .map_err(|err| {
-                error!("failed to allocate a new snapshot, {err:?}");
-                CurpError::Internal("failed to allocate a new snapshot".to_owned())
+                error!("failed to allocate a new snapshot, error: {err}");
+                CurpError::internal(format!("failed to allocate a new snapshot, error: {err}"))
             })?;
         while let Some(req) = req_stream.next().await {
-            let req = req.map_err(|e| {
-                warn!("snapshot stream error, {e}");
-                CurpError::Transport(e)
-            })?;
+            let req = req?;
             if !self.curp.verify_install_snapshot(
                 req.term,
                 req.leader_id,
@@ -476,31 +285,18 @@
                 .send_reset(Some(snapshot))
                 .await
                 .map_err(|err| {
-                    let err = CurpError::Internal(format!(
+                    error!("failed to reset the command executor by snapshot, {err}");
+                    CurpError::internal(format!(
                         "failed to reset the command executor by snapshot, {err}"
-                    ));
-                    error!("{err}");
-                    err
+                    ))
                 })?;
                 return Ok(InstallSnapshotResponse::new(self.curp.term()));
             }
         }
-        Err(CurpError::Transport(
+        Err(CurpError::internal(
             "failed to receive a complete snapshot".to_owned(),
         ))
     }
-
-    /// Handle `FetchReadState` requests
-    #[allow(clippy::needless_pass_by_value)] // To keep type consistent with other request handlers
-    pub(super) fn fetch_read_state(
-        &self,
-        req: FetchReadStateRequest,
-    ) -> Result<FetchReadStateResponse, CurpError> {
-        self.check_cluster_version(req.cluster_version)?;
-        let cmd = req.cmd()?;
-        let state = self.curp.handle_fetch_read_state(&cmd);
-        Ok(FetchReadStateResponse::new(state))
-    }
 }
 
 /// Spawned tasks
@@ -656,7 +452,7 @@
                         }
                         ConfChangeType::Update => {
                             if let Err(e) = curp.update_connect(change.node_id, change.address).await {
-                                error!("update connect {} failed, {}", change.node_id, e);
+                                error!("update connect {} failed, err {:?}", change.node_id, e);
                                 continue;
                             }
                         }
@@ -731,49 +527,51 @@
                     // (true, empty) => indicates that `batch_timeout` expired, and during this period there is not any log generated. Do nothing
                    // (true | false, not empty) => send append entries
                     if !hb_opt || !is_empty {
-                        let result = Self::send_ae(connect.as_ref(), curp.as_ref(), ae).await;
-                        if let Err(err) = result {
-                            warn!("ae to {} failed, {err}", connect_id);
-                            if matches!(err, SendAEError::NotLeader) {
-                                break true;
-                            }
-                            if is_shutdown_state {
-                                ae_fail_count += 1;
-                                if ae_fail_count >= 5 {
-                                    warn!("the follower {} may have been shutdown", connect_id);
+                        match Self::send_ae(connect.as_ref(), curp.as_ref(), ae).await {
+                            Ok((true, _)) => break true,
+                            Ok((false, ae_succeed)) => {
+                                if ae_succeed {
+                                    hb_opt = true;
+                                } else {
+                                    debug!("ae rejected by {}", connect.id());
+                                }
+                                // Check Follower shutdown
+                                // When the leader is in the shutdown state, its last log must be shutdown, and if the follower is
+                                // already synced with leader and current AE is a heartbeat, then the follower will commit the shutdown
+                                // log after AE, or when the follower is not synced with the leader, the current AE will send and directly commit
+                                // shutdown log.
+                                if is_shutdown_state
+                                    && ((curp.is_synced(connect_id) && is_empty)
+                                        || (!curp.is_synced(connect_id) && is_commit_shutdown))
+                                {
+                                    if let Err(e) = connect.trigger_shutdown().await {
+                                        warn!("trigger shutdown to {} failed, {e}", connect_id);
+                                    } else {
+                                        debug!("trigger shutdown to {} success", connect_id);
+                                    }
                                     break false;
                                 }
                             }
-                        } else {
-                            hb_opt = true;
-                        }
-                        // Check Follower shutdown
-                        // When the leader is in the shutdown state, its last log must be shutdown, and if the follower is
-                        // already synced with leader and current AE is a heartbeat, then the follower will commit the shutdown
-                        // log after AE, or when the follower is not synced with the leader, the current AE will send and directly commit
-                        // shutdown log.
-                        if is_shutdown_state
-                            && ((curp.is_synced(connect_id) && is_empty)
-                                || (!curp.is_synced(connect_id) && is_commit_shutdown))
-                        {
-                            if let Err(e) = connect.trigger_shutdown().await {
-                                warn!("trigger shutdown to {} failed, {e}", connect_id);
-                            } else {
-                                debug!("trigger shutdown to {} success", connect_id);
+                            Err(err) => {
+                                warn!("ae to {} failed, {err:?}", connect.id());
+                                if is_shutdown_state {
+                                    ae_fail_count += 1;
+                                    if ae_fail_count >= 5 {
+                                        // ae_fail_count = 0; RESET?
+                                        warn!("the follower {} may have been shutdown", connect_id);
+                                        break false;
+                                    }
+                                }
                             }
-                            break false;
-                        }
+                        };
                    }
                }
                SyncAction::Snapshot(rx) => match rx.await {
                    Ok(snapshot) => {
-                        let result =
-                            Self::send_snapshot(connect.as_ref(), curp.as_ref(), snapshot).await;
-                        if let Err(err) = result {
-                            warn!("snapshot to {} failed, {err}", connect.id());
-                            if matches!(err, SendSnapshotError::NotLeader) {
-                                break true;
-                            }
+                        match Self::send_snapshot(connect.as_ref(), curp.as_ref(), snapshot).await {
+                            Ok(true) => break true,
+                            Err(err) => warn!("snapshot to {} failed, {err:?}", connect.id()),
+                            Ok(false) => {}
                        }
                    }
                    Err(err) => {
@@ -824,7 +622,7 @@
             .collect();
         let connects = rpc::inner_connects(cluster_info.peers_addrs())
             .await
-            .map_err(|e| CurpError::Internal(format!("parse peers addresses failed, err {e:?}")))?
+            .map_err(|e| CurpError::internal(format!("parse peers addresses failed, err {e:?}")))?
             .collect();
         let (log_tx, log_rx) = mpsc::unbounded_channel();
         let cmd_board = Arc::new(RwLock::new(CommandBoard::new()));
@@ -832,7 +630,7 @@
         let uncommitted_pool = Arc::new(Mutex::new(UncommittedPool::new()));
         let last_applied = cmd_executor
             .last_applied()
-            .map_err(|e| CurpError::Internal(format!("get applied index error, {e}")))?;
+            .map_err(|e| CurpError::internal(format!("get applied index error, {e}")))?;
         let (ce_event_tx, task_rx, done_tx) =
             conflict_checked_mpmc::channel(Arc::clone(&cmd_executor), shutdown_trigger.clone());
         let ce_event_tx: Arc<dyn CEEventTxApi<C>> = Arc::new(ce_event_tx);
@@ -994,12 +792,14 @@
     }
 
     /// Send `append_entries` request
+    /// Return `tonic::Error` if it meets a network issue
+    /// Return (`leader_retires`, `ae_succeed`)
     #[allow(clippy::integer_arithmetic)] // won't overflow
     async fn send_ae(
         connect: &(impl InnerConnectApi + ?Sized),
         curp: &RawCurp<C, RC>,
         ae: AppendEntries<C>,
-    ) -> Result<(), SendAEError> {
+    ) -> Result<(bool, bool), CurpError> {
         let last_sent_index = (!ae.entries.is_empty())
             .then(|| ae.prev_log_index + ae.entries.len().numeric_cast::<u64>());
         let is_heartbeat = ae.entries.is_empty();
@@ -1023,36 +823,35 @@
             .await?
             .into_inner();
 
-        let succeeded = curp
-            .handle_append_entries_resp(
-                connect.id(),
-                last_sent_index,
-                resp.term,
-                resp.success,
-                resp.hint_index,
-            )
-            .map_err(|_e| SendAEError::NotLeader)?;
+        let Ok(ae_succeed) = curp.handle_append_entries_resp(
+            connect.id(),
+            last_sent_index,
+            resp.term,
+            resp.success,
+            resp.hint_index,
+        ) else {
+            return Ok((true, false));
+        };
 
-        if succeeded {
-            Ok(())
-        } else {
-            Err(SendAEError::Rejected)
-        }
+        Ok((false, ae_succeed))
     }
 
     /// Send snapshot
+    /// Return `tonic::Error` if it meets a network issue
+    /// Return `leader_retires`
     async fn send_snapshot(
         connect: &(impl InnerConnectApi + ?Sized),
         curp: &RawCurp<C, RC>,
         snapshot: Snapshot,
-    ) -> Result<(), SendSnapshotError> {
+    ) -> Result<bool, CurpError> {
         let meta = snapshot.meta;
         let resp = connect
             .install_snapshot(curp.term(), curp.id(), snapshot)
             .await?
             .into_inner();
-        curp.handle_snapshot_resp(connect.id(), meta, resp.term)
-            .map_err(|_e| SendSnapshotError::NotLeader)
+        Ok(curp
+            .handle_snapshot_resp(connect.id(), meta, resp.term)
+            .is_err())
     }
 
     /// Get a shutdown listener
@@ -1068,7 +867,7 @@
             "client cluster version({}) and server cluster version({}) not match",
             client_cluster_version, server_cluster_version
         );
-            return Err(CurpError::WrongClusterVersion);
+            return Err(CurpError::wrong_cluster_version());
         }
         Ok(())
     }
@@ -1086,8 +885,6 @@ impl Debug for CurpNode
 #[cfg(test)]
 mod tests {
-    use std::io::{Error, ErrorKind};
-
     use curp_test_utils::{mock_role_change, sleep_secs, test_cmd::TestCommand};
     use tracing_test::traced_test;
 
@@ -1096,15 +893,6 @@ mod tests {
         rpc::connect::MockInnerConnectApi, server::cmd_worker::MockCEEventTxApi, ConfChange,
     };
 
-    fn get_error_label(status: &Status) -> &str {
-        let metadata = status.metadata();
-        metadata
-            .get(ERROR_LABEL)
-            .expect("error-label should not be None in CurpError")
-            .to_str()
-            .expect("error-label must be construct by ascii char")
-    }
-
     #[traced_test]
     #[tokio::test]
     async fn sync_task_will_send_hb() {
@@ -1230,48 +1018,4 @@ mod tests {
         assert!(curp.is_leader());
         curp.shutdown_trigger().self_shutdown_and_wait().await;
     }
-
-    #[test]
-    fn curp_error_convert_to_tonic_status_should_success() {
-        let encode_decode = CurpError::EncodeDecode("CurpError::EncodeDecode".to_owned());
-        let status: Status = encode_decode.into();
-        assert_eq!("encode-decode", get_error_label(&status));
-
-        let internal = CurpError::Internal("CurpError::Internal".to_owned());
-        let status: Status = internal.into();
-        assert_eq!("internal", get_error_label(&status));
-
-        let transport = CurpError::Transport("CurpError::Transport".to_owned());
-        let status: Status = transport.into();
-        assert_eq!("transport", get_error_label(&status));
-
-        let shutdown = CurpError::ShuttingDown;
-        let status: Status = shutdown.into();
-        assert_eq!("shutting-down", get_error_label(&status));
-
-        let redirect_1 = CurpError::Redirect(Some(1), 2);
-        let status: Status = redirect_1.into();
-        assert_eq!("redirect", get_error_label(&status));
-        let (leader_id, term): (Option<ServerId>, u64) = serde_json::from_slice(status.details())
-            .expect(" deserialize (leader_id, term) from status' detail should always success");
-        assert_eq!(leader_id, Some(1));
-        assert_eq!(term, 2);
-
-        let redirect_2 = CurpError::Redirect(None, 2);
-        let status: Status = redirect_2.into();
-        assert_eq!("redirect", get_error_label(&status));
-        let (leader_id, term): (Option<ServerId>, u64) = serde_json::from_slice(status.details())
-            .expect(" deserialize (leader_id, term) from status' detail should always success");
-        assert_eq!(leader_id, None);
-        assert_eq!(term, 2);
-
-        let io = CurpError::IO(Error::new(ErrorKind::Other, "oh no!"));
-        let status: Status = io.into();
-        assert_eq!("io", get_error_label(&status));
-
-        let bincode_err = Box::new(bincode::ErrorKind::Custom("StorageError".to_owned()));
-        let storage = CurpError::Storage(bincode_err.into());
-        let status: Status = storage.into();
-        assert_eq!("storage", get_error_label(&status));
-    }
 }
diff --git a/curp/src/server/mod.rs b/curp/src/server/mod.rs
index 78277b1d4..9eb2bfcd6 100644
--- a/curp/src/server/mod.rs
+++ b/curp/src/server/mod.rs
@@ -2,7 +2,6 @@ use std::{fmt::Debug, sync::Arc};
 
 use curp_external_api::cmd::{ConflictCheck, ProposeId};
 use engine::SnapshotAllocator;
-use futures::TryStreamExt;
 use serde::{Deserialize, Serialize};
 #[cfg(not(madsim))]
 use tokio::net::TcpListener;
@@ -14,7 +13,7 @@ use tracing::info;
 use tracing::instrument;
 use utils::{config::CurpConfig, shutdown, tracing::Extract};
 
-use self::curp_node::{CurpError, CurpNode};
+use self::curp_node::CurpNode;
 use crate::{
     cmd::{Command, CommandExecutor},
     error::ServerError,
@@ -153,19 +152,6 @@ impl crate::rpc::InnerProtocol f
         ))
     }
 
-    #[instrument(skip_all, name = "curp_install_snapshot")]
-    async fn install_snapshot(
-        &self,
-        request: tonic::Request<tonic::Streaming<InstallSnapshotRequest>>,
-    ) -> Result<tonic::Response<InstallSnapshotResponse>, tonic::Status> {
-        let req_stream = request
-            .into_inner()
-            .map_err(|e| format!("snapshot transmission failed at client side, {e}"));
-        Ok(tonic::Response::new(
-            self.inner.install_snapshot(req_stream).await?,
-        ))
-    }
-
     #[instrument(skip_all, name = "curp_vote")]
     async fn vote(
         &self,
@@ -185,6 +171,17 @@
             self.inner.trigger_shutdown(request.get_ref()),
         ))
     }
+
+    #[instrument(skip_all, name = "curp_install_snapshot")]
+    async fn install_snapshot(
+        &self,
+        request: tonic::Request<tonic::Streaming<InstallSnapshotRequest>>,
+    ) -> Result<tonic::Response<InstallSnapshotResponse>, tonic::Status> {
+        let req_stream = request.into_inner();
+        Ok(tonic::Response::new(
+            self.inner.install_snapshot(req_stream).await?,
+        ))
+    }
 }
 
 impl<C: Command + 'static, CE: CommandExecutor<C> + 'static, RC: RoleChange + 'static> Rpc<C, CE, RC> {
@@ -216,7 +213,7 @@
         {
             Ok(n) => n,
             Err(err) => {
-                panic!("failed to create curp service, {err}");
+                panic!("failed to create curp service, {err:?}");
            }
        };
diff --git a/curp/src/server/raw_curp/mod.rs b/curp/src/server/raw_curp/mod.rs
index 6e64d7868..09530a36e 100644
--- a/curp/src/server/raw_curp/mod.rs
+++ b/curp/src/server/raw_curp/mod.rs
@@ -41,7 +41,7 @@ use self::{
     log::Log,
     state::{CandidateState, LeaderState, State},
 };
-use super::{cmd_worker::CEEventTxApi, CurpError, PoolEntry};
+use super::{cmd_worker::CEEventTxApi, PoolEntry};
 use crate::{
     cmd::{Command, ProposeId},
     connect::InnerConnectApi,
@@ -49,8 +49,8 @@ use crate::{
     members::{ClusterInfo, ServerId},
     role_change::RoleChange,
     rpc::{
-        connect::InnerConnectApiWrapper, ConfChange, ConfChangeEntry, ConfChangeError,
-        ConfChangeType, IdSet, Member, ProposeError, ReadState,
+        connect::InnerConnectApiWrapper, ConfChange, ConfChangeEntry, ConfChangeType, CurpError,
+        IdSet, Member, ReadState,
     },
     server::{
         cmd_board::CmdBoardRef,
@@ -234,32 +234,24 @@ impl RawCurp
 // Curp handlers
 impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
     /// Handle `propose` request
-    /// Return `((leader_id, term), Ok(spec_executed))` if the proposal succeeds, `Ok(true)` if leader speculatively executed the command
-    /// Return `((leader_id, term), Err(ProposeError))` if the cmd cannot be speculatively executed or is duplicated
-    #[allow(clippy::type_complexity)] // it's clear
-    pub(super) fn handle_propose(
-        &self,
-        cmd: Arc<C>,
-    ) -> Result<((Option<ServerId>, u64), Result<bool, ProposeError>), CurpError> {
diff --git a/curp/src/server/raw_curp/mod.rs b/curp/src/server/raw_curp/mod.rs
index 6e64d7868..09530a36e 100644
--- a/curp/src/server/raw_curp/mod.rs
+++ b/curp/src/server/raw_curp/mod.rs
@@ -41,7 +41,7 @@ use self::{
     log::Log,
     state::{CandidateState, LeaderState, State},
 };
-use super::{cmd_worker::CEEventTxApi, CurpError, PoolEntry};
+use super::{cmd_worker::CEEventTxApi, PoolEntry};
 use crate::{
     cmd::{Command, ProposeId},
     connect::InnerConnectApi,
@@ -49,8 +49,8 @@ use crate::{
     members::{ClusterInfo, ServerId},
     role_change::RoleChange,
     rpc::{
-        connect::InnerConnectApiWrapper, ConfChange, ConfChangeEntry, ConfChangeError,
-        ConfChangeType, IdSet, Member, ProposeError, ReadState,
+        connect::InnerConnectApiWrapper, ConfChange, ConfChangeEntry, ConfChangeType, CurpError,
+        IdSet, Member, ReadState,
     },
     server::{
         cmd_board::CmdBoardRef,
@@ -234,32 +234,24 @@ impl RawCurp {
 // Curp handlers
 impl RawCurp {
     /// Handle `propose` request
-    /// Return `((leader_id, term), Ok(spec_executed))` if the proposal succeeds, `Ok(true)` if leader speculatively executed the command
-    /// Return `((leader_id, term), Err(ProposeError))` if the cmd cannot be speculatively executed or is duplicated
-    #[allow(clippy::type_complexity)] // it's clear
-    pub(super) fn handle_propose(
-        &self,
-        cmd: Arc<C>,
-    ) -> Result<((Option<ServerId>, u64), Result<bool, ProposeError>), CurpError> {
+    /// Return `true` if the leader speculatively executed the command
+    pub(super) fn handle_propose(&self, cmd: Arc<C>) -> Result<bool, CurpError> {
         debug!("{} gets proposal for cmd({})", self.id(), cmd.id());
         let mut conflict = self.insert_sp(Arc::clone(&cmd));
         let st_r = self.st.read();
-        let info = (st_r.leader_id, st_r.term);
         // Non-leader doesn't need to sync or execute
         if st_r.role != Role::Leader {
-            return Ok((
-                info,
-                if conflict {
-                    Err(ProposeError::KeyConflict)
-                } else {
-                    Ok(false)
-                },
-            ));
+            if conflict {
+                return Err(CurpError::key_conflict());
+            }
+            return Ok(false);
         }
+
         let id = cmd.id();
         if !self.ctx.cb.map_write(|mut cb_w| cb_w.sync.insert(id)) {
-            return Ok((info, Err(ProposeError::Duplicated)));
+            return Err(CurpError::duplicated());
         }
+
         // leader also needs to check if the cmd conflicts un-synced commands
         conflict |= self.insert_ucp(Arc::clone(&cmd));
         let mut log_w = self.log.write();
@@ -268,22 +260,20 @@ impl RawCurp {
 
         self.entry_process(&mut log_w, entry, conflict);
 
-        Ok((
-            info,
-            if conflict {
-                Err(ProposeError::KeyConflict)
-            } else {
-                Ok(true)
-            },
-        ))
+        if conflict {
+            return Err(CurpError::key_conflict());
+        }
+
+        Ok(true)
     }
 
     /// Handle `shutdown` request
     pub(super) fn handle_shutdown(&self, propose_id: ProposeId) -> Result<(), CurpError> {
         let st_r = self.st.read();
         if st_r.role != Role::Leader {
-            return Err(CurpError::Redirect(st_r.leader_id, st_r.term));
+            return Err(CurpError::redirect(st_r.leader_id, st_r.term));
         }
+
         let mut log_w = self.log.write();
         let entry = log_w.push(st_r.term, EntryData::Shutdown(propose_id))?;
         debug!("{} gets new log[{}]", self.id(), entry.index);
@@ -292,36 +282,27 @@ impl RawCurp {
     }
 
     /// Handle `propose_conf_change` request
-    #[allow(clippy::type_complexity)] // it's clear that the `CurpError` is an out-of-bound error
     pub(super) fn handle_propose_conf_change(
         &self,
         conf_change: ConfChangeEntry,
-    ) -> Result<((Option<ServerId>, u64), Result<(), ConfChangeError>), CurpError> {
+    ) -> Result<(), CurpError> {
         debug!(
             "{} gets conf change for with id {}",
             self.id(),
             conf_change.id()
         );
         let st_r = self.st.read();
-        let info = (st_r.leader_id, st_r.term);
         // Non-leader doesn't need to sync or execute
         if st_r.role != Role::Leader {
-            return Err(CurpError::Redirect(st_r.leader_id, st_r.term));
-        }
-        if let Err(e) = self.check_new_config(conf_change.changes()) {
-            return Ok((info, Err(e)));
-        }
-        let pool_entry = PoolEntry::from(conf_change.clone());
-        let mut conflict = self.insert_sp(pool_entry.clone());
-        conflict |= self.insert_ucp(pool_entry);
-        let id = conf_change.id();
-        if !self.ctx.cb.map_write(|mut cb_w| cb_w.sync.insert(id)) {
-            return Ok((
-                info,
-                Err(ConfChangeError::new_propose(ProposeError::Duplicated)),
-            ));
+            return Err(CurpError::redirect(st_r.leader_id, st_r.term));
         }
+
+        self.check_new_config(conf_change.changes())?;
+
+        let mut conflict = self.insert_sp(conf_change.clone());
+        conflict |= self.insert_ucp(conf_change.clone());
+
         let changes = conf_change.changes().to_owned();
         let mut log_w = self.log.write();
         let entry = log_w.push(st_r.term, conf_change)?;
@@ -335,7 +316,7 @@ impl RawCurp {
             FallbackContext::new(Arc::clone(&entry), addrs, name, is_learner),
         );
         self.entry_process(&mut log_w, entry, conflict);
-        Ok((info, Ok(())))
+        Ok(())
     }
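The payoff of the new `handle_propose` signature shows up at call sites: the old shape forced every caller to destructure a `((leader_id, term), Result<..>)` tuple before it could even tell success from failure, while the new one composes with `?` and a plain `match`. A reduced sketch of the new call-site shape (the stub and simplified variants are illustrative, not code from this patch):

```rust
// Sketch: the call-site shape after the refactor, with stand-in types.

#[derive(Debug)]
enum CurpError {
    KeyConflict(()),
    Duplicated(()),
}

// New shape: Ok(bool) = proposal accepted (true if speculatively executed),
// Err(CurpError) = conflict, duplicate, or any other failure.
fn handle_propose_new() -> Result<bool, CurpError> {
    Ok(true)
}

fn main() {
    match handle_propose_new() {
        Ok(spec_executed) => println!("accepted, speculatively executed: {spec_executed}"),
        Err(CurpError::KeyConflict(())) => println!("conflicts with a pending command"),
        Err(CurpError::Duplicated(())) => println!("duplicated proposal"),
    }
}
```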
 
     /// Handle `publish` request
@@ -343,7 +324,7 @@ impl RawCurp {
         debug!("{} gets publish with propose id {}", self.id(), req.id());
         let st_r = self.st.read();
         if st_r.role != Role::Leader {
-            return Err(CurpError::Redirect(st_r.leader_id, st_r.term));
+            return Err(CurpError::redirect(st_r.leader_id, st_r.term));
         }
         let mut log_w = self.log.write();
         let entry = log_w.push(st_r.term, req)?;
@@ -1039,7 +1020,7 @@ impl RawCurp {
     }
 
     /// Check if the new config is valid
-    pub(super) fn check_new_config(&self, changes: &[ConfChange]) -> Result<(), ConfChangeError> {
+    pub(super) fn check_new_config(&self, changes: &[ConfChange]) -> Result<(), CurpError> {
         assert_eq!(changes.len(), 1, "Joint consensus is not supported yet");
         let Some(conf_change) = changes.iter().next() else {
             unreachable!("conf change is empty");
@@ -1056,27 +1037,27 @@ impl RawCurp {
         match conf_change.change_type() {
             ConfChangeType::Add => {
                 if !statuses_ids.insert(node_id) || !config.insert(node_id, false) {
-                    return Err(ConfChangeError::NodeAlreadyExists(()));
+                    return Err(CurpError::node_already_exists());
                 }
             }
             ConfChangeType::Remove => {
                 if !statuses_ids.remove(&node_id) || !config.remove(node_id) {
-                    return Err(ConfChangeError::NodeNotExists(()));
+                    return Err(CurpError::node_not_exist());
                 }
             }
             ConfChangeType::Update => {
                 if statuses_ids.get(&node_id).is_none() || !config.contains(node_id) {
-                    return Err(ConfChangeError::NodeNotExists(()));
+                    return Err(CurpError::node_not_exist());
                 }
             }
             ConfChangeType::AddLearner => {
                 if !statuses_ids.insert(node_id) || !config.insert(node_id, true) {
-                    return Err(ConfChangeError::NodeAlreadyExists(()));
+                    return Err(CurpError::node_already_exists());
                }
             }
             ConfChangeType::Promote => {
                 if statuses_ids.get(&node_id).is_none() || !config.contains(node_id) {
-                    return Err(ConfChangeError::NodeNotExists(()));
+                    return Err(CurpError::node_not_exist());
                 }
                 let learner_index = self
                     .lst
@@ -1084,7 +1065,7 @@ impl RawCurp {
                     .unwrap_or_else(|| unreachable!("learner should exist here"));
                 let leader_index = self.log.read().last_log_index();
                 if leader_index.overflow_sub(learner_index) > MAX_PROMOTE_GAP {
-                    return Err(ConfChangeError::LearnerNotCatchUp(()));
+                    return Err(CurpError::learner_not_catch_up());
                 }
             }
         }
@@ -1095,7 +1076,7 @@ impl RawCurp {
             || all_nodes != statuses_ids
             || !config.voters().is_disjoint(&config.learners)
         {
-            return Err(ConfChangeError::InvalidConfig(()));
+            return Err(CurpError::invalid_config());
         }
         Ok(())
     }
@@ -1194,10 +1175,7 @@ impl RawCurp {
         addrs: Vec<String>,
     ) -> Result<(), CurpError> {
         match self.ctx.connects.get(&id) {
-            Some(connect) => connect
-                .update_addrs(addrs)
-                .await
-                .map_err(|err| CurpError::Transport(err.to_string())),
+            Some(connect) => Ok(connect.update_addrs(addrs).await?),
             None => Ok(()),
         }
     }
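`check_new_config` now returns `CurpError` directly, built through lowercase constructor helpers (`node_already_exists()`, `invalid_config()`, and so on) rather than enum literals. The real definitions live outside this patch; a plausible shape for such helpers, sketched over a reduced variant set, looks like this:

```rust
// Sketch: constructor helpers over a prost-style enum whose variants carry
// unit payloads, so call sites never have to spell out the `(())` literal.
#[derive(Debug)]
#[allow(dead_code)]
enum CurpError {
    NodeAlreadyExists(()),
    NodeNotExists(()),
    InvalidConfig(()),
    LearnerNotCatchUp(()),
}

#[allow(dead_code)]
impl CurpError {
    fn node_already_exists() -> Self {
        CurpError::NodeAlreadyExists(())
    }
    fn node_not_exist() -> Self {
        CurpError::NodeNotExists(())
    }
    fn invalid_config() -> Self {
        CurpError::InvalidConfig(())
    }
    fn learner_not_catch_up() -> Self {
        CurpError::LearnerNotCatchUp(())
    }
}

fn main() {
    // Call sites read as plain English and stay stable if payloads change.
    let err = CurpError::node_already_exists();
    assert!(matches!(err, CurpError::NodeAlreadyExists(())));
}
```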
diff --git a/curp/src/server/raw_curp/tests.rs b/curp/src/server/raw_curp/tests.rs
index d9739836f..412b8d052 100644
--- a/curp/src/server/raw_curp/tests.rs
+++ b/curp/src/server/raw_curp/tests.rs
@@ -21,7 +21,7 @@ use crate::{
         raw_curp::UncommittedPool,
         spec_pool::SpeculativePool,
     },
-    LogIndex, ProposeConfChangeRequest,
+    LogIndex, ProposeConfChangeRequest, Redirect,
 };
 
 // Hooks for tests
@@ -128,10 +128,7 @@ fn leader_handle_propose_will_succeed() {
         RawCurp::new_test(3, exe_tx, mock_role_change())
     };
     let cmd = Arc::new(TestCommand::default());
-    let ((leader_id, term), result) = curp.handle_propose(cmd).unwrap();
-    assert_eq!(leader_id, Some(curp.id().clone()));
-    assert_eq!(term, 1);
-    assert!(matches!(result, Ok(true)));
+    assert!(curp.handle_propose(cmd).unwrap());
 }
 
 #[traced_test]
@@ -144,23 +141,16 @@ fn leader_handle_propose_will_reject_conflicted() {
     };
 
     let cmd1 = Arc::new(TestCommand::new_put(vec![1], 0));
-    let ((leader_id, term), result) = curp.handle_propose(cmd1).unwrap();
-    assert_eq!(leader_id, Some(curp.id().clone()));
-    assert_eq!(term, 1);
-    assert!(matches!(result, Ok(true)));
+    assert!(curp.handle_propose(cmd1).unwrap());
 
     let cmd2 = Arc::new(TestCommand::new_put(vec![1, 2], 1));
-    let ((leader_id, term), result) = curp.handle_propose(cmd2).unwrap();
-    assert_eq!(leader_id, Some(curp.id().clone()));
-    assert_eq!(term, 1);
-    assert!(matches!(result, Err(ProposeError::KeyConflict)));
+    let res = curp.handle_propose(cmd2);
+    assert!(matches!(res, Err(CurpError::KeyConflict(_))));
 
     // leader will also reject cmds that conflict un-synced cmds
     let cmd3 = Arc::new(TestCommand::new_put(vec![2], 1));
-    let ((leader_id, term), result) = curp.handle_propose(cmd3).unwrap();
-    assert_eq!(leader_id, Some(curp.id().clone()));
-    assert_eq!(term, 1);
-    assert!(matches!(result, Err(ProposeError::KeyConflict)));
+    let res = curp.handle_propose(cmd3);
+    assert!(matches!(res, Err(CurpError::KeyConflict(_))));
 }
 
 #[traced_test]
@@ -172,15 +162,10 @@ fn leader_handle_propose_will_reject_duplicated() {
         RawCurp::new_test(3, exe_tx, mock_role_change())
     };
     let cmd = Arc::new(TestCommand::default());
-    let ((leader_id, term), result) = curp.handle_propose(Arc::clone(&cmd)).unwrap();
-    assert_eq!(leader_id, Some(curp.id().clone()));
-    assert_eq!(term, 1);
-    assert!(matches!(result, Ok(true)));
+    assert!(curp.handle_propose(Arc::clone(&cmd)).unwrap());
 
-    let ((leader_id, term), result) = curp.handle_propose(cmd).unwrap();
-    assert_eq!(leader_id, Some(curp.id().clone()));
-    assert_eq!(term, 1);
-    assert!(matches!(result, Err(ProposeError::Duplicated)));
+    let res = curp.handle_propose(cmd);
+    assert!(matches!(res, Err(CurpError::Duplicated(_))));
 }
 
 #[traced_test]
@@ -195,10 +180,7 @@ fn follower_handle_propose_will_succeed() {
     };
     curp.update_to_term_and_become_follower(&mut *curp.st.write(), 1);
     let cmd = Arc::new(TestCommand::new_get(vec![1]));
-    let ((leader_id, term), result) = curp.handle_propose(cmd).unwrap();
-    assert_eq!(leader_id, None);
-    assert_eq!(term, 1);
-    assert!(matches!(result, Ok(false)));
+    assert!(!curp.handle_propose(cmd).unwrap());
 }
 
 #[traced_test]
@@ -214,16 +196,11 @@ fn follower_handle_propose_will_reject_conflicted() {
     curp.update_to_term_and_become_follower(&mut *curp.st.write(), 1);
 
     let cmd1 = Arc::new(TestCommand::new_get(vec![1]));
-    let ((leader_id, term), result) = curp.handle_propose(cmd1).unwrap();
-    assert_eq!(leader_id, None);
-    assert_eq!(term, 1);
-    assert!(matches!(result, Ok(false)));
+    assert!(!curp.handle_propose(cmd1).unwrap());
 
     let cmd2 = Arc::new(TestCommand::new_get(vec![1]));
-    let ((leader_id, term), result) = curp.handle_propose(cmd2).unwrap();
-    assert_eq!(leader_id, None);
-    assert_eq!(term, 1);
-    assert!(matches!(result, Err(ProposeError::KeyConflict)));
+    let res = curp.handle_propose(cmd2);
+    assert!(matches!(res, Err(CurpError::KeyConflict(_))));
 }
 
 /*************** tests for append_entries(heartbeat) **************/
@@ -715,8 +692,13 @@ fn follower_handle_shutdown_will_reject() {
     };
     curp.update_to_term_and_become_follower(&mut *curp.st.write(), 1);
     let res = curp.handle_shutdown(next_id());
-    assert!(res.is_err());
-    assert!(matches!(res, Err(CurpError::Redirect(None, 1))));
+    assert!(matches!(
+        res,
+        Err(CurpError::Redirect(Redirect {
+            leader_id: None,
+            term: 1,
+        }))
+    ));
 }
 
 #[traced_test]
@@ -813,7 +795,7 @@ fn add_exists_node_should_return_node_already_exists_error() {
         vec!["http://127.0.0.1:4567".to_owned()],
     )];
     let resp = curp.check_new_config(&changes);
-    let error_match = matches!(resp, Err(ConfChangeError::NodeAlreadyExists(())));
+    let error_match = matches!(resp, Err(CurpError::NodeAlreadyExists(())));
     assert!(error_match);
 }
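Because `Redirect` is now a struct carried inside the variant, the updated tests match on named fields instead of positional tuple slots, so adding a field later cannot silently shift positions. A reduced, self-contained sketch of that pattern as the tests above use it:

```rust
// Sketch: matching a struct payload by field name, as the updated tests do.
#[derive(Debug)]
struct Redirect {
    leader_id: Option<u64>,
    term: u64,
}

#[derive(Debug)]
enum CurpError {
    Redirect(Redirect),
}

fn main() {
    let res: Result<(), CurpError> =
        Err(CurpError::Redirect(Redirect { leader_id: None, term: 1 }));
    assert!(matches!(
        res,
        Err(CurpError::Redirect(Redirect {
            leader_id: None,
            term: 1,
        }))
    ));
}
```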
@@ -853,7 +835,7 @@ fn remove_non_exists_node_should_return_node_not_exists_error() {
     };
     let changes = vec![ConfChange::remove(1)];
     let resp = curp.check_new_config(&changes);
-    assert!(matches!(resp, Err(ConfChangeError::NodeNotExists(()))));
+    assert!(matches!(resp, Err(CurpError::NodeNotExists(()))));
 }
 
 #[traced_test]
@@ -866,7 +848,7 @@ fn remove_node_should_return_invalid_config_error_when_nodes_count_less_than_3() {
     let follower_id = curp.cluster().get_id_by_name("S1").unwrap();
     let changes = vec![ConfChange::remove(follower_id)];
     let resp = curp.check_new_config(&changes);
-    assert!(matches!(resp, Err(ConfChangeError::InvalidConfig(()))));
+    assert!(matches!(resp, Err(CurpError::InvalidConfig(()))));
 }
 
 #[traced_test]
@@ -930,12 +912,8 @@ fn leader_handle_propose_conf_change() {
         vec!["http://127.0.0.1:4567".to_owned()],
     )];
     let conf_change_entry = ProposeConfChangeRequest::new(ProposeId(0, 0), changes, 0);
-    let ((leader, term), result) = curp
-        .handle_propose_conf_change(conf_change_entry.into())
+    curp.handle_propose_conf_change(conf_change_entry.into())
         .unwrap();
-    assert_eq!(leader, Some(curp.id().clone()));
-    assert_eq!(term, 1);
-    assert!(result.is_ok());
 }
 
 #[traced_test]
@@ -958,5 +936,11 @@ fn follower_handle_propose_conf_change() {
     )];
     let conf_change_entry = ProposeConfChangeRequest::new(ProposeId(0, 0), changes, 0);
     let result = curp.handle_propose_conf_change(conf_change_entry.into());
-    assert!(matches!(result, Err(CurpError::Redirect(None, 2))));
+    assert!(matches!(
+        result,
+        Err(CurpError::Redirect(Redirect {
+            leader_id: None,
+            term: 2,
+        }))
+    ));
 }
diff --git a/curp/tests/it/server.rs b/curp/tests/it/server.rs
index e29330ee0..c9879b9ec 100644
--- a/curp/tests/it/server.rs
+++ b/curp/tests/it/server.rs
@@ -7,7 +7,7 @@ use curp::{
     client::{Builder, Client},
     error::ClientError,
     members::ClusterInfo,
-    ConfChange, ConfChangeError,
+    ConfChange, CurpError,
 };
 use curp_test_utils::{
     init_logger, sleep_millis, sleep_secs,
@@ -18,10 +18,7 @@ use test_macros::abort_on_panic;
 use tokio::net::TcpListener;
 use utils::{config::ClientConfig, timestamp};
 
-use crate::common::curp_group::{
-    commandpb::propose_response::ExeResult, CurpGroup, FetchClusterRequest, ProposeRequest,
-    ProposeResponse,
-};
+use crate::common::curp_group::{CurpGroup, FetchClusterRequest, ProposeRequest, ProposeResponse};
 
 #[tokio::test(flavor = "multi_thread")]
 #[abort_on_panic]
@@ -162,7 +159,7 @@ async fn fast_round_is_slower_than_slow_round() {
         .await
         .unwrap()
         .into_inner();
-    assert!(resp.exe_result.is_none());
+    assert!(resp.result.is_none());
 }
 
 #[tokio::test(flavor = "multi_thread")]
@@ -194,19 +191,15 @@ async fn concurrent_cmd_order() {
             command: bincode::serialize(&cmd1).unwrap(),
             cluster_version: 0,
         })
-        .await
-        .expect("propose failed")
-        .into_inner();
-    assert!(matches!(response.exe_result.unwrap(), ExeResult::Error(_)));
+        .await;
+    assert!(response.is_err());
 
     let response = leader_connect
         .propose(ProposeRequest {
             command: bincode::serialize(&cmd2).unwrap(),
             cluster_version: 0,
         })
-        .await
-        .expect("propose failed")
-        .into_inner();
-    assert!(matches!(response.exe_result.unwrap(), ExeResult::Error(_)));
+        .await;
+    assert!(response.is_err());
 
     sleep_secs(1).await;
@@ -398,7 +391,7 @@ async fn propose_remove_node_should_failed_when_cluster_nodes_equals_to_three() {
     let node_id = group.nodes.keys().next().copied().unwrap();
     let changes = vec![ConfChange::remove(node_id)];
     let res = client.propose_conf_change(id, changes).await.unwrap();
-    assert!(matches!(res, Err(ConfChangeError::InvalidConfig(())))));
+    assert!(matches!(res, Err(CurpError::InvalidConfig(()))));
 }
 
 #[tokio::test(flavor = "multi_thread")]
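Note the shape the last assertion relies on: `propose_conf_change` resolves to a client-level `Result` wrapping a server-level one, so `.unwrap()` strips only the outer layer and the `CurpError` still has to be matched. A reduced sketch of that nesting (the outer error type is the client's; the placeholder here is an assumption):

```rust
// Sketch: the two-layer result an integration test unwraps. The outer layer
// is the client/RPC failure; the inner layer is the server's CurpError.
#[derive(Debug)]
struct ClientError; // placeholder for the real client error type

#[derive(Debug)]
enum CurpError {
    InvalidConfig(()),
}

fn propose_conf_change() -> Result<Result<(), CurpError>, ClientError> {
    // The RPC itself succeeded (outer Ok); the server rejected the config.
    Ok(Err(CurpError::InvalidConfig(())))
}

fn main() {
    // `.unwrap()` removes only the transport layer...
    let res = propose_conf_change().unwrap();
    // ...the domain error is still there to be matched.
    assert!(matches!(res, Err(CurpError::InvalidConfig(()))));
}
```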