Skip to content

Commit

Permalink
refactor: remove read state from kv_server (xline-kv#857)
Browse files Browse the repository at this point in the history
refactor: disable fast path in etcd competible layer

WIP: fix rebase kv server
Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds committed Jul 12, 2024
1 parent 5634782 commit 9dabf85
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 227 deletions.
114 changes: 0 additions & 114 deletions crates/xline/src/server/barriers.rs

This file was deleted.

131 changes: 25 additions & 106 deletions crates/xline/src/server/kv_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,18 @@ use std::{
time::Duration,
};

use curp::{rpc::ReadState, InflightId};
use dashmap::DashMap;
use event_listener::Event;
use futures::future::{join_all, Either};
use futures::future::Either;
use tokio::time::timeout;
use tracing::{debug, instrument};
use utils::barrier::IdBarrier;
use xlineapi::{
command::{Command, CommandResponse, CurpClient, SyncResponse},
execute_error::ExecuteError,
request_validation::RequestValidator,
AuthInfo, ResponseWrapper,
};

use super::barriers::IndexBarrier;
use crate::{
metrics,
revision_check::RevisionCheck,
rpc::{
CompactionRequest, CompactionResponse, DeleteRangeRequest, DeleteRangeResponse, Kv,
Expand All @@ -38,12 +33,6 @@ pub(crate) struct KvServer {
kv_storage: Arc<KvStore>,
/// Auth storage
auth_storage: Arc<AuthStore>,
/// Barrier for applied index
index_barrier: Arc<IndexBarrier>,
/// Barrier for propose id
id_barrier: Arc<IdBarrier<InflightId>>,
/// Range request retry timeout
range_retry_timeout: Duration,
/// Compact timeout
compact_timeout: Duration,
/// Consensus client
Expand All @@ -60,19 +49,13 @@ impl KvServer {
pub(crate) fn new(
kv_storage: Arc<KvStore>,
auth_storage: Arc<AuthStore>,
index_barrier: Arc<IndexBarrier>,
id_barrier: Arc<IdBarrier<InflightId>>,
range_retry_timeout: Duration,
compact_timeout: Duration,
client: Arc<CurpClient>,
compact_events: Arc<DashMap<u64, Arc<Event>>>,
) -> Self {
Self {
kv_storage,
auth_storage,
index_barrier,
id_barrier,
range_retry_timeout,
compact_timeout,
client,
compact_events,
Expand Down Expand Up @@ -143,48 +126,6 @@ impl KvServer {
}
};
}

/// check whether the required revision is compacted or not
fn check_range_compacted(
range_revision: i64,
compacted_revision: i64,
) -> Result<(), tonic::Status> {
(range_revision <= 0 || range_revision >= compacted_revision)
.then_some(())
.ok_or(ExecuteError::RevisionCompacted(range_revision, compacted_revision).into())
}

/// Wait current node's state machine apply the conflict commands
async fn wait_read_state(&self, cmd: &Command) -> Result<(), tonic::Status> {
loop {
let rd_state = self.client.fetch_read_state(cmd).await.map_err(|e| {
metrics::get().read_indexes_failed_total.add(1, &[]);
e
})?;
let wait_future = async move {
match rd_state {
ReadState::Ids(id_set) => {
debug!(?id_set, "Range wait for command ids");
let fus = id_set
.inflight_ids
.into_iter()
.map(|id| self.id_barrier.wait(id))
.collect::<Vec<_>>();
let _ignore = join_all(fus).await;
}
ReadState::CommitIndex(index) => {
debug!(?index, "Range wait for commit index");
self.index_barrier.wait(index).await;
}
}
};
if timeout(self.range_retry_timeout, wait_future).await.is_ok() {
break;
}
metrics::get().slow_read_indexes_total.add(1, &[]);
}
Ok(())
}
}

#[tonic::async_trait]
Expand All @@ -203,22 +144,21 @@ impl Kv for KvServer {
self.kv_storage.revision(),
)?;
let auth_info = self.auth_storage.try_get_auth_info_from_request(&request)?;
let range_required_revision = range_req.revision;
let is_serializable = range_req.serializable;
let request = RequestWrapper::from(request.into_inner());
let cmd = Command::new_with_auth_info(request, auth_info);
if !is_serializable {
self.wait_read_state(&cmd).await?;
// Double check whether the range request is compacted or not since the compaction request
// may be executed during the process of `wait_read_state` which results in the result of
// previous `check_range_request` outdated.
Self::check_range_compacted(
range_required_revision,
self.kv_storage.compacted_revision(),
)?;
}
let res = if !is_serializable {
let (cmd_res, sync_res) = self.propose(request.into_inner(), auth_info, false).await?;
let mut res = Self::parse_response_op(cmd_res.into_inner().into());
if let Some(sync_res) = sync_res {
let revision = sync_res.revision();
debug!("Get revision {:?} for PutRequest", revision);
Self::update_header_revision(&mut res, revision);
}
res
} else {
let cmd = Command::new_with_auth_info(request.into_inner().into(), auth_info);
self.do_serializable(&cmd)?
};

let res = self.do_serializable(&cmd)?;
if let Response::ResponseRange(response) = res {
Ok(tonic::Response::new(response))
} else {
Expand All @@ -236,12 +176,9 @@ impl Kv for KvServer {
) -> Result<tonic::Response<PutResponse>, tonic::Status> {
let put_req: &PutRequest = request.get_ref();
put_req.validation()?;
debug!("Receive grpc request: {}", put_req);
debug!("Receive grpc request: {:?}", put_req);
let auth_info = self.auth_storage.try_get_auth_info_from_request(&request)?;
let is_fast_path = true;
let (cmd_res, sync_res) = self
.propose(request.into_inner(), auth_info, is_fast_path)
.await?;
let (cmd_res, sync_res) = self.propose(request.into_inner(), auth_info, false).await?;
let mut res = Self::parse_response_op(cmd_res.into_inner().into());
if let Some(sync_res) = sync_res {
let revision = sync_res.revision();
Expand All @@ -265,12 +202,9 @@ impl Kv for KvServer {
) -> Result<tonic::Response<DeleteRangeResponse>, tonic::Status> {
let delete_range_req = request.get_ref();
delete_range_req.validation()?;
debug!("Receive grpc request: {}", delete_range_req);
debug!("Receive grpc request: {:?}", delete_range_req);
let auth_info = self.auth_storage.try_get_auth_info_from_request(&request)?;
let is_fast_path = true;
let (cmd_res, sync_res) = self
.propose(request.into_inner(), auth_info, is_fast_path)
.await?;
let (cmd_res, sync_res) = self.propose(request.into_inner(), auth_info, false).await?;
let mut res = Self::parse_response_op(cmd_res.into_inner().into());
if let Some(sync_res) = sync_res {
let revision = sync_res.revision();
Expand Down Expand Up @@ -301,28 +235,13 @@ impl Kv for KvServer {
self.kv_storage.revision(),
)?;
let auth_info = self.auth_storage.try_get_auth_info_from_request(&request)?;
let res = if txn_req.is_read_only() {
debug!("TxnRequest is read only");
let is_serializable = txn_req.is_serializable();
let request = RequestWrapper::from(request.into_inner());
let cmd = Command::new_with_auth_info(request, auth_info);
if !is_serializable {
self.wait_read_state(&cmd).await?;
}
self.do_serializable(&cmd)?
} else {
let is_fast_path = true;
let (cmd_res, sync_res) = self
.propose(request.into_inner(), auth_info, is_fast_path)
.await?;
let mut res = Self::parse_response_op(cmd_res.into_inner().into());
if let Some(sync_res) = sync_res {
let revision = sync_res.revision();
debug!("Get revision {} for TxnRequest", revision);
Self::update_header_revision(&mut res, revision);
}
res
};
let (cmd_res, sync_res) = self.propose(request.into_inner(), auth_info, false).await?;
let mut res = Self::parse_response_op(cmd_res.into_inner().into());
if let Some(sync_res) = sync_res {
let revision = sync_res.revision();
debug!("Get revision {:?} for TxnRequest", revision);
Self::update_header_revision(&mut res, revision);
}
if let Response::ResponseTxn(response) = res {
Ok(tonic::Response::new(response))
} else {
Expand Down
2 changes: 0 additions & 2 deletions crates/xline/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
mod auth_server;
/// Auth Wrapper
mod auth_wrapper;
/// Barriers for range requests
mod barriers;
/// Cluster server
mod cluster_server;
/// Command to be executed
Expand Down
5 changes: 0 additions & 5 deletions crates/xline/src/server/xline_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use xlineapi::command::{Command, CurpClient};
use super::{
auth_server::AuthServer,
auth_wrapper::AuthWrapper,
barriers::IndexBarrier,
cluster_server::ClusterServer,
command::{Alarmer, CommandExecutor},
kv_server::KvServer,
Expand Down Expand Up @@ -466,7 +465,6 @@ impl XlineServer {
)
.await?;

let index_barrier = Arc::new(IndexBarrier::new());
let id_barrier = Arc::new(IdBarrier::new());
let compact_events = Arc::new(DashMap::new());
let ce = Arc::new(CommandExecutor::new(
Expand Down Expand Up @@ -548,9 +546,6 @@ impl XlineServer {
KvServer::new(
Arc::clone(&kv_storage),
Arc::clone(&auth_storage),
index_barrier,
id_barrier,
*server_timeout.range_retry_timeout(),
*server_timeout.compact_timeout(),
Arc::clone(&client),
compact_events,
Expand Down

0 comments on commit 9dabf85

Please sign in to comment.