From a48b3033f8e9635cb7952af16a5f9d1416e6a91b Mon Sep 17 00:00:00 2001 From: Phoeniix Zhao Date: Tue, 11 Jul 2023 16:53:58 +0800 Subject: [PATCH] refactor: refactor compact mod * unify the argument between PutKeyValue and DeleteKeyValue * move compact.rs into compact mod * using a bounded channel to replace the unbounded channel (compact_task_tx, compact_task_rx) Signed-off-by: Phoeniix Zhao --- xline/src/server/kv_server.rs | 4 +- xline/src/server/watch_server.rs | 8 ++-- xline/src/server/xline_server.rs | 15 +++--- .../storage/{compact.rs => compact/mod.rs} | 7 ++- xline/src/storage/db.rs | 18 +++---- xline/src/storage/index.rs | 48 +++++++------------ xline/src/storage/kv_store.rs | 25 ++++++---- xline/src/storage/kvwatcher.rs | 7 ++- xline/src/storage/revision.rs | 2 +- 9 files changed, 65 insertions(+), 69 deletions(-) rename xline/src/storage/{compact.rs => compact/mod.rs} (86%) diff --git a/xline/src/server/kv_server.rs b/xline/src/server/kv_server.rs index 84dd2c933..38bc4e235 100644 --- a/xline/src/server/kv_server.rs +++ b/xline/src/server/kv_server.rs @@ -100,6 +100,7 @@ where .iter() .map(|cmp| KeyRange::new(cmp.key.as_slice(), cmp.range_end.as_slice())) .collect(), + RequestWrapper::CompactionRequest(ref _req) => Vec::new(), _ => unreachable!("Other request should not be sent to this store"), }; Command::new(key_ranges, wrapper, propose_id) @@ -222,7 +223,7 @@ where range_revision: i64, compacted_revision: i64, ) -> Result<(), tonic::Status> { - (range_revision >= compacted_revision) + (range_revision <= 0 || range_revision >= compacted_revision) .then_some(()) .ok_or(tonic::Status::invalid_argument(format!( "required revision {range_revision} has been compacted, compacted revision is {compacted_revision}" @@ -671,7 +672,6 @@ mod test { .unwrap_err() .to_string(); assert_eq!(message, expected_err_message); - assert_eq!(message, expected_err_message); } #[tokio::test] diff --git a/xline/src/server/watch_server.rs b/xline/src/server/watch_server.rs index bc6af9865..383a7b6eb 100644 --- a/xline/src/server/watch_server.rs +++ b/xline/src/server/watch_server.rs @@ -436,8 +436,8 @@ mod test { use crate::{ rpc::{PutRequest, RequestWithToken, WatchProgressRequest}, storage::{ - db::DB, index::Index, kvwatcher::MockKvWatcherOps, lease_store::LeaseCollection, - KvStore, + compact::COMPACT_CHANNEL_SIZE, db::DB, index::Index, kvwatcher::MockKvWatcherOps, + lease_store::LeaseCollection, KvStore, }, }; @@ -579,7 +579,7 @@ mod test { #[tokio::test] #[abort_on_panic] async fn test_watch_prev_kv() { - let (compact_tx, _compact_rx) = mpsc::unbounded_channel(); + let (compact_tx, _compact_rx) = mpsc::channel(COMPACT_CHANNEL_SIZE); let index = Arc::new(Index::new()); let db = DB::open(&StorageConfig::Memory).unwrap(); let header_gen = Arc::new(HeaderGenerator::new(0, 0)); @@ -752,7 +752,7 @@ mod test { #[tokio::test] async fn watch_compacted_revision_should_fail() { - let (compact_tx, _compact_rx) = mpsc::unbounded_channel(); + let (compact_tx, _compact_rx) = mpsc::channel(COMPACT_CHANNEL_SIZE); let index = Arc::new(Index::new()); let db = DB::open(&StorageConfig::Memory).unwrap(); let header_gen = Arc::new(HeaderGenerator::new(0, 0)); diff --git a/xline/src/server/xline_server.rs b/xline/src/server/xline_server.rs index 294e37f2e..012f81bb8 100644 --- a/xline/src/server/xline_server.rs +++ b/xline/src/server/xline_server.rs @@ -5,7 +5,7 @@ use clippy_utilities::{Cast, OverflowArithmetic}; use curp::{members::ClusterMember, server::Rpc, ProtocolServer, SnapshotAllocator}; use event_listener::Event; use jsonwebtoken::{DecodingKey, EncodingKey}; -use tokio::{net::TcpListener, sync::mpsc::unbounded_channel}; +use tokio::{net::TcpListener, sync::mpsc::channel}; use tokio_stream::wrappers::TcpListenerStream; use tonic::transport::Server; use tonic_health::ServingStatus; @@ -32,7 +32,7 @@ use crate::{ }, state::State, storage::{ - compact::compactor, + compact::{compactor, COMPACT_CHANNEL_SIZE}, index::Index, kvwatcher::KvWatcher, lease_store::LeaseCollection, @@ -112,7 +112,7 @@ impl XlineServer { /// Construct underlying storages, including `KvStore`, `LeaseStore`, `AuthStore` #[allow(clippy::type_complexity)] // it is easy to read #[inline] - fn construct_underlying_storages( + async fn construct_underlying_storages( &self, persistent: Arc, lease_collection: Arc, @@ -125,9 +125,9 @@ impl XlineServer { Arc>, Arc>, )> { - let (compact_task_tx, compact_task_rx) = unbounded_channel(); + let (compact_task_tx, compact_task_rx) = channel(COMPACT_CHANNEL_SIZE); let index = Arc::new(Index::new()); - let (kv_update_tx, kv_update_rx) = tokio::sync::mpsc::channel(CHANNEL_SIZE); + let (kv_update_tx, kv_update_rx) = channel(CHANNEL_SIZE); let kv_storage = Arc::new(KvStore::new( Arc::clone(&index), Arc::clone(&persistent), @@ -167,7 +167,7 @@ impl XlineServer { ); // lease storage must recover before kv storage lease_storage.recover()?; - kv_storage.recover()?; + kv_storage.recover().await?; auth_storage.recover()?; Ok((kv_storage, lease_storage, auth_storage, watcher)) } @@ -299,7 +299,8 @@ impl XlineServer { Arc::clone(&header_gen), Arc::clone(&auth_revision_gen), key_pair, - )?; + ) + .await?; let index_barrier = Arc::new(IndexBarrier::new()); let id_barrier = Arc::new(IdBarrier::new()); diff --git a/xline/src/storage/compact.rs b/xline/src/storage/compact/mod.rs similarity index 86% rename from xline/src/storage/compact.rs rename to xline/src/storage/compact/mod.rs index 03ac75200..4c335ad9e 100644 --- a/xline/src/storage/compact.rs +++ b/xline/src/storage/compact/mod.rs @@ -1,7 +1,7 @@ use std::{sync::Arc, time::Duration}; use event_listener::Event; -use tokio::{sync::mpsc::UnboundedReceiver, time::sleep}; +use tokio::{sync::mpsc::Receiver, time::sleep}; use super::{ index::{Index, IndexOperate}, @@ -9,13 +9,16 @@ use super::{ KvStore, }; +/// compact task channel size +pub(crate) const COMPACT_CHANNEL_SIZE: usize = 32; + /// background compact executor pub(crate) async fn compactor( kv_store: Arc>, index: Arc, batch_limit: usize, interval: Duration, - mut compact_task_rx: UnboundedReceiver<(i64, Option>)>, + mut compact_task_rx: Receiver<(i64, Option>)>, ) where DB: StorageApi, { diff --git a/xline/src/storage/db.rs b/xline/src/storage/db.rs index 9486c00b6..d2294aaf8 100644 --- a/xline/src/storage/db.rs +++ b/xline/src/storage/db.rs @@ -9,7 +9,7 @@ use super::{ kv_store::KV_TABLE, lease_store::LEASE_TABLE, storage_api::StorageApi, - ExecuteError, Revision, + ExecuteError, }; use crate::{ rpc::{PbLease, Role, User}, @@ -133,8 +133,7 @@ impl StorageApi for DB { .collect::>(); for op in ops { let wop = match op { - WriteOp::PutKeyValue(rev, value) => { - let key = rev.encode_to_vec(); + WriteOp::PutKeyValue(key, value) => { WriteOperation::new_put(KV_TABLE, key, value.clone()) } WriteOp::PutAppliedIndex(index) => WriteOperation::new_put( @@ -198,7 +197,7 @@ impl StorageApi for DB { #[non_exhaustive] pub enum WriteOp<'a> { /// Put a key-value pair to kv table - PutKeyValue(Revision, Vec), + PutKeyValue(Vec, Vec), /// Put the applied index to meta table PutAppliedIndex(u64), /// Put a lease to lease table @@ -231,14 +230,15 @@ mod test { use test_macros::abort_on_panic; use super::*; + use crate::storage::Revision; #[tokio::test] #[abort_on_panic] async fn test_reset() -> Result<(), ExecuteError> { let data_dir = PathBuf::from("/tmp/test_reset"); let db = DB::open(&StorageConfig::RocksDB(data_dir.clone()))?; - let revision = Revision::new(1, 1); - let key = revision.encode_to_vec(); + let revision = Revision::new(1, 1).encode_to_vec(); + let key = revision.clone(); let ops = vec![WriteOp::PutKeyValue(revision, "value1".into())]; db.flush_ops(ops)?; let res = db.get_value(KV_TABLE, &key)?; @@ -264,8 +264,8 @@ mod test { let snapshot_path = dir.join("snapshot"); let origin_db = DB::open(&StorageConfig::RocksDB(origin_db_path))?; - let revision = Revision::new(1, 1); - let key = revision.encode_to_vec(); + let revision = Revision::new(1, 1).encode_to_vec(); + let key: Vec = revision.clone(); let ops = vec![WriteOp::PutKeyValue(revision, "value1".into())]; origin_db.flush_ops(ops)?; @@ -336,7 +336,7 @@ mod test { }; let role_bytes = role.encode_to_vec(); let write_ops = vec![ - WriteOp::PutKeyValue(Revision::new(1, 2), "value".into()), + WriteOp::PutKeyValue(Revision::new(1, 2).encode_to_vec(), "value".into()), WriteOp::PutAppliedIndex(5), WriteOp::PutLease(lease), WriteOp::PutAuthEnable(true), diff --git a/xline/src/storage/index.rs b/xline/src/storage/index.rs index 9ad5b67b0..997d66ce1 100644 --- a/xline/src/storage/index.rs +++ b/xline/src/storage/index.rs @@ -299,44 +299,28 @@ impl IndexOperate for Index { inner.index.iter_mut().for_each(|(key, revisions)| { if let Some(revision) = revisions.first() { if revision.mod_revision < at_rev { - match revisions.binary_search_by(|rev| rev.mod_revision.cmp(&at_rev)) { - Ok(idx) => { - let key_rev = revisions.get(idx).unwrap_or_else(|| { - unreachable!( - "{idx} is out of range, len of revisions is {}", - revisions.len() - ) - }); - let compact_revs = if key_rev.is_deleted() { - revisions.drain(..=idx) - } else { - revisions.drain(..idx) - }; - revs.extend(compact_revs.into_iter()); - } - Err(idx) => { - let compacted_last_idx = idx.overflow_sub(1); - let key_rev = revisions.get(compacted_last_idx).unwrap_or_else(|| { - unreachable!( - "{idx} is out of range, len of revisions is {}", - revisions.len() - ) - }); - let compact_revs = if key_rev.is_deleted() { - revisions.drain(..=compacted_last_idx) - } else { - revisions.drain(..compacted_last_idx) - }; - revs.extend(compact_revs.into_iter()); - } - } + let pivot = revisions.partition_point(|rev| rev.mod_revision <= at_rev); + let compacted_last_idx = pivot.overflow_sub(1); + // There is at least 1 element in the first partition, so the key revision at `compacted_last_idx` + // must exist. + let key_rev = revisions.get(compacted_last_idx).unwrap_or_else(|| { + unreachable!( + "Oops, the key revision at {compacted_last_idx} should not be None", + ) + }); + let compact_revs = if key_rev.is_deleted() { + revisions.drain(..=compacted_last_idx) + } else { + revisions.drain(..compacted_last_idx) + }; + revs.extend(compact_revs.into_iter()); + if revisions.is_empty() { del_keys.push(key.clone()); } } } }); - for key in del_keys { let _ignore = inner.index.remove(&key); } diff --git a/xline/src/storage/kv_store.rs b/xline/src/storage/kv_store.rs index 97cb9b6f7..61d6e06ca 100644 --- a/xline/src/storage/kv_store.rs +++ b/xline/src/storage/kv_store.rs @@ -56,7 +56,7 @@ where /// KV update sender kv_update_tx: mpsc::Sender<(i64, Vec)>, /// Compact task submit sender - compact_task_tx: mpsc::UnboundedSender<(i64, Option>)>, + compact_task_tx: mpsc::Sender<(i64, Option>)>, /// Lease collection lease_collection: Arc, } @@ -86,7 +86,7 @@ where } /// Recover data from persistent storage - pub(crate) fn recover(&self) -> Result<(), ExecuteError> { + pub(crate) async fn recover(&self) -> Result<(), ExecuteError> { let mut key_to_lease: HashMap, i64> = HashMap::new(); let kvs = self.db.get_all(KV_TABLE)?; @@ -131,7 +131,7 @@ where "compacted revision corruption, which ({compacted_revision}) must belong to the range [-1, {current_rev}]" ); self.update_compacted_revision(compacted_revision); - if let Err(e) = self.compact_task_tx.send((compacted_revision, None)) { + if let Err(e) = self.compact_task_tx.send((compacted_revision, None)).await { panic!("the compactor exited unexpectedly: {e:?}"); } } @@ -154,7 +154,7 @@ where db: Arc, header_gen: Arc, kv_update_tx: mpsc::Sender<(i64, Vec)>, - compact_task_tx: mpsc::UnboundedSender<(i64, Option>)>, + compact_task_tx: mpsc::Sender<(i64, Option>)>, lease_collection: Arc, ) -> Self { Self { @@ -655,12 +655,13 @@ where if let Err(e) = self .compact_task_tx .send((revision, Some(Arc::clone(&event)))) + .await { panic!("the compactor exited unexpectedly: {e:?}"); } event.listen().await; } else { - if let Err(e) = self.compact_task_tx.send((revision, None)) { + if let Err(e) = self.compact_task_tx.send((revision, None)).await { panic!("the compactor exited unexpectedly: {e:?}"); } } @@ -747,7 +748,7 @@ where .unwrap_or_else(|e| panic!("unexpected error from lease Attach: {e}")); } ops.push(WriteOp::PutKeyValue( - new_rev.as_revision(), + new_rev.as_revision().encode_to_vec(), kv.encode_to_vec(), )); let event = Event { @@ -793,7 +794,7 @@ where ..KeyValue::default() }; let value = del_kv.encode_to_vec(); - WriteOp::PutKeyValue(new_rev, value) + WriteOp::PutKeyValue(new_rev.encode_to_vec(), value) }) .collect() } @@ -850,7 +851,11 @@ mod test { use crate::{ revision_number::RevisionNumberGenerator, rpc::RequestOp, - storage::{compact::compactor, db::DB, kvwatcher::KvWatcher}, + storage::{ + compact::{compactor, COMPACT_CHANNEL_SIZE}, + db::DB, + kvwatcher::KvWatcher, + }, }; const CHANNEL_SIZE: usize = 1024; @@ -887,7 +892,7 @@ mod test { } fn init_empty_store(db: Arc) -> Arc> { - let (compact_tx, compact_rx) = mpsc::unbounded_channel(); + let (compact_tx, compact_rx) = mpsc::channel(COMPACT_CHANNEL_SIZE); let (kv_update_tx, kv_update_rx) = mpsc::channel(CHANNEL_SIZE); let lease_collection = Arc::new(LeaseCollection::new(0)); let header_gen = Arc::new(HeaderGenerator::new(0, 0)); @@ -1079,7 +1084,7 @@ mod test { assert_eq!(res.kvs.len(), 0); assert_eq!(new_store.compacted_revision(), -1); - new_store.recover()?; + new_store.recover().await?; let res = new_store.handle_range_request(&range_req)?; assert_eq!(res.kvs.len(), 1); diff --git a/xline/src/storage/kvwatcher.rs b/xline/src/storage/kvwatcher.rs index 368820986..0f373c9db 100644 --- a/xline/src/storage/kvwatcher.rs +++ b/xline/src/storage/kvwatcher.rs @@ -551,11 +551,14 @@ mod test { use crate::{ header_gen::HeaderGenerator, rpc::{PutRequest, RequestWithToken}, - storage::{db::DB, index::Index, lease_store::LeaseCollection, KvStore}, + storage::{ + compact::COMPACT_CHANNEL_SIZE, db::DB, index::Index, lease_store::LeaseCollection, + KvStore, + }, }; fn init_empty_store() -> (Arc>, Arc, Arc>) { - let (compact_tx, _compact_rx) = mpsc::unbounded_channel(); + let (compact_tx, _compact_rx) = mpsc::channel(COMPACT_CHANNEL_SIZE); let db = DB::open(&StorageConfig::Memory).unwrap(); let header_gen = Arc::new(HeaderGenerator::new(0, 0)); let index = Arc::new(Index::new()); diff --git a/xline/src/storage/revision.rs b/xline/src/storage/revision.rs index 652951412..c47d061da 100644 --- a/xline/src/storage/revision.rs +++ b/xline/src/storage/revision.rs @@ -18,7 +18,7 @@ pub(super) struct KeyRevision { /// Revision #[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] -pub struct Revision { +pub(crate) struct Revision { /// Main revision revision: i64, /// Sub revision in one transaction or range deletion