Skip to content

Commit

Permalink
Add a new begin_latest_read api for transaction support
Browse files Browse the repository at this point in the history
Signed-off-by: yongman <[email protected]>
  • Loading branch information
yongman committed Sep 8, 2022
1 parent 5619b79 commit 6c60921
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 18 deletions.
33 changes: 28 additions & 5 deletions src/transaction/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,12 +276,35 @@ impl Client {
Ok(res)
}

pub fn new_transaction(
&self,
timestamp: Timestamp,
options: TransactionOptions,
) -> Transaction {
fn new_transaction(&self, timestamp: Timestamp, options: TransactionOptions) -> Transaction {
let logger = self.logger.new(o!("child" => 1));
Transaction::new(timestamp, self.pd.clone(), options, logger)
}

/// Creates a new latest commit readonly [`Transaction`].
///
/// Read operations will read the latest commit data which is not a snapshot read.
///
/// # Examples
///
/// ```rust,no_run
/// # use tikv_client::{Config, TransactionClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let client = TransactionClient::new(vec!["192.168.0.100"], None)
/// .await
/// .unwrap();
/// let options = TransactionOptions::new_optimistic();
/// let mut transaction = client.begin_latest_read(options).await.unwrap();
/// // ... Issue some reads.
/// transaction.commit().await.unwrap();
/// # });
/// ```
pub fn begin_latest_read(&self, options: TransactionOptions) -> Transaction {
debug!(
self.logger,
"creating new latest commit readonly transaction"
);
self.new_transaction(Timestamp::from_version(u64::MAX), options.read_only())
}
}
40 changes: 28 additions & 12 deletions src/transaction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// ```
pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
debug!(self.logger, "invoking transactional get request");
self.check_allow_operation().await?;
self.check_allow_operation(true).await?;
let timestamp = self.timestamp.clone();
let rpc = self.rpc.clone();
let key = key.into();
Expand Down Expand Up @@ -177,7 +177,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// ```
pub async fn get_for_update(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
debug!(self.logger, "invoking transactional get_for_update request");
self.check_allow_operation().await?;
self.check_allow_operation(false).await?;
if !self.is_pessimistic() {
let key = key.into();
self.lock_keys(iter::once(key.clone())).await?;
Expand Down Expand Up @@ -244,7 +244,7 @@ impl<PdC: PdClient> Transaction<PdC> {
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<impl Iterator<Item = KvPair>> {
debug!(self.logger, "invoking transactional batch_get request");
self.check_allow_operation().await?;
self.check_allow_operation(true).await?;
let timestamp = self.timestamp.clone();
let rpc = self.rpc.clone();
let retry_options = self.options.retry_options.clone();
Expand Down Expand Up @@ -299,7 +299,7 @@ impl<PdC: PdClient> Transaction<PdC> {
self.logger,
"invoking transactional batch_get_for_update request"
);
self.check_allow_operation().await?;
self.check_allow_operation(false).await?;
let keys: Vec<Key> = keys.into_iter().map(|k| k.into()).collect();
if !self.is_pessimistic() {
self.lock_keys(keys.clone()).await?;
Expand Down Expand Up @@ -433,7 +433,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// ```
pub async fn put(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
debug!(self.logger, "invoking transactional put request");
self.check_allow_operation().await?;
self.check_allow_operation(false).await?;
let key = key.into();
if self.is_pessimistic() {
self.pessimistic_lock(iter::once(key.clone()), false)
Expand Down Expand Up @@ -464,7 +464,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// ```
pub async fn insert(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
debug!(self.logger, "invoking transactional insert request");
self.check_allow_operation().await?;
self.check_allow_operation(false).await?;
let key = key.into();
if self.buffer.get(&key).is_some() {
return Err(Error::DuplicateKeyInsertion);
Expand Down Expand Up @@ -499,7 +499,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// ```
pub async fn delete(&mut self, key: impl Into<Key>) -> Result<()> {
debug!(self.logger, "invoking transactional delete request");
self.check_allow_operation().await?;
self.check_allow_operation(false).await?;
let key = key.into();
if self.is_pessimistic() {
self.pessimistic_lock(iter::once(key.clone()), false)
Expand Down Expand Up @@ -537,7 +537,7 @@ impl<PdC: PdClient> Transaction<PdC> {
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<()> {
debug!(self.logger, "invoking transactional lock_keys request");
self.check_allow_operation().await?;
self.check_allow_operation(false).await?;
match self.options.kind {
TransactionKind::Optimistic => {
for key in keys {
Expand Down Expand Up @@ -569,6 +569,15 @@ impl<PdC: PdClient> Transaction<PdC> {
/// ```
pub async fn commit(&mut self) -> Result<Option<Timestamp>> {
debug!(self.logger, "commiting transaction");

{
// readonly transaction no need to commit
let status = self.status.read().await;
if *status == TransactionStatus::ReadOnly {
return Ok(None);
}
}

{
let mut status = self.status.write().await;
if !matches!(
Expand Down Expand Up @@ -677,7 +686,7 @@ impl<PdC: PdClient> Transaction<PdC> {
#[doc(hidden)]
pub async fn send_heart_beat(&mut self) -> Result<u64> {
debug!(self.logger, "sending heart_beat");
self.check_allow_operation().await?;
self.check_allow_operation(true).await?;
let primary_key = match self.buffer.get_primary_key() {
Some(k) => k,
None => return Err(Error::NoPrimaryKey),
Expand All @@ -703,7 +712,7 @@ impl<PdC: PdClient> Transaction<PdC> {
key_only: bool,
reverse: bool,
) -> Result<impl Iterator<Item = KvPair>> {
self.check_allow_operation().await?;
self.check_allow_operation(true).await?;
let timestamp = self.timestamp.clone();
let rpc = self.rpc.clone();
let retry_options = self.options.retry_options.clone();
Expand Down Expand Up @@ -840,10 +849,17 @@ impl<PdC: PdClient> Transaction<PdC> {
}

/// Checks if the transaction can perform arbitrary operations.
async fn check_allow_operation(&self) -> Result<()> {
async fn check_allow_operation(&self, readonly: bool) -> Result<()> {
let status = self.status.read().await;
match *status {
TransactionStatus::ReadOnly | TransactionStatus::Active => Ok(()),
TransactionStatus::Active => Ok(()),
TransactionStatus::ReadOnly => {
if readonly {
Ok(())
} else {
Err(Error::OperationReadOnlyError)
}
}
TransactionStatus::Committed
| TransactionStatus::Rolledback
| TransactionStatus::StartedCommit
Expand Down
28 changes: 27 additions & 1 deletion tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ use futures::prelude::*;
use rand::{seq::IteratorRandom, thread_rng, Rng};
use serial_test::serial;
use std::{
assert_eq,
collections::{HashMap, HashSet},
convert::TryInto,
iter,
iter, matches,
};
use tikv_client::{
transaction::HeartbeatOption, BoundRange, Error, Key, KvPair, RawClient, Result, Transaction,
Expand Down Expand Up @@ -942,6 +943,31 @@ async fn txn_key_exists() -> Result<()> {
Ok(())
}

#[tokio::test]
#[serial]
async fn txn_latest_read() -> Result<()> {
init().await?;
let client = TransactionClient::new_with_config(pd_addrs(), Default::default(), None).await?;
let key = "key".to_owned();
let value = "value".to_owned();
let options = TransactionOptions::new_optimistic();
let mut t1 = client.begin_latest_read(options);
t1.get(key.clone()).await?;
t1.put(key.clone(), value.clone())
.await
.map_err(|_e| matches!(Error::OperationReadOnlyError, _e))
.unwrap_err();
// commit is no needed for readonly transaction, commit() will take no effect if called.
t1.commit().await?;

let options = TransactionOptions::new_pessimistic();
let mut t2 = client.begin_latest_read(options);
t2.get(key.clone()).await?;
// t2.commit().await?;

Ok(())
}

// helper function
async fn get_u32(client: &RawClient, key: Vec<u8>) -> Result<u32> {
let x = client.get(key).await?.unwrap();
Expand Down
3 changes: 3 additions & 0 deletions tikv-client-common/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ pub enum Error {
/// It's not allowed to perform operations in a transaction after it has been committed or rolled back.
#[error("Cannot read or write data after any attempt to commit or roll back the transaction")]
OperationAfterCommitError,
/// It's not allowed to perform write operation in a readonly transaction.
#[error("Cannot write data in read-only transaction")]
OperationReadOnlyError,
/// We tried to use 1pc for a transaction, but it didn't work. Probably should have used 2pc.
#[error("1PC transaction could not be committed.")]
OnePcFailure,
Expand Down

0 comments on commit 6c60921

Please sign in to comment.