Skip to content

Commit

Permalink
feat: split client and peer port
Browse files Browse the repository at this point in the history
Signed-off-by: themanforfree <[email protected]>
  • Loading branch information
themanforfree committed Feb 17, 2024
1 parent fb8b97a commit 2f0f759
Show file tree
Hide file tree
Showing 27 changed files with 310 additions and 196 deletions.
4 changes: 2 additions & 2 deletions crates/benchmark/src/bench_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ mod test {
cluster.start().await;
let use_curp_client = true;
let config = ClientOptions::default();
let mut client = BenchClient::new(cluster.addrs(), use_curp_client, config)
let mut client = BenchClient::new(cluster.all_client_addrs(), use_curp_client, config)
.await
.unwrap();
//check xline client put value exist
Expand All @@ -238,7 +238,7 @@ mod test {
cluster.start().await;
let use_curp_client = false;
let config = ClientOptions::default();
let mut client = BenchClient::new(cluster.addrs(), use_curp_client, config)
let mut client = BenchClient::new(cluster.all_client_addrs(), use_curp_client, config)
.await
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion crates/curp/proto/common
Submodule common updated 1 files
+4 −3 src/curp-command.proto
2 changes: 2 additions & 0 deletions crates/curp/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub trait ClientApi {
&self,
node_id: ServerId,
node_name: String,
node_client_urls: Vec<String>,
) -> Result<(), Self::Error>;

/// Send move leader request
Expand Down Expand Up @@ -144,6 +145,7 @@ trait RepeatableClientApi: ClientApi {
propose_id: ProposeId,
node_id: ServerId,
node_name: String,
node_client_urls: Vec<String>,
) -> Result<(), Self::Error>;
}

Expand Down
10 changes: 9 additions & 1 deletion crates/curp/src/client/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,13 +238,21 @@ where
&self,
node_id: ServerId,
node_name: String,
node_client_urls: Vec<String>,
) -> Result<(), Self::Error> {
let propose_id = self
.retry::<_, _>(RepeatableClientApi::gen_propose_id)
.await?;
self.retry::<_, _>(|client| {
let name_c = node_name.clone();
RepeatableClientApi::propose_publish(client, propose_id, node_id, name_c)
let node_client_urls_c = node_client_urls.clone();
RepeatableClientApi::propose_publish(
client,
propose_id,
node_id,
name_c,
node_client_urls_c,
)
})
.await
}
Expand Down
65 changes: 34 additions & 31 deletions crates/curp/src/client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ use std::{
use curp_external_api::LogIndex;
use curp_test_utils::test_cmd::{LogIndexResult, TestCommand, TestCommandResult};
use tokio::time::Instant;
#[cfg(not(madsim))]
use tonic::transport::ClientTlsConfig;
use tracing_test::traced_test;
#[cfg(madsim)]
use utils::ClientTlsConfig;

use super::{
retry::{Retry, RetryConfig},
Expand Down Expand Up @@ -78,9 +81,9 @@ async fn test_unary_fetch_clusters_serializable() {
term: 1,
cluster_id: 123,
members: vec![
Member::new(0, "S0", vec!["A0".to_owned()], false),
Member::new(1, "S1", vec!["A1".to_owned()], false),
Member::new(2, "S2", vec!["A2".to_owned()], false),
Member::new(0, "S0", vec!["A0".to_owned()], [], false),
Member::new(1, "S1", vec!["A1".to_owned()], [], false),
Member::new(2, "S2", vec!["A2".to_owned()], [], false),
],
cluster_version: 1,
}))
Expand Down Expand Up @@ -136,11 +139,11 @@ async fn test_unary_fetch_clusters_linearizable() {
term: 2,
cluster_id: 123,
members: vec![
Member::new(0, "S0", vec!["A0".to_owned()], false),
Member::new(1, "S1", vec!["A1".to_owned()], false),
Member::new(2, "S2", vec!["A2".to_owned()], false),
Member::new(3, "S3", vec!["A3".to_owned()], false),
Member::new(4, "S4", vec!["A4".to_owned()], false),
Member::new(0, "S0", vec!["A0".to_owned()], [], false),
Member::new(1, "S1", vec!["A1".to_owned()], [], false),
Member::new(2, "S2", vec!["A2".to_owned()], [], false),
Member::new(3, "S3", vec!["A3".to_owned()], [], false),
Member::new(4, "S4", vec!["A4".to_owned()], [], false),
],
cluster_version: 1,
},
Expand All @@ -163,11 +166,11 @@ async fn test_unary_fetch_clusters_linearizable() {
term: 1, // with the old term
cluster_id: 123,
members: vec![
Member::new(0, "S0", vec!["B0".to_owned()], false),
Member::new(1, "S1", vec!["B1".to_owned()], false),
Member::new(2, "S2", vec!["B2".to_owned()], false),
Member::new(3, "S3", vec!["B3".to_owned()], false),
Member::new(4, "S4", vec!["B4".to_owned()], false),
Member::new(0, "S0", vec!["B0".to_owned()], [], false),
Member::new(1, "S1", vec!["B1".to_owned()], [], false),
Member::new(2, "S2", vec!["B2".to_owned()], [], false),
Member::new(3, "S3", vec!["B3".to_owned()], [], false),
Member::new(4, "S4", vec!["B4".to_owned()], [], false),
],
cluster_version: 1,
},
Expand Down Expand Up @@ -202,11 +205,11 @@ async fn test_unary_fetch_clusters_linearizable_failed() {
term: 2,
cluster_id: 123,
members: vec![
Member::new(0, "S0", vec!["A0".to_owned()], false),
Member::new(1, "S1", vec!["A1".to_owned()], false),
Member::new(2, "S2", vec!["A2".to_owned()], false),
Member::new(3, "S3", vec!["A3".to_owned()], false),
Member::new(4, "S4", vec!["A4".to_owned()], false),
Member::new(0, "S0", vec!["A0".to_owned()], [], false),
Member::new(1, "S1", vec!["A1".to_owned()], [], false),
Member::new(2, "S2", vec!["A2".to_owned()], [], false),
Member::new(3, "S3", vec!["A3".to_owned()], [], false),
Member::new(4, "S4", vec!["A4".to_owned()], [], false),
],
cluster_version: 1,
},
Expand All @@ -229,11 +232,11 @@ async fn test_unary_fetch_clusters_linearizable_failed() {
term: 1, // with the old term
cluster_id: 123,
members: vec![
Member::new(0, "S0", vec!["B0".to_owned()], false),
Member::new(1, "S1", vec!["B1".to_owned()], false),
Member::new(2, "S2", vec!["B2".to_owned()], false),
Member::new(3, "S3", vec!["B3".to_owned()], false),
Member::new(4, "S4", vec!["B4".to_owned()], false),
Member::new(0, "S0", vec!["B0".to_owned()], [], false),
Member::new(1, "S1", vec!["B1".to_owned()], [], false),
Member::new(2, "S2", vec!["B2".to_owned()], [], false),
Member::new(3, "S3", vec!["B3".to_owned()], [], false),
Member::new(4, "S4", vec!["B4".to_owned()], [], false),
],
cluster_version: 1,
},
Expand Down Expand Up @@ -408,9 +411,9 @@ async fn test_unary_slow_round_fetch_leader_first() {
term: 1,
cluster_id: 123,
members: vec![
Member::new(0, "S0", vec!["A0".to_owned()], false),
Member::new(1, "S1", vec!["A1".to_owned()], false),
Member::new(2, "S2", vec!["A2".to_owned()], false),
Member::new(0, "S0", vec!["A0".to_owned()], [], false),
Member::new(1, "S1", vec!["A1".to_owned()], [], false),
Member::new(2, "S2", vec!["A2".to_owned()], [], false),
],
cluster_version: 1,
}))
Expand Down Expand Up @@ -663,11 +666,11 @@ async fn test_retry_propose_return_retry_error() {
term: 2,
cluster_id: 123,
members: vec![
Member::new(0, "S0", vec!["A0".to_owned()], false),
Member::new(1, "S1", vec!["A1".to_owned()], false),
Member::new(2, "S2", vec!["A2".to_owned()], false),
Member::new(3, "S3", vec!["A3".to_owned()], false),
Member::new(4, "S4", vec!["A4".to_owned()], false),
Member::new(0, "S0", vec!["A0".to_owned()], [], false),
Member::new(1, "S1", vec!["A1".to_owned()], [], false),
Member::new(2, "S2", vec!["A2".to_owned()], [], false),
Member::new(3, "S3", vec!["A3".to_owned()], [], false),
Member::new(4, "S4", vec!["A4".to_owned()], [], false),
],
cluster_version: 1,
}))
Expand Down
7 changes: 5 additions & 2 deletions crates/curp/src/client/unary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,11 @@ impl<C: Command> ClientApi for Unary<C> {
&self,
node_id: ServerId,
node_name: String,
node_client_urls: Vec<String>,
) -> Result<(), Self::Error> {
let propose_id = self.gen_propose_id().await?;
RepeatableClientApi::propose_publish(self, propose_id, node_id, node_name).await
RepeatableClientApi::propose_publish(self, propose_id, node_id, node_name, node_client_urls)
.await
}

/// Send move leader request
Expand Down Expand Up @@ -515,8 +517,9 @@ impl<C: Command> RepeatableClientApi for Unary<C> {
propose_id: ProposeId,
node_id: ServerId,
node_name: String,
node_client_urls: Vec<String>,
) -> Result<(), Self::Error> {
let req = PublishRequest::new(propose_id, node_id, node_name);
let req = PublishRequest::new(propose_id, node_id, node_name, node_client_urls);
let timeout = self.config.wait_synced_timeout;
let _ig = self
.map_leader(|conn| async move { conn.publish(req, timeout).await })
Expand Down
6 changes: 3 additions & 3 deletions crates/curp/src/log_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ pub(crate) enum EntryData<C> {
ConfChange(Vec<ConfChange>),
/// `Shutdown` entry
Shutdown,
/// `SetName` entry
SetName(ServerId, String),
/// `SetNodeState` entry
SetNodeState(ServerId, String, Vec<String>),
}

impl<C> From<Arc<C>> for EntryData<C> {
Expand All @@ -64,7 +64,7 @@ impl<C> From<PoolEntryInner<C>> for EntryData<C> {

impl<C> From<PublishRequest> for EntryData<C> {
fn from(value: PublishRequest) -> Self {
EntryData::SetName(value.node_id, value.name)
EntryData::SetNodeState(value.node_id, value.name, value.client_urls)
}
}

Expand Down
56 changes: 39 additions & 17 deletions crates/curp/src/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ impl Member {
id: ServerId,
name: impl Into<String>,
peer_urls: impl Into<Vec<String>>,
client_urls: impl Into<Vec<String>>,
is_learner: bool,
) -> Self {
Self {
id,
name: name.into(),
peer_urls: peer_urls.into(),
client_urls: vec![], // TODO
client_urls: client_urls.into(),
is_learner,
}
}
Expand Down Expand Up @@ -102,17 +103,20 @@ impl ClusterInfo {
#[inline]
#[must_use]
pub fn from_members_map(
all_members_addrs: HashMap<String, Vec<String>>,
all_members_peer_urls: HashMap<String, Vec<String>>,
self_client_urls: impl Into<Vec<String>>,
self_name: &str,
) -> Self {
let mut member_id = 0;
let self_client_urls = self_client_urls.into();
let members = DashMap::new();
for (name, addrs) in all_members_addrs {
let id = Self::calculate_member_id(addrs.clone(), "", None);
for (name, peer_urls) in all_members_peer_urls {
let id = Self::calculate_member_id(peer_urls.clone(), "", None);
let mut member = Member::new(id, name.clone(), peer_urls, [], false);
if name == self_name {
member_id = id;
member.client_urls = self_client_urls.clone();
}
let member = Member::new(id, name, addrs, false);
let _ig = members.insert(id, member);
}
debug_assert!(member_id != 0, "self_id should not be 0");
Expand Down Expand Up @@ -207,13 +211,20 @@ impl ClusterInfo {
addrs
}

/// Get server addresses via server id
/// Get server peer urls via server id
#[must_use]
#[inline]
pub fn addrs(&self, id: ServerId) -> Option<Vec<String>> {
pub fn peer_urls(&self, id: ServerId) -> Option<Vec<String>> {
self.members.get(&id).map(|t| t.peer_urls.clone())
}

/// Get server client urls via server id
#[must_use]
#[inline]
pub fn client_urls(&self, id: ServerId) -> Option<Vec<String>> {
self.members.get(&id).map(|t| t.client_urls.clone())
}

/// Get the current member
/// # Panics
/// panic if self member id is not in members
Expand All @@ -224,13 +235,20 @@ impl ClusterInfo {
self.members.get(&self.member_id).unwrap()
}

/// Get the current server address
/// Get the current server peer urls
#[must_use]
#[inline]
pub fn self_addrs(&self) -> Vec<String> {
pub fn self_peer_urls(&self) -> Vec<String> {
self.self_member().peer_urls.clone()
}

/// Get the current server client addrs
#[must_use]
#[inline]
pub fn self_client_urls(&self) -> Vec<String> {
self.self_member().client_urls.clone()
}

/// Get the current server id
#[must_use]
#[inline]
Expand Down Expand Up @@ -382,11 +400,15 @@ impl ClusterInfo {
self.members.contains_key(&node_id)
}

/// Set name for a node
pub(crate) fn set_name(&self, node_id: ServerId, name: String) {
/// Set state for a node
pub(crate) fn set_node_state(&self, node_id: ServerId, name: String, client_urls: Vec<String>) {
if let Some(mut s) = self.members.get_mut(&node_id) {
debug!("set name for node {} to {}", node_id, name);
debug!(
"set name and client_urls for node {} to {},{:?}",
node_id, name, client_urls
);
s.name = name;
s.client_urls = client_urls;
}
}
}
Expand Down Expand Up @@ -442,9 +464,9 @@ mod tests {
("S3".to_owned(), vec!["S3".to_owned()]),
]);

let node1 = ClusterInfo::from_members_map(all_members.clone(), "S1");
let node2 = ClusterInfo::from_members_map(all_members.clone(), "S2");
let node3 = ClusterInfo::from_members_map(all_members, "S3");
let node1 = ClusterInfo::from_members_map(all_members.clone(), [], "S1");
let node2 = ClusterInfo::from_members_map(all_members.clone(), [], "S2");
let node3 = ClusterInfo::from_members_map(all_members, [], "S3");

assert_ne!(node1.self_id(), node2.self_id());
assert_ne!(node1.self_id(), node3.self_id());
Expand All @@ -462,10 +484,10 @@ mod tests {
("S3".to_owned(), vec!["S3".to_owned()]),
]);

let node1 = ClusterInfo::from_members_map(all_members, "S1");
let node1 = ClusterInfo::from_members_map(all_members, [], "S1");
let peers = node1.peers_addrs();
let node1_id = node1.self_id();
let node1_url = node1.self_addrs();
let node1_url = node1.self_peer_urls();
assert!(!peers.contains_key(&node1_id));
assert_eq!(peers.len(), 2);
assert_eq!(node1.voters_len(), peers.len() + 1);
Expand Down
8 changes: 7 additions & 1 deletion crates/curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,11 +585,17 @@ impl MoveLeaderRequest {

impl PublishRequest {
/// Create a new `PublishRequest`
pub(crate) fn new(id: ProposeId, node_id: ServerId, name: String) -> Self {
pub(crate) fn new(
id: ProposeId,
node_id: ServerId,
name: String,
client_urls: Vec<String>,
) -> Self {
Self {
propose_id: Some(id.into()),
node_id,
name,
client_urls,
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/curp/src/server/cmd_worker/conflict_checked_mpmc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ impl<C: Command, CE: CommandExecutor<C>> Filter<C, CE> {
EntryData::ConfChange(_)
| EntryData::Shutdown
| EntryData::Empty
| EntryData::SetName(_, _) => None,
| EntryData::SetNodeState(_, _, _) => None,
};
*exe_st = ExeState::Executing;
let task = Task {
Expand Down
Loading

0 comments on commit 2f0f759

Please sign in to comment.