Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add kafka wal integration test utils #3069

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions tests-integration/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use catalog::kvbackend::{CachedMetaKvBackend, MetaKvBackend};
use client::client_manager::DatanodeClients;
use client::Client;
use common_base::Plugins;
use common_config::WalConfig;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
Expand Down Expand Up @@ -64,12 +65,14 @@ pub struct GreptimeDbCluster {
pub frontend: Arc<FeInstance>,
}

#[derive(Clone)]
pub struct GreptimeDbClusterBuilder {
cluster_name: String,
kv_backend: KvBackendRef,
store_config: Option<ObjectStoreConfig>,
store_providers: Option<Vec<StorageType>>,
datanodes: Option<u32>,
wal_config: WalConfig,
}

impl GreptimeDbClusterBuilder {
Expand All @@ -95,6 +98,7 @@ impl GreptimeDbClusterBuilder {
store_config: None,
store_providers: None,
datanodes: None,
wal_config: WalConfig::default(),
}
}

Expand All @@ -113,6 +117,11 @@ impl GreptimeDbClusterBuilder {
self
}

pub fn with_wal_config(mut self, wal_config: WalConfig) -> Self {
self.wal_config = wal_config;
self
}

pub async fn build(self) -> GreptimeDbCluster {
let datanodes = self.datanodes.unwrap_or(4);

Expand Down Expand Up @@ -176,19 +185,27 @@ impl GreptimeDbClusterBuilder {

for i in 0..datanodes {
let datanode_id = i as u64 + 1;

let mode = Mode::Distributed;
let mut opts = if let Some(store_config) = &self.store_config {
let home_tmp_dir = create_temp_dir(&format!("gt_home_{}", &self.cluster_name));
let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();

dir_guards.push(FileDirGuard::new(home_tmp_dir));

create_datanode_opts(store_config.clone(), vec![], home_dir)
create_datanode_opts(
mode,
store_config.clone(),
vec![],
home_dir,
self.wal_config.clone(),
)
} else {
let (opts, guard) = create_tmp_dir_and_datanode_opts(
mode,
StorageType::File,
self.store_providers.clone().unwrap_or_default(),
&format!("{}-dn-{}", self.cluster_name, datanode_id),
self.wal_config.clone(),
);

storage_guards.push(guard.storage_guards);
Expand All @@ -197,7 +214,6 @@ impl GreptimeDbClusterBuilder {
opts
};
opts.node_id = Some(datanode_id);
opts.mode = Mode::Distributed;

let datanode = self.create_datanode(opts, meta_srv.clone()).await;

Expand Down
2 changes: 1 addition & 1 deletion tests-integration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ mod otlp;
mod prom_store;
pub mod test_util;

mod standalone;
pub mod standalone;
#[cfg(test)]
mod tests;

Expand Down
20 changes: 17 additions & 3 deletions tests-integration/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use cmd::options::MixOptions;
use common_base::Plugins;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_config::KvBackendConfig;
use common_config::{KvBackendConfig, WalConfig};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl_manager::DdlManager;
Expand All @@ -32,6 +32,7 @@ use datanode::datanode::DatanodeBuilder;
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager};
use servers::Mode;

use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard};

Expand All @@ -42,8 +43,10 @@ pub struct GreptimeDbStandalone {
pub guard: TestGuard,
}

#[derive(Clone)]
pub struct GreptimeDbStandaloneBuilder {
instance_name: String,
wal_config: WalConfig,
store_providers: Option<Vec<StorageType>>,
default_store: Option<StorageType>,
plugin: Option<Plugins>,
Expand All @@ -56,6 +59,7 @@ impl GreptimeDbStandaloneBuilder {
store_providers: None,
plugin: None,
default_store: None,
wal_config: WalConfig::default(),
}
}

Expand All @@ -82,12 +86,22 @@ impl GreptimeDbStandaloneBuilder {
}
}

pub fn with_wal_config(mut self, wal_config: WalConfig) -> Self {
self.wal_config = wal_config;
self
}

pub async fn build(self) -> GreptimeDbStandalone {
let default_store_type = self.default_store.unwrap_or(StorageType::File);
let store_types = self.store_providers.unwrap_or_default();

let (opts, guard) =
create_tmp_dir_and_datanode_opts(default_store_type, store_types, &self.instance_name);
let (opts, guard) = create_tmp_dir_and_datanode_opts(
Mode::Standalone,
default_store_type,
store_types,
&self.instance_name,
self.wal_config.clone(),
);

let procedure_config = ProcedureConfig::default();
let kv_backend_config = KvBackendConfig::default();
Expand Down
10 changes: 8 additions & 2 deletions tests-integration/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::time::Duration;
use auth::UserProviderRef;
use axum::Router;
use catalog::kvbackend::KvBackendCatalogManager;
use common_config::WalConfig;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_query::Output;
Expand Down Expand Up @@ -294,9 +295,11 @@ impl TestGuard {
}

pub fn create_tmp_dir_and_datanode_opts(
mode: Mode,
default_store_type: StorageType,
store_provider_types: Vec<StorageType>,
name: &str,
wal_config: WalConfig,
) -> (DatanodeOptions, TestGuard) {
let home_tmp_dir = create_temp_dir(&format!("gt_data_{name}"));
let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();
Expand All @@ -314,7 +317,7 @@ pub fn create_tmp_dir_and_datanode_opts(
store_providers.push(store);
storage_guards.push(StorageGuard(data_tmp_dir))
}
let opts = create_datanode_opts(default_store, store_providers, home_dir);
let opts = create_datanode_opts(mode, default_store, store_providers, home_dir, wal_config);

(
opts,
Expand All @@ -326,9 +329,11 @@ pub fn create_tmp_dir_and_datanode_opts(
}

pub(crate) fn create_datanode_opts(
mode: Mode,
default_store: ObjectStoreConfig,
providers: Vec<ObjectStoreConfig>,
home_dir: String,
wal_config: WalConfig,
) -> DatanodeOptions {
DatanodeOptions {
node_id: Some(0),
Expand All @@ -339,7 +344,8 @@ pub(crate) fn create_datanode_opts(
store: default_store,
..Default::default()
},
mode: Mode::Standalone,
mode,
wal: wal_config,
..Default::default()
}
}
Expand Down
2 changes: 2 additions & 0 deletions tests-integration/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

mod instance_test;
mod promql_test;
// TODO(weny): Remove it.
#[allow(dead_code, unused_macros)]
mod test_util;

use std::collections::HashMap;
Expand Down
115 changes: 114 additions & 1 deletion tests-integration/src/tests/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::env;
use std::sync::Arc;

use common_config::wal::KafkaConfig;
use common_config::WalConfig;
use common_query::Output;
use common_recordbatch::util;
use common_telemetry::warn;
use common_test_util::find_workspace_path;
use frontend::instance::Instance;
use rstest_reuse::{self, template};
Expand All @@ -25,7 +29,13 @@ use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder};
use crate::test_util::StorageType;
use crate::tests::{create_distributed_instance, MockDistributedInstance};

pub(crate) trait MockInstance {
#[async_trait::async_trait]
pub(crate) trait RebuildableMockInstance: MockInstance {
// Rebuilds the instance and returns rebuilt frontend instance.
async fn rebuild(&mut self) -> Arc<Instance>;
}

pub(crate) trait MockInstance: Sync + Send {
fn frontend(&self) -> Arc<Instance>;

fn is_distributed_mode(&self) -> bool;
Expand All @@ -51,6 +61,54 @@ impl MockInstance for MockDistributedInstance {
}
}

pub(crate) enum MockInstanceBuilder {
Standalone(GreptimeDbStandaloneBuilder),
Distributed(GreptimeDbClusterBuilder),
}

impl MockInstanceBuilder {
async fn build(&self) -> Arc<dyn MockInstance> {
match self {
MockInstanceBuilder::Standalone(builder) => Arc::new(builder.clone().build().await),
MockInstanceBuilder::Distributed(builder) => {
Arc::new(MockDistributedInstance(builder.clone().build().await))
}
}
}
}

pub(crate) struct TestContext {
instance: Arc<dyn MockInstance>,
builder: MockInstanceBuilder,
}

impl TestContext {
async fn new(builder: MockInstanceBuilder) -> Self {
let instance = builder.build().await;

Self { instance, builder }
}
}

#[async_trait::async_trait]
impl RebuildableMockInstance for TestContext {
async fn rebuild(&mut self) -> Arc<Instance> {
let instance = self.builder.build().await;
self.instance = instance;
self.instance.frontend()
}
}

impl MockInstance for TestContext {
fn frontend(&self) -> Arc<Instance> {
self.instance.frontend()
}

fn is_distributed_mode(&self) -> bool {
self.instance.is_distributed_mode()
}
}

pub(crate) async fn standalone() -> Arc<dyn MockInstance> {
let test_name = uuid::Uuid::new_v4().to_string();
let instance = GreptimeDbStandaloneBuilder::new(&test_name).build().await;
Expand Down Expand Up @@ -86,6 +144,61 @@ pub(crate) async fn distributed_with_multiple_object_stores() -> Arc<dyn MockIns
Arc::new(MockDistributedInstance(cluster))
}

pub(crate) async fn standalone_with_kafka_wal() -> Option<Box<dyn RebuildableMockInstance>> {
let _ = dotenv::dotenv();
let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default();
common_telemetry::init_default_ut_logging();
if endpoints.is_empty() {
warn!("The endpoints is empty, skipping the test");
return None;
}

let endpoints = endpoints.split(',').map(|s| s.trim().to_string()).collect();
let test_name = uuid::Uuid::new_v4().to_string();
let builder = GreptimeDbStandaloneBuilder::new(&test_name).with_wal_config(WalConfig::Kafka(
KafkaConfig {
broker_endpoints: endpoints,
..Default::default()
},
));
let instance = TestContext::new(MockInstanceBuilder::Standalone(builder)).await;
Some(Box::new(instance))
}

pub(crate) async fn distributed_with_kafka_wal() -> Option<Box<dyn RebuildableMockInstance>> {
let _ = dotenv::dotenv();
let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default();
common_telemetry::init_default_ut_logging();
if endpoints.is_empty() {
warn!("The endpoints is empty, skipping the test");
return None;
}

let endpoints = endpoints.split(',').map(|s| s.trim().to_string()).collect();
let test_name = uuid::Uuid::new_v4().to_string();
let builder = GreptimeDbClusterBuilder::new(&test_name)
.await
.with_wal_config(WalConfig::Kafka(KafkaConfig {
broker_endpoints: endpoints,
..Default::default()
}));
let instance = TestContext::new(MockInstanceBuilder::Distributed(builder)).await;
Some(Box::new(instance))
}

#[template]
#[rstest]
#[case::test_with_standalone(standalone_with_kafka_wal())]
#[case::test_with_distributed(distributed_with_kafka_wal())]
#[awt]
#[tokio::test(flavor = "multi_thread")]
pub(crate) fn both_instances_cases_with_kafka_wal(
#[future]
#[case]
instance: Arc<dyn MockInstance>,
) {
}

#[template]
#[rstest]
#[case::test_with_standalone(standalone_with_multiple_object_stores())]
Expand Down
Loading