From 9a683bdd4a86b4f92ce4ffc694b456ccac4934bc Mon Sep 17 00:00:00 2001 From: themanforfree Date: Wed, 18 Oct 2023 18:38:21 +0800 Subject: [PATCH] chore: update log and the ordering of atomic operation Signed-off-by: themanforfree --- curp/src/client.rs | 8 ++++---- curp/src/server/curp_node.rs | 7 ++++++- curp/src/server/raw_curp/mod.rs | 4 ++-- curp/tests/common/curp_group.rs | 4 +++- curp/tests/server.rs | 4 +++- 5 files changed, 18 insertions(+), 9 deletions(-) diff --git a/curp/src/client.rs b/curp/src/client.rs index 5e05c9d92..58c5452a0 100644 --- a/curp/src/client.rs +++ b/curp/src/client.rs @@ -455,7 +455,7 @@ where Ok(resp) => resp.into_inner(), Err(RpcError::StatusError(status)) => { if is_wrong_cluster_version(&status) { - let cluster = self.fetch_cluster().await?; + let cluster = self.fetch_cluster(false).await?; self.set_cluster(cluster).await?; } continue; @@ -715,7 +715,7 @@ where self.slow_path(Arc::clone(&cmd_arc)).await }; let Some(res) = res_option else { - let cluster = self.fetch_cluster().await?; + let cluster = self.fetch_cluster(false).await?; self.set_cluster(cluster).await?; continue; }; @@ -830,7 +830,7 @@ where Ok(resp) => resp.into_inner(), Err(RpcError::StatusError(status)) => { if is_wrong_cluster_version(&status) { - let cluster = self.fetch_cluster().await?; + let cluster = self.fetch_cluster(false).await?; self.set_cluster(cluster).await?; } continue; @@ -888,7 +888,7 @@ where Ok(resp) => resp.into_inner(), Err(RpcError::StatusError(status)) => { if is_wrong_cluster_version(&status) { - let cluster = self.fetch_cluster().await?; + let cluster = self.fetch_cluster(false).await?; self.set_cluster(cluster).await?; } continue; diff --git a/curp/src/server/curp_node.rs b/curp/src/server/curp_node.rs index 2c6237e1c..9d32b8b69 100644 --- a/curp/src/server/curp_node.rs +++ b/curp/src/server/curp_node.rs @@ -873,7 +873,12 @@ impl CurpNode { /// Check cluster version and return new cluster fn check_cluster_version(&self, client_cluster_version: u64) -> Result<(), CurpError> { - if client_cluster_version != self.curp.cluster().cluster_version() { + let server_cluster_version = self.curp.cluster().cluster_version(); + if client_cluster_version != server_cluster_version { + debug!( + "client cluster version({}) and server cluster version({}) are not equal, return wrong cluster version error to client", + client_cluster_version, server_cluster_version + ); return Err(CurpError::WrongClusterVersion); } Ok(()) diff --git a/curp/src/server/raw_curp/mod.rs b/curp/src/server/raw_curp/mod.rs index 104859974..8c56ad892 100644 --- a/curp/src/server/raw_curp/mod.rs +++ b/curp/src/server/raw_curp/mod.rs @@ -344,7 +344,7 @@ impl RawCurp { let (addrs, name, is_learner) = self.apply_conf_change(changes); self.ctx .last_conf_change_idx - .store(entry.index, Ordering::Relaxed); + .store(entry.index, Ordering::Release); let _ig = log_w.fallback_contexts.insert( entry.index, FallbackContext::new(Arc::clone(&entry), addrs, name, is_learner), @@ -807,7 +807,7 @@ impl RawCurp { return false; } let match_index = self.lst.get_match_index(follower_id); - let last_conf_change_idx = self.ctx.last_conf_change_idx.load(Ordering::Relaxed); + let last_conf_change_idx = self.ctx.last_conf_change_idx.load(Ordering::Acquire); if match_index >= last_conf_change_idx { return true; } diff --git a/curp/tests/common/curp_group.rs b/curp/tests/common/curp_group.rs index 322dcb19c..5dc508e16 100644 --- a/curp/tests/common/curp_group.rs +++ b/curp/tests/common/curp_group.rs @@ -307,7 +307,9 @@ impl CurpGroup { let leader_id = self.get_leader().await.0; let mut connect = self.get_connect(&leader_id).await; let cluster_res_base = connect - .fetch_cluster(tonic::Request::new(FetchClusterRequest {})) + .fetch_cluster(tonic::Request::new(FetchClusterRequest { + linearizable: false, + })) .await .unwrap() .into_inner(); diff --git a/curp/tests/server.rs b/curp/tests/server.rs index d72cf7dbd..5760f1ba9 100644 --- a/curp/tests/server.rs +++ b/curp/tests/server.rs @@ -514,7 +514,9 @@ async fn check_new_node(is_learner: bool) { // 3. fetch and check cluster from new node let mut new_connect = group.get_connect(&node_id).await; let res = new_connect - .fetch_cluster(tonic::Request::new(FetchClusterRequest {})) + .fetch_cluster(tonic::Request::new(FetchClusterRequest { + linearizable: false, + })) .await .unwrap() .into_inner();