diff --git a/Cargo.lock b/Cargo.lock index c169ad2af..f00733ad3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -332,6 +332,7 @@ dependencies = [ "clap", "clippy-utilities", "etcd-client", + "futures", "indicatif", "rand", "thiserror", @@ -1559,8 +1560,8 @@ dependencies = [ [[package]] name = "madsim" -version = "0.2.27" -source = "git+https://github.com/Phoenix500526/madsim.git?branch=update-tonic#4df254ae43fe7921a8403873460005379ccb8247" +version = "0.2.30" +source = "git+https://github.com/bsbds/madsim.git?branch=fix-client-stream#831b320ed47a1c202646fd25e879a0ad61cd374d" dependencies = [ "ahash", "async-channel", @@ -1590,7 +1591,7 @@ dependencies = [ [[package]] name = "madsim-macros" version = "0.2.12" -source = "git+https://github.com/Phoenix500526/madsim.git?branch=update-tonic#4df254ae43fe7921a8403873460005379ccb8247" +source = "git+https://github.com/bsbds/madsim.git?branch=fix-client-stream#831b320ed47a1c202646fd25e879a0ad61cd374d" dependencies = [ "darling 0.14.4", "proc-macro2", @@ -1600,8 +1601,8 @@ dependencies = [ [[package]] name = "madsim-tokio" -version = "0.2.25" -source = "git+https://github.com/Phoenix500526/madsim.git?branch=update-tonic#4df254ae43fe7921a8403873460005379ccb8247" +version = "0.2.28" +source = "git+https://github.com/bsbds/madsim.git?branch=fix-client-stream#831b320ed47a1c202646fd25e879a0ad61cd374d" dependencies = [ "madsim", "spin", @@ -1610,8 +1611,8 @@ dependencies = [ [[package]] name = "madsim-tonic" -version = "0.4.2+0.11.0" -source = "git+https://github.com/Phoenix500526/madsim.git?branch=update-tonic#4df254ae43fe7921a8403873460005379ccb8247" +version = "0.4.2+0.10.0" +source = "git+https://github.com/bsbds/madsim.git?branch=fix-client-stream#831b320ed47a1c202646fd25e879a0ad61cd374d" dependencies = [ "async-stream", "chrono", @@ -1625,8 +1626,8 @@ dependencies = [ [[package]] name = "madsim-tonic-build" -version = "0.4.3+0.11.0" -source = "git+https://github.com/Phoenix500526/madsim.git?branch=update-tonic#4df254ae43fe7921a8403873460005379ccb8247" +version = "0.4.3+0.10.0" +source = "git+https://github.com/bsbds/madsim.git?branch=fix-client-stream#831b320ed47a1c202646fd25e879a0ad61cd374d" dependencies = [ "prettyplease", "proc-macro2", diff --git a/Cargo.toml b/Cargo.toml index e0220e105..cebe177e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ ignored = ["prost", "workspace-hack"] [patch.crates-io] # This branch update the tonic version for madsim. We should switch to the original etcd-client crate when new version release. 
-madsim = { git = "https://github.com/Phoenix500526/madsim.git", branch = "update-tonic" } -madsim-tonic = { git = "https://github.com/Phoenix500526/madsim.git", branch = "update-tonic" } -madsim-tonic-build = { git = "https://github.com/Phoenix500526/madsim.git", branch = "update-tonic" } -madsim-tokio = { git = "https://github.com/Phoenix500526/madsim.git", branch = "update-tonic" } +madsim = { git = "https://github.com/bsbds/madsim.git", branch = "fix-client-stream" } +madsim-tonic = { git = "https://github.com/bsbds/madsim.git", branch = "fix-client-stream" } +madsim-tonic-build = { git = "https://github.com/bsbds/madsim.git", branch = "fix-client-stream" } +madsim-tokio = { git = "https://github.com/bsbds/madsim.git", branch = "fix-client-stream" } diff --git a/crates/benchmark/Cargo.toml b/crates/benchmark/Cargo.toml index cc6a1c215..819ae65c1 100644 --- a/crates/benchmark/Cargo.toml +++ b/crates/benchmark/Cargo.toml @@ -17,6 +17,7 @@ anyhow = "1.0.83" clap = { version = "4", features = ["derive"] } clippy-utilities = "0.2.0" etcd-client = { version = "0.13.0", features = ["tls"] } +futures = "0.3.30" indicatif = "0.17.8" rand = "0.8.5" thiserror = "1.0.61" diff --git a/crates/benchmark/src/runner.rs b/crates/benchmark/src/runner.rs index f53063d59..fb167716f 100644 --- a/crates/benchmark/src/runner.rs +++ b/crates/benchmark/src/runner.rs @@ -9,6 +9,7 @@ use std::{ use anyhow::Result; use clippy_utilities::{NumericCast, OverflowArithmetic}; +use futures::future::join_all; use indicatif::ProgressBar; use rand::RngCore; use tokio::{ @@ -158,7 +159,6 @@ impl CommandRunner { /// Create clients async fn create_clients(&self) -> Result> { - let mut clients = Vec::with_capacity(self.args.clients); let client_options = ClientOptions::default().with_client_config(ClientConfig::new( Duration::from_secs(10), Duration::from_secs(5), @@ -180,11 +180,15 @@ impl CommandRunner { } }) .collect::>(); - for _ in 0..self.args.clients { - let client = - BenchClient::new(addrs.clone(), self.args.use_curp, client_options.clone()).await?; - clients.push(client); - } + let clients_futs = std::iter::repeat_with(|| { + BenchClient::new(addrs.clone(), self.args.use_curp, client_options.clone()) + }) + .take(self.args.clients); + let clients = join_all(clients_futs) + .await + .into_iter() + .collect::>()?; + Ok(clients) } diff --git a/crates/curp-external-api/src/cmd.rs b/crates/curp-external-api/src/cmd.rs index c29c221f8..5b282b8bd 100644 --- a/crates/curp-external-api/src/cmd.rs +++ b/crates/curp-external-api/src/cmd.rs @@ -104,6 +104,14 @@ where /// command. fn execute(&self, cmd: &C) -> Result; + /// Execute the read-only command + /// + /// # Errors + /// + /// This function may return an error if there is a problem executing the + /// command. 
+ fn execute_ro(&self, cmd: &C) -> Result<(C::ER, C::ASR), C::Error>; + /// Batch execute the after_sync callback /// /// This `highest_index` means the last log index of the `cmds` diff --git a/crates/curp-test-utils/src/test_cmd.rs b/crates/curp-test-utils/src/test_cmd.rs index 2a7cc980e..c3fa23895 100644 --- a/crates/curp-test-utils/src/test_cmd.rs +++ b/crates/curp-test-utils/src/test_cmd.rs @@ -284,6 +284,16 @@ impl CommandExecutor for TestCE { Ok(result) } + fn execute_ro( + &self, + cmd: &TestCommand, + ) -> Result< + (::ER, ::ASR), + ::Error, + > { + self.execute(cmd).map(|er| (er, LogIndexResult(0))) + } + fn after_sync( &self, cmds: Vec>, diff --git a/crates/curp/src/client/mod.rs b/crates/curp/src/client/mod.rs index 378b432d8..739fd9674 100644 --- a/crates/curp/src/client/mod.rs +++ b/crates/curp/src/client/mod.rs @@ -451,15 +451,23 @@ impl ClientBuilder { impl ClientApi + Send + Sync + 'static, Arc, ), - tonic::transport::Error, + tonic::Status, > { - let state = Arc::new(self.init_state_builder().build().await?); + let state = Arc::new( + self.init_state_builder() + .build() + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?, + ); + let client = Retry::new( Unary::new(Arc::clone(&state), self.init_unary_config()), self.init_retry_config(), Some(self.spawn_bg_tasks(Arc::clone(&state))), ); let client_id = state.clone_client_id(); + self.wait_for_client_id(state).await?; + Ok((client, client_id)) } } diff --git a/crates/curp/src/client/retry.rs b/crates/curp/src/client/retry.rs index 607623e4f..ee9e3d6c1 100644 --- a/crates/curp/src/client/retry.rs +++ b/crates/curp/src/client/retry.rs @@ -224,9 +224,9 @@ where token: Option<&String>, use_fast_path: bool, ) -> Result, tonic::Status> { - let propose_id = self.inner.gen_propose_id()?; - self.retry::<_, _>(|client| { - RepeatableClientApi::propose(client, *propose_id, cmd, token, use_fast_path) + self.retry::<_, _>(|client| async move { + let propose_id = self.inner.gen_propose_id()?; + RepeatableClientApi::propose(client, *propose_id, cmd, token, use_fast_path).await }) .await } @@ -236,19 +236,23 @@ where &self, changes: Vec, ) -> Result, tonic::Status> { - let propose_id = self.inner.gen_propose_id()?; self.retry::<_, _>(|client| { let changes_c = changes.clone(); - RepeatableClientApi::propose_conf_change(client, *propose_id, changes_c) + async move { + let propose_id = self.inner.gen_propose_id()?; + RepeatableClientApi::propose_conf_change(client, *propose_id, changes_c).await + } }) .await } /// Send propose to shutdown cluster async fn propose_shutdown(&self) -> Result<(), tonic::Status> { - let propose_id = self.inner.gen_propose_id()?; - self.retry::<_, _>(|client| RepeatableClientApi::propose_shutdown(client, *propose_id)) - .await + self.retry::<_, _>(|client| async move { + let propose_id = self.inner.gen_propose_id()?; + RepeatableClientApi::propose_shutdown(client, *propose_id).await + }) + .await } /// Send propose to publish a node id and name @@ -258,17 +262,20 @@ where node_name: String, node_client_urls: Vec, ) -> Result<(), Self::Error> { - let propose_id = self.inner.gen_propose_id()?; self.retry::<_, _>(|client| { let name_c = node_name.clone(); let node_client_urls_c = node_client_urls.clone(); - RepeatableClientApi::propose_publish( - client, - *propose_id, - node_id, - name_c, - node_client_urls_c, - ) + async move { + let propose_id = self.inner.gen_propose_id()?; + RepeatableClientApi::propose_publish( + client, + *propose_id, + node_id, + name_c, + node_client_urls_c, + ) + .await + } }) 
.await } diff --git a/crates/curp/src/client/tests.rs b/crates/curp/src/client/tests.rs index 32c177183..9db79c303 100644 --- a/crates/curp/src/client/tests.rs +++ b/crates/curp/src/client/tests.rs @@ -1,10 +1,10 @@ use std::{ collections::HashMap, - sync::{atomic::AtomicU64, Arc}, - time::Duration, + sync::{atomic::AtomicU64, Arc, Mutex}, + time::{Duration, Instant}, }; -use curp_test_utils::test_cmd::{TestCommand, TestCommandResult}; +use curp_test_utils::test_cmd::{LogIndexResult, TestCommand, TestCommandResult}; use futures::{future::BoxFuture, Stream}; #[cfg(not(madsim))] use tonic::transport::ClientTlsConfig; @@ -19,7 +19,10 @@ use super::{ unary::{Unary, UnaryConfig}, }; use crate::{ - client::ClientApi, + client::{ + retry::{Retry, RetryConfig}, + ClientApi, + }, members::ServerId, rpc::{ connect::{ConnectApi, MockConnectApi}, @@ -257,7 +260,8 @@ async fn test_unary_fetch_clusters_linearizable_failed() { }); let unary = init_unary_client(connects, None, None, 0, 0, None); let res = unary.fetch_cluster(true).await.unwrap_err(); - // only server(0, 1)'s responses are valid, less than majority quorum(3), got a mocked RpcTransport to retry + // only server(0, 1)'s responses are valid, less than majority quorum(3), got a + // mocked RpcTransport to retry assert_eq!(res, CurpError::RpcTransport(())); } @@ -276,79 +280,71 @@ fn build_synced_response() -> OpResponse { // TODO: rewrite this tests #[cfg(ignore)] +fn build_empty_response() -> OpResponse { + OpResponse { op: None } +} + #[traced_test] #[tokio::test] async fn test_unary_propose_fast_path_works() { let connects = init_mocked_connects(5, |id, conn| { - conn.expect_propose() + conn.expect_propose_stream() .return_once(move |_req, _token, _timeout| { - let resp = match id { - 0 => ProposeResponse::new_result::( - &Ok(TestCommandResult::default()), - false, - ), - 1 | 2 | 3 => ProposeResponse::new_empty(), - 4 => return Err(CurpError::key_conflict()), - _ => unreachable!("there are only 5 nodes"), + assert_eq!(id, 0, "followers should not receive propose"); + let resp = async_stream::stream! 
{ + yield Ok(build_propose_response(false)); + yield Ok(build_synced_response()); }; - Ok(tonic::Response::new(resp)) - }); - conn.expect_wait_synced() - .return_once(move |_req, _timeout| { - assert!(id == 0, "wait synced should send to leader"); - std::thread::sleep(Duration::from_millis(100)); - Ok(tonic::Response::new(WaitSyncedResponse::new_from_result::< - TestCommand, - >( - Ok(TestCommandResult::default()), - Some(Ok(1.into())), - ))) + Ok(tonic::Response::new(Box::new(resp))) }); + conn.expect_record().return_once(move |_req, _timeout| { + let resp = match id { + 0 => unreachable!("leader should not receive record request"), + 1 | 2 | 3 => RecordResponse { conflict: false }, + 4 => RecordResponse { conflict: true }, + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); }); let unary = init_unary_client(connects, None, Some(0), 1, 0, None); let res = unary - .propose(&TestCommand::default(), None, true) + .propose(&TestCommand::new_put(vec![1], 1), None, true) .await .unwrap() .unwrap(); assert_eq!(res, (TestCommandResult::default(), None)); } -// TODO: rewrite this tests -#[cfg(ignore)] #[traced_test] #[tokio::test] async fn test_unary_propose_slow_path_works() { let connects = init_mocked_connects(5, |id, conn| { - conn.expect_propose() + conn.expect_propose_stream() .return_once(move |_req, _token, _timeout| { - let resp = match id { - 0 => ProposeResponse::new_result::( - &Ok(TestCommandResult::default()), - false, - ), - 1 | 2 | 3 => ProposeResponse::new_empty(), - 4 => return Err(CurpError::key_conflict()), - _ => unreachable!("there are only 5 nodes"), + assert_eq!(id, 0, "followers should not receive propose"); + let resp = async_stream::stream! { + yield Ok(build_propose_response(false)); + tokio::time::sleep(Duration::from_millis(100)).await; + yield Ok(build_synced_response()); }; - Ok(tonic::Response::new(resp)) - }); - conn.expect_wait_synced() - .return_once(move |_req, _timeout| { - assert!(id == 0, "wait synced should send to leader"); - std::thread::sleep(Duration::from_millis(100)); - Ok(tonic::Response::new(WaitSyncedResponse::new_from_result::< - TestCommand, - >( - Ok(TestCommandResult::default()), - Some(Ok(1.into())), - ))) + Ok(tonic::Response::new(Box::new(resp))) }); + conn.expect_record().return_once(move |_req, _timeout| { + let resp = match id { + 0 => unreachable!("leader should not receive record request"), + 1 | 2 | 3 => RecordResponse { conflict: false }, + 4 => RecordResponse { conflict: true }, + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); }); + let unary = init_unary_client(connects, None, Some(0), 1, 0, None); let start_at = Instant::now(); let res = unary - .propose(&TestCommand::default(), None, false) + .propose(&TestCommand::new_put(vec![1], 1), None, false) .await .unwrap() .unwrap(); @@ -362,42 +358,36 @@ async fn test_unary_propose_slow_path_works() { ); } -// TODO: rewrite this tests -#[cfg(ignore)] #[traced_test] #[tokio::test] async fn test_unary_propose_fast_path_fallback_slow_path() { + // record how many times `handle_propose` was invoked. let connects = init_mocked_connects(5, |id, conn| { - conn.expect_propose() + conn.expect_propose_stream() .return_once(move |_req, _token, _timeout| { - // insufficient quorum to force slow path. 
- let resp = match id { - 0 => ProposeResponse::new_result::( - &Ok(TestCommandResult::default()), - false, - ), - 1 | 2 => ProposeResponse::new_empty(), - 3 | 4 => return Err(CurpError::key_conflict()), - _ => unreachable!("there are only 5 nodes"), + assert_eq!(id, 0, "followers should not receive propose"); + let resp = async_stream::stream! { + yield Ok(build_propose_response(false)); + tokio::time::sleep(Duration::from_millis(100)).await; + yield Ok(build_synced_response()); }; - Ok(tonic::Response::new(resp)) - }); - conn.expect_wait_synced() - .return_once(move |_req, _timeout| { - assert!(id == 0, "wait synced should send to leader"); - std::thread::sleep(Duration::from_millis(100)); - Ok(tonic::Response::new(WaitSyncedResponse::new_from_result::< - TestCommand, - >( - Ok(TestCommandResult::default()), - Some(Ok(1.into())), - ))) + Ok(tonic::Response::new(Box::new(resp))) }); + // insufficient quorum + conn.expect_record().return_once(move |_req, _timeout| { + let resp = match id { + 0 => unreachable!("leader should not receive record request"), + 1 | 2 => RecordResponse { conflict: false }, + 3 | 4 => RecordResponse { conflict: true }, + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); }); let unary = init_unary_client(connects, None, Some(0), 1, 0, None); let start_at = Instant::now(); let res = unary - .propose(&TestCommand::default(), None, true) + .propose(&TestCommand::new_put(vec![1], 1), None, true) .await .unwrap() .unwrap(); @@ -405,14 +395,13 @@ async fn test_unary_propose_fast_path_fallback_slow_path() { start_at.elapsed() > Duration::from_millis(100), "slow round takes at least 100ms" ); + // indicate that we actually run out of fast round assert_eq!( res, (TestCommandResult::default(), Some(LogIndexResult::from(1))) ); } -// TODO: rewrite this tests -#[cfg(ignore)] #[traced_test] #[tokio::test] async fn test_unary_propose_return_early_err() { @@ -428,26 +417,22 @@ async fn test_unary_propose_return_early_err() { assert!(early_err.should_abort_fast_round()); // record how many times rpc was invoked. let counter = Arc::new(Mutex::new(0)); - let connects = init_mocked_connects(5, |id, conn| { + let connects = init_mocked_connects(5, |_id, conn| { let err = early_err.clone(); let counter_c = Arc::clone(&counter); - conn.expect_propose() + conn.expect_propose_stream() .return_once(move |_req, _token, _timeout| { - counter_c.lock().unwrap().add_assign(1); + *counter_c.lock().unwrap() += 1; Err(err) }); + let err = early_err.clone(); - let counter_c = Arc::clone(&counter); - conn.expect_wait_synced() - .return_once(move |_req, _timeout| { - assert!(id == 0, "wait synced should send to leader"); - counter_c.lock().unwrap().add_assign(1); - Err(err) - }); + conn.expect_record() + .return_once(move |_req, _timeout| Err(err)); }); let unary = init_unary_client(connects, None, Some(0), 1, 0, None); let err = unary - .propose(&TestCommand::default(), None, true) + .propose(&TestCommand::new_put(vec![1], 1), None, true) .await .unwrap_err(); assert_eq!(err, early_err); @@ -457,8 +442,6 @@ async fn test_unary_propose_return_early_err() { // Tests for retry layer -// TODO: rewrite this tests -#[cfg(ignore)] #[traced_test] #[tokio::test] async fn test_retry_propose_return_no_retry_error() { @@ -471,22 +454,18 @@ async fn test_retry_propose_return_no_retry_error() { ] { // record how many times rpc was invoked. 
let counter = Arc::new(Mutex::new(0)); - let connects = init_mocked_connects(5, |id, conn| { + let connects = init_mocked_connects(5, |_id, conn| { let err = early_err.clone(); let counter_c = Arc::clone(&counter); - conn.expect_propose() + conn.expect_propose_stream() .return_once(move |_req, _token, _timeout| { - counter_c.lock().unwrap().add_assign(1); + *counter_c.lock().unwrap() += 1; Err(err) }); + let err = early_err.clone(); - let counter_c = Arc::clone(&counter); - conn.expect_wait_synced() - .return_once(move |_req, _timeout| { - assert!(id == 0, "wait synced should send to leader"); - counter_c.lock().unwrap().add_assign(1); - Err(err) - }); + conn.expect_record() + .return_once(move |_req, _timeout| Err(err)); }); let unary = init_unary_client(connects, None, Some(0), 1, 0, None); let retry = Retry::new( @@ -495,27 +474,22 @@ async fn test_retry_propose_return_no_retry_error() { None, ); let err = retry - .propose(&TestCommand::default(), None, false) + .propose(&TestCommand::new_put(vec![1], 1), None, false) .await .unwrap_err(); assert_eq!(err.message(), tonic::Status::from(early_err).message()); - // fast path + slow path = 2 - assert_eq!(*counter.lock().unwrap(), 2); + assert_eq!(*counter.lock().unwrap(), 1); } } -// TODO: rewrite this tests -#[cfg(ignore)] #[traced_test] #[tokio::test] async fn test_retry_propose_return_retry_error() { for early_err in [ - CurpError::key_conflict(), CurpError::RpcTransport(()), CurpError::internal("No reason"), ] { let connects = init_mocked_connects(5, |id, conn| { - let err = early_err.clone(); conn.expect_fetch_cluster() .returning(move |_req, _timeout| { Ok(tonic::Response::new(FetchClusterResponse { @@ -532,14 +506,16 @@ async fn test_retry_propose_return_retry_error() { cluster_version: 1, })) }); - conn.expect_propose() - .returning(move |_req, _token, _timeout| Err(err.clone())); if id == 0 { let err = early_err.clone(); - conn.expect_wait_synced() - .times(5) // wait synced should be retried in 5 times on leader - .returning(move |_req, _timeout| Err(err.clone())); + conn.expect_propose_stream() + .times(5) // propose should be retried in 5 times on leader + .returning(move |_req, _token, _timeout| Err(err.clone())); } + + let err = early_err.clone(); + conn.expect_record() + .returning(move |_req, _timeout| Err(err.clone())); }); let unary = init_unary_client(connects, None, Some(0), 1, 0, None); let retry = Retry::new( @@ -548,7 +524,7 @@ async fn test_retry_propose_return_retry_error() { None, ); let err = retry - .propose(&TestCommand::default(), None, false) + .propose(&TestCommand::new_put(vec![1], 1), None, false) .await .unwrap_err(); assert!(err.message().contains("request timeout")); diff --git a/crates/curp/src/client/unary.rs b/crates/curp/src/client/unary.rs index 7c6dc488f..2acf6658a 100644 --- a/crates/curp/src/client/unary.rs +++ b/crates/curp/src/client/unary.rs @@ -270,20 +270,6 @@ impl ClientApi for Unary { /// Send fetch cluster requests to all servers /// Note: The fetched cluster may still be outdated if `linearizable` is false async fn fetch_cluster(&self, linearizable: bool) -> Result { - /// Checks the member list, returns `true` if all member has been published - fn check_members(members: &[Member]) -> bool { - if members.is_empty() { - return false; - } - for member in members { - if member.client_urls.is_empty() { - debug!("new node {} not published yet", member.id()); - return false; - } - } - true - } - let timeout = self.config.wait_synced_timeout; if !linearizable { // firstly, try to fetch the 
local server @@ -344,14 +330,14 @@ impl ClientApi for Unary { match max_term.cmp(&inner.term) { Ordering::Less => { max_term = inner.term; - if check_members(&inner.members) { + if !inner.members.is_empty() { res = Some(inner); } // reset ok count to 1 ok_cnt = 1; } Ordering::Equal => { - if check_members(&inner.members) { + if !inner.members.is_empty() { res = Some(inner); } ok_cnt += 1; diff --git a/crates/curp/src/server/cmd_worker/mod.rs b/crates/curp/src/server/cmd_worker/mod.rs index 95a042597..d70cc20e7 100644 --- a/crates/curp/src/server/cmd_worker/mod.rs +++ b/crates/curp/src/server/cmd_worker/mod.rs @@ -51,12 +51,7 @@ pub(super) fn execute, RC: RoleChange>( unreachable!("should not speculative execute {:?}", entry.entry_data); }; if cmd.is_read_only() { - let result = ce - .after_sync(vec![AfterSyncCmd::new(cmd, true)], None) - .remove(0)?; - let (asr, er_opt) = result.into_parts(); - let er = er_opt.unwrap_or_else(|| unreachable!("er should exist")); - Ok((er, Some(asr))) + ce.execute_ro(cmd).map(|(er, asr)| (er, Some(asr))) } else { let er = ce.execute(cmd); let mut cb_w = cb.write(); diff --git a/crates/curp/src/server/curp_node.rs b/crates/curp/src/server/curp_node.rs index 2f56ee520..b1d3929d1 100644 --- a/crates/curp/src/server/curp_node.rs +++ b/crates/curp/src/server/curp_node.rs @@ -380,6 +380,9 @@ impl, RC: RoleChange> CurpNode { ) -> Result { pin_mut!(req_stream); while let Some(req) = req_stream.next().await { + // NOTE: The leader may shut down itself during a configuration change, + // so we must check for that situation first. + self.curp.check_leader_transfer()?; if self.curp.is_shutdown() { return Err(CurpError::shutting_down()); } diff --git a/crates/curp/tests/it/common/curp_group.rs b/crates/curp/tests/it/common/curp_group.rs index e2dbaab8d..e95694aa8 100644 --- a/crates/curp/tests/it/common/curp_group.rs +++ b/crates/curp/tests/it/common/curp_group.rs @@ -318,6 +318,10 @@ impl CurpGroup { &self.nodes[id] } + pub fn get_node_mut(&mut self, id: &ServerId) -> &mut CurpNode { + self.nodes.get_mut(id).unwrap() + } + pub async fn new_client(&self) -> impl ClientApi { let addrs = self.all_addrs().cloned().collect(); ClientBuilder::new(ClientConfig::default(), true) diff --git a/crates/curp/tests/it/server.rs b/crates/curp/tests/it/server.rs index 04c318e8f..9eeb5878a 100644 --- a/crates/curp/tests/it/server.rs +++ b/crates/curp/tests/it/server.rs @@ -93,14 +93,12 @@ async fn exe_exactly_once_on_leader() { let er = client.propose(&cmd, None, true).await.unwrap().unwrap().0; assert_eq!(er, TestCommandResult::new(vec![], vec![])); + let leader = group.get_leader().await.0; { - let mut exe_futs = group - .exe_rxs() - .map(|rx| rx.recv()) - .collect::>(); - let (cmd1, er) = exe_futs.next().await.unwrap().unwrap(); + let exec_rx = &mut group.get_node_mut(&leader).exe_rx; + let (cmd1, er) = exec_rx.recv().await.unwrap(); assert!( - tokio::time::timeout(Duration::from_millis(100), exe_futs.next()) + tokio::time::timeout(Duration::from_millis(100), exec_rx.recv()) .await .is_err() ); @@ -262,7 +260,7 @@ async fn concurrent_cmd_order_should_have_correct_revision() { let sample_range = 1..=100; for i in sample_range.clone() { - let rand_dur = Duration::from_millis(thread_rng().gen_range(0..500).numeric_cast()); + let rand_dur = Duration::from_millis(thread_rng().gen_range(0..50).numeric_cast()); let _er = client .propose( &TestCommand::new_put(vec![i], i).set_as_dur(rand_dur), diff --git a/crates/simulation/src/xline_group.rs b/crates/simulation/src/xline_group.rs index 
eb97322d2..0f61892b5 100644 --- a/crates/simulation/src/xline_group.rs +++ b/crates/simulation/src/xline_group.rs @@ -55,7 +55,7 @@ impl XlineGroup { vec!["0.0.0.0:2379".to_owned()], vec![format!("192.168.1.{}:2379", i + 1)], all.clone(), - false, + i == 0, CurpConfig::default(), ClientConfig::default(), ServerTimeout::default(), diff --git a/crates/simulation/tests/it/curp/server_recovery.rs b/crates/simulation/tests/it/curp/server_recovery.rs index e14abd406..7e8a88ccf 100644 --- a/crates/simulation/tests/it/curp/server_recovery.rs +++ b/crates/simulation/tests/it/curp/server_recovery.rs @@ -116,9 +116,15 @@ async fn leader_and_follower_both_crash_and_recovery() { let follower = *group.nodes.keys().find(|&id| id != &leader).unwrap(); group.crash(follower).await; + let _wait_up = client + .propose(TestCommand::new_get(vec![0]), true) + .await + .unwrap() + .unwrap(); + assert_eq!( client - .propose(TestCommand::new_put(vec![0], 0), true) + .propose(TestCommand::new_put(vec![0], 0), false) .await .unwrap() .unwrap() @@ -126,16 +132,6 @@ async fn leader_and_follower_both_crash_and_recovery() { .values, Vec::::new(), ); - assert_eq!( - client - .propose(TestCommand::new_get(vec![0]), true) - .await - .unwrap() - .unwrap() - .0 - .values, - vec![0] - ); group.crash(leader).await; diff --git a/crates/xline/src/server/auth_server.rs b/crates/xline/src/server/auth_server.rs index 33a0949ef..bd285d926 100644 --- a/crates/xline/src/server/auth_server.rs +++ b/crates/xline/src/server/auth_server.rs @@ -51,7 +51,6 @@ impl AuthServer { async fn propose( &self, request: tonic::Request, - use_fast_path: bool, ) -> Result<(CommandResponse, Option), tonic::Status> where T: Into, @@ -59,7 +58,7 @@ impl AuthServer { let auth_info = self.auth_store.try_get_auth_info_from_request(&request)?; let request = request.into_inner().into(); let cmd = Command::new_with_auth_info(request, auth_info); - let res = self.client.propose(&cmd, None, use_fast_path).await??; + let res = self.client.propose(&cmd, None, false).await??; Ok(res) } @@ -67,13 +66,12 @@ impl AuthServer { async fn handle_req( &self, request: tonic::Request, - use_fast_path: bool, ) -> Result, tonic::Status> where Req: Into, Res: From, { - let (cmd_res, sync_res) = self.propose(request, use_fast_path).await?; + let (cmd_res, sync_res) = self.propose(request).await?; let mut res_wrapper = cmd_res.into_inner(); if let Some(sync_res) = sync_res { res_wrapper.update_revision(sync_res.revision()); @@ -89,7 +87,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthEnableRequest {:?}", request); - self.handle_req(request, false).await + self.handle_req(request).await } async fn auth_disable( @@ -97,7 +95,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthDisableRequest {:?}", request); - self.handle_req(request, false).await + self.handle_req(request).await } async fn auth_status( @@ -105,8 +103,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthStatusRequest {:?}", request); - let is_fast_path = true; - self.handle_req(request, is_fast_path).await + self.handle_req(request).await } async fn authenticate( @@ -114,7 +111,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthenticateRequest {:?}", request); - self.handle_req(request, false).await + self.handle_req(request).await } async fn user_add( @@ -128,7 +125,7 @@ impl Auth for AuthServer { 
.map_err(|err| tonic::Status::internal(format!("Failed to hash password: {err}")))?; user_add_req.hashed_password = hashed_password; user_add_req.password = String::new(); - self.handle_req(request, false).await + self.handle_req(request).await } async fn user_get( @@ -136,8 +133,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthUserGetRequest {:?}", request); - let is_fast_path = true; - self.handle_req(request, is_fast_path).await + self.handle_req(request).await } async fn user_list( @@ -145,8 +141,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthUserListRequest {:?}", request); - let is_fast_path = true; - self.handle_req(request, is_fast_path).await + self.handle_req(request).await } async fn user_delete( @@ -154,7 +149,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthUserDeleteRequest {:?}", request); - self.handle_req(request, false).await + self.handle_req(request).await } async fn user_change_password( @@ -167,7 +162,7 @@ impl Auth for AuthServer { .map_err(|err| tonic::Status::internal(format!("Failed to hash password: {err}")))?; user_change_password_req.hashed_password = hashed_password; user_change_password_req.password = String::new(); - self.handle_req(request, false).await + self.handle_req(request).await } async fn user_grant_role( @@ -175,7 +170,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthUserGrantRoleRequest {:?}", request); - self.handle_req(request, false).await + self.handle_req(request).await } async fn user_revoke_role( @@ -183,7 +178,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthUserRevokeRoleRequest {:?}", request); - self.handle_req(request, false).await + self.handle_req(request).await } async fn role_add( @@ -192,7 +187,7 @@ impl Auth for AuthServer { ) -> Result, tonic::Status> { debug!("Receive AuthRoleAddRequest {:?}", request); request.get_ref().validation()?; - self.handle_req(request, false).await + self.handle_req(request).await } async fn role_get( @@ -200,8 +195,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthRoleGetRequest {:?}", request); - let is_fast_path = true; - self.handle_req(request, is_fast_path).await + self.handle_req(request).await } async fn role_list( @@ -209,8 +203,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthRoleListRequest {:?}", request); - let is_fast_path = true; - self.handle_req(request, is_fast_path).await + self.handle_req(request).await } async fn role_delete( @@ -218,7 +211,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthRoleDeleteRequest {:?}", request); - self.handle_req(request, false).await + self.handle_req(request).await } async fn role_grant_permission( @@ -230,7 +223,7 @@ impl Auth for AuthServer { request.get_ref() ); request.get_ref().validation()?; - self.handle_req(request, false).await + self.handle_req(request).await } async fn role_revoke_permission( @@ -241,6 +234,6 @@ impl Auth for AuthServer { "Receive AuthRoleRevokePermissionRequest {}", request.get_ref() ); - self.handle_req(request, false).await + self.handle_req(request).await } } diff --git a/crates/xline/src/server/command.rs b/crates/xline/src/server/command.rs index 
423e91739..cd564729d 100644 --- a/crates/xline/src/server/command.rs +++ b/crates/xline/src/server/command.rs @@ -15,7 +15,7 @@ use parking_lot::RwLock; use tracing::warn; use utils::{barrier::IdBarrier, table_names::META_TABLE}; use xlineapi::{ - command::{Command, CurpClient}, + command::{Command, CurpClient, SyncResponse}, execute_error::ExecuteError, AlarmAction, AlarmRequest, AlarmType, }; @@ -295,10 +295,11 @@ impl CommandExecutor { } /// After sync other type of commands - fn after_sync_others( + fn after_sync_others( &self, wrapper: &RequestWrapper, txn_db: &T, + index: &I, general_revision: &RevisionNumberGeneratorState<'_>, auth_revision: &RevisionNumberGeneratorState<'_>, to_execute: bool, @@ -311,6 +312,7 @@ impl CommandExecutor { > where T: XlineStorageOps + TransactionApi, + I: IndexOperate, { let er = to_execute .then(|| match wrapper.backend() { @@ -323,7 +325,10 @@ impl CommandExecutor { let (asr, wr_ops) = match wrapper.backend() { RequestBackend::Auth => self.auth_storage.after_sync(wrapper, auth_revision)?, - RequestBackend::Lease => self.lease_storage.after_sync(wrapper, general_revision)?, + RequestBackend::Lease => { + self.lease_storage + .after_sync(wrapper, general_revision, txn_db, index)? + } RequestBackend::Alarm => self.alarm_storage.after_sync(wrapper, general_revision), RequestBackend::Kv => unreachable!("Should not sync kv commands"), }; @@ -424,6 +429,24 @@ impl CurpCommandExecutor for CommandExecutor { } } + fn execute_ro( + &self, + cmd: &Command, + ) -> Result< + (::ER, ::ASR), + ::Error, + > { + let er = self.execute(cmd)?; + let wrapper = cmd.request(); + let rev = match wrapper.backend() { + RequestBackend::Kv | RequestBackend::Lease | RequestBackend::Alarm => { + self.kv_storage.revision_gen().get() + } + RequestBackend::Auth => self.auth_storage.revision_gen().get(), + }; + Ok((er, SyncResponse::new(rev))) + } + fn after_sync( &self, cmds: Vec>, @@ -473,6 +496,7 @@ impl CurpCommandExecutor for CommandExecutor { .after_sync_others( wrapper, &txn_db, + &index_state, &general_revision_state, &auth_revision_state, to_execute, diff --git a/crates/xline/src/server/kv_server.rs b/crates/xline/src/server/kv_server.rs index 1bdf482c7..7e87064f3 100644 --- a/crates/xline/src/server/kv_server.rs +++ b/crates/xline/src/server/kv_server.rs @@ -258,7 +258,7 @@ impl Kv for KvServer { } else { Either::Right(async {}) }; - let (cmd_res, _sync_res) = self.client.propose(&cmd, None, !physical).await??; + let (cmd_res, _sync_res) = self.client.propose(&cmd, None, false).await??; let resp = cmd_res.into_inner(); if timeout(self.compact_timeout, compact_physical_fut) .await diff --git a/crates/xline/src/server/lease_server.rs b/crates/xline/src/server/lease_server.rs index 931abb015..d528c1c8d 100644 --- a/crates/xline/src/server/lease_server.rs +++ b/crates/xline/src/server/lease_server.rs @@ -119,7 +119,6 @@ impl LeaseServer { async fn propose( &self, request: tonic::Request, - use_fast_path: bool, ) -> Result<(CommandResponse, Option), tonic::Status> where T: Into, @@ -127,7 +126,7 @@ impl LeaseServer { let auth_info = self.auth_storage.try_get_auth_info_from_request(&request)?; let request = request.into_inner().into(); let cmd = Command::new_with_auth_info(request, auth_info); - let res = self.client.propose(&cmd, None, use_fast_path).await??; + let res = self.client.propose(&cmd, None, false).await??; Ok(res) } @@ -255,8 +254,7 @@ impl Lease for LeaseServer { lease_grant_req.id = self.id_gen.next(); } - let is_fast_path = true; - let (res, sync_res) = 
self.propose(request, is_fast_path).await?; + let (res, sync_res) = self.propose(request).await?; let mut res: LeaseGrantResponse = res.into_inner().into(); if let Some(sync_res) = sync_res { @@ -276,8 +274,7 @@ impl Lease for LeaseServer { ) -> Result, tonic::Status> { debug!("Receive LeaseRevokeRequest {:?}", request); - let is_fast_path = true; - let (res, sync_res) = self.propose(request, is_fast_path).await?; + let (res, sync_res) = self.propose(request).await?; let mut res: LeaseRevokeResponse = res.into_inner().into(); if let Some(sync_res) = sync_res { @@ -378,8 +375,7 @@ impl Lease for LeaseServer { ) -> Result, tonic::Status> { debug!("Receive LeaseLeasesRequest {:?}", request); - let is_fast_path = true; - let (res, sync_res) = self.propose(request, is_fast_path).await?; + let (res, sync_res) = self.propose(request).await?; let mut res: LeaseLeasesResponse = res.into_inner().into(); if let Some(sync_res) = sync_res { diff --git a/crates/xline/src/server/lock_server.rs b/crates/xline/src/server/lock_server.rs index f5649cb8c..dff302508 100644 --- a/crates/xline/src/server/lock_server.rs +++ b/crates/xline/src/server/lock_server.rs @@ -71,14 +71,13 @@ impl LockServer { &self, request: T, auth_info: Option, - use_fast_path: bool, ) -> Result<(CommandResponse, Option), tonic::Status> where T: Into, { let request = request.into(); let cmd = Command::new_with_auth_info(request, auth_info); - let res = self.client.propose(&cmd, None, use_fast_path).await??; + let res = self.client.propose(&cmd, None, false).await??; Ok(res) } @@ -148,7 +147,7 @@ impl LockServer { max_create_revision: rev, ..Default::default() }; - let (cmd_res, _sync_res) = self.propose(get_req, auth_info.cloned(), false).await?; + let (cmd_res, _sync_res) = self.propose(get_req, auth_info.cloned()).await?; let response = Into::::into(cmd_res.into_inner()); let last_key = match response.kvs.first() { Some(kv) => kv.key.clone(), @@ -186,7 +185,7 @@ impl LockServer { key: key.into(), ..Default::default() }; - let (cmd_res, _) = self.propose(del_req, auth_info, true).await?; + let (cmd_res, _) = self.propose(del_req, auth_info).await?; let res = Into::::into(cmd_res.into_inner()); Ok(res.header) } @@ -198,7 +197,7 @@ impl LockServer { ttl: DEFAULT_SESSION_TTL, id: lease_id, }; - let (cmd_res, _) = self.propose(lease_grant_req, auth_info, true).await?; + let (cmd_res, _) = self.propose(lease_grant_req, auth_info).await?; let res = Into::::into(cmd_res.into_inner()); Ok(res.id) } @@ -229,7 +228,7 @@ impl Lock for LockServer { let key = format!("{prefix}{lease_id:x}"); let txn = Self::create_acquire_txn(&prefix, lease_id); - let (cmd_res, sync_res) = self.propose(txn, auth_info.clone(), false).await?; + let (cmd_res, sync_res) = self.propose(txn, auth_info.clone()).await?; let mut txn_res = Into::::into(cmd_res.into_inner()); #[allow(clippy::unwrap_used)] // sync_res always has value when use slow path let my_rev = sync_res.unwrap().revision(); @@ -261,7 +260,7 @@ impl Lock for LockServer { key: key.as_bytes().to_vec(), ..Default::default() }; - let result = self.propose(range_req, auth_info.clone(), true).await; + let result = self.propose(range_req, auth_info.clone()).await; match result { Ok(res) => { let res = Into::::into(res.0.into_inner()); diff --git a/crates/xline/src/server/maintenance.rs b/crates/xline/src/server/maintenance.rs index e8bc522c1..f0ffc01d0 100644 --- a/crates/xline/src/server/maintenance.rs +++ b/crates/xline/src/server/maintenance.rs @@ -84,7 +84,6 @@ impl MaintenanceServer { async fn propose( 
&self, request: tonic::Request, - use_fast_path: bool, ) -> Result<(CommandResponse, Option), tonic::Status> where T: Into + Debug, @@ -92,7 +91,7 @@ impl MaintenanceServer { let auth_info = self.auth_store.try_get_auth_info_from_request(&request)?; let request = request.into_inner().into(); let cmd = Command::new_with_auth_info(request, auth_info); - let res = self.client.propose(&cmd, None, use_fast_path).await??; + let res = self.client.propose(&cmd, None, false).await??; Ok(res) } } @@ -103,8 +102,7 @@ impl Maintenance for MaintenanceServer { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let is_fast_path = true; - let (res, sync_res) = self.propose(request, is_fast_path).await?; + let (res, sync_res) = self.propose(request).await?; let mut res: AlarmResponse = res.into_inner().into(); if let Some(sync_res) = sync_res { let revision = sync_res.revision(); diff --git a/crates/xline/src/server/xline_server.rs b/crates/xline/src/server/xline_server.rs index a4b663689..de40466c5 100644 --- a/crates/xline/src/server/xline_server.rs +++ b/crates/xline/src/server/xline_server.rs @@ -228,7 +228,7 @@ impl XlineServer { self.task_manager.spawn(TaskName::CompactBg, |n| { compact_bg_task( Arc::clone(&kv_storage), - Arc::clone(&index), + index, *self.compact_config.compact_batch_size(), *self.compact_config.compact_sleep_interval(), compact_task_rx, @@ -239,7 +239,6 @@ impl XlineServer { Arc::clone(&lease_collection), Arc::clone(&header_gen), Arc::clone(&db), - index, kv_update_tx, *self.cluster_config.is_leader(), )); diff --git a/crates/xline/src/storage/kv_store.rs b/crates/xline/src/storage/kv_store.rs index 44a0cac04..19b8fb20a 100644 --- a/crates/xline/src/storage/kv_store.rs +++ b/crates/xline/src/storage/kv_store.rs @@ -11,8 +11,6 @@ use std::{ use clippy_utilities::{NumericCast, OverflowArithmetic}; use engine::{Transaction, TransactionApi}; -#[cfg(not(madsim))] -use event_listener::Listener; use prost::Message; use tracing::{debug, warn}; use utils::table_names::{KV_TABLE, META_TABLE}; @@ -149,11 +147,9 @@ impl KvStoreInner { /// Get previous `KeyValue` of a `KeyValue` pub(crate) fn get_prev_kv(&self, kv: &KeyValue) -> Option { - let txn_db = self.db.transaction(); - let index = self.index.state(); Self::get_range( - &txn_db, - &index, + self.db.as_ref(), + self.index.as_ref(), &kv.key, &[], kv.mod_revision.overflow_sub(1), @@ -168,11 +164,10 @@ impl KvStoreInner { key_range: KeyRange, revision: i64, ) -> Result, ExecuteError> { - let txn = self.db.transaction(); let revisions = self.index .get_from_rev(key_range.range_start(), key_range.range_end(), revision); - let events = Self::get_values(&txn, &revisions)? + let events = Self::get_values(self.db.as_ref(), &revisions)? 
.into_iter() .map(|kv| { // Delete @@ -966,6 +961,17 @@ impl KvStore { { let (new_rev, prev_rev_opt) = index.register_revision(req.key.clone(), revision, *sub_revision); + let execute_resp = to_execute + .then(|| { + self.generate_put_resp( + req, + txn_db, + prev_rev_opt.map(|key_rev| key_rev.as_revision()), + ) + .map(|(resp, _)| resp.into()) + }) + .transpose()?; + let mut kv = KeyValue { key: req.key.clone(), value: req.value.clone(), @@ -1009,17 +1015,6 @@ prev_kv: None, }]; - let execute_resp = to_execute - .then(|| { - self.generate_put_resp( - req, - txn_db, - prev_rev_opt.map(|key_rev| key_rev.as_revision()), - ) - .map(|(resp, _)| resp.into()) - }) - .transpose()?; - Ok((events, execute_resp)) } @@ -1036,6 +1031,11 @@ where T: XlineStorageOps, { + let execute_resp = to_execute + .then(|| self.generate_delete_range_resp(req, txn_db, index)) + .transpose()? + .map(Into::into); + let keys = Self::delete_keys( txn_db, index, @@ -1047,11 +1047,6 @@ Self::detach_leases(&keys, &self.lease_collection); - let execute_resp = to_execute - .then(|| self.generate_delete_range_resp(req, txn_db, index)) - .transpose()? - .map(Into::into); - Ok((Self::new_deletion_events(revision, keys), execute_resp)) } @@ -1124,23 +1119,23 @@ let ops = vec![WriteOp::PutScheduledCompactRevision(revision)]; // TODO: Remove the physical process logic here. It's better to move into the // KvServer - #[cfg_attr(madsim, allow(unused))] - let (event, listener) = if req.physical { - let event = Arc::new(event_listener::Event::new()); - let listener = event.listen(); - (Some(event), Some(listener)) - } else { - (None, None) - }; - // TODO: sync compaction task - if let Err(e) = self.compact_task_tx.send((revision, event)) { - panic!("the compactor exited unexpectedly: {e:?}"); - } // FIXME: madsim is single threaded, we cannot use synchronous wait here - #[cfg(not(madsim))] - if let Some(listener) = listener { - listener.wait(); + let index = self.index(); + let target_revisions = index + .compact(revision) + .into_iter() + .map(|key_rev| key_rev.as_revision().encode_to_vec()) + .collect::>>(); + // Given that Xline uses an LSM-tree database with smaller write amplification as its storage backend, is progressive compaction actually effective at improving performance? 
+ for revision_chunk in target_revisions.chunks(1000) { + if let Err(e) = self.compact(revision_chunk) { + panic!("failed to compact revision chunk {revision_chunk:?} due to {e}"); + } } + if let Err(e) = self.compact_finished(revision) { + panic!("failed to set finished compact revision {revision:?} due to {e}"); + } + self.inner.db.write_ops(ops)?; let resp = to_execute diff --git a/crates/xline/src/storage/lease_store/mod.rs b/crates/xline/src/storage/lease_store/mod.rs index c396d669a..a6ff9c26a 100644 --- a/crates/xline/src/storage/lease_store/mod.rs +++ b/crates/xline/src/storage/lease_store/mod.rs @@ -16,6 +16,7 @@ use std::{ time::Duration, }; +use clippy_utilities::OverflowArithmetic; use engine::TransactionApi; use log::debug; use parking_lot::RwLock; @@ -29,7 +30,8 @@ use xlineapi::{ pub(crate) use self::{lease::Lease, lease_collection::LeaseCollection}; use super::{ db::{WriteOp, DB}, - index::Index, + index::IndexOperate, + storage_api::XlineStorageOps, }; use crate::{ header_gen::HeaderGenerator, @@ -52,8 +54,6 @@ pub(crate) struct LeaseStore { lease_collection: Arc, /// Db to store lease db: Arc, - /// Key to revision index - index: Arc, /// Header generator header_gen: Arc, /// KV update sender @@ -72,14 +72,12 @@ impl LeaseStore { lease_collection: Arc, header_gen: Arc, db: Arc, - index: Arc, kv_update_tx: flume::Sender<(i64, Vec)>, is_leader: bool, ) -> Self { Self { lease_collection, db, - index, header_gen, kv_update_tx, is_primary: AtomicBool::new(is_leader), @@ -98,18 +96,26 @@ impl LeaseStore { } /// sync a lease request - pub(crate) fn after_sync( + pub(crate) fn after_sync( &self, request: &RequestWrapper, revision_gen: &RevisionNumberGeneratorState<'_>, - ) -> Result<(SyncResponse, Vec), ExecuteError> { - let revision = if request.skip_lease_revision() { - revision_gen.get() - } else { + txn_db: &T, + index: &I, + ) -> Result<(SyncResponse, Vec), ExecuteError> + where + T: XlineStorageOps + TransactionApi, + I: IndexOperate, + { + let next_revision = revision_gen.get().overflow_add(1); + let updated = self.sync_request(request, next_revision, txn_db, index)?; + let rev = if updated { revision_gen.next() + } else { + revision_gen.get() }; - self.sync_request(request, revision) - .map(|(rev, ops)| (SyncResponse::new(rev), ops)) + // TODO: return only a `SyncResponse` + Ok((SyncResponse::new(rev), vec![])) } /// Get lease by id @@ -273,36 +279,47 @@ impl LeaseStore { } /// Sync `RequestWithToken` - fn sync_request( + fn sync_request( &self, wrapper: &RequestWrapper, revision: i64, - ) -> Result<(i64, Vec), ExecuteError> { + txn_db: &T, + index: &I, + ) -> Result + where + T: XlineStorageOps + TransactionApi, + I: IndexOperate, + { #[allow(clippy::wildcard_enum_match_arm)] - let ops = match *wrapper { + let updated = match *wrapper { RequestWrapper::LeaseGrantRequest(ref req) => { debug!("Sync LeaseGrantRequest {:?}", req); - self.sync_lease_grant_request(req) + self.sync_lease_grant_request(req, txn_db)?; + false } RequestWrapper::LeaseRevokeRequest(ref req) => { debug!("Sync LeaseRevokeRequest {:?}", req); - self.sync_lease_revoke_request(req, revision)? + self.sync_lease_revoke_request(req, revision, txn_db, index)? 
} RequestWrapper::LeaseLeasesRequest(ref req) => { debug!("Sync LeaseLeasesRequest {:?}", req); - vec![] + false } _ => unreachable!("Other request should not be sent to this store"), }; - Ok((revision, ops)) + Ok(updated) } /// Sync `LeaseGrantRequest` - fn sync_lease_grant_request(&self, req: &LeaseGrantRequest) -> Vec { + fn sync_lease_grant_request( + &self, + req: &LeaseGrantRequest, + txn_db: &T, + ) -> Result<(), ExecuteError> { let lease = self .lease_collection .grant(req.id, req.ttl, self.is_primary()); - vec![WriteOp::PutLease(lease)] + txn_db.write_op(WriteOp::PutLease(lease)) } /// Get all `PbLease` @@ -320,14 +337,19 @@ impl LeaseStore { } /// Sync `LeaseRevokeRequest` - fn sync_lease_revoke_request( + fn sync_lease_revoke_request( &self, req: &LeaseRevokeRequest, revision: i64, - ) -> Result, ExecuteError> { - let mut ops = Vec::new(); + txn_db: &T, + index: &I, + ) -> Result + where + T: XlineStorageOps + TransactionApi, + I: IndexOperate, + { let mut updates = Vec::new(); - ops.push(WriteOp::DeleteLease(req.id)); + txn_db.write_op(WriteOp::DeleteLease(req.id))?; let del_keys = match self.lease_collection.look_up(req.id) { Some(l) => l.keys(), @@ -336,31 +358,24 @@ impl LeaseStore { if del_keys.is_empty() { let _ignore = self.lease_collection.revoke(req.id); - return Ok(Vec::new()); + return Ok(false); } - let txn_db = self.db.transaction(); - let txn_index = self.index.state(); - for (key, mut sub_revision) in del_keys.iter().zip(0..) { let deleted = - KvStore::delete_keys(&txn_db, &txn_index, key, &[], revision, &mut sub_revision)?; + KvStore::delete_keys(txn_db, index, key, &[], revision, &mut sub_revision)?; KvStore::detach_leases(&deleted, &self.lease_collection); let mut del_event = KvStore::new_deletion_events(revision, deleted); updates.append(&mut del_event); } - txn_db - .commit() - .map_err(|e| ExecuteError::DbError(e.to_string()))?; - txn_index.commit(); - let _ignore = self.lease_collection.revoke(req.id); assert!( self.kv_update_tx.send((revision, updates)).is_ok(), "Failed to send updates to KV watcher" ); - Ok(ops) + + Ok(true) } } @@ -374,18 +389,23 @@ mod test { use super::*; use crate::{ revision_number::RevisionNumberGenerator, - storage::{db::DB, storage_api::XlineStorageOps}, + storage::{ + db::DB, + index::{Index, IndexState}, + storage_api::XlineStorageOps, + }, }; #[tokio::test(flavor = "multi_thread")] #[abort_on_panic] async fn test_lease_storage() -> Result<(), Box> { let db = DB::open(&EngineConfig::Memory)?; + let index = Index::new(); let (lease_store, rev_gen) = init_store(db); let rev_gen_state = rev_gen.state(); let req1 = RequestWrapper::from(LeaseGrantRequest { ttl: 10, id: 1 }); - let _ignore1 = exe_and_sync_req(&lease_store, &req1, &rev_gen_state)?; + let _ignore1 = exe_and_sync_req(&lease_store, index.state(), &req1, &rev_gen_state)?; let lo = lease_store.look_up(1).unwrap(); assert_eq!(lo.id(), 1); @@ -399,7 +419,7 @@ mod test { lease_store.lease_collection.detach(1, "key".as_bytes())?; let req2 = RequestWrapper::from(LeaseRevokeRequest { id: 1 }); - let _ignore2 = exe_and_sync_req(&lease_store, &req2, &rev_gen_state)?; + let _ignore2 = exe_and_sync_req(&lease_store, index.state(), &req2, &rev_gen_state)?; assert!(lease_store.look_up(1).is_none()); assert!(lease_store.leases().is_empty()); @@ -407,9 +427,9 @@ mod test { let req4 = RequestWrapper::from(LeaseGrantRequest { ttl: 10, id: 4 }); let req5 = RequestWrapper::from(LeaseRevokeRequest { id: 3 }); let req6 = RequestWrapper::from(LeaseLeasesRequest {}); - let _ignore3 = 
exe_and_sync_req(&lease_store, &req3, &rev_gen_state)?; - let _ignore4 = exe_and_sync_req(&lease_store, &req4, &rev_gen_state)?; - let resp_1 = exe_and_sync_req(&lease_store, &req6, &rev_gen_state)?; + let _ignore3 = exe_and_sync_req(&lease_store, index.state(), &req3, &rev_gen_state)?; + let _ignore4 = exe_and_sync_req(&lease_store, index.state(), &req4, &rev_gen_state)?; + let resp_1 = exe_and_sync_req(&lease_store, index.state(), &req6, &rev_gen_state)?; let ResponseWrapper::LeaseLeasesResponse(leases_1) = resp_1 else { panic!("wrong response type: {resp_1:?}"); @@ -417,8 +437,8 @@ mod test { assert_eq!(leases_1.leases[0].id, 3); assert_eq!(leases_1.leases[1].id, 4); - let _ignore5 = exe_and_sync_req(&lease_store, &req5, &rev_gen_state)?; - let resp_2 = exe_and_sync_req(&lease_store, &req6, &rev_gen_state)?; + let _ignore5 = exe_and_sync_req(&lease_store, index.state(), &req5, &rev_gen_state)?; + let resp_2 = exe_and_sync_req(&lease_store, index.state(), &req6, &rev_gen_state)?; let ResponseWrapper::LeaseLeasesResponse(leases_2) = resp_2 else { panic!("wrong response type: {resp_2:?}"); }; @@ -430,7 +450,9 @@ mod test { #[tokio::test(flavor = "multi_thread")] async fn test_lease_sync() -> Result<(), Box> { let db = DB::open(&EngineConfig::Memory)?; - let (lease_store, rev_gen) = init_store(db); + let txn = db.transaction(); + let index = Index::new(); + let (lease_store, rev_gen) = init_store(Arc::clone(&db)); let rev_gen_state = rev_gen.state(); let wait_duration = Duration::from_millis(1); @@ -444,7 +466,7 @@ mod test { "the future should block until the lease is synced" ); - let (_ignore, ops) = lease_store.after_sync(&req1, &rev_gen_state)?; + let (_ignore, ops) = lease_store.after_sync(&req1, &rev_gen_state, &txn, &index)?; lease_store.db.write_ops(ops)?; lease_store.mark_lease_synced(&req1); @@ -465,7 +487,7 @@ mod test { "the future should block until the lease is synced" ); - let (_ignore, ops) = lease_store.after_sync(&req2, &rev_gen_state)?; + let (_ignore, ops) = lease_store.after_sync(&req2, &rev_gen_state, &txn, &index)?; lease_store.db.write_ops(ops)?; lease_store.mark_lease_synced(&req2); @@ -483,11 +505,12 @@ mod test { #[abort_on_panic] async fn test_recover() -> Result<(), ExecuteError> { let db = DB::open(&EngineConfig::Memory)?; + let index = Index::new(); let (store, rev_gen) = init_store(Arc::clone(&db)); let rev_gen_state = rev_gen.state(); let req1 = RequestWrapper::from(LeaseGrantRequest { ttl: 10, id: 1 }); - let _ignore1 = exe_and_sync_req(&store, &req1, &rev_gen_state)?; + let _ignore1 = exe_and_sync_req(&store, index.state(), &req1, &rev_gen_state)?; store.lease_collection.attach(1, "key".into())?; let (new_store, _) = init_store(db); @@ -509,21 +532,24 @@ mod test { let lease_collection = Arc::new(LeaseCollection::new(0)); let (kv_update_tx, _) = flume::bounded(1); let header_gen = Arc::new(HeaderGenerator::new(0, 0)); - let index = Arc::new(Index::new()); ( - LeaseStore::new(lease_collection, header_gen, db, index, kv_update_tx, true), + LeaseStore::new(lease_collection, header_gen, db, kv_update_tx, true), RevisionNumberGenerator::new(1), ) } fn exe_and_sync_req( ls: &LeaseStore, + index: IndexState, req: &RequestWrapper, rev_gen: &RevisionNumberGeneratorState<'_>, ) -> Result { let cmd_res = ls.execute(req)?; - let (_ignore, ops) = ls.after_sync(req, rev_gen)?; - ls.db.write_ops(ops)?; + let txn = ls.db.transaction(); + let (_ignore, _ops) = ls.after_sync(req, rev_gen, &txn, &index)?; + txn.commit() + .map_err(|e| ExecuteError::DbError(e.to_string()))?; 
+ index.commit(); rev_gen.commit(); Ok(cmd_res.into_inner()) } diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 6b4d31d24..7eec178ae 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -24,8 +24,8 @@ futures-util = { version = "0.3", features = ["channel", "io", "sink"] } getrandom = { version = "0.2", default-features = false, features = ["js", "rdrand", "std"] } libc = { version = "0.2", features = ["extra_traits"] } log = { version = "0.4", default-features = false, features = ["std"] } -madsim-tokio = { git = "https://github.com/Phoenix500526/madsim.git", branch = "update-tonic", default-features = false, features = ["fs", "io-util", "macros", "net", "rt", "rt-multi-thread", "signal", "sync", "time"] } -madsim-tonic = { git = "https://github.com/Phoenix500526/madsim.git", branch = "update-tonic", default-features = false, features = ["tls"] } +madsim-tokio = { git = "https://github.com/bsbds/madsim.git", branch = "fix-client-stream", default-features = false, features = ["fs", "io-util", "macros", "net", "rt", "rt-multi-thread", "signal", "sync", "time"] } +madsim-tonic = { git = "https://github.com/bsbds/madsim.git", branch = "fix-client-stream", default-features = false, features = ["tls"] } memchr = { version = "2" } num-traits = { version = "0.2", default-features = false, features = ["i128", "std"] } opentelemetry_sdk = { version = "0.22", features = ["metrics", "rt-tokio"] }
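
A note for readers of the patch: the benchmark-runner hunk replaces the sequential client-creation loop with concurrent initialization via `futures::future::join_all`, collecting the `Result`s afterwards so the first failure still aborts. A minimal self-contained sketch of that pattern, assuming a tokio runtime; `connect` and its address are invented stand-ins for `BenchClient::new`:

use futures::future::join_all;

/// Stand-in for `BenchClient::new`: pretend to connect to one endpoint.
async fn connect(addr: String) -> Result<String, std::io::Error> {
    Ok(format!("connected to {addr}"))
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let addr = "127.0.0.1:2379".to_owned();
    let n_clients = 4;

    // Build `n_clients` futures lazily; nothing runs until they are awaited.
    let futs = std::iter::repeat_with(|| connect(addr.clone())).take(n_clients);

    // Drive all futures concurrently, then short-circuit on the first error,
    // mirroring the `join_all(..).await.into_iter().collect()?` step in the diff.
    let clients: Vec<String> = join_all(futs).await.into_iter().collect::<Result<_, _>>()?;

    assert_eq!(clients.len(), n_clients);
    Ok(())
}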
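
The new `execute_ro` method added to the command-executor trait (and implemented by `TestCE` and the Xline `CommandExecutor`) gives read-only commands a single call that returns both the execution result and a synthesized after-sync result, bypassing the `after_sync` batch the old `cmd_worker` path went through. A toy sketch of that contract; every type below is invented for illustration and is not the real curp API:

/// Invented toy types standing in for `Command::ER` / `Command::ASR`.
#[derive(Debug, PartialEq)]
struct ExecResult(String);
#[derive(Debug, PartialEq)]
struct SyncResult { revision: i64 }

struct Cmd { read_only: bool, key: String }

trait Executor {
    fn execute(&self, cmd: &Cmd) -> Result<ExecResult, String>;

    /// Read-only fast path: execute speculatively and fabricate the
    /// after-sync result from current state, skipping the sync batch.
    fn execute_ro(&self, cmd: &Cmd) -> Result<(ExecResult, SyncResult), String>;
}

struct Store { revision: i64 }

impl Executor for Store {
    fn execute(&self, cmd: &Cmd) -> Result<ExecResult, String> {
        Ok(ExecResult(format!("read {}", cmd.key)))
    }

    fn execute_ro(&self, cmd: &Cmd) -> Result<(ExecResult, SyncResult), String> {
        debug_assert!(cmd.read_only, "only read-only commands take this path");
        // Mirrors `TestCE::execute_ro` in the diff: delegate to `execute`,
        // then pair the result with the currently visible revision.
        self.execute(cmd)
            .map(|er| (er, SyncResult { revision: self.revision }))
    }
}

fn main() {
    let store = Store { revision: 42 };
    let cmd = Cmd { read_only: true, key: "foo".into() };
    let (er, asr) = store.execute_ro(&cmd).expect("read-only execution failed");
    assert_eq!(er, ExecResult("read foo".into()));
    assert_eq!(asr, SyncResult { revision: 42 });
}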
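
In `retry.rs`, `gen_propose_id` moves from outside `self.retry` into the async closure, so each retry attempt runs under a freshly generated propose id rather than reusing one minted before the first failure. A rough sketch of that shape under invented names (`IdGen` and `retry_with_fresh_id` are illustrative, not the curp client's real API):

use std::future::Future;
use std::sync::atomic::{AtomicU64, Ordering};

/// Invented stand-in for the client's propose-id source.
struct IdGen(AtomicU64);

impl IdGen {
    fn next(&self) -> u64 {
        self.0.fetch_add(1, Ordering::Relaxed)
    }
}

/// Retry `op` up to `max_attempts` times, minting a fresh id *inside* each
/// attempt -- the shape the diff gives `propose`, `propose_conf_change`,
/// `propose_shutdown`, and `propose_publish`.
async fn retry_with_fresh_id<T, E, F, Fut>(
    ids: &IdGen,
    max_attempts: usize,
    mut op: F,
) -> Result<T, E>
where
    F: FnMut(u64) -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut last_err = None;
    for _ in 0..max_attempts {
        match op(ids.next()).await {
            Ok(v) => return Ok(v),
            Err(e) => last_err = Some(e),
        }
    }
    Err(last_err.expect("max_attempts must be non-zero"))
}

#[tokio::main]
async fn main() {
    let ids = IdGen(AtomicU64::new(0));
    // An operation that fails under id 0 and succeeds on the next attempt,
    // demonstrating that the retry proposes under a new id.
    let res = retry_with_fresh_id(&ids, 3, |id| async move {
        if id == 0 { Err("transient") } else { Ok(id) }
    })
    .await;
    assert_eq!(res, Ok(1));
}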