Skip to content

Commit

Permalink
chore: update log and the ordering of atomic operation
Browse files Browse the repository at this point in the history
Signed-off-by: themanforfree <[email protected]>
  • Loading branch information
themanforfree committed Oct 18, 2023
1 parent effd507 commit 9a683bd
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 9 deletions.
8 changes: 4 additions & 4 deletions curp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ where
Ok(resp) => resp.into_inner(),
Err(RpcError::StatusError(status)) => {
if is_wrong_cluster_version(&status) {
let cluster = self.fetch_cluster().await?;
let cluster = self.fetch_cluster(false).await?;
self.set_cluster(cluster).await?;
}
continue;
Expand Down Expand Up @@ -715,7 +715,7 @@ where
self.slow_path(Arc::clone(&cmd_arc)).await
};
let Some(res) = res_option else {
let cluster = self.fetch_cluster().await?;
let cluster = self.fetch_cluster(false).await?;
self.set_cluster(cluster).await?;
continue;
};
Expand Down Expand Up @@ -830,7 +830,7 @@ where
Ok(resp) => resp.into_inner(),
Err(RpcError::StatusError(status)) => {
if is_wrong_cluster_version(&status) {
let cluster = self.fetch_cluster().await?;
let cluster = self.fetch_cluster(false).await?;
self.set_cluster(cluster).await?;
}
continue;
Expand Down Expand Up @@ -888,7 +888,7 @@ where
Ok(resp) => resp.into_inner(),
Err(RpcError::StatusError(status)) => {
if is_wrong_cluster_version(&status) {
let cluster = self.fetch_cluster().await?;
let cluster = self.fetch_cluster(false).await?;
self.set_cluster(cluster).await?;
}
continue;
Expand Down
7 changes: 6 additions & 1 deletion curp/src/server/curp_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,12 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {

/// Check cluster version and return new cluster
fn check_cluster_version(&self, client_cluster_version: u64) -> Result<(), CurpError> {
if client_cluster_version != self.curp.cluster().cluster_version() {
let server_cluster_version = self.curp.cluster().cluster_version();
if client_cluster_version != server_cluster_version {
debug!(
"client cluster version({}) and server cluster version({}) are not equal, return wrong cluster version error to client",
client_cluster_version, server_cluster_version
);
return Err(CurpError::WrongClusterVersion);
}
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions curp/src/server/raw_curp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
let (addrs, name, is_learner) = self.apply_conf_change(changes);
self.ctx
.last_conf_change_idx
.store(entry.index, Ordering::Relaxed);
.store(entry.index, Ordering::Release);
let _ig = log_w.fallback_contexts.insert(
entry.index,
FallbackContext::new(Arc::clone(&entry), addrs, name, is_learner),
Expand Down Expand Up @@ -807,7 +807,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
return false;
}
let match_index = self.lst.get_match_index(follower_id);
let last_conf_change_idx = self.ctx.last_conf_change_idx.load(Ordering::Relaxed);
let last_conf_change_idx = self.ctx.last_conf_change_idx.load(Ordering::Acquire);
if match_index >= last_conf_change_idx {
return true;
}
Expand Down
4 changes: 3 additions & 1 deletion curp/tests/common/curp_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,9 @@ impl CurpGroup {
let leader_id = self.get_leader().await.0;
let mut connect = self.get_connect(&leader_id).await;
let cluster_res_base = connect
.fetch_cluster(tonic::Request::new(FetchClusterRequest {}))
.fetch_cluster(tonic::Request::new(FetchClusterRequest {
linearizable: false,
}))
.await
.unwrap()
.into_inner();
Expand Down
4 changes: 3 additions & 1 deletion curp/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,9 @@ async fn check_new_node(is_learner: bool) {
// 3. fetch and check cluster from new node
let mut new_connect = group.get_connect(&node_id).await;
let res = new_connect
.fetch_cluster(tonic::Request::new(FetchClusterRequest {}))
.fetch_cluster(tonic::Request::new(FetchClusterRequest {
linearizable: false,
}))
.await
.unwrap()
.into_inner();
Expand Down

0 comments on commit 9a683bd

Please sign in to comment.