diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index cf0d1fd02e27..ae54a00d986a 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -22,6 +22,7 @@ use catalog::kvbackend::{CachedMetaKvBackend, MetaKvBackend}; use client::client_manager::DatanodeClients; use client::Client; use common_base::Plugins; +use common_config::WalConfig; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::HandlerGroupExecutor; @@ -64,12 +65,14 @@ pub struct GreptimeDbCluster { pub frontend: Arc, } +#[derive(Clone)] pub struct GreptimeDbClusterBuilder { cluster_name: String, kv_backend: KvBackendRef, store_config: Option, store_providers: Option>, datanodes: Option, + wal_config: WalConfig, } impl GreptimeDbClusterBuilder { @@ -95,6 +98,7 @@ impl GreptimeDbClusterBuilder { store_config: None, store_providers: None, datanodes: None, + wal_config: WalConfig::default(), } } @@ -113,6 +117,11 @@ impl GreptimeDbClusterBuilder { self } + pub fn with_wal_config(mut self, wal_config: WalConfig) -> Self { + self.wal_config = wal_config; + self + } + pub async fn build(self) -> GreptimeDbCluster { let datanodes = self.datanodes.unwrap_or(4); @@ -176,19 +185,27 @@ impl GreptimeDbClusterBuilder { for i in 0..datanodes { let datanode_id = i as u64 + 1; - + let mode = Mode::Distributed; let mut opts = if let Some(store_config) = &self.store_config { let home_tmp_dir = create_temp_dir(&format!("gt_home_{}", &self.cluster_name)); let home_dir = home_tmp_dir.path().to_str().unwrap().to_string(); dir_guards.push(FileDirGuard::new(home_tmp_dir)); - create_datanode_opts(store_config.clone(), vec![], home_dir) + create_datanode_opts( + mode, + store_config.clone(), + vec![], + home_dir, + self.wal_config.clone(), + ) } else { let (opts, guard) = create_tmp_dir_and_datanode_opts( + mode, StorageType::File, self.store_providers.clone().unwrap_or_default(), &format!("{}-dn-{}", self.cluster_name, datanode_id), + self.wal_config.clone(), ); storage_guards.push(guard.storage_guards); @@ -197,7 +214,6 @@ impl GreptimeDbClusterBuilder { opts }; opts.node_id = Some(datanode_id); - opts.mode = Mode::Distributed; let datanode = self.create_datanode(opts, meta_srv.clone()).await; diff --git a/tests-integration/src/lib.rs b/tests-integration/src/lib.rs index b0b28ba4651c..730694b8c67f 100644 --- a/tests-integration/src/lib.rs +++ b/tests-integration/src/lib.rs @@ -21,7 +21,7 @@ mod otlp; mod prom_store; pub mod test_util; -mod standalone; +pub mod standalone; #[cfg(test)] mod tests; diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 072ff2282099..20348c462aa0 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use cmd::options::MixOptions; use common_base::Plugins; use common_catalog::consts::MIN_USER_TABLE_ID; -use common_config::KvBackendConfig; +use common_config::{KvBackendConfig, WalConfig}; use common_meta::cache_invalidator::DummyCacheInvalidator; use common_meta::ddl::table_meta::TableMetadataAllocator; use common_meta::ddl_manager::DdlManager; @@ -32,6 +32,7 @@ use datanode::datanode::DatanodeBuilder; use frontend::frontend::FrontendOptions; use frontend::instance::builder::FrontendBuilder; use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager}; +use servers::Mode; use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard}; @@ -42,8 +43,10 @@ pub struct GreptimeDbStandalone { pub guard: TestGuard, } +#[derive(Clone)] pub struct GreptimeDbStandaloneBuilder { instance_name: String, + wal_config: WalConfig, store_providers: Option>, default_store: Option, plugin: Option, @@ -56,6 +59,7 @@ impl GreptimeDbStandaloneBuilder { store_providers: None, plugin: None, default_store: None, + wal_config: WalConfig::default(), } } @@ -82,12 +86,22 @@ impl GreptimeDbStandaloneBuilder { } } + pub fn with_wal_config(mut self, wal_config: WalConfig) -> Self { + self.wal_config = wal_config; + self + } + pub async fn build(self) -> GreptimeDbStandalone { let default_store_type = self.default_store.unwrap_or(StorageType::File); let store_types = self.store_providers.unwrap_or_default(); - let (opts, guard) = - create_tmp_dir_and_datanode_opts(default_store_type, store_types, &self.instance_name); + let (opts, guard) = create_tmp_dir_and_datanode_opts( + Mode::Standalone, + default_store_type, + store_types, + &self.instance_name, + self.wal_config.clone(), + ); let procedure_config = ProcedureConfig::default(); let kv_backend_config = KvBackendConfig::default(); diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 6bb91b89eb98..04e31d91ca3f 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -21,6 +21,7 @@ use std::time::Duration; use auth::UserProviderRef; use axum::Router; use catalog::kvbackend::KvBackendCatalogManager; +use common_config::WalConfig; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; use common_query::Output; @@ -294,9 +295,11 @@ impl TestGuard { } pub fn create_tmp_dir_and_datanode_opts( + mode: Mode, default_store_type: StorageType, store_provider_types: Vec, name: &str, + wal_config: WalConfig, ) -> (DatanodeOptions, TestGuard) { let home_tmp_dir = create_temp_dir(&format!("gt_data_{name}")); let home_dir = home_tmp_dir.path().to_str().unwrap().to_string(); @@ -314,7 +317,7 @@ pub fn create_tmp_dir_and_datanode_opts( store_providers.push(store); storage_guards.push(StorageGuard(data_tmp_dir)) } - let opts = create_datanode_opts(default_store, store_providers, home_dir); + let opts = create_datanode_opts(mode, default_store, store_providers, home_dir, wal_config); ( opts, @@ -326,9 +329,11 @@ pub fn create_tmp_dir_and_datanode_opts( } pub(crate) fn create_datanode_opts( + mode: Mode, default_store: ObjectStoreConfig, providers: Vec, home_dir: String, + wal_config: WalConfig, ) -> DatanodeOptions { DatanodeOptions { node_id: Some(0), @@ -339,7 +344,8 @@ pub(crate) fn create_datanode_opts( store: default_store, ..Default::default() }, - mode: Mode::Standalone, + mode, + wal: wal_config, ..Default::default() } } diff --git a/tests-integration/src/tests.rs b/tests-integration/src/tests.rs index 692a9de8d6ab..8d1e421738aa 100644 --- a/tests-integration/src/tests.rs +++ b/tests-integration/src/tests.rs @@ -14,6 +14,8 @@ mod instance_test; mod promql_test; +// TODO(weny): Remove it. +#[allow(dead_code, unused_macros)] mod test_util; use std::collections::HashMap; diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs index edf21ba7601d..32be423e69bb 100644 --- a/tests-integration/src/tests/test_util.rs +++ b/tests-integration/src/tests/test_util.rs @@ -12,10 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::env; use std::sync::Arc; +use common_config::wal::KafkaConfig; +use common_config::WalConfig; use common_query::Output; use common_recordbatch::util; +use common_telemetry::warn; use common_test_util::find_workspace_path; use frontend::instance::Instance; use rstest_reuse::{self, template}; @@ -25,7 +29,13 @@ use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder}; use crate::test_util::StorageType; use crate::tests::{create_distributed_instance, MockDistributedInstance}; -pub(crate) trait MockInstance { +#[async_trait::async_trait] +pub(crate) trait RebuildableMockInstance: MockInstance { + // Rebuilds the instance and returns rebuilt frontend instance. + async fn rebuild(&mut self) -> Arc; +} + +pub(crate) trait MockInstance: Sync + Send { fn frontend(&self) -> Arc; fn is_distributed_mode(&self) -> bool; @@ -51,6 +61,54 @@ impl MockInstance for MockDistributedInstance { } } +pub(crate) enum MockInstanceBuilder { + Standalone(GreptimeDbStandaloneBuilder), + Distributed(GreptimeDbClusterBuilder), +} + +impl MockInstanceBuilder { + async fn build(&self) -> Arc { + match self { + MockInstanceBuilder::Standalone(builder) => Arc::new(builder.clone().build().await), + MockInstanceBuilder::Distributed(builder) => { + Arc::new(MockDistributedInstance(builder.clone().build().await)) + } + } + } +} + +pub(crate) struct TestContext { + instance: Arc, + builder: MockInstanceBuilder, +} + +impl TestContext { + async fn new(builder: MockInstanceBuilder) -> Self { + let instance = builder.build().await; + + Self { instance, builder } + } +} + +#[async_trait::async_trait] +impl RebuildableMockInstance for TestContext { + async fn rebuild(&mut self) -> Arc { + let instance = self.builder.build().await; + self.instance = instance; + self.instance.frontend() + } +} + +impl MockInstance for TestContext { + fn frontend(&self) -> Arc { + self.instance.frontend() + } + + fn is_distributed_mode(&self) -> bool { + self.instance.is_distributed_mode() + } +} + pub(crate) async fn standalone() -> Arc { let test_name = uuid::Uuid::new_v4().to_string(); let instance = GreptimeDbStandaloneBuilder::new(&test_name).build().await; @@ -86,6 +144,61 @@ pub(crate) async fn distributed_with_multiple_object_stores() -> Arc Option> { + let _ = dotenv::dotenv(); + let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default(); + common_telemetry::init_default_ut_logging(); + if endpoints.is_empty() { + warn!("The endpoints is empty, skipping the test"); + return None; + } + + let endpoints = endpoints.split(',').map(|s| s.trim().to_string()).collect(); + let test_name = uuid::Uuid::new_v4().to_string(); + let builder = GreptimeDbStandaloneBuilder::new(&test_name).with_wal_config(WalConfig::Kafka( + KafkaConfig { + broker_endpoints: endpoints, + ..Default::default() + }, + )); + let instance = TestContext::new(MockInstanceBuilder::Standalone(builder)).await; + Some(Box::new(instance)) +} + +pub(crate) async fn distributed_with_kafka_wal() -> Option> { + let _ = dotenv::dotenv(); + let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default(); + common_telemetry::init_default_ut_logging(); + if endpoints.is_empty() { + warn!("The endpoints is empty, skipping the test"); + return None; + } + + let endpoints = endpoints.split(',').map(|s| s.trim().to_string()).collect(); + let test_name = uuid::Uuid::new_v4().to_string(); + let builder = GreptimeDbClusterBuilder::new(&test_name) + .await + .with_wal_config(WalConfig::Kafka(KafkaConfig { + broker_endpoints: endpoints, + ..Default::default() + })); + let instance = TestContext::new(MockInstanceBuilder::Distributed(builder)).await; + Some(Box::new(instance)) +} + +#[template] +#[rstest] +#[case::test_with_standalone(standalone_with_kafka_wal())] +#[case::test_with_distributed(distributed_with_kafka_wal())] +#[awt] +#[tokio::test(flavor = "multi_thread")] +pub(crate) fn both_instances_cases_with_kafka_wal( + #[future] + #[case] + instance: Arc, +) { +} + #[template] #[rstest] #[case::test_with_standalone(standalone_with_multiple_object_stores())]