Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add linearizable parameter in FetchClusterRequest #482

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion curp/proto/common
Submodule common updated 1 files
+2 −1 src/message.proto
29 changes: 22 additions & 7 deletions curp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,11 +507,14 @@ where
}

/// Send fetch cluster requests to all servers
/// Note: The fetched cluster may still be outdated
/// Note: The fetched cluster may still be outdated if `linearizable` is false
/// # Errors
/// `ProposeError::Timeout` if timeout
#[inline]
async fn fetch_cluster(&self) -> Result<FetchClusterResponse, ProposeError> {
async fn fetch_cluster(
&self,
linearizable: bool,
) -> Result<FetchClusterResponse, ProposeError> {
let mut retry_timeout = self.get_backoff();
let retry_count = *self.config.retry_count();
for _ in 0..retry_count {
Expand All @@ -523,7 +526,7 @@ where
(
connect.id(),
connect
.fetch_cluster(FetchClusterRequest::default(), timeout)
.fetch_cluster(FetchClusterRequest { linearizable }, timeout)
.await,
)
})
Expand All @@ -542,15 +545,20 @@ where
continue;
}
};

#[allow(clippy::integer_arithmetic)]
match max_term.cmp(&inner.term) {
Ordering::Less => {
max_term = inner.term;
res = Some(inner);
if !inner.members.is_empty() {
res = Some(inner);
}
ok_cnt = 1;
}
Ordering::Equal => {
res = Some(inner);
if !inner.members.is_empty() {
res = Some(inner);
}
ok_cnt += 1;
}
Ordering::Greater => {}
Expand Down Expand Up @@ -579,7 +587,7 @@ where
async fn fetch_leader(&self) -> Result<ServerId, ProposeError> {
let retry_count = *self.config.retry_count();
for _ in 0..retry_count {
let res = self.fetch_cluster().await?;
let res = self.fetch_cluster(false).await?;
if let Some(leader_id) = res.leader_id {
return Ok(leader_id);
}
Expand Down Expand Up @@ -814,11 +822,18 @@ where
#[inline]
pub async fn get_cluster_from_curp(
&self,
linearizable: bool,
) -> Result<FetchClusterResponse, CommandProposeError<C>> {
if linearizable {
return self
.fetch_cluster(true)
.await
.map_err(|e| CommandProposeError::Propose(e));
}
if let Ok(resp) = self.fetch_local_cluster().await {
return Ok(resp);
}
self.fetch_cluster()
self.fetch_cluster(false)
.await
.map_err(|e| CommandProposeError::Propose(e))
}
Expand Down
15 changes: 11 additions & 4 deletions curp/src/server/curp_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,18 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
#[allow(clippy::unnecessary_wraps, clippy::needless_pass_by_value)] // To keep type consistent with other request handlers
pub(super) fn fetch_cluster(
&self,
_req: FetchClusterRequest,
req: FetchClusterRequest,
) -> Result<FetchClusterResponse, CurpError> {
let (leader_id, term) = self.curp.leader();
let (leader_id, term, is_leader) = self.curp.leader();
let cluster_id = self.curp.cluster().cluster_id();
let members = self.curp.cluster().all_members_vec();
let members = if is_leader || !req.linearizable {
self.curp.cluster().all_members_vec()
} else {
// if it is a follower and enabled linearizable read, return empty members
// the client will ignore empty members and retry util it gets response from
// the leader
Vec::new()
};
Ok(FetchClusterResponse::new(
leader_id, term, cluster_id, members,
))
Expand Down Expand Up @@ -712,7 +719,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
fn run_bg_tasks(
curp: Arc<RawCurp<C, RC>>,
storage: Arc<impl StorageApi<Command = C> + 'static>,
log_rx: tokio::sync::mpsc::UnboundedReceiver<Arc<LogEntry<C>>>,
log_rx: mpsc::UnboundedReceiver<Arc<LogEntry<C>>>,
) {
let shutdown_listener = curp.shutdown_listener();
let _election_task = tokio::spawn(Self::election_task(Arc::clone(&curp)));
Expand Down
5 changes: 3 additions & 2 deletions curp/src/server/raw_curp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -772,8 +772,9 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
}

/// Get the leader id, attached with the term
pub(super) fn leader(&self) -> (Option<ServerId>, u64) {
self.st.map_read(|st_r| (st_r.leader_id, st_r.term))
pub(super) fn leader(&self) -> (Option<ServerId>, u64, bool) {
self.st
.map_read(|st_r| (st_r.leader_id, st_r.term, st_r.role == Role::Leader))
}

/// Get cluster info
Expand Down
4 changes: 2 additions & 2 deletions curp/tests/common/curp_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl CurpGroup {

let FetchClusterResponse {
leader_id, term, ..
} = if let Ok(resp) = client.fetch_cluster(FetchClusterRequest {}).await {
} = if let Ok(resp) = client.fetch_cluster(FetchClusterRequest::default()).await {
resp.into_inner()
} else {
continue;
Expand Down Expand Up @@ -271,7 +271,7 @@ impl CurpGroup {

let FetchClusterResponse {
leader_id, term, ..
} = if let Ok(resp) = client.fetch_cluster(FetchClusterRequest {}).await {
} = if let Ok(resp) = client.fetch_cluster(FetchClusterRequest::default()).await {
resp.into_inner()
} else {
continue;
Expand Down
2 changes: 1 addition & 1 deletion curp/tests/read_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn read_state() {
);
}

sleep_millis(100).await;
sleep_millis(500).await;

let res = get_client
.fetch_read_state(&TestCommand::new_get(vec![0]))
Expand Down
16 changes: 9 additions & 7 deletions simulation/src/curp_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ impl CurpGroup {

let FetchClusterResponse {
leader_id, term, ..
} = if let Ok(resp) = client.fetch_cluster(FetchClusterRequest {}).await {
} = if let Ok(resp) = client.fetch_cluster(FetchClusterRequest::default()).await
{
resp.into_inner()
} else {
continue;
Expand Down Expand Up @@ -274,12 +275,13 @@ impl CurpGroup {
continue;
};

let FetchClusterResponse { term, .. } =
if let Ok(resp) = client.fetch_cluster(FetchClusterRequest {}).await {
resp.into_inner()
} else {
continue;
};
let FetchClusterResponse { term, .. } = if let Ok(resp) =
client.fetch_cluster(FetchClusterRequest::default()).await
{
resp.into_inner()
} else {
continue;
};

if let Some(max_term) = max_term {
assert_eq!(max_term, term);
Expand Down
4 changes: 2 additions & 2 deletions xline-test-utils/src/bin/validation_lock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ async fn main() -> Result<()> {
.lock_client();
match args.command {
Commands::Lock { name } => {
let lock_res = client.lock(LockRequest::new().with_name(name)).await?;
let lock_res = client.lock(LockRequest::new(name)).await?;
println!("{}", String::from_utf8_lossy(&lock_res.key))
}
Commands::Unlock { key } => {
let _unlock_res = client.unlock(UnlockRequest::new().with_key(key)).await?;
let _unlock_res = client.unlock(UnlockRequest::new(key)).await?;
println!("unlock success");
}
};
Expand Down
10 changes: 1 addition & 9 deletions xline/src/server/cluster_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,17 +141,9 @@ impl Cluster for ClusterServer {
) -> Result<Response<MemberListResponse>, Status> {
let req = request.into_inner();
let header = self.header_gen.gen_header();
if req.linearizable {
let members = self.propose_conf_change(vec![]).await?;
let resp = MemberListResponse {
header: Some(header),
members,
};
return Ok(Response::new(resp));
}
let members = self
.client
.get_cluster_from_curp()
.get_cluster_from_curp(req.linearizable)
.await
.map_err(propose_err_to_status)?
.members;
Expand Down
2 changes: 1 addition & 1 deletion xline/src/server/maintenance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ where
) -> Result<tonic::Response<StatusResponse>, tonic::Status> {
let cluster = self
.client
.get_cluster_from_curp()
.get_cluster_from_curp(false)
.await
.map_err(propose_err_to_status)?;
let header = self.header_gen.gen_header();
Expand Down
15 changes: 5 additions & 10 deletions xline/tests/lock_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,15 @@ async fn test_lock() -> Result<(), Box<dyn Error>> {
let lock_handle = tokio::spawn({
let c = lock_client.clone();
async move {
let res = c.lock(LockRequest::new().with_name("test")).await.unwrap();
let res = c.lock(LockRequest::new("test")).await.unwrap();
time::sleep(Duration::from_secs(3)).await;
let _res = c
.unlock(UnlockRequest::new().with_key(res.key))
.await
.unwrap();
let _res = c.unlock(UnlockRequest::new(res.key)).await.unwrap();
}
});

time::sleep(Duration::from_secs(1)).await;
let now = time::Instant::now();
let res = lock_client
.lock(LockRequest::new().with_name("test"))
.await?;
let res = lock_client.lock(LockRequest::new("test")).await?;
let elapsed = now.elapsed();
assert!(res.key.starts_with(b"test"));
assert!(elapsed >= Duration::from_secs(1));
Expand All @@ -57,12 +52,12 @@ async fn test_lock_timeout() -> Result<(), Box<dyn Error>> {
.await?
.id;
let _res = lock_client
.lock(LockRequest::new().with_name("test").with_lease(lease_id))
.lock(LockRequest::new("test").with_lease(lease_id))
.await?;

let res = timeout(
Duration::from_secs(3),
lock_client.lock(LockRequest::new().with_name("test")),
lock_client.lock(LockRequest::new("test")),
)
.await??;
assert!(res.key.starts_with(b"test"));
Expand Down