diff --git a/curp/src/client_new/tests.rs b/curp/src/client_new/tests.rs index abf1231a0..719d63597 100644 --- a/curp/src/client_new/tests.rs +++ b/curp/src/client_new/tests.rs @@ -7,7 +7,6 @@ use std::{ use curp_external_api::LogIndex; use curp_test_utils::test_cmd::{LogIndexResult, TestCommand, TestCommandResult}; -use dashmap::DashMap; use tokio::time::Instant; use tracing_test::traced_test; @@ -29,7 +28,7 @@ use crate::{ fn init_mocked_connects( size: usize, f: impl Fn(usize, &mut MockConnectApi), -) -> DashMap> { +) -> HashMap> { std::iter::repeat_with(|| MockConnectApi::new()) .take(size) .enumerate() @@ -62,8 +61,8 @@ async fn test_unary_fetch_clusters_serializable() { })) }); }); - let unary = Unary::::new(connects, None, None); - assert!(unary.local_connect().is_none()); + let unary = Unary::::new(connects, None, None, None); + assert!(unary.local_connect().await.is_none()); let res = unary.fetch_cluster(false).await.unwrap(); assert_eq!( res.into_members_addrs(), @@ -96,8 +95,8 @@ async fn test_unary_fetch_clusters_serializable_local_first() { })) }); }); - let unary = Unary::::new(connects, Some(1), None); - assert!(unary.local_connect().is_some()); + let unary = Unary::::new(connects, Some(1), None, None); + assert!(unary.local_connect().await.is_some()); let res = unary.fetch_cluster(false).await.unwrap(); assert!(res.members.is_empty()); } @@ -154,8 +153,8 @@ async fn test_unary_fetch_clusters_linearizable() { Ok(tonic::Response::new(resp)) }); }); - let unary = Unary::::new(connects, None, None); - assert!(unary.local_connect().is_none()); + let unary = Unary::::new(connects, None, None, None); + assert!(unary.local_connect().await.is_none()); let res = unary.fetch_cluster(true).await.unwrap(); assert_eq!( res.into_members_addrs(), @@ -228,8 +227,8 @@ async fn test_unary_fetch_clusters_linearizable_failed() { Ok(tonic::Response::new(resp)) }); }); - let unary = Unary::::new(connects, None, None); - assert!(unary.local_connect().is_none()); + let unary = Unary::::new(connects, None, None, None); + assert!(unary.local_connect().await.is_none()); let res = unary.fetch_cluster(true).await.unwrap_err(); // only server(0, 1)'s responses are valid, less than majority quorum(3), got a mocked RpcTransport to retry assert_eq!(res, CurpError::RpcTransport(())); @@ -249,7 +248,7 @@ async fn test_unary_fast_round_works() { Ok(tonic::Response::new(resp)) }); }); - let unary = Unary::::new(connects, None, None); + let unary = Unary::::new(connects, None, None, None); let res = unary .fast_round(ProposeId(0, 0), &TestCommand::default()) .await @@ -283,7 +282,7 @@ async fn test_unary_fast_round_return_early_err() { Err(err) }); }); - let unary = Unary::::new(connects, None, None); + let unary = Unary::::new(connects, None, None, None); let err = unary .fast_round(ProposeId(0, 0), &TestCommand::default()) .await @@ -307,7 +306,7 @@ async fn test_unary_fast_round_less_quorum() { Ok(tonic::Response::new(resp)) }); }); - let unary = Unary::::new(connects, None, None); + let unary = Unary::::new(connects, None, None, None); let err = unary .fast_round(ProposeId(0, 0), &TestCommand::default()) .await @@ -342,7 +341,7 @@ async fn test_unary_fast_round_with_two_leader() { }); }); // old local leader(0), term 1 - let unary = Unary::::new(connects, None, Some((0, 1))); + let unary = Unary::::new(connects, None, Some(0), Some(1)); let res = unary .fast_round(ProposeId(0, 0), &TestCommand::default()) .await @@ -389,7 +388,7 @@ async fn test_unary_slow_round_fetch_leader_first() { ))) }); }); - let unary = Unary::::new(connects, None, None); + let unary = Unary::::new(connects, None, None, None); let res = unary.slow_round(ProposeId(0, 0)).await.unwrap().unwrap(); assert_eq!(LogIndex::from(res.0), 1); assert_eq!(res.1, TestCommandResult::default()); @@ -420,7 +419,7 @@ async fn test_unary_propose_fast_path_works() { ))) }); }); - let unary = Unary::::new(connects, None, Some((0, 1))); + let unary = Unary::::new(connects, None, Some(0), Some(1)); let res = unary .propose(&TestCommand::default(), true) .await @@ -454,7 +453,7 @@ async fn test_unary_propose_slow_path_works() { ))) }); }); - let unary = Unary::::new(connects, None, Some((0, 1))); + let unary = Unary::::new(connects, None, Some(0), Some(1)); let start_at = Instant::now(); let res = unary .propose(&TestCommand::default(), false) @@ -501,7 +500,7 @@ async fn test_unary_propose_fast_path_fallback_slow_path() { ))) }); }); - let unary = Unary::::new(connects, None, Some((0, 1))); + let unary = Unary::::new(connects, None, Some(0), Some(1)); let start_at = Instant::now(); let res = unary .propose(&TestCommand::default(), true) @@ -553,7 +552,7 @@ async fn test_unary_propose_return_early_err() { Err(err) }); }); - let unary = Unary::::new(connects, None, Some((0, 1))); + let unary = Unary::::new(connects, None, Some(0), Some(1)); let err = unary .propose(&TestCommand::default(), true) .await @@ -596,7 +595,7 @@ async fn test_retry_propose_return_no_retry_error() { Err(err) }); }); - let unary = Unary::::new(connects, None, Some((0, 1))); + let unary = Unary::::new(connects, None, Some(0), Some(1)); let retry = Retry::new( unary, RetryConfig::new_fixed(Duration::from_millis(100), 5, false), @@ -631,7 +630,7 @@ async fn test_retry_propose_return_retry_error() { .returning(move |_req, _timeout| Err(err.clone())); } }); - let unary = Unary::::new(connects, None, Some((0, 1))); + let unary = Unary::::new(connects, None, Some(0), Some(1)); let retry = Retry::new( unary, RetryConfig::new_fixed(Duration::from_millis(10), 5, false), diff --git a/curp/src/client_new/unary.rs b/curp/src/client_new/unary.rs index d5779635f..e053d02c4 100644 --- a/curp/src/client_new/unary.rs +++ b/curp/src/client_new/unary.rs @@ -169,6 +169,30 @@ pub(super) struct Unary { } impl Unary { + /// Used in tests, all connects are mocked + #[cfg(test)] + pub(super) fn new( + connects: HashMap>, + local_server_id: Option, + leader_id: Option, + term: Option, + ) -> Self { + Self { + state: RwLock::new(State { + leader: leader_id, + term: term.unwrap_or_default(), + cluster_version: 0, + connects, + }), + local_server_id, + config: UnaryConfig { + propose_timeout: Duration::from_secs(0), + wait_synced_timeout: Duration::from_secs(0), + }, + phantom: PhantomData, + } + } + /// Update leader fn check_and_update_leader(state: &mut State, leader_id: Option, term: u64) -> bool { match state.term.cmp(&term) { @@ -275,7 +299,8 @@ impl Unary { .await .connects .values() - .map(|connect| f(Arc::clone(connect))) + .map(Arc::clone) + .map(|connect| f(connect)) .collect::>(); // size calculated here to keep size = stream.len(), otherwise Non-atomic read operation on the `connects` may result in inconsistency. let size = connects.len(); @@ -292,8 +317,7 @@ impl Unary { &self, f: impl FnOnce(Arc) -> F, ) -> Result { - let state_r = self.state.read().await; - let cached_leader = state_r.leader; + let cached_leader = self.state.read().await.leader; let leader_id = match cached_leader { Some(id) => id, None => as ClientApi>::fetch_leader_id(self, false).await?, @@ -301,11 +325,15 @@ impl Unary { // If the leader id cannot be found in connects, it indicates that there is // an inconsistency between the client's local leader state and the cluster // state, then mock a `WrongClusterVersion` return to the outside. - let connect = state_r + let connect = self + .state + .read() + .await .connects .get(&leader_id) + .map(Arc::clone) .ok_or_else(CurpError::wrong_cluster_version)?; - let res = f(Arc::clone(connect)).await; + let res = f(connect).await; Ok(res) }