diff --git a/src/common/meta/src/ddl/table_meta.rs b/src/common/meta/src/ddl/table_meta.rs index 8d7a34f05239..fd68fa4735b8 100644 --- a/src/common/meta/src/ddl/table_meta.rs +++ b/src/common/meta/src/ddl/table_meta.rs @@ -175,6 +175,7 @@ impl TableMetadataAllocator { }; Ok(table_route) } + pub async fn create( &self, ctx: &TableMetadataAllocatorContext, diff --git a/tests-integration/tests/main.rs b/tests-integration/tests/main.rs index 65dfcc8cf7c4..d127a10f4fc9 100644 --- a/tests-integration/tests/main.rs +++ b/tests-integration/tests/main.rs @@ -18,6 +18,8 @@ mod grpc; mod http; #[macro_use] mod sql; +#[macro_use] +mod region_migration; // #[macro_use] // mod region_failover; @@ -26,4 +28,6 @@ http_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs); // region_failover_tests!(File, S3, S3WithCache, Oss, Azblob); sql_tests!(File); +region_migration_tests!(File); + // TODO(niebayes): add integration tests for remote wal. diff --git a/tests-integration/tests/region_migration.rs b/tests-integration/tests/region_migration.rs index b267c01fc675..cd17c4f4d7bd 100644 --- a/tests-integration/tests/region_migration.rs +++ b/tests-integration/tests/region_migration.rs @@ -38,22 +38,57 @@ use store_api::storage::RegionId; use table::metadata::TableId; use tests_integration::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder}; use tests_integration::test_util::{ - check_output_stream, get_test_store_config, run_test_with_kafka_wal, StorageType, - PEER_PLACEHOLDER_ADDR, + check_output_stream, get_test_store_config, StorageType, PEER_PLACEHOLDER_ADDR, }; use uuid::Uuid; const TEST_TABLE_NAME: &str = "migration_target"; -#[tokio::test(flavor = "multi_thread")] -async fn test_region_migration_fs() { - common_telemetry::init_default_ut_logging(); - run_test_with_kafka_wal(|endpoints| { - Box::pin(async move { test_region_migration(StorageType::File, endpoints).await }) - }) - .await +#[macro_export] +macro_rules! region_migration_test { + ($service:ident, $($(#[$meta:meta])* $test:ident),*,) => { + paste::item! { + mod [] { + $( + #[tokio::test(flavor = "multi_thread")] + $( + #[$meta] + )* + async fn [< $test >]() { + let store_type = tests_integration::test_util::StorageType::$service; + if store_type.test_on() { + common_telemetry::init_default_ut_logging(); + tests_integration::test_util::run_test_with_kafka_wal(|endpoints| { + Box::pin(async move { $crate::region_migration::$test(store_type, endpoints).await }) + }) + .await + } + + } + )* + } + } + }; } +#[macro_export] +macro_rules! region_migration_tests { + ($($service:ident),*) => { + $( + region_migration_test!( + $service, + + test_region_migration, + test_region_migration_multiple_regions, + test_region_migration_all_regions, + test_region_migration_incorrect_from_peer, + test_region_migration_incorrect_region_id, + ); + )* + }; +} + +/// A naive region migration test. pub async fn test_region_migration(store_type: StorageType, endpoints: Vec) { let cluster_name = "test_region_migration"; let peer_factory = |id| Peer { @@ -66,9 +101,11 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec, +) { + let cluster_name = "test_region_migration_multiple_regions"; + let peer_factory = |id| Peer { + id, + addr: PEER_PLACEHOLDER_ADDR.to_string(), + }; + + // Prepares test cluster. + let (store_config, _guard) = get_test_store_config(&store_type); + let home_dir = create_temp_dir("test_region_migration_multiple_regions_data_home"); + let datanodes = 5u64; + let builder = GreptimeDbClusterBuilder::new(cluster_name).await; + let const_selector = Arc::new(ConstNodeSelector::new(vec![ + peer_factory(1), + peer_factory(2), + peer_factory(2), + ])); + let cluster = builder + .with_datanodes(datanodes as u32) + .with_store_config(store_config) + .with_wal_config(WalConfig::Kafka(KafkaConfig { + broker_endpoints: endpoints.clone(), + linger: Duration::from_millis(25), + ..Default::default() + })) + .with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig { + broker_endpoints: endpoints, + num_topics: 3, + topic_name_prefix: Uuid::new_v4().to_string(), + ..Default::default() + })) + .with_shared_home_dir(Arc::new(home_dir)) + .with_meta_selector(const_selector.clone()) + .build() + .await; + let mut logical_timer = 1685508715000; + let table_metadata_manager = cluster.meta_srv.table_metadata_manager().clone(); + + // Prepares test table. + let table_id = prepare_testing_table(&cluster).await; + + // Inserts data + let results = insert_values(&cluster.frontend, logical_timer).await; + logical_timer += 1000; + for result in results { + assert!(matches!(result.unwrap(), Output::AffectedRows(1))); + } + + // The region distribution + let mut distribution = find_region_distribution(&table_metadata_manager, table_id).await; + assert_eq!(distribution.len(), 2); + + // Selecting target of region migration. + let region_migration_manager = cluster.meta_srv.region_migration_manager(); + let (peer_1, peer_1_regions) = distribution.pop_first().unwrap(); + let (peer_2, peer_2_regions) = distribution.pop_first().unwrap(); + + // Picks the peer only contains as from peer. + let ((from_peer_id, from_regions), (to_peer_id, mut to_regions)) = if peer_1_regions.len() == 1 + { + ((peer_1, peer_1_regions), (peer_2, peer_2_regions)) + } else { + ((peer_2, peer_2_regions), (peer_1, peer_1_regions)) + }; + + info!( + "Selecting from peer: {from_peer_id}, and regions: {:?}", + from_regions + ); + info!( + "Selecting to peer: {to_peer_id}, and regions: {:?}", to_regions ); @@ -176,10 +351,292 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec) { + let cluster_name = "test_region_migration_all_regions"; + let peer_factory = |id| Peer { + id, + addr: PEER_PLACEHOLDER_ADDR.to_string(), + }; + + // Prepares test cluster. + let (store_config, _guard) = get_test_store_config(&store_type); + let home_dir = create_temp_dir("test_region_migration_all_regions_data_home"); + let datanodes = 5u64; + let builder = GreptimeDbClusterBuilder::new(cluster_name).await; + let const_selector = Arc::new(ConstNodeSelector::new(vec![ + peer_factory(2), + peer_factory(2), + peer_factory(2), + ])); + let cluster = builder + .with_datanodes(datanodes as u32) + .with_store_config(store_config) + .with_wal_config(WalConfig::Kafka(KafkaConfig { + broker_endpoints: endpoints.clone(), + linger: Duration::from_millis(25), + ..Default::default() + })) + .with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig { + broker_endpoints: endpoints, + num_topics: 3, + topic_name_prefix: Uuid::new_v4().to_string(), + ..Default::default() + })) + .with_shared_home_dir(Arc::new(home_dir)) + .with_meta_selector(const_selector.clone()) + .build() + .await; + let mut logical_timer = 1685508715000; + let table_metadata_manager = cluster.meta_srv.table_metadata_manager().clone(); + + // Prepares test table. + let table_id = prepare_testing_table(&cluster).await; + + // Inserts data + let results = insert_values(&cluster.frontend, logical_timer).await; + logical_timer += 1000; + for result in results { + assert!(matches!(result.unwrap(), Output::AffectedRows(1))); + } + + // The region distribution + let mut distribution = find_region_distribution(&table_metadata_manager, table_id).await; + assert_eq!(distribution.len(), 1); + + // Selecting target of region migration. + let region_migration_manager = cluster.meta_srv.region_migration_manager(); + let (from_peer_id, mut from_regions) = distribution.pop_first().unwrap(); + let to_peer_id = 1; + let mut to_regions = Vec::new(); + info!( + "Selecting from peer: {from_peer_id}, and regions: {:?}", + from_regions + ); + info!( + "Selecting to peer: {to_peer_id}, and regions: {:?}", + to_regions + ); + + let region_id = RegionId::new(table_id, from_regions[0]); + // Trigger region migration. + let procedure = region_migration_manager + .submit_procedure(RegionMigrationProcedureTask::new( + 0, + region_id, + peer_factory(from_peer_id), + peer_factory(to_peer_id), + )) + .await + .unwrap(); + info!("Started region procedure: {}!", procedure.unwrap()); + + // Prepares expected region distribution. + to_regions.push(from_regions.remove(0)); + // Keeps asc order. + to_regions.sort(); + distribution.insert(to_peer_id, to_regions); + distribution.insert(from_peer_id, from_regions); + + // Waits condition + wait_condition( + Duration::from_secs(10), + Box::pin(async move { + loop { + let region_migration = + find_region_distribution(&table_metadata_manager, table_id).await; + if region_migration == distribution { + info!("Found new distribution: {region_migration:?}"); + break; + } else { + info!("Found the unexpected distribution: {region_migration:?}, expected: {distribution:?}"); + tokio::time::sleep(Duration::from_millis(200)).await; + } + } + }), + ) + .await; + + // Inserts more table. + let results = insert_values(&cluster.frontend, logical_timer).await; + for result in results { + assert!(matches!(result.unwrap(), Output::AffectedRows(1))); + } + + // Asserts the writes. + assert_values(&cluster.frontend).await; + + // Triggers again. + let procedure = region_migration_manager + .submit_procedure(RegionMigrationProcedureTask::new( + 0, + region_id, + peer_factory(from_peer_id), + peer_factory(to_peer_id), + )) + .await + .unwrap(); + assert!(procedure.is_none()); +} + +pub async fn test_region_migration_incorrect_from_peer( + store_type: StorageType, + endpoints: Vec, +) { + let cluster_name = "test_region_migration_incorrect_from_peer"; + let peer_factory = |id| Peer { + id, + addr: PEER_PLACEHOLDER_ADDR.to_string(), + }; + + // Prepares test cluster. + let (store_config, _guard) = get_test_store_config(&store_type); + let home_dir = create_temp_dir("test_region_migration_incorrect_from_peer_data_home"); + let datanodes = 5u64; + let builder = GreptimeDbClusterBuilder::new(cluster_name).await; + let const_selector = Arc::new(ConstNodeSelector::new(vec![ + peer_factory(1), + peer_factory(2), + peer_factory(3), + ])); + let cluster = builder + .with_datanodes(datanodes as u32) + .with_store_config(store_config) + .with_wal_config(WalConfig::Kafka(KafkaConfig { + broker_endpoints: endpoints.clone(), + linger: Duration::from_millis(25), + ..Default::default() + })) + .with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig { + broker_endpoints: endpoints, + num_topics: 3, + topic_name_prefix: Uuid::new_v4().to_string(), + ..Default::default() + })) + .with_shared_home_dir(Arc::new(home_dir)) + .with_meta_selector(const_selector.clone()) + .build() + .await; + let logical_timer = 1685508715000; + let table_metadata_manager = cluster.meta_srv.table_metadata_manager().clone(); + + // Prepares test table. + let table_id = prepare_testing_table(&cluster).await; + + // Inserts data + let results = insert_values(&cluster.frontend, logical_timer).await; + for result in results { + assert!(matches!(result.unwrap(), Output::AffectedRows(1))); + } + + // The region distribution + let distribution = find_region_distribution(&table_metadata_manager, table_id).await; + assert_eq!(distribution.len(), 3); + let region_migration_manager = cluster.meta_srv.region_migration_manager(); + + let region_id = RegionId::new(table_id, 1); + + // Trigger region migration. + let err = region_migration_manager + .submit_procedure(RegionMigrationProcedureTask::new( + 0, + region_id, + peer_factory(5), + peer_factory(1), + )) + .await + .unwrap_err(); + + assert!(matches!( + err, + meta_srv::error::Error::InvalidArguments { .. } + )); +} + +pub async fn test_region_migration_incorrect_region_id( + store_type: StorageType, + endpoints: Vec, +) { + let cluster_name = "test_region_migration_incorrect_region_id"; + let peer_factory = |id| Peer { + id, + addr: PEER_PLACEHOLDER_ADDR.to_string(), + }; + + // Prepares test cluster. + let (store_config, _guard) = get_test_store_config(&store_type); + let home_dir = create_temp_dir("test_region_migration_incorrect_region_id_data_home"); + let datanodes = 5u64; + let builder = GreptimeDbClusterBuilder::new(cluster_name).await; + let const_selector = Arc::new(ConstNodeSelector::new(vec![ + peer_factory(1), + peer_factory(2), + peer_factory(3), + ])); + let cluster = builder + .with_datanodes(datanodes as u32) + .with_store_config(store_config) + .with_wal_config(WalConfig::Kafka(KafkaConfig { + broker_endpoints: endpoints.clone(), + linger: Duration::from_millis(25), + ..Default::default() + })) + .with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig { + broker_endpoints: endpoints, + num_topics: 3, + topic_name_prefix: Uuid::new_v4().to_string(), + ..Default::default() + })) + .with_shared_home_dir(Arc::new(home_dir)) + .with_meta_selector(const_selector.clone()) + .build() + .await; + let logical_timer = 1685508715000; + let table_metadata_manager = cluster.meta_srv.table_metadata_manager().clone(); + + // Prepares test table. + let table_id = prepare_testing_table(&cluster).await; + + // Inserts data + let results = insert_values(&cluster.frontend, logical_timer).await; + for result in results { + assert!(matches!(result.unwrap(), Output::AffectedRows(1))); + } + + // The region distribution + let distribution = find_region_distribution(&table_metadata_manager, table_id).await; + assert_eq!(distribution.len(), 3); + let region_migration_manager = cluster.meta_srv.region_migration_manager(); + + let region_id = RegionId::new(table_id, 5); + + // Trigger region migration. + let err = region_migration_manager + .submit_procedure(RegionMigrationProcedureTask::new( + 0, + region_id, + peer_factory(2), + peer_factory(1), + )) + .await + .unwrap_err(); + + assert!(matches!( + err, + meta_srv::error::Error::RegionRouteNotFound { .. } + )); +} + +struct ConstNodeSelector { peers: Vec, } +impl ConstNodeSelector { + fn new(peers: Vec) -> Self { + Self { peers } + } +} + #[async_trait::async_trait] impl Selector for ConstNodeSelector { type Context = SelectorContext;