diff --git a/crates/benchmark/src/bench_client.rs b/crates/benchmark/src/bench_client.rs index 3b0629e32..e836ab244 100644 --- a/crates/benchmark/src/bench_client.rs +++ b/crates/benchmark/src/bench_client.rs @@ -221,7 +221,7 @@ mod test { cluster.start().await; let use_curp_client = true; let config = ClientOptions::default(); - let mut client = BenchClient::new(cluster.addrs(), use_curp_client, config) + let mut client = BenchClient::new(cluster.all_client_addrs(), use_curp_client, config) .await .unwrap(); //check xline client put value exist @@ -238,7 +238,7 @@ mod test { cluster.start().await; let use_curp_client = false; let config = ClientOptions::default(); - let mut client = BenchClient::new(cluster.addrs(), use_curp_client, config) + let mut client = BenchClient::new(cluster.all_client_addrs(), use_curp_client, config) .await .unwrap(); diff --git a/crates/curp/proto/common b/crates/curp/proto/common index 0ac729f32..c109a73a4 160000 --- a/crates/curp/proto/common +++ b/crates/curp/proto/common @@ -1 +1 @@ -Subproject commit 0ac729f32fa0b5bee8e6b6d6e65bedcb9a7d05ae +Subproject commit c109a73a42c0d8e193891417b7637d32a902b399 diff --git a/crates/curp/src/client/mod.rs b/crates/curp/src/client/mod.rs index 1638507f8..b3c447c54 100644 --- a/crates/curp/src/client/mod.rs +++ b/crates/curp/src/client/mod.rs @@ -81,6 +81,7 @@ pub trait ClientApi { &self, node_id: ServerId, node_name: String, + node_client_urls: Vec, ) -> Result<(), Self::Error>; /// Send move leader request @@ -144,6 +145,7 @@ trait RepeatableClientApi: ClientApi { propose_id: ProposeId, node_id: ServerId, node_name: String, + node_client_urls: Vec, ) -> Result<(), Self::Error>; } diff --git a/crates/curp/src/client/retry.rs b/crates/curp/src/client/retry.rs index ebfcd779b..77482c23d 100644 --- a/crates/curp/src/client/retry.rs +++ b/crates/curp/src/client/retry.rs @@ -238,13 +238,21 @@ where &self, node_id: ServerId, node_name: String, + node_client_urls: Vec, ) -> Result<(), Self::Error> { let propose_id = self .retry::<_, _>(RepeatableClientApi::gen_propose_id) .await?; self.retry::<_, _>(|client| { let name_c = node_name.clone(); - RepeatableClientApi::propose_publish(client, propose_id, node_id, name_c) + let node_client_urls_c = node_client_urls.clone(); + RepeatableClientApi::propose_publish( + client, + propose_id, + node_id, + name_c, + node_client_urls_c, + ) }) .await } diff --git a/crates/curp/src/client/tests.rs b/crates/curp/src/client/tests.rs index de653a1ef..29b05b4e1 100644 --- a/crates/curp/src/client/tests.rs +++ b/crates/curp/src/client/tests.rs @@ -8,8 +8,11 @@ use std::{ use curp_external_api::LogIndex; use curp_test_utils::test_cmd::{LogIndexResult, TestCommand, TestCommandResult}; use tokio::time::Instant; +#[cfg(not(madsim))] use tonic::transport::ClientTlsConfig; use tracing_test::traced_test; +#[cfg(madsim)] +use utils::ClientTlsConfig; use super::{ retry::{Retry, RetryConfig}, @@ -78,9 +81,9 @@ async fn test_unary_fetch_clusters_serializable() { term: 1, cluster_id: 123, members: vec![ - Member::new(0, "S0", vec!["A0".to_owned()], false), - Member::new(1, "S1", vec!["A1".to_owned()], false), - Member::new(2, "S2", vec!["A2".to_owned()], false), + Member::new(0, "S0", vec!["A0".to_owned()], [], false), + Member::new(1, "S1", vec!["A1".to_owned()], [], false), + Member::new(2, "S2", vec!["A2".to_owned()], [], false), ], cluster_version: 1, })) @@ -136,11 +139,11 @@ async fn test_unary_fetch_clusters_linearizable() { term: 2, cluster_id: 123, members: vec![ - Member::new(0, "S0", vec!["A0".to_owned()], false), - Member::new(1, "S1", vec!["A1".to_owned()], false), - Member::new(2, "S2", vec!["A2".to_owned()], false), - Member::new(3, "S3", vec!["A3".to_owned()], false), - Member::new(4, "S4", vec!["A4".to_owned()], false), + Member::new(0, "S0", vec!["A0".to_owned()], [], false), + Member::new(1, "S1", vec!["A1".to_owned()], [], false), + Member::new(2, "S2", vec!["A2".to_owned()], [], false), + Member::new(3, "S3", vec!["A3".to_owned()], [], false), + Member::new(4, "S4", vec!["A4".to_owned()], [], false), ], cluster_version: 1, }, @@ -163,11 +166,11 @@ async fn test_unary_fetch_clusters_linearizable() { term: 1, // with the old term cluster_id: 123, members: vec![ - Member::new(0, "S0", vec!["B0".to_owned()], false), - Member::new(1, "S1", vec!["B1".to_owned()], false), - Member::new(2, "S2", vec!["B2".to_owned()], false), - Member::new(3, "S3", vec!["B3".to_owned()], false), - Member::new(4, "S4", vec!["B4".to_owned()], false), + Member::new(0, "S0", vec!["B0".to_owned()], [], false), + Member::new(1, "S1", vec!["B1".to_owned()], [], false), + Member::new(2, "S2", vec!["B2".to_owned()], [], false), + Member::new(3, "S3", vec!["B3".to_owned()], [], false), + Member::new(4, "S4", vec!["B4".to_owned()], [], false), ], cluster_version: 1, }, @@ -202,11 +205,11 @@ async fn test_unary_fetch_clusters_linearizable_failed() { term: 2, cluster_id: 123, members: vec![ - Member::new(0, "S0", vec!["A0".to_owned()], false), - Member::new(1, "S1", vec!["A1".to_owned()], false), - Member::new(2, "S2", vec!["A2".to_owned()], false), - Member::new(3, "S3", vec!["A3".to_owned()], false), - Member::new(4, "S4", vec!["A4".to_owned()], false), + Member::new(0, "S0", vec!["A0".to_owned()], [], false), + Member::new(1, "S1", vec!["A1".to_owned()], [], false), + Member::new(2, "S2", vec!["A2".to_owned()], [], false), + Member::new(3, "S3", vec!["A3".to_owned()], [], false), + Member::new(4, "S4", vec!["A4".to_owned()], [], false), ], cluster_version: 1, }, @@ -229,11 +232,11 @@ async fn test_unary_fetch_clusters_linearizable_failed() { term: 1, // with the old term cluster_id: 123, members: vec![ - Member::new(0, "S0", vec!["B0".to_owned()], false), - Member::new(1, "S1", vec!["B1".to_owned()], false), - Member::new(2, "S2", vec!["B2".to_owned()], false), - Member::new(3, "S3", vec!["B3".to_owned()], false), - Member::new(4, "S4", vec!["B4".to_owned()], false), + Member::new(0, "S0", vec!["B0".to_owned()], [], false), + Member::new(1, "S1", vec!["B1".to_owned()], [], false), + Member::new(2, "S2", vec!["B2".to_owned()], [], false), + Member::new(3, "S3", vec!["B3".to_owned()], [], false), + Member::new(4, "S4", vec!["B4".to_owned()], [], false), ], cluster_version: 1, }, @@ -408,9 +411,9 @@ async fn test_unary_slow_round_fetch_leader_first() { term: 1, cluster_id: 123, members: vec![ - Member::new(0, "S0", vec!["A0".to_owned()], false), - Member::new(1, "S1", vec!["A1".to_owned()], false), - Member::new(2, "S2", vec!["A2".to_owned()], false), + Member::new(0, "S0", vec!["A0".to_owned()], [], false), + Member::new(1, "S1", vec!["A1".to_owned()], [], false), + Member::new(2, "S2", vec!["A2".to_owned()], [], false), ], cluster_version: 1, })) @@ -663,11 +666,11 @@ async fn test_retry_propose_return_retry_error() { term: 2, cluster_id: 123, members: vec![ - Member::new(0, "S0", vec!["A0".to_owned()], false), - Member::new(1, "S1", vec!["A1".to_owned()], false), - Member::new(2, "S2", vec!["A2".to_owned()], false), - Member::new(3, "S3", vec!["A3".to_owned()], false), - Member::new(4, "S4", vec!["A4".to_owned()], false), + Member::new(0, "S0", vec!["A0".to_owned()], [], false), + Member::new(1, "S1", vec!["A1".to_owned()], [], false), + Member::new(2, "S2", vec!["A2".to_owned()], [], false), + Member::new(3, "S3", vec!["A3".to_owned()], [], false), + Member::new(4, "S4", vec!["A4".to_owned()], [], false), ], cluster_version: 1, })) diff --git a/crates/curp/src/client/unary.rs b/crates/curp/src/client/unary.rs index ca3f31bbf..3e8e8505f 100644 --- a/crates/curp/src/client/unary.rs +++ b/crates/curp/src/client/unary.rs @@ -234,9 +234,11 @@ impl ClientApi for Unary { &self, node_id: ServerId, node_name: String, + node_client_urls: Vec, ) -> Result<(), Self::Error> { let propose_id = self.gen_propose_id().await?; - RepeatableClientApi::propose_publish(self, propose_id, node_id, node_name).await + RepeatableClientApi::propose_publish(self, propose_id, node_id, node_name, node_client_urls) + .await } /// Send move leader request @@ -515,8 +517,9 @@ impl RepeatableClientApi for Unary { propose_id: ProposeId, node_id: ServerId, node_name: String, + node_client_urls: Vec, ) -> Result<(), Self::Error> { - let req = PublishRequest::new(propose_id, node_id, node_name); + let req = PublishRequest::new(propose_id, node_id, node_name, node_client_urls); let timeout = self.config.wait_synced_timeout; let _ig = self .map_leader(|conn| async move { conn.publish(req, timeout).await }) diff --git a/crates/curp/src/log_entry.rs b/crates/curp/src/log_entry.rs index 3538fc79d..0cb890332 100644 --- a/crates/curp/src/log_entry.rs +++ b/crates/curp/src/log_entry.rs @@ -37,8 +37,8 @@ pub(crate) enum EntryData { ConfChange(Vec), /// `Shutdown` entry Shutdown, - /// `SetName` entry - SetName(ServerId, String), + /// `SetNodeState` entry + SetNodeState(ServerId, String, Vec), } impl From> for EntryData { @@ -64,7 +64,7 @@ impl From> for EntryData { impl From for EntryData { fn from(value: PublishRequest) -> Self { - EntryData::SetName(value.node_id, value.name) + EntryData::SetNodeState(value.node_id, value.name, value.client_urls) } } diff --git a/crates/curp/src/members.rs b/crates/curp/src/members.rs index f25c16a35..e2eaa236c 100644 --- a/crates/curp/src/members.rs +++ b/crates/curp/src/members.rs @@ -30,13 +30,14 @@ impl Member { id: ServerId, name: impl Into, peer_urls: impl Into>, + client_urls: impl Into>, is_learner: bool, ) -> Self { Self { id, name: name.into(), peer_urls: peer_urls.into(), - client_urls: vec![], // TODO + client_urls: client_urls.into(), is_learner, } } @@ -102,17 +103,20 @@ impl ClusterInfo { #[inline] #[must_use] pub fn from_members_map( - all_members_addrs: HashMap>, + all_members_peer_urls: HashMap>, + self_client_urls: impl Into>, self_name: &str, ) -> Self { let mut member_id = 0; + let self_client_urls = self_client_urls.into(); let members = DashMap::new(); - for (name, addrs) in all_members_addrs { - let id = Self::calculate_member_id(addrs.clone(), "", None); + for (name, peer_urls) in all_members_peer_urls { + let id = Self::calculate_member_id(peer_urls.clone(), "", None); + let mut member = Member::new(id, name.clone(), peer_urls, [], false); if name == self_name { member_id = id; + member.client_urls = self_client_urls.clone(); } - let member = Member::new(id, name, addrs, false); let _ig = members.insert(id, member); } debug_assert!(member_id != 0, "self_id should not be 0"); @@ -207,13 +211,20 @@ impl ClusterInfo { addrs } - /// Get server addresses via server id + /// Get server peer urls via server id #[must_use] #[inline] - pub fn addrs(&self, id: ServerId) -> Option> { + pub fn peer_urls(&self, id: ServerId) -> Option> { self.members.get(&id).map(|t| t.peer_urls.clone()) } + /// Get server client urls via server id + #[must_use] + #[inline] + pub fn client_urls(&self, id: ServerId) -> Option> { + self.members.get(&id).map(|t| t.client_urls.clone()) + } + /// Get the current member /// # Panics /// panic if self member id is not in members @@ -224,13 +235,20 @@ impl ClusterInfo { self.members.get(&self.member_id).unwrap() } - /// Get the current server address + /// Get the current server peer urls #[must_use] #[inline] - pub fn self_addrs(&self) -> Vec { + pub fn self_peer_urls(&self) -> Vec { self.self_member().peer_urls.clone() } + /// Get the current server client addrs + #[must_use] + #[inline] + pub fn self_client_urls(&self) -> Vec { + self.self_member().client_urls.clone() + } + /// Get the current server id #[must_use] #[inline] @@ -382,11 +400,15 @@ impl ClusterInfo { self.members.contains_key(&node_id) } - /// Set name for a node - pub(crate) fn set_name(&self, node_id: ServerId, name: String) { + /// Set state for a node + pub(crate) fn set_node_state(&self, node_id: ServerId, name: String, client_urls: Vec) { if let Some(mut s) = self.members.get_mut(&node_id) { - debug!("set name for node {} to {}", node_id, name); + debug!( + "set name and client_urls for node {} to {},{:?}", + node_id, name, client_urls + ); s.name = name; + s.client_urls = client_urls; } } } @@ -442,9 +464,9 @@ mod tests { ("S3".to_owned(), vec!["S3".to_owned()]), ]); - let node1 = ClusterInfo::from_members_map(all_members.clone(), "S1"); - let node2 = ClusterInfo::from_members_map(all_members.clone(), "S2"); - let node3 = ClusterInfo::from_members_map(all_members, "S3"); + let node1 = ClusterInfo::from_members_map(all_members.clone(), [], "S1"); + let node2 = ClusterInfo::from_members_map(all_members.clone(), [], "S2"); + let node3 = ClusterInfo::from_members_map(all_members, [], "S3"); assert_ne!(node1.self_id(), node2.self_id()); assert_ne!(node1.self_id(), node3.self_id()); @@ -462,10 +484,10 @@ mod tests { ("S3".to_owned(), vec!["S3".to_owned()]), ]); - let node1 = ClusterInfo::from_members_map(all_members, "S1"); + let node1 = ClusterInfo::from_members_map(all_members, [], "S1"); let peers = node1.peers_addrs(); let node1_id = node1.self_id(); - let node1_url = node1.self_addrs(); + let node1_url = node1.self_peer_urls(); assert!(!peers.contains_key(&node1_id)); assert_eq!(peers.len(), 2); assert_eq!(node1.voters_len(), peers.len() + 1); diff --git a/crates/curp/src/rpc/mod.rs b/crates/curp/src/rpc/mod.rs index 7c5e5b000..5b24e7e6e 100644 --- a/crates/curp/src/rpc/mod.rs +++ b/crates/curp/src/rpc/mod.rs @@ -585,11 +585,17 @@ impl MoveLeaderRequest { impl PublishRequest { /// Create a new `PublishRequest` - pub(crate) fn new(id: ProposeId, node_id: ServerId, name: String) -> Self { + pub(crate) fn new( + id: ProposeId, + node_id: ServerId, + name: String, + client_urls: Vec, + ) -> Self { Self { propose_id: Some(id.into()), node_id, name, + client_urls, } } diff --git a/crates/curp/src/server/cmd_worker/conflict_checked_mpmc.rs b/crates/curp/src/server/cmd_worker/conflict_checked_mpmc.rs index cb3981c80..4946dc479 100644 --- a/crates/curp/src/server/cmd_worker/conflict_checked_mpmc.rs +++ b/crates/curp/src/server/cmd_worker/conflict_checked_mpmc.rs @@ -356,7 +356,7 @@ impl> Filter { EntryData::ConfChange(_) | EntryData::Shutdown | EntryData::Empty - | EntryData::SetName(_, _) => None, + | EntryData::SetNodeState(_, _, _) => None, }; *exe_st = ExeState::Executing; let task = Task { diff --git a/crates/curp/src/server/cmd_worker/mod.rs b/crates/curp/src/server/cmd_worker/mod.rs index 46eecd5f5..8ede8acd7 100644 --- a/crates/curp/src/server/cmd_worker/mod.rs +++ b/crates/curp/src/server/cmd_worker/mod.rs @@ -131,7 +131,7 @@ async fn worker_exe, RC: RoleChange>( EntryData::ConfChange(_) | EntryData::Shutdown | EntryData::Empty - | EntryData::SetName(_, _) => true, + | EntryData::SetNodeState(_, _, _) => true, }; if !success { ce.trigger(entry.inflight_id(), entry.index); @@ -222,12 +222,13 @@ async fn worker_as, RC: RoleChange>( } true } - EntryData::SetName(node_id, ref name) => { + EntryData::SetNodeState(node_id, ref name, ref client_urls) => { if let Err(e) = ce.set_last_applied(entry.index) { error!("failed to set last_applied, {e}"); return false; } - curp.cluster().set_name(node_id, name.clone()); + curp.cluster() + .set_node_state(node_id, name.clone(), client_urls.clone()); true } EntryData::Empty => true, diff --git a/crates/curp/src/server/raw_curp/mod.rs b/crates/curp/src/server/raw_curp/mod.rs index d141dad66..298677e75 100644 --- a/crates/curp/src/server/raw_curp/mod.rs +++ b/crates/curp/src/server/raw_curp/mod.rs @@ -862,7 +862,7 @@ impl RawCurp { EntryData::Empty | EntryData::Command(_) | EntryData::Shutdown - | EntryData::SetName(_, _) => false, + | EntryData::SetNodeState(_, _, _) => false, }); // extra check to shutdown removed node if !contains_candidate && !remove_candidate_is_not_committed { @@ -1390,7 +1390,7 @@ impl RawCurp { Some(ConfChange::remove(node_id)) } ConfChangeType::Remove => { - let member = Member::new(node_id, name, old_addrs.clone(), is_learner); + let member = Member::new(node_id, name, old_addrs.clone(), [], is_learner); self.cst .map_lock(|mut cst_l| _ = cst_l.config.insert(node_id, is_learner)); self.lst.insert(node_id, is_learner); @@ -1747,7 +1747,7 @@ impl RawCurp { let _ignore = ucp_l.insert(propose_id, PoolEntry::new(propose_id, conf_change.clone())); } - EntryData::Shutdown | EntryData::Empty | EntryData::SetName(_, _) => {} + EntryData::Shutdown | EntryData::Empty | EntryData::SetNodeState(_, _, _) => {} } } } @@ -1792,7 +1792,7 @@ impl RawCurp { let (modified, fallback_info) = match conf_change.change_type() { ConfChangeType::Add | ConfChangeType::AddLearner => { let is_learner = matches!(conf_change.change_type(), ConfChangeType::AddLearner); - let member = Member::new(node_id, "", conf_change.address.clone(), is_learner); + let member = Member::new(node_id, "", conf_change.address.clone(), [], is_learner); _ = cst_l.config.insert(node_id, is_learner); self.lst.insert(node_id, is_learner); _ = self.ctx.sync_events.insert(node_id, Arc::new(Event::new())); diff --git a/crates/curp/src/server/raw_curp/tests.rs b/crates/curp/src/server/raw_curp/tests.rs index f0d51b2cb..827213497 100644 --- a/crates/curp/src/server/raw_curp/tests.rs +++ b/crates/curp/src/server/raw_curp/tests.rs @@ -47,7 +47,7 @@ impl RawCurp { let all_members: HashMap<_, _> = (0..n) .map(|i| (format!("S{i}"), vec![format!("S{i}")])) .collect(); - let cluster_info = Arc::new(ClusterInfo::from_members_map(all_members, "S0")); + let cluster_info = Arc::new(ClusterInfo::from_members_map(all_members, [], "S0")); let cmd_board = Arc::new(RwLock::new(CommandBoard::new())); let spec_pool = Arc::new(Mutex::new(SpeculativePool::new())); let uncommitted_pool = Arc::new(Mutex::new(UncommittedPool::new())); @@ -1027,7 +1027,7 @@ fn update_node_should_update_the_address_of_node() { InnerConnectApiWrapper::new_from_arc(Arc::new(mock_connect)), ); assert_eq!( - curp.cluster().addrs(follower_id), + curp.cluster().peer_urls(follower_id), Some(vec!["S1".to_owned()]) ); let changes = vec![ConfChange::update( @@ -1038,7 +1038,7 @@ fn update_node_should_update_the_address_of_node() { let infos = curp.apply_conf_change(changes.clone()); assert_eq!(infos, (vec!["S1".to_owned()], String::new(), false)); assert_eq!( - curp.cluster().addrs(follower_id), + curp.cluster().peer_urls(follower_id), Some(vec!["http://127.0.0.1:4567".to_owned()]) ); curp.fallback_conf_change(changes, infos.0, infos.1, infos.2); @@ -1070,7 +1070,7 @@ fn leader_handle_propose_conf_change() { }; let follower_id = curp.cluster().get_id_by_name("S1").unwrap(); assert_eq!( - curp.cluster().addrs(follower_id), + curp.cluster().peer_urls(follower_id), Some(vec!["S1".to_owned()]) ); let changes = vec![ConfChange::update( @@ -1098,7 +1098,7 @@ fn follower_handle_propose_conf_change() { let follower_id = curp.cluster().get_id_by_name("S1").unwrap(); assert_eq!( - curp.cluster().addrs(follower_id), + curp.cluster().peer_urls(follower_id), Some(vec!["S1".to_owned()]) ); let changes = vec![ConfChange::update( diff --git a/crates/curp/tests/it/common/curp_group.rs b/crates/curp/tests/it/common/curp_group.rs index 4c57d2fb1..dcad1c241 100644 --- a/crates/curp/tests/it/common/curp_group.rs +++ b/crates/curp/tests/it/common/curp_group.rs @@ -106,11 +106,12 @@ impl CurpGroup { let snapshot_allocator = Self::get_snapshot_allocator_from_cfg(&config); let cluster_info = Arc::new(ClusterInfo::from_members_map( all_members_addrs.clone(), + [], &name, )); let listener = listeners.remove(&name).unwrap(); let id = cluster_info.self_id(); - let addr = cluster_info.self_addrs().pop().unwrap(); + let addr = cluster_info.self_peer_urls().pop().unwrap(); let (exe_tx, exe_rx) = mpsc::unbounded_channel(); let (as_tx, as_rx) = mpsc::unbounded_channel(); @@ -284,7 +285,7 @@ impl CurpGroup { }, ); let client = self.new_client().await; - client.propose_publish(id, name).await.unwrap(); + client.propose_publish(id, name, vec![]).await.unwrap(); } pub fn all_addrs(&self) -> impl Iterator { @@ -444,7 +445,7 @@ impl CurpGroup { let members = cluster_res_base .members .into_iter() - .map(|m| Member::new(m.id, m.name, m.peer_urls, m.is_learner)) + .map(|m| Member::new(m.id, m.name, m.peer_urls, m.client_urls, m.is_learner)) .collect(); let cluster_res = curp::rpc::FetchClusterResponse { leader_id: cluster_res_base.leader_id, diff --git a/crates/simulation/src/curp_group.rs b/crates/simulation/src/curp_group.rs index ccfe74016..27bfc2a48 100644 --- a/crates/simulation/src/curp_group.rs +++ b/crates/simulation/src/curp_group.rs @@ -65,21 +65,21 @@ impl CurpGroup { let handle = madsim::runtime::Handle::current(); let all: HashMap<_, _> = (0..n_nodes) - .map(|x| (format!("S{x}"), vec![format!("192.168.1.{}:12345", x + 1)])) + .map(|x| (format!("S{x}"), vec![format!("192.168.1.{}:2380", x + 1)])) .collect(); let mut all_members = HashMap::new(); let nodes = (0..n_nodes) .map(|i| { let name = format!("S{i}"); - let addr = format!("192.168.1.{}:12345", i + 1); + let peer_url = format!("192.168.1.{}:2380", i + 1); let storage_path = tempfile::tempdir().unwrap().into_path(); let (exe_tx, exe_rx) = mpsc::unbounded_channel(); let (as_tx, as_rx) = mpsc::unbounded_channel(); let store = Arc::new(Mutex::new(None)); - let cluster_info = Arc::new(ClusterInfo::from_members_map(all.clone(), &name)); + let cluster_info = Arc::new(ClusterInfo::from_members_map(all.clone(), [], &name)); all_members = cluster_info .all_members_addrs() .into_iter() @@ -122,7 +122,7 @@ impl CurpGroup { Rpc::run_from_addr( cluster_info, is_leader, - "0.0.0.0:12345".parse().unwrap(), + "0.0.0.0:2380".parse().unwrap(), ce, Box::new(MemorySnapshotAllocator), TestRoleChange { @@ -140,7 +140,7 @@ impl CurpGroup { id, CurpNode { id, - addr, + addr: peer_url, handle: node_handle, exe_rx, as_rx, diff --git a/crates/simulation/src/xline_group.rs b/crates/simulation/src/xline_group.rs index 372a5a3be..9943400dd 100644 --- a/crates/simulation/src/xline_group.rs +++ b/crates/simulation/src/xline_group.rs @@ -24,7 +24,8 @@ use xline_client::{ use xlineapi::{command::Command, ClusterClient, KvClient, RequestUnion, WatchClient}; pub struct XlineNode { - pub addr: String, + pub client_url: String, + pub peer_url: String, pub name: String, pub handle: NodeHandle, } @@ -40,14 +41,19 @@ impl XlineGroup { let handle = madsim::runtime::Handle::current(); let all: HashMap<_, _> = (0..size) - .map(|x| (format!("S{x}"), vec![format!("192.168.1.{}:12345", x + 1)])) + .map(|x| (format!("S{x}"), vec![format!("192.168.1.{}:2380", x + 1)])) .collect(); let nodes = (0..size) .map(|i| { let name = format!("S{i}"); - let addr = format!("192.168.1.{}:12345", i + 1); + let client_url = format!("192.168.1.{}:2379", i + 1); + let peer_url = format!("192.168.1.{}:2380", i + 1); let cluster_config = ClusterConfig::new( name.clone(), + vec!["0.0.0.0:2380".to_owned()], + vec![format!("192.168.1.{}:2380", i + 1)], + vec!["0.0.0.0:2379".to_owned()], + vec![format!("192.168.1.{}:2379", i + 1)], all.clone(), false, CurpConfig::default(), @@ -73,7 +79,10 @@ impl XlineGroup { .await .unwrap(); server - .start_from_single_addr("0.0.0.0:12345".parse().unwrap()) + .start_from_single_addr( + "0.0.0.0:2379".parse().unwrap(), + "0.0.0.0:2380".parse().unwrap(), + ) .await .unwrap() .await @@ -82,7 +91,15 @@ impl XlineGroup { } }) .build(); - (name.clone(), XlineNode { addr, name, handle }) + ( + name.clone(), + XlineNode { + client_url, + peer_url, + name, + handle, + }, + ) }) .collect(); let client_handle = handle @@ -101,7 +118,7 @@ impl XlineGroup { let all_members = self .nodes .values() - .map(|node| node.addr.clone()) + .map(|node| node.client_url.clone()) .collect_vec(); let client = self .client_handle diff --git a/crates/simulation/tests/it/xline.rs b/crates/simulation/tests/it/xline.rs index 9b9c3f847..6d3f62342 100644 --- a/crates/simulation/tests/it/xline.rs +++ b/crates/simulation/tests/it/xline.rs @@ -24,7 +24,7 @@ async fn basic_put() { async fn watch_compacted_revision_should_receive_canceled_response() { init_logger(); let group = XlineGroup::new(3).await; - let watch_addr = group.get_node("S2").addr.clone(); + let watch_addr = group.get_node("S2").client_url.clone(); let client = SimEtcdClient::new(watch_addr, group.client_handle.clone()).await; diff --git a/crates/xline-client/tests/it/common.rs b/crates/xline-client/tests/it/common.rs index 17bf0e39b..b817aee48 100644 --- a/crates/xline-client/tests/it/common.rs +++ b/crates/xline-client/tests/it/common.rs @@ -4,6 +4,6 @@ use xline_test_utils::Cluster; pub async fn get_cluster_client() -> Result<(Cluster, Client), XlineClientBuildError> { let mut cluster = Cluster::new(3).await; cluster.start().await; - let client = Client::connect(cluster.addrs(), ClientOptions::default()).await?; + let client = Client::connect(cluster.all_client_addrs(), ClientOptions::default()).await?; Ok((cluster, client)) } diff --git a/crates/xline-test-utils/src/bin/test_cluster.rs b/crates/xline-test-utils/src/bin/test_cluster.rs index 16a1ff58c..8a358be43 100644 --- a/crates/xline-test-utils/src/bin/test_cluster.rs +++ b/crates/xline-test-utils/src/bin/test_cluster.rs @@ -9,7 +9,7 @@ async fn main() { println!("cluster running"); - for (id, addr) in cluster.all_members_map() { + for (id, addr) in cluster.all_members_client_urls_map() { println!("server id: {} addr: {}", id, addr); } diff --git a/crates/xline-test-utils/src/lib.rs b/crates/xline-test-utils/src/lib.rs index 1d94a27e8..52a3517d5 100644 --- a/crates/xline-test-utils/src/lib.rs +++ b/crates/xline-test-utils/src/lib.rs @@ -16,10 +16,12 @@ pub use xline_client::{types, Client, ClientOptions}; /// Cluster pub struct Cluster { - /// listeners of members - listeners: Vec, + /// client and peer listeners of members + listeners: Vec<(TcpListener, TcpListener)>, /// address of members - all_members: Vec, + all_members_peer_urls: Vec, + /// address of members + all_members_client_urls: Vec, /// Server configs configs: Vec, /// Xline servers @@ -52,15 +54,23 @@ impl Cluster { let size = configs.len(); let mut listeners = Vec::new(); for _i in 0..size { - listeners.push(TcpListener::bind("0.0.0.0:0").await.unwrap()); + listeners.push(( + TcpListener::bind("0.0.0.0:0").await.unwrap(), + TcpListener::bind("0.0.0.0:0").await.unwrap(), + )); } - let all_members = listeners + let all_members_client_urls = listeners + .iter() + .map(|l| l.0.local_addr().unwrap().to_string()) + .collect(); + let all_members_peer_urls = listeners .iter() - .map(|l| l.local_addr().unwrap().to_string()) + .map(|l| l.1.local_addr().unwrap().to_string()) .collect(); Self { listeners, - all_members, + all_members_peer_urls, + all_members_client_urls, configs, servers: Vec::new(), client: None, @@ -71,12 +81,15 @@ impl Cluster { pub async fn start(&mut self) { for (i, config) in self.configs.iter().enumerate() { let name = format!("server{}", i); - let listener = self.listeners.remove(0); - + let (xline_listener, curp_listener) = self.listeners.remove(0); + let self_client_url = xline_listener.local_addr().unwrap().to_string(); + let self_peer_url = curp_listener.local_addr().unwrap().to_string(); let config = Self::merge_config( config, name, - self.all_members + self_client_url, + self_peer_url, + self.all_members_peer_urls .clone() .into_iter() .enumerate() @@ -96,7 +109,9 @@ impl Cluster { .await .unwrap(); - let result = server.start_from_listener(listener).await; + let result = server + .start_from_listener(xline_listener, curp_listener) + .await; if let Err(e) = result { panic!("Server start error: {e}"); } @@ -105,23 +120,27 @@ impl Cluster { time::sleep(Duration::from_millis(300)).await; } - pub async fn run_node(&mut self, listener: TcpListener) { + pub async fn run_node(&mut self, xline_listener: TcpListener, curp_listener: TcpListener) { let config = XlineServerConfig::default(); - self.run_node_with_config(listener, config).await; + self.run_node_with_config(xline_listener, curp_listener, config) + .await; } pub async fn run_node_with_config( &mut self, - listener: TcpListener, + xline_listener: TcpListener, + curp_listener: TcpListener, base_config: XlineServerConfig, ) { - let idx = self.all_members.len(); + let idx = self.all_members_peer_urls.len(); let name = format!("server{}", idx); - let self_addr = listener.local_addr().unwrap().to_string(); - self.all_members.push(self_addr.clone()); + let self_client_url = xline_listener.local_addr().unwrap().to_string(); + let self_peer_url = curp_listener.local_addr().unwrap().to_string(); + self.all_members_client_urls.push(self_client_url.clone()); + self.all_members_peer_urls.push(self_peer_url.clone()); let members = self - .all_members + .all_members_peer_urls .clone() .into_iter() .enumerate() @@ -133,6 +152,8 @@ impl Cluster { let config = Self::merge_config( base_config, name, + self_client_url, + self_peer_url, members, false, InitialClusterState::Existing, @@ -147,7 +168,9 @@ impl Cluster { ) .await .unwrap(); - let result = server.start_from_listener(listener).await; + let result = server + .start_from_listener(xline_listener, curp_listener) + .await; if let Err(e) = result { panic!("Server start error: {e}"); } @@ -156,26 +179,33 @@ impl Cluster { /// Create or get the client with the specified index pub async fn client(&mut self) -> &mut Client { if self.client.is_none() { - let client = Client::connect(self.all_members.clone(), ClientOptions::default()) - .await - .unwrap_or_else(|e| { - panic!("Client connect error: {:?}", e); - }); + let client = Client::connect( + self.all_members_client_urls.clone(), + ClientOptions::default(), + ) + .await + .unwrap_or_else(|e| { + panic!("Client connect error: {:?}", e); + }); self.client = Some(client); } self.client.as_mut().unwrap() } - pub fn all_members_map(&self) -> HashMap { - self.all_members.iter().cloned().enumerate().collect() + pub fn all_members_client_urls_map(&self) -> HashMap { + self.all_members_client_urls + .iter() + .cloned() + .enumerate() + .collect() } - pub fn get_addr(&self, idx: usize) -> String { - self.all_members[idx].clone() + pub fn get_client_urls(&self, idx: usize) -> String { + self.all_members_client_urls[idx].clone() } - pub fn addrs(&self) -> Vec { - self.all_members.clone() + pub fn all_client_addrs(&self) -> Vec { + self.all_members_client_urls.clone() } pub fn default_config_with_quota_and_rocks_path( @@ -210,6 +240,8 @@ impl Cluster { fn merge_config( base_config: &XlineServerConfig, name: String, + client_url: String, + peer_url: String, members: HashMap>, is_leader: bool, initial_cluster_state: InitialClusterState, @@ -217,10 +249,10 @@ impl Cluster { let old_cluster = base_config.cluster(); let new_cluster = ClusterConfig::new( name, - vec![], - vec![], - vec![], - vec![], // TODO + vec![peer_url.clone()], + vec![peer_url], + vec![client_url.clone()], + vec![client_url], members, is_leader, old_cluster.curp_config().clone(), diff --git a/crates/xline/src/server/lease_server.rs b/crates/xline/src/server/lease_server.rs index 20a4ec2dc..367280c57 100644 --- a/crates/xline/src/server/lease_server.rs +++ b/crates/xline/src/server/lease_server.rs @@ -335,7 +335,7 @@ where // a follower when it lost the election. Therefore we need to double check here. // We can directly invoke leader_keep_alive when a candidate becomes a leader. if !self.lease_storage.is_primary() { - let leader_addrs = self.cluster_info.addrs(leader_id).unwrap_or_else(|| { + let leader_addrs = self.cluster_info.client_urls(leader_id).unwrap_or_else(|| { unreachable!( "The address of leader {} not found in all_members {:?}", leader_id, self.cluster_info @@ -379,7 +379,7 @@ where return Ok(tonic::Response::new(res)); } let leader_id = self.client.fetch_leader_id(false).await?; - let leader_addrs = self.cluster_info.addrs(leader_id).unwrap_or_else(|| { + let leader_addrs = self.cluster_info.client_urls(leader_id).unwrap_or_else(|| { unreachable!( "The address of leader {} not found in all_members {:?}", leader_id, self.cluster_info diff --git a/crates/xline/src/server/xline_server.rs b/crates/xline/src/server/xline_server.rs index 2545b5253..5c9bbb681 100644 --- a/crates/xline/src/server/xline_server.rs +++ b/crates/xline/src/server/xline_server.rs @@ -1,8 +1,4 @@ -use std::{ - net::{SocketAddr, ToSocketAddrs}, - sync::Arc, - time::Duration, -}; +use std::{net::ToSocketAddrs, sync::Arc, time::Duration}; use anyhow::{anyhow, Result}; use clippy_utilities::{Cast, OverflowArithmetic}; @@ -169,6 +165,7 @@ impl XlineServer { info!("server_addr = {server_addr:?}"); info!("cluster_peers = {:?}", cluster_config.members()); + let self_client_addr = cluster_config.client_advertise_urls().clone(); match ( curp_storage.recover_cluster_info()?, *cluster_config.initial_cluster_state(), @@ -181,6 +178,7 @@ impl XlineServer { info!("get cluster_info by args"); let cluster_info = ClusterInfo::from_members_map( cluster_config.members().clone(), + self_client_addr, cluster_config.name(), ); curp_storage.put_cluster_info(&cluster_info)?; @@ -191,6 +189,7 @@ impl XlineServer { let cluster_info = get_cluster_info_from_remote( &ClusterInfo::from_members_map( cluster_config.members().clone(), + self_client_addr, cluster_config.name(), ), server_addr_str, @@ -311,7 +310,7 @@ impl XlineServer { ) } - /// Init router + /// Init xline and curp router /// /// # Errors /// @@ -321,7 +320,7 @@ impl XlineServer { &self, persistent: Arc, key_pair: Option<(EncodingKey, DecodingKey)>, - ) -> Result<(Router, Arc)> { + ) -> Result<(Router, Router, Arc)> { let ( kv_server, lock_server, @@ -339,7 +338,8 @@ impl XlineServer { if let Some(ref cfg) = self.server_tls_config { builder = builder.tls_config(cfg.clone())?; } - let router = builder + let xline_router = builder + .clone() .add_service(RpcLockServer::new(lock_server)) .add_service(RpcKvServer::new(kv_server)) .add_service(RpcLeaseServer::from_arc(lease_server)) @@ -347,19 +347,19 @@ impl XlineServer { .add_service(RpcWatchServer::new(watch_server)) .add_service(RpcMaintenanceServer::new(maintenance_server)) .add_service(RpcClusterServer::new(cluster_server)) - .add_service(ProtocolServer::new(auth_wrapper)) - // TODO: run origin curp server in a separate port - // .add_service(ProtocolServer::new(curp_server.clone())) + .add_service(ProtocolServer::new(auth_wrapper)); + let curp_router = builder + .add_service(ProtocolServer::new(curp_server.clone())) .add_service(InnerProtocolServer::new(curp_server)); #[cfg(not(madsim))] - let router = { + let xline_router = { let (mut reporter, health_server) = tonic_health::server::health_reporter(); reporter .set_service_status("", tonic_health::ServingStatus::Serving) .await; - router.add_service(health_server) + xline_router.add_service(health_server) }; - Ok((router, curp_client)) + Ok((xline_router, curp_router, curp_client)) } /// Start `XlineServer` @@ -371,15 +371,24 @@ impl XlineServer { #[cfg(madsim)] pub async fn start_from_single_addr( &self, - addr: SocketAddr, + xline_addr: std::net::SocketAddr, + curp_addr: std::net::SocketAddr, ) -> Result>> { - let n = self + let n1 = self .task_manager .get_shutdown_listener(TaskName::TonicServer); + let n2 = n1.clone(); let persistent = DB::open(&self.storage_config.engine)?; let key_pair = Self::read_key_pair(&self.auth_config).await?; - let (router, curp_client) = self.init_router(persistent, key_pair).await?; - let handle = tokio::spawn(async move { router.serve_with_shutdown(addr, n.wait()).await }); + let (xline_router, curp_router, curp_client) = + self.init_router(persistent, key_pair).await?; + let handle = tokio::spawn(async move { + tokio::select! { + _ = xline_router.serve_with_shutdown(xline_addr, n1.wait()) => {}, + _ = curp_router.serve_with_shutdown(curp_addr, n2.wait()) => {}, + } + Ok(()) + }); if let Err(e) = self.publish(curp_client).await { warn!("publish name to cluster failed: {:?}", e); }; @@ -388,21 +397,25 @@ impl XlineServer { /// inner start method shared by `start` and `start_from_listener` #[cfg(not(madsim))] - async fn start_inner(&self, incoming: I) -> Result<()> + async fn start_inner(&self, xline_incoming: I1, curp_incoming: I2) -> Result<()> where - I: Stream> + Send + 'static, + I1: Stream> + Send + 'static, + I2: Stream> + Send + 'static, IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, IO::ConnectInfo: Clone + Send + Sync + 'static, IE: Into> + Send, { let persistent = DB::open(&self.storage_config.engine)?; let key_pair = Self::read_key_pair(&self.auth_config).await?; - let (router, curp_client) = self.init_router(persistent, key_pair).await?; + let (xline_router, curp_router, curp_client) = + self.init_router(persistent, key_pair).await?; self.task_manager - .spawn(TaskName::TonicServer, |n| async move { - let _ig = router - .serve_with_incoming_shutdown(incoming, n.wait()) - .await; + .spawn(TaskName::TonicServer, |n1| async move { + let n2 = n1.clone(); + tokio::select! { + _ = xline_router.serve_with_incoming_shutdown(xline_incoming, n1.wait()) => {}, + _ = curp_router.serve_with_incoming_shutdown(curp_incoming, n2.wait()) => {}, + } }); if let Err(e) = self.publish(curp_client).await { warn!("publish name to cluster failed: {e:?}"); @@ -418,30 +431,12 @@ impl XlineServer { #[inline] #[cfg(not(madsim))] pub async fn start(&self) -> Result<()> { - let server_addr_iter = self - .cluster_config - .members() - .get(self.cluster_config.name()) - .ok_or_else(|| { - anyhow!( - "node name {} not found in cluster peers", - self.cluster_config.name() - ) - })? - .iter() - .map(|addr| { - let address = match addr.split_once("://") { - Some((_, address)) => address, - None => addr, - }; - address.to_socket_addrs() - }) - .collect::, _>>()? - .into_iter() - .flatten(); - let incoming = bind_addrs(server_addr_iter)?; + let client_listen_urls = self.cluster_config.client_listen_urls(); + let peer_listen_urls = self.cluster_config.peer_listen_urls(); + let xline_incoming = bind_addrs(client_listen_urls)?; + let curp_incoming = bind_addrs(peer_listen_urls)?; - self.start_inner(incoming).await + self.start_inner(xline_incoming, curp_incoming).await } /// Start `XlineServer` from listeners @@ -451,9 +446,14 @@ impl XlineServer { /// Will return `Err` when `tonic::Server` serve return an error #[inline] #[cfg(not(madsim))] - pub async fn start_from_listener(&self, xline_listener: tokio::net::TcpListener) -> Result<()> { - let incoming = tokio_stream::wrappers::TcpListenerStream::new(xline_listener); - self.start_inner(incoming).await + pub async fn start_from_listener( + &self, + xline_listener: tokio::net::TcpListener, + curp_listener: tokio::net::TcpListener, + ) -> Result<()> { + let xline_incoming = tokio_stream::wrappers::TcpListenerStream::new(xline_listener); + let curp_incoming = tokio_stream::wrappers::TcpListenerStream::new(curp_listener); + self.start_inner(xline_incoming, curp_incoming).await } /// Init `KvServer`, `LockServer`, `LeaseServer`, `WatchServer` and `CurpServer` @@ -583,7 +583,7 @@ impl XlineServer { LockServer::new( Arc::clone(&client), Arc::clone(&id_gen), - &self.cluster_info.self_addrs(), + &self.cluster_info.self_peer_urls(), self.client_tls_config.as_ref(), ), LeaseServer::new( @@ -622,7 +622,11 @@ impl XlineServer { /// Publish the name of current node to cluster async fn publish(&self, curp_client: Arc) -> Result<(), tonic::Status> { curp_client - .propose_publish(self.cluster_info.self_id(), self.cluster_info.self_name()) + .propose_publish( + self.cluster_info.self_id(), + self.cluster_info.self_name(), + self.cluster_info.self_client_urls(), + ) .await } @@ -684,10 +688,21 @@ impl XlineServer { /// Bind multiple addresses #[cfg(not(madsim))] -fn bind_addrs>( - addrs: T, +fn bind_addrs( + addrs: &[String], ) -> Result>> { let incoming = addrs + .iter() + .map(|addr| { + let address = match addr.split_once("://") { + Some((_, address)) => address, + None => addr, + }; + address.to_socket_addrs() + }) + .collect::, _>>()? + .into_iter() + .flatten() .map(|addr| { tonic::transport::server::TcpIncoming::new(addr, true, None) .map_err(|e| anyhow::anyhow!("Failed to bind to {}, err: {e}", addr)) diff --git a/crates/xline/tests/it/auth_test.rs b/crates/xline/tests/it/auth_test.rs index 79bc74a5f..27804a088 100644 --- a/crates/xline/tests/it/auth_test.rs +++ b/crates/xline/tests/it/auth_test.rs @@ -54,7 +54,7 @@ async fn test_auth_token_with_disable() -> Result<(), Box> { enable_auth(client).await?; let authed_client = Client::connect( - vec![cluster.get_addr(0)], + vec![cluster.get_client_urls(0)], ClientOptions::default().with_user("root", "123"), ) .await?; @@ -117,13 +117,13 @@ async fn test_kv_authorization() -> Result<(), Box> { enable_auth(client).await?; let u1_client = Client::connect( - vec![cluster.get_addr(0)], + vec![cluster.get_client_urls(0)], ClientOptions::default().with_user("u1", "123"), ) .await? .kv_client(); let u2_client = Client::connect( - vec![cluster.get_addr(0)], + vec![cluster.get_client_urls(0)], ClientOptions::default().with_user("u2", "123"), ) .await? @@ -175,13 +175,13 @@ async fn test_no_root_user_do_admin_ops() -> Result<(), Box> { set_user(client, "u", "123", "r", &[], &[]).await?; enable_auth(client).await?; let user_client = Client::connect( - vec![cluster.get_addr(0)], + vec![cluster.get_client_urls(0)], ClientOptions::default().with_user("u", "123"), ) .await? .auth_client(); let root_client = Client::connect( - vec![cluster.get_addr(0)], + vec![cluster.get_client_urls(0)], ClientOptions::default().with_user("root", "123"), ) .await? @@ -212,14 +212,14 @@ async fn test_auth_wrong_password() -> Result<(), Box> { enable_auth(client).await?; let result = Client::connect( - vec![cluster.get_addr(0)], + vec![cluster.get_client_urls(0)], ClientOptions::default().with_user("root", "456"), ) .await; assert!(result.is_err()); let result = Client::connect( - vec![cluster.get_addr(0)], + vec![cluster.get_client_urls(0)], ClientOptions::default().with_user("root", "123"), ) .await; diff --git a/crates/xline/tests/it/cluster_test.rs b/crates/xline/tests/it/cluster_test.rs index 38a42cce1..22100bf42 100644 --- a/crates/xline/tests/it/cluster_test.rs +++ b/crates/xline/tests/it/cluster_test.rs @@ -16,7 +16,7 @@ use xline_test_utils::Cluster; async fn xline_remove_node() -> Result<(), Box> { let mut cluster = Cluster::new(5).await; cluster.start().await; - let mut cluster_client = Client::connect(cluster.addrs(), ClientOptions::default()) + let mut cluster_client = Client::connect(cluster.all_client_addrs(), ClientOptions::default()) .await? .cluster_client(); let list_res = cluster_client @@ -36,17 +36,21 @@ async fn xline_remove_node() -> Result<(), Box> { async fn xline_add_node() -> Result<(), Box> { let mut cluster = Cluster::new(3).await; cluster.start().await; - let client = Client::connect(cluster.addrs(), ClientOptions::default()).await?; + let client = Client::connect(cluster.all_client_addrs(), ClientOptions::default()).await?; let mut cluster_client = client.cluster_client(); let kv_client = client.kv_client(); _ = kv_client.put(PutRequest::new("key", "value")).await?; - let new_node_listener = TcpListener::bind("0.0.0.0:0").await?; - let new_node_addrs = vec![new_node_listener.local_addr()?.to_string()]; - let add_req = MemberAddRequest::new(new_node_addrs.clone(), false); + let new_node_peer_listener = TcpListener::bind("0.0.0.0:0").await?; + let new_node_peer_urls = vec![new_node_peer_listener.local_addr()?.to_string()]; + let new_node_client_listener = TcpListener::bind("0.0.0.0:0").await?; + let new_node_client_urls = vec![new_node_client_listener.local_addr()?.to_string()]; + let add_req = MemberAddRequest::new(new_node_peer_urls.clone(), false); let add_res = cluster_client.member_add(add_req).await?; assert_eq!(add_res.members.len(), 4); - cluster.run_node(new_node_listener).await; - let mut etcd_client = etcd_client::Client::connect(&new_node_addrs, None).await?; + cluster + .run_node(new_node_client_listener, new_node_peer_listener) + .await; + let mut etcd_client = etcd_client::Client::connect(&new_node_client_urls, None).await?; let res = etcd_client.get("key", None).await?; assert_eq!(res.kvs().get(0).unwrap().value(), b"value"); Ok(()) diff --git a/crates/xline/tests/it/kv_test.rs b/crates/xline/tests/it/kv_test.rs index 3a636e286..0cc79a598 100644 --- a/crates/xline/tests/it/kv_test.rs +++ b/crates/xline/tests/it/kv_test.rs @@ -171,7 +171,7 @@ async fn test_range_redirect() -> Result<(), Box> { let mut cluster = Cluster::new(3).await; cluster.start().await; - let addr = cluster.get_addr(1); + let addr = cluster.get_client_urls(1); let kv_client = Client::connect([addr], ClientOptions::default()) .await? .kv_client(); diff --git a/crates/xline/tests/it/lease_test.rs b/crates/xline/tests/it/lease_test.rs index 2286596fb..0dd85ab93 100644 --- a/crates/xline/tests/it/lease_test.rs +++ b/crates/xline/tests/it/lease_test.rs @@ -45,7 +45,7 @@ async fn test_lease_expired() -> Result<(), Box> { async fn test_lease_keep_alive() -> Result<(), Box> { let mut cluster = Cluster::new(3).await; cluster.start().await; - let non_leader_ep = cluster.get_addr(1); + let non_leader_ep = cluster.get_client_urls(1); let client = cluster.client().await; let res = client diff --git a/crates/xline/tests/it/maintenance_test.rs b/crates/xline/tests/it/maintenance_test.rs index 80a3eae9f..7edce626b 100644 --- a/crates/xline/tests/it/maintenance_test.rs +++ b/crates/xline/tests/it/maintenance_test.rs @@ -30,7 +30,7 @@ async fn test_snapshot_and_restore() -> Result<(), Box> { let _ignore = client.put(PutRequest::new("key", "value")).await?; tokio::time::sleep(Duration::from_millis(100)).await; // TODO: use `propose_index` and remove this sleep after we finished our client. let mut maintenance_client = - Client::connect(vec![cluster.get_addr(0)], ClientOptions::default()) + Client::connect(vec![cluster.get_client_urls(0)], ClientOptions::default()) .await? .maintenance_client(); let mut stream = maintenance_client.snapshot().await?;