From e3cdf3425f6fa37808ec94d5bfe3b35494e1e4d1 Mon Sep 17 00:00:00 2001 From: Phoeniix Zhao Date: Wed, 11 Oct 2023 11:46:30 +0800 Subject: [PATCH] refactor: refactor the error-handling logic in curp module and its client Refs: #463 Signed-off-by: Phoeniix Zhao --- Cargo.lock | 2 + benchmark/src/bench_client.rs | 5 +- curp/src/client.rs | 186 +++++++------- curp/src/error.rs | 231 +++--------------- curp/src/rpc/connect.rs | 59 +++-- curp/src/rpc/mod.rs | 126 +++++----- curp/src/server/curp_node.rs | 88 +++++-- curp/src/server/mod.rs | 7 - curp/src/server/raw_curp/mod.rs | 45 ++-- curp/src/server/raw_curp/tests.rs | 30 +-- curp/tests/server.rs | 10 +- xline-client/Cargo.toml | 1 + xline-client/examples/auth.rs | 3 +- xline-client/examples/auth_role.rs | 2 +- xline-client/examples/auth_user.rs | 2 +- xline-client/examples/error_handling.rs | 18 +- xline-client/examples/kv.rs | 4 +- xline-client/examples/lease.rs | 2 +- xline-client/examples/lock.rs | 2 +- xline-client/examples/maintenance.rs | 5 +- xline-client/examples/watch.rs | 4 +- xline-client/src/clients/auth.rs | 18 +- xline-client/src/clients/lease.rs | 6 +- xline-client/src/clients/lock.rs | 4 +- xline-client/src/clients/maintenance.rs | 12 +- xline-client/src/clients/watch.rs | 6 +- xline-client/src/error.rs | 105 +++++++- xline-client/src/lib.rs | 26 +- xline-client/src/types/lease.rs | 4 +- xline-client/src/types/watch.rs | 10 +- xline-client/tests/auth.rs | 8 +- xline-client/tests/common.rs | 4 +- xline-client/tests/kv.rs | 10 +- xline-client/tests/lease.rs | 8 +- xline-client/tests/lock.rs | 8 +- xline-client/tests/maintenance.rs | 2 +- xline-client/tests/watch.rs | 2 +- xline-test-utils/Cargo.toml | 1 + .../src/bin/validation_lock_client.rs | 2 +- xline/src/server/auth_server.rs | 6 +- xline/src/server/command.rs | 19 +- xline/src/server/kv_server.rs | 10 +- xline/src/server/lease_server.rs | 10 +- xline/src/server/lock_server.rs | 6 +- xline/src/storage/compact/mod.rs | 6 +- .../src/storage/compact/periodic_compactor.rs | 8 +- .../src/storage/compact/revision_compactor.rs | 8 +- xlinectl/src/command/lease/keep_alive.rs | 4 +- xlinectl/src/command/snapshot.rs | 11 +- 49 files changed, 545 insertions(+), 611 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d821513cd..181aff490 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3458,6 +3458,7 @@ dependencies = [ name = "xline-client" version = "0.1.0" dependencies = [ + "anyhow", "async-stream", "clippy-utilities 0.1.0", "curp", @@ -3482,6 +3483,7 @@ dependencies = [ name = "xline-test-utils" version = "0.1.0" dependencies = [ + "anyhow", "clap 4.4.6", "curp", "futures", diff --git a/benchmark/src/bench_client.rs b/benchmark/src/bench_client.rs index 3b7989447..e3a511c91 100644 --- a/benchmark/src/bench_client.rs +++ b/benchmark/src/bench_client.rs @@ -3,8 +3,9 @@ use std::fmt::Debug; use anyhow::Result; use etcd_client::{Client as EtcdClient, PutOptions}; use thiserror::Error; +use xline::server::Command; use xline_client::{ - error::ClientError, + error::XlineClientError as ClientError, types::kv::{PutRequest, PutResponse}, Client, ClientOptions, }; @@ -18,7 +19,7 @@ pub(crate) enum BenchClientError { EtcdError(#[from] etcd_client::Error), /// Error from `xline_client` #[error("xline_client error: {0}")] - XlineError(#[from] ClientError), + XlineError(#[from] ClientError<Command>), } /// Benchmark client diff --git a/curp/src/client.rs b/curp/src/client.rs index a2ac9849b..a3234ff4d 100644 --- a/curp/src/client.rs +++ b/curp/src/client.rs @@ -15,10 +15,7 @@ use 
utils::config::ClientConfig; use crate::{ cmd::{Command, ProposeId}, - error::{ - ClientBuildError, CommandProposeError, CommandSyncError, ProposeError, RpcError, - WaitSyncError, - }, + error::{ClientBuildError, ClientError, ProposeError}, members::ServerId, rpc::{ self, connect::ConnectApi, protocol_client::ProtocolClient, FetchClusterRequest, @@ -220,6 +217,36 @@ pub enum ReadState { CommitIndex(LogIndex), } +/// Get `term` and `leader_id` from the metadata of a Redirect `tonic::Status` +fn parse_redirect_status(metadata: &tonic::metadata::MetadataMap) -> (Option<ServerId>, u64) { + let term = metadata + .get("term") + .unwrap_or_else(|| unreachable!("The Redirect must contain a valid term")) + .to_str() + .unwrap_or_else(|err| { + unreachable!("a valid term should not include any non-ascii character: {err}") + }) + .parse::<u64>() + .unwrap_or_else(|err| { + unreachable!("parse term in a Redirect should always be successful: {err}") + }); + + if let Some(leader_id) = metadata.get("leader_id") { + let leader_id = leader_id + .to_str() + .unwrap_or_else(|err| { + unreachable!("a valid leader_id should not include any non-ascii character: {err}") + }) + .parse::<ServerId>() + .unwrap_or_else(|err| { + unreachable!("parse leader_id in a Redirect should always be successful: {err}") + }); + (Some(leader_id), term) + } else { + (None, term) + } +} + impl<C> Client<C> where C: Command + 'static, @@ -237,7 +264,7 @@ where async fn fast_round( &self, cmd_arc: Arc<C>, - ) -> Result<(Option<<C as Command>::ER>, bool), CommandProposeError<C>> { + ) -> Result<(Option<<C as Command>::ER>, bool), ClientError<C>> { debug!("fast round for cmd({}) started", cmd_arc.id()); let req = ProposeRequest::new(cmd_arc.as_ref()); @@ -261,6 +288,9 @@ where let resp = match resp_result { Ok(resp) => resp.into_inner(), Err(e) => { + if e.code() == tonic::Code::Unavailable { + return Err(ClientError::ShuttingDown); + } warn!("Propose error: {}", e); continue; } @@ -268,11 +298,11 @@ where self.state .write() .check_and_update(resp.leader_id, resp.term); resp.map_or_else::>>( |res| { if let Some(er) = res .transpose() - .map_err(|e| CommandProposeError::Execute(e))? + .map_err(|e| ClientError::CommandError::<C>(e))? 
{ assert!(execute_result.is_none(), "should not set exe result twice"); execute_result = Some(er); @@ -281,15 +311,10 @@ where Ok(()) }, |err| { - if matches!(err, ProposeError::Shutdown) { - Err(CommandProposeError::Propose(ProposeError::Shutdown)) - } else { - warn!("Propose error: {}", err); - Ok(()) - } + warn!("Propose error: {}", err); + Ok(()) }, - ) - .map_err(Into::::into)??; + )??; if (ok_cnt >= superquorum) && execute_result.is_some() { debug!("fast round for cmd({}) succeed", cmd_arc.id()); return Ok((execute_result, true)); @@ -303,7 +328,7 @@ where async fn slow_round( &self, cmd: Arc, - ) -> Result<(::ASR, ::ER), CommandProposeError> { + ) -> Result<(::ASR, ::ER), ClientError> { debug!("slow round for cmd({}) started", cmd.id()); let retry_timeout = *self.config.retry_timeout(); let retry_count = *self.config.retry_count(); @@ -330,47 +355,39 @@ where Ok(resp) => resp.into_inner(), Err(e) => { warn!("wait synced rpc error: {e}"); - // it's quite likely that the leader has crashed, then we should wait for some time and fetch the leader again - tokio::time::sleep(retry_timeout).await; - self.resend_propose(Arc::clone(&cmd), None).await?; + if e.code() == tonic::Code::Unavailable { + return Err(ClientError::ShuttingDown); + } else if e.code() == tonic::Code::FailedPrecondition { + let metadata = e.metadata(); + let (new_leader, term) = parse_redirect_status(metadata); + self.state.write().check_and_update(new_leader, term); + // resend the propose to the new leader + self.resend_propose(Arc::clone(&cmd), new_leader).await?; + } else { + // it's quite likely that the leader has crashed, then we should wait for some time and fetch the leader again + tokio::time::sleep(retry_timeout).await; + self.resend_propose(Arc::clone(&cmd), None).await?; + } continue; } }; - match resp.into::().map_err(Into::::into)? { + match resp.into::().map_err(Into::>::into)? 
{ SyncResult::Success { er, asr } => { debug!("slow round for cmd({}) succeeded", cmd.id()); return Ok((asr, er)); } - SyncResult::Error(CommandSyncError::Shutdown) => { - return Err(CommandProposeError::Propose(ProposeError::Shutdown)); - } - SyncResult::Error(CommandSyncError::WaitSync(WaitSyncError::Redirect( - new_leader, - term, - ))) => { - self.state.write().check_and_update(new_leader, term); - self.resend_propose(Arc::clone(&cmd), new_leader).await?; // resend the propose to the new leader - } - SyncResult::Error(CommandSyncError::WaitSync(e)) => { - return Err( - ProposeError::SyncedError(WaitSyncError::Other(e.to_string())).into(), - ); - } - SyncResult::Error(CommandSyncError::Execute(e)) => { - return Err(CommandProposeError::Execute(e)); - } - SyncResult::Error(CommandSyncError::AfterSync(e)) => { - return Err(CommandProposeError::AfterSync(e)); + SyncResult::Error(e) => { + return Err(ClientError::CommandError(e)); } } } - Err(CommandProposeError::Propose(ProposeError::Timeout)) + Err(ClientError::Timeout) } /// The shutdown rpc of curp protocol #[instrument(skip_all)] - pub async fn shutdown(&self) -> Result<(), ProposeError> { + pub async fn shutdown(&self) -> Result<(), ClientError> { let retry_count = *self.config.retry_count(); for _ in 0..retry_count { let leader_id = match self.get_leader_id().await { @@ -381,7 +398,7 @@ where } }; debug!("shutdown request sent to {}", leader_id); - let resp = match self + if let Err(e) = self .get_connect(leader_id) .unwrap_or_else(|| unreachable!("leader {leader_id} not found")) .shutdown( @@ -390,26 +407,23 @@ where ) .await { - Ok(resp) => resp.into_inner(), - Err(e) => { - warn!("shutdown rpc error: {e}"); - tokio::time::sleep(*self.config.retry_timeout()).await; + // Leader may not be correct leader, resend the shutdown request to the new leader + if e.code() == tonic::Code::FailedPrecondition { + let metadata = e.metadata(); + let (new_leader, term) = parse_redirect_status(metadata); + self.state + .write() + .check_and_update(new_leader, term); + warn!("shutdown: redirect to new leader {new_leader:?}, term is {term}",); continue; } + warn!("shutdown rpc error: {e}"); + tokio::time::sleep(*self.config.retry_timeout()).await; + continue; }; - self.state - .write() - .check_and_update(resp.leader_id, resp.term); - match resp.error { - Some(e) => { - // Leader may not be correct leader, resend the shutdown request to the new leader - warn!("shutdown error: {:?}", e); - continue; - } - None => return Ok(()), - } + return Ok(()); } - Err(ProposeError::Timeout) + Err(ClientError::Timeout) } /// Resend the propose only to the leader. This is used when leader changes and we need to ensure that the propose is received by the new leader. 
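Note on the redirect path above: it relies on an implicit contract between `From<CurpError> for tonic::Status` on the server side (introduced later in this patch) and `parse_redirect_status` on the client side — a `FailedPrecondition` status whose metadata carries `term` and, when the leader is known, `leader_id`. A minimal, self-contained sketch of that round-trip follows; the helper name `redirect_status` is illustrative and not part of this patch, while the metadata keys and status code come from the patch itself.

```rust
use tonic::metadata::MetadataValue;

/// Build a redirect status the way the new `From<CurpError> for tonic::Status`
/// does: `FailedPrecondition` plus `term`/`leader_id` metadata entries.
fn redirect_status(leader_id: Option<u64>, term: u64) -> tonic::Status {
    let mut status = tonic::Status::failed_precondition("current node is not a leader");
    let meta = status.metadata_mut();
    if let Some(id) = leader_id {
        let _ = meta.insert("leader_id", MetadataValue::from(id));
    }
    let _ = meta.insert("term", MetadataValue::from(term));
    status
}

fn main() {
    let status = redirect_status(Some(2), 7);
    // The client first branches on the status code, then reads the metadata
    // back, mirroring the `FailedPrecondition` arms in `slow_round`/`shutdown`.
    assert_eq!(status.code(), tonic::Code::FailedPrecondition);
    let term: u64 = status
        .metadata()
        .get("term")
        .and_then(|v| v.to_str().ok())
        .and_then(|s| s.parse().ok())
        .expect("a Redirect always carries a term");
    let leader = status
        .metadata()
        .get("leader_id")
        .and_then(|v| v.to_str().ok())
        .and_then(|s| s.parse::<u64>().ok());
    assert_eq!((leader, term), (Some(2), 7));
}
```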
@@ -417,7 +431,7 @@ where &self, cmd: Arc, mut new_leader: Option, - ) -> Result<(), ProposeError> { + ) -> Result<(), ClientError> { let retry_count = *self.config.retry_count(); for _ in 0..retry_count { tokio::time::sleep(*self.config.retry_timeout()).await; @@ -501,15 +515,15 @@ where Ordering::Greater => {} } } - Err(ProposeError::Timeout) + Err(ClientError::Timeout) } /// Send linearizable fetch cluster requests to all servers /// Note: The fetched cluster may still be outdated /// # Errors - /// `ProposeError::Timeout` if timeout + /// `ClientError::Timeout` if timeout #[inline] - async fn fetch_cluster(&self) -> Result { + async fn fetch_cluster(&self) -> Result> { let retry_count = *self.config.retry_count(); for _ in 0..retry_count { let connects = self.all_connects(); @@ -567,12 +581,12 @@ where // TODO: let user configure it according to average leader election cost tokio::time::sleep(*self.config.retry_timeout()).await; } - Err(ProposeError::Timeout) + Err(ClientError::Timeout) } /// Send fetch leader requests to all servers until there is a leader /// Note: The fetched leader may still be outdated - async fn fetch_leader(&self) -> Result { + async fn fetch_leader(&self) -> Result> { let retry_count = *self.config.retry_count(); for _ in 0..retry_count { let res = self.fetch_cluster().await?; @@ -583,14 +597,14 @@ where // This timeout is a bit different. It refers to the situation where // multiple attempts to fetch the cluster are successful, but there // is no leader id (very rare). - Err(ProposeError::Timeout) + Err(ClientError::Timeout) } /// Get leader id from the state or fetch it from servers /// # Errors - /// `ProposeError::Timeout` if timeout + /// `ClientError::Timeout` if timeout #[inline] - pub async fn get_leader_id(&self) -> Result { + pub async fn get_leader_id(&self) -> Result> { let notify = Arc::clone(&self.state.read().leader_notify); let retry_timeout = *self.config.retry_timeout(); let retry_count = *self.config.retry_count(); @@ -602,13 +616,13 @@ where return self.fetch_leader().await; } } - Err(ProposeError::Timeout) + Err(ClientError::Timeout) } /// Propose the request to servers, if use_fast_path is false, it will wait for the synced index /// # Errors - /// `CommandProposeError::Execute` if execution error is met - /// `CommandProposeError::AfterSync` error met while syncing logs to followers + /// `ClientError::Execute` if execution error is met + /// `ClientError::AfterSync` error met while syncing logs to followers /// # Panics /// If leader index is out of bound of all the connections, panic #[inline] @@ -618,7 +632,7 @@ where &self, cmd: C, use_fast_path: bool, - ) -> Result<(C::ER, Option), CommandProposeError> { + ) -> Result<(C::ER, Option), ClientError> { let cmd_arc = Arc::new(cmd); let fast_round = self.fast_round(Arc::clone(&cmd_arc)); let slow_round = self.slow_round(cmd_arc); @@ -664,7 +678,7 @@ where /// # Errors /// `ProposeError::EncodingError` encoding error met while deserializing the propose id #[inline] - pub async fn fetch_read_state(&self, cmd: &C) -> Result { + pub async fn fetch_read_state(&self, cmd: &C) -> Result> { let retry_timeout = *self.config.retry_timeout(); let retry_count = *self.config.retry_count(); for _ in 0..retry_count { @@ -706,7 +720,7 @@ where }; return Ok(state); } - Err(ProposeError::Timeout) + Err(ClientError::Timeout) } /// Fetch the current cluster from the curp server where is on the same node. 
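Every public client call in this file now follows the same bounded-retry shape: try up to `retry_count` times, sleep `retry_timeout` between attempts, and surface `ClientError::Timeout` only once the loop is exhausted. A compilable sketch of just that skeleton — the `with_retry` helper and the pared-down error enum are illustrative stand-ins, not APIs from this patch:

```rust
use std::{future::Future, time::Duration};

/// Stand-in for the `ClientError<C>::Timeout` variant used by the loops above.
#[derive(Debug)]
enum ClientError {
    Timeout,
}

/// Retry `attempt` up to `retry_count` times, sleeping `retry_timeout`
/// between tries, and report `Timeout` when every attempt fails.
async fn with_retry<T, F, Fut>(
    retry_count: usize,
    retry_timeout: Duration,
    mut attempt: F,
) -> Result<T, ClientError>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Option<T>>,
{
    for _ in 0..retry_count {
        if let Some(v) = attempt().await {
            return Ok(v);
        }
        // Give a crashed leader time to be re-elected before the next try.
        tokio::time::sleep(retry_timeout).await;
    }
    Err(ClientError::Timeout)
}

#[tokio::main]
async fn main() {
    let res: Result<u64, _> = with_retry(3, Duration::from_millis(10), || async { None }).await;
    assert!(matches!(res, Err(ClientError::Timeout)));
}
```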
@@ -714,7 +728,7 @@ where /// we will fallback to fetch the full cluster for the response if fetching local /// failed. #[inline] - async fn fetch_local_cluster(&self) -> Result { + async fn fetch_local_cluster(&self) -> Result> { if let Some(local_server) = self.local_server_id { let resp = self .get_connect(local_server) @@ -730,9 +744,9 @@ where /// Fetch the current leader id without cache /// # Errors - /// `ProposeError::Timeout` if timeout + /// `ClientError::Timeout` if timeout #[inline] - pub async fn get_leader_id_from_curp(&self) -> Result> { + pub async fn get_leader_id_from_curp(&self) -> Result> { if let Ok(FetchClusterResponse { leader_id: Some(leader_id), .. @@ -740,9 +754,7 @@ where { return Ok(leader_id); } - self.fetch_leader() - .await - .map_err(|e| CommandProposeError::Propose(e)) + self.fetch_leader().await } /// Fetch the current cluster without cache, return the leader and the members @@ -752,15 +764,13 @@ where pub async fn get_cluster_from_curp( &self, linearizable: bool, - ) -> Result> { + ) -> Result> { if !linearizable { if let Ok(resp) = self.fetch_local_cluster().await { return Ok(resp); } } - self.fetch_cluster() - .await - .map_err(|e| CommandProposeError::Propose(e)) + self.fetch_cluster().await } /// Get the connect by server id @@ -777,9 +787,9 @@ where /// /// # Errors /// - /// `ProposeError::Timeout` if timeout + /// `ClientError::Timeout` if timeout #[allow(clippy::unused_async)] // TODO: grant a client id from server - async fn get_client_id(&self) -> Result { + async fn get_client_id(&self) -> Result> { Ok(rand::random()) } @@ -792,9 +802,9 @@ where /// Generate a propose id /// /// # Errors - /// `ProposeError::Timeout` if timeout + /// `ClientError::Timeout` if timeout #[inline] - pub async fn gen_propose_id(&self) -> Result> { + pub async fn gen_propose_id(&self) -> Result> { let client_id = self.get_client_id().await?; let seq_num = self.new_seq_num(); Ok(generate_propose_id(client_id, seq_num)) diff --git a/curp/src/error.rs b/curp/src/error.rs index 82af769cc..a880a6a8c 100644 --- a/curp/src/error.rs +++ b/curp/src/error.rs @@ -7,10 +7,7 @@ use thiserror::Error; use crate::{ members::ServerId, - rpc::{ - PbCommandSyncError, PbCommandSyncErrorOuter, PbProposeError, PbProposeErrorOuter, - PbWaitSyncError, PbWaitSyncErrorOuter, RedirectData, - }, + rpc::{PbProposeError, PbProposeErrorOuter}, }; /// Error type of client builder @@ -68,31 +65,16 @@ pub enum ServerError { } /// The error met during propose phase -#[derive(Error, Debug, Clone, Serialize, Deserialize)] +#[derive(Error, Debug, Clone, Copy, Serialize, Deserialize)] #[allow(clippy::module_name_repetitions)] // this-error generate code false-positive #[non_exhaustive] pub enum ProposeError { - /// Current node is not leader - #[error("not leader")] - NotLeader, - /// Cluster already shutdown - #[error("cluster shutdown")] - Shutdown, - /// Propose timeout - #[error("timeout")] - Timeout, /// The command conflicts with keys in the speculative commands #[error("key conflict error")] KeyConflict, /// The command has already been proposed before #[error("duplicated, the cmd might have already been proposed")] Duplicated, - /// Command syncing error - #[error("syncing error {0}")] - SyncedError(WaitSyncError), - /// Encode error - #[error("encode error: {0}")] - EncodeError(String), } impl TryFrom for ProposeError { @@ -101,17 +83,8 @@ impl TryFrom for ProposeError { #[inline] fn try_from(err: PbProposeError) -> Result { Ok(match err { - PbProposeError::Timeout(_) => ProposeError::Timeout, - 
PbProposeError::NotLeader(_) => ProposeError::NotLeader, - PbProposeError::Shutdown(_) => ProposeError::Shutdown, PbProposeError::KeyConflict(_) => ProposeError::KeyConflict, PbProposeError::Duplicated(_) => ProposeError::Duplicated, - PbProposeError::WaitSyncError(e) => ProposeError::SyncedError( - e.wait_sync_error - .ok_or(PbSerializeError::EmptyField)? - .into(), - ), - PbProposeError::EncodeError(s) => ProposeError::EncodeError(s), }) } } @@ -120,15 +93,8 @@ impl From for PbProposeErrorOuter { #[inline] fn from(err: ProposeError) -> Self { let e = match err { - ProposeError::Timeout => PbProposeError::Timeout(()), - ProposeError::NotLeader => PbProposeError::NotLeader(()), - ProposeError::Shutdown => PbProposeError::Shutdown(()), ProposeError::KeyConflict => PbProposeError::KeyConflict(()), ProposeError::Duplicated => PbProposeError::Duplicated(()), - ProposeError::SyncedError(e) => PbProposeError::WaitSyncError(PbWaitSyncErrorOuter { - wait_sync_error: Some(e.into()), - }), - ProposeError::EncodeError(s) => PbProposeError::EncodeError(s), }; PbProposeErrorOuter { propose_error: Some(e), @@ -136,17 +102,10 @@ impl From for PbProposeErrorOuter { } } -impl From for ProposeError { - #[inline] - fn from(err: PbSerializeError) -> Self { - ProposeError::EncodeError(err.to_string()) - } -} - impl PbCodec for ProposeError { #[inline] fn encode(&self) -> Vec { - PbProposeErrorOuter::from(self.clone()).encode_to_vec() + PbProposeErrorOuter::from(*self).encode_to_vec() } #[inline] @@ -158,149 +117,48 @@ impl PbCodec for ProposeError { } } -/// The error met during propose phase -#[derive(Error, Debug, Serialize, Deserialize)] -#[allow(clippy::module_name_repetitions)] // this-error generate code false-positive -#[error("rcp error {0}")] -pub struct RpcError(String); - -impl From for RpcError { - #[inline] - fn from(e: tonic::transport::Error) -> Self { - Self(e.to_string()) - } -} - -impl From for RpcError { - #[inline] - fn from(e: tonic::Status) -> Self { - Self(e.to_string()) - } -} - -impl From for ProposeError { - #[inline] - fn from(e: bincode::Error) -> Self { - Self::EncodeError(e.to_string()) - } -} - /// The union error which includes propose errors and user-defined errors. 
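The `ClientError` introduced below flattens the old `CommandProposeError`/`CommandSyncError`/`WaitSyncError` tower into a single enum that is generic over the user command, so callers match one level instead of three. A compilable model of that shape — `MyCmdError` and the trimmed variant set are hypothetical stand-ins for `C::Error` and the full enum:

```rust
use thiserror::Error;

/// Hypothetical user-defined command error standing in for `C::Error`.
#[derive(Debug, Error)]
#[error("key not found")]
struct MyCmdError;

/// Trimmed-down model of the new flat client error.
#[derive(Debug, Error)]
enum ClientError<E: std::error::Error> {
    #[error("command execute error {0}")]
    CommandError(E),
    #[error("request timeout")]
    Timeout,
    #[error("curp server is shutting down")]
    ShuttingDown,
}

fn main() {
    let err: ClientError<MyCmdError> = ClientError::CommandError(MyCmdError);
    // One match reaches the user-defined error directly, with no
    // `Propose(...)`/`Execute(...)` nesting to peel off first.
    match err {
        ClientError::CommandError(e) => println!("command failed: {e}"),
        other => println!("client failed: {other}"),
    }
}
```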
-#[derive(Error, Debug, Serialize, Deserialize)] -#[allow(clippy::module_name_repetitions)] // this-error generate code false-positive -#[non_exhaustive] -pub enum CommandProposeError { - /// Curp propose error - #[error("propose error: {0:?}")] - Propose(#[from] ProposeError), - /// User defined execute error - #[error("execute error: {0}")] - Execute(C::Error), - /// User defined after sync error - #[error("after sync error: {0}")] - AfterSync(C::Error), -} - -/// Wait synced error -#[derive(Clone, Error, Serialize, Deserialize, Debug)] +#[derive(Error, Debug)] #[allow(clippy::module_name_repetitions)] // this-error generate code false-positive #[non_exhaustive] -pub enum WaitSyncError { - /// If client sent a wait synced request to a non-leader - #[error("redirect to {0:?}, term {1}")] - Redirect(Option<ServerId>, u64), - /// Other error - #[error("other: {0}")] - Other(String), +pub enum ClientError<C: Command> { + /// Command error + #[error("command execute error {0}")] + CommandError(C::Error), + /// Io error + #[error("IO error {0}")] + IoError(String), + /// Rpc error + #[error("RPC error: {0}")] + OutOfBound(#[from] tonic::Status), + /// Arguments invalid error, it's for outer client + #[error("Invalid arguments: {0}")] + InvalidArgs(String), + /// Internal Error in client + #[error("Client Internal error: {0}")] + InternalError(String), + /// Request Timeout + #[error("Request timeout")] + Timeout, + /// Server is shutting down + #[error("Curp Server is shutting down")] + ShuttingDown, + /// Serialize and Deserialize Error + #[error("EncodeDecode error: {0}")] + EncodeDecode(String), } -impl From<PbWaitSyncError> for WaitSyncError { +impl<C: Command> From<PbSerializeError> for ClientError<C> { #[inline] - fn from(err: PbWaitSyncError) -> Self { - match err { - PbWaitSyncError::Redirect(data) => WaitSyncError::Redirect(data.server_id, data.term), - PbWaitSyncError::Other(s) => WaitSyncError::Other(s), - } - } -} - -impl From<WaitSyncError> for PbWaitSyncError { - fn from(err: WaitSyncError) -> Self { - match err { - WaitSyncError::Redirect(server_id, term) => { - PbWaitSyncError::Redirect(RedirectData { server_id, term }) - } - WaitSyncError::Other(s) => PbWaitSyncError::Other(s), - } - } -} - -/// The union error which includes sync errors and user-defined errors. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub(crate) enum CommandSyncError<C: Command> { - /// Cluster already shutdown - Shutdown, - /// If wait sync went wrong - WaitSync(WaitSyncError), - /// If the execution of the cmd went wrong - Execute(C::Error), - /// If after sync of the cmd went wrong - AfterSync(C::Error), -} - -impl<C: Command> From<CommandSyncError<C>> for PbCommandSyncError { - fn from(err: CommandSyncError<C>) -> Self { - match err { - CommandSyncError::Shutdown => PbCommandSyncError::Shutdown(()), - CommandSyncError::WaitSync(e) => PbCommandSyncError::WaitSync(PbWaitSyncErrorOuter { - wait_sync_error: Some(e.into()), - }), - CommandSyncError::Execute(e) => PbCommandSyncError::Execute(e.encode()), - CommandSyncError::AfterSync(e) => PbCommandSyncError::AfterSync(e.encode()), - } - } -} - -impl<C: Command> TryFrom<PbCommandSyncError> for CommandSyncError<C> { - type Error = PbSerializeError; - - fn try_from(err: PbCommandSyncError) -> Result<Self, Self::Error> { - Ok(match err { - PbCommandSyncError::Shutdown(_) => CommandSyncError::Shutdown, - PbCommandSyncError::WaitSync(e) => CommandSyncError::WaitSync( - e.wait_sync_error - .ok_or(PbSerializeError::EmptyField)? - .into(), - ), - PbCommandSyncError::Execute(e) => { - CommandSyncError::Execute(<C as Command>::Error::decode(&e)?) - } - PbCommandSyncError::AfterSync(e) => { - CommandSyncError::AfterSync(<C as Command>::Error::decode(&e)?) 
- } - }) - } -} - -impl PbCodec for CommandSyncError { - fn encode(&self) -> Vec { - PbCommandSyncErrorOuter { - command_sync_error: Some(self.clone().into()), - } - .encode_to_vec() - } - - fn decode(buf: &[u8]) -> Result { - PbCommandSyncErrorOuter::decode(buf)? - .command_sync_error - .ok_or(PbSerializeError::EmptyField)? - .try_into() + fn from(err: PbSerializeError) -> Self { + Self::EncodeDecode(err.to_string()) } } -impl From for CommandSyncError { - fn from(err: WaitSyncError) -> Self { - Self::WaitSync(err) +impl From for ClientError { + #[inline] + fn from(err: bincode::Error) -> Self { + Self::EncodeDecode(err.to_string()) } } @@ -322,8 +180,6 @@ pub enum ApplyConfChangeError { #[cfg(test)] mod test { - use curp_test_utils::test_cmd::{ExecuteError, TestCommand}; - use super::*; #[test] @@ -333,19 +189,4 @@ mod test { ::decode(&err.encode()).expect("decode should success"); assert!(matches!(err, _decoded_err)); } - - #[test] - fn cmd_sync_error_serialization_is_ok() { - let err: CommandSyncError = - CommandSyncError::WaitSync(WaitSyncError::Other("msg".to_owned())); - let _decoded_err = as PbCodec>::decode(&err.encode()) - .expect("decode should success"); - assert!(matches!(err, _decoded_err)); - - let err1: CommandSyncError = - CommandSyncError::Execute(ExecuteError("msg".to_owned())); - let _decoded_err1 = as PbCodec>::decode(&err1.encode()) - .expect("decode should success"); - assert!(matches!(err1, _decoded_err1)); - } } diff --git a/curp/src/rpc/connect.rs b/curp/src/rpc/connect.rs index bc3383ae6..3cb4a5d0d 100644 --- a/curp/src/rpc/connect.rs +++ b/curp/src/rpc/connect.rs @@ -20,7 +20,6 @@ use utils::tracing::Inject; use super::{ShutdownRequest, ShutdownResponse}; use crate::{ - error::RpcError, members::ServerId, rpc::{ proto::{ @@ -96,42 +95,42 @@ pub(crate) trait ConnectApi: Send + Sync + 'static { fn id(&self) -> ServerId; /// Update server addresses, the new addresses will override the old ones - async fn update_addrs(&self, addrs: Vec) -> Result<(), RpcError>; + async fn update_addrs(&self, addrs: Vec) -> Result<(), tonic::transport::Error>; /// Send `ProposeRequest` async fn propose( &self, request: ProposeRequest, timeout: Duration, - ) -> Result, RpcError>; + ) -> Result, tonic::Status>; /// Send `WaitSyncedRequest` async fn wait_synced( &self, request: WaitSyncedRequest, timeout: Duration, - ) -> Result, RpcError>; + ) -> Result, tonic::Status>; /// Send `ShutdownRequest` async fn shutdown( &self, request: ShutdownRequest, timeout: Duration, - ) -> Result, RpcError>; + ) -> Result, tonic::Status>; /// Send `FetchClusterRequest` async fn fetch_cluster( &self, request: FetchClusterRequest, timeout: Duration, - ) -> Result, RpcError>; + ) -> Result, tonic::Status>; /// Send `FetchReadStateRequest` async fn fetch_read_state( &self, request: FetchReadStateRequest, timeout: Duration, - ) -> Result, RpcError>; + ) -> Result, tonic::Status>; } /// Inner Connect interface among different servers @@ -142,21 +141,21 @@ pub(crate) trait InnerConnectApi: Send + Sync + 'static { fn id(&self) -> ServerId; /// Update server addresses, the new addresses will override the old ones - async fn update_addrs(&self, addrs: Vec) -> Result<(), RpcError>; + async fn update_addrs(&self, addrs: Vec) -> Result<(), tonic::transport::Error>; /// Send `AppendEntriesRequest` async fn append_entries( &self, request: AppendEntriesRequest, timeout: Duration, - ) -> Result, RpcError>; + ) -> Result, tonic::Status>; /// Send `VoteRequest` async fn vote( &self, request: VoteRequest, timeout: 
Duration, - ) -> Result, RpcError>; + ) -> Result, tonic::Status>; /// Send a snapshot async fn install_snapshot( @@ -164,7 +163,7 @@ pub(crate) trait InnerConnectApi: Send + Sync + 'static { term: u64, leader_id: ServerId, snapshot: Snapshot, - ) -> Result, RpcError>; + ) -> Result, tonic::Status>; } /// The connection struct to hold the real rpc connections, it may failed to connect, but it also @@ -184,7 +183,7 @@ pub(crate) struct Connect { impl Connect { /// Update server addresses, the new addresses will override the old ones - async fn inner_update_addrs(&self, addrs: Vec) -> Result<(), RpcError> { + async fn inner_update_addrs(&self, addrs: Vec) -> Result<(), tonic::transport::Error> { let mut old = self.addrs.lock().await; let old_addrs: HashSet = old.iter().cloned().collect(); let new_addrs: HashSet = addrs.iter().cloned().collect(); @@ -216,7 +215,7 @@ impl ConnectApi for Connect> { } /// Update server addresses, the new addresses will override the old ones - async fn update_addrs(&self, addrs: Vec) -> Result<(), RpcError> { + async fn update_addrs(&self, addrs: Vec) -> Result<(), tonic::transport::Error> { self.inner_update_addrs(addrs).await } @@ -226,12 +225,12 @@ impl ConnectApi for Connect> { &self, request: ProposeRequest, timeout: Duration, - ) -> Result, RpcError> { + ) -> Result, tonic::Status> { let mut client = self.rpc_connect.clone(); let mut req = tonic::Request::new(request); req.set_timeout(timeout); req.metadata_mut().inject_current(); - client.propose(req).await.map_err(Into::into) + client.propose(req).await } /// Send `WaitSyncedRequest` @@ -240,12 +239,12 @@ impl ConnectApi for Connect> { &self, request: WaitSyncedRequest, timeout: Duration, - ) -> Result, RpcError> { + ) -> Result, tonic::Status> { let mut client = self.rpc_connect.clone(); let mut req = tonic::Request::new(request); req.set_timeout(timeout); req.metadata_mut().inject_current(); - client.wait_synced(req).await.map_err(Into::into) + client.wait_synced(req).await } /// Send `ShutdownRequest` @@ -253,12 +252,12 @@ impl ConnectApi for Connect> { &self, request: ShutdownRequest, timeout: Duration, - ) -> Result, RpcError> { + ) -> Result, tonic::Status> { let mut client = self.rpc_connect.clone(); let mut req = tonic::Request::new(request); req.set_timeout(timeout); req.metadata_mut().inject_current(); - client.shutdown(req).await.map_err(Into::into) + client.shutdown(req).await } /// Send `FetchLeaderRequest` @@ -266,11 +265,11 @@ impl ConnectApi for Connect> { &self, request: FetchClusterRequest, timeout: Duration, - ) -> Result, RpcError> { + ) -> Result, tonic::Status> { let mut client = self.rpc_connect.clone(); let mut req = tonic::Request::new(request); req.set_timeout(timeout); - client.fetch_cluster(req).await.map_err(Into::into) + client.fetch_cluster(req).await } /// Send `FetchReadStateRequest` @@ -278,11 +277,11 @@ impl ConnectApi for Connect> { &self, request: FetchReadStateRequest, timeout: Duration, - ) -> Result, RpcError> { + ) -> Result, tonic::Status> { let mut client = self.rpc_connect.clone(); let mut req = tonic::Request::new(request); req.set_timeout(timeout); - client.fetch_read_state(req).await.map_err(Into::into) + client.fetch_read_state(req).await } } @@ -294,7 +293,7 @@ impl InnerConnectApi for Connect> { } /// Update server addresses, the new addresses will override the old ones - async fn update_addrs(&self, addrs: Vec) -> Result<(), RpcError> { + async fn update_addrs(&self, addrs: Vec) -> Result<(), tonic::transport::Error> { self.inner_update_addrs(addrs).await 
} @@ -303,11 +302,11 @@ impl InnerConnectApi for Connect> { &self, request: AppendEntriesRequest, timeout: Duration, - ) -> Result, RpcError> { + ) -> Result, tonic::Status> { let mut client = self.rpc_connect.clone(); let mut req = tonic::Request::new(request); req.set_timeout(timeout); - client.append_entries(req).await.map_err(Into::into) + client.append_entries(req).await } /// Send `VoteRequest` @@ -315,11 +314,11 @@ impl InnerConnectApi for Connect> { &self, request: VoteRequest, timeout: Duration, - ) -> Result, RpcError> { + ) -> Result, tonic::Status> { let mut client = self.rpc_connect.clone(); let mut req = tonic::Request::new(request); req.set_timeout(timeout); - client.vote(req).await.map_err(Into::into) + client.vote(req).await } async fn install_snapshot( @@ -327,10 +326,10 @@ impl InnerConnectApi for Connect> { term: u64, leader_id: ServerId, snapshot: Snapshot, - ) -> Result, RpcError> { + ) -> Result, tonic::Status> { let stream = install_snapshot_stream(term, leader_id, snapshot); let mut client = self.rpc_connect.clone(); - client.install_snapshot(stream).await.map_err(Into::into) + client.install_snapshot(stream).await } } diff --git a/curp/src/rpc/mod.rs b/curp/src/rpc/mod.rs index da4bc3ca1..7e102480c 100644 --- a/curp/src/rpc/mod.rs +++ b/curp/src/rpc/mod.rs @@ -5,18 +5,8 @@ use serde::{de::DeserializeOwned, Serialize}; use self::proto::commandpb::{cmd_result::Result as CmdResultInner, CmdResult}; pub(crate) use self::proto::{ - commandpb::{ - propose_response::ExeResult, - wait_synced_response::{Success, SyncResult as SyncResultRaw}, - WaitSyncedRequest, WaitSyncedResponse, - }, - errorpb::{ - command_sync_error::CommandSyncError as PbCommandSyncError, - propose_error::ProposeError as PbProposeError, - wait_sync_error::WaitSyncError as PbWaitSyncError, - CommandSyncError as PbCommandSyncErrorOuter, ProposeError as PbProposeErrorOuter, - RedirectData, WaitSyncError as PbWaitSyncErrorOuter, - }, + commandpb::{propose_response::ExeResult, WaitSyncedRequest, WaitSyncedResponse}, + errorpb::{propose_error::ProposeError as PbProposeError, ProposeError as PbProposeErrorOuter}, inner_messagepb::{ inner_protocol_server::InnerProtocol, AppendEntriesRequest, AppendEntriesResponse, InstallSnapshotRequest, InstallSnapshotResponse, VoteRequest, VoteResponse, @@ -40,7 +30,7 @@ pub use self::proto::{ }; use crate::{ cmd::{Command, ProposeId}, - error::{CommandSyncError, ProposeError, WaitSyncError}, + error::ProposeError, log_entry::LogEntry, members::{Member, ServerId}, LogIndex, @@ -133,7 +123,7 @@ impl ProposeResponse { ) -> Self { let exe_result = match *result { Ok(ref er) => Some(ExeResult::Result(CmdResult { - result: Some(CmdResultInner::Er(er.encode())), + result: Some(CmdResultInner::Ok(er.encode())), })), Err(ref e) => Some(ExeResult::Result(CmdResult { result: Some(CmdResultInner::Error(e.encode())), @@ -179,7 +169,7 @@ impl ProposeResponse { Some(ExeResult::Result(ref rv)) => { let result = rv.result.as_ref().ok_or(PbSerializeError::EmptyField)?; let cmd_result = match *result { - CmdResultInner::Er(ref buf) => Ok(::ER::decode(buf)?), + CmdResultInner::Ok(ref buf) => Ok(::ER::decode(buf)?), CmdResultInner::Error(ref buf) => Err(::Error::decode(buf)?), }; Ok(success(Some(cmd_result))) @@ -213,65 +203,73 @@ impl WaitSyncedResponse { /// Create a success response pub(crate) fn new_success(asr: &C::ASR, er: &C::ER) -> Self { Self { - sync_result: Some(SyncResultRaw::Success(Success { - after_sync_result: asr.encode(), - exe_result: er.encode(), - })), + after_sync_result: 
Some(CmdResult { + result: Some(CmdResultInner::Ok(asr.encode())), + }), + exe_result: Some(CmdResult { + result: Some(CmdResultInner::Ok(er.encode())), + }), } } - /// Create an error response - pub(crate) fn new_error(err: CommandSyncError) -> Self { + /// Create an error response which includes an execution error + pub(crate) fn new_er_error(er: &C::Error) -> Self { Self { - sync_result: Some(SyncResultRaw::Error(PbCommandSyncErrorOuter { - command_sync_error: Some(err.into()), - })), + after_sync_result: None, + exe_result: Some(CmdResult { + result: Some(CmdResultInner::Error(er.encode())), + }), + } + } + + /// Create an error response which includes an `after_sync` error + pub(crate) fn new_asr_error(er: &C::ER, asr_err: &C::Error) -> Self { + Self { + after_sync_result: Some(CmdResult { + result: Some(CmdResultInner::Error(asr_err.encode())), + }), + exe_result: Some(CmdResult { + result: Some(CmdResultInner::Ok(er.encode())), + }), } } /// Create a new response from execution result and `after_sync` result pub(crate) fn new_from_result( - er: Option>, + er: Result, asr: Option>, ) -> Self { match (er, asr) { - (None | Some(Err(_)), Some(_)) => { - unreachable!("should not call after sync if execution fails") - } - (None, None) => WaitSyncedResponse::new_error::( - WaitSyncError::Other("can't get er result".to_owned()).into(), - ), // this is highly unlikely to happen, - (Some(Err(err)), None) => { - WaitSyncedResponse::new_error(CommandSyncError::::Execute(err)) - } - // The er is ignored as the propose has failed - (Some(Ok(_er)), Some(Err(err))) => { - WaitSyncedResponse::new_error(CommandSyncError::::AfterSync(err)) - } - (Some(Ok(er)), Some(Ok(asr))) => WaitSyncedResponse::new_success::(&asr, &er), - // The er is ignored as the propose has failed - (Some(Ok(_er)), None) => { - WaitSyncedResponse::new_error::( - WaitSyncError::Other("can't get after sync result".to_owned()).into(), - ) // this is highly unlikely to happen, - } + (Ok(ref er), Some(Err(ref asr_err))) => WaitSyncedResponse::new_asr_error::(er, asr_err), + (Ok(ref er), Some(Ok(ref asr))) => WaitSyncedResponse::new_success::(asr, er), + (Ok(ref _er), None) => unreachable!("can't get after sync result"), + (Err(ref err), _) => WaitSyncedResponse::new_er_error::(err), } } /// Into deserialized result pub(crate) fn into(self) -> Result, PbSerializeError> { - let res = match self.sync_result { - None => unreachable!("WaitSyncedResponse should contain valid sync_result"), - Some(SyncResultRaw::Success(success)) => SyncResult::Success { - asr: ::ASR::decode(&success.after_sync_result)?, - er: ::ER::decode(&success.exe_result)?, - }, - Some(SyncResultRaw::Error(err)) => { - let cmd_sync_err = err - .command_sync_error - .ok_or(PbSerializeError::EmptyField)? - .try_into()?; - SyncResult::Error(cmd_sync_err) + let res = match (self.exe_result, self.after_sync_result) { + (None, _) => unreachable!("WaitSyncedResponse should contain a valid exe_result"), + (Some(er), None) => { + if let Some(CmdResultInner::Error(buf)) = er.result { + SyncResult::Error(::Error::decode(buf.as_slice())?) + } else { + unreachable!("er should not be None") + } + } + (Some(er), Some(asr)) => { + let er = if let Some(CmdResultInner::Ok(er)) = er.result { + ::ER::decode(er.as_slice())? + } else { + unreachable!("") + }; + let asr = if let Some(CmdResultInner::Ok(asr)) = asr.result { + ::ASR::decode(asr.as_slice())? 
+ } else { + unreachable!("") + }; + SyncResult::Success { asr, er } } }; Ok(res) } @@ -288,7 +286,7 @@ pub(crate) enum SyncResult { er: C::ER, }, /// If sync fails, return `SyncError` - Error(CommandSyncError<C>), + Error(C::Error), } impl AppendEntriesRequest { @@ -476,18 +474,6 @@ impl ConfChange { } } -impl ShutdownResponse { - /// Create a new shutdown response - pub(crate) fn new(leader_id: Option<ServerId>, term: u64, error: Option<ProposeError>) -> Self { - let error = error.map(Into::into); - Self { - leader_id, - term, - error, - } - } -} - impl From<ConfChangeError> for tonic::Status { #[inline] fn from(_err: ConfChangeError) -> Self { diff --git a/curp/src/server/curp_node.rs b/curp/src/server/curp_node.rs index 09b7b4f45..622cc777d 100644 --- a/curp/src/server/curp_node.rs +++ b/curp/src/server/curp_node.rs @@ -28,7 +28,6 @@ use super::{ }; use crate::{ cmd::{Command, CommandExecutor}, - error::{CommandSyncError, ProposeError, RpcError}, log_entry::LogEntry, members::{ClusterInfo, ServerId}, role_change::RoleChange, @@ -58,9 +57,18 @@ pub(super) enum CurpError { /// Io error #[error("io error, {0}")] IO(#[from] io::Error), - /// Internal error + /// Currently, Internal Error includes the following three types of errors: + /// 1. failed to allocate a new snapshot + /// 2. failed to reset the command executor by snapshot + /// 3. failed to get last applied index from command executor. #[error("internal error, {0}")] Internal(String), + /// Curp Server is shutting down + #[error("cluster shutdown")] + ShuttingDown, + /// If client sent a wait synced request to a non-leader + #[error("redirect to {0:?}, term {1}")] + Redirect(Option<ServerId>, u64), } impl From<bincode::Error> for CurpError { fn from(err: bincode::Error) -> Self { Self::EncodeDecode(err.to_string()) } } + impl From<PbSerializeError> for CurpError { fn from(err: PbSerializeError) -> Self { Self::EncodeDecode(err.to_string()) } } +impl From<CurpError> for tonic::Status { + #[inline] + fn from(err: CurpError) -> Self { + match err { + CurpError::EncodeDecode(msg) => tonic::Status::cancelled(msg), + CurpError::Internal(msg) => tonic::Status::internal(msg), + CurpError::Storage(err) => tonic::Status::internal(err.to_string()), + CurpError::IO(err) => tonic::Status::internal(err.to_string()), + CurpError::Transport(msg) => tonic::Status::unknown(msg), + CurpError::ShuttingDown => tonic::Status::unavailable("CurpServer is shutting down"), + CurpError::Redirect(leader_id, term) => { + let mut status = tonic::Status::failed_precondition("current node is not a leader"); + let meta = status.metadata_mut(); + if let Some(id) = leader_id { + _ = meta.insert("leader_id", id.into()); + } + _ = meta.insert("term", term.into()); + status + } + } + } +} + /// Internal error encountered when sending `append_entries` #[derive(Debug, Error)] enum SendAEError { @@ -85,10 +117,22 @@ enum SendAEError { Rejected, /// Transport #[error("transport error, {0}")] - Transport(#[from] RpcError), + Transport(String), /// Encode/Decode error #[error("encode or decode error")] - EncodeDecode(#[from] bincode::Error), + EncodeDecode(String), +} + +impl From<tonic::Status> for SendAEError { + fn from(status: tonic::Status) -> Self { + #[allow(clippy::wildcard_enum_match_arm)] // it's ok to do so since only three status codes can convert to `SendAEError` + match status.code() { + tonic::Code::Cancelled => Self::EncodeDecode(status.message().to_owned()), + tonic::Code::FailedPrecondition => Self::NotLeader, + tonic::Code::Unknown => Self::Transport(status.message().to_owned()), + _ => unreachable!("This tonic::Status {status:?} cannot convert to SendAEError"), + } + 
} } /// Internal error encountered when sending snapshot @@ -99,7 +143,18 @@ enum SendSnapshotError { NotLeader, /// Transport #[error("transport error, {0}")] - Transport(#[from] RpcError), + Transport(String), +} + +impl From<tonic::Status> for SendSnapshotError { + fn from(status: tonic::Status) -> Self { + #[allow(clippy::wildcard_enum_match_arm)] // it's ok to do so since `SendSnapshotError` only has two variants. + match status.code() { + tonic::Code::FailedPrecondition => Self::NotLeader, + tonic::Code::Unknown => Self::Transport(status.message().to_owned()), + _ => unreachable!("This tonic::Status {status:?} cannot convert to SendSnapshotError"), + } + } } /// `CurpNode` represents a single node of curp cluster @@ -123,12 +178,12 @@ impl CurpNode { /// Handle `Propose` requests pub(super) async fn propose(&self, req: ProposeRequest) -> Result<ProposeResponse, CurpError> { if self.curp.is_shutdown() { - return Ok(ProposeResponse::new_error(None, 0, ProposeError::Shutdown)); + return Err(CurpError::ShuttingDown); } let cmd: Arc<C> = Arc::new(req.cmd()?); // handle proposal - let ((leader_id, term), result) = self.curp.handle_propose(Arc::clone(&cmd)); + let ((leader_id, term), result) = self.curp.handle_propose(Arc::clone(&cmd))?; let resp = match result { Ok(true) => { let er_res = CommandBoard::wait_for_er(&self.cmd_board, cmd.id()).await; @@ -146,12 +201,8 @@ impl CurpNode { &self, _request: ShutdownRequest, ) -> Result<ShutdownResponse, CurpError> { - let ((leader_id, term), result) = self.curp.handle_shutdown(); - let error = match result { - Ok(()) => None, - Err(err) => Some(err), - }; - let resp = ShutdownResponse::new(leader_id, term, error); + self.curp.handle_shutdown()?; + let resp = ShutdownResponse::default(); CommandBoard::wait_for_shutdown_synced(&self.cmd_board).await; Ok(resp) } @@ -213,18 +264,14 @@ impl CurpNode { req: WaitSyncedRequest, ) -> Result<WaitSyncedResponse, CurpError> { if self.curp.is_shutdown() { - return Ok(WaitSyncedResponse::new_error::<C>( - CommandSyncError::Shutdown, - )); + return Err(CurpError::ShuttingDown); } let id = req.propose_id(); debug!("{} get wait synced request for cmd({id})", self.curp.id()); let (er, asr) = CommandBoard::wait_for_er_asr(&self.cmd_board, id).await; - let resp = WaitSyncedResponse::new_from_result::<C>(Some(er), asr); - debug!("{} wait synced for cmd({id}) finishes", self.curp.id()); - Ok(resp) + Ok(WaitSyncedResponse::new_from_result::<C>(er, asr)) } /// Handle `FetchCluster` requests @@ -702,7 +749,8 @@ impl CurpNode { ae.prev_log_term, ae.entries, ae.leader_commit, - )?; + ) + .map_err(|err| SendAEError::EncodeDecode(err.to_string()))?; debug!("{} send ae to {}", curp.id(), connect.id()); let resp = connect diff --git a/curp/src/server/mod.rs b/curp/src/server/mod.rs index e22074572..d1c1c0cd1 100644 --- a/curp/src/server/mod.rs +++ b/curp/src/server/mod.rs @@ -356,10 +356,3 @@ impl Rpc { self.inner.shutdown_listener() } } - -impl From<CurpError> for tonic::Status { - #[inline] - fn from(err: CurpError) -> Self { - tonic::Status::internal(err.to_string()) - } -} diff --git a/curp/src/server/raw_curp/mod.rs b/curp/src/server/raw_curp/mod.rs index 65e89877c..7de1a6a49 100644 --- a/curp/src/server/raw_curp/mod.rs +++ b/curp/src/server/raw_curp/mod.rs @@ -39,7 +39,7 @@ use self::{ log::Log, state::{CandidateState, LeaderState, State}, }; -use super::cmd_worker::CEEventTxApi; +use super::{cmd_worker::CEEventTxApi, CurpError}; use crate::{ cmd::{Command, ProposeId}, error::{ApplyConfChangeError, ProposeError}, @@ -207,7 +207,7 @@ impl RawCurp { pub(super) fn handle_propose( &self, cmd: Arc<C>, - ) -> ((Option<ServerId>, u64), Result<bool, ProposeError>) { + ) -> 
Result<((Option, u64), Result), CurpError> { debug!("{} gets proposal for cmd({})", self.id(), cmd.id()); let mut conflict = self .ctx @@ -219,14 +219,14 @@ impl RawCurp { // Non-leader doesn't need to sync or execute if st_r.role != Role::Leader { - return ( + return Ok(( info, if conflict { Err(ProposeError::KeyConflict) } else { Ok(false) }, - ); + )); } if !self @@ -234,7 +234,7 @@ impl RawCurp { .cb .map_write(|mut cb_w| cb_w.sync.insert(cmd.id().clone())) { - return (info, Err(ProposeError::Duplicated)); + return Ok((info, Err(ProposeError::Duplicated))); } // leader also needs to check if the cmd conflicts un-synced commands @@ -248,15 +248,8 @@ impl RawCurp { }); let mut log_w = self.log.write(); - let entry = match log_w.push_cmd(st_r.term, cmd) { - Ok(entry) => { - debug!("{} gets new log[{}]", self.id(), entry.index); - entry - } - Err(e) => { - return (info, Err(e.into())); - } - }; + let entry = log_w.push_cmd(st_r.term, cmd)?; + debug!("{} gets new log[{}]", self.id(), entry.index); let index = entry.index; if !conflict { @@ -279,35 +272,29 @@ impl RawCurp { self.apply(&mut *log_w); } - ( + Ok(( info, if conflict { Err(ProposeError::KeyConflict) } else { Ok(true) }, - ) + )) } /// Handle `shutdown` request - pub(super) fn handle_shutdown(&self) -> ((Option, u64), Result<(), ProposeError>) { + pub(super) fn handle_shutdown( + &self, + ) -> Result<(), CurpError> { let st_r = self.st.read(); - let info = (st_r.leader_id, st_r.term); if st_r.role != Role::Leader { - return (info, Err(ProposeError::NotLeader)); + return Err(CurpError::Redirect(st_r.leader_id, st_r.term)); } let mut log_w = self.log.write(); - let entry = match log_w.push_shutdown(st_r.term) { - Ok(entry) => { - debug!("{} gets new log[{}]", self.id(), entry.index); - entry - } - Err(e) => { - return (info, Err(e.into())); - } - }; + let entry = log_w.push_shutdown(st_r.term)?; + debug!("{} gets new log[{}]", self.id(), entry.index); let index = entry.index; self.ctx.sync_events.iter().for_each(|pair| { @@ -325,7 +312,7 @@ impl RawCurp { self.apply(&mut *log_w); } - (info, Ok(())) + Ok(()) } /// Handle `append_entries` diff --git a/curp/src/server/raw_curp/tests.rs b/curp/src/server/raw_curp/tests.rs index 028d41bbf..a8dbb473e 100644 --- a/curp/src/server/raw_curp/tests.rs +++ b/curp/src/server/raw_curp/tests.rs @@ -92,7 +92,7 @@ fn leader_handle_propose_will_succeed() { RawCurp::new_test(3, exe_tx, mock_role_change()) }; let cmd = Arc::new(TestCommand::default()); - let ((leader_id, term), result) = curp.handle_propose(cmd); + let ((leader_id, term), result) = curp.handle_propose(cmd).unwrap(); assert_eq!(leader_id, Some(curp.id().clone())); assert_eq!(term, 0); assert!(matches!(result, Ok(true))); @@ -108,20 +108,20 @@ fn leader_handle_propose_will_reject_conflicted() { }; let cmd1 = Arc::new(TestCommand::new_put(vec![1], 0)); - let ((leader_id, term), result) = curp.handle_propose(cmd1); + let ((leader_id, term), result) = curp.handle_propose(cmd1).unwrap(); assert_eq!(leader_id, Some(curp.id().clone())); assert_eq!(term, 0); assert!(matches!(result, Ok(true))); let cmd2 = Arc::new(TestCommand::new_put(vec![1, 2], 1)); - let ((leader_id, term), result) = curp.handle_propose(cmd2); + let ((leader_id, term), result) = curp.handle_propose(cmd2).unwrap(); assert_eq!(leader_id, Some(curp.id().clone())); assert_eq!(term, 0); assert!(matches!(result, Err(ProposeError::KeyConflict))); // leader will also reject cmds that conflict un-synced cmds let cmd3 = Arc::new(TestCommand::new_put(vec![2], 1)); - let ((leader_id, 
term), result) = curp.handle_propose(cmd3); + let ((leader_id, term), result) = curp.handle_propose(cmd3).unwrap(); assert_eq!(leader_id, Some(curp.id().clone())); assert_eq!(term, 0); assert!(matches!(result, Err(ProposeError::KeyConflict))); @@ -136,12 +136,12 @@ fn leader_handle_propose_will_reject_duplicated() { RawCurp::new_test(3, exe_tx, mock_role_change()) }; let cmd = Arc::new(TestCommand::default()); - let ((leader_id, term), result) = curp.handle_propose(Arc::clone(&cmd)); + let ((leader_id, term), result) = curp.handle_propose(Arc::clone(&cmd)).unwrap(); assert_eq!(leader_id, Some(curp.id().clone())); assert_eq!(term, 0); assert!(matches!(result, Ok(true))); - let ((leader_id, term), result) = curp.handle_propose(cmd); + let ((leader_id, term), result) = curp.handle_propose(cmd).unwrap(); assert_eq!(leader_id, Some(curp.id().clone())); assert_eq!(term, 0); assert!(matches!(result, Err(ProposeError::Duplicated))); @@ -159,7 +159,7 @@ fn follower_handle_propose_will_succeed() { }; curp.update_to_term_and_become_follower(&mut *curp.st.write(), 1); let cmd = Arc::new(TestCommand::new_get(vec![1])); - let ((leader_id, term), result) = curp.handle_propose(cmd); + let ((leader_id, term), result) = curp.handle_propose(cmd).unwrap(); assert_eq!(leader_id, None); assert_eq!(term, 1); assert!(matches!(result, Ok(false))); @@ -178,13 +178,13 @@ fn follower_handle_propose_will_reject_conflicted() { curp.update_to_term_and_become_follower(&mut *curp.st.write(), 1); let cmd1 = Arc::new(TestCommand::new_get(vec![1])); - let ((leader_id, term), result) = curp.handle_propose(cmd1); + let ((leader_id, term), result) = curp.handle_propose(cmd1).unwrap(); assert_eq!(leader_id, None); assert_eq!(term, 1); assert!(matches!(result, Ok(false))); let cmd2 = Arc::new(TestCommand::new_get(vec![1])); - let ((leader_id, term), result) = curp.handle_propose(cmd2); + let ((leader_id, term), result) = curp.handle_propose(cmd2).unwrap(); assert_eq!(leader_id, None); assert_eq!(term, 1); assert!(matches!(result, Err(ProposeError::KeyConflict))); @@ -655,10 +655,7 @@ fn leader_handle_shutdown_will_succeed() { let exe_tx = MockCEEventTxApi::::default(); RawCurp::new_test(3, exe_tx, mock_role_change()) }; - let ((leader_id, term), result) = curp.handle_shutdown(); - assert_eq!(leader_id, Some(curp.id().clone())); - assert_eq!(term, 0); - assert!(matches!(result, Ok(()))); + assert!(curp.handle_shutdown().is_ok()); } #[traced_test] @@ -670,10 +667,9 @@ fn follower_handle_shutdown_will_reject() { RawCurp::new_test(3, exe_tx, mock_role_change()) }; curp.update_to_term_and_become_follower(&mut *curp.st.write(), 1); - let ((leader_id, term), result) = curp.handle_shutdown(); - assert_eq!(leader_id, None); - assert_eq!(term, 1); - assert!(matches!(result, Err(ProposeError::NotLeader))); + let res = curp.handle_shutdown(); + assert!(res.is_err()); + assert!(matches!(res, Err(CurpError::Redirect(None, 1)))); } #[traced_test] diff --git a/curp/tests/server.rs b/curp/tests/server.rs index f965004dc..08b8a1574 100644 --- a/curp/tests/server.rs +++ b/curp/tests/server.rs @@ -3,10 +3,7 @@ use std::{sync::Arc, time::Duration}; use clippy_utilities::NumericCast; -use curp::{ - client::Builder, - error::{CommandProposeError, ProposeError}, -}; +use curp::{client::Builder, error::ClientError}; use curp_test_utils::{ init_logger, sleep_millis, sleep_secs, test_cmd::{TestCommand, TestCommandResult}, @@ -275,10 +272,7 @@ async fn shutdown_rpc_should_shutdown_the_cluster() { let res = client .propose(TestCommand::new_put(vec![888], 1), 
false) .await; - assert!(matches!( - res, - Err(CommandProposeError::Propose(ProposeError::Shutdown)) - )); + assert!(matches!(res, Err(ClientError::ShuttingDown))); let collection = collection_task.await.unwrap(); sleep_secs(3).await; // wait for the cluster to shutdown diff --git a/xline-client/Cargo.toml b/xline-client/Cargo.toml index 7bc7895b8..d1d7d6807 100644 --- a/xline-client/Cargo.toml +++ b/xline-client/Cargo.toml @@ -11,6 +11,7 @@ categories = ["Client"] keywords = ["Client", "Xline", "RPC"] [dependencies] +anyhow = "1.0.75" async-stream = "0.3.5" clippy-utilities = "0.1.0" curp = { path = "../curp" } diff --git a/xline-client/examples/auth.rs b/xline-client/examples/auth.rs index b2b43586e..07f73f4b2 100644 --- a/xline-client/examples/auth.rs +++ b/xline-client/examples/auth.rs @@ -1,4 +1,5 @@ -use xline_client::{error::Result, Client, ClientOptions}; +use anyhow::Result; +use xline_client::{Client, ClientOptions}; #[tokio::main] async fn main() -> Result<()> { diff --git a/xline-client/examples/auth_role.rs b/xline-client/examples/auth_role.rs index 1db9430f8..2319dd8ff 100644 --- a/xline-client/examples/auth_role.rs +++ b/xline-client/examples/auth_role.rs @@ -1,5 +1,5 @@ +use anyhow::Result; use xline_client::{ - error::Result, types::auth::{ AuthRoleAddRequest, AuthRoleDeleteRequest, AuthRoleGetRequest, AuthRoleGrantPermissionRequest, AuthRoleRevokePermissionRequest, Permission, diff --git a/xline-client/examples/auth_user.rs b/xline-client/examples/auth_user.rs index 29a01dd1b..416135834 100644 --- a/xline-client/examples/auth_user.rs +++ b/xline-client/examples/auth_user.rs @@ -1,5 +1,5 @@ +use anyhow::Result; use xline_client::{ - error::Result, types::auth::{ AuthUserAddRequest, AuthUserChangePasswordRequest, AuthUserDeleteRequest, AuthUserGetRequest, AuthUserGrantRoleRequest, AuthUserRevokeRoleRequest, diff --git a/xline-client/examples/error_handling.rs b/xline-client/examples/error_handling.rs index 3536d81bb..3ac5534c7 100644 --- a/xline-client/examples/error_handling.rs +++ b/xline-client/examples/error_handling.rs @@ -1,11 +1,7 @@ //! An example to show how the errors are organized in `xline-client` -use curp::error::CommandProposeError; +use anyhow::Result; use xline::storage::ExecuteError; -use xline_client::{ - error::{ClientError, Result}, - types::kv::PutRequest, - Client, ClientOptions, -}; +use xline_client::{error::XlineClientError, types::kv::PutRequest, Client, ClientOptions}; #[tokio::main] async fn main() -> Result<()> { @@ -24,14 +20,10 @@ async fn main() -> Result<()> { .await; let err = resp.unwrap_err(); - // We first match the client error - let ClientError::ProposeError(pe) = err else { - unreachable!("client.put should not return any errors other than PropseError, but it receives {err:?}") - }; - // Then we match the inner error returned by the Curp server. + // We match the inner error returned by the Curp server. // The command should failed at execution stage. 
- let CommandProposeError::Execute(ee) = pe else { - unreachable!("the propose error should be an Execute error, but it is {pe:?}") + let XlineClientError::CommandError(ee) = err else { + unreachable!("the propose error should be an Execute error, but it is {err:?}") }; assert!( diff --git a/xline-client/examples/kv.rs b/xline-client/examples/kv.rs index 26d18fa21..fe4b786b6 100644 --- a/xline-client/examples/kv.rs +++ b/xline-client/examples/kv.rs @@ -1,5 +1,5 @@ +use anyhow::Result; use xline_client::{ - error::ClientError as Error, types::kv::{ CompactionRequest, Compare, CompareResult, DeleteRangeRequest, PutRequest, RangeRequest, TxnOp, TxnRequest, @@ -8,7 +8,7 @@ use xline_client::{ }; #[tokio::main] -async fn main() -> Result<(), Error> { +async fn main() -> Result<()> { // the name and address of all curp members let curp_members = ["10.0.0.1:2379", "10.0.0.2:2379", "10.0.0.3:2379"]; diff --git a/xline-client/examples/lease.rs b/xline-client/examples/lease.rs index 086e62233..24f1babe5 100644 --- a/xline-client/examples/lease.rs +++ b/xline-client/examples/lease.rs @@ -1,5 +1,5 @@ +use anyhow::Result; use xline_client::{ - error::Result, types::lease::{ LeaseGrantRequest, LeaseKeepAliveRequest, LeaseRevokeRequest, LeaseTimeToLiveRequest, }, diff --git a/xline-client/examples/lock.rs b/xline-client/examples/lock.rs index 3c873c1eb..e3ad2e123 100644 --- a/xline-client/examples/lock.rs +++ b/xline-client/examples/lock.rs @@ -1,5 +1,5 @@ +use anyhow::Result; use xline_client::{ - error::Result, types::lock::{LockRequest, UnlockRequest}, Client, ClientOptions, }; diff --git a/xline-client/examples/maintenance.rs b/xline-client/examples/maintenance.rs index 3cb246111..5eb81db91 100644 --- a/xline-client/examples/maintenance.rs +++ b/xline-client/examples/maintenance.rs @@ -1,7 +1,8 @@ -use xline_client::{error::ClientError as Error, Client, ClientOptions}; +use anyhow::Result; +use xline_client::{Client, ClientOptions}; #[tokio::main] -async fn main() -> Result<(), Error> { +async fn main() -> Result<()> { // the name and address of all curp members let curp_members = ["10.0.0.1:2379", "10.0.0.2:2379", "10.0.0.3:2379"]; diff --git a/xline-client/examples/watch.rs b/xline-client/examples/watch.rs index 8d0a8d1dc..55e0a3d27 100644 --- a/xline-client/examples/watch.rs +++ b/xline-client/examples/watch.rs @@ -1,11 +1,11 @@ +use anyhow::Result; use xline_client::{ - error::ClientError as Error, types::{kv::PutRequest, watch::WatchRequest}, Client, ClientOptions, }; #[tokio::main] -async fn main() -> Result<(), Error> { +async fn main() -> Result<()> { // the name and address of all curp members let curp_members = ["10.0.0.1:2379", "10.0.0.2:2379", "10.0.0.3:2379"]; diff --git a/xline-client/src/clients/auth.rs b/xline-client/src/clients/auth.rs index 067803e4c..1892b0115 100644 --- a/xline-client/src/clients/auth.rs +++ b/xline-client/src/clients/auth.rs @@ -17,7 +17,7 @@ use xlineapi::{ }; use crate::{ - error::{ClientError, Result}, + error::{Result, XlineClientError}, types::auth::{ AuthRoleAddRequest, AuthRoleDeleteRequest, AuthRoleGetRequest, AuthRoleGrantPermissionRequest, AuthRoleRevokePermissionRequest, AuthUserAddRequest, @@ -219,7 +219,9 @@ impl AuthClient { #[inline] pub async fn user_add(&self, mut request: AuthUserAddRequest) -> Result { if request.inner.name.is_empty() { - return Err(ClientError::InvalidArgs(String::from("user name is empty"))); + return Err(XlineClientError::InvalidArgs(String::from( + "user name is empty", + ))); } let need_password = request .inner @@ -227,7 
diff --git a/xline-client/src/clients/auth.rs b/xline-client/src/clients/auth.rs
index 067803e4c..1892b0115 100644
--- a/xline-client/src/clients/auth.rs
+++ b/xline-client/src/clients/auth.rs
@@ -17,7 +17,7 @@ use xlineapi::{
 };
 
 use crate::{
-    error::{ClientError, Result},
+    error::{Result, XlineClientError},
     types::auth::{
         AuthRoleAddRequest, AuthRoleDeleteRequest, AuthRoleGetRequest,
         AuthRoleGrantPermissionRequest, AuthRoleRevokePermissionRequest, AuthUserAddRequest,
@@ -219,7 +219,9 @@ impl AuthClient {
     #[inline]
     pub async fn user_add(&self, mut request: AuthUserAddRequest) -> Result<AuthUserAddResponse> {
         if request.inner.name.is_empty() {
-            return Err(ClientError::InvalidArgs(String::from("user name is empty")));
+            return Err(XlineClientError::InvalidArgs(String::from(
+                "user name is empty",
+            )));
         }
         let need_password = request
            .inner
@@ -227,7 +229,7 @@ impl AuthClient {
             .as_ref()
             .map_or(true, |o| !o.no_password);
         if need_password && request.inner.password.is_empty() {
-            return Err(ClientError::InvalidArgs(String::from(
+            return Err(XlineClientError::InvalidArgs(String::from(
                 "password is required but not provided",
             )));
         }
@@ -378,7 +380,9 @@ impl AuthClient {
         mut request: AuthUserChangePasswordRequest,
     ) -> Result<AuthUserChangePasswordResponse> {
         if request.inner.password.is_empty() {
-            return Err(ClientError::InvalidArgs(String::from("role name is empty")));
+            return Err(XlineClientError::InvalidArgs(String::from(
+                "password is empty",
+            )));
         }
         let hashed_password = Self::hash_password(request.inner.password.as_bytes());
         request.inner.hashed_password = hashed_password;
@@ -486,7 +490,9 @@ impl AuthClient {
     #[inline]
     pub async fn role_add(&self, request: AuthRoleAddRequest) -> Result<AuthRoleAddResponse> {
         if request.inner.name.is_empty() {
-            return Err(ClientError::InvalidArgs(String::from("role name is empty")));
+            return Err(XlineClientError::InvalidArgs(String::from(
+                "role name is empty",
+            )));
         }
         self.handle_req(request.inner, false).await
     }
@@ -638,7 +644,7 @@ impl AuthClient {
         request: AuthRoleGrantPermissionRequest,
     ) -> Result<AuthRoleGrantPermissionResponse> {
         if request.inner.perm.is_none() {
-            return Err(ClientError::InvalidArgs(String::from(
+            return Err(XlineClientError::InvalidArgs(String::from(
                 "Permission not given",
             )));
         }
diff --git a/xline-client/src/clients/lease.rs b/xline-client/src/clients/lease.rs
index 56c4526a5..86e1db1be 100644
--- a/xline-client/src/clients/lease.rs
+++ b/xline-client/src/clients/lease.rs
@@ -10,7 +10,7 @@ use xlineapi::{
 };
 
 use crate::{
-    error::{ClientError, Result},
+    error::{Result, XlineClientError},
     lease_gen::LeaseIdGenerator,
     types::lease::{
         LeaseGrantRequest, LeaseKeepAliveRequest, LeaseKeeper, LeaseRevokeRequest,
@@ -168,7 +168,7 @@ impl LeaseClient {
 
         sender
             .try_send(request.into())
-            .map_err(|e| ClientError::LeaseError(e.to_string()))?;
+            .map_err(|e| XlineClientError::LeaseError(e.to_string()))?;
 
         let mut stream = self
             .lease_client
@@ -179,7 +179,7 @@ impl LeaseClient {
         let id = match stream.message().await? {
             Some(resp) => resp.id,
             None => {
-                return Err(ClientError::LeaseError(String::from(
+                return Err(XlineClientError::LeaseError(String::from(
                     "failed to create lease keeper",
                 )));
             }
diff --git a/xline-client/src/clients/lock.rs b/xline-client/src/clients/lock.rs
index e78084942..a22a425a5 100644
--- a/xline-client/src/clients/lock.rs
+++ b/xline-client/src/clients/lock.rs
@@ -20,7 +20,7 @@ use xlineapi::{
 
 use crate::{
     clients::{lease::LeaseClient, watch::WatchClient},
-    error::{ClientError, Result},
+    error::{Result, XlineClientError},
     lease_gen::LeaseIdGenerator,
     types::{
         lease::LeaseGrantRequest,
@@ -174,7 +174,7 @@ impl LockClient {
             Ok(res) => {
                 let res = Into::<RangeResponse>::into(res.0.into_inner());
                 if res.kvs.is_empty() {
-                    return Err(ClientError::RpcError(String::from("session expired")));
+                    return Err(XlineClientError::RpcError(String::from("session expired")));
                 }
                 res.header
             }
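Each of the client methods above validates its arguments locally and returns `XlineClientError::InvalidArgs` before any RPC is issued, so callers can distinguish local rejections from transport or server failures. A hedged sketch of that distinction; the helper function is illustrative, not part of this patch:

    use xline::server::Command;
    use xline_client::error::XlineClientError;

    /// Illustrative helper: was the request rejected before leaving the process?
    fn rejected_locally(err: &XlineClientError<Command>) -> bool {
        // InvalidArgs is produced by client-side validation only
        matches!(err, XlineClientError::InvalidArgs(_))
    }
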
diff --git a/xline-client/src/clients/maintenance.rs b/xline-client/src/clients/maintenance.rs
index 6947d8499..54bc80883 100644
--- a/xline-client/src/clients/maintenance.rs
+++ b/xline-client/src/clients/maintenance.rs
@@ -3,10 +3,7 @@ use std::{fmt::Debug, sync::Arc};
 use tonic::{transport::Channel, Streaming};
 use xlineapi::{SnapshotRequest, SnapshotResponse};
 
-use crate::{
-    error::{ClientError, Result},
-    AuthService,
-};
+use crate::{error::Result, AuthService};
 
 /// Client for Maintenance operations.
 #[derive(Clone, Debug)]
@@ -66,11 +63,6 @@ impl MaintenanceClient {
     /// ```
     #[inline]
     pub async fn snapshot(&mut self) -> Result<Streaming<SnapshotResponse>> {
-        Ok(self
-            .inner
-            .snapshot(SnapshotRequest {})
-            .await
-            .map_err(Into::<ClientError>::into)?
-            .into_inner())
+        Ok(self.inner.snapshot(SnapshotRequest {}).await?.into_inner())
     }
 }
diff --git a/xline-client/src/clients/watch.rs b/xline-client/src/clients/watch.rs
index 1f134a6ca..99faeb2a6 100644
--- a/xline-client/src/clients/watch.rs
+++ b/xline-client/src/clients/watch.rs
@@ -5,7 +5,7 @@ use tonic::transport::Channel;
 use xlineapi::{self, RequestUnion, WatchResponse};
 
 use crate::{
-    error::{ClientError, Result},
+    error::{Result, XlineClientError},
     types::watch::{WatchRequest, Watcher},
     AuthService,
 };
@@ -95,7 +95,7 @@ impl WatchClient {
 
         request_sender
             .try_send(request)
-            .map_err(|e| ClientError::WatchError(e.to_string()))?;
+            .map_err(|e| XlineClientError::WatchError(e.to_string()))?;
 
         let mut response_stream = self.inner.watch(request_receiver).await?.into_inner();
 
@@ -105,7 +105,7 @@
                 resp.watch_id
             }
             None => {
-                return Err(ClientError::WatchError(String::from(
+                return Err(XlineClientError::WatchError(String::from(
                     "failed to create watch",
                 )));
             }
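Both the lease and watch clients map mpsc `try_send` failures and premature stream termination into the string-carrying `LeaseError` and `WatchError` variants rather than into typed errors. Assuming the same `Command` type parameter used throughout this patch, a caller that wants to treat both as stream-setup failures could write something like the following illustrative predicate:

    use xline::server::Command;
    use xline_client::error::XlineClientError;

    /// Illustrative predicate for watch/lease stream-setup failures.
    fn stream_setup_failed(err: &XlineClientError<Command>) -> bool {
        matches!(
            err,
            XlineClientError::WatchError(_) | XlineClientError::LeaseError(_)
        )
    }
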
{0}")] + InternalError(String), /// Error in watch client #[error("Watch client error: {0}")] WatchError(String), /// Error in lease client #[error("Lease client error: {0}")] LeaseError(String), - /// Curp client build error - #[error("Curp client build error: {0}")] - BuildError(#[from] ClientBuildError), + /// Request Timeout + #[error("Request timeout")] + Timeout, + /// Server is shutting down + #[error("Curp Server is shutting down")] + ShuttingDown, + /// Serialize and Deserialize Error + #[error("EncodeDecode error: {0}")] + EncodeDecode(String), } -impl From for ClientError { +impl From for XlineClientError { #[inline] fn from(e: tonic::transport::Error) -> Self { Self::RpcError(e.to_string()) } } -impl From for ClientError { +impl From for XlineClientError { #[inline] fn from(e: tonic::Status) -> Self { Self::RpcError(e.to_string()) } } + +impl From> for XlineClientError { + #[inline] + fn from(e: ClientError) -> Self { + match e { + ClientError::CommandError(e) => Self::CommandError(e), + ClientError::IoError(e) => Self::IoError(e), + ClientError::OutOfBound(s) => Self::RpcError(s.to_string()), + ClientError::InvalidArgs(e) => Self::InvalidArgs(e), + ClientError::InternalError(e) => Self::InternalError(e), + ClientError::Timeout => Self::Timeout, + ClientError::ShuttingDown => Self::ShuttingDown, + ClientError::EncodeDecode(e) => Self::EncodeDecode(e), + _ => unreachable!("unknow ClientError type"), + } + } +} diff --git a/xline-client/src/lib.rs b/xline-client/src/lib.rs index 2602acf69..3d45ff56c 100644 --- a/xline-client/src/lib.rs +++ b/xline-client/src/lib.rs @@ -163,18 +163,19 @@ use crate::{ AuthClient, ClusterClient, ElectionClient, KvClient, LeaseClient, LockClient, MaintenanceClient, WatchClient, }, - error::{ClientError, Result}, + error::XlineClientBuildError, }; /// Sub-clients for each type of API pub mod clients; -/// Error definitions for `xline-client`. -pub mod error; /// Lease Id generator mod lease_gen; /// Request type definitions. pub mod types; +/// Error definitions for `xline-client`. +pub mod error; + /// Xline client #[derive(Clone, Debug)] pub struct Client { @@ -204,7 +205,10 @@ impl Client { /// If `Self::build_channel` fails. #[inline] #[allow(clippy::pattern_type_mismatch)] // allow mismatch in map - pub async fn connect(all_members: S, options: ClientOptions) -> Result + pub async fn connect( + all_members: S, + options: ClientOptions, + ) -> Result where E: AsRef, S: IntoIterator, @@ -227,7 +231,8 @@ impl Client { let mut tmp_auth = AuthClient::new(Arc::clone(&curp_client), channel.clone(), None); let resp = tmp_auth .authenticate(types::auth::AuthenticateRequest::new(username, password)) - .await?; + .await + .map_err(|err| XlineClientBuildError::AuthError(err.to_string()))?; Some(resp.token) } @@ -266,17 +271,16 @@ impl Client { } /// Build a tonic load balancing channel. 
diff --git a/xline-client/src/lib.rs b/xline-client/src/lib.rs
index 2602acf69..3d45ff56c 100644
--- a/xline-client/src/lib.rs
+++ b/xline-client/src/lib.rs
@@ -163,18 +163,19 @@ use crate::{
         AuthClient, ClusterClient, ElectionClient, KvClient, LeaseClient, LockClient,
         MaintenanceClient, WatchClient,
     },
-    error::{ClientError, Result},
+    error::XlineClientBuildError,
 };
 
 /// Sub-clients for each type of API
 pub mod clients;
-/// Error definitions for `xline-client`.
-pub mod error;
 /// Lease Id generator
 mod lease_gen;
 /// Request type definitions.
 pub mod types;
 
+/// Error definitions for `xline-client`.
+pub mod error;
+
 /// Xline client
 #[derive(Clone, Debug)]
 pub struct Client {
@@ -204,7 +205,10 @@ impl Client {
     /// If `Self::build_channel` fails.
     #[inline]
     #[allow(clippy::pattern_type_mismatch)] // allow mismatch in map
-    pub async fn connect<E, S>(all_members: S, options: ClientOptions) -> Result<Self>
+    pub async fn connect<E, S>(
+        all_members: S,
+        options: ClientOptions,
+    ) -> Result<Self, XlineClientBuildError>
     where
         E: AsRef<str>,
         S: IntoIterator<Item = E>,
@@ -227,7 +231,8 @@ impl Client {
             let mut tmp_auth = AuthClient::new(Arc::clone(&curp_client), channel.clone(), None);
             let resp = tmp_auth
                 .authenticate(types::auth::AuthenticateRequest::new(username, password))
-                .await?;
+                .await
+                .map_err(|err| XlineClientBuildError::AuthError(err.to_string()))?;
 
             Some(resp.token)
         }
@@ -266,17 +271,16 @@ impl Client {
     }
 
     /// Build a tonic load balancing channel.
-    async fn build_channel(addrs: Vec<String>) -> Result<Channel> {
+    async fn build_channel(addrs: Vec<String>) -> Result<Channel, XlineClientBuildError> {
         let (channel, tx) = Channel::balance_channel(64);
         for mut addr in addrs {
             if !addr.starts_with("http://") {
                 addr.insert_str(0, "http://");
             }
-            let endpoint = Channel::builder(
-                addr.parse()
-                    .map_err(|_e| ClientError::InvalidArgs(String::from("Invalid uri")))?,
-            );
+            let endpoint = Channel::builder(addr.parse().map_err(|_e| {
+                XlineClientBuildError::InvalidArguments(String::from("Invalid uri"))
+            })?);
 
             tx.send(tower::discover::Change::Insert(
                 endpoint.uri().clone(),
@@ -427,7 +431,7 @@ where
     type Future = S::Future;
 
     #[inline]
-    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<std::result::Result<(), Self::Error>> {
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         self.inner.poll_ready(cx)
     }
 
diff --git a/xline-client/src/types/lease.rs b/xline-client/src/types/lease.rs
index 4723d80c4..fbf39fad6 100644
--- a/xline-client/src/types/lease.rs
+++ b/xline-client/src/types/lease.rs
@@ -4,7 +4,7 @@ pub use xlineapi::{
     LeaseStatus, LeaseTimeToLiveResponse,
 };
 
-use crate::error::{ClientError, Result};
+use crate::error::{Result, XlineClientError};
 
 /// The lease keep alive handle.
 #[derive(Debug)]
@@ -39,7 +39,7 @@ impl LeaseKeeper {
     pub fn keep_alive(&mut self) -> Result<()> {
         self.sender
             .try_send(LeaseKeepAliveRequest::new(self.id).into())
-            .map_err(|e| ClientError::LeaseError(e.to_string()))
+            .map_err(|e| XlineClientError::LeaseError(e.to_string()))
     }
 }
 
diff --git a/xline-client/src/types/watch.rs b/xline-client/src/types/watch.rs
index b98d2b45f..92dda45d1 100644
--- a/xline-client/src/types/watch.rs
+++ b/xline-client/src/types/watch.rs
@@ -5,7 +5,7 @@ use xline::server::KeyRange;
 pub use xlineapi::{Event, EventType, KeyValue, WatchResponse};
 use xlineapi::{RequestUnion, WatchCancelRequest, WatchProgressRequest};
 
-use crate::error::{ClientError, Result};
+use crate::error::{Result, XlineClientError};
 
 /// The watching handle.
 #[derive(Debug)]
@@ -44,7 +44,7 @@ impl Watcher {
 
         self.sender
             .try_send(request)
-            .map_err(|e| ClientError::WatchError(e.to_string()))
+            .map_err(|e| XlineClientError::WatchError(e.to_string()))
     }
 
     /// Cancels this watcher.
@@ -62,7 +62,7 @@ impl Watcher {
 
         self.sender
             .try_send(request)
-            .map_err(|e| ClientError::WatchError(e.to_string()))
+            .map_err(|e| XlineClientError::WatchError(e.to_string()))
     }
 
     /// Cancels watch by specified `watch_id`.
@@ -78,7 +78,7 @@ impl Watcher { self.sender .try_send(request) - .map_err(|e| ClientError::WatchError(e.to_string())) + .map_err(|e| XlineClientError::WatchError(e.to_string())) } /// Requests a watch stream progress status be sent in the watch response stream as soon as @@ -95,7 +95,7 @@ impl Watcher { self.sender .try_send(request) - .map_err(|e| ClientError::WatchError(e.to_string())) + .map_err(|e| XlineClientError::WatchError(e.to_string())) } } diff --git a/xline-client/tests/auth.rs b/xline-client/tests/auth.rs index 5be396b2a..ed1d7368b 100644 --- a/xline-client/tests/auth.rs +++ b/xline-client/tests/auth.rs @@ -14,7 +14,7 @@ mod common; #[tokio::test(flavor = "multi_thread")] async fn role_operations_should_success_in_normal_path() -> Result<()> { - let (_cluster, client) = get_cluster_client().await?; + let (_cluster, client) = get_cluster_client().await.unwrap(); let client = client.auth_client(); let role1 = "role1"; let role2 = "role2"; @@ -52,7 +52,7 @@ async fn role_operations_should_success_in_normal_path() -> Result<()> { #[tokio::test(flavor = "multi_thread")] async fn permission_operations_should_success_in_normal_path() -> Result<()> { - let (_cluster, client) = get_cluster_client().await?; + let (_cluster, client) = get_cluster_client().await.unwrap(); let client = client.auth_client(); let role1 = "role1"; @@ -122,7 +122,7 @@ async fn permission_operations_should_success_in_normal_path() -> Result<()> { #[tokio::test(flavor = "multi_thread")] async fn user_operations_should_success_in_normal_path() -> Result<()> { - let (_cluster, client) = get_cluster_client().await?; + let (_cluster, client) = get_cluster_client().await.unwrap(); let client = client.auth_client(); let name1 = "usr1"; @@ -154,7 +154,7 @@ async fn user_operations_should_success_in_normal_path() -> Result<()> { #[tokio::test(flavor = "multi_thread")] async fn user_role_operations_should_success_in_normal_path() -> Result<()> { - let (_cluster, client) = get_cluster_client().await?; + let (_cluster, client) = get_cluster_client().await.unwrap(); let client = client.auth_client(); let name1 = "usr1"; diff --git a/xline-client/tests/common.rs b/xline-client/tests/common.rs index 70b206242..17bf0e39b 100644 --- a/xline-client/tests/common.rs +++ b/xline-client/tests/common.rs @@ -1,7 +1,7 @@ -use xline_client::{error::Result, Client, ClientOptions}; +use xline_client::{error::XlineClientBuildError, Client, ClientOptions}; use xline_test_utils::Cluster; -pub async fn get_cluster_client() -> Result<(Cluster, Client)> { +pub async fn get_cluster_client() -> Result<(Cluster, Client), XlineClientBuildError> { let mut cluster = Cluster::new(3).await; cluster.start().await; let client = Client::connect(cluster.addrs(), ClientOptions::default()).await?; diff --git a/xline-client/tests/kv.rs b/xline-client/tests/kv.rs index 57e820b0b..3061b29dc 100644 --- a/xline-client/tests/kv.rs +++ b/xline-client/tests/kv.rs @@ -14,7 +14,7 @@ mod common; #[tokio::test(flavor = "multi_thread")] #[abort_on_panic] async fn put_should_success_in_normal_path() -> Result<()> { - let (_cluster, client) = get_cluster_client().await?; + let (_cluster, client) = get_cluster_client().await.unwrap(); let client = client.kv_client(); let request = PutRequest::new("put", "123"); @@ -48,7 +48,7 @@ async fn put_should_success_in_normal_path() -> Result<()> { #[tokio::test(flavor = "multi_thread")] #[abort_on_panic] async fn range_should_fetches_previously_put_keys() -> Result<()> { - let (_cluster, client) = get_cluster_client().await?; + let 
(_cluster, client) = get_cluster_client().await.unwrap(); let client = client.kv_client(); client.put(PutRequest::new("get10", "10")).await?; @@ -99,7 +99,7 @@ async fn range_should_fetches_previously_put_keys() -> Result<()> { #[tokio::test(flavor = "multi_thread")] #[abort_on_panic] async fn delete_should_remove_previously_put_kvs() -> Result<()> { - let (_cluster, client) = get_cluster_client().await?; + let (_cluster, client) = get_cluster_client().await.unwrap(); let client = client.kv_client(); client.put(PutRequest::new("del10", "10")).await?; @@ -173,7 +173,7 @@ async fn delete_should_remove_previously_put_kvs() -> Result<()> { #[tokio::test(flavor = "multi_thread")] #[abort_on_panic] async fn txn_should_execute_as_expected() -> Result<()> { - let (_cluster, client) = get_cluster_client().await?; + let (_cluster, client) = get_cluster_client().await.unwrap(); let client = client.kv_client(); client.put(PutRequest::new("txn01", "01")).await?; @@ -238,7 +238,7 @@ async fn txn_should_execute_as_expected() -> Result<()> { #[tokio::test(flavor = "multi_thread")] #[abort_on_panic] async fn compact_should_remove_previous_revision() -> Result<()> { - let (_cluster, client) = get_cluster_client().await?; + let (_cluster, client) = get_cluster_client().await.unwrap(); let client = client.kv_client(); client.put(PutRequest::new("compact", "0")).await?; diff --git a/xline-client/tests/lease.rs b/xline-client/tests/lease.rs index 21f54acfe..5aaa08cbe 100644 --- a/xline-client/tests/lease.rs +++ b/xline-client/tests/lease.rs @@ -10,7 +10,7 @@ mod common; #[tokio::test(flavor = "multi_thread")] async fn grant_revoke_should_success_in_normal_path() -> Result<()> { - let (_cluster, client) = get_cluster_client().await?; + let (_cluster, client) = get_cluster_client().await.unwrap(); let mut client = client.lease_client(); let resp = client.grant(LeaseGrantRequest::new(123)).await?; @@ -23,7 +23,7 @@ async fn grant_revoke_should_success_in_normal_path() -> Result<()> { #[tokio::test(flavor = "multi_thread")] async fn keep_alive_should_success_in_normal_path() -> Result<()> { - let (_cluster, client) = get_cluster_client().await?; + let (_cluster, client) = get_cluster_client().await.unwrap(); let mut client = client.lease_client(); let resp = client.grant(LeaseGrantRequest::new(60)).await?; @@ -44,7 +44,7 @@ async fn keep_alive_should_success_in_normal_path() -> Result<()> { #[tokio::test(flavor = "multi_thread")] async fn time_to_live_ttl_is_consistent_in_normal_path() -> Result<()> { - let (_cluster, client) = get_cluster_client().await?; + let (_cluster, client) = get_cluster_client().await.unwrap(); let mut client = client.lease_client(); let lease_id = 200; @@ -71,7 +71,7 @@ async fn leases_should_include_granted_in_normal_path() -> Result<()> { let lease2 = 101; let lease3 = 102; - let (_cluster, client) = get_cluster_client().await?; + let (_cluster, client) = get_cluster_client().await.unwrap(); let mut client = client.lease_client(); let resp = client diff --git a/xline-client/tests/lock.rs b/xline-client/tests/lock.rs index 6da01d7f3..df0ff2d21 100644 --- a/xline-client/tests/lock.rs +++ b/xline-client/tests/lock.rs @@ -11,7 +11,7 @@ mod common; #[tokio::test(flavor = "multi_thread")] async fn lock_unlock_should_success_in_normal_path() -> Result<()> { - let (_cluster, client) = get_cluster_client().await?; + let (_cluster, client) = get_cluster_client().await.unwrap(); let client = client.lock_client(); let resp = client @@ -28,7 +28,7 @@ async fn 
lock_unlock_should_success_in_normal_path() -> Result<()> { #[tokio::test(flavor = "multi_thread")] #[abort_on_panic] async fn lock_contention_should_occur_when_acquire_by_two() -> Result<()> { - let (_cluster, client) = get_cluster_client().await?; + let (_cluster, client) = get_cluster_client().await.unwrap(); let client = client.lock_client(); let client_c = client.clone(); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); @@ -69,7 +69,7 @@ async fn lock_contention_should_occur_when_acquire_by_two() -> Result<()> { #[tokio::test(flavor = "multi_thread")] #[abort_on_panic] async fn lock_should_timeout_when_ttl_is_set() -> Result<()> { - let (_cluster, client) = get_cluster_client().await?; + let (_cluster, client) = get_cluster_client().await.unwrap(); let client = client.lock_client(); let _resp = client @@ -92,7 +92,7 @@ async fn lock_should_timeout_when_ttl_is_set() -> Result<()> { #[tokio::test(flavor = "multi_thread")] #[abort_on_panic] async fn lock_should_unlock_after_cancelled() -> Result<()> { - let (_cluster, client) = get_cluster_client().await?; + let (_cluster, client) = get_cluster_client().await.unwrap(); let client = client.lock_client(); let client_c = client.clone(); // first acquire the lock diff --git a/xline-client/tests/maintenance.rs b/xline-client/tests/maintenance.rs index aa6baff43..5a5052920 100644 --- a/xline-client/tests/maintenance.rs +++ b/xline-client/tests/maintenance.rs @@ -5,7 +5,7 @@ mod common; #[tokio::test(flavor = "multi_thread")] async fn snapshot_should_get_valid_data() -> Result<()> { - let (_cluster, client) = get_cluster_client().await?; + let (_cluster, client) = get_cluster_client().await.unwrap(); let mut client = client.maintenance_client(); let mut msg = client.snapshot().await?; diff --git a/xline-client/tests/watch.rs b/xline-client/tests/watch.rs index 617c236df..e02f59bd1 100644 --- a/xline-client/tests/watch.rs +++ b/xline-client/tests/watch.rs @@ -13,7 +13,7 @@ mod common; #[tokio::test(flavor = "multi_thread")] async fn watch_should_receive_consistent_events() -> Result<()> { - let (_cluster, client) = get_cluster_client().await?; + let (_cluster, client) = get_cluster_client().await.unwrap(); let mut watch_client = client.watch_client(); let kv_client = client.kv_client(); diff --git a/xline-test-utils/Cargo.toml b/xline-test-utils/Cargo.toml index 202ddbcc5..08a1cccbb 100644 --- a/xline-test-utils/Cargo.toml +++ b/xline-test-utils/Cargo.toml @@ -11,6 +11,7 @@ license = "Apache-2.0" readme = "README.md" [dependencies] +anyhow = "1.0.75" clap = { version = "4.4.4", features = ["derive"] } curp = { path = "../curp" } futures = "0.3.28" diff --git a/xline-test-utils/src/bin/validation_lock_client.rs b/xline-test-utils/src/bin/validation_lock_client.rs index 485ee65f9..19fcd51e5 100644 --- a/xline-test-utils/src/bin/validation_lock_client.rs +++ b/xline-test-utils/src/bin/validation_lock_client.rs @@ -1,8 +1,8 @@ //! 
this binary is only used for the validation of lock service +use anyhow::Result; use clap::{Parser, Subcommand}; use xline_client::{ - error::Result, types::lock::{LockRequest, UnlockRequest}, Client, ClientOptions, }; diff --git a/xline/src/server/auth_server.rs b/xline/src/server/auth_server.rs index d1ee27b6e..45e133feb 100644 --- a/xline/src/server/auth_server.rs +++ b/xline/src/server/auth_server.rs @@ -10,7 +10,7 @@ use tracing::debug; use xlineapi::RequestWithToken; use super::command::{ - command_from_request_wrapper, propose_err_to_status, Command, CommandResponse, SyncResponse, + client_err_to_status, command_from_request_wrapper, Command, CommandResponse, SyncResponse, }; use crate::{ request_validation::RequestValidator, @@ -77,13 +77,13 @@ where .client .gen_propose_id() .await - .map_err(propose_err_to_status)?; + .map_err(client_err_to_status)?; let cmd = command_from_request_wrapper::(propose_id, wrapper, None); self.client .propose(cmd, use_fast_path) .await - .map_err(propose_err_to_status) + .map_err(client_err_to_status) } /// Hash password diff --git a/xline/src/server/command.rs b/xline/src/server/command.rs index 8d49b2d6e..023e52260 100644 --- a/xline/src/server/command.rs +++ b/xline/src/server/command.rs @@ -9,7 +9,7 @@ use curp::{ Command as CurpCommand, CommandExecutor as CurpCommandExecutor, ConflictCheck, PbCodec, PbSerializeError, ProposeId, }, - error::{CommandProposeError, ProposeError}, + error::ClientError, LogIndex, }; use engine::Snapshot; @@ -754,11 +754,11 @@ where Command::new(keys, wrapper, propose_id) } -/// Convert `CommandProposeError` to `tonic::Status` -pub(super) fn propose_err_to_status(err: CommandProposeError) -> tonic::Status { +/// Convert `ClientError` to `tonic::Status` +pub(super) fn client_err_to_status(err: ClientError) -> tonic::Status { #[allow(clippy::wildcard_enum_match_arm)] match err { - CommandProposeError::Execute(e) | CommandProposeError::AfterSync(e) => { + ClientError::CommandError(e) => { // If an error occurs during the `prepare` or `execute` stages, `after_sync` will // not be invoked. In this case, `wait_synced` will return the errors generated // in the first two stages. Therefore, if the response from `slow_round` arrives @@ -766,13 +766,12 @@ pub(super) fn propose_err_to_status(err: CommandProposeError) -> tonic: // even though `after_sync` is not called. 
tonic::Status::from(e) } - CommandProposeError::Propose(ProposeError::SyncedError(e)) => { - tonic::Status::unknown(e.to_string()) - } - CommandProposeError::Propose(ProposeError::EncodeError(e)) => tonic::Status::internal(e), - CommandProposeError::Propose(ProposeError::Timeout) => tonic::Status::internal("timeout"), + ClientError::ShuttingDown => tonic::Status::unavailable("Curp Server is shutting down"), + ClientError::OutOfBound(status) => status, + ClientError::EncodeDecode(msg) => tonic::Status::internal(msg), + ClientError::Timeout => tonic::Status::unavailable("request timed out"), - _ => unreachable!("propose err {err:?}"), + _ => unreachable!("curp client error {err:?}"), } } diff --git a/xline/src/server/kv_server.rs b/xline/src/server/kv_server.rs index a4e3699aa..b0e7d1f3d 100644 --- a/xline/src/server/kv_server.rs +++ b/xline/src/server/kv_server.rs @@ -9,7 +9,7 @@ use xlineapi::ResponseWrapper; use super::{ auth_server::get_token, barriers::{IdBarrier, IndexBarrier}, - command::{propose_err_to_status, Command, CommandResponse, SyncResponse}, + command::{client_err_to_status, Command, CommandResponse, SyncResponse}, }; use crate::{ request_validation::RequestValidator, @@ -98,13 +98,13 @@ where .client .gen_propose_id() .await - .map_err(propose_err_to_status)?; + .map_err(client_err_to_status)?; let cmd = command_from_request_wrapper::(propose_id, wrapper, None); self.client .propose(cmd, use_fast_path) .await - .map_err(propose_err_to_status) + .map_err(client_err_to_status) } /// Update revision of `ResponseHeader` @@ -207,7 +207,7 @@ where .client .gen_propose_id() .await - .map_err(propose_err_to_status)?; + .map_err(client_err_to_status)?; let cmd = command_from_request_wrapper::(propose_id, wrapper, None); if !is_serializable { self.wait_read_state(&cmd).await?; @@ -308,7 +308,7 @@ where .client .gen_propose_id() .await - .map_err(propose_err_to_status)?; + .map_err(client_err_to_status)?; let cmd = command_from_request_wrapper::(propose_id, wrapper, None); if !is_serializable { self.wait_read_state(&cmd).await?; diff --git a/xline/src/server/lease_server.rs b/xline/src/server/lease_server.rs index 8ca3b44b0..d27c62be3 100644 --- a/xline/src/server/lease_server.rs +++ b/xline/src/server/lease_server.rs @@ -13,7 +13,7 @@ use xlineapi::RequestWithToken; use super::{ auth_server::get_token, command::{ - command_from_request_wrapper, propose_err_to_status, Command, CommandResponse, SyncResponse, + client_err_to_status, command_from_request_wrapper, Command, CommandResponse, SyncResponse, }, }; use crate::{ @@ -127,14 +127,14 @@ where .client .gen_propose_id() .await - .map_err(propose_err_to_status)?; + .map_err(client_err_to_status)?; let cmd = command_from_request_wrapper(propose_id, wrapper, Some(self.lease_storage.as_ref())); self.client .propose(cmd, use_fast_path) .await - .map_err(propose_err_to_status) + .map_err(client_err_to_status) } /// Handle keep alive at leader @@ -319,7 +319,7 @@ where .client .get_leader_id_from_curp() .await - .map_err(propose_err_to_status)?; + .map_err(client_err_to_status)?; // Given that a candidate server may become a leader when it won the election or // a follower when it lost the election. Therefore we need to double check here. // We can directly invoke leader_keep_alive when a candidate becomes a leader. 
@@ -375,7 +375,7 @@ where .client .get_leader_id_from_curp() .await - .map_err(propose_err_to_status)?; + .map_err(client_err_to_status)?; let leader_addrs = self.cluster_info.addrs(leader_id).unwrap_or_else(|| { unreachable!( "The address of leader {} not found in all_members {:?}", diff --git a/xline/src/server/lock_server.rs b/xline/src/server/lock_server.rs index c55e5c247..b03b3f976 100644 --- a/xline/src/server/lock_server.rs +++ b/xline/src/server/lock_server.rs @@ -10,7 +10,7 @@ use xlineapi::{EventType, RequestWithToken}; use super::{ auth_server::get_token, command::{ - command_from_request_wrapper, propose_err_to_status, Command, CommandResponse, KeyRange, + client_err_to_status, command_from_request_wrapper, Command, CommandResponse, KeyRange, SyncResponse, }, }; @@ -85,13 +85,13 @@ where .client .gen_propose_id() .await - .map_err(propose_err_to_status)?; + .map_err(client_err_to_status)?; let cmd = command_from_request_wrapper::(propose_id, wrapper, None); self.client .propose(cmd, use_fast_path) .await - .map_err(propose_err_to_status) + .map_err(client_err_to_status) } /// Crate txn for try acquire lock diff --git a/xline/src/storage/compact/mod.rs b/xline/src/storage/compact/mod.rs index 268071a37..8a4693b37 100644 --- a/xline/src/storage/compact/mod.rs +++ b/xline/src/storage/compact/mod.rs @@ -1,7 +1,7 @@ use std::{sync::Arc, time::Duration}; use async_trait::async_trait; -use curp::{client::Client, error::CommandProposeError}; +use curp::{client::Client, error::ClientError}; use event_listener::Event; use periodic_compactor::PeriodicCompactor; use revision_compactor::RevisionCompactor; @@ -44,12 +44,12 @@ pub(crate) trait Compactor: std::fmt::Debug + Send + Sync { #[async_trait] pub(crate) trait Compactable: std::fmt::Debug + Send + Sync { /// do compact - async fn compact(&self, revision: i64) -> Result<(), CommandProposeError>; + async fn compact(&self, revision: i64) -> Result<(), ClientError>; } #[async_trait] impl Compactable for Client { - async fn compact(&self, revision: i64) -> Result<(), CommandProposeError> { + async fn compact(&self, revision: i64) -> Result<(), ClientError> { let request = CompactionRequest { revision, physical: false, diff --git a/xline/src/storage/compact/periodic_compactor.rs b/xline/src/storage/compact/periodic_compactor.rs index 396ca477f..ffbddf34d 100644 --- a/xline/src/storage/compact/periodic_compactor.rs +++ b/xline/src/storage/compact/periodic_compactor.rs @@ -8,7 +8,7 @@ use std::{ }; use clippy_utilities::OverflowArithmetic; -use curp::error::CommandProposeError; +use curp::error::ClientError; use tracing::{info, warn}; use utils::shutdown; @@ -118,10 +118,8 @@ impl PeriodicCompactor { ); return target_revision; } - if let Err( - CommandProposeError::Execute(ExecuteError::RevisionCompacted(_, compacted_rev)) - | CommandProposeError::AfterSync(ExecuteError::RevisionCompacted(_, compacted_rev)), - ) = res + if let Err(ClientError::CommandError(ExecuteError::RevisionCompacted(_, compacted_rev))) = + res { info!( "required revision {} has been compacted, the current compacted revision is {}, period = {:?}, took {:?}", diff --git a/xline/src/storage/compact/revision_compactor.rs b/xline/src/storage/compact/revision_compactor.rs index 5d2ae7a12..83bf255ba 100644 --- a/xline/src/storage/compact/revision_compactor.rs +++ b/xline/src/storage/compact/revision_compactor.rs @@ -7,7 +7,7 @@ use std::{ }; use clippy_utilities::OverflowArithmetic; -use curp::error::CommandProposeError; +use curp::error::ClientError; use tracing::{info, warn}; 
use utils::shutdown; @@ -77,10 +77,8 @@ impl RevisionCompactor { ); return Some(target_revision); } - if let Err( - CommandProposeError::Execute(ExecuteError::RevisionCompacted(_, compacted_rev)) - | CommandProposeError::AfterSync(ExecuteError::RevisionCompacted(_, compacted_rev)), - ) = res + if let Err(ClientError::CommandError(ExecuteError::RevisionCompacted(_, compacted_rev))) = + res { info!( "required revision {} has been compacted, the current compacted revision is {}, retention = {:?}", diff --git a/xlinectl/src/command/lease/keep_alive.rs b/xlinectl/src/command/lease/keep_alive.rs index 4d4b3c54b..6638b63b6 100644 --- a/xlinectl/src/command/lease/keep_alive.rs +++ b/xlinectl/src/command/lease/keep_alive.rs @@ -4,7 +4,7 @@ use clap::{arg, value_parser, ArgMatches, Command}; use tokio::signal::ctrl_c; use tonic::Streaming; use xline_client::{ - error::{ClientError, Result}, + error::{Result, XlineClientError}, types::lease::{LeaseKeepAliveRequest, LeaseKeeper}, Client, }; @@ -62,7 +62,7 @@ async fn keep_alive_loop( if let Some(resp) = stream.message().await? { resp.print(); if resp.ttl < 0 { - return Err(ClientError::InvalidArgs(String::from( + return Err(XlineClientError::InvalidArgs(String::from( "lease keepalive response has negative ttl", ))); } diff --git a/xlinectl/src/command/snapshot.rs b/xlinectl/src/command/snapshot.rs index b13553380..08a741680 100644 --- a/xlinectl/src/command/snapshot.rs +++ b/xlinectl/src/command/snapshot.rs @@ -1,7 +1,10 @@ use std::{fs::File, io::Write, path::PathBuf}; use clap::{arg, ArgMatches, Command}; -use xline_client::{error::Result, Client}; +use xline_client::{ + error::{Result, XlineClientError}, + Client, +}; /// Definition of `snapshot` command pub(crate) fn command() -> Command { @@ -26,14 +29,16 @@ pub(crate) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result return Ok(()); } - let mut file = File::create(path)?; + let mut file = + File::create(path).map_err(|err| XlineClientError::IoError(err.to_string()))?; let mut all = Vec::new(); while let Some(data) = resp.message().await? { all.extend_from_slice(&data.blob); } - file.write_all(&all)?; + file.write_all(&all) + .map_err(|err| XlineClientError::IoError(err.to_string()))?; println!("snapshot saved to: {filename}"); }
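For reference, the server-side mapping in `client_err_to_status` is what etcd-compatible clients ultimately observe. A small test-style sketch of the contract established above; the module layout and test are illustrative only, and since `client_err_to_status` is `pub(super)`, a real test would have to live inside xline's `server` module:

    // Hypothetical unit test placed next to client_err_to_status:
    #[cfg(test)]
    mod mapping_tests {
        use curp::error::ClientError;
        use tonic::Code;

        use super::{client_err_to_status, Command};

        #[test]
        fn shutting_down_maps_to_unavailable() {
            // ShuttingDown and Timeout both surface as Unavailable per the match arms
            let status = client_err_to_status(ClientError::<Command>::ShuttingDown);
            assert_eq!(status.code(), Code::Unavailable);
        }
    }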