diff --git a/Cargo.lock b/Cargo.lock index 8786a7651..c5e19dd0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -849,6 +849,7 @@ dependencies = [ "test-macros", "thiserror", "tokio-util", + "tracing", "utils", "uuid", "workspace-hack", diff --git a/crates/curp/tests/it/server.rs b/crates/curp/tests/it/server.rs index edf890f3c..5ea89a808 100644 --- a/crates/curp/tests/it/server.rs +++ b/crates/curp/tests/it/server.rs @@ -297,7 +297,7 @@ async fn shutdown_rpc_should_shutdown_the_cluster() { )); let collection = collection_task.await.unwrap(); - sleep_secs(7).await; // wait for the cluster to shutdown + sleep_secs(1).await; // wait for the cluster to shutdown assert!(group.is_finished()); let group = CurpGroup::new_rocks(3, tmp_path).await; diff --git a/crates/engine/Cargo.toml b/crates/engine/Cargo.toml index fdc75756c..89b08a314 100644 --- a/crates/engine/Cargo.toml +++ b/crates/engine/Cargo.toml @@ -28,6 +28,7 @@ tokio = { version = "0.2.23", package = "madsim-tokio", features = [ "io-util", ] } tokio-util = { version = "0.7.8", features = ["io"] } +tracing = "0.1.40" utils = { path = "../utils" } uuid = { version = "1", features = ["v4"] } workspace-hack = { version = "0.1", path = "../../workspace-hack" } diff --git a/crates/engine/src/rocksdb_engine/mod.rs b/crates/engine/src/rocksdb_engine/mod.rs index bbfae9ce2..470d386be 100644 --- a/crates/engine/src/rocksdb_engine/mod.rs +++ b/crates/engine/src/rocksdb_engine/mod.rs @@ -14,11 +14,12 @@ use std::{ use bytes::{Buf, Bytes, BytesMut}; use clippy_utilities::{NumericCast, OverflowArithmetic}; use rocksdb::{ - Direction, Error as RocksError, IteratorMode, OptimisticTransactionDB, Options, SstFileWriter, + Direction, Error as RocksError, IteratorMode, OptimisticTransactionDB, Options, SstFileWriter, ErrorKind as RocksErrorKind, }; use serde::{Deserialize, Serialize}; use tokio::{fs::File, io::AsyncWriteExt}; use tokio_util::io::read_buf; +use tracing::warn; use crate::{ api::{engine_api::StorageEngine, snapshot_api::SnapshotApi}, @@ -200,51 +201,71 @@ impl StorageEngine for RocksEngine { #[inline] fn write_batch(&self, wr_ops: Vec<WriteOperation<'_>>, _sync: bool) -> Result<(), EngineError> { - let transaction = self.inner.transaction(); - let mut size = 0; - for op in wr_ops { - match op { - WriteOperation::Put { table, key, value } => { - let cf = self - .inner - .cf_handle(table) - .ok_or(EngineError::TableNotFound(table.to_owned()))?; - size = size.overflow_add(Self::max_write_size( - table.len(), - key.len(), - value.len(), - )); - transaction.put_cf(&cf, key, value)?; + let mut retry_interval = 10; + let max_retry_count = 5; + let mut retry_count = 0; + loop { + let transaction = self.inner.transaction(); + let mut size = 0; + #[allow(clippy::pattern_type_mismatch)] // can't be fixed + for op in &wr_ops { + match op { + WriteOperation::Put { table, key, value } => { + let cf = self + .inner + .cf_handle(table) + .ok_or(EngineError::TableNotFound((*table).to_owned()))?; + size = size.overflow_add(Self::max_write_size( + table.len(), + key.len(), + value.len(), + )); + transaction.put_cf(&cf, key, value)?; + } + WriteOperation::Delete { table, key } => { + let cf = self + .inner + .cf_handle(table) + .ok_or(EngineError::TableNotFound((*table).to_owned()))?; + transaction.delete_cf(&cf, key)?; + } + WriteOperation::DeleteRange { table, from, to } => { + let cf = self + .inner + .cf_handle(table) + .ok_or(EngineError::TableNotFound((*table).to_owned()))?; + let mode = IteratorMode::From(from, Direction::Forward); + let kvs: Vec<_> = transaction + .iterator_cf(&cf, mode) + .take_while(|res| res.as_ref().is_ok_and(|(key, _)| key.as_ref() < *to)) + .collect::<Result<Vec<_>, _>>()?; + for (key, _) in kvs { + transaction.delete_cf(&cf, key)?; + } + } } - WriteOperation::Delete { table, key } => { - let cf = self - .inner - .cf_handle(table) - .ok_or(EngineError::TableNotFound(table.to_owned()))?; - transaction.delete_cf(&cf, key)?; + } + match transaction.commit() { + Ok(_) => { + _ = self + .size + .fetch_add(size.numeric_cast(), std::sync::atomic::Ordering::Relaxed); + return Ok(()); } - WriteOperation::DeleteRange { table, from, to } => { - let cf = self - .inner - .cf_handle(table) - .ok_or(EngineError::TableNotFound(table.to_owned()))?; - let mode = IteratorMode::From(from, Direction::Forward); - let kvs: Vec<_> = transaction - .iterator_cf(&cf, mode) - .take_while(|res| res.as_ref().is_ok_and(|kv| kv.0.as_ref() < to)) - .collect::<Result<Vec<_>, _>>()?; - for (key, _) in kvs { - transaction.delete_cf(&cf, key)?; + Err(err) if matches!(err.kind(), RocksErrorKind::Busy | RocksErrorKind::TryAgain) => { + if retry_count > max_retry_count { + warn!("Oops, txn commit retry count reach the max_retry_count: {max_retry_count}"); + return Err(EngineError::UnderlyingError(err.to_string())); } + warn!("Rocksdb txn commit failed, retrying after {retry_interval}ms"); + std::thread::sleep(std::time::Duration::from_millis(retry_interval)); + retry_interval = retry_interval.overflow_mul(2); + retry_count = retry_count.overflow_add(1); + continue; } + Err(err) => return Err(EngineError::UnderlyingError(err.to_string())), } } - transaction.commit()?; - _ = self - .size - .fetch_add(size.numeric_cast(), std::sync::atomic::Ordering::Relaxed); - - Ok(()) } #[inline]