Skip to content

Commit

Permalink
tests: add tests for unary propose
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <[email protected]>
  • Loading branch information
iGxnon committed Dec 25, 2023
1 parent 1f32740 commit 76cfdf5
Show file tree
Hide file tree
Showing 2 changed files with 219 additions and 4 deletions.
220 changes: 217 additions & 3 deletions curp/src/client_new/tests.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use std::{
collections::HashMap,
ops::AddAssign,
sync::{Arc, Mutex},
sync::{atomic::AtomicBool, Arc, Mutex},
time::Duration,
};

use curp_test_utils::test_cmd::{TestCommand, TestCommandResult};
use curp_external_api::LogIndex;
use curp_test_utils::test_cmd::{LogIndexResult, TestCommand, TestCommandResult};
use dashmap::DashMap;
use tokio::time::Instant;
use tracing_test::traced_test;

use super::unary::Unary;
Expand All @@ -14,7 +17,7 @@ use crate::{
members::ServerId,
rpc::{
connect::{ConnectApi, MockConnectApi},
CurpError, FetchClusterResponse, Member, ProposeId, ProposeResponse,
CurpError, FetchClusterResponse, Member, ProposeId, ProposeResponse, WaitSyncedResponse,
},
};

Expand Down Expand Up @@ -345,3 +348,214 @@ async fn test_unary_fast_round_with_two_leader() {
// quorum: server(0, 1, 2, 3)
assert_eq!(res, TestCommandResult::new(vec![2], vec![2]));
}

#[traced_test]
#[tokio::test]
async fn test_unary_slow_round_fetch_leader_first() {
let flag = Arc::new(AtomicBool::new(false));
let connects = init_mocked_connects(3, |id, conn| {
let flag_c = Arc::clone(&flag);
conn.expect_fetch_cluster()
.return_once(move |_req, _timeout| {
flag_c.store(true, std::sync::atomic::Ordering::Relaxed);
Ok(tonic::Response::new(FetchClusterResponse {
leader_id: Some(0),
term: 1,
cluster_id: 123,
members: vec![
Member::new(0, "S0", vec!["A0".to_owned()], false),
Member::new(1, "S1", vec!["A1".to_owned()], false),
Member::new(2, "S2", vec!["A2".to_owned()], false),
],
cluster_version: 1,
}))
});
let flag_c = Arc::clone(&flag);
conn.expect_wait_synced()
.return_once(move |_req, _timeout| {
assert!(id == 0, "wait synced should send to leader");
assert!(
flag_c.load(std::sync::atomic::Ordering::Relaxed),
"fetch_leader should invoke first"
);
Ok(tonic::Response::new(WaitSyncedResponse::new_from_result::<
TestCommand,
>(
Ok(TestCommandResult::default()),
Some(Ok(1.into())),
)))
});
});
let unary = Unary::<TestCommand>::new(connects, None, None);
let res = unary.slow_round(ProposeId(0, 0)).await.unwrap().unwrap();
assert_eq!(LogIndex::from(res.0), 1);
assert_eq!(res.1, TestCommandResult::default());
}

#[traced_test]
#[tokio::test]
async fn test_unary_propose_fast_path_works() {
let connects = init_mocked_connects(5, |id, conn| {
conn.expect_propose().return_once(move |_req, _timeout| {
let resp = match id {
0 => ProposeResponse::new_result::<TestCommand>(&Ok(TestCommandResult::default())),
1 | 2 | 3 => ProposeResponse::new_empty(),
4 => return Err(CurpError::key_conflict()),
_ => unreachable!("there are only 5 nodes"),
};
Ok(tonic::Response::new(resp))
});
conn.expect_wait_synced()
.return_once(move |_req, _timeout| {
assert!(id == 0, "wait synced should send to leader");
std::thread::sleep(Duration::from_millis(100));
Ok(tonic::Response::new(WaitSyncedResponse::new_from_result::<
TestCommand,
>(
Ok(TestCommandResult::default()),
Some(Ok(1.into())),
)))
});
});
let unary = Unary::<TestCommand>::new(connects, None, Some((0, 1)));
let res = unary
.propose(&TestCommand::default(), true)
.await
.unwrap()
.unwrap();
assert_eq!(res, (TestCommandResult::default(), None));
}

#[traced_test]
#[tokio::test]
async fn test_unary_propose_slow_path_works() {
let connects = init_mocked_connects(5, |id, conn| {
conn.expect_propose().return_once(move |_req, _timeout| {
let resp = match id {
0 => ProposeResponse::new_result::<TestCommand>(&Ok(TestCommandResult::default())),
1 | 2 | 3 => ProposeResponse::new_empty(),
4 => return Err(CurpError::key_conflict()),
_ => unreachable!("there are only 5 nodes"),
};
Ok(tonic::Response::new(resp))
});
conn.expect_wait_synced()
.return_once(move |_req, _timeout| {
assert!(id == 0, "wait synced should send to leader");
std::thread::sleep(Duration::from_millis(100));
Ok(tonic::Response::new(WaitSyncedResponse::new_from_result::<
TestCommand,
>(
Ok(TestCommandResult::default()),
Some(Ok(1.into())),
)))
});
});
let unary = Unary::<TestCommand>::new(connects, None, Some((0, 1)));
let start_at = Instant::now();
let res = unary
.propose(&TestCommand::default(), false)
.await
.unwrap()
.unwrap();
assert!(
start_at.elapsed() > Duration::from_millis(100),
"slow round takes at least 100ms"
);
assert_eq!(
res,
(TestCommandResult::default(), Some(LogIndexResult::from(1)))
);
}

#[traced_test]
#[tokio::test]
async fn test_unary_propose_fast_path_fallback_slow_path() {
// record how many times `handle_propose` was invoked.
let counter = Arc::new(Mutex::new(0));
let connects = init_mocked_connects(5, |id, conn| {
let counter_c = Arc::clone(&counter);
conn.expect_propose().return_once(move |_req, _timeout| {
counter_c.lock().unwrap().add_assign(1);
// insufficient quorum
let resp = match id {
0 => ProposeResponse::new_result::<TestCommand>(&Ok(TestCommandResult::default())),
1 | 2 => ProposeResponse::new_empty(),
3 | 4 => return Err(CurpError::key_conflict()),
_ => unreachable!("there are only 5 nodes"),
};
Ok(tonic::Response::new(resp))
});
conn.expect_wait_synced()
.return_once(move |_req, _timeout| {
assert!(id == 0, "wait synced should send to leader");
std::thread::sleep(Duration::from_millis(100));
Ok(tonic::Response::new(WaitSyncedResponse::new_from_result::<
TestCommand,
>(
Ok(TestCommandResult::default()),
Some(Ok(1.into())),
)))
});
});
let unary = Unary::<TestCommand>::new(connects, None, Some((0, 1)));
let start_at = Instant::now();
let res = unary
.propose(&TestCommand::default(), true)
.await
.unwrap()
.unwrap();
assert!(
start_at.elapsed() > Duration::from_millis(100),
"slow round takes at least 100ms"
);
// indicate that we actually run out of fast round
assert_eq!(*counter.lock().unwrap(), 5);
assert_eq!(
res,
(TestCommandResult::default(), Some(LogIndexResult::from(1)))
);
}

#[traced_test]
#[tokio::test]
async fn test_unary_propose_return_early_err() {
for early_err in [
CurpError::duplicated(),
CurpError::shutting_down(),
CurpError::invalid_config(),
CurpError::node_already_exists(),
CurpError::node_not_exist(),
CurpError::learner_not_catch_up(),
CurpError::expired_client_id(),
CurpError::wrong_cluster_version(),
CurpError::redirect(Some(1), 0),
] {
assert!(early_err.return_early());
// record how many times rpc was invoked.
let counter = Arc::new(Mutex::new(0));
let connects = init_mocked_connects(5, |id, conn| {
let err = early_err.clone();
let counter_c = Arc::clone(&counter);
conn.expect_propose().return_once(move |_req, _timeout| {
counter_c.lock().unwrap().add_assign(1);
Err(err)
});
let err = early_err.clone();
let counter_c = Arc::clone(&counter);
conn.expect_wait_synced()
.return_once(move |_req, _timeout| {
assert!(id == 0, "wait synced should send to leader");
counter_c.lock().unwrap().add_assign(1);
Err(err)
});
});
let unary = Unary::<TestCommand>::new(connects, None, Some((0, 1)));
let err = unary
.propose(&TestCommand::default(), true)
.await
.unwrap_err();
assert_eq!(err, early_err);
assert_eq!(*counter.lock().unwrap(), 1);
}
}
3 changes: 2 additions & 1 deletion curp/src/client_new/unary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ impl<C: Command> Unary<C> {
// Same as fast round, we blame the server for the serializing error.
CurpError::from(ser_err)
})?;
debug!("slow round for cmd({}) succeed", propose_id);
Ok(synced_res)
}

Expand Down Expand Up @@ -590,7 +591,6 @@ impl<C: Command> ClientApi for Unary<C> {
let mut err = None;

while let Some((id, resp)) = responses.next().await {
debug!("{id} {max_term}");
let inner = match resp {
Ok(r) => r,
Err(e) => {
Expand Down Expand Up @@ -634,6 +634,7 @@ impl<C: Command> ClientApi for Unary<C> {
}
return Ok(res);
}
debug!("fetch_cluster quorum ok, but members are empty");
}
}

Expand Down

0 comments on commit 76cfdf5

Please sign in to comment.