diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 40df9401cb24..de36cc854cbc 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub(crate) mod close_downgraded_region; pub(crate) mod downgrade_leader_region; pub(crate) mod manager; pub(crate) mod migration_abort; @@ -43,6 +44,7 @@ use common_procedure::error::{ Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, }; use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status, StringKey}; +use common_telemetry::info; use manager::RegionMigrationProcedureGuard; pub use manager::{ RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker, @@ -91,7 +93,9 @@ impl PersistentContext { let lock_key = vec![ CatalogLock::Read(&self.catalog).into(), SchemaLock::read(&self.catalog, &self.schema).into(), - TableLock::Read(region_id.table_id()).into(), + // The optimistic updating of table route is not working very well, + // so we need to use the write lock here. + TableLock::Write(region_id.table_id()).into(), RegionLock::Write(region_id).into(), ]; @@ -253,7 +257,7 @@ impl Context { .await .context(error::TableMetadataManagerSnafu) .map_err(BoxedError::new) - .context(error::RetryLaterWithSourceSnafu { + .with_context(|_| error::RetryLaterWithSourceSnafu { reason: format!("Failed to get TableRoute: {table_id}"), })? .context(error::TableRouteNotFoundSnafu { table_id })?; @@ -317,7 +321,7 @@ impl Context { .await .context(error::TableMetadataManagerSnafu) .map_err(BoxedError::new) - .context(error::RetryLaterWithSourceSnafu { + .with_context(|_| error::RetryLaterWithSourceSnafu { reason: format!("Failed to get TableInfo: {table_id}"), })? .context(error::TableInfoNotFoundSnafu { table_id })?; @@ -350,7 +354,7 @@ impl Context { .await .context(error::TableMetadataManagerSnafu) .map_err(BoxedError::new) - .context(error::RetryLaterWithSourceSnafu { + .with_context(|_| error::RetryLaterWithSourceSnafu { reason: format!("Failed to get DatanodeTable: ({datanode_id},{table_id})"), })? .context(error::DatanodeTableNotFoundSnafu { @@ -474,6 +478,48 @@ impl RegionMigrationProcedure { _guard: guard, }) } + + async fn rollback_inner(&mut self) -> Result<()> { + let _timer = METRIC_META_REGION_MIGRATION_EXECUTE + .with_label_values(&["rollback"]) + .start_timer(); + + let table_id = self.context.region_id().table_id(); + let region_id = self.context.region_id(); + self.context.remove_table_route_value(); + let table_metadata_manager = self.context.table_metadata_manager.clone(); + let table_route = self.context.get_table_route_value().await?; + + // Safety: It must be a physical table route. + let downgraded = table_route + .region_routes() + .unwrap() + .iter() + .filter(|route| route.region.id == region_id) + .any(|route| route.is_leader_downgrading()); + + if downgraded { + info!("Rollbacking downgraded region leader table route, region: {region_id}"); + table_metadata_manager + .update_leader_region_status(table_id, table_route, |route| { + if route.region.id == region_id { + Some(None) + } else { + None + } + }) + .await + .context(error::TableMetadataManagerSnafu) + .map_err(BoxedError::new) + .with_context(|_| error::RetryLaterWithSourceSnafu { + reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"), + })?; + } + + self.context.register_failure_detectors().await; + + Ok(()) + } } #[async_trait::async_trait] @@ -482,6 +528,16 @@ impl Procedure for RegionMigrationProcedure { Self::TYPE_NAME } + async fn rollback(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<()> { + self.rollback_inner() + .await + .map_err(ProcedureError::external) + } + + fn rollback_supported(&self) -> bool { + true + } + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { let state = &mut self.state; @@ -707,6 +763,12 @@ mod tests { Assertion::simple(assert_update_metadata_upgrade, assert_no_persist), ), // UpdateMetadata::Upgrade + Step::next( + "Should be the close downgraded region", + None, + Assertion::simple(assert_close_downgraded_region, assert_no_persist), + ), + // CloseDowngradedRegion Step::next( "Should be the region migration end", None, @@ -1077,6 +1139,12 @@ mod tests { Assertion::simple(assert_update_metadata_upgrade, assert_no_persist), ), // UpdateMetadata::Upgrade + Step::next( + "Should be the close downgraded region", + None, + Assertion::simple(assert_close_downgraded_region, assert_no_persist), + ), + // CloseDowngradedRegion Step::next( "Should be the region migration end", None, diff --git a/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs new file mode 100644 index 000000000000..9113607681cc --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs @@ -0,0 +1,138 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::time::Duration; + +use api::v1::meta::MailboxMessage; +use common_meta::distributed_time_constants::MAILBOX_RTT_SECS; +use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; +use common_meta::key::datanode_table::RegionInfo; +use common_meta::RegionIdent; +use common_procedure::Status; +use common_telemetry::{info, warn}; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; + +use crate::error::{self, Result}; +use crate::handler::HeartbeatMailbox; +use crate::procedure::region_migration::migration_end::RegionMigrationEnd; +use crate::procedure::region_migration::{Context, State}; +use crate::service::mailbox::Channel; + +const CLOSE_DOWNGRADED_REGION_TIMEOUT: Duration = Duration::from_secs(MAILBOX_RTT_SECS); + +#[derive(Debug, Serialize, Deserialize)] +pub struct CloseDowngradedRegion; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for CloseDowngradedRegion { + async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> { + if let Err(err) = self.close_downgraded_leader_region(ctx).await { + let downgrade_leader_datanode = &ctx.persistent_ctx.from_peer; + let region_id = ctx.region_id(); + warn!(err; "Failed to close downgraded leader region: {region_id} on datanode {:?}", downgrade_leader_datanode); + } + + Ok((Box::new(RegionMigrationEnd), Status::done())) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl CloseDowngradedRegion { + /// Builds close region instruction. + /// + /// Abort(non-retry): + /// - Datanode Table is not found. + async fn build_close_region_instruction(&self, ctx: &mut Context) -> Result { + let pc = &ctx.persistent_ctx; + let downgrade_leader_datanode_id = pc.from_peer.id; + let cluster_id = pc.cluster_id; + let table_id = pc.region_id.table_id(); + let region_number = pc.region_id.region_number(); + let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?; + + let RegionInfo { engine, .. } = datanode_table_value.region_info.clone(); + + Ok(Instruction::CloseRegion(RegionIdent { + cluster_id, + datanode_id: downgrade_leader_datanode_id, + table_id, + region_number, + engine, + })) + } + + /// Closes the downgraded leader region. + async fn close_downgraded_leader_region(&self, ctx: &mut Context) -> Result<()> { + let close_instruction = self.build_close_region_instruction(ctx).await?; + let region_id = ctx.region_id(); + let pc = &ctx.persistent_ctx; + let downgrade_leader_datanode = &pc.from_peer; + let msg = MailboxMessage::json_message( + &format!("Close downgraded region: {}", region_id), + &format!("Meta@{}", ctx.server_addr()), + &format!( + "Datanode-{}@{}", + downgrade_leader_datanode.id, downgrade_leader_datanode.addr + ), + common_time::util::current_time_millis(), + &close_instruction, + ) + .with_context(|_| error::SerializeToJsonSnafu { + input: close_instruction.to_string(), + })?; + + let ch = Channel::Datanode(downgrade_leader_datanode.id); + let receiver = ctx + .mailbox + .send(&ch, msg, CLOSE_DOWNGRADED_REGION_TIMEOUT) + .await?; + + match receiver.await? { + Ok(msg) => { + let reply = HeartbeatMailbox::json_reply(&msg)?; + info!( + "Received close downgraded leade region reply: {:?}, region: {}", + reply, region_id + ); + let InstructionReply::CloseRegion(SimpleReply { result, error }) = reply else { + return error::UnexpectedInstructionReplySnafu { + mailbox_message: msg.to_string(), + reason: "expect close region reply", + } + .fail(); + }; + + if result { + Ok(()) + } else { + error::UnexpectedSnafu { + violated: format!( + "Failed to close downgraded leader region: {region_id} on datanode {:?}, error: {error:?}", + downgrade_leader_datanode, + ), + } + .fail() + } + } + + Err(e) => Err(e), + } + } +} diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index 3f8103341029..4c097631d35f 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -21,11 +21,11 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; -use super::migration_abort::RegionMigrationAbort; -use super::migration_end::RegionMigrationEnd; -use super::open_candidate_region::OpenCandidateRegion; -use super::update_metadata::UpdateMetadata; use crate::error::{self, Result}; +use crate::procedure::region_migration::migration_abort::RegionMigrationAbort; +use crate::procedure::region_migration::migration_end::RegionMigrationEnd; +use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion; +use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::{Context, State}; /// The behaviors: diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 22b64b01423f..6a96540b82fb 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -25,9 +25,9 @@ use common_telemetry::info; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; -use super::update_metadata::UpdateMetadata; use crate::error::{self, Result}; use crate::handler::HeartbeatMailbox; +use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::{Context, State}; use crate::service::mailbox::Channel; @@ -145,7 +145,10 @@ impl OpenCandidateRegion { match receiver.await? { Ok(msg) => { let reply = HeartbeatMailbox::json_reply(&msg)?; - info!("Received open region reply: {:?}", reply); + info!( + "Received open region reply: {:?}, region: {}", + reply, region_id + ); let InstructionReply::OpenRegion(SimpleReply { result, error }) = reply else { return error::UnexpectedInstructionReplySnafu { mailbox_message: msg.to_string(), diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 2058782396e8..2fe55edcab41 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -44,19 +44,21 @@ use store_api::storage::RegionId; use table::metadata::RawTableInfo; use tokio::sync::mpsc::{Receiver, Sender}; -use super::manager::RegionMigrationProcedureTracker; -use super::migration_abort::RegionMigrationAbort; -use super::upgrade_candidate_region::UpgradeCandidateRegion; -use super::{Context, ContextFactory, DefaultContextFactory, State, VolatileContext}; use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::error::{self, Error, Result}; use crate::handler::{HeartbeatMailbox, Pusher, Pushers}; use crate::metasrv::MetasrvInfo; +use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion; use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; +use crate::procedure::region_migration::manager::RegionMigrationProcedureTracker; +use crate::procedure::region_migration::migration_abort::RegionMigrationAbort; use crate::procedure::region_migration::migration_end::RegionMigrationEnd; use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion; use crate::procedure::region_migration::update_metadata::UpdateMetadata; -use crate::procedure::region_migration::PersistentContext; +use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion; +use crate::procedure::region_migration::{ + Context, ContextFactory, DefaultContextFactory, PersistentContext, State, VolatileContext, +}; use crate::service::mailbox::{Channel, MailboxRef}; pub type MockHeartbeatReceiver = Receiver>; @@ -569,6 +571,14 @@ pub(crate) fn assert_region_migration_end(next: &dyn State) { let _ = next.as_any().downcast_ref::().unwrap(); } +/// Asserts the [State] should be [CloseDowngradedRegion]. +pub(crate) fn assert_close_downgraded_region(next: &dyn State) { + let _ = next + .as_any() + .downcast_ref::() + .unwrap(); +} + /// Asserts the [State] should be [RegionMigrationAbort]. pub(crate) fn assert_region_migration_abort(next: &dyn State) { let _ = next diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata.rs b/src/meta-srv/src/procedure/region_migration/update_metadata.rs index 180cf31fe1c4..858669ea2136 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata.rs @@ -22,10 +22,10 @@ use common_procedure::Status; use common_telemetry::warn; use serde::{Deserialize, Serialize}; -use super::migration_abort::RegionMigrationAbort; -use super::migration_end::RegionMigrationEnd; use crate::error::Result; +use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion; use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; +use crate::procedure::region_migration::migration_abort::RegionMigrationAbort; use crate::procedure::region_migration::{Context, State}; #[derive(Debug, Serialize, Deserialize)] @@ -58,7 +58,7 @@ impl State for UpdateMetadata { if let Err(err) = ctx.invalidate_table_cache().await { warn!("Failed to broadcast the invalidate table cache message during the upgrade candidate, error: {err:?}"); }; - Ok((Box::new(RegionMigrationEnd), Status::done())) + Ok((Box::new(CloseDowngradedRegion), Status::executing(false))) } UpdateMetadata::Rollback => { self.rollback_downgraded_region(ctx).await?; diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs index b710a0e1f3e0..c180456bd47b 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs @@ -195,7 +195,7 @@ mod tests { use store_api::storage::RegionId; use crate::error::Error; - use crate::procedure::region_migration::migration_end::RegionMigrationEnd; + use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion; use crate::procedure::region_migration::test_util::{self, TestingEnv}; use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::{ContextFactory, PersistentContext, State}; @@ -443,7 +443,7 @@ mod tests { } #[tokio::test] - async fn test_next_migration_end_state() { + async fn test_next_close_downgraded_region_state() { let mut state = Box::new(UpdateMetadata::Upgrade); let env = TestingEnv::new(); let persistent_context = new_persistent_context(); @@ -471,7 +471,10 @@ mod tests { let (next, _) = state.next(&mut ctx).await.unwrap(); - let _ = next.as_any().downcast_ref::().unwrap(); + let _ = next + .as_any() + .downcast_ref::() + .unwrap(); let table_route = table_metadata_manager .table_route_manager() diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs index 49100e92f36e..fa989274b44e 100644 --- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs @@ -23,9 +23,9 @@ use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; use tokio::time::{sleep, Instant}; -use super::update_metadata::UpdateMetadata; use crate::error::{self, Result}; use crate::handler::HeartbeatMailbox; +use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::{Context, State}; use crate::service::mailbox::Channel; @@ -155,7 +155,7 @@ impl UpgradeCandidateRegion { exists, error::UnexpectedSnafu { violated: format!( - "Expected region {} doesn't exist on datanode {:?}", + "Candidate region {} doesn't exist on datanode {:?}", region_id, candidate ) } diff --git a/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs b/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs index d4fa4d08fd5e..5bcddea53abf 100644 --- a/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs +++ b/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs @@ -229,6 +229,29 @@ async fn create_logical_table_and_insert_values( Ok(()) } +async fn wait_for_migration(ctx: &FuzzContext, migration: &Migration, procedure_id: &str) { + info!("Waits for migration: {migration:?}"); + let region_id = migration.region_id.as_u64(); + wait_condition_fn( + Duration::from_secs(120), + || { + let greptime = ctx.greptime.clone(); + let procedure_id = procedure_id.to_string(); + Box::pin(async move { + let output = procedure_state(&greptime, &procedure_id).await; + info!("Checking procedure: {procedure_id}, output: {output}"); + (fetch_partition(&greptime, region_id).await.unwrap(), output) + }) + }, + |(partition, output)| { + info!("Region: {region_id}, datanode: {}", partition.datanode_id); + partition.datanode_id == migration.to_peer && output.contains("Done") + }, + Duration::from_secs(1), + ) + .await; +} + async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> { let mut rng = ChaCha20Rng::seed_from_u64(input.seed); // Creates a physical table. @@ -297,28 +320,7 @@ async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> { } info!("Excepted new region distribution: {new_distribution:?}"); for (migration, procedure_id) in migrations.clone().into_iter().zip(procedure_ids) { - info!("Waits for migration: {migration:?}"); - let region_id = migration.region_id.as_u64(); - wait_condition_fn( - Duration::from_secs(120), - || { - let greptime = ctx.greptime.clone(); - let procedure_id = procedure_id.to_string(); - Box::pin(async move { - { - let output = procedure_state(&greptime, &procedure_id).await; - info!("Checking procedure: {procedure_id}, output: {output}"); - fetch_partition(&greptime, region_id).await.unwrap() - } - }) - }, - |partition| { - info!("Region: {region_id}, datanode: {}", partition.datanode_id); - partition.datanode_id == migration.to_peer - }, - Duration::from_secs(1), - ) - .await; + wait_for_migration(&ctx, &migration, &procedure_id).await; } // Validates value rows @@ -388,29 +390,8 @@ async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> { procedure_ids.push(procedure_id); } info!("Excepted new region distribution: {new_distribution:?}"); - for (migration, procedure_id) in migrations.into_iter().zip(procedure_ids) { - info!("Waits for migration: {migration:?}"); - let region_id = migration.region_id.as_u64(); - wait_condition_fn( - Duration::from_secs(120), - || { - let greptime = ctx.greptime.clone(); - let procedure_id = procedure_id.to_string(); - Box::pin(async move { - { - let output = procedure_state(&greptime, &procedure_id).await; - info!("Checking procedure: {procedure_id}, output: {output}"); - fetch_partition(&greptime, region_id).await.unwrap() - } - }) - }, - |partition| { - info!("Region: {region_id}, datanode: {}", partition.datanode_id); - partition.datanode_id == migration.to_peer - }, - Duration::from_secs(1), - ) - .await; + for (migration, procedure_id) in migrations.clone().into_iter().zip(procedure_ids) { + wait_for_migration(&ctx, &migration, &procedure_id).await; } // Creates more logical tables and inserts values diff --git a/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs b/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs index 3f15e859c444..12c4cdae49e1 100644 --- a/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs +++ b/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs @@ -248,13 +248,13 @@ async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result< { let output = procedure_state(&greptime, &procedure_id).await; info!("Checking procedure: {procedure_id}, output: {output}"); - fetch_partition(&greptime, region_id).await.unwrap() + (fetch_partition(&greptime, region_id).await.unwrap(), output) } }) }, - |partition| { + |(partition, output)| { info!("Region: {region_id}, datanode: {}", partition.datanode_id); - partition.datanode_id == migration.to_peer + partition.datanode_id == migration.to_peer && output.contains("Done") }, Duration::from_secs(5), )