diff --git a/crates/xline/src/server/command.rs b/crates/xline/src/server/command.rs index 510614ee1..7fe28dddc 100644 --- a/crates/xline/src/server/command.rs +++ b/crates/xline/src/server/command.rs @@ -368,10 +368,8 @@ where } }; ops.append(&mut wr_ops); - let key_revisions = self.persistent.flush_ops(ops)?; - if !key_revisions.is_empty() { - self.kv_storage.insert_index(key_revisions); - } + println!("flush ops: {ops:?}"); + let _key_revisions = self.persistent.flush_ops(ops)?; self.lease_storage.mark_lease_synced(wrapper); if !quota_enough { if let Some(alarmer) = self.alarmer.read().clone() { diff --git a/crates/xline/src/server/watch_server.rs b/crates/xline/src/server/watch_server.rs index 67a6d50a7..b0e78d884 100644 --- a/crates/xline/src/server/watch_server.rs +++ b/crates/xline/src/server/watch_server.rs @@ -468,8 +468,7 @@ mod test { ..Default::default() }); let (_sync_res, ops) = store.after_sync(&req, revision).await.unwrap(); - let key_revisions = db.flush_ops(ops).unwrap(); - store.insert_index(key_revisions); + let _key_revisions = db.flush_ops(ops).unwrap(); } #[tokio::test] diff --git a/crates/xline/src/storage/kv_store.rs b/crates/xline/src/storage/kv_store.rs index c7a47bedc..b8e21d168 100644 --- a/crates/xline/src/storage/kv_store.rs +++ b/crates/xline/src/storage/kv_store.rs @@ -21,7 +21,7 @@ use super::{ db::SCHEDULED_COMPACT_REVISION, index::{Index, IndexOperate}, lease_store::LeaseCollection, - revision::{KeyRevision, Revision}, + revision::Revision, storage_api::StorageApi, }; use crate::{ @@ -794,7 +794,7 @@ where sub_revision: i64, ) -> Result<(Vec, Vec), ExecuteError> { let mut ops = Vec::new(); - let (new_rev, _prev_rev) = + let (new_rev, prev_rev) = self.inner .index .register_revision(req.key.clone(), revision, sub_revision); @@ -807,7 +807,8 @@ where lease: req.lease, }; if req.ignore_lease || req.ignore_value { - let prev_kv = self.inner.get_range(&req.key, &[], 0)?.pop(); + let rev = prev_rev.ok_or(ExecuteError::KeyNotFound)?.mod_revision; + let prev_kv = self.inner.get_range(&req.key, &[], rev)?.pop(); let prev = prev_kv.as_ref().ok_or(ExecuteError::KeyNotFound)?; if req.ignore_lease { kv.lease = prev.lease; @@ -913,12 +914,6 @@ where let events = Self::new_deletion_events(revision, keys); (ops, events) } - - /// Insert the given pairs (key, `KeyRevision`) into the index - #[inline] - pub(crate) fn insert_index(&self, key_revisions: Vec<(Vec, KeyRevision)>) { - self.inner.index.insert(key_revisions); - } } #[cfg(test)] @@ -1040,8 +1035,7 @@ mod test { revision: i64, ) -> Result<(), ExecuteError> { let (_sync_res, ops) = store.after_sync(request, revision).await?; - let key_revs = store.inner.db.flush_ops(ops)?; - store.insert_index(key_revs); + let _key_revs = store.inner.db.flush_ops(ops)?; Ok(()) } diff --git a/crates/xline/src/storage/kvwatcher.rs b/crates/xline/src/storage/kvwatcher.rs index bdb65737c..dcbc4718c 100644 --- a/crates/xline/src/storage/kvwatcher.rs +++ b/crates/xline/src/storage/kvwatcher.rs @@ -785,7 +785,6 @@ mod test { ..Default::default() }); let (_sync_res, ops) = store.after_sync(&req, revision).await.unwrap(); - let key_revisions = db.flush_ops(ops).unwrap(); - store.insert_index(key_revisions); + let _key_revisions = db.flush_ops(ops).unwrap(); } }