From 26dcf3bd2aa55ab4683f2a77529041e400416880 Mon Sep 17 00:00:00 2001 From: iGxnon Date: Thu, 21 Dec 2023 13:58:23 +0800 Subject: [PATCH 1/6] test: add tests for unary client Signed-off-by: iGxnon --- curp/src/client_new/mod.rs | 112 ++++++++--- curp/src/client_new/retry.rs | 38 +++- curp/src/client_new/tests.rs | 347 +++++++++++++++++++++++++++++++++++ curp/src/client_new/unary.rs | 134 +++++++++++--- 4 files changed, 577 insertions(+), 54 deletions(-) create mode 100644 curp/src/client_new/tests.rs diff --git a/curp/src/client_new/mod.rs b/curp/src/client_new/mod.rs index d7d65079e..8cba0c8e3 100644 --- a/curp/src/client_new/mod.rs +++ b/curp/src/client_new/mod.rs @@ -7,6 +7,10 @@ mod unary; /// Retry layer mod retry; +/// Tests for client +#[cfg(test)] +mod tests; + use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; @@ -14,11 +18,16 @@ use curp_external_api::cmd::Command; use futures::{stream::FuturesUnordered, StreamExt}; use utils::config::ClientConfig; +use self::{ + retry::Retry, + unary::{UnaryBuilder, UnaryConfig}, +}; use crate::{ + client_new::retry::RetryConfig, members::ServerId, rpc::{ connect::ConnectApi, protocol_client::ProtocolClient, ConfChange, FetchClusterRequest, - FetchClusterResponse, Member, ReadState, + FetchClusterResponse, Member, Protocol, ReadState, }, }; @@ -99,35 +108,38 @@ trait LeaderStateUpdate { /// Client builder to build a client #[derive(Debug, Clone)] -pub struct ClientBuilder { - /// local server id - local_server_id: Option, +pub struct ClientBuilder { + /// local server + local_server: Option<(ServerId, P)>, /// initial cluster version cluster_version: Option, + /// initial cluster members + all_members: Option>>, + /// initial leader state + leader_state: Option<(ServerId, u64)>, /// client configuration config: ClientConfig, - /// initial all members - all_members: Option>>, } -impl ClientBuilder { +impl ClientBuilder
<P: Protocol>
{ /// Create a client builder #[inline] #[must_use] pub fn new(config: ClientConfig) -> Self { Self { - local_server_id: None, - config, - all_members: None, + local_server: None, cluster_version: None, + all_members: None, + leader_state: None, + config, } } - /// Set the local server id + /// Set the local server to bypass `gRPC` request #[inline] #[must_use] - pub fn local_server_id(&mut self, id: ServerId) -> &mut Self { - self.local_server_id = Some(id); + pub fn bypass(&mut self, id: ServerId, server: P) -> &mut Self { + self.local_server = Some((id, server)); self } @@ -139,16 +151,29 @@ impl ClientBuilder { self } - /// Fetch initial all members from some endpoints if you do not know the whole members + /// Set the initial all members + #[inline] + #[must_use] + pub fn all_members(&mut self, all_members: HashMap>) -> &mut Self { + self.all_members = Some(all_members); + self + } + + /// Set the initial leader state + #[inline] + #[must_use] + pub fn leader_state(&mut self, leader_id: ServerId, term: u64) -> &mut Self { + self.leader_state = Some((leader_id, term)); + self + } + + /// Discover the initial states from some endpoints /// /// # Errors /// /// Return `tonic::Status` for connection failure or some server errors. #[inline] - pub async fn fetch_all_members( - &mut self, - addrs: Vec, - ) -> Result<&mut Self, tonic::Status> { + pub async fn discover_from(&mut self, addrs: Vec) -> Result<&mut Self, tonic::Status> { let propose_timeout = *self.config.propose_timeout(); let mut futs: FuturesUnordered<_> = addrs .into_iter() @@ -168,10 +193,14 @@ impl ClientBuilder { }) .collect(); let mut err = tonic::Status::invalid_argument("addrs is empty"); + // find the first one return `FetchClusterResponse` while let Some(r) = futs.next().await { match r { Ok(r) => { self.cluster_version = Some(r.cluster_version); + if let Some(id) = r.leader_id { + self.leader_state = Some((id, r.term)); + } self.all_members = Some(r.into_members_addrs()); return Ok(self); } @@ -181,11 +210,48 @@ impl ClientBuilder { Err(err) } - /// Set the initial all members + /// Build the client + /// + /// # Errors + /// + /// Return `tonic::transport::Error` for connection failure. #[inline] - #[must_use] - pub fn set_all_members(&mut self, all_members: HashMap>) -> &mut Self { - self.all_members = Some(all_members); - self + pub async fn build( + self, + ) -> Result, tonic::transport::Error> { + let mut builder = UnaryBuilder::
<P>
::new( + self.all_members.unwrap_or_else(|| { + unreachable!("must set the initial members or discover from some endpoints") + }), + UnaryConfig::new_full( + *self.config.propose_timeout(), + *self.config.wait_synced_timeout(), + ), + ); + if let Some(version) = self.cluster_version { + builder.set_cluster_version(version); + } + if let Some((id, server)) = self.local_server { + builder.set_local_server(id, server); + } + if let Some((id, term)) = self.leader_state { + builder.set_leader_state(id, term); + } + let unary = builder.build::().await?; + + let retry_config = if *self.config.use_backoff() { + RetryConfig::new_exponential( + *self.config.initial_retry_timeout(), + *self.config.max_retry_timeout(), + *self.config.retry_count(), + ) + } else { + RetryConfig::new_fixed( + *self.config.initial_retry_timeout(), + *self.config.retry_count(), + ) + }; + let client = Retry::new(unary, retry_config); + Ok(client) } } diff --git a/curp/src/client_new/retry.rs b/curp/src/client_new/retry.rs index a33ac316b..7983f0bb4 100644 --- a/curp/src/client_new/retry.rs +++ b/curp/src/client_new/retry.rs @@ -19,7 +19,7 @@ use super::{ClientApi, LeaderStateUpdate, ProposeResponse}; /// Backoff config #[derive(Debug, Clone)] -pub(super) enum BackoffConfig { +enum BackoffConfig { /// A fixed delay backoff Fixed, /// A exponential delay backoff @@ -53,7 +53,7 @@ struct Backoff { impl RetryConfig { /// Create a fixed retry config - fn new_fixed(delay: Duration, count: usize) -> Self { + pub(super) fn new_fixed(delay: Duration, count: usize) -> Self { assert!(count > 0, "retry count should be larger than 0"); Self { backoff: BackoffConfig::Fixed, @@ -63,7 +63,7 @@ impl RetryConfig { } /// Create a exponential retry config - fn new_exponential(delay: Duration, max_delay: Duration, count: usize) -> Self { + pub(super) fn new_exponential(delay: Duration, max_delay: Duration, count: usize) -> Self { assert!(count > 0, "retry count should be larger than 0"); Self { backoff: BackoffConfig::Exponential { max_delay }, @@ -116,7 +116,7 @@ where Api: ClientApi + LeaderStateUpdate + Send + Sync + 'static, { /// Create a retry client - fn new(inner: Api, config: RetryConfig) -> Self { + pub(super) fn new(inner: Api, config: RetryConfig) -> Self { Self { inner, config } } @@ -245,3 +245,33 @@ where .await } } + +/// Tests for backoff +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::RetryConfig; + + #[test] + fn test_fixed_backoff_works() { + let config = RetryConfig::new_fixed(Duration::from_secs(1), 3); + let mut backoff = config.init_backoff(); + assert_eq!(backoff.next_delay(), Some(Duration::from_secs(1))); + assert_eq!(backoff.next_delay(), Some(Duration::from_secs(1))); + assert_eq!(backoff.next_delay(), Some(Duration::from_secs(1))); + assert_eq!(backoff.next_delay(), None); + } + + #[test] + fn test_exponential_backoff_works() { + let config = + RetryConfig::new_exponential(Duration::from_secs(1), Duration::from_secs(5), 4); + let mut backoff = config.init_backoff(); + assert_eq!(backoff.next_delay(), Some(Duration::from_secs(1))); + assert_eq!(backoff.next_delay(), Some(Duration::from_secs(2))); + assert_eq!(backoff.next_delay(), Some(Duration::from_secs(4))); + assert_eq!(backoff.next_delay(), Some(Duration::from_secs(5))); // 8 > 5 + assert_eq!(backoff.next_delay(), None); + } +} diff --git a/curp/src/client_new/tests.rs b/curp/src/client_new/tests.rs new file mode 100644 index 000000000..91418ecd4 --- /dev/null +++ b/curp/src/client_new/tests.rs @@ -0,0 +1,347 @@ +use std::{ + 
collections::HashMap, + ops::AddAssign, + sync::{Arc, Mutex}, +}; + +use curp_test_utils::test_cmd::{TestCommand, TestCommandResult}; +use dashmap::DashMap; +use tracing_test::traced_test; + +use super::unary::Unary; +use crate::{ + client_new::ClientApi, + members::ServerId, + rpc::{ + connect::{ConnectApi, MockConnectApi}, + CurpError, FetchClusterResponse, Member, ProposeId, ProposeResponse, + }, +}; + +/// Create a mocked connects with server id from 0~size +#[allow(trivial_casts)] // Trait object with high ranked type inferences failed, cast manually +fn init_mocked_connects( + size: usize, + f: impl Fn(usize, &mut MockConnectApi), +) -> DashMap> { + std::iter::repeat_with(|| MockConnectApi::new()) + .take(size) + .enumerate() + .map(|(id, mut conn)| { + conn.expect_id().returning(move || id as ServerId); + conn.expect_update_addrs().returning(|_addr| Ok(())); + f(id, &mut conn); + (id as ServerId, Arc::new(conn) as Arc) + }) + .collect() +} + +// Tests for unary client + +#[traced_test] +#[tokio::test] +async fn test_unary_fetch_clusters_serializable() { + let connects = init_mocked_connects(3, |_id, conn| { + conn.expect_fetch_cluster().return_once(|_req, _timeout| { + Ok(tonic::Response::new(FetchClusterResponse { + leader_id: Some(0), + term: 1, + cluster_id: 123, + members: vec![ + Member::new(0, "S0", vec!["A0".to_owned()], false), + Member::new(1, "S1", vec!["A1".to_owned()], false), + Member::new(2, "S2", vec!["A2".to_owned()], false), + ], + cluster_version: 1, + })) + }); + }); + let unary = Unary::::new(connects, None, None); + assert!(unary.local_connect().is_none()); + let res = unary.fetch_cluster(false).await.unwrap(); + assert_eq!( + res.into_members_addrs(), + HashMap::from([ + (0, vec!["A0".to_owned()]), + (1, vec!["A1".to_owned()]), + (2, vec!["A2".to_owned()]) + ]) + ); +} + +#[traced_test] +#[tokio::test] +async fn test_unary_fetch_clusters_serializable_local_first() { + let connects = init_mocked_connects(3, |id, conn| { + conn.expect_fetch_cluster() + .return_once(move |_req, _timeout| { + let members = if id == 1 { + // local server(1) does not see the cluster members + vec![] + } else { + panic!("other server's `fetch_cluster` should not be invoked"); + }; + Ok(tonic::Response::new(FetchClusterResponse { + leader_id: Some(0), + term: 1, + cluster_id: 123, + members, + cluster_version: 1, + })) + }); + }); + let unary = Unary::::new(connects, Some(1), None); + assert!(unary.local_connect().is_some()); + let res = unary.fetch_cluster(false).await.unwrap(); + assert!(res.members.is_empty()); +} + +#[traced_test] +#[tokio::test] +async fn test_unary_fetch_clusters_linearizable() { + let connects = init_mocked_connects(5, |id, conn| { + conn.expect_fetch_cluster() + .return_once(move |_req, _timeout| { + let resp = match id { + 0 => FetchClusterResponse { + leader_id: Some(0), + term: 2, + cluster_id: 123, + members: vec![ + Member::new(0, "S0", vec!["A0".to_owned()], false), + Member::new(1, "S1", vec!["A1".to_owned()], false), + Member::new(2, "S2", vec!["A2".to_owned()], false), + Member::new(3, "S3", vec!["A3".to_owned()], false), + Member::new(4, "S4", vec!["A4".to_owned()], false), + ], + cluster_version: 1, + }, + 1 | 4 => FetchClusterResponse { + leader_id: Some(0), + term: 2, + cluster_id: 123, + members: vec![], // linearizable read from follower returns empty members + cluster_version: 1, + }, + 2 => FetchClusterResponse { + leader_id: None, // imagine this node is a disconnected candidate + term: 23, // with a high term + cluster_id: 123, + members: 
vec![], + cluster_version: 1, + }, + 3 => FetchClusterResponse { + leader_id: Some(3), // imagine this node is a old leader + term: 1, // with the old term + cluster_id: 123, + members: vec![ + Member::new(0, "S0", vec!["B0".to_owned()], false), + Member::new(1, "S1", vec!["B1".to_owned()], false), + Member::new(2, "S2", vec!["B2".to_owned()], false), + Member::new(3, "S3", vec!["B3".to_owned()], false), + Member::new(4, "S4", vec!["B4".to_owned()], false), + ], + cluster_version: 1, + }, + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); + }); + let unary = Unary::::new(connects, None, None); + assert!(unary.local_connect().is_none()); + let res = unary.fetch_cluster(true).await.unwrap(); + assert_eq!( + res.into_members_addrs(), + HashMap::from([ + (0, vec!["A0".to_owned()]), + (1, vec!["A1".to_owned()]), + (2, vec!["A2".to_owned()]), + (3, vec!["A3".to_owned()]), + (4, vec!["A4".to_owned()]) + ]) + ); +} + +#[traced_test] +#[tokio::test] +async fn test_unary_fetch_clusters_linearizable_failed() { + let connects = init_mocked_connects(5, |id, conn| { + conn.expect_fetch_cluster() + .return_once(move |_req, _timeout| { + let resp = match id { + 0 => FetchClusterResponse { + leader_id: Some(0), + term: 2, + cluster_id: 123, + members: vec![ + Member::new(0, "S0", vec!["A0".to_owned()], false), + Member::new(1, "S1", vec!["A1".to_owned()], false), + Member::new(2, "S2", vec!["A2".to_owned()], false), + Member::new(3, "S3", vec!["A3".to_owned()], false), + Member::new(4, "S4", vec!["A4".to_owned()], false), + ], + cluster_version: 1, + }, + 1 => FetchClusterResponse { + leader_id: Some(0), + term: 2, + cluster_id: 123, + members: vec![], // linearizable read from follower returns empty members + cluster_version: 1, + }, + 2 => FetchClusterResponse { + leader_id: None, // imagine this node is a disconnected candidate + term: 23, // with a high term + cluster_id: 123, + members: vec![], + cluster_version: 1, + }, + 3 => FetchClusterResponse { + leader_id: Some(3), // imagine this node is a old leader + term: 1, // with the old term + cluster_id: 123, + members: vec![ + Member::new(0, "S0", vec!["B0".to_owned()], false), + Member::new(1, "S1", vec!["B1".to_owned()], false), + Member::new(2, "S2", vec!["B2".to_owned()], false), + Member::new(3, "S3", vec!["B3".to_owned()], false), + Member::new(4, "S4", vec!["B4".to_owned()], false), + ], + cluster_version: 1, + }, + 4 => FetchClusterResponse { + leader_id: Some(3), // imagine this node is a old follower of old leader(3) + term: 1, // with the old term + cluster_id: 123, + members: vec![], + cluster_version: 1, + }, + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); + }); + let unary = Unary::::new(connects, None, None); + assert!(unary.local_connect().is_none()); + let res = unary.fetch_cluster(true).await.unwrap_err(); + // only server(0, 1)'s responses are valid, less than majority quorum(3), got a mocked RpcTransport to retry + assert_eq!(res, CurpError::RpcTransport(())); +} + +#[traced_test] +#[tokio::test] +async fn test_unary_fast_round_works() { + let connects = init_mocked_connects(5, |id, conn| { + conn.expect_propose().return_once(move |_req, _timeout| { + let resp = match id { + 0 => ProposeResponse::new_result::(&Ok(TestCommandResult::default())), + 1 | 2 | 3 => ProposeResponse::new_empty(), + 4 => return Err(CurpError::key_conflict()), + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); + }); + let unary = 
Unary::::new(connects, None, None); + let res = unary + .fast_round(ProposeId(0, 0), &TestCommand::default()) + .await + .unwrap() + .unwrap(); + assert_eq!(res, TestCommandResult::default()); +} + +#[traced_test] +#[tokio::test] +async fn test_unary_fast_round_return_early_err() { + for early_err in [ + CurpError::duplicated(), + CurpError::shutting_down(), + CurpError::invalid_config(), + CurpError::node_already_exists(), + CurpError::node_not_exist(), + CurpError::learner_not_catch_up(), + CurpError::expired_client_id(), + CurpError::wrong_cluster_version(), + CurpError::redirect(Some(1), 0), + ] { + assert!(early_err.return_early()); + // record how many times `handle_propose` was invoked. + let counter = Arc::new(Mutex::new(0)); + let connects = init_mocked_connects(3, |_id, conn| { + let counter_c = Arc::clone(&counter); + let err = early_err.clone(); + conn.expect_propose().return_once(move |_req, _timeout| { + counter_c.lock().unwrap().add_assign(1); + Err(err) + }); + }); + let unary = Unary::::new(connects, None, None); + let err = unary + .fast_round(ProposeId(0, 0), &TestCommand::default()) + .await + .unwrap_err(); + assert_eq!(err, early_err); + assert_eq!(*counter.lock().unwrap(), 1); + } +} + +#[traced_test] +#[tokio::test] +async fn test_unary_fast_round_less_quorum() { + let connects = init_mocked_connects(5, |id, conn| { + conn.expect_propose().return_once(move |_req, _timeout| { + let resp = match id { + 0 => ProposeResponse::new_result::(&Ok(TestCommandResult::default())), + 1 | 2 => ProposeResponse::new_empty(), + 3 | 4 => return Err(CurpError::key_conflict()), + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); + }); + let unary = Unary::::new(connects, None, None); + let err = unary + .fast_round(ProposeId(0, 0), &TestCommand::default()) + .await + .unwrap_err(); + assert_eq!(err, CurpError::KeyConflict(())); +} + +/// FIXME: two leader +/// TODO: fix in subsequence PR +#[traced_test] +#[tokio::test] +#[should_panic] +async fn test_unary_fast_round_with_two_leader() { + let connects = init_mocked_connects(5, |id, conn| { + conn.expect_propose().return_once(move |_req, _timeout| { + let resp = match id { + // The execution result has been returned, indicating that server(0) has also recorded the command. 
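+                    // server(0) is the stale leader the client still trusts (it is
+                    // constructed with the old leader state `Some((0, 1))` below)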
+ 0 => ProposeResponse::new_result::(&Ok(TestCommandResult::new( + vec![1], + vec![1], + ))), + // imagine that server(1) is the new leader + 1 => ProposeResponse::new_result::(&Ok(TestCommandResult::new( + vec![2], + vec![2], + ))), + 2 | 3 => ProposeResponse::new_empty(), + 4 => return Err(CurpError::key_conflict()), + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); + }); + // old local leader(0), term 1 + let unary = Unary::::new(connects, None, Some((0, 1))); + let res = unary + .fast_round(ProposeId(0, 0), &TestCommand::default()) + .await + .unwrap() + .unwrap(); + // quorum: server(0, 1, 2, 3) + assert_eq!(res, TestCommandResult::new(vec![2], vec![2])); +} diff --git a/curp/src/client_new/unary.rs b/curp/src/client_new/unary.rs index 938c6f921..744234ebb 100644 --- a/curp/src/client_new/unary.rs +++ b/curp/src/client_new/unary.rs @@ -54,6 +54,79 @@ impl std::fmt::Debug for State { } } +/// Unary builder +pub(super) struct UnaryBuilder { + /// All members (required) + all_members: HashMap>, + /// Unary config (required) + config: UnaryConfig, + /// Local server (optional) + local_server: Option<(ServerId, P)>, + /// Leader id (optional) + leader: Option, + /// Term (optional) + term: Option, + /// Cluster version (optional) + cluster_version: Option, +} + +impl UnaryBuilder
<P: Protocol>
{ + /// Create unary builder + pub(super) fn new(all_members: HashMap>, config: UnaryConfig) -> Self { + Self { + all_members, + config, + local_server: None, + leader: None, + term: None, + cluster_version: None, + } + } + + /// Set the local server (optional) + pub(super) fn set_local_server(&mut self, id: ServerId, server: P) { + self.local_server = Some((id, server)); + } + + /// Set the leader state (optional) + pub(super) fn set_leader_state(&mut self, id: ServerId, term: u64) { + self.leader = Some(id); + self.term = Some(term); + } + + /// Set the cluster version (optional) + pub(super) fn set_cluster_version(&mut self, cluster_version: u64) { + self.cluster_version = Some(cluster_version); + } + + /// Build the unary client + pub(super) async fn build(mut self) -> Result, tonic::transport::Error> { + let mut local_server_id = None; + if let Some((id, _)) = self.local_server { + let _ig = self.all_members.remove(&id); + } + let mut connects: HashMap<_, _> = rpc::connects(self.all_members).await?.collect(); + if let Some((id, server)) = self.local_server { + debug!("client bypassed server({id})"); + local_server_id = Some(id); + let _ig = connects.insert(id, Arc::new(BypassedConnect::new(id, server))); + } + let state = State { + leader: self.leader, + term: self.term.unwrap_or_default(), + cluster_version: self.cluster_version.unwrap_or_default(), + connects, + }; + let unary = Unary { + state: RwLock::new(state), + local_server_id, + config: self.config, + phantom: PhantomData, + }; + Ok(unary) + } +} + /// The unary client config #[derive(Debug)] pub(super) struct UnaryConfig { @@ -67,7 +140,7 @@ pub(super) struct UnaryConfig { impl UnaryConfig { /// Create a unary config - fn new(propose_timeout: Duration) -> Self { + pub(super) fn new(propose_timeout: Duration) -> Self { Self { propose_timeout, wait_synced_timeout: propose_timeout * 2, @@ -75,7 +148,7 @@ impl UnaryConfig { } /// Create a unary config - fn new_full(propose_timeout: Duration, wait_synced_timeout: Duration) -> Self { + pub(super) fn new_full(propose_timeout: Duration, wait_synced_timeout: Duration) -> Self { Self { propose_timeout, wait_synced_timeout, @@ -238,7 +311,7 @@ impl Unary { } /// Send proposal to all servers - async fn fast_round( + pub(super) async fn fast_round( &self, propose_id: ProposeId, cmd: &C, @@ -323,7 +396,7 @@ impl Unary { } /// Wait synced result from server - async fn slow_round( + pub(super) async fn slow_round( &self, propose_id: ProposeId, ) -> Result, CurpError> { @@ -524,6 +597,7 @@ impl ClientApi for Unary { let mut err: Option = None; while let Some((id, resp)) = responses.next().await { + debug!("{id} {max_term}"); let inner = match resp { Ok(r) => r, Err(e) => { @@ -541,41 +615,47 @@ impl ClientApi for Unary { continue; } }; - - #[allow(clippy::integer_arithmetic)] - match max_term.cmp(&inner.term) { - Ordering::Less => { - if !inner.members.is_empty() { + // Ignore the response of a node that doesn't know who the leader is. + // some disconnected candidates may continue to increase term, disrupting the process below. 
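+            // a disconnected candidate reports `leader_id: None` (see the mocked
+            // node 2 in the tests), so checking `leader_id` is enough to drop its vote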
+ if inner.leader_id.is_some() { + #[allow(clippy::integer_arithmetic)] + match max_term.cmp(&inner.term) { + Ordering::Less => { max_term = inner.term; - res = Some(inner); + if !inner.members.is_empty() { + res = Some(inner); + } + // reset ok count to 1 + ok_cnt = 1; } - ok_cnt = 1; - } - Ordering::Equal => { - if !inner.members.is_empty() { - res = Some(inner); + Ordering::Equal => { + if !inner.members.is_empty() { + res = Some(inner); + } + ok_cnt += 1; } - ok_cnt += 1; + Ordering::Greater => {} } - Ordering::Greater => {} } - + // first check quorum if ok_cnt >= quorum { - break; - } - } - if let Some(res) = res { - debug!("fetch cluster succeeded, result: {res:?}"); - if let Err(e) = self.check_and_update(&res).await { - warn!("update to a new cluster state failed, error {e}"); + // then check if we got the response + if let Some(res) = res { + debug!("fetch cluster succeeded, result: {res:?}"); + if let Err(e) = self.check_and_update(&res).await { + warn!("update to a new cluster state failed, error {e}"); + } + return Ok(res); + } } - return Ok(res); } + if let Some(err) = err { return Err(err); } - unreachable!("At least one server will return `members` or a connection error has occurred. Leaders should not return empty members.") + // It seems that the max term has not reached the majority here. Mock a transport error and return it to the external to retry. + return Err(CurpError::RpcTransport(())); } } From a7a80b1c1901c7db343e4551789f548e1d81e52f Mon Sep 17 00:00:00 2001 From: iGxnon Date: Thu, 21 Dec 2023 20:14:46 +0800 Subject: [PATCH 2/6] tests: add tests for unary propose Signed-off-by: iGxnon --- curp/src/client_new/tests.rs | 220 ++++++++++++++++++++++++++++++++++- curp/src/client_new/unary.rs | 3 +- 2 files changed, 219 insertions(+), 4 deletions(-) diff --git a/curp/src/client_new/tests.rs b/curp/src/client_new/tests.rs index 91418ecd4..1488c3fc6 100644 --- a/curp/src/client_new/tests.rs +++ b/curp/src/client_new/tests.rs @@ -1,11 +1,14 @@ use std::{ collections::HashMap, ops::AddAssign, - sync::{Arc, Mutex}, + sync::{atomic::AtomicBool, Arc, Mutex}, + time::Duration, }; -use curp_test_utils::test_cmd::{TestCommand, TestCommandResult}; +use curp_external_api::LogIndex; +use curp_test_utils::test_cmd::{LogIndexResult, TestCommand, TestCommandResult}; use dashmap::DashMap; +use tokio::time::Instant; use tracing_test::traced_test; use super::unary::Unary; @@ -14,7 +17,7 @@ use crate::{ members::ServerId, rpc::{ connect::{ConnectApi, MockConnectApi}, - CurpError, FetchClusterResponse, Member, ProposeId, ProposeResponse, + CurpError, FetchClusterResponse, Member, ProposeId, ProposeResponse, WaitSyncedResponse, }, }; @@ -345,3 +348,214 @@ async fn test_unary_fast_round_with_two_leader() { // quorum: server(0, 1, 2, 3) assert_eq!(res, TestCommandResult::new(vec![2], vec![2])); } + +#[traced_test] +#[tokio::test] +async fn test_unary_slow_round_fetch_leader_first() { + let flag = Arc::new(AtomicBool::new(false)); + let connects = init_mocked_connects(3, |id, conn| { + let flag_c = Arc::clone(&flag); + conn.expect_fetch_cluster() + .return_once(move |_req, _timeout| { + flag_c.store(true, std::sync::atomic::Ordering::Relaxed); + Ok(tonic::Response::new(FetchClusterResponse { + leader_id: Some(0), + term: 1, + cluster_id: 123, + members: vec![ + Member::new(0, "S0", vec!["A0".to_owned()], false), + Member::new(1, "S1", vec!["A1".to_owned()], false), + Member::new(2, "S2", vec!["A2".to_owned()], false), + ], + cluster_version: 1, + })) + }); + let flag_c = Arc::clone(&flag); + 
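+        // `wait_synced` asserts the flag set by `fetch_cluster`, proving the leader
+        // is fetched before the slow round request is sent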
conn.expect_wait_synced() + .return_once(move |_req, _timeout| { + assert!(id == 0, "wait synced should send to leader"); + assert!( + flag_c.load(std::sync::atomic::Ordering::Relaxed), + "fetch_leader should invoke first" + ); + Ok(tonic::Response::new(WaitSyncedResponse::new_from_result::< + TestCommand, + >( + Ok(TestCommandResult::default()), + Some(Ok(1.into())), + ))) + }); + }); + let unary = Unary::::new(connects, None, None); + let res = unary.slow_round(ProposeId(0, 0)).await.unwrap().unwrap(); + assert_eq!(LogIndex::from(res.0), 1); + assert_eq!(res.1, TestCommandResult::default()); +} + +#[traced_test] +#[tokio::test] +async fn test_unary_propose_fast_path_works() { + let connects = init_mocked_connects(5, |id, conn| { + conn.expect_propose().return_once(move |_req, _timeout| { + let resp = match id { + 0 => ProposeResponse::new_result::(&Ok(TestCommandResult::default())), + 1 | 2 | 3 => ProposeResponse::new_empty(), + 4 => return Err(CurpError::key_conflict()), + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); + conn.expect_wait_synced() + .return_once(move |_req, _timeout| { + assert!(id == 0, "wait synced should send to leader"); + std::thread::sleep(Duration::from_millis(100)); + Ok(tonic::Response::new(WaitSyncedResponse::new_from_result::< + TestCommand, + >( + Ok(TestCommandResult::default()), + Some(Ok(1.into())), + ))) + }); + }); + let unary = Unary::::new(connects, None, Some((0, 1))); + let res = unary + .propose(&TestCommand::default(), true) + .await + .unwrap() + .unwrap(); + assert_eq!(res, (TestCommandResult::default(), None)); +} + +#[traced_test] +#[tokio::test] +async fn test_unary_propose_slow_path_works() { + let connects = init_mocked_connects(5, |id, conn| { + conn.expect_propose().return_once(move |_req, _timeout| { + let resp = match id { + 0 => ProposeResponse::new_result::(&Ok(TestCommandResult::default())), + 1 | 2 | 3 => ProposeResponse::new_empty(), + 4 => return Err(CurpError::key_conflict()), + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); + conn.expect_wait_synced() + .return_once(move |_req, _timeout| { + assert!(id == 0, "wait synced should send to leader"); + std::thread::sleep(Duration::from_millis(100)); + Ok(tonic::Response::new(WaitSyncedResponse::new_from_result::< + TestCommand, + >( + Ok(TestCommandResult::default()), + Some(Ok(1.into())), + ))) + }); + }); + let unary = Unary::::new(connects, None, Some((0, 1))); + let start_at = Instant::now(); + let res = unary + .propose(&TestCommand::default(), false) + .await + .unwrap() + .unwrap(); + assert!( + start_at.elapsed() > Duration::from_millis(100), + "slow round takes at least 100ms" + ); + assert_eq!( + res, + (TestCommandResult::default(), Some(LogIndexResult::from(1))) + ); +} + +#[traced_test] +#[tokio::test] +async fn test_unary_propose_fast_path_fallback_slow_path() { + // record how many times `handle_propose` was invoked. 
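+    // only 3 of 5 nodes accept the propose below, fewer than the 4 required by the
+    // fast round (judging from `test_unary_fast_round_works`), so the client falls
+    // back to the slow path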
+ let counter = Arc::new(Mutex::new(0)); + let connects = init_mocked_connects(5, |id, conn| { + let counter_c = Arc::clone(&counter); + conn.expect_propose().return_once(move |_req, _timeout| { + counter_c.lock().unwrap().add_assign(1); + // insufficient quorum + let resp = match id { + 0 => ProposeResponse::new_result::(&Ok(TestCommandResult::default())), + 1 | 2 => ProposeResponse::new_empty(), + 3 | 4 => return Err(CurpError::key_conflict()), + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); + conn.expect_wait_synced() + .return_once(move |_req, _timeout| { + assert!(id == 0, "wait synced should send to leader"); + std::thread::sleep(Duration::from_millis(100)); + Ok(tonic::Response::new(WaitSyncedResponse::new_from_result::< + TestCommand, + >( + Ok(TestCommandResult::default()), + Some(Ok(1.into())), + ))) + }); + }); + let unary = Unary::::new(connects, None, Some((0, 1))); + let start_at = Instant::now(); + let res = unary + .propose(&TestCommand::default(), true) + .await + .unwrap() + .unwrap(); + assert!( + start_at.elapsed() > Duration::from_millis(100), + "slow round takes at least 100ms" + ); + // indicate that we actually run out of fast round + assert_eq!(*counter.lock().unwrap(), 5); + assert_eq!( + res, + (TestCommandResult::default(), Some(LogIndexResult::from(1))) + ); +} + +#[traced_test] +#[tokio::test] +async fn test_unary_propose_return_early_err() { + for early_err in [ + CurpError::duplicated(), + CurpError::shutting_down(), + CurpError::invalid_config(), + CurpError::node_already_exists(), + CurpError::node_not_exist(), + CurpError::learner_not_catch_up(), + CurpError::expired_client_id(), + CurpError::wrong_cluster_version(), + CurpError::redirect(Some(1), 0), + ] { + assert!(early_err.return_early()); + // record how many times rpc was invoked. + let counter = Arc::new(Mutex::new(0)); + let connects = init_mocked_connects(5, |id, conn| { + let err = early_err.clone(); + let counter_c = Arc::clone(&counter); + conn.expect_propose().return_once(move |_req, _timeout| { + counter_c.lock().unwrap().add_assign(1); + Err(err) + }); + let err = early_err.clone(); + let counter_c = Arc::clone(&counter); + conn.expect_wait_synced() + .return_once(move |_req, _timeout| { + assert!(id == 0, "wait synced should send to leader"); + counter_c.lock().unwrap().add_assign(1); + Err(err) + }); + }); + let unary = Unary::::new(connects, None, Some((0, 1))); + let err = unary + .propose(&TestCommand::default(), true) + .await + .unwrap_err(); + assert_eq!(err, early_err); + assert_eq!(*counter.lock().unwrap(), 1); + } +} diff --git a/curp/src/client_new/unary.rs b/curp/src/client_new/unary.rs index 744234ebb..2e3a92401 100644 --- a/curp/src/client_new/unary.rs +++ b/curp/src/client_new/unary.rs @@ -411,6 +411,7 @@ impl Unary { // Same as fast round, we blame the server for the serializing error. 
CurpError::from(ser_err) })?; + debug!("slow round for cmd({}) succeed", propose_id); Ok(synced_res) } @@ -597,7 +598,6 @@ impl ClientApi for Unary { let mut err: Option = None; while let Some((id, resp)) = responses.next().await { - debug!("{id} {max_term}"); let inner = match resp { Ok(r) => r, Err(e) => { @@ -647,6 +647,7 @@ impl ClientApi for Unary { } return Ok(res); } + debug!("fetch_cluster quorum ok, but members are empty"); } } From bd51e837f2b2728a5331330cbb69ff3c43d87c61 Mon Sep 17 00:00:00 2001 From: iGxnon Date: Thu, 21 Dec 2023 20:44:49 +0800 Subject: [PATCH 3/6] tests: add tests for retry layer Signed-off-by: iGxnon --- curp/src/client_new/tests.rs | 86 +++++++++++++++++++++++++++++++++++- 1 file changed, 85 insertions(+), 1 deletion(-) diff --git a/curp/src/client_new/tests.rs b/curp/src/client_new/tests.rs index 1488c3fc6..abf1231a0 100644 --- a/curp/src/client_new/tests.rs +++ b/curp/src/client_new/tests.rs @@ -11,7 +11,10 @@ use dashmap::DashMap; use tokio::time::Instant; use tracing_test::traced_test; -use super::unary::Unary; +use super::{ + retry::{Retry, RetryConfig}, + unary::Unary, +}; use crate::{ client_new::ClientApi, members::ServerId, @@ -559,3 +562,84 @@ async fn test_unary_propose_return_early_err() { assert_eq!(*counter.lock().unwrap(), 1); } } + +// Tests for retry layer + +#[traced_test] +#[tokio::test] +async fn test_retry_propose_return_no_retry_error() { + for early_err in [ + CurpError::duplicated(), + CurpError::shutting_down(), + CurpError::invalid_config(), + CurpError::node_already_exists(), + CurpError::node_not_exist(), + CurpError::learner_not_catch_up(), + ] { + // all no retry errors are returned early + assert!(early_err.return_early()); + // record how many times rpc was invoked. + let counter = Arc::new(Mutex::new(0)); + let connects = init_mocked_connects(5, |id, conn| { + let err = early_err.clone(); + let counter_c = Arc::clone(&counter); + conn.expect_propose().return_once(move |_req, _timeout| { + counter_c.lock().unwrap().add_assign(1); + Err(err) + }); + let err = early_err.clone(); + let counter_c = Arc::clone(&counter); + conn.expect_wait_synced() + .return_once(move |_req, _timeout| { + assert!(id == 0, "wait synced should send to leader"); + counter_c.lock().unwrap().add_assign(1); + Err(err) + }); + }); + let unary = Unary::::new(connects, None, Some((0, 1))); + let retry = Retry::new( + unary, + RetryConfig::new_fixed(Duration::from_millis(100), 5, false), + ); + let err = retry + .propose(&TestCommand::default(), false) + .await + .unwrap_err(); + assert_eq!(err.message(), tonic::Status::from(early_err).message()); + // fast path + slow path = 2 + assert_eq!(*counter.lock().unwrap(), 2); + } +} + +#[traced_test] +#[tokio::test] +async fn test_retry_propose_return_retry_error() { + for early_err in [ + CurpError::expired_client_id(), + CurpError::key_conflict(), + CurpError::RpcTransport(()), + CurpError::internal("No reason"), + ] { + let connects = init_mocked_connects(5, |id, conn| { + let err = early_err.clone(); + conn.expect_propose() + .returning(move |_req, _timeout| Err(err.clone())); + if id == 0 { + let err = early_err.clone(); + conn.expect_wait_synced() + .times(5) // wait synced should be retried in 5 times on leader + .returning(move |_req, _timeout| Err(err.clone())); + } + }); + let unary = Unary::::new(connects, None, Some((0, 1))); + let retry = Retry::new( + unary, + RetryConfig::new_fixed(Duration::from_millis(10), 5, false), + ); + let err = retry + .propose(&TestCommand::default(), false) + .await + 
.unwrap_err(); + assert_eq!(err.message(), "request timeout"); + } +} From a4f936baef934098a18b95e9333a19d17ec7d0ad Mon Sep 17 00:00:00 2001 From: iGxnon Date: Mon, 25 Dec 2023 23:13:57 +0800 Subject: [PATCH 4/6] test: fix blocking in single thread Signed-off-by: iGxnon --- curp/src/client_new/tests.rs | 41 ++++++++++++++++++------------------ curp/src/client_new/unary.rs | 38 ++++++++++++++++++++++++++++----- 2 files changed, 53 insertions(+), 26 deletions(-) diff --git a/curp/src/client_new/tests.rs b/curp/src/client_new/tests.rs index abf1231a0..719d63597 100644 --- a/curp/src/client_new/tests.rs +++ b/curp/src/client_new/tests.rs @@ -7,7 +7,6 @@ use std::{ use curp_external_api::LogIndex; use curp_test_utils::test_cmd::{LogIndexResult, TestCommand, TestCommandResult}; -use dashmap::DashMap; use tokio::time::Instant; use tracing_test::traced_test; @@ -29,7 +28,7 @@ use crate::{ fn init_mocked_connects( size: usize, f: impl Fn(usize, &mut MockConnectApi), -) -> DashMap> { +) -> HashMap> { std::iter::repeat_with(|| MockConnectApi::new()) .take(size) .enumerate() @@ -62,8 +61,8 @@ async fn test_unary_fetch_clusters_serializable() { })) }); }); - let unary = Unary::::new(connects, None, None); - assert!(unary.local_connect().is_none()); + let unary = Unary::::new(connects, None, None, None); + assert!(unary.local_connect().await.is_none()); let res = unary.fetch_cluster(false).await.unwrap(); assert_eq!( res.into_members_addrs(), @@ -96,8 +95,8 @@ async fn test_unary_fetch_clusters_serializable_local_first() { })) }); }); - let unary = Unary::::new(connects, Some(1), None); - assert!(unary.local_connect().is_some()); + let unary = Unary::::new(connects, Some(1), None, None); + assert!(unary.local_connect().await.is_some()); let res = unary.fetch_cluster(false).await.unwrap(); assert!(res.members.is_empty()); } @@ -154,8 +153,8 @@ async fn test_unary_fetch_clusters_linearizable() { Ok(tonic::Response::new(resp)) }); }); - let unary = Unary::::new(connects, None, None); - assert!(unary.local_connect().is_none()); + let unary = Unary::::new(connects, None, None, None); + assert!(unary.local_connect().await.is_none()); let res = unary.fetch_cluster(true).await.unwrap(); assert_eq!( res.into_members_addrs(), @@ -228,8 +227,8 @@ async fn test_unary_fetch_clusters_linearizable_failed() { Ok(tonic::Response::new(resp)) }); }); - let unary = Unary::::new(connects, None, None); - assert!(unary.local_connect().is_none()); + let unary = Unary::::new(connects, None, None, None); + assert!(unary.local_connect().await.is_none()); let res = unary.fetch_cluster(true).await.unwrap_err(); // only server(0, 1)'s responses are valid, less than majority quorum(3), got a mocked RpcTransport to retry assert_eq!(res, CurpError::RpcTransport(())); @@ -249,7 +248,7 @@ async fn test_unary_fast_round_works() { Ok(tonic::Response::new(resp)) }); }); - let unary = Unary::::new(connects, None, None); + let unary = Unary::::new(connects, None, None, None); let res = unary .fast_round(ProposeId(0, 0), &TestCommand::default()) .await @@ -283,7 +282,7 @@ async fn test_unary_fast_round_return_early_err() { Err(err) }); }); - let unary = Unary::::new(connects, None, None); + let unary = Unary::::new(connects, None, None, None); let err = unary .fast_round(ProposeId(0, 0), &TestCommand::default()) .await @@ -307,7 +306,7 @@ async fn test_unary_fast_round_less_quorum() { Ok(tonic::Response::new(resp)) }); }); - let unary = Unary::::new(connects, None, None); + let unary = Unary::::new(connects, None, None, None); let err = unary 
.fast_round(ProposeId(0, 0), &TestCommand::default()) .await @@ -342,7 +341,7 @@ async fn test_unary_fast_round_with_two_leader() { }); }); // old local leader(0), term 1 - let unary = Unary::::new(connects, None, Some((0, 1))); + let unary = Unary::::new(connects, None, Some(0), Some(1)); let res = unary .fast_round(ProposeId(0, 0), &TestCommand::default()) .await @@ -389,7 +388,7 @@ async fn test_unary_slow_round_fetch_leader_first() { ))) }); }); - let unary = Unary::::new(connects, None, None); + let unary = Unary::::new(connects, None, None, None); let res = unary.slow_round(ProposeId(0, 0)).await.unwrap().unwrap(); assert_eq!(LogIndex::from(res.0), 1); assert_eq!(res.1, TestCommandResult::default()); @@ -420,7 +419,7 @@ async fn test_unary_propose_fast_path_works() { ))) }); }); - let unary = Unary::::new(connects, None, Some((0, 1))); + let unary = Unary::::new(connects, None, Some(0), Some(1)); let res = unary .propose(&TestCommand::default(), true) .await @@ -454,7 +453,7 @@ async fn test_unary_propose_slow_path_works() { ))) }); }); - let unary = Unary::::new(connects, None, Some((0, 1))); + let unary = Unary::::new(connects, None, Some(0), Some(1)); let start_at = Instant::now(); let res = unary .propose(&TestCommand::default(), false) @@ -501,7 +500,7 @@ async fn test_unary_propose_fast_path_fallback_slow_path() { ))) }); }); - let unary = Unary::::new(connects, None, Some((0, 1))); + let unary = Unary::::new(connects, None, Some(0), Some(1)); let start_at = Instant::now(); let res = unary .propose(&TestCommand::default(), true) @@ -553,7 +552,7 @@ async fn test_unary_propose_return_early_err() { Err(err) }); }); - let unary = Unary::::new(connects, None, Some((0, 1))); + let unary = Unary::::new(connects, None, Some(0), Some(1)); let err = unary .propose(&TestCommand::default(), true) .await @@ -596,7 +595,7 @@ async fn test_retry_propose_return_no_retry_error() { Err(err) }); }); - let unary = Unary::::new(connects, None, Some((0, 1))); + let unary = Unary::::new(connects, None, Some(0), Some(1)); let retry = Retry::new( unary, RetryConfig::new_fixed(Duration::from_millis(100), 5, false), @@ -631,7 +630,7 @@ async fn test_retry_propose_return_retry_error() { .returning(move |_req, _timeout| Err(err.clone())); } }); - let unary = Unary::::new(connects, None, Some((0, 1))); + let unary = Unary::::new(connects, None, Some(0), Some(1)); let retry = Retry::new( unary, RetryConfig::new_fixed(Duration::from_millis(10), 5, false), diff --git a/curp/src/client_new/unary.rs b/curp/src/client_new/unary.rs index 2e3a92401..a434a3156 100644 --- a/curp/src/client_new/unary.rs +++ b/curp/src/client_new/unary.rs @@ -170,6 +170,30 @@ pub(super) struct Unary { } impl Unary { + /// Used in tests, all connects are mocked + #[cfg(test)] + pub(super) fn new( + connects: HashMap>, + local_server_id: Option, + leader_id: Option, + term: Option, + ) -> Self { + Self { + state: RwLock::new(State { + leader: leader_id, + term: term.unwrap_or_default(), + cluster_version: 0, + connects, + }), + local_server_id, + config: UnaryConfig { + propose_timeout: Duration::from_secs(0), + wait_synced_timeout: Duration::from_secs(0), + }, + phantom: PhantomData, + } + } + /// Update leader fn check_and_update_leader(state: &mut State, leader_id: Option, term: u64) -> bool { match state.term.cmp(&term) { @@ -276,7 +300,8 @@ impl Unary { .await .connects .values() - .map(|connect| f(Arc::clone(connect))) + .map(Arc::clone) + .map(|connect| f(connect)) .collect::>(); // size calculated here to keep size = 
stream.len(), otherwise Non-atomic read operation on the `connects` may result in inconsistency. let size = connects.len(); @@ -293,8 +318,7 @@ impl Unary { &self, f: impl FnOnce(Arc) -> F, ) -> Result { - let state_r = self.state.read().await; - let cached_leader = state_r.leader; + let cached_leader = self.state.read().await.leader; let leader_id = match cached_leader { Some(id) => id, None => as ClientApi>::fetch_leader_id(self, false).await?, @@ -302,11 +326,15 @@ impl Unary { // If the leader id cannot be found in connects, it indicates that there is // an inconsistency between the client's local leader state and the cluster // state, then mock a `WrongClusterVersion` return to the outside. - let connect = state_r + let connect = self + .state + .read() + .await .connects .get(&leader_id) + .map(Arc::clone) .ok_or_else(CurpError::wrong_cluster_version)?; - let res = f(Arc::clone(connect)).await; + let res = f(connect).await; Ok(res) } From 57814004a6f752678acba9248b9d6cd2fe9ecf59 Mon Sep 17 00:00:00 2001 From: iGxnon Date: Fri, 29 Dec 2023 14:49:30 +0800 Subject: [PATCH 5/6] chore: rebase on latest Signed-off-by: iGxnon --- curp/src/client_new/tests.rs | 21 +++++++-------------- curp/src/rpc/mod.rs | 4 ++++ 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/curp/src/client_new/tests.rs b/curp/src/client_new/tests.rs index 719d63597..8b4a003ab 100644 --- a/curp/src/client_new/tests.rs +++ b/curp/src/client_new/tests.rs @@ -19,7 +19,8 @@ use crate::{ members::ServerId, rpc::{ connect::{ConnectApi, MockConnectApi}, - CurpError, FetchClusterResponse, Member, ProposeId, ProposeResponse, WaitSyncedResponse, + CurpError, CurpErrorPriority, FetchClusterResponse, Member, ProposeId, ProposeResponse, + WaitSyncedResponse, }, }; @@ -268,10 +269,9 @@ async fn test_unary_fast_round_return_early_err() { CurpError::node_not_exist(), CurpError::learner_not_catch_up(), CurpError::expired_client_id(), - CurpError::wrong_cluster_version(), CurpError::redirect(Some(1), 0), ] { - assert!(early_err.return_early()); + assert_eq!(early_err.priority(), CurpErrorPriority::ReturnImmediately); // record how many times `handle_propose` was invoked. let counter = Arc::new(Mutex::new(0)); let connects = init_mocked_connects(3, |_id, conn| { @@ -530,10 +530,9 @@ async fn test_unary_propose_return_early_err() { CurpError::node_not_exist(), CurpError::learner_not_catch_up(), CurpError::expired_client_id(), - CurpError::wrong_cluster_version(), CurpError::redirect(Some(1), 0), ] { - assert!(early_err.return_early()); + assert_eq!(early_err.priority(), CurpErrorPriority::ReturnImmediately); // record how many times rpc was invoked. let counter = Arc::new(Mutex::new(0)); let connects = init_mocked_connects(5, |id, conn| { @@ -576,7 +575,7 @@ async fn test_retry_propose_return_no_retry_error() { CurpError::learner_not_catch_up(), ] { // all no retry errors are returned early - assert!(early_err.return_early()); + assert_eq!(early_err.priority(), CurpErrorPriority::ReturnImmediately); // record how many times rpc was invoked. 
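+        // a no-retry error must not be retried: the assertion below expects exactly
+        // 2 rpcs, one propose plus one wait_synced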
let counter = Arc::new(Mutex::new(0)); let connects = init_mocked_connects(5, |id, conn| { @@ -596,10 +595,7 @@ async fn test_retry_propose_return_no_retry_error() { }); }); let unary = Unary::::new(connects, None, Some(0), Some(1)); - let retry = Retry::new( - unary, - RetryConfig::new_fixed(Duration::from_millis(100), 5, false), - ); + let retry = Retry::new(unary, RetryConfig::new_fixed(Duration::from_millis(100), 5)); let err = retry .propose(&TestCommand::default(), false) .await @@ -631,10 +627,7 @@ async fn test_retry_propose_return_retry_error() { } }); let unary = Unary::::new(connects, None, Some(0), Some(1)); - let retry = Retry::new( - unary, - RetryConfig::new_fixed(Duration::from_millis(10), 5, false), - ); + let retry = Retry::new(unary, RetryConfig::new_fixed(Duration::from_millis(10), 5)); let err = retry .propose(&TestCommand::default(), false) .await diff --git a/curp/src/rpc/mod.rs b/curp/src/rpc/mod.rs index c1d4edc25..27be16943 100644 --- a/curp/src/rpc/mod.rs +++ b/curp/src/rpc/mod.rs @@ -578,6 +578,10 @@ impl PublishRequest { } } +/// NOTICE: +/// Please check test case `test_unary_fast_round_return_early_err` `test_unary_propose_return_early_err` +/// `test_retry_propose_return_no_retry_error` `test_retry_propose_return_retry_error` if you added some +/// new [`CurpError`] impl CurpError { /// `KeyConflict` error pub(crate) fn key_conflict() -> Self { From dc56e3ff1fe3e4122225544ec11bd076f3e949e7 Mon Sep 17 00:00:00 2001 From: iGxnon Date: Wed, 3 Jan 2024 13:20:07 +0800 Subject: [PATCH 6/6] chore: correct cluster version in client Signed-off-by: iGxnon --- curp/src/client_new/tests.rs | 4 ++-- curp/src/client_new/unary.rs | 5 ++--- xlineapi/proto | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/curp/src/client_new/tests.rs b/curp/src/client_new/tests.rs index 8b4a003ab..a72fa7afb 100644 --- a/curp/src/client_new/tests.rs +++ b/curp/src/client_new/tests.rs @@ -130,8 +130,8 @@ async fn test_unary_fetch_clusters_linearizable() { cluster_version: 1, }, 2 => FetchClusterResponse { - leader_id: None, // imagine this node is a disconnected candidate - term: 23, // with a high term + leader_id: None, + term: 23, // abnormal term cluster_id: 123, members: vec![], cluster_version: 1, diff --git a/curp/src/client_new/unary.rs b/curp/src/client_new/unary.rs index a434a3156..3e85c32e6 100644 --- a/curp/src/client_new/unary.rs +++ b/curp/src/client_new/unary.rs @@ -238,9 +238,9 @@ impl Unary { if !Self::check_and_update_leader(&mut state, res.leader_id, res.term) { return Ok(()); } - if state.cluster_version >= res.cluster_version { + if state.cluster_version == res.cluster_version { debug!( - "ignore old cluster version({}) from server", + "ignore cluster version({}) from server", res.cluster_version ); return Ok(()); @@ -644,7 +644,6 @@ impl ClientApi for Unary { } }; // Ignore the response of a node that doesn't know who the leader is. - // some disconnected candidates may continue to increase term, disrupting the process below. if inner.leader_id.is_some() { #[allow(clippy::integer_arithmetic)] match max_term.cmp(&inner.term) { diff --git a/xlineapi/proto b/xlineapi/proto index 2331b11cf..e8696c7c5 160000 --- a/xlineapi/proto +++ b/xlineapi/proto @@ -1 +1 @@ -Subproject commit 2331b11cf35d9528fad3e3d9abe4837d93cd04cc +Subproject commit e8696c7c53b44e7096f621d48b607d40d7a8bed3