Skip to content

Commit

Permalink
refactor(client)!: refactor Transaction complete
Browse files Browse the repository at this point in the history
Signed-off-by: lxl66566 <[email protected]>
  • Loading branch information
lxl66566 committed Jul 17, 2024
1 parent db36c3d commit 9e6b057
Show file tree
Hide file tree
Showing 10 changed files with 144 additions and 121 deletions.
59 changes: 34 additions & 25 deletions crates/benchmark/src/bench_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@ use etcd_client::{Client as EtcdClient, ConnectOptions};
use thiserror::Error;
#[cfg(test)]
use xline_client::types::kv::{RangeRequest, RangeResponse};
use xline_client::{
error::XlineClientError,
types::kv::{PutFut, PutResponse},
Client, ClientOptions,
};
use xline_client::{error::XlineClientError, types::kv::PutResponse, Client, ClientOptions};
use xlineapi::command::Command;

/// The client used in benchmark
Expand Down Expand Up @@ -86,30 +82,31 @@ impl BenchClient {
}

/// Send `PutRequest` by `XlineClient` or `EtcdClient`
/// A `PutRequest` is made by key, value and options.
///
/// # Errors
///
/// If `XlineClient` or `EtcdClient` failed to send request
#[inline]
pub(crate) async fn put(
&mut self,
key: impl Into<Vec<u8>>,
value: impl Into<Vec<u8>>,
options: Option<PutFut>,
req: xlineapi::PutRequest,
) -> Result<PutResponse, BenchClientError> {
let req_clone = req.clone();
match self.kv_client {
KVClient::Xline(ref mut xline_client) => {
let response = xline_client.kv_client().put(key, value, options).await?;
let response = xline_client
.kv_client()
.put(req.key, req.value)
.with_lease(req.lease)
.with_prev_kv(req.prev_kv)
.with_ignore_value(req.ignore_value)
.with_ignore_lease(req.ignore_lease)
.await?;
Ok(response)
}
KVClient::Etcd(ref mut etcd_client) => {
let response = etcd_client
.put(
key,
value,
Some(convert::put_req(&options.unwrap_or_default())),
)
.put(req.key, req.value, Some(convert::put_req(&req_clone)))
.await?;
Ok(convert::put_res(response))
}
Expand Down Expand Up @@ -142,19 +139,18 @@ impl BenchClient {

/// Convert utils
mod convert {
use xline_client::types::kv::PutFut;
use xlineapi::{KeyValue, PutResponse, ResponseHeader};
use xlineapi::{KeyValue, PutRequest, PutResponse, ResponseHeader};

/// transform `xline_client::types::kv::PutOptions` into `etcd_client::PutOptions`
pub(super) fn put_req(req: &PutFut) -> etcd_client::PutOptions {
let mut opts = etcd_client::PutOptions::new().with_lease(req.lease());
if req.prev_kv() {
pub(super) fn put_req(req: &PutRequest) -> etcd_client::PutOptions {
let mut opts = etcd_client::PutOptions::new().with_lease(req.lease);
if req.prev_kv {
opts = opts.with_prev_key();
}
if req.ignore_value() {
if req.ignore_value {
opts = opts.with_ignore_value();
}
if req.ignore_lease() {
if req.ignore_lease {
opts = opts.with_ignore_lease();
}
opts
Expand Down Expand Up @@ -217,6 +213,7 @@ mod convert {
mod test {
use xline_client::types::kv::RangeRequest;
use xline_test_utils::Cluster;
use xlineapi::PutRequest;

use crate::bench_client::{BenchClient, ClientOptions};

Expand All @@ -230,8 +227,14 @@ mod test {
let mut client = BenchClient::new(cluster.all_client_addrs(), use_curp_client, config)
.await
.unwrap();
//check xline client put value exist
let _put_response = client.put("put", "123", None).await;
// check xline client put value exist
let _put_response = client
.put(PutRequest {
key: "put".into(),
value: "123".into(),
..Default::default()
})
.await;
let range_request = RangeRequest::new("put");
let response = client.get(range_request).await.unwrap();
assert_eq!(response.kvs[0].value, b"123");
Expand All @@ -247,7 +250,13 @@ mod test {
.await
.unwrap();

let _put_response = client.put("put", "123", None).await;
let _put_response = client
.put(PutRequest {
key: "put".into(),
value: "123".into(),
..Default::default()
})
.await;
let range_request = RangeRequest::new("put");
let response = client.get(range_request).await.unwrap();
assert_eq!(response.kvs[0].value, b"123");
Expand Down
9 changes: 8 additions & 1 deletion crates/benchmark/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use tokio::{
use tracing::debug;
use utils::config::ClientConfig;
use xline_client::ClientOptions;
use xlineapi::PutRequest;

use crate::{args::Commands, bench_client::BenchClient, Benchmark};

Expand Down Expand Up @@ -229,7 +230,13 @@ impl CommandRunner {
);
}
let start = Instant::now();
let result = client.put(key.as_slice(), val_clone.as_slice(), None).await;
let result = client
.put(PutRequest {
key: key.as_slice().to_vec(),
value: val_clone.as_slice().to_vec(),
..Default::default()
})
.await;
let cmd_result = CmdResult {
elapsed: start.elapsed(),
error: result.err().map(|e| format!("{e:?}")),
Expand Down
5 changes: 2 additions & 3 deletions crates/xline-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ To create a xline client:

```rust, no_run
use xline_client::{
types::kv::{PutRequest, RangeRequest},
Client, ClientOptions,
types::kv::RangeRequest, Client, ClientOptions,
};
use anyhow::Result;
Expand All @@ -95,7 +94,7 @@ To create a xline client:
.await?
.kv_client();
client.put(PutRequest::new("key", "value")).await?;
client.put("key", "value").await?;
let resp = client.range(RangeRequest::new("key")).await?;
Expand Down
16 changes: 9 additions & 7 deletions crates/xline-client/examples/lock.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Result;
use xline_client::{
clients::Xutex,
types::kv::{Compare, CompareResult, PutFut, TxnOp},
types::kv::{Compare, CompareResult},
Client, ClientOptions,
};

Expand All @@ -13,18 +13,20 @@ async fn main() -> Result<()> {
let client = Client::connect(curp_members, ClientOptions::default()).await?;

let lock_client = client.lock_client();
let kv_client = client.kv_client();
let mut kv_client = client.kv_client();

let mut xutex = Xutex::new(lock_client, "lock-test", None, None).await?;
// when the `xutex_guard` drop, the lock will be unlocked.
let xutex_guard = xutex.lock_unsafe().await?;

let txn_req = xutex_guard
.txn_check_locked_key()
.when(Compare::value("key2", CompareResult::Equal, "value2"))
.and_then(|c| c.put("key2", "value3").with_prev_kv(true));
let lock_check = xutex_guard.txn_check_locked_key();

let _resp = kv_client.txn_exec(txn_req).await?;
let _resp = kv_client
.when(lock_check)
.when(Compare::value("key2", CompareResult::Equal, "value2"))
.and_then(|c| c.put("key2", "value3").with_prev_kv(true))
.txn_exec()
.await?;

Ok(())
}
33 changes: 18 additions & 15 deletions crates/xline-client/src/clients/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl KvClient {
/// # Examples
///
/// ```no_run
/// use xline_client::{types::kv::PutOptions, Client, ClientOptions};
/// use xline_client::{Client, ClientOptions};
/// use anyhow::Result;
///
/// #[tokio::main]
Expand Down Expand Up @@ -193,8 +193,7 @@ impl KvClient {
///
///```no_run
/// use xline_client::{
/// types::kv::{CompactionRequest, PutRequest},
/// Client, ClientOptions,
/// types::kv::CompactionRequest, Client, ClientOptions,
/// };
/// use anyhow::Result;
///
Expand All @@ -206,7 +205,7 @@ impl KvClient {
/// .await?
/// .kv_client();
///
/// let resp_put = client.put(PutRequest::new("key", "val")).await?;
/// let resp_put = client.put("key", "val").await?;
/// let rev = resp_put.header.unwrap().revision;
///
/// let _resp = client.compact(CompactionRequest::new(rev)).await?;
Expand All @@ -233,15 +232,15 @@ impl KvClient {
Ok(cmd_res.into_inner().into())
}

// transaction
// region transaction

/// Append a condition to the transaction `compare`
#[inline]
pub fn when(&mut self, compares: impl Into<Compare>) -> &mut Self {
self.txn.when(compares.into());
self
}
/// Append a operation to the transaction `success`
/// Append an operation to the transaction `success`
#[inline]
pub fn and_then<F, O>(&mut self, operation: F) -> &mut Self
where
Expand All @@ -252,7 +251,7 @@ impl KvClient {
self.txn.and_then(temp);
self
}
/// Append a operation to the transaction `failure`
/// Append an operation to the transaction `failure`
#[inline]
pub fn or_else<F, O>(&mut self, operation: F) -> &mut Self
where
Expand All @@ -263,11 +262,15 @@ impl KvClient {
self.txn.or_else(temp);
self
}
/// swap out self.txn with given value
#[inline]
pub fn replace_txn(&mut self, txn: TxnRequest) -> TxnRequest {
std::mem::replace(&mut self.txn, txn)
}
/// swap out self.txn with default value, to send the transaction stored in Self to cluster
#[inline]
pub fn replace_txn_with_default(&mut self) -> TxnRequest {
let default_value = TxnRequest::default();
std::mem::replace(&mut self.txn, default_value)
self.replace_txn(TxnRequest::default())
}

/// Send the transaction stored in Self to cluster, which can provide serializable writes
Expand All @@ -285,7 +288,7 @@ impl KvClient {
///
/// ```no_run
/// use xline_client::{
/// types::kv::{Compare, PutRequest, RangeRequest, TxnOp, TxnRequest, CompareResult},
/// types::kv::{Compare, RangeRequest, TxnOp, TxnRequest, CompareResult},
/// Client, ClientOptions,
/// };
/// use anyhow::Result;
Expand All @@ -298,12 +301,12 @@ impl KvClient {
/// .await?
/// .kv_client();
///
///
///
/// let _resp = client
/// .when(&[Compare::value("key2", CompareResult::Equal, "value2")][..])
/// .and_then(|c| [c.put("key2", "value3").with_prev_kv(true)])
/// .or_else(|_| [TxnOp::range(RangeRequest::new("key2"))]).txn_exec().await?;
/// .when(Compare::value("key2", CompareResult::Equal, "value2"))
/// .and_then(|c| c.put("key2", "value3").with_prev_kv(true))
/// .or_else(|_| TxnOp::range(RangeRequest::new("key2")))
/// .txn_exec()
/// .await?;
///
/// Ok(())
/// }
Expand Down
46 changes: 22 additions & 24 deletions crates/xline-client/src/clients/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
error::{Result, XlineClientError},
lease_gen::LeaseIdGenerator,
types::{
kv::TxnRequest as KvTxnRequest,
self,
lease::{LeaseGrantRequest, LeaseKeepAliveRequest},
watch::WatchRequest,
},
Expand Down Expand Up @@ -84,22 +84,21 @@ impl XutexGuard {
self.key.as_str()
}

/// Return a `TxnRequest` which will perform the success ops when the locked key is exist.
/// Returns a [`Compare`] indicates whether the locked key exists.
/// This method is syntactic sugar
#[inline]
#[must_use]
pub fn txn_check_locked_key(&self) -> KvTxnRequest {
let mut txn_request = KvTxnRequest::default();
#[allow(clippy::as_conversions)]
let cmp = Compare {
result: CompareResult::Greater as i32,
target: CompareTarget::Create as i32,
key: self.key().into(),
range_end: Vec::new(),
target_union: Some(TargetUnion::CreateRevision(0)),
};
txn_request.inner.compare.push(cmp);
txn_request
pub fn txn_check_locked_key(&self) -> types::kv::Compare {
types::kv::Compare(
#[allow(clippy::as_conversions)]
Compare {
result: CompareResult::Greater as i32,
target: CompareTarget::Create as i32,
key: self.key().into(),
range_end: Vec::new(),
target_union: Some(TargetUnion::CreateRevision(0)),
},
)
}
}

Expand Down Expand Up @@ -263,7 +262,7 @@ impl Xutex {
/// use anyhow::Result;
/// use xline_client::{
/// clients::Xutex,
/// types::kv::{Compare, CompareResult, PutRequest, TxnOp},
/// types::kv::{Compare, CompareResult, TxnOp},
/// Client, ClientOptions,
/// };
///
Expand All @@ -275,20 +274,19 @@ impl Xutex {
/// let client = Client::connect(curp_members, ClientOptions::default()).await?;
///
/// let lock_client = client.lock_client();
/// let kv_client = client.kv_client();
/// let mut kv_client = client.kv_client();
///
/// let mut xutex = Xutex::new(lock_client, "lock-test", None, None).await?;
/// // when the `xutex_guard` drop, the lock will be unlocked.
/// let xutex_guard = xutex.lock_unsafe().await?;
/// let txn_req = xutex_guard
/// .txn_check_locked_key()
/// .when([Compare::value("key2", CompareResult::Equal, "value2")])
/// .and_then([TxnOp::put(
/// PutRequest::new("key2", "value3").with_prev_kv(true),
/// )])
/// .or_else(&[]);
///
/// let _resp = kv_client.txn(txn_req).await?;
/// let _resp = kv_client
/// .when(xutex_guard.txn_check_locked_key())
/// .when(Compare::value("key2", CompareResult::Equal, "value2"))
/// .and_then(|c| c.put("key2", "value3").with_prev_kv(true))
/// .txn_exec()
/// .await?;
///
/// // the lock will be released when the lock session is dropped.
/// Ok(())
/// }
Expand Down
5 changes: 2 additions & 3 deletions crates/xline-client/src/clients/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ impl WatchClient {
///
/// ```no_run
/// use xline_client::{
/// types::{kv::PutRequest, watch::WatchRequest},
/// Client, ClientOptions,
/// types::watch::WatchRequest, Client, ClientOptions,
/// };
/// use anyhow::Result;
///
Expand All @@ -68,7 +67,7 @@ impl WatchClient {
/// let mut kv_client = client.kv_client();
///
/// let (mut watcher, mut stream) = watch_client.watch(WatchRequest::new("key1")).await?;
/// kv_client.put(PutRequest::new("key1", "value1")).await?;
/// kv_client.put("key1", "value1").await?;
///
/// let resp = stream.message().await?.unwrap();
/// let kv = resp.events[0].kv.as_ref().unwrap();
Expand Down
Loading

0 comments on commit 9e6b057

Please sign in to comment.