From 7c035daaa57c317e869ebbbb17ccca444afef1b0 Mon Sep 17 00:00:00 2001 From: themanforfree Date: Thu, 28 Sep 2023 15:50:15 +0800 Subject: [PATCH] feat: discover cluster in shutdown propose_conf_change and fetch_read_state Signed-off-by: themanforfree --- curp/proto/common | 2 +- curp/src/client.rs | 47 +++++++++++- curp/src/rpc/mod.rs | 27 ++++++- curp/src/server/curp_node.rs | 128 ++++++++++++++++++++++++++++++- curp/tests/common/curp_group.rs | 25 +++++- curp/tests/server.rs | 132 ++++++++++++++++++++++++++------ 6 files changed, 327 insertions(+), 34 deletions(-) diff --git a/curp/proto/common b/curp/proto/common index ef7060fb9..ec575d6fc 160000 --- a/curp/proto/common +++ b/curp/proto/common @@ -1 +1 @@ -Subproject commit ef7060fb9639f70f150d83824c0f348e414dad69 +Subproject commit ec575d6fc145de118a43e96583fdd0d3f2bdc747 diff --git a/curp/src/client.rs b/curp/src/client.rs index 1203aff41..4127ac3bc 100644 --- a/curp/src/client.rs +++ b/curp/src/client.rs @@ -8,7 +8,7 @@ use std::{ time::Duration, }; -use curp_external_api::cmd::PbSerializeError; +use curp_external_api::cmd::{generate_propose_id, PbSerializeError}; use dashmap::DashMap; use event_listener::Event; use futures::{pin_mut, stream::FuturesUnordered, StreamExt}; @@ -444,12 +444,21 @@ where .get_connect(leader_id) .unwrap_or_else(|| unreachable!("leader {leader_id} not found")) .shutdown( - ShutdownRequest::default(), + ShutdownRequest::new(generate_propose_id("client"), self.cluster_version()), *self.config.wait_synced_timeout(), ) .await { Ok(resp) => resp.into_inner(), + // TODO + Err(RpcError::StatusError(status)) => { + if matches!(status.code(), tonic::Code::FailedPrecondition) { + if let Ok(cluster) = Cluster::decode(status.details()) { + self.set_cluster(cluster).await; + }; + } + continue; + } Err(e) => { warn!("shutdown rpc error: {e}"); tokio::time::sleep(*self.config.retry_timeout()).await; @@ -519,6 +528,14 @@ where } resp } + Err(RpcError::StatusError(status)) => { + if matches!(status.code(), tonic::Code::FailedPrecondition) { + if let Ok(cluster) = Cluster::decode(status.details()) { + self.set_cluster(cluster).await; + }; + } + continue; + } Err(e) => { // if the propose fails again, need to fetch the leader and try again warn!("failed to resend propose, {e}"); @@ -784,10 +801,24 @@ where let resp = match self .get_connect(leader_id) .unwrap_or_else(|| unreachable!("leader {leader_id} not found")) - .propose_conf_change(conf_change.clone(), *self.config.wait_synced_timeout()) + .propose_conf_change( + conf_change + .clone() + .with_cluster_version(self.cluster_version()), + *self.config.wait_synced_timeout(), + ) .await { Ok(resp) => resp.into_inner(), + // TODO + Err(RpcError::StatusError(status)) => { + if matches!(status.code(), tonic::Code::FailedPrecondition) { + if let Ok(cluster) = Cluster::decode(status.details()) { + self.set_cluster(cluster).await; + }; + } + continue; + } Err(e) => { warn!("wait synced rpc error: {e}"); tokio::time::sleep(retry_timeout).await; @@ -833,12 +864,20 @@ where .get_connect(leader_id) .unwrap_or_else(|| unreachable!("leader {leader_id} not found")) .fetch_read_state( - FetchReadStateRequest::new(cmd)?, + FetchReadStateRequest::new(cmd, self.cluster_version())?, *self.config.wait_synced_timeout(), ) .await { Ok(resp) => resp.into_inner(), + Err(RpcError::StatusError(status)) => { + if matches!(status.code(), tonic::Code::FailedPrecondition) { + if let Ok(cluster) = Cluster::decode(status.details()) { + self.set_cluster(cluster).await; + }; + } + continue; + } // TODO 
Err(e) => { warn!("fetch read state rpc error: {e}"); tokio::time::sleep(retry_timeout).await; diff --git a/curp/src/rpc/mod.rs b/curp/src/rpc/mod.rs index c4d9ee36f..9312244e1 100644 --- a/curp/src/rpc/mod.rs +++ b/curp/src/rpc/mod.rs @@ -416,9 +416,10 @@ impl IdSet { impl FetchReadStateRequest { /// Create a new fetch read state request - pub(crate) fn new(cmd: &C) -> bincode::Result { + pub(crate) fn new(cmd: &C, cluster_version: u64) -> bincode::Result { Ok(Self { command: bincode::serialize(cmd)?, + cluster_version, }) } @@ -511,7 +512,19 @@ impl ProposeConfChangeRequest { #[inline] #[must_use] pub fn new(id: String, changes: Vec) -> Self { - Self { id, changes } + Self { + id, + changes, + cluster_version: 0, + } + } + + /// Set cluster version + pub(crate) fn with_cluster_version(self, cluster_version: u64) -> Self { + Self { + cluster_version, + ..self + } } /// Get id of the request @@ -552,6 +565,16 @@ impl From for ConfChangeEntry { } } +impl ShutdownRequest { + /// Create a new shutdown request + pub(crate) fn new(propose_id: ProposeId, cluster_version: u64) -> Self { + Self { + propose_id, + cluster_version, + } + } +} + impl ShutdownResponse { /// Create a new shutdown response pub(crate) fn new(leader_id: Option, term: u64, error: Option) -> Self { diff --git a/curp/src/server/curp_node.rs b/curp/src/server/curp_node.rs index b7dc0eae1..e1233f695 100644 --- a/curp/src/server/curp_node.rs +++ b/curp/src/server/curp_node.rs @@ -152,9 +152,10 @@ impl CurpNode { /// Handle `Shutdown` requests pub(super) async fn shutdown( &self, - request: ShutdownRequest, + req: ShutdownRequest, ) -> Result { - let ((leader_id, term), result) = self.curp.handle_shutdown(request.propose_id); + self.check_cluster_version(req.cluster_version)?; + let ((leader_id, term), result) = self.curp.handle_shutdown(req.propose_id); let error = match result { Ok(()) => { CommandBoard::wait_for_shutdown_synced(&self.cmd_board).await; @@ -171,6 +172,7 @@ impl CurpNode { &self, req: ProposeConfChangeRequest, ) -> Result { + self.check_cluster_version(req.cluster_version)?; let id = req.id.clone(); let ((leader_id, term), result) = self.curp.handle_propose_conf_change(req.into()); let error = match result { @@ -343,6 +345,7 @@ impl CurpNode { &self, req: FetchReadStateRequest, ) -> Result { + self.check_cluster_version(req.cluster_version)?; let cmd = req.cmd()?; let state = self.curp.handle_fetch_read_state(&cmd)?; Ok(FetchReadStateResponse::new(state)) @@ -890,12 +893,19 @@ impl Debug for CurpNode { #[cfg(test)] mod tests { - use curp_test_utils::{mock_role_change, sleep_secs, test_cmd::TestCommand}; + use curp_test_utils::{ + mock_role_change, sleep_secs, + test_cmd::{next_id, TestCommand}, + TestRoleChange, + }; + use engine::MemorySnapshotAllocator; use tracing_test::traced_test; use super::*; use crate::{ - rpc::connect::MockInnerConnectApi, server::cmd_worker::MockCEEventTxApi, ConfChange, + rpc::connect::{InnerConnectApiWrapper, MockInnerConnectApi}, + server::cmd_worker::MockCEEventTxApi, + ConfChange, Member, }; #[traced_test] @@ -1023,4 +1033,114 @@ mod tests { assert!(curp.is_leader()); curp.shutdown_trigger().self_shutdown_and_wait().await; } + + #[tokio::test] + async fn curp_node_will_return_new_cluster_by_error() { + let curp = { + let exe_tx = MockCEEventTxApi::::default(); + Arc::new(RawCurp::new_test(3, exe_tx, mock_role_change())) + }; + curp.apply_conf_change(vec![ConfChange::add(123, vec!["address".to_owned()])]) + .unwrap(); + let curp_node = init_curp_node(curp); + + let res = curp_node + 
.propose(ProposeRequest::new(&TestCommand::new_get(vec![0]), 0)) + .await; + check_err(res.unwrap_err()); + + let res = curp_node + .wait_synced(WaitSyncedRequest::new(next_id().to_string(), 0)) + .await; + check_err(res.unwrap_err()); + + let res = curp_node + .shutdown(ShutdownRequest { + propose_id: next_id().to_string(), + cluster_version: 0, + }) + .await; + check_err(res.unwrap_err()); + + let res = curp_node + .propose_conf_change(ProposeConfChangeRequest::new("id".to_owned(), vec![])) + .await; + check_err(res.unwrap_err()); + + let res = curp_node.fetch_read_state( + FetchReadStateRequest::new(&TestCommand::new_get(vec![0]), 0).unwrap(), + ); + check_err(res.unwrap_err()); + } + + fn check_err(err: CurpError) { + let expect_cluster = Cluster { + leader_id: Some(4_305_557_976_641_631_378), + term: 1, + cluster_id: 16_062_653_394_353_165_714, + members: vec![ + Member { + id: 6_723_373_919_636_969_324, + name: "S1".to_owned(), + addrs: vec!["S1".to_owned()], + is_learner: false, + }, + Member { + id: 123, + name: String::new(), + addrs: vec!["address".to_owned()], + is_learner: false, + }, + Member { + id: 7_539_450_858_769_671_360, + name: "S2".to_owned(), + addrs: vec!["S2".to_owned()], + is_learner: false, + }, + Member { + id: 4_305_557_976_641_631_378, + name: "S0".to_owned(), + addrs: vec!["S0".to_owned()], + is_learner: false, + }, + ], + cluster_version: 1, + }; + let CurpError::Transfer(cluster) = err else { + panic!("expect transfer error"); + }; + assert_eq!(cluster.leader_id, expect_cluster.leader_id); + assert_eq!(cluster.term, expect_cluster.term); + assert_eq!(cluster.cluster_id, expect_cluster.cluster_id); + assert_eq!(cluster.cluster_version, expect_cluster.cluster_version); + let members = cluster + .members + .into_iter() + .map(|m| (m.id, m)) + .collect::>(); + let expect_members = expect_cluster + .members + .into_iter() + .map(|m| (m.id, m)) + .collect::>(); + assert_eq!(members, expect_members); + } + + fn init_curp_node( + curp: Arc>, + ) -> CurpNode { + let spec_pool = Arc::new(Mutex::new(SpeculativePool::new())); + let cmd_board = Arc::new(RwLock::new(CommandBoard::new())); + let ce_event_tx = Arc::new(MockCEEventTxApi::::default()); + let storage = Arc::new(DB::open(&CurpConfig::default().storage_cfg).unwrap()); + let snapshot_allocator = Box::new(MemorySnapshotAllocator::default()); + CurpNode { + curp, + spec_pool, + cmd_board, + ce_event_tx, + storage, + snapshot_allocator, + } + } } diff --git a/curp/tests/common/curp_group.rs b/curp/tests/common/curp_group.rs index b828a08ce..228d9a964 100644 --- a/curp/tests/common/curp_group.rs +++ b/curp/tests/common/curp_group.rs @@ -10,7 +10,7 @@ use curp::{ error::ServerError, members::{ClusterInfo, ServerId}, server::Rpc, - LogIndex, + LogIndex, Member, }; use curp_external_api::cmd::ProposeId; use curp_test_utils::{ @@ -362,6 +362,29 @@ impl CurpGroup { let addr = format!("http://{}", self.nodes[id].addr); ProtocolClient::connect(addr.clone()).await.unwrap() } + + pub async fn fetch_cluster_info(&self, addrs: &[String]) -> ClusterInfo { + let leader_id = self.get_leader().await.0; + let mut connect = self.get_connect(&leader_id).await; + let cluster_res_base = connect + .fetch_cluster(tonic::Request::new(FetchClusterRequest {})) + .await + .unwrap() + .into_inner(); + let members = cluster_res_base + .members + .into_iter() + .map(|m| Member::new(m.id, m.name, m.addrs, m.is_learner)) + .collect(); + let cluster_res = curp::FetchClusterResponse { + leader_id: cluster_res_base.leader_id, + term: 
cluster_res_base.term, + cluster_id: cluster_res_base.cluster_id, + members, + cluster_version: cluster_res_base.cluster_version, + }; + ClusterInfo::from_cluster(cluster_res, addrs) + } } impl Drop for CurpGroup { diff --git a/curp/tests/server.rs b/curp/tests/server.rs index 0eafc0613..bdc646087 100644 --- a/curp/tests/server.rs +++ b/curp/tests/server.rs @@ -9,7 +9,7 @@ use clippy_utilities::NumericCast; use curp::{ client::{Builder, Client}, error::{CommandProposeError, ProposeError}, - members::{ClusterInfo, Member}, + members::ClusterInfo, ConfChange, ConfChangeError, ProposeConfChangeRequest, }; use curp_external_api::cmd::generate_propose_id; @@ -503,27 +503,7 @@ async fn check_new_node(is_learner: bool) { /******* start new node *******/ // 1. fetch cluster from other nodes - let leader_id = group.get_leader().await.0; - let mut connect = group.get_connect(&leader_id).await; - let cluster_res_base = connect - .fetch_cluster(tonic::Request::new(FetchClusterRequest {})) - .await - .unwrap() - .into_inner(); - let members = cluster_res_base - .members - .into_iter() - .map(|m| Member::new(m.id, m.name, m.addrs, m.is_learner)) - .collect(); - let cluster_res = curp::FetchClusterResponse { - leader_id: cluster_res_base.leader_id, - term: cluster_res_base.term, - cluster_id: cluster_res_base.cluster_id, - members, - cluster_version: cluster_res_base.cluster_version, - }; - let cluster_info = Arc::new(ClusterInfo::from_cluster(cluster_res, &[addr])); - + let cluster_info = Arc::new(group.fetch_cluster_info(&[addr]).await); // 2. start new node group .run_node(listener, "new_node".to_owned(), cluster_info) @@ -572,3 +552,111 @@ async fn new_follower_node_should_apply_old_cluster_logs() { async fn new_learner_node_should_apply_old_cluster_logs() { check_new_node(true).await; } + +#[tokio::test] +#[abort_on_panic] +async fn shutdown_rpc_should_shutdown_the_cluster_when_client_has_wrong_cluster() { + init_logger(); + let tmp_path = tempfile::TempDir::new().unwrap().into_path(); + let mut group = CurpGroup::new_rocks(3, tmp_path.clone()).await; + let client = group.new_client(ClientConfig::default()).await; + + let listener = TcpListener::bind("0.0.0.0:0").await.unwrap(); + let addrs = vec![listener.local_addr().unwrap().to_string()]; + let node_id = ClusterInfo::calculate_member_id(addrs.clone(), "", Some(123)); + let conf_change = ProposeConfChangeRequest::new( + generate_propose_id("test"), + vec![ConfChange::add(node_id, addrs.clone())], + ); + let members = client + .propose_conf_change(conf_change) + .await + .unwrap() + .unwrap(); + assert_eq!(members.len(), 4); + assert!(members.iter().any(|m| m.id == node_id)); + + let cluster_info = Arc::new(group.fetch_cluster_info(&addrs).await); + group + .run_node(listener, "new_node".to_owned(), cluster_info) + .await; + + client.shutdown().await.unwrap(); + + sleep_secs(3).await; // wait for the cluster to shutdown + assert!(group.is_finished()); +} + +#[tokio::test] +#[abort_on_panic] +async fn propose_conf_change_rpc_should_work_when_client_has_wrong_cluster() { + init_logger(); + let tmp_path = tempfile::TempDir::new().unwrap().into_path(); + let mut group = CurpGroup::new_rocks(3, tmp_path.clone()).await; + let client = group.new_client(ClientConfig::default()).await; + + let listener = TcpListener::bind("0.0.0.0:0").await.unwrap(); + let addrs = vec![listener.local_addr().unwrap().to_string()]; + let node_id = ClusterInfo::calculate_member_id(addrs.clone(), "", Some(123)); + let conf_change = ProposeConfChangeRequest::new( + 
generate_propose_id("test"), + vec![ConfChange::add(node_id, addrs.clone())], + ); + let members = client + .propose_conf_change(conf_change) + .await + .unwrap() + .unwrap(); + assert_eq!(members.len(), 4); + assert!(members.iter().any(|m| m.id == node_id)); + let cluster_info = Arc::new(group.fetch_cluster_info(&addrs).await); + group + .run_node(listener, "new_node".to_owned(), cluster_info) + .await; + + let conf_change = ProposeConfChangeRequest::new( + generate_propose_id("test"), + vec![ConfChange::remove(node_id)], + ); + let members = client + .propose_conf_change(conf_change) + .await + .unwrap() + .unwrap(); + assert_eq!(members.len(), 3); + assert!(members.iter().all(|m| m.id != node_id)); + sleep_secs(3).await; + assert!(group.nodes.get(&node_id).unwrap().handle.is_finished()); +} + +#[tokio::test] +#[abort_on_panic] +async fn fetch_read_state_rpc_should_work_when_client_has_wrong_cluster() { + init_logger(); + let tmp_path = tempfile::TempDir::new().unwrap().into_path(); + let mut group = CurpGroup::new_rocks(3, tmp_path.clone()).await; + let client = group.new_client(ClientConfig::default()).await; + + let listener = TcpListener::bind("0.0.0.0:0").await.unwrap(); + let addrs = vec![listener.local_addr().unwrap().to_string()]; + let node_id = ClusterInfo::calculate_member_id(addrs.clone(), "", Some(123)); + let conf_change = ProposeConfChangeRequest::new( + generate_propose_id("test"), + vec![ConfChange::add(node_id, addrs.clone())], + ); + let members = client + .propose_conf_change(conf_change) + .await + .unwrap() + .unwrap(); + assert_eq!(members.len(), 4); + assert!(members.iter().any(|m| m.id == node_id)); + let cluster_info = Arc::new(group.fetch_cluster_info(&addrs).await); + group + .run_node(listener, "new_node".to_owned(), cluster_info) + .await; + + let cmd = TestCommand::new_get(vec![0]); + let res = client.fetch_read_state(&cmd).await; + println!("{res:?}"); +}