Skip to content

Commit

Permalink
refactor: execute in after sync
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds committed Jul 25, 2024
1 parent 9b718c5 commit 52a9aa0
Show file tree
Hide file tree
Showing 13 changed files with 308 additions and 145 deletions.
6 changes: 6 additions & 0 deletions crates/curp-external-api/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ impl From<DecodeError> for PbSerializeError {
}
}

#[allow(clippy::module_name_repetitions)]
/// After sync command type
#[derive(Debug)]
pub struct AfterSyncCmd<'a, C> {
Expand All @@ -204,16 +205,21 @@ pub struct AfterSyncCmd<'a, C> {

impl<'a, C> AfterSyncCmd<'a, C> {
/// Creates a new `AfterSyncCmd`
#[inline]
pub fn new(cmd: &'a C, to_exectue: bool) -> Self {
Self { cmd, to_exectue }
}

/// Gets the command
#[inline]
#[must_use]
pub fn cmd(&self) -> &'a C {
self.cmd
}

/// Convert self into parts
#[inline]
#[must_use]
pub fn into_parts(self) -> (&'a C, bool) {
(self.cmd, self.to_exectue)
}
Expand Down
31 changes: 24 additions & 7 deletions crates/curp/src/server/cmd_worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{fmt::Debug, iter, sync::Arc};

use async_trait::async_trait;
use clippy_utilities::NumericCast;
use curp_external_api::cmd::AfterSyncCmd;
#[cfg(test)]
use mockall::automock;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -145,6 +146,7 @@ async fn worker_exe<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
}

/// Cmd worker after sync handler
#[allow(clippy::too_many_lines)] // TODO: split this to multiple fns
async fn worker_as<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
entry: Arc<LogEntry<C>>,
prepare: Option<C::PR>,
Expand All @@ -155,10 +157,20 @@ async fn worker_as<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
let id = curp.id();
let success = match entry.entry_data {
EntryData::Command(ref cmd) => {
let Some(prepare) = prepare else {
let Some(_prepare) = prepare else {
unreachable!("prepare should always be Some(_) when entry is a command");
};
let asr = ce.after_sync(cmd.as_ref(), entry.index, prepare).await;
let asr = ce
.after_sync(vec![AfterSyncCmd::new(cmd.as_ref(), false)], entry.index)
.await
.map(|res| {
#[allow(clippy::expect_used)]
let (asr, _) = res
.into_iter()
.next()
.expect("the asr should always be Some");
asr
});
let asr_ok = asr.is_ok();
cb.write().insert_asr(entry.propose_id, asr);
sp.lock()
Expand Down Expand Up @@ -328,7 +340,8 @@ pub(crate) trait CEEventTxApi<C: Command>: Send + Sync + 'static {
/// Send cmd to background cmd worker for speculative execution
fn send_sp_exe(&self, entry: Arc<LogEntry<C>>);

/// Send after sync event to the background cmd worker so that after sync can be called
/// Send after sync event to the background cmd worker so that after sync
/// can be called
fn send_after_sync(&self, entry: Arc<LogEntry<C>>);

/// Send reset
Expand Down Expand Up @@ -398,7 +411,8 @@ impl<C: Command> TaskRxApi<C> for TaskRx<C> {
}
}

/// Run cmd execute workers. Each cmd execute worker will continually fetch task to perform from `task_rx`.
/// Run cmd execute workers. Each cmd execute worker will continually fetch task
/// to perform from `task_rx`.
pub(super) fn start_cmd_workers<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
cmd_executor: Arc<CE>,
curp: Arc<RawCurp<C, RC>>,
Expand Down Expand Up @@ -476,7 +490,8 @@ mod tests {
task_manager.shutdown(true).await;
}

// When the execution takes more time than sync, `as` should be called after exe has finished
// When the execution takes more time than sync, `as` should be called after exe
// has finished
#[traced_test]
#[tokio::test]
#[abort_on_panic]
Expand Down Expand Up @@ -524,7 +539,8 @@ mod tests {
task_manager.shutdown(true).await;
}

// When the execution takes more time than sync and fails, after sync should not be called
// When the execution takes more time than sync and fails, after sync should not
// be called
#[traced_test]
#[tokio::test]
#[abort_on_panic]
Expand Down Expand Up @@ -663,7 +679,8 @@ mod tests {
task_manager.shutdown(true).await;
}

// If cmd1 and cmd2 conflict, order will be (cmd1 exe) -> (cmd1 as) -> (cmd2 exe) -> (cmd2 as)
// If cmd1 and cmd2 conflict, order will be (cmd1 exe) -> (cmd1 as) -> (cmd2
// exe) -> (cmd2 as)
#[traced_test]
#[tokio::test]
#[abort_on_panic]
Expand Down
5 changes: 4 additions & 1 deletion crates/curp/src/server/storage/wal/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,10 @@ impl<C> FrameEncoder for DataFrame<'_, C>
where
C: Serialize,
{
#[allow(clippy::arithmetic_side_effects)] // The integer shift is safe
#[allow(
clippy::arithmetic_side_effects, // The integer shift is safe
clippy::indexing_slicing // The slicing is checked
)]
fn encode(&self) -> Vec<u8> {
match *self {
DataFrame::Entry(ref entry) => {
Expand Down
13 changes: 8 additions & 5 deletions crates/xline/src/revision_number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::atomic::{AtomicI64, Ordering};
/// Revision number
#[derive(Debug)]
pub(crate) struct RevisionNumberGenerator {
/// The current revision number
current: AtomicI64,
}

Expand All @@ -16,12 +17,12 @@ impl RevisionNumberGenerator {

/// Get the current revision number
pub(crate) fn get(&self) -> i64 {
self.current.load(Ordering::Acquire)
self.current.load(Ordering::Relaxed)
}

/// Set the revision number
pub(crate) fn set(&self, rev: i64) {
self.current.store(rev, Ordering::Release);
self.current.store(rev, Ordering::Relaxed);
}

/// Gets a temporary state
Expand All @@ -42,24 +43,26 @@ impl Default for RevisionNumberGenerator {

/// Revision generator with temporary state
pub(crate) struct RevisionNumberGeneratorState<'a> {
/// The current revision number
current: &'a AtomicI64,
/// Next revision number
next: AtomicI64,
}

impl RevisionNumberGeneratorState<'_> {
/// Get the current revision number
pub(crate) fn get(&self) -> i64 {
self.next.load(Ordering::Acquire)
self.next.load(Ordering::Relaxed)
}

/// Increases the next revision number
pub(crate) fn next(&self) -> i64 {
self.next.fetch_add(1, Ordering::Release).wrapping_add(1)
self.next.fetch_add(1, Ordering::Relaxed).wrapping_add(1)
}

/// Commit the revision number
pub(crate) fn commit(&self) {
self.current
.store(self.next.load(Ordering::Acquire), Ordering::Release)
.store(self.next.load(Ordering::Relaxed), Ordering::Relaxed);
}
}
151 changes: 106 additions & 45 deletions crates/xline/src/server/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ use xlineapi::{
};

use crate::{
revision_number::RevisionNumberGeneratorState,
rpc::{RequestBackend, RequestWrapper},
storage::{
db::{WriteOp, DB},
index::IndexOperate,
storage_api::XlineStorageOps,
AlarmStore, AuthStore, KvStore, LeaseStore,
},
Expand Down Expand Up @@ -265,10 +267,85 @@ impl CommandExecutor {
_ => Ok(()),
}
}

/// After sync KV commands
async fn after_sync_kv<T>(
&self,
wrapper: &RequestWrapper,
txn_db: &T,
index: &(dyn IndexOperate + Send + Sync),
revision_gen: &RevisionNumberGeneratorState<'_>,
to_execute: bool,
) -> Result<
(
<Command as CurpCommand>::ASR,
Option<<Command as CurpCommand>::ER>,
),
ExecuteError,
>
where
T: XlineStorageOps + TransactionApi,
{
let (asr, er) = self
.kv_storage
.after_sync(wrapper, txn_db, index, revision_gen, to_execute)
.await?;
Ok((asr, er))
}

/// After sync other type of commands
async fn after_sync_others<T>(
&self,
wrapper: &RequestWrapper,
txn_db: &T,
general_revision: &RevisionNumberGeneratorState<'_>,
auth_revision: &RevisionNumberGeneratorState<'_>,
to_execute: bool,
) -> Result<
(
<Command as CurpCommand>::ASR,
Option<<Command as CurpCommand>::ER>,
),
ExecuteError,
>
where
T: XlineStorageOps + TransactionApi,
{
let er = to_execute
.then(|| match wrapper.backend() {
RequestBackend::Auth => self.auth_storage.execute(wrapper),
RequestBackend::Lease => self.lease_storage.execute(wrapper),
RequestBackend::Alarm => Ok(self.alarm_storage.execute(wrapper)),
RequestBackend::Kv => unreachable!("Should not execute kv commands"),
})
.transpose()?;

let (asr, wr_ops) = match wrapper.backend() {
RequestBackend::Auth => self.auth_storage.after_sync(wrapper, auth_revision)?,
RequestBackend::Lease => {
self.lease_storage
.after_sync(wrapper, general_revision)
.await?
}
RequestBackend::Alarm => self.alarm_storage.after_sync(wrapper, general_revision),
RequestBackend::Kv => unreachable!("Should not sync kv commands"),
};

txn_db.write_ops(wr_ops)?;

Ok((asr, er))
}
}

#[async_trait::async_trait]
impl CurpCommandExecutor<Command> for CommandExecutor {
fn prepare(
&self,
_cmd: &Command,
) -> Result<<Command as CurpCommand>::PR, <Command as CurpCommand>::Error> {
Ok(-1)
}

async fn execute(
&self,
cmd: &Command,
Expand Down Expand Up @@ -301,22 +378,18 @@ impl CurpCommandExecutor<Command> for CommandExecutor {
}
cmds.iter()
.map(AfterSyncCmd::cmd)
.map(|c| self.check_alarm(c))
.collect::<Result<_, _>>()?;
.try_for_each(|c| self.check_alarm(c))?;
let quota_enough = cmds
.iter()
.map(AfterSyncCmd::cmd)
.all(|c| self.quota_checker.check(c));
cmds.iter()
.map(AfterSyncCmd::cmd)
.map(|c| {
self.auth_storage
.check_permission(c.request(), c.auth_info())
})
.collect::<Result<_, _>>()?;
cmds.iter().map(AfterSyncCmd::cmd).try_for_each(|c| {
self.auth_storage
.check_permission(c.request(), c.auth_info())
})?;

let index = self.kv_storage.index();
let mut index_state = index.state();
let index_state = index.state();
let general_revision_gen = self.kv_storage.revision_gen();
let auth_revision_gen = self.auth_storage.revision_gen();
let general_revision_state = general_revision_gen.state();
Expand All @@ -328,53 +401,41 @@ impl CurpCommandExecutor<Command> for CommandExecutor {
let mut resps = Vec::with_capacity(cmds.len());
for (cmd, to_execute) in cmds.into_iter().map(AfterSyncCmd::into_parts) {
let wrapper = cmd.request();
let er = to_execute
.then(|| match wrapper.backend() {
RequestBackend::Kv => self
.kv_storage
.execute(wrapper, Some((&txn_db, &mut index_state))),
RequestBackend::Auth => self.auth_storage.execute(wrapper),
RequestBackend::Lease => self.lease_storage.execute(wrapper),
RequestBackend::Alarm => Ok(self.alarm_storage.execute(wrapper)),
})
.transpose()?;
tracing::info!("sync cmd: {cmd:?}");
if to_execute {
tracing::info!("execute in after sync for: {cmd:?}");
}
let (asr, wr_ops) = match wrapper.backend() {
RequestBackend::Kv => (
self.kv_storage
.after_sync(wrapper, &txn_db, &index_state, &general_revision_state)
.await?,
vec![],
),
RequestBackend::Auth => self
.auth_storage
.after_sync(wrapper, &auth_revision_state)?,
RequestBackend::Lease => {
self.lease_storage
.after_sync(wrapper, &general_revision_state)
.await?
let (asr, er) = match wrapper.backend() {
RequestBackend::Kv => {
self.after_sync_kv(
wrapper,
&txn_db,
&index_state,
&general_revision_state,
to_execute,
)
.await
}
RequestBackend::Alarm => self
.alarm_storage
.after_sync(wrapper, &general_revision_state),
};
txn_db.write_ops(wr_ops)?;
RequestBackend::Auth | RequestBackend::Lease | RequestBackend::Alarm => {
self.after_sync_others(
wrapper,
&txn_db,
&general_revision_state,
&auth_revision_state,
to_execute,
)
.await
}
}?;
resps.push((asr, er));

if let RequestWrapper::CompactionRequest(ref compact_req) = *wrapper {
if compact_req.physical {
if let Some(n) = self.compact_events.get(&cmd.compact_id()) {
n.notify(usize::MAX);
let _ignore = n.notify(usize::MAX);
}
}
};
if let RequestWrapper::CompactionRequest(ref compact_req) = *wrapper {
if compact_req.physical {
if let Some(n) = self.compact_events.get(&cmd.compact_id()) {
n.notify(usize::MAX);
let _ignore = n.notify(usize::MAX);
}
}
};
Expand Down
2 changes: 1 addition & 1 deletion crates/xline/src/server/kv_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl KvServer {
fn do_serializable(&self, command: &Command) -> Result<Response, tonic::Status> {
self.auth_storage
.check_permission(command.request(), command.auth_info())?;
let cmd_res = self.kv_storage.execute(command.request())?;
let cmd_res = self.kv_storage.execute(command.request(), None)?;
Ok(Self::parse_response_op(cmd_res.into_inner().into()))
}

Expand Down
Loading

0 comments on commit 52a9aa0

Please sign in to comment.