diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index a9103db3eaba..1fffb50ea835 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub(crate) mod close_downgraded_region; pub(crate) mod downgrade_leader_region; pub(crate) mod manager; pub(crate) mod migration_abort; @@ -772,6 +773,12 @@ mod tests { Assertion::simple(assert_update_metadata_upgrade, assert_no_persist), ), // UpdateMetadata::Upgrade + Step::next( + "Should be the close downgraded region", + None, + Assertion::simple(assert_close_downgraded_region, assert_no_persist), + ), + // CloseDowngradedRegion Step::next( "Should be the region migration end", None, @@ -1142,6 +1149,12 @@ mod tests { Assertion::simple(assert_update_metadata_upgrade, assert_no_persist), ), // UpdateMetadata::Upgrade + Step::next( + "Should be the close downgraded region", + None, + Assertion::simple(assert_close_downgraded_region, assert_no_persist), + ), + // CloseDowngradedRegion Step::next( "Should be the region migration end", None, diff --git a/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs new file mode 100644 index 000000000000..9113607681cc --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs @@ -0,0 +1,138 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::time::Duration; + +use api::v1::meta::MailboxMessage; +use common_meta::distributed_time_constants::MAILBOX_RTT_SECS; +use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; +use common_meta::key::datanode_table::RegionInfo; +use common_meta::RegionIdent; +use common_procedure::Status; +use common_telemetry::{info, warn}; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; + +use crate::error::{self, Result}; +use crate::handler::HeartbeatMailbox; +use crate::procedure::region_migration::migration_end::RegionMigrationEnd; +use crate::procedure::region_migration::{Context, State}; +use crate::service::mailbox::Channel; + +const CLOSE_DOWNGRADED_REGION_TIMEOUT: Duration = Duration::from_secs(MAILBOX_RTT_SECS); + +#[derive(Debug, Serialize, Deserialize)] +pub struct CloseDowngradedRegion; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for CloseDowngradedRegion { + async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> { + if let Err(err) = self.close_downgraded_leader_region(ctx).await { + let downgrade_leader_datanode = &ctx.persistent_ctx.from_peer; + let region_id = ctx.region_id(); + warn!(err; "Failed to close downgraded leader region: {region_id} on datanode {:?}", downgrade_leader_datanode); + } + + Ok((Box::new(RegionMigrationEnd), Status::done())) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl CloseDowngradedRegion { + /// Builds close region instruction. + /// + /// Abort(non-retry): + /// - Datanode Table is not found. + async fn build_close_region_instruction(&self, ctx: &mut Context) -> Result { + let pc = &ctx.persistent_ctx; + let downgrade_leader_datanode_id = pc.from_peer.id; + let cluster_id = pc.cluster_id; + let table_id = pc.region_id.table_id(); + let region_number = pc.region_id.region_number(); + let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?; + + let RegionInfo { engine, .. } = datanode_table_value.region_info.clone(); + + Ok(Instruction::CloseRegion(RegionIdent { + cluster_id, + datanode_id: downgrade_leader_datanode_id, + table_id, + region_number, + engine, + })) + } + + /// Closes the downgraded leader region. + async fn close_downgraded_leader_region(&self, ctx: &mut Context) -> Result<()> { + let close_instruction = self.build_close_region_instruction(ctx).await?; + let region_id = ctx.region_id(); + let pc = &ctx.persistent_ctx; + let downgrade_leader_datanode = &pc.from_peer; + let msg = MailboxMessage::json_message( + &format!("Close downgraded region: {}", region_id), + &format!("Meta@{}", ctx.server_addr()), + &format!( + "Datanode-{}@{}", + downgrade_leader_datanode.id, downgrade_leader_datanode.addr + ), + common_time::util::current_time_millis(), + &close_instruction, + ) + .with_context(|_| error::SerializeToJsonSnafu { + input: close_instruction.to_string(), + })?; + + let ch = Channel::Datanode(downgrade_leader_datanode.id); + let receiver = ctx + .mailbox + .send(&ch, msg, CLOSE_DOWNGRADED_REGION_TIMEOUT) + .await?; + + match receiver.await? { + Ok(msg) => { + let reply = HeartbeatMailbox::json_reply(&msg)?; + info!( + "Received close downgraded leade region reply: {:?}, region: {}", + reply, region_id + ); + let InstructionReply::CloseRegion(SimpleReply { result, error }) = reply else { + return error::UnexpectedInstructionReplySnafu { + mailbox_message: msg.to_string(), + reason: "expect close region reply", + } + .fail(); + }; + + if result { + Ok(()) + } else { + error::UnexpectedSnafu { + violated: format!( + "Failed to close downgraded leader region: {region_id} on datanode {:?}, error: {error:?}", + downgrade_leader_datanode, + ), + } + .fail() + } + } + + Err(e) => Err(e), + } + } +} diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 2058782396e8..2fe55edcab41 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -44,19 +44,21 @@ use store_api::storage::RegionId; use table::metadata::RawTableInfo; use tokio::sync::mpsc::{Receiver, Sender}; -use super::manager::RegionMigrationProcedureTracker; -use super::migration_abort::RegionMigrationAbort; -use super::upgrade_candidate_region::UpgradeCandidateRegion; -use super::{Context, ContextFactory, DefaultContextFactory, State, VolatileContext}; use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::error::{self, Error, Result}; use crate::handler::{HeartbeatMailbox, Pusher, Pushers}; use crate::metasrv::MetasrvInfo; +use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion; use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; +use crate::procedure::region_migration::manager::RegionMigrationProcedureTracker; +use crate::procedure::region_migration::migration_abort::RegionMigrationAbort; use crate::procedure::region_migration::migration_end::RegionMigrationEnd; use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion; use crate::procedure::region_migration::update_metadata::UpdateMetadata; -use crate::procedure::region_migration::PersistentContext; +use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion; +use crate::procedure::region_migration::{ + Context, ContextFactory, DefaultContextFactory, PersistentContext, State, VolatileContext, +}; use crate::service::mailbox::{Channel, MailboxRef}; pub type MockHeartbeatReceiver = Receiver>; @@ -569,6 +571,14 @@ pub(crate) fn assert_region_migration_end(next: &dyn State) { let _ = next.as_any().downcast_ref::().unwrap(); } +/// Asserts the [State] should be [CloseDowngradedRegion]. +pub(crate) fn assert_close_downgraded_region(next: &dyn State) { + let _ = next + .as_any() + .downcast_ref::() + .unwrap(); +} + /// Asserts the [State] should be [RegionMigrationAbort]. pub(crate) fn assert_region_migration_abort(next: &dyn State) { let _ = next diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata.rs b/src/meta-srv/src/procedure/region_migration/update_metadata.rs index b026322a5df4..858669ea2136 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata.rs @@ -23,9 +23,9 @@ use common_telemetry::warn; use serde::{Deserialize, Serialize}; use crate::error::Result; +use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion; use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; use crate::procedure::region_migration::migration_abort::RegionMigrationAbort; -use crate::procedure::region_migration::migration_end::RegionMigrationEnd; use crate::procedure::region_migration::{Context, State}; #[derive(Debug, Serialize, Deserialize)] @@ -58,7 +58,7 @@ impl State for UpdateMetadata { if let Err(err) = ctx.invalidate_table_cache().await { warn!("Failed to broadcast the invalidate table cache message during the upgrade candidate, error: {err:?}"); }; - Ok((Box::new(RegionMigrationEnd), Status::done())) + Ok((Box::new(CloseDowngradedRegion), Status::executing(false))) } UpdateMetadata::Rollback => { self.rollback_downgraded_region(ctx).await?; diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs index b710a0e1f3e0..c180456bd47b 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs @@ -195,7 +195,7 @@ mod tests { use store_api::storage::RegionId; use crate::error::Error; - use crate::procedure::region_migration::migration_end::RegionMigrationEnd; + use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion; use crate::procedure::region_migration::test_util::{self, TestingEnv}; use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::{ContextFactory, PersistentContext, State}; @@ -443,7 +443,7 @@ mod tests { } #[tokio::test] - async fn test_next_migration_end_state() { + async fn test_next_close_downgraded_region_state() { let mut state = Box::new(UpdateMetadata::Upgrade); let env = TestingEnv::new(); let persistent_context = new_persistent_context(); @@ -471,7 +471,10 @@ mod tests { let (next, _) = state.next(&mut ctx).await.unwrap(); - let _ = next.as_any().downcast_ref::().unwrap(); + let _ = next + .as_any() + .downcast_ref::() + .unwrap(); let table_route = table_metadata_manager .table_route_manager()