Skip to content

Commit

Permalink
test: add some tests for conf change
Browse files Browse the repository at this point in the history
Signed-off-by: themanforfree <[email protected]>
  • Loading branch information
themanforfree committed Sep 26, 2023
1 parent 67d7bd8 commit 35464d6
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 41 deletions.
2 changes: 1 addition & 1 deletion curp/proto/common
9 changes: 6 additions & 3 deletions curp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl<C: Command> Builder<C> {
}
async move {
let mut protocol_client = ProtocolClient::connect(addr).await?;
let mut req = tonic::Request::new(FetchClusterRequest::new());
let mut req = tonic::Request::new(FetchClusterRequest::default());
req.set_timeout(propose_timeout);
let fetch_cluster_res = protocol_client.fetch_cluster(req).await?.into_inner();
Ok::<FetchClusterResponse, ClientBuildError>(fetch_cluster_res)
Expand Down Expand Up @@ -519,7 +519,10 @@ where
(
connect.id(),
connect
.fetch_leader(FetchLeaderRequest::new(), *self.config.retry_timeout())
.fetch_leader(
FetchLeaderRequest::default(),
*self.config.retry_timeout(),
)
.await,
)
})
Expand Down Expand Up @@ -757,7 +760,7 @@ where
let resp = self
.get_connect(local_server)
.unwrap_or_else(|| unreachable!("self id {} not found", local_server))
.fetch_leader(FetchLeaderRequest::new(), *self.config.retry_timeout())
.fetch_leader(FetchLeaderRequest::default(), *self.config.retry_timeout())
.await?
.into_inner();
Ok((resp.leader_id, resp.term))
Expand Down
1 change: 0 additions & 1 deletion curp/src/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ impl ClusterInfo {
.addrs = addrs.into();
}

// TODO
/// Get server addresses via server id
#[must_use]
#[inline]
Expand Down
41 changes: 31 additions & 10 deletions curp/src/rpc/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,32 @@ impl ConnectApiWrapper {
pub(crate) fn new_from_arc(connect: Arc<dyn InnerConnectApi>) -> Self {
Self(connect)
}

/// Create a new `ConnectApiWrapper` from id and addrs
pub(crate) async fn connect(
id: ServerId,
mut addrs: Vec<String>,
) -> Result<Self, tonic::transport::Error> {
let (channel, change_tx) = Channel::balance_channel(DEFAULT_BUFFER_SIZE);
// Addrs must start with "http" to communicate with the server
for addr in &mut addrs {
if !addr.starts_with("http://") {
addr.insert_str(0, "http://");
}
let endpoint = Endpoint::from_shared(addr.clone())?;
let _ig = change_tx
.send(tower::discover::Change::Insert(addr.clone(), endpoint))
.await;
}
let client = InnerProtocolClient::new(channel);
let connect = ConnectApiWrapper::new_from_arc(Arc::new(Connect {
id,
rpc_connect: client,
change_tx,
addrs: Mutex::new(addrs),
}));
Ok(connect)
}
}

impl Debug for ConnectApiWrapper {
Expand Down Expand Up @@ -226,18 +252,13 @@ impl<C> Connect<C> {
let new_addrs: HashSet<String> = addrs.iter().cloned().collect();
let diffs = &old_addrs ^ &new_addrs;
for diff in &diffs {
if new_addrs.contains(diff) {
let change = if new_addrs.contains(diff) {
let endpoint = Endpoint::from_shared(diff.clone())?;
let _ig = self
.change_tx
.send(tower::discover::Change::Insert(diff.clone(), endpoint))
.await;
tower::discover::Change::Insert(diff.clone(), endpoint)
} else {
let _ig = self
.change_tx
.send(tower::discover::Change::Remove(diff.clone()))
.await;
}
tower::discover::Change::Remove(diff.clone())
};
let _ig = self.change_tx.send(change).await;
}
*old = addrs;
Ok(())
Expand Down
83 changes: 69 additions & 14 deletions curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,27 +80,13 @@ mod proto {
}
}

impl FetchLeaderRequest {
/// Create a new `FetchLeaderRequest`
pub(crate) fn new() -> Self {
Self {}
}
}

impl FetchLeaderResponse {
/// Create a new `FetchLeaderResponse`
pub(crate) fn new(leader_id: Option<ServerId>, term: u64) -> Self {
Self { leader_id, term }
}
}

impl FetchClusterRequest {
/// Create a new `FetchClusterRequest`
pub(crate) fn new() -> Self {
Self {}
}
}

impl FetchClusterResponse {
/// Create a new `FetchClusterResponse`
pub(crate) fn new(
Expand Down Expand Up @@ -495,6 +481,17 @@ impl ConfChange {
address: vec![address],
}
}

/// Create a new `ConfChange` to promote a learner node
#[must_use]
#[inline]
pub fn promote_learner(node_id: ServerId) -> Self {
Self {
change_type: ConfChangeType::Promote as i32,
node_id,
address: vec![],
}
}
}

impl ProposeConfChangeRequest {
Expand Down Expand Up @@ -561,3 +558,61 @@ impl ConfChangeError {
Self::Propose(error.into())
}
}

#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_conf_change_constructor() {
assert_eq!(
ConfChange::add(1, "123".to_owned()),
ConfChange {
change_type: ConfChangeType::Add as i32,
node_id: 1,
address: vec!["123".to_owned()],
}
);
assert_eq!(
ConfChange::remove(1),
ConfChange {
change_type: ConfChangeType::Remove as i32,
node_id: 1,
address: vec![],
}
);
assert_eq!(
ConfChange::update(1, "123".to_owned()),
ConfChange {
change_type: ConfChangeType::Update as i32,
node_id: 1,
address: vec!["123".to_owned()],
}
);
assert_eq!(
ConfChange::add_learner(1, "123".to_owned()),
ConfChange {
change_type: ConfChangeType::AddLearner as i32,
node_id: 1,
address: vec!["123".to_owned()],
}
);
assert_eq!(
ConfChange::promote_learner(1),
ConfChange {
change_type: ConfChangeType::Promote as i32,
node_id: 1,
address: vec![],
}
);
let conf_entry = ConfChangeEntry {
id: ProposeId::from("test_id"),
changes: vec![ConfChange::remove(1)],
};
assert_eq!(conf_entry.id(), "test_id",);
assert_eq!(conf_entry.changes(), &[ConfChange::remove(1)],);
assert_eq!(
ConfChangeError::new_propose(ProposeError::Duplicated),
ConfChangeError::Propose(ProposeError::Duplicated.into())
);
}
}
22 changes: 13 additions & 9 deletions curp/src/server/raw_curp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {

self.check_new_config(&conf_change)?;

Ok(self.switch_config(conf_change).await)
self.switch_config(conf_change).await
}

/// Get a receiver for conf changes
Expand Down Expand Up @@ -1268,7 +1268,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {

/// Switch to a new config and return true if self node is removed
#[allow(clippy::unimplemented)] // TODO: remove this when learner is implemented
async fn switch_config(&self, conf_change: ConfChange) -> bool {
async fn switch_config(&self, conf_change: ConfChange) -> Result<bool, ConfChangeError> {
let node_id = conf_change.node_id;
let remove_self = match conf_change.change_type() {
ConfChangeType::Add => {
Expand All @@ -1278,11 +1278,10 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
self.lst.insert(node_id);
_ = self.ctx.sync_events.insert(node_id, Arc::new(Event::new()));
self.ctx.cluster_info.insert(member);
// let connect = ConnectApiWrapper::new_from_arc(Arc::new(
// InnerConnect::new(conf_change.node_id, address).await,
// ));
// _ = self.ctx.connects.insert(connect.id(), connect);
// TODO
let connect = ConnectApiWrapper::connect(node_id, conf_change.address.clone())
.await
.map_err(|e| ConfChangeError::Other(e.to_string()))?;
_ = self.ctx.connects.insert(connect.id(), connect);
false
}
ConfChangeType::Remove => {
Expand All @@ -1298,7 +1297,12 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
self.ctx
.cluster_info
.update(&node_id, conf_change.address.clone());
// TODO: update connect
if let Some(connect) = self.ctx.connects.get_mut(&node_id) {
connect
.update_addrs(conf_change.address.clone())
.await
.map_err(|e| ConfChangeError::Other(e.to_string()))?;
}
false
}
ConfChangeType::AddLearner | ConfChangeType::Promote => {
Expand All @@ -1309,6 +1313,6 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
.change_tx
.send(conf_change)
.unwrap_or_else(|_e| unreachable!("change_rx should not be dropped"));
remove_self
Ok(remove_self)
}
}
55 changes: 55 additions & 0 deletions curp/src/server/raw_curp/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use utils::config::{

use super::*;
use crate::rpc::connect::MockInnerConnectApi;
use crate::ProposeConfChangeRequest;
use crate::{
server::{
cmd_board::CommandBoard,
Expand Down Expand Up @@ -811,6 +812,12 @@ async fn update_node_should_update_the_address_of_node() {
Arc::new(RawCurp::new_test(3, exe_tx, mock_role_change()))
};
let follower_id = curp.cluster().get_id_by_name("S1").unwrap();
let mut mock_connect = MockInnerConnectApi::new();
mock_connect.expect_update_addrs().returning(|_| Ok(()));
curp.set_connect(
follower_id,
ConnectApiWrapper::new_from_arc(Arc::new(mock_connect)),
);
assert_eq!(
curp.cluster().addrs(follower_id),
Some(vec!["S1".to_owned()])
Expand All @@ -826,3 +833,51 @@ async fn update_node_should_update_the_address_of_node() {
Some(vec!["http://127.0.0.1:4567".to_owned()])
);
}

#[traced_test]
#[tokio::test]
async fn leader_handle_propose_conf_change() {
let curp = {
let exe_tx = MockCEEventTxApi::<TestCommand>::default();
Arc::new(RawCurp::new_test(3, exe_tx, mock_role_change()))
};
let follower_id = curp.cluster().get_id_by_name("S1").unwrap();
assert_eq!(
curp.cluster().addrs(follower_id),
Some(vec!["S1".to_owned()])
);
let changes = vec![ConfChange::update(
follower_id,
"http://127.0.0.1:4567".to_owned(),
)];
let conf_change_entry = ProposeConfChangeRequest::new("test_id".to_owned(), changes);
let ((leader, term), result) = curp.handle_propose_conf_change(conf_change_entry.into());
assert_eq!(leader, Some(curp.id().clone()));
assert_eq!(term, 1);
assert!(result.is_ok());
}

#[traced_test]
#[tokio::test]
async fn follower_handle_propose_conf_change() {
let curp = {
let exe_tx = MockCEEventTxApi::<TestCommand>::default();
Arc::new(RawCurp::new_test(3, exe_tx, mock_role_change()))
};
curp.update_to_term_and_become_follower(&mut *curp.st.write(), 2);

let follower_id = curp.cluster().get_id_by_name("S1").unwrap();
assert_eq!(
curp.cluster().addrs(follower_id),
Some(vec!["S1".to_owned()])
);
let changes = vec![ConfChange::update(
follower_id,
"http://127.0.0.1:4567".to_owned(),
)];
let conf_change_entry = ProposeConfChangeRequest::new("test_id".to_owned(), changes);
let ((leader, term), result) = curp.handle_propose_conf_change(conf_change_entry.into());
assert_eq!(leader, None);
assert_eq!(term, 2);
assert!(result.is_err());
}
5 changes: 2 additions & 3 deletions curp/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,6 @@ async fn propose_remove_node() {
assert!(members.iter().all(|m| m.id != node_id));
}

#[ignore] // TODO: use this test after multi-address and update node is supported
#[tokio::test(flavor = "multi_thread")]
#[abort_on_panic]
async fn propose_update_node() {
Expand All @@ -361,7 +360,7 @@ async fn propose_update_node() {
.unwrap();
assert_eq!(members.len(), 5);
let member = members.iter().find(|m| m.id == node_id);
assert!(member.is_some_and(|m| &m.addrs == &["new_addr"]));
assert!(member.is_some_and(|m| m.addrs == ["new_addr"]));
}

#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -438,5 +437,5 @@ async fn propose_conf_change_to_learner() {
.unwrap();
assert_eq!(members.len(), 5);
let member = members.iter().find(|m| m.id == node_id);
assert!(member.is_some_and(|m| &m.addrs == &["new_addr"]));
assert!(member.is_some_and(|m| m.addrs == ["new_addr"]));
}

0 comments on commit 35464d6

Please sign in to comment.