Skip to content

Commit

Permalink
chore: fix validation
Browse files Browse the repository at this point in the history
Signed-off-by: themanforfree <[email protected]>
  • Loading branch information
themanforfree committed Feb 17, 2024
1 parent 2f0f759 commit 4114b5b
Show file tree
Hide file tree
Showing 16 changed files with 95 additions and 83 deletions.
12 changes: 10 additions & 2 deletions crates/curp/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ pub struct ClientBuilder {
cluster_version: Option<u64>,
/// initial cluster members
all_members: Option<HashMap<ServerId, Vec<String>>>,
/// is current client send request to raw curp server
is_raw_curp: bool,
/// initial leader state
leader_state: Option<(ServerId, u64)>,
/// client configuration
Expand All @@ -188,8 +190,9 @@ impl ClientBuilder {
/// Create a client builder
#[inline]
#[must_use]
pub fn new(config: ClientConfig) -> Self {
pub fn new(config: ClientConfig, is_raw_curp: bool) -> Self {
Self {
is_raw_curp,
config,
..ClientBuilder::default()
}
Expand Down Expand Up @@ -278,7 +281,11 @@ impl ClientBuilder {
if let Some(id) = r.leader_id {
self.leader_state = Some((id, r.term));
}
self.all_members = Some(r.into_members_addrs());
self.all_members = if self.is_raw_curp {
Some(r.into_peer_urls())
} else {
Some(r.into_client_urls())
};
return Ok(self);
}
Err(e) => err = e,
Expand All @@ -301,6 +308,7 @@ impl ClientBuilder {
if let Some((id, term)) = self.leader_state {
builder.set_leader_state(id, term);
}
builder.set_is_raw_curp(self.is_raw_curp);
builder
}

Expand Down
20 changes: 18 additions & 2 deletions crates/curp/src/client/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ pub(super) struct State {
/// Immutable client state, could be cloned
#[derive(Debug, Clone)]
struct StateStatic {
/// is current client send request to raw curp server
is_raw_curp: bool,
/// Local server id, should be initialized on startup
local_server: Option<ServerId>,
/// Notifier of leader update
Expand Down Expand Up @@ -87,6 +89,7 @@ impl State {
local_server,
leader_notifier: Arc::new(Event::new()),
tls_config,
is_raw_curp: true,
},
})
}
Expand Down Expand Up @@ -215,8 +218,11 @@ impl State {
info!("client cluster version updated to {}", res.cluster_version);
state.cluster_version = res.cluster_version;

let mut new_members = res.clone().into_members_addrs();

let mut new_members = if self.immutable.is_raw_curp {
res.clone().into_peer_urls()
} else {
res.clone().into_client_urls()
};
let old_ids = state.connects.keys().copied().collect::<HashSet<_>>();
let new_ids = new_members.keys().copied().collect::<HashSet<_>>();

Expand Down Expand Up @@ -262,6 +268,8 @@ pub(super) struct StateBuilder {
cluster_version: Option<u64>,
/// Client Tls config
tls_config: Option<ClientTlsConfig>,
/// is current client send request to raw curp server
is_raw_curp: bool,
}

impl StateBuilder {
Expand All @@ -275,9 +283,15 @@ impl StateBuilder {
leader_state: None,
cluster_version: None,
tls_config,
is_raw_curp: false,
}
}

/// Set is raw curp
pub(super) fn set_is_raw_curp(&mut self, is_raw_curp: bool) {
self.is_raw_curp = is_raw_curp;
}

/// Set the leader state (optional)
pub(super) fn set_leader_state(&mut self, id: ServerId, term: u64) {
self.leader_state = Some((id, term));
Expand Down Expand Up @@ -317,6 +331,7 @@ impl StateBuilder {
local_server: Some(local_server_id),
leader_notifier: Arc::new(Event::new()),
tls_config: self.tls_config.take(),
is_raw_curp: self.is_raw_curp,
},
})
}
Expand All @@ -338,6 +353,7 @@ impl StateBuilder {
local_server: None,
leader_notifier: Arc::new(Event::new()),
tls_config: self.tls_config,
is_raw_curp: self.is_raw_curp,
},
})
}
Expand Down
4 changes: 2 additions & 2 deletions crates/curp/src/client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async fn test_unary_fetch_clusters_serializable() {
let unary = init_unary_client(connects, None, None, 0, 0, None);
let res = unary.fetch_cluster(false).await.unwrap();
assert_eq!(
res.into_members_addrs(),
res.into_peer_urls(),
HashMap::from([
(0, vec!["A0".to_owned()]),
(1, vec!["A1".to_owned()]),
Expand Down Expand Up @@ -182,7 +182,7 @@ async fn test_unary_fetch_clusters_linearizable() {
let unary = init_unary_client(connects, None, None, 0, 0, None);
let res = unary.fetch_cluster(true).await.unwrap();
assert_eq!(
res.into_members_addrs(),
res.into_peer_urls(),
HashMap::from([
(0, vec!["A0".to_owned()]),
(1, vec!["A1".to_owned()]),
Expand Down
12 changes: 6 additions & 6 deletions crates/curp/src/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,15 @@ impl ClusterInfo {
#[must_use]
pub fn from_cluster(
cluster: FetchClusterResponse,
self_addr: &[String],
self_peer_urls: &[String],
self_name: &str,
) -> Self {
let mut member_id = 0;
let members = cluster
.members
.into_iter()
.map(|mut member| {
if member.peer_urls() == self_addr {
if member.peer_urls() == self_peer_urls {
member_id = member.id;
member.name = self_name.to_owned();
}
Expand Down Expand Up @@ -328,7 +328,7 @@ impl ClusterInfo {
/// cluster version decrease
pub(crate) fn cluster_version_update(&self) {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
self.all_members_addrs()
self.all_members_peer_urls()
.into_iter()
.sorted()
.for_each(|(id, mut addrs)| {
Expand All @@ -355,7 +355,7 @@ impl ClusterInfo {
/// Get all members
#[must_use]
#[inline]
pub fn all_members_addrs(&self) -> HashMap<ServerId, Vec<String>> {
pub fn all_members_peer_urls(&self) -> HashMap<ServerId, Vec<String>> {
self.members
.iter()
.map(|t| (t.id, t.peer_urls.clone()))
Expand Down Expand Up @@ -417,7 +417,7 @@ impl ClusterInfo {
#[inline]
pub async fn get_cluster_info_from_remote(
init_cluster_info: &ClusterInfo,
self_addr: &[String],
self_peer_urls: &[String],
self_name: &str,
timeout: Duration,
tls_config: Option<&ClientTlsConfig>,
Expand All @@ -444,7 +444,7 @@ pub async fn get_cluster_info_from_remote(
debug!("get cluster info from remote success: {:?}", cluster_res);
return Some(ClusterInfo::from_cluster(
cluster_res.into_inner(),
self_addr,
self_peer_urls,
self_name,
));
}
Expand Down
14 changes: 11 additions & 3 deletions crates/curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,19 @@ impl FetchClusterResponse {
}
}

/// Get all members addresses
pub(crate) fn into_members_addrs(self) -> HashMap<ServerId, Vec<String>> {
/// Get all members peer urls
pub(crate) fn into_peer_urls(self) -> HashMap<ServerId, Vec<String>> {
self.members
.into_iter()
.map(|member| (member.id, member.peer_urls)) // TODO
.map(|member| (member.id, member.peer_urls))
.collect()
}

/// Get all members peer urls
pub(crate) fn into_client_urls(self) -> HashMap<ServerId, Vec<String>> {
self.members
.into_iter()
.map(|member| (member.id, member.client_urls))
.collect()
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/curp/tests/it/common/curp_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use tokio_stream::wrappers::TcpListenerStream;
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint, ServerTlsConfig};
use tracing::debug;
use utils::{
build_endpoint, certs,
build_endpoint,
config::{
default_quota, ClientConfig, CurpConfig, CurpConfigBuilder, EngineConfig, StorageConfig,
},
Expand Down Expand Up @@ -305,7 +305,7 @@ impl CurpGroup {

pub async fn new_client(&self) -> impl ClientApi<Error = tonic::Status, Cmd = TestCommand> {
let addrs = self.all_addrs().cloned().collect();
ClientBuilder::new(ClientConfig::default())
ClientBuilder::new(ClientConfig::default(), true)
.discover_from(addrs)
.await
.unwrap()
Expand Down
4 changes: 2 additions & 2 deletions crates/curp/tests/it/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ async fn shutdown_rpc_should_shutdown_the_cluster_when_client_has_wrong_leader()
let leader_id = group.get_leader().await.0;
let follower_id = *group.nodes.keys().find(|&id| &leader_id != id).unwrap();
// build a client and set a wrong leader id
let client = ClientBuilder::new(ClientConfig::default())
let client = ClientBuilder::new(ClientConfig::default(), true)
.leader_state(follower_id, 0)
.all_members(group.all_addrs_map())
.build::<TestCommand>()
Expand All @@ -427,7 +427,7 @@ async fn propose_conf_change_to_follower() {
let leader_id = group.get_leader().await.0;
let follower_id = *group.nodes.keys().find(|&id| &leader_id != id).unwrap();
// build a client and set a wrong leader id
let client = ClientBuilder::new(ClientConfig::default())
let client = ClientBuilder::new(ClientConfig::default(), true)
.leader_state(follower_id, 0)
.all_members(group.all_addrs_map())
.build::<TestCommand>()
Expand Down
4 changes: 2 additions & 2 deletions crates/simulation/src/curp_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl CurpGroup {

let cluster_info = Arc::new(ClusterInfo::from_members_map(all.clone(), [], &name));
all_members = cluster_info
.all_members_addrs()
.all_members_peer_urls()
.into_iter()
.map(|(k, mut v)| (k, v.pop().unwrap()))
.collect();
Expand Down Expand Up @@ -179,7 +179,7 @@ impl CurpGroup {
.collect();
SimClient {
inner: Arc::new(
ClientBuilder::new(config)
ClientBuilder::new(config, true)
.all_members(all_members)
.build()
.await
Expand Down
2 changes: 1 addition & 1 deletion crates/simulation/tests/it/xline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async fn xline_members_restore() {
init_logger();
let mut group = XlineGroup::new(3).await;
let node = group.get_node("S1");
let addr = node.addr.clone();
let addr = node.client_url.clone();
let client = SimEtcdClient::new(addr, group.client_handle.clone()).await;

let res = client
Expand Down
16 changes: 8 additions & 8 deletions crates/utils/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ pub struct ClusterConfig {
client_advertise_urls: Vec<String>,
/// All the nodes in the xline cluster
#[getset(get = "pub")]
members: HashMap<String, Vec<String>>,
peers: HashMap<String, Vec<String>>,
/// Leader node.
#[getset(get = "pub")]
is_leader: bool,
Expand All @@ -126,11 +126,11 @@ impl Default for ClusterConfig {
fn default() -> Self {
Self {
name: "default".to_owned(),
peer_listen_urls: vec!["http://127.0.0.1:2379".to_owned()],
peer_advertise_urls: vec!["http://127.0.0.1:2379".to_owned()],
peer_listen_urls: vec!["http://127.0.0.1:2380".to_owned()],
peer_advertise_urls: vec!["http://127.0.0.1:2380".to_owned()],
client_listen_urls: vec!["http://127.0.0.1:2379".to_owned()],
client_advertise_urls: vec!["http://127.0.0.1:2379".to_owned()],
members: HashMap::from([(
peers: HashMap::from([(
"default".to_owned(),
vec!["http://127.0.0.1:2379".to_owned()],
)]),
Expand Down Expand Up @@ -196,7 +196,7 @@ impl ClusterConfig {
peer_advertise_urls,
client_listen_urls,
client_advertise_urls,
members,
peers: members,
is_leader,
curp_config: curp,
client_config,
Expand Down Expand Up @@ -1188,7 +1188,7 @@ mod tests {
sync_victims_interval = '20ms'
watch_progress_notify_interval = '1s'
[cluster.members]
[cluster.peers]
node1 = ['127.0.0.1:2378', '127.0.0.1:2379']
node2 = ['127.0.0.1:2380']
node3 = ['127.0.0.1:2381']
Expand Down Expand Up @@ -1368,7 +1368,7 @@ mod tests {
client_listen_urls = ['127.0.0.1:2379']
client_advertise_urls = ['127.0.0.1:2379']
[cluster.members]
[cluster.peers]
node1 = ['127.0.0.1:2379']
node2 = ['127.0.0.1:2380']
node3 = ['127.0.0.1:2381']
Expand Down Expand Up @@ -1457,7 +1457,7 @@ mod tests {
client_listen_urls = ['127.0.0.1:2379']
client_advertise_urls = ['127.0.0.1:2379']
[cluster.members]
[cluster.peers]
node1 = ['127.0.0.1:2379']
node2 = ['127.0.0.1:2380']
node3 = ['127.0.0.1:2381']
Expand Down
2 changes: 1 addition & 1 deletion crates/xline-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl Client {
.collect();
let channel = Self::build_channel(addrs.clone(), options.tls_config.as_ref()).await?;
let curp_client = Arc::new(
CurpClientBuilder::new(options.client_config)
CurpClientBuilder::new(options.client_config, false)
.discover_from(addrs)
.await?
.build::<Command>()
Expand Down
Loading

0 comments on commit 4114b5b

Please sign in to comment.