Skip to content

Commit

Permalink
refactor(client): LeaseClient::*
Browse files Browse the repository at this point in the history
Signed-off-by: lxl66566 <[email protected]>

refactor(client): LeaseClient::revoke

Signed-off-by: lxl66566 <[email protected]>

refactor(client): LeaseClient::keep_alive

Signed-off-by: lxl66566 <[email protected]>

refactor(client): LeaseClient::time_to_live

Signed-off-by: lxl66566 <[email protected]>
  • Loading branch information
lxl66566 committed Jul 29, 2024
1 parent 5bf41d2 commit 8032dfe
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 269 deletions.
23 changes: 7 additions & 16 deletions crates/xline-client/examples/lease.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
use anyhow::Result;
use xline_client::{
types::lease::{
LeaseGrantRequest, LeaseKeepAliveRequest, LeaseRevokeRequest, LeaseTimeToLiveRequest,
},
Client, ClientOptions,
};
use xline_client::{Client, ClientOptions};

#[tokio::main]
async fn main() -> Result<()> {
Expand All @@ -16,24 +11,20 @@ async fn main() -> Result<()> {
.lease_client();

// grant new lease
let resp1 = client.grant(LeaseGrantRequest::new(60)).await?;
let resp2 = client.grant(LeaseGrantRequest::new(60)).await?;
let resp1 = client.grant(60, None).await?;
let resp2 = client.grant(60, None).await?;
let lease_id1 = resp1.id;
let lease_id2 = resp2.id;
println!("lease id 1: {}", lease_id1);
println!("lease id 2: {}", lease_id2);

// get the ttl of lease1
let resp = client
.time_to_live(LeaseTimeToLiveRequest::new(lease_id1))
.await?;
let resp = client.time_to_live(lease_id1, false).await?;

println!("remaining ttl: {}", resp.ttl);

// keep alive lease2
let (mut keeper, mut stream) = client
.keep_alive(LeaseKeepAliveRequest::new(lease_id2))
.await?;
let (mut keeper, mut stream) = client.keep_alive(lease_id2).await?;

if let Some(resp) = stream.message().await? {
println!("new ttl: {}", resp.ttl);
Expand All @@ -48,8 +39,8 @@ async fn main() -> Result<()> {
}

// revoke the leases
let _resp = client.revoke(LeaseRevokeRequest::new(lease_id1)).await?;
let _resp = client.revoke(LeaseRevokeRequest::new(lease_id2)).await?;
let _resp = client.revoke(lease_id1).await?;
let _resp = client.revoke(lease_id2).await?;

Ok(())
}
66 changes: 38 additions & 28 deletions crates/xline-client/src/clients/lease.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ use xlineapi::{
use crate::{
error::{Result, XlineClientError},
lease_gen::LeaseIdGenerator,
types::lease::{
LeaseGrantRequest, LeaseKeepAliveRequest, LeaseKeeper, LeaseRevokeRequest,
LeaseTimeToLiveRequest,
},
types::lease::LeaseKeeper,
AuthService, CurpClient,
};

Expand Down Expand Up @@ -70,14 +67,17 @@ impl LeaseClient {
/// within a given time to live period. All keys attached to the lease will be expired and
/// deleted if the lease expires. Each expired key generates a delete event in the event history.
///
/// `ttl` is the advisory time-to-live in seconds. Expired lease will return -1.
/// `id` is the requested ID for the lease. If ID is set to `None` or 0, the lessor chooses an ID.
///
/// # Errors
///
/// This function will return an error if the inner CURP client encountered a propose failure
///
/// # Examples
///
/// ```no_run
/// use xline_client::{types::lease::LeaseGrantRequest, Client, ClientOptions};
/// use xline_client::{Client, ClientOptions};
/// use anyhow::Result;
///
/// #[tokio::main]
Expand All @@ -88,19 +88,22 @@ impl LeaseClient {
/// .await?
/// .lease_client();
///
/// let resp = client.grant(LeaseGrantRequest::new(60)).await?;
/// let resp = client.grant(60, None).await?;
/// println!("lease id: {}", resp.id);
///
/// Ok(())
/// }
/// ```
#[inline]
pub async fn grant(&self, mut request: LeaseGrantRequest) -> Result<LeaseGrantResponse> {
if request.inner.id == 0 {
request.inner.id = self.id_gen.next();
pub async fn grant(&self, ttl: i64, id: Option<i64>) -> Result<LeaseGrantResponse> {
let mut id = id.unwrap_or_default();
if id == 0 {
id = self.id_gen.next();
}
let request = RequestWrapper::from(xlineapi::LeaseGrantRequest::from(request));
let cmd = Command::new(request);
let cmd = Command::new(RequestWrapper::from(xlineapi::LeaseGrantRequest {
ttl,
id,
}));
let (cmd_res, _sync_res) = self
.curp_client
.propose(&cmd, self.token.as_ref(), true)
Expand All @@ -110,14 +113,16 @@ impl LeaseClient {

/// Revokes a lease. All keys attached to the lease will expire and be deleted.
///
/// `id` is the lease ID to revoke. When the ID is revoked, all associated keys will be deleted.
///
/// # Errors
///
/// This function will return an error if the inner RPC client encountered a propose failure
///
/// # Examples
///
/// ```no_run
/// use xline_client::{types::lease::LeaseRevokeRequest, Client, ClientOptions};
/// use xline_client::{Client, ClientOptions};
/// use anyhow::Result;
///
/// #[tokio::main]
Expand All @@ -130,28 +135,33 @@ impl LeaseClient {
///
/// // granted a lease id 1
///
/// let _resp = client.revoke(LeaseRevokeRequest::new(1)).await?;
/// let _resp = client.revoke(1).await?;
///
/// Ok(())
/// }
/// ```
#[inline]
pub async fn revoke(&mut self, request: LeaseRevokeRequest) -> Result<LeaseRevokeResponse> {
let res = self.lease_client.lease_revoke(request.inner).await?;
pub async fn revoke(&mut self, id: i64) -> Result<LeaseRevokeResponse> {
let res = self
.lease_client
.lease_revoke(xlineapi::LeaseRevokeRequest { id })
.await?;
Ok(res.into_inner())
}

/// Keeps the lease alive by streaming keep alive requests from the client
/// to the server and streaming keep alive responses from the server to the client.
///
/// `id` is the lease ID for the lease to keep alive.
///
/// # Errors
///
/// This function will return an error if the inner RPC client encountered a propose failure
///
/// # Examples
///
/// ```no_run
/// use xline_client::{types::lease::LeaseKeepAliveRequest, Client, ClientOptions};
/// use xline_client::{Client, ClientOptions};
/// use anyhow::Result;
///
/// #[tokio::main]
Expand All @@ -164,7 +174,7 @@ impl LeaseClient {
///
/// // granted a lease id 1
///
/// let (mut keeper, mut stream) = client.keep_alive(LeaseKeepAliveRequest::new(1)).await?;
/// let (mut keeper, mut stream) = client.keep_alive(1).await?;
///
/// if let Some(resp) = stream.message().await? {
/// println!("new ttl: {}", resp.ttl);
Expand All @@ -178,12 +188,12 @@ impl LeaseClient {
#[inline]
pub async fn keep_alive(
&mut self,
request: LeaseKeepAliveRequest,
id: i64,
) -> Result<(LeaseKeeper, Streaming<LeaseKeepAliveResponse>)> {
let (mut sender, receiver) = channel::<xlineapi::LeaseKeepAliveRequest>(100);

sender
.try_send(request.into())
.try_send(xlineapi::LeaseKeepAliveRequest { id })
.map_err(|e| XlineClientError::LeaseError(e.to_string()))?;

let mut stream = self
Expand All @@ -192,7 +202,7 @@ impl LeaseClient {
.await?
.into_inner();

let id = match stream.message().await? {
let resp_id = match stream.message().await? {
Some(resp) => resp.id,
None => {
return Err(XlineClientError::LeaseError(String::from(
Expand All @@ -201,19 +211,22 @@ impl LeaseClient {
}
};

Ok((LeaseKeeper::new(id, sender), stream))
Ok((LeaseKeeper::new(resp_id, sender), stream))
}

/// Retrieves lease information.
///
/// `id` is the lease ID for the lease,
/// `keys` is true to query all the keys attached to this lease.
///
/// # Errors
///
/// This function will return an error if the inner RPC client encountered a propose failure
///
/// # Examples
///
/// ```no_run
/// use xline_client::{types::lease::LeaseTimeToLiveRequest, Client, ClientOptions};
/// use xline_client::{Client, ClientOptions};
/// use anyhow::Result;
///
/// #[tokio::main]
Expand All @@ -226,21 +239,18 @@ impl LeaseClient {
///
/// // granted a lease id 1
///
/// let resp = client.time_to_live(LeaseTimeToLiveRequest::new(1)).await?;
/// let resp = client.time_to_live(1, false).await?;
///
/// println!("remaining ttl: {}", resp.ttl);
///
/// Ok(())
/// }
/// ```
#[inline]
pub async fn time_to_live(
&mut self,
request: LeaseTimeToLiveRequest,
) -> Result<LeaseTimeToLiveResponse> {
pub async fn time_to_live(&mut self, id: i64, keys: bool) -> Result<LeaseTimeToLiveResponse> {
Ok(self
.lease_client
.lease_time_to_live(xlineapi::LeaseTimeToLiveRequest::from(request))
.lease_time_to_live(xlineapi::LeaseTimeToLiveRequest { id, keys })
.await?
.into_inner())
}
Expand Down
15 changes: 3 additions & 12 deletions crates/xline-client/src/clients/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@ use crate::{
clients::{lease::LeaseClient, watch::WatchClient, DEFAULT_SESSION_TTL},
error::{Result, XlineClientError},
lease_gen::LeaseIdGenerator,
types::{
kv::TxnRequest as KvTxnRequest,
lease::{LeaseGrantRequest, LeaseKeepAliveRequest},
watch::WatchRequest,
},
types::{kv::TxnRequest as KvTxnRequest, watch::WatchRequest},
CurpClient,
};

Expand Down Expand Up @@ -130,19 +126,14 @@ impl Xutex {
let lease_id = if let Some(id) = lease_id {
id
} else {
let lease_response = client
.lease_client
.grant(LeaseGrantRequest::new(ttl))
.await?;
let lease_response = client.lease_client.grant(ttl, None).await?;
lease_response.id
};
let mut lease_client = client.lease_client.clone();
let keep_alive = Some(tokio::spawn(async move {
/// The renew interval factor of which value equals 60% of one second.
const RENEW_INTERVAL_FACTOR: u64 = 600;
let (mut keeper, mut stream) = lease_client
.keep_alive(LeaseKeepAliveRequest::new(lease_id))
.await?;
let (mut keeper, mut stream) = lease_client.keep_alive(lease_id).await?;
loop {
keeper.keep_alive()?;
if let Some(resp) = stream.message().await? {
Expand Down
Loading

0 comments on commit 8032dfe

Please sign in to comment.