From 8b8a20d7212a5c9f36fb779adb023fbd5e2e4701 Mon Sep 17 00:00:00 2001 From: themanforfree Date: Fri, 24 Nov 2023 10:03:06 +0800 Subject: [PATCH] feat: suppotr alarm Signed-off-by: themanforfree --- curp-external-api/src/cmd.rs | 28 +-- curp-test-utils/src/test_cmd.rs | 15 +- curp/src/server/curp_node.rs | 3 - curp/src/server/raw_curp/mod.rs | 16 +- curp/src/server/raw_curp/tests.rs | 3 - engine/src/proxy.rs | 5 +- xline-client/src/clients/maintenance.rs | 33 +++- xline-test-utils/src/lib.rs | 15 +- xline/src/server/command.rs | 146 ++++++++++++++-- xline/src/server/maintenance.rs | 50 +++++- xline/src/server/xline_server.rs | 38 ++-- xline/src/storage/alarm_store.rs | 222 ++++++++++++++++++++++++ xline/src/storage/db.rs | 26 ++- xline/src/storage/execute_error.rs | 10 ++ xline/src/storage/mod.rs | 5 +- xline/tests/it/maintenance_test.rs | 45 ++++- xlineapi/proto | 2 +- xlineapi/src/lib.rs | 51 ++++-- 18 files changed, 597 insertions(+), 116 deletions(-) create mode 100644 xline/src/storage/alarm_store.rs diff --git a/curp-external-api/src/cmd.rs b/curp-external-api/src/cmd.rs index 4caf358de..eb6e9cd1e 100644 --- a/curp-external-api/src/cmd.rs +++ b/curp-external-api/src/cmd.rs @@ -1,4 +1,4 @@ -use std::{fmt::Display, hash::Hash, sync::Arc}; +use std::{fmt::Display, hash::Hash}; use async_trait::async_trait; use engine::Snapshot; @@ -114,29 +114,6 @@ impl ConflictCheck for u32 { } } -/// Quota checker -pub trait QuotaChecker: Sync + Send + std::fmt::Debug -where - C: Command, -{ - /// Check if the command executor has enough quota to execute the command - fn check(&self, cmd: &C) -> bool; -} - -/// Pass through quota checker -#[derive(Debug, Clone, Copy, Default)] -#[non_exhaustive] -pub struct PassThrough; -impl QuotaChecker for PassThrough -where - C: Command, -{ - #[inline] - fn check(&self, _cmd: &C) -> bool { - true - } -} - /// Command executor which actually executes the command. /// It is usually defined by the protocol user. #[async_trait] @@ -172,9 +149,6 @@ where /// Trigger the barrier of the given id and index. 
fn trigger(&self, id: ProposeId, index: u64); - - /// Get quota checker - fn quota_checker(&self) -> Arc>; } /// Codec for encoding and decoding data into/from the Protobuf format diff --git a/curp-test-utils/src/test_cmd.rs b/curp-test-utils/src/test_cmd.rs index a4279f6a0..f86ea89ec 100644 --- a/curp-test-utils/src/test_cmd.rs +++ b/curp-test-utils/src/test_cmd.rs @@ -9,7 +9,7 @@ use std::{ use async_trait::async_trait; use curp_external_api::{ - cmd::{Command, CommandExecutor, ConflictCheck, PassThrough, PbCodec, ProposeId, QuotaChecker}, + cmd::{Command, CommandExecutor, ConflictCheck, PbCodec, ProposeId}, LogIndex, }; use engine::{Engine, EngineType, Snapshot, SnapshotApi, StorageEngine, WriteOperation}; @@ -247,15 +247,6 @@ pub struct TestCE { after_sync_sender: mpsc::UnboundedSender<(TestCommand, LogIndex)>, } -impl QuotaChecker for TestCE -where - C: Command, -{ - fn check(&self, _cmd: &C) -> bool { - true - } -} - #[async_trait] impl CommandExecutor for TestCE { fn prepare( @@ -437,10 +428,6 @@ impl CommandExecutor for TestCE { } fn trigger(&self, _id: ProposeId, _index: u64) {} - - fn quota_checker(&self) -> Arc> { - Arc::new(PassThrough::default()) - } } impl TestCE { diff --git a/curp/src/server/curp_node.rs b/curp/src/server/curp_node.rs index e5a0669d0..aae29b53b 100644 --- a/curp/src/server/curp_node.rs +++ b/curp/src/server/curp_node.rs @@ -802,7 +802,6 @@ impl CurpNode { // create curp state machine let (voted_for, entries) = storage.recover().await?; - let quota_checker = cmd_executor.quota_checker(); let curp = if voted_for.is_none() && entries.is_empty() { Arc::new(RawCurp::new( Arc::clone(&cluster_info), @@ -817,7 +816,6 @@ impl CurpNode { role_change, shutdown_trigger, connects, - quota_checker, )) } else { info!( @@ -842,7 +840,6 @@ impl CurpNode { role_change, shutdown_trigger, connects, - quota_checker, )) }; diff --git a/curp/src/server/raw_curp/mod.rs b/curp/src/server/raw_curp/mod.rs index fa05e80f7..ac277612a 100644 --- a/curp/src/server/raw_curp/mod.rs +++ b/curp/src/server/raw_curp/mod.rs @@ -20,7 +20,7 @@ use std::{ }; use clippy_utilities::{NumericCast, OverflowArithmetic}; -use curp_external_api::cmd::{ConflictCheck, QuotaChecker}; +use curp_external_api::cmd::ConflictCheck; use dashmap::DashMap; use event_listener::Event; use itertools::Itertools; @@ -98,8 +98,6 @@ pub(super) struct RawCurp { ctx: Context, /// Shutdown trigger shutdown_trigger: shutdown::Trigger, - /// Quota checker - quota_checker: Arc>, } /// Actions of syncing @@ -258,14 +256,6 @@ impl RawCurp { }, )); } - if !self.quota_checker.check(cmd.as_ref()) { - warn!( - "{} has no enough quota to execute cmd({:?})", - self.id(), - cmd - ); - return Err(CurpError::Internal("Quota exceeded".to_owned())); - } let id = cmd.id(); if !self.ctx.cb.map_write(|mut cb_w| cb_w.sync.insert(id)) { return Ok((info, Err(ProposeError::Duplicated))); @@ -805,7 +795,6 @@ impl RawCurp { role_change: RC, shutdown_trigger: shutdown::Trigger, connects: DashMap, - quota_checker: Arc>, ) -> Self { let (change_tx, change_rx) = flume::bounded(CHANGE_CHANNEL_SIZE); let raw_curp = Self { @@ -838,7 +827,6 @@ impl RawCurp { last_conf_change_idx: AtomicU64::new(0), }, shutdown_trigger, - quota_checker, }; if is_leader { let mut st_w = raw_curp.st.write(); @@ -867,7 +855,6 @@ impl RawCurp { role_change: RC, shutdown_trigger: shutdown::Trigger, connects: DashMap, - quota_checker: Arc>, ) -> Self { let raw_curp = Self::new( cluster_info, @@ -882,7 +869,6 @@ impl RawCurp { role_change, shutdown_trigger, connects, - 
quota_checker, ); if let Some((term, server_id)) = voted_for { diff --git a/curp/src/server/raw_curp/tests.rs b/curp/src/server/raw_curp/tests.rs index 8a8f5c915..b5862050f 100644 --- a/curp/src/server/raw_curp/tests.rs +++ b/curp/src/server/raw_curp/tests.rs @@ -1,6 +1,5 @@ use std::time::Instant; -use curp_external_api::cmd::PassThrough; use curp_test_utils::{ mock_role_change, test_cmd::{next_id, TestCommand}, @@ -73,7 +72,6 @@ impl RawCurp { .build() .unwrap(); let (shutdown_trigger, _) = shutdown::channel(); - let quota_checker = Arc::new(PassThrough::default()); Self::new( cluster_info, true, @@ -87,7 +85,6 @@ impl RawCurp { role_change, shutdown_trigger, connects, - quota_checker, ) } diff --git a/engine/src/proxy.rs b/engine/src/proxy.rs index 273356c48..17700dc31 100644 --- a/engine/src/proxy.rs +++ b/engine/src/proxy.rs @@ -118,7 +118,10 @@ impl StorageEngine for Engine { #[inline] fn size(&self) -> u64 { - 0 + match *self { + Engine::Memory(ref e) => e.size(), + Engine::Rocks(ref e) => e.size(), + } } #[inline] diff --git a/xline-client/src/clients/maintenance.rs b/xline-client/src/clients/maintenance.rs index 94d24d135..b66386c3a 100644 --- a/xline-client/src/clients/maintenance.rs +++ b/xline-client/src/clients/maintenance.rs @@ -1,7 +1,7 @@ use std::{fmt::Debug, sync::Arc}; use tonic::{transport::Channel, Streaming}; -use xlineapi::{SnapshotRequest, SnapshotResponse}; +use xlineapi::{AlarmRequest, AlarmResponse, SnapshotRequest, SnapshotResponse}; use crate::{error::Result, AuthService}; @@ -66,4 +66,35 @@ impl MaintenanceClient { pub async fn snapshot(&mut self) -> Result> { Ok(self.inner.snapshot(SnapshotRequest {}).await?.into_inner()) } + + /// Sends a alarm request + /// + /// # Errors + /// + /// This function will return an error if the inner RPC client encountered a propose failure + /// + /// # Examples + /// + /// ```no_run + /// use xline_client::{Client, ClientOptions}; + /// use anyhow::Result; + /// + /// #[tokio::main] + /// async fn main() -> Result<()> { + /// // the name and address of all curp members + /// let curp_members = ["10.0.0.1:2379", "10.0.0.2:2379", "10.0.0.3:2379"]; + /// + /// let mut client = Client::connect(curp_members, ClientOptions::default()) + /// .await? 
+ /// .maintenance_client(); + /// + /// client.alarm(AlarmRequest::new(AlarmAction::Get, 0, AlarmType::None)).await?; + /// + /// Ok(()) + /// } + /// ``` + #[inline] + pub async fn alarm(&mut self, request: AlarmRequest) -> Result { + Ok(self.inner.alarm(request).await?.into_inner()) + } } diff --git a/xline-test-utils/src/lib.rs b/xline-test-utils/src/lib.rs index e33431c63..c4966c0a4 100644 --- a/xline-test-utils/src/lib.rs +++ b/xline-test-utils/src/lib.rs @@ -29,6 +29,8 @@ pub struct Cluster { size: usize, /// storage paths paths: HashMap, + /// Cluster quotas + pub quotas: HashMap, } impl Cluster { @@ -48,6 +50,7 @@ impl Cluster { client: None, size, paths: HashMap::new(), + quotas: HashMap::new(), } } @@ -77,17 +80,19 @@ impl Cluster { .collect(), &name, ); + let storage_config = if let Some(quota) = self.quotas.get(&i) { + StorageConfig::new(EngineConfig::default(), *quota) + } else { + StorageConfig::default() + }; tokio::spawn(async move { let server = XlineServer::new( cluster_info.into(), is_leader, - CurpConfig { - engine_cfg: EngineConfig::RocksDB(path.join("curp")), - ..Default::default() - }, + CurpConfig::default(), ClientConfig::default(), ServerTimeout::default(), - StorageConfig::default(), + storage_config, CompactConfig::default(), ); let result = server diff --git a/xline/src/server/command.rs b/xline/src/server/command.rs index fcb648286..8429b43c9 100644 --- a/xline/src/server/command.rs +++ b/xline/src/server/command.rs @@ -6,24 +6,33 @@ use std::{ use clippy_utilities::{Cast, OverflowArithmetic}; use curp::{ + client::Client, cmd::{ Command as CurpCommand, CommandExecutor as CurpCommandExecutor, ConflictCheck, PbCodec, - PbSerializeError, ProposeId, QuotaChecker, + PbSerializeError, ProposeId, }, error::ClientError, + members::ServerId, LogIndex, }; use engine::Snapshot; use itertools::Itertools; use prost::Message; use serde::{Deserialize, Serialize}; -use xlineapi::{PbCommand, PbCommandResponse, PbKeyRange, PbSyncResponse, PutRequest, TxnRequest}; +use tracing::warn; +use xlineapi::{ + AlarmAction, AlarmRequest, AlarmType, PbCommand, PbCommandResponse, PbKeyRange, PbSyncResponse, + PutRequest, TxnRequest, +}; use super::barriers::{IdBarrier, IndexBarrier}; use crate::{ revision_number::RevisionNumberGenerator, rpc::{Request, RequestBackend, RequestWithToken, RequestWrapper, ResponseWrapper}, - storage::{db::WriteOp, storage_api::StorageApi, AuthStore, ExecuteError, KvStore, LeaseStore}, + storage::{ + db::WriteOp, storage_api::StorageApi, AlarmStore, AuthStore, ExecuteError, KvStore, + LeaseStore, + }, }; /// Meta table name @@ -257,6 +266,8 @@ where auth_storage: Arc>, /// Lease Storage lease_storage: Arc>, + /// Alarm Storage + alarm_storage: Arc>, /// persistent storage persistent: Arc, /// Barrier for applied index @@ -268,7 +279,15 @@ where /// Revision Number generator for Auth request auth_rev: Arc, /// Quota checker - quota_checker: Arc>, + quota_checker: Arc, + /// Alarmer + alarmer: Alarmer, +} + +/// Quota checker +pub(crate) trait QuotaChecker: Sync + Send + std::fmt::Debug { + /// Check if the command executor has enough quota to execute the command + fn check(&self, cmd: &Command) -> bool; } /// Quota checker for `Command` @@ -355,20 +374,28 @@ where | RequestWrapper::AuthUserRevokeRoleRequest(_) | RequestWrapper::AuthenticateRequest(_) | RequestWrapper::LeaseRevokeRequest(_) - | RequestWrapper::LeaseLeasesRequest(_) => 0, + | RequestWrapper::LeaseLeasesRequest(_) + | RequestWrapper::AlarmRequest(_) => 0, } } } -impl QuotaChecker for 
CommandQuotaChecker +impl QuotaChecker for CommandQuotaChecker where S: StorageApi, { fn check(&self, cmd: &Command) -> bool { + if !cmd.need_check_quota() { + return true; + } let cmd_size = Self::cmd_size(&cmd.request.request); if self.persistent.size().overflow_add(cmd_size) > self.quota { if let Ok(file_size) = self.persistent.file_size() { if file_size.overflow_add(cmd_size) > self.quota { + warn!( + "Quota exceeded, file size: {}, cmd size: {}, quota: {}", + file_size, cmd_size, self.quota + ); return false; } } @@ -377,6 +404,44 @@ where } } +/// Alarmer +#[derive(Debug, Clone)] +struct Alarmer { + /// Node id + id: ServerId, + /// Client + client: Arc>, +} + +impl Alarmer { + /// Create a new `Alarmer` + fn new(id: ServerId, client: Arc>) -> Self { + Self { id, client } + } + + /// Propose alarm request to other nodes + async fn alarm( + &self, + action: AlarmAction, + alarm: AlarmType, + ) -> Result<(), ClientError> { + let id = self.client.gen_propose_id().await?; + let alarm_req = AlarmRequest::new(action, self.id, alarm); + _ = self + .client + .propose( + Command::new( + vec![], + RequestWithToken::new(RequestWrapper::AlarmRequest(alarm_req)), + id, + ), + true, + ) + .await?; + Ok(()) + } +} + impl CommandExecutor where S: StorageApi, @@ -387,24 +452,54 @@ where kv_storage: Arc>, auth_storage: Arc>, lease_storage: Arc>, + alarm_storage: Arc>, persistent: Arc, index_barrier: Arc, id_barrier: Arc, general_rev: Arc, auth_rev: Arc, quota: u64, + node_id: ServerId, + client: Arc>, ) -> Self { + let alarmer = Alarmer::new(node_id, client); let quota_checker = Arc::new(CommandQuotaChecker::new(quota, Arc::clone(&persistent))); Self { kv_storage, auth_storage, lease_storage, + alarm_storage, persistent, index_barrier, id_barrier, general_rev, auth_rev, quota_checker, + alarmer, + } + } + + /// Check if the alarm is activated + fn check_alarm(&self, cmd: &Command) -> Result<(), ExecuteError> { + #[allow(clippy::wildcard_enum_match_arm)] + match cmd.request().request { + RequestWrapper::PutRequest(_) + | RequestWrapper::TxnRequest(_) + | RequestWrapper::LeaseGrantRequest(_) => match self.alarm_storage.current_alarm() { + AlarmType::Corrupt => Err(ExecuteError::DbError("Corrupt".to_owned())), + AlarmType::Nospace => Err(ExecuteError::Nospace), + AlarmType::None => Ok(()), + }, + + RequestWrapper::RangeRequest(_) + | RequestWrapper::DeleteRangeRequest(_) + | RequestWrapper::LeaseRevokeRequest(_) + | RequestWrapper::CompactionRequest(_) => match self.alarm_storage.current_alarm() { + AlarmType::Corrupt => Err(ExecuteError::DbError("Corrupt".to_owned())), + AlarmType::Nospace | AlarmType::None => Ok(()), + }, + + _ => Ok(()), } } } @@ -418,6 +513,7 @@ where &self, cmd: &Command, ) -> Result<::PR, ::Error> { + self.check_alarm(cmd)?; let wrapper = cmd.request(); self.auth_storage.check_permission(wrapper)?; let revision = match wrapper.request.backend() { @@ -435,6 +531,7 @@ where self.general_rev.next() } } + RequestBackend::Alarm => -1, }; Ok(revision) } @@ -448,6 +545,7 @@ where RequestBackend::Kv => self.kv_storage.execute(wrapper), RequestBackend::Auth => self.auth_storage.execute(wrapper), RequestBackend::Lease => self.lease_storage.execute(wrapper), + RequestBackend::Alarm => Ok(self.alarm_storage.execute(wrapper)), } } @@ -464,6 +562,7 @@ where RequestBackend::Kv => self.kv_storage.after_sync(wrapper, revision).await?, RequestBackend::Auth => self.auth_storage.after_sync(wrapper, revision)?, RequestBackend::Lease => self.lease_storage.after_sync(wrapper, revision).await?, + 
RequestBackend::Alarm => self.alarm_storage.after_sync(wrapper, revision), }; ops.append(&mut wr_ops); let key_revisions = self.persistent.flush_ops(ops)?; @@ -472,7 +571,15 @@ where } self.lease_storage.mark_lease_synced(&wrapper.request); if !quota_enough { - return Err(ExecuteError::DbError("Quota exceeded".to_owned())); + let alarmer = self.alarmer.clone(); + let _ig = tokio::spawn(async move { + if let Err(e) = alarmer + .alarm(AlarmAction::Activate, AlarmType::Nospace) + .await + { + warn!("{} propose alarm failed: {:?}", alarmer.id, e); + } + }); } Ok(res) } @@ -518,10 +625,6 @@ where self.id_barrier.trigger(id); self.index_barrier.trigger(index); } - - fn quota_checker(&self) -> Arc> { - Arc::clone(&self.quota_checker) - } } /// Command to run consensus protocol @@ -592,7 +695,8 @@ fn get_lease_ids(wrapper: &RequestWrapper) -> HashSet { | RequestWrapper::AuthUserListRequest(_) | RequestWrapper::AuthUserRevokeRoleRequest(_) | RequestWrapper::AuthenticateRequest(_) - | RequestWrapper::LeaseLeasesRequest(_) => HashSet::new(), + | RequestWrapper::LeaseLeasesRequest(_) + | RequestWrapper::AlarmRequest(_) => HashSet::new(), } } @@ -613,7 +717,11 @@ impl ConflictCheck for Command { } // any two requests that don't meet the above conditions will conflict with each other // because the auth write request will make all previous token invalid - if (this_req.is_auth_request()) || (other_req.is_auth_request()) { + if this_req.is_auth_request() + || other_req.is_auth_request() + || this_req.is_alarm_request() + || other_req.is_alarm_request() + { return true; } @@ -680,6 +788,18 @@ impl Command { pub fn request(&self) -> &RequestWithToken { &self.request } + + /// need check quota + #[must_use] + #[inline] + pub fn need_check_quota(&self) -> bool { + matches!( + self.request.request, + RequestWrapper::LeaseGrantRequest(_) + | RequestWrapper::PutRequest(_) + | RequestWrapper::TxnRequest(_) + ) + } } /// Command to run consensus protocol diff --git a/xline/src/server/maintenance.rs b/xline/src/server/maintenance.rs index 62ba4a3f3..7287fc9c5 100644 --- a/xline/src/server/maintenance.rs +++ b/xline/src/server/maintenance.rs @@ -1,4 +1,4 @@ -use std::{pin::Pin, sync::Arc}; +use std::{fmt::Debug, pin::Pin, sync::Arc}; use async_stream::try_stream; use bytes::BytesMut; @@ -7,9 +7,14 @@ use curp::{client::Client, members::ClusterInfo}; use engine::SnapshotApi; use futures::stream::Stream; use sha2::{Digest, Sha256}; -use tracing::error; +use tracing::{debug, error}; +use xlineapi::{RequestWithToken, RequestWrapper}; -use super::command::{client_err_to_status, Command}; +use super::{ + auth_server::get_token, + command::{client_err_to_status, command_from_request_wrapper, Command}, + CommandResponse, SyncResponse, +}; use crate::{ header_gen::HeaderGenerator, rpc::{ @@ -60,6 +65,29 @@ where cluster_info, } } + + /// Propose request and get result with fast/slow path + async fn propose( + &self, + request: tonic::Request, + use_fast_path: bool, + ) -> Result<(CommandResponse, Option), tonic::Status> + where + T: Into + Debug, + { + let token = get_token(request.metadata()); + let wrapper = RequestWithToken::new_with_token(request.into_inner().into(), token); + let propose_id = self + .client + .gen_propose_id() + .await + .map_err(client_err_to_status)?; + let cmd = command_from_request_wrapper::(propose_id, wrapper, None); + self.client + .propose(cmd, use_fast_path) + .await + .map_err(client_err_to_status) + } } #[tonic::async_trait] @@ -69,11 +97,19 @@ where { async fn alarm( &self, - _request: 
tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented( - "alarm is unimplemented".to_owned(), - )) + let is_fast_path = true; + let (res, sync_res) = self.propose(request, is_fast_path).await?; + let mut res: AlarmResponse = res.into_inner().into(); + if let Some(sync_res) = sync_res { + let revision = sync_res.revision(); + debug!("Get revision {:?} for AlarmResponse", revision); + if let Some(mut header) = res.header.as_mut() { + header.revision = revision; + } + } + Ok(tonic::Response::new(res)) } async fn status( diff --git a/xline/src/server/xline_server.rs b/xline/src/server/xline_server.rs index 7d316b29c..0192538ff 100644 --- a/xline/src/server/xline_server.rs +++ b/xline/src/server/xline_server.rs @@ -46,7 +46,7 @@ use crate::{ kvwatcher::KvWatcher, lease_store::LeaseCollection, storage_api::StorageApi, - AuthStore, KvStore, LeaseStore, + AlarmStore, AuthStore, KvStore, LeaseStore, }, }; @@ -134,6 +134,7 @@ impl XlineServer { Arc>, Arc>, Arc>, + Arc>, Arc>, )> { let (compact_task_tx, compact_task_rx) = channel(COMPACT_CHANNEL_SIZE); @@ -166,9 +167,11 @@ impl XlineServer { let auth_storage = Arc::new(AuthStore::new( lease_collection, key_pair, - header_gen, - persistent, + Arc::clone(&header_gen), + Arc::clone(&persistent), )); + let alarm_storage = Arc::new(AlarmStore::new(header_gen, persistent)); + let watcher = KvWatcher::new_arc( Arc::clone(&kv_storage), kv_update_rx, @@ -179,7 +182,14 @@ impl XlineServer { lease_storage.recover()?; kv_storage.recover().await?; auth_storage.recover()?; - Ok((kv_storage, lease_storage, auth_storage, watcher)) + alarm_storage.recover()?; + Ok(( + kv_storage, + lease_storage, + auth_storage, + alarm_storage, + watcher, + )) } /// Construct a header generator @@ -322,7 +332,7 @@ impl XlineServer { self.curp_cfg.candidate_timeout_ticks, ); - let (kv_storage, lease_storage, auth_storage, watcher) = self + let (kv_storage, lease_storage, auth_storage, alarm_storage, watcher) = self .construct_underlying_storages( Arc::clone(&persistent), lease_collection, @@ -334,16 +344,26 @@ impl XlineServer { let index_barrier = Arc::new(IndexBarrier::new()); let id_barrier = Arc::new(IdBarrier::new()); + let client = Arc::new( + CurpClient::builder() + .local_server_id(self.cluster_info.self_id()) + .config(self.client_config) + .build_from_all_members(self.cluster_info.all_members_addrs(), None) + .await?, + ); let ce = CommandExecutor::new( Arc::clone(&kv_storage), Arc::clone(&auth_storage), Arc::clone(&lease_storage), + Arc::clone(&alarm_storage), Arc::clone(&persistent), Arc::clone(&index_barrier), Arc::clone(&id_barrier), header_gen.general_revision_arc(), header_gen.auth_revision_arc(), self.storage_cfg.quota, + self.cluster_info.self_id(), + Arc::clone(&client), ); let snapshot_allocator: Box = match self.storage_cfg.engine { EngineConfig::Memory => Box::::default(), @@ -352,14 +372,6 @@ impl XlineServer { _ => unimplemented!(), }; - let client = Arc::new( - CurpClient::builder() - .local_server_id(self.cluster_info.self_id()) - .config(self.client_config) - .build_from_all_members(self.cluster_info.all_members_addrs(), None) - .await?, - ); - let auto_compactor = if let Some(auto_config_cfg) = *self.compact_cfg.auto_compact_config() { Some( diff --git a/xline/src/storage/alarm_store.rs b/xline/src/storage/alarm_store.rs new file mode 100644 index 000000000..2e577943c --- /dev/null +++ b/xline/src/storage/alarm_store.rs @@ -0,0 +1,222 @@ +use std::{ + collections::HashMap, + sync::{ + 
atomic::{AtomicI32, Ordering}, + Arc, + }, +}; + +use curp::members::ServerId; +use parking_lot::RwLock; +use prost::Message; +use xlineapi::{ + AlarmAction, AlarmMember, AlarmResponse, AlarmType, RequestWithToken, RequestWrapper, + ResponseWrapper, +}; + +use super::{db::WriteOp, storage_api::StorageApi, ExecuteError}; +use crate::{ + header_gen::HeaderGenerator, + server::{CommandResponse, SyncResponse}, +}; + +/// Alarm table name +pub(crate) const ALARM_TABLE: &str = "alarm"; + +/// Alarm store +#[derive(Debug)] +pub(crate) struct AlarmStore +where + DB: StorageApi, +{ + /// Header generator + header_gen: Arc, + /// Persistent storage + db: Arc, + /// Alarm types + types: RwLock>>, + /// Current alarm + current_alarm: AtomicI32, +} + +impl AlarmStore +where + DB: StorageApi, +{ + /// execute a kv request + pub(crate) fn execute(&self, request: &RequestWithToken) -> CommandResponse { + #[allow(clippy::wildcard_enum_match_arm)] + let alarms = match request.request { + RequestWrapper::AlarmRequest(ref req) => match req.action() { + AlarmAction::Get => self.handle_alarm_get(req.alarm()), + AlarmAction::Activate => { + if req.alarm() == AlarmType::None { + vec![] + } else { + self.handle_alarm_activate(req.member_id, req.alarm()) + } + } + AlarmAction::Deactivate => self.handle_alarm_deactivate(req.member_id, req.alarm()), + }, + _ => { + unreachable!("Other request should not be sent to this store"); + } + }; + CommandResponse::new(ResponseWrapper::AlarmResponse(AlarmResponse { + header: Some(self.header_gen.gen_header()), + alarms, + })) + } + + /// sync a alarm request + pub(crate) fn after_sync( + &self, + request: &RequestWithToken, + revision: i64, + ) -> (SyncResponse, Vec) { + #[allow(clippy::wildcard_enum_match_arm)] + let ops = match request.request { + RequestWrapper::AlarmRequest(ref req) => match req.action() { + AlarmAction::Get => vec![], + AlarmAction::Activate => self.sync_alarm_activate(req.member_id, req.alarm()), + AlarmAction::Deactivate => self.sync_alarm_deactivate(req.member_id, req.alarm()), + }, + _ => { + unreachable!("Other request should not be sent to this store"); + } + }; + (SyncResponse::new(revision), ops) + } + + /// Recover data form persistent storage + pub(crate) fn recover(&self) -> Result<(), ExecuteError> { + let alarms = self.get_all_alarms()?; + let mut types_w = self.types.write(); + for alarm in alarms { + _ = types_w + .entry(alarm.alarm()) + .or_default() + .insert(alarm.member_id, alarm); + } + Ok(()) + } +} + +impl AlarmStore +where + DB: StorageApi, +{ + /// Create a new alarm store + pub(crate) fn new(header_gen: Arc, db: Arc) -> Self { + Self { + header_gen, + db, + types: RwLock::new(HashMap::new()), + current_alarm: AtomicI32::new(i32::from(AlarmType::None)), + } + } + + /// Get current alarm + pub(crate) fn current_alarm(&self) -> AlarmType { + let current_alarm = self.current_alarm.load(Ordering::Relaxed); + AlarmType::from_i32(current_alarm) + .unwrap_or_else(|| unreachable!("current alarm should be valid")) + } + + /// Refresh current alarm + fn refresh_current_alarm(&self, types: &HashMap>) { + let corrupt_alarms = types + .get(&AlarmType::Corrupt) + .is_some_and(|e| !e.is_empty()); + if corrupt_alarms { + self.current_alarm + .store(i32::from(AlarmType::Corrupt), Ordering::Relaxed); + return; + } + + let no_space_alarms = types + .get(&AlarmType::Nospace) + .is_some_and(|e| !e.is_empty()); + if no_space_alarms { + self.current_alarm + .store(i32::from(AlarmType::Nospace), Ordering::Relaxed); + return; + } + + self.current_alarm + 
.store(i32::from(AlarmType::None), Ordering::Relaxed); + } + + /// Get all alarms from persistent storage + fn get_all_alarms(&self) -> Result, ExecuteError> { + let alarms = self + .db + .get_all(ALARM_TABLE)? + .into_iter() + .map(|(alarm, _)| { + AlarmMember::decode(alarm.as_slice()).unwrap_or_else(|e| { + panic!("Failed to decode alarm from value, error: {e:?}, alarm: {alarm:?}"); + }) + }) + .collect(); + Ok(alarms) + } + + /// Handle alarm get request + fn handle_alarm_get(&self, alarm: AlarmType) -> Vec { + let types = self.types.read(); + match alarm { + AlarmType::None => types.values().flat_map(HashMap::values).cloned().collect(), + a @ (AlarmType::Nospace | AlarmType::Corrupt) => types + .get(&a) + .map(|s| s.values().cloned().collect()) + .unwrap_or_default(), + } + } + + /// Handle alarm activate request + fn handle_alarm_activate(&self, member_id: ServerId, alarm: AlarmType) -> Vec { + let new_alarm = AlarmMember::new(member_id, alarm); + self.types + .read() + .get(&alarm) + .and_then(|e| e.get(&member_id)) + .map_or_else(|| vec![new_alarm], |m| vec![m.clone()]) + } + + /// Handle alarm deactivate request + fn handle_alarm_deactivate(&self, member_id: ServerId, alarm: AlarmType) -> Vec { + self.types + .read() + .get(&alarm) + .and_then(|e| e.get(&member_id)) + .map(|m| vec![m.clone()]) + .unwrap_or_default() + } + + /// Sync alarm activate request + fn sync_alarm_activate(&self, member_id: ServerId, alarm: AlarmType) -> Vec { + let new_alarm: AlarmMember = AlarmMember::new(member_id, alarm); + let mut types_w = self.types.write(); + let e = types_w.entry(alarm).or_default(); + let mut ops = vec![]; + if e.get(&member_id).is_none() { + _ = e.insert(new_alarm.member_id, new_alarm.clone()); + ops.push(WriteOp::PutAlarm(new_alarm)); + } + self.refresh_current_alarm(&types_w); + ops + } + + /// Sync alarm deactivate request + fn sync_alarm_deactivate(&self, member_id: ServerId, alarm: AlarmType) -> Vec { + let mut types_w = self.types.write(); + let e = types_w.entry(alarm).or_default(); + let mut ops = vec![]; + if let Some(m) = e.remove(&member_id) { + ops.push(WriteOp::DeleteAlarm(m)); + } + self.refresh_current_alarm(&types_w); + ops + } +} diff --git a/xline/src/storage/db.rs b/xline/src/storage/db.rs index 23d33f259..fe174752e 100644 --- a/xline/src/storage/db.rs +++ b/xline/src/storage/db.rs @@ -3,8 +3,10 @@ use std::{collections::HashMap, path::Path, sync::Arc}; use engine::{Engine, EngineType, Snapshot, StorageEngine, WriteOperation}; use prost::Message; use utils::config::EngineConfig; +use xlineapi::AlarmMember; use super::{ + alarm_store::ALARM_TABLE, auth_store::{AUTH_ENABLE_KEY, AUTH_REVISION_KEY, AUTH_TABLE, ROLE_TABLE, USER_TABLE}, kv_store::KV_TABLE, lease_store::LEASE_TABLE, @@ -19,13 +21,14 @@ use crate::{ }; /// Xline Server Storage Table -pub(crate) const XLINE_TABLES: [&str; 6] = [ +pub(crate) const XLINE_TABLES: [&str; 7] = [ META_TABLE, KV_TABLE, LEASE_TABLE, AUTH_TABLE, USER_TABLE, ROLE_TABLE, + ALARM_TABLE, ]; /// Key of compacted revision @@ -134,6 +137,16 @@ impl StorageApi for DB { } }) .collect::>(); + let del_alarm_buffer = ops + .iter() + .find_map(|op| { + if let WriteOp::DeleteAlarm(ref key) = *op { + Some(key.encode_to_vec()) + } else { + None + } + }) + .unwrap_or_default(); for op in ops { let wop = match op { WriteOp::PutKeyValue(rev, value) => { @@ -195,6 +208,13 @@ impl StorageApi for DB { WriteOp::DeleteRole(name) => { WriteOperation::new_delete(ROLE_TABLE, name.as_bytes()) } + WriteOp::PutAlarm(alarm) => { + let key = 
alarm.encode_to_vec(); + WriteOperation::new_put(ALARM_TABLE, key, vec![]) + } + WriteOp::DeleteAlarm(_key) => { + WriteOperation::new_delete(ALARM_TABLE, del_alarm_buffer.as_ref()) + } }; wr_ops.push(wop); } @@ -243,6 +263,10 @@ pub enum WriteOp<'a> { PutRole(Role), /// Delete a role from role table DeleteRole(&'a str), + /// Put a alarm member to alarm table + PutAlarm(AlarmMember), + /// Delete a alarm member from alarm table + DeleteAlarm(AlarmMember), } #[cfg(test)] diff --git a/xline/src/storage/execute_error.rs b/xline/src/storage/execute_error.rs index f91122233..7660be422 100644 --- a/xline/src/storage/execute_error.rs +++ b/xline/src/storage/execute_error.rs @@ -94,6 +94,10 @@ pub enum ExecuteError { /// Permission denied Error #[error("permission denied")] PermissionDenied, + + /// no space left in quota + #[error("no space left in quota")] + Nospace, } impl From for ExecuteError { @@ -134,6 +138,7 @@ impl From for ExecuteError { } PbExecuteError::DbError(e) => ExecuteError::DbError(e), PbExecuteError::PermissionDenied(_) => ExecuteError::PermissionDenied, + PbExecuteError::Nospace(_) => ExecuteError::Nospace, } } } @@ -185,6 +190,7 @@ impl From for PbExecuteError { } ExecuteError::DbError(e) => PbExecuteError::DbError(e), ExecuteError::PermissionDenied => PbExecuteError::PermissionDenied(()), + ExecuteError::Nospace => PbExecuteError::Nospace(()), } } } @@ -291,6 +297,10 @@ impl From for tonic::Status { tonic::Code::InvalidArgument, "etcdserver: invalid auth management".to_owned(), ), + ExecuteError::Nospace => ( + tonic::Code::ResourceExhausted, + "etcdserver: mvcc: database space exceeded".to_owned(), + ), ExecuteError::LeaseExpired(_) => (tonic::Code::DeadlineExceeded, err.to_string()), ExecuteError::UserAlreadyHasRole(_, _) | ExecuteError::NoPasswordUser diff --git a/xline/src/storage/mod.rs b/xline/src/storage/mod.rs index 514a225e5..fc1aa942f 100644 --- a/xline/src/storage/mod.rs +++ b/xline/src/storage/mod.rs @@ -1,3 +1,5 @@ +/// Storage for alarm +pub(crate) mod alarm_store; /// Storage for Auth pub(crate) mod auth_store; /// Compact module @@ -21,5 +23,6 @@ pub(crate) mod storage_api; pub use self::execute_error::ExecuteError; pub(crate) use self::{ - auth_store::AuthStore, kv_store::KvStore, lease_store::LeaseStore, revision::Revision, + alarm_store::AlarmStore, auth_store::AuthStore, kv_store::KvStore, lease_store::LeaseStore, + revision::Revision, }; diff --git a/xline/tests/it/maintenance_test.rs b/xline/tests/it/maintenance_test.rs index 3123429a6..c144748fc 100644 --- a/xline/tests/it/maintenance_test.rs +++ b/xline/tests/it/maintenance_test.rs @@ -4,12 +4,14 @@ use test_macros::abort_on_panic; use tokio::io::AsyncWriteExt; #[cfg(test)] use xline::restore::restore; +use xline::storage::ExecuteError; +use xline_client::error::XlineClientError; use xline_test_utils::{ types::kv::{PutRequest, RangeRequest}, Client, ClientOptions, Cluster, }; +use xlineapi::{AlarmAction, AlarmRequest, AlarmType}; -#[cfg(test)] #[tokio::test(flavor = "multi_thread")] #[abort_on_panic] async fn test_snapshot_and_restore() -> Result<(), Box> { @@ -51,3 +53,44 @@ async fn test_snapshot_and_restore() -> Result<(), Box> { tokio::fs::remove_dir_all(&dir).await?; Ok(()) } + +#[tokio::test(flavor = "multi_thread")] +#[abort_on_panic] +async fn leader_should_detect_no_space_alarm() { + test_alarm(0).await; +} + +#[tokio::test(flavor = "multi_thread")] +#[abort_on_panic] +async fn follower_should_detect_no_space_alarm() { + test_alarm(1).await; +} + +async fn test_alarm(idx: usize) { + let q = 
8 * 1024; + let mut cluster = Cluster::new(3).await; + _ = cluster.quotas.insert(idx, q); + cluster.start().await; + let client = cluster.client().await; + let mut m_client = client.maintenance_client(); + let k_client = client.kv_client(); + + for i in 1..100u8 { + let key: Vec = vec![i]; + let value: Vec = vec![i]; + let req = PutRequest::new(key, value); + if let Err(err) = k_client.put(req).await { + assert!(matches!( + err, + XlineClientError::CommandError(ExecuteError::Nospace) + )); + break; + } + } + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + let res = m_client + .alarm(AlarmRequest::new(AlarmAction::Get, 0, AlarmType::None)) + .await + .unwrap(); + assert!(!res.alarms.is_empty()); +} diff --git a/xlineapi/proto b/xlineapi/proto index f5c2906b8..60f86759c 160000 --- a/xlineapi/proto +++ b/xlineapi/proto @@ -1 +1 @@ -Subproject commit f5c2906b854b63f537ae5abb1ccbb7b5903e3f51 +Subproject commit 60f86759c37fcfce94387c2d421c4ca3ea8c8c45 diff --git a/xlineapi/src/lib.rs b/xlineapi/src/lib.rs index 1e07ee9ce..16fa53c79 100644 --- a/xlineapi/src/lib.rs +++ b/xlineapi/src/lib.rs @@ -203,6 +203,7 @@ pub use self::{ Revisions as PbRevisions, UserRole as PbUserRole, }, etcdserverpb::{ + alarm_request::AlarmAction, auth_client::AuthClient, auth_server::{Auth, AuthServer}, cluster_client::ClusterClient, @@ -219,14 +220,15 @@ pub use self::{ watch_client::WatchClient, watch_request::RequestUnion, watch_server::{Watch, WatchServer}, - AlarmRequest, AlarmResponse, AuthDisableRequest, AuthDisableResponse, AuthEnableRequest, - AuthEnableResponse, AuthRoleAddRequest, AuthRoleAddResponse, AuthRoleDeleteRequest, - AuthRoleDeleteResponse, AuthRoleGetRequest, AuthRoleGetResponse, - AuthRoleGrantPermissionRequest, AuthRoleGrantPermissionResponse, AuthRoleListRequest, - AuthRoleListResponse, AuthRoleRevokePermissionRequest, AuthRoleRevokePermissionResponse, - AuthStatusRequest, AuthStatusResponse, AuthUserAddRequest, AuthUserAddResponse, - AuthUserChangePasswordRequest, AuthUserChangePasswordResponse, AuthUserDeleteRequest, - AuthUserDeleteResponse, AuthUserGetRequest, AuthUserGetResponse, AuthUserGrantRoleRequest, + AlarmMember, AlarmRequest, AlarmResponse, AlarmType, AuthDisableRequest, + AuthDisableResponse, AuthEnableRequest, AuthEnableResponse, AuthRoleAddRequest, + AuthRoleAddResponse, AuthRoleDeleteRequest, AuthRoleDeleteResponse, AuthRoleGetRequest, + AuthRoleGetResponse, AuthRoleGrantPermissionRequest, AuthRoleGrantPermissionResponse, + AuthRoleListRequest, AuthRoleListResponse, AuthRoleRevokePermissionRequest, + AuthRoleRevokePermissionResponse, AuthStatusRequest, AuthStatusResponse, + AuthUserAddRequest, AuthUserAddResponse, AuthUserChangePasswordRequest, + AuthUserChangePasswordResponse, AuthUserDeleteRequest, AuthUserDeleteResponse, + AuthUserGetRequest, AuthUserGetResponse, AuthUserGrantRoleRequest, AuthUserGrantRoleResponse, AuthUserListRequest, AuthUserListResponse, AuthUserRevokeRoleRequest, AuthUserRevokeRoleResponse, AuthenticateRequest, AuthenticateResponse, CompactionRequest, CompactionResponse, Compare, DefragmentRequest, @@ -336,6 +338,7 @@ impl ResponseWrapper { ResponseWrapper::LeaseGrantResponse(ref mut resp) => &mut resp.header, ResponseWrapper::LeaseRevokeResponse(ref mut resp) => &mut resp.header, ResponseWrapper::LeaseLeasesResponse(ref mut resp) => &mut resp.header, + ResponseWrapper::AlarmResponse(ref mut resp) => &mut resp.header, }; if let Some(ref mut header) = *header { header.revision = revision; @@ -352,6 +355,8 @@ pub enum RequestBackend { 
Auth, /// Lease backend Lease, + /// Alarm backend + Alarm, } impl RequestWrapper { @@ -383,6 +388,7 @@ impl RequestWrapper { RequestWrapper::LeaseGrantRequest(_) | RequestWrapper::LeaseRevokeRequest(_) | RequestWrapper::LeaseLeasesRequest(_) => RequestBackend::Lease, + RequestWrapper::AlarmRequest(_) => RequestBackend::Alarm, } } @@ -446,6 +452,10 @@ impl RequestWrapper { RequestWrapper::LeaseGrantRequest(_) | RequestWrapper::LeaseRevokeRequest(_) ) } + + pub fn is_alarm_request(&self) -> bool { + matches!(*self, RequestWrapper::AlarmRequest(_)) + } } /// impl `From` trait for all request types @@ -517,7 +527,8 @@ impl_from_requests!( AuthenticateRequest, LeaseGrantRequest, LeaseRevokeRequest, - LeaseLeasesRequest + LeaseLeasesRequest, + AlarmRequest ); impl_from_responses!( @@ -545,7 +556,8 @@ impl_from_responses!( AuthenticateResponse, LeaseGrantResponse, LeaseRevokeResponse, - LeaseLeasesResponse + LeaseLeasesResponse, + AlarmResponse ); impl From for RequestWrapper { @@ -657,6 +669,25 @@ impl TxnRequest { } } +impl AlarmRequest { + pub fn new(action: AlarmAction, member_id: u64, alarm: AlarmType) -> Self { + Self { + action: i32::from(action), + member_id, + alarm: i32::from(alarm), + } + } +} + +impl AlarmMember { + pub fn new(member_id: u64, alarm: AlarmType) -> Self { + Self { + member_id, + alarm: i32::from(alarm), + } + } +} + #[cfg(test)] mod test { use super::*;
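For reference, a minimal end-to-end sketch of the alarm API this patch introduces, mirroring the maintenance-client doc example and the integration test above. This is not part of the patch: the member addresses and the member id passed to Deactivate are placeholders, and it assumes a running Xline cluster reachable from the client.

use anyhow::Result;
use xline_client::{Client, ClientOptions};
use xlineapi::{AlarmAction, AlarmRequest, AlarmType};

#[tokio::main]
async fn main() -> Result<()> {
    // Placeholder addresses for the curp members of a running cluster.
    let curp_members = ["10.0.0.1:2379", "10.0.0.2:2379", "10.0.0.3:2379"];
    let mut client = Client::connect(curp_members, ClientOptions::default())
        .await?
        .maintenance_client();

    // AlarmAction::Get with member_id 0 and AlarmType::None lists every active alarm.
    let resp = client
        .alarm(AlarmRequest::new(AlarmAction::Get, 0, AlarmType::None))
        .await?;
    for member in &resp.alarms {
        println!("member {} raised alarm {}", member.member_id, member.alarm);
    }

    // After the operator frees disk space, the NOSPACE alarm can be cleared
    // for the affected member (member id 1 here is a placeholder).
    let _resp = client
        .alarm(AlarmRequest::new(AlarmAction::Deactivate, 1, AlarmType::Nospace))
        .await?;
    Ok(())
}

In the patch itself the NOSPACE alarm is raised automatically: when after_sync in xline/src/server/command.rs detects that the storage quota is exceeded, it spawns a task that proposes AlarmAction::Activate with AlarmType::Nospace, the new AlarmStore persists the alarm and refreshes the current alarm state, and check_alarm then rejects subsequent Put, Txn, and LeaseGrant commands with ExecuteError::Nospace while still allowing reads, deletes, lease revocation, and compaction.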