From 5dfec836410c7ce200ac9a25fe84c09a053e487c Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Tue, 3 Sep 2024 10:49:26 +0800 Subject: [PATCH 1/9] refactor: reimplement curp client state Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/client/unary/config.rs | 47 ++++++++++++++++++++++++++ crates/curp/src/client/unary/mod.rs | 8 +++++ crates/curp/src/client/unary/state.rs | 46 +++++++++++++++++++++++++ 3 files changed, 101 insertions(+) create mode 100644 crates/curp/src/client/unary/config.rs create mode 100644 crates/curp/src/client/unary/state.rs diff --git a/crates/curp/src/client/unary/config.rs b/crates/curp/src/client/unary/config.rs new file mode 100644 index 000000000..061662dae --- /dev/null +++ b/crates/curp/src/client/unary/config.rs @@ -0,0 +1,47 @@ +use std::time::Duration; + +use tonic::transport::ClientTlsConfig; + +use crate::members::ServerId; + +/// Client config +#[derive(Debug, Clone)] +pub(crate) struct Config { + /// Local server id, should be initialized on startup + local_server: Option, + /// Client tls config + tls_config: Option, + /// The rpc timeout of a propose request + propose_timeout: Duration, + /// The rpc timeout of a 2-RTT request, usually takes longer than propose timeout + /// + /// The recommended the values is within (propose_timeout, 2 * propose_timeout]. + wait_synced_timeout: Duration, +} + +impl Config { + /// Get the local server id + pub(crate) fn local_server(&self) -> Option { + self.local_server + } + + /// Get the client TLS config + pub(crate) fn tls_config(&self) -> Option<&ClientTlsConfig> { + self.tls_config.as_ref() + } + + /// Get the propose timeout + pub(crate) fn propose_timeout(&self) -> Duration { + self.propose_timeout + } + + /// Get the wait synced timeout + pub(crate) fn wait_synced_timeout(&self) -> Duration { + self.wait_synced_timeout + } + + /// Returns `true` if the current client is on the server + pub(crate) fn is_raw_curp(&self) -> bool { + self.local_server.is_some() + } +} diff --git a/crates/curp/src/client/unary/mod.rs b/crates/curp/src/client/unary/mod.rs index 90986bdb7..743c7d35d 100644 --- a/crates/curp/src/client/unary/mod.rs +++ b/crates/curp/src/client/unary/mod.rs @@ -1,6 +1,14 @@ /// Client propose implementation mod propose_impl; +#[allow(unused)] +/// State of the unary client +mod state; + +#[allow(unused)] +/// Config of the client +mod config; + use std::{ cmp::Ordering, marker::PhantomData, diff --git a/crates/curp/src/client/unary/state.rs b/crates/curp/src/client/unary/state.rs new file mode 100644 index 000000000..c85994566 --- /dev/null +++ b/crates/curp/src/client/unary/state.rs @@ -0,0 +1,46 @@ +use std::{collections::HashMap, sync::Arc}; + +use crate::{members::ServerId, rpc::connect::ConnectApi}; + +/// The cluster state +/// +/// The client must discover the cluster info before sending any propose +struct ClusterState { + /// Leader id. + leader: ServerId, + /// Term, initialize to 0, calibrated by the server. + term: u64, + /// Cluster version, initialize to 0, calibrated by the server. + cluster_version: u64, + /// Members' connect, calibrated by the server. + connects: HashMap>, +} + +impl std::fmt::Debug for ClusterState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("State") + .field("leader", &self.leader) + .field("term", &self.term) + .field("cluster_version", &self.cluster_version) + .field("connects", &self.connects.keys()) + .finish() + } +} + +impl ClusterState { + /// Updates the current leader + fn update_leader(&mut self, leader: ServerId, term: u64) { + self.leader = leader; + self.term = term; + } + + /// Updates the cluster + fn update_cluster( + &mut self, + cluster_version: u64, + connects: HashMap>, + ) { + self.cluster_version = cluster_version; + self.connects = connects; + } +} From aab9916726509672529ffc23a2888f47efe437a3 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Tue, 3 Sep 2024 14:38:54 +0800 Subject: [PATCH 2/9] refactor: add new to `Config` Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/client/unary/config.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/crates/curp/src/client/unary/config.rs b/crates/curp/src/client/unary/config.rs index 061662dae..41ef0a9e3 100644 --- a/crates/curp/src/client/unary/config.rs +++ b/crates/curp/src/client/unary/config.rs @@ -20,6 +20,21 @@ pub(crate) struct Config { } impl Config { + /// Creates a new `Config` + pub(crate) fn new( + local_server: Option, + tls_config: Option, + propose_timeout: Duration, + wait_synced_timeout: Duration, + ) -> Self { + Self { + local_server, + tls_config, + propose_timeout, + wait_synced_timeout, + } + } + /// Get the local server id pub(crate) fn local_server(&self) -> Option { self.local_server From c36eb4629f31fbf8fecd43aee858084ae69c6d90 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Tue, 3 Sep 2024 14:58:10 +0800 Subject: [PATCH 3/9] feat: implement map functions for `ClusterState` Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/client/unary/state.rs | 55 ++++++++++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/crates/curp/src/client/unary/state.rs b/crates/curp/src/client/unary/state.rs index c85994566..6393ab8e9 100644 --- a/crates/curp/src/client/unary/state.rs +++ b/crates/curp/src/client/unary/state.rs @@ -1,6 +1,11 @@ use std::{collections::HashMap, sync::Arc}; -use crate::{members::ServerId, rpc::connect::ConnectApi}; +use futures::{stream::FuturesUnordered, Future}; + +use crate::{ + members::ServerId, + rpc::{connect::ConnectApi, CurpError}, +}; /// The cluster state /// @@ -28,6 +33,54 @@ impl std::fmt::Debug for ClusterState { } impl ClusterState { + /// Take an async function and map to the dedicated server, return None + /// if the server can not found in local state + pub(crate) fn map_server>>( + &self, + id: ServerId, + f: impl FnOnce(Arc) -> F, + ) -> Option { + // If the leader id cannot be found in connects, it indicates that there is + // an inconsistency between the client's local leader state and the cluster + // state, then mock a `WrongClusterVersion` return to the outside. + self.connects.get(&id).map(Arc::clone).map(f) + } + + /// Take an async function and map to the dedicated server, return None + /// if the server can not found in local state + pub(crate) fn map_leader>>( + &self, + f: impl FnOnce(Arc) -> F, + ) -> F { + // If the leader id cannot be found in connects, it indicates that there is + // an inconsistency between the client's local leader state and the cluster + // state, then mock a `WrongClusterVersion` return to the outside. + f(Arc::clone(self.connects.get(&self.leader).unwrap_or_else( + || unreachable!("leader connect should always exists"), + ))) + } + + /// Take an async function and map to all server, returning `FuturesUnordered` + pub(crate) fn for_each_server>( + &self, + f: impl FnMut(Arc) -> F, + ) -> FuturesUnordered { + self.connects.values().map(Arc::clone).map(f).collect() + } + + /// Take an async function and map to all server, returning `FuturesUnordered` + pub(crate) fn for_each_follower>( + &self, + f: impl FnMut(Arc) -> F, + ) -> FuturesUnordered { + self.connects + .iter() + .filter_map(|(id, conn)| (*id != self.leader).then_some(conn)) + .map(Arc::clone) + .map(f) + .collect() + } + /// Updates the current leader fn update_leader(&mut self, leader: ServerId, term: u64) { self.leader = leader; From 86404127ac876d795d3e63665b2e87619b125094 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Tue, 3 Sep 2024 15:03:33 +0800 Subject: [PATCH 4/9] feat: implement `get_quorum` method for `ClusterState` Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/client/unary/state.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/crates/curp/src/client/unary/state.rs b/crates/curp/src/client/unary/state.rs index 6393ab8e9..a5ed3a725 100644 --- a/crates/curp/src/client/unary/state.rs +++ b/crates/curp/src/client/unary/state.rs @@ -81,6 +81,15 @@ impl ClusterState { .collect() } + /// Returns the quorum size based on the given quorum function + /// + /// NOTE: Do not update the cluster in between an `for_each_xxx` and an `get_quorum`, which may + /// lead to inconsistent quorum. + pub(crate) fn get_quorum usize>(&self, mut quorum: Q) -> usize { + let cluster_size = self.connects.len(); + quorum(cluster_size) + } + /// Updates the current leader fn update_leader(&mut self, leader: ServerId, term: u64) { self.leader = leader; From 94e482ec81a45361b3b59717f4d652e3662e4e00 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Tue, 3 Sep 2024 15:18:09 +0800 Subject: [PATCH 5/9] chore: rename state.rs to cluster_state.rs Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/client/unary/{state.rs => cluster_state.rs} | 0 crates/curp/src/client/unary/mod.rs | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename crates/curp/src/client/unary/{state.rs => cluster_state.rs} (100%) diff --git a/crates/curp/src/client/unary/state.rs b/crates/curp/src/client/unary/cluster_state.rs similarity index 100% rename from crates/curp/src/client/unary/state.rs rename to crates/curp/src/client/unary/cluster_state.rs diff --git a/crates/curp/src/client/unary/mod.rs b/crates/curp/src/client/unary/mod.rs index 743c7d35d..f06ec8d8f 100644 --- a/crates/curp/src/client/unary/mod.rs +++ b/crates/curp/src/client/unary/mod.rs @@ -3,7 +3,7 @@ mod propose_impl; #[allow(unused)] /// State of the unary client -mod state; +mod cluster_state; #[allow(unused)] /// Config of the client From 9b34bdcd461f1dc27e52b02e4223333a41e14ef4 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Tue, 3 Sep 2024 18:47:26 +0800 Subject: [PATCH 6/9] chore: move cluster_state to upper level Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/client/{unary => }/cluster_state.rs | 0 crates/curp/src/client/mod.rs | 4 ++++ crates/curp/src/client/unary/mod.rs | 4 ---- 3 files changed, 4 insertions(+), 4 deletions(-) rename crates/curp/src/client/{unary => }/cluster_state.rs (100%) diff --git a/crates/curp/src/client/unary/cluster_state.rs b/crates/curp/src/client/cluster_state.rs similarity index 100% rename from crates/curp/src/client/unary/cluster_state.rs rename to crates/curp/src/client/cluster_state.rs diff --git a/crates/curp/src/client/mod.rs b/crates/curp/src/client/mod.rs index 8cc18ca44..8e0d0f440 100644 --- a/crates/curp/src/client/mod.rs +++ b/crates/curp/src/client/mod.rs @@ -17,6 +17,10 @@ mod retry; /// State for clients mod state; +#[allow(unused)] +/// State of the cluster +mod cluster_state; + /// Tests for client #[cfg(test)] mod tests; diff --git a/crates/curp/src/client/unary/mod.rs b/crates/curp/src/client/unary/mod.rs index f06ec8d8f..93563e072 100644 --- a/crates/curp/src/client/unary/mod.rs +++ b/crates/curp/src/client/unary/mod.rs @@ -1,10 +1,6 @@ /// Client propose implementation mod propose_impl; -#[allow(unused)] -/// State of the unary client -mod cluster_state; - #[allow(unused)] /// Config of the client mod config; From 2485e7fdaadfcc0984421c19732f668dd545f025 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Wed, 4 Sep 2024 09:22:45 +0800 Subject: [PATCH 7/9] chore: remove mutable methods from ClusterState Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/client/cluster_state.rs | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/crates/curp/src/client/cluster_state.rs b/crates/curp/src/client/cluster_state.rs index a5ed3a725..8d5818fa9 100644 --- a/crates/curp/src/client/cluster_state.rs +++ b/crates/curp/src/client/cluster_state.rs @@ -90,19 +90,8 @@ impl ClusterState { quorum(cluster_size) } - /// Updates the current leader - fn update_leader(&mut self, leader: ServerId, term: u64) { - self.leader = leader; - self.term = term; - } - - /// Updates the cluster - fn update_cluster( - &mut self, - cluster_version: u64, - connects: HashMap>, - ) { - self.cluster_version = cluster_version; - self.connects = connects; + /// Returns the term of the cluster + pub(crate) fn term(&self) -> u64 { + self.term } } From 6d25924527b90cf75ff6e8738b73a72edbfbc443 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Tue, 3 Sep 2024 16:29:33 +0800 Subject: [PATCH 8/9] fix: curp client config Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/client/unary/config.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/curp/src/client/unary/config.rs b/crates/curp/src/client/unary/config.rs index 41ef0a9e3..cc149d966 100644 --- a/crates/curp/src/client/unary/config.rs +++ b/crates/curp/src/client/unary/config.rs @@ -5,7 +5,7 @@ use tonic::transport::ClientTlsConfig; use crate::members::ServerId; /// Client config -#[derive(Debug, Clone)] +#[derive(Default, Debug, Clone)] pub(crate) struct Config { /// Local server id, should be initialized on startup local_server: Option, @@ -17,6 +17,8 @@ pub(crate) struct Config { /// /// The recommended the values is within (propose_timeout, 2 * propose_timeout]. wait_synced_timeout: Duration, + /// is current client send request to raw curp server + is_raw_curp: bool, } impl Config { @@ -26,12 +28,14 @@ impl Config { tls_config: Option, propose_timeout: Duration, wait_synced_timeout: Duration, + is_raw_curp: bool, ) -> Self { Self { local_server, tls_config, propose_timeout, wait_synced_timeout, + is_raw_curp, } } @@ -57,6 +61,6 @@ impl Config { /// Returns `true` if the current client is on the server pub(crate) fn is_raw_curp(&self) -> bool { - self.local_server.is_some() + self.is_raw_curp } } From 0755a3c492d68000a14ac292a41ce2da2554823e Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Tue, 3 Sep 2024 16:51:19 +0800 Subject: [PATCH 9/9] feat: reimplement fetch cluster Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> chore: move fetch_impl to upper level chore: move config.rs to upper level Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> refactor: client fetch Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/client/cluster_state.rs | 18 ++- crates/curp/src/client/{unary => }/config.rs | 0 crates/curp/src/client/fetch.rs | 121 +++++++++++++++++++ crates/curp/src/client/mod.rs | 8 ++ crates/curp/src/client/unary/mod.rs | 19 ++- 5 files changed, 159 insertions(+), 7 deletions(-) rename crates/curp/src/client/{unary => }/config.rs (100%) create mode 100644 crates/curp/src/client/fetch.rs diff --git a/crates/curp/src/client/cluster_state.rs b/crates/curp/src/client/cluster_state.rs index 8d5818fa9..cdfe72b47 100644 --- a/crates/curp/src/client/cluster_state.rs +++ b/crates/curp/src/client/cluster_state.rs @@ -10,7 +10,8 @@ use crate::{ /// The cluster state /// /// The client must discover the cluster info before sending any propose -struct ClusterState { +#[derive(Default, Clone)] +pub(crate) struct ClusterState { /// Leader id. leader: ServerId, /// Term, initialize to 0, calibrated by the server. @@ -33,6 +34,21 @@ impl std::fmt::Debug for ClusterState { } impl ClusterState { + /// Creates a new `ClusterState` + pub(crate) fn new( + leader: ServerId, + term: u64, + cluster_version: u64, + connects: HashMap>, + ) -> Self { + Self { + leader, + term, + cluster_version, + connects, + } + } + /// Take an async function and map to the dedicated server, return None /// if the server can not found in local state pub(crate) fn map_server>>( diff --git a/crates/curp/src/client/unary/config.rs b/crates/curp/src/client/config.rs similarity index 100% rename from crates/curp/src/client/unary/config.rs rename to crates/curp/src/client/config.rs diff --git a/crates/curp/src/client/fetch.rs b/crates/curp/src/client/fetch.rs new file mode 100644 index 000000000..1b7f4e187 --- /dev/null +++ b/crates/curp/src/client/fetch.rs @@ -0,0 +1,121 @@ +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use curp_external_api::cmd::Command; +use futures::{future, FutureExt, StreamExt}; +use parking_lot::RwLock; +use tonic::Response; +use tracing::warn; +use utils::parking_lot_lock::RwLockMap; + +use crate::{ + quorum, + rpc::{self, connect::ConnectApi, CurpError, FetchClusterRequest, FetchClusterResponse}, +}; + +use super::cluster_state::ClusterState; +use super::config::Config; + +/// Fetch cluster implementation +struct Fetch { + /// The fetch config + config: Config, +} + +impl Fetch { + /// Creates a new `Fetch` + pub(crate) fn new(config: Config) -> Self { + Self { config } + } + + /// Fetch cluster and updates the current state + pub(crate) async fn fetch_cluster( + &self, + state: ClusterState, + ) -> Result { + /// Retry interval + const FETCH_RETRY_INTERVAL: Duration = Duration::from_secs(1); + loop { + let resp = self + .pre_fetch(&state) + .await + .ok_or(CurpError::internal("cluster not available"))?; + let new_members = self.member_addrs(&resp); + let new_connects = self.connect_to(new_members); + let new_state = ClusterState::new( + resp.leader_id + .unwrap_or_else(|| unreachable!("leader id should be Some")) + .into(), + resp.term, + resp.cluster_version, + new_connects, + ); + if self.fetch_term(&new_state).await { + return Ok(new_state); + } + warn!("Fetch cluster failed, sleep for {FETCH_RETRY_INTERVAL:?}"); + tokio::time::sleep(FETCH_RETRY_INTERVAL).await; + } + } + + /// Fetch the term of the cluster. This ensures that the current leader is the latest. + async fn fetch_term(&self, state: &ClusterState) -> bool { + let timeout = self.config.wait_synced_timeout(); + let term = state.term(); + let quorum = state.get_quorum(quorum); + state + .for_each_server(|c| async move { + c.fetch_cluster(FetchClusterRequest { linearizable: true }, timeout) + .await + }) + .filter_map(|r| future::ready(r.ok())) + .map(Response::into_inner) + .filter(move |resp| future::ready(resp.term == term)) + .take(quorum) + .count() + .map(move |t| t >= quorum) + .await + } + + /// Prefetch, send fetch cluster request to the cluster and get the + /// config with the greatest quorum. + async fn pre_fetch(&self, state: &ClusterState) -> Option { + let timeout = self.config.wait_synced_timeout(); + let requests = state.for_each_server(|c| async move { + c.fetch_cluster(FetchClusterRequest { linearizable: true }, timeout) + .await + }); + let responses: Vec<_> = requests + .filter_map(|r| future::ready(r.ok())) + .map(Response::into_inner) + .collect() + .await; + responses + .into_iter() + .filter(|resp| resp.leader_id.is_some()) + .filter(|resp| !resp.members.is_empty()) + .max_by(|x, y| x.term.cmp(&y.term)) + } + + /// Gets the member addresses to connect to + fn member_addrs(&self, resp: &FetchClusterResponse) -> HashMap> { + if self.config.is_raw_curp() { + resp.clone().into_peer_urls() + } else { + resp.clone().into_client_urls() + } + } + + /// Connect to the given addrs + fn connect_to( + &self, + new_members: HashMap>, + ) -> HashMap> { + new_members + .into_iter() + .map(|(id, addrs)| { + let tls_config = self.config.tls_config().cloned(); + (id, rpc::connect(id, addrs, tls_config)) + }) + .collect() + } +} diff --git a/crates/curp/src/client/mod.rs b/crates/curp/src/client/mod.rs index 8e0d0f440..f759f18b9 100644 --- a/crates/curp/src/client/mod.rs +++ b/crates/curp/src/client/mod.rs @@ -21,6 +21,14 @@ mod state; /// State of the cluster mod cluster_state; +#[allow(unused)] +/// Client cluster fetch implementation +mod fetch; + +#[allow(unused)] +/// Config of the client +mod config; + /// Tests for client #[cfg(test)] mod tests; diff --git a/crates/curp/src/client/unary/mod.rs b/crates/curp/src/client/unary/mod.rs index 93563e072..41f510e80 100644 --- a/crates/curp/src/client/unary/mod.rs +++ b/crates/curp/src/client/unary/mod.rs @@ -1,10 +1,6 @@ /// Client propose implementation mod propose_impl; -#[allow(unused)] -/// Config of the client -mod config; - use std::{ cmp::Ordering, marker::PhantomData, @@ -20,8 +16,8 @@ use tonic::Response; use tracing::{debug, warn}; use super::{ - state::State, ClientApi, LeaderStateUpdate, ProposeIdGuard, ProposeResponse, - RepeatableClientApi, + cluster_state::ClusterState, config::Config, state::State, ClientApi, LeaderStateUpdate, + ProposeIdGuard, ProposeResponse, RepeatableClientApi, }; use crate::{ members::ServerId, @@ -68,6 +64,13 @@ pub(super) struct Unary { last_sent_seq: AtomicU64, /// marker phantom: PhantomData, + + #[allow(dead_code)] + /// Cluster state + cluster_state: RwLock, + #[allow(dead_code)] + /// Cluster state + client_config: Config, } impl Unary { @@ -79,6 +82,10 @@ impl Unary { tracker: RwLock::new(Tracker::default()), last_sent_seq: AtomicU64::new(0), phantom: PhantomData, + + // TODO: build cluster state + cluster_state: RwLock::default(), + client_config: Config::default(), } }