Skip to content

Commit

Permalink
feat: discover cluster in shutdown, propose_conf_change and fetch_read…
Browse files Browse the repository at this point in the history
…_state

Signed-off-by: themanforfree <[email protected]>
  • Loading branch information
themanforfree committed Oct 9, 2023
1 parent f539bfb commit 7c035da
Show file tree
Hide file tree
Showing 6 changed files with 327 additions and 34 deletions.
2 changes: 1 addition & 1 deletion curp/proto/common
47 changes: 43 additions & 4 deletions curp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
time::Duration,
};

use curp_external_api::cmd::PbSerializeError;
use curp_external_api::cmd::{generate_propose_id, PbSerializeError};
use dashmap::DashMap;
use event_listener::Event;
use futures::{pin_mut, stream::FuturesUnordered, StreamExt};
Expand Down Expand Up @@ -444,12 +444,21 @@ where
.get_connect(leader_id)
.unwrap_or_else(|| unreachable!("leader {leader_id} not found"))
.shutdown(
ShutdownRequest::default(),
ShutdownRequest::new(generate_propose_id("client"), self.cluster_version()),
*self.config.wait_synced_timeout(),
)
.await
{
Ok(resp) => resp.into_inner(),
// TODO
Err(RpcError::StatusError(status)) => {
if matches!(status.code(), tonic::Code::FailedPrecondition) {
if let Ok(cluster) = Cluster::decode(status.details()) {
self.set_cluster(cluster).await;
};
}
continue;
}
Err(e) => {
warn!("shutdown rpc error: {e}");
tokio::time::sleep(*self.config.retry_timeout()).await;
Expand Down Expand Up @@ -519,6 +528,14 @@ where
}
resp
}
Err(RpcError::StatusError(status)) => {
if matches!(status.code(), tonic::Code::FailedPrecondition) {
if let Ok(cluster) = Cluster::decode(status.details()) {
self.set_cluster(cluster).await;
};
}
continue;
}
Err(e) => {
// if the propose fails again, need to fetch the leader and try again
warn!("failed to resend propose, {e}");
Expand Down Expand Up @@ -784,10 +801,24 @@ where
let resp = match self
.get_connect(leader_id)
.unwrap_or_else(|| unreachable!("leader {leader_id} not found"))
.propose_conf_change(conf_change.clone(), *self.config.wait_synced_timeout())
.propose_conf_change(
conf_change
.clone()
.with_cluster_version(self.cluster_version()),
*self.config.wait_synced_timeout(),
)
.await
{
Ok(resp) => resp.into_inner(),
// TODO
Err(RpcError::StatusError(status)) => {
if matches!(status.code(), tonic::Code::FailedPrecondition) {
if let Ok(cluster) = Cluster::decode(status.details()) {
self.set_cluster(cluster).await;
};
}
continue;
}
Err(e) => {
warn!("wait synced rpc error: {e}");
tokio::time::sleep(retry_timeout).await;
Expand Down Expand Up @@ -833,12 +864,20 @@ where
.get_connect(leader_id)
.unwrap_or_else(|| unreachable!("leader {leader_id} not found"))
.fetch_read_state(
FetchReadStateRequest::new(cmd)?,
FetchReadStateRequest::new(cmd, self.cluster_version())?,
*self.config.wait_synced_timeout(),
)
.await
{
Ok(resp) => resp.into_inner(),
Err(RpcError::StatusError(status)) => {
if matches!(status.code(), tonic::Code::FailedPrecondition) {
if let Ok(cluster) = Cluster::decode(status.details()) {
self.set_cluster(cluster).await;
};
}
continue;
} // TODO
Err(e) => {
warn!("fetch read state rpc error: {e}");
tokio::time::sleep(retry_timeout).await;
Expand Down
27 changes: 25 additions & 2 deletions curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,9 +416,10 @@ impl IdSet {

impl FetchReadStateRequest {
/// Create a new fetch read state request
pub(crate) fn new<C: Command>(cmd: &C) -> bincode::Result<Self> {
pub(crate) fn new<C: Command>(cmd: &C, cluster_version: u64) -> bincode::Result<Self> {
Ok(Self {
command: bincode::serialize(cmd)?,
cluster_version,
})
}

Expand Down Expand Up @@ -511,7 +512,19 @@ impl ProposeConfChangeRequest {
#[inline]
#[must_use]
pub fn new(id: String, changes: Vec<ConfChange>) -> Self {
Self { id, changes }
Self {
id,
changes,
cluster_version: 0,
}
}

/// Set the cluster version on this request, consuming `self` and
/// returning the updated request (builder style). All other fields are
/// preserved via struct-update syntax.
pub(crate) fn with_cluster_version(self, cluster_version: u64) -> Self {
Self {
cluster_version,
..self
}
}

/// Get id of the request
Expand Down Expand Up @@ -552,6 +565,16 @@ impl From<ProposeConfChangeRequest> for ConfChangeEntry {
}
}

impl ShutdownRequest {
/// Create a new shutdown request.
///
/// * `propose_id` - id identifying this proposal to the server
/// * `cluster_version` - client's view of the cluster version; the server
///   rejects the request if it is stale (see `check_cluster_version`)
pub(crate) fn new(propose_id: ProposeId, cluster_version: u64) -> Self {
Self {
propose_id,
cluster_version,
}
}
}

impl ShutdownResponse {
/// Create a new shutdown response
pub(crate) fn new(leader_id: Option<ServerId>, term: u64, error: Option<ProposeError>) -> Self {
Expand Down
128 changes: 124 additions & 4 deletions curp/src/server/curp_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,10 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
/// Handle `Shutdown` requests
pub(super) async fn shutdown(
&self,
request: ShutdownRequest,
req: ShutdownRequest,
) -> Result<ShutdownResponse, CurpError> {
let ((leader_id, term), result) = self.curp.handle_shutdown(request.propose_id);
self.check_cluster_version(req.cluster_version)?;
let ((leader_id, term), result) = self.curp.handle_shutdown(req.propose_id);
let error = match result {
Ok(()) => {
CommandBoard::wait_for_shutdown_synced(&self.cmd_board).await;
Expand All @@ -171,6 +172,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
&self,
req: ProposeConfChangeRequest,
) -> Result<ProposeConfChangeResponse, CurpError> {
self.check_cluster_version(req.cluster_version)?;
let id = req.id.clone();
let ((leader_id, term), result) = self.curp.handle_propose_conf_change(req.into());
let error = match result {
Expand Down Expand Up @@ -343,6 +345,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
&self,
req: FetchReadStateRequest,
) -> Result<FetchReadStateResponse, CurpError> {
self.check_cluster_version(req.cluster_version)?;
let cmd = req.cmd()?;
let state = self.curp.handle_fetch_read_state(&cmd)?;
Ok(FetchReadStateResponse::new(state))
Expand Down Expand Up @@ -890,12 +893,19 @@ impl<C: Command, RC: RoleChange> Debug for CurpNode<C, RC> {

#[cfg(test)]
mod tests {
use curp_test_utils::{mock_role_change, sleep_secs, test_cmd::TestCommand};
use curp_test_utils::{
mock_role_change, sleep_secs,
test_cmd::{next_id, TestCommand},
TestRoleChange,
};
use engine::MemorySnapshotAllocator;
use tracing_test::traced_test;

use super::*;
use crate::{
rpc::connect::MockInnerConnectApi, server::cmd_worker::MockCEEventTxApi, ConfChange,
rpc::connect::{InnerConnectApiWrapper, MockInnerConnectApi},
server::cmd_worker::MockCEEventTxApi,
ConfChange, Member,
};

#[traced_test]
Expand Down Expand Up @@ -1023,4 +1033,114 @@ mod tests {
assert!(curp.is_leader());
curp.shutdown_trigger().self_shutdown_and_wait().await;
}

#[tokio::test]
async fn curp_node_will_return_new_cluster_by_error() {
// Build a 3-node test curp and bump its cluster version by applying a
// conf change (adds member 123), so that requests carrying
// cluster_version 0 are stale.
let curp = {
let exe_tx = MockCEEventTxApi::<TestCommand>::default();
Arc::new(RawCurp::new_test(3, exe_tx, mock_role_change()))
};
curp.apply_conf_change(vec![ConfChange::add(123, vec!["address".to_owned()])])
.unwrap();
let curp_node = init_curp_node(curp);

// Each RPC below carries cluster_version 0; every handler is expected to
// reject it and return the current cluster in the error (checked by
// `check_err`).
let res = curp_node
.propose(ProposeRequest::new(&TestCommand::new_get(vec![0]), 0))
.await;
check_err(res.unwrap_err());

let res = curp_node
.wait_synced(WaitSyncedRequest::new(next_id().to_string(), 0))
.await;
check_err(res.unwrap_err());

let res = curp_node
.shutdown(ShutdownRequest {
propose_id: next_id().to_string(),
cluster_version: 0,
})
.await;
check_err(res.unwrap_err());

// `ProposeConfChangeRequest::new` defaults cluster_version to 0, so this
// request is stale as well.
let res = curp_node
.propose_conf_change(ProposeConfChangeRequest::new("id".to_owned(), vec![]))
.await;
check_err(res.unwrap_err());

let res = curp_node.fetch_read_state(
FetchReadStateRequest::new(&TestCommand::new_get(vec![0]), 0).unwrap(),
);
check_err(res.unwrap_err());
}

/// Assert that `err` is a `CurpError::Transfer` whose cluster payload
/// matches the expected post-conf-change cluster (the original test
/// members plus the added member 123, cluster_version bumped to 1).
/// Members are compared as id-keyed maps so their ordering is irrelevant.
fn check_err(err: CurpError) {
// Expected cluster state; ids are the deterministic ids produced by
// `RawCurp::new_test` — presumably stable across runs, TODO confirm.
let expect_cluster = Cluster {
leader_id: Some(4_305_557_976_641_631_378),
term: 1,
cluster_id: 16_062_653_394_353_165_714,
members: vec![
Member {
id: 6_723_373_919_636_969_324,
name: "S1".to_owned(),
addrs: vec!["S1".to_owned()],
is_learner: false,
},
// The member added by the conf change in the test above.
Member {
id: 123,
name: String::new(),
addrs: vec!["address".to_owned()],
is_learner: false,
},
Member {
id: 7_539_450_858_769_671_360,
name: "S2".to_owned(),
addrs: vec!["S2".to_owned()],
is_learner: false,
},
Member {
id: 4_305_557_976_641_631_378,
name: "S0".to_owned(),
addrs: vec!["S0".to_owned()],
is_learner: false,
},
],
cluster_version: 1,
};
let CurpError::Transfer(cluster) = err else {
panic!("expect transfer error");
};
assert_eq!(cluster.leader_id, expect_cluster.leader_id);
assert_eq!(cluster.term, expect_cluster.term);
assert_eq!(cluster.cluster_id, expect_cluster.cluster_id);
assert_eq!(cluster.cluster_version, expect_cluster.cluster_version);
// Compare member sets order-insensitively by keying on member id.
let members = cluster
.members
.into_iter()
.map(|m| (m.id, m))
.collect::<HashMap<_, _>>();
let expect_members = expect_cluster
.members
.into_iter()
.map(|m| (m.id, m))
.collect::<HashMap<_, _>>();
assert_eq!(members, expect_members);
}

/// Build a `CurpNode` around the given `RawCurp` with fresh/mock
/// collaborators (empty speculative pool and command board, mock command
/// executor event tx, default-config DB storage, in-memory snapshot
/// allocator) for use in tests.
fn init_curp_node(
curp: Arc<RawCurp<TestCommand, TestRoleChange>>,
) -> CurpNode<TestCommand, TestRoleChange> {
let spec_pool = Arc::new(Mutex::new(SpeculativePool::new()));
let cmd_board = Arc::new(RwLock::new(CommandBoard::new()));
let ce_event_tx = Arc::new(MockCEEventTxApi::<TestCommand>::default());
let storage = Arc::new(DB::open(&CurpConfig::default().storage_cfg).unwrap());
let snapshot_allocator = Box::new(MemorySnapshotAllocator::default());
CurpNode {
curp,
spec_pool,
cmd_board,
ce_event_tx,
storage,
snapshot_allocator,
}
}
}
25 changes: 24 additions & 1 deletion curp/tests/common/curp_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use curp::{
error::ServerError,
members::{ClusterInfo, ServerId},
server::Rpc,
LogIndex,
LogIndex, Member,
};
use curp_external_api::cmd::ProposeId;
use curp_test_utils::{
Expand Down Expand Up @@ -362,6 +362,29 @@ impl CurpGroup {
let addr = format!("http://{}", self.nodes[id].addr);
ProtocolClient::connect(addr.clone()).await.unwrap()
}

/// Fetch the current cluster info from the group's leader.
///
/// Connects to the leader, issues a `FetchCluster` RPC, converts the raw
/// response members into `Member`s, and builds a `ClusterInfo` from the
/// reconstructed response plus the given `addrs`.
///
/// # Panics
/// Panics (via `unwrap`) if the RPC fails — acceptable in test helpers.
pub async fn fetch_cluster_info(&self, addrs: &[String]) -> ClusterInfo {
let leader_id = self.get_leader().await.0;
let mut connect = self.get_connect(&leader_id).await;
let cluster_res_base = connect
.fetch_cluster(tonic::Request::new(FetchClusterRequest {}))
.await
.unwrap()
.into_inner();
// Re-wrap the wire-level members into the public `Member` type.
let members = cluster_res_base
.members
.into_iter()
.map(|m| Member::new(m.id, m.name, m.addrs, m.is_learner))
.collect();
let cluster_res = curp::FetchClusterResponse {
leader_id: cluster_res_base.leader_id,
term: cluster_res_base.term,
cluster_id: cluster_res_base.cluster_id,
members,
cluster_version: cluster_res_base.cluster_version,
};
ClusterInfo::from_cluster(cluster_res, addrs)
}
}

impl Drop for CurpGroup {
Expand Down
Loading

0 comments on commit 7c035da

Please sign in to comment.