Skip to content

Commit

Permalink
refactor: use infinity client retry in curp tests
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds authored and Phoenix500526 committed Oct 9, 2023
1 parent 7e38ff2 commit 6408669
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 28 deletions.
13 changes: 11 additions & 2 deletions curp/tests/common/curp_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ use tokio::{
};
use tracing::debug;
use utils::{
config::{ClientConfig, CurpConfigBuilder, StorageConfig},
config::{
default_client_wait_synced_timeout, default_propose_timeout, default_retry_timeout,
ClientConfig, CurpConfigBuilder, StorageConfig,
},
shutdown::{self, Trigger},
};

Expand Down Expand Up @@ -179,7 +182,13 @@ impl CurpGroup {
&self.nodes[id]
}

pub async fn new_client(&self, config: ClientConfig) -> Client<TestCommand> {
pub async fn new_client(&self) -> Client<TestCommand> {
let config = ClientConfig::new(
default_client_wait_synced_timeout(),
default_propose_timeout(),
default_retry_timeout(),
usize::MAX,
);
Client::builder()
.config(config)
.build_from_addrs(self.all.values().cloned().collect_vec())
Expand Down
5 changes: 2 additions & 3 deletions curp/tests/read_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use curp_test_utils::{
test_cmd::{TestCommand, TestCommandResult},
};
use test_macros::abort_on_panic;
use utils::config::ClientConfig;

use crate::common::curp_group::CurpGroup;

Expand All @@ -17,7 +16,7 @@ mod common;
async fn read_state() {
init_logger();
let group = CurpGroup::new(3).await;
let put_client = group.new_client(ClientConfig::default()).await;
let put_client = group.new_client().await;
let put_cmd = TestCommand::new_put(vec![0], 0).set_exe_dur(Duration::from_millis(100));
let put_id = put_cmd.id().clone();
tokio::spawn(async move {
Expand All @@ -26,7 +25,7 @@ async fn read_state() {
TestCommandResult::default(),
);
});
let get_client = group.new_client(ClientConfig::default()).await;
let get_client = group.new_client().await;
let res = get_client
.fetch_read_state(&TestCommand::new_get(vec![0]))
.await
Expand Down
16 changes: 8 additions & 8 deletions curp/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async fn basic_propose() {
init_logger();

let group = CurpGroup::new(3).await;
let client = group.new_client(ClientConfig::default()).await;
let client = group.new_client().await;

assert_eq!(
client
Expand Down Expand Up @@ -67,7 +67,7 @@ async fn synced_propose() {
init_logger();

let mut group = CurpGroup::new(5).await;
let client = group.new_client(ClientConfig::default()).await;
let client = group.new_client().await;
let cmd = TestCommand::new_get(vec![0]);

let (er, index) = client.propose(cmd.clone(), false).await.unwrap();
Expand All @@ -94,7 +94,7 @@ async fn exe_exact_n_times() {
init_logger();

let mut group = CurpGroup::new(3).await;
let client = group.new_client(ClientConfig::default()).await;
let client = group.new_client().await;
let cmd = TestCommand::new_get(vec![0]);

let er = client.propose(cmd.clone(), true).await.unwrap().0;
Expand Down Expand Up @@ -204,7 +204,7 @@ async fn concurrent_cmd_order() {

sleep_secs(1).await;

let client = group.new_client(ClientConfig::default()).await;
let client = group.new_client().await;

assert_eq!(
client
Expand All @@ -224,7 +224,7 @@ async fn concurrent_cmd_order_should_have_correct_revision() {
init_logger();

let group = CurpGroup::new(3).await;
let client = group.new_client(ClientConfig::default()).await;
let client = group.new_client().await;

let sample_range = 1..=100;

Expand Down Expand Up @@ -256,7 +256,7 @@ async fn shutdown_rpc_should_shutdown_the_cluster() {
let tmp_path = tempfile::TempDir::new().unwrap().into_path();
let group = CurpGroup::new_rocks(3, tmp_path.clone()).await;

let req_client = group.new_client(ClientConfig::default()).await;
let req_client = group.new_client().await;
let collection_task = tokio::spawn(async move {
let mut collection = vec![];
for i in 0..10 {
Expand All @@ -269,7 +269,7 @@ async fn shutdown_rpc_should_shutdown_the_cluster() {
collection
});

let client = group.new_client(ClientConfig::default()).await;
let client = group.new_client().await;
client.shutdown().await.unwrap();

let res = client
Expand All @@ -285,7 +285,7 @@ async fn shutdown_rpc_should_shutdown_the_cluster() {
assert!(group.is_finished());

let group = CurpGroup::new_rocks(3, tmp_path).await;
let client = group.new_client(ClientConfig::default()).await;
let client = group.new_client().await;
for i in collection {
let res = client.propose(TestCommand::new_get(vec![i]), true).await;
assert_eq!(res.unwrap().0.values, vec![i]);
Expand Down
13 changes: 11 additions & 2 deletions simulation/src/curp_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ use parking_lot::Mutex;
use tokio::sync::mpsc;
use tracing::debug;
use utils::{
config::{ClientConfig, CurpConfigBuilder, StorageConfig},
config::{
default_client_wait_synced_timeout, default_propose_timeout, default_retry_timeout,
ClientConfig, CurpConfigBuilder, StorageConfig,
},
shutdown,
};

Expand Down Expand Up @@ -159,7 +162,13 @@ impl CurpGroup {
&self.nodes[id]
}

pub async fn new_client(&self, config: ClientConfig) -> SimClient<TestCommand> {
pub async fn new_client(&self) -> SimClient<TestCommand> {
let config = ClientConfig::new(
default_client_wait_synced_timeout(),
default_propose_timeout(),
default_retry_timeout(),
usize::MAX,
);
let all_members = self
.nodes
.iter()
Expand Down
5 changes: 2 additions & 3 deletions simulation/tests/it/curp/server_election.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use curp::members::ServerId;
use curp_test_utils::{init_logger, sleep_secs, test_cmd::TestCommand};
use simulation::curp_group::CurpGroup;
use utils::config::ClientConfig;

/// Wait some time for the election to finish, and get the leader to ensure that the election is
/// completed.
Expand Down Expand Up @@ -122,7 +121,7 @@ async fn propose_after_reelect() {
init_logger();

let group = CurpGroup::new(5).await;
let client = group.new_client(ClientConfig::default()).await;
let client = group.new_client().await;
assert_eq!(
client
.propose(TestCommand::new_put(vec![0], 0), true)
Expand Down Expand Up @@ -156,7 +155,7 @@ async fn conflict_should_detected_in_new_leader() {
init_logger();

let group = CurpGroup::new(3).await;
let client = group.new_client(ClientConfig::default()).await;
let client = group.new_client().await;
let leader1 = group.get_leader().await.0;

// client only propose to leader
Expand Down
17 changes: 8 additions & 9 deletions simulation/tests/it/curp/server_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ use engine::StorageEngine;
use itertools::Itertools;
use simulation::curp_group::{CurpGroup, ProposeRequest};
use tracing::debug;
use utils::config::ClientConfig;

#[madsim::test]
async fn leader_crash_and_recovery() {
init_logger();

let mut group = CurpGroup::new(5).await;
let client = group.new_client(ClientConfig::default()).await;
let client = group.new_client().await;

let leader = group.try_get_leader().await.unwrap().0;
group.crash(leader).await;
Expand Down Expand Up @@ -58,7 +57,7 @@ async fn follower_crash_and_recovery() {
init_logger();

let mut group = CurpGroup::new(5).await;
let client = group.new_client(ClientConfig::default()).await;
let client = group.new_client().await;

let leader = group.try_get_leader().await.unwrap().0;
let follower = *group.nodes.keys().find(|&id| id != &leader).unwrap();
Expand Down Expand Up @@ -103,7 +102,7 @@ async fn leader_and_follower_both_crash_and_recovery() {
init_logger();

let mut group = CurpGroup::new(5).await;
let client = group.new_client(ClientConfig::default()).await;
let client = group.new_client().await;

let leader = group.try_get_leader().await.unwrap().0;
let follower = *group.nodes.keys().find(|&id| id != &leader).unwrap();
Expand Down Expand Up @@ -174,7 +173,7 @@ async fn new_leader_will_recover_spec_cmds_cond1() {
init_logger();

let mut group = CurpGroup::new(5).await;
let client = group.new_client(ClientConfig::default()).await;
let client = group.new_client().await;

let leader1 = group.get_leader().await.0;

Expand Down Expand Up @@ -227,7 +226,7 @@ async fn new_leader_will_recover_spec_cmds_cond2() {
init_logger();

let group = CurpGroup::new(5).await;
let client = group.new_client(ClientConfig::default()).await;
let client = group.new_client().await;

let leader1 = group.get_leader().await.0;

Expand Down Expand Up @@ -267,7 +266,7 @@ async fn old_leader_will_keep_original_states() {
init_logger();

let group = CurpGroup::new(5).await;
let client = group.new_client(ClientConfig::default()).await;
let client = group.new_client().await;

// 0: let's first propose an initial cmd0
let cmd0 = TestCommand::new_put(vec![0], 0);
Expand Down Expand Up @@ -335,7 +334,7 @@ async fn minority_crash_and_recovery() {

let mut group = CurpGroup::new(NODES).await;

let client = group.new_client(ClientConfig::default()).await;
let client = group.new_client().await;

assert_eq!(
client
Expand Down Expand Up @@ -404,7 +403,7 @@ async fn recovery_after_compaction() {
init_logger();

let mut group = CurpGroup::new(5).await;
let client = group.new_client(Default::default()).await;
let client = group.new_client().await;
let (leader, _term) = group.get_leader().await;
let node_id = group
.nodes
Expand Down
2 changes: 1 addition & 1 deletion utils/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ pub struct ClientConfig {
#[serde(with = "duration_format", default = "default_retry_timeout")]
retry_timeout: Duration,

/// Curp client retry interval
/// Curp client retry count
#[getset(get = "pub")]
#[serde(default = "default_retry_count")]
retry_count: usize,
Expand Down

0 comments on commit 6408669

Please sign in to comment.