Skip to content

Commit

Permalink
chore: remove unused index from lease_store
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds authored and Phoenix500526 committed Aug 26, 2024
1 parent baeb975 commit 2553eec
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 20 deletions.
3 changes: 1 addition & 2 deletions crates/xline/src/server/xline_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl XlineServer {
self.task_manager.spawn(TaskName::CompactBg, |n| {
compact_bg_task(
Arc::clone(&kv_storage),
Arc::clone(&index),
index,
*self.compact_config.compact_batch_size(),
*self.compact_config.compact_sleep_interval(),
compact_task_rx,
Expand All @@ -239,7 +239,6 @@ impl XlineServer {
Arc::clone(&lease_collection),
Arc::clone(&header_gen),
Arc::clone(&db),
index,
kv_update_tx,
*self.cluster_config.is_leader(),
));
Expand Down
36 changes: 18 additions & 18 deletions crates/xline/src/storage/lease_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use xlineapi::{
pub(crate) use self::{lease::Lease, lease_collection::LeaseCollection};
use super::{
db::{WriteOp, DB},
index::{Index, IndexOperate},
index::IndexOperate,
storage_api::XlineStorageOps,
};
use crate::{
Expand All @@ -54,9 +54,6 @@ pub(crate) struct LeaseStore {
lease_collection: Arc<LeaseCollection>,
/// Db to store lease
db: Arc<DB>,
#[allow(unused)] // used in tests
/// Key to revision index
index: Arc<Index>,
/// Header generator
header_gen: Arc<HeaderGenerator>,
/// KV update sender
Expand All @@ -75,14 +72,12 @@ impl LeaseStore {
lease_collection: Arc<LeaseCollection>,
header_gen: Arc<HeaderGenerator>,
db: Arc<DB>,
index: Arc<Index>,
kv_update_tx: flume::Sender<(i64, Vec<Event>)>,
is_leader: bool,
) -> Self {
Self {
lease_collection,
db,
index,
header_gen,
kv_update_tx,
is_primary: AtomicBool::new(is_leader),
Expand Down Expand Up @@ -394,18 +389,23 @@ mod test {
use super::*;
use crate::{
revision_number::RevisionNumberGenerator,
storage::{db::DB, storage_api::XlineStorageOps},
storage::{
db::DB,
index::{Index, IndexState},
storage_api::XlineStorageOps,
},
};

#[tokio::test(flavor = "multi_thread")]
#[abort_on_panic]
async fn test_lease_storage() -> Result<(), Box<dyn Error>> {
let db = DB::open(&EngineConfig::Memory)?;
let index = Index::new();
let (lease_store, rev_gen) = init_store(db);
let rev_gen_state = rev_gen.state();

let req1 = RequestWrapper::from(LeaseGrantRequest { ttl: 10, id: 1 });
let _ignore1 = exe_and_sync_req(&lease_store, &req1, &rev_gen_state)?;
let _ignore1 = exe_and_sync_req(&lease_store, index.state(), &req1, &rev_gen_state)?;

let lo = lease_store.look_up(1).unwrap();
assert_eq!(lo.id(), 1);
Expand All @@ -419,26 +419,26 @@ mod test {
lease_store.lease_collection.detach(1, "key".as_bytes())?;

let req2 = RequestWrapper::from(LeaseRevokeRequest { id: 1 });
let _ignore2 = exe_and_sync_req(&lease_store, &req2, &rev_gen_state)?;
let _ignore2 = exe_and_sync_req(&lease_store, index.state(), &req2, &rev_gen_state)?;
assert!(lease_store.look_up(1).is_none());
assert!(lease_store.leases().is_empty());

let req3 = RequestWrapper::from(LeaseGrantRequest { ttl: 10, id: 3 });
let req4 = RequestWrapper::from(LeaseGrantRequest { ttl: 10, id: 4 });
let req5 = RequestWrapper::from(LeaseRevokeRequest { id: 3 });
let req6 = RequestWrapper::from(LeaseLeasesRequest {});
let _ignore3 = exe_and_sync_req(&lease_store, &req3, &rev_gen_state)?;
let _ignore4 = exe_and_sync_req(&lease_store, &req4, &rev_gen_state)?;
let resp_1 = exe_and_sync_req(&lease_store, &req6, &rev_gen_state)?;
let _ignore3 = exe_and_sync_req(&lease_store, index.state(), &req3, &rev_gen_state)?;
let _ignore4 = exe_and_sync_req(&lease_store, index.state(), &req4, &rev_gen_state)?;
let resp_1 = exe_and_sync_req(&lease_store, index.state(), &req6, &rev_gen_state)?;

let ResponseWrapper::LeaseLeasesResponse(leases_1) = resp_1 else {
panic!("wrong response type: {resp_1:?}");
};
assert_eq!(leases_1.leases[0].id, 3);
assert_eq!(leases_1.leases[1].id, 4);

let _ignore5 = exe_and_sync_req(&lease_store, &req5, &rev_gen_state)?;
let resp_2 = exe_and_sync_req(&lease_store, &req6, &rev_gen_state)?;
let _ignore5 = exe_and_sync_req(&lease_store, index.state(), &req5, &rev_gen_state)?;
let resp_2 = exe_and_sync_req(&lease_store, index.state(), &req6, &rev_gen_state)?;
let ResponseWrapper::LeaseLeasesResponse(leases_2) = resp_2 else {
panic!("wrong response type: {resp_2:?}");
};
Expand Down Expand Up @@ -505,11 +505,12 @@ mod test {
#[abort_on_panic]
async fn test_recover() -> Result<(), ExecuteError> {
let db = DB::open(&EngineConfig::Memory)?;
let index = Index::new();
let (store, rev_gen) = init_store(Arc::clone(&db));
let rev_gen_state = rev_gen.state();

let req1 = RequestWrapper::from(LeaseGrantRequest { ttl: 10, id: 1 });
let _ignore1 = exe_and_sync_req(&store, &req1, &rev_gen_state)?;
let _ignore1 = exe_and_sync_req(&store, index.state(), &req1, &rev_gen_state)?;
store.lease_collection.attach(1, "key".into())?;

let (new_store, _) = init_store(db);
Expand All @@ -531,21 +532,20 @@ mod test {
let lease_collection = Arc::new(LeaseCollection::new(0));
let (kv_update_tx, _) = flume::bounded(1);
let header_gen = Arc::new(HeaderGenerator::new(0, 0));
let index = Arc::new(Index::new());
(
LeaseStore::new(lease_collection, header_gen, db, index, kv_update_tx, true),
LeaseStore::new(lease_collection, header_gen, db, kv_update_tx, true),
RevisionNumberGenerator::new(1),
)
}

fn exe_and_sync_req(
ls: &LeaseStore,
index: IndexState,
req: &RequestWrapper,
rev_gen: &RevisionNumberGeneratorState<'_>,
) -> Result<ResponseWrapper, ExecuteError> {
let cmd_res = ls.execute(req)?;
let txn = ls.db.transaction();
let index = ls.index.state();
let (_ignore, _ops) = ls.after_sync(req, rev_gen, &txn, &index)?;
txn.commit()
.map_err(|e| ExecuteError::DbError(e.to_string()))?;
Expand Down

0 comments on commit 2553eec

Please sign in to comment.