diff --git a/crates/xline/src/server/barriers.rs b/crates/xline/src/server/barriers.rs deleted file mode 100644 index dd50ac7fb..000000000 --- a/crates/xline/src/server/barriers.rs +++ /dev/null @@ -1,114 +0,0 @@ -use std::collections::BTreeMap; - -use clippy_utilities::OverflowArithmetic; -use curp::LogIndex; -use event_listener::Event; -use parking_lot::Mutex; - -/// Waiter for index -#[derive(Debug)] -pub(crate) struct IndexBarrier { - /// Inner - inner: Mutex, -} - -impl IndexBarrier { - /// Create a new index barrier - pub(crate) fn new() -> Self { - IndexBarrier { - inner: Mutex::new(IndexBarrierInner { - last_trigger_index: 0, - barriers: BTreeMap::new(), - }), - } - } - - /// Wait for the index until it is triggered. - pub(crate) async fn wait(&self, index: LogIndex) { - let listener = { - let mut inner_l = self.inner.lock(); - if inner_l.last_trigger_index >= index { - return; - } - inner_l.barriers.entry(index).or_default().listen() - }; - listener.await; - } - - /// Trigger all barriers whose index is less than or equal to the given index. - #[allow(dead_code)] - pub(crate) fn trigger(&self, index: LogIndex) { - let mut inner_l = self.inner.lock(); - if inner_l.last_trigger_index < index { - inner_l.last_trigger_index = index; - } - let mut split_barriers = inner_l.barriers.split_off(&(index.overflow_add(1))); - std::mem::swap(&mut inner_l.barriers, &mut split_barriers); - for (_, barrier) in split_barriers { - let _ignore = barrier.notify(usize::MAX); - } - } -} - -/// Inner of index barrier. -#[derive(Debug)] -struct IndexBarrierInner { - /// The last index that the barrier has triggered. - last_trigger_index: LogIndex, - /// Barrier of index. - barriers: BTreeMap, -} - -#[cfg(test)] -mod test { - use std::{sync::Arc, time::Duration}; - - use futures::future::join_all; - use test_macros::abort_on_panic; - use tokio::time::{sleep, timeout}; - use utils::barrier::IdBarrier; - - use super::*; - - #[tokio::test] - #[abort_on_panic] - async fn test_id_barrier() { - let id_barrier = Arc::new(IdBarrier::new()); - let barriers = (0..5) - .map(|i| { - let id_barrier = Arc::clone(&id_barrier); - tokio::spawn(async move { - id_barrier.wait(i).await; - }) - }) - .collect::>(); - sleep(Duration::from_millis(10)).await; - for i in 0..5 { - id_barrier.trigger(&i); - } - timeout(Duration::from_millis(100), join_all(barriers)) - .await - .unwrap(); - } - - #[tokio::test] - #[abort_on_panic] - async fn test_index_barrier() { - let index_barrier = Arc::new(IndexBarrier::new()); - let barriers = (0..5).map(|i| { - let id_barrier = Arc::clone(&index_barrier); - tokio::spawn(async move { - id_barrier.wait(i).await; - }) - }); - index_barrier.trigger(5); - - timeout(Duration::from_millis(100), index_barrier.wait(3)) - .await - .unwrap(); - - timeout(Duration::from_millis(100), join_all(barriers)) - .await - .unwrap(); - } -} diff --git a/crates/xline/src/server/kv_server.rs b/crates/xline/src/server/kv_server.rs index 1850b4fa1..375a2a9d5 100644 --- a/crates/xline/src/server/kv_server.rs +++ b/crates/xline/src/server/kv_server.rs @@ -6,23 +6,18 @@ use std::{ time::Duration, }; -use curp::{rpc::ReadState, InflightId}; use dashmap::DashMap; use event_listener::Event; -use futures::future::{join_all, Either}; +use futures::future::Either; use tokio::time::timeout; use tracing::{debug, instrument}; -use utils::barrier::IdBarrier; use xlineapi::{ command::{Command, CommandResponse, CurpClient, SyncResponse}, - execute_error::ExecuteError, request_validation::RequestValidator, AuthInfo, ResponseWrapper, }; -use super::barriers::IndexBarrier; use crate::{ - metrics, revision_check::RevisionCheck, rpc::{ CompactionRequest, CompactionResponse, DeleteRangeRequest, DeleteRangeResponse, Kv, @@ -38,12 +33,6 @@ pub(crate) struct KvServer { kv_storage: Arc, /// Auth storage auth_storage: Arc, - /// Barrier for applied index - index_barrier: Arc, - /// Barrier for propose id - id_barrier: Arc>, - /// Range request retry timeout - range_retry_timeout: Duration, /// Compact timeout compact_timeout: Duration, /// Consensus client @@ -60,9 +49,6 @@ impl KvServer { pub(crate) fn new( kv_storage: Arc, auth_storage: Arc, - index_barrier: Arc, - id_barrier: Arc>, - range_retry_timeout: Duration, compact_timeout: Duration, client: Arc, compact_events: Arc>>, @@ -70,9 +56,6 @@ impl KvServer { Self { kv_storage, auth_storage, - index_barrier, - id_barrier, - range_retry_timeout, compact_timeout, client, compact_events, @@ -143,48 +126,6 @@ impl KvServer { } }; } - - /// check whether the required revision is compacted or not - fn check_range_compacted( - range_revision: i64, - compacted_revision: i64, - ) -> Result<(), tonic::Status> { - (range_revision <= 0 || range_revision >= compacted_revision) - .then_some(()) - .ok_or(ExecuteError::RevisionCompacted(range_revision, compacted_revision).into()) - } - - /// Wait current node's state machine apply the conflict commands - async fn wait_read_state(&self, cmd: &Command) -> Result<(), tonic::Status> { - loop { - let rd_state = self.client.fetch_read_state(cmd).await.map_err(|e| { - metrics::get().read_indexes_failed_total.add(1, &[]); - e - })?; - let wait_future = async move { - match rd_state { - ReadState::Ids(id_set) => { - debug!(?id_set, "Range wait for command ids"); - let fus = id_set - .inflight_ids - .into_iter() - .map(|id| self.id_barrier.wait(id)) - .collect::>(); - let _ignore = join_all(fus).await; - } - ReadState::CommitIndex(index) => { - debug!(?index, "Range wait for commit index"); - self.index_barrier.wait(index).await; - } - } - }; - if timeout(self.range_retry_timeout, wait_future).await.is_ok() { - break; - } - metrics::get().slow_read_indexes_total.add(1, &[]); - } - Ok(()) - } } #[tonic::async_trait] @@ -203,22 +144,21 @@ impl Kv for KvServer { self.kv_storage.revision(), )?; let auth_info = self.auth_storage.try_get_auth_info_from_request(&request)?; - let range_required_revision = range_req.revision; let is_serializable = range_req.serializable; - let request = RequestWrapper::from(request.into_inner()); - let cmd = Command::new_with_auth_info(request, auth_info); - if !is_serializable { - self.wait_read_state(&cmd).await?; - // Double check whether the range request is compacted or not since the compaction request - // may be executed during the process of `wait_read_state` which results in the result of - // previous `check_range_request` outdated. - Self::check_range_compacted( - range_required_revision, - self.kv_storage.compacted_revision(), - )?; - } + let res = if !is_serializable { + let (cmd_res, sync_res) = self.propose(request.into_inner(), auth_info, false).await?; + let mut res = Self::parse_response_op(cmd_res.into_inner().into()); + if let Some(sync_res) = sync_res { + let revision = sync_res.revision(); + debug!("Get revision {:?} for PutRequest", revision); + Self::update_header_revision(&mut res, revision); + } + res + } else { + let cmd = Command::new_with_auth_info(request.into_inner().into(), auth_info); + self.do_serializable(&cmd)? + }; - let res = self.do_serializable(&cmd)?; if let Response::ResponseRange(response) = res { Ok(tonic::Response::new(response)) } else { @@ -236,12 +176,9 @@ impl Kv for KvServer { ) -> Result, tonic::Status> { let put_req: &PutRequest = request.get_ref(); put_req.validation()?; - debug!("Receive grpc request: {}", put_req); + debug!("Receive grpc request: {:?}", put_req); let auth_info = self.auth_storage.try_get_auth_info_from_request(&request)?; - let is_fast_path = true; - let (cmd_res, sync_res) = self - .propose(request.into_inner(), auth_info, is_fast_path) - .await?; + let (cmd_res, sync_res) = self.propose(request.into_inner(), auth_info, false).await?; let mut res = Self::parse_response_op(cmd_res.into_inner().into()); if let Some(sync_res) = sync_res { let revision = sync_res.revision(); @@ -265,12 +202,9 @@ impl Kv for KvServer { ) -> Result, tonic::Status> { let delete_range_req = request.get_ref(); delete_range_req.validation()?; - debug!("Receive grpc request: {}", delete_range_req); + debug!("Receive grpc request: {:?}", delete_range_req); let auth_info = self.auth_storage.try_get_auth_info_from_request(&request)?; - let is_fast_path = true; - let (cmd_res, sync_res) = self - .propose(request.into_inner(), auth_info, is_fast_path) - .await?; + let (cmd_res, sync_res) = self.propose(request.into_inner(), auth_info, false).await?; let mut res = Self::parse_response_op(cmd_res.into_inner().into()); if let Some(sync_res) = sync_res { let revision = sync_res.revision(); @@ -301,28 +235,13 @@ impl Kv for KvServer { self.kv_storage.revision(), )?; let auth_info = self.auth_storage.try_get_auth_info_from_request(&request)?; - let res = if txn_req.is_read_only() { - debug!("TxnRequest is read only"); - let is_serializable = txn_req.is_serializable(); - let request = RequestWrapper::from(request.into_inner()); - let cmd = Command::new_with_auth_info(request, auth_info); - if !is_serializable { - self.wait_read_state(&cmd).await?; - } - self.do_serializable(&cmd)? - } else { - let is_fast_path = true; - let (cmd_res, sync_res) = self - .propose(request.into_inner(), auth_info, is_fast_path) - .await?; - let mut res = Self::parse_response_op(cmd_res.into_inner().into()); - if let Some(sync_res) = sync_res { - let revision = sync_res.revision(); - debug!("Get revision {} for TxnRequest", revision); - Self::update_header_revision(&mut res, revision); - } - res - }; + let (cmd_res, sync_res) = self.propose(request.into_inner(), auth_info, false).await?; + let mut res = Self::parse_response_op(cmd_res.into_inner().into()); + if let Some(sync_res) = sync_res { + let revision = sync_res.revision(); + debug!("Get revision {:?} for TxnRequest", revision); + Self::update_header_revision(&mut res, revision); + } if let Response::ResponseTxn(response) = res { Ok(tonic::Response::new(response)) } else { diff --git a/crates/xline/src/server/mod.rs b/crates/xline/src/server/mod.rs index f6c88947c..b2d0feefa 100644 --- a/crates/xline/src/server/mod.rs +++ b/crates/xline/src/server/mod.rs @@ -2,8 +2,6 @@ mod auth_server; /// Auth Wrapper mod auth_wrapper; -/// Barriers for range requests -mod barriers; /// Cluster server mod cluster_server; /// Command to be executed diff --git a/crates/xline/src/server/xline_server.rs b/crates/xline/src/server/xline_server.rs index 0c9c3d36b..cffb434c6 100644 --- a/crates/xline/src/server/xline_server.rs +++ b/crates/xline/src/server/xline_server.rs @@ -37,7 +37,6 @@ use xlineapi::command::{Command, CurpClient}; use super::{ auth_server::AuthServer, auth_wrapper::AuthWrapper, - barriers::IndexBarrier, cluster_server::ClusterServer, command::{Alarmer, CommandExecutor}, kv_server::KvServer, @@ -466,7 +465,6 @@ impl XlineServer { ) .await?; - let index_barrier = Arc::new(IndexBarrier::new()); let id_barrier = Arc::new(IdBarrier::new()); let compact_events = Arc::new(DashMap::new()); let ce = Arc::new(CommandExecutor::new( @@ -548,9 +546,6 @@ impl XlineServer { KvServer::new( Arc::clone(&kv_storage), Arc::clone(&auth_storage), - index_barrier, - id_barrier, - *server_timeout.range_retry_timeout(), *server_timeout.compact_timeout(), Arc::clone(&client), compact_events,