diff --git a/Cargo.lock b/Cargo.lock index 93b9319b079a..71e481b5aee9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -209,8 +209,8 @@ dependencies = [ "greptime-proto", "prost", "snafu", - "tonic 0.9.2", - "tonic-build 0.9.2", + "tonic", + "tonic-build", ] [[package]] @@ -382,7 +382,7 @@ dependencies = [ "paste", "prost", "tokio", - "tonic 0.9.2", + "tonic", ] [[package]] @@ -1538,7 +1538,7 @@ dependencies = [ "substrait 0.7.5", "tokio", "tokio-stream", - "tonic 0.9.2", + "tonic", "tracing", "tracing-subscriber", ] @@ -1760,7 +1760,7 @@ dependencies = [ "rand", "snafu", "tokio", - "tonic 0.9.2", + "tonic", "tower", ] @@ -2005,7 +2005,7 @@ checksum = "c2895653b4d9f1538a83970077cb01dfc77a4810524e51a110944688e916b18e" dependencies = [ "prost", "prost-types", - "tonic 0.9.2", + "tonic", "tracing-core", ] @@ -2027,7 +2027,7 @@ dependencies = [ "thread_local", "tokio", "tokio-stream", - "tonic 0.9.2", + "tonic", "tracing", "tracing-core", "tracing-subscriber", @@ -2647,7 +2647,7 @@ dependencies = [ "tokio", "tokio-stream", "toml", - "tonic 0.9.2", + "tonic", "tower", "tower-http", "url", @@ -3025,16 +3025,16 @@ dependencies = [ [[package]] name = "etcd-client" -version = "0.10.4" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4319dc0fb739a6e84cb8678b8cf50c9bcfa4712ae826b33ecf00cc0850550a58" +checksum = "f4b0ea5ef6dc2388a4b1669fa32097249bc03a15417b97cb75e38afb309e4a89" dependencies = [ "http", "prost", "tokio", "tokio-stream", - "tonic 0.8.3", - "tonic-build 0.8.4", + "tonic", + "tonic-build", "tower", "tower-service", ] @@ -3257,7 +3257,7 @@ dependencies = [ "table", "tokio", "toml", - "tonic 0.9.2", + "tonic", "tower", "uuid", ] @@ -4096,13 +4096,13 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=4398d20c56d5f7939cc2960789cb1fa7dd18e6fe#4398d20c56d5f7939cc2960789cb1fa7dd18e6fe" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=aee86f4a68c59873961c9b99ee7ed6a4341bf773#aee86f4a68c59873961c9b99ee7ed6a4341bf773" dependencies = [ "prost", "serde", "serde_json", - "tonic 0.9.2", - "tonic-build 0.9.2", + "tonic", + "tonic-build", ] [[package]] @@ -5141,7 +5141,7 @@ dependencies = [ "table", "tokio", "tokio-stream", - "tonic 0.9.2", + "tonic", "tower", "tracing", "tracing-subscriber", @@ -5188,7 +5188,7 @@ dependencies = [ "table", "tokio", "tokio-stream", - "tonic 0.9.2", + "tonic", "tower", "tracing", "tracing-subscriber", @@ -8584,7 +8584,7 @@ dependencies = [ "tokio-rustls 0.24.0", "tokio-stream", "tokio-test", - "tonic 0.9.2", + "tonic", "tonic-reflection", "tower", "tower-http", @@ -9138,8 +9138,8 @@ dependencies = [ "table", "tokio", "tokio-util", - "tonic 0.9.2", - "tonic-build 0.9.2", + "tonic", + "tonic-build", "uuid", ] @@ -9596,7 +9596,7 @@ dependencies = [ "table", "tempfile", "tokio", - "tonic 0.9.2", + "tonic", "tower", "uuid", ] @@ -9971,38 +9971,6 @@ dependencies = [ "winnow", ] -[[package]] -name = "tonic" -version = "0.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb" -dependencies = [ - "async-stream", - "async-trait", - "axum", - "base64 0.13.1", - "bytes", - "futures-core", - "futures-util", - "h2", - "http", - "http-body", - "hyper", - "hyper-timeout", - "percent-encoding", - "pin-project", - "prost", - "prost-derive", - "tokio", - "tokio-stream", - 
"tokio-util", - "tower", - "tower-layer", - "tower-service", - "tracing", - "tracing-futures", -] - [[package]] name = "tonic" version = "0.9.2" @@ -10034,19 +10002,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "tonic-build" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bf5e9b9c0f7e0a7c027dcfaba7b2c60816c7049171f679d99ee2ff65d0de8c4" -dependencies = [ - "prettyplease 0.1.25", - "proc-macro2", - "prost-build", - "quote", - "syn 1.0.109", -] - [[package]] name = "tonic-build" version = "0.9.2" @@ -10070,7 +10025,7 @@ dependencies = [ "prost-types", "tokio", "tokio-stream", - "tonic 0.9.2", + "tonic", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 4a7e192689df..3a3ccf501ddf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,7 +72,7 @@ datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" } futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4398d20c56d5f7939cc2960789cb1fa7dd18e6fe" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "aee86f4a68c59873961c9b99ee7ed6a4341bf773" } itertools = "0.10" parquet = "40.0" paste = "1.0" diff --git a/src/catalog/src/remote.rs b/src/catalog/src/remote.rs index 03f068d09bd3..617ec32096ba 100644 --- a/src/catalog/src/remote.rs +++ b/src/catalog/src/remote.rs @@ -32,7 +32,7 @@ pub mod mock; // FIXME(LFC): Used in next PR. #[allow(dead_code)] -mod region_alive_keeper; +pub mod region_alive_keeper; #[derive(Debug, Clone)] pub struct Kv(pub Vec, pub Vec); diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index a3d754e7be07..6cc2c787997e 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -20,13 +20,14 @@ use std::sync::Arc; use async_stream::stream; use async_trait::async_trait; use common_catalog::consts::{MAX_SYS_TABLE_ID, MITO_ENGINE}; +use common_meta::ident::TableIdent; use common_telemetry::{debug, error, info, warn}; use dashmap::DashMap; use futures::Stream; use futures_util::{StreamExt, TryStreamExt}; use metrics::{decrement_gauge, increment_gauge}; use parking_lot::RwLock; -use snafu::{OptionExt, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use table::engine::manager::TableEngineManagerRef; use table::engine::{EngineContext, TableReference}; use table::requests::{CreateTableRequest, OpenTableRequest}; @@ -43,6 +44,7 @@ use crate::helper::{ build_table_regional_prefix, CatalogKey, CatalogValue, SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue, CATALOG_KEY_PREFIX, }; +use crate::remote::region_alive_keeper::RegionAliveKeepers; use crate::remote::{Kv, KvBackendRef}; use crate::{ handle_system_table_request, CatalogManager, CatalogProvider, CatalogProviderRef, @@ -57,16 +59,23 @@ pub struct RemoteCatalogManager { catalogs: Arc>>, engine_manager: TableEngineManagerRef, system_table_requests: Mutex>, + region_alive_keepers: Arc, } impl RemoteCatalogManager { - pub fn new(engine_manager: TableEngineManagerRef, node_id: u64, backend: KvBackendRef) -> Self { + pub fn new( + engine_manager: TableEngineManagerRef, + node_id: u64, + backend: KvBackendRef, + region_alive_keepers: Arc, + ) -> Self { Self { engine_manager, node_id, backend, catalogs: Default::default(), system_table_requests: 
Default::default(), + region_alive_keepers, } } @@ -576,34 +585,44 @@ impl CatalogManager for RemoteCatalogManager { } async fn register_table(&self, request: RegisterTableRequest) -> Result { - let catalog_name = request.catalog; - let schema_name = request.schema; + let catalog = &request.catalog; + let schema = &request.schema; + let table_name = &request.table_name; + let schema_provider = self - .catalog(&catalog_name) + .catalog(catalog) .await? .context(CatalogNotFoundSnafu { - catalog_name: &catalog_name, + catalog_name: catalog, })? - .schema(&schema_name) + .schema(schema) .await? - .with_context(|| SchemaNotFoundSnafu { - catalog: &catalog_name, - schema: &schema_name, - })?; - if schema_provider.table_exist(&request.table_name).await? { - return TableExistsSnafu { - table: format!("{}.{}.{}", &catalog_name, &schema_name, &request.table_name), + .context(SchemaNotFoundSnafu { catalog, schema })?; + ensure!( + !schema_provider.table_exist(table_name).await?, + TableExistsSnafu { + table: common_catalog::format_full_table_name(catalog, schema, table_name), } - .fail(); - } + ); increment_gauge!( crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT, 1.0, - &[crate::metrics::db_label(&catalog_name, &schema_name)], + &[crate::metrics::db_label(catalog, schema)], ); schema_provider - .register_table(request.table_name, request.table) + .register_table(table_name.to_string(), request.table.clone()) + .await?; + + let table_ident = TableIdent { + catalog: request.catalog, + schema: request.schema, + table: request.table_name, + table_id: request.table_id, + engine: request.table.table_info().meta.engine.clone(), + }; + self.region_alive_keepers + .register_table(table_ident, request.table) .await?; Ok(true) @@ -626,6 +645,21 @@ impl CatalogManager for RemoteCatalogManager { 1.0, &[crate::metrics::db_label(catalog_name, schema_name)], ); + + if let Some(table) = result.as_ref() { + let table_info = table.table_info(); + let table_ident = TableIdent { + catalog: request.catalog, + schema: request.schema, + table: request.table_name, + table_id: table_info.ident.table_id, + engine: table_info.meta.engine.clone(), + }; + self.region_alive_keepers + .deregister_table(&table_ident) + .await; + } + Ok(result.is_none()) } diff --git a/src/catalog/src/remote/region_alive_keeper.rs b/src/catalog/src/remote/region_alive_keeper.rs index a291fe9337de..51192c9d889c 100644 --- a/src/catalog/src/remote/region_alive_keeper.rs +++ b/src/catalog/src/remote/region_alive_keeper.rs @@ -12,15 +12,193 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_meta::instruction::TableIdent; +use std::collections::HashMap; +use std::future::Future; +use std::sync::Arc; + +use common_meta::ident::TableIdent; +use common_meta::RegionIdent; use common_telemetry::{debug, error, info, warn}; +use snafu::ResultExt; use store_api::storage::RegionNumber; +use table::engine::manager::TableEngineManagerRef; use table::engine::{CloseTableResult, EngineContext, TableEngineRef}; use table::requests::CloseTableRequest; -use tokio::sync::mpsc; +use table::TableRef; +use tokio::sync::{mpsc, Mutex}; use tokio::task::JoinHandle; use tokio::time::{Duration, Instant}; +use crate::error::{Result, TableEngineNotFoundSnafu}; + +/// [RegionAliveKeepers] manages all [RegionAliveKeeper] in a scope of tables. 
+pub struct RegionAliveKeepers {
+    table_engine_manager: TableEngineManagerRef,
+    keepers: Arc<Mutex<HashMap<TableIdent, Arc<RegionAliveKeeper>>>>,
+}
+
+impl RegionAliveKeepers {
+    pub fn new(table_engine_manager: TableEngineManagerRef) -> Self {
+        Self {
+            table_engine_manager,
+            keepers: Arc::new(Mutex::new(HashMap::new())),
+        }
+    }
+
+    async fn find_keeper(&self, table_ident: &TableIdent) -> Option<Arc<RegionAliveKeeper>> {
+        self.keepers.lock().await.get(table_ident).cloned()
+    }
+
+    pub(crate) async fn register_table(
+        &self,
+        table_ident: TableIdent,
+        table: TableRef,
+    ) -> Result<()> {
+        let keeper = self.find_keeper(&table_ident).await;
+        if keeper.is_some() {
+            return Ok(());
+        }
+
+        let table_engine = self
+            .table_engine_manager
+            .engine(&table_ident.engine)
+            .context(TableEngineNotFoundSnafu {
+                engine_name: &table_ident.engine,
+            })?;
+
+        let keeper = Arc::new(RegionAliveKeeper::new(table_engine, table_ident.clone()));
+        for r in table.table_info().meta.region_numbers.iter() {
+            keeper.register_region(*r).await;
+        }
+
+        info!("Register RegionAliveKeeper for table {table_ident}");
+        self.keepers.lock().await.insert(table_ident, keeper);
+        Ok(())
+    }
+
+    pub(crate) async fn deregister_table(&self, table_ident: &TableIdent) {
+        if self.keepers.lock().await.remove(table_ident).is_some() {
+            info!("Deregister RegionAliveKeeper for table {table_ident}");
+        }
+    }
+
+    pub async fn register_region(&self, region_ident: &RegionIdent) {
+        let table_ident = &region_ident.table_ident;
+        let Some(keeper) = self.find_keeper(table_ident).await else {
+            // The alive keeper could be affected by lagging messages; just warn and ignore.
+            warn!("Alive keeper for region {region_ident} is not found!");
+            return;
+        };
+        keeper.register_region(region_ident.region_number).await
+    }
+
+    pub async fn deregister_region(&self, region_ident: &RegionIdent) {
+        let table_ident = &region_ident.table_ident;
+        let Some(keeper) = self.find_keeper(table_ident).await else {
+            // The alive keeper could be affected by lagging messages; just warn and ignore.
+            warn!("Alive keeper for region {region_ident} is not found!");
+            return;
+        };
+        keeper.deregister_region(region_ident.region_number).await
+    }
+
+    pub async fn start(&self, heartbeat_interval_millis: u64) {
+        for keeper in self.keepers.lock().await.values() {
+            keeper.start(heartbeat_interval_millis).await;
+        }
+    }
+}
+
+/// [RegionAliveKeeper] starts a countdown for each region in a table. When the deadline is reached,
+/// the region will be closed.
+/// The deadline is controlled by Metasrv. It works like a "lease" for regions: a Datanode submits its
+/// opened regions to Metasrv in heartbeats. If Metasrv decides a region may still reside in this
+/// Datanode, it "extends" the region's "lease" with a new deadline for [RegionAliveKeeper] to
+/// count down to.
+struct RegionAliveKeeper {
+    table_engine: TableEngineRef,
+    table_ident: TableIdent,
+    countdown_task_handles: Arc<Mutex<HashMap<RegionNumber, Arc<CountdownTaskHandle>>>>,
+}
+
+impl RegionAliveKeeper {
+    fn new(table_engine: TableEngineRef, table_ident: TableIdent) -> Self {
+        Self {
+            table_engine,
+            table_ident,
+            countdown_task_handles: Arc::new(Mutex::new(HashMap::new())),
+        }
+    }
+
+    async fn find_handle(&self, region: &RegionNumber) -> Option<Arc<CountdownTaskHandle>> {
+        self.countdown_task_handles
+            .lock()
+            .await
+            .get(region)
+            .cloned()
+    }
+
+    async fn register_region(&self, region: RegionNumber) {
+        if self.find_handle(&region).await.is_some() {
+            return;
+        }
+
+        let countdown_task_handles = self.countdown_task_handles.clone();
+        let on_task_finished = async move {
+            let _ = countdown_task_handles.lock().await.remove(&region);
+        };
+        let handle = Arc::new(CountdownTaskHandle::new(
+            self.table_engine.clone(),
+            self.table_ident.clone(),
+            region,
+            || on_task_finished,
+        ));
+
+        self.countdown_task_handles
+            .lock()
+            .await
+            .insert(region, handle);
+        info!(
+            "Register alive countdown for new region {region} in table {}",
+            self.table_ident
+        )
+    }
+
+    async fn deregister_region(&self, region: RegionNumber) {
+        if self
+            .countdown_task_handles
+            .lock()
+            .await
+            .remove(&region)
+            .is_some()
+        {
+            info!(
+                "Deregister alive countdown for region {region} in table {}",
+                self.table_ident
+            )
+        }
+    }
+
+    async fn start(&self, heartbeat_interval_millis: u64) {
+        for handle in self.countdown_task_handles.lock().await.values() {
+            handle.start(heartbeat_interval_millis).await;
+        }
+        info!(
+            "RegionAliveKeeper for table {} is started!",
+            self.table_ident
+        )
+    }
+
+    async fn keep_lived(&self, designated_regions: Vec<RegionNumber>, deadline: Instant) {
+        for region in designated_regions {
+            if let Some(handle) = self.find_handle(&region).await {
+                handle.reset_deadline(deadline).await;
+            }
+            // Else the region alive keeper might be triggered by lagging messages; we can safely ignore it.
+        }
+    }
+}
+
 #[derive(Debug)]
 enum CountdownCommand {
     Start(u64),
@@ -42,14 +220,14 @@ impl CountdownTaskHandle {
     /// be invoked if the task is cancelled (by dropping the handle). This is because we want something
     /// meaningful to be done when the task is finished, e.g. deregister the handle from the map.
     /// While dropping the handle does not necessarily mean the task is finished.
-    fn new<F>(
+    fn new<Fut>(
         table_engine: TableEngineRef,
         table_ident: TableIdent,
         region: RegionNumber,
-        on_task_finished: F,
+        on_task_finished: impl FnOnce() -> Fut + Send + 'static,
     ) -> Self
     where
-        F: FnOnce() + Send + 'static,
+        Fut: Future<Output = ()> + Send,
     {
         let (tx, rx) = mpsc::channel(1024);

@@ -60,7 +238,8 @@ impl CountdownTaskHandle {
             rx,
         };
         let handler = common_runtime::spawn_bg(async move {
-            countdown_task.run(on_task_finished).await;
+            countdown_task.run().await;
+            on_task_finished().await;
         });

         Self { tx, handler }
@@ -103,10 +282,7 @@ struct CountdownTask {
 }

 impl CountdownTask {
-    async fn run<F>(&mut self, on_task_finished: F)
-    where
-        F: FnOnce() + Send + 'static,
-    {
+    async fn run(&mut self) {
         // 30 years. See `Instant::far_future`.
         let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);

@@ -133,16 +309,8 @@ impl CountdownTask {
                                debug!("Reset deadline to region {region} of table {table_ident} to {deadline:?}");
                                countdown.set(tokio::time::sleep_until(deadline));
                            }
-                            // Else we have received a past deadline, it could be the following
-                            // possible reasons:
-                            //   1. the clock drift happened in Metasrv or Datanode;
-                            //   2. some messages are lagged;
-                            //   3. during the period of Datanode startup.
- // We can safely ignore case 2 and 3. However, case 1 is catastrophic. - // We must think of a way to resolve it, maybe using logical clock, or - // simply fire an alarm for it? For now, we can tolerate that, because it's - // seconds resolution to deadline, and clock drift is less likely - // to happen in that resolution. + // Else the countdown could be not started yet, or during startup protection. + // Can be safely ignored. }, None => { info!( @@ -168,8 +336,6 @@ impl CountdownTask { } } } - - on_task_finished(); } async fn close_region(&self) -> CloseTableResult { @@ -202,12 +368,142 @@ mod test { use std::sync::Arc; use datatypes::schema::RawSchema; + use table::engine::manager::MemoryTableEngineManager; use table::engine::{TableEngine, TableReference}; use table::requests::{CreateTableRequest, TableOptions}; + use table::test_util::EmptyTable; + use tokio::sync::oneshot; use super::*; use crate::remote::mock::MockTableEngine; + #[tokio::test(flavor = "multi_thread")] + async fn test_region_alive_keepers() { + let table_engine = Arc::new(MockTableEngine::default()); + let table_engine_manager = Arc::new(MemoryTableEngineManager::new(table_engine)); + let keepers = RegionAliveKeepers::new(table_engine_manager); + + let catalog = "my_catalog"; + let schema = "my_schema"; + let table = "my_table"; + let table_ident = TableIdent { + catalog: catalog.to_string(), + schema: schema.to_string(), + table: table.to_string(), + table_id: 1, + engine: "MockTableEngine".to_string(), + }; + let table = Arc::new(EmptyTable::new(CreateTableRequest { + id: 1, + catalog_name: catalog.to_string(), + schema_name: schema.to_string(), + table_name: table.to_string(), + desc: None, + schema: RawSchema { + column_schemas: vec![], + timestamp_index: None, + version: 0, + }, + region_numbers: vec![1, 2, 3], + primary_key_indices: vec![], + create_if_not_exists: false, + table_options: TableOptions::default(), + engine: "MockTableEngine".to_string(), + })); + + keepers + .register_table(table_ident.clone(), table) + .await + .unwrap(); + assert!(keepers.keepers.lock().await.contains_key(&table_ident)); + + keepers + .register_region(&RegionIdent { + cluster_id: 1, + datanode_id: 1, + table_ident: table_ident.clone(), + region_number: 4, + }) + .await; + + keepers.start(5000).await; + for keeper in keepers.keepers.lock().await.values() { + for handle in keeper.countdown_task_handles.lock().await.values() { + // assert countdown tasks are started + assert!(deadline(&handle.tx).await <= Instant::now() + Duration::from_secs(20)); + } + } + + keepers + .deregister_region(&RegionIdent { + cluster_id: 1, + datanode_id: 1, + table_ident: table_ident.clone(), + region_number: 1, + }) + .await; + let mut regions = keepers + .find_keeper(&table_ident) + .await + .unwrap() + .countdown_task_handles + .lock() + .await + .keys() + .copied() + .collect::>(); + regions.sort(); + assert_eq!(regions, vec![2, 3, 4]); + + keepers.deregister_table(&table_ident).await; + assert!(keepers.keepers.lock().await.is_empty()); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_region_alive_keeper() { + let table_engine = Arc::new(MockTableEngine::default()); + let table_ident = TableIdent { + catalog: "my_catalog".to_string(), + schema: "my_schema".to_string(), + table: "my_table".to_string(), + table_id: 1024, + engine: "mito".to_string(), + }; + let keeper = RegionAliveKeeper::new(table_engine, table_ident); + + let region = 1; + assert!(keeper.find_handle(®ion).await.is_none()); + keeper.register_region(region).await; + 
assert!(keeper.find_handle(®ion).await.is_some()); + + let sender = &keeper + .countdown_task_handles + .lock() + .await + .get(®ion) + .unwrap() + .tx + .clone(); + + let ten_seconds_later = || Instant::now() + Duration::from_secs(10); + + keeper.keep_lived(vec![1, 2, 3], ten_seconds_later()).await; + assert!(keeper.find_handle(&2).await.is_none()); + assert!(keeper.find_handle(&3).await.is_none()); + + let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 29); + // assert if keeper is not started, keep_lived is of no use + assert!(deadline(sender).await > far_future); + + keeper.start(1000).await; + keeper.keep_lived(vec![1, 2, 3], ten_seconds_later()).await; + // assert keep_lived works if keeper is started + assert!(deadline(sender).await <= ten_seconds_later()); + + keeper.deregister_region(region).await; + assert!(keeper.find_handle(®ion).await.is_none()); + } + #[tokio::test(flavor = "multi_thread")] async fn test_countdown_task_handle() { let table_engine = Arc::new(MockTableEngine::default()); @@ -220,10 +516,12 @@ mod test { }; let finished = Arc::new(AtomicBool::new(false)); let finished_clone = finished.clone(); - let handle = - CountdownTaskHandle::new(table_engine.clone(), table_ident.clone(), 1, move || { - finished_clone.store(true, Ordering::Relaxed) - }); + let handle = CountdownTaskHandle::new( + table_engine.clone(), + table_ident.clone(), + 1, + || async move { finished_clone.store(true, Ordering::Relaxed) }, + ); let tx = handle.tx.clone(); // assert countdown task is running @@ -244,7 +542,7 @@ mod test { let finished = Arc::new(AtomicBool::new(false)); let finished_clone = finished.clone(); - let handle = CountdownTaskHandle::new(table_engine, table_ident, 1, move || { + let handle = CountdownTaskHandle::new(table_engine, table_ident, 1, || async move { finished_clone.store(true, Ordering::Relaxed) }); handle.tx.send(CountdownCommand::Start(100)).await.unwrap(); @@ -296,15 +594,9 @@ mod test { rx, }; common_runtime::spawn_bg(async move { - task.run(|| ()).await; + task.run().await; }); - async fn deadline(tx: &mpsc::Sender) -> Instant { - let (s, r) = tokio::sync::oneshot::channel(); - tx.send(CountdownCommand::Deadline(s)).await.unwrap(); - r.await.unwrap() - } - // if countdown task is not started, its deadline is set to far future assert!(deadline(&tx).await > Instant::now() + Duration::from_secs(86400 * 365 * 29)); @@ -326,4 +618,10 @@ mod test { tokio::time::sleep(Duration::from_millis(2000)).await; assert!(!table_engine.table_exists(ctx, &table_ref)); } + + async fn deadline(tx: &mpsc::Sender) -> Instant { + let (s, r) = oneshot::channel(); + tx.send(CountdownCommand::Deadline(s)).await.unwrap(); + r.await.unwrap() + } } diff --git a/src/catalog/tests/remote_catalog_tests.rs b/src/catalog/tests/remote_catalog_tests.rs index 324bae49f232..776c6be6c901 100644 --- a/src/catalog/tests/remote_catalog_tests.rs +++ b/src/catalog/tests/remote_catalog_tests.rs @@ -22,6 +22,7 @@ mod tests { use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; use catalog::remote::mock::{MockKvBackend, MockTableEngine}; + use catalog::remote::region_alive_keeper::RegionAliveKeepers; use catalog::remote::{ CachedMetaKvBackend, KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider, @@ -138,8 +139,12 @@ mod tests { table_engine.clone(), )); - let catalog_manager = - RemoteCatalogManager::new(engine_manager.clone(), node_id, cached_backend.clone()); + let catalog_manager = RemoteCatalogManager::new( + 
engine_manager.clone(), + node_id, + cached_backend.clone(), + Arc::new(RegionAliveKeepers::new(engine_manager.clone())), + ); catalog_manager.start().await.unwrap(); ( diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 20c9edbb7cc6..a75234fd924e 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -52,6 +52,9 @@ pub enum Error { err_msg: String, location: Location, }, + + #[snafu(display("Invalid protobuf message, err: {}", err_msg))] + InvalidProtoMsg { err_msg: String, location: Location }, } pub type Result = std::result::Result; @@ -61,7 +64,10 @@ impl ErrorExt for Error { use Error::*; match self { IllegalServerState { .. } => StatusCode::Internal, - SerdeJson { .. } | RouteInfoCorrupted { .. } => StatusCode::Unexpected, + + SerdeJson { .. } | RouteInfoCorrupted { .. } | InvalidProtoMsg { .. } => { + StatusCode::Unexpected + } SendMessage { .. } => StatusCode::Internal, diff --git a/src/common/meta/src/ident.rs b/src/common/meta/src/ident.rs new file mode 100644 index 000000000000..cfc08fa7bc83 --- /dev/null +++ b/src/common/meta/src/ident.rs @@ -0,0 +1,57 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::{Display, Formatter}; + +use api::v1::meta::TableIdent as RawTableIdent; +use serde::{Deserialize, Serialize}; +use snafu::OptionExt; + +use crate::error::{Error, InvalidProtoMsgSnafu}; + +#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct TableIdent { + pub catalog: String, + pub schema: String, + pub table: String, + pub table_id: u32, + pub engine: String, +} + +impl Display for TableIdent { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Table(id={}, name='{}.{}.{}', engine='{}')", + self.table_id, self.catalog, self.schema, self.table, self.engine, + ) + } +} + +impl TryFrom for TableIdent { + type Error = Error; + + fn try_from(value: RawTableIdent) -> Result { + let table_name = value.table_name.context(InvalidProtoMsgSnafu { + err_msg: "'table_name' is missing in TableIdent", + })?; + Ok(Self { + catalog: table_name.catalog_name, + schema: table_name.schema_name, + table: table_name.table_name, + table_id: value.table_id, + engine: value.engine, + }) + } +} diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index 0b9bce4bdd62..da09f81b1b72 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -16,6 +16,7 @@ use std::fmt::{Display, Formatter}; use serde::{Deserialize, Serialize}; +use crate::ident::TableIdent; use crate::{ClusterId, DatanodeId}; #[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)] @@ -49,25 +50,6 @@ impl From for TableIdent { } } -#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct TableIdent { - pub catalog: String, - pub schema: String, - pub table: String, - pub table_id: u32, - pub engine: String, -} - -impl Display for TableIdent { - fn fmt(&self, f: &mut 
Formatter<'_>) -> std::fmt::Result { - write!( - f, - "TableIdent(table_id='{}', table_name='{}.{}.{}', table_engine='{}')", - self.table_id, self.catalog, self.schema, self.table, self.engine, - ) - } -} - #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] pub struct SimpleReply { pub result: bool, diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index b49a7c4620f9..7659bfed2c13 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -14,6 +14,7 @@ pub mod error; pub mod heartbeat; +pub mod ident; pub mod instruction; pub mod key; pub mod peer; diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index fa3e362df0bf..c240af2c55b7 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -131,6 +131,9 @@ impl HeartbeatTask { let addr = resolve_addr(&self.server_addr, &self.server_hostname); info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}."); + // TODO(LFC): Continued in next PR. + // self.region_alive_keepers.start(interval).await; + let meta_client = self.meta_client.clone(); let catalog_manager_clone = self.catalog_manager.clone(); diff --git a/src/datanode/src/heartbeat/handler/close_region.rs b/src/datanode/src/heartbeat/handler/close_region.rs index 6fac2cf5da33..638c1aa014e8 100644 --- a/src/datanode/src/heartbeat/handler/close_region.rs +++ b/src/datanode/src/heartbeat/handler/close_region.rs @@ -14,15 +14,14 @@ use std::sync::Arc; +use catalog::remote::region_alive_keeper::RegionAliveKeepers; use catalog::{CatalogManagerRef, DeregisterTableRequest}; use common_catalog::format_full_table_name; use common_meta::error::Result as MetaResult; use common_meta::heartbeat::handler::{ HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, }; -use common_meta::instruction::{ - Instruction, InstructionReply, RegionIdent, SimpleReply, TableIdent, -}; +use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; use common_telemetry::{error, info, warn}; use snafu::ResultExt; use store_api::storage::RegionNumber; @@ -36,6 +35,7 @@ use crate::error::{self, Result}; pub struct CloseRegionHandler { catalog_manager: CatalogManagerRef, table_engine_manager: TableEngineManagerRef, + region_alive_keepers: Arc, } impl HeartbeatResponseHandler for CloseRegionHandler { @@ -53,29 +53,26 @@ impl HeartbeatResponseHandler for CloseRegionHandler { let mailbox = ctx.mailbox.clone(); let self_ref = Arc::new(self.clone()); - - let RegionIdent { - table_ident: - TableIdent { - engine, - catalog, - schema, - table, - .. - }, - region_number, - .. 
- } = region_ident; - + let region_alive_keepers = self.region_alive_keepers.clone(); common_runtime::spawn_bg(async move { + let table_ident = ®ion_ident.table_ident; + let table_ref = TableReference::full( + &table_ident.catalog, + &table_ident.schema, + &table_ident.table, + ); let result = self_ref .close_region_inner( - engine, - &TableReference::full(&catalog, &schema, &table), - vec![region_number], + table_ident.engine.clone(), + &table_ref, + vec![region_ident.region_number], ) .await; + if matches!(result, Ok(true)) { + region_alive_keepers.deregister_region(®ion_ident).await; + } + if let Err(e) = mailbox .send((meta, CloseRegionHandler::map_result(result))) .await @@ -92,10 +89,12 @@ impl CloseRegionHandler { pub fn new( catalog_manager: CatalogManagerRef, table_engine_manager: TableEngineManagerRef, + region_alive_keepers: Arc, ) -> Self { Self { catalog_manager, table_engine_manager, + region_alive_keepers, } } diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs index 361b50e279a0..71b4863f6d64 100644 --- a/src/datanode/src/heartbeat/handler/open_region.rs +++ b/src/datanode/src/heartbeat/handler/open_region.rs @@ -15,15 +15,14 @@ use std::sync::Arc; use catalog::error::Error as CatalogError; +use catalog::remote::region_alive_keeper::RegionAliveKeepers; use catalog::{CatalogManagerRef, RegisterTableRequest}; use common_catalog::format_full_table_name; use common_meta::error::Result as MetaResult; use common_meta::heartbeat::handler::{ HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, }; -use common_meta::instruction::{ - Instruction, InstructionReply, RegionIdent, SimpleReply, TableIdent, -}; +use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; use common_telemetry::{error, warn}; use snafu::ResultExt; use store_api::storage::RegionNumber; @@ -37,6 +36,7 @@ use crate::error::{self, Result}; pub struct OpenRegionHandler { catalog_manager: CatalogManagerRef, table_engine_manager: TableEngineManagerRef, + region_alive_keepers: Arc, } impl HeartbeatResponseHandler for OpenRegionHandler { @@ -55,9 +55,24 @@ impl HeartbeatResponseHandler for OpenRegionHandler { let mailbox = ctx.mailbox.clone(); let self_ref = Arc::new(self.clone()); + let region_alive_keepers = self.region_alive_keepers.clone(); common_runtime::spawn_bg(async move { - let (engine, request) = OpenRegionHandler::prepare_request(region_ident); - let result = self_ref.open_region_inner(engine, request).await; + let table_ident = ®ion_ident.table_ident; + let request = OpenTableRequest { + catalog_name: table_ident.catalog.clone(), + schema_name: table_ident.schema.clone(), + table_name: table_ident.table.clone(), + table_id: table_ident.table_id, + region_numbers: vec![region_ident.region_number], + }; + let result = self_ref + .open_region_inner(table_ident.engine.clone(), request) + .await; + + if matches!(result, Ok(true)) { + region_alive_keepers.register_region(®ion_ident).await; + } + if let Err(e) = mailbox .send((meta, OpenRegionHandler::map_result(result))) .await @@ -73,10 +88,12 @@ impl OpenRegionHandler { pub fn new( catalog_manager: CatalogManagerRef, table_engine_manager: TableEngineManagerRef, + region_alive_keepers: Arc, ) -> Self { Self { catalog_manager, table_engine_manager, + region_alive_keepers, } } @@ -97,32 +114,6 @@ impl OpenRegionHandler { ) } - fn prepare_request(ident: RegionIdent) -> (String, OpenTableRequest) { - let RegionIdent { - table_ident: - TableIdent { - catalog, - 
schema, - table, - table_id, - engine, - }, - region_number, - .. - } = ident; - - ( - engine, - OpenTableRequest { - catalog_name: catalog, - schema_name: schema, - table_name: table, - table_id, - region_numbers: vec![region_number], - }, - ) - } - /// Returns true if a table or target regions have been opened. async fn regions_opened( &self, diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index aa00d6423b2f..72336f95b7a1 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -18,7 +18,8 @@ use std::time::Duration; use std::{fs, path}; use api::v1::meta::Role; -use catalog::remote::CachedMetaKvBackend; +use catalog::remote::region_alive_keeper::RegionAliveKeepers; +use catalog::remote::{CachedMetaKvBackend, RemoteCatalogManager}; use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest}; use common_base::paths::{CLUSTER_DIR, WAL_DIR}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; @@ -56,9 +57,9 @@ use table::Table; use crate::datanode::{DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig}; use crate::error::{ - self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu, - NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result, ShutdownInstanceSnafu, - StartProcedureManagerSnafu, StopProcedureManagerSnafu, + self, CatalogSnafu, IncorrectInternalStateSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, + MissingNodeIdSnafu, NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result, + ShutdownInstanceSnafu, StartProcedureManagerSnafu, StopProcedureManagerSnafu, }; use crate::heartbeat::handler::close_region::CloseRegionHandler; use crate::heartbeat::handler::open_region::OpenRegionHandler; @@ -150,7 +151,7 @@ impl Instance { ); // create remote catalog manager - let (catalog_manager, table_id_provider) = match opts.mode { + let (catalog_manager, table_id_provider, heartbeat_task) = match opts.mode { Mode::Standalone => { if opts.enable_memory_catalog { let catalog = Arc::new(catalog::local::MemoryCatalogManager::default()); @@ -170,6 +171,7 @@ impl Instance { ( catalog.clone() as CatalogManagerRef, Some(catalog as TableIdProviderRef), + None, ) } else { let catalog = Arc::new( @@ -181,51 +183,58 @@ impl Instance { ( catalog.clone() as CatalogManagerRef, Some(catalog as TableIdProviderRef), + None, ) } } Mode::Distributed => { - let kv_backend = Arc::new(CachedMetaKvBackend::new( - meta_client.as_ref().unwrap().clone(), - )); + let meta_client = meta_client.context(IncorrectInternalStateSnafu { + state: "meta client is not provided when creating distributed Datanode", + })?; + + let kv_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone())); - let catalog = Arc::new(catalog::remote::RemoteCatalogManager::new( + let region_alive_keepers = + Arc::new(RegionAliveKeepers::new(engine_manager.clone())); + + let catalog_manager = Arc::new(RemoteCatalogManager::new( engine_manager.clone(), opts.node_id.context(MissingNodeIdSnafu)?, kv_backend, + region_alive_keepers.clone(), )); - (catalog as CatalogManagerRef, None) + + let handlers_executor = HandlerGroupExecutor::new(vec![ + Arc::new(ParseMailboxMessageHandler::default()), + Arc::new(OpenRegionHandler::new( + catalog_manager.clone(), + engine_manager.clone(), + region_alive_keepers.clone(), + )), + Arc::new(CloseRegionHandler::new( + catalog_manager.clone(), + engine_manager.clone(), + region_alive_keepers, + )), + ]); + + let heartbeat_task = Some(HeartbeatTask::new( + 
opts.node_id.context(MissingNodeIdSnafu)?, + opts.rpc_addr.clone(), + opts.rpc_hostname.clone(), + meta_client, + catalog_manager.clone(), + Arc::new(handlers_executor), + )); + + (catalog_manager as CatalogManagerRef, None, heartbeat_task) } }; let factory = QueryEngineFactory::new(catalog_manager.clone(), false); let query_engine = factory.query_engine(); - let handlers_executor = HandlerGroupExecutor::new(vec![ - Arc::new(ParseMailboxMessageHandler::default()), - Arc::new(OpenRegionHandler::new( - catalog_manager.clone(), - engine_manager.clone(), - )), - Arc::new(CloseRegionHandler::new( - catalog_manager.clone(), - engine_manager.clone(), - )), - ]); - - let heartbeat_task = match opts.mode { - Mode::Standalone => None, - Mode::Distributed => Some(HeartbeatTask::new( - opts.node_id.context(MissingNodeIdSnafu)?, - opts.rpc_addr.clone(), - opts.rpc_hostname.clone(), - meta_client.as_ref().unwrap().clone(), - catalog_manager.clone(), - Arc::new(handlers_executor), - )), - }; - let procedure_manager = create_procedure_manager(opts.node_id.unwrap_or(0), &opts.procedure, object_store) .await?; diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index e17c18914223..6a278cefa4a2 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -19,14 +19,14 @@ use api::v1::greptime_request::Request as GrpcRequest; use api::v1::meta::HeartbeatResponse; use api::v1::query_request::Query; use api::v1::QueryRequest; +use catalog::remote::region_alive_keeper::RegionAliveKeepers; use catalog::CatalogManagerRef; use common_meta::heartbeat::handler::{ HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor, }; use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; -use common_meta::instruction::{ - Instruction, InstructionReply, RegionIdent, SimpleReply, TableIdent, -}; +use common_meta::ident::TableIdent; +use common_meta::instruction::{Instruction, InstructionReply, RegionIdent, SimpleReply}; use common_query::Output; use datatypes::prelude::ConcreteDataType; use servers::query_handler::grpc::GrpcQueryHandler; @@ -61,7 +61,11 @@ async fn test_close_region_handler() { } = prepare_handler_test("test_close_region_handler").await; let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new( - CloseRegionHandler::new(catalog_manager_ref.clone(), engine_manager_ref.clone()), + CloseRegionHandler::new( + catalog_manager_ref.clone(), + engine_manager_ref.clone(), + Arc::new(RegionAliveKeepers::new(engine_manager_ref.clone())), + ), )])); prepare_table(instance.inner()).await; @@ -127,14 +131,18 @@ async fn test_open_region_handler() { .. 
} = prepare_handler_test("test_open_region_handler").await; + let region_alive_keeper = Arc::new(RegionAliveKeepers::new(engine_manager_ref.clone())); + let executor = Arc::new(HandlerGroupExecutor::new(vec![ Arc::new(OpenRegionHandler::new( catalog_manager_ref.clone(), engine_manager_ref.clone(), + region_alive_keeper.clone(), )), Arc::new(CloseRegionHandler::new( catalog_manager_ref.clone(), engine_manager_ref.clone(), + region_alive_keeper, )), ])); diff --git a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs index 92d905fdbbfe..111c1ae86f2f 100644 --- a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs +++ b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs @@ -18,7 +18,8 @@ use common_meta::error::Result as MetaResult; use common_meta::heartbeat::handler::{ HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, }; -use common_meta::instruction::{Instruction, InstructionReply, SimpleReply, TableIdent}; +use common_meta::ident::TableIdent; +use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; use common_meta::table_name::TableName; use common_telemetry::{error, info}; use partition::manager::TableRouteCacheInvalidatorRef; diff --git a/src/frontend/src/heartbeat/handler/tests.rs b/src/frontend/src/heartbeat/handler/tests.rs index 0e6d03782438..c066ad601b59 100644 --- a/src/frontend/src/heartbeat/handler/tests.rs +++ b/src/frontend/src/heartbeat/handler/tests.rs @@ -23,7 +23,8 @@ use common_meta::heartbeat::handler::{ HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor, }; use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; -use common_meta::instruction::{Instruction, InstructionReply, SimpleReply, TableIdent}; +use common_meta::ident::TableIdent; +use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; use common_meta::table_name::TableName; use partition::manager::TableRouteCacheInvalidator; use tokio::sync::mpsc; diff --git a/src/meta-client/Cargo.toml b/src/meta-client/Cargo.toml index 3785db611eef..0945cadd3741 100644 --- a/src/meta-client/Cargo.toml +++ b/src/meta-client/Cargo.toml @@ -12,7 +12,7 @@ common-error = { path = "../common/error" } common-grpc = { path = "../common/grpc" } common-telemetry = { path = "../common/telemetry" } common-meta = { path = "../common/meta" } -etcd-client = "0.10" +etcd-client = "0.11" rand.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 77679b943c96..539a3140230c 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -24,7 +24,7 @@ common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } dashmap = "5.4" derive_builder = "0.12" -etcd-client = "0.10" +etcd-client = "0.11" futures.workspace = true h2 = "0.3" http-body = "0.4" diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 7b1205dfb9f0..658f45d3a5be 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -130,6 +130,7 @@ impl Pushers { .push(HeartbeatResponse { header: Some(pusher.header()), mailbox_message: Some(mailbox_message), + ..Default::default() }) .await } @@ -151,6 +152,7 @@ impl Pushers { .push(HeartbeatResponse { header: Some(pusher.header()), mailbox_message: Some(mailbox_message), + ..Default::default() }) .await?; } @@ -232,6 +234,7 @@ impl HeartbeatHandlerGroup { let res = 
HeartbeatResponse { header, mailbox_message: acc.into_mailbox_message(), + ..Default::default() }; Ok(res) } diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 68f772959007..953efaf6af96 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use api::v1::meta::{HeartbeatRequest, Role}; use async_trait::async_trait; use common_catalog::consts::MITO_ENGINE; -use common_meta::instruction::TableIdent; +use common_meta::ident::TableIdent; use common_meta::RegionIdent; use table::engine::table_id; diff --git a/src/meta-srv/src/handler/failure_handler/runner.rs b/src/meta-srv/src/handler/failure_handler/runner.rs index b9fd28ef937b..c292c118b9bd 100644 --- a/src/meta-srv/src/handler/failure_handler/runner.rs +++ b/src/meta-srv/src/handler/failure_handler/runner.rs @@ -246,7 +246,7 @@ impl FailureDetectorContainer { #[cfg(test)] mod tests { use common_catalog::consts::MITO_ENGINE; - use common_meta::instruction::TableIdent; + use common_meta::ident::TableIdent; use rand::Rng; use super::*; diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index 305e40a5e2d8..6a0934b220a0 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -88,6 +88,7 @@ mod tests { let res = HeartbeatResponse { header, mailbox_message: acc.into_mailbox_message(), + ..Default::default() }; assert_eq!(1, res.header.unwrap().cluster_id); } diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index a409dbc13674..7424dc613916 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -343,7 +343,8 @@ mod tests { use api::v1::meta::{HeartbeatResponse, MailboxMessage, Peer, RequestHeader}; use catalog::helper::TableGlobalKey; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; - use common_meta::instruction::{Instruction, InstructionReply, SimpleReply, TableIdent}; + use common_meta::ident::TableIdent; + use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; use common_meta::DatanodeId; use common_procedure::BoxedProcedure; use rand::prelude::SliceRandom; diff --git a/src/meta-srv/src/procedure/region_failover/failover_start.rs b/src/meta-srv/src/procedure/region_failover/failover_start.rs index 2956fd026ecf..27ec574f4bfa 100644 --- a/src/meta-srv/src/procedure/region_failover/failover_start.rs +++ b/src/meta-srv/src/procedure/region_failover/failover_start.rs @@ -14,7 +14,7 @@ use async_trait::async_trait; use common_error::prelude::{ErrorExt, StatusCode}; -use common_meta::instruction::TableIdent; +use common_meta::ident::TableIdent; use common_meta::peer::Peer; use common_meta::RegionIdent; use common_telemetry::info; diff --git a/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs b/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs index 52739a0ab89a..e43ec05cb106 100644 --- a/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs +++ b/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs @@ -14,7 +14,8 @@ use api::v1::meta::MailboxMessage; use async_trait::async_trait; -use common_meta::instruction::{Instruction, TableIdent}; +use common_meta::ident::TableIdent; +use common_meta::instruction::Instruction; use common_meta::RegionIdent; use 
common_telemetry::info;
 use serde::{Deserialize, Serialize};

diff --git a/src/table/src/test_util/empty_table.rs b/src/table/src/test_util/empty_table.rs
index c2388dc2922e..679ace68876b 100644
--- a/src/table/src/test_util/empty_table.rs
+++ b/src/table/src/test_util/empty_table.rs
@@ -36,6 +36,7 @@ impl EmptyTable {
             .primary_key_indices(req.primary_key_indices)
             .next_column_id(0)
             .options(req.table_options)
+            .region_numbers(req.region_numbers)
             .build();
         let table_info = TableInfoBuilder::default()
             .catalog_name(req.catalog_name)
diff --git a/src/table/src/test_util/memtable.rs b/src/table/src/test_util/memtable.rs
index ecb888f8d09a..f2e942ce8d9b 100644
--- a/src/table/src/test_util/memtable.rs
+++ b/src/table/src/test_util/memtable.rs
@@ -77,7 +77,7 @@ impl MemTable {
             .schema(schema)
             .primary_key_indices(vec![])
             .value_indices(vec![])
-            .engine("mock".to_string())
+            .engine("mito".to_string())
             .next_column_id(0)
             .engine_options(Default::default())
             .options(Default::default())
diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs
index 869faebe4666..053d5e355b15 100644
--- a/tests-integration/tests/region_failover.rs
+++ b/tests-integration/tests/region_failover.rs
@@ -20,7 +20,7 @@ use api::v1::meta::Peer;
 use catalog::helper::TableGlobalKey;
 use catalog::remote::{CachedMetaKvBackend, Kv};
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
-use common_meta::instruction::TableIdent;
+use common_meta::ident::TableIdent;
 use common_meta::rpc::router::TableRoute;
 use common_meta::table_name::TableName;
 use common_meta::RegionIdent;
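
Note on the datanode wiring introduced above: in distributed mode this patch threads a single RegionAliveKeepers instance through both the RemoteCatalogManager and the open/close-region heartbeat handlers. The sketch below is illustrative only and not part of the patch; it condenses that wiring from src/datanode/src/instance.rs, assumes the import paths and types used there (e.g. an Arc<MetaClient> from the meta-client crate), and omits the mailbox-message parsing handler and the HeartbeatTask construction for brevity.

use std::sync::Arc;

use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use catalog::remote::{CachedMetaKvBackend, RemoteCatalogManager};
use catalog::CatalogManagerRef;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use meta_client::client::MetaClient;
use table::engine::manager::TableEngineManagerRef;

use crate::heartbeat::handler::close_region::CloseRegionHandler;
use crate::heartbeat::handler::open_region::OpenRegionHandler;

fn build_distributed_parts(
    engine_manager: TableEngineManagerRef,
    node_id: u64,
    meta_client: Arc<MetaClient>,
) -> (CatalogManagerRef, Arc<HandlerGroupExecutor>) {
    let kv_backend = Arc::new(CachedMetaKvBackend::new(meta_client));

    // One keepers instance per datanode: the catalog manager registers/deregisters
    // whole tables with it, while the heartbeat handlers register/deregister single
    // regions when Metasrv instructs the node to open or close them.
    let region_alive_keepers = Arc::new(RegionAliveKeepers::new(engine_manager.clone()));

    let catalog_manager = Arc::new(RemoteCatalogManager::new(
        engine_manager.clone(),
        node_id,
        kv_backend,
        region_alive_keepers.clone(),
    ));

    let handlers_executor = HandlerGroupExecutor::new(vec![
        Arc::new(OpenRegionHandler::new(
            catalog_manager.clone(),
            engine_manager.clone(),
            region_alive_keepers.clone(),
        )),
        Arc::new(CloseRegionHandler::new(
            catalog_manager.clone(),
            engine_manager,
            region_alive_keepers.clone(),
        )),
    ]);

    // The countdowns are armed later with the heartbeat interval; this PR leaves the
    // call commented out in HeartbeatTask::start (see the TODO in heartbeat.rs):
    // region_alive_keepers.start(heartbeat_interval_millis).await;

    (catalog_manager as CatalogManagerRef, Arc::new(handlers_executor))
}

From there, RemoteCatalogManager::register_table and deregister_table keep the keepers in sync at table granularity, while the open/close handlers call register_region and deregister_region per region, matching the calls added in open_region.rs and close_region.rs.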