diff --git a/.gitmodules b/.gitmodules index 9efeaae70..c7f0c33cd 100644 --- a/.gitmodules +++ b/.gitmodules @@ -3,8 +3,4 @@ url = https://github.com/xline-kv/xline-proto.git [submodule "curp/proto/common"] path = crates/curp/proto/common - url = https://github.com/xline-kv/curp-proto.git -[submodule "crates/jepsen-rs"] - path = crates/jepsen-rs - url = https://github.com/lxl66566/jepsen-rs.git - branch = main + url = https://github.com/xline-kv/curp-proto.git \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 70d6dfd98..642a7447a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -968,19 +968,6 @@ dependencies = [ "workspace-hack", ] -[[package]] -name = "env_logger" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cd405aab171cb85d6735e5c8d9db038c17d3ca007a4d2c25f337935c3d90580" -dependencies = [ - "humantime", - "is-terminal", - "log", - "regex", - "termcolor", -] - [[package]] name = "equivalent" version = "1.0.1" @@ -1280,12 +1267,6 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" -[[package]] -name = "hermit-abi" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" - [[package]] name = "hex" version = "0.4.3" @@ -1347,12 +1328,6 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" -[[package]] -name = "humantime" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" - [[package]] name = "hyper" version = "1.4.1" @@ -1495,17 +1470,6 @@ version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" -[[package]] -name = "is-terminal" -version = "0.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "261f68e344040fbd0edea105bef17c66edf46f984ddb1115b775ce31be948f4b" -dependencies = [ - "hermit-abi 0.4.0", - "libc", - "windows-sys 0.52.0", -] - [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -1570,6 +1534,7 @@ dependencies = [ [[package]] name = "jepsen-rs" version = "0.1.0" +source = "git+https://github.com/lxl66566/jepsen-rs.git?branch=nemesis#9134ff52a2ae5b1e8f92f302ea74f77553f6db76" dependencies = [ "anyhow", "async-recursion", @@ -1581,7 +1546,6 @@ dependencies = [ "madsim", "madsim-tokio", "once_cell", - "pretty_env_logger", "serde", "serde_json", "serde_with", @@ -1864,7 +1828,7 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies = [ - "hermit-abi 0.3.9", + "hermit-abi", "libc", "wasi", "windows-sys 0.52.0", @@ -2312,16 +2276,6 @@ dependencies = [ "termtree", ] -[[package]] -name = "pretty_env_logger" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "865724d4dbe39d9f3dd3b52b88d859d66bcb2d6a0acfd5ea68a65fb66d4bdc1c" -dependencies = [ - "env_logger", - "log", -] - [[package]] name = "prettyplease" version = "0.2.22" @@ -3030,15 +2984,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "termcolor" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" -dependencies = [ - "winapi-util", -] - [[package]] name = "termtree" version = "0.4.1" @@ -3708,15 +3653,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" -[[package]] -name = "winapi-util" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" -dependencies = [ - "windows-sys 0.59.0", -] - [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/crates/jepsen-rs b/crates/jepsen-rs deleted file mode 160000 index ce07e5769..000000000 --- a/crates/jepsen-rs +++ /dev/null @@ -1 +0,0 @@ -Subproject commit ce07e5769cee731a87b89781882e8ae977c88893 diff --git a/crates/simulation/Cargo.toml b/crates/simulation/Cargo.toml index 5176698fb..61a34bd09 100644 --- a/crates/simulation/Cargo.toml +++ b/crates/simulation/Cargo.toml @@ -40,7 +40,7 @@ workspace-hack = { version = "0.1", path = "../../workspace-hack" } xline = { path = "../xline" } xline-client = { path = "../xline-client" } xlineapi = { path = "../xlineapi" } -jepsen-rs = { path = "../jepsen-rs" } +jepsen-rs = { git = "https://github.com/lxl66566/jepsen-rs.git", branch = "nemesis" } [build-dependencies] tonic-build = { version = "0.5.0", package = "madsim-tonic-build" } diff --git a/crates/simulation/src/xline_group.rs b/crates/simulation/src/xline_group.rs index 31bc2032f..a072287df 100644 --- a/crates/simulation/src/xline_group.rs +++ b/crates/simulation/src/xline_group.rs @@ -1,7 +1,8 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; +use curp::rpc::{protocol_client::ProtocolClient, FetchClusterRequest, FetchClusterResponse}; use itertools::Itertools; -use jepsen_rs::client::ElleRwClusterClient; +use jepsen_rs::{client::ElleRwClusterClient, nemesis::implementation::NemesisCluster, op::Op}; use madsim::runtime::NodeHandle; use tonic::transport::Channel; use tracing::debug; @@ -14,7 +15,8 @@ use xline_client::{ error::XlineClientError, types::{ kv::{ - CompactionResponse, PutOptions, PutResponse, RangeOptions, RangeResponse, TxnRequest, + CompactionResponse, PutOptions, PutResponse, RangeOptions, RangeResponse, TxnOp, + TxnRequest, }, watch::{WatchOptions, WatchStreaming, Watcher}, }, @@ -25,6 +27,8 @@ use xlineapi::{ TxnResponse, WatchClient, }; +type ServerId = u64; + pub struct XlineNode { pub client_url: String, pub peer_url: String, @@ -34,6 +38,7 @@ pub struct XlineNode { pub struct XlineGroup { pub nodes: HashMap, + pub all_members: Vec, pub client_handle: NodeHandle, } @@ -42,6 +47,9 @@ impl XlineGroup { assert!(size >= 3, "the number of nodes must >= 3"); let handle = madsim::runtime::Handle::current(); + let all_members = (0..size) + .map(|x| format!("192.168.1.{}:2380", x + 1)) + .collect(); let all: HashMap<_, _> = (0..size) .map(|x| (format!("S{x}"), vec![format!("192.168.1.{}:2380", x + 1)])) .collect(); @@ -112,6 +120,7 @@ impl XlineGroup { madsim::time::sleep(Duration::from_secs(20)).await; Self { nodes, + all_members, client_handle, } } @@ -137,22 +146,174 @@ impl XlineGroup { } } + #[inline] pub fn get_node(&self, name: &str) -> &XlineNode { - self.nodes.get(name).unwrap() + self.nodes + .get(name) + .expect("no node with name {name} the simulator") + } + + /// Get the server node handle from ServerId + #[inline] + fn get_node_handle(&self, id: ServerId) -> NodeHandle { + self.get_node(id.to_string().as_str()).handle.clone() } - pub async fn crash(&mut self, name: &str) { + pub async fn try_get_leader(&self) -> Option<(ServerId, u64)> { + debug!("cluster trying to get leader"); + let mut leader = None; + let mut max_term = 0; + + let all_members = self.all_members.clone(); + self.client_handle + .spawn(async move { + for addr in all_members { + let addr = format!("http://{}", addr); + tracing::warn!("connecting to : {}", addr); + let mut client = if let Ok(client) = ProtocolClient::connect(addr.clone()).await + { + client + } else { + continue; + }; + + let FetchClusterResponse { + leader_id, term, .. + } = if let Ok(resp) = client.fetch_cluster(FetchClusterRequest::default()).await + { + resp.into_inner() + } else { + continue; + }; + if term > max_term { + max_term = term; + leader = leader_id; + } else if term == max_term && leader.is_none() { + leader = leader_id; + } + } + leader.map(|l| (l.into(), max_term)) + }) + .await + .unwrap() + } + + pub async fn get_leader(&self) -> (ServerId, u64) { + const RETRY_INTERVAL: u64 = 100; + loop { + if let Some(leader) = self.try_get_leader().await { + return leader; + } + debug!("failed to get leader"); + madsim::time::sleep(Duration::from_millis(RETRY_INTERVAL)).await; + } + } + + pub async fn crash(&self, name: impl AsRef + std::fmt::Display) { let handle = madsim::runtime::Handle::current(); - handle.kill(name); + handle.kill(name.as_ref()); madsim::time::sleep(Duration::from_secs(10)).await; - if !handle.is_exit(name) { + if !handle.is_exit(name.as_ref()) { panic!("failed to crash node: {name}"); } } - pub async fn restart(&mut self, name: &str) { + pub async fn restart(&self, name: impl AsRef) { + let handle = madsim::runtime::Handle::current(); + handle.restart(name.as_ref()); + } +} + +impl Drop for XlineGroup { + fn drop(&mut self) { + let handle = madsim::runtime::Handle::current(); + for node in self.nodes.values() { + handle.send_ctrl_c(node.handle.id()); + } + handle.send_ctrl_c(self.client_handle.id()); + for (name, node) in &self.nodes { + if !handle.is_exit(node.handle.id()) { + panic!("failed to graceful shutdown {name}"); + } + } + debug!("all nodes shutdowned"); + } +} + +#[async_trait::async_trait] +impl NemesisCluster for XlineGroup { + async fn kill(&self, servers: &[ServerId]) { + let handle = madsim::runtime::Handle::current(); + for id in servers { + handle.kill(id.to_string()); + } + madsim::time::sleep(Duration::from_secs(10)).await; + assert!( + servers.iter().all(|x| handle.is_exit(x.to_string())), + "failed to kill nodes: {servers:?}" + ); + } + async fn restart(&self, servers: &[ServerId]) { let handle = madsim::runtime::Handle::current(); - handle.restart(name); + for id in servers { + handle.restart(id.to_string()); + } + } + async fn pause(&self, servers: &[ServerId]) { + let handle = madsim::runtime::Handle::current(); + for id in servers { + handle.pause(id.to_string()); + } + } + async fn resume(&self, servers: &[ServerId]) { + let handle = madsim::runtime::Handle::current(); + for id in servers { + handle.resume(id.to_string()); + } + } + async fn get_leader_without_term(&self) -> ServerId { + self.get_leader().await.0 + } + /// clog link for both side. + fn clog_link_both(&self, fst: ServerId, snd: ServerId) { + let net = madsim::net::NetSim::current(); + let (fst, snd) = ( + self.get_node_handle(fst).id(), + self.get_node_handle(snd).id(), + ); + net.clog_link(fst, snd); + net.clog_link(snd, fst); + } + /// unclog link for both side. + fn unclog_link_both(&self, fst: ServerId, snd: ServerId) { + let net = madsim::net::NetSim::current(); + let (fst, snd) = ( + self.get_node_handle(fst).id(), + self.get_node_handle(snd).id(), + ); + net.unclog_link(fst, snd); + net.unclog_link(snd, fst); + } + /// clog link for one side. + fn clog_link_single(&self, fst: ServerId, snd: ServerId) { + let net = madsim::net::NetSim::current(); + let (fst, snd) = ( + self.get_node_handle(fst).id(), + self.get_node_handle(snd).id(), + ); + net.clog_link(fst, snd); + } + /// unclog link for one side. + fn unclog_link_single(&self, fst: ServerId, snd: ServerId) { + let net = madsim::net::NetSim::current(); + let (fst, snd) = ( + self.get_node_handle(fst).id(), + self.get_node_handle(snd).id(), + ); + net.unclog_link(fst, snd); + } + fn size(&self) -> usize { + return self.nodes.len(); } } @@ -244,21 +405,52 @@ impl ElleRwClusterClient for SimClient { .map_err(|err| err.to_string())?; Ok(()) } -} - -impl Drop for XlineGroup { - fn drop(&mut self) { - let handle = madsim::runtime::Handle::current(); - for node in self.nodes.values() { - handle.send_ctrl_c(node.handle.id()); - } - handle.send_ctrl_c(self.client_handle.id()); - for (name, node) in &self.nodes { - if !handle.is_exit(node.handle.id()) { - panic!("failed to graceful shutdown {name}"); - } - } - debug!("all nodes shutdowned"); + async fn txn(&self, ops: Vec) -> std::result::Result, String> { + let txn_op = ops + .clone() + .into_iter() + .map(|op| match op { + Op::Read(key, _value) => TxnOp::range(key.to_be_bytes(), None), + Op::Write(key, value) => TxnOp::put(key.to_be_bytes(), value.to_be_bytes(), None), + _ => unimplemented!("txn Ops should not contain Txn"), + }) + .collect::>(); + let txn = TxnRequest::new().when(&[]).and_then(txn_op).or_else(&[]); + let txn_response = self.txn(txn).await.map_err(|err| err.to_string())?; + assert!(txn_response.succeeded, "txn has no compare value"); + assert_eq!( + ops.len(), + txn_response.responses.len(), + "txn op and response mismatch" + ); + let res = txn_response + .responses + .into_iter() + .enumerate() + .filter_map(|(index, res)| { + res.response.map(|res| match res { + xlineapi::Response::ResponseRange(range) => { + let original_op = ops[index].clone(); + if range.kvs.len() == 0 { + // key not found, read nothing + return original_op; + } + let kv = range.kvs.into_iter().next().unwrap(); + Op::Read( + u64::from_be_bytes(kv.key.try_into().expect("key should be 8 bytes")), + Some(u64::from_be_bytes( + kv.value.try_into().expect("key should be 8 bytes"), + )), + ) + } + xlineapi::Response::ResponsePut(_) => { + ops[index].clone() // put operation + } + _ => unimplemented!("txn response should only contains range and put"), + }) + }) + .collect(); + Ok(res) } } @@ -414,3 +606,25 @@ impl SimEtcdClient { .unwrap() } } + +#[cfg(test)] +mod tests { + use super::*; + + #[madsim::test] + async fn test_sim_client_kv_op() -> Result<(), Box> { + let group = XlineGroup::new(5).await; + let client = group.client().await; + ElleRwClusterClient::put(&client, 1, 2).await?; + let res = ElleRwClusterClient::get(&client, 1).await?; + assert_eq!(res, Some(2)); + + let txn = vec![Op::Read(1, None), Op::Write(1, 3), Op::Read(1, None)]; + let res = ElleRwClusterClient::txn(&client, txn).await?; + assert_eq!( + res, + vec![Op::Read(1, Some(2)), Op::Write(1, 3), Op::Read(1, Some(3))] + ); + Ok(()) + } +} diff --git a/crates/simulation/tests/it/jepsen/basic.rs b/crates/simulation/tests/it/jepsen/basic.rs index de05f23ba..9f4927894 100644 --- a/crates/simulation/tests/it/jepsen/basic.rs +++ b/crates/simulation/tests/it/jepsen/basic.rs @@ -1,7 +1,12 @@ +use std::sync::Arc; + use jepsen_rs::{ client::{Client, JepsenClient}, - generator::{controller::GeneratorGroupStrategy, elle_rw::ElleRwGenerator, GeneratorGroup}, - op::Op, + generator::{ + controller::GeneratorGroupStrategy, elle_rw::ElleRwGenerator, GeneratorGroup, + NemesisRawGenWrapper, + }, + op::{nemesis::OpOrNemesis, Op}, }; use tracing::info; @@ -13,16 +18,20 @@ fn basic_test() { rt.set_allow_system_thread(true); rt.block_on(async move { - let group = XlineGroup::new(5).await; + let group = Arc::new(XlineGroup::new(5).await); let sim_client = group.client().await; - let jepsen_client = JepsenClient::new(sim_client, ElleRwGenerator::new().unwrap()); + let jepsen_client = JepsenClient::new( + group, + sim_client, + NemesisRawGenWrapper(Box::new(ElleRwGenerator::new().unwrap())), + ); let jepsen_client = Box::leak(jepsen_client.into()); info!("basic_test: client created"); // get generators, transform and merge them let g1 = jepsen_client .new_generator(100) - .filter(|o| matches!(o, Op::Txn(txn) if txn.len() == 1)) + .filter(|o| matches!(o, OpOrNemesis::Op(Op::Txn(txn)) if txn.len() == 1)) .await; let g2 = jepsen_client.new_generator(50); let g3 = jepsen_client.new_generator(50); diff --git a/crates/simulation/tests/it/xline.rs b/crates/simulation/tests/it/xline.rs index 2e9b48501..414bb4321 100644 --- a/crates/simulation/tests/it/xline.rs +++ b/crates/simulation/tests/it/xline.rs @@ -43,7 +43,7 @@ async fn watch_compacted_revision_should_receive_canceled_response() { #[madsim::test] async fn xline_members_restore() { init_logger(); - let mut group = XlineGroup::new(3).await; + let group = XlineGroup::new(3).await; let node = group.get_node("S1"); let addr = node.client_url.clone(); let mut client = SimEtcdClient::new(addr, group.client_handle.clone()).await;