Skip to content

Commit

Permalink
fix: use execute_ro to speculative execute read-only commands
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds committed Aug 23, 2024
1 parent 6b627c4 commit 7bf01d5
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 7 deletions.
8 changes: 8 additions & 0 deletions crates/curp-external-api/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ where
/// command.
fn execute(&self, cmd: &C) -> Result<C::ER, C::Error>;

/// Execute the read-only command
///
/// # Errors
///
/// This function may return an error if there is a problem executing the
/// command.
fn execute_ro(&self, cmd: &C) -> Result<(C::ER, C::ASR), C::Error>;

/// Batch execute the after_sync callback
///
/// This `highest_index` means the last log index of the `cmds`
Expand Down
10 changes: 10 additions & 0 deletions crates/curp-test-utils/src/test_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,16 @@ impl CommandExecutor<TestCommand> for TestCE {
Ok(result)
}

fn execute_ro(
&self,
cmd: &TestCommand,
) -> Result<
(<TestCommand as Command>::ER, <TestCommand as Command>::ASR),
<TestCommand as Command>::Error,
> {
self.execute(cmd).map(|er| (er, LogIndexResult(0)))
}

fn after_sync(
&self,
cmds: Vec<AfterSyncCmd<'_, TestCommand>>,
Expand Down
7 changes: 1 addition & 6 deletions crates/curp/src/server/cmd_worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,7 @@ pub(super) fn execute<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
unreachable!("should not speculative execute {:?}", entry.entry_data);
};
if cmd.is_read_only() {
let result = ce
.after_sync(vec![AfterSyncCmd::new(cmd, true)], None)
.remove(0)?;
let (asr, er_opt) = result.into_parts();
let er = er_opt.unwrap_or_else(|| unreachable!("er should exist"));
Ok((er, Some(asr)))
ce.execute_ro(cmd).map(|(er, asr)| (er, Some(asr)))
} else {
let er = ce.execute(cmd);
let mut cb_w = cb.write();
Expand Down
20 changes: 19 additions & 1 deletion crates/xline/src/server/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use parking_lot::RwLock;
use tracing::warn;
use utils::{barrier::IdBarrier, table_names::META_TABLE};
use xlineapi::{
command::{Command, CurpClient},
command::{Command, CurpClient, SyncResponse},
execute_error::ExecuteError,
AlarmAction, AlarmRequest, AlarmType,
};
Expand Down Expand Up @@ -429,6 +429,24 @@ impl CurpCommandExecutor<Command> for CommandExecutor {
}
}

fn execute_ro(
&self,
cmd: &Command,
) -> Result<
(<Command as CurpCommand>::ER, <Command as CurpCommand>::ASR),
<Command as CurpCommand>::Error,
> {
let er = self.execute(cmd)?;
let wrapper = cmd.request();
let rev = match wrapper.backend() {
RequestBackend::Kv | RequestBackend::Lease | RequestBackend::Alarm => {
self.kv_storage.revision_gen().get()
}
RequestBackend::Auth => self.auth_storage.revision_gen().get(),
};
Ok((er, SyncResponse::new(rev)))
}

fn after_sync(
&self,
cmds: Vec<AfterSyncCmd<'_, Command>>,
Expand Down

0 comments on commit 7bf01d5

Please sign in to comment.