diff --git a/crates/curp-external-api/src/cmd.rs b/crates/curp-external-api/src/cmd.rs index 0b38d4b57..c53738399 100644 --- a/crates/curp-external-api/src/cmd.rs +++ b/crates/curp-external-api/src/cmd.rs @@ -193,6 +193,7 @@ impl From for PbSerializeError { } } +#[allow(clippy::module_name_repetitions)] /// After sync command type #[derive(Debug)] pub struct AfterSyncCmd<'a, C> { @@ -204,16 +205,21 @@ pub struct AfterSyncCmd<'a, C> { impl<'a, C> AfterSyncCmd<'a, C> { /// Creates a new `AfterSyncCmd` + #[inline] pub fn new(cmd: &'a C, to_exectue: bool) -> Self { Self { cmd, to_exectue } } /// Gets the command + #[inline] + #[must_use] pub fn cmd(&self) -> &'a C { self.cmd } /// Convert self into parts + #[inline] + #[must_use] pub fn into_parts(self) -> (&'a C, bool) { (self.cmd, self.to_exectue) } diff --git a/crates/curp/src/server/cmd_worker/mod.rs b/crates/curp/src/server/cmd_worker/mod.rs index bb73e6a0a..a3f2b3357 100644 --- a/crates/curp/src/server/cmd_worker/mod.rs +++ b/crates/curp/src/server/cmd_worker/mod.rs @@ -5,6 +5,7 @@ use std::{fmt::Debug, iter, sync::Arc}; use async_trait::async_trait; use clippy_utilities::NumericCast; +use curp_external_api::cmd::AfterSyncCmd; #[cfg(test)] use mockall::automock; use tokio::sync::oneshot; @@ -145,6 +146,7 @@ async fn worker_exe, RC: RoleChange>( } /// Cmd worker after sync handler +#[allow(clippy::too_many_lines)] // TODO: split this to multiple fns async fn worker_as, RC: RoleChange>( entry: Arc>, prepare: Option, @@ -155,10 +157,20 @@ async fn worker_as, RC: RoleChange>( let id = curp.id(); let success = match entry.entry_data { EntryData::Command(ref cmd) => { - let Some(prepare) = prepare else { + let Some(_prepare) = prepare else { unreachable!("prepare should always be Some(_) when entry is a command"); }; - let asr = ce.after_sync(cmd.as_ref(), entry.index, prepare).await; + let asr = ce + .after_sync(vec![AfterSyncCmd::new(cmd.as_ref(), false)], entry.index) + .await + .map(|res| { + #[allow(clippy::expect_used)] + let (asr, _) = res + .into_iter() + .next() + .expect("the asr should always be Some"); + asr + }); let asr_ok = asr.is_ok(); cb.write().insert_asr(entry.propose_id, asr); sp.lock() @@ -328,7 +340,8 @@ pub(crate) trait CEEventTxApi: Send + Sync + 'static { /// Send cmd to background cmd worker for speculative execution fn send_sp_exe(&self, entry: Arc>); - /// Send after sync event to the background cmd worker so that after sync can be called + /// Send after sync event to the background cmd worker so that after sync + /// can be called fn send_after_sync(&self, entry: Arc>); /// Send reset @@ -398,7 +411,8 @@ impl TaskRxApi for TaskRx { } } -/// Run cmd execute workers. Each cmd execute worker will continually fetch task to perform from `task_rx`. +/// Run cmd execute workers. Each cmd execute worker will continually fetch task +/// to perform from `task_rx`. pub(super) fn start_cmd_workers, RC: RoleChange>( cmd_executor: Arc, curp: Arc>, @@ -476,7 +490,8 @@ mod tests { task_manager.shutdown(true).await; } - // When the execution takes more time than sync, `as` should be called after exe has finished + // When the execution takes more time than sync, `as` should be called after exe + // has finished #[traced_test] #[tokio::test] #[abort_on_panic] @@ -524,7 +539,8 @@ mod tests { task_manager.shutdown(true).await; } - // When the execution takes more time than sync and fails, after sync should not be called + // When the execution takes more time than sync and fails, after sync should not + // be called #[traced_test] #[tokio::test] #[abort_on_panic] @@ -663,7 +679,8 @@ mod tests { task_manager.shutdown(true).await; } - // If cmd1 and cmd2 conflict, order will be (cmd1 exe) -> (cmd1 as) -> (cmd2 exe) -> (cmd2 as) + // If cmd1 and cmd2 conflict, order will be (cmd1 exe) -> (cmd1 as) -> (cmd2 + // exe) -> (cmd2 as) #[traced_test] #[tokio::test] #[abort_on_panic] diff --git a/crates/curp/src/server/storage/wal/codec.rs b/crates/curp/src/server/storage/wal/codec.rs index fc93801c3..33c7f4226 100644 --- a/crates/curp/src/server/storage/wal/codec.rs +++ b/crates/curp/src/server/storage/wal/codec.rs @@ -295,7 +295,10 @@ impl FrameEncoder for DataFrame<'_, C> where C: Serialize, { - #[allow(clippy::arithmetic_side_effects)] // The integer shift is safe + #[allow( + clippy::arithmetic_side_effects, // The integer shift is safe + clippy::indexing_slicing // The slicing is checked + )] fn encode(&self) -> Vec { match *self { DataFrame::Entry(ref entry) => { diff --git a/crates/xline/src/revision_number.rs b/crates/xline/src/revision_number.rs index eafa77d67..fb5e4287f 100644 --- a/crates/xline/src/revision_number.rs +++ b/crates/xline/src/revision_number.rs @@ -3,6 +3,7 @@ use std::sync::atomic::{AtomicI64, Ordering}; /// Revision number #[derive(Debug)] pub(crate) struct RevisionNumberGenerator { + /// The current revision number current: AtomicI64, } @@ -16,12 +17,12 @@ impl RevisionNumberGenerator { /// Get the current revision number pub(crate) fn get(&self) -> i64 { - self.current.load(Ordering::Acquire) + self.current.load(Ordering::Relaxed) } /// Set the revision number pub(crate) fn set(&self, rev: i64) { - self.current.store(rev, Ordering::Release); + self.current.store(rev, Ordering::Relaxed); } /// Gets a temporary state @@ -42,24 +43,26 @@ impl Default for RevisionNumberGenerator { /// Revision generator with temporary state pub(crate) struct RevisionNumberGeneratorState<'a> { + /// The current revision number current: &'a AtomicI64, + /// Next revision number next: AtomicI64, } impl RevisionNumberGeneratorState<'_> { /// Get the current revision number pub(crate) fn get(&self) -> i64 { - self.next.load(Ordering::Acquire) + self.next.load(Ordering::Relaxed) } /// Increases the next revision number pub(crate) fn next(&self) -> i64 { - self.next.fetch_add(1, Ordering::Release).wrapping_add(1) + self.next.fetch_add(1, Ordering::Relaxed).wrapping_add(1) } /// Commit the revision number pub(crate) fn commit(&self) { self.current - .store(self.next.load(Ordering::Acquire), Ordering::Release) + .store(self.next.load(Ordering::Relaxed), Ordering::Relaxed); } } diff --git a/crates/xline/src/server/command.rs b/crates/xline/src/server/command.rs index c90f7010a..cf855385d 100644 --- a/crates/xline/src/server/command.rs +++ b/crates/xline/src/server/command.rs @@ -19,9 +19,11 @@ use xlineapi::{ }; use crate::{ + revision_number::RevisionNumberGeneratorState, rpc::{RequestBackend, RequestWrapper}, storage::{ db::{WriteOp, DB}, + index::IndexOperate, storage_api::XlineStorageOps, AlarmStore, AuthStore, KvStore, LeaseStore, }, @@ -265,10 +267,85 @@ impl CommandExecutor { _ => Ok(()), } } + + /// After sync KV commands + async fn after_sync_kv( + &self, + wrapper: &RequestWrapper, + txn_db: &T, + index: &(dyn IndexOperate + Send + Sync), + revision_gen: &RevisionNumberGeneratorState<'_>, + to_execute: bool, + ) -> Result< + ( + ::ASR, + Option<::ER>, + ), + ExecuteError, + > + where + T: XlineStorageOps + TransactionApi, + { + let (asr, er) = self + .kv_storage + .after_sync(wrapper, txn_db, index, revision_gen, to_execute) + .await?; + Ok((asr, er)) + } + + /// After sync other type of commands + async fn after_sync_others( + &self, + wrapper: &RequestWrapper, + txn_db: &T, + general_revision: &RevisionNumberGeneratorState<'_>, + auth_revision: &RevisionNumberGeneratorState<'_>, + to_execute: bool, + ) -> Result< + ( + ::ASR, + Option<::ER>, + ), + ExecuteError, + > + where + T: XlineStorageOps + TransactionApi, + { + let er = to_execute + .then(|| match wrapper.backend() { + RequestBackend::Auth => self.auth_storage.execute(wrapper), + RequestBackend::Lease => self.lease_storage.execute(wrapper), + RequestBackend::Alarm => Ok(self.alarm_storage.execute(wrapper)), + RequestBackend::Kv => unreachable!("Should not execute kv commands"), + }) + .transpose()?; + + let (asr, wr_ops) = match wrapper.backend() { + RequestBackend::Auth => self.auth_storage.after_sync(wrapper, auth_revision)?, + RequestBackend::Lease => { + self.lease_storage + .after_sync(wrapper, general_revision) + .await? + } + RequestBackend::Alarm => self.alarm_storage.after_sync(wrapper, general_revision), + RequestBackend::Kv => unreachable!("Should not sync kv commands"), + }; + + txn_db.write_ops(wr_ops)?; + + Ok((asr, er)) + } } #[async_trait::async_trait] impl CurpCommandExecutor for CommandExecutor { + fn prepare( + &self, + _cmd: &Command, + ) -> Result<::PR, ::Error> { + Ok(-1) + } + async fn execute( &self, cmd: &Command, @@ -301,22 +378,18 @@ impl CurpCommandExecutor for CommandExecutor { } cmds.iter() .map(AfterSyncCmd::cmd) - .map(|c| self.check_alarm(c)) - .collect::>()?; + .try_for_each(|c| self.check_alarm(c))?; let quota_enough = cmds .iter() .map(AfterSyncCmd::cmd) .all(|c| self.quota_checker.check(c)); - cmds.iter() - .map(AfterSyncCmd::cmd) - .map(|c| { - self.auth_storage - .check_permission(c.request(), c.auth_info()) - }) - .collect::>()?; + cmds.iter().map(AfterSyncCmd::cmd).try_for_each(|c| { + self.auth_storage + .check_permission(c.request(), c.auth_info()) + })?; let index = self.kv_storage.index(); - let mut index_state = index.state(); + let index_state = index.state(); let general_revision_gen = self.kv_storage.revision_gen(); let auth_revision_gen = self.auth_storage.revision_gen(); let general_revision_state = general_revision_gen.state(); @@ -328,53 +401,41 @@ impl CurpCommandExecutor for CommandExecutor { let mut resps = Vec::with_capacity(cmds.len()); for (cmd, to_execute) in cmds.into_iter().map(AfterSyncCmd::into_parts) { let wrapper = cmd.request(); - let er = to_execute - .then(|| match wrapper.backend() { - RequestBackend::Kv => self - .kv_storage - .execute(wrapper, Some((&txn_db, &mut index_state))), - RequestBackend::Auth => self.auth_storage.execute(wrapper), - RequestBackend::Lease => self.lease_storage.execute(wrapper), - RequestBackend::Alarm => Ok(self.alarm_storage.execute(wrapper)), - }) - .transpose()?; - tracing::info!("sync cmd: {cmd:?}"); - if to_execute { - tracing::info!("execute in after sync for: {cmd:?}"); - } - let (asr, wr_ops) = match wrapper.backend() { - RequestBackend::Kv => ( - self.kv_storage - .after_sync(wrapper, &txn_db, &index_state, &general_revision_state) - .await?, - vec![], - ), - RequestBackend::Auth => self - .auth_storage - .after_sync(wrapper, &auth_revision_state)?, - RequestBackend::Lease => { - self.lease_storage - .after_sync(wrapper, &general_revision_state) - .await? + let (asr, er) = match wrapper.backend() { + RequestBackend::Kv => { + self.after_sync_kv( + wrapper, + &txn_db, + &index_state, + &general_revision_state, + to_execute, + ) + .await } - RequestBackend::Alarm => self - .alarm_storage - .after_sync(wrapper, &general_revision_state), - }; - txn_db.write_ops(wr_ops)?; + RequestBackend::Auth | RequestBackend::Lease | RequestBackend::Alarm => { + self.after_sync_others( + wrapper, + &txn_db, + &general_revision_state, + &auth_revision_state, + to_execute, + ) + .await + } + }?; resps.push((asr, er)); if let RequestWrapper::CompactionRequest(ref compact_req) = *wrapper { if compact_req.physical { if let Some(n) = self.compact_events.get(&cmd.compact_id()) { - n.notify(usize::MAX); + let _ignore = n.notify(usize::MAX); } } }; if let RequestWrapper::CompactionRequest(ref compact_req) = *wrapper { if compact_req.physical { if let Some(n) = self.compact_events.get(&cmd.compact_id()) { - n.notify(usize::MAX); + let _ignore = n.notify(usize::MAX); } } }; diff --git a/crates/xline/src/server/kv_server.rs b/crates/xline/src/server/kv_server.rs index d730b4b14..9e96e5bae 100644 --- a/crates/xline/src/server/kv_server.rs +++ b/crates/xline/src/server/kv_server.rs @@ -76,7 +76,7 @@ impl KvServer { fn do_serializable(&self, command: &Command) -> Result { self.auth_storage .check_permission(command.request(), command.auth_info())?; - let cmd_res = self.kv_storage.execute(command.request())?; + let cmd_res = self.kv_storage.execute(command.request(), None)?; Ok(Self::parse_response_op(cmd_res.into_inner().into())) } diff --git a/crates/xline/src/server/watch_server.rs b/crates/xline/src/server/watch_server.rs index a23befdc5..5953e4d4f 100644 --- a/crates/xline/src/server/watch_server.rs +++ b/crates/xline/src/server/watch_server.rs @@ -457,6 +457,7 @@ mod test { &txn, &store.index().state(), &store.revision_gen().state(), + false, ) .await .unwrap(); diff --git a/crates/xline/src/server/xline_server.rs b/crates/xline/src/server/xline_server.rs index fd3770e74..d89f142f9 100644 --- a/crates/xline/src/server/xline_server.rs +++ b/crates/xline/src/server/xline_server.rs @@ -474,8 +474,6 @@ impl XlineServer { Arc::clone(&alarm_storage), Arc::clone(&db), Arc::clone(&id_barrier), - header_gen.general_revision_arc(), - header_gen.auth_revision_arc(), Arc::clone(&compact_events), self.storage_config.quota, )); diff --git a/crates/xline/src/storage/auth_store/store.rs b/crates/xline/src/storage/auth_store/store.rs index 27eddeeba..d0ed710fb 100644 --- a/crates/xline/src/storage/auth_store/store.rs +++ b/crates/xline/src/storage/auth_store/store.rs @@ -1156,6 +1156,11 @@ impl AuthStore { self.create_permission_cache()?; Ok(()) } + + /// Gets the auth revision generator + pub(crate) fn revision_gen(&self) -> Arc { + Arc::clone(&self.revision) + } } /// Get common name from tonic request @@ -1166,13 +1171,6 @@ fn get_cn(request: &tonic::Request) -> Option { cert.subject_common_name() } -impl AuthStore { - /// Gets the auth revision generator - pub(crate) fn revision_gen(&self) -> Arc { - Arc::clone(&self.revision) - } -} - #[cfg(test)] mod test { use std::collections::HashMap; diff --git a/crates/xline/src/storage/compact/revision_compactor.rs b/crates/xline/src/storage/compact/revision_compactor.rs index 49d52daa5..cd6619a3b 100644 --- a/crates/xline/src/storage/compact/revision_compactor.rs +++ b/crates/xline/src/storage/compact/revision_compactor.rs @@ -135,11 +135,14 @@ mod test { // auto_compactor works successfully assert_eq!(revision_compactor.do_compact(None).await, Some(10)); revision_gen_state.next(); // current revision: 111 + revision_gen_state.commit(); assert_eq!(revision_compactor.do_compact(Some(10)).await, Some(11)); revision_compactor.pause(); revision_gen_state.next(); // current revision 112 + revision_gen_state.commit(); assert!(revision_compactor.do_compact(Some(11)).await.is_none()); revision_gen_state.next(); // current revision 113 + revision_gen_state.commit(); assert!(revision_compactor.do_compact(Some(11)).await.is_none()); revision_compactor.resume(); assert_eq!(revision_compactor.do_compact(Some(11)).await, Some(13)); diff --git a/crates/xline/src/storage/kv_store.rs b/crates/xline/src/storage/kv_store.rs index ec474894c..13b7bac1f 100644 --- a/crates/xline/src/storage/kv_store.rs +++ b/crates/xline/src/storage/kv_store.rs @@ -104,7 +104,8 @@ impl KvStoreInner { /// Get `KeyValue` of a range /// - /// If `range_end` is `&[]`, this function will return one or zero `KeyValue`. + /// If `range_end` is `&[]`, this function will return one or zero + /// `KeyValue`. fn get_range( txn_db: &T, index: &dyn IndexOperate, @@ -119,16 +120,20 @@ impl KvStoreInner { Self::get_values(txn_db, &revisions) } - /// Get `KeyValue` of a range with limit and count only, return kvs and total count - fn get_range_with_opts( - txn_db: &Transaction, + /// Get `KeyValue` of a range with limit and count only, return kvs and + /// total count + fn get_range_with_opts( + txn_db: &T, index: &dyn IndexOperate, key: &[u8], range_end: &[u8], revision: i64, limit: usize, count_only: bool, - ) -> Result<(Vec, usize), ExecuteError> { + ) -> Result<(Vec, usize), ExecuteError> + where + T: XlineStorageOps, + { let mut revisions = index.get(key, range_end, revision); let total = revisions.len(); if count_only || total == 0 { @@ -220,11 +225,12 @@ impl KvStore { txn_db: &T, index: &(dyn IndexOperate + Send + Sync), revision_gen: &RevisionNumberGeneratorState<'_>, - ) -> Result + to_execute: bool, + ) -> Result<(SyncResponse, Option), ExecuteError> where T: XlineStorageOps + TransactionApi, { - self.sync_request(request, txn_db, index, revision_gen) + self.sync_request(request, txn_db, index, revision_gen, to_execute) .await } @@ -591,7 +597,7 @@ impl KvStore { // As we store use revision as key in the DB storage, // a fake revision needs to be used during speculative execution let fake_revision = i64::MAX; - self.execute_txn(&txn_db, index, req, fake_revision, &mut 0) + self.execute_txn(txn_db, index, req, fake_revision, &mut 0) .map(Into::into)? } RequestWrapper::CompactionRequest(ref req) => { @@ -604,13 +610,16 @@ impl KvStore { Ok(res) } - /// Handle `RangeRequest` - fn execute_range( + /// Execute `RangeRequest` + fn execute_range( &self, - tnx_db: &Transaction, + tnx_db: &T, index: &dyn IndexOperate, req: &RangeRequest, - ) -> Result { + ) -> Result + where + T: XlineStorageOps, + { req.check_revision(self.compacted_revision(), self.revision())?; let storage_fetch_limit = if (req.sort_order() != SortOrder::None) @@ -664,13 +673,16 @@ impl KvStore { } /// Generates `PutResponse` - fn generate_put_resp( + fn generate_put_resp( &self, req: &PutRequest, - txn_db: &Transaction, + txn_db: &T, prev_rev: Option, - ) -> Result<(PutResponse, Option), ExecuteError> { - let response = PutResponse { + ) -> Result<(PutResponse, Option), ExecuteError> + where + T: XlineStorageOps, + { + let mut response = PutResponse { header: Some(self.header_gen.gen_header()), ..Default::default() }; @@ -684,13 +696,16 @@ impl KvStore { if prev_kv.is_none() && (req.ignore_lease || req.ignore_value) { return Err(ExecuteError::KeyNotFound); } + if req.prev_kv { + response.prev_kv = prev_kv.clone(); + } return Ok((response, prev_kv)); } Ok((response, None)) } - /// Handle `PutRequest` + /// Execute `PutRequest` fn execute_put( &self, txn_db: &Transaction, @@ -700,25 +715,22 @@ impl KvStore { let prev_rev = (req.prev_kv || req.ignore_lease || req.ignore_value) .then(|| index.current_rev(&req.key)) .flatten(); - let (mut response, prev_kv) = + let (response, _prev_kv) = self.generate_put_resp(req, txn_db, prev_rev.map(|key_rev| key_rev.as_revision()))?; - if req.prev_kv { - response.prev_kv = prev_kv; - } Ok(response) } - /// Handle `PutRequest` + /// Execute `PutRequest` in Txn fn execute_txn_put( &self, txn_db: &Transaction, - index: &mut dyn IndexOperate, + index: &dyn IndexOperate, req: &PutRequest, revision: i64, sub_revision: &mut i64, ) -> Result { let (new_rev, prev_rev) = index.register_revision(req.key.clone(), revision, *sub_revision); - let (mut response, prev_kv) = + let (response, prev_kv) = self.generate_put_resp(req, txn_db, prev_rev.map(|key_rev| key_rev.as_revision()))?; let mut kv = KeyValue { key: req.key.clone(), @@ -745,9 +757,6 @@ impl KvStore { .value .clone(); } - if req.prev_kv { - response.prev_kv = prev_kv; - } txn_db.write_op(WriteOp::PutKeyValue(new_rev.as_revision(), kv.clone()))?; *sub_revision = sub_revision.overflow_add(1); @@ -776,7 +785,7 @@ impl KvStore { Ok(response) } - /// Handle `DeleteRangeRequest` + /// Execute `DeleteRangeRequest` fn execute_delete_range( &self, txn_db: &T, @@ -789,11 +798,11 @@ impl KvStore { self.generate_delete_range_resp(req, txn_db, index) } - /// Handle `DeleteRangeRequest` + /// Execute `DeleteRangeRequest` in Txn fn execute_txn_delete_range( &self, txn_db: &T, - index: &mut dyn IndexOperate, + index: &dyn IndexOperate, req: &DeleteRangeRequest, revision: i64, sub_revision: &mut i64, @@ -814,7 +823,7 @@ impl KvStore { Ok(response) } - /// Handle `TxnRequest` + /// Execute `TxnRequest` fn execute_txn( &self, txn_db: &Transaction, @@ -859,7 +868,7 @@ impl KvStore { }) } - /// Handle `CompactionRequest` + /// Execute `CompactionRequest` fn execute_compaction( &self, req: &CompactionRequest, @@ -880,14 +889,15 @@ impl KvStore { /// Sync requests impl KvStore { - /// Handle kv requests + /// Sync kv requests async fn sync_request( &self, wrapper: &RequestWrapper, txn_db: &T, index: &(dyn IndexOperate + Send + Sync), revision_gen: &RevisionNumberGeneratorState<'_>, - ) -> Result + to_execute: bool, + ) -> Result<(SyncResponse, Option), ExecuteError> where T: XlineStorageOps + TransactionApi, { @@ -897,36 +907,57 @@ impl KvStore { let next_revision = revision_gen.get().overflow_add(1); #[allow(clippy::wildcard_enum_match_arm)] - let events = match *wrapper { - RequestWrapper::RangeRequest(_) => { - vec![] + let (events, execute_response): (_, Option) = match *wrapper { + RequestWrapper::RangeRequest(ref req) => { + self.sync_range(txn_db, index, req, to_execute) } RequestWrapper::PutRequest(ref req) => { - self.sync_put(txn_db, index, req, next_revision, &mut 0)? + self.sync_put(txn_db, index, req, next_revision, &mut 0, to_execute) } RequestWrapper::DeleteRangeRequest(ref req) => { - self.sync_delete_range(txn_db, index, req, next_revision, &mut 0)? + self.sync_delete_range(txn_db, index, req, next_revision, &mut 0, to_execute) } RequestWrapper::TxnRequest(ref req) => { - self.sync_txn(txn_db, index, req, next_revision, &mut 0)? + self.sync_txn(txn_db, index, req, next_revision, &mut 0, to_execute) + } + RequestWrapper::CompactionRequest(ref req) => { + self.sync_compaction(req, to_execute).await } - RequestWrapper::CompactionRequest(ref req) => self.sync_compaction(req).await?, _ => unreachable!("Other request should not be sent to this store"), - }; + }?; - let response = if events.is_empty() { + let sync_response = if events.is_empty() { SyncResponse::new(revision_gen.get()) } else { self.notify_updates(next_revision, events).await; SyncResponse::new(revision_gen.next()) }; - tracing::warn!("sync response: {response:?}"); + tracing::warn!("sync response: {sync_response:?}"); - Ok(response) + Ok((sync_response, execute_response.map(CommandResponse::new))) + } + + /// Sync `RangeRequest` + fn sync_range( + &self, + txn_db: &T, + index: &dyn IndexOperate, + req: &RangeRequest, + to_execute: bool, + ) -> Result<(Vec, Option), ExecuteError> + where + T: XlineStorageOps, + { + Ok(( + vec![], + to_execute + .then(|| self.execute_range(txn_db, index, req).map(Into::into)) + .transpose()?, + )) } - /// Handle `PutRequest` + /// Sync `PutRequest` fn sync_put( &self, txn_db: &T, @@ -934,7 +965,8 @@ impl KvStore { req: &PutRequest, revision: i64, sub_revision: &mut i64, - ) -> Result, ExecuteError> + to_execute: bool, + ) -> Result<(Vec, Option), ExecuteError> where T: XlineStorageOps, { @@ -976,15 +1008,28 @@ impl KvStore { txn_db.write_op(WriteOp::PutKeyValue(new_rev.as_revision(), kv.clone()))?; *sub_revision = sub_revision.overflow_add(1); - Ok(vec![Event { + let events = vec![Event { #[allow(clippy::as_conversions)] // This cast is always valid r#type: EventType::Put as i32, kv: Some(kv), prev_kv: None, - }]) + }]; + + let execute_resp = to_execute + .then(|| { + self.generate_put_resp( + req, + txn_db, + prev_rev_opt.map(|key_rev| key_rev.as_revision()), + ) + .map(|(resp, _)| resp.into()) + }) + .transpose()?; + + Ok((events, execute_resp)) } - /// Handle `DeleteRangeRequest` + /// Sync `DeleteRangeRequest` fn sync_delete_range( &self, txn_db: &T, @@ -992,7 +1037,8 @@ impl KvStore { req: &DeleteRangeRequest, revision: i64, sub_revision: &mut i64, - ) -> Result, ExecuteError> + to_execute: bool, + ) -> Result<(Vec, Option), ExecuteError> where T: XlineStorageOps, { @@ -1007,10 +1053,15 @@ impl KvStore { Self::detach_leases(&keys, &self.lease_collection); - Ok(Self::new_deletion_events(revision, keys)) + let execute_resp = to_execute + .then(|| self.generate_delete_range_resp(req, txn_db, index)) + .transpose()? + .map(Into::into); + + Ok((Self::new_deletion_events(revision, keys), execute_resp)) } - /// Handle `TxnRequest` + /// Sync `TxnRequest` fn sync_txn( &self, txn_db: &T, @@ -1018,7 +1069,8 @@ impl KvStore { request: &TxnRequest, revision: i64, sub_revision: &mut i64, - ) -> Result, ExecuteError> + to_execute: bool, + ) -> Result<(Vec, Option), ExecuteError> where T: XlineStorageOps, { @@ -1034,33 +1086,50 @@ impl KvStore { request.failure.iter() }; - let events = requests + let (events, resps): (Vec<_>, Vec<_>) = requests .filter_map(|op| op.request.as_ref()) .map(|req| match *req { - Request::RequestRange(_) => Ok(vec![]), + Request::RequestRange(ref r) => self.sync_range(txn_db, index, r, to_execute), Request::RequestTxn(ref r) => { - self.sync_txn(txn_db, index, r, revision, sub_revision) + self.sync_txn(txn_db, index, r, revision, sub_revision, to_execute) } Request::RequestPut(ref r) => { - self.sync_put(txn_db, index, r, revision, sub_revision) + self.sync_put(txn_db, index, r, revision, sub_revision, to_execute) } Request::RequestDeleteRange(ref r) => { - self.sync_delete_range(txn_db, index, r, revision, sub_revision) + self.sync_delete_range(txn_db, index, r, revision, sub_revision, to_execute) } }) .collect::, _>>()? .into_iter() - .flatten() - .collect(); + .unzip(); + + let resp = to_execute.then(|| { + TxnResponse { + header: Some(self.header_gen.gen_header()), + succeeded: success, + responses: resps + .into_iter() + .flat_map(Option::into_iter) + .map(Into::into) + .collect(), + } + .into() + }); - Ok(events) + Ok((events.into_iter().flatten().collect(), resp)) } /// Sync `CompactionRequest` and return if kvstore is changed - async fn sync_compaction(&self, req: &CompactionRequest) -> Result, ExecuteError> { + async fn sync_compaction( + &self, + req: &CompactionRequest, + to_execute: bool, + ) -> Result<(Vec, Option), ExecuteError> { let revision = req.revision; let ops = vec![WriteOp::PutScheduledCompactRevision(revision)]; - // TODO: Remove the physical process logic here. It's better to move into the KvServer + // TODO: Remove the physical process logic here. It's better to move into the + // KvServer let (event, listener) = if req.physical { let event = Arc::new(event_listener::Event::new()); let listener = event.listen(); @@ -1076,7 +1145,13 @@ impl KvStore { } self.inner.db.write_ops(ops)?; - Ok(vec![]) + let resp = to_execute + .then(|| CompactionResponse { + header: Some(self.header_gen.gen_header()), + }) + .map(Into::into); + + Ok((vec![], resp)) } } @@ -1129,7 +1204,8 @@ impl KvStore { .unzip() } - /// Delete keys from index and detach them in lease collection, return all the write operations and events + /// Delete keys from index and detach them in lease collection, return all + /// the write operations and events pub(crate) fn delete_keys( txn_db: &T, index: &dyn IndexOperate, @@ -1299,8 +1375,9 @@ mod test { let index_state = index.state(); let rev_gen_state = store.revision.state(); let _res = store - .after_sync(request, &txn_db, &index_state, &rev_gen_state) + .after_sync(request, &txn_db, &index_state, &rev_gen_state, false) .await?; + txn_db.commit().unwrap(); index_state.commit(); rev_gen_state.commit(); Ok(()) diff --git a/crates/xline/src/storage/kvwatcher.rs b/crates/xline/src/storage/kvwatcher.rs index 59a585021..f5f1bbbbf 100644 --- a/crates/xline/src/storage/kvwatcher.rs +++ b/crates/xline/src/storage/kvwatcher.rs @@ -592,6 +592,7 @@ mod test { use std::{collections::BTreeMap, time::Duration}; + use engine::TransactionApi; use test_macros::abort_on_panic; use tokio::time::{sleep, timeout}; use utils::config::EngineConfig; @@ -758,9 +759,10 @@ mod test { let rev_gen = store.revision_gen(); let rev_gen_state = rev_gen.state(); store - .after_sync(&req, &txn, &index_state, &rev_gen_state) + .after_sync(&req, &txn, &index_state, &rev_gen_state, false) .await .unwrap(); + txn.commit().unwrap(); index_state.commit(); rev_gen_state.commit(); } diff --git a/crates/xline/src/storage/lease_store/mod.rs b/crates/xline/src/storage/lease_store/mod.rs index 0f3440526..8156333e6 100644 --- a/crates/xline/src/storage/lease_store/mod.rs +++ b/crates/xline/src/storage/lease_store/mod.rs @@ -342,17 +342,11 @@ impl LeaseStore { } let txn_db = self.db.transaction(); - let mut txn_index = self.index.state(); + let txn_index = self.index.state(); for (key, mut sub_revision) in del_keys.iter().zip(0..) { - let deleted = KvStore::delete_keys( - &txn_db, - &mut txn_index, - key, - &[], - revision, - &mut sub_revision, - )?; + let deleted = + KvStore::delete_keys(&txn_db, &txn_index, key, &[], revision, &mut sub_revision)?; KvStore::detach_leases(&deleted, &self.lease_collection); let mut del_event = KvStore::new_deletion_events(revision, deleted); updates.append(&mut del_event);