Skip to content

Commit

Permalink
refactor(client)!: ClusterClient::*
Browse files Browse the repository at this point in the history
Signed-off-by: lxl66566 <[email protected]>

refactor(client)!: ClusterClient::member_remove

Signed-off-by: lxl66566 <[email protected]>

refactor(client)!: ClusterClient::member_promote

Signed-off-by: lxl66566 <[email protected]>

refactor(client)!: ClusterClient::member_update

Signed-off-by: lxl66566 <[email protected]>

refactor(client)!: ClusterClient::member_list

Signed-off-by: lxl66566 <[email protected]>

refactor(client)!: modify code in simulation

Signed-off-by: lxl66566 <[email protected]>

fix: ci fail

Signed-off-by: lxl66566 <[email protected]>
  • Loading branch information
lxl66566 committed Aug 15, 2024
1 parent 8852084 commit 4bc9bc1
Show file tree
Hide file tree
Showing 12 changed files with 109 additions and 284 deletions.
25 changes: 16 additions & 9 deletions crates/simulation/src/xline_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ use xline::server::XlineServer;
use xline_client::{
error::XlineClientError,
types::{
cluster::{MemberAddRequest, MemberAddResponse, MemberListRequest, MemberListResponse},
kv::{CompactionResponse, PutOptions, PutResponse, RangeOptions, RangeResponse},
watch::{WatchOptions, WatchStreaming, Watcher},
},
Client, ClientOptions,
};
use xlineapi::{command::Command, ClusterClient, KvClient, RequestUnion, WatchClient};
use xlineapi::{
command::Command, ClusterClient, KvClient, MemberAddResponse, MemberListResponse, RequestUnion,
WatchClient,
};

pub struct XlineNode {
pub client_url: String,
Expand Down Expand Up @@ -340,15 +342,20 @@ impl SimEtcdClient {
.unwrap()
}

pub async fn member_add(
&self,
request: MemberAddRequest,
pub async fn member_add<I: Into<String>>(
&mut self,
peer_urls: impl Into<Vec<I>>,
is_learner: bool,
) -> Result<MemberAddResponse, XlineClientError<Command>> {
let mut client = self.cluster.clone();
let peer_urls: Vec<String> = peer_urls.into().into_iter().map(Into::into).collect();
self.handle
.spawn(async move {
client
.member_add(xlineapi::MemberAddRequest::from(request))
.member_add(xlineapi::MemberAddRequest {
peer_ur_ls: peer_urls,
is_learner,
})
.await
.map(|r| r.into_inner())
.map_err(Into::into)
Expand All @@ -358,14 +365,14 @@ impl SimEtcdClient {
}

pub async fn member_list(
&self,
request: MemberListRequest,
&mut self,
linearizable: bool,
) -> Result<MemberListResponse, XlineClientError<Command>> {
let mut client = self.cluster.clone();
self.handle
.spawn(async move {
client
.member_list(xlineapi::MemberListRequest::from(request))
.member_list(xlineapi::MemberListRequest { linearizable })
.await
.map(|r| r.into_inner())
.map_err(Into::into)
Expand Down
22 changes: 5 additions & 17 deletions crates/simulation/tests/it/xline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ use std::time::Duration;
use curp_test_utils::init_logger;
use madsim::time::sleep;
use simulation::xline_group::{SimEtcdClient, XlineGroup};
use xline_client::types::{
cluster::{MemberAddRequest, MemberListRequest},
watch::WatchOptions,
};
use xline_client::types::watch::WatchOptions;

// TODO: Add more tests if needed

Expand Down Expand Up @@ -49,29 +46,20 @@ async fn xline_members_restore() {
let mut group = XlineGroup::new(3).await;
let node = group.get_node("S1");
let addr = node.client_url.clone();
let client = SimEtcdClient::new(addr, group.client_handle.clone()).await;
let mut client = SimEtcdClient::new(addr, group.client_handle.clone()).await;

let res = client
.member_add(MemberAddRequest::new(
vec!["http://192.168.1.4:12345".to_owned()],
true,
))
.member_add(["http://192.168.1.4:12345"], true)
.await
.unwrap();
assert_eq!(res.members.len(), 4);
let members = client
.member_list(MemberListRequest::new(false))
.await
.unwrap();
let members = client.member_list(false).await.unwrap();
assert_eq!(members.members.len(), 4);
group.crash("S1").await;
sleep(Duration::from_secs(10)).await;

group.restart("S1").await;
sleep(Duration::from_secs(10)).await;
let members = client
.member_list(MemberListRequest::new(false))
.await
.unwrap();
let members = client.member_list(false).await.unwrap();
assert_eq!(members.members.len(), 4);
}
30 changes: 6 additions & 24 deletions crates/xline-client/examples/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
use anyhow::Result;
use xline_client::{
types::cluster::{
MemberAddRequest, MemberListRequest, MemberPromoteRequest, MemberRemoveRequest,
MemberUpdateRequest,
},
Client, ClientOptions,
};
use xline_client::{Client, ClientOptions};

#[tokio::main]
async fn main() -> Result<()> {
Expand All @@ -17,44 +11,32 @@ async fn main() -> Result<()> {
.cluster_client();

// send a linearizable member list request
let resp = client.member_list(MemberListRequest::new(true)).await?;
let resp = client.member_list(true).await?;
println!("members: {:?}", resp.members);

// whether the added member is a learner.
// the learner does not participate in voting and will only catch up with the progress of the leader.
let is_learner = true;

// add a normal node into the cluster
let resp = client
.member_add(MemberAddRequest::new(
vec!["127.0.0.1:2379".to_owned()],
is_learner,
))
.await?;
let resp = client.member_add(["127.0.0.1:2379"], is_learner).await?;
let added_member = resp.member.unwrap();
println!("members: {:?}, added: {}", resp.members, added_member.id);

if is_learner {
// promote the learner to a normal node
let resp = client
.member_promote(MemberPromoteRequest::new(added_member.id))
.await?;
let resp = client.member_promote(added_member.id).await?;
println!("members: {:?}", resp.members);
}

// update the peer_ur_ls of the added member if the network topology has changed.
let resp = client
.member_update(MemberUpdateRequest::new(
added_member.id,
vec!["127.0.0.2:2379".to_owned()],
))
.member_update(added_member.id, ["127.0.0.2:2379"])
.await?;
println!("members: {:?}", resp.members);

// remove the member from the cluster if it is no longer needed.
let resp = client
.member_remove(MemberRemoveRequest::new(added_member.id))
.await?;
let resp = client.member_remove(added_member.id).await?;
println!("members: {:?}", resp.members);

Ok(())
Expand Down
66 changes: 31 additions & 35 deletions crates/xline-client/src/clients/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,10 @@ use std::sync::Arc;

use tonic::transport::Channel;

use crate::{
error::Result,
types::cluster::{
MemberAddRequest, MemberAddResponse, MemberListRequest, MemberListResponse,
MemberPromoteRequest, MemberPromoteResponse, MemberRemoveRequest, MemberRemoveResponse,
MemberUpdateRequest, MemberUpdateResponse,
},
AuthService,
use crate::{error::Result, AuthService};
use xlineapi::{
MemberAddResponse, MemberListResponse, MemberPromoteResponse, MemberRemoveResponse,
MemberUpdateResponse,
};

/// Client for Cluster operations.
Expand Down Expand Up @@ -47,7 +43,6 @@ impl ClusterClient {
///
/// ```no_run
/// use xline_client::{Client, ClientOptions};
/// use xline_client::types::cluster::*;
/// use anyhow::Result;
///
/// #[tokio::main]
Expand All @@ -58,7 +53,7 @@ impl ClusterClient {
/// .await?
/// .cluster_client();
///
/// let resp = client.member_add(MemberAddRequest::new(vec!["127.0.0.1:2380".to_owned()], true)).await?;
/// let resp = client.member_add(["127.0.0.1:2380"], true).await?;
///
/// println!(
/// "members: {:?}, added: {:?}",
Expand All @@ -69,10 +64,17 @@ impl ClusterClient {
/// }
/// ```
#[inline]
pub async fn member_add(&mut self, request: MemberAddRequest) -> Result<MemberAddResponse> {
pub async fn member_add<I: Into<String>>(
&mut self,
peer_urls: impl Into<Vec<I>>,
is_learner: bool,
) -> Result<MemberAddResponse> {
Ok(self
.inner
.member_add(xlineapi::MemberAddRequest::from(request))
.member_add(xlineapi::MemberAddRequest {
peer_ur_ls: peer_urls.into().into_iter().map(Into::into).collect(),
is_learner,
})
.await?
.into_inner())
}
Expand All @@ -87,7 +89,6 @@ impl ClusterClient {
///
/// ```no_run
/// use xline_client::{Client, ClientOptions};
/// use xline_client::types::cluster::*;
/// use anyhow::Result;
///
/// #[tokio::main]
Expand All @@ -97,21 +98,18 @@ impl ClusterClient {
/// let mut client = Client::connect(curp_members, ClientOptions::default())
/// .await?
/// .cluster_client();
/// let resp = client.member_remove(MemberRemoveRequest::new(1)).await?;
/// let resp = client.member_remove(1).await?;
///
/// println!("members: {:?}", resp.members);
///
/// Ok(())
/// }
///
#[inline]
pub async fn member_remove(
&mut self,
request: MemberRemoveRequest,
) -> Result<MemberRemoveResponse> {
pub async fn member_remove(&mut self, id: u64) -> Result<MemberRemoveResponse> {
Ok(self
.inner
.member_remove(xlineapi::MemberRemoveRequest::from(request))
.member_remove(xlineapi::MemberRemoveRequest { id })
.await?
.into_inner())
}
Expand All @@ -126,7 +124,6 @@ impl ClusterClient {
///
/// ```no_run
/// use xline_client::{Client, ClientOptions};
/// use xline_client::types::cluster::*;
/// use anyhow::Result;
///
/// #[tokio::main]
Expand All @@ -136,21 +133,18 @@ impl ClusterClient {
/// let mut client = Client::connect(curp_members, ClientOptions::default())
/// .await?
/// .cluster_client();
/// let resp = client.member_promote(MemberPromoteRequest::new(1)).await?;
/// let resp = client.member_promote(1).await?;
///
/// println!("members: {:?}", resp.members);
///
/// Ok(())
/// }
///
#[inline]
pub async fn member_promote(
&mut self,
request: MemberPromoteRequest,
) -> Result<MemberPromoteResponse> {
pub async fn member_promote(&mut self, id: u64) -> Result<MemberPromoteResponse> {
Ok(self
.inner
.member_promote(xlineapi::MemberPromoteRequest::from(request))
.member_promote(xlineapi::MemberPromoteRequest { id })
.await?
.into_inner())
}
Expand All @@ -165,7 +159,6 @@ impl ClusterClient {
///
/// ```no_run
/// use xline_client::{Client, ClientOptions};
/// use xline_client::types::cluster::*;
/// use anyhow::Result;
///
/// #[tokio::main]
Expand All @@ -175,21 +168,25 @@ impl ClusterClient {
/// let mut client = Client::connect(curp_members, ClientOptions::default())
/// .await?
/// .cluster_client();
/// let resp = client.member_update(MemberUpdateRequest::new(1, vec!["127.0.0.1:2379".to_owned()])).await?;
/// let resp = client.member_update(1, ["127.0.0.1:2379"]).await?;
///
/// println!("members: {:?}", resp.members);
///
/// Ok(())
/// }
///
#[inline]
pub async fn member_update(
pub async fn member_update<I: Into<String>>(
&mut self,
request: MemberUpdateRequest,
id: u64,
peer_urls: impl Into<Vec<I>>,
) -> Result<MemberUpdateResponse> {
Ok(self
.inner
.member_update(xlineapi::MemberUpdateRequest::from(request))
.member_update(xlineapi::MemberUpdateRequest {
id,
peer_ur_ls: peer_urls.into().into_iter().map(Into::into).collect(),
})
.await?
.into_inner())
}
Expand All @@ -204,7 +201,6 @@ impl ClusterClient {
///
/// ```no_run
/// use xline_client::{Client, ClientOptions};
/// use xline_client::types::cluster::*;
/// use anyhow::Result;
///
/// #[tokio::main]
Expand All @@ -214,17 +210,17 @@ impl ClusterClient {
/// let mut client = Client::connect(curp_members, ClientOptions::default())
/// .await?
/// .cluster_client();
/// let resp = client.member_list(MemberListRequest::new(false)).await?;
/// let resp = client.member_list(false).await?;
///
/// println!("members: {:?}", resp.members);
///
/// Ok(())
/// }
#[inline]
pub async fn member_list(&mut self, request: MemberListRequest) -> Result<MemberListResponse> {
pub async fn member_list(&mut self, linearizable: bool) -> Result<MemberListResponse> {
Ok(self
.inner
.member_list(xlineapi::MemberListRequest::from(request))
.member_list(xlineapi::MemberListRequest { linearizable })
.await?
.into_inner())
}
Expand Down
Loading

0 comments on commit 4bc9bc1

Please sign in to comment.