Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reimplement curp client fetch cluster #1005

Open
wants to merge 9 commits into
base: refactor-curp-client
Choose a base branch
from
113 changes: 113 additions & 0 deletions crates/curp/src/client/cluster_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use std::{collections::HashMap, sync::Arc};

use futures::{stream::FuturesUnordered, Future};

use crate::{
members::ServerId,
rpc::{connect::ConnectApi, CurpError},
};

/// The cluster state
///
/// The client must discover the cluster info before sending any propose
#[derive(Default, Clone)]
pub(crate) struct ClusterState {
/// Leader id.
leader: ServerId,
/// Term, initialize to 0, calibrated by the server.
term: u64,
/// Cluster version, initialize to 0, calibrated by the server.
cluster_version: u64,
/// Members' connect, calibrated by the server.
connects: HashMap<ServerId, Arc<dyn ConnectApi>>,
}

impl std::fmt::Debug for ClusterState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("State")
.field("leader", &self.leader)
.field("term", &self.term)
.field("cluster_version", &self.cluster_version)
.field("connects", &self.connects.keys())
.finish()
}
}

impl ClusterState {
/// Creates a new `ClusterState`
pub(crate) fn new(
leader: ServerId,
term: u64,
cluster_version: u64,
connects: HashMap<ServerId, Arc<dyn ConnectApi>>,
) -> Self {
Self {
leader,
term,
cluster_version,
connects,
}
}

/// Take an async function and map to the dedicated server, return None
/// if the server can not found in local state
pub(crate) fn map_server<R, F: Future<Output = Result<R, CurpError>>>(
&self,
id: ServerId,
f: impl FnOnce(Arc<dyn ConnectApi>) -> F,
) -> Option<F> {
// If the leader id cannot be found in connects, it indicates that there is
// an inconsistency between the client's local leader state and the cluster
// state, then mock a `WrongClusterVersion` return to the outside.
self.connects.get(&id).map(Arc::clone).map(f)
}

/// Take an async function and map to the dedicated server, return None
/// if the server can not found in local state
pub(crate) fn map_leader<R, F: Future<Output = Result<R, CurpError>>>(
&self,
f: impl FnOnce(Arc<dyn ConnectApi>) -> F,
) -> F {
// If the leader id cannot be found in connects, it indicates that there is
// an inconsistency between the client's local leader state and the cluster
// state, then mock a `WrongClusterVersion` return to the outside.
f(Arc::clone(self.connects.get(&self.leader).unwrap_or_else(
|| unreachable!("leader connect should always exists"),
)))
}

/// Take an async function and map to all server, returning `FuturesUnordered<F>`
pub(crate) fn for_each_server<R, F: Future<Output = R>>(
&self,
f: impl FnMut(Arc<dyn ConnectApi>) -> F,
) -> FuturesUnordered<F> {
self.connects.values().map(Arc::clone).map(f).collect()
}

/// Take an async function and map to all server, returning `FuturesUnordered<F>`
pub(crate) fn for_each_follower<R, F: Future<Output = R>>(
&self,
f: impl FnMut(Arc<dyn ConnectApi>) -> F,
) -> FuturesUnordered<F> {
self.connects
.iter()
.filter_map(|(id, conn)| (*id != self.leader).then_some(conn))
.map(Arc::clone)
.map(f)
.collect()
}

/// Returns the quorum size based on the given quorum function
///
/// NOTE: Do not update the cluster in between an `for_each_xxx` and an `get_quorum`, which may
/// lead to inconsistent quorum.
pub(crate) fn get_quorum<Q: FnMut(usize) -> usize>(&self, mut quorum: Q) -> usize {
let cluster_size = self.connects.len();
quorum(cluster_size)
}

/// Returns the term of the cluster
pub(crate) fn term(&self) -> u64 {
self.term
}
}
66 changes: 66 additions & 0 deletions crates/curp/src/client/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use std::time::Duration;

use tonic::transport::ClientTlsConfig;

use crate::members::ServerId;

/// Client config
#[derive(Default, Debug, Clone)]
pub(crate) struct Config {
/// Local server id, should be initialized on startup
local_server: Option<ServerId>,
/// Client tls config
tls_config: Option<ClientTlsConfig>,
/// The rpc timeout of a propose request
propose_timeout: Duration,
/// The rpc timeout of a 2-RTT request, usually takes longer than propose timeout
///
/// The recommended the values is within (propose_timeout, 2 * propose_timeout].
wait_synced_timeout: Duration,
/// is current client send request to raw curp server
is_raw_curp: bool,
}

impl Config {
/// Creates a new `Config`
pub(crate) fn new(
local_server: Option<ServerId>,
tls_config: Option<ClientTlsConfig>,
propose_timeout: Duration,
wait_synced_timeout: Duration,
is_raw_curp: bool,
) -> Self {
Self {
local_server,
tls_config,
propose_timeout,
wait_synced_timeout,
is_raw_curp,
}
}

/// Get the local server id
pub(crate) fn local_server(&self) -> Option<ServerId> {
self.local_server
}

/// Get the client TLS config
pub(crate) fn tls_config(&self) -> Option<&ClientTlsConfig> {
self.tls_config.as_ref()
}

/// Get the propose timeout
pub(crate) fn propose_timeout(&self) -> Duration {
self.propose_timeout
}

/// Get the wait synced timeout
pub(crate) fn wait_synced_timeout(&self) -> Duration {
self.wait_synced_timeout
}

/// Returns `true` if the current client is on the server
pub(crate) fn is_raw_curp(&self) -> bool {
self.is_raw_curp
}
}
121 changes: 121 additions & 0 deletions crates/curp/src/client/fetch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use std::{collections::HashMap, sync::Arc, time::Duration};

use curp_external_api::cmd::Command;
use futures::{future, FutureExt, StreamExt};
use parking_lot::RwLock;
use tonic::Response;
use tracing::warn;
use utils::parking_lot_lock::RwLockMap;

use crate::{
quorum,
rpc::{self, connect::ConnectApi, CurpError, FetchClusterRequest, FetchClusterResponse},
};

use super::cluster_state::ClusterState;
use super::config::Config;

/// Fetch cluster implementation
struct Fetch {
/// The fetch config
config: Config,
}

impl Fetch {
/// Creates a new `Fetch`
pub(crate) fn new(config: Config) -> Self {
Self { config }
}

/// Fetch cluster and updates the current state
pub(crate) async fn fetch_cluster(
&self,
state: ClusterState,
) -> Result<ClusterState, CurpError> {
/// Retry interval
const FETCH_RETRY_INTERVAL: Duration = Duration::from_secs(1);
loop {
let resp = self
.pre_fetch(&state)
.await
.ok_or(CurpError::internal("cluster not available"))?;
let new_members = self.member_addrs(&resp);
let new_connects = self.connect_to(new_members);
let new_state = ClusterState::new(
resp.leader_id
.unwrap_or_else(|| unreachable!("leader id should be Some"))
.into(),
resp.term,
resp.cluster_version,
new_connects,
);
if self.fetch_term(&new_state).await {
return Ok(new_state);
}
warn!("Fetch cluster failed, sleep for {FETCH_RETRY_INTERVAL:?}");
tokio::time::sleep(FETCH_RETRY_INTERVAL).await;
}
}

/// Fetch the term of the cluster. This ensures that the current leader is the latest.
async fn fetch_term(&self, state: &ClusterState) -> bool {
let timeout = self.config.wait_synced_timeout();
let term = state.term();
let quorum = state.get_quorum(quorum);
state
.for_each_server(|c| async move {
c.fetch_cluster(FetchClusterRequest { linearizable: true }, timeout)
.await
})
.filter_map(|r| future::ready(r.ok()))
.map(Response::into_inner)
.filter(move |resp| future::ready(resp.term == term))
.take(quorum)
.count()
.map(move |t| t >= quorum)
.await
}

/// Prefetch, send fetch cluster request to the cluster and get the
/// config with the greatest quorum.
async fn pre_fetch(&self, state: &ClusterState) -> Option<FetchClusterResponse> {
let timeout = self.config.wait_synced_timeout();
let requests = state.for_each_server(|c| async move {
c.fetch_cluster(FetchClusterRequest { linearizable: true }, timeout)
.await
});
let responses: Vec<_> = requests
.filter_map(|r| future::ready(r.ok()))
.map(Response::into_inner)
.collect()
.await;
responses
.into_iter()
.filter(|resp| resp.leader_id.is_some())
.filter(|resp| !resp.members.is_empty())
.max_by(|x, y| x.term.cmp(&y.term))
}

/// Gets the member addresses to connect to
fn member_addrs(&self, resp: &FetchClusterResponse) -> HashMap<u64, Vec<String>> {
if self.config.is_raw_curp() {
resp.clone().into_peer_urls()
} else {
resp.clone().into_client_urls()
}
}

/// Connect to the given addrs
fn connect_to(
&self,
new_members: HashMap<u64, Vec<String>>,
) -> HashMap<u64, Arc<dyn ConnectApi>> {
new_members
.into_iter()
.map(|(id, addrs)| {
let tls_config = self.config.tls_config().cloned();
(id, rpc::connect(id, addrs, tls_config))
})
.collect()
}
}
12 changes: 12 additions & 0 deletions crates/curp/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ mod retry;
/// State for clients
mod state;

#[allow(unused)]
/// State of the cluster
mod cluster_state;

#[allow(unused)]
/// Client cluster fetch implementation
mod fetch;

#[allow(unused)]
/// Config of the client
mod config;

/// Tests for client
#[cfg(test)]
mod tests;
Expand Down
15 changes: 13 additions & 2 deletions crates/curp/src/client/unary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use tonic::Response;
use tracing::{debug, warn};

use super::{
state::State, ClientApi, LeaderStateUpdate, ProposeIdGuard, ProposeResponse,
RepeatableClientApi,
cluster_state::ClusterState, config::Config, state::State, ClientApi, LeaderStateUpdate,
ProposeIdGuard, ProposeResponse, RepeatableClientApi,
};
use crate::{
members::ServerId,
Expand Down Expand Up @@ -64,6 +64,13 @@ pub(super) struct Unary<C: Command> {
last_sent_seq: AtomicU64,
/// marker
phantom: PhantomData<C>,

#[allow(dead_code)]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not deadcode

/// Cluster state
cluster_state: RwLock<ClusterState>,
#[allow(dead_code)]
/// Cluster state
client_config: Config,
}

impl<C: Command> Unary<C> {
Expand All @@ -75,6 +82,10 @@ impl<C: Command> Unary<C> {
tracker: RwLock::new(Tracker::default()),
last_sent_seq: AtomicU64::new(0),
phantom: PhantomData,

// TODO: build cluster state
cluster_state: RwLock::default(),
client_config: Config::default(),
}
}

Expand Down
Loading