diff --git a/crates/xline/src/server/xline_server.rs b/crates/xline/src/server/xline_server.rs index a4b663689..de40466c5 100644 --- a/crates/xline/src/server/xline_server.rs +++ b/crates/xline/src/server/xline_server.rs @@ -228,7 +228,7 @@ impl XlineServer { self.task_manager.spawn(TaskName::CompactBg, |n| { compact_bg_task( Arc::clone(&kv_storage), - Arc::clone(&index), + index, *self.compact_config.compact_batch_size(), *self.compact_config.compact_sleep_interval(), compact_task_rx, @@ -239,7 +239,6 @@ impl XlineServer { Arc::clone(&lease_collection), Arc::clone(&header_gen), Arc::clone(&db), - index, kv_update_tx, *self.cluster_config.is_leader(), )); diff --git a/crates/xline/src/storage/lease_store/mod.rs b/crates/xline/src/storage/lease_store/mod.rs index 7aab4a111..a6ff9c26a 100644 --- a/crates/xline/src/storage/lease_store/mod.rs +++ b/crates/xline/src/storage/lease_store/mod.rs @@ -30,7 +30,7 @@ use xlineapi::{ pub(crate) use self::{lease::Lease, lease_collection::LeaseCollection}; use super::{ db::{WriteOp, DB}, - index::{Index, IndexOperate}, + index::IndexOperate, storage_api::XlineStorageOps, }; use crate::{ @@ -54,9 +54,6 @@ pub(crate) struct LeaseStore { lease_collection: Arc, /// Db to store lease db: Arc, - #[allow(unused)] // used in tests - /// Key to revision index - index: Arc, /// Header generator header_gen: Arc, /// KV update sender @@ -75,14 +72,12 @@ impl LeaseStore { lease_collection: Arc, header_gen: Arc, db: Arc, - index: Arc, kv_update_tx: flume::Sender<(i64, Vec)>, is_leader: bool, ) -> Self { Self { lease_collection, db, - index, header_gen, kv_update_tx, is_primary: AtomicBool::new(is_leader), @@ -394,18 +389,23 @@ mod test { use super::*; use crate::{ revision_number::RevisionNumberGenerator, - storage::{db::DB, storage_api::XlineStorageOps}, + storage::{ + db::DB, + index::{Index, IndexState}, + storage_api::XlineStorageOps, + }, }; #[tokio::test(flavor = "multi_thread")] #[abort_on_panic] async fn test_lease_storage() -> Result<(), Box> { let db = DB::open(&EngineConfig::Memory)?; + let index = Index::new(); let (lease_store, rev_gen) = init_store(db); let rev_gen_state = rev_gen.state(); let req1 = RequestWrapper::from(LeaseGrantRequest { ttl: 10, id: 1 }); - let _ignore1 = exe_and_sync_req(&lease_store, &req1, &rev_gen_state)?; + let _ignore1 = exe_and_sync_req(&lease_store, index.state(), &req1, &rev_gen_state)?; let lo = lease_store.look_up(1).unwrap(); assert_eq!(lo.id(), 1); @@ -419,7 +419,7 @@ mod test { lease_store.lease_collection.detach(1, "key".as_bytes())?; let req2 = RequestWrapper::from(LeaseRevokeRequest { id: 1 }); - let _ignore2 = exe_and_sync_req(&lease_store, &req2, &rev_gen_state)?; + let _ignore2 = exe_and_sync_req(&lease_store, index.state(), &req2, &rev_gen_state)?; assert!(lease_store.look_up(1).is_none()); assert!(lease_store.leases().is_empty()); @@ -427,9 +427,9 @@ mod test { let req4 = RequestWrapper::from(LeaseGrantRequest { ttl: 10, id: 4 }); let req5 = RequestWrapper::from(LeaseRevokeRequest { id: 3 }); let req6 = RequestWrapper::from(LeaseLeasesRequest {}); - let _ignore3 = exe_and_sync_req(&lease_store, &req3, &rev_gen_state)?; - let _ignore4 = exe_and_sync_req(&lease_store, &req4, &rev_gen_state)?; - let resp_1 = exe_and_sync_req(&lease_store, &req6, &rev_gen_state)?; + let _ignore3 = exe_and_sync_req(&lease_store, index.state(), &req3, &rev_gen_state)?; + let _ignore4 = exe_and_sync_req(&lease_store, index.state(), &req4, &rev_gen_state)?; + let resp_1 = exe_and_sync_req(&lease_store, index.state(), &req6, &rev_gen_state)?; let ResponseWrapper::LeaseLeasesResponse(leases_1) = resp_1 else { panic!("wrong response type: {resp_1:?}"); @@ -437,8 +437,8 @@ mod test { assert_eq!(leases_1.leases[0].id, 3); assert_eq!(leases_1.leases[1].id, 4); - let _ignore5 = exe_and_sync_req(&lease_store, &req5, &rev_gen_state)?; - let resp_2 = exe_and_sync_req(&lease_store, &req6, &rev_gen_state)?; + let _ignore5 = exe_and_sync_req(&lease_store, index.state(), &req5, &rev_gen_state)?; + let resp_2 = exe_and_sync_req(&lease_store, index.state(), &req6, &rev_gen_state)?; let ResponseWrapper::LeaseLeasesResponse(leases_2) = resp_2 else { panic!("wrong response type: {resp_2:?}"); }; @@ -505,11 +505,12 @@ mod test { #[abort_on_panic] async fn test_recover() -> Result<(), ExecuteError> { let db = DB::open(&EngineConfig::Memory)?; + let index = Index::new(); let (store, rev_gen) = init_store(Arc::clone(&db)); let rev_gen_state = rev_gen.state(); let req1 = RequestWrapper::from(LeaseGrantRequest { ttl: 10, id: 1 }); - let _ignore1 = exe_and_sync_req(&store, &req1, &rev_gen_state)?; + let _ignore1 = exe_and_sync_req(&store, index.state(), &req1, &rev_gen_state)?; store.lease_collection.attach(1, "key".into())?; let (new_store, _) = init_store(db); @@ -531,21 +532,20 @@ mod test { let lease_collection = Arc::new(LeaseCollection::new(0)); let (kv_update_tx, _) = flume::bounded(1); let header_gen = Arc::new(HeaderGenerator::new(0, 0)); - let index = Arc::new(Index::new()); ( - LeaseStore::new(lease_collection, header_gen, db, index, kv_update_tx, true), + LeaseStore::new(lease_collection, header_gen, db, kv_update_tx, true), RevisionNumberGenerator::new(1), ) } fn exe_and_sync_req( ls: &LeaseStore, + index: IndexState, req: &RequestWrapper, rev_gen: &RevisionNumberGeneratorState<'_>, ) -> Result { let cmd_res = ls.execute(req)?; let txn = ls.db.transaction(); - let index = ls.index.state(); let (_ignore, _ops) = ls.after_sync(req, rev_gen, &txn, &index)?; txn.commit() .map_err(|e| ExecuteError::DbError(e.to_string()))?;