diff --git a/xline/src/storage/kv_store.rs b/xline/src/storage/kv_store.rs index fa468c9ef..032e95fe9 100644 --- a/xline/src/storage/kv_store.rs +++ b/xline/src/storage/kv_store.rs @@ -27,8 +27,11 @@ use crate::{ PutResponse, RangeRequest, RangeResponse, Request, RequestWithToken, RequestWrapper, ResponseWrapper, SortOrder, SortTarget, TargetUnion, TxnRequest, TxnResponse, }, - server::command::{CommandResponse, KeyRange, SyncResponse}, - storage::{db::WriteOp, ExecuteError}, + server::command::{CommandResponse, KeyRange, SyncResponse, META_TABLE}, + storage::{ + db::{WriteOp, COMPACT_REVISION}, + ExecuteError, + }, }; /// KV table name @@ -116,8 +119,18 @@ where self.attach(lease_id, key)?; } - // compact Lock free - + if let Some(revision_bytes) = self.db.get_value(META_TABLE, COMPACT_REVISION)? { + let compacted_revision = + i64::from_le_bytes(revision_bytes.try_into().map_err(|e| { + ExecuteError::DbError(format!( + "cannot decode compacted revision from META_TABLE: {e:?}" + )) + })?); + self.update_compacted_revision(compacted_revision); + if let Err(e) = self.compact_task_tx.send((compacted_revision, None)) { + panic!("the compactor exited unexpectedly: {e:?}"); + } + } Ok(()) } @@ -157,12 +170,12 @@ where self.revision.get() } - /// Get compacted revision of KV stre + /// Get compacted revision of KV store pub(crate) fn compacted_revision(&self) -> i64 { self.compacted_rev.load(Relaxed) } - #[cfg(test)] + /// Update compacted revision of KV store pub(crate) fn update_compacted_revision(&self, revision: i64) { self.compacted_rev.store(revision, Relaxed); } @@ -591,7 +604,7 @@ where target_revision > self.compacted_revision(), "required revision should not be compacted" ); - self.compacted_rev.store(target_revision, Relaxed); + self.update_compacted_revision(target_revision); CompactionResponse { header: Some(self.header_gen.gen_header_without_revision()), } @@ -831,7 +844,7 @@ mod test { use crate::{ revision_number::RevisionNumberGenerator, rpc::RequestOp, - storage::{db::DB, kvwatcher::KvWatcher}, + storage::{compact::compactor, db::DB, kvwatcher::KvWatcher}, }; const CHANNEL_SIZE: usize = 1024; @@ -850,8 +863,8 @@ mod test { db: Arc, ) -> Result<(Arc>, RevisionNumberGenerator), ExecuteError> { let store = init_empty_store(db); - let keys = vec!["a", "b", "c", "d", "e"]; - let vals = vec!["a", "b", "c", "d", "e"]; + let keys = vec!["a", "b", "c", "d", "e", "z", "z", "z"]; + let vals = vec!["a", "b", "c", "d", "e", "z1", "z2", "z3"]; let revision = RevisionNumberGenerator::default(); for (key, val) in keys.into_iter().zip(vals.into_iter()) { let req = RequestWithToken::new( @@ -868,13 +881,13 @@ mod test { } fn init_empty_store(db: Arc) -> Arc> { - let (compact_tx, _compact_rx) = mpsc::unbounded_channel(); + let (compact_tx, compact_rx) = mpsc::unbounded_channel(); let (kv_update_tx, kv_update_rx) = mpsc::channel(CHANNEL_SIZE); let lease_collection = Arc::new(LeaseCollection::new(0)); let header_gen = Arc::new(HeaderGenerator::new(0, 0)); let index = Arc::new(Index::new()); let storage = Arc::new(KvStore::new( - index, + Arc::clone(&index), db, header_gen, kv_update_tx, @@ -888,6 +901,13 @@ mod test { shutdown_trigger, Duration::from_millis(10), ); + let _compactor = tokio::spawn(compactor( + Arc::clone(&storage), + index, + 1000, + Duration::from_millis(10), + compact_rx, + )); storage } @@ -923,7 +943,7 @@ mod test { ..Default::default() }; let response = store.handle_range_request(&request)?; - assert_eq!(response.kvs.len(), 5); + assert_eq!(response.kvs.len(), 6); for kv in response.kvs { assert!(kv.value.is_empty()); } @@ -964,7 +984,7 @@ mod test { ..Default::default() }; let response = store.handle_range_request(&request)?; - assert_eq!(response.count, 5); + assert_eq!(response.count, 6); assert_eq!(response.kvs.len(), 2); assert_eq!(response.kvs[0].create_revision, 2); assert_eq!(response.kvs[1].create_revision, 3); @@ -976,8 +996,9 @@ mod test { async fn test_range_sort() -> Result<(), ExecuteError> { let db = DB::open(&StorageConfig::Memory)?; let (store, _rev) = init_store(db).await?; - let keys = ["a", "b", "c", "d", "e"]; - let reversed_keys = ["e", "d", "c", "b", "a"]; + let keys = ["a", "b", "c", "d", "e", "z"]; + let reversed_keys = ["z", "e", "d", "c", "b", "a"]; + let version_keys = ["z", "a", "b", "c", "d", "e"]; for order in [SortOrder::Ascend, SortOrder::Descend, SortOrder::None] { for target in [ @@ -987,30 +1008,43 @@ mod test { SortTarget::Value, ] { let response = store.handle_range_request(&sort_req(order, target))?; - assert_eq!(response.count, 5); - assert_eq!(response.kvs.len(), 5); - let expected = match order { + assert_eq!(response.count, 6); + assert_eq!(response.kvs.len(), 6); + let expected: [&str; 6] = match order { SortOrder::Descend => reversed_keys, SortOrder::Ascend | SortOrder::None => keys, }; - let is_identical = response - .kvs - .iter() - .zip(expected.iter()) - .all(|(kv, want)| kv.key == want.as_bytes()); - assert!(is_identical); + for (kv, want) in response.kvs.iter().zip(expected.iter()) { + assert_eq!( + kv.key, + want.as_bytes(), + "order: {:?}, target: {:?}, key {:?}, want {:?}", + order, + target, + kv.key, + want.as_bytes(), + ); + } } } for order in [SortOrder::Ascend, SortOrder::Descend, SortOrder::None] { let response = store.handle_range_request(&sort_req(order, SortTarget::Version))?; - assert_eq!(response.count, 5); - assert_eq!(response.kvs.len(), 5); - let is_identical = response - .kvs - .iter() - .zip(keys.iter()) - .all(|(kv, want)| kv.key == want.as_bytes()); - assert!(is_identical); + assert_eq!(response.count, 6); + assert_eq!(response.kvs.len(), 6); + let expected = match order { + SortOrder::Ascend | SortOrder::None => keys, + SortOrder::Descend => version_keys, + }; + for (kv, want) in response.kvs.iter().zip(expected.iter()) { + assert_eq!( + kv.key, + want.as_bytes(), + "order: {:?}, key {:?}, want {:?}", + order, + kv.key, + want.as_bytes(), + ); + } } Ok(()) } @@ -1018,7 +1052,10 @@ mod test { #[tokio::test] async fn test_recover() -> Result<(), ExecuteError> { let db = DB::open(&StorageConfig::Memory)?; - let _store = init_store(Arc::clone(&db)).await?; + let ops = vec![WriteOp::PutCompactRevision(8)]; + db.flush_ops(ops)?; + let (store, _rev_gen) = init_store(Arc::clone(&db)).await?; + assert_eq!(store.index.get_from_rev(b"z", b"", 5).len(), 3); let new_store = init_empty_store(db); @@ -1029,13 +1066,16 @@ mod test { }; let res = new_store.handle_range_request(&range_req)?; assert_eq!(res.kvs.len(), 0); + assert_eq!(new_store.compacted_revision(), -1); new_store.recover()?; let res = new_store.handle_range_request(&range_req)?; assert_eq!(res.kvs.len(), 1); assert_eq!(res.kvs[0].key, b"a"); - + assert_eq!(new_store.compacted_revision(), 8); + tokio::time::sleep(Duration::from_millis(500)).await; + assert_eq!(new_store.index.get_from_rev(b"z", b"", 5).len(), 2); Ok(()) }