diff --git a/tests/conf/metasrv-test.toml.template b/tests/conf/metasrv-test.toml.template
index 8d27aad3c4b2..5519cc65e1a3 100644
--- a/tests/conf/metasrv-test.toml.template
+++ b/tests/conf/metasrv-test.toml.template
@@ -1,4 +1,14 @@
 flush_stats_factor = 1
+{{ if setup_etcd }}
+## Store server addresses. Defaults to the etcd store.
+store_addrs = [{store_addrs | unescaped}]
+
+## Store data in memory.
+use_memory_store = false
+
+## The datastore for the meta server.
+backend = "EtcdStore"
+{{ endif }}
 [wal]
 {{ if is_raft_engine }}
 provider = "raft_engine"
diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs
index 5175c785b085..9212288cd7e7 100644
--- a/tests/runner/src/env.rs
+++ b/tests/runner/src/env.rs
@@ -65,6 +65,12 @@ pub enum WalConfig {
     },
 }
 
+#[derive(Clone)]
+pub struct StoreConfig {
+    pub store_addrs: Vec<String>,
+    pub setup_etcd: bool,
+}
+
 #[derive(Clone)]
 pub struct Env {
     sqlness_home: PathBuf,
@@ -79,6 +85,8 @@ pub struct Env {
     pull_version_on_need: bool,
     /// old bins dir, useful when switching versions
    old_bins_dir: Arc<Mutex<Option<PathBuf>>>,
+    /// Store configuration for metasrv metadata
+    store_config: StoreConfig,
 }
 
 #[async_trait]
@@ -107,6 +115,7 @@ impl Env {
         wal: WalConfig,
         pull_version_on_need: bool,
         bins_dir: Option<PathBuf>,
+        store_config: StoreConfig,
     ) -> Self {
         Self {
             sqlness_home: data_home,
@@ -115,6 +124,7 @@ impl Env {
             pull_version_on_need,
             bins_dir: Arc::new(Mutex::new(bins_dir)),
             old_bins_dir: Arc::new(Mutex::new(None)),
+            store_config,
         }
     }
 
@@ -144,6 +154,7 @@ impl Env {
         } else {
             self.build_db();
             self.setup_wal();
+            self.setup_etcd();
 
             let db_ctx = GreptimeDBContext::new(self.wal.clone());
 
@@ -539,6 +550,19 @@ impl Env {
         }
     }
 
+    /// Set up etcd if needed.
+    fn setup_etcd(&self) {
+        if self.store_config.setup_etcd {
+            let client_ports = self
+                .store_config
+                .store_addrs
+                .iter()
+                .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
+                .collect::<Vec<_>>();
+            util::setup_etcd(client_ports, None, None);
+        }
+    }
+
     /// Generate config file to `/tmp/{subcommand}-{current_time}.toml`
     fn generate_config_file(&self, subcommand: &str, db_ctx: &GreptimeDBContext) -> String {
         let mut tt = TinyTemplate::new();
@@ -555,6 +579,8 @@ impl Env {
             procedure_dir: String,
             is_raft_engine: bool,
             kafka_wal_broker_endpoints: String,
+            setup_etcd: bool,
+            store_addrs: String,
         }
 
         let data_home = self.sqlness_home.join(format!("greptimedb-{subcommand}"));
@@ -568,6 +594,15 @@ impl Env {
             procedure_dir,
             is_raft_engine: db_ctx.is_raft_engine(),
             kafka_wal_broker_endpoints: db_ctx.kafka_wal_broker_endpoints(),
+            setup_etcd: self.store_config.setup_etcd,
+            store_addrs: self
+                .store_config
+                .store_addrs
+                .clone()
+                .iter()
+                .map(|p| format!("\"{p}\""))
+                .collect::<Vec<_>>()
+                .join(","),
         };
 
         let rendered = tt.render(subcommand, &ctx).unwrap();
diff --git a/tests/runner/src/main.rs b/tests/runner/src/main.rs
index a5386a3f05df..aed0de953531 100644
--- a/tests/runner/src/main.rs
+++ b/tests/runner/src/main.rs
@@ -22,6 +22,8 @@ use env::{Env, WalConfig};
 use sqlness::interceptor::Registry;
 use sqlness::{ConfigBuilder, Runner};
 
+use crate::env::StoreConfig;
+
 mod env;
 mod protocol_interceptor;
 mod util;
@@ -96,6 +98,14 @@ struct Args {
     /// Pull Different versions of GreptimeDB on need.
     #[clap(long, default_value = "true")]
     pull_version_on_need: bool,
+
+    /// The store addresses for the metasrv metadata store.
+    #[clap(long, default_value = "0.0.0.0:2379")]
+    store_addrs: Vec<String>,
+
+    /// Whether to set up etcd; defaults to false.
+    #[clap(long, default_value = "false")]
+    setup_etcd: bool,
 }
 
 #[tokio::main]
@@ -114,6 +124,11 @@ async fn main() {
         Arc::new(protocol_interceptor::ProtocolInterceptorFactory),
     );
 
+    if let Some(d) = &args.case_dir {
+        if !d.is_dir() {
+            panic!("{} is not a directory", d.display());
+        }
+    }
     let config = ConfigBuilder::default()
         .case_dir(util::get_case_dir(args.case_dir))
         .fail_fast(args.fail_fast)
@@ -136,6 +151,11 @@ async fn main() {
         },
     };
 
+    let store = StoreConfig {
+        store_addrs: args.store_addrs.clone(),
+        setup_etcd: args.setup_etcd,
+    };
+
     let runner = Runner::new(
         config,
         Env::new(
@@ -144,12 +164,15 @@ async fn main() {
             wal,
             args.pull_version_on_need,
             args.bins_dir,
+            store,
         ),
     );
     runner.run().await.unwrap();
 
     // clean up and exit
     if !args.preserve_state {
+        println!("Stopping etcd");
+        util::stop_etcd();
         println!("Removing state in {:?}", sqlness_home);
         tokio::fs::remove_dir_all(sqlness_home).await.unwrap();
     }
diff --git a/tests/runner/src/util.rs b/tests/runner/src/util.rs
index 5f9cbfa1bc59..61bed6e233e0 100644
--- a/tests/runner/src/util.rs
+++ b/tests/runner/src/util.rs
@@ -30,7 +30,7 @@ const PORT_CHECK_INTERVAL: Duration = Duration::from_millis(100);
 fn http_proxy() -> Option<String> {
     for proxy in ["http_proxy", "HTTP_PROXY", "all_proxy", "ALL_PROXY"] {
         if let Ok(proxy_addr) = std::env::var(proxy) {
-            println!("Setting Proxy from env: {}={}", proxy, proxy_addr);
+            println!("Getting Proxy from env var: {}={}", proxy, proxy_addr);
             return Some(proxy_addr);
         }
     }
@@ -40,7 +40,7 @@
 fn https_proxy() -> Option<String> {
     for proxy in ["https_proxy", "HTTPS_PROXY", "all_proxy", "ALL_PROXY"] {
         if let Ok(proxy_addr) = std::env::var(proxy) {
-            println!("Setting Proxy from env: {}={}", proxy, proxy_addr);
+            println!("Getting Proxy from env var: {}={}", proxy, proxy_addr);
             return Some(proxy_addr);
         }
     }
@@ -57,19 +57,37 @@ async fn download_files(url: &str, path: &str) {
     };
 
     let client = proxy
-        .map(|proxy| reqwest::Client::builder().proxy(proxy).build().unwrap())
+        .map(|proxy| {
+            reqwest::Client::builder()
+                .proxy(proxy)
+                .build()
+                .expect("Failed to build client")
+        })
         .unwrap_or(reqwest::Client::new());
 
-    let mut file = tokio::fs::File::create(path).await.unwrap();
+    let mut file = tokio::fs::File::create(path)
+        .await
+        .unwrap_or_else(|_| panic!("Failed to create file in {path}"));
 
     println!("Downloading {}...", url);
 
-    let mut stream = client.get(url).send().await.unwrap().bytes_stream();
+    let resp = client
+        .get(url)
+        .send()
+        .await
+        .expect("Failed to send download request");
+    let len = resp.content_length();
+    let mut stream = resp.bytes_stream();
 
     let mut size_downloaded = 0;
     while let Some(chunk_result) = stream.next().await {
         let chunk = chunk_result.unwrap();
         size_downloaded += chunk.len();
-        print!("\rDownloaded {} bytes", size_downloaded);
+        if let Some(len) = len {
+            print!("\rDownloading {}/{} bytes", size_downloaded, len);
+        } else {
+            print!("\rDownloaded {} bytes", size_downloaded);
+        }
+
         file.write_all(&chunk).await.unwrap();
     }
@@ -166,6 +184,92 @@ pub async fn maybe_pull_binary(version: &str, pull_version_on_need: bool) {
     }
 }
 
+/// Set up a standalone etcd in docker.
+pub fn setup_etcd(client_ports: Vec<u16>, peer_port: Option<u16>, etcd_version: Option<&str>) {
+    if std::process::Command::new("docker")
+        .args(["-v"])
+        .status()
+        .is_err()
+    {
+        panic!("Docker is not installed");
+    }
+    let peer_port = peer_port.unwrap_or(2380);
+    let exposed_port: Vec<_> = client_ports.iter().chain(Some(&peer_port)).collect();
+    let exposed_port_str = exposed_port
+        .iter()
+        .flat_map(|p| ["-p".to_string(), format!("{p}:{p}")])
+        .collect::<Vec<_>>();
+    let etcd_version = etcd_version.unwrap_or("v3.5.17");
+    let etcd_image = format!("quay.io/coreos/etcd:{etcd_version}");
+    let peer_url = format!("http://0.0.0.0:{peer_port}");
+
+    let client_ports_fmt = client_ports
+        .iter()
+        .map(|p| format!("http://0.0.0.0:{p}"))
+        .collect::<Vec<_>>();
+    let mut arg_list = vec![];
+    arg_list.extend([
+        "run",
+        "-d",
+        "-v",
+        "/usr/share/ca-certificates/:/etc/ssl/certs",
+    ]);
+    arg_list.extend(exposed_port_str.iter().map(std::ops::Deref::deref));
+    arg_list.extend([
+        "--name",
+        "etcd",
+        &etcd_image,
+        "etcd",
+        "-name",
+        "etcd0",
+        "-listen-client-urls",
+    ]);
+
+    arg_list.extend(client_ports_fmt.iter().map(std::ops::Deref::deref));
+    arg_list.extend(["-listen-peer-urls", &peer_url, "-initial-cluster-state", "new"]);
+
+    let mut cmd = std::process::Command::new("docker");
+
+    cmd.args(arg_list);
+
+    println!("Starting etcd with command: {:?}", cmd);
+
+    let status = cmd.status();
+    if status.is_err() {
+        panic!("Failed to start etcd: {:?}", status);
+    } else if let Ok(status) = status {
+        if status.success() {
+            println!(
+                "Started etcd with client ports {:?} and peer port {}, status: {status:?}",
+                client_ports, peer_port
+            );
+        } else {
+            panic!("Failed to start etcd: {:?}", status);
+        }
+    }
+}
+
+/// Stop the etcd container.
+pub fn stop_etcd() {
+    let status = std::process::Command::new("docker")
+        .args(["container", "stop", "etcd"])
+        .status();
+    if status.is_err() {
+        panic!("Failed to stop etcd: {:?}", status);
+    } else {
+        println!("Stopped etcd");
+    }
+    // rm the container
+    let status = std::process::Command::new("docker")
+        .args(["container", "rm", "etcd"])
+        .status();
+    if status.is_err() {
+        panic!("Failed to remove etcd container: {:?}", status);
+    } else {
+        println!("Removed etcd container");
+    }
+}
+
 /// Get the dir of test cases. This function only works when the runner is run
 /// under the project's dir because it depends on some envs set by cargo.
 pub fn get_case_dir(case_dir: Option<PathBuf>) -> String {
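Note: for reference, a sketch of what the new `{{ if setup_etcd }}` block in metasrv-test.toml.template renders to when the runner is started with etcd setup enabled and the default store address (the address shown is only the clap default of `0.0.0.0:2379`; a real run may pass different endpoints via `--store-addrs`, and multiple addresses render as a comma-separated TOML array because `generate_config_file` quotes each address and joins them with commas):

    ## Store server addresses. Defaults to the etcd store.
    store_addrs = ["0.0.0.0:2379"]

    ## Store data in memory.
    use_memory_store = false

    ## The datastore for the meta server.
    backend = "EtcdStore"

When `setup_etcd` is false, the block is omitted entirely and the metasrv falls back to its previous in-memory configuration.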