diff --git a/simulation/src/curp_group.rs b/simulation/src/curp_group.rs index 58a7d6394..4b9d990f1 100644 --- a/simulation/src/curp_group.rs +++ b/simulation/src/curp_group.rs @@ -257,12 +257,13 @@ impl CurpGroup { } pub async fn get_leader(&self) -> (ServerId, u64) { + const RETRY_INTERVAL: u64 = 100; loop { if let Some(leader) = self.try_get_leader().await { return leader; } debug!("failed to get leader"); - madsim::time::sleep(Duration::from_millis(100)).await; + madsim::time::sleep(Duration::from_millis(RETRY_INTERVAL)).await; } } diff --git a/utils/src/config.rs b/utils/src/config.rs index 79ad577de..01bf30809 100644 --- a/utils/src/config.rs +++ b/utils/src/config.rs @@ -24,6 +24,9 @@ pub struct XlineServerConfig { /// auth configuration object #[getset(get = "pub")] auth: AuthConfig, + /// compactor configuration object + #[getset(get = "pub")] + compact: CompactConfig, } /// Cluster Range type alias @@ -118,6 +121,56 @@ impl ClusterConfig { } } +/// Compaction configuration +#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Getters)] +#[allow(clippy::module_name_repetitions)] +pub struct CompactConfig { + /// The max number of historical versions processed in a single compact operation + #[getset(get = "pub")] + #[serde(default = "default_compact_batch_size")] + compact_batch_size: usize, + /// The interval between two compaction batches + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_compact_sleep_interval")] + compact_sleep_interval: Duration, +} + +impl Default for CompactConfig { + #[inline] + fn default() -> Self { + Self { + compact_batch_size: default_compact_batch_size(), + compact_sleep_interval: default_compact_sleep_interval(), + } + } +} + +impl CompactConfig { + /// Create a new compact config + #[must_use] + #[inline] + pub fn new(compact_batch_size: usize, compact_sleep_interval: Duration) -> Self { + Self { + compact_batch_size, + compact_sleep_interval, + } + } +} + +/// default compact batch size +#[must_use] +#[inline] +pub const fn default_compact_batch_size() -> usize { + 1000 +} + +/// default compact interval +#[must_use] +#[inline] +pub const fn default_compact_sleep_interval() -> Duration { + Duration::from_millis(10) +} + /// Curp server timeout settings #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Builder)] #[allow(clippy::module_name_repetitions, clippy::exhaustive_structs)] @@ -470,7 +523,7 @@ pub mod level_format { use crate::parse_log_level; /// deserializes a cluster duration - #[allow(single_use_lifetimes)] // TODO: Think is it necessary to allow this clippy?? 
+ #[allow(single_use_lifetimes)] pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result where D: Deserializer<'de>, @@ -647,6 +700,7 @@ impl XlineServerConfig { log: LogConfig, trace: TraceConfig, auth: AuthConfig, + compact: CompactConfig, ) -> Self { Self { cluster, @@ -654,6 +708,7 @@ impl XlineServerConfig { log, trace, auth, + compact, } } } @@ -692,6 +747,10 @@ mod tests { [storage] engine = 'memory' + [compact] + compact_batch_size = 123 + compact_sleep_interval = '5ms' + [log] path = '/var/log/xline' rotation = 'daily' @@ -761,6 +820,14 @@ mod tests { LevelConfig::INFO ) ); + + assert_eq!( + config.compact, + CompactConfig { + compact_batch_size: 123, + compact_sleep_interval: Duration::from_millis(5) + } + ); } #[allow(clippy::unwrap_used)] @@ -785,6 +852,8 @@ mod tests { engine = 'rocksdb' data_dir = '/usr/local/xline/data-dir' + [compact] + [trace] jaeger_online = false jaeger_offline = false @@ -836,5 +905,6 @@ mod tests { LevelConfig::INFO ) ); + assert_eq!(config.compact, CompactConfig::default()); } } diff --git a/xline-test-utils/src/lib.rs b/xline-test-utils/src/lib.rs index 9a68abc6c..05a16bb58 100644 --- a/xline-test-utils/src/lib.rs +++ b/xline-test-utils/src/lib.rs @@ -12,7 +12,7 @@ use tokio::{ sync::broadcast::{self, Sender}, time::{self, Duration}, }; -use utils::config::{ClientTimeout, CurpConfig, ServerTimeout, StorageConfig}; +use utils::config::{ClientTimeout, CompactConfig, CurpConfig, ServerTimeout, StorageConfig}; use xline::{client::Client, server::XlineServer, storage::db::DB}; /// Cluster @@ -86,6 +86,7 @@ impl Cluster { ClientTimeout::default(), ServerTimeout::default(), StorageConfig::Memory, + CompactConfig::default(), ); let signal = async { let _ = rx.recv().await; diff --git a/xline/src/main.rs b/xline/src/main.rs index cf6eb1495..c3a259b1d 100644 --- a/xline/src/main.rs +++ b/xline/src/main.rs @@ -151,14 +151,14 @@ use tracing_subscriber::{fmt::format, prelude::*}; use utils::{ config::{ default_batch_max_size, default_batch_timeout, default_candidate_timeout_ticks, - default_client_wait_synced_timeout, default_cmd_workers, default_follower_timeout_ticks, - default_gc_interval, default_heartbeat_interval, default_log_entries_cap, - default_log_level, default_propose_timeout, default_range_retry_timeout, - default_retry_timeout, default_rotation, default_rpc_timeout, - default_server_wait_synced_timeout, default_sync_victims_interval, - default_watch_progress_notify_interval, file_appender, AuthConfig, ClientTimeout, - ClusterConfig, CurpConfigBuilder, LevelConfig, LogConfig, RotationConfig, ServerTimeout, - StorageConfig, TraceConfig, XlineServerConfig, + default_client_wait_synced_timeout, default_cmd_workers, default_compact_batch_size, + default_compact_sleep_interval, default_follower_timeout_ticks, default_gc_interval, + default_heartbeat_interval, default_log_entries_cap, default_log_level, + default_propose_timeout, default_range_retry_timeout, default_retry_timeout, + default_rotation, default_rpc_timeout, default_server_wait_synced_timeout, + default_sync_victims_interval, default_watch_progress_notify_interval, file_appender, + AuthConfig, ClientTimeout, ClusterConfig, CompactConfig, CurpConfigBuilder, LevelConfig, + LogConfig, RotationConfig, ServerTimeout, StorageConfig, TraceConfig, XlineServerConfig, }, parse_batch_bytes, parse_duration, parse_log_level, parse_members, parse_rotation, }; @@ -263,6 +263,12 @@ struct ServerArgs { /// Curp command workers count #[clap(long, default_value_t = default_cmd_workers())] cmd_workers: u8, 
+ /// The max number of historical versions processed in a single compact operation [default: 1000] + #[clap(long, default_value_t = default_compact_batch_size())] + compact_batch_size: usize, + /// Interval between two compaction operations [default: 10ms] + #[clap(long, value_parser = parse_duration)] + compact_sleep_interval: Option, } impl From for XlineServerConfig { @@ -329,7 +335,12 @@ impl From for XlineServerConfig { args.jaeger_level, ); let auth = AuthConfig::new(args.auth_public_key, args.auth_private_key); - XlineServerConfig::new(cluster, storage, log, trace, auth) + let compact = CompactConfig::new( + args.compact_batch_size, + args.compact_sleep_interval + .unwrap_or_else(default_compact_sleep_interval), + ); + XlineServerConfig::new(cluster, storage, log, trace, auth, compact) } } @@ -480,6 +491,7 @@ async fn main() -> Result<()> { *cluster_config.client_timeout(), *cluster_config.server_timeout(), config.storage().clone(), + *config.compact(), ); debug!("{:?}", server); server.start(self_addr, db_proxy, key_pair).await?; diff --git a/xline/src/server/command.rs b/xline/src/server/command.rs index 59cf406eb..d52adf42c 100644 --- a/xline/src/server/command.rs +++ b/xline/src/server/command.rs @@ -481,6 +481,29 @@ impl ConflictCheck for Command { return true; } + if this_req.is_compaction_request() && other_req.is_compaction_request() { + return true; + } + + if (this_req.is_txn_request() && other_req.is_compaction_request()) + || (this_req.is_compaction_request() && other_req.is_txn_request()) + { + match (this_req, other_req) { + ( + &RequestWrapper::CompactionRequest(ref com_req), + &RequestWrapper::TxnRequest(ref txn_req), + ) + | ( + &RequestWrapper::TxnRequest(ref txn_req), + &RequestWrapper::CompactionRequest(ref com_req), + ) => { + let target_revision = com_req.revision; + return txn_req.is_conflict_with_rev(target_revision) + } + _ => unreachable!("The request must be either a transaction or a compaction request! 
\nthis_req = {this_req:?} \nother_req = {other_req:?}") + } + } + let this_lease_ids = get_lease_ids(this_req); let other_lease_ids = get_lease_ids(other_req); let lease_conflict = !this_lease_ids.is_disjoint(&other_lease_ids); @@ -577,10 +600,12 @@ impl CurpCommand for Command { #[cfg(test)] mod test { + use xlineapi::Compare; + use super::*; use crate::rpc::{ - AuthEnableRequest, AuthStatusRequest, LeaseGrantRequest, LeaseLeasesRequest, - LeaseRevokeRequest, PutRequest, RequestOp, TxnRequest, + AuthEnableRequest, AuthStatusRequest, CompactionRequest, LeaseGrantRequest, + LeaseLeasesRequest, LeaseRevokeRequest, PutRequest, RangeRequest, RequestOp, TxnRequest, }; #[test] @@ -708,4 +733,108 @@ mod test { assert!(lease_leases_cmd.is_conflict(&cmd5)); // lease read and write assert!(cmd6.is_conflict(&lease_leases_cmd)); // lease read and write } + + fn generate_txn_command( + keys: Vec, + compare: Vec, + success: Vec, + failure: Vec, + propose_id: &str, + ) -> Command { + Command::new( + keys, + RequestWithToken::new(RequestWrapper::TxnRequest(TxnRequest { + compare, + success, + failure, + })), + ProposeId::new(propose_id.to_owned()), + ) + } + + #[test] + fn test_compaction_txn_conflict() { + let compaction_cmd_1 = Command::new( + vec![], + RequestWithToken::new(RequestWrapper::CompactionRequest(CompactionRequest { + revision: 3, + physical: false, + })), + ProposeId::new("id11".to_owned()), + ); + + let compaction_cmd_2 = Command::new( + vec![], + RequestWithToken::new(RequestWrapper::CompactionRequest(CompactionRequest { + revision: 5, + physical: false, + })), + ProposeId::new("id12".to_owned()), + ); + + let txn_with_lease_id_cmd = generate_txn_command( + vec![KeyRange::new_one_key("key")], + vec![], + vec![RequestOp { + request: Some(Request::RequestPut(PutRequest { + key: b"key".to_vec(), + value: b"value".to_vec(), + lease: 123, + ..Default::default() + })), + }], + vec![], + "id6", + ); + + let txn_cmd_1 = generate_txn_command( + vec![KeyRange::new_one_key("key")], + vec![], + vec![RequestOp { + request: Some(Request::RequestRange(RangeRequest { + key: b"key".to_vec(), + ..Default::default() + })), + }], + vec![], + "id13", + ); + + let txn_cmd_2 = generate_txn_command( + vec![KeyRange::new_one_key("key")], + vec![], + vec![RequestOp { + request: Some(Request::RequestRange(RangeRequest { + key: b"key".to_vec(), + revision: 3, + ..Default::default() + })), + }], + vec![], + "id14", + ); + + let txn_cmd_3 = generate_txn_command( + vec![KeyRange::new_one_key("key")], + vec![], + vec![RequestOp { + request: Some(Request::RequestRange(RangeRequest { + key: b"key".to_vec(), + revision: 7, + ..Default::default() + })), + }], + vec![], + "id15", + ); + + assert!(compaction_cmd_1.is_conflict(&compaction_cmd_2)); + assert!(compaction_cmd_2.is_conflict(&compaction_cmd_1)); + assert!(!compaction_cmd_1.is_conflict(&txn_with_lease_id_cmd)); + assert!(!compaction_cmd_2.is_conflict(&txn_with_lease_id_cmd)); + + assert!(!compaction_cmd_2.is_conflict(&txn_cmd_1)); + assert!(compaction_cmd_2.is_conflict(&txn_cmd_2)); + assert!(!compaction_cmd_2.is_conflict(&txn_cmd_3)); + } } diff --git a/xline/src/server/kv_server.rs b/xline/src/server/kv_server.rs index e98c689ad..38bc4e235 100644 --- a/xline/src/server/kv_server.rs +++ b/xline/src/server/kv_server.rs @@ -9,6 +9,7 @@ use futures::future::join_all; use tokio::time::timeout; use tracing::{debug, instrument}; use uuid::Uuid; +use xlineapi::ResponseWrapper; use super::{ auth_server::get_token, @@ -99,6 +100,7 @@ where .iter() .map(|cmp| 
KeyRange::new(cmp.key.as_slice(), cmp.range_end.as_slice())) .collect(), + RequestWrapper::CompactionRequest(ref _req) => Vec::new(), _ => unreachable!("Other request should not be sent to this store"), }; Command::new(key_ranges, wrapper, propose_id) @@ -195,7 +197,11 @@ where } /// Validate range request before handle - fn check_range_request(req: &RangeRequest, current_revision: i64) -> Result<(), tonic::Status> { + fn check_range_request( + req: &RangeRequest, + compacted_revision: i64, + current_revision: i64, + ) -> Result<(), tonic::Status> { if req.key.is_empty() { return Err(tonic::Status::invalid_argument("key is not provided")); } @@ -203,13 +209,50 @@ where return Err(tonic::Status::invalid_argument("invalid sort option")); } if req.revision > current_revision { - return Err(tonic::Status::invalid_argument(format!( + Err(tonic::Status::invalid_argument(format!( "required revision {} is higher than current revision {}", req.revision, current_revision - ))); + ))) + } else { + Self::check_range_compacted(req.revision, compacted_revision) } + } - Ok(()) + /// Check whether the required revision has been compacted + fn check_range_compacted( + range_revision: i64, + compacted_revision: i64, + ) -> Result<(), tonic::Status> { + (range_revision <= 0 || range_revision >= compacted_revision) + .then_some(()) + .ok_or(tonic::Status::invalid_argument(format!( + "required revision {range_revision} has been compacted, compacted revision is {compacted_revision}" + ))) + } + + /// Validate compact request before handle + fn check_compact_request( + req: &CompactionRequest, + compacted_revision: i64, + current_revision: i64, + ) -> Result<(), tonic::Status> { + debug_assert!( + compacted_revision <= current_revision, + "compacted revision should not be larger than the current revision" + ); + if req.revision <= compacted_revision { + Err(tonic::Status::invalid_argument(format!( + "required revision {} has been compacted, compacted revision is {}", + req.revision, compacted_revision + ))) + } else if req.revision > current_revision { + Err(tonic::Status::invalid_argument(format!( + "required revision {} is higher than current revision {}", + req.revision, current_revision + ))) + } else { + Ok(()) + } } /// Validate put request before handle @@ -237,7 +280,11 @@ where } /// Validate txn request before handle - fn check_txn_request(req: &TxnRequest, current_revision: i64) -> Result<(), tonic::Status> { + fn check_txn_request( + req: &TxnRequest, + compacted_revision: i64, + current_revision: i64, + ) -> Result<(), tonic::Status> { let opc = req .compare .len() @@ -256,10 +303,14 @@ where for op in req.success.iter().chain(req.failure.iter()) { if let Some(ref request) = op.request { match *request { - Request::RequestRange(ref r) => Self::check_range_request(r, current_revision), + Request::RequestRange(ref r) => { + Self::check_range_request(r, compacted_revision, current_revision) + } Request::RequestPut(ref r) => Self::check_put_request(r), Request::RequestDeleteRange(ref r) => Self::check_delete_range_request(r), - Request::RequestTxn(ref r) => Self::check_txn_request(r, current_revision), + Request::RequestTxn(ref r) => { + Self::check_txn_request(r, compacted_revision, current_revision) + } }?; } else { return Err(tonic::Status::invalid_argument("key not found")); } @@ -392,7 +443,12 @@ where ) -> Result, tonic::Status> { let range_req = request.get_ref(); debug!("Receive grpc request: {:?}", range_req); - Self::check_range_request(range_req, self.kv_storage.revision())?; + 
Self::check_range_request( + range_req, + self.kv_storage.compacted_revision(), + self.kv_storage.revision(), + )?; + let range_required_revision = range_req.revision; let is_serializable = range_req.serializable; let token = get_token(request.metadata()); let wrapper = RequestWithToken::new_with_token(request.into_inner().into(), token); @@ -400,6 +456,13 @@ where let cmd = Self::command_from_request_wrapper(propose_id, wrapper); if !is_serializable { self.wait_read_state(&cmd).await?; + // Double check whether the required revision has been compacted, since a compaction + // request may be executed during `wait_read_state`, which would make the result of the + // previous `check_range_request` outdated. + Self::check_range_compacted( + range_required_revision, + self.kv_storage.compacted_revision(), + )?; } self.serializable_range(cmd.request()) } @@ -469,7 +532,11 @@ where ) -> Result, tonic::Status> { let txn_req = request.get_ref(); debug!("Receive grpc request: {:?}", txn_req); - Self::check_txn_request(txn_req, self.kv_storage.revision())?; + Self::check_txn_request( + txn_req, + self.kv_storage.compacted_revision(), + self.kv_storage.revision(), + )?; let is_fast_path = false; // lock need revision of txn let (cmd_res, sync_res) = self.propose(request, is_fast_path).await?; @@ -494,19 +561,26 @@ where &self, request: tonic::Request, ) -> Result, tonic::Status> { - let compact_req = request.get_ref(); - debug!("Receive grpc request: {:?}", compact_req); - Err(tonic::Status::new( - tonic::Code::Unimplemented, - "Not Implemented".to_owned(), - )) + debug!("Receive CompactionRequest {:?}", request); + let compacted_revision = self.kv_storage.compacted_revision(); + let current_revision = self.kv_storage.revision(); + let req = request.get_ref(); + Self::check_compact_request(req, compacted_revision, current_revision)?; + + let is_fast_path = !req.physical; + let (cmd_res, _sync_res) = self.propose(request, is_fast_path).await?; + let resp = cmd_res.decode(); + + if let ResponseWrapper::CompactionResponse(response) = resp { + Ok(tonic::Response::new(response)) + } else { + panic!("Receive wrong response {resp:?} for CompactionRequest"); + } } } #[cfg(test)] mod test { - use test_macros::abort_on_panic; - use super::*; use crate::storage::db::DB; @@ -550,37 +624,140 @@ mod test { ], failure: vec![], }; - let result = KvServer::::check_txn_request(&txn_req, 0); + let result = KvServer::::check_txn_request(&txn_req, 1, 0); assert!(result.is_ok()); } #[tokio::test] - #[abort_on_panic] - async fn test_future_revision() { + async fn test_range_invalid_revision() { let current_revision = 10; - let range_request = RangeRequest { + let compacted_revision = 5; + let range_request_with_future_rev = RangeRequest { key: b"foo".to_vec(), revision: 20, ..Default::default() }; + let expected_err_message = tonic::Status::invalid_argument(format!( "required revision {} is higher than current revision {}", - range_request.revision, current_revision + range_request_with_future_rev.revision, current_revision )) .to_string(); - let message = KvServer::::check_range_request(&range_request, current_revision) - .unwrap_err() - .to_string(); + let message = KvServer::::check_range_request( + &range_request_with_future_rev, + compacted_revision, + current_revision, + ) + .unwrap_err() + .to_string(); assert_eq!(message, expected_err_message); - let txn_req = TxnRequest { + let range_request_with_compacted_rev = RangeRequest { + key: b"foo".to_vec(), + revision: 2, + ..Default::default() + }; + + let 
expected_err_message = tonic::Status::invalid_argument(format!( + "required revision {} has been compacted, compacted revision is {}", + range_request_with_compacted_rev.revision, compacted_revision + )) + .to_string(); + + let message = KvServer::::check_range_request( + &range_request_with_compacted_rev, + compacted_revision, + current_revision, + ) + .unwrap_err() + .to_string(); + assert_eq!(message, expected_err_message); + } + + #[tokio::test] + async fn test_txn_invalid_revision() { + let current_revision = 10; + let compacted_revision = 5; + let txn_request_with_future_revision = TxnRequest { compare: vec![], success: vec![RequestOp { - request: Some(Request::RequestRange(range_request)), + request: Some(Request::RequestRange(RangeRequest { + key: b"foo".to_vec(), + revision: 20, + ..Default::default() + })), }], failure: vec![], }; - let message = KvServer::::check_txn_request(&txn_req, current_revision) + + let expected_err_message = tonic::Status::invalid_argument(format!( + "required revision {} is higher than current revision {}", + 20, current_revision + )) + .to_string(); + + let message = KvServer::::check_txn_request( + &txn_request_with_future_revision, + compacted_revision, + current_revision, + ) + .unwrap_err() + .to_string(); + assert_eq!(message, expected_err_message); + + let txn_request_with_compacted_revision = TxnRequest { + compare: vec![], + success: vec![RequestOp { + request: Some(Request::RequestRange(RangeRequest { + key: b"foo".to_vec(), + revision: 3, + ..Default::default() + })), + }], + failure: vec![], + }; + + let expected_err_message = tonic::Status::invalid_argument(format!( + "required revision {} has been compacted, compacted revision is {}", + 3, compacted_revision + )) + .to_string(); + + let message = KvServer::::check_txn_request( + &txn_request_with_compacted_revision, + compacted_revision, + current_revision, + ) + .unwrap_err() + .to_string(); + assert_eq!(message, expected_err_message); + } + + #[tokio::test] + async fn test_compact_invalid_revision() { + let compact_request = CompactionRequest { + revision: 10, + ..Default::default() + }; + + let expected_err_message = tonic::Status::invalid_argument(format!( + "required revision {} is higher than current revision {}", + compact_request.revision, 8 + )) + .to_string(); + + let message = KvServer::::check_compact_request(&compact_request, 3, 8) + .unwrap_err() + .to_string(); + assert_eq!(message, expected_err_message); + + let expected_err_message = tonic::Status::invalid_argument(format!( + "required revision {} has been compacted, compacted revision is {}", + compact_request.revision, 13 + )) + .to_string(); + + let message = KvServer::::check_compact_request(&compact_request, 13, 18) .unwrap_err() .to_string(); assert_eq!(message, expected_err_message); diff --git a/xline/src/server/watch_server.rs b/xline/src/server/watch_server.rs index 64f61e46b..383a7b6eb 100644 --- a/xline/src/server/watch_server.rs +++ b/xline/src/server/watch_server.rs @@ -198,6 +198,17 @@ where /// Handle `WatchCreateRequest` async fn handle_watch_create(&mut self, req: WatchCreateRequest) { + let compacted_revision = self.kv_watcher.compacted_revision(); + if req.start_revision < compacted_revision { + let result = Err(tonic::Status::invalid_argument(format!( + "required revision {} has been compacted, compacted revision is {}", + req.start_revision, compacted_revision + ))); + if self.response_tx.send(result).await.is_err() { + self.stop_notify.notify(1); + } + return; + } let Some(watch_id) = 
self.validate_watch_id(req.watch_id) else { let result = Err(tonic::Status::already_exists(format!( "Watch ID {} has already been used", @@ -239,6 +250,7 @@ where header: Some(self.header_gen.gen_header()), watch_id, created: true, + compact_revision: compacted_revision, ..WatchResponse::default() }; if self.response_tx.send(Ok(response)).await.is_err() { @@ -414,15 +426,18 @@ mod test { use parking_lot::Mutex; use test_macros::abort_on_panic; - use tokio::time::{sleep, timeout}; + use tokio::{ + sync::mpsc, + time::{sleep, timeout}, + }; use utils::config::{default_watch_progress_notify_interval, StorageConfig}; use super::*; use crate::{ rpc::{PutRequest, RequestWithToken, WatchProgressRequest}, storage::{ - db::DB, index::Index, kvwatcher::MockKvWatcherOps, lease_store::LeaseCollection, - KvStore, + compact::COMPACT_CHANNEL_SIZE, db::DB, index::Index, kvwatcher::MockKvWatcherOps, + lease_store::LeaseCollection, KvStore, }, }; @@ -465,6 +480,9 @@ mod test { let mut mock_watcher = MockKvWatcherOps::new(); let _ = mock_watcher.expect_watch().times(1).return_const(()); let _ = mock_watcher.expect_cancel().times(1).return_const(()); + let _ = mock_watcher + .expect_compacted_revision() + .return_const(-1_i64); let watcher = Arc::new(mock_watcher); let next_id = Arc::new(WatchIdGenerator::new(1)); let handle = tokio::spawn(WatchServer::::task( @@ -507,6 +525,9 @@ mod test { } }); let _ = mock_watcher.expect_cancel().return_const(()); + let _ = mock_watcher + .expect_compacted_revision() + .return_const(-1_i64); let kv_watcher = Arc::new(mock_watcher); let next_id_gen = Arc::new(WatchIdGenerator::new(1)); let header_gen = Arc::new(HeaderGenerator::new(0, 0)); @@ -558,6 +579,7 @@ mod test { #[tokio::test] #[abort_on_panic] async fn test_watch_prev_kv() { + let (compact_tx, _compact_rx) = mpsc::channel(COMPACT_CHANNEL_SIZE); let index = Arc::new(Index::new()); let db = DB::open(&StorageConfig::Memory).unwrap(); let header_gen = Arc::new(HeaderGenerator::new(0, 0)); @@ -565,11 +587,12 @@ mod test { let next_id_gen = Arc::new(WatchIdGenerator::new(1)); let (kv_update_tx, kv_update_rx) = mpsc::channel(CHANNEL_SIZE); let kv_store = Arc::new(KvStore::new( + index, + Arc::clone(&db), + Arc::clone(&header_gen), kv_update_tx, + compact_tx, lease_collection, - Arc::clone(&header_gen), - Arc::clone(&db), - index, )); let shutdown_trigger = Arc::new(event_listener::Event::new()); let kv_watcher = KvWatcher::new_arc( @@ -631,6 +654,9 @@ mod test { let mut mock_watcher = MockKvWatcherOps::new(); let _ = mock_watcher.expect_watch().times(1).return_const(()); let _ = mock_watcher.expect_cancel().times(1).return_const(()); + let _ = mock_watcher + .expect_compacted_revision() + .return_const(-1_i64); let watcher = Arc::new(mock_watcher); let next_id = Arc::new(WatchIdGenerator::new(1)); let handle = tokio::spawn(WatchServer::::task( @@ -677,8 +703,7 @@ mod test { } #[tokio::test] - #[abort_on_panic] - async fn watch_task_should_be_terminated_when_response_tx_is_closed( + async fn watch_task_should_terminate_when_response_tx_closed( ) -> Result<(), Box> { let (req_tx, req_rx) = mpsc::channel(CHANNEL_SIZE); let (res_tx, res_rx) = mpsc::channel(CHANNEL_SIZE); @@ -688,6 +713,9 @@ mod test { let mut mock_watcher = MockKvWatcherOps::new(); let _ = mock_watcher.expect_watch().times(1).return_const(()); let _ = mock_watcher.expect_cancel().times(1).return_const(()); + let _ = mock_watcher + .expect_compacted_revision() + .return_const(-1_i64); let watcher = Arc::new(mock_watcher); let next_id = 
Arc::new(WatchIdGenerator::new(1)); let handle = tokio::spawn(WatchServer::::task( @@ -721,4 +749,78 @@ mod test { assert!(timeout(Duration::from_secs(10), handle).await.is_ok()); Ok(()) } + + #[tokio::test] + async fn watch_compacted_revision_should_fail() { + let (compact_tx, _compact_rx) = mpsc::channel(COMPACT_CHANNEL_SIZE); + let index = Arc::new(Index::new()); + let db = DB::open(&StorageConfig::Memory).unwrap(); + let header_gen = Arc::new(HeaderGenerator::new(0, 0)); + let lease_collection = Arc::new(LeaseCollection::new(0)); + let next_id_gen = Arc::new(WatchIdGenerator::new(1)); + let (kv_update_tx, kv_update_rx) = mpsc::channel(CHANNEL_SIZE); + let kv_store = Arc::new(KvStore::new( + index, + Arc::clone(&db), + Arc::clone(&header_gen), + kv_update_tx, + compact_tx, + lease_collection, + )); + let shutdown_trigger = Arc::new(event_listener::Event::new()); + let kv_watcher = KvWatcher::new_arc( + Arc::clone(&kv_store), + kv_update_rx, + shutdown_trigger, + Duration::from_millis(10), + ); + put(&kv_store, &db, "foo", "old_bar", 2).await; + put(&kv_store, &db, "foo", "bar", 3).await; + put(&kv_store, &db, "foo", "new_bar", 4).await; + + kv_store.update_compacted_revision(3); + + let (req_tx, req_rx) = mpsc::channel(CHANNEL_SIZE); + let req_stream = ReceiverStream::new(req_rx); + let create_watch_req = move |watch_id: WatchId, start_rev: i64| WatchRequest { + request_union: Some(RequestUnion::CreateRequest(WatchCreateRequest { + watch_id, + key: "foo".into(), + start_revision: start_rev, + ..Default::default() + })), + }; + req_tx.send(Ok(create_watch_req(1, 2))).await.unwrap(); + req_tx.send(Ok(create_watch_req(2, 3))).await.unwrap(); + req_tx.send(Ok(create_watch_req(3, 4))).await.unwrap(); + let (res_tx, mut res_rx) = mpsc::channel(CHANNEL_SIZE); + let _hd = tokio::spawn(WatchServer::::task( + Arc::clone(&next_id_gen), + Arc::clone(&kv_watcher), + res_tx, + req_stream, + Arc::clone(&header_gen), + default_watch_progress_notify_interval(), + )); + + for i in 0..3 { + let watch_res = res_rx.recv().await.unwrap(); + if i == 0 { + if let Err(e) = watch_res { + assert_eq!(e.code(), tonic::Code::InvalidArgument, + "watch a compacted revision should return invalid_argument error, but found {e:?}" + ); + } else { + unreachable!( + "watch create request with a compacted revision should not be successful" + ) + } + } else { + assert!( + watch_res.is_ok(), + "watch create request with a valid revision should be successful" + ); + } + } + } } diff --git a/xline/src/server/xline_server.rs b/xline/src/server/xline_server.rs index 9bc4b1084..012f81bb8 100644 --- a/xline/src/server/xline_server.rs +++ b/xline/src/server/xline_server.rs @@ -5,11 +5,11 @@ use clippy_utilities::{Cast, OverflowArithmetic}; use curp::{members::ClusterMember, server::Rpc, ProtocolServer, SnapshotAllocator}; use event_listener::Event; use jsonwebtoken::{DecodingKey, EncodingKey}; -use tokio::net::TcpListener; +use tokio::{net::TcpListener, sync::mpsc::channel}; use tokio_stream::wrappers::TcpListenerStream; use tonic::transport::Server; use tonic_health::ServingStatus; -use utils::config::{ClientTimeout, CurpConfig, ServerTimeout, StorageConfig}; +use utils::config::{ClientTimeout, CompactConfig, CurpConfig, ServerTimeout, StorageConfig}; use super::{ auth_server::AuthServer, @@ -32,6 +32,7 @@ use crate::{ }, state::State, storage::{ + compact::{compactor, COMPACT_CHANNEL_SIZE}, index::Index, kvwatcher::KvWatcher, lease_store::LeaseCollection, @@ -57,6 +58,8 @@ pub struct XlineServer { client_timeout: ClientTimeout, /// 
Storage config, storage_cfg: StorageConfig, + /// Compact config + compact_cfg: CompactConfig, /// Server timeout server_timeout: ServerTimeout, /// Shutdown trigger @@ -78,6 +81,7 @@ impl XlineServer { client_timeout: ClientTimeout, server_timeout: ServerTimeout, storage_config: StorageConfig, + compact_config: CompactConfig, ) -> Self { Self { cluster_info, @@ -85,6 +89,7 @@ impl XlineServer { curp_cfg: Arc::new(curp_config), client_timeout, storage_cfg: storage_config, + compact_cfg: compact_config, server_timeout, shutdown_trigger: Arc::new(Event::new()), } @@ -107,7 +112,7 @@ impl XlineServer { /// Construct underlying storages, including `KvStore`, `LeaseStore`, `AuthStore` #[allow(clippy::type_complexity)] // it is easy to read #[inline] - fn construct_underlying_storages( + async fn construct_underlying_storages( &self, persistent: Arc, lease_collection: Arc, @@ -120,15 +125,25 @@ impl XlineServer { Arc>, Arc>, )> { + let (compact_task_tx, compact_task_rx) = channel(COMPACT_CHANNEL_SIZE); let index = Arc::new(Index::new()); - let (kv_update_tx, kv_update_rx) = tokio::sync::mpsc::channel(CHANNEL_SIZE); + let (kv_update_tx, kv_update_rx) = channel(CHANNEL_SIZE); let kv_storage = Arc::new(KvStore::new( + Arc::clone(&index), + Arc::clone(&persistent), + Arc::clone(&header_gen), kv_update_tx.clone(), + compact_task_tx, Arc::clone(&lease_collection), - Arc::clone(&header_gen), - Arc::clone(&persistent), + )); + let _hd = tokio::spawn(compactor( + Arc::clone(&kv_storage), Arc::clone(&index), + *self.compact_cfg.compact_batch_size(), + *self.compact_cfg.compact_sleep_interval(), + compact_task_rx, )); + // TODO: Boot up the compact policy scheduler let lease_storage = Arc::new(LeaseStore::new( Arc::clone(&lease_collection), Arc::clone(&header_gen), @@ -152,7 +167,7 @@ impl XlineServer { ); // lease storage must recover before kv storage lease_storage.recover()?; - kv_storage.recover()?; + kv_storage.recover().await?; auth_storage.recover()?; Ok((kv_storage, lease_storage, auth_storage, watcher)) } @@ -284,7 +299,8 @@ impl XlineServer { Arc::clone(&header_gen), Arc::clone(&auth_revision_gen), key_pair, - )?; + ) + .await?; let index_barrier = Arc::new(IndexBarrier::new()); let id_barrier = Arc::new(IdBarrier::new()); diff --git a/xline/src/storage/compact/mod.rs b/xline/src/storage/compact/mod.rs new file mode 100644 index 000000000..4c335ad9e --- /dev/null +++ b/xline/src/storage/compact/mod.rs @@ -0,0 +1,42 @@ +use std::{sync::Arc, time::Duration}; + +use event_listener::Event; +use tokio::{sync::mpsc::Receiver, time::sleep}; + +use super::{ + index::{Index, IndexOperate}, + storage_api::StorageApi, + KvStore, +}; + +/// compact task channel size +pub(crate) const COMPACT_CHANNEL_SIZE: usize = 32; + +/// background compact executor +pub(crate) async fn compactor( + kv_store: Arc>, + index: Arc, + batch_limit: usize, + interval: Duration, + mut compact_task_rx: Receiver<(i64, Option>)>, +) where + DB: StorageApi, +{ + while let Some((revision, listener)) = compact_task_rx.recv().await { + let target_revisions = index + .compact(revision) + .into_iter() + .map(|key_rev| key_rev.as_revision().encode_to_vec()) + .collect::>>(); + // Given that Xline uses an LSM-tree database with smaller write amplification as the storage backend, is progressive compaction really effective at improving performance? 
+ for revision_chunk in target_revisions.chunks(batch_limit) { + if let Err(e) = kv_store.compact(revision_chunk) { + panic!("failed to compact revision chunk {revision_chunk:?} due to {e}"); + } + sleep(interval).await; + } + if let Some(notifier) = listener { + notifier.notify(usize::MAX); + } + } +} diff --git a/xline/src/storage/db.rs b/xline/src/storage/db.rs index 8e982fe53..d2294aaf8 100644 --- a/xline/src/storage/db.rs +++ b/xline/src/storage/db.rs @@ -9,7 +9,7 @@ use super::{ kv_store::KV_TABLE, lease_store::LEASE_TABLE, storage_api::StorageApi, - ExecuteError, Revision, + ExecuteError, }; use crate::{ rpc::{PbLease, Role, User}, @@ -26,6 +26,9 @@ pub(crate) const XLINE_TABLES: [&str; 6] = [ ROLE_TABLE, ]; +/// Key of compacted revision +pub(crate) const COMPACT_REVISION: &str = "compact_revision"; + /// Database to store revision to kv mapping #[derive(Debug)] pub struct DB { @@ -130,8 +133,7 @@ impl StorageApi for DB { .collect::>(); for op in ops { let wop = match op { - WriteOp::PutKeyValue(rev, value) => { - let key = rev.encode_to_vec(); + WriteOp::PutKeyValue(key, value) => { WriteOperation::new_put(KV_TABLE, key, value.clone()) } WriteOp::PutAppliedIndex(index) => WriteOperation::new_put( @@ -144,6 +146,12 @@ impl StorageApi for DB { lease.id.encode_to_vec(), lease.encode_to_vec(), ), + WriteOp::PutCompactRevision(rev) => WriteOperation::new_put( + META_TABLE, + COMPACT_REVISION.as_bytes().to_vec(), + rev.to_le_bytes().to_vec(), + ), + WriteOp::DeleteKeyValue(rev) => WriteOperation::new_delete(KV_TABLE, rev), WriteOp::DeleteLease(lease_id) => { let key = del_lease_key_buffer.get(&lease_id).unwrap_or_else(|| { panic!("lease_id({lease_id}) is not in del_lease_key_buffer") @@ -189,11 +197,15 @@ impl StorageApi for DB { #[non_exhaustive] pub enum WriteOp<'a> { /// Put a key-value pair to kv table - PutKeyValue(Revision, Vec), + PutKeyValue(Vec, Vec), /// Put the applied index to meta table PutAppliedIndex(u64), /// Put a lease to lease table PutLease(PbLease), + /// Put a compacted revision into meta table + PutCompactRevision(i64), + /// Delete a key-value pair from kv table + DeleteKeyValue(&'a [u8]), /// Delete a lease from lease table DeleteLease(i64), /// Put a auth enable flag to auth table @@ -218,14 +230,15 @@ mod test { use test_macros::abort_on_panic; use super::*; + use crate::storage::Revision; #[tokio::test] #[abort_on_panic] async fn test_reset() -> Result<(), ExecuteError> { let data_dir = PathBuf::from("/tmp/test_reset"); let db = DB::open(&StorageConfig::RocksDB(data_dir.clone()))?; - let revision = Revision::new(1, 1); - let key = revision.encode_to_vec(); + let revision = Revision::new(1, 1).encode_to_vec(); + let key = revision.clone(); let ops = vec![WriteOp::PutKeyValue(revision, "value1".into())]; db.flush_ops(ops)?; let res = db.get_value(KV_TABLE, &key)?; @@ -251,8 +264,8 @@ mod test { let snapshot_path = dir.join("snapshot"); let origin_db = DB::open(&StorageConfig::RocksDB(origin_db_path))?; - let revision = Revision::new(1, 1); - let key = revision.encode_to_vec(); + let revision = Revision::new(1, 1).encode_to_vec(); + let key: Vec = revision.clone(); let ops = vec![WriteOp::PutKeyValue(revision, "value1".into())]; origin_db.flush_ops(ops)?; @@ -323,7 +336,7 @@ mod test { }; let role_bytes = role.encode_to_vec(); let write_ops = vec![ - WriteOp::PutKeyValue(Revision::new(1, 2), "value".into()), + WriteOp::PutKeyValue(Revision::new(1, 2).encode_to_vec(), "value".into()), WriteOp::PutAppliedIndex(5), WriteOp::PutLease(lease), 
WriteOp::PutAuthEnable(true), diff --git a/xline/src/storage/index.rs b/xline/src/storage/index.rs index 33fb259c2..997d66ce1 100644 --- a/xline/src/storage/index.rs +++ b/xline/src/storage/index.rs @@ -68,10 +68,10 @@ impl Index { revision: i64, sub_revision: i64, ) -> Option<(Revision, Revision)> { - let last_rev = Self::get_revision(revs, 0)?; + let last_available_rev = Self::get_revision(revs, 0)?; let del_rev = KeyRevision::new_deletion(revision, sub_revision); revs.push(del_rev); - Some((last_rev, del_rev.as_revision())) + Some((last_available_rev, del_rev.as_revision())) } /// Mark the `KeyRevision` as available @@ -129,7 +129,10 @@ pub(super) trait IndexOperate { version: i64, ); - // TODO: fn compact(rev:i64) + /// Compact the `KeyRevision`s of a key by removing every version whose revision is + /// smaller than or equal to the given `at_rev`, except the largest one (if the largest + /// one is a tombstone, it will not be kept). + fn compact(&self, at_rev: i64) -> Vec; } impl IndexOperate for Index { @@ -287,6 +290,42 @@ impl IndexOperate for Index { .or_insert_with(Vec::new) .push(new_rev); } + + fn compact(&self, at_rev: i64) -> Vec { + let mut revs = Vec::new(); + let mut del_keys = Vec::new(); + + let mut inner = self.inner.lock(); + inner.index.iter_mut().for_each(|(key, revisions)| { + if let Some(revision) = revisions.first() { + if revision.mod_revision < at_rev { + let pivot = revisions.partition_point(|rev| rev.mod_revision <= at_rev); + let compacted_last_idx = pivot.overflow_sub(1); + // There is at least 1 element in the first partition, so the key revision at `compacted_last_idx` + // must exist. + let key_rev = revisions.get(compacted_last_idx).unwrap_or_else(|| { + unreachable!( + "Oops, the key revision at {compacted_last_idx} should not be None", + ) + }); + let compact_revs = if key_rev.is_deleted() { + revisions.drain(..=compacted_last_idx) + } else { + revisions.drain(..compacted_last_idx) + }; + revs.extend(compact_revs.into_iter()); + + if revisions.is_empty() { + del_keys.push(key.clone()); + } + } + } + }); + for key in del_keys { + let _ignore = inner.index.remove(&key); + } + revs + } } #[cfg(test)] @@ -309,12 +348,20 @@ mod test { index.insert_or_update_revision(b"key", 3, 1); index.insert_or_update_revision(b"foo", 4, 5); index.insert_or_update_revision(b"bar", 5, 4); + index.insert_or_update_revision(b"foo", 6, 6); + index.insert_or_update_revision(b"bar", 7, 7); + index.insert_or_update_revision(b"foo", 8, 8); + index.insert_or_update_revision(b"bar", 9, 9); index.mark_available(1); index.mark_available(2); index.mark_available(3); index.mark_available(4); index.mark_available(5); + index.mark_available(6); + index.mark_available(7); + index.mark_available(8); + index.mark_available(9); assert_eq!( index.inner.lock().index, @@ -329,11 +376,19 @@ mod test { ), ( b"foo".to_vec(), - vec![KeyRevision::new(4, 1, 4, 5).mark_available()] + vec![ + KeyRevision::new(4, 1, 4, 5).mark_available(), + KeyRevision::new(4, 2, 6, 6).mark_available(), + KeyRevision::new(4, 3, 8, 8).mark_available() + ] ), ( b"bar".to_vec(), - vec![KeyRevision::new(5, 1, 5, 4).mark_available()] + vec![ + KeyRevision::new(5, 1, 5, 4).mark_available(), + KeyRevision::new(5, 2, 7, 7).mark_available(), + KeyRevision::new(5, 3, 9, 9).mark_available(), + ] ) ]) ); @@ -352,14 +407,25 @@ mod test { ); assert_eq!( index.get_from_rev(b"a", b"g", 3), - vec![Revision::new(4, 5), Revision::new(5, 4)] + vec![ + Revision::new(4, 5), + Revision::new(5, 4), + Revision::new(6, 6), + Revision::new(7, 7), + Revision::new(8, 8), 
+ Revision::new(9, 9) + ] ); assert_eq!( index.get_from_rev(b"\0", b"\0", 3), vec![ Revision::new(3, 1), Revision::new(4, 5), - Revision::new(5, 4) + Revision::new(5, 4), + Revision::new(6, 6), + Revision::new(7, 7), + Revision::new(8, 8), + Revision::new(9, 9) ] ); } @@ -369,27 +435,27 @@ mod test { let index = init_and_test_insert(); assert_eq!( - index.delete(b"key", b"", 6, 0), + index.delete(b"key", b"", 10, 0), ( - vec![(Revision::new(3, 1), Revision::new(6, 0))], + vec![(Revision::new(3, 1), Revision::new(10, 0))], vec![b"key".to_vec()] ) ); - index.mark_available(6); + index.mark_available(10); assert_eq!( - index.delete(b"a", b"g", 7, 0), + index.delete(b"a", b"g", 11, 0), ( vec![ - (Revision::new(5, 4), Revision::new(7, 0)), - (Revision::new(4, 5), Revision::new(7, 1)), + (Revision::new(9, 9), Revision::new(11, 0)), + (Revision::new(8, 8), Revision::new(11, 1)), ], vec![b"bar".to_vec(), b"foo".to_vec()] ) ); - index.mark_available(7); + index.mark_available(11); - assert_eq!(index.delete(b"\0", b"\0", 8, 0), (vec![], vec![])); + assert_eq!(index.delete(b"\0", b"\0", 12, 0), (vec![], vec![])); assert_eq!( index.inner.lock().index, @@ -400,21 +466,25 @@ mod test { KeyRevision::new(1, 1, 1, 3).mark_available(), KeyRevision::new(1, 2, 2, 2).mark_available(), KeyRevision::new(1, 3, 3, 1).mark_available(), - KeyRevision::new_deletion(6, 0).mark_available() + KeyRevision::new_deletion(10, 0).mark_available() ] ), ( b"foo".to_vec(), vec![ KeyRevision::new(4, 1, 4, 5).mark_available(), - KeyRevision::new_deletion(7, 1).mark_available() + KeyRevision::new(4, 2, 6, 6).mark_available(), + KeyRevision::new(4, 3, 8, 8).mark_available(), + KeyRevision::new_deletion(11, 1).mark_available() ] ), ( b"bar".to_vec(), vec![ KeyRevision::new(5, 1, 5, 4).mark_available(), - KeyRevision::new_deletion(7, 0).mark_available() + KeyRevision::new(5, 2, 7, 7).mark_available(), + KeyRevision::new(5, 3, 9, 9).mark_available(), + KeyRevision::new_deletion(11, 0).mark_available() ] ) ]) @@ -444,4 +514,82 @@ mod test { ]) ); } + + #[test] + fn test_compact() { + let index = init_and_test_insert(); + let res = index.compact(7); + assert_eq!( + index.inner.lock().index, + BTreeMap::from_iter(vec![ + ( + b"key".to_vec(), + vec![KeyRevision::new(1, 3, 3, 1).mark_available(),] + ), + ( + b"foo".to_vec(), + vec![ + KeyRevision::new(4, 2, 6, 6).mark_available(), + KeyRevision::new(4, 3, 8, 8).mark_available() + ] + ), + ( + b"bar".to_vec(), + vec![ + KeyRevision::new(5, 2, 7, 7).mark_available(), + KeyRevision::new(5, 3, 9, 9).mark_available(), + ] + ) + ]) + ); + assert_eq!( + res, + vec![ + KeyRevision::new(5, 1, 5, 4).mark_available(), + KeyRevision::new(4, 1, 4, 5).mark_available(), + KeyRevision::new(1, 1, 1, 3).mark_available(), + KeyRevision::new(1, 2, 2, 2).mark_available(), + ] + ); + } + + #[test] + fn test_compact_with_deletion() { + let index = init_and_test_insert(); + index.delete(b"a", b"g", 10, 0); + index.mark_available(10); + + index.insert_or_update_revision(b"bar", 11, 0); + index.mark_available(11); + + let res = index.compact(10); + assert_eq!( + index.inner.lock().index, + BTreeMap::from_iter(vec![ + ( + b"key".to_vec(), + vec![KeyRevision::new(1, 3, 3, 1).mark_available(),] + ), + ( + b"bar".to_vec(), + vec![KeyRevision::new(11, 1, 11, 0).mark_available(),] + ) + ]) + ); + assert_eq!( + res, + vec![ + KeyRevision::new(5, 1, 5, 4).mark_available(), + KeyRevision::new(5, 2, 7, 7).mark_available(), + KeyRevision::new(5, 3, 9, 9).mark_available(), + KeyRevision::new(0, 0, 10, 0).mark_available(), + 
KeyRevision::new(4, 1, 4, 5).mark_available(), + KeyRevision::new(4, 2, 6, 6).mark_available(), + KeyRevision::new(4, 3, 8, 8).mark_available(), + KeyRevision::new(0, 0, 10, 1).mark_available(), + KeyRevision::new(1, 1, 1, 3).mark_available(), + KeyRevision::new(1, 2, 2, 2).mark_available(), + ] + ); + } } diff --git a/xline/src/storage/kv_store.rs b/xline/src/storage/kv_store.rs index 107617cdd..61d6e06ca 100644 --- a/xline/src/storage/kv_store.rs +++ b/xline/src/storage/kv_store.rs @@ -1,7 +1,10 @@ use std::{ cmp::Ordering, collections::{HashMap, VecDeque}, - sync::Arc, + sync::{ + atomic::{AtomicI64, Ordering::Relaxed}, + Arc, + }, }; use clippy_utilities::{Cast, OverflowArithmetic}; @@ -19,13 +22,16 @@ use crate::{ header_gen::HeaderGenerator, revision_number::RevisionNumberGenerator, rpc::{ - Compare, CompareResult, CompareTarget, DeleteRangeRequest, DeleteRangeResponse, Event, - EventType, KeyValue, PutRequest, PutResponse, RangeRequest, RangeResponse, Request, - RequestWithToken, RequestWrapper, ResponseWrapper, SortOrder, SortTarget, TargetUnion, - TxnRequest, TxnResponse, + CompactionRequest, CompactionResponse, Compare, CompareResult, CompareTarget, + DeleteRangeRequest, DeleteRangeResponse, Event, EventType, KeyValue, PutRequest, + PutResponse, RangeRequest, RangeResponse, Request, RequestWithToken, RequestWrapper, + ResponseWrapper, SortOrder, SortTarget, TargetUnion, TxnRequest, TxnResponse, + }, + server::command::{CommandResponse, KeyRange, SyncResponse, META_TABLE}, + storage::{ + db::{WriteOp, COMPACT_REVISION}, + ExecuteError, }, - server::command::{CommandResponse, KeyRange, SyncResponse}, - storage::{db::WriteOp, ExecuteError}, }; /// KV table name @@ -41,12 +47,16 @@ where index: Arc, /// DB to store key value db: Arc, + /// Compacted Revision + compacted_rev: AtomicI64, /// Revision revision: Arc, /// Header generator header_gen: Arc, /// KV update sender kv_update_tx: mpsc::Sender<(i64, Vec)>, + /// Compact task submit sender + compact_task_tx: mpsc::Sender<(i64, Option>)>, /// Lease collection lease_collection: Arc, } @@ -76,7 +86,7 @@ where } /// Recover data from persistent storage - pub(crate) fn recover(&self) -> Result<(), ExecuteError> { + pub(crate) async fn recover(&self) -> Result<(), ExecuteError> { let mut key_to_lease: HashMap, i64> = HashMap::new(); let kvs = self.db.get_all(KV_TABLE)?; @@ -109,8 +119,22 @@ where self.attach(lease_id, key)?; } - // compact Lock free - + if let Some(revision_bytes) = self.db.get_value(META_TABLE, COMPACT_REVISION)? 
{ + let compacted_revision = + i64::from_le_bytes(revision_bytes.try_into().map_err(|e| { + ExecuteError::DbError(format!( + "cannot decode compacted revision from META_TABLE: {e:?}" + )) + })?); + assert!( + compacted_revision >= -1 && compacted_revision <= current_rev, + "compacted revision corruption, which ({compacted_revision}) must belong to the range [-1, {current_rev}]" + ); + self.update_compacted_revision(compacted_revision); + if let Err(e) = self.compact_task_tx.send((compacted_revision, None)).await { + panic!("the compactor exited unexpectedly: {e:?}"); + } + } Ok(()) } @@ -126,18 +150,21 @@ where { /// New `KvStore` pub(crate) fn new( + index: Arc, + db: Arc, + header_gen: Arc, kv_update_tx: mpsc::Sender<(i64, Vec)>, + compact_task_tx: mpsc::Sender<(i64, Option>)>, lease_collection: Arc, - header_gen: Arc, - db: Arc, - index: Arc, ) -> Self { Self { index, db, + compacted_rev: AtomicI64::new(-1), revision: header_gen.revision_arc(), header_gen, kv_update_tx, + compact_task_tx, lease_collection, } } @@ -147,6 +174,16 @@ where self.revision.get() } + /// Get compacted revision of KV store + pub(crate) fn compacted_revision(&self) -> i64 { + self.compacted_rev.load(Relaxed) + } + + /// Update compacted revision of KV store + pub(crate) fn update_compacted_revision(&self, revision: i64) { + self.compacted_rev.store(revision, Relaxed); + } + /// Notify KV changes to KV watcher async fn notify_updates(&self, revision: i64, updates: Vec) { assert!( @@ -316,6 +353,16 @@ where fn attach(&self, lease_id: i64, key: impl Into>) -> Result<(), ExecuteError> { self.lease_collection.attach(lease_id, key.into()) } + + /// Compact kv storage + pub(crate) fn compact(&self, revisions: &[Vec]) -> Result<(), ExecuteError> { + let mut ops = Vec::new(); + revisions + .iter() + .for_each(|rev| ops.push(WriteOp::DeleteKeyValue(rev.as_ref()))); + self.db.flush_ops(ops)?; + Ok(()) + } } /// db operations @@ -434,7 +481,14 @@ where RequestWrapper::DeleteRangeRequest(ref req) => { self.handle_delete_range_request(req).map(Into::into) } - RequestWrapper::TxnRequest(ref req) => self.handle_txn_request(req).map(Into::into), + RequestWrapper::TxnRequest(ref req) => { + debug!("Receive TxnRequest {:?}", req); + self.handle_txn_request(req).map(Into::into) + } + RequestWrapper::CompactionRequest(ref req) => { + debug!("Receive CompactionRequest {:?}", req); + Ok(self.handle_compaction_request(req).into()) + } _ => unreachable!("Other request should not be sent to this store"), }; res @@ -547,6 +601,19 @@ where }) } + /// Handle `CompactionRequest` + fn handle_compaction_request(&self, req: &CompactionRequest) -> CompactionResponse { + let target_revision = req.revision; + debug_assert!( + target_revision > self.compacted_revision(), + "required revision should not be compacted" + ); + self.update_compacted_revision(target_revision); + CompactionResponse { + header: Some(self.header_gen.gen_header_without_revision()), + } + } + /// Sync requests in kv store async fn sync_request( &self, @@ -562,6 +629,9 @@ where self.sync_delete_range_request(req, revision, 0) } RequestWrapper::TxnRequest(ref req) => self.sync_txn_request(req, revision)?, + RequestWrapper::CompactionRequest(ref req) => { + self.sync_compaction_request(req, revision).await? 
+ } _ => { unreachable!("only kv requests can be sent to kv store"); } @@ -570,6 +640,34 @@ where Ok((revision, ops)) } + /// Sync `CompactionRequest` and return if kvstore is changed + async fn sync_compaction_request( + &self, + req: &CompactionRequest, + _revision: i64, + ) -> Result<(Vec, Vec), ExecuteError> { + let revision = req.revision; + let ops = vec![WriteOp::PutCompactRevision(revision)]; + // TODO: Remove the physical process logic here. It's better to move into the KvServer + #[allow(clippy::collapsible_else_if)] + if req.physical { + let event = Arc::new(event_listener::Event::new()); + if let Err(e) = self + .compact_task_tx + .send((revision, Some(Arc::clone(&event)))) + .await + { + panic!("the compactor exited unexpectedly: {e:?}"); + } + event.listen().await; + } else { + if let Err(e) = self.compact_task_tx.send((revision, None)).await { + panic!("the compactor exited unexpectedly: {e:?}"); + } + } + Ok((ops, Vec::new())) + } + /// Sync `TxnRequest` and return if kvstore is changed fn sync_txn_request( &self, @@ -650,7 +748,7 @@ where .unwrap_or_else(|e| panic!("unexpected error from lease Attach: {e}")); } ops.push(WriteOp::PutKeyValue( - new_rev.as_revision(), + new_rev.as_revision().encode_to_vec(), kv.encode_to_vec(), )); let event = Event { @@ -696,7 +794,7 @@ where ..KeyValue::default() }; let value = del_kv.encode_to_vec(); - WriteOp::PutKeyValue(new_rev, value) + WriteOp::PutKeyValue(new_rev.encode_to_vec(), value) }) .collect() } @@ -753,7 +851,11 @@ mod test { use crate::{ revision_number::RevisionNumberGenerator, rpc::RequestOp, - storage::{db::DB, kvwatcher::KvWatcher}, + storage::{ + compact::{compactor, COMPACT_CHANNEL_SIZE}, + db::DB, + kvwatcher::KvWatcher, + }, }; const CHANNEL_SIZE: usize = 1024; @@ -772,8 +874,8 @@ mod test { db: Arc, ) -> Result<(Arc>, RevisionNumberGenerator), ExecuteError> { let store = init_empty_store(db); - let keys = vec!["a", "b", "c", "d", "e"]; - let vals = vec!["a", "b", "c", "d", "e"]; + let keys = vec!["a", "b", "c", "d", "e", "z", "z", "z"]; + let vals = vec!["a", "b", "c", "d", "e", "z1", "z2", "z3"]; let revision = RevisionNumberGenerator::default(); for (key, val) in keys.into_iter().zip(vals.into_iter()) { let req = RequestWithToken::new( @@ -790,16 +892,18 @@ mod test { } fn init_empty_store(db: Arc) -> Arc> { + let (compact_tx, compact_rx) = mpsc::channel(COMPACT_CHANNEL_SIZE); let (kv_update_tx, kv_update_rx) = mpsc::channel(CHANNEL_SIZE); let lease_collection = Arc::new(LeaseCollection::new(0)); let header_gen = Arc::new(HeaderGenerator::new(0, 0)); let index = Arc::new(Index::new()); let storage = Arc::new(KvStore::new( + Arc::clone(&index), + db, + header_gen, kv_update_tx, + compact_tx, lease_collection, - header_gen, - db, - index, )); let shutdown_trigger = Arc::new(event_listener::Event::new()); let _watcher = KvWatcher::new_arc( @@ -808,6 +912,13 @@ mod test { shutdown_trigger, Duration::from_millis(10), ); + let _compactor = tokio::spawn(compactor( + Arc::clone(&storage), + index, + 1000, + Duration::from_millis(10), + compact_rx, + )); storage } @@ -822,6 +933,15 @@ mod test { Ok(()) } + fn index_compact(store: &Arc>, at_rev: i64) -> Vec> { + store + .index + .compact(at_rev) + .into_iter() + .map(|key_rev| key_rev.as_revision().encode_to_vec()) + .collect::>>() + } + #[tokio::test] #[abort_on_panic] async fn test_keys_only() -> Result<(), ExecuteError> { @@ -835,7 +955,7 @@ mod test { ..Default::default() }; let response = store.handle_range_request(&request)?; - assert_eq!(response.kvs.len(), 5); + 
assert_eq!(response.kvs.len(), 6); for kv in response.kvs { assert!(kv.value.is_empty()); } @@ -878,7 +998,7 @@ mod test { ..Default::default() }; let response = store.handle_range_request(&request)?; - assert_eq!(response.count, 5); + assert_eq!(response.count, 6); assert_eq!(response.kvs.len(), 2); assert_eq!(response.kvs[0].create_revision, 2); assert_eq!(response.kvs[1].create_revision, 3); @@ -891,8 +1011,9 @@ mod test { async fn test_range_sort() -> Result<(), ExecuteError> { let db = DB::open(&StorageConfig::Memory)?; let (store, _rev) = init_store(db).await?; - let keys = ["a", "b", "c", "d", "e"]; - let reversed_keys = ["e", "d", "c", "b", "a"]; + let keys = ["a", "b", "c", "d", "e", "z"]; + let reversed_keys = ["z", "e", "d", "c", "b", "a"]; + let version_keys = ["z", "a", "b", "c", "d", "e"]; for order in [SortOrder::Ascend, SortOrder::Descend, SortOrder::None] { for target in [ @@ -902,30 +1023,43 @@ mod test { SortTarget::Value, ] { let response = store.handle_range_request(&sort_req(order, target))?; - assert_eq!(response.count, 5); - assert_eq!(response.kvs.len(), 5); - let expected = match order { + assert_eq!(response.count, 6); + assert_eq!(response.kvs.len(), 6); + let expected: [&str; 6] = match order { SortOrder::Descend => reversed_keys, SortOrder::Ascend | SortOrder::None => keys, }; - let is_identical = response - .kvs - .iter() - .zip(expected.iter()) - .all(|(kv, want)| kv.key == want.as_bytes()); - assert!(is_identical); + for (kv, want) in response.kvs.iter().zip(expected.iter()) { + assert_eq!( + kv.key, + want.as_bytes(), + "order: {:?}, target: {:?}, key {:?}, want {:?}", + order, + target, + kv.key, + want.as_bytes(), + ); + } } } for order in [SortOrder::Ascend, SortOrder::Descend, SortOrder::None] { let response = store.handle_range_request(&sort_req(order, SortTarget::Version))?; - assert_eq!(response.count, 5); - assert_eq!(response.kvs.len(), 5); - let is_identical = response - .kvs - .iter() - .zip(keys.iter()) - .all(|(kv, want)| kv.key == want.as_bytes()); - assert!(is_identical); + assert_eq!(response.count, 6); + assert_eq!(response.kvs.len(), 6); + let expected = match order { + SortOrder::Ascend | SortOrder::None => keys, + SortOrder::Descend => version_keys, + }; + for (kv, want) in response.kvs.iter().zip(expected.iter()) { + assert_eq!( + kv.key, + want.as_bytes(), + "order: {:?}, key {:?}, want {:?}", + order, + kv.key, + want.as_bytes(), + ); + } } Ok(()) } @@ -934,7 +1068,10 @@ mod test { #[abort_on_panic] async fn test_recover() -> Result<(), ExecuteError> { let db = DB::open(&StorageConfig::Memory)?; - let _store = init_store(Arc::clone(&db)).await?; + let ops = vec![WriteOp::PutCompactRevision(8)]; + db.flush_ops(ops)?; + let (store, _rev_gen) = init_store(Arc::clone(&db)).await?; + assert_eq!(store.index.get_from_rev(b"z", b"", 5).len(), 3); let new_store = init_empty_store(db); @@ -945,13 +1082,16 @@ mod test { }; let res = new_store.handle_range_request(&range_req)?; assert_eq!(res.kvs.len(), 0); + assert_eq!(new_store.compacted_revision(), -1); - new_store.recover()?; + new_store.recover().await?; let res = new_store.handle_range_request(&range_req)?; assert_eq!(res.kvs.len(), 1); assert_eq!(res.kvs[0].key, b"a"); - + assert_eq!(new_store.compacted_revision(), 8); + tokio::time::sleep(Duration::from_millis(500)).await; + assert_eq!(new_store.index.get_from_rev(b"z", b"", 5).len(), 2); Ok(()) } @@ -1045,4 +1185,104 @@ mod test { ); handle.await.unwrap(); } + + #[tokio::test] + async fn test_compaction() -> Result<(), ExecuteError> { + 
let db = DB::open(&StorageConfig::Memory)?; + let store = init_empty_store(db); + let revision = RevisionNumberGenerator::default(); + // sample requests: (a, 1) (b, 2) (a, 3) (del a) + // their revisions: 2 3 4 5 + let requests = vec![ + RequestWithToken::new( + PutRequest { + key: "a".into(), + value: "1".into(), + ..Default::default() + } + .into(), + ), + RequestWithToken::new( + PutRequest { + key: "b".into(), + value: "2".into(), + ..Default::default() + } + .into(), + ), + RequestWithToken::new( + PutRequest { + key: "a".into(), + value: "3".into(), + ..Default::default() + } + .into(), + ), + RequestWithToken::new( + DeleteRangeRequest { + key: "a".into(), + ..Default::default() + } + .into(), + ), + ]; + + for req in requests { + exe_as_and_flush(&store, &req, revision.next()) + .await + .unwrap(); + } + + let target_revisions = index_compact(&store, 3); + store.compact(target_revisions.as_ref())?; + assert_eq!( + store.get_range(b"a", b"", 2).unwrap().len(), + 1, + "(a, 1) should not be removed" + ); + assert_eq!( + store.get_range(b"b", b"", 3).unwrap().len(), + 1, + "(b, 2) should not be removed" + ); + + let target_revisions = index_compact(&store, 4); + store.compact(target_revisions.as_ref())?; + assert!( + store.get_range(b"a", b"", 2).unwrap().is_empty(), + "(a, 1) should be removed" + ); + assert_eq!( + store.get_range(b"b", b"", 3).unwrap().len(), + 1, + "(b, 2) should not be removed" + ); + assert_eq!( + store.get_range(b"a", b"", 4).unwrap().len(), + 1, + "(a, 3) should not be removed" + ); + + let target_revisions = index_compact(&store, 5); + store.compact(target_revisions.as_ref())?; + assert!( + store.get_range(b"a", b"", 2).unwrap().is_empty(), + "(a, 1) should be removed" + ); + assert_eq!( + store.get_range(b"b", b"", 3).unwrap().len(), + 1, + "(b, 2) should not be removed" + ); + assert!( + store.get_range(b"a", b"", 4).unwrap().is_empty(), + "(a, 3) should be removed" + ); + assert!( + store.get_range(b"a", b"", 5).unwrap().is_empty(), + "(a, 4) should be removed" + ); + + Ok(()) + } } diff --git a/xline/src/storage/kvwatcher.rs b/xline/src/storage/kvwatcher.rs index 8086dc205..0f373c9db 100644 --- a/xline/src/storage/kvwatcher.rs +++ b/xline/src/storage/kvwatcher.rs @@ -298,6 +298,9 @@ pub(crate) trait KvWatcherOps { /// Get Prev `KeyValue` of a `KeyValue` fn get_prev_kv(&self, kv: &KeyValue) -> Option; + + /// Get compacted revision from backend store + fn compacted_revision(&self) -> i64; } #[async_trait::async_trait] @@ -305,7 +308,6 @@ impl KvWatcherOps for KvWatcher where S: StorageApi, { - /// Create a watch to KV store fn watch( &self, id: WatchId, @@ -354,7 +356,6 @@ where watcher_map_w.register(watcher); } - /// Cancel a watch from KV store fn cancel(&self, watch_id: WatchId) { self.watcher_map.write().remove(watch_id); } @@ -362,6 +363,10 @@ where fn get_prev_kv(&self, kv: &KeyValue) -> Option { self.storage.get_prev_kv(kv) } + + fn compacted_revision(&self) -> i64 { + self.storage.compacted_revision() + } } impl KvWatcher @@ -546,21 +551,26 @@ mod test { use crate::{ header_gen::HeaderGenerator, rpc::{PutRequest, RequestWithToken}, - storage::{db::DB, index::Index, lease_store::LeaseCollection, KvStore}, + storage::{ + compact::COMPACT_CHANNEL_SIZE, db::DB, index::Index, lease_store::LeaseCollection, + KvStore, + }, }; fn init_empty_store() -> (Arc>, Arc, Arc>) { + let (compact_tx, _compact_rx) = mpsc::channel(COMPACT_CHANNEL_SIZE); let db = DB::open(&StorageConfig::Memory).unwrap(); let header_gen = Arc::new(HeaderGenerator::new(0, 0)); let index 
= Arc::new(Index::new()); let lease_collection = Arc::new(LeaseCollection::new(0)); let (kv_update_tx, kv_update_rx) = mpsc::channel(128); let store = Arc::new(KvStore::new( + index, + Arc::clone(&db), + header_gen, kv_update_tx, + compact_tx, lease_collection, - header_gen, - Arc::clone(&db), - index, )); let shutdown_trigger = Arc::new(event_listener::Event::new()); let sync_victims_interval = Duration::from_millis(10); diff --git a/xline/src/storage/mod.rs b/xline/src/storage/mod.rs index 8eb2b6d4c..fc9cf38dc 100644 --- a/xline/src/storage/mod.rs +++ b/xline/src/storage/mod.rs @@ -1,5 +1,7 @@ /// Storage for Auth pub(crate) mod auth_store; +/// Compactor +pub(crate) mod compact; /// Database module pub mod db; /// Execute error diff --git a/xline/src/storage/revision.rs b/xline/src/storage/revision.rs index 652951412..c47d061da 100644 --- a/xline/src/storage/revision.rs +++ b/xline/src/storage/revision.rs @@ -18,7 +18,7 @@ pub(super) struct KeyRevision { /// Revision #[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] -pub struct Revision { +pub(crate) struct Revision { /// Main revision revision: i64, /// Sub revision in one transaction or range deletion diff --git a/xlineapi/src/lib.rs b/xlineapi/src/lib.rs index 08c5b557f..421717b7f 100644 --- a/xlineapi/src/lib.rs +++ b/xlineapi/src/lib.rs @@ -458,7 +458,9 @@ impl RequestWrapper { /// Check whether the kv request or lease request should skip the revision or not pub fn skip_general_revision(&self) -> bool { match self { - RequestWrapper::RangeRequest(_) | RequestWrapper::LeaseGrantRequest(_) => true, + RequestWrapper::RangeRequest(_) + | RequestWrapper::LeaseGrantRequest(_) + | RequestWrapper::CompactionRequest(_) => true, RequestWrapper::TxnRequest(req) => req.is_read_only(), _ => false, } @@ -474,6 +476,14 @@ impl RequestWrapper { self.backend() == RequestBackend::Kv } + pub fn is_compaction_request(&self) -> bool { + matches!(*self, RequestWrapper::CompactionRequest(_)) + } + + pub fn is_txn_request(&self) -> bool { + matches!(*self, RequestWrapper::TxnRequest(_)) + } + pub fn is_lease_read_request(&self) -> bool { matches!(*self, RequestWrapper::LeaseLeasesRequest(_)) } @@ -660,6 +670,22 @@ impl TxnRequest { }; self.success.iter().all(read_only_checker) && self.failure.iter().all(read_only_checker) } + + /// Checks whether a `TxnRequest` is conflict with a given revision + pub fn is_conflict_with_rev(&self, revision: i64) -> bool { + let conflict_checker = |req: &RequestOp| { + if let Some(ref request) = req.request { + match request { + Request::RequestRange(req) => req.revision > 0 && req.revision < revision, + Request::RequestDeleteRange(_) | Request::RequestPut(_) => false, + Request::RequestTxn(req) => req.is_conflict_with_rev(revision), + } + } else { + false + } + }; + self.success.iter().any(conflict_checker) || self.failure.iter().any(conflict_checker) + } } #[cfg(test)]
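
The conflict rule added to `TxnRequest::is_conflict_with_rev` above can be summarized as: a transaction conflicts with a compaction at revision `c` exactly when it (or a nested transaction) contains a range read pinned to a historical revision `r` with `0 < r < c`; writes, deletes, and latest-revision reads never conflict. The following standalone Rust sketch uses hypothetical simplified types (not the real `xlineapi` ones) to illustrate that rule, mirroring the cases exercised in `test_compaction_txn_conflict`.

// Hypothetical simplified request type used only for illustration.
enum Op {
    Range { revision: i64 },
    Put,
    DeleteRange,
    Txn(Vec<Op>),
}

// A set of ops conflicts with a compaction at `compact_rev` iff some (possibly
// nested) range read targets a revision older than `compact_rev`.
fn conflicts_with_compaction(ops: &[Op], compact_rev: i64) -> bool {
    ops.iter().any(|op| match op {
        Op::Range { revision } => *revision > 0 && *revision < compact_rev,
        Op::Put | Op::DeleteRange => false,
        Op::Txn(inner) => conflicts_with_compaction(inner, compact_rev),
    })
}

fn main() {
    // A nested read at revision 3 conflicts with a compaction at revision 5 ...
    let conflicting = [Op::Txn(vec![Op::Put, Op::Range { revision: 3 }])];
    assert!(conflicts_with_compaction(&conflicting, 5));
    // ... while writes, latest-revision reads (revision 0), and reads at a
    // revision newer than the compaction do not.
    let non_conflicting = [Op::DeleteRange, Op::Range { revision: 0 }, Op::Range { revision: 7 }];
    assert!(!conflicts_with_compaction(&non_conflicting, 5));
}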
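
Similarly, the per-key rule implemented by `Index::compact` is: among all versions of a key whose `mod_revision` is less than or equal to the compaction revision, only the newest one survives, and even that one is dropped if it is a deletion tombstone. A minimal sketch, assuming a simplified `KeyRev` type instead of the real `KeyRevision`:

#[derive(Debug, Clone, Copy, PartialEq)]
struct KeyRev {
    mod_revision: i64,
    deleted: bool,
}

/// Returns the revisions removed by compaction; `revs` keeps the survivors.
/// `revs` is assumed to be sorted by `mod_revision`, as in the real index.
fn compact_one_key(revs: &mut Vec<KeyRev>, at_rev: i64) -> Vec<KeyRev> {
    // All versions at indices < pivot have mod_revision <= at_rev.
    let pivot = revs.partition_point(|r| r.mod_revision <= at_rev);
    if pivot == 0 {
        return Vec::new(); // nothing is old enough to be compacted
    }
    let last_idx = pivot - 1;
    // Keep the newest compactable version unless it is a tombstone.
    let end = if revs[last_idx].deleted { pivot } else { last_idx };
    revs.drain(..end).collect()
}

fn main() {
    let mut revs = vec![
        KeyRev { mod_revision: 2, deleted: false },
        KeyRev { mod_revision: 4, deleted: false },
        KeyRev { mod_revision: 6, deleted: false },
    ];
    // Compacting at revision 5 removes revision 2; revision 4 (the newest
    // version at or below 5) and revision 6 survive.
    let removed = compact_one_key(&mut revs, 5);
    assert_eq!(removed, vec![KeyRev { mod_revision: 2, deleted: false }]);
    assert_eq!(revs.first().map(|r| r.mod_revision), Some(4));
}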