Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client discover cluster #460

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion curp/proto/common
226 changes: 175 additions & 51 deletions curp/src/client.rs

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions curp/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ pub enum ClientError<C: Command> {
/// Serialize and Deserialize Error
#[error("EncodeDecode error: {0}")]
EncodeDecode(String),
/// Wrong cluster version
#[error("wrong cluster version")]
WrongClusterVersion,
}

impl<C: Command> From<PbSerializeError> for ClientError<C> {
Expand Down
9 changes: 2 additions & 7 deletions curp/src/log_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub(crate) enum EntryData<C> {
/// `ConfChange` entry
ConfChange(Box<ConfChangeEntry>), // Box to fix variant_size_differences
/// `Shutdown` entry
Shutdown,
Shutdown(ProposeId),
}

impl<C> From<ConfChangeEntry> for EntryData<C> {
Expand Down Expand Up @@ -69,12 +69,7 @@ where
match self.entry_data {
EntryData::Command(ref cmd) => cmd.id(),
EntryData::ConfChange(ref e) => e.id(),
EntryData::Shutdown => {
unreachable!(
"LogEntry::id() should not be called on {:?} entry",
self.entry_data
);
}
EntryData::Shutdown(id) => id,
}
}
}
50 changes: 49 additions & 1 deletion curp/src/members.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
use std::{
collections::{hash_map::DefaultHasher, HashMap},
hash::Hasher,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};

use dashmap::{mapref::one::Ref, DashMap};
use itertools::Itertools;

use crate::Member;
use crate::rpc::FetchClusterResponse;
pub use crate::Member;

/// Server Id
pub type ServerId = u64;
Expand Down Expand Up @@ -67,6 +72,8 @@ pub struct ClusterInfo {
member_id: ServerId,
/// all members information
members: DashMap<ServerId, Member>,
/// cluster version
cluster_version: Arc<AtomicU64>,
}

impl ClusterInfo {
Expand All @@ -91,11 +98,35 @@ impl ClusterInfo {
cluster_id: 0,
member_id,
members,
cluster_version: Arc::new(AtomicU64::new(0)),
};
cluster_info.gen_cluster_id();
cluster_info
}

/// Construct a new `ClusterInfo` from `FetchClusterResponse`
#[inline]
#[must_use]
pub fn from_cluster(cluster: FetchClusterResponse, self_addr: &[String]) -> Self {
let mut member_id = 0;
let members = cluster
.members
.into_iter()
.map(|member| {
if member.addrs == self_addr {
member_id = member.id;
}
(member.id, member)
})
.collect();
Self {
cluster_id: cluster.cluster_id,
member_id,
members,
cluster_version: Arc::new(AtomicU64::new(cluster.cluster_version)),
}
}

/// Get all members
#[must_use]
#[inline]
Expand Down Expand Up @@ -227,6 +258,23 @@ impl ClusterInfo {
self.cluster_id
}

/// Get cluster version
#[must_use]
#[inline]
pub fn cluster_version(&self) -> u64 {
self.cluster_version.load(Ordering::Relaxed)
}

/// cluster version increase
pub(crate) fn cluster_version_inc(&self) -> u64 {
self.cluster_version.fetch_add(1, Ordering::Relaxed)
}

/// cluster version decrease
pub(crate) fn cluster_version_dec(&self) -> u64 {
self.cluster_version.fetch_sub(1, Ordering::Relaxed)
}
Phoenix500526 marked this conversation as resolved.
Show resolved Hide resolved

/// Get peers
#[must_use]
#[inline]
Expand Down
38 changes: 33 additions & 5 deletions curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,14 @@ impl FetchClusterResponse {
term: u64,
cluster_id: u64,
members: Vec<Member>,
cluster_version: u64,
) -> Self {
Self {
leader_id,
term,
cluster_id,
members,
cluster_version,
}
}

Expand All @@ -109,9 +111,10 @@ impl FetchClusterResponse {

impl ProposeRequest {
/// Create a new `Propose` request
pub(crate) fn new<C: Command>(cmd: &C) -> Self {
pub(crate) fn new<C: Command>(cmd: &C, cluster_version: u64) -> Self {
Self {
command: cmd.encode(),
cluster_version,
}
}

Expand Down Expand Up @@ -192,9 +195,10 @@ impl ProposeResponse {

impl WaitSyncedRequest {
/// Create a `WaitSynced` request
pub(crate) fn new(propose_id: ProposeId) -> Self {
pub(crate) fn new(id: ProposeId, cluster_version: u64) -> Self {
Self {
propose_id: Some(propose_id.into()),
propose_id: Some(id.into()),
cluster_version,
}
}

Expand Down Expand Up @@ -432,9 +436,10 @@ impl IdSet {

impl FetchReadStateRequest {
/// Create a new fetch read state request
pub(crate) fn new<C: Command>(cmd: &C) -> bincode::Result<Self> {
pub(crate) fn new<C: Command>(cmd: &C, cluster_version: u64) -> bincode::Result<Self> {
Ok(Self {
command: bincode::serialize(cmd)?,
cluster_version,
})
}

Expand Down Expand Up @@ -526,10 +531,11 @@ impl ProposeConfChangeRequest {
/// Create a new `ProposeConfChangeRequest`
#[inline]
#[must_use]
pub fn new(id: ProposeId, changes: Vec<ConfChange>) -> Self {
pub fn new(id: ProposeId, changes: Vec<ConfChange>, cluster_version: u64) -> Self {
Self {
propose_id: Some(id.into()),
changes,
cluster_version,
}
}

Expand Down Expand Up @@ -581,6 +587,28 @@ impl From<ProposeConfChangeRequest> for ConfChangeEntry {
}
}

impl ShutdownRequest {
/// Create a new shutdown request
pub(crate) fn new(id: ProposeId, cluster_version: u64) -> Self {
Self {
propose_id: Some(id.into()),
cluster_version,
}
}

/// Get id of the request
#[inline]
#[must_use]
pub fn id(&self) -> ProposeId {
self.propose_id
.clone()
.unwrap_or_else(|| {
unreachable!("propose id should be set in propose conf change request")
})
.into()
}
}

impl ConfChangeError {
/// Create a new `ConfChangeError` with `ProposeError`
pub(crate) fn new_propose(error: ProposeError) -> Self {
Expand Down
2 changes: 1 addition & 1 deletion curp/src/server/cmd_worker/conflict_checked_mpmc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ impl<C: Command, CE: CommandExecutor<C>> Filter<C, CE> {
Err(err) => Some(err),
}
}
EntryData::ConfChange(_) | EntryData::Shutdown => None,
EntryData::ConfChange(_) | EntryData::Shutdown(_) => None,
};
*exe_st = ExeState::Executing;
let task = Task {
Expand Down
4 changes: 2 additions & 2 deletions curp/src/server/cmd_worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ async fn worker_exe<
);
er_ok
}
EntryData::ConfChange(_) | EntryData::Shutdown => true,
EntryData::ConfChange(_) | EntryData::Shutdown(_) => true,
}
}

Expand Down Expand Up @@ -152,7 +152,7 @@ async fn worker_as<
debug!("{id} cmd({}) after sync is called", entry.id());
asr_ok
}
EntryData::Shutdown => {
EntryData::Shutdown(_) => {
curp.enter_shutdown();
if let Err(e) = ce.set_last_applied(entry.index) {
error!("failed to set last_applied, {e}");
Expand Down
45 changes: 41 additions & 4 deletions curp/src/server/curp_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ pub(super) enum CurpError {
/// If client sent a wait synced request to a non-leader
#[error("redirect to {0:?}, term {1}")]
Redirect(Option<ServerId>, u64),
/// Wrong cluster version
#[error("wrong cluster version")]
WrongClusterVersion,
}

impl From<bincode::Error> for CurpError {
Expand Down Expand Up @@ -143,6 +146,10 @@ impl From<CurpError> for Status {
metadata,
)
}
CurpError::WrongClusterVersion => {
let metadata = gen_metadata("wrong-cluster-version");
Status::with_metadata(Code::FailedPrecondition, "wrong cluster version", metadata)
}
}
}
}
Expand Down Expand Up @@ -249,6 +256,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
if self.curp.is_shutdown() {
return Err(CurpError::ShuttingDown);
}
self.check_cluster_version(req.cluster_version)?;
let cmd: Arc<C> = Arc::new(req.cmd()?);

// handle proposal
Expand All @@ -268,9 +276,10 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
/// Handle `Shutdown` requests
pub(super) async fn shutdown(
&self,
_request: ShutdownRequest,
req: ShutdownRequest,
) -> Result<ShutdownResponse, CurpError> {
self.curp.handle_shutdown()?;
self.check_cluster_version(req.cluster_version)?;
self.curp.handle_shutdown(req.id())?;
CommandBoard::wait_for_shutdown_synced(&self.cmd_board).await;
Ok(ShutdownResponse::default())
}
Expand All @@ -280,6 +289,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
&self,
req: ProposeConfChangeRequest,
) -> Result<ProposeConfChangeResponse, CurpError> {
self.check_cluster_version(req.cluster_version)?;
let id = req.id();
let ((leader_id, term), result) = self.curp.handle_propose_conf_change(req.into())?;
let error = match result {
Expand Down Expand Up @@ -348,6 +358,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
if self.curp.is_shutdown() {
return Err(CurpError::ShuttingDown);
}
self.check_cluster_version(req.cluster_version)?;
let id = req.propose_id();
debug!("{} get wait synced request for cmd({id})", self.curp.id());

Expand All @@ -372,8 +383,13 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
// the leader
Vec::new()
};
let cluster_version = self.curp.cluster().cluster_version();
Ok(FetchClusterResponse::new(
leader_id, term, cluster_id, members,
leader_id,
term,
cluster_id,
members,
cluster_version,
))
}

Expand Down Expand Up @@ -449,6 +465,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
&self,
req: FetchReadStateRequest,
) -> Result<FetchReadStateResponse, CurpError> {
self.check_cluster_version(req.cluster_version)?;
let cmd = req.cmd()?;
let state = self.curp.handle_fetch_read_state(&cmd);
Ok(FetchReadStateResponse::new(state))
Expand Down Expand Up @@ -625,6 +642,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
let id = connect.id();
let batch_timeout = curp.cfg().batch_timeout;
let mut is_shutdown_state = false;
let mut is_remove_state = false;

#[allow(clippy::integer_arithmetic)] // tokio select internal triggered
let leader_retired = loop {
Expand All @@ -644,7 +662,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
}
}
_ = remove_event.listen() => {
break false;
is_remove_state = true;
bsbds marked this conversation as resolved.
Show resolved Hide resolved
}
_now = ticker.tick() => {
hb_opt = false;
Expand All @@ -656,6 +674,8 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
}
}

let can_remove_node = is_remove_state && curp.can_remove_follower_after_hb(id);

let Some(sync_action) = curp.sync(id) else {
break true;
};
Expand All @@ -680,6 +700,10 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
if is_shutdown_state && is_empty && curp.is_synced() {
break false;
}
if can_remove_node {
curp.remove_node_status(id);
break false;
}
}
}
SyncAction::Snapshot(rx) => match rx.await {
Expand Down Expand Up @@ -956,6 +980,19 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
pub(super) fn shutdown_listener(&self) -> shutdown::Listener {
self.curp.shutdown_listener()
}

/// Check cluster version and return new cluster
fn check_cluster_version(&self, client_cluster_version: u64) -> Result<(), CurpError> {
let server_cluster_version = self.curp.cluster().cluster_version();
if client_cluster_version != server_cluster_version {
debug!(
Phoenix500526 marked this conversation as resolved.
Show resolved Hide resolved
"client cluster version({}) and server cluster version({}) not match",
client_cluster_version, server_cluster_version
);
return Err(CurpError::WrongClusterVersion);
}
Ok(())
}
}

impl<C: Command, RC: RoleChange> Debug for CurpNode<C, RC> {
Expand Down
Loading
Loading