diff --git a/src/meta-srv/src/lock.rs b/src/meta-srv/src/lock.rs index fb607b192bfe..5eceddac0485 100644 --- a/src/meta-srv/src/lock.rs +++ b/src/meta-srv/src/lock.rs @@ -13,7 +13,6 @@ // limitations under the License. pub mod etcd; -pub(crate) mod keys; pub(crate) mod memory; use std::sync::Arc; diff --git a/src/meta-srv/src/lock/keys.rs b/src/meta-srv/src/lock/keys.rs deleted file mode 100644 index db3f5d81282a..000000000000 --- a/src/meta-srv/src/lock/keys.rs +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! All keys used for distributed locking in the Metasrv. -//! Place them in this unified module for better maintenance. - -use common_meta::RegionIdent; - -use crate::lock::Key; - -pub(crate) fn table_metadata_lock_key(region: &RegionIdent) -> Key { - format!( - "table_metadata_lock_({}-{})", - region.cluster_id, region.table_id, - ) - .into_bytes() -} diff --git a/src/meta-srv/src/procedure.rs b/src/meta-srv/src/procedure.rs index dbe63b762c9f..1f430654d224 100644 --- a/src/meta-srv/src/procedure.rs +++ b/src/meta-srv/src/procedure.rs @@ -12,9 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -// TODO(weny): remove it. -#[allow(unused)] -pub mod region_failover; pub mod region_migration; #[cfg(test)] mod tests; diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs deleted file mode 100644 index 8e0e1424a6b1..000000000000 --- a/src/meta-srv/src/procedure/region_failover.rs +++ /dev/null @@ -1,844 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -mod activate_region; -mod deactivate_region; -mod failover_end; -mod failover_start; -mod invalidate_cache; -mod update_metadata; - -use std::collections::HashSet; -use std::fmt::Debug; -use std::sync::{Arc, RwLock}; -use std::time::Duration; - -use async_trait::async_trait; -use common_meta::key::datanode_table::DatanodeTableKey; -use common_meta::key::{TableMetadataManagerRef, MAINTENANCE_KEY}; -use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; -use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock}; -use common_meta::{ClusterId, RegionIdent}; -use common_procedure::error::{ - Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, -}; -use common_procedure::{ - watcher, Context as ProcedureContext, LockKey, Procedure, ProcedureManagerRef, ProcedureWithId, - Status, -}; -use common_telemetry::{error, info, warn}; -use failover_start::RegionFailoverStart; -use serde::{Deserialize, Serialize}; -use snafu::ResultExt; -use store_api::storage::{RegionId, RegionNumber}; -use table::metadata::TableId; -use table::table_name::TableName; - -use crate::error::{ - self, KvBackendSnafu, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu, -}; -use crate::lock::DistLockRef; -use crate::metasrv::{SelectorContext, SelectorRef}; -use crate::service::mailbox::MailboxRef; - -const OPEN_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(30); - -/// A key for the preventing running multiple failover procedures for the same region. -#[derive(PartialEq, Eq, Hash, Clone)] -pub(crate) struct RegionFailoverKey { - pub(crate) cluster_id: ClusterId, - pub(crate) table_id: TableId, - pub(crate) region_number: RegionNumber, -} - -impl From for RegionFailoverKey { - fn from(region_ident: RegionIdent) -> Self { - Self { - cluster_id: region_ident.cluster_id, - table_id: region_ident.table_id, - region_number: region_ident.region_number, - } - } -} - -pub(crate) struct RegionFailoverManager { - region_lease_secs: u64, - in_memory: ResettableKvBackendRef, - kv_backend: KvBackendRef, - mailbox: MailboxRef, - procedure_manager: ProcedureManagerRef, - selector: SelectorRef, - selector_ctx: SelectorContext, - dist_lock: DistLockRef, - running_procedures: Arc>>, - table_metadata_manager: TableMetadataManagerRef, -} - -struct FailoverProcedureGuard { - running_procedures: Arc>>, - key: RegionFailoverKey, -} - -impl Drop for FailoverProcedureGuard { - fn drop(&mut self) { - let _ = self.running_procedures.write().unwrap().remove(&self.key); - } -} - -impl RegionFailoverManager { - #[allow(clippy::too_many_arguments)] - pub(crate) fn new( - region_lease_secs: u64, - in_memory: ResettableKvBackendRef, - kv_backend: KvBackendRef, - mailbox: MailboxRef, - procedure_manager: ProcedureManagerRef, - (selector, selector_ctx): (SelectorRef, SelectorContext), - dist_lock: DistLockRef, - table_metadata_manager: TableMetadataManagerRef, - ) -> Self { - Self { - region_lease_secs, - in_memory, - kv_backend, - mailbox, - procedure_manager, - selector, - selector_ctx, - dist_lock, - running_procedures: Arc::new(RwLock::new(HashSet::new())), - table_metadata_manager, - } - } - - pub(crate) fn create_context(&self) -> RegionFailoverContext { - RegionFailoverContext { - region_lease_secs: self.region_lease_secs, - in_memory: self.in_memory.clone(), - kv_backend: self.kv_backend.clone(), - mailbox: self.mailbox.clone(), - selector: self.selector.clone(), - selector_ctx: self.selector_ctx.clone(), - dist_lock: self.dist_lock.clone(), - table_metadata_manager: self.table_metadata_manager.clone(), - } - } - - pub(crate) fn try_start(&self) -> Result<()> { - let context = self.create_context(); - self.procedure_manager - .register_loader( - RegionFailoverProcedure::TYPE_NAME, - Box::new(move |json| { - let context = context.clone(); - RegionFailoverProcedure::from_json(json, context).map(|p| Box::new(p) as _) - }), - ) - .context(RegisterProcedureLoaderSnafu { - type_name: RegionFailoverProcedure::TYPE_NAME, - }) - } - - fn insert_running_procedures( - &self, - failed_region: &RegionIdent, - ) -> Option { - let key = RegionFailoverKey::from(failed_region.clone()); - let mut procedures = self.running_procedures.write().unwrap(); - if procedures.insert(key.clone()) { - Some(FailoverProcedureGuard { - running_procedures: self.running_procedures.clone(), - key, - }) - } else { - None - } - } - - pub(crate) async fn is_maintenance_mode(&self) -> Result { - self.kv_backend - .exists(MAINTENANCE_KEY.as_bytes()) - .await - .context(KvBackendSnafu) - } - - pub(crate) async fn do_region_failover(&self, failed_region: &RegionIdent) -> Result<()> { - let Some(guard) = self.insert_running_procedures(failed_region) else { - warn!("Region failover procedure for region {failed_region} is already running!"); - return Ok(()); - }; - - let table_info = self - .table_metadata_manager - .table_info_manager() - .get(failed_region.table_id) - .await - .context(error::TableMetadataManagerSnafu)?; - - if table_info.is_none() { - // The table could be dropped before the failure detector knows it. Then the region - // failover is not needed. - // Or the table could be renamed. But we will have a new region ident to detect failure. - // So the region failover here is not needed either. - return Ok(()); - } - - if !self.failed_region_exists(failed_region).await? { - // The failed region could be failover by another procedure. - return Ok(()); - } - - let context = self.create_context(); - // Safety: Check before. - let table_info = table_info.unwrap(); - let TableName { - catalog_name, - schema_name, - .. - } = table_info.table_name(); - let procedure = - RegionFailoverProcedure::new(catalog_name, schema_name, failed_region.clone(), context); - let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - let procedure_id = procedure_with_id.id; - info!("Starting region failover procedure {procedure_id} for region {failed_region:?}"); - - let procedure_manager = self.procedure_manager.clone(); - let failed_region = failed_region.clone(); - let _handle = common_runtime::spawn_bg(async move { - let _ = guard; - - let watcher = &mut match procedure_manager.submit(procedure_with_id).await { - Ok(watcher) => watcher, - Err(e) => { - error!(e; "Failed to submit region failover procedure {procedure_id} for region {failed_region}"); - return; - } - }; - - if let Err(e) = watcher::wait(watcher).await { - error!(e; "Failed to wait region failover procedure {procedure_id} for region {failed_region}"); - return; - } - - info!("Region failover procedure {procedure_id} for region {failed_region} is finished successfully!"); - }); - Ok(()) - } - - async fn failed_region_exists(&self, failed_region: &RegionIdent) -> Result { - let table_id = failed_region.table_id; - let datanode_id = failed_region.datanode_id; - - let value = self - .table_metadata_manager - .datanode_table_manager() - .get(&DatanodeTableKey::new(datanode_id, table_id)) - .await - .context(TableMetadataManagerSnafu)?; - - Ok(value - .map(|value| { - value - .regions - .iter() - .any(|region| *region == failed_region.region_number) - }) - .unwrap_or_default()) - } -} - -#[derive(Serialize, Deserialize, Debug)] -struct LockMeta { - catalog: String, - schema: String, -} - -/// A "Node" in the state machine of region failover procedure. -/// Contains the current state and the data. -#[derive(Serialize, Deserialize, Debug)] -struct Node { - lock_meta: LockMeta, - failed_region: RegionIdent, - state: Box, -} - -/// The "Context" of region failover procedure state machine. -#[derive(Clone)] -pub struct RegionFailoverContext { - pub region_lease_secs: u64, - pub in_memory: ResettableKvBackendRef, - pub kv_backend: KvBackendRef, - pub mailbox: MailboxRef, - pub selector: SelectorRef, - pub selector_ctx: SelectorContext, - pub dist_lock: DistLockRef, - pub table_metadata_manager: TableMetadataManagerRef, -} - -/// The state machine of region failover procedure. Driven by the call to `next`. -#[async_trait] -#[typetag::serde(tag = "region_failover_state")] -trait State: Sync + Send + Debug { - async fn next( - &mut self, - ctx: &RegionFailoverContext, - failed_region: &RegionIdent, - ) -> Result>; - - fn status(&self) -> Status { - Status::executing(true) - } -} - -/// The states transition of region failover procedure: -/// -/// ```text -/// ┌───────────────────┐ -/// │RegionFailoverStart│ -/// └─────────┬─────────┘ -/// │ -/// │ Selects a candidate(Datanode) -/// ┌─────────┐ │ to place the failed region -/// │ │ │ -/// If replied with │ ┌───▼────▼───────┐ -/// "Close region │ │DeactivateRegion│ -/// failed" │ └───┬────┬───────┘ -/// │ │ │ -/// └─────────┘ │ Sends "Close Region" request -/// │ to the failed Datanode, and -/// | wait for the Region lease expiry -/// ┌─────────┐ │ seconds -/// │ │ │ -/// │ ┌──▼────▼──────┐ -/// Wait candidate │ │ActivateRegion◄───────────────────────┐ -/// response timeout │ └──┬────┬──────┘ │ -/// │ │ │ │ -/// └─────────┘ │ Sends "Open Region" request │ -/// │ to the candidate Datanode, │ -/// │ and wait for 30 seconds │ -/// │ │ -/// │ Check Datanode returns │ -/// │ │ -/// success ├──────────────────────────────┘ -/// │ failed -/// ┌─────────▼──────────┐ -/// │UpdateRegionMetadata│ -/// └─────────┬──────────┘ -/// │ -/// │ Updates the Region -/// │ placement metadata -/// │ -/// ┌───────▼───────┐ -/// │InvalidateCache│ -/// └───────┬───────┘ -/// │ -/// │ Broadcast Invalidate Table -/// │ Cache -/// │ -/// ┌────────▼────────┐ -/// │RegionFailoverEnd│ -/// └─────────────────┘ -/// ``` -pub struct RegionFailoverProcedure { - node: Node, - context: RegionFailoverContext, -} - -impl RegionFailoverProcedure { - const TYPE_NAME: &'static str = "metasrv-procedure::RegionFailover"; - - pub fn new( - catalog: String, - schema: String, - failed_region: RegionIdent, - context: RegionFailoverContext, - ) -> Self { - let state = RegionFailoverStart::new(); - let node = Node { - lock_meta: LockMeta { catalog, schema }, - failed_region, - state: Box::new(state), - }; - Self { node, context } - } - - fn from_json(json: &str, context: RegionFailoverContext) -> ProcedureResult { - let node: Node = serde_json::from_str(json).context(FromJsonSnafu)?; - Ok(Self { node, context }) - } -} - -#[async_trait] -impl Procedure for RegionFailoverProcedure { - fn type_name(&self) -> &str { - Self::TYPE_NAME - } - - async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { - let state = &mut self.node.state; - *state = state - .next(&self.context, &self.node.failed_region) - .await - .map_err(|e| { - if e.is_retryable() { - ProcedureError::retry_later(e) - } else { - ProcedureError::external(e) - } - })?; - Ok(state.status()) - } - - fn dump(&self) -> ProcedureResult { - serde_json::to_string(&self.node).context(ToJsonSnafu) - } - - fn lock_key(&self) -> LockKey { - let region_ident = &self.node.failed_region; - let lock_key = vec![ - CatalogLock::Read(&self.node.lock_meta.catalog).into(), - SchemaLock::read(&self.node.lock_meta.catalog, &self.node.lock_meta.catalog).into(), - TableLock::Read(region_ident.table_id).into(), - RegionLock::Write(RegionId::new( - region_ident.table_id, - region_ident.region_number, - )) - .into(), - ]; - - LockKey::new(lock_key) - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - use std::sync::Mutex; - - use api::v1::meta::mailbox_message::Payload; - use api::v1::meta::{HeartbeatResponse, MailboxMessage, RequestHeader}; - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - use common_meta::ddl::utils::region_storage_path; - use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; - use common_meta::key::TableMetadataManager; - use common_meta::kv_backend::memory::MemoryKvBackend; - use common_meta::peer::Peer; - use common_meta::sequence::SequenceBuilder; - use common_meta::DatanodeId; - use common_procedure::{BoxedProcedure, ProcedureId}; - use common_procedure_test::MockContextProvider; - use rand::prelude::SliceRandom; - use tokio::sync::mpsc::Receiver; - - use super::*; - use crate::cluster::MetaPeerClientBuilder; - use crate::handler::{HeartbeatMailbox, Pusher, Pushers}; - use crate::lock::memory::MemLock; - use crate::selector::{Namespace, Selector, SelectorOptions}; - use crate::service::mailbox::Channel; - use crate::test_util; - - struct RandomNodeSelector { - nodes: Vec, - } - - #[async_trait] - impl Selector for RandomNodeSelector { - type Context = SelectorContext; - type Output = Vec; - - async fn select( - &self, - _ns: Namespace, - _ctx: &Self::Context, - _opts: SelectorOptions, - ) -> Result { - let mut rng = rand::thread_rng(); - let mut nodes = self.nodes.clone(); - nodes.shuffle(&mut rng); - Ok(nodes) - } - } - - pub struct TestingEnv { - pub context: RegionFailoverContext, - pub heartbeat_receivers: HashMap>>, - pub pushers: Pushers, - pub path: String, - } - - impl TestingEnv { - pub async fn failed_region(&self, region_number: u32) -> RegionIdent { - let region_distribution = self - .context - .table_metadata_manager - .table_route_manager() - .get_region_distribution(1) - .await - .unwrap() - .unwrap(); - let failed_datanode = region_distribution - .iter() - .find_map(|(&datanode_id, regions)| { - if regions.contains(®ion_number) { - Some(datanode_id) - } else { - None - } - }) - .unwrap(); - RegionIdent { - cluster_id: 0, - region_number, - datanode_id: failed_datanode, - table_id: 1, - engine: "mito2".to_string(), - } - } - } - - pub struct TestingEnvBuilder { - selector: Option, - } - - impl TestingEnvBuilder { - pub fn new() -> Self { - Self { selector: None } - } - - fn with_selector(mut self, selector: SelectorRef) -> Self { - self.selector = Some(selector); - self - } - - pub async fn build(self) -> TestingEnv { - let in_memory = Arc::new(MemoryKvBackend::new()); - let kv_backend = Arc::new(MemoryKvBackend::new()); - let meta_peer_client = MetaPeerClientBuilder::default() - .election(None) - .in_memory(Arc::new(MemoryKvBackend::new())) - .build() - .map(Arc::new) - // Safety: all required fields set at initialization - .unwrap(); - - let table_id = 1; - let table = "my_table"; - let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); - test_util::prepare_table_region_and_info_value(&table_metadata_manager, table).await; - let region_distribution = table_metadata_manager - .table_route_manager() - .get_region_distribution(1) - .await - .unwrap() - .unwrap(); - - let pushers = Pushers::default(); - let mut heartbeat_receivers = HashMap::with_capacity(3); - for datanode_id in 1..=3 { - let (tx, rx) = tokio::sync::mpsc::channel(1); - - let pusher_id = Channel::Datanode(datanode_id).pusher_id(); - let pusher = Pusher::new(tx, &RequestHeader::default()); - let _ = pushers.insert(pusher_id, pusher).await; - - let _ = heartbeat_receivers.insert(datanode_id, rx); - } - - let mailbox_sequence = - SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()) - .initial(0) - .step(100) - .build(); - let mailbox = HeartbeatMailbox::create(pushers.clone(), mailbox_sequence); - - let selector = self.selector.unwrap_or_else(|| { - let nodes = (1..=region_distribution.len()) - .map(|id| Peer { - id: id as u64, - addr: String::default(), - }) - .collect(); - Arc::new(RandomNodeSelector { nodes }) - }); - let selector_ctx = SelectorContext { - datanode_lease_secs: 10, - flownode_lease_secs: 10, - server_addr: "127.0.0.1:3002".to_string(), - kv_backend: kv_backend.clone(), - meta_peer_client, - table_id: Some(table_id), - }; - - TestingEnv { - context: RegionFailoverContext { - region_lease_secs: 10, - in_memory, - kv_backend, - mailbox, - selector, - selector_ctx, - dist_lock: Arc::new(MemLock::default()), - table_metadata_manager, - }, - pushers, - heartbeat_receivers, - path: region_storage_path(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME).to_string(), - } - } - } - - #[tokio::test] - async fn test_region_failover_procedure() { - let mut env = TestingEnvBuilder::new().build().await; - let failed_region = env.failed_region(1).await; - - let mut procedure = Box::new(RegionFailoverProcedure::new( - "greptime".into(), - "public".into(), - failed_region.clone(), - env.context.clone(), - )) as BoxedProcedure; - - let mut failed_datanode = env - .heartbeat_receivers - .remove(&failed_region.datanode_id) - .unwrap(); - let mailbox_clone = env.context.mailbox.clone(); - let failed_region_clone = failed_region.clone(); - let _handle = common_runtime::spawn_bg(async move { - let resp = failed_datanode.recv().await.unwrap().unwrap(); - let received = &resp.mailbox_message.unwrap(); - assert_eq!( - received.payload, - Some(Payload::Json( - serde_json::to_string(&Instruction::CloseRegion(failed_region_clone.clone())) - .unwrap(), - )) - ); - - // simulating response from Datanode - mailbox_clone - .on_recv( - 1, - Ok(MailboxMessage { - id: 1, - subject: "Deactivate Region".to_string(), - from: format!("Datanode-{}", failed_region.datanode_id), - to: "Metasrv".to_string(), - timestamp_millis: common_time::util::current_time_millis(), - payload: Some(Payload::Json( - serde_json::to_string(&InstructionReply::CloseRegion(SimpleReply { - result: true, - error: None, - })) - .unwrap(), - )), - }), - ) - .await - .unwrap(); - }); - - let (candidate_tx, mut candidate_rx) = tokio::sync::mpsc::channel(1); - for (datanode_id, mut recv) in env.heartbeat_receivers.into_iter() { - let mailbox_clone = env.context.mailbox.clone(); - let opening_region = RegionIdent { - datanode_id, - ..failed_region.clone() - }; - let path = env.path.to_string(); - let candidate_tx = candidate_tx.clone(); - let _handle = common_runtime::spawn_bg(async move { - let resp = recv.recv().await.unwrap().unwrap(); - let received = &resp.mailbox_message.unwrap(); - assert_eq!( - received.payload, - Some(Payload::Json( - serde_json::to_string(&Instruction::OpenRegion(OpenRegion::new( - opening_region, - &path, - HashMap::new(), - HashMap::new(), - false - ))) - .unwrap(), - )) - ); - - candidate_tx.send(datanode_id).await.unwrap(); - - // simulating response from Datanode - mailbox_clone - .on_recv( - // Very tricky here: - // the procedure only sends two messages in sequence, the second one is - // "Activate Region", and its message id is 2. - 2, - Ok(MailboxMessage { - id: 2, - subject: "Activate Region".to_string(), - from: format!("Datanode-{datanode_id}"), - to: "Metasrv".to_string(), - timestamp_millis: common_time::util::current_time_millis(), - payload: Some(Payload::Json( - serde_json::to_string(&InstructionReply::OpenRegion(SimpleReply { - result: true, - error: None, - })) - .unwrap(), - )), - }), - ) - .await - .unwrap(); - }); - } - - common_procedure_test::execute_procedure_until_done(&mut procedure).await; - - assert_eq!( - procedure.dump().unwrap(), - r#"{"lock_meta":{"catalog":"greptime","schema":"public"},"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverEnd"}}"# - ); - - // Verifies that the failed region (region 1) is moved from failed datanode (datanode 1) to the candidate datanode. - let region_distribution = env - .context - .table_metadata_manager - .table_route_manager() - .get_region_distribution(1) - .await - .unwrap() - .unwrap(); - assert_eq!( - region_distribution.get(&failed_region.datanode_id).unwrap(), - &vec![2] - ); - assert!(region_distribution - .get(&candidate_rx.recv().await.unwrap()) - .unwrap() - .contains(&1)); - } - - #[tokio::test] - async fn test_state_serde() { - let env = TestingEnvBuilder::new().build().await; - let failed_region = env.failed_region(1).await; - - let state = RegionFailoverStart::new(); - let node = Node { - lock_meta: LockMeta { - catalog: "greptime".into(), - schema: "public".into(), - }, - failed_region, - state: Box::new(state), - }; - let procedure = RegionFailoverProcedure { - node, - context: env.context, - }; - - let s = procedure.dump().unwrap(); - assert_eq!( - s, - r#"{"lock_meta":{"catalog":"greptime","schema":"public"},"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverStart","failover_candidate":null}}"#, - ); - let n: Node = serde_json::from_str(&s).unwrap(); - assert_eq!( - format!("{n:?}"), - r#"Node { lock_meta: LockMeta { catalog: "greptime", schema: "public" }, failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_id: 1, region_number: 1, engine: "mito2" }, state: RegionFailoverStart { failover_candidate: None } }"#, - ); - } - - #[tokio::test] - async fn test_state_not_changed_upon_failure() { - struct MySelector { - peers: Arc>>>, - } - - #[async_trait] - impl Selector for MySelector { - type Context = SelectorContext; - type Output = Vec; - - async fn select( - &self, - _ns: Namespace, - _ctx: &Self::Context, - _opts: SelectorOptions, - ) -> Result { - let mut peers = self.peers.lock().unwrap(); - Ok(if let Some(Some(peer)) = peers.pop() { - vec![peer] - } else { - vec![] - }) - } - } - - // Returns a valid peer the second time called "select". - let selector = MySelector { - peers: Arc::new(Mutex::new(vec![ - Some(Peer { - id: 42, - addr: String::default(), - }), - None, - ])), - }; - - let env = TestingEnvBuilder::new() - .with_selector(Arc::new(selector)) - .build() - .await; - let failed_region = env.failed_region(1).await; - - let state = RegionFailoverStart::new(); - let node = Node { - lock_meta: LockMeta { - catalog: "greptime".into(), - schema: "public".into(), - }, - failed_region, - state: Box::new(state), - }; - let mut procedure = RegionFailoverProcedure { - node, - context: env.context, - }; - - let ctx = ProcedureContext { - procedure_id: ProcedureId::random(), - provider: Arc::new(MockContextProvider::default()), - }; - - let result = procedure.execute(&ctx).await; - assert!(result.is_err()); - let err = result.unwrap_err(); - assert!(err.is_retry_later(), "err: {:?}", err); - assert_eq!( - r#"{"region_failover_state":"RegionFailoverStart","failover_candidate":null}"#, - serde_json::to_string(&procedure.node.state).unwrap() - ); - - let result = procedure.execute(&ctx).await; - assert!(matches!(result, Ok(Status::Executing { persist: true }))); - assert_eq!( - r#"{"region_failover_state":"DeactivateRegion","candidate":{"id":42,"addr":""}}"#, - serde_json::to_string(&procedure.node.state).unwrap() - ); - } -} diff --git a/src/meta-srv/src/procedure/region_failover/activate_region.rs b/src/meta-srv/src/procedure/region_failover/activate_region.rs deleted file mode 100644 index a2b1c8fd9303..000000000000 --- a/src/meta-srv/src/procedure/region_failover/activate_region.rs +++ /dev/null @@ -1,328 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; -use std::time::Duration; - -use api::v1::meta::MailboxMessage; -use async_trait::async_trait; -use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; -use common_meta::key::datanode_table::{DatanodeTableKey, RegionInfo}; -use common_meta::peer::Peer; -use common_meta::RegionIdent; -use common_telemetry::{debug, info}; -use serde::{Deserialize, Serialize}; -use snafu::{OptionExt, ResultExt}; -use store_api::storage::RegionNumber; - -use super::update_metadata::UpdateRegionMetadata; -use super::{RegionFailoverContext, State}; -use crate::error::{ - self, Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu, -}; -use crate::handler::HeartbeatMailbox; -use crate::procedure::region_failover::OPEN_REGION_MESSAGE_TIMEOUT; -use crate::service::mailbox::{Channel, MailboxReceiver}; - -#[derive(Serialize, Deserialize, Debug)] -pub(super) struct ActivateRegion { - candidate: Peer, - // If the meta leader node dies during the execution of the procedure, - // the new leader node needs to remark the failed region as "inactive" - // to prevent it from renewing the lease. - remark_inactive_region: bool, - // An `None` option stands for uninitialized. - region_storage_path: Option, - region_options: Option>, - region_wal_options: Option>, -} - -impl ActivateRegion { - pub(super) fn new(candidate: Peer) -> Self { - Self { - candidate, - remark_inactive_region: false, - region_storage_path: None, - region_options: None, - region_wal_options: None, - } - } - - async fn send_open_region_message( - &mut self, - ctx: &RegionFailoverContext, - failed_region: &RegionIdent, - timeout: Duration, - ) -> Result { - let table_id = failed_region.table_id; - // Retrieves the wal options from failed datanode table value. - let datanode_table_value = ctx - .table_metadata_manager - .datanode_table_manager() - .get(&DatanodeTableKey::new(failed_region.datanode_id, table_id)) - .await - .context(error::TableMetadataManagerSnafu)? - .context(error::DatanodeTableNotFoundSnafu { - table_id, - datanode_id: failed_region.datanode_id, - })?; - - let candidate_ident = RegionIdent { - datanode_id: self.candidate.id, - ..failed_region.clone() - }; - info!("Activating region: {candidate_ident:?}"); - - let RegionInfo { - region_storage_path, - region_options, - region_wal_options, - .. - } = datanode_table_value.region_info; - - let instruction = Instruction::OpenRegion(OpenRegion::new( - candidate_ident.clone(), - ®ion_storage_path, - region_options.clone(), - region_wal_options.clone(), - false, - )); - - self.region_storage_path = Some(region_storage_path); - self.region_options = Some(region_options); - self.region_wal_options = Some(region_wal_options); - - let msg = MailboxMessage::json_message( - "Activate Region", - &format!("Metasrv@{}", ctx.selector_ctx.server_addr), - &format!( - "Datanode-(id={}, addr={})", - self.candidate.id, self.candidate.addr - ), - common_time::util::current_time_millis(), - &instruction, - ) - .with_context(|_| SerializeToJsonSnafu { - input: instruction.to_string(), - })?; - - let ch = Channel::Datanode(self.candidate.id); - ctx.mailbox.send(&ch, msg, timeout).await - } - - async fn handle_response( - &mut self, - mailbox_receiver: MailboxReceiver, - failed_region: &RegionIdent, - ) -> Result> { - match mailbox_receiver.await? { - Ok(msg) => { - debug!("Received activate region reply: {msg:?}"); - - let reply = HeartbeatMailbox::json_reply(&msg)?; - let InstructionReply::OpenRegion(SimpleReply { result, error }) = reply else { - return UnexpectedInstructionReplySnafu { - mailbox_message: msg.to_string(), - reason: "expect open region reply", - } - .fail(); - }; - if result { - Ok(Box::new(UpdateRegionMetadata::new( - self.candidate.clone(), - self.region_storage_path - .clone() - .context(error::UnexpectedSnafu { - violated: "expected region_storage_path", - })?, - self.region_options - .clone() - .context(error::UnexpectedSnafu { - violated: "expected region_options", - })?, - self.region_wal_options - .clone() - .context(error::UnexpectedSnafu { - violated: "expected region_wal_options", - })?, - ))) - } else { - // The region could be just indeed cannot be opened by the candidate, retry - // would be in vain. Then why not just end the failover procedure? Because we - // currently lack the methods or any maintenance tools to manage the whole - // procedures things, it would be easier to let the procedure keep running. - let reason = format!( - "Region {failed_region:?} is not opened by Datanode {:?}, error: {error:?}", - self.candidate, - ); - RetryLaterSnafu { reason }.fail() - } - } - Err(Error::MailboxTimeout { .. }) => { - let reason = format!( - "Mailbox received timeout for activate failed region {failed_region:?} on Datanode {:?}", - self.candidate, - ); - RetryLaterSnafu { reason }.fail() - } - Err(e) => Err(e), - } - } -} - -#[async_trait] -#[typetag::serde] -impl State for ActivateRegion { - async fn next( - &mut self, - ctx: &RegionFailoverContext, - failed_region: &RegionIdent, - ) -> Result> { - let mailbox_receiver = self - .send_open_region_message(ctx, failed_region, OPEN_REGION_MESSAGE_TIMEOUT) - .await?; - - self.handle_response(mailbox_receiver, failed_region).await - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - - use api::v1::meta::mailbox_message::Payload; - use common_meta::instruction::SimpleReply; - - use super::super::tests::TestingEnvBuilder; - use super::*; - - #[tokio::test] - async fn test_activate_region_success() { - common_telemetry::init_default_ut_logging(); - - let mut env = TestingEnvBuilder::new().build().await; - let failed_region = env.failed_region(1).await; - - let candidate = 2; - let mut state = ActivateRegion::new(Peer::new(candidate, "")); - let mailbox_receiver = state - .send_open_region_message(&env.context, &failed_region, Duration::from_millis(100)) - .await - .unwrap(); - - let message_id = mailbox_receiver.message_id(); - - // verify that the open region message is sent - let rx = env.heartbeat_receivers.get_mut(&candidate).unwrap(); - let resp = rx.recv().await.unwrap().unwrap(); - let received = &resp.mailbox_message.unwrap(); - assert_eq!(received.id, message_id); - assert_eq!(received.subject, "Activate Region"); - assert_eq!(received.from, "Metasrv@127.0.0.1:3002"); - assert_eq!(received.to, "Datanode-(id=2, addr=)"); - assert_eq!( - received.payload, - Some(Payload::Json( - serde_json::to_string(&Instruction::OpenRegion(OpenRegion::new( - RegionIdent { - datanode_id: candidate, - ..failed_region.clone() - }, - &env.path, - HashMap::new(), - HashMap::new(), - false - ))) - .unwrap(), - )) - ); - - // simulating response from Datanode - env.context - .mailbox - .on_recv( - message_id, - Ok(MailboxMessage { - id: message_id, - subject: "Activate Region".to_string(), - from: "Datanode-2".to_string(), - to: "Metasrv".to_string(), - timestamp_millis: common_time::util::current_time_millis(), - payload: Some(Payload::Json( - serde_json::to_string(&InstructionReply::OpenRegion(SimpleReply { - result: true, - error: None, - })) - .unwrap(), - )), - }), - ) - .await - .unwrap(); - - let next_state = state - .handle_response(mailbox_receiver, &failed_region) - .await - .unwrap(); - assert_eq!( - format!("{next_state:?}"), - r#"UpdateRegionMetadata { candidate: Peer { id: 2, addr: "" }, region_storage_path: "greptime/public", region_options: {}, region_wal_options: {} }"# - ); - } - - #[tokio::test] - async fn test_activate_region_timeout() { - common_telemetry::init_default_ut_logging(); - - let mut env = TestingEnvBuilder::new().build().await; - let failed_region = env.failed_region(1).await; - - let candidate = 2; - let mut state = ActivateRegion::new(Peer::new(candidate, "")); - let mailbox_receiver = state - .send_open_region_message(&env.context, &failed_region, Duration::from_millis(100)) - .await - .unwrap(); - - // verify that the open region message is sent - let rx = env.heartbeat_receivers.get_mut(&candidate).unwrap(); - let resp = rx.recv().await.unwrap().unwrap(); - let received = &resp.mailbox_message.unwrap(); - assert_eq!(received.id, mailbox_receiver.message_id()); - assert_eq!(received.subject, "Activate Region"); - assert_eq!(received.from, "Metasrv@127.0.0.1:3002"); - assert_eq!(received.to, "Datanode-(id=2, addr=)"); - assert_eq!( - received.payload, - Some(Payload::Json( - serde_json::to_string(&Instruction::OpenRegion(OpenRegion::new( - RegionIdent { - datanode_id: candidate, - ..failed_region.clone() - }, - &env.path, - HashMap::new(), - HashMap::new(), - false - ))) - .unwrap(), - )) - ); - - let result = state - .handle_response(mailbox_receiver, &failed_region) - .await; - assert!(matches!(result, Err(Error::RetryLater { .. }))); - } -} diff --git a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs deleted file mode 100644 index d6e2c088945c..000000000000 --- a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs +++ /dev/null @@ -1,328 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::time::Duration; - -use api::v1::meta::MailboxMessage; -use async_trait::async_trait; -use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; -use common_meta::peer::Peer; -use common_meta::rpc::router::RegionStatus; -use common_meta::RegionIdent; -use common_telemetry::{debug, info, warn}; -use serde::{Deserialize, Serialize}; -use snafu::{OptionExt, ResultExt}; - -use super::activate_region::ActivateRegion; -use super::{RegionFailoverContext, State}; -use crate::error::{ - self, Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu, -}; -use crate::handler::HeartbeatMailbox; -use crate::service::mailbox::{Channel, MailboxReceiver}; - -#[derive(Serialize, Deserialize, Debug)] -pub(super) struct DeactivateRegion { - candidate: Peer, -} - -impl DeactivateRegion { - pub(super) fn new(candidate: Peer) -> Self { - Self { candidate } - } - - async fn mark_leader_downgraded( - &self, - ctx: &RegionFailoverContext, - failed_region: &RegionIdent, - ) -> Result<()> { - let table_id = failed_region.table_id; - - let table_route_value = ctx - .table_metadata_manager - .table_route_manager() - .table_route_storage() - .get_raw(table_id) - .await - .context(error::TableMetadataManagerSnafu)? - .context(error::TableRouteNotFoundSnafu { table_id })?; - - ctx.table_metadata_manager - .update_leader_region_status(table_id, &table_route_value, |region| { - if region.region.id.region_number() == failed_region.region_number { - Some(Some(RegionStatus::Downgraded)) - } else { - None - } - }) - .await - .context(error::UpdateTableRouteSnafu)?; - - Ok(()) - } - - async fn send_close_region_message( - &self, - ctx: &RegionFailoverContext, - failed_region: &RegionIdent, - ) -> Result { - let instruction = Instruction::CloseRegion(failed_region.clone()); - - let msg = MailboxMessage::json_message( - "Deactivate Region", - &format!("Metasrv@{}", ctx.selector_ctx.server_addr), - &format!("Datanode-{}", failed_region.datanode_id), - common_time::util::current_time_millis(), - &instruction, - ) - .with_context(|_| SerializeToJsonSnafu { - input: instruction.to_string(), - })?; - - let ch = Channel::Datanode(failed_region.datanode_id); - let timeout = Duration::from_secs(ctx.region_lease_secs); - ctx.mailbox.send(&ch, msg, timeout).await - } - - async fn handle_response( - &self, - _ctx: &RegionFailoverContext, - mailbox_receiver: MailboxReceiver, - failed_region: &RegionIdent, - ) -> Result> { - match mailbox_receiver.await? { - Ok(msg) => { - debug!("Received deactivate region reply: {msg:?}"); - - let reply = HeartbeatMailbox::json_reply(&msg)?; - let InstructionReply::CloseRegion(SimpleReply { result, error }) = reply else { - return UnexpectedInstructionReplySnafu { - mailbox_message: msg.to_string(), - reason: "expect close region reply", - } - .fail(); - }; - if result { - Ok(Box::new(ActivateRegion::new(self.candidate.clone()))) - } else { - // Under rare circumstances would a Datanode fail to close a Region. - // So simply retry. - let reason = format!( - "Region {failed_region:?} is not closed by Datanode {}, error: {error:?}", - failed_region.datanode_id, - ); - RetryLaterSnafu { reason }.fail() - } - } - Err(Error::MailboxTimeout { .. }) => { - // We have configured the timeout to match the region lease timeout before making - // the call and have disabled region lease renewal. Therefore, if a timeout error - // occurs, it can be concluded that the region has been closed. With this information, - // we can proceed confidently to the next step. - Ok(Box::new(ActivateRegion::new(self.candidate.clone()))) - } - Err(e) => Err(e), - } - } - - /// Sleep for `region_lease_expiry_seconds`, to make sure the region is closed (by its - /// region alive keeper). This is critical for region not being opened in multiple Datanodes - /// simultaneously. - async fn wait_for_region_lease_expiry(&self, ctx: &RegionFailoverContext) { - tokio::time::sleep(Duration::from_secs(ctx.region_lease_secs)).await; - } -} - -#[async_trait] -#[typetag::serde] -impl State for DeactivateRegion { - async fn next( - &mut self, - ctx: &RegionFailoverContext, - failed_region: &RegionIdent, - ) -> Result> { - info!("Deactivating region: {failed_region:?}"); - self.mark_leader_downgraded(ctx, failed_region).await?; - let result = self.send_close_region_message(ctx, failed_region).await; - let mailbox_receiver = match result { - Ok(mailbox_receiver) => mailbox_receiver, - Err(Error::PusherNotFound { .. }) => { - warn!( - "Datanode {} is not reachable, skip deactivating region {}, just wait for the region lease to expire", - failed_region.datanode_id, failed_region - ); - // See the mailbox received timeout situation comments above. - self.wait_for_region_lease_expiry(ctx).await; - return Ok(Box::new(ActivateRegion::new(self.candidate.clone()))); - } - Err(e) => return Err(e), - }; - - self.handle_response(ctx, mailbox_receiver, failed_region) - .await - } -} - -#[cfg(test)] -mod tests { - use api::v1::meta::mailbox_message::Payload; - use common_meta::instruction::SimpleReply; - - use super::super::tests::TestingEnvBuilder; - use super::*; - - #[tokio::test] - async fn test_mark_leader_downgraded() { - common_telemetry::init_default_ut_logging(); - - let env = TestingEnvBuilder::new().build().await; - let failed_region = env.failed_region(1).await; - - let state = DeactivateRegion::new(Peer::new(2, "")); - - state - .mark_leader_downgraded(&env.context, &failed_region) - .await - .unwrap(); - - let table_id = failed_region.table_id; - - let table_route_value = env - .context - .table_metadata_manager - .table_route_manager() - .table_route_storage() - .get(table_id) - .await - .unwrap() - .unwrap(); - - let should_downgraded = table_route_value - .region_routes() - .unwrap() - .iter() - .find(|route| route.region.id.region_number() == failed_region.region_number) - .unwrap(); - - assert!(should_downgraded.is_leader_downgraded()); - } - - #[tokio::test] - async fn test_deactivate_region_success() { - common_telemetry::init_default_ut_logging(); - - let mut env = TestingEnvBuilder::new().build().await; - let failed_region = env.failed_region(1).await; - - let state = DeactivateRegion::new(Peer::new(2, "")); - let mailbox_receiver = state - .send_close_region_message(&env.context, &failed_region) - .await - .unwrap(); - - let message_id = mailbox_receiver.message_id(); - - // verify that the close region message is sent - let rx = env - .heartbeat_receivers - .get_mut(&failed_region.datanode_id) - .unwrap(); - let resp = rx.recv().await.unwrap().unwrap(); - let received = &resp.mailbox_message.unwrap(); - assert_eq!(received.id, message_id); - assert_eq!(received.subject, "Deactivate Region"); - assert_eq!(received.from, "Metasrv@127.0.0.1:3002"); - assert_eq!(received.to, "Datanode-1"); - assert_eq!( - received.payload, - Some(Payload::Json( - serde_json::to_string(&Instruction::CloseRegion(failed_region.clone())).unwrap(), - )) - ); - - // simulating response from Datanode - env.context - .mailbox - .on_recv( - message_id, - Ok(MailboxMessage { - id: message_id, - subject: "Deactivate Region".to_string(), - from: "Datanode-1".to_string(), - to: "Metasrv".to_string(), - timestamp_millis: common_time::util::current_time_millis(), - payload: Some(Payload::Json( - serde_json::to_string(&InstructionReply::CloseRegion(SimpleReply { - result: true, - error: None, - })) - .unwrap(), - )), - }), - ) - .await - .unwrap(); - - let next_state = state - .handle_response(&env.context, mailbox_receiver, &failed_region) - .await - .unwrap(); - assert_eq!( - format!("{next_state:?}"), - r#"ActivateRegion { candidate: Peer { id: 2, addr: "" }, remark_inactive_region: false, region_storage_path: None, region_options: None, region_wal_options: None }"# - ); - } - - #[tokio::test] - async fn test_deactivate_region_timeout() { - common_telemetry::init_default_ut_logging(); - - let mut env = TestingEnvBuilder::new().build().await; - let failed_region = env.failed_region(1).await; - - let state = DeactivateRegion::new(Peer::new(2, "")); - let mailbox_receiver = state - .send_close_region_message(&env.context, &failed_region) - .await - .unwrap(); - - // verify that the open region message is sent - let rx = env - .heartbeat_receivers - .get_mut(&failed_region.datanode_id) - .unwrap(); - let resp = rx.recv().await.unwrap().unwrap(); - let received = &resp.mailbox_message.unwrap(); - assert_eq!(received.id, mailbox_receiver.message_id()); - assert_eq!(received.subject, "Deactivate Region"); - assert_eq!(received.from, "Metasrv@127.0.0.1:3002"); - assert_eq!(received.to, "Datanode-1"); - assert_eq!( - received.payload, - Some(Payload::Json( - serde_json::to_string(&Instruction::CloseRegion(failed_region.clone())).unwrap(), - )) - ); - - let next_state = state - .handle_response(&env.context, mailbox_receiver, &failed_region) - .await - .unwrap(); - // Timeout or not, proceed to `ActivateRegion`. - assert_eq!( - format!("{next_state:?}"), - r#"ActivateRegion { candidate: Peer { id: 2, addr: "" }, remark_inactive_region: false, region_storage_path: None, region_options: None, region_wal_options: None }"# - ); - } -} diff --git a/src/meta-srv/src/procedure/region_failover/failover_end.rs b/src/meta-srv/src/procedure/region_failover/failover_end.rs deleted file mode 100644 index 48d0a1fa1826..000000000000 --- a/src/meta-srv/src/procedure/region_failover/failover_end.rs +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use async_trait::async_trait; -use common_meta::RegionIdent; -use common_procedure::Status; -use serde::{Deserialize, Serialize}; - -use super::{RegionFailoverContext, State}; -use crate::error::Result; - -#[derive(Serialize, Deserialize, Debug)] -pub(super) struct RegionFailoverEnd; - -#[async_trait] -#[typetag::serde] -impl State for RegionFailoverEnd { - async fn next(&mut self, _: &RegionFailoverContext, _: &RegionIdent) -> Result> { - Ok(Box::new(RegionFailoverEnd)) - } - - fn status(&self) -> Status { - Status::done() - } -} diff --git a/src/meta-srv/src/procedure/region_failover/failover_start.rs b/src/meta-srv/src/procedure/region_failover/failover_start.rs deleted file mode 100644 index aaa1c9949857..000000000000 --- a/src/meta-srv/src/procedure/region_failover/failover_start.rs +++ /dev/null @@ -1,136 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use async_trait::async_trait; -use common_error::ext::{BoxedError, ErrorExt}; -use common_error::status_code::StatusCode; -use common_meta::peer::Peer; -use common_meta::RegionIdent; -use common_telemetry::info; -use serde::{Deserialize, Serialize}; -use snafu::{ensure, location, Location}; - -use super::deactivate_region::DeactivateRegion; -use super::{RegionFailoverContext, State}; -use crate::error::{self, RegionFailoverCandidatesNotFoundSnafu, Result}; -use crate::selector::SelectorOptions; - -#[derive(Serialize, Deserialize, Debug)] -pub(super) struct RegionFailoverStart { - failover_candidate: Option, -} - -impl RegionFailoverStart { - pub(super) fn new() -> Self { - Self { - failover_candidate: None, - } - } - - async fn choose_candidate( - &mut self, - ctx: &RegionFailoverContext, - failed_region: &RegionIdent, - ) -> Result { - if let Some(candidate) = self.failover_candidate.clone() { - return Ok(candidate); - } - - let mut selector_ctx = ctx.selector_ctx.clone(); - selector_ctx.table_id = Some(failed_region.table_id); - - let cluster_id = failed_region.cluster_id; - let opts = SelectorOptions::default(); - let candidates = ctx - .selector - .select(cluster_id, &selector_ctx, opts) - .await? - .iter() - .filter_map(|p| { - if p.id != failed_region.datanode_id { - Some(p.clone()) - } else { - None - } - }) - .collect::>(); - ensure!( - !candidates.is_empty(), - RegionFailoverCandidatesNotFoundSnafu { - failed_region: format!("{failed_region:?}"), - } - ); - - // Safety: indexing is guarded by the "ensure!" above. - let candidate = &candidates[0]; - self.failover_candidate = Some(candidate.clone()); - info!("Choose failover candidate datanode {candidate:?} for region: {failed_region}"); - Ok(candidate.clone()) - } -} - -#[async_trait] -#[typetag::serde] -impl State for RegionFailoverStart { - async fn next( - &mut self, - ctx: &RegionFailoverContext, - failed_region: &RegionIdent, - ) -> Result> { - let candidate = self - .choose_candidate(ctx, failed_region) - .await - .map_err(|e| { - if e.status_code() == StatusCode::RuntimeResourcesExhausted { - error::Error::RetryLaterWithSource { - reason: format!("Region failover aborted for {failed_region:?}"), - location: location!(), - source: BoxedError::new(e), - } - } else { - e - } - })?; - return Ok(Box::new(DeactivateRegion::new(candidate))); - } -} - -#[cfg(test)] -mod tests { - use super::super::tests::TestingEnvBuilder; - use super::*; - - #[tokio::test] - async fn test_choose_failover_candidate() { - common_telemetry::init_default_ut_logging(); - - let env = TestingEnvBuilder::new().build().await; - let failed_region = env.failed_region(1).await; - - let mut state = RegionFailoverStart::new(); - assert!(state.failover_candidate.is_none()); - - let candidate = state - .choose_candidate(&env.context, &failed_region) - .await - .unwrap(); - assert_ne!(candidate.id, failed_region.datanode_id); - - let candidate_again = state - .choose_candidate(&env.context, &failed_region) - .await - .unwrap(); - assert_eq!(candidate, candidate_again); - } -} diff --git a/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs b/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs deleted file mode 100644 index d7231abfc834..000000000000 --- a/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use api::v1::meta::MailboxMessage; -use async_trait::async_trait; -use common_meta::instruction::{CacheIdent, Instruction}; -use common_meta::RegionIdent; -use common_telemetry::info; -use serde::{Deserialize, Serialize}; -use snafu::ResultExt; -use table::metadata::TableId; - -use super::failover_end::RegionFailoverEnd; -use super::{RegionFailoverContext, State}; -use crate::error::{self, Result}; -use crate::service::mailbox::BroadcastChannel; - -#[derive(Serialize, Deserialize, Debug, Default)] -pub(super) struct InvalidateCache; - -impl InvalidateCache { - async fn broadcast_invalidate_table_cache_messages( - &self, - ctx: &RegionFailoverContext, - table_id: TableId, - ) -> Result<()> { - let instruction = Instruction::InvalidateCaches(vec![CacheIdent::TableId(table_id)]); - - let msg = &MailboxMessage::json_message( - "Invalidate Table Cache", - &format!("Metasrv@{}", ctx.selector_ctx.server_addr), - "Frontend broadcast", - common_time::util::current_time_millis(), - &instruction, - ) - .with_context(|_| error::SerializeToJsonSnafu { - input: instruction.to_string(), - })?; - - ctx.mailbox - .broadcast(&BroadcastChannel::Frontend, msg) - .await - } -} - -#[async_trait] -#[typetag::serde] -impl State for InvalidateCache { - async fn next( - &mut self, - ctx: &RegionFailoverContext, - failed_region: &RegionIdent, - ) -> Result> { - let table_id = failed_region.table_id; - info!( - "Broadcast invalidate table({}) cache message to frontend", - table_id - ); - self.broadcast_invalidate_table_cache_messages(ctx, table_id) - .await?; - - Ok(Box::new(RegionFailoverEnd)) - } -} - -#[cfg(test)] -mod tests { - use api::v1::meta::mailbox_message::Payload; - use api::v1::meta::RequestHeader; - - use super::super::tests::TestingEnvBuilder; - use super::*; - use crate::handler::Pusher; - use crate::procedure::region_failover::tests::TestingEnv; - use crate::service::mailbox::Channel; - - #[tokio::test] - async fn test_invalidate_table_cache() { - common_telemetry::init_default_ut_logging(); - - let env = TestingEnvBuilder::new().build().await; - let failed_region = env.failed_region(1).await; - - let TestingEnv { - mut heartbeat_receivers, - context, - pushers, - .. - } = env; - - for frontend_id in 4..=7 { - let (tx, rx) = tokio::sync::mpsc::channel(1); - - let pusher_id = Channel::Frontend(frontend_id).pusher_id(); - let pusher = Pusher::new(tx, &RequestHeader::default()); - let _ = pushers.insert(pusher_id, pusher).await; - - let _ = heartbeat_receivers.insert(frontend_id, rx); - } - - let table_id = failed_region.table_id; - - // lexicographical order - // frontend-4,5,6,7 - let next_state = InvalidateCache - .next(&context, &failed_region) - .await - .unwrap(); - assert_eq!(format!("{next_state:?}"), "RegionFailoverEnd"); - - for i in 4..=7 { - // frontend id starts from 4 - let rx = heartbeat_receivers.get_mut(&i).unwrap(); - let resp = rx.recv().await.unwrap().unwrap(); - let received = &resp.mailbox_message.unwrap(); - - assert_eq!(received.id, 0); - assert_eq!(received.subject, "Invalidate Table Cache"); - assert_eq!(received.from, "Metasrv@127.0.0.1:3002"); - assert_eq!(received.to, "Frontend broadcast"); - - assert_eq!( - received.payload, - Some(Payload::Json( - serde_json::to_string(&Instruction::InvalidateCaches(vec![ - CacheIdent::TableId(table_id) - ])) - .unwrap(), - )) - ); - } - } -} diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs deleted file mode 100644 index 6302d20eee73..000000000000 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ /dev/null @@ -1,496 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; - -use async_trait::async_trait; -use common_error::ext::BoxedError; -use common_meta::key::datanode_table::RegionInfo; -use common_meta::key::table_route::TableRouteKey; -use common_meta::peer::Peer; -use common_meta::rpc::router::RegionRoute; -use common_meta::RegionIdent; -use common_telemetry::info; -use serde::{Deserialize, Serialize}; -use snafu::{OptionExt, ResultExt}; -use store_api::storage::RegionNumber; - -use super::invalidate_cache::InvalidateCache; -use super::{RegionFailoverContext, State}; -use crate::error::{self, Result, TableRouteNotFoundSnafu}; -use crate::lock::keys::table_metadata_lock_key; -use crate::lock::Opts; - -#[derive(Serialize, Deserialize, Debug, PartialEq)] -pub(super) struct UpdateRegionMetadata { - candidate: Peer, - region_storage_path: String, - region_options: HashMap, - #[serde(default)] - region_wal_options: HashMap, -} - -impl UpdateRegionMetadata { - pub(super) fn new( - candidate: Peer, - region_storage_path: String, - region_options: HashMap, - region_wal_options: HashMap, - ) -> Self { - Self { - candidate, - region_storage_path, - region_options, - region_wal_options, - } - } - - /// Updates the metadata of the table. - async fn update_metadata( - &self, - ctx: &RegionFailoverContext, - failed_region: &RegionIdent, - ) -> Result<()> { - let key = table_metadata_lock_key(failed_region); - let key = ctx.dist_lock.lock(key, Opts::default()).await?; - - self.update_table_route(ctx, failed_region).await?; - - ctx.dist_lock.unlock(key).await?; - Ok(()) - } - - async fn update_table_route( - &self, - ctx: &RegionFailoverContext, - failed_region: &RegionIdent, - ) -> Result<()> { - let table_id = failed_region.table_id; - let engine = &failed_region.engine; - - let table_route_value = ctx - .table_metadata_manager - .table_route_manager() - .table_route_storage() - .get_raw(table_id) - .await - .context(error::TableMetadataManagerSnafu)? - .context(TableRouteNotFoundSnafu { table_id })?; - - let mut new_region_routes = table_route_value - .region_routes() - .context(error::UnexpectedLogicalRouteTableSnafu { - err_msg: format!("{self:?} is a non-physical TableRouteValue."), - })? - .clone(); - - for region_route in new_region_routes.iter_mut() { - if region_route.region.id.region_number() == failed_region.region_number { - region_route.leader_peer = Some(self.candidate.clone()); - region_route.set_leader_status(None); - break; - } - } - - pretty_log_table_route_change( - TableRouteKey::new(table_id).to_string(), - &new_region_routes, - failed_region, - ); - - ctx.table_metadata_manager - .update_table_route( - table_id, - RegionInfo { - engine: engine.to_string(), - region_storage_path: self.region_storage_path.to_string(), - region_options: self.region_options.clone(), - region_wal_options: self.region_wal_options.clone(), - }, - &table_route_value, - new_region_routes, - &self.region_options, - &self.region_wal_options, - ) - .await - .context(error::UpdateTableRouteSnafu)?; - - Ok(()) - } -} - -fn pretty_log_table_route_change( - key: String, - region_routes: &[RegionRoute], - failed_region: &RegionIdent, -) { - let region_routes = region_routes - .iter() - .map(|x| { - format!( - "{{region: {}, leader: {}, followers: [{}]}}", - x.region.id, - x.leader_peer - .as_ref() - .map(|p| p.id.to_string()) - .unwrap_or_else(|| "?".to_string()), - x.follower_peers - .iter() - .map(|p| p.id.to_string()) - .collect::>() - .join(","), - ) - }) - .collect::>(); - - info!( - "Updating region routes in table route value (key = '{}') to [{}]. \ - Failed region {} was on Datanode {}.", - key, - region_routes.join(", "), - failed_region.region_number, - failed_region.datanode_id, - ); -} - -#[async_trait] -#[typetag::serde] -impl State for UpdateRegionMetadata { - async fn next( - &mut self, - ctx: &RegionFailoverContext, - failed_region: &RegionIdent, - ) -> Result> { - self.update_metadata(ctx, failed_region) - .await - .map_err(BoxedError::new) - .context(error::RetryLaterWithSourceSnafu { - reason: format!( - "Failed to update metadata for failed region: {}", - failed_region - ), - })?; - Ok(Box::new(InvalidateCache)) - } -} - -#[cfg(test)] -mod tests { - - use common_meta::rpc::router::{extract_all_peers, region_distribution}; - use futures::TryStreamExt; - - use super::super::tests::{TestingEnv, TestingEnvBuilder}; - use super::{State, *}; - use crate::test_util::new_region_route; - - #[tokio::test] - async fn test_next_state() { - let env = TestingEnvBuilder::new().build().await; - let failed_region = env.failed_region(1).await; - - let mut state = UpdateRegionMetadata::new( - Peer::new(2, ""), - env.path.clone(), - HashMap::new(), - HashMap::new(), - ); - - let next_state = state.next(&env.context, &failed_region).await.unwrap(); - assert_eq!(format!("{next_state:?}"), "InvalidateCache"); - } - - #[tokio::test] - async fn test_update_table_route() { - common_telemetry::init_default_ut_logging(); - - async fn test(env: TestingEnv, failed_region: u32, candidate: u64) -> Vec { - let failed_region = env.failed_region(failed_region).await; - - let state = UpdateRegionMetadata::new( - Peer::new(candidate, ""), - env.path.clone(), - HashMap::new(), - HashMap::new(), - ); - state - .update_table_route(&env.context, &failed_region) - .await - .unwrap(); - - let table_id = failed_region.table_id; - - env.context - .table_metadata_manager - .table_route_manager() - .table_route_storage() - .get_raw(table_id) - .await - .unwrap() - .unwrap() - .into_inner() - .region_routes() - .unwrap() - .clone() - } - - // Original region routes: - // region number => leader node - // 1 => 1 - // 2 => 1 - // 3 => 2 - // 4 => 3 - - // Testing failed region 1 moves to Datanode 2. - let env = TestingEnvBuilder::new().build().await; - let actual = test(env, 1, 2).await; - - // Expected region routes: - // region number => leader node - // 1 => 2 - // 2 => 1 - // 3 => 2 - // 4 => 3 - let peers = &extract_all_peers(&actual); - assert_eq!(peers.len(), 3); - let expected = vec![ - new_region_route(1, peers, 2), - new_region_route(2, peers, 1), - new_region_route(3, peers, 2), - new_region_route(4, peers, 3), - ]; - assert_eq!(actual, expected); - - // Testing failed region 3 moves to Datanode 3. - let env = TestingEnvBuilder::new().build().await; - let actual = test(env, 3, 3).await; - - // Expected region routes: - // region number => leader node - // 1 => 1 - // 2 => 1 - // 3 => 3 - // 4 => 3 - let peers = &extract_all_peers(&actual); - assert_eq!(peers.len(), 2); - let expected = vec![ - new_region_route(1, peers, 1), - new_region_route(2, peers, 1), - new_region_route(3, peers, 3), - new_region_route(4, peers, 3), - ]; - assert_eq!(actual, expected); - - // Testing failed region 1 moves to a new Datanode, 4. - let env = TestingEnvBuilder::new().build().await; - let actual = test(env, 1, 4).await; - - // Expected region routes: - // region number => leader node - // 1 => 4 - // 2 => 1 - // 3 => 2 - // 4 => 3 - let peers = &extract_all_peers(&actual); - assert_eq!(peers.len(), 4); - let expected = vec![ - new_region_route(1, peers, 4), - new_region_route(2, peers, 1), - new_region_route(3, peers, 2), - new_region_route(4, peers, 3), - ]; - assert_eq!(actual, expected); - - // Testing failed region 3 moves to a new Datanode, 4. - let env = TestingEnvBuilder::new().build().await; - let actual = test(env, 3, 4).await; - - // Expected region routes: - // region number => leader node - // 1 => 1 - // 2 => 1 - // 3 => 4 - // 4 => 3 - let peers = &extract_all_peers(&actual); - assert_eq!(peers.len(), 3); - let expected = vec![ - new_region_route(1, peers, 1), - new_region_route(2, peers, 1), - new_region_route(3, peers, 4), - new_region_route(4, peers, 3), - ]; - assert_eq!(actual, expected); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_update_metadata_concurrently() { - common_telemetry::init_default_ut_logging(); - - // Test the correctness of concurrently updating the region distribution in table region - // value, and region routes in table route value. Region 1 moves to Datanode 2; region 2 - // moves to Datanode 3. - // - // Datanode => Regions - // Before: | After: - // 1 => 1, 2 | - // 2 => 3 | 2 => 3, 1 - // 3 => 4 | 3 => 4, 2 - // - // region number => leader node - // Before: | After: - // 1 => 1 | 1 => 2 - // 2 => 1 | 2 => 3 - // 3 => 2 | 3 => 2 - // 4 => 3 | 4 => 3 - // - // Test case runs 10 times to enlarge the possibility of concurrent updating. - for _ in 0..10 { - let env = TestingEnvBuilder::new().build().await; - - let ctx_1 = env.context.clone(); - let ctx_2 = env.context.clone(); - - let failed_region_1 = env.failed_region(1).await; - let failed_region_2 = env.failed_region(2).await; - - let table_id = failed_region_1.table_id; - let path = env.path.clone(); - let _ = futures::future::join_all(vec![ - tokio::spawn(async move { - let state = UpdateRegionMetadata::new( - Peer::new(2, ""), - path, - HashMap::new(), - HashMap::new(), - ); - state - .update_metadata(&ctx_1, &failed_region_1) - .await - .unwrap(); - }), - tokio::spawn(async move { - let state = UpdateRegionMetadata::new( - Peer::new(3, ""), - env.path.clone(), - HashMap::new(), - HashMap::new(), - ); - state - .update_metadata(&ctx_2, &failed_region_2) - .await - .unwrap(); - }), - ]) - .await; - - let table_route_value = env - .context - .table_metadata_manager - .table_route_manager() - .table_route_storage() - .get(table_id) - .await - .unwrap() - .unwrap(); - - let peers = &extract_all_peers(table_route_value.region_routes().unwrap()); - let actual = table_route_value.region_routes().unwrap(); - let expected = &vec![ - new_region_route(1, peers, 2), - new_region_route(2, peers, 3), - new_region_route(3, peers, 2), - new_region_route(4, peers, 3), - ]; - assert_eq!(peers.len(), 2); - assert_eq!(actual, expected); - - let manager = &env.context.table_metadata_manager; - let table_route_value = manager - .table_route_manager() - .table_route_storage() - .get(table_id) - .await - .unwrap() - .unwrap(); - - let map = region_distribution(table_route_value.region_routes().unwrap()); - assert_eq!(map.len(), 2); - assert_eq!(map.get(&2), Some(&vec![1, 3])); - assert_eq!(map.get(&3), Some(&vec![2, 4])); - - // test DatanodeTableValues matches the table region distribution - let datanode_table_manager = manager.datanode_table_manager(); - let tables = datanode_table_manager - .tables(1) - .try_collect::>() - .await - .unwrap(); - assert!(tables.is_empty()); - - let tables = datanode_table_manager - .tables(2) - .try_collect::>() - .await - .unwrap(); - assert_eq!(tables.len(), 1); - assert_eq!(tables[0].table_id, 1); - assert_eq!(tables[0].regions, vec![1, 3]); - - let tables = datanode_table_manager - .tables(3) - .try_collect::>() - .await - .unwrap(); - assert_eq!(tables.len(), 1); - assert_eq!(tables[0].table_id, 1); - assert_eq!(tables[0].regions, vec![2, 4]); - } - } - - #[derive(Debug, Clone, Serialize, Deserialize)] - struct LegacyUpdateRegionMetadata { - candidate: Peer, - region_storage_path: String, - region_options: HashMap, - } - - #[test] - fn test_compatible_serialize_update_region_metadata() { - let candidate = Peer::new(1, "test_addr"); - let region_storage_path = "test_path".to_string(); - let region_options = HashMap::from([ - ("a".to_string(), "aa".to_string()), - ("b".to_string(), "bb".to_string()), - ]); - - let legacy_update_region_metadata = LegacyUpdateRegionMetadata { - candidate: candidate.clone(), - region_storage_path: region_storage_path.clone(), - region_options: region_options.clone(), - }; - - // Serialize a LegacyUpdateRegionMetadata. - let serialized = serde_json::to_string(&legacy_update_region_metadata).unwrap(); - - // Deserialize to UpdateRegionMetadata. - let deserialized = serde_json::from_str(&serialized).unwrap(); - let expected = UpdateRegionMetadata { - candidate, - region_storage_path, - region_options, - region_wal_options: HashMap::new(), - }; - assert_eq!(expected, deserialized); - } -} diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index 9f9f119e7eca..576d1aa92365 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -12,22 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::sync::Arc; -use chrono::DateTime; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; -use common_meta::key::table_route::TableRouteValue; -use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::ClusterId; use common_time::util as time_util; -use datatypes::data_type::ConcreteDataType; -use datatypes::schema::{ColumnSchema, RawSchema}; -use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; -use table::requests::TableOptions; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; use crate::key::{DatanodeLeaseKey, LeaseValue}; @@ -72,69 +63,6 @@ pub(crate) fn create_selector_context() -> SelectorContext { } } -pub(crate) async fn prepare_table_region_and_info_value( - table_metadata_manager: &TableMetadataManagerRef, - table: &str, -) { - let table_info = RawTableInfo { - ident: TableIdent::new(1), - name: table.to_string(), - desc: None, - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - meta: RawTableMeta { - schema: RawSchema::new(vec![ColumnSchema::new( - "a", - ConcreteDataType::string_datatype(), - true, - )]), - primary_key_indices: vec![], - value_indices: vec![], - engine: MITO_ENGINE.to_string(), - next_column_id: 1, - region_numbers: vec![1, 2, 3, 4], - options: TableOptions::default(), - created_on: DateTime::default(), - partition_key_indices: vec![], - }, - table_type: TableType::Base, - }; - - let region_route_factory = |region_id: u64, peer: u64| RegionRoute { - region: Region { - id: region_id.into(), - ..Default::default() - }, - leader_peer: Some(Peer { - id: peer, - addr: String::new(), - }), - follower_peers: vec![], - leader_status: None, - leader_down_since: None, - }; - - // Region distribution: - // Datanode => Regions - // 1 => 1, 2 - // 2 => 3 - // 3 => 4 - let region_routes = vec![ - region_route_factory(1, 1), - region_route_factory(2, 1), - region_route_factory(3, 2), - region_route_factory(4, 3), - ]; - table_metadata_manager - .create_table_metadata( - table_info, - TableRouteValue::physical(region_routes), - HashMap::default(), - ) - .await - .unwrap(); -} - pub(crate) async fn put_datanodes( cluster_id: ClusterId, meta_peer_client: &MetaPeerClientRef, diff --git a/tests-integration/tests/main.rs b/tests-integration/tests/main.rs index 42560e46f4cd..4fc19f24b284 100644 --- a/tests-integration/tests/main.rs +++ b/tests-integration/tests/main.rs @@ -19,10 +19,7 @@ mod http; #[macro_use] mod sql; #[macro_use] -#[allow(dead_code)] mod region_migration; -// #[macro_use] -// mod region_failover; grpc_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs); http_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs); diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs deleted file mode 100644 index 103b9481f8ae..000000000000 --- a/tests-integration/tests/region_failover.rs +++ /dev/null @@ -1,376 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; -use std::time::Duration; - -use catalog::kvbackend::{CachedMetaKvBackend, KvBackendCatalogManager}; -use client::OutputData; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; -use common_meta::key::table_route::TableRouteKey; -use common_meta::key::{MetaKey, RegionDistribution}; -use common_meta::peer::Peer; -use common_meta::{distributed_time_constants, RegionIdent}; -use common_procedure::{watcher, ProcedureWithId}; -use common_query::Output; -use common_telemetry::info; -use common_test_util::recordbatch::check_output_stream; -use frontend::error::Result as FrontendResult; -use frontend::instance::Instance; -use futures::TryStreamExt; -use meta_srv::error::Result as MetaResult; -use meta_srv::metasrv::{SelectorContext, SelectorRef}; -use meta_srv::procedure::region_failover::{RegionFailoverContext, RegionFailoverProcedure}; -use meta_srv::selector::{Namespace, Selector, SelectorOptions}; -use servers::query_handler::sql::SqlQueryHandler; -use session::context::{QueryContext, QueryContextRef}; -use table::metadata::TableId; -use tests_integration::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder}; -use tests_integration::test_util::{get_test_store_config, StorageType}; -use tokio::time; - -#[macro_export] -macro_rules! region_failover_test { - ($service:ident, $($(#[$meta:meta])* $test:ident),*,) => { - paste::item! { - mod [] { - $( - #[tokio::test(flavor = "multi_thread")] - $( - #[$meta] - )* - async fn [< $test >]() { - let store_type = tests_integration::test_util::StorageType::$service; - if store_type.test_on() { - let _ = $crate::region_failover::$test(store_type).await; - } - - } - )* - } - } - }; -} - -#[macro_export] -macro_rules! region_failover_tests { - ($($service:ident),*) => { - $( - region_failover_test!( - $service, - - test_region_failover, - ); - )* - }; -} - -pub async fn test_region_failover(store_type: StorageType) { - if store_type == StorageType::File { - // Region failover doesn't make sense when using local file storage. - return; - } - common_telemetry::init_default_ut_logging(); - info!("Running region failover test for {}", store_type); - - let mut logical_timer = 1685508715000; - - let cluster_name = "test_region_failover"; - - let (store_config, _guard) = get_test_store_config(&store_type); - - let datanodes = 5u64; - let builder = GreptimeDbClusterBuilder::new(cluster_name).await; - let cluster = builder - .with_datanodes(datanodes as u32) - .with_store_config(store_config) - .build() - .await; - - let frontend = cluster.frontend.clone(); - - let table_id = prepare_testing_table(&cluster).await; - - let results = insert_values(&frontend, logical_timer).await; - logical_timer += 1000; - for result in results { - assert!(matches!(result.unwrap().data, OutputData::AffectedRows(1))); - } - - assert!(has_route_cache(&frontend, table_id).await); - - let distribution = find_region_distribution(&cluster, table_id).await; - info!("Find region distribution: {distribution:?}"); - - let mut foreign = 0; - for dn in 1..=datanodes { - if !&distribution.contains_key(&dn) { - foreign = dn - } - } - - let selector = Arc::new(ForeignNodeSelector { - foreign: Peer { - id: foreign, - // "127.0.0.1:3001" is just a placeholder, does not actually connect to it. - addr: "127.0.0.1:3001".to_string(), - }, - }); - - let failed_region = choose_failed_region(distribution); - info!("Simulating failed region: {failed_region:#?}"); - - run_region_failover_procedure(&cluster, failed_region.clone(), selector).await; - - let distribution = find_region_distribution(&cluster, table_id).await; - info!("Find region distribution again: {distribution:?}"); - - // Waits for invalidating table cache - time::sleep(Duration::from_millis(100)).await; - - assert!(!has_route_cache(&frontend, table_id).await); - - // Inserts data to each datanode after failover - let frontend = cluster.frontend.clone(); - let results = insert_values(&frontend, logical_timer).await; - for result in results { - assert!(matches!(result.unwrap().data, OutputData::AffectedRows(1))); - } - - assert_values(&frontend).await; - - assert!(!distribution.contains_key(&failed_region.datanode_id)); - - let mut success = false; - let values = distribution.values(); - for val in values { - success = success || val.contains(&failed_region.region_number); - } - assert!(success) -} - -async fn has_route_cache(instance: &Arc, table_id: TableId) -> bool { - let catalog_manager = instance - .catalog_manager() - .as_any() - .downcast_ref::() - .unwrap(); - - let kv_backend = catalog_manager.table_metadata_manager_ref().kv_backend(); - - let cache = kv_backend - .as_any() - .downcast_ref::() - .unwrap() - .cache(); - - cache - .get(TableRouteKey::new(table_id).to_bytes().as_slice()) - .await - .is_some() -} - -async fn insert_values(instance: &Arc, ts: u64) -> Vec> { - let query_ctx = QueryContext::arc(); - - let mut results = Vec::new(); - for range in [5, 15, 25, 55] { - let result = insert_value( - instance, - &format!("INSERT INTO my_table VALUES ({},{})", range, ts), - query_ctx.clone(), - ) - .await; - results.push(result); - } - - results -} - -async fn insert_value( - instance: &Arc, - sql: &str, - query_ctx: QueryContextRef, -) -> FrontendResult { - instance.do_query(sql, query_ctx).await.remove(0) -} - -async fn assert_values(instance: &Arc) { - let query_ctx = QueryContext::arc(); - - let result = instance - .do_query("select * from my_table order by i, ts", query_ctx) - .await - .remove(0); - - let expected = "\ -+----+---------------------+ -| i | ts | -+----+---------------------+ -| 5 | 2023-05-31T04:51:55 | -| 5 | 2023-05-31T04:51:56 | -| 15 | 2023-05-31T04:51:55 | -| 15 | 2023-05-31T04:51:56 | -| 25 | 2023-05-31T04:51:55 | -| 25 | 2023-05-31T04:51:56 | -| 55 | 2023-05-31T04:51:55 | -| 55 | 2023-05-31T04:51:56 | -+----+---------------------+"; - check_output_stream(result.unwrap().data, expected).await; -} - -async fn prepare_testing_table(cluster: &GreptimeDbCluster) -> TableId { - let sql = r" -CREATE TABLE my_table ( - i INT PRIMARY KEY, - ts TIMESTAMP TIME INDEX, -) PARTITION BY RANGE COLUMNS (i) ( - PARTITION r0 VALUES LESS THAN (10), - PARTITION r1 VALUES LESS THAN (20), - PARTITION r2 VALUES LESS THAN (50), - PARTITION r3 VALUES LESS THAN (MAXVALUE), -)"; - let result = cluster.frontend.do_query(sql, QueryContext::arc()).await; - result.first().unwrap().as_ref().unwrap(); - - let table = cluster - .frontend - .catalog_manager() - .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table") - .await - .unwrap() - .unwrap(); - table.table_info().table_id() -} - -async fn find_region_distribution( - cluster: &GreptimeDbCluster, - table_id: TableId, -) -> RegionDistribution { - let manager = cluster.metasrv.table_metadata_manager(); - let region_distribution = manager - .table_route_manager() - .get_region_distribution(table_id) - .await - .unwrap() - .unwrap(); - - // test DatanodeTableValues match the table region distribution - for datanode_id in cluster.datanode_instances.keys() { - let mut actual = manager - .datanode_table_manager() - .tables(*datanode_id) - .try_collect::>() - .await - .unwrap() - .into_iter() - .filter_map(|x| { - if x.table_id == table_id { - Some(x.regions) - } else { - None - } - }) - .flatten() - .collect::>(); - actual.sort(); - - if let Some(mut expected) = region_distribution.get(datanode_id).cloned() { - expected.sort(); - assert_eq!(expected, actual); - } else { - assert!(actual.is_empty()); - } - } - - region_distribution -} - -fn choose_failed_region(distribution: RegionDistribution) -> RegionIdent { - let (failed_datanode, failed_region) = distribution - .iter() - .filter_map(|(datanode_id, regions)| { - if !regions.is_empty() { - Some((*datanode_id, regions[0])) - } else { - None - } - }) - .next() - .unwrap(); - RegionIdent { - cluster_id: 1000, - datanode_id: failed_datanode, - table_id: 1025, - region_number: failed_region, - engine: "mito2".to_string(), - } -} - -// The "foreign" means the Datanode is not containing any regions to the table before. -pub struct ForeignNodeSelector { - pub foreign: Peer, -} - -#[async_trait::async_trait] -impl Selector for ForeignNodeSelector { - type Context = SelectorContext; - type Output = Vec; - - async fn select( - &self, - _ns: Namespace, - _ctx: &Self::Context, - _opts: SelectorOptions, - ) -> MetaResult { - Ok(vec![self.foreign.clone()]) - } -} - -async fn run_region_failover_procedure( - cluster: &GreptimeDbCluster, - failed_region: RegionIdent, - selector: SelectorRef, -) { - let metasrv = &cluster.metasrv; - let procedure_manager = metasrv.procedure_manager(); - let procedure = RegionFailoverProcedure::new( - "greptime".into(), - "public".into(), - failed_region.clone(), - RegionFailoverContext { - region_lease_secs: 10, - in_memory: metasrv.in_memory().clone(), - kv_backend: metasrv.kv_backend().clone(), - mailbox: metasrv.mailbox().clone(), - selector, - selector_ctx: SelectorContext { - datanode_lease_secs: distributed_time_constants::REGION_LEASE_SECS, - flownode_lease_secs: distributed_time_constants::REGION_LEASE_SECS, - server_addr: metasrv.options().server_addr.clone(), - kv_backend: metasrv.kv_backend().clone(), - meta_peer_client: metasrv.meta_peer_client().clone(), - table_id: None, - }, - dist_lock: metasrv.lock().clone(), - table_metadata_manager: metasrv.table_metadata_manager().clone(), - }, - ); - let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - let procedure_id = procedure_with_id.id; - info!("Starting region failover procedure {procedure_id} for region {failed_region:?}"); - - let watcher = &mut procedure_manager.submit(procedure_with_id).await.unwrap(); - watcher::wait(watcher).await.unwrap(); -}