diff --git a/src/catalog/src/kvbackend/client.rs b/src/catalog/src/kvbackend/client.rs index 12f713757a30..78c11214c726 100644 --- a/src/catalog/src/kvbackend/client.rs +++ b/src/catalog/src/kvbackend/client.rs @@ -457,9 +457,8 @@ mod tests { use common_meta::kv_backend::{KvBackend, TxnService}; use common_meta::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, - BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, - DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, - RangeResponse, + BatchPutRequest, BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, + PutResponse, RangeRequest, RangeResponse, }; use common_meta::rpc::KeyValue; use dashmap::DashMap; @@ -519,13 +518,6 @@ mod tests { unimplemented!() } - async fn compare_and_put( - &self, - _req: CompareAndPutRequest, - ) -> Result { - unimplemented!() - } - async fn delete_range( &self, _req: DeleteRangeRequest, diff --git a/src/common/meta/src/kv_backend.rs b/src/common/meta/src/kv_backend.rs index e16dc2c23afb..1bc04e7089d0 100644 --- a/src/common/meta/src/kv_backend.rs +++ b/src/common/meta/src/kv_backend.rs @@ -20,6 +20,7 @@ use common_error::ext::ErrorExt; pub use txn::TxnService; use crate::error::Error; +use crate::kv_backend::txn::{Txn, TxnOpResponse}; use crate::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, @@ -52,11 +53,6 @@ where async fn batch_get(&self, req: BatchGetRequest) -> Result; - async fn compare_and_put( - &self, - req: CompareAndPutRequest, - ) -> Result; - async fn delete_range( &self, req: DeleteRangeRequest, @@ -80,6 +76,33 @@ where }) } + /// CAS: Compares the value at the key with the given value, and if they are + /// equal, puts the new value at the key. + async fn compare_and_put( + &self, + req: CompareAndPutRequest, + ) -> Result { + let CompareAndPutRequest { key, expect, value } = req; + let txn = if expect.is_empty() { + Txn::put_if_not_exists(key, value) + } else { + Txn::compare_and_put(key, expect, value) + }; + let txn_res = self.txn(txn).await?; + + let success = txn_res.succeeded; + // The response is guaranteed to have at most one element. + let op_res = txn_res.responses.into_iter().next(); + let prev_kv = match op_res { + Some(TxnOpResponse::ResponsePut(res)) => res.prev_kv, + Some(TxnOpResponse::ResponseGet(res)) => res.kvs.into_iter().next(), + Some(TxnOpResponse::ResponseDelete(res)) => res.prev_kvs.into_iter().next(), + None => None, + }; + + Ok(CompareAndPutResponse { success, prev_kv }) + } + /// Puts a value at a key. If `if_not_exists` is `true`, the operation /// ensures the key does not exist before applying the PUT operation. /// Otherwise, it simply applies the PUT operation without checking for diff --git a/src/common/meta/src/kv_backend/etcd.rs b/src/common/meta/src/kv_backend/etcd.rs index 9bd98b826820..c437f90a25c7 100644 --- a/src/common/meta/src/kv_backend/etcd.rs +++ b/src/common/meta/src/kv_backend/etcd.rs @@ -16,10 +16,9 @@ use std::any::Any; use std::sync::Arc; use etcd_client::{ - Client, Compare, CompareOp, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse, - TxnResponse, + Client, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse, TxnResponse, }; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::{ensure, ResultExt}; use super::KvBackendRef; use crate::error::{self, Error, Result}; @@ -28,8 +27,8 @@ use crate::kv_backend::{KvBackend, TxnService}; use crate::metrics::METRIC_META_TXN_REQUEST; use crate::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, - BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, - DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, + BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, + RangeRequest, RangeResponse, }; use crate::rpc::KeyValue; @@ -202,53 +201,6 @@ impl KvBackend for EtcdStore { Ok(BatchGetResponse { kvs }) } - async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result { - let CompareAndPut { - key, - expect, - value, - put_options, - } = req.try_into()?; - - let compare = if expect.is_empty() { - // create if absent - // revision 0 means key was not exist - Compare::create_revision(key.clone(), CompareOp::Equal, 0) - } else { - // compare and put - Compare::value(key.clone(), CompareOp::Equal, expect) - }; - let put = TxnOp::put(key.clone(), value, put_options); - let get = TxnOp::get(key, None); - let txn = Txn::new() - .when(vec![compare]) - .and_then(vec![put]) - .or_else(vec![get]); - - let txn_res = self - .client - .kv_client() - .txn(txn) - .await - .context(error::EtcdFailedSnafu)?; - - let success = txn_res.succeeded(); - let op_res = txn_res - .op_responses() - .pop() - .context(error::InvalidTxnResultSnafu { - err_msg: "empty response", - })?; - - let prev_kv = match op_res { - TxnOpResponse::Put(mut res) => res.take_prev_key().map(convert_key_value), - TxnOpResponse::Get(mut res) => res.take_kvs().into_iter().next().map(convert_key_value), - _ => unreachable!(), - }; - - Ok(CompareAndPutResponse { success, prev_kv }) - } - async fn delete_range(&self, req: DeleteRangeRequest) -> Result { let Delete { key, options } = req.try_into()?; @@ -461,28 +413,6 @@ impl TryFrom for BatchDelete { } } -struct CompareAndPut { - key: Vec, - expect: Vec, - value: Vec, - put_options: Option, -} - -impl TryFrom for CompareAndPut { - type Error = Error; - - fn try_from(req: CompareAndPutRequest) -> Result { - let CompareAndPutRequest { key, expect, value } = req; - - Ok(CompareAndPut { - key, - expect, - value, - put_options: Some(PutOptions::default().with_prev_key()), - }) - } -} - struct Delete { key: Vec, options: Option, @@ -597,22 +527,6 @@ mod tests { let _ = batch_delete.options.unwrap(); } - #[test] - fn test_parse_compare_and_put() { - let req = CompareAndPutRequest { - key: b"test_key".to_vec(), - expect: b"test_expect".to_vec(), - value: b"test_value".to_vec(), - }; - - let compare_and_put: CompareAndPut = req.try_into().unwrap(); - - assert_eq!(b"test_key".to_vec(), compare_and_put.key); - assert_eq!(b"test_expect".to_vec(), compare_and_put.expect); - assert_eq!(b"test_value".to_vec(), compare_and_put.value); - let _ = compare_and_put.put_options.unwrap(); - } - #[test] fn test_parse_delete() { let req = DeleteRangeRequest { diff --git a/src/common/meta/src/kv_backend/memory.rs b/src/common/meta/src/kv_backend/memory.rs index 2310bd06538f..b9c1dd00bbdb 100644 --- a/src/common/meta/src/kv_backend/memory.rs +++ b/src/common/meta/src/kv_backend/memory.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::any::Any; -use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::fmt::{Display, Formatter}; use std::marker::PhantomData; @@ -29,8 +28,8 @@ use crate::kv_backend::{KvBackend, TxnService}; use crate::metrics::METRIC_META_TXN_REQUEST; use crate::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, - BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, - DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, + BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, + RangeRequest, RangeResponse, }; use crate::rpc::KeyValue; @@ -190,41 +189,6 @@ impl KvBackend for MemoryKvBackend { Ok(BatchGetResponse { kvs }) } - async fn compare_and_put( - &self, - req: CompareAndPutRequest, - ) -> Result { - let CompareAndPutRequest { key, expect, value } = req; - - let mut kvs = self.kvs.write().unwrap(); - - let existed = kvs.entry(key); - let (success, prev_kv) = match existed { - Entry::Vacant(e) => { - let expected = expect.is_empty(); - if expected { - let _ = e.insert(value); - } - (expected, None) - } - Entry::Occupied(mut existed) => { - let expected = existed.get() == &expect; - let prev_kv = if expected { - let _ = existed.insert(value); - None - } else { - Some(KeyValue { - key: existed.key().clone(), - value: existed.get().clone(), - }) - }; - (expected, prev_kv) - } - }; - - Ok(CompareAndPutResponse { success, prev_kv }) - } - async fn delete_range( &self, req: DeleteRangeRequest, diff --git a/src/common/meta/src/kv_backend/txn.rs b/src/common/meta/src/kv_backend/txn.rs index c0d7e2e2f851..77cd0f921e21 100644 --- a/src/common/meta/src/kv_backend/txn.rs +++ b/src/common/meta/src/kv_backend/txn.rs @@ -170,13 +170,13 @@ impl Txn { } /// Builds a transaction that puts a value at a key if the key exists and the value - /// is equal to `old_value`. - pub fn compare_and_put(key: Vec, old_value: Vec, value: Vec) -> Self { + /// is equal to `expect`. + pub fn compare_and_put(key: Vec, expect: Vec, value: Vec) -> Self { Self::new() .when(vec![Compare::with_value( key.clone(), CompareOp::Equal, - old_value, + expect, )]) .and_then(vec![TxnOp::Put(key.clone(), value)]) .or_else(vec![TxnOp::Get(key)]) diff --git a/src/log-store/src/raft_engine/backend.rs b/src/log-store/src/raft_engine/backend.rs index 43ed638448f1..fdc92cd49f19 100644 --- a/src/log-store/src/raft_engine/backend.rs +++ b/src/log-store/src/raft_engine/backend.rs @@ -25,8 +25,8 @@ use common_meta::kv_backend::txn::{Txn, TxnOp, TxnOpResponse, TxnRequest, TxnRes use common_meta::kv_backend::{KvBackend, TxnService}; use common_meta::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, - BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, - DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, + BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, + RangeRequest, RangeResponse, }; use common_meta::rpc::KeyValue; use common_meta::util::get_next_prefix_key; @@ -277,42 +277,6 @@ impl KvBackend for RaftEngineBackend { Ok(response) } - async fn compare_and_put( - &self, - req: CompareAndPutRequest, - ) -> Result { - let CompareAndPutRequest { key, expect, value } = req; - - let mut batch = LogBatch::with_capacity(1); - let engine = self.engine.write().unwrap(); - let existing = engine_get(&engine, &key)?; - let eq = existing - .as_ref() - .map(|kv| kv.value == expect) - .unwrap_or_else(|| { - // if the associated value of key does not exist and expect is empty, - // then we still consider them as equal. - expect.is_empty() - }); - - if eq { - batch - .put(SYSTEM_NAMESPACE, key, value) - .context(RaftEngineSnafu) - .map_err(BoxedError::new) - .context(meta_error::ExternalSnafu)?; - engine - .write(&mut batch, false) - .context(RaftEngineSnafu) - .map_err(BoxedError::new) - .context(meta_error::ExternalSnafu)?; - } - Ok(CompareAndPutResponse { - success: eq, - prev_kv: existing, - }) - } - async fn delete_range( &self, req: DeleteRangeRequest, @@ -436,6 +400,7 @@ mod tests { prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put, test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2, }; + use common_meta::rpc::store::{CompareAndPutRequest, CompareAndPutResponse}; use common_test_util::temp_dir::create_temp_dir; use raft_engine::{Config, ReadableSize, RecoveryMode}; @@ -510,7 +475,8 @@ mod tests { .await .unwrap(); assert!(success); - assert_eq!(b"word".as_slice(), &prev_kv.unwrap().value); + // Do not return prev_kv on success + assert!(prev_kv.is_none()); assert_eq!( b"world".as_slice(),