Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timestamp support u64::MAX #364

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion src/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,24 @@ pub trait TimestampExt: Sized {

impl TimestampExt for Timestamp {
fn version(&self) -> u64 {
if self.physical == i64::MAX && self.logical == i64::MAX {
return u64::MAX;
}

((self.physical << PHYSICAL_SHIFT_BITS) + self.logical)
.try_into()
.expect("Overflow converting timestamp to version")
}

fn from_version(version: u64) -> Self {
if version == u64::MAX {
return Self {
physical: i64::MAX,
logical: i64::MAX,
suffix_bits: 0,
};
}

let version = version as i64;
Self {
physical: version >> PHYSICAL_SHIFT_BITS,
Expand All @@ -41,7 +53,7 @@ impl TimestampExt for Timestamp {
}

fn try_from_version(version: u64) -> Option<Self> {
if version == 0 {
if version == 0 || (version >= i64::MAX as u64 && version != u64::MAX) {
None
} else {
Some(Self::from_version(version))
Expand Down
27 changes: 27 additions & 0 deletions src/transaction/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,31 @@ impl Client {
let logger = self.logger.new(o!("child" => 1));
Transaction::new(timestamp, self.pd.clone(), options, logger)
}

/// Creates a new latest commit readonly [`Transaction`].
///
/// Read operations will read the latest commit data which is not a snapshot read.
///
/// # Examples
///
/// ```rust,no_run
/// # use tikv_client::{Config, TransactionClient};
/// # use futures::prelude::*;
/// # use tikv_client::TransactionOptions;
/// # futures::executor::block_on(async {
/// let client = TransactionClient::new(vec!["192.168.0.100"], None)
/// .await
/// .unwrap();
/// let options = TransactionOptions::new_optimistic();
/// let mut transaction = client.begin_latest_read(options);
/// // ... Issue some reads.
/// # });
/// ```
pub fn begin_latest_read(&self, options: TransactionOptions) -> Transaction {
debug!(
self.logger,
"creating new latest commit readonly transaction"
);
self.new_transaction(Timestamp::from_version(u64::MAX), options.read_only())
}
}
40 changes: 28 additions & 12 deletions src/transaction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// ```
pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
debug!(self.logger, "invoking transactional get request");
self.check_allow_operation().await?;
self.check_allow_operation(true).await?;
let timestamp = self.timestamp.clone();
let rpc = self.rpc.clone();
let key = key.into();
Expand Down Expand Up @@ -177,7 +177,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// ```
pub async fn get_for_update(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
debug!(self.logger, "invoking transactional get_for_update request");
self.check_allow_operation().await?;
self.check_allow_operation(false).await?;
if !self.is_pessimistic() {
let key = key.into();
self.lock_keys(iter::once(key.clone())).await?;
Expand Down Expand Up @@ -244,7 +244,7 @@ impl<PdC: PdClient> Transaction<PdC> {
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<impl Iterator<Item = KvPair>> {
debug!(self.logger, "invoking transactional batch_get request");
self.check_allow_operation().await?;
self.check_allow_operation(true).await?;
let timestamp = self.timestamp.clone();
let rpc = self.rpc.clone();
let retry_options = self.options.retry_options.clone();
Expand Down Expand Up @@ -299,7 +299,7 @@ impl<PdC: PdClient> Transaction<PdC> {
self.logger,
"invoking transactional batch_get_for_update request"
);
self.check_allow_operation().await?;
self.check_allow_operation(false).await?;
let keys: Vec<Key> = keys.into_iter().map(|k| k.into()).collect();
if !self.is_pessimistic() {
self.lock_keys(keys.clone()).await?;
Expand Down Expand Up @@ -433,7 +433,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// ```
pub async fn put(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
debug!(self.logger, "invoking transactional put request");
self.check_allow_operation().await?;
self.check_allow_operation(false).await?;
let key = key.into();
if self.is_pessimistic() {
self.pessimistic_lock(iter::once(key.clone()), false)
Expand Down Expand Up @@ -464,7 +464,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// ```
pub async fn insert(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
debug!(self.logger, "invoking transactional insert request");
self.check_allow_operation().await?;
self.check_allow_operation(false).await?;
let key = key.into();
if self.buffer.get(&key).is_some() {
return Err(Error::DuplicateKeyInsertion);
Expand Down Expand Up @@ -499,7 +499,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// ```
pub async fn delete(&mut self, key: impl Into<Key>) -> Result<()> {
debug!(self.logger, "invoking transactional delete request");
self.check_allow_operation().await?;
self.check_allow_operation(false).await?;
let key = key.into();
if self.is_pessimistic() {
self.pessimistic_lock(iter::once(key.clone()), false)
Expand Down Expand Up @@ -537,7 +537,7 @@ impl<PdC: PdClient> Transaction<PdC> {
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<()> {
debug!(self.logger, "invoking transactional lock_keys request");
self.check_allow_operation().await?;
self.check_allow_operation(false).await?;
match self.options.kind {
TransactionKind::Optimistic => {
for key in keys {
Expand Down Expand Up @@ -569,6 +569,15 @@ impl<PdC: PdClient> Transaction<PdC> {
/// ```
pub async fn commit(&mut self) -> Result<Option<Timestamp>> {
debug!(self.logger, "commiting transaction");

{
// readonly transaction no need to commit
let status = self.status.read().await;
if *status == TransactionStatus::ReadOnly {
return Ok(None);
}
}

{
let mut status = self.status.write().await;
if !matches!(
Expand Down Expand Up @@ -677,7 +686,7 @@ impl<PdC: PdClient> Transaction<PdC> {
#[doc(hidden)]
pub async fn send_heart_beat(&mut self) -> Result<u64> {
debug!(self.logger, "sending heart_beat");
self.check_allow_operation().await?;
self.check_allow_operation(true).await?;
let primary_key = match self.buffer.get_primary_key() {
Some(k) => k,
None => return Err(Error::NoPrimaryKey),
Expand All @@ -703,7 +712,7 @@ impl<PdC: PdClient> Transaction<PdC> {
key_only: bool,
reverse: bool,
) -> Result<impl Iterator<Item = KvPair>> {
self.check_allow_operation().await?;
self.check_allow_operation(true).await?;
let timestamp = self.timestamp.clone();
let rpc = self.rpc.clone();
let retry_options = self.options.retry_options.clone();
Expand Down Expand Up @@ -840,10 +849,17 @@ impl<PdC: PdClient> Transaction<PdC> {
}

/// Checks if the transaction can perform arbitrary operations.
async fn check_allow_operation(&self) -> Result<()> {
async fn check_allow_operation(&self, readonly: bool) -> Result<()> {
let status = self.status.read().await;
match *status {
TransactionStatus::ReadOnly | TransactionStatus::Active => Ok(()),
TransactionStatus::Active => Ok(()),
TransactionStatus::ReadOnly => {
if readonly {
Ok(())
} else {
Err(Error::OperationReadOnlyError)
}
}
TransactionStatus::Committed
| TransactionStatus::Rolledback
| TransactionStatus::StartedCommit
Expand Down
28 changes: 27 additions & 1 deletion tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ use futures::prelude::*;
use rand::{seq::IteratorRandom, thread_rng, Rng};
use serial_test::serial;
use std::{
assert_eq,
collections::{HashMap, HashSet},
convert::TryInto,
iter,
iter, matches,
};
use tikv_client::{
transaction::HeartbeatOption, BoundRange, Error, Key, KvPair, RawClient, Result, Transaction,
Expand Down Expand Up @@ -942,6 +943,31 @@ async fn txn_key_exists() -> Result<()> {
Ok(())
}

#[tokio::test]
#[serial]
async fn txn_latest_read() -> Result<()> {
init().await?;
let client = TransactionClient::new_with_config(pd_addrs(), Default::default(), None).await?;
let key = "key".to_owned();
let value = "value".to_owned();
let options = TransactionOptions::new_optimistic();
let mut t1 = client.begin_latest_read(options);
t1.get(key.clone()).await?;
t1.put(key.clone(), value.clone())
.await
.map_err(|_e| matches!(Error::OperationReadOnlyError, _e))
.unwrap_err();
// commit is no needed for readonly transaction, commit() will take no effect if called.
t1.commit().await?;

let options = TransactionOptions::new_pessimistic();
let mut t2 = client.begin_latest_read(options);
t2.get(key.clone()).await?;
// t2.commit().await?;

Ok(())
}

// helper function
async fn get_u32(client: &RawClient, key: Vec<u8>) -> Result<u32> {
let x = client.get(key).await?.unwrap();
Expand Down
3 changes: 3 additions & 0 deletions tikv-client-common/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ pub enum Error {
/// It's not allowed to perform operations in a transaction after it has been committed or rolled back.
#[error("Cannot read or write data after any attempt to commit or roll back the transaction")]
OperationAfterCommitError,
/// It's not allowed to perform write operation in a readonly transaction.
#[error("Cannot write data in read-only transaction")]
OperationReadOnlyError,
/// We tried to use 1pc for a transaction, but it didn't work. Probably should have used 2pc.
#[error("1PC transaction could not be committed.")]
OnePcFailure,
Expand Down