From 5415aa37deac22efc0b98cc20ecc371aa0a5755f Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Mon, 27 May 2024 22:26:54 +0800 Subject: [PATCH 01/19] fix: check leader transfer in lease keep alive Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/server/curp_node.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/curp/src/server/curp_node.rs b/crates/curp/src/server/curp_node.rs index 2f56ee520..b1d3929d1 100644 --- a/crates/curp/src/server/curp_node.rs +++ b/crates/curp/src/server/curp_node.rs @@ -380,6 +380,9 @@ impl, RC: RoleChange> CurpNode { ) -> Result { pin_mut!(req_stream); while let Some(req) = req_stream.next().await { + // NOTE: The leader may shutdown itself in configuration change. + // We must first check this situation. + self.curp.check_leader_transfer()?; if self.curp.is_shutdown() { return Err(CurpError::shutting_down()); } From 40e55f2fe3cbd648fc170c5b29748f08db292b9c Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Wed, 7 Aug 2024 10:06:02 +0800 Subject: [PATCH 02/19] fix: update madsim to fix stream early close issue Ref: https://github.com/madsim-rs/madsim/pull/218 Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- Cargo.lock | 132 ++++++++++++++++++++++++++++---------- Cargo.toml | 8 +-- workspace-hack/Cargo.toml | 4 +- 3 files changed, 105 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c169ad2af..6bcadb425 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -672,9 +672,9 @@ dependencies = [ "futures", "indexmap 2.2.6", "itertools 0.13.0", - "madsim", - "madsim-tokio", - "madsim-tonic", + "madsim 0.2.30", + "madsim-tokio 0.2.28", + "madsim-tonic 0.4.2+0.10.0", "madsim-tonic-build", "mockall", "once_cell", @@ -721,7 +721,7 @@ dependencies = [ "curp-external-api", "engine", "itertools 0.13.0", - "madsim-tokio", + "madsim-tokio 0.2.28", "prost", "serde", "thiserror", @@ -928,7 +928,7 @@ dependencies = [ "bincode", "bytes", "clippy-utilities", - "madsim-tokio", + "madsim-tokio 0.2.28", "opentelemetry 0.21.0", "parking_lot", "rocksdb", @@ -1559,8 +1559,8 @@ dependencies = [ [[package]] name = "madsim" -version = "0.2.27" -source = "git+https://github.com/Phoenix500526/madsim.git?branch=update-tonic#4df254ae43fe7921a8403873460005379ccb8247" +version = "0.2.30" +source = "git+https://github.com/bsbds/madsim.git?branch=fix-client-stream#831b320ed47a1c202646fd25e879a0ad61cd374d" dependencies = [ "ahash", "async-channel", @@ -1572,7 +1572,7 @@ dependencies = [ "futures-util", "lazy_static", "libc", - "madsim-macros", + "madsim-macros 0.2.12 (git+https://github.com/Phoenix500526/madsim.git?branch=update-tonic)", "naive-timer", "panic-message", "rand", @@ -1587,10 +1587,51 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "madsim" +version = "0.2.30" +source = "git+https://github.com/bsbds/madsim.git?branch=fix-client-stream#831b320ed47a1c202646fd25e879a0ad61cd374d" +dependencies = [ + "ahash", + "async-channel", + "async-stream", + "async-task", + "bincode", + "bytes", + "downcast-rs", + "futures-util", + "lazy_static", + "libc", + "madsim-macros 0.2.12 (git+https://github.com/bsbds/madsim.git?branch=fix-client-stream)", + "naive-timer", + "panic-message", + "rand", + "rand_xoshiro", + "rustversion", + "serde", + "spin", + "tokio", + "tokio-util", + "toml", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "madsim-macros" +version = "0.2.12" +source = 
"git+https://github.com/bsbds/madsim.git?branch=fix-client-stream#831b320ed47a1c202646fd25e879a0ad61cd374d" +dependencies = [ + "darling 0.14.4", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "madsim-macros" version = "0.2.12" -source = "git+https://github.com/Phoenix500526/madsim.git?branch=update-tonic#4df254ae43fe7921a8403873460005379ccb8247" +source = "git+https://github.com/bsbds/madsim.git?branch=fix-client-stream#831b320ed47a1c202646fd25e879a0ad61cd374d" dependencies = [ "darling 0.14.4", "proc-macro2", @@ -1600,23 +1641,48 @@ dependencies = [ [[package]] name = "madsim-tokio" -version = "0.2.25" -source = "git+https://github.com/Phoenix500526/madsim.git?branch=update-tonic#4df254ae43fe7921a8403873460005379ccb8247" +version = "0.2.28" +source = "git+https://github.com/bsbds/madsim.git?branch=fix-client-stream#831b320ed47a1c202646fd25e879a0ad61cd374d" +dependencies = [ + "madsim 0.2.27", + "spin", + "tokio", +] + +[[package]] +name = "madsim-tokio" +version = "0.2.28" +source = "git+https://github.com/bsbds/madsim.git?branch=fix-client-stream#831b320ed47a1c202646fd25e879a0ad61cd374d" dependencies = [ - "madsim", + "madsim 0.2.30", "spin", "tokio", ] [[package]] name = "madsim-tonic" -version = "0.4.2+0.11.0" -source = "git+https://github.com/Phoenix500526/madsim.git?branch=update-tonic#4df254ae43fe7921a8403873460005379ccb8247" +version = "0.4.2+0.10.0" +source = "git+https://github.com/bsbds/madsim.git?branch=fix-client-stream#831b320ed47a1c202646fd25e879a0ad61cd374d" +dependencies = [ + "async-stream", + "chrono", + "futures-util", + "madsim 0.2.30", + "tokio", + "tonic", + "tower", + "tracing", +] + +[[package]] +name = "madsim-tonic" +version = "0.4.2+0.10.0" +source = "git+https://github.com/bsbds/madsim.git?branch=fix-client-stream#831b320ed47a1c202646fd25e879a0ad61cd374d" dependencies = [ "async-stream", "chrono", "futures-util", - "madsim", + "madsim 0.2.27", "tokio", "tonic", "tower", @@ -1625,8 +1691,8 @@ dependencies = [ [[package]] name = "madsim-tonic-build" -version = "0.4.3+0.11.0" -source = "git+https://github.com/Phoenix500526/madsim.git?branch=update-tonic#4df254ae43fe7921a8403873460005379ccb8247" +version = "0.4.3+0.10.0" +source = "git+https://github.com/bsbds/madsim.git?branch=fix-client-stream#831b320ed47a1c202646fd25e879a0ad61cd374d" dependencies = [ "prettyplease", "proc-macro2", @@ -2747,9 +2813,9 @@ dependencies = [ "engine", "futures", "itertools 0.13.0", - "madsim", - "madsim-tokio", - "madsim-tonic", + "madsim 0.2.30", + "madsim-tokio 0.2.28", + "madsim-tonic 0.4.2+0.10.0", "madsim-tonic-build", "parking_lot", "prost", @@ -3075,7 +3141,7 @@ version = "0.1.12" source = "git+https://github.com/madsim-rs/tokio.git?rev=ab251ad#ab251ad1fae8e16d9a1df74e301dbf3ed9d4d3af" dependencies = [ "futures-core", - "madsim-tokio", + "madsim-tokio 0.2.28", "pin-project-lite", ] @@ -3420,8 +3486,8 @@ dependencies = [ "event-listener", "futures", "getset", - "madsim-tokio", - "madsim-tonic", + "madsim-tokio 0.2.28", + "madsim-tonic 0.4.2+0.10.0", "opentelemetry 0.22.0", "opentelemetry-jaeger", "opentelemetry-otlp", @@ -3785,8 +3851,8 @@ dependencies = [ "itertools 0.12.1", "libc", "log", - "madsim-tokio", - "madsim-tonic", + "madsim-tokio 0.2.25", + "madsim-tonic 0.4.2+0.11.0", "memchr", "num-traits", "opentelemetry_sdk 0.22.1", @@ -3852,8 +3918,8 @@ dependencies = [ "itertools 0.13.0", "jsonwebtoken", "log", - "madsim-tokio", - "madsim-tonic", + "madsim-tokio 0.2.28", + "madsim-tonic 0.4.2+0.10.0", "madsim-tonic-build", "merged_range", "mockall", @@ 
-3903,8 +3969,8 @@ dependencies = [ "futures", "getrandom", "http", - "madsim-tokio", - "madsim-tonic", + "madsim-tokio 0.2.28", + "madsim-tonic 0.4.2+0.10.0", "rand", "test-macros", "thiserror", @@ -3920,8 +3986,8 @@ name = "xline-test-utils" version = "0.1.0" dependencies = [ "futures", - "madsim-tokio", - "madsim-tonic", + "madsim-tokio 0.2.28", + "madsim-tonic 0.4.2+0.10.0", "rand", "utils", "workspace-hack", @@ -3937,7 +4003,7 @@ dependencies = [ "curp", "curp-external-api", "itertools 0.13.0", - "madsim-tonic", + "madsim-tonic 0.4.2+0.10.0", "madsim-tonic-build", "prost", "serde", @@ -3954,7 +4020,7 @@ version = "0.1.0" dependencies = [ "anyhow", "clap", - "madsim-tonic", + "madsim-tonic 0.4.2+0.10.0", "regex", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index e0220e105..cebe177e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ ignored = ["prost", "workspace-hack"] [patch.crates-io] # This branch update the tonic version for madsim. We should switch to the original etcd-client crate when new version release. -madsim = { git = "https://github.com/Phoenix500526/madsim.git", branch = "update-tonic" } -madsim-tonic = { git = "https://github.com/Phoenix500526/madsim.git", branch = "update-tonic" } -madsim-tonic-build = { git = "https://github.com/Phoenix500526/madsim.git", branch = "update-tonic" } -madsim-tokio = { git = "https://github.com/Phoenix500526/madsim.git", branch = "update-tonic" } +madsim = { git = "https://github.com/bsbds/madsim.git", branch = "fix-client-stream" } +madsim-tonic = { git = "https://github.com/bsbds/madsim.git", branch = "fix-client-stream" } +madsim-tonic-build = { git = "https://github.com/bsbds/madsim.git", branch = "fix-client-stream" } +madsim-tokio = { git = "https://github.com/bsbds/madsim.git", branch = "fix-client-stream" } diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 6b4d31d24..7eec178ae 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -24,8 +24,8 @@ futures-util = { version = "0.3", features = ["channel", "io", "sink"] } getrandom = { version = "0.2", default-features = false, features = ["js", "rdrand", "std"] } libc = { version = "0.2", features = ["extra_traits"] } log = { version = "0.4", default-features = false, features = ["std"] } -madsim-tokio = { git = "https://github.com/Phoenix500526/madsim.git", branch = "update-tonic", default-features = false, features = ["fs", "io-util", "macros", "net", "rt", "rt-multi-thread", "signal", "sync", "time"] } -madsim-tonic = { git = "https://github.com/Phoenix500526/madsim.git", branch = "update-tonic", default-features = false, features = ["tls"] } +madsim-tokio = { git = "https://github.com/bsbds/madsim.git", branch = "fix-client-stream", default-features = false, features = ["fs", "io-util", "macros", "net", "rt", "rt-multi-thread", "signal", "sync", "time"] } +madsim-tonic = { git = "https://github.com/bsbds/madsim.git", branch = "fix-client-stream", default-features = false, features = ["tls"] } memchr = { version = "2" } num-traits = { version = "0.2", default-features = false, features = ["i128", "std"] } opentelemetry_sdk = { version = "0.22", features = ["metrics", "rt-tokio"] } From 3577254c65670e0860e4cf0f50bf9d2c9e38b8a9 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Mon, 19 Aug 2024 15:40:18 +0800 Subject: [PATCH 03/19] fix: Cargo.lock Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- Cargo.lock | 114 +++++++++++------------------------------------------ 1 
file changed, 24 insertions(+), 90 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6bcadb425..7240258cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -672,9 +672,9 @@ dependencies = [ "futures", "indexmap 2.2.6", "itertools 0.13.0", - "madsim 0.2.30", - "madsim-tokio 0.2.28", - "madsim-tonic 0.4.2+0.10.0", + "madsim", + "madsim-tokio", + "madsim-tonic", "madsim-tonic-build", "mockall", "once_cell", @@ -721,7 +721,7 @@ dependencies = [ "curp-external-api", "engine", "itertools 0.13.0", - "madsim-tokio 0.2.28", + "madsim-tokio", "prost", "serde", "thiserror", @@ -928,7 +928,7 @@ dependencies = [ "bincode", "bytes", "clippy-utilities", - "madsim-tokio 0.2.28", + "madsim-tokio", "opentelemetry 0.21.0", "parking_lot", "rocksdb", @@ -1572,37 +1572,7 @@ dependencies = [ "futures-util", "lazy_static", "libc", - "madsim-macros 0.2.12 (git+https://github.com/Phoenix500526/madsim.git?branch=update-tonic)", - "naive-timer", - "panic-message", - "rand", - "rand_xoshiro", - "rustversion", - "serde", - "spin", - "tokio", - "tokio-util", - "toml", - "tracing", - "tracing-subscriber", -] - -[[package]] -name = "madsim" -version = "0.2.30" -source = "git+https://github.com/bsbds/madsim.git?branch=fix-client-stream#831b320ed47a1c202646fd25e879a0ad61cd374d" -dependencies = [ - "ahash", - "async-channel", - "async-stream", - "async-task", - "bincode", - "bytes", - "downcast-rs", - "futures-util", - "lazy_static", - "libc", - "madsim-macros 0.2.12 (git+https://github.com/bsbds/madsim.git?branch=fix-client-stream)", + "madsim-macros", "naive-timer", "panic-message", "rand", @@ -1628,52 +1598,16 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "madsim-macros" -version = "0.2.12" -source = "git+https://github.com/bsbds/madsim.git?branch=fix-client-stream#831b320ed47a1c202646fd25e879a0ad61cd374d" -dependencies = [ - "darling 0.14.4", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "madsim-tokio" version = "0.2.28" source = "git+https://github.com/bsbds/madsim.git?branch=fix-client-stream#831b320ed47a1c202646fd25e879a0ad61cd374d" dependencies = [ - "madsim 0.2.27", + "madsim", "spin", "tokio", ] -[[package]] -name = "madsim-tokio" -version = "0.2.28" -source = "git+https://github.com/bsbds/madsim.git?branch=fix-client-stream#831b320ed47a1c202646fd25e879a0ad61cd374d" -dependencies = [ - "madsim 0.2.30", - "spin", - "tokio", -] - -[[package]] -name = "madsim-tonic" -version = "0.4.2+0.10.0" -source = "git+https://github.com/bsbds/madsim.git?branch=fix-client-stream#831b320ed47a1c202646fd25e879a0ad61cd374d" -dependencies = [ - "async-stream", - "chrono", - "futures-util", - "madsim 0.2.30", - "tokio", - "tonic", - "tower", - "tracing", -] - [[package]] name = "madsim-tonic" version = "0.4.2+0.10.0" @@ -1682,7 +1616,7 @@ dependencies = [ "async-stream", "chrono", "futures-util", - "madsim 0.2.27", + "madsim", "tokio", "tonic", "tower", @@ -2813,9 +2747,9 @@ dependencies = [ "engine", "futures", "itertools 0.13.0", - "madsim 0.2.30", - "madsim-tokio 0.2.28", - "madsim-tonic 0.4.2+0.10.0", + "madsim", + "madsim-tokio", + "madsim-tonic", "madsim-tonic-build", "parking_lot", "prost", @@ -3141,7 +3075,7 @@ version = "0.1.12" source = "git+https://github.com/madsim-rs/tokio.git?rev=ab251ad#ab251ad1fae8e16d9a1df74e301dbf3ed9d4d3af" dependencies = [ "futures-core", - "madsim-tokio 0.2.28", + "madsim-tokio", "pin-project-lite", ] @@ -3486,8 +3420,8 @@ dependencies = [ "event-listener", "futures", "getset", - "madsim-tokio 0.2.28", - "madsim-tonic 0.4.2+0.10.0", + "madsim-tokio", + 
"madsim-tonic", "opentelemetry 0.22.0", "opentelemetry-jaeger", "opentelemetry-otlp", @@ -3851,8 +3785,8 @@ dependencies = [ "itertools 0.12.1", "libc", "log", - "madsim-tokio 0.2.25", - "madsim-tonic 0.4.2+0.11.0", + "madsim-tokio", + "madsim-tonic", "memchr", "num-traits", "opentelemetry_sdk 0.22.1", @@ -3918,8 +3852,8 @@ dependencies = [ "itertools 0.13.0", "jsonwebtoken", "log", - "madsim-tokio 0.2.28", - "madsim-tonic 0.4.2+0.10.0", + "madsim-tokio", + "madsim-tonic", "madsim-tonic-build", "merged_range", "mockall", @@ -3969,8 +3903,8 @@ dependencies = [ "futures", "getrandom", "http", - "madsim-tokio 0.2.28", - "madsim-tonic 0.4.2+0.10.0", + "madsim-tokio", + "madsim-tonic", "rand", "test-macros", "thiserror", @@ -3986,8 +3920,8 @@ name = "xline-test-utils" version = "0.1.0" dependencies = [ "futures", - "madsim-tokio 0.2.28", - "madsim-tonic 0.4.2+0.10.0", + "madsim-tokio", + "madsim-tonic", "rand", "utils", "workspace-hack", @@ -4003,7 +3937,7 @@ dependencies = [ "curp", "curp-external-api", "itertools 0.13.0", - "madsim-tonic 0.4.2+0.10.0", + "madsim-tonic", "madsim-tonic-build", "prost", "serde", @@ -4020,7 +3954,7 @@ version = "0.1.0" dependencies = [ "anyhow", "clap", - "madsim-tonic 0.4.2+0.10.0", + "madsim-tonic", "regex", "serde", "serde_json", From 4c99c46cd9e466cb57a67213004a2863960f6ce6 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Fri, 23 Aug 2024 09:05:13 +0800 Subject: [PATCH 04/19] fix: madsim tests Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> fix: compaction in madsim Run the compaction task synchronously in madsim, please refer to `compact_bg_task` for the madsim compaction code Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> fix: not waiting for client id in madsim tests Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> fix: set leader in simulation xline group Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/client/mod.rs | 12 ++++- crates/simulation/src/xline_group.rs | 2 +- .../tests/it/curp/server_recovery.rs | 18 +++----- crates/xline/src/storage/kv_store.rs | 46 +++++++++++++------ 4 files changed, 51 insertions(+), 27 deletions(-) diff --git a/crates/curp/src/client/mod.rs b/crates/curp/src/client/mod.rs index 378b432d8..739fd9674 100644 --- a/crates/curp/src/client/mod.rs +++ b/crates/curp/src/client/mod.rs @@ -451,15 +451,23 @@ impl ClientBuilder { impl ClientApi + Send + Sync + 'static, Arc, ), - tonic::transport::Error, + tonic::Status, > { - let state = Arc::new(self.init_state_builder().build().await?); + let state = Arc::new( + self.init_state_builder() + .build() + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?, + ); + let client = Retry::new( Unary::new(Arc::clone(&state), self.init_unary_config()), self.init_retry_config(), Some(self.spawn_bg_tasks(Arc::clone(&state))), ); let client_id = state.clone_client_id(); + self.wait_for_client_id(state).await?; + Ok((client, client_id)) } } diff --git a/crates/simulation/src/xline_group.rs b/crates/simulation/src/xline_group.rs index eb97322d2..0f61892b5 100644 --- a/crates/simulation/src/xline_group.rs +++ b/crates/simulation/src/xline_group.rs @@ -55,7 +55,7 @@ impl XlineGroup { vec!["0.0.0.0:2379".to_owned()], vec![format!("192.168.1.{}:2379", i + 1)], all.clone(), - false, + i == 0, CurpConfig::default(), ClientConfig::default(), ServerTimeout::default(), diff --git a/crates/simulation/tests/it/curp/server_recovery.rs b/crates/simulation/tests/it/curp/server_recovery.rs 
index e14abd406..7e8a88ccf 100644 --- a/crates/simulation/tests/it/curp/server_recovery.rs +++ b/crates/simulation/tests/it/curp/server_recovery.rs @@ -116,9 +116,15 @@ async fn leader_and_follower_both_crash_and_recovery() { let follower = *group.nodes.keys().find(|&id| id != &leader).unwrap(); group.crash(follower).await; + let _wait_up = client + .propose(TestCommand::new_get(vec![0]), true) + .await + .unwrap() + .unwrap(); + assert_eq!( client - .propose(TestCommand::new_put(vec![0], 0), true) + .propose(TestCommand::new_put(vec![0], 0), false) .await .unwrap() .unwrap() @@ -126,16 +132,6 @@ .values, Vec::::new(), ); - assert_eq!( - client - .propose(TestCommand::new_get(vec![0]), true) - .await - .unwrap() - .unwrap() - .0 - .values, - vec![0] - ); group.crash(leader).await; diff --git a/crates/xline/src/storage/kv_store.rs b/crates/xline/src/storage/kv_store.rs index 44a0cac04..7b92043d9 100644 --- a/crates/xline/src/storage/kv_store.rs +++ b/crates/xline/src/storage/kv_store.rs @@ -1124,23 +1124,43 @@ impl KvStore { let ops = vec![WriteOp::PutScheduledCompactRevision(revision)]; // TODO: Remove the physical process logic here. It's better to move into the // KvServer - #[cfg_attr(madsim, allow(unused))] - let (event, listener) = if req.physical { - let event = Arc::new(event_listener::Event::new()); - let listener = event.listen(); - (Some(event), Some(listener)) - } else { - (None, None) - }; // TODO: sync compaction task - if let Err(e) = self.compact_task_tx.send((revision, event)) { - panic!("the compactor exited unexpectedly: {e:?}"); - } // FIXME: madsim is single threaded, we cannot use synchronous wait here #[cfg(not(madsim))] - if let Some(listener) = listener { - listener.wait(); + { + let (event, listener) = if req.physical { + let event = Arc::new(event_listener::Event::new()); + let listener = event.listen(); + (Some(event), Some(listener)) + } else { + (None, None) + }; + if let Err(e) = self.compact_task_tx.send((revision, event)) { + panic!("the compactor exited unexpectedly: {e:?}"); + } + if let Some(listener) = listener { + listener.wait(); + } + } + #[cfg(madsim)] + { + let index = self.index(); + let target_revisions = index + .compact(revision) + .into_iter() + .map(|key_rev| key_rev.as_revision().encode_to_vec()) + .collect::>>(); + // Given that Xline uses an LSM-tree database with smaller write amplification as the storage backend, is progressive compaction really effective at improving performance?
+ for revision_chunk in target_revisions.chunks(1000) { + if let Err(e) = self.compact(revision_chunk) { + panic!("failed to compact revision chunk {revision_chunk:?} due to {e}"); + } + } + if let Err(e) = self.compact_finished(revision) { + panic!("failed to set finished compact revision {revision:?} due to {e}"); + } } + self.inner.db.write_ops(ops)?; let resp = to_execute From fdb5bb29f089586f140b8a662c17d8903ad0d6e9 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Mon, 8 Jul 2024 19:12:43 +0800 Subject: [PATCH 05/19] test: rewrite tests for curp client Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/client/tests.rs | 204 ++++++++++++++------------------ 1 file changed, 90 insertions(+), 114 deletions(-) diff --git a/crates/curp/src/client/tests.rs b/crates/curp/src/client/tests.rs index 32c177183..9db79c303 100644 --- a/crates/curp/src/client/tests.rs +++ b/crates/curp/src/client/tests.rs @@ -1,10 +1,10 @@ use std::{ collections::HashMap, - sync::{atomic::AtomicU64, Arc}, - time::Duration, + sync::{atomic::AtomicU64, Arc, Mutex}, + time::{Duration, Instant}, }; -use curp_test_utils::test_cmd::{TestCommand, TestCommandResult}; +use curp_test_utils::test_cmd::{LogIndexResult, TestCommand, TestCommandResult}; use futures::{future::BoxFuture, Stream}; #[cfg(not(madsim))] use tonic::transport::ClientTlsConfig; @@ -19,7 +19,10 @@ use super::{ unary::{Unary, UnaryConfig}, }; use crate::{ - client::ClientApi, + client::{ + retry::{Retry, RetryConfig}, + ClientApi, + }, members::ServerId, rpc::{ connect::{ConnectApi, MockConnectApi}, @@ -257,7 +260,8 @@ async fn test_unary_fetch_clusters_linearizable_failed() { }); let unary = init_unary_client(connects, None, None, 0, 0, None); let res = unary.fetch_cluster(true).await.unwrap_err(); - // only server(0, 1)'s responses are valid, less than majority quorum(3), got a mocked RpcTransport to retry + // only server(0, 1)'s responses are valid, less than majority quorum(3), got a + // mocked RpcTransport to retry assert_eq!(res, CurpError::RpcTransport(())); } @@ -276,79 +280,71 @@ fn build_synced_response() -> OpResponse { // TODO: rewrite this tests #[cfg(ignore)] +fn build_empty_response() -> OpResponse { + OpResponse { op: None } +} + #[traced_test] #[tokio::test] async fn test_unary_propose_fast_path_works() { let connects = init_mocked_connects(5, |id, conn| { - conn.expect_propose() + conn.expect_propose_stream() .return_once(move |_req, _token, _timeout| { - let resp = match id { - 0 => ProposeResponse::new_result::( - &Ok(TestCommandResult::default()), - false, - ), - 1 | 2 | 3 => ProposeResponse::new_empty(), - 4 => return Err(CurpError::key_conflict()), - _ => unreachable!("there are only 5 nodes"), + assert_eq!(id, 0, "followers should not receive propose"); + let resp = async_stream::stream! 
{ + yield Ok(build_propose_response(false)); + yield Ok(build_synced_response()); }; - Ok(tonic::Response::new(resp)) - }); - conn.expect_wait_synced() - .return_once(move |_req, _timeout| { - assert!(id == 0, "wait synced should send to leader"); - std::thread::sleep(Duration::from_millis(100)); - Ok(tonic::Response::new(WaitSyncedResponse::new_from_result::< - TestCommand, - >( - Ok(TestCommandResult::default()), - Some(Ok(1.into())), - ))) + Ok(tonic::Response::new(Box::new(resp))) }); + conn.expect_record().return_once(move |_req, _timeout| { + let resp = match id { + 0 => unreachable!("leader should not receive record request"), + 1 | 2 | 3 => RecordResponse { conflict: false }, + 4 => RecordResponse { conflict: true }, + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); }); let unary = init_unary_client(connects, None, Some(0), 1, 0, None); let res = unary - .propose(&TestCommand::default(), None, true) + .propose(&TestCommand::new_put(vec![1], 1), None, true) .await .unwrap() .unwrap(); assert_eq!(res, (TestCommandResult::default(), None)); } -// TODO: rewrite this tests -#[cfg(ignore)] #[traced_test] #[tokio::test] async fn test_unary_propose_slow_path_works() { let connects = init_mocked_connects(5, |id, conn| { - conn.expect_propose() + conn.expect_propose_stream() .return_once(move |_req, _token, _timeout| { - let resp = match id { - 0 => ProposeResponse::new_result::( - &Ok(TestCommandResult::default()), - false, - ), - 1 | 2 | 3 => ProposeResponse::new_empty(), - 4 => return Err(CurpError::key_conflict()), - _ => unreachable!("there are only 5 nodes"), + assert_eq!(id, 0, "followers should not receive propose"); + let resp = async_stream::stream! { + yield Ok(build_propose_response(false)); + tokio::time::sleep(Duration::from_millis(100)).await; + yield Ok(build_synced_response()); }; - Ok(tonic::Response::new(resp)) - }); - conn.expect_wait_synced() - .return_once(move |_req, _timeout| { - assert!(id == 0, "wait synced should send to leader"); - std::thread::sleep(Duration::from_millis(100)); - Ok(tonic::Response::new(WaitSyncedResponse::new_from_result::< - TestCommand, - >( - Ok(TestCommandResult::default()), - Some(Ok(1.into())), - ))) + Ok(tonic::Response::new(Box::new(resp))) }); + conn.expect_record().return_once(move |_req, _timeout| { + let resp = match id { + 0 => unreachable!("leader should not receive record request"), + 1 | 2 | 3 => RecordResponse { conflict: false }, + 4 => RecordResponse { conflict: true }, + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); }); + let unary = init_unary_client(connects, None, Some(0), 1, 0, None); let start_at = Instant::now(); let res = unary - .propose(&TestCommand::default(), None, false) + .propose(&TestCommand::new_put(vec![1], 1), None, false) .await .unwrap() .unwrap(); @@ -362,42 +358,36 @@ async fn test_unary_propose_slow_path_works() { ); } -// TODO: rewrite this tests -#[cfg(ignore)] #[traced_test] #[tokio::test] async fn test_unary_propose_fast_path_fallback_slow_path() { + // record how many times `handle_propose` was invoked. let connects = init_mocked_connects(5, |id, conn| { - conn.expect_propose() + conn.expect_propose_stream() .return_once(move |_req, _token, _timeout| { - // insufficient quorum to force slow path. 
- let resp = match id { - 0 => ProposeResponse::new_result::( - &Ok(TestCommandResult::default()), - false, - ), - 1 | 2 => ProposeResponse::new_empty(), - 3 | 4 => return Err(CurpError::key_conflict()), - _ => unreachable!("there are only 5 nodes"), + assert_eq!(id, 0, "followers should not receive propose"); + let resp = async_stream::stream! { + yield Ok(build_propose_response(false)); + tokio::time::sleep(Duration::from_millis(100)).await; + yield Ok(build_synced_response()); }; - Ok(tonic::Response::new(resp)) - }); - conn.expect_wait_synced() - .return_once(move |_req, _timeout| { - assert!(id == 0, "wait synced should send to leader"); - std::thread::sleep(Duration::from_millis(100)); - Ok(tonic::Response::new(WaitSyncedResponse::new_from_result::< - TestCommand, - >( - Ok(TestCommandResult::default()), - Some(Ok(1.into())), - ))) + Ok(tonic::Response::new(Box::new(resp))) }); + // insufficient quorum + conn.expect_record().return_once(move |_req, _timeout| { + let resp = match id { + 0 => unreachable!("leader should not receive record request"), + 1 | 2 => RecordResponse { conflict: false }, + 3 | 4 => RecordResponse { conflict: true }, + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); }); let unary = init_unary_client(connects, None, Some(0), 1, 0, None); let start_at = Instant::now(); let res = unary - .propose(&TestCommand::default(), None, true) + .propose(&TestCommand::new_put(vec![1], 1), None, true) .await .unwrap() .unwrap(); @@ -405,14 +395,13 @@ async fn test_unary_propose_fast_path_fallback_slow_path() { start_at.elapsed() > Duration::from_millis(100), "slow round takes at least 100ms" ); + // indicate that we actually run out of fast round assert_eq!( res, (TestCommandResult::default(), Some(LogIndexResult::from(1))) ); } -// TODO: rewrite this tests -#[cfg(ignore)] #[traced_test] #[tokio::test] async fn test_unary_propose_return_early_err() { @@ -428,26 +417,22 @@ async fn test_unary_propose_return_early_err() { assert!(early_err.should_abort_fast_round()); // record how many times rpc was invoked. let counter = Arc::new(Mutex::new(0)); - let connects = init_mocked_connects(5, |id, conn| { + let connects = init_mocked_connects(5, |_id, conn| { let err = early_err.clone(); let counter_c = Arc::clone(&counter); - conn.expect_propose() + conn.expect_propose_stream() .return_once(move |_req, _token, _timeout| { - counter_c.lock().unwrap().add_assign(1); + *counter_c.lock().unwrap() += 1; Err(err) }); + let err = early_err.clone(); - let counter_c = Arc::clone(&counter); - conn.expect_wait_synced() - .return_once(move |_req, _timeout| { - assert!(id == 0, "wait synced should send to leader"); - counter_c.lock().unwrap().add_assign(1); - Err(err) - }); + conn.expect_record() + .return_once(move |_req, _timeout| Err(err)); }); let unary = init_unary_client(connects, None, Some(0), 1, 0, None); let err = unary - .propose(&TestCommand::default(), None, true) + .propose(&TestCommand::new_put(vec![1], 1), None, true) .await .unwrap_err(); assert_eq!(err, early_err); @@ -457,8 +442,6 @@ async fn test_unary_propose_return_early_err() { // Tests for retry layer -// TODO: rewrite this tests -#[cfg(ignore)] #[traced_test] #[tokio::test] async fn test_retry_propose_return_no_retry_error() { @@ -471,22 +454,18 @@ async fn test_retry_propose_return_no_retry_error() { ] { // record how many times rpc was invoked. 
let counter = Arc::new(Mutex::new(0)); - let connects = init_mocked_connects(5, |id, conn| { + let connects = init_mocked_connects(5, |_id, conn| { let err = early_err.clone(); let counter_c = Arc::clone(&counter); - conn.expect_propose() + conn.expect_propose_stream() .return_once(move |_req, _token, _timeout| { - counter_c.lock().unwrap().add_assign(1); + *counter_c.lock().unwrap() += 1; Err(err) }); + let err = early_err.clone(); - let counter_c = Arc::clone(&counter); - conn.expect_wait_synced() - .return_once(move |_req, _timeout| { - assert!(id == 0, "wait synced should send to leader"); - counter_c.lock().unwrap().add_assign(1); - Err(err) - }); + conn.expect_record() + .return_once(move |_req, _timeout| Err(err)); }); let unary = init_unary_client(connects, None, Some(0), 1, 0, None); let retry = Retry::new( @@ -495,27 +474,22 @@ async fn test_retry_propose_return_no_retry_error() { None, ); let err = retry - .propose(&TestCommand::default(), None, false) + .propose(&TestCommand::new_put(vec![1], 1), None, false) .await .unwrap_err(); assert_eq!(err.message(), tonic::Status::from(early_err).message()); - // fast path + slow path = 2 - assert_eq!(*counter.lock().unwrap(), 2); + assert_eq!(*counter.lock().unwrap(), 1); } } -// TODO: rewrite this tests -#[cfg(ignore)] #[traced_test] #[tokio::test] async fn test_retry_propose_return_retry_error() { for early_err in [ - CurpError::key_conflict(), CurpError::RpcTransport(()), CurpError::internal("No reason"), ] { let connects = init_mocked_connects(5, |id, conn| { - let err = early_err.clone(); conn.expect_fetch_cluster() .returning(move |_req, _timeout| { Ok(tonic::Response::new(FetchClusterResponse { @@ -532,14 +506,16 @@ async fn test_retry_propose_return_retry_error() { cluster_version: 1, })) }); - conn.expect_propose() - .returning(move |_req, _token, _timeout| Err(err.clone())); if id == 0 { let err = early_err.clone(); - conn.expect_wait_synced() - .times(5) // wait synced should be retried in 5 times on leader - .returning(move |_req, _timeout| Err(err.clone())); + conn.expect_propose_stream() + .times(5) // propose should be retried in 5 times on leader + .returning(move |_req, _token, _timeout| Err(err.clone())); } + + let err = early_err.clone(); + conn.expect_record() + .returning(move |_req, _timeout| Err(err.clone())); }); let unary = init_unary_client(connects, None, Some(0), 1, 0, None); let retry = Retry::new( @@ -548,7 +524,7 @@ async fn test_retry_propose_return_retry_error() { None, ); let err = retry - .propose(&TestCommand::default(), None, false) + .propose(&TestCommand::new_put(vec![1], 1), None, false) .await .unwrap_err(); assert!(err.message().contains("request timeout")); From debbdabe75564412a701ad476f9be6258b0ab4e0 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Thu, 15 Aug 2024 15:07:22 +0800 Subject: [PATCH 06/19] fix: exe_exactly_once_on_leader will only test on leader Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/tests/it/common/curp_group.rs | 4 ++++ crates/curp/tests/it/server.rs | 10 ++++------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/crates/curp/tests/it/common/curp_group.rs b/crates/curp/tests/it/common/curp_group.rs index e2dbaab8d..e95694aa8 100644 --- a/crates/curp/tests/it/common/curp_group.rs +++ b/crates/curp/tests/it/common/curp_group.rs @@ -318,6 +318,10 @@ impl CurpGroup { &self.nodes[id] } + pub fn get_node_mut(&mut self, id: &ServerId) -> &mut CurpNode { + self.nodes.get_mut(id).unwrap() + } + pub 
async fn new_client(&self) -> impl ClientApi { let addrs = self.all_addrs().cloned().collect(); ClientBuilder::new(ClientConfig::default(), true) diff --git a/crates/curp/tests/it/server.rs b/crates/curp/tests/it/server.rs index 04c318e8f..ebd400373 100644 --- a/crates/curp/tests/it/server.rs +++ b/crates/curp/tests/it/server.rs @@ -93,14 +93,12 @@ async fn exe_exactly_once_on_leader() { let er = client.propose(&cmd, None, true).await.unwrap().unwrap().0; assert_eq!(er, TestCommandResult::new(vec![], vec![])); + let leader = group.get_leader().await.0; { - let mut exe_futs = group - .exe_rxs() - .map(|rx| rx.recv()) - .collect::>(); - let (cmd1, er) = exe_futs.next().await.unwrap().unwrap(); + let exec_rx = &mut group.get_node_mut(&leader).exe_rx; + let (cmd1, er) = exec_rx.recv().await.unwrap(); assert!( - tokio::time::timeout(Duration::from_millis(100), exe_futs.next()) + tokio::time::timeout(Duration::from_millis(100), exec_rx.recv()) .await .is_err() ); From ba555d5af32f0e15fbe104a0cad0d9c455e0e960 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Thu, 15 Aug 2024 15:10:02 +0800 Subject: [PATCH 07/19] fix: concurrent_cmd_order_should_have_correct_revision timeout due to serial execution Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/tests/it/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/curp/tests/it/server.rs b/crates/curp/tests/it/server.rs index ebd400373..9eeb5878a 100644 --- a/crates/curp/tests/it/server.rs +++ b/crates/curp/tests/it/server.rs @@ -260,7 +260,7 @@ async fn concurrent_cmd_order_should_have_correct_revision() { let sample_range = 1..=100; for i in sample_range.clone() { - let rand_dur = Duration::from_millis(thread_rng().gen_range(0..500).numeric_cast()); + let rand_dur = Duration::from_millis(thread_rng().gen_range(0..50).numeric_cast()); let _er = client .propose( &TestCommand::new_put(vec![i], i).set_as_dur(rand_dur), From a26b114325cf3f9be3535fcaca3e81e6236610b8 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Thu, 15 Aug 2024 17:42:55 +0800 Subject: [PATCH 08/19] fix: execute early before after sync Prevents updating the state early before speculative execution Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/xline/src/storage/kv_store.rs | 32 ++++++++++++++-------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/crates/xline/src/storage/kv_store.rs b/crates/xline/src/storage/kv_store.rs index 7b92043d9..e69a7d709 100644 --- a/crates/xline/src/storage/kv_store.rs +++ b/crates/xline/src/storage/kv_store.rs @@ -966,6 +966,17 @@ impl KvStore { { let (new_rev, prev_rev_opt) = index.register_revision(req.key.clone(), revision, *sub_revision); + let execute_resp = to_execute + .then(|| { + self.generate_put_resp( + req, + txn_db, + prev_rev_opt.map(|key_rev| key_rev.as_revision()), + ) + .map(|(resp, _)| resp.into()) + }) + .transpose()?; + let mut kv = KeyValue { key: req.key.clone(), value: req.value.clone(), @@ -1009,17 +1020,6 @@ impl KvStore { prev_kv: None, }]; - let execute_resp = to_execute - .then(|| { - self.generate_put_resp( - req, - txn_db, - prev_rev_opt.map(|key_rev| key_rev.as_revision()), - ) - .map(|(resp, _)| resp.into()) - }) - .transpose()?; - Ok((events, execute_resp)) } @@ -1036,6 +1036,11 @@ impl KvStore { where T: XlineStorageOps, { + let execute_resp = to_execute + .then(|| self.generate_delete_range_resp(req, txn_db, index)) + .transpose()? 
+ .map(Into::into); + let keys = Self::delete_keys( txn_db, index, @@ -1047,11 +1052,6 @@ Self::detach_leases(&keys, &self.lease_collection); - let execute_resp = to_execute - .then(|| self.generate_delete_range_resp(req, txn_db, index)) - .transpose()? - .map(Into::into); - Ok((Self::new_deletion_events(revision, keys), execute_resp)) } From b2caa6ea3e1035c54e92d05e049349d82e8a25d8 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Thu, 15 Aug 2024 19:22:10 +0800 Subject: [PATCH 09/19] refactor: disable fast path completely in etcd compatible server Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/xline/src/server/auth_server.rs | 45 +++++++++++-------------- crates/xline/src/server/kv_server.rs | 2 +- crates/xline/src/server/lease_server.rs | 12 +++---- crates/xline/src/server/lock_server.rs | 13 ++++--- crates/xline/src/server/maintenance.rs | 6 ++-- 5 files changed, 32 insertions(+), 46 deletions(-) diff --git a/crates/xline/src/server/auth_server.rs b/crates/xline/src/server/auth_server.rs index 33a0949ef..bd285d926 100644 --- a/crates/xline/src/server/auth_server.rs +++ b/crates/xline/src/server/auth_server.rs @@ -51,7 +51,6 @@ impl AuthServer { async fn propose( &self, request: tonic::Request, - use_fast_path: bool, ) -> Result<(CommandResponse, Option), tonic::Status> where T: Into, { let auth_info = self.auth_store.try_get_auth_info_from_request(&request)?; let request = request.into_inner().into(); let cmd = Command::new_with_auth_info(request, auth_info); - let res = self.client.propose(&cmd, None, use_fast_path).await??; + let res = self.client.propose(&cmd, None, false).await??; Ok(res) } @@ -67,13 +66,12 @@ impl AuthServer { async fn handle_req( &self, request: tonic::Request, - use_fast_path: bool, ) -> Result, tonic::Status> where Req: Into, Res: From, { - let (cmd_res, sync_res) = self.propose(request, use_fast_path).await?; + let (cmd_res, sync_res) = self.propose(request).await?; let mut res_wrapper = cmd_res.into_inner(); if let Some(sync_res) = sync_res { res_wrapper.update_revision(sync_res.revision()); @@ -89,7 +87,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthEnableRequest {:?}", request); - self.handle_req(request, false).await + self.handle_req(request).await } async fn auth_disable( @@ -97,7 +95,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthDisableRequest {:?}", request); - self.handle_req(request, false).await + self.handle_req(request).await } async fn auth_status( @@ -105,8 +103,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthStatusRequest {:?}", request); - let is_fast_path = true; - self.handle_req(request, is_fast_path).await + self.handle_req(request).await } async fn authenticate( @@ -114,7 +111,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthenticateRequest {:?}", request); - self.handle_req(request, false).await + self.handle_req(request).await } async fn user_add( @@ -128,7 +125,7 @@ impl Auth for AuthServer { .map_err(|err| tonic::Status::internal(format!("Failed to hash password: {err}")))?; user_add_req.hashed_password = hashed_password; user_add_req.password = String::new(); - self.handle_req(request, false).await + self.handle_req(request).await } async fn user_get( @@ -136,8 +133,7 @@ impl Auth for 
AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthUserGetRequest {:?}", request); - let is_fast_path = true; - self.handle_req(request, is_fast_path).await + self.handle_req(request).await } async fn user_list( @@ -145,8 +141,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthUserListRequest {:?}", request); - let is_fast_path = true; - self.handle_req(request, is_fast_path).await + self.handle_req(request).await } async fn user_delete( @@ -154,7 +149,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthUserDeleteRequest {:?}", request); - self.handle_req(request, false).await + self.handle_req(request).await } async fn user_change_password( @@ -167,7 +162,7 @@ impl Auth for AuthServer { .map_err(|err| tonic::Status::internal(format!("Failed to hash password: {err}")))?; user_change_password_req.hashed_password = hashed_password; user_change_password_req.password = String::new(); - self.handle_req(request, false).await + self.handle_req(request).await } async fn user_grant_role( @@ -175,7 +170,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthUserGrantRoleRequest {:?}", request); - self.handle_req(request, false).await + self.handle_req(request).await } async fn user_revoke_role( @@ -183,7 +178,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthUserRevokeRoleRequest {:?}", request); - self.handle_req(request, false).await + self.handle_req(request).await } async fn role_add( @@ -192,7 +187,7 @@ impl Auth for AuthServer { ) -> Result, tonic::Status> { debug!("Receive AuthRoleAddRequest {:?}", request); request.get_ref().validation()?; - self.handle_req(request, false).await + self.handle_req(request).await } async fn role_get( @@ -200,8 +195,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthRoleGetRequest {:?}", request); - let is_fast_path = true; - self.handle_req(request, is_fast_path).await + self.handle_req(request).await } async fn role_list( @@ -209,8 +203,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthRoleListRequest {:?}", request); - let is_fast_path = true; - self.handle_req(request, is_fast_path).await + self.handle_req(request).await } async fn role_delete( @@ -218,7 +211,7 @@ impl Auth for AuthServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive AuthRoleDeleteRequest {:?}", request); - self.handle_req(request, false).await + self.handle_req(request).await } async fn role_grant_permission( @@ -230,7 +223,7 @@ impl Auth for AuthServer { request.get_ref() ); request.get_ref().validation()?; - self.handle_req(request, false).await + self.handle_req(request).await } async fn role_revoke_permission( @@ -241,6 +234,6 @@ impl Auth for AuthServer { "Receive AuthRoleRevokePermissionRequest {}", request.get_ref() ); - self.handle_req(request, false).await + self.handle_req(request).await } } diff --git a/crates/xline/src/server/kv_server.rs b/crates/xline/src/server/kv_server.rs index 1bdf482c7..7e87064f3 100644 --- a/crates/xline/src/server/kv_server.rs +++ b/crates/xline/src/server/kv_server.rs @@ -258,7 +258,7 @@ impl Kv for KvServer { } else { Either::Right(async {}) }; - let (cmd_res, _sync_res) = self.client.propose(&cmd, None, !physical).await??; + let (cmd_res, _sync_res) = 
self.client.propose(&cmd, None, false).await??; let resp = cmd_res.into_inner(); if timeout(self.compact_timeout, compact_physical_fut) .await diff --git a/crates/xline/src/server/lease_server.rs b/crates/xline/src/server/lease_server.rs index 931abb015..d528c1c8d 100644 --- a/crates/xline/src/server/lease_server.rs +++ b/crates/xline/src/server/lease_server.rs @@ -119,7 +119,6 @@ impl LeaseServer { async fn propose( &self, request: tonic::Request, - use_fast_path: bool, ) -> Result<(CommandResponse, Option), tonic::Status> where T: Into, @@ -127,7 +126,7 @@ impl LeaseServer { let auth_info = self.auth_storage.try_get_auth_info_from_request(&request)?; let request = request.into_inner().into(); let cmd = Command::new_with_auth_info(request, auth_info); - let res = self.client.propose(&cmd, None, use_fast_path).await??; + let res = self.client.propose(&cmd, None, false).await??; Ok(res) } @@ -255,8 +254,7 @@ impl Lease for LeaseServer { lease_grant_req.id = self.id_gen.next(); } - let is_fast_path = true; - let (res, sync_res) = self.propose(request, is_fast_path).await?; + let (res, sync_res) = self.propose(request).await?; let mut res: LeaseGrantResponse = res.into_inner().into(); if let Some(sync_res) = sync_res { @@ -276,8 +274,7 @@ impl Lease for LeaseServer { ) -> Result, tonic::Status> { debug!("Receive LeaseRevokeRequest {:?}", request); - let is_fast_path = true; - let (res, sync_res) = self.propose(request, is_fast_path).await?; + let (res, sync_res) = self.propose(request).await?; let mut res: LeaseRevokeResponse = res.into_inner().into(); if let Some(sync_res) = sync_res { @@ -378,8 +375,7 @@ impl Lease for LeaseServer { ) -> Result, tonic::Status> { debug!("Receive LeaseLeasesRequest {:?}", request); - let is_fast_path = true; - let (res, sync_res) = self.propose(request, is_fast_path).await?; + let (res, sync_res) = self.propose(request).await?; let mut res: LeaseLeasesResponse = res.into_inner().into(); if let Some(sync_res) = sync_res { diff --git a/crates/xline/src/server/lock_server.rs b/crates/xline/src/server/lock_server.rs index f5649cb8c..dff302508 100644 --- a/crates/xline/src/server/lock_server.rs +++ b/crates/xline/src/server/lock_server.rs @@ -71,14 +71,13 @@ impl LockServer { &self, request: T, auth_info: Option, - use_fast_path: bool, ) -> Result<(CommandResponse, Option), tonic::Status> where T: Into, { let request = request.into(); let cmd = Command::new_with_auth_info(request, auth_info); - let res = self.client.propose(&cmd, None, use_fast_path).await??; + let res = self.client.propose(&cmd, None, false).await??; Ok(res) } @@ -148,7 +147,7 @@ impl LockServer { max_create_revision: rev, ..Default::default() }; - let (cmd_res, _sync_res) = self.propose(get_req, auth_info.cloned(), false).await?; + let (cmd_res, _sync_res) = self.propose(get_req, auth_info.cloned()).await?; let response = Into::::into(cmd_res.into_inner()); let last_key = match response.kvs.first() { Some(kv) => kv.key.clone(), @@ -186,7 +185,7 @@ impl LockServer { key: key.into(), ..Default::default() }; - let (cmd_res, _) = self.propose(del_req, auth_info, true).await?; + let (cmd_res, _) = self.propose(del_req, auth_info).await?; let res = Into::::into(cmd_res.into_inner()); Ok(res.header) } @@ -198,7 +197,7 @@ impl LockServer { ttl: DEFAULT_SESSION_TTL, id: lease_id, }; - let (cmd_res, _) = self.propose(lease_grant_req, auth_info, true).await?; + let (cmd_res, _) = self.propose(lease_grant_req, auth_info).await?; let res = Into::::into(cmd_res.into_inner()); Ok(res.id) } @@ -229,7 +228,7 @@ 
impl Lock for LockServer { let key = format!("{prefix}{lease_id:x}"); let txn = Self::create_acquire_txn(&prefix, lease_id); - let (cmd_res, sync_res) = self.propose(txn, auth_info.clone(), false).await?; + let (cmd_res, sync_res) = self.propose(txn, auth_info.clone()).await?; let mut txn_res = Into::::into(cmd_res.into_inner()); #[allow(clippy::unwrap_used)] // sync_res always has value when use slow path let my_rev = sync_res.unwrap().revision(); @@ -261,7 +260,7 @@ impl Lock for LockServer { key: key.as_bytes().to_vec(), ..Default::default() }; - let result = self.propose(range_req, auth_info.clone(), true).await; + let result = self.propose(range_req, auth_info.clone()).await; match result { Ok(res) => { let res = Into::::into(res.0.into_inner()); diff --git a/crates/xline/src/server/maintenance.rs b/crates/xline/src/server/maintenance.rs index e8bc522c1..f0ffc01d0 100644 --- a/crates/xline/src/server/maintenance.rs +++ b/crates/xline/src/server/maintenance.rs @@ -84,7 +84,6 @@ impl MaintenanceServer { async fn propose( &self, request: tonic::Request, - use_fast_path: bool, ) -> Result<(CommandResponse, Option), tonic::Status> where T: Into + Debug, @@ -92,7 +91,7 @@ impl MaintenanceServer { let auth_info = self.auth_store.try_get_auth_info_from_request(&request)?; let request = request.into_inner().into(); let cmd = Command::new_with_auth_info(request, auth_info); - let res = self.client.propose(&cmd, None, use_fast_path).await??; + let res = self.client.propose(&cmd, None, false).await??; Ok(res) } } @@ -103,8 +102,7 @@ impl Maintenance for MaintenanceServer { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let is_fast_path = true; - let (res, sync_res) = self.propose(request, is_fast_path).await?; + let (res, sync_res) = self.propose(request).await?; let mut res: AlarmResponse = res.into_inner().into(); if let Some(sync_res) = sync_res { let revision = sync_res.revision(); From d2258711b294bb3e7057020f950c6d2a21eefba6 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Tue, 20 Aug 2024 09:21:50 +0800 Subject: [PATCH 10/19] fix: use after sync txn and index in lease revoke Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/xline/src/server/command.rs | 10 ++- crates/xline/src/storage/lease_store/mod.rs | 90 +++++++++++++-------- 2 files changed, 64 insertions(+), 36 deletions(-) diff --git a/crates/xline/src/server/command.rs b/crates/xline/src/server/command.rs index 423e91739..183433b84 100644 --- a/crates/xline/src/server/command.rs +++ b/crates/xline/src/server/command.rs @@ -295,10 +295,11 @@ impl CommandExecutor { } /// After sync other type of commands - fn after_sync_others( + fn after_sync_others( &self, wrapper: &RequestWrapper, txn_db: &T, + index: &I, general_revision: &RevisionNumberGeneratorState<'_>, auth_revision: &RevisionNumberGeneratorState<'_>, to_execute: bool, @@ -311,6 +312,7 @@ impl CommandExecutor { > where T: XlineStorageOps + TransactionApi, + I: IndexOperate, { let er = to_execute .then(|| match wrapper.backend() { @@ -323,7 +325,10 @@ impl CommandExecutor { let (asr, wr_ops) = match wrapper.backend() { RequestBackend::Auth => self.auth_storage.after_sync(wrapper, auth_revision)?, - RequestBackend::Lease => self.lease_storage.after_sync(wrapper, general_revision)?, + RequestBackend::Lease => { + self.lease_storage + .after_sync(wrapper, general_revision, txn_db, index)? 
+ } RequestBackend::Alarm => self.alarm_storage.after_sync(wrapper, general_revision), RequestBackend::Kv => unreachable!("Should not sync kv commands"), }; @@ -473,6 +478,7 @@ impl CurpCommandExecutor for CommandExecutor { .after_sync_others( wrapper, &txn_db, + &index_state, &general_revision_state, &auth_revision_state, to_execute, diff --git a/crates/xline/src/storage/lease_store/mod.rs b/crates/xline/src/storage/lease_store/mod.rs index c396d669a..36adf1b48 100644 --- a/crates/xline/src/storage/lease_store/mod.rs +++ b/crates/xline/src/storage/lease_store/mod.rs @@ -29,7 +29,8 @@ use xlineapi::{ pub(crate) use self::{lease::Lease, lease_collection::LeaseCollection}; use super::{ db::{WriteOp, DB}, - index::Index, + index::{Index, IndexOperate}, + storage_api::XlineStorageOps, }; use crate::{ header_gen::HeaderGenerator, @@ -52,6 +53,7 @@ pub(crate) struct LeaseStore { lease_collection: Arc, /// Db to store lease db: Arc, + #[allow(unused)] // used in tests /// Key to revision index index: Arc, /// Header generator @@ -98,18 +100,25 @@ impl LeaseStore { } /// sync a lease request - pub(crate) fn after_sync( + pub(crate) fn after_sync( &self, request: &RequestWrapper, revision_gen: &RevisionNumberGeneratorState<'_>, - ) -> Result<(SyncResponse, Vec), ExecuteError> { + txn_db: &T, + index: &I, + ) -> Result<(SyncResponse, Vec), ExecuteError> + where + T: XlineStorageOps + TransactionApi, + I: IndexOperate, + { let revision = if request.skip_lease_revision() { revision_gen.get() } else { revision_gen.next() }; - self.sync_request(request, revision) - .map(|(rev, ops)| (SyncResponse::new(rev), ops)) + // TODO: return only a `SyncResponse` + self.sync_request(request, revision, txn_db, index) + .map(|rev| (SyncResponse::new(rev), vec![])) } /// Get lease by id @@ -273,36 +282,45 @@ impl LeaseStore { } /// Sync `RequestWithToken` - fn sync_request( + fn sync_request( &self, wrapper: &RequestWrapper, revision: i64, - ) -> Result<(i64, Vec), ExecuteError> { + txn_db: &T, + index: &I, + ) -> Result + where + T: XlineStorageOps + TransactionApi, + I: IndexOperate, + { #[allow(clippy::wildcard_enum_match_arm)] - let ops = match *wrapper { + match *wrapper { RequestWrapper::LeaseGrantRequest(ref req) => { debug!("Sync LeaseGrantRequest {:?}", req); - self.sync_lease_grant_request(req) + self.sync_lease_grant_request(req, txn_db)?; } RequestWrapper::LeaseRevokeRequest(ref req) => { debug!("Sync LeaseRevokeRequest {:?}", req); - self.sync_lease_revoke_request(req, revision)? 
+ self.sync_lease_revoke_request(req, revision, txn_db, index)?; } RequestWrapper::LeaseLeasesRequest(ref req) => { debug!("Sync LeaseLeasesRequest {:?}", req); - vec![] } _ => unreachable!("Other request should not be sent to this store"), }; - Ok((revision, ops)) + Ok(revision) } /// Sync `LeaseGrantRequest` - fn sync_lease_grant_request(&self, req: &LeaseGrantRequest) -> Vec { + fn sync_lease_grant_request( + &self, + req: &LeaseGrantRequest, + txn_db: &T, + ) -> Result<(), ExecuteError> { let lease = self .lease_collection .grant(req.id, req.ttl, self.is_primary()); - vec![WriteOp::PutLease(lease)] + txn_db.write_op(WriteOp::PutLease(lease)) } /// Get all `PbLease` @@ -320,14 +338,19 @@ impl LeaseStore { } /// Sync `LeaseRevokeRequest` - fn sync_lease_revoke_request( + fn sync_lease_revoke_request( &self, req: &LeaseRevokeRequest, revision: i64, - ) -> Result, ExecuteError> { - let mut ops = Vec::new(); + txn_db: &T, + index: &I, + ) -> Result<(), ExecuteError> + where + T: XlineStorageOps + TransactionApi, + I: IndexOperate, + { let mut updates = Vec::new(); - ops.push(WriteOp::DeleteLease(req.id)); + txn_db.write_op(WriteOp::DeleteLease(req.id))?; let del_keys = match self.lease_collection.look_up(req.id) { Some(l) => l.keys(), @@ -336,31 +359,24 @@ impl LeaseStore { if del_keys.is_empty() { let _ignore = self.lease_collection.revoke(req.id); - return Ok(Vec::new()); + return Ok(()); } - let txn_db = self.db.transaction(); - let txn_index = self.index.state(); - for (key, mut sub_revision) in del_keys.iter().zip(0..) { let deleted = - KvStore::delete_keys(&txn_db, &txn_index, key, &[], revision, &mut sub_revision)?; + KvStore::delete_keys(txn_db, index, key, &[], revision, &mut sub_revision)?; KvStore::detach_leases(&deleted, &self.lease_collection); let mut del_event = KvStore::new_deletion_events(revision, deleted); updates.append(&mut del_event); } - txn_db - .commit() - .map_err(|e| ExecuteError::DbError(e.to_string()))?; - txn_index.commit(); - let _ignore = self.lease_collection.revoke(req.id); assert!( self.kv_update_tx.send((revision, updates)).is_ok(), "Failed to send updates to KV watcher" ); - Ok(ops) + + Ok(()) } } @@ -430,7 +446,9 @@ mod test { #[tokio::test(flavor = "multi_thread")] async fn test_lease_sync() -> Result<(), Box> { let db = DB::open(&EngineConfig::Memory)?; - let (lease_store, rev_gen) = init_store(db); + let txn = db.transaction(); + let index = Index::new(); + let (lease_store, rev_gen) = init_store(Arc::clone(&db)); let rev_gen_state = rev_gen.state(); let wait_duration = Duration::from_millis(1); @@ -444,7 +462,7 @@ mod test { "the future should block until the lease is synced" ); - let (_ignore, ops) = lease_store.after_sync(&req1, &rev_gen_state)?; + let (_ignore, ops) = lease_store.after_sync(&req1, &rev_gen_state, &txn, &index)?; lease_store.db.write_ops(ops)?; lease_store.mark_lease_synced(&req1); @@ -465,7 +483,7 @@ mod test { "the future should block until the lease is synced" ); - let (_ignore, ops) = lease_store.after_sync(&req2, &rev_gen_state)?; + let (_ignore, ops) = lease_store.after_sync(&req2, &rev_gen_state, &txn, &index)?; lease_store.db.write_ops(ops)?; lease_store.mark_lease_synced(&req2); @@ -522,8 +540,12 @@ mod test { rev_gen: &RevisionNumberGeneratorState<'_>, ) -> Result { let cmd_res = ls.execute(req)?; - let (_ignore, ops) = ls.after_sync(req, rev_gen)?; - ls.db.write_ops(ops)?; + let txn = ls.db.transaction(); + let index = ls.index.state(); + let (_ignore, _ops) = ls.after_sync(req, rev_gen, &txn, &index)?; + txn.commit() + 
From 46ae7d342b1b6e9fdffbea7384ccbd85bd6945f0 Mon Sep 17 00:00:00 2001
From: bsbds <69835502+bsbds@users.noreply.github.com>
Date: Tue, 20 Aug 2024 09:26:53 +0800
Subject: [PATCH 11/19] chore: remove unnecessary txn usage

Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com>
---
 crates/xline/src/storage/kv_store.rs | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/crates/xline/src/storage/kv_store.rs b/crates/xline/src/storage/kv_store.rs
index e69a7d709..0832b7832 100644
--- a/crates/xline/src/storage/kv_store.rs
+++ b/crates/xline/src/storage/kv_store.rs
@@ -149,11 +149,9 @@ impl KvStoreInner {
 
     /// Get previous `KeyValue` of a `KeyValue`
     pub(crate) fn get_prev_kv(&self, kv: &KeyValue) -> Option<KeyValue> {
-        let txn_db = self.db.transaction();
-        let index = self.index.state();
         Self::get_range(
-            &txn_db,
-            &index,
+            self.db.as_ref(),
+            self.index.as_ref(),
             &kv.key,
             &[],
             kv.mod_revision.overflow_sub(1),
@@ -168,11 +166,10 @@ impl KvStoreInner {
         key_range: KeyRange,
         revision: i64,
     ) -> Result<Vec<Event>, ExecuteError> {
-        let txn = self.db.transaction();
         let revisions =
             self.index
                 .get_from_rev(key_range.range_start(), key_range.range_end(), revision);
-        let events = Self::get_values(&txn, &revisions)?
+        let events = Self::get_values(self.db.as_ref(), &revisions)?
             .into_iter()
             .map(|kv| {
                 // Delete
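
The rationale for the change above: a read-only lookup can borrow the storage handle directly, so opening and dropping a transaction per read is pure overhead. A minimal stand-in (the `Db` type here is invented; the real `XlineStorageOps` surface is richer):

    use std::collections::BTreeMap;

    // Invented stand-in for the storage engine: reads need only a shared
    // reference, so no per-lookup transaction object is required.
    struct Db(BTreeMap<String, String>);

    impl Db {
        // Read path: no transaction is created.
        fn get(&self, key: &str) -> Option<&String> {
            self.0.get(key)
        }
        // Write path: exclusive access models "open a transaction".
        fn insert(&mut self, key: &str, value: &str) {
            let _ = self.0.insert(key.to_owned(), value.to_owned());
        }
    }

    fn main() {
        let mut db = Db(BTreeMap::new());
        db.insert("a", "1");
        assert_eq!(db.get("a").map(String::as_str), Some("1"));
    }
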
From 6b627c42df720e17c4d0d56201ed23703145c53b Mon Sep 17 00:00:00 2001
From: bsbds <69835502+bsbds@users.noreply.github.com>
Date: Tue, 20 Aug 2024 17:16:42 +0800
Subject: [PATCH 12/19] fix: lease store revision generation

Only increment the revision number when a key is deleted

Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com>
---
 crates/xline/src/storage/lease_store/mod.rs | 28 ++++++++++++---------
 1 file changed, 16 insertions(+), 12 deletions(-)

diff --git a/crates/xline/src/storage/lease_store/mod.rs b/crates/xline/src/storage/lease_store/mod.rs
index 36adf1b48..7aab4a111 100644
--- a/crates/xline/src/storage/lease_store/mod.rs
+++ b/crates/xline/src/storage/lease_store/mod.rs
@@ -16,6 +16,7 @@ use std::{
     time::Duration,
 };
 
+use clippy_utilities::OverflowArithmetic;
 use engine::TransactionApi;
 use log::debug;
 use parking_lot::RwLock;
@@ -111,14 +112,15 @@ impl LeaseStore {
         T: XlineStorageOps + TransactionApi,
         I: IndexOperate,
     {
-        let revision = if request.skip_lease_revision() {
-            revision_gen.get()
-        } else {
+        let next_revision = revision_gen.get().overflow_add(1);
+        let updated = self.sync_request(request, next_revision, txn_db, index)?;
+        let rev = if updated {
             revision_gen.next()
+        } else {
+            revision_gen.get()
         };
         // TODO: return only a `SyncResponse`
-        self.sync_request(request, revision, txn_db, index)
-            .map(|rev| (SyncResponse::new(rev), vec![]))
+        Ok((SyncResponse::new(rev), vec![]))
     }
 
     /// Get lease by id
@@ -288,27 +290,29 @@ impl LeaseStore {
         revision: i64,
         txn_db: &T,
         index: &I,
-    ) -> Result<i64, ExecuteError>
+    ) -> Result<bool, ExecuteError>
     where
         T: XlineStorageOps + TransactionApi,
         I: IndexOperate,
     {
         #[allow(clippy::wildcard_enum_match_arm)]
-        match *wrapper {
+        let updated = match *wrapper {
             RequestWrapper::LeaseGrantRequest(ref req) => {
                 debug!("Sync LeaseGrantRequest {:?}", req);
                 self.sync_lease_grant_request(req, txn_db)?;
+                false
             }
             RequestWrapper::LeaseRevokeRequest(ref req) => {
                 debug!("Sync LeaseRevokeRequest {:?}", req);
-                self.sync_lease_revoke_request(req, revision, txn_db, index)?;
+                self.sync_lease_revoke_request(req, revision, txn_db, index)?
             }
             RequestWrapper::LeaseLeasesRequest(ref req) => {
                 debug!("Sync LeaseLeasesRequest {:?}", req);
+                false
             }
             _ => unreachable!("Other request should not be sent to this store"),
         };
-        Ok(revision)
+        Ok(updated)
     }
 
     /// Sync `LeaseGrantRequest`
@@ -344,7 +348,7 @@ impl LeaseStore {
         revision: i64,
         txn_db: &T,
         index: &I,
-    ) -> Result<(), ExecuteError>
+    ) -> Result<bool, ExecuteError>
     where
         T: XlineStorageOps + TransactionApi,
         I: IndexOperate,
@@ -359,7 +363,7 @@ impl LeaseStore {
 
         if del_keys.is_empty() {
             let _ignore = self.lease_collection.revoke(req.id);
-            return Ok(());
+            return Ok(false);
         }
 
         for (key, mut sub_revision) in del_keys.iter().zip(0..) {
@@ -376,7 +380,7 @@ impl LeaseStore {
             "Failed to send updates to KV watcher"
        );
 
-        Ok(())
+        Ok(true)
     }
 }
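
The new control flow fits in a dozen lines: compute a tentative next revision, run the sync, and consume the revision only if the sync reports that a key was actually deleted. A self-contained sketch, not the real `RevisionNumberGenerator`:

    // Invented stand-in: `next` stages and returns an incremented revision,
    // `get` returns the last one handed out.
    struct RevGen(i64);

    impl RevGen {
        fn get(&self) -> i64 { self.0 }
        fn next(&mut self) -> i64 { self.0 += 1; self.0 }
    }

    // Mirrors the patched flow: the tentative revision is handed to the
    // sync step, but only consumed when state actually changed.
    fn after_sync(rev_gen: &mut RevGen, deleted_keys: usize) -> i64 {
        let next_revision = rev_gen.get() + 1;
        let _ = next_revision; // passed to sync_request in the real code
        let updated = deleted_keys > 0; // reported by sync_request
        if updated { rev_gen.next() } else { rev_gen.get() }
    }

    fn main() {
        let mut rev_gen = RevGen(1);
        assert_eq!(after_sync(&mut rev_gen, 0), 1); // no keys deleted: no bump
        assert_eq!(after_sync(&mut rev_gen, 2), 2); // keys deleted: bump
    }
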
From 7bf01d55ab7ead17a49f5ed32d3bbefa1663a681 Mon Sep 17 00:00:00 2001
From: bsbds <69835502+bsbds@users.noreply.github.com>
Date: Fri, 23 Aug 2024 09:56:37 +0800
Subject: [PATCH 13/19] fix: use execute_ro to speculatively execute read-only
 commands

Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com>
---
 crates/curp-external-api/src/cmd.rs      |  8 ++++++++
 crates/curp-test-utils/src/test_cmd.rs   | 10 ++++++++++
 crates/curp/src/server/cmd_worker/mod.rs |  7 +------
 crates/xline/src/server/command.rs       | 20 +++++++++++++++++++-
 4 files changed, 38 insertions(+), 7 deletions(-)

diff --git a/crates/curp-external-api/src/cmd.rs b/crates/curp-external-api/src/cmd.rs
index c29c221f8..5b282b8bd 100644
--- a/crates/curp-external-api/src/cmd.rs
+++ b/crates/curp-external-api/src/cmd.rs
@@ -104,6 +104,14 @@ where
     /// command.
     fn execute(&self, cmd: &C) -> Result<C::ER, C::Error>;
 
+    /// Execute the read-only command
+    ///
+    /// # Errors
+    ///
+    /// This function may return an error if there is a problem executing the
+    /// command.
+    fn execute_ro(&self, cmd: &C) -> Result<(C::ER, C::ASR), C::Error>;
+
     /// Batch execute the after_sync callback
     ///
     /// This `highest_index` means the last log index of the `cmds`
diff --git a/crates/curp-test-utils/src/test_cmd.rs b/crates/curp-test-utils/src/test_cmd.rs
index 2a7cc980e..c3fa23895 100644
--- a/crates/curp-test-utils/src/test_cmd.rs
+++ b/crates/curp-test-utils/src/test_cmd.rs
@@ -284,6 +284,16 @@ impl CommandExecutor<TestCommand> for TestCE {
         Ok(result)
     }
 
+    fn execute_ro(
+        &self,
+        cmd: &TestCommand,
+    ) -> Result<
+        (<TestCommand as Command>::ER, <TestCommand as Command>::ASR),
+        <TestCommand as Command>::Error,
+    > {
+        self.execute(cmd).map(|er| (er, LogIndexResult(0)))
+    }
+
     fn after_sync(
         &self,
         cmds: Vec<AfterSyncCmd<'_, TestCommand>>,
diff --git a/crates/curp/src/server/cmd_worker/mod.rs b/crates/curp/src/server/cmd_worker/mod.rs
index 95a042597..d70cc20e7 100644
--- a/crates/curp/src/server/cmd_worker/mod.rs
+++ b/crates/curp/src/server/cmd_worker/mod.rs
@@ -51,12 +51,7 @@ pub(super) fn execute<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
         unreachable!("should not speculative execute {:?}", entry.entry_data);
     };
     if cmd.is_read_only() {
-        let result = ce
-            .after_sync(vec![AfterSyncCmd::new(cmd, true)], None)
-            .remove(0)?;
-        let (asr, er_opt) = result.into_parts();
-        let er = er_opt.unwrap_or_else(|| unreachable!("er should exist"));
-        Ok((er, Some(asr)))
+        ce.execute_ro(cmd).map(|(er, asr)| (er, Some(asr)))
     } else {
         let er = ce.execute(cmd);
         let mut cb_w = cb.write();
diff --git a/crates/xline/src/server/command.rs b/crates/xline/src/server/command.rs
index 183433b84..cd564729d 100644
--- a/crates/xline/src/server/command.rs
+++ b/crates/xline/src/server/command.rs
@@ -15,7 +15,7 @@ use parking_lot::RwLock;
 use tracing::warn;
 use utils::{barrier::IdBarrier, table_names::META_TABLE};
 use xlineapi::{
-    command::{Command, CurpClient},
+    command::{Command, CurpClient, SyncResponse},
     execute_error::ExecuteError,
     AlarmAction, AlarmRequest, AlarmType,
 };
@@ -429,6 +429,24 @@ impl CurpCommandExecutor<Command> for CommandExecutor {
         }
     }
 
+    fn execute_ro(
+        &self,
+        cmd: &Command,
+    ) -> Result<
+        (<Command as CurpCommand>::ER, <Command as CurpCommand>::ASR),
+        <Command as CurpCommand>::Error,
+    > {
+        let er = self.execute(cmd)?;
+        let wrapper = cmd.request();
+        let rev = match wrapper.backend() {
+            RequestBackend::Kv | RequestBackend::Lease | RequestBackend::Alarm => {
+                self.kv_storage.revision_gen().get()
+            }
+            RequestBackend::Auth => self.auth_storage.revision_gen().get(),
+        };
+        Ok((er, SyncResponse::new(rev)))
+    }
+
     fn after_sync(
         &self,
         cmds: Vec<AfterSyncCmd<'_, Command>>,
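
The shape of the new hook: a read-only command yields its execution result and a ready-made after-sync result in a single call, so it no longer round-trips through the batched `after_sync` path. A deliberately simplified model; the real `Command` and `CommandExecutor` traits carry more associated types and a richer error type than this sketch:

    // Invented, heavily simplified command/executor pair.
    trait Command {
        type Er;
        type Asr;
    }

    trait CommandExecutor<C: Command> {
        fn execute(&self, cmd: &C) -> Result<C::Er, String>;
        // Read-only speculative execution: the execution result plus an
        // after-sync result derived from current state, in one call.
        fn execute_ro(&self, cmd: &C) -> Result<(C::Er, C::Asr), String>;
    }

    struct Get;
    impl Command for Get {
        type Er = Option<String>;
        type Asr = i64; // e.g. the revision the read was served at
    }

    struct Exec { revision: i64 }
    impl CommandExecutor<Get> for Exec {
        fn execute(&self, _cmd: &Get) -> Result<Option<String>, String> { Ok(None) }
        fn execute_ro(&self, cmd: &Get) -> Result<(Option<String>, i64), String> {
            self.execute(cmd).map(|er| (er, self.revision))
        }
    }

    fn main() {
        let exec = Exec { revision: 42 };
        assert_eq!(exec.execute_ro(&Get).unwrap().1, 42);
    }
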
From c6d7d9b4258a3c44fd557bea45038ee19a64e77f Mon Sep 17 00:00:00 2001
From: bsbds <69835502+bsbds@users.noreply.github.com>
Date: Fri, 16 Aug 2024 17:36:57 +0800
Subject: [PATCH 14/19] chore: use join_all to concurrently build clients in
 benchmark

Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com>
---
 Cargo.lock                     |  1 +
 crates/benchmark/Cargo.toml    |  1 +
 crates/benchmark/src/runner.rs | 16 ++++++++++------
 3 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 7240258cd..f00733ad3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -332,6 +332,7 @@ dependencies = [
  "clap",
  "clippy-utilities",
  "etcd-client",
+ "futures",
  "indicatif",
  "rand",
  "thiserror",
diff --git a/crates/benchmark/Cargo.toml b/crates/benchmark/Cargo.toml
index cc6a1c215..819ae65c1 100644
--- a/crates/benchmark/Cargo.toml
+++ b/crates/benchmark/Cargo.toml
@@ -17,6 +17,7 @@ anyhow = "1.0.83"
 clap = { version = "4", features = ["derive"] }
 clippy-utilities = "0.2.0"
 etcd-client = { version = "0.13.0", features = ["tls"] }
+futures = "0.3.30"
 indicatif = "0.17.8"
 rand = "0.8.5"
 thiserror = "1.0.61"
diff --git a/crates/benchmark/src/runner.rs b/crates/benchmark/src/runner.rs
index f53063d59..fb167716f 100644
--- a/crates/benchmark/src/runner.rs
+++ b/crates/benchmark/src/runner.rs
@@ -9,6 +9,7 @@ use std::{
 
 use anyhow::Result;
 use clippy_utilities::{NumericCast, OverflowArithmetic};
+use futures::future::join_all;
 use indicatif::ProgressBar;
 use rand::RngCore;
 use tokio::{
@@ -158,7 +159,6 @@ impl CommandRunner {
 
     /// Create clients
     async fn create_clients(&self) -> Result<Vec<BenchClient>> {
-        let mut clients = Vec::with_capacity(self.args.clients);
         let client_options = ClientOptions::default().with_client_config(ClientConfig::new(
             Duration::from_secs(10),
             Duration::from_secs(5),
@@ -180,11 +180,15 @@ impl CommandRunner {
                 }
             })
             .collect::<Vec<_>>();
-        for _ in 0..self.args.clients {
-            let client =
-                BenchClient::new(addrs.clone(), self.args.use_curp, client_options.clone()).await?;
-            clients.push(client);
-        }
+        let clients_futs = std::iter::repeat_with(|| {
+            BenchClient::new(addrs.clone(), self.args.use_curp, client_options.clone())
+        })
+        .take(self.args.clients);
+        let clients = join_all(clients_futs)
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()?;
+
         Ok(clients)
     }
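
`join_all` turns the old serial loop into a single await over all in-flight connections. A runnable sketch, assuming the `futures` and `tokio` crates are available; `connect` is a stand-in for `BenchClient::new`:

    use futures::future::join_all;

    // Stand-in for `BenchClient::new`: an async constructor that can fail.
    async fn connect(id: usize) -> Result<usize, String> {
        Ok(id)
    }

    #[tokio::main]
    async fn main() -> Result<(), String> {
        let n = 8;
        // Build all futures first, then await them concurrently. Collecting
        // into `Result<Vec<_>, _>` stops at the first error, like `?` did in
        // the old loop.
        let clients: Vec<usize> = join_all((0..n).map(connect))
            .await
            .into_iter()
            .collect::<Result<Vec<_>, _>>()?;
        assert_eq!(clients.len(), n);
        Ok(())
    }
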
From c4f1dcb4c621b5bcf1e25ce1b2e7981bf9842096 Mon Sep 17 00:00:00 2001
From: bsbds <69835502+bsbds@users.noreply.github.com>
Date: Fri, 23 Aug 2024 15:37:21 +0800
Subject: [PATCH 15/19] fix: remove check_members

This check is not necessary; the URLs are allowed to be empty before
publish.

Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com>
---
 crates/curp/src/client/unary.rs | 18 ++----------------
 1 file changed, 2 insertions(+), 16 deletions(-)

diff --git a/crates/curp/src/client/unary.rs b/crates/curp/src/client/unary.rs
index 7c6dc488f..2acf6658a 100644
--- a/crates/curp/src/client/unary.rs
+++ b/crates/curp/src/client/unary.rs
@@ -270,20 +270,6 @@ impl<C: Command> ClientApi for Unary<C> {
     /// Send fetch cluster requests to all servers
     /// Note: The fetched cluster may still be outdated if `linearizable` is false
     async fn fetch_cluster(&self, linearizable: bool) -> Result<FetchClusterResponse, Self::Error> {
-        /// Checks the member list, returns `true` if all member has been published
-        fn check_members(members: &[Member]) -> bool {
-            if members.is_empty() {
-                return false;
-            }
-            for member in members {
-                if member.client_urls.is_empty() {
-                    debug!("new node {} not published yet", member.id());
-                    return false;
-                }
-            }
-            true
-        }
-
         let timeout = self.config.wait_synced_timeout;
         if !linearizable {
             // firstly, try to fetch the local server
@@ -344,14 +330,14 @@ impl<C: Command> ClientApi for Unary<C> {
             match max_term.cmp(&inner.term) {
                 Ordering::Less => {
                     max_term = inner.term;
-                    if check_members(&inner.members) {
+                    if !inner.members.is_empty() {
                         res = Some(inner);
                     }
                     // reset ok count to 1
                     ok_cnt = 1;
                 }
                 Ordering::Equal => {
-                    if check_members(&inner.members) {
+                    if !inner.members.is_empty() {
                         res = Some(inner);
                     }
                     ok_cnt += 1;
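
After this removal, the only gate on a candidate response is a non-empty member list; the term tracking and agreement counting are untouched. A condensed, self-contained model of that loop (quorum check elided, types invented for the sketch):

    use std::cmp::Ordering;

    struct Resp { term: u64, members: Vec<&'static str> }

    // Keep the response from the highest term seen so far, counting how
    // many servers agree on that term; an empty member list never becomes
    // the accepted response.
    fn pick(responses: Vec<Resp>) -> Option<Resp> {
        let mut max_term = 0;
        let mut ok_cnt = 0;
        let mut res: Option<Resp> = None;
        for inner in responses {
            match max_term.cmp(&inner.term) {
                Ordering::Less => {
                    max_term = inner.term;
                    ok_cnt = 1; // reset the agreement count for the new term
                    if !inner.members.is_empty() {
                        res = Some(inner);
                    }
                }
                Ordering::Equal => {
                    ok_cnt += 1;
                    if !inner.members.is_empty() {
                        res = Some(inner);
                    }
                }
                Ordering::Greater => {} // stale term, ignore
            }
        }
        let _ = ok_cnt; // the real client also requires a quorum here
        res
    }

    fn main() {
        let picked = pick(vec![
            Resp { term: 2, members: vec![] },           // unpublished, skipped
            Resp { term: 2, members: vec!["n1", "n2"] }, // usable
        ]);
        assert_eq!(picked.map(|r| r.members.len()), Some(2));
    }
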
From 313b819caa305cc17125454224fab8953e3f6784 Mon Sep 17 00:00:00 2001
From: bsbds <69835502+bsbds@users.noreply.github.com>
Date: Wed, 7 Aug 2024 16:43:15 +0800
Subject: [PATCH 16/19] fix: generate propose id inside client retry closure

Because the client id may change during retries, propose id generation
must be performed for each retry.

Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com>
---
 Cargo.lock                      |  2 ++
 crates/curp/src/client/retry.rs | 39 +++++++++++++++++++--------------
 workspace-hack/Cargo.toml       |  2 ++
 3 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index f00733ad3..068f7bfa9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3780,6 +3780,7 @@ dependencies = [
  "crypto-common",
  "digest",
  "either",
+ "etcd-client",
  "futures-channel",
  "futures-util",
  "getrandom",
@@ -3802,6 +3803,7 @@ dependencies = [
  "tokio",
  "tokio-util",
  "tonic",
+ "tonic-build",
  "tower",
  "tracing",
  "tracing-log",
diff --git a/crates/curp/src/client/retry.rs b/crates/curp/src/client/retry.rs
index 607623e4f..ee9e3d6c1 100644
--- a/crates/curp/src/client/retry.rs
+++ b/crates/curp/src/client/retry.rs
@@ -224,9 +224,9 @@ where
         token: Option<&String>,
         use_fast_path: bool,
     ) -> Result<ProposeResponse<C>, tonic::Status> {
-        let propose_id = self.inner.gen_propose_id()?;
-        self.retry::<_, _>(|client| {
-            RepeatableClientApi::propose(client, *propose_id, cmd, token, use_fast_path)
+        self.retry::<_, _>(|client| async move {
+            let propose_id = self.inner.gen_propose_id()?;
+            RepeatableClientApi::propose(client, *propose_id, cmd, token, use_fast_path).await
         })
         .await
     }
@@ -236,19 +236,23 @@ where
         &self,
         changes: Vec<ConfChange>,
     ) -> Result<Vec<Member>, tonic::Status> {
-        let propose_id = self.inner.gen_propose_id()?;
         self.retry::<_, _>(|client| {
             let changes_c = changes.clone();
-            RepeatableClientApi::propose_conf_change(client, *propose_id, changes_c)
+            async move {
+                let propose_id = self.inner.gen_propose_id()?;
+                RepeatableClientApi::propose_conf_change(client, *propose_id, changes_c).await
+            }
         })
         .await
     }
 
     /// Send propose to shutdown cluster
     async fn propose_shutdown(&self) -> Result<(), tonic::Status> {
-        let propose_id = self.inner.gen_propose_id()?;
-        self.retry::<_, _>(|client| RepeatableClientApi::propose_shutdown(client, *propose_id))
-            .await
+        self.retry::<_, _>(|client| async move {
+            let propose_id = self.inner.gen_propose_id()?;
+            RepeatableClientApi::propose_shutdown(client, *propose_id).await
+        })
+        .await
     }
 
     /// Send propose to publish a node id and name
@@ -258,17 +262,20 @@ where
         node_name: String,
         node_client_urls: Vec<String>,
     ) -> Result<(), Self::Error> {
-        let propose_id = self.inner.gen_propose_id()?;
         self.retry::<_, _>(|client| {
             let name_c = node_name.clone();
             let node_client_urls_c = node_client_urls.clone();
-            RepeatableClientApi::propose_publish(
-                client,
-                *propose_id,
-                node_id,
-                name_c,
-                node_client_urls_c,
-            )
+            async move {
+                let propose_id = self.inner.gen_propose_id()?;
+                RepeatableClientApi::propose_publish(
+                    client,
+                    *propose_id,
+                    node_id,
+                    name_c,
+                    node_client_urls_c,
+                )
+                .await
+            }
         })
         .await
     }
diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml
index 7eec178ae..913b7cb78 100644
--- a/workspace-hack/Cargo.toml
+++ b/workspace-hack/Cargo.toml
@@ -19,6 +19,7 @@ clap = { version = "4", features = ["derive"] }
 crypto-common = { version = "0.1", default-features = false, features = ["std"] }
 digest = { version = "0.10", features = ["mac", "std"] }
 either = { version = "1", default-features = false, features = ["use_std"] }
+etcd-client = { version = "0.13", default-features = false, features = ["tls"] }
 futures-channel = { version = "0.3", features = ["sink"] }
 futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
 getrandom = { version = "0.2", default-features = false, features = ["js", "rdrand", "std"] }
@@ -57,5 +58,6 @@ predicates = { version = "3", default-features = false, features = ["diff"] }
 syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full"] }
 syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "full", "visit", "visit-mut"] }
 tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "rt-multi-thread", "signal", "sync", "time"] }
+tonic-build = { version = "0.11" }
 
 ### END HAKARI SECTION
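
The bug pattern fixed here is generic: anything computed before a retry loop is frozen across attempts, so a propose id derived from a stale client id would be replayed verbatim. A toy model of generating the id inside the closure; the retry policy and id scheme below are invented for this sketch:

    // Each attempt calls the closure again, so the propose id is derived
    // from the *current* client id, not the one captured before the loop.
    struct Client { client_id: u64, seq: u64 }

    impl Client {
        fn gen_propose_id(&mut self) -> (u64, u64) {
            self.seq += 1;
            (self.client_id, self.seq)
        }
    }

    fn retry<T>(
        client: &mut Client,
        attempts: u32,
        mut op: impl FnMut(&mut Client) -> Result<T, ()>,
    ) -> Result<T, ()> {
        for _ in 0..attempts {
            if let Ok(v) = op(client) {
                return Ok(v);
            }
            client.client_id += 1; // e.g. the server expired our client id
        }
        Err(())
    }

    fn main() {
        let mut client = Client { client_id: 1, seq: 0 };
        let id = retry(&mut client, 3, |c| {
            let propose_id = c.gen_propose_id(); // regenerated per attempt
            if propose_id.0 < 2 { Err(()) } else { Ok(propose_id) }
        })
        .unwrap();
        assert_eq!(id.0, 2); // second attempt used the fresh client id
    }
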
"syn", version = "1", features = ["extra-traits", "full"] } syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "full", "visit", "visit-mut"] } tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "rt-multi-thread", "signal", "sync", "time"] } -tonic-build = { version = "0.11" } ### END HAKARI SECTION From 7d0bc8f5fe6f4a594a778253559729a2c791d2c0 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Mon, 26 Aug 2024 11:22:00 +0800 Subject: [PATCH 19/19] chore: remove unused index from lease_store Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/xline/src/server/xline_server.rs | 3 +- crates/xline/src/storage/lease_store/mod.rs | 36 ++++++++++----------- 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/crates/xline/src/server/xline_server.rs b/crates/xline/src/server/xline_server.rs index a4b663689..de40466c5 100644 --- a/crates/xline/src/server/xline_server.rs +++ b/crates/xline/src/server/xline_server.rs @@ -228,7 +228,7 @@ impl XlineServer { self.task_manager.spawn(TaskName::CompactBg, |n| { compact_bg_task( Arc::clone(&kv_storage), - Arc::clone(&index), + index, *self.compact_config.compact_batch_size(), *self.compact_config.compact_sleep_interval(), compact_task_rx, @@ -239,7 +239,6 @@ impl XlineServer { Arc::clone(&lease_collection), Arc::clone(&header_gen), Arc::clone(&db), - index, kv_update_tx, *self.cluster_config.is_leader(), )); diff --git a/crates/xline/src/storage/lease_store/mod.rs b/crates/xline/src/storage/lease_store/mod.rs index 7aab4a111..a6ff9c26a 100644 --- a/crates/xline/src/storage/lease_store/mod.rs +++ b/crates/xline/src/storage/lease_store/mod.rs @@ -30,7 +30,7 @@ use xlineapi::{ pub(crate) use self::{lease::Lease, lease_collection::LeaseCollection}; use super::{ db::{WriteOp, DB}, - index::{Index, IndexOperate}, + index::IndexOperate, storage_api::XlineStorageOps, }; use crate::{ @@ -54,9 +54,6 @@ pub(crate) struct LeaseStore { lease_collection: Arc, /// Db to store lease db: Arc, - #[allow(unused)] // used in tests - /// Key to revision index - index: Arc, /// Header generator header_gen: Arc, /// KV update sender @@ -75,14 +72,12 @@ impl LeaseStore { lease_collection: Arc, header_gen: Arc, db: Arc, - index: Arc, kv_update_tx: flume::Sender<(i64, Vec)>, is_leader: bool, ) -> Self { Self { lease_collection, db, - index, header_gen, kv_update_tx, is_primary: AtomicBool::new(is_leader), @@ -394,18 +389,23 @@ mod test { use super::*; use crate::{ revision_number::RevisionNumberGenerator, - storage::{db::DB, storage_api::XlineStorageOps}, + storage::{ + db::DB, + index::{Index, IndexState}, + storage_api::XlineStorageOps, + }, }; #[tokio::test(flavor = "multi_thread")] #[abort_on_panic] async fn test_lease_storage() -> Result<(), Box> { let db = DB::open(&EngineConfig::Memory)?; + let index = Index::new(); let (lease_store, rev_gen) = init_store(db); let rev_gen_state = rev_gen.state(); let req1 = RequestWrapper::from(LeaseGrantRequest { ttl: 10, id: 1 }); - let _ignore1 = exe_and_sync_req(&lease_store, &req1, &rev_gen_state)?; + let _ignore1 = exe_and_sync_req(&lease_store, index.state(), &req1, &rev_gen_state)?; let lo = lease_store.look_up(1).unwrap(); assert_eq!(lo.id(), 1); @@ -419,7 +419,7 @@ mod test { lease_store.lease_collection.detach(1, "key".as_bytes())?; let req2 = RequestWrapper::from(LeaseRevokeRequest { id: 1 }); - let _ignore2 = exe_and_sync_req(&lease_store, &req2, &rev_gen_state)?; + let _ignore2 = 
From 47a89003661ad865d44ca7500f66b807b2f32002 Mon Sep 17 00:00:00 2001
From: bsbds <69835502+bsbds@users.noreply.github.com>
Date: Fri, 23 Aug 2024 16:20:05 +0800
Subject: [PATCH 18/19] chore: update cargo hakari

Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com>
---
 Cargo.lock                | 2 --
 workspace-hack/Cargo.toml | 2 --
 2 files changed, 4 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 068f7bfa9..f00733ad3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3780,7 +3780,6 @@ dependencies = [
  "crypto-common",
  "digest",
  "either",
- "etcd-client",
  "futures-channel",
  "futures-util",
  "getrandom",
@@ -3803,7 +3802,6 @@ dependencies = [
  "tokio",
  "tokio-util",
  "tonic",
- "tonic-build",
  "tower",
  "tracing",
  "tracing-log",
diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml
index 913b7cb78..7eec178ae 100644
--- a/workspace-hack/Cargo.toml
+++ b/workspace-hack/Cargo.toml
@@ -19,7 +19,6 @@ clap = { version = "4", features = ["derive"] }
 crypto-common = { version = "0.1", default-features = false, features = ["std"] }
 digest = { version = "0.10", features = ["mac", "std"] }
 either = { version = "1", default-features = false, features = ["use_std"] }
-etcd-client = { version = "0.13", default-features = false, features = ["tls"] }
 futures-channel = { version = "0.3", features = ["sink"] }
 futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
 getrandom = { version = "0.2", default-features = false, features = ["js", "rdrand", "std"] }
@@ -58,6 +57,5 @@ predicates = { version = "3", default-features = false, features = ["diff"] }
 syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full"] }
 syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "full", "visit", "visit-mut"] }
 tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "rt-multi-thread", "signal", "sync", "time"] }
-tonic-build = { version = "0.11" }
 
 ### END HAKARI SECTION

From 7d0bc8f5fe6f4a594a778253559729a2c791d2c0 Mon Sep 17 00:00:00 2001
From: bsbds <69835502+bsbds@users.noreply.github.com>
Date: Mon, 26 Aug 2024 11:22:00 +0800
Subject: [PATCH 19/19] chore: remove unused index from lease_store

Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com>
---
 crates/xline/src/server/xline_server.rs     |  3 +-
 crates/xline/src/storage/lease_store/mod.rs | 36 ++++++++++-----------
 2 files changed, 19 insertions(+), 20 deletions(-)

diff --git a/crates/xline/src/server/xline_server.rs b/crates/xline/src/server/xline_server.rs
index a4b663689..de40466c5 100644
--- a/crates/xline/src/server/xline_server.rs
+++ b/crates/xline/src/server/xline_server.rs
@@ -228,7 +228,7 @@ impl XlineServer {
         self.task_manager.spawn(TaskName::CompactBg, |n| {
             compact_bg_task(
                 Arc::clone(&kv_storage),
-                Arc::clone(&index),
+                index,
                 *self.compact_config.compact_batch_size(),
                 *self.compact_config.compact_sleep_interval(),
                 compact_task_rx,
@@ -239,7 +239,6 @@ impl XlineServer {
             Arc::clone(&lease_collection),
             Arc::clone(&header_gen),
             Arc::clone(&db),
-            index,
             kv_update_tx,
             *self.cluster_config.is_leader(),
         ));
diff --git a/crates/xline/src/storage/lease_store/mod.rs b/crates/xline/src/storage/lease_store/mod.rs
index 7aab4a111..a6ff9c26a 100644
--- a/crates/xline/src/storage/lease_store/mod.rs
+++ b/crates/xline/src/storage/lease_store/mod.rs
@@ -30,7 +30,7 @@ use xlineapi::{
 pub(crate) use self::{lease::Lease, lease_collection::LeaseCollection};
 use super::{
     db::{WriteOp, DB},
-    index::{Index, IndexOperate},
+    index::IndexOperate,
     storage_api::XlineStorageOps,
 };
 use crate::{
@@ -54,9 +54,6 @@ pub(crate) struct LeaseStore {
     lease_collection: Arc<LeaseCollection>,
     /// Db to store lease
     db: Arc<DB>,
-    #[allow(unused)] // used in tests
-    /// Key to revision index
-    index: Arc<Index>,
     /// Header generator
     header_gen: Arc<HeaderGenerator>,
     /// KV update sender
@@ -75,14 +72,12 @@ impl LeaseStore {
         lease_collection: Arc<LeaseCollection>,
         header_gen: Arc<HeaderGenerator>,
         db: Arc<DB>,
-        index: Arc<Index>,
         kv_update_tx: flume::Sender<(i64, Vec<Event>)>,
         is_leader: bool,
     ) -> Self {
         Self {
             lease_collection,
             db,
-            index,
             header_gen,
             kv_update_tx,
             is_primary: AtomicBool::new(is_leader),
@@ -394,18 +389,23 @@ mod test {
     use super::*;
     use crate::{
         revision_number::RevisionNumberGenerator,
-        storage::{db::DB, storage_api::XlineStorageOps},
+        storage::{
+            db::DB,
+            index::{Index, IndexState},
+            storage_api::XlineStorageOps,
+        },
     };
 
     #[tokio::test(flavor = "multi_thread")]
     #[abort_on_panic]
     async fn test_lease_storage() -> Result<(), Box<dyn Error>> {
         let db = DB::open(&EngineConfig::Memory)?;
+        let index = Index::new();
         let (lease_store, rev_gen) = init_store(db);
         let rev_gen_state = rev_gen.state();
 
         let req1 = RequestWrapper::from(LeaseGrantRequest { ttl: 10, id: 1 });
-        let _ignore1 = exe_and_sync_req(&lease_store, &req1, &rev_gen_state)?;
+        let _ignore1 = exe_and_sync_req(&lease_store, index.state(), &req1, &rev_gen_state)?;
 
         let lo = lease_store.look_up(1).unwrap();
         assert_eq!(lo.id(), 1);
@@ -419,7 +419,7 @@ mod test {
         lease_store.lease_collection.detach(1, "key".as_bytes())?;
 
         let req2 = RequestWrapper::from(LeaseRevokeRequest { id: 1 });
-        let _ignore2 = exe_and_sync_req(&lease_store, &req2, &rev_gen_state)?;
+        let _ignore2 = exe_and_sync_req(&lease_store, index.state(), &req2, &rev_gen_state)?;
         assert!(lease_store.look_up(1).is_none());
         assert!(lease_store.leases().is_empty());
 
@@ -427,9 +427,9 @@ mod test {
         let req4 = RequestWrapper::from(LeaseGrantRequest { ttl: 10, id: 4 });
         let req5 = RequestWrapper::from(LeaseRevokeRequest { id: 3 });
         let req6 = RequestWrapper::from(LeaseLeasesRequest {});
-        let _ignore3 = exe_and_sync_req(&lease_store, &req3, &rev_gen_state)?;
-        let _ignore4 = exe_and_sync_req(&lease_store, &req4, &rev_gen_state)?;
-        let resp_1 = exe_and_sync_req(&lease_store, &req6, &rev_gen_state)?;
+        let _ignore3 = exe_and_sync_req(&lease_store, index.state(), &req3, &rev_gen_state)?;
+        let _ignore4 = exe_and_sync_req(&lease_store, index.state(), &req4, &rev_gen_state)?;
+        let resp_1 = exe_and_sync_req(&lease_store, index.state(), &req6, &rev_gen_state)?;
 
         let ResponseWrapper::LeaseLeasesResponse(leases_1) = resp_1 else {
             panic!("wrong response type: {resp_1:?}");
@@ -437,8 +437,8 @@ mod test {
         assert_eq!(leases_1.leases[0].id, 3);
         assert_eq!(leases_1.leases[1].id, 4);
 
-        let _ignore5 = exe_and_sync_req(&lease_store, &req5, &rev_gen_state)?;
-        let resp_2 = exe_and_sync_req(&lease_store, &req6, &rev_gen_state)?;
+        let _ignore5 = exe_and_sync_req(&lease_store, index.state(), &req5, &rev_gen_state)?;
+        let resp_2 = exe_and_sync_req(&lease_store, index.state(), &req6, &rev_gen_state)?;
         let ResponseWrapper::LeaseLeasesResponse(leases_2) = resp_2 else {
             panic!("wrong response type: {resp_2:?}");
         };
@@ -505,11 +505,12 @@ mod test {
     #[abort_on_panic]
     async fn test_recover() -> Result<(), ExecuteError> {
         let db = DB::open(&EngineConfig::Memory)?;
+        let index = Index::new();
         let (store, rev_gen) = init_store(Arc::clone(&db));
         let rev_gen_state = rev_gen.state();
 
         let req1 = RequestWrapper::from(LeaseGrantRequest { ttl: 10, id: 1 });
-        let _ignore1 = exe_and_sync_req(&store, &req1, &rev_gen_state)?;
+        let _ignore1 = exe_and_sync_req(&store, index.state(), &req1, &rev_gen_state)?;
         store.lease_collection.attach(1, "key".into())?;
 
         let (new_store, _) = init_store(db);
@@ -531,21 +532,20 @@ mod test {
         let lease_collection = Arc::new(LeaseCollection::new(0));
         let (kv_update_tx, _) = flume::bounded(1);
         let header_gen = Arc::new(HeaderGenerator::new(0, 0));
-        let index = Arc::new(Index::new());
         (
-            LeaseStore::new(lease_collection, header_gen, db, index, kv_update_tx, true),
+            LeaseStore::new(lease_collection, header_gen, db, kv_update_tx, true),
             RevisionNumberGenerator::new(1),
         )
     }
 
     fn exe_and_sync_req(
         ls: &LeaseStore,
+        index: IndexState,
         req: &RequestWrapper,
         rev_gen: &RevisionNumberGeneratorState<'_>,
     ) -> Result<ResponseWrapper, ExecuteError> {
         let cmd_res = ls.execute(req)?;
         let txn = ls.db.transaction();
-        let index = ls.index.state();
         let (_ignore, _ops) = ls.after_sync(req, rev_gen, &txn, &index)?;
         txn.commit()
             .map_err(|e| ExecuteError::DbError(e.to_string()))?;