From 2dd86b686fb429ba8f43a221b3aac40c59c6aebc Mon Sep 17 00:00:00 2001 From: LFC Date: Mon, 19 Jun 2023 19:55:59 +0800 Subject: [PATCH] feat: extend region leases in Metasrv (#1784) * feat: extend region leases in Metasrv * fix: resolve PR comments --- Cargo.lock | 4 +- Cargo.toml | 2 +- src/catalog/src/helper.rs | 14 +- src/catalog/src/remote.rs | 3 - src/catalog/src/remote/region_alive_keeper.rs | 141 ++++++++++- src/common/meta/Cargo.toml | 1 + src/common/meta/src/heartbeat/handler.rs | 12 +- .../handler/parse_mailbox_message.rs | 5 +- src/common/meta/src/ident.rs | 16 +- src/datanode/src/heartbeat.rs | 14 +- .../src/heartbeat/handler/close_region.rs | 4 +- .../src/heartbeat/handler/open_region.rs | 4 +- src/datanode/src/instance.rs | 4 +- src/datanode/src/tests.rs | 23 +- src/frontend/src/heartbeat.rs | 5 +- .../handler/invalidate_table_cache.rs | 4 +- src/frontend/src/heartbeat/handler/tests.rs | 10 +- src/meta-srv/Cargo.toml | 1 + src/meta-srv/src/error.rs | 10 +- src/meta-srv/src/handler.rs | 8 +- src/meta-srv/src/handler/failure_handler.rs | 9 +- .../src/handler/region_lease_handler.rs | 226 ++++++++++++++++++ src/meta-srv/src/metasrv/builder.rs | 9 + src/meta-srv/src/procedure/region_failover.rs | 86 ++++--- .../region_failover/deactivate_region.rs | 30 ++- src/meta-srv/src/table_routes.rs | 50 +++- src/table/src/engine.rs | 2 +- 27 files changed, 608 insertions(+), 89 deletions(-) create mode 100644 src/meta-srv/src/handler/region_lease_handler.rs diff --git a/Cargo.lock b/Cargo.lock index 71e481b5aee9..36ace61d4127 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1801,6 +1801,7 @@ name = "common-meta" version = "0.4.0" dependencies = [ "api", + "async-trait", "chrono", "common-catalog", "common-error", @@ -4096,7 +4097,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=aee86f4a68c59873961c9b99ee7ed6a4341bf773#aee86f4a68c59873961c9b99ee7ed6a4341bf773" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5d5eb65bb985ff47b3a417fb2505e315e2f5c319#5d5eb65bb985ff47b3a417fb2505e315e2f5c319" dependencies = [ "prost", "serde", @@ -5185,6 +5186,7 @@ dependencies = [ "serde_json", "servers", "snafu", + "store-api", "table", "tokio", "tokio-stream", diff --git a/Cargo.toml b/Cargo.toml index 3a3ccf501ddf..a183bbddfef4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,7 +72,7 @@ datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" } futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "aee86f4a68c59873961c9b99ee7ed6a4341bf773" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5d5eb65bb985ff47b3a417fb2505e315e2f5c319" } itertools = "0.10" parquet = "40.0" paste = "1.0" diff --git a/src/catalog/src/helper.rs b/src/catalog/src/helper.rs index 83a0a84a1705..8520bc3953bd 100644 --- a/src/catalog/src/helper.rs +++ b/src/catalog/src/helper.rs @@ -91,7 +91,7 @@ pub fn build_table_regional_prefix( } /// Table global info has only one key across all datanodes so it does not have `node_id` field. 
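+/// It is also used as a `HashMap` key (hence the added `Hash` and `Eq` derives below),
+/// e.g. when Metasrv batch-gets table global values for the region lease handler.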
-#[derive(Clone)] +#[derive(Clone, Hash, Eq, PartialEq)] pub struct TableGlobalKey { pub catalog_name: String, pub schema_name: String, @@ -124,6 +124,14 @@ impl TableGlobalKey { table_name: captures[3].to_string(), }) } + + pub fn to_raw_key(&self) -> Vec { + self.to_string().into_bytes() + } + + pub fn try_from_raw_key(key: &[u8]) -> Result { + Self::parse(String::from_utf8_lossy(key)) + } } /// Table global info contains necessary info for a datanode to create table regions, including @@ -141,6 +149,10 @@ impl TableGlobalValue { pub fn table_id(&self) -> TableId { self.table_info.ident.table_id } + + pub fn engine(&self) -> &str { + &self.table_info.meta.engine + } } /// Table regional info that varies between datanode, so it contains a `node_id` field. diff --git a/src/catalog/src/remote.rs b/src/catalog/src/remote.rs index 617ec32096ba..da32e53bf173 100644 --- a/src/catalog/src/remote.rs +++ b/src/catalog/src/remote.rs @@ -29,9 +29,6 @@ mod manager; #[cfg(feature = "testing")] pub mod mock; - -// FIXME(LFC): Used in next PR. -#[allow(dead_code)] pub mod region_alive_keeper; #[derive(Debug, Clone)] diff --git a/src/catalog/src/remote/region_alive_keeper.rs b/src/catalog/src/remote/region_alive_keeper.rs index 51192c9d889c..327e846b3b7a 100644 --- a/src/catalog/src/remote/region_alive_keeper.rs +++ b/src/catalog/src/remote/region_alive_keeper.rs @@ -16,10 +16,15 @@ use std::collections::HashMap; use std::future::Future; use std::sync::Arc; +use async_trait::async_trait; +use common_meta::error::InvalidProtoMsgSnafu; +use common_meta::heartbeat::handler::{ + HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, +}; use common_meta::ident::TableIdent; use common_meta::RegionIdent; use common_telemetry::{debug, error, info, warn}; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionNumber; use table::engine::manager::TableEngineManagerRef; use table::engine::{CloseTableResult, EngineContext, TableEngineRef}; @@ -35,6 +40,12 @@ use crate::error::{Result, TableEngineNotFoundSnafu}; pub struct RegionAliveKeepers { table_engine_manager: TableEngineManagerRef, keepers: Arc>>>, + + /// The epoch when [RegionAliveKeepers] is created. It's used to get a monotonically non-decreasing + /// elapsed time when submitting heartbeats to Metasrv (because [Instant] is monotonically + /// non-decreasing). The heartbeat request will carry the duration since this epoch, and the + /// duration acts like an "invariant point" for region's keep alive lease. 
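+    ///
+    /// A sketch of the lease arithmetic, mirroring `handle` below (`lease` being a
+    /// `RegionLease` received from Metasrv):
+    ///
+    /// ```ignore
+    /// let start = epoch + Duration::from_millis(lease.duration_since_epoch);
+    /// let deadline = start + Duration::from_secs(lease.lease_seconds);
+    /// ```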
+ epoch: Instant, } impl RegionAliveKeepers { @@ -42,6 +53,7 @@ impl RegionAliveKeepers { Self { table_engine_manager, keepers: Arc::new(Mutex::new(HashMap::new())), + epoch: Instant::now(), } } @@ -107,6 +119,50 @@ impl RegionAliveKeepers { keeper.start(heartbeat_interval_millis).await; } } + + pub fn epoch(&self) -> Instant { + self.epoch + } +} + +#[async_trait] +impl HeartbeatResponseHandler for RegionAliveKeepers { + fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool { + !ctx.response.region_leases.is_empty() + } + + async fn handle( + &self, + ctx: &mut HeartbeatResponseHandlerContext, + ) -> common_meta::error::Result { + let leases = ctx.response.region_leases.drain(..).collect::>(); + for lease in leases { + let table_ident: TableIdent = match lease + .table_ident + .context(InvalidProtoMsgSnafu { + err_msg: "'table_ident' is missing in RegionLease", + }) + .and_then(|x| x.try_into()) + { + Ok(x) => x, + Err(e) => { + error!(e; ""); + continue; + } + }; + + let Some(keeper) = self.keepers.lock().await.get(&table_ident).cloned() else { + // Alive keeper could be affected by lagging msg, just warn and ignore. + warn!("Alive keeper for table {table_ident} is not found!"); + continue; + }; + + let start_instant = self.epoch + Duration::from_millis(lease.duration_since_epoch); + let deadline = start_instant + Duration::from_secs(lease.lease_seconds); + keeper.keep_lived(lease.regions, deadline).await; + } + Ok(HandleControl::Continue) + } } /// [RegionAliveKeeper] starts a countdown for each region in a table. When deadline is reached, @@ -309,8 +365,11 @@ impl CountdownTask { debug!("Reset deadline to region {region} of table {table_ident} to {deadline:?}"); countdown.set(tokio::time::sleep_until(deadline)); } - // Else the countdown could be not started yet, or during startup protection. - // Can be safely ignored. + // Else the countdown could be either: + // - not started yet; + // - during startup protection; + // - received a lagging heartbeat message. + // All can be safely ignored. 
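+                    // (A lagging message can only carry an earlier deadline,
+                    // which must never shorten the current countdown.)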
}, None => { info!( @@ -367,6 +426,8 @@ mod test { use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; + use api::v1::meta::{HeartbeatResponse, RegionLease}; + use common_meta::heartbeat::mailbox::HeartbeatMailbox; use datatypes::schema::RawSchema; use table::engine::manager::MemoryTableEngineManager; use table::engine::{TableEngine, TableReference}; @@ -377,8 +438,7 @@ mod test { use super::*; use crate::remote::mock::MockTableEngine; - #[tokio::test(flavor = "multi_thread")] - async fn test_region_alive_keepers() { + async fn prepare_keepers() -> (TableIdent, RegionAliveKeepers) { let table_engine = Arc::new(MockTableEngine::default()); let table_engine_manager = Arc::new(MemoryTableEngineManager::new(table_engine)); let keepers = RegionAliveKeepers::new(table_engine_manager); @@ -410,13 +470,82 @@ mod test { table_options: TableOptions::default(), engine: "MockTableEngine".to_string(), })); - keepers .register_table(table_ident.clone(), table) .await .unwrap(); assert!(keepers.keepers.lock().await.contains_key(&table_ident)); + (table_ident, keepers) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_handle_heartbeat_response() { + let (table_ident, keepers) = prepare_keepers().await; + + keepers.start(5000).await; + let startup_protection_until = Instant::now() + Duration::from_secs(21); + + let duration_since_epoch = (Instant::now() - keepers.epoch).as_millis() as _; + let lease_seconds = 100; + let response = HeartbeatResponse { + region_leases: vec![RegionLease { + table_ident: Some(table_ident.clone().into()), + regions: vec![1, 3], // Not extending region 2's lease time. + duration_since_epoch, + lease_seconds, + }], + ..Default::default() + }; + let keep_alive_until = keepers.epoch + + Duration::from_millis(duration_since_epoch) + + Duration::from_secs(lease_seconds); + + let (tx, _) = mpsc::channel(8); + let mailbox = Arc::new(HeartbeatMailbox::new(tx)); + let mut ctx = HeartbeatResponseHandlerContext::new(mailbox, response); + + assert!(keepers.handle(&mut ctx).await.unwrap() == HandleControl::Continue); + + // sleep to wait for background task spawned in `handle` + tokio::time::sleep(Duration::from_secs(1)).await; + + async fn test( + keeper: &Arc, + region_number: RegionNumber, + startup_protection_until: Instant, + keep_alive_until: Instant, + is_kept_live: bool, + ) { + let handles = keeper.countdown_task_handles.lock().await; + let deadline = deadline(&handles.get(®ion_number).unwrap().tx).await; + if is_kept_live { + assert!(deadline > startup_protection_until && deadline == keep_alive_until); + } else { + assert!(deadline <= startup_protection_until); + } + } + + let keeper = &keepers + .keepers + .lock() + .await + .get(&table_ident) + .cloned() + .unwrap(); + + // Test region 1 and 3 is kept lived. Their deadlines are updated to desired instant. + test(keeper, 1, startup_protection_until, keep_alive_until, true).await; + test(keeper, 3, startup_protection_until, keep_alive_until, true).await; + + // Test region 2 is not kept lived. It's deadline is not updated: still during startup protection period. 
+ test(keeper, 2, startup_protection_until, keep_alive_until, false).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_region_alive_keepers() { + let (table_ident, keepers) = prepare_keepers().await; + keepers .register_region(&RegionIdent { cluster_id: 1, diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index e6e68b66ecf7..e91a9bbbad11 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -6,6 +6,7 @@ license.workspace = true [dependencies] api = { path = "../../api" } +async-trait.workspace = true common-catalog = { path = "../catalog" } common-error = { path = "../error" } common-runtime = { path = "../runtime" } diff --git a/src/common/meta/src/heartbeat/handler.rs b/src/common/meta/src/heartbeat/handler.rs index 567a7921345a..9b24955af3ea 100644 --- a/src/common/meta/src/heartbeat/handler.rs +++ b/src/common/meta/src/heartbeat/handler.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use api::v1::meta::HeartbeatResponse; +use async_trait::async_trait; use common_telemetry::error; use crate::error::Result; @@ -57,14 +58,16 @@ impl HeartbeatResponseHandlerContext { /// [`HeartbeatResponseHandler::is_acceptable`] returns true if handler can handle incoming [`HeartbeatResponseHandlerContext`]. /// /// [`HeartbeatResponseHandler::handle`] handles all or part of incoming [`HeartbeatResponseHandlerContext`]. +#[async_trait] pub trait HeartbeatResponseHandler: Send + Sync { fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool; - fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result; + async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result; } +#[async_trait] pub trait HeartbeatResponseHandlerExecutor: Send + Sync { - fn handle(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()>; + async fn handle(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()>; } pub struct HandlerGroupExecutor { @@ -77,14 +80,15 @@ impl HandlerGroupExecutor { } } +#[async_trait] impl HeartbeatResponseHandlerExecutor for HandlerGroupExecutor { - fn handle(&self, mut ctx: HeartbeatResponseHandlerContext) -> Result<()> { + async fn handle(&self, mut ctx: HeartbeatResponseHandlerContext) -> Result<()> { for handler in &self.handlers { if !handler.is_acceptable(&ctx) { continue; } - match handler.handle(&mut ctx) { + match handler.handle(&mut ctx).await { Ok(HandleControl::Done) => break, Ok(HandleControl::Continue) => {} Err(e) => { diff --git a/src/common/meta/src/heartbeat/handler/parse_mailbox_message.rs b/src/common/meta/src/heartbeat/handler/parse_mailbox_message.rs index bc7044011517..fb9d1702fd24 100644 --- a/src/common/meta/src/heartbeat/handler/parse_mailbox_message.rs +++ b/src/common/meta/src/heartbeat/handler/parse_mailbox_message.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use async_trait::async_trait; + use crate::error::Result; use crate::heartbeat::handler::{ HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, @@ -21,12 +23,13 @@ use crate::heartbeat::utils::mailbox_message_to_incoming_message; #[derive(Default)] pub struct ParseMailboxMessageHandler; +#[async_trait] impl HeartbeatResponseHandler for ParseMailboxMessageHandler { fn is_acceptable(&self, _ctx: &HeartbeatResponseHandlerContext) -> bool { true } - fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result { + async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result { if let Some(message) = &ctx.response.mailbox_message { if message.payload.is_some() { // mailbox_message_to_incoming_message will raise an error if payload is none diff --git a/src/common/meta/src/ident.rs b/src/common/meta/src/ident.rs index cfc08fa7bc83..522a242e2274 100644 --- a/src/common/meta/src/ident.rs +++ b/src/common/meta/src/ident.rs @@ -14,7 +14,7 @@ use std::fmt::{Display, Formatter}; -use api::v1::meta::TableIdent as RawTableIdent; +use api::v1::meta::{TableIdent as RawTableIdent, TableName}; use serde::{Deserialize, Serialize}; use snafu::OptionExt; @@ -55,3 +55,17 @@ impl TryFrom for TableIdent { }) } } + +impl From for RawTableIdent { + fn from(table_ident: TableIdent) -> Self { + Self { + table_id: table_ident.table_id, + engine: table_ident.engine, + table_name: Some(TableName { + catalog_name: table_ident.catalog, + schema_name: table_ident.schema, + table_name: table_ident.table, + }), + } + } +} diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index c240af2c55b7..87275b66136c 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use std::time::Duration; use api::v1::meta::{HeartbeatRequest, NodeStat, Peer}; +use catalog::remote::region_alive_keeper::RegionAliveKeepers; use catalog::{datanode_stat, CatalogManagerRef}; use common_meta::heartbeat::handler::{ HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef, @@ -42,6 +43,7 @@ pub struct HeartbeatTask { catalog_manager: CatalogManagerRef, interval: u64, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, + region_alive_keepers: Arc, } impl Drop for HeartbeatTask { @@ -59,6 +61,7 @@ impl HeartbeatTask { meta_client: Arc, catalog_manager: CatalogManagerRef, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, + region_alive_keepers: Arc, ) -> Self { Self { node_id, @@ -69,6 +72,7 @@ impl HeartbeatTask { catalog_manager, interval: 5_000, // default interval is set to 5 secs resp_handler_executor, + region_alive_keepers, } } @@ -94,7 +98,7 @@ impl HeartbeatTask { } let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res); - if let Err(e) = Self::handle_response(ctx, handler_executor.clone()) { + if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await { error!(e; "Error while handling heartbeat response"); } if !running.load(Ordering::Acquire) { @@ -106,13 +110,14 @@ impl HeartbeatTask { Ok(tx) } - fn handle_response( + async fn handle_response( ctx: HeartbeatResponseHandlerContext, handler_executor: HeartbeatResponseHandlerExecutorRef, ) -> Result<()> { trace!("heartbeat response: {:?}", ctx.response); handler_executor .handle(ctx) + .await .context(error::HandleHeartbeatResponseSnafu) } @@ -131,8 +136,7 @@ impl HeartbeatTask { let addr = resolve_addr(&self.server_addr, &self.server_hostname); info!("Starting heartbeat to Metasrv with interval {interval}. 
My node id is {node_id}, address is {addr}."); - // TODO(LFC): Continued in next PR. - // self.region_alive_keepers.start(interval).await; + self.region_alive_keepers.start(interval).await; let meta_client = self.meta_client.clone(); let catalog_manager_clone = self.catalog_manager.clone(); @@ -150,6 +154,7 @@ impl HeartbeatTask { ) .await?; + let epoch = self.region_alive_keepers.epoch(); common_runtime::spawn_bg(async move { let sleep = tokio::time::sleep(Duration::from_millis(0)); tokio::pin!(sleep); @@ -195,6 +200,7 @@ impl HeartbeatTask { ..Default::default() }), region_stats, + duration_since_epoch: (Instant::now() - epoch).as_millis() as u64, ..Default::default() }; sleep.as_mut().reset(Instant::now() + Duration::from_millis(interval)); diff --git a/src/datanode/src/heartbeat/handler/close_region.rs b/src/datanode/src/heartbeat/handler/close_region.rs index 638c1aa014e8..1dc0157fe723 100644 --- a/src/datanode/src/heartbeat/handler/close_region.rs +++ b/src/datanode/src/heartbeat/handler/close_region.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use async_trait::async_trait; use catalog::remote::region_alive_keeper::RegionAliveKeepers; use catalog::{CatalogManagerRef, DeregisterTableRequest}; use common_catalog::format_full_table_name; @@ -38,6 +39,7 @@ pub struct CloseRegionHandler { region_alive_keepers: Arc, } +#[async_trait] impl HeartbeatResponseHandler for CloseRegionHandler { fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool { matches!( @@ -46,7 +48,7 @@ impl HeartbeatResponseHandler for CloseRegionHandler { ) } - fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult { + async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult { let Some((meta, Instruction::CloseRegion(region_ident))) = ctx.incoming_message.take() else { unreachable!("CloseRegionHandler: should be guarded by 'is_acceptable'"); }; diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs index 71b4863f6d64..e56116a48ff5 100644 --- a/src/datanode/src/heartbeat/handler/open_region.rs +++ b/src/datanode/src/heartbeat/handler/open_region.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use async_trait::async_trait; use catalog::error::Error as CatalogError; use catalog::remote::region_alive_keeper::RegionAliveKeepers; use catalog::{CatalogManagerRef, RegisterTableRequest}; @@ -39,6 +40,7 @@ pub struct OpenRegionHandler { region_alive_keepers: Arc, } +#[async_trait] impl HeartbeatResponseHandler for OpenRegionHandler { fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool { matches!( @@ -47,7 +49,7 @@ impl HeartbeatResponseHandler for OpenRegionHandler { ) } - fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult { + async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult { let Some((meta, Instruction::OpenRegion(region_ident))) = ctx.incoming_message.take() else { unreachable!("OpenRegionHandler: should be guarded by 'is_acceptable'"); }; diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 72336f95b7a1..5e5a63006fbf 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -215,8 +215,9 @@ impl Instance { Arc::new(CloseRegionHandler::new( catalog_manager.clone(), engine_manager.clone(), - region_alive_keepers, + region_alive_keepers.clone(), )), + region_alive_keepers.clone(), ]); let heartbeat_task = Some(HeartbeatTask::new( @@ -226,6 +227,7 @@ impl Instance { meta_client, catalog_manager.clone(), 
Arc::new(handlers_executor), + region_alive_keepers, )); (catalog_manager as CatalogManagerRef, None, heartbeat_task) diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 6a278cefa4a2..1796a6c8755b 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -75,7 +75,8 @@ async fn test_close_region_handler() { executor.clone(), mailbox.clone(), close_region_instruction(), - ); + ) + .await; let (_, reply) = rx.recv().await.unwrap(); assert_matches!( reply, @@ -89,7 +90,8 @@ async fn test_close_region_handler() { executor.clone(), mailbox.clone(), close_region_instruction(), - ); + ) + .await; let (_, reply) = rx.recv().await.unwrap(); assert_matches!( reply, @@ -112,7 +114,8 @@ async fn test_close_region_handler() { cluster_id: 1, datanode_id: 2, }), - ); + ) + .await; let (_, reply) = rx.recv().await.unwrap(); assert_matches!( reply, @@ -149,7 +152,7 @@ async fn test_open_region_handler() { prepare_table(instance.inner()).await; // Opens a opened table - handle_instruction(executor.clone(), mailbox.clone(), open_region_instruction()); + handle_instruction(executor.clone(), mailbox.clone(), open_region_instruction()).await; let (_, reply) = rx.recv().await.unwrap(); assert_matches!( reply, @@ -172,7 +175,8 @@ async fn test_open_region_handler() { cluster_id: 1, datanode_id: 2, }), - ); + ) + .await; let (_, reply) = rx.recv().await.unwrap(); assert_matches!( reply, @@ -184,7 +188,8 @@ async fn test_open_region_handler() { executor.clone(), mailbox.clone(), close_region_instruction(), - ); + ) + .await; let (_, reply) = rx.recv().await.unwrap(); assert_matches!( reply, @@ -193,7 +198,7 @@ async fn test_open_region_handler() { assert_test_table_not_found(instance.inner()).await; // Opens demo table - handle_instruction(executor.clone(), mailbox.clone(), open_region_instruction()); + handle_instruction(executor.clone(), mailbox.clone(), open_region_instruction()).await; let (_, reply) = rx.recv().await.unwrap(); assert_matches!( reply, @@ -228,7 +233,7 @@ pub fn test_message_meta(id: u64, subject: &str, to: &str, from: &str) -> Messag } } -fn handle_instruction( +async fn handle_instruction( executor: Arc, mailbox: Arc, instruction: Instruction, @@ -237,7 +242,7 @@ fn handle_instruction( let mut ctx: HeartbeatResponseHandlerContext = HeartbeatResponseHandlerContext::new(mailbox, response); ctx.incoming_message = Some((test_message_meta(1, "hi", "foo", "bar"), instruction)); - executor.handle(ctx).unwrap(); + executor.handle(ctx).await.unwrap(); } fn close_region_instruction() -> Instruction { diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs index 72644fb25e1f..edf608573bba 100644 --- a/src/frontend/src/heartbeat.rs +++ b/src/frontend/src/heartbeat.rs @@ -84,7 +84,7 @@ impl HeartbeatTask { Ok(Some(resp)) => { debug!("Receiving heartbeat response: {:?}", resp); let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp); - if let Err(e) = capture_self.handle_response(ctx) { + if let Err(e) = capture_self.handle_response(ctx).await { error!(e; "Error while handling heartbeat response"); } } @@ -153,9 +153,10 @@ impl HeartbeatTask { }); } - fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()> { + async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()> { self.resp_handler_executor .handle(ctx) + .await .context(error::HandleHeartbeatResponseSnafu) } diff --git a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs 
b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs index 111c1ae86f2f..e728a1f93953 100644 --- a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs +++ b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use async_trait::async_trait; use catalog::helper::TableGlobalKey; use catalog::remote::KvCacheInvalidatorRef; use common_meta::error::Result as MetaResult; @@ -30,6 +31,7 @@ pub struct InvalidateTableCacheHandler { table_route_cache_invalidator: TableRouteCacheInvalidatorRef, } +#[async_trait] impl HeartbeatResponseHandler for InvalidateTableCacheHandler { fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool { matches!( @@ -38,7 +40,7 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler { ) } - fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult { + async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult { // TODO(weny): considers introducing a macro let Some((meta, Instruction::InvalidateTableCache(table_ident))) = ctx.incoming_message.take() else { unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'"); diff --git a/src/frontend/src/heartbeat/handler/tests.rs b/src/frontend/src/heartbeat/handler/tests.rs index c066ad601b59..e80b52ae77b5 100644 --- a/src/frontend/src/heartbeat/handler/tests.rs +++ b/src/frontend/src/heartbeat/handler/tests.rs @@ -90,7 +90,8 @@ async fn test_invalidate_table_cache_handler() { table_id: 0, engine: "mito".to_string(), }), - ); + ) + .await; let (_, reply) = rx.recv().await.unwrap(); assert_matches!( @@ -126,7 +127,8 @@ async fn test_invalidate_table_cache_handler() { table_id: 0, engine: "mito".to_string(), }), - ); + ) + .await; let (_, reply) = rx.recv().await.unwrap(); assert_matches!( @@ -144,7 +146,7 @@ pub fn test_message_meta(id: u64, subject: &str, to: &str, from: &str) -> Messag } } -fn handle_instruction( +async fn handle_instruction( executor: Arc, mailbox: Arc, instruction: Instruction, @@ -153,5 +155,5 @@ fn handle_instruction( let mut ctx: HeartbeatResponseHandlerContext = HeartbeatResponseHandlerContext::new(mailbox, response); ctx.incoming_message = Some((test_message_meta(1, "hi", "foo", "bar"), instruction)); - executor.handle(ctx).unwrap(); + executor.handle(ctx).await.unwrap(); } diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 539a3140230c..ca50243ed9d1 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -38,6 +38,7 @@ regex = "1.6" serde = "1.0" serde_json = "1.0" snafu.workspace = true +store-api = { path = "../store-api" } table = { path = "../table" } tokio.workspace = true tokio-stream = { version = "0.1", features = ["net"] } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 3dd295f4ecdd..3ed8819d6e11 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -354,6 +354,12 @@ pub enum Error { source: common_meta::error::Error, }, + #[snafu(display("Failed to convert proto data, source: {}", source))] + ConvertProtoData { + location: Location, + source: common_meta::error::Error, + }, + // this error is used for custom error mapping // please do not delete it #[snafu(display("Other error, source: {}", source))] @@ -442,7 +448,9 @@ impl ErrorExt for Error { Error::RegionFailoverCandidatesNotFound { .. } => StatusCode::RuntimeResourcesExhausted, Error::RegisterProcedureLoader { source, .. 
} => source.status_code(), - Error::TableRouteConversion { source, .. } => source.status_code(), + Error::TableRouteConversion { source, .. } | Error::ConvertProtoData { source, .. } => { + source.status_code() + } Error::Other { source, .. } => source.status_code(), } } diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 658f45d3a5be..84acb376c463 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -19,8 +19,8 @@ use std::time::Duration; use api::v1::meta::mailbox_message::Payload; use api::v1::meta::{ - HeartbeatRequest, HeartbeatResponse, MailboxMessage, RequestHeader, ResponseHeader, Role, - PROTOCOL_VERSION, + HeartbeatRequest, HeartbeatResponse, MailboxMessage, RegionLease, RequestHeader, + ResponseHeader, Role, PROTOCOL_VERSION, }; pub use check_leader_handler::CheckLeaderHandler; pub use collect_stats_handler::CollectStatsHandler; @@ -54,6 +54,7 @@ pub mod mailbox_handler; pub mod node_stat; mod on_leader_start; mod persist_stats_handler; +pub(crate) mod region_lease_handler; mod response_header_handler; #[async_trait::async_trait] @@ -73,6 +74,7 @@ pub struct HeartbeatAccumulator { pub header: Option, pub instructions: Vec, pub stat: Option, + pub region_leases: Vec, } impl HeartbeatAccumulator { @@ -233,7 +235,7 @@ impl HeartbeatHandlerGroup { let header = std::mem::take(&mut acc.header); let res = HeartbeatResponse { header, - mailbox_message: acc.into_mailbox_message(), + region_leases: acc.region_leases, ..Default::default() }; Ok(res) diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 953efaf6af96..39bd454cfba5 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -36,6 +36,7 @@ pub(crate) struct DatanodeHeartbeat { pub struct RegionFailureHandler { failure_detect_runner: FailureDetectRunner, + region_failover_manager: Arc, } impl RegionFailureHandler { @@ -45,13 +46,19 @@ impl RegionFailureHandler { ) -> Result { region_failover_manager.try_start()?; - let mut failure_detect_runner = FailureDetectRunner::new(election, region_failover_manager); + let mut failure_detect_runner = + FailureDetectRunner::new(election, region_failover_manager.clone()); failure_detect_runner.start().await; Ok(Self { failure_detect_runner, + region_failover_manager, }) } + + pub(crate) fn region_failover_manager(&self) -> &Arc { + &self.region_failover_manager + } } #[async_trait] diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs new file mode 100644 index 000000000000..6eeb0ef0bb2e --- /dev/null +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -0,0 +1,226 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
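+
+//! A heartbeat handler that extends the leases of the regions a Datanode reports.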
+ +use std::collections::HashMap; +use std::sync::Arc; + +use api::v1::meta::{HeartbeatRequest, RegionLease, Role}; +use async_trait::async_trait; +use catalog::helper::TableGlobalKey; +use common_meta::ident::TableIdent; +use common_meta::ClusterId; +use store_api::storage::RegionNumber; + +use crate::error::Result; +use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; +use crate::metasrv::Context; +use crate::procedure::region_failover::{RegionFailoverKey, RegionFailoverManager}; +use crate::service::store::kv::KvStoreRef; +use crate::table_routes; + +/// The lease seconds of a region. It's set by two default heartbeat intervals (5 second × 2) plus +/// two roundtrip time (2 second × 2 × 2), plus some extra buffer (2 second). +// TODO(LFC): Make region lease seconds calculated from Datanode heartbeat configuration. +pub(crate) const REGION_LEASE_SECONDS: u64 = 20; + +pub(crate) struct RegionLeaseHandler { + kv_store: KvStoreRef, + region_failover_manager: Option>, +} + +impl RegionLeaseHandler { + pub(crate) fn new( + kv_store: KvStoreRef, + region_failover_manager: Option>, + ) -> Self { + Self { + kv_store, + region_failover_manager, + } + } + + /// Filter out the regions that are currently in failover. + /// It's meaningless to extend the lease of a region if it is in failover. + fn filter_failover_regions( + &self, + cluster_id: ClusterId, + table_ident: &TableIdent, + regions: Vec, + ) -> Vec { + if let Some(region_failover_manager) = &self.region_failover_manager { + let mut region_failover_key = RegionFailoverKey { + cluster_id, + table_ident: table_ident.clone(), + region_number: 0, + }; + + regions + .into_iter() + .filter(|region| { + region_failover_key.region_number = *region; + !region_failover_manager.is_region_failover_running(®ion_failover_key) + }) + .collect() + } else { + regions + } + } +} + +#[async_trait] +impl HeartbeatHandler for RegionLeaseHandler { + fn is_acceptable(&self, role: Role) -> bool { + role == Role::Datanode + } + + async fn handle( + &self, + req: &HeartbeatRequest, + _: &mut Context, + acc: &mut HeartbeatAccumulator, + ) -> Result<()> { + let Some(stat) = acc.stat.as_ref() else { return Ok(()) }; + + let mut datanode_regions = HashMap::new(); + stat.region_stats.iter().for_each(|x| { + let key = TableGlobalKey { + catalog_name: x.catalog.to_string(), + schema_name: x.schema.to_string(), + table_name: x.table.to_string(), + }; + datanode_regions + .entry(key) + .or_insert_with(Vec::new) + .push(table::engine::region_number(x.id)); + }); + + // TODO(LFC): Retrieve table global values from some cache here. + let table_global_values = table_routes::batch_get_table_global_value( + &self.kv_store, + datanode_regions.keys().collect::>(), + ) + .await?; + + let mut region_leases = Vec::with_capacity(datanode_regions.len()); + for (table_global_key, local_regions) in datanode_regions { + let Some(Some(table_global_value)) = table_global_values.get(&table_global_key) else { continue }; + + let Some(global_regions) = table_global_value.regions_id_map.get(&stat.id) else { continue }; + + // Filter out the designated regions from table global metadata for the given table on the given Datanode. 
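+            // A region the Datanode reports but the metadata does not assign to it
+            // must not have its lease extended here.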
+ let designated_regions = local_regions + .into_iter() + .filter(|x| global_regions.contains(x)) + .collect::>(); + + let table_ident = TableIdent { + catalog: table_global_key.catalog_name.to_string(), + schema: table_global_key.schema_name.to_string(), + table: table_global_key.table_name.to_string(), + table_id: table_global_value.table_id(), + engine: table_global_value.engine().to_string(), + }; + let designated_regions = + self.filter_failover_regions(stat.cluster_id, &table_ident, designated_regions); + + region_leases.push(RegionLease { + table_ident: Some(table_ident.into()), + regions: designated_regions, + duration_since_epoch: req.duration_since_epoch, + lease_seconds: REGION_LEASE_SECONDS, + }); + } + acc.region_leases = region_leases; + Ok(()) + } +} + +#[cfg(test)] +mod test { + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + + use super::*; + use crate::handler::node_stat::{RegionStat, Stat}; + use crate::metasrv::builder::MetaSrvBuilder; + use crate::test_util; + + #[tokio::test] + async fn test_handle_region_lease() { + let region_failover_manager = test_util::create_region_failover_manager(); + let kv_store = region_failover_manager + .create_context() + .selector_ctx + .kv_store + .clone(); + + let table_name = "my_table"; + let _ = table_routes::tests::prepare_table_global_value(&kv_store, table_name).await; + + let table_ident = TableIdent { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table: table_name.to_string(), + table_id: 1, + engine: "mito".to_string(), + }; + region_failover_manager + .running_procedures() + .write() + .unwrap() + .insert(RegionFailoverKey { + cluster_id: 1, + table_ident: table_ident.clone(), + region_number: 1, + }); + + let handler = RegionLeaseHandler::new(kv_store, Some(region_failover_manager)); + + let req = HeartbeatRequest { + duration_since_epoch: 1234, + ..Default::default() + }; + + let builder = MetaSrvBuilder::new(); + let metasrv = builder.build().await.unwrap(); + let ctx = &mut metasrv.new_ctx(); + + let acc = &mut HeartbeatAccumulator::default(); + let new_region_stat = |region_id: u64| -> RegionStat { + RegionStat { + id: region_id, + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table: table_name.to_string(), + ..Default::default() + } + }; + acc.stat = Some(Stat { + cluster_id: 1, + id: 1, + region_stats: vec![new_region_stat(1), new_region_stat(2), new_region_stat(3)], + ..Default::default() + }); + + handler.handle(&req, ctx, acc).await.unwrap(); + + // region 1 is during failover and region 3 is not in table global value, + // so only region 2's lease is extended. 
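+        // The lease also echoes back the request's `duration_since_epoch`, so the
+        // Datanode can anchor the new deadline to its own epoch.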
+ assert_eq!(acc.region_leases.len(), 1); + let lease = acc.region_leases.remove(0); + assert_eq!(lease.table_ident.unwrap(), table_ident.into()); + assert_eq!(lease.regions, vec![2]); + assert_eq!(lease.duration_since_epoch, 1234); + assert_eq!(lease.lease_seconds, REGION_LEASE_SECONDS); + } +} diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 155791161edc..f10bae9a6939 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -20,6 +20,7 @@ use common_procedure::local::{LocalManager, ManagerConfig}; use crate::cluster::MetaPeerClient; use crate::error::Result; use crate::handler::mailbox_handler::MailboxHandler; +use crate::handler::region_lease_handler::RegionLeaseHandler; use crate::handler::{ CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, HeartbeatMailbox, KeepLeaseHandler, OnLeaderStartHandler, PersistStatsHandler, Pushers, RegionFailureHandler, @@ -170,6 +171,13 @@ impl MetaSrvBuilder { ) }; + let region_lease_handler = RegionLeaseHandler::new( + kv_store.clone(), + region_failover_handler + .as_ref() + .map(|x| x.region_failover_manager().clone()), + ); + let group = HeartbeatHandlerGroup::new(pushers); let keep_lease_handler = KeepLeaseHandler::new(kv_store.clone()); group.add_handler(ResponseHeaderHandler::default()).await; @@ -184,6 +192,7 @@ impl MetaSrvBuilder { if let Some(region_failover_handler) = region_failover_handler { group.add_handler(region_failover_handler).await; } + group.add_handler(region_lease_handler).await; group.add_handler(PersistStatsHandler::default()).await; group } diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 7424dc613916..828f69eb5469 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -21,12 +21,13 @@ mod update_metadata; use std::collections::HashSet; use std::fmt::Debug; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, RwLock}; use std::time::Duration; use async_trait::async_trait; use catalog::helper::TableGlobalKey; -use common_meta::RegionIdent; +use common_meta::ident::TableIdent; +use common_meta::{ClusterId, RegionIdent}; use common_procedure::error::{ Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, }; @@ -38,6 +39,7 @@ use common_telemetry::{error, info, warn}; use failover_start::RegionFailoverStart; use serde::{Deserialize, Serialize}; use snafu::ResultExt; +use store_api::storage::RegionNumber; use crate::error::{Error, RegisterProcedureLoaderSnafu, Result}; use crate::lock::DistLockRef; @@ -48,26 +50,41 @@ use crate::service::store::ext::KvStoreExt; const OPEN_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(30); const CLOSE_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(2); +/// A key for the preventing running multiple failover procedures for the same region. 
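+/// (Note it carries no Datanode id: at most one failover procedure may run for a given
+/// cluster, table and region number, wherever the region currently resides.)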
+#[derive(PartialEq, Eq, Hash, Clone)] +pub(crate) struct RegionFailoverKey { + pub(crate) cluster_id: ClusterId, + pub(crate) table_ident: TableIdent, + pub(crate) region_number: RegionNumber, +} + +impl From for RegionFailoverKey { + fn from(region_ident: RegionIdent) -> Self { + Self { + cluster_id: region_ident.cluster_id, + table_ident: region_ident.table_ident, + region_number: region_ident.region_number, + } + } +} + pub(crate) struct RegionFailoverManager { mailbox: MailboxRef, procedure_manager: ProcedureManagerRef, selector: SelectorRef, selector_ctx: SelectorContext, dist_lock: DistLockRef, - running_procedures: Arc>>, + running_procedures: Arc>>, } -struct FailoverProcedureGuard<'a> { - running_procedures: Arc>>, - failed_region: &'a RegionIdent, +struct FailoverProcedureGuard { + running_procedures: Arc>>, + key: RegionFailoverKey, } -impl Drop for FailoverProcedureGuard<'_> { +impl Drop for FailoverProcedureGuard { fn drop(&mut self) { - self.running_procedures - .lock() - .unwrap() - .remove(self.failed_region); + self.running_procedures.write().unwrap().remove(&self.key); } } @@ -85,11 +102,11 @@ impl RegionFailoverManager { selector, selector_ctx, dist_lock, - running_procedures: Arc::new(Mutex::new(HashSet::new())), + running_procedures: Arc::new(RwLock::new(HashSet::new())), } } - fn create_context(&self) -> RegionFailoverContext { + pub(crate) fn create_context(&self) -> RegionFailoverContext { RegionFailoverContext { mailbox: self.mailbox.clone(), selector: self.selector.clone(), @@ -113,19 +130,36 @@ impl RegionFailoverManager { }) } - fn insert_running_procedures(&self, failed_region: &RegionIdent) -> bool { - let mut procedures = self.running_procedures.lock().unwrap(); - if procedures.contains(failed_region) { - return false; + pub(crate) fn is_region_failover_running(&self, key: &RegionFailoverKey) -> bool { + self.running_procedures.read().unwrap().contains(key) + } + + fn insert_running_procedures( + &self, + failed_region: &RegionIdent, + ) -> Option { + let key = RegionFailoverKey::from(failed_region.clone()); + let mut procedures = self.running_procedures.write().unwrap(); + if procedures.insert(key.clone()) { + Some(FailoverProcedureGuard { + running_procedures: self.running_procedures.clone(), + key, + }) + } else { + None } - procedures.insert(failed_region.clone()) + } + + #[cfg(test)] + pub(crate) fn running_procedures(&self) -> Arc>> { + self.running_procedures.clone() } pub(crate) async fn do_region_failover(&self, failed_region: &RegionIdent) -> Result<()> { - if !self.insert_running_procedures(failed_region) { + let Some(guard) = self.insert_running_procedures(failed_region) else { warn!("Region failover procedure for region {failed_region} is already running!"); return Ok(()); - } + }; if !self.table_exists(failed_region).await? { // The table could be dropped before the failure detector knows it. 
Then the region @@ -142,13 +176,9 @@ impl RegionFailoverManager { info!("Starting region failover procedure {procedure_id} for region {failed_region:?}"); let procedure_manager = self.procedure_manager.clone(); - let running_procedures = self.running_procedures.clone(); let failed_region = failed_region.clone(); common_runtime::spawn_bg(async move { - let _guard = FailoverProcedureGuard { - running_procedures, - failed_region: &failed_region, - }; + let _ = guard; let watcher = &mut match procedure_manager.submit(procedure_with_id).await { Ok(watcher) => watcher, @@ -178,7 +208,7 @@ impl RegionFailoverManager { let table_global_value = self .selector_ctx .kv_store - .get(table_global_key.to_string().into_bytes()) + .get(table_global_key.to_raw_key()) .await?; Ok(table_global_value.is_some()) } @@ -232,7 +262,8 @@ trait State: Sync + Send + Debug { /// │ │ │ /// └─────────┘ │ Sends "Close Region" request /// │ to the failed Datanode, and -/// ┌─────────┐ │ wait for 2 seconds +/// | wait for the Region lease expiry +/// ┌─────────┐ │ seconds /// │ │ │ /// │ ┌──▼────▼──────┐ /// Wait candidate │ │ActivateRegion◄───────────────────────┐ @@ -260,7 +291,6 @@ trait State: Sync + Send + Debug { /// │ Broadcast Invalidate Table /// │ Cache /// │ -/// │ /// ┌────────▼────────┐ /// │RegionFailoverEnd│ /// └─────────────────┘ diff --git a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs index b24e188c05fa..15ea43625582 100644 --- a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs @@ -28,6 +28,7 @@ use super::{RegionFailoverContext, State}; use crate::error::{ Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu, }; +use crate::handler::region_lease_handler::REGION_LEASE_SECONDS; use crate::handler::HeartbeatMailbox; use crate::procedure::region_failover::CLOSE_REGION_MESSAGE_TIMEOUT; use crate::service::mailbox::{Channel, MailboxReceiver}; @@ -35,11 +36,15 @@ use crate::service::mailbox::{Channel, MailboxReceiver}; #[derive(Serialize, Deserialize, Debug)] pub(super) struct DeactivateRegion { candidate: Peer, + region_lease_expiry_seconds: u64, } impl DeactivateRegion { pub(super) fn new(candidate: Peer) -> Self { - Self { candidate } + Self { + candidate, + region_lease_expiry_seconds: REGION_LEASE_SECONDS * 2, + } } async fn send_close_region_message( @@ -95,15 +100,21 @@ impl DeactivateRegion { } Err(e) if matches!(e, Error::MailboxTimeout { .. }) => { // Since we are in a region failover situation, the Datanode that the failed region - // resides might be unreachable. So region deactivation is happened in a "try our - // best" effort, do not retry if mailbox received timeout. - // However, if the region failover procedure is also used in a planned maintenance - // situation in the future, a proper retry is a must. + // resides might be unreachable. So we wait for the region lease to expire. The + // region would be closed by its own [RegionAliveKeeper]. + self.wait_for_region_lease_expiry().await; Ok(Box::new(ActivateRegion::new(self.candidate))) } Err(e) => Err(e), } } + + /// Sleep for `region_lease_expiry_seconds`, to make sure the region is closed (by its + /// region alive keeper). This is critical for region not being opened in multiple Datanodes + /// simultaneously. 
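+    ///
+    /// The expiry is set to `REGION_LEASE_SECONDS * 2` in [DeactivateRegion::new],
+    /// leaving a full extra lease period for the Datanode's countdown to fire.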
+    async fn wait_for_region_lease_expiry(&self) {
+        tokio::time::sleep(Duration::from_secs(self.region_lease_expiry_seconds)).await;
+    }
 }
 
 #[async_trait]
@@ -120,8 +131,8 @@ impl State for DeactivateRegion {
         let mailbox_receiver = match result {
             Ok(mailbox_receiver) => mailbox_receiver,
             Err(e) if matches!(e, Error::PusherNotFound { .. }) => {
-                // The Datanode could be unreachable and deregistered from pushers,
-                // so simply advancing to the next state here.
+                // See the comments above on the mailbox timeout situation: wait for
+                // the region lease to expire before activating the candidate.
+                self.wait_for_region_lease_expiry().await;
                 return Ok(Box::new(ActivateRegion::new(self.candidate)));
             }
             Err(e) => return Err(e),
@@ -212,7 +223,10 @@ mod tests {
         let mut env = TestingEnvBuilder::new().build().await;
         let failed_region = env.failed_region(1).await;
 
-        let state = DeactivateRegion::new(Peer::new(2, ""));
+        let state = DeactivateRegion {
+            candidate: Peer::new(2, ""),
+            region_lease_expiry_seconds: 2,
+        };
         let mailbox_receiver = state
             .send_close_region_message(&env.context, &failed_region, Duration::from_millis(100))
             .await
diff --git a/src/meta-srv/src/table_routes.rs b/src/meta-srv/src/table_routes.rs
index 39b5ed28a303..35e1a8a02b33 100644
--- a/src/meta-srv/src/table_routes.rs
+++ b/src/meta-srv/src/table_routes.rs
@@ -12,13 +12,17 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
+
 use api::v1::meta::{PutRequest, TableRouteValue};
 use catalog::helper::{TableGlobalKey, TableGlobalValue};
 use common_meta::key::TableRouteKey;
+use common_meta::rpc::store::{BatchGetRequest, BatchGetResponse};
 use snafu::{OptionExt, ResultExt};
 
 use crate::error::{
-    DecodeTableRouteSnafu, InvalidCatalogValueSnafu, Result, TableRouteNotFoundSnafu,
+    ConvertProtoDataSnafu, DecodeTableRouteSnafu, InvalidCatalogValueSnafu, Result,
+    TableRouteNotFoundSnafu,
 };
 use crate::service::store::ext::KvStoreExt;
 use crate::service::store::kv::KvStoreRef;
@@ -27,12 +31,40 @@ pub async fn get_table_global_value(
     kv_store: &KvStoreRef,
     key: &TableGlobalKey,
 ) -> Result<Option<TableGlobalValue>> {
-    let key = key.to_string().into_bytes();
-    let kv = kv_store.get(key).await?;
+    let kv = kv_store.get(key.to_raw_key()).await?;
     kv.map(|kv| TableGlobalValue::from_bytes(kv.value).context(InvalidCatalogValueSnafu))
         .transpose()
 }
 
+pub(crate) async fn batch_get_table_global_value(
+    kv_store: &KvStoreRef,
+    keys: Vec<&TableGlobalKey>,
+) -> Result<HashMap<TableGlobalKey, Option<TableGlobalValue>>> {
+    let req = BatchGetRequest {
+        keys: keys.iter().map(|x| x.to_raw_key()).collect::<Vec<_>>(),
+    };
+    let mut resp: BatchGetResponse = kv_store
+        .batch_get(req.into())
+        .await?
+        .try_into()
+        .context(ConvertProtoDataSnafu)?;
+
+    let kvs = resp.take_kvs();
+    let mut result = HashMap::with_capacity(kvs.len());
+    for kv in kvs {
+        let key = TableGlobalKey::try_from_raw_key(kv.key()).context(InvalidCatalogValueSnafu)?;
+        let value = TableGlobalValue::from_bytes(kv.value()).context(InvalidCatalogValueSnafu)?;
+        result.insert(key, Some(value));
+    }
+
+    for key in keys {
+        if !result.contains_key(key) {
+            result.insert(key.clone(), None);
+        }
+    }
+    Ok(result)
+}
+
 pub(crate) async fn put_table_global_value(
     kv_store: &KvStoreRef,
     key: &TableGlobalKey,
@@ -40,7 +72,7 @@
 ) -> Result<()> {
     let req = PutRequest {
         header: None,
-        key: key.to_string().into_bytes(),
+        key: key.to_raw_key(),
         value: value.as_bytes().context(InvalidCatalogValueSnafu)?,
         prev_kv: false,
     };
@@ -228,12 +260,12 @@ pub(crate) mod tests {
     async fn test_put_and_get_table_global_value() {
         let kv_store = Arc::new(MemStore::new()) as _;
 
-        let key = TableGlobalKey {
+        let not_exist_key = TableGlobalKey {
             catalog_name: "not_exist_catalog".to_string(),
             schema_name: "not_exist_schema".to_string(),
             table_name: "not_exist_table".to_string(),
         };
-        assert!(get_table_global_value(&kv_store, &key)
+        assert!(get_table_global_value(&kv_store, &not_exist_key)
             .await
             .unwrap()
             .is_none());
 
         let actual = get_table_global_value(&kv_store, &key)
             .await
             .unwrap()
             .unwrap();
         assert_eq!(actual, value);
+
+        let keys = vec![&not_exist_key, &key];
+        let result = batch_get_table_global_value(&kv_store, keys).await.unwrap();
+        assert_eq!(result.len(), 2);
+        assert!(result.get(&not_exist_key).unwrap().is_none());
+        assert_eq!(result.get(&key).unwrap().as_ref().unwrap(), &value);
     }
 
     #[tokio::test]
diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs
index 68c0ca1a50ad..d0fc69cf212f 100644
--- a/src/table/src/engine.rs
+++ b/src/table/src/engine.rs
@@ -28,7 +28,7 @@ use crate::TableRef;
 pub mod manager;
 
 /// Represents a resolved path to a table of the form “catalog.schema.table”
-#[derive(Debug, PartialEq)]
+#[derive(Debug, PartialEq, Eq, Hash)]
 pub struct TableReference<'a> {
     pub catalog: &'a str,
     pub schema: &'a str,