diff --git a/crates/curp/src/client/mod.rs b/crates/curp/src/client/mod.rs index b3c447c54..494bb15d7 100644 --- a/crates/curp/src/client/mod.rs +++ b/crates/curp/src/client/mod.rs @@ -164,6 +164,8 @@ pub struct ClientBuilder { cluster_version: Option, /// initial cluster members all_members: Option>>, + /// is current client send request to raw curp server + is_raw_curp: bool, /// initial leader state leader_state: Option<(ServerId, u64)>, /// client configuration @@ -188,8 +190,9 @@ impl ClientBuilder { /// Create a client builder #[inline] #[must_use] - pub fn new(config: ClientConfig) -> Self { + pub fn new(config: ClientConfig, is_raw_curp: bool) -> Self { Self { + is_raw_curp, config, ..ClientBuilder::default() } @@ -278,7 +281,11 @@ impl ClientBuilder { if let Some(id) = r.leader_id { self.leader_state = Some((id, r.term)); } - self.all_members = Some(r.into_members_addrs()); + self.all_members = if self.is_raw_curp { + Some(r.into_peer_urls()) + } else { + Some(r.into_client_urls()) + }; return Ok(self); } Err(e) => err = e, @@ -301,6 +308,7 @@ impl ClientBuilder { if let Some((id, term)) = self.leader_state { builder.set_leader_state(id, term); } + builder.set_is_raw_curp(self.is_raw_curp); builder } diff --git a/crates/curp/src/client/state.rs b/crates/curp/src/client/state.rs index 838b65091..f109e6d62 100644 --- a/crates/curp/src/client/state.rs +++ b/crates/curp/src/client/state.rs @@ -34,6 +34,8 @@ pub(super) struct State { /// Immutable client state, could be cloned #[derive(Debug, Clone)] struct StateStatic { + /// is current client send request to raw curp server + is_raw_curp: bool, /// Local server id, should be initialized on startup local_server: Option, /// Notifier of leader update @@ -87,6 +89,7 @@ impl State { local_server, leader_notifier: Arc::new(Event::new()), tls_config, + is_raw_curp: true, }, }) } @@ -215,8 +218,11 @@ impl State { info!("client cluster version updated to {}", res.cluster_version); state.cluster_version = res.cluster_version; - let mut new_members = res.clone().into_members_addrs(); - + let mut new_members = if self.immutable.is_raw_curp { + res.clone().into_peer_urls() + } else { + res.clone().into_client_urls() + }; let old_ids = state.connects.keys().copied().collect::>(); let new_ids = new_members.keys().copied().collect::>(); @@ -262,6 +268,8 @@ pub(super) struct StateBuilder { cluster_version: Option, /// Client Tls config tls_config: Option, + /// is current client send request to raw curp server + is_raw_curp: bool, } impl StateBuilder { @@ -275,9 +283,15 @@ impl StateBuilder { leader_state: None, cluster_version: None, tls_config, + is_raw_curp: false, } } + /// Set is raw curp + pub(super) fn set_is_raw_curp(&mut self, is_raw_curp: bool) { + self.is_raw_curp = is_raw_curp; + } + /// Set the leader state (optional) pub(super) fn set_leader_state(&mut self, id: ServerId, term: u64) { self.leader_state = Some((id, term)); @@ -317,6 +331,7 @@ impl StateBuilder { local_server: Some(local_server_id), leader_notifier: Arc::new(Event::new()), tls_config: self.tls_config.take(), + is_raw_curp: self.is_raw_curp, }, }) } @@ -338,6 +353,7 @@ impl StateBuilder { local_server: None, leader_notifier: Arc::new(Event::new()), tls_config: self.tls_config, + is_raw_curp: self.is_raw_curp, }, }) } diff --git a/crates/curp/src/client/tests.rs b/crates/curp/src/client/tests.rs index 29b05b4e1..50f64d7c9 100644 --- a/crates/curp/src/client/tests.rs +++ b/crates/curp/src/client/tests.rs @@ -92,7 +92,7 @@ async fn test_unary_fetch_clusters_serializable() { let unary = init_unary_client(connects, None, None, 0, 0, None); let res = unary.fetch_cluster(false).await.unwrap(); assert_eq!( - res.into_members_addrs(), + res.into_peer_urls(), HashMap::from([ (0, vec!["A0".to_owned()]), (1, vec!["A1".to_owned()]), @@ -182,7 +182,7 @@ async fn test_unary_fetch_clusters_linearizable() { let unary = init_unary_client(connects, None, None, 0, 0, None); let res = unary.fetch_cluster(true).await.unwrap(); assert_eq!( - res.into_members_addrs(), + res.into_peer_urls(), HashMap::from([ (0, vec!["A0".to_owned()]), (1, vec!["A1".to_owned()]), diff --git a/crates/curp/src/members.rs b/crates/curp/src/members.rs index e2eaa236c..43c600ab9 100644 --- a/crates/curp/src/members.rs +++ b/crates/curp/src/members.rs @@ -137,7 +137,7 @@ impl ClusterInfo { #[must_use] pub fn from_cluster( cluster: FetchClusterResponse, - self_addr: &[String], + self_peer_urls: &[String], self_name: &str, ) -> Self { let mut member_id = 0; @@ -145,7 +145,7 @@ impl ClusterInfo { .members .into_iter() .map(|mut member| { - if member.peer_urls() == self_addr { + if member.peer_urls() == self_peer_urls { member_id = member.id; member.name = self_name.to_owned(); } @@ -328,7 +328,7 @@ impl ClusterInfo { /// cluster version decrease pub(crate) fn cluster_version_update(&self) { let mut hasher = std::collections::hash_map::DefaultHasher::new(); - self.all_members_addrs() + self.all_members_peer_urls() .into_iter() .sorted() .for_each(|(id, mut addrs)| { @@ -355,7 +355,7 @@ impl ClusterInfo { /// Get all members #[must_use] #[inline] - pub fn all_members_addrs(&self) -> HashMap> { + pub fn all_members_peer_urls(&self) -> HashMap> { self.members .iter() .map(|t| (t.id, t.peer_urls.clone())) @@ -417,7 +417,7 @@ impl ClusterInfo { #[inline] pub async fn get_cluster_info_from_remote( init_cluster_info: &ClusterInfo, - self_addr: &[String], + self_peer_urls: &[String], self_name: &str, timeout: Duration, tls_config: Option<&ClientTlsConfig>, @@ -444,7 +444,7 @@ pub async fn get_cluster_info_from_remote( debug!("get cluster info from remote success: {:?}", cluster_res); return Some(ClusterInfo::from_cluster( cluster_res.into_inner(), - self_addr, + self_peer_urls, self_name, )); } diff --git a/crates/curp/src/rpc/mod.rs b/crates/curp/src/rpc/mod.rs index 5b24e7e6e..bcfec2acd 100644 --- a/crates/curp/src/rpc/mod.rs +++ b/crates/curp/src/rpc/mod.rs @@ -118,11 +118,19 @@ impl FetchClusterResponse { } } - /// Get all members addresses - pub(crate) fn into_members_addrs(self) -> HashMap> { + /// Get all members peer urls + pub(crate) fn into_peer_urls(self) -> HashMap> { self.members .into_iter() - .map(|member| (member.id, member.peer_urls)) // TODO + .map(|member| (member.id, member.peer_urls)) + .collect() + } + + /// Get all members peer urls + pub(crate) fn into_client_urls(self) -> HashMap> { + self.members + .into_iter() + .map(|member| (member.id, member.client_urls)) .collect() } } diff --git a/crates/curp/tests/it/common/curp_group.rs b/crates/curp/tests/it/common/curp_group.rs index dcad1c241..6cf8bb0c8 100644 --- a/crates/curp/tests/it/common/curp_group.rs +++ b/crates/curp/tests/it/common/curp_group.rs @@ -34,7 +34,7 @@ use tokio_stream::wrappers::TcpListenerStream; use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint, ServerTlsConfig}; use tracing::debug; use utils::{ - build_endpoint, certs, + build_endpoint, config::{ default_quota, ClientConfig, CurpConfig, CurpConfigBuilder, EngineConfig, StorageConfig, }, @@ -305,7 +305,7 @@ impl CurpGroup { pub async fn new_client(&self) -> impl ClientApi { let addrs = self.all_addrs().cloned().collect(); - ClientBuilder::new(ClientConfig::default()) + ClientBuilder::new(ClientConfig::default(), true) .discover_from(addrs) .await .unwrap() diff --git a/crates/curp/tests/it/server.rs b/crates/curp/tests/it/server.rs index 8211df7f9..fe9fa1243 100644 --- a/crates/curp/tests/it/server.rs +++ b/crates/curp/tests/it/server.rs @@ -406,7 +406,7 @@ async fn shutdown_rpc_should_shutdown_the_cluster_when_client_has_wrong_leader() let leader_id = group.get_leader().await.0; let follower_id = *group.nodes.keys().find(|&id| &leader_id != id).unwrap(); // build a client and set a wrong leader id - let client = ClientBuilder::new(ClientConfig::default()) + let client = ClientBuilder::new(ClientConfig::default(), true) .leader_state(follower_id, 0) .all_members(group.all_addrs_map()) .build::() @@ -427,7 +427,7 @@ async fn propose_conf_change_to_follower() { let leader_id = group.get_leader().await.0; let follower_id = *group.nodes.keys().find(|&id| &leader_id != id).unwrap(); // build a client and set a wrong leader id - let client = ClientBuilder::new(ClientConfig::default()) + let client = ClientBuilder::new(ClientConfig::default(), true) .leader_state(follower_id, 0) .all_members(group.all_addrs_map()) .build::() diff --git a/crates/simulation/src/curp_group.rs b/crates/simulation/src/curp_group.rs index 27bfc2a48..204928988 100644 --- a/crates/simulation/src/curp_group.rs +++ b/crates/simulation/src/curp_group.rs @@ -81,7 +81,7 @@ impl CurpGroup { let cluster_info = Arc::new(ClusterInfo::from_members_map(all.clone(), [], &name)); all_members = cluster_info - .all_members_addrs() + .all_members_peer_urls() .into_iter() .map(|(k, mut v)| (k, v.pop().unwrap())) .collect(); @@ -179,7 +179,7 @@ impl CurpGroup { .collect(); SimClient { inner: Arc::new( - ClientBuilder::new(config) + ClientBuilder::new(config, true) .all_members(all_members) .build() .await diff --git a/crates/simulation/tests/it/xline.rs b/crates/simulation/tests/it/xline.rs index 6d3f62342..93d1db115 100644 --- a/crates/simulation/tests/it/xline.rs +++ b/crates/simulation/tests/it/xline.rs @@ -53,7 +53,7 @@ async fn xline_members_restore() { init_logger(); let mut group = XlineGroup::new(3).await; let node = group.get_node("S1"); - let addr = node.addr.clone(); + let addr = node.client_url.clone(); let client = SimEtcdClient::new(addr, group.client_handle.clone()).await; let res = client diff --git a/crates/utils/src/config.rs b/crates/utils/src/config.rs index 686aa5d45..3b197cb77 100644 --- a/crates/utils/src/config.rs +++ b/crates/utils/src/config.rs @@ -99,7 +99,7 @@ pub struct ClusterConfig { client_advertise_urls: Vec, /// All the nodes in the xline cluster #[getset(get = "pub")] - members: HashMap>, + peers: HashMap>, /// Leader node. #[getset(get = "pub")] is_leader: bool, @@ -126,11 +126,11 @@ impl Default for ClusterConfig { fn default() -> Self { Self { name: "default".to_owned(), - peer_listen_urls: vec!["http://127.0.0.1:2379".to_owned()], - peer_advertise_urls: vec!["http://127.0.0.1:2379".to_owned()], + peer_listen_urls: vec!["http://127.0.0.1:2380".to_owned()], + peer_advertise_urls: vec!["http://127.0.0.1:2380".to_owned()], client_listen_urls: vec!["http://127.0.0.1:2379".to_owned()], client_advertise_urls: vec!["http://127.0.0.1:2379".to_owned()], - members: HashMap::from([( + peers: HashMap::from([( "default".to_owned(), vec!["http://127.0.0.1:2379".to_owned()], )]), @@ -196,7 +196,7 @@ impl ClusterConfig { peer_advertise_urls, client_listen_urls, client_advertise_urls, - members, + peers: members, is_leader, curp_config: curp, client_config, @@ -1188,7 +1188,7 @@ mod tests { sync_victims_interval = '20ms' watch_progress_notify_interval = '1s' - [cluster.members] + [cluster.peers] node1 = ['127.0.0.1:2378', '127.0.0.1:2379'] node2 = ['127.0.0.1:2380'] node3 = ['127.0.0.1:2381'] @@ -1368,7 +1368,7 @@ mod tests { client_listen_urls = ['127.0.0.1:2379'] client_advertise_urls = ['127.0.0.1:2379'] - [cluster.members] + [cluster.peers] node1 = ['127.0.0.1:2379'] node2 = ['127.0.0.1:2380'] node3 = ['127.0.0.1:2381'] @@ -1457,7 +1457,7 @@ mod tests { client_listen_urls = ['127.0.0.1:2379'] client_advertise_urls = ['127.0.0.1:2379'] - [cluster.members] + [cluster.peers] node1 = ['127.0.0.1:2379'] node2 = ['127.0.0.1:2380'] node3 = ['127.0.0.1:2381'] diff --git a/crates/xline-client/src/lib.rs b/crates/xline-client/src/lib.rs index 98554aeed..e1bf11c5b 100644 --- a/crates/xline-client/src/lib.rs +++ b/crates/xline-client/src/lib.rs @@ -233,7 +233,7 @@ impl Client { .collect(); let channel = Self::build_channel(addrs.clone(), options.tls_config.as_ref()).await?; let curp_client = Arc::new( - CurpClientBuilder::new(options.client_config) + CurpClientBuilder::new(options.client_config, false) .discover_from(addrs) .await? .build::() diff --git a/crates/xline/src/server/xline_server.rs b/crates/xline/src/server/xline_server.rs index 5c9bbb681..015b49961 100644 --- a/crates/xline/src/server/xline_server.rs +++ b/crates/xline/src/server/xline_server.rs @@ -1,4 +1,4 @@ -use std::{net::ToSocketAddrs, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use anyhow::{anyhow, Result}; use clippy_utilities::{Cast, OverflowArithmetic}; @@ -12,7 +12,6 @@ use dashmap::DashMap; use engine::{MemorySnapshotAllocator, RocksSnapshotAllocator, SnapshotAllocator}; #[cfg(not(madsim))] use futures::Stream; -use itertools::Itertools; use jsonwebtoken::{DecodingKey, EncodingKey}; #[cfg(not(madsim))] use tokio::io::{AsyncRead, AsyncWrite}; @@ -139,33 +138,13 @@ impl XlineServer { curp_storage: &CurpDB, tls_config: Option<&ClientTlsConfig>, ) -> Result { - let server_addr_str = cluster_config - .members() - .get(cluster_config.name()) - .ok_or_else(|| { - anyhow!( - "node name {} not found in cluster peers", - cluster_config.name() - ) - })?; - let server_addr = server_addr_str - .iter() - .map(|addr| { - let address = match addr.split_once("://") { - Some((_, address)) => address, - None => addr, - }; - address.to_socket_addrs() - }) - .collect::, _>>()? - .into_iter() - .flatten() - .collect_vec(); info!("name = {:?}", cluster_config.name()); - info!("server_addr = {server_addr:?}"); - info!("cluster_peers = {:?}", cluster_config.members()); + info!("cluster_peers = {:?}", cluster_config.peers()); - let self_client_addr = cluster_config.client_advertise_urls().clone(); + let name = cluster_config.name().clone(); + let all_members = cluster_config.peers().clone(); + let self_client_urls = cluster_config.client_advertise_urls().clone(); + let self_peer_urls = cluster_config.peer_advertise_urls().clone(); match ( curp_storage.recover_cluster_info()?, *cluster_config.initial_cluster_state(), @@ -176,23 +155,16 @@ impl XlineServer { } (None, InitialClusterState::New) => { info!("get cluster_info by args"); - let cluster_info = ClusterInfo::from_members_map( - cluster_config.members().clone(), - self_client_addr, - cluster_config.name(), - ); + let cluster_info = + ClusterInfo::from_members_map(all_members, self_client_urls, &name); curp_storage.put_cluster_info(&cluster_info)?; Ok(cluster_info) } (None, InitialClusterState::Existing) => { info!("get cluster_info from remote"); let cluster_info = get_cluster_info_from_remote( - &ClusterInfo::from_members_map( - cluster_config.members().clone(), - self_client_addr, - cluster_config.name(), - ), - server_addr_str, + &ClusterInfo::from_members_map(all_members, self_client_urls, &name), + &self_peer_urls, cluster_config.name(), *cluster_config.client_config().wait_synced_timeout(), tls_config, @@ -435,7 +407,8 @@ impl XlineServer { let peer_listen_urls = self.cluster_config.peer_listen_urls(); let xline_incoming = bind_addrs(client_listen_urls)?; let curp_incoming = bind_addrs(peer_listen_urls)?; - + info!("start xline server on {:?}", client_listen_urls); + info!("start curp server on {:?}", peer_listen_urls); self.start_inner(xline_incoming, curp_incoming).await } @@ -549,9 +522,9 @@ impl XlineServer { .await; let client = Arc::new( - CurpClientBuilder::new(*self.cluster_config.client_config()) + CurpClientBuilder::new(*self.cluster_config.client_config(), false) .cluster_version(self.cluster_info.cluster_version()) - .all_members(self.cluster_info.all_members_addrs()) + .all_members(self.cluster_info.all_members_peer_urls()) .bypass(self.cluster_info.self_id(), curp_server.clone()) .build::() .await?, @@ -691,6 +664,10 @@ impl XlineServer { fn bind_addrs( addrs: &[String], ) -> Result>> { + use std::net::ToSocketAddrs; + if addrs.is_empty() { + return Err(anyhow!("No address to bind")); + } let incoming = addrs .iter() .map(|addr| { diff --git a/crates/xline/src/utils/args.rs b/crates/xline/src/utils/args.rs index 0664af940..98997023f 100644 --- a/crates/xline/src/utils/args.rs +++ b/crates/xline/src/utils/args.rs @@ -37,16 +37,16 @@ pub struct ServerArgs { #[clap(long)] name: String, /// Node peer listen urls - #[clap(long)] + #[clap(long, num_args = 1.., value_delimiter = ',')] peer_listen_urls: Vec, /// Node peer advertise urls - #[clap(long)] + #[clap(long, num_args = 1.., value_delimiter = ',')] peer_advertise_urls: Vec, /// Node client listen urls - #[clap(long)] + #[clap(long, num_args = 1.., value_delimiter = ',')] client_listen_urls: Vec, /// Node client advertise urls - #[clap(long)] + #[clap(long, num_args = 1.., value_delimiter = ',')] client_advertise_urls: Vec, /// Cluster peers. eg: node1=192.168.x.x:8080,192.168.x.x:8081,node2=192.168.x.x:8083 #[clap(long, value_parser = parse_members)] diff --git a/scripts/Dockerfile b/scripts/Dockerfile index 83fe1f9ca..8b56ddf6a 100644 --- a/scripts/Dockerfile +++ b/scripts/Dockerfile @@ -1,9 +1,8 @@ FROM ubuntu:latest -COPY xline /usr/local/bin -COPY benchmark /usr/local/bin - RUN apt-get update && apt-get install -y iproute2 iputils-ping +COPY xline /usr/local/bin +COPY benchmark /usr/local/bin CMD ["/usr/local/bin/xline"] diff --git a/scripts/quick_start.sh b/scripts/quick_start.sh index 1d5bae78a..4a4361ac7 100755 --- a/scripts/quick_start.sh +++ b/scripts/quick_start.sh @@ -4,7 +4,7 @@ DIR=$( pwd ) SERVERS=("172.20.0.2" "172.20.0.3" "172.20.0.4" "172.20.0.5") -MEMBERS="node1=${SERVERS[1]}:2379,${SERVERS[1]}:2380,node2=${SERVERS[2]}:2379,${SERVERS[2]}:2380,node3=${SERVERS[3]}:2379,${SERVERS[3]}:2380" +MEMBERS="node1=${SERVERS[1]}:2380,${SERVERS[1]}:2381,node2=${SERVERS[2]}:2380,${SERVERS[2]}:2381,node3=${SERVERS[3]}:2380,${SERVERS[3]}:2381" source $DIR/log.sh @@ -33,7 +33,11 @@ run_xline() { --storage-engine rocksdb \ --data-dir /usr/local/xline/data-dir \ --auth-public-key /mnt/public.pem \ - --auth-private-key /mnt/private.pem" + --auth-private-key /mnt/private.pem \ + --client-listen-urls=http://${SERVERS[$1]}:2379 \ + --peer-listen-urls=http://${SERVERS[$1]}:2380,http://${SERVERS[$1]}:2381 \ + --client-advertise-urls=http://${SERVERS[$1]}:2379 \ + --peer-advertise-urls=http://${SERVERS[$1]}:2380,http://${SERVERS[$1]}:2381" if [ -n "$LOG_PATH" ]; then cmd="${cmd} --log-file ${LOG_PATH}/node${1} --log-level debug" diff --git a/scripts/validation_test.sh b/scripts/validation_test.sh index c6c8ae74e..0ef27a43a 100755 --- a/scripts/validation_test.sh +++ b/scripts/validation_test.sh @@ -1,7 +1,7 @@ #!/bin/bash DIR="$(dirname $0)" QUICK_START="${DIR}/quick_start.sh" -ETCDCTL="docker exec -i client etcdctl --endpoints=http://172.20.0.3:2379,http://172.20.0.4:2380" +ETCDCTL="docker exec -i client etcdctl --endpoints=http://172.20.0.3:2379,http://172.20.0.4:2379" LOCK_CLIENT="docker exec -i client /mnt/validation_lock_client --endpoints=http://172.20.0.3:2379" @@ -280,7 +280,7 @@ cluster_validation() { log::info "cluster validation test running..." run "${ETCDCTL} member list" - check "\s*[0-9a-z]+, started, node[0-9], 172.20.0.[0-9]:2379,172.20.0.[0-9]:2380, 172.20.0.[0-9]:2379,172.20.0.[0-9]:2380, false" + check "\s*[0-9a-z]+, started, node[0-9], 172.20.0.[0-9]:2380,172.20.0.[0-9]:2381, http://172.20.0.[0-9]:2379, false" run "${ETCDCTL} member add client --peer-urls=http://172.20.0.6:2379 --learner=true" check "Member\s+[a-zA-Z0-9]+ added to cluster\s+[a-zA-Z0-9]+" node_id=$(echo -e ${res} | awk '{print $2}')