Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: implement xline conflict pools #769

Merged
merged 8 commits into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/curp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ bincode = "1.3.3"
bytes = "1.4.0"
clippy-utilities = "0.2.0"
curp-external-api = { path = "../curp-external-api" }
curp-test-utils = { path = "../curp-test-utils" }
dashmap = "5.5.0"
derive_builder = "0.20.0"
engine = { path = "../engine" }
Expand Down
2 changes: 1 addition & 1 deletion crates/curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ impl<C> From<Vec<ConfChange>> for PoolEntryInner<C> {
Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Ord, PartialOrd, Default,
)]
#[allow(clippy::exhaustive_structs)] // It is exhaustive
pub struct ProposeId(pub(crate) u64, pub(crate) u64);
pub struct ProposeId(pub u64, pub u64);

impl std::fmt::Display for ProposeId {
#[inline]
Expand Down
42 changes: 42 additions & 0 deletions crates/curp/src/server/conflict/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ pub(crate) mod uncommitted_pool;
#[cfg(test)]
mod tests;

/// Conflict pool used in tests
#[doc(hidden)]
pub mod test_pools;

use std::{ops::Deref, sync::Arc};

use crate::rpc::{ConfChange, PoolEntry, PoolEntryInner, ProposeId};
Expand Down Expand Up @@ -47,6 +51,14 @@ pub struct CommandEntry<C> {
cmd: Arc<C>,
}

impl<C> CommandEntry<C> {
/// Creates a new `CommandEntry`
#[inline]
pub fn new(id: ProposeId, cmd: Arc<C>) -> Self {
Self { id, cmd }
}
}

impl<C> Clone for CommandEntry<C> {
#[inline]
fn clone(&self) -> Self {
Expand All @@ -73,6 +85,36 @@ impl<C> AsRef<C> for CommandEntry<C> {
}
}

impl<C> std::hash::Hash for CommandEntry<C> {
#[inline]
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.id.hash(state);
}
}

impl<C> PartialEq for CommandEntry<C> {
#[inline]
fn eq(&self, other: &Self) -> bool {
self.id.eq(&other.id)
}
}

impl<C> Eq for CommandEntry<C> {}

impl<C> PartialOrd for CommandEntry<C> {
#[inline]
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl<C> Ord for CommandEntry<C> {
#[inline]
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.id.cmp(&other.id)
}
}

impl<C> From<CommandEntry<C>> for PoolEntry<C> {
fn from(entry: CommandEntry<C>) -> Self {
PoolEntry {
Expand Down
108 changes: 108 additions & 0 deletions crates/curp/src/server/conflict/test_pools.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use curp_external_api::{
cmd::ConflictCheck,
conflict::{ConflictPoolOp, SpeculativePoolOp, UncommittedPoolOp},
};
use curp_test_utils::test_cmd::TestCommand;

use super::CommandEntry;

#[derive(Debug, Default)]
pub struct TestSpecPool {
cmds: Vec<CommandEntry<TestCommand>>,
}

impl ConflictPoolOp for TestSpecPool {
type Entry = CommandEntry<TestCommand>;

#[inline]
fn len(&self) -> usize {
self.cmds.len()
}

#[inline]
fn is_empty(&self) -> bool {
self.cmds.is_empty()
}

#[inline]
fn remove(&mut self, entry: Self::Entry) {
if let Some(idx) = self.cmds.iter().position(|c| *c == entry) {
let _ignore = self.cmds.remove(idx);
}
}

#[inline]
fn all(&self) -> Vec<Self::Entry> {
self.cmds.clone()
}

#[inline]
fn clear(&mut self) {
self.cmds.clear();
}
}

impl SpeculativePoolOp for TestSpecPool {
#[inline]
fn insert_if_not_conflict(&mut self, entry: Self::Entry) -> Option<Self::Entry> {
if self.cmds.iter().any(|t| t.is_conflict(&entry)) {
return Some(entry);
}
self.cmds.push(entry);
None
}
}

#[derive(Debug, Default)]
pub struct TestUncomPool {
cmds: Vec<CommandEntry<TestCommand>>,
}

impl ConflictPoolOp for TestUncomPool {
type Entry = CommandEntry<TestCommand>;

#[inline]
fn all(&self) -> Vec<Self::Entry> {
self.cmds.clone()
}

#[inline]
fn len(&self) -> usize {
self.cmds.len()
}

#[inline]
fn is_empty(&self) -> bool {
self.cmds.is_empty()
}

#[inline]
fn remove(&mut self, entry: Self::Entry) {
if let Some(idx) = self.cmds.iter().position(|c| *c == entry) {
let _ignore = self.cmds.remove(idx);
}
}

#[inline]
fn clear(&mut self) {
self.cmds.clear();
}
}

impl UncommittedPoolOp for TestUncomPool {
#[inline]
fn insert(&mut self, entry: Self::Entry) -> bool {
let conflict = self.cmds.iter().any(|t| t.is_conflict(&entry));
self.cmds.push(entry);
conflict
}

#[inline]
fn all_conflict(&self, entry: &Self::Entry) -> Vec<Self::Entry> {
self.cmds
.iter()
.filter(|t| t.is_conflict(entry))
.map(Clone::clone)
.collect()
}
}
1 change: 1 addition & 0 deletions crates/xline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ clippy-utilities = "0.2.0"
crc32fast = "1.4.0"
crossbeam-skiplist = "0.1.1"
curp = { path = "../curp", version = "0.1.0", features = ["client-metrics"] }
curp-external-api = { path = "../curp-external-api" }
dashmap = "5.5.3"
engine = { path = "../engine" }
event-listener = "2.5.2"
Expand Down
55 changes: 55 additions & 0 deletions crates/xline/src/conflict/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use curp::cmd::Command as CurpCommand;
use utils::interval_map::Interval;
use xlineapi::{command::Command, interval::BytesAffine, RequestBackend, RequestWrapper};

/// Speculative pool implementations
pub(crate) mod spec_pool;
/// Uncommitted pool implementations
pub(crate) mod uncommitted_pool;

/// Tests
#[cfg(test)]
mod tests;

/// Returns command intervals
fn intervals<C>(entry: &C) -> Vec<Interval<BytesAffine>>
rogercloud marked this conversation as resolved.
Show resolved Hide resolved
where
C: AsRef<Command>,
{
entry
.as_ref()
.keys()
.iter()
.cloned()
.map(Into::into)
.collect()
}

/// Filter kv commands
fn filter_kv<C>(entry: C) -> Option<C>
where
C: AsRef<Command>,
{
matches!(entry.as_ref().request().backend(), RequestBackend::Kv).then_some(entry)
}

/// Returns `true` if this command conflicts with all other commands
fn is_exclusive_cmd(cmd: &Command) -> bool {
matches!(
*cmd.request(),
RequestWrapper::CompactionRequest(_)
| RequestWrapper::AuthEnableRequest(_)
| RequestWrapper::AuthDisableRequest(_)
| RequestWrapper::AuthRoleAddRequest(_)
| RequestWrapper::AuthRoleDeleteRequest(_)
| RequestWrapper::AuthRoleGrantPermissionRequest(_)
| RequestWrapper::AuthRoleRevokePermissionRequest(_)
| RequestWrapper::AuthUserAddRequest(_)
| RequestWrapper::AuthUserChangePasswordRequest(_)
| RequestWrapper::AuthUserDeleteRequest(_)
| RequestWrapper::AuthUserGrantRoleRequest(_)
| RequestWrapper::AuthUserRevokeRoleRequest(_)
| RequestWrapper::AuthenticateRequest(_)
| RequestWrapper::AlarmRequest(_)
)
}
Loading
Loading