Skip to content

Commit

Permalink
refactor: refactor sync logic of curp
Browse files Browse the repository at this point in the history
Signed-off-by: themanforfree <[email protected]>
  • Loading branch information
themanforfree committed Sep 5, 2023
1 parent 1b090e4 commit b13c21c
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 175 deletions.
5 changes: 3 additions & 2 deletions curp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tokio::time::timeout;
use tracing::{debug, instrument, warn};
use utils::{config::ClientTimeout, parking_lot_lock::RwLockMap};

use crate::rpc::connect::ConnectApiWrapper;
use crate::{
cmd::{Command, ProposeId},
error::{
Expand Down Expand Up @@ -40,7 +41,7 @@ pub struct Client<C: Command> {
state: RwLock<State>,
/// All servers's `Connect`
#[builder(setter(skip))]
connects: DashMap<ServerId, Arc<dyn ConnectApi>>,
connects: DashMap<ServerId, ConnectApiWrapper>,
/// Curp client timeout settings
timeout: ClientTimeout,
/// To keep Command type
Expand Down Expand Up @@ -689,7 +690,7 @@ where

/// Get the superquorum for curp protocol
/// Although curp can proceed with f + 1 available replicas, it needs f + 1 + (f + 1)/2 replicas
/// (for superquorum of witenesses) to use 1 RTT operations. With less than superquorum replicas,
/// (for superquorum of witnesses) to use 1 RTT operations. With less than superquorum replicas,
/// clients must ask masters to commit operations in f + 1 replicas before returning result.(2 RTTs).
#[inline]
fn superquorum(nodes: usize) -> usize {
Expand Down
61 changes: 58 additions & 3 deletions curp/src/rpc/connect.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::fmt::Formatter;
use std::ops::Deref;
use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Duration};

use async_stream::stream;
Expand Down Expand Up @@ -32,7 +34,7 @@ const SNAPSHOT_CHUNK_SIZE: u64 = 64 * 1024;
/// Convert a vec of addr string to a vec of `Connect`
pub(crate) async fn connect(
addrs: HashMap<ServerId, String>,
) -> impl Iterator<Item = (ServerId, Arc<dyn ConnectApi>)> {
) -> impl Iterator<Item = (ServerId, ConnectApiWrapper)> {
futures::future::join_all(addrs.into_iter().map(|(id, mut addr)| async move {
// Addrs must start with "http" to communicate with the server
if !addr.starts_with("http://") {
Expand All @@ -48,11 +50,11 @@ pub(crate) async fn connect(
.into_iter()
.map(|(id, addr, conn)| {
debug!("successfully establish connection with {addr}");
let connect: Arc<dyn ConnectApi> = Arc::new(Connect {
let connect = ConnectApiWrapper::new_from_arc(Arc::new(Connect {
id,
rpc_connect: RwLock::new(conn),
addr,
});
}));
(id, connect)
})
}
Expand Down Expand Up @@ -127,6 +129,43 @@ pub(crate) trait ConnectApi: Send + Sync + 'static {
) -> Result<tonic::Response<FetchReadStateResponse>, RpcError>;
}

/// Connect Api Wrapper
/// The solution of [rustc bug](https://github.com/dtolnay/async-trait/issues/141#issuecomment-767978616)
#[derive(Clone)]
pub(crate) struct ConnectApiWrapper(Arc<dyn ConnectApi>);

impl ConnectApiWrapper {
/// Create a new `ConnectApiWrapper` from `id` and `addr`
pub(crate) async fn new(id: ServerId, addr: String) -> Self {
let connect = Connect::new(id, addr).await;
Self::new_from_arc(Arc::new(connect))
}

/// Create a new `ConnectApiWrapper` from `Arc<dyn ConnectApi>`
pub(crate) fn new_from_arc(connect: Arc<dyn ConnectApi>) -> Self {
Self(connect)
}

/// Converts `self` into inner `Arc<dyn ConnectApi>`
pub(crate) fn into_inner(self) -> Arc<dyn ConnectApi> {
self.0
}
}

impl Debug for ConnectApiWrapper {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ConnectApiWrapper").finish()
}
}

impl Deref for ConnectApiWrapper {
type Target = Arc<dyn ConnectApi>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

/// The connection struct to hold the real rpc connections, it may failed to connect, but it also
/// retries the next time
#[derive(Debug)]
Expand All @@ -139,6 +178,22 @@ pub(crate) struct Connect {
addr: String,
}

impl Connect {
/// Create a new `Connect`
pub(crate) async fn new(id: ServerId, addr: String) -> Self {
let addr = if addr.starts_with("http://") {
addr
} else {
format!("http://{addr}")
};
Self {
id,
rpc_connect: RwLock::new(ProtocolClient::connect(addr.clone()).await),
addr,
}
}
}

#[async_trait]
impl ConnectApi for Connect {
/// Get server id
Expand Down
7 changes: 3 additions & 4 deletions curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,16 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize};
use self::proto::{cmd_result::Result as CmdResultInner, CmdResult};
pub(crate) use self::proto::{
fetch_read_state_response::ReadState,
propose_conf_change_request::{ConfChange, ConfChangeType},
propose_error::ProposeError as PbProposeError,
propose_response::ExeResult,
protocol_server::Protocol,
sync_error::SyncError as PbSyncError,
wait_synced_response::{Success, SyncResult as SyncResultRaw},
AppendEntriesRequest, AppendEntriesResponse, Empty, FetchClusterRequest, FetchClusterResponse,
FetchReadStateRequest, FetchReadStateResponse, IdSet, InstallSnapshotRequest,
InstallSnapshotResponse, Member, ProposeConfChangeRequest, ProposeConfChangeResponse,
ProposeError as PbProposeErrorOuter, RedirectData, SyncError as PbSyncErrorOuter, VoteRequest,
VoteResponse, WaitSyncedRequest, WaitSyncedResponse,
InstallSnapshotResponse, Member, ProposeError as PbProposeErrorOuter, RedirectData,
SyncError as PbSyncErrorOuter, VoteRequest, VoteResponse, WaitSyncedRequest,
WaitSyncedResponse,
};
pub use self::proto::{
propose_conf_change_request::{ConfChange, ConfChangeType},
Expand Down
2 changes: 1 addition & 1 deletion curp/src/server/cmd_worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ async fn cmd_worker<
}
EntryData::ConfChange(ref conf_change) => {
let changes = conf_change.changes().to_owned();
let res = curp.apply_conf_change(changes);
let res = curp.apply_conf_change(changes).await;
cb.write().insert_conf(entry.id(), res);
true
}
Expand Down
Loading

0 comments on commit b13c21c

Please sign in to comment.