diff --git a/Cargo.lock b/Cargo.lock
index 3fbd2283f..717901bee 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -630,7 +630,6 @@ dependencies = [
  "curp-external-api",
  "curp-test-utils",
  "dashmap",
- "derive_builder",
  "engine",
  "event-listener",
  "flume",
diff --git a/curp/Cargo.toml b/curp/Cargo.toml
index 1c13eb930..a88f6d362 100644
--- a/curp/Cargo.toml
+++ b/curp/Cargo.toml
@@ -24,7 +24,6 @@ clippy-utilities = "0.2.0"
 curp-external-api = { path = "../curp-external-api" }
 curp-test-utils = { path = "../curp-test-utils" }
 dashmap = "5.5.0"
-derive_builder = "0.12.0"
 engine = { path = "../engine" }
 event-listener = "2.5.2"
 flume = "0.10.14"
diff --git a/curp/src/client.rs b/curp/src/client.rs
index 35eb07ec5..1044cd372 100644
--- a/curp/src/client.rs
+++ b/curp/src/client.rs
@@ -31,35 +31,68 @@ use crate::{
 };
 
 /// Protocol client
-#[derive(derive_builder::Builder)]
-#[builder(build_fn(skip), name = "Builder")]
+#[derive(Debug)]
 pub struct Client<C: Command> {
+    /// Inner
+    inner: Arc<ClientInner<C>>,
+}
+
+/// Client inner
+struct ClientInner<C: Command> {
     /// local server id. Only use in an inner client.
-    #[builder(field(type = "Option<ServerId>"), setter(custom))]
     local_server_id: Option<ServerId>,
     /// Current leader and term
-    #[builder(setter(skip))]
     state: RwLock<State>,
+    /// Current client id
+    client_id: Arc<tokio::sync::RwLock<u64>>,
+    /// Notify when a new client id is set
+    client_id_notifier: Arc<Event>,
     /// Request tracker
-    #[builder(setter(skip))]
     tracker: RwLock<Tracker>,
     /// Last sent sequence number
-    #[builder(setter(skip))]
     last_sent_seq: AtomicU64,
     /// All servers's `Connect`
-    #[builder(setter(skip))]
     connects: DashMap<ServerId, Arc<dyn ConnectApi>>,
     /// Cluster version
-    #[builder(setter(skip))]
     cluster_version: AtomicU64,
     /// Curp client config settings
     config: ClientConfig,
     /// To keep Command type
-    #[builder(setter(skip))]
     phantom: PhantomData<C>,
 }
 
-impl<C: Command> Builder<C> {
+/// Client builder
+pub struct Builder<C: Command> {
+    /// Curp client config settings
+    config: Option<ClientConfig>,
+    /// local server id. Only use in an inner client.
+    local_server_id: Option<ServerId>,
+    /// To keep Command type
+    phantom: PhantomData<C>,
+}
+
+impl<C: Command> Default for Builder<C> {
+    #[inline]
+    fn default() -> Self {
+        Self {
+            config: None,
+            local_server_id: None,
+            phantom: PhantomData,
+        }
+    }
+}
+
+impl<C: Command> Debug for Builder<C> {
+    #[inline]
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Builder")
+            .field("config", &self.config)
+            .field("local_server_id", &self.local_server_id)
+            .finish()
+    }
+}
+
+impl<C: Command> Builder<C> {
     /// Set local server id.
     #[inline]
     pub fn local_server_id(&mut self, value: ServerId) -> &mut Self {
@@ -67,6 +100,13 @@ impl<C: Command> Builder<C> {
         self
     }
 
+    /// Set the client config
+    #[inline]
+    pub fn config(&mut self, config: ClientConfig) -> &mut Self {
+        self.config = Some(config);
+        self
+    }
+
     /// Build client from all members
     /// # Errors
     /// Return error when meet rpc error or missing some arguments
@@ -80,17 +120,20 @@ impl<C: Command> Builder<C> {
             return Err(ClientBuildError::invalid_arguments("timeout is required"));
         };
         let connects = rpc::connect(all_members).await?.collect();
-        let client = Client::<C> {
+        let inner = Arc::new(ClientInner::<C> {
             local_server_id: self.local_server_id,
             state: RwLock::new(State::new(leader_id, 0)),
+            client_id: Arc::new(tokio::sync::RwLock::new(0)),
+            client_id_notifier: Arc::new(Event::new()),
             tracker: RwLock::new(Tracker::default()),
             config,
             connects,
             cluster_version: AtomicU64::new(0),
             phantom: PhantomData,
             last_sent_seq: AtomicU64::new(0),
-        };
-        Ok(client)
+        });
+        let _ig = tokio::spawn(Client::client_lease_keep_alive(Arc::clone(&inner)));
+        Ok(Client { inner })
     }
 
     /// Fetch cluster from server, return the first `FetchClusterResponse`
@@ -140,21 +183,24 @@ impl<C: Command> Builder<C> {
         let res: FetchClusterResponse = self
             .fast_fetch_cluster(addrs.clone(), *config.propose_timeout())
             .await?;
-        let client = Client::<C> {
+        let inner = Arc::new(ClientInner::<C> {
             local_server_id: self.local_server_id,
             state: RwLock::new(State::new(res.leader_id, res.term)),
+            client_id: Arc::new(tokio::sync::RwLock::new(0)),
+            client_id_notifier: Arc::new(Event::new()),
             tracker: RwLock::new(Tracker::default()),
             config,
             cluster_version: AtomicU64::new(res.cluster_version),
             connects: rpc::connect(res.into_members_addrs()).await?.collect(),
             phantom: PhantomData,
             last_sent_seq: AtomicU64::new(0),
-        };
-        Ok(client)
+        });
+        let _ig = tokio::spawn(Client::client_lease_keep_alive(Arc::clone(&inner)));
+        Ok(Client { inner })
     }
 }
 
-impl<C: Command> Debug for Client<C> {
+impl<C: Command> Debug for ClientInner<C> {
     #[inline]
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("Client")
@@ -297,6 +343,127 @@ where
         Builder::default()
     }
 
+    /// The shutdown rpc of curp protocol
+    #[instrument(skip_all)]
+    pub async fn shutdown(&self, propose_id: ProposeId) -> Result<(), ClientError<C>> {
+        self.inner.shutdown(propose_id).await
+    }
+
+    /// Get leader id from the state or fetch it from servers
+    /// # Errors
+    /// `ClientError::Timeout` if timeout
+    #[inline]
+    pub async fn get_leader_id(&self) -> Result<ServerId, ClientError<C>> {
+        self.inner.get_leader_id().await
+    }
+
+    /// Propose the request to servers, if use_fast_path is false, it will wait for the synced index
+    /// # Errors
+    /// `ClientError::Execute` if execution error is met
+    /// `ClientError::AfterSync` error met while syncing logs to followers
+    /// # Panics
+    /// If leader index is out of bound of all the connections, panic
+    #[inline]
+    #[instrument(skip_all, fields(cmd_id=%cmd.id()))]
+    #[allow(clippy::type_complexity)] // This type is not complex
+    pub async fn propose(
+        &self,
+        cmd: C,
+        use_fast_path: bool,
+    ) -> Result<(C::ER, Option<C::ASR>), ClientError<C>> {
+        self.inner.propose(cmd, use_fast_path).await
+    }
+
+    /// Propose the conf change request to servers
+    #[instrument(skip_all)]
+    pub async fn propose_conf_change(
+        &self,
+        propose_id: ProposeId,
+        changes: Vec<ConfChange>,
+    ) -> Result<Result<Vec<Member>, ConfChangeError>, ClientError<C>> {
+        self.inner.propose_conf_change(propose_id, changes).await
+    }
+
+    /// Fetch Read state from leader
+    /// # Errors
+    /// `ClientError::EncodingError` encoding error met while deserializing the propose id
+    #[inline]
+    pub async fn fetch_read_state(&self, cmd: &C) -> Result<ReadState, ClientError<C>> {
+        self.inner.fetch_read_state(cmd).await
+    }
+
+    /// Fetch the current leader id without cache
+    /// # Errors
+    /// `ClientError::Timeout` if timeout
+    #[inline]
+    pub async fn get_leader_id_from_curp(&self) -> Result<ServerId, ClientError<C>> {
+        self.inner.get_leader_id_from_curp().await
+    }
+
+    /// Fetch the current cluster without cache
+    /// # Errors
+    /// `ClientError::Timeout` if timeout
+    #[inline]
+    pub async fn get_cluster_from_curp(
+        &self,
+        linearizable: bool,
+    ) -> Result<FetchClusterResponse, ClientError<C>> {
+        self.inner.get_cluster_from_curp(linearizable).await
+    }
+
+    /// Generate a propose id
+    ///
+    /// # Errors
+    /// `ClientError::Timeout` if timeout
+    #[inline]
+    pub async fn gen_propose_id(&self) -> Result<ProposeId, ClientError<C>> {
+        self.inner.gen_propose_id().await
+    }
+
+    /// Client lease keep alive background task
+    async fn client_lease_keep_alive(inner: Arc<ClientInner<C>>) {
+        loop {
+            let leader_id = match inner.get_leader_id().await {
+                Ok(id) => id,
+                Err(e) => {
+                    warn!("failed to fetch leader, {e}");
+                    tokio::time::sleep(*inner.config.max_retry_timeout()).await;
+                    continue;
+                }
+            };
+            let connect = inner
+                .get_connect(leader_id)
+                .unwrap_or_else(|| unreachable!("leader {leader_id} not found"));
+            let (new_leader_id, term, cluster_shutdown) = match connect
+                .lease_keep_alive(
+                    Arc::clone(&inner.client_id),
+                    Arc::clone(&inner.client_id_notifier),
+                )
+                .await
+            {
+                Ok(resp) => resp,
+                Err(e) => {
+                    warn!("client_lease_keep_alive rpc error: {e}");
+                    tokio::time::sleep(*inner.config.max_retry_timeout()).await;
+                    continue;
+                }
+            };
+            if cluster_shutdown {
+                debug!("cluster shutdown, close client lease keep alive task");
+                return;
+            }
+            inner
+                .state
+                .write()
+                .check_and_update(Some(new_leader_id), term);
+        }
+    }
+}
+
+impl<C> ClientInner<C>
+where
+    C: Command + 'static,
+{
     /// Get cluster version
     fn cluster_version(&self) -> u64 {
         self.cluster_version
@@ -477,8 +644,7 @@ where
     }
 
     /// The shutdown rpc of curp protocol
-    #[instrument(skip_all)]
-    pub async fn shutdown(&self, propose_id: ProposeId) -> Result<(), ClientError<C>> {
+    async fn shutdown(&self, propose_id: ProposeId) -> Result<(), ClientError<C>> {
         let ProposeId(_, seq_num) = propose_id;
         let _ig = self.tracker.write().record(seq_num);
         let mut retry_timeout = self.get_backoff();
@@ -639,7 +805,6 @@ where
     /// Note: The fetched cluster may still be outdated if `linearizable` is false
     /// # Errors
     /// `ClientError::Timeout` if timeout
-    #[inline]
     async fn fetch_cluster(
         &self,
         linearizable: bool,
@@ -730,8 +895,7 @@ where
     /// Get leader id from the state or fetch it from servers
     /// # Errors
     /// `ClientError::Timeout` if timeout
-    #[inline]
-    pub async fn get_leader_id(&self) -> Result<ServerId, ClientError<C>> {
+    async fn get_leader_id(&self) -> Result<ServerId, ClientError<C>> {
         let notify = Arc::clone(&self.state.read().leader_notify);
         let mut retry_timeout = self.get_backoff();
         let retry_count = *self.config.retry_count();
@@ -749,16 +913,13 @@ where
         Err(ClientError::Timeout)
     }
 
-    /// Propose the request to servers, if use_fast_path is false, it will wait for the synced index
+    /// Propose the request to servers, if `use_fast_path` is false, it will wait for the synced index
     /// # Errors
     /// `ClientError::Execute` if execution error is met
     /// `ClientError::AfterSync` error met while syncing logs to followers
     /// # Panics
     /// If leader index is out of bound of all the connections, panic
-    #[inline]
-    #[instrument(skip_all, fields(cmd_id=%cmd.id()))]
-    #[allow(clippy::type_complexity)] // This type is not complex
-    pub async fn propose(
+    async fn propose(
         &self,
         cmd: C,
         use_fast_path: bool,
@@ -853,8 +1014,7 @@ where
     }
 
     /// Propose the conf change request to servers
-    #[instrument(skip_all)]
-    pub async fn propose_conf_change(
+    async fn propose_conf_change(
         &self,
         propose_id: ProposeId,
         changes: Vec<ConfChange>,
@@ -933,8 +1093,7 @@ where
     /// Fetch Read state from leader
     /// # Errors
     /// `ClientError::EncodingError` encoding error met while deserializing the propose id
-    #[inline]
-    pub async fn fetch_read_state(&self, cmd: &C) -> Result<ReadState, ClientError<C>> {
+    async fn fetch_read_state(&self, cmd: &C) -> Result<ReadState, ClientError<C>> {
         let mut retry_timeout = self.get_backoff();
         let retry_count = *self.config.retry_count();
         for _ in 0..retry_count {
@@ -983,7 +1142,6 @@ where
     /// Note that this method should not be invoked by an outside client because
     /// we will fallback to fetch the full cluster for the response if fetching local
     /// failed.
-    #[inline]
     async fn fetch_local_cluster(&self) -> Result<FetchClusterResponse, ClientError<C>> {
         if let Some(local_server) = self.local_server_id {
             let resp = self
@@ -1004,8 +1162,7 @@ where
     /// Fetch the current leader id without cache
     /// # Errors
     /// `ClientError::Timeout` if timeout
-    #[inline]
-    pub async fn get_leader_id_from_curp(&self) -> Result<ServerId, ClientError<C>> {
+    async fn get_leader_id_from_curp(&self) -> Result<ServerId, ClientError<C>> {
         if let Ok(FetchClusterResponse {
             leader_id: Some(leader_id),
             ..
@@ -1019,8 +1176,7 @@ where
     /// Fetch the current cluster without cache
     /// # Errors
     /// `ClientError::Timeout` if timeout
-    #[inline]
-    pub async fn get_cluster_from_curp(
+    async fn get_cluster_from_curp(
         &self,
         linearizable: bool,
     ) -> Result<FetchClusterResponse, ClientError<C>> {
@@ -1050,7 +1206,16 @@ where
     /// `ClientError::Timeout` if timeout
     #[allow(clippy::unused_async)] // TODO: grant a client id from server
     async fn get_client_id(&self) -> Result<u64, ClientError<C>> {
-        Ok(rand::random())
+        let mut retry_timeout = self.get_backoff();
+        let retry_count = *self.config.retry_count();
+        for _ in 0..retry_count {
+            let client_id = *self.client_id.read().await;
+            if client_id != 0 {
+                return Ok(client_id);
+            }
+            let _ig = timeout(retry_timeout.next_retry(), self.client_id_notifier.listen()).await;
+        }
+        Err(ClientError::Timeout)
     }
 
     /// New a seq num and record it
@@ -1063,8 +1228,7 @@ where
     ///
     /// # Errors
     /// `ClientError::Timeout` if timeout
-    #[inline]
-    pub async fn gen_propose_id(&self) -> Result<ProposeId, ClientError<C>> {
+    async fn gen_propose_id(&self) -> Result<ProposeId, ClientError<C>> {
         let client_id = self.get_client_id().await?;
         let seq_num = self.new_seq_num();
         Ok(ProposeId(client_id, seq_num))
diff --git a/curp/src/rpc/connect.rs b/curp/src/rpc/connect.rs
index 0f748920b..485731ffa 100644
--- a/curp/src/rpc/connect.rs
+++ b/curp/src/rpc/connect.rs
@@ -11,12 +11,13 @@ use async_trait::async_trait;
 use bytes::BytesMut;
 use clippy_utilities::NumericCast;
 use engine::SnapshotApi;
+use event_listener::Event;
 use futures::Stream;
 #[cfg(test)]
 use mockall::automock;
-use tokio::sync::Mutex;
+use tokio::sync::{Mutex, RwLock};
 use tonic::transport::{Channel, Endpoint};
-use tracing::{error, instrument};
+use tracing::{error, instrument, debug};
 use utils::tracing::Inject;
 
 use super::{ShutdownRequest, ShutdownResponse};
@@ -33,7 +34,7 @@ use crate::{
         ProposeRequest, ProposeResponse, VoteRequest, VoteResponse, WaitSyncedRequest,
         WaitSyncedResponse,
     },
-    snapshot::Snapshot,
+    snapshot::Snapshot, ClientLeaseKeepAliveRequest,
 };
 
 /// Install snapshot chunk size: 64KB
@@ -152,6 +153,15 @@ pub(crate) trait ConnectApi: Send + Sync + 'static {
         request: FetchReadStateRequest,
         timeout: Duration,
     ) -> Result<tonic::Response<FetchReadStateResponse>, tonic::Status>;
+
+    /// Keep send lease keep alive to server and mutate the client id
+    /// Return RpcError if failed to get response
+    /// Return leader_id and term if leadership changed
+    async fn lease_keep_alive(
+        &self,
+        client_id: Arc<RwLock<u64>>,
+        client_id_notifier: Arc<Event>,
+    ) -> Result<(ServerId, u64, bool), tonic::Status>;
 }
 
 /// Inner Connect interface among different servers
@@ -350,6 +360,33 @@ impl ConnectApi for Connect<ProtocolClient<Channel>> {
         req.set_timeout(timeout);
         client.fetch_read_state(req).await
     }
+
+    /// Keep send lease keep alive to server and mutate the client id
+    /// Return RpcError if failed to get response
+    /// Return leader_id and term if leadership changed
+    async fn lease_keep_alive(
+        &self,
+        client_id: Arc<RwLock<u64>>,
+        client_id_notifier: Arc<Event>,
+    ) -> Result<(ServerId, u64, bool), tonic::Status> {
+        let mut client = self.rpc_connect.clone();
+        loop {
+            let stream = heartbeat_stream(Arc::clone(&client_id));
+            let resp = client
+                .client_lease_keep_alive(stream)
+                .await?
+                .into_inner();
+            if resp.cluster_shutdown {
+                return Ok((0, 0, true));
+            }
+            if let Some(leader_id) = resp.leader_id {
+                return Ok((leader_id, resp.term, false));
+            }
+            let mut client_id = client_id.write().await;
+            *client_id = resp.client_id;
+            client_id_notifier.notify(usize::MAX);
+        }
+    }
 }
 
 #[async_trait]
@@ -442,6 +479,31 @@ fn install_snapshot_stream(
     }
 }
 
+/// Generate heartbeat stream
+fn heartbeat_stream(
+    client_id: Arc<RwLock<u64>>,
+) -> impl Stream<Item = ClientLeaseKeepAliveRequest> {
+    /// Keep alive interval
+    const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1);
+
+    let mut ticker = tokio::time::interval(HEARTBEAT_INTERVAL);
+    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+
+    stream! {
+        loop {
+            _ = ticker.tick().await;
+            let id = *client_id.read().await;
+            if id == 0 {
+                debug!("grant a client id");
+                yield ClientLeaseKeepAliveRequest::grant();
+            } else {
+                debug!("keep alive the client id({id})");
+                yield ClientLeaseKeepAliveRequest::keep_alive(id);
+            }
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use bytes::Bytes;
diff --git a/xline-client/src/clients/auth.rs b/xline-client/src/clients/auth.rs
index e376a09c2..e20b3d6a8 100644
--- a/xline-client/src/clients/auth.rs
+++ b/xline-client/src/clients/auth.rs
@@ -41,6 +41,7 @@ pub struct AuthClient {
 impl AuthClient {
     /// Creates a new `AuthClient`
    #[inline]
+    #[must_use]
     pub fn new(
         curp_client: Arc<CurpClient<Command>>,
         channel: Channel,
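
The reworked `get_client_id` no longer returns `rand::random()`; it waits until the background lease keep-alive task has stored a server-granted id and signalled `client_id_notifier`. A minimal standalone sketch of that wait-with-timeout pattern, using only `tokio` and `event_listener` (the function and parameter names here are illustrative and not part of the patch; the real loop additionally applies its retry backoff via `get_backoff()`):

use std::{sync::Arc, time::Duration};

use event_listener::Event;
use tokio::{sync::RwLock, time::timeout};

/// Wait until a non-zero client id has been granted, giving up after `retries` rounds.
/// This mirrors the retry loop in `ClientInner::get_client_id`.
async fn wait_for_client_id(
    client_id: Arc<RwLock<u64>>,
    notifier: Arc<Event>,
    retries: usize,
    per_retry: Duration,
) -> Option<u64> {
    for _ in 0..retries {
        let id = *client_id.read().await;
        if id != 0 {
            return Some(id);
        }
        // Wait for either a notification (the keep-alive task stored a granted id)
        // or the per-retry timeout, then re-check on the next iteration.
        let _ = timeout(per_retry, notifier.listen()).await;
    }
    None
}

The granting side is the `lease_keep_alive` loop in `connect.rs`: when a response carries a fresh id, it writes the id into the shared `RwLock` and calls `notify(usize::MAX)` so that every pending waiter wakes up.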