From 76cfdf504b71767b9e2914cf15118cbd16cadd27 Mon Sep 17 00:00:00 2001 From: iGxnon Date: Thu, 21 Dec 2023 20:14:46 +0800 Subject: [PATCH] tests: add tests for unary propose Signed-off-by: iGxnon --- curp/src/client_new/tests.rs | 220 ++++++++++++++++++++++++++++++++++- curp/src/client_new/unary.rs | 3 +- 2 files changed, 219 insertions(+), 4 deletions(-) diff --git a/curp/src/client_new/tests.rs b/curp/src/client_new/tests.rs index 91418ecd4b..1488c3fc66 100644 --- a/curp/src/client_new/tests.rs +++ b/curp/src/client_new/tests.rs @@ -1,11 +1,14 @@ use std::{ collections::HashMap, ops::AddAssign, - sync::{Arc, Mutex}, + sync::{atomic::AtomicBool, Arc, Mutex}, + time::Duration, }; -use curp_test_utils::test_cmd::{TestCommand, TestCommandResult}; +use curp_external_api::LogIndex; +use curp_test_utils::test_cmd::{LogIndexResult, TestCommand, TestCommandResult}; use dashmap::DashMap; +use tokio::time::Instant; use tracing_test::traced_test; use super::unary::Unary; @@ -14,7 +17,7 @@ use crate::{ members::ServerId, rpc::{ connect::{ConnectApi, MockConnectApi}, - CurpError, FetchClusterResponse, Member, ProposeId, ProposeResponse, + CurpError, FetchClusterResponse, Member, ProposeId, ProposeResponse, WaitSyncedResponse, }, }; @@ -345,3 +348,214 @@ async fn test_unary_fast_round_with_two_leader() { // quorum: server(0, 1, 2, 3) assert_eq!(res, TestCommandResult::new(vec![2], vec![2])); } + +#[traced_test] +#[tokio::test] +async fn test_unary_slow_round_fetch_leader_first() { + let flag = Arc::new(AtomicBool::new(false)); + let connects = init_mocked_connects(3, |id, conn| { + let flag_c = Arc::clone(&flag); + conn.expect_fetch_cluster() + .return_once(move |_req, _timeout| { + flag_c.store(true, std::sync::atomic::Ordering::Relaxed); + Ok(tonic::Response::new(FetchClusterResponse { + leader_id: Some(0), + term: 1, + cluster_id: 123, + members: vec![ + Member::new(0, "S0", vec!["A0".to_owned()], false), + Member::new(1, "S1", vec!["A1".to_owned()], false), + Member::new(2, "S2", vec!["A2".to_owned()], false), + ], + cluster_version: 1, + })) + }); + let flag_c = Arc::clone(&flag); + conn.expect_wait_synced() + .return_once(move |_req, _timeout| { + assert!(id == 0, "wait synced should send to leader"); + assert!( + flag_c.load(std::sync::atomic::Ordering::Relaxed), + "fetch_leader should invoke first" + ); + Ok(tonic::Response::new(WaitSyncedResponse::new_from_result::< + TestCommand, + >( + Ok(TestCommandResult::default()), + Some(Ok(1.into())), + ))) + }); + }); + let unary = Unary::::new(connects, None, None); + let res = unary.slow_round(ProposeId(0, 0)).await.unwrap().unwrap(); + assert_eq!(LogIndex::from(res.0), 1); + assert_eq!(res.1, TestCommandResult::default()); +} + +#[traced_test] +#[tokio::test] +async fn test_unary_propose_fast_path_works() { + let connects = init_mocked_connects(5, |id, conn| { + conn.expect_propose().return_once(move |_req, _timeout| { + let resp = match id { + 0 => ProposeResponse::new_result::(&Ok(TestCommandResult::default())), + 1 | 2 | 3 => ProposeResponse::new_empty(), + 4 => return Err(CurpError::key_conflict()), + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); + conn.expect_wait_synced() + .return_once(move |_req, _timeout| { + assert!(id == 0, "wait synced should send to leader"); + std::thread::sleep(Duration::from_millis(100)); + Ok(tonic::Response::new(WaitSyncedResponse::new_from_result::< + TestCommand, + >( + Ok(TestCommandResult::default()), + Some(Ok(1.into())), + ))) + }); + }); + let unary = Unary::::new(connects, None, Some((0, 1))); + let res = unary + .propose(&TestCommand::default(), true) + .await + .unwrap() + .unwrap(); + assert_eq!(res, (TestCommandResult::default(), None)); +} + +#[traced_test] +#[tokio::test] +async fn test_unary_propose_slow_path_works() { + let connects = init_mocked_connects(5, |id, conn| { + conn.expect_propose().return_once(move |_req, _timeout| { + let resp = match id { + 0 => ProposeResponse::new_result::(&Ok(TestCommandResult::default())), + 1 | 2 | 3 => ProposeResponse::new_empty(), + 4 => return Err(CurpError::key_conflict()), + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); + conn.expect_wait_synced() + .return_once(move |_req, _timeout| { + assert!(id == 0, "wait synced should send to leader"); + std::thread::sleep(Duration::from_millis(100)); + Ok(tonic::Response::new(WaitSyncedResponse::new_from_result::< + TestCommand, + >( + Ok(TestCommandResult::default()), + Some(Ok(1.into())), + ))) + }); + }); + let unary = Unary::::new(connects, None, Some((0, 1))); + let start_at = Instant::now(); + let res = unary + .propose(&TestCommand::default(), false) + .await + .unwrap() + .unwrap(); + assert!( + start_at.elapsed() > Duration::from_millis(100), + "slow round takes at least 100ms" + ); + assert_eq!( + res, + (TestCommandResult::default(), Some(LogIndexResult::from(1))) + ); +} + +#[traced_test] +#[tokio::test] +async fn test_unary_propose_fast_path_fallback_slow_path() { + // record how many times `handle_propose` was invoked. + let counter = Arc::new(Mutex::new(0)); + let connects = init_mocked_connects(5, |id, conn| { + let counter_c = Arc::clone(&counter); + conn.expect_propose().return_once(move |_req, _timeout| { + counter_c.lock().unwrap().add_assign(1); + // insufficient quorum + let resp = match id { + 0 => ProposeResponse::new_result::(&Ok(TestCommandResult::default())), + 1 | 2 => ProposeResponse::new_empty(), + 3 | 4 => return Err(CurpError::key_conflict()), + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); + conn.expect_wait_synced() + .return_once(move |_req, _timeout| { + assert!(id == 0, "wait synced should send to leader"); + std::thread::sleep(Duration::from_millis(100)); + Ok(tonic::Response::new(WaitSyncedResponse::new_from_result::< + TestCommand, + >( + Ok(TestCommandResult::default()), + Some(Ok(1.into())), + ))) + }); + }); + let unary = Unary::::new(connects, None, Some((0, 1))); + let start_at = Instant::now(); + let res = unary + .propose(&TestCommand::default(), true) + .await + .unwrap() + .unwrap(); + assert!( + start_at.elapsed() > Duration::from_millis(100), + "slow round takes at least 100ms" + ); + // indicate that we actually run out of fast round + assert_eq!(*counter.lock().unwrap(), 5); + assert_eq!( + res, + (TestCommandResult::default(), Some(LogIndexResult::from(1))) + ); +} + +#[traced_test] +#[tokio::test] +async fn test_unary_propose_return_early_err() { + for early_err in [ + CurpError::duplicated(), + CurpError::shutting_down(), + CurpError::invalid_config(), + CurpError::node_already_exists(), + CurpError::node_not_exist(), + CurpError::learner_not_catch_up(), + CurpError::expired_client_id(), + CurpError::wrong_cluster_version(), + CurpError::redirect(Some(1), 0), + ] { + assert!(early_err.return_early()); + // record how many times rpc was invoked. + let counter = Arc::new(Mutex::new(0)); + let connects = init_mocked_connects(5, |id, conn| { + let err = early_err.clone(); + let counter_c = Arc::clone(&counter); + conn.expect_propose().return_once(move |_req, _timeout| { + counter_c.lock().unwrap().add_assign(1); + Err(err) + }); + let err = early_err.clone(); + let counter_c = Arc::clone(&counter); + conn.expect_wait_synced() + .return_once(move |_req, _timeout| { + assert!(id == 0, "wait synced should send to leader"); + counter_c.lock().unwrap().add_assign(1); + Err(err) + }); + }); + let unary = Unary::::new(connects, None, Some((0, 1))); + let err = unary + .propose(&TestCommand::default(), true) + .await + .unwrap_err(); + assert_eq!(err, early_err); + assert_eq!(*counter.lock().unwrap(), 1); + } +} diff --git a/curp/src/client_new/unary.rs b/curp/src/client_new/unary.rs index e5aef4f2a6..d5779635f0 100644 --- a/curp/src/client_new/unary.rs +++ b/curp/src/client_new/unary.rs @@ -404,6 +404,7 @@ impl Unary { // Same as fast round, we blame the server for the serializing error. CurpError::from(ser_err) })?; + debug!("slow round for cmd({}) succeed", propose_id); Ok(synced_res) } @@ -590,7 +591,6 @@ impl ClientApi for Unary { let mut err = None; while let Some((id, resp)) = responses.next().await { - debug!("{id} {max_term}"); let inner = match resp { Ok(r) => r, Err(e) => { @@ -634,6 +634,7 @@ impl ClientApi for Unary { } return Ok(res); } + debug!("fetch_cluster quorum ok, but members are empty"); } }