Skip to content

Commit

Permalink
fix: add txn retry logic
Browse files Browse the repository at this point in the history
Refs: xline-kv#774
Closes: xline-kv#774
Signed-off-by: Phoeniix Zhao <[email protected]>
  • Loading branch information
Phoenix500526 committed Apr 12, 2024
1 parent bc37cb1 commit 7501171
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 260 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/curp/tests/it/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ async fn shutdown_rpc_should_shutdown_the_cluster() {
));

let collection = collection_task.await.unwrap();
sleep_secs(7).await; // wait for the cluster to shutdown
sleep_secs(1).await; // wait for the cluster to shutdown
assert!(group.is_finished());

let group = CurpGroup::new_rocks(3, tmp_path).await;
Expand Down
1 change: 1 addition & 0 deletions crates/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ tokio = { version = "0.2.23", package = "madsim-tokio", features = [
"io-util",
] }
tokio-util = { version = "0.7.8", features = ["io"] }
tracing = "0.1.40"
utils = { path = "../utils" }
uuid = { version = "1", features = ["v4"] }
workspace-hack = { version = "0.1", path = "../../workspace-hack" }
Expand Down
219 changes: 0 additions & 219 deletions crates/engine/src/mock_rocksdb_engine.rs

This file was deleted.

101 changes: 61 additions & 40 deletions crates/engine/src/rocksdb_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ use std::{
use bytes::{Buf, Bytes, BytesMut};
use clippy_utilities::{NumericCast, OverflowArithmetic};
use rocksdb::{
Direction, Error as RocksError, IteratorMode, OptimisticTransactionDB, Options, SstFileWriter,
Direction, Error as RocksError, IteratorMode, OptimisticTransactionDB, Options, SstFileWriter, ErrorKind as RocksErrorKind,
};
use serde::{Deserialize, Serialize};
use tokio::{fs::File, io::AsyncWriteExt};
use tokio_util::io::read_buf;
use tracing::warn;

use crate::{
api::{engine_api::StorageEngine, snapshot_api::SnapshotApi},
Expand Down Expand Up @@ -200,51 +201,71 @@ impl StorageEngine for RocksEngine {

#[inline]
fn write_batch(&self, wr_ops: Vec<WriteOperation<'_>>, _sync: bool) -> Result<(), EngineError> {
let transaction = self.inner.transaction();
let mut size = 0;
for op in wr_ops {
match op {
WriteOperation::Put { table, key, value } => {
let cf = self
.inner
.cf_handle(table)
.ok_or(EngineError::TableNotFound(table.to_owned()))?;
size = size.overflow_add(Self::max_write_size(
table.len(),
key.len(),
value.len(),
));
transaction.put_cf(&cf, key, value)?;
let mut retry_interval = 10;
let max_retry_count = 5;
let mut retry_count = 0;
loop {
let transaction = self.inner.transaction();
let mut size = 0;
#[allow(clippy::pattern_type_mismatch)] // can't be fixed
for op in &wr_ops {
match op {
WriteOperation::Put { table, key, value } => {
let cf = self
.inner
.cf_handle(table)
.ok_or(EngineError::TableNotFound((*table).to_owned()))?;
size = size.overflow_add(Self::max_write_size(
table.len(),
key.len(),
value.len(),
));
transaction.put_cf(&cf, key, value)?;
}
WriteOperation::Delete { table, key } => {
let cf = self
.inner
.cf_handle(table)
.ok_or(EngineError::TableNotFound((*table).to_owned()))?;
transaction.delete_cf(&cf, key)?;
}
WriteOperation::DeleteRange { table, from, to } => {
let cf = self
.inner
.cf_handle(table)
.ok_or(EngineError::TableNotFound((*table).to_owned()))?;
let mode = IteratorMode::From(from, Direction::Forward);
let kvs: Vec<_> = transaction
.iterator_cf(&cf, mode)
.take_while(|res| res.as_ref().is_ok_and(|(key, _)| key.as_ref() < *to))
.collect::<Result<Vec<_>, _>>()?;
for (key, _) in kvs {
transaction.delete_cf(&cf, key)?;
}
}
}
WriteOperation::Delete { table, key } => {
let cf = self
.inner
.cf_handle(table)
.ok_or(EngineError::TableNotFound(table.to_owned()))?;
transaction.delete_cf(&cf, key)?;
}
match transaction.commit() {
Ok(_) => {
_ = self
.size
.fetch_add(size.numeric_cast(), std::sync::atomic::Ordering::Relaxed);
return Ok(());
}
WriteOperation::DeleteRange { table, from, to } => {
let cf = self
.inner
.cf_handle(table)
.ok_or(EngineError::TableNotFound(table.to_owned()))?;
let mode = IteratorMode::From(from, Direction::Forward);
let kvs: Vec<_> = transaction
.iterator_cf(&cf, mode)
.take_while(|res| res.as_ref().is_ok_and(|kv| kv.0.as_ref() < to))
.collect::<Result<Vec<_>, _>>()?;
for (key, _) in kvs {
transaction.delete_cf(&cf, key)?;
Err(err) if matches!(err.kind(), RocksErrorKind::Busy | RocksErrorKind::TryAgain) => {
if retry_count > max_retry_count {
warn!("Oops, txn commit retry count reach the max_retry_count: {max_retry_count}");
return Err(EngineError::UnderlyingError(err.to_string()));
}
warn!("Rocksdb txn commit failed, retrying after {retry_interval}ms");
std::thread::sleep(std::time::Duration::from_millis(retry_interval));
retry_interval = retry_interval.overflow_mul(2);
retry_count = retry_count.overflow_add(1);
continue;
}
Err(err) => return Err(EngineError::UnderlyingError(err.to_string())),
}
}
transaction.commit()?;
_ = self
.size
.fetch_add(size.numeric_cast(), std::sync::atomic::Ordering::Relaxed);

Ok(())
}

#[inline]
Expand Down

0 comments on commit 7501171

Please sign in to comment.