Skip to content

Commit

Permalink
feat: suppotr alarm
Browse files Browse the repository at this point in the history
Signed-off-by: themanforfree <[email protected]>
  • Loading branch information
themanforfree committed Nov 24, 2023
1 parent d2d1069 commit 8b8a20d
Show file tree
Hide file tree
Showing 18 changed files with 597 additions and 116 deletions.
28 changes: 1 addition & 27 deletions curp-external-api/src/cmd.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt::Display, hash::Hash, sync::Arc};
use std::{fmt::Display, hash::Hash};

use async_trait::async_trait;
use engine::Snapshot;
Expand Down Expand Up @@ -114,29 +114,6 @@ impl ConflictCheck for u32 {
}
}

/// Quota checker
pub trait QuotaChecker<C>: Sync + Send + std::fmt::Debug
where
C: Command,
{
/// Check if the command executor has enough quota to execute the command
fn check(&self, cmd: &C) -> bool;
}

/// Pass through quota checker
#[derive(Debug, Clone, Copy, Default)]
#[non_exhaustive]
pub struct PassThrough;
impl<C> QuotaChecker<C> for PassThrough
where
C: Command,
{
#[inline]
fn check(&self, _cmd: &C) -> bool {
true
}
}

/// Command executor which actually executes the command.
/// It is usually defined by the protocol user.
#[async_trait]
Expand Down Expand Up @@ -172,9 +149,6 @@ where

/// Trigger the barrier of the given id and index.
fn trigger(&self, id: ProposeId, index: u64);

/// Get quota checker
fn quota_checker(&self) -> Arc<dyn QuotaChecker<C>>;
}

/// Codec for encoding and decoding data into/from the Protobuf format
Expand Down
15 changes: 1 addition & 14 deletions curp-test-utils/src/test_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{

use async_trait::async_trait;
use curp_external_api::{
cmd::{Command, CommandExecutor, ConflictCheck, PassThrough, PbCodec, ProposeId, QuotaChecker},
cmd::{Command, CommandExecutor, ConflictCheck, PbCodec, ProposeId},
LogIndex,
};
use engine::{Engine, EngineType, Snapshot, SnapshotApi, StorageEngine, WriteOperation};
Expand Down Expand Up @@ -247,15 +247,6 @@ pub struct TestCE {
after_sync_sender: mpsc::UnboundedSender<(TestCommand, LogIndex)>,
}

impl<C> QuotaChecker<C> for TestCE
where
C: Command,
{
fn check(&self, _cmd: &C) -> bool {
true
}
}

#[async_trait]
impl CommandExecutor<TestCommand> for TestCE {
fn prepare(
Expand Down Expand Up @@ -437,10 +428,6 @@ impl CommandExecutor<TestCommand> for TestCE {
}

fn trigger(&self, _id: ProposeId, _index: u64) {}

fn quota_checker(&self) -> Arc<dyn QuotaChecker<TestCommand>> {
Arc::new(PassThrough::default())
}
}

impl TestCE {
Expand Down
3 changes: 0 additions & 3 deletions curp/src/server/curp_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,6 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {

// create curp state machine
let (voted_for, entries) = storage.recover().await?;
let quota_checker = cmd_executor.quota_checker();
let curp = if voted_for.is_none() && entries.is_empty() {
Arc::new(RawCurp::new(
Arc::clone(&cluster_info),
Expand All @@ -817,7 +816,6 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
role_change,
shutdown_trigger,
connects,
quota_checker,
))
} else {
info!(
Expand All @@ -842,7 +840,6 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
role_change,
shutdown_trigger,
connects,
quota_checker,
))
};

Expand Down
16 changes: 1 addition & 15 deletions curp/src/server/raw_curp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{
};

use clippy_utilities::{NumericCast, OverflowArithmetic};
use curp_external_api::cmd::{ConflictCheck, QuotaChecker};
use curp_external_api::cmd::ConflictCheck;
use dashmap::DashMap;
use event_listener::Event;
use itertools::Itertools;
Expand Down Expand Up @@ -98,8 +98,6 @@ pub(super) struct RawCurp<C: Command, RC: RoleChange> {
ctx: Context<C, RC>,
/// Shutdown trigger
shutdown_trigger: shutdown::Trigger,
/// Quota checker
quota_checker: Arc<dyn QuotaChecker<C>>,
}

/// Actions of syncing
Expand Down Expand Up @@ -258,14 +256,6 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
},
));
}
if !self.quota_checker.check(cmd.as_ref()) {
warn!(
"{} has no enough quota to execute cmd({:?})",
self.id(),
cmd
);
return Err(CurpError::Internal("Quota exceeded".to_owned()));
}
let id = cmd.id();
if !self.ctx.cb.map_write(|mut cb_w| cb_w.sync.insert(id)) {
return Ok((info, Err(ProposeError::Duplicated)));
Expand Down Expand Up @@ -805,7 +795,6 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
role_change: RC,
shutdown_trigger: shutdown::Trigger,
connects: DashMap<ServerId, InnerConnectApiWrapper>,
quota_checker: Arc<dyn QuotaChecker<C>>,
) -> Self {
let (change_tx, change_rx) = flume::bounded(CHANGE_CHANNEL_SIZE);
let raw_curp = Self {
Expand Down Expand Up @@ -838,7 +827,6 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
last_conf_change_idx: AtomicU64::new(0),
},
shutdown_trigger,
quota_checker,
};
if is_leader {
let mut st_w = raw_curp.st.write();
Expand Down Expand Up @@ -867,7 +855,6 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
role_change: RC,
shutdown_trigger: shutdown::Trigger,
connects: DashMap<ServerId, InnerConnectApiWrapper>,
quota_checker: Arc<dyn QuotaChecker<C>>,
) -> Self {
let raw_curp = Self::new(
cluster_info,
Expand All @@ -882,7 +869,6 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
role_change,
shutdown_trigger,
connects,
quota_checker,
);

if let Some((term, server_id)) = voted_for {
Expand Down
3 changes: 0 additions & 3 deletions curp/src/server/raw_curp/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::time::Instant;

use curp_external_api::cmd::PassThrough;
use curp_test_utils::{
mock_role_change,
test_cmd::{next_id, TestCommand},
Expand Down Expand Up @@ -73,7 +72,6 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
.build()
.unwrap();
let (shutdown_trigger, _) = shutdown::channel();
let quota_checker = Arc::new(PassThrough::default());
Self::new(
cluster_info,
true,
Expand All @@ -87,7 +85,6 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
role_change,
shutdown_trigger,
connects,
quota_checker,
)
}

Expand Down
5 changes: 4 additions & 1 deletion engine/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ impl StorageEngine for Engine {

#[inline]
fn size(&self) -> u64 {
0
match *self {
Engine::Memory(ref e) => e.size(),
Engine::Rocks(ref e) => e.size(),
}
}

#[inline]
Expand Down
33 changes: 32 additions & 1 deletion xline-client/src/clients/maintenance.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{fmt::Debug, sync::Arc};

use tonic::{transport::Channel, Streaming};
use xlineapi::{SnapshotRequest, SnapshotResponse};
use xlineapi::{AlarmRequest, AlarmResponse, SnapshotRequest, SnapshotResponse};

use crate::{error::Result, AuthService};

Expand Down Expand Up @@ -66,4 +66,35 @@ impl MaintenanceClient {
pub async fn snapshot(&mut self) -> Result<Streaming<SnapshotResponse>> {
Ok(self.inner.snapshot(SnapshotRequest {}).await?.into_inner())
}

/// Sends a alarm request
///
/// # Errors
///
/// This function will return an error if the inner RPC client encountered a propose failure
///
/// # Examples
///
/// ```no_run
/// use xline_client::{Client, ClientOptions};
/// use anyhow::Result;
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
/// // the name and address of all curp members
/// let curp_members = ["10.0.0.1:2379", "10.0.0.2:2379", "10.0.0.3:2379"];
///
/// let mut client = Client::connect(curp_members, ClientOptions::default())
/// .await?
/// .maintenance_client();
///
/// client.alarm(AlarmRequest::new(AlarmAction::Get, 0, AlarmType::None)).await?;
///
/// Ok(())
/// }
/// ```
#[inline]
pub async fn alarm(&mut self, request: AlarmRequest) -> Result<AlarmResponse> {
Ok(self.inner.alarm(request).await?.into_inner())
}
}
15 changes: 10 additions & 5 deletions xline-test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub struct Cluster {
size: usize,
/// storage paths
paths: HashMap<usize, PathBuf>,
/// Cluster quotas
pub quotas: HashMap<usize, u64>,
}

impl Cluster {
Expand All @@ -48,6 +50,7 @@ impl Cluster {
client: None,
size,
paths: HashMap::new(),
quotas: HashMap::new(),
}
}

Expand Down Expand Up @@ -77,17 +80,19 @@ impl Cluster {
.collect(),
&name,
);
let storage_config = if let Some(quota) = self.quotas.get(&i) {
StorageConfig::new(EngineConfig::default(), *quota)
} else {
StorageConfig::default()
};
tokio::spawn(async move {
let server = XlineServer::new(
cluster_info.into(),
is_leader,
CurpConfig {
engine_cfg: EngineConfig::RocksDB(path.join("curp")),
..Default::default()
},
CurpConfig::default(),
ClientConfig::default(),
ServerTimeout::default(),
StorageConfig::default(),
storage_config,
CompactConfig::default(),
);
let result = server
Expand Down
Loading

0 comments on commit 8b8a20d

Please sign in to comment.