Skip to content

Commit

Permalink
feat: discover cluster in shutdown propose_conf_change and fetch_read…
Browse files Browse the repository at this point in the history
…_state

Signed-off-by: themanforfree <[email protected]>
  • Loading branch information
themanforfree committed Oct 16, 2023
1 parent 4d7b5ed commit 9038eb0
Show file tree
Hide file tree
Showing 12 changed files with 367 additions and 73 deletions.
2 changes: 1 addition & 1 deletion curp/proto/common
71 changes: 49 additions & 22 deletions curp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ where
}

/// Set the information of current cluster
async fn set_cluster(&self, cluster: Cluster) {
async fn set_cluster(&self, cluster: Cluster) -> Result<(), tonic::transport::Error> {
debug!("update client by remote cluster: {cluster:?}");
self.state
.write()
Expand All @@ -418,20 +418,19 @@ where
.map(|m| (m.id, m.addrs))
.collect::<HashMap<ServerId, Vec<String>>>();
self.connects.clear();
// TODO
#[allow(clippy::unwrap_used)]
for (id, connect) in rpc::connect(member_addrs).await.unwrap() {
for (id, connect) in rpc::connect(member_addrs).await? {
let _ig = self.connects.insert(id, connect);
}
self.cluster_version.store(
cluster.cluster_version,
std::sync::atomic::Ordering::Relaxed,
);
Ok(())
}

/// The shutdown rpc of curp protocol
#[instrument(skip_all)]
pub async fn shutdown(&self) -> Result<(), ProposeError> {
pub async fn shutdown(&self, propose_id: ProposeId) -> Result<(), ProposeError> {
let mut retry_timeout = self.get_backoff();
let retry_count = *self.config.retry_count();
for _ in 0..retry_count {
Expand All @@ -443,16 +442,25 @@ where
}
};
debug!("shutdown request sent to {}", leader_id);

let resp = match self
.get_connect(leader_id)
.unwrap_or_else(|| unreachable!("leader {leader_id} not found"))
.shutdown(
ShutdownRequest::default(),
ShutdownRequest::new(propose_id.clone(), self.cluster_version()),
*self.config.wait_synced_timeout(),
)
.await
{
Ok(resp) => resp.into_inner(),
Err(RpcError::StatusError(status)) => {
if matches!(status.code(), tonic::Code::FailedPrecondition) {
if let Ok(cluster) = Cluster::decode(status.details()) {
self.set_cluster(cluster).await?;
};
}
continue;
}
Err(e) => {
warn!("shutdown rpc error: {e}");
tokio::time::sleep(retry_timeout.next_retry()).await;
Expand Down Expand Up @@ -522,6 +530,14 @@ where
}
resp
}
Err(RpcError::StatusError(status)) => {
if matches!(status.code(), tonic::Code::FailedPrecondition) {
if let Ok(cluster) = Cluster::decode(status.details()) {
self.set_cluster(cluster).await?;
};
}
continue;
}
Err(e) => {
// if the propose fails again, need to fetch the leader and try again
warn!("failed to resend propose, {e}");
Expand Down Expand Up @@ -716,8 +732,7 @@ where
let (fast_er, success) = match fast_result {
Ok(resp) => resp,
Err(CommandProposeError::Transfer(cluster)) => {
self.set_cluster(cluster).await;
return None;
return self.set_cluster(cluster).await.err().map(|e| Err(e.into()))
}
Err(e) => return Some(Err(e)),
};
Expand All @@ -730,8 +745,7 @@ where
let (asr, er) = match slow_round.await {
Ok(res) => res,
Err(CommandProposeError::Transfer(cluster)) => {
self.set_cluster(cluster).await;
return None;
return self.set_cluster(cluster).await.err().map(|e| Err(e.into()))
}
Err(e) => return Some(Err(e)),
};
Expand All @@ -742,16 +756,15 @@ where
futures::future::Either::Right((slow_result, fast_round)) => match slow_result {
Ok((asr, er)) => Some(Ok((er, Some(asr)))),
Err(CommandProposeError::Transfer(cluster)) => {
self.set_cluster(cluster).await;
None
self.set_cluster(cluster).await.err().map(|e| Err(e.into()))
}
Err(e) => {
if let Ok((Some(er), true)) = fast_round.await {
Some(Ok((er, None)))
} else {
Some(Err(e))
Err(err) => match fast_round.await {
Ok((Some(er), true)) => Some(Ok((er, None))),
Err(CommandProposeError::Transfer(cluster)) => {
self.set_cluster(cluster).await.err().map(|e| Err(e.into()))
}
}
_ => Some(Err(err)),
},
},
}
}
Expand All @@ -766,15 +779,13 @@ where
#[allow(clippy::integer_arithmetic)] // tokio framework triggers
let (fast_result, slow_result) = tokio::join!(fast_round, slow_round);
if let Err(CommandProposeError::Transfer(cluster)) = fast_result {
self.set_cluster(cluster).await;
return None;
return self.set_cluster(cluster).await.err().map(|e| Err(e.into()));
}

match slow_result {
Ok((asr, er)) => Some(Ok((er, Some(asr)))),
Err(CommandProposeError::Transfer(cluster)) => {
self.set_cluster(cluster).await;
None
self.set_cluster(cluster).await.err().map(|e| Err(e.into()))
}
Err(e) => Some(Err(e)),
}
Expand Down Expand Up @@ -813,6 +824,14 @@ where
.await
{
Ok(resp) => resp.into_inner(),
Err(RpcError::StatusError(status)) => {
if matches!(status.code(), tonic::Code::FailedPrecondition) {
if let Ok(cluster) = Cluster::decode(status.details()) {
self.set_cluster(cluster).await?;
};
}
continue;
}
Err(e) => {
warn!("wait synced rpc error: {e}");
tokio::time::sleep(retry_timeout).await;
Expand Down Expand Up @@ -864,6 +883,14 @@ where
.await
{
Ok(resp) => resp.into_inner(),
Err(RpcError::StatusError(status)) => {
if matches!(status.code(), tonic::Code::FailedPrecondition) {
if let Ok(cluster) = Cluster::decode(status.details()) {
self.set_cluster(cluster).await?;
};
}
continue;
}
Err(e) => {
warn!("fetch read state rpc error: {e}");
tokio::time::sleep(retry_timeout.next_retry()).await;
Expand Down
19 changes: 19 additions & 0 deletions curp/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ pub enum ProposeError {
/// Encode error
#[error("encode error: {0}")]
EncodeError(String),
/// Other error
#[error("other error: {0}")]
Other(String),
}

impl TryFrom<PbProposeError> for ProposeError {
Expand All @@ -113,6 +116,7 @@ impl TryFrom<PbProposeError> for ProposeError {
.into(),
),
PbProposeError::EncodeError(s) => ProposeError::EncodeError(s),
PbProposeError::Other(s) => ProposeError::Other(s),
})
}
}
Expand All @@ -130,6 +134,7 @@ impl From<ProposeError> for PbProposeErrorOuter {
wait_sync_error: Some(e.into()),
}),
ProposeError::EncodeError(s) => PbProposeError::EncodeError(s),
ProposeError::Other(s) => PbProposeError::Other(s),
};
PbProposeErrorOuter {
propose_error: Some(e),
Expand All @@ -144,6 +149,20 @@ impl From<PbSerializeError> for ProposeError {
}
}

impl From<tonic::transport::Error> for ProposeError {
#[inline]
fn from(value: tonic::transport::Error) -> Self {
ProposeError::Other(value.to_string())
}
}

impl<C: Command> From<tonic::transport::Error> for CommandProposeError<C> {
#[inline]
fn from(value: tonic::transport::Error) -> Self {
CommandProposeError::Propose(ProposeError::Other(value.to_string()))
}
}

impl PbCodec for ProposeError {
#[inline]
fn encode(&self) -> Vec<u8> {
Expand Down
9 changes: 2 additions & 7 deletions curp/src/log_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub(crate) enum EntryData<C> {
/// `ConfChange` entry
ConfChange(Box<ConfChangeEntry>), // Box to fix variant_size_differences
/// `Shutdown` entry
Shutdown,
Shutdown(ProposeId),
}

impl<C> From<ConfChangeEntry> for EntryData<C> {
Expand Down Expand Up @@ -69,12 +69,7 @@ where
match self.entry_data {
EntryData::Command(ref cmd) => cmd.id(),
EntryData::ConfChange(ref e) => e.id(),
EntryData::Shutdown => {
unreachable!(
"LogEntry::id() should not be called on {:?} entry",
self.entry_data
);
}
EntryData::Shutdown(ref id) => id,
}
}
}
10 changes: 10 additions & 0 deletions curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,16 @@ impl From<ProposeConfChangeRequest> for ConfChangeEntry {
}
}

impl ShutdownRequest {
/// Create a new shutdown request
pub(crate) fn new(propose_id: ProposeId, cluster_version: u64) -> Self {
Self {
propose_id,
cluster_version,
}
}
}

impl ShutdownResponse {
/// Create a new shutdown response
pub(crate) fn new(leader_id: Option<ServerId>, term: u64, error: Option<ProposeError>) -> Self {
Expand Down
2 changes: 1 addition & 1 deletion curp/src/server/cmd_worker/conflict_checked_mpmc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ impl<C: Command, CE: CommandExecutor<C>> Filter<C, CE> {
Err(err) => Some(err),
}
}
EntryData::ConfChange(_) | EntryData::Shutdown => None,
EntryData::ConfChange(_) | EntryData::Shutdown(_) => None,
};
*exe_st = ExeState::Executing;
let task = Task {
Expand Down
4 changes: 2 additions & 2 deletions curp/src/server/cmd_worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ async fn worker_exe<
);
er_ok
}
EntryData::ConfChange(_) | EntryData::Shutdown => true,
EntryData::ConfChange(_) | EntryData::Shutdown(_) => true,
}
}

Expand Down Expand Up @@ -152,7 +152,7 @@ async fn worker_as<
debug!("{id} cmd({}) after sync is called", entry.id());
asr_ok
}
EntryData::Shutdown => {
EntryData::Shutdown(_) => {
curp.enter_shutdown();
if let Err(e) = ce.set_last_applied(entry.index) {
error!("failed to set last_applied, {e}");
Expand Down
Loading

0 comments on commit 9038eb0

Please sign in to comment.