From 91e68f0aefcb696d066305d800279371b5008584 Mon Sep 17 00:00:00 2001 From: iGxnon Date: Thu, 21 Dec 2023 20:44:49 +0800 Subject: [PATCH] tests: add tests for retry layer Signed-off-by: iGxnon --- curp/src/client_new/tests.rs | 86 +++++++++++++++++++++++++++++++++++- 1 file changed, 85 insertions(+), 1 deletion(-) diff --git a/curp/src/client_new/tests.rs b/curp/src/client_new/tests.rs index 1488c3fc6..abf1231a0 100644 --- a/curp/src/client_new/tests.rs +++ b/curp/src/client_new/tests.rs @@ -11,7 +11,10 @@ use dashmap::DashMap; use tokio::time::Instant; use tracing_test::traced_test; -use super::unary::Unary; +use super::{ + retry::{Retry, RetryConfig}, + unary::Unary, +}; use crate::{ client_new::ClientApi, members::ServerId, @@ -559,3 +562,84 @@ async fn test_unary_propose_return_early_err() { assert_eq!(*counter.lock().unwrap(), 1); } } + +// Tests for retry layer + +#[traced_test] +#[tokio::test] +async fn test_retry_propose_return_no_retry_error() { + for early_err in [ + CurpError::duplicated(), + CurpError::shutting_down(), + CurpError::invalid_config(), + CurpError::node_already_exists(), + CurpError::node_not_exist(), + CurpError::learner_not_catch_up(), + ] { + // all no retry errors are returned early + assert!(early_err.return_early()); + // record how many times rpc was invoked. + let counter = Arc::new(Mutex::new(0)); + let connects = init_mocked_connects(5, |id, conn| { + let err = early_err.clone(); + let counter_c = Arc::clone(&counter); + conn.expect_propose().return_once(move |_req, _timeout| { + counter_c.lock().unwrap().add_assign(1); + Err(err) + }); + let err = early_err.clone(); + let counter_c = Arc::clone(&counter); + conn.expect_wait_synced() + .return_once(move |_req, _timeout| { + assert!(id == 0, "wait synced should send to leader"); + counter_c.lock().unwrap().add_assign(1); + Err(err) + }); + }); + let unary = Unary::::new(connects, None, Some((0, 1))); + let retry = Retry::new( + unary, + RetryConfig::new_fixed(Duration::from_millis(100), 5, false), + ); + let err = retry + .propose(&TestCommand::default(), false) + .await + .unwrap_err(); + assert_eq!(err.message(), tonic::Status::from(early_err).message()); + // fast path + slow path = 2 + assert_eq!(*counter.lock().unwrap(), 2); + } +} + +#[traced_test] +#[tokio::test] +async fn test_retry_propose_return_retry_error() { + for early_err in [ + CurpError::expired_client_id(), + CurpError::key_conflict(), + CurpError::RpcTransport(()), + CurpError::internal("No reason"), + ] { + let connects = init_mocked_connects(5, |id, conn| { + let err = early_err.clone(); + conn.expect_propose() + .returning(move |_req, _timeout| Err(err.clone())); + if id == 0 { + let err = early_err.clone(); + conn.expect_wait_synced() + .times(5) // wait synced should be retried in 5 times on leader + .returning(move |_req, _timeout| Err(err.clone())); + } + }); + let unary = Unary::::new(connects, None, Some((0, 1))); + let retry = Retry::new( + unary, + RetryConfig::new_fixed(Duration::from_millis(10), 5, false), + ); + let err = retry + .propose(&TestCommand::default(), false) + .await + .unwrap_err(); + assert_eq!(err.message(), "request timeout"); + } +}