Skip to content

Commit

Permalink
test: fix blocking in single thread
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <[email protected]>
  • Loading branch information
iGxnon committed Dec 25, 2023
1 parent 91e68f0 commit 6f5b578
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 26 deletions.
41 changes: 20 additions & 21 deletions curp/src/client_new/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::{

use curp_external_api::LogIndex;
use curp_test_utils::test_cmd::{LogIndexResult, TestCommand, TestCommandResult};
use dashmap::DashMap;
use tokio::time::Instant;
use tracing_test::traced_test;

Expand All @@ -29,7 +28,7 @@ use crate::{
fn init_mocked_connects(
size: usize,
f: impl Fn(usize, &mut MockConnectApi),
) -> DashMap<ServerId, Arc<dyn ConnectApi>> {
) -> HashMap<ServerId, Arc<dyn ConnectApi>> {
std::iter::repeat_with(|| MockConnectApi::new())
.take(size)
.enumerate()
Expand Down Expand Up @@ -62,8 +61,8 @@ async fn test_unary_fetch_clusters_serializable() {
}))
});
});
let unary = Unary::<TestCommand>::new(connects, None, None);
assert!(unary.local_connect().is_none());
let unary = Unary::<TestCommand>::new(connects, None, None, None);
assert!(unary.local_connect().await.is_none());
let res = unary.fetch_cluster(false).await.unwrap();
assert_eq!(
res.into_members_addrs(),
Expand Down Expand Up @@ -96,8 +95,8 @@ async fn test_unary_fetch_clusters_serializable_local_first() {
}))
});
});
let unary = Unary::<TestCommand>::new(connects, Some(1), None);
assert!(unary.local_connect().is_some());
let unary = Unary::<TestCommand>::new(connects, Some(1), None, None);
assert!(unary.local_connect().await.is_some());
let res = unary.fetch_cluster(false).await.unwrap();
assert!(res.members.is_empty());
}
Expand Down Expand Up @@ -154,8 +153,8 @@ async fn test_unary_fetch_clusters_linearizable() {
Ok(tonic::Response::new(resp))
});
});
let unary = Unary::<TestCommand>::new(connects, None, None);
assert!(unary.local_connect().is_none());
let unary = Unary::<TestCommand>::new(connects, None, None, None);
assert!(unary.local_connect().await.is_none());
let res = unary.fetch_cluster(true).await.unwrap();
assert_eq!(
res.into_members_addrs(),
Expand Down Expand Up @@ -228,8 +227,8 @@ async fn test_unary_fetch_clusters_linearizable_failed() {
Ok(tonic::Response::new(resp))
});
});
let unary = Unary::<TestCommand>::new(connects, None, None);
assert!(unary.local_connect().is_none());
let unary = Unary::<TestCommand>::new(connects, None, None, None);
assert!(unary.local_connect().await.is_none());
let res = unary.fetch_cluster(true).await.unwrap_err();
// only server(0, 1)'s responses are valid, less than majority quorum(3), got a mocked RpcTransport to retry
assert_eq!(res, CurpError::RpcTransport(()));
Expand All @@ -249,7 +248,7 @@ async fn test_unary_fast_round_works() {
Ok(tonic::Response::new(resp))
});
});
let unary = Unary::<TestCommand>::new(connects, None, None);
let unary = Unary::<TestCommand>::new(connects, None, None, None);
let res = unary
.fast_round(ProposeId(0, 0), &TestCommand::default())
.await
Expand Down Expand Up @@ -283,7 +282,7 @@ async fn test_unary_fast_round_return_early_err() {
Err(err)
});
});
let unary = Unary::<TestCommand>::new(connects, None, None);
let unary = Unary::<TestCommand>::new(connects, None, None, None);
let err = unary
.fast_round(ProposeId(0, 0), &TestCommand::default())
.await
Expand All @@ -307,7 +306,7 @@ async fn test_unary_fast_round_less_quorum() {
Ok(tonic::Response::new(resp))
});
});
let unary = Unary::<TestCommand>::new(connects, None, None);
let unary = Unary::<TestCommand>::new(connects, None, None, None);
let err = unary
.fast_round(ProposeId(0, 0), &TestCommand::default())
.await
Expand Down Expand Up @@ -342,7 +341,7 @@ async fn test_unary_fast_round_with_two_leader() {
});
});
// old local leader(0), term 1
let unary = Unary::<TestCommand>::new(connects, None, Some((0, 1)));
let unary = Unary::<TestCommand>::new(connects, None, Some(0), Some(1));
let res = unary
.fast_round(ProposeId(0, 0), &TestCommand::default())
.await
Expand Down Expand Up @@ -389,7 +388,7 @@ async fn test_unary_slow_round_fetch_leader_first() {
)))
});
});
let unary = Unary::<TestCommand>::new(connects, None, None);
let unary = Unary::<TestCommand>::new(connects, None, None, None);
let res = unary.slow_round(ProposeId(0, 0)).await.unwrap().unwrap();
assert_eq!(LogIndex::from(res.0), 1);
assert_eq!(res.1, TestCommandResult::default());
Expand Down Expand Up @@ -420,7 +419,7 @@ async fn test_unary_propose_fast_path_works() {
)))

Check warning on line 419 in curp/src/client_new/tests.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/tests.rs#L412-L419

Added lines #L412 - L419 were not covered by tests
});
});
let unary = Unary::<TestCommand>::new(connects, None, Some((0, 1)));
let unary = Unary::<TestCommand>::new(connects, None, Some(0), Some(1));
let res = unary
.propose(&TestCommand::default(), true)
.await
Expand Down Expand Up @@ -454,7 +453,7 @@ async fn test_unary_propose_slow_path_works() {
)))
});
});
let unary = Unary::<TestCommand>::new(connects, None, Some((0, 1)));
let unary = Unary::<TestCommand>::new(connects, None, Some(0), Some(1));
let start_at = Instant::now();
let res = unary
.propose(&TestCommand::default(), false)
Expand Down Expand Up @@ -501,7 +500,7 @@ async fn test_unary_propose_fast_path_fallback_slow_path() {
)))
});
});
let unary = Unary::<TestCommand>::new(connects, None, Some((0, 1)));
let unary = Unary::<TestCommand>::new(connects, None, Some(0), Some(1));
let start_at = Instant::now();
let res = unary
.propose(&TestCommand::default(), true)
Expand Down Expand Up @@ -553,7 +552,7 @@ async fn test_unary_propose_return_early_err() {
Err(err)

Check warning on line 552 in curp/src/client_new/tests.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/tests.rs#L550-L552

Added lines #L550 - L552 were not covered by tests
});
});
let unary = Unary::<TestCommand>::new(connects, None, Some((0, 1)));
let unary = Unary::<TestCommand>::new(connects, None, Some(0), Some(1));
let err = unary
.propose(&TestCommand::default(), true)
.await
Expand Down Expand Up @@ -596,7 +595,7 @@ async fn test_retry_propose_return_no_retry_error() {
Err(err)
});
});
let unary = Unary::<TestCommand>::new(connects, None, Some((0, 1)));
let unary = Unary::<TestCommand>::new(connects, None, Some(0), Some(1));
let retry = Retry::new(
unary,
RetryConfig::new_fixed(Duration::from_millis(100), 5, false),
Expand Down Expand Up @@ -631,7 +630,7 @@ async fn test_retry_propose_return_retry_error() {
.returning(move |_req, _timeout| Err(err.clone()));
}
});
let unary = Unary::<TestCommand>::new(connects, None, Some((0, 1)));
let unary = Unary::<TestCommand>::new(connects, None, Some(0), Some(1));
let retry = Retry::new(
unary,
RetryConfig::new_fixed(Duration::from_millis(10), 5, false),
Expand Down
38 changes: 33 additions & 5 deletions curp/src/client_new/unary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,30 @@ pub(super) struct Unary<C: Command> {
}

impl<C: Command> Unary<C> {
/// Used in tests, all connects are mocked
#[cfg(test)]
pub(super) fn new(
connects: HashMap<ServerId, Arc<dyn ConnectApi>>,
local_server_id: Option<ServerId>,
leader_id: Option<ServerId>,
term: Option<u64>,
) -> Self {
Self {
state: RwLock::new(State {
leader: leader_id,
term: term.unwrap_or_default(),
cluster_version: 0,
connects,
}),
local_server_id,
config: UnaryConfig {
propose_timeout: Duration::from_secs(0),
wait_synced_timeout: Duration::from_secs(0),
},
phantom: PhantomData,
}
}

/// Update leader
fn check_and_update_leader(state: &mut State, leader_id: Option<ServerId>, term: u64) -> bool {
match state.term.cmp(&term) {
Expand Down Expand Up @@ -275,7 +299,8 @@ impl<C: Command> Unary<C> {
.await

Check warning on line 299 in curp/src/client_new/unary.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/unary.rs#L299

Added line #L299 was not covered by tests
.connects
.values()
.map(|connect| f(Arc::clone(connect)))
.map(Arc::clone)
.map(|connect| f(connect))
.collect::<FuturesUnordered<F>>();
// size calculated here to keep size = stream.len(), otherwise Non-atomic read operation on the `connects` may result in inconsistency.
let size = connects.len();
Expand All @@ -292,20 +317,23 @@ impl<C: Command> Unary<C> {
&self,
f: impl FnOnce(Arc<dyn ConnectApi>) -> F,
) -> Result<R, CurpError> {
let state_r = self.state.read().await;
let cached_leader = state_r.leader;
let cached_leader = self.state.read().await.leader;
let leader_id = match cached_leader {
Some(id) => id,
None => <Unary<C> as ClientApi>::fetch_leader_id(self, false).await?,
};
// If the leader id cannot be found in connects, it indicates that there is
// an inconsistency between the client's local leader state and the cluster
// state, then mock a `WrongClusterVersion` return to the outside.
let connect = state_r
let connect = self
.state
.read()
.await

Check warning on line 331 in curp/src/client_new/unary.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/unary.rs#L331

Added line #L331 was not covered by tests
.connects
.get(&leader_id)
.map(Arc::clone)
.ok_or_else(CurpError::wrong_cluster_version)?;
let res = f(Arc::clone(connect)).await;
let res = f(connect).await;
Ok(res)
}

Expand Down

0 comments on commit 6f5b578

Please sign in to comment.