Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure table route metadata is eventually rolled back on failure #5174

Merged
merged 5 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 72 additions & 4 deletions src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub(crate) mod close_downgraded_region;
pub(crate) mod downgrade_leader_region;
pub(crate) mod manager;
pub(crate) mod migration_abort;
Expand Down Expand Up @@ -43,6 +44,7 @@ use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status, StringKey};
use common_telemetry::info;
use manager::RegionMigrationProcedureGuard;
pub use manager::{
RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker,
Expand Down Expand Up @@ -91,7 +93,9 @@ impl PersistentContext {
let lock_key = vec![
CatalogLock::Read(&self.catalog).into(),
SchemaLock::read(&self.catalog, &self.schema).into(),
TableLock::Read(region_id.table_id()).into(),
// The optimistic updating of table route is not working very well,
// so we need to use the write lock here.
TableLock::Write(region_id.table_id()).into(),
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
RegionLock::Write(region_id).into(),
];

Expand Down Expand Up @@ -253,7 +257,7 @@ impl Context {
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get TableRoute: {table_id}"),
})?
.context(error::TableRouteNotFoundSnafu { table_id })?;
Expand Down Expand Up @@ -317,7 +321,7 @@ impl Context {
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get TableInfo: {table_id}"),
})?
.context(error::TableInfoNotFoundSnafu { table_id })?;
Expand Down Expand Up @@ -350,7 +354,7 @@ impl Context {
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get DatanodeTable: ({datanode_id},{table_id})"),
})?
.context(error::DatanodeTableNotFoundSnafu {
Expand Down Expand Up @@ -474,6 +478,48 @@ impl RegionMigrationProcedure {
_guard: guard,
})
}

async fn rollback_inner(&mut self) -> Result<()> {
let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
.with_label_values(&["rollback"])
.start_timer();

let table_id = self.context.region_id().table_id();
let region_id = self.context.region_id();
self.context.remove_table_route_value();
let table_metadata_manager = self.context.table_metadata_manager.clone();
let table_route = self.context.get_table_route_value().await?;

// Safety: It must be a physical table route.
let downgraded = table_route
.region_routes()
.unwrap()
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
.iter()
.filter(|route| route.region.id == region_id)
.any(|route| route.is_leader_downgrading());

if downgraded {
info!("Rollbacking downgraded region leader table route, region: {region_id}");
table_metadata_manager
.update_leader_region_status(table_id, table_route, |route| {
if route.region.id == region_id {
Some(None)
} else {
None
}
})
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"),
})?;
}

self.context.register_failure_detectors().await;

Ok(())
}
}

#[async_trait::async_trait]
Expand All @@ -482,6 +528,16 @@ impl Procedure for RegionMigrationProcedure {
Self::TYPE_NAME
}

/// Rolls back the procedure by delegating to `rollback_inner`, converting
/// any failure into an external [`ProcedureError`].
async fn rollback(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<()> {
    let outcome = self.rollback_inner().await;
    outcome.map_err(ProcedureError::external)
}

/// Reports whether this procedure supports rollback; always returns `true`.
fn rollback_supported(&self) -> bool {
true
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;

Expand Down Expand Up @@ -707,6 +763,12 @@ mod tests {
Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
),
// UpdateMetadata::Upgrade
Step::next(
"Should be the close downgraded region",
None,
Assertion::simple(assert_close_downgraded_region, assert_no_persist),
),
// CloseDowngradedRegion
Step::next(
"Should be the region migration end",
None,
Expand Down Expand Up @@ -1077,6 +1139,12 @@ mod tests {
Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
),
// UpdateMetadata::Upgrade
Step::next(
"Should be the close downgraded region",
None,
Assertion::simple(assert_close_downgraded_region, assert_no_persist),
),
// CloseDowngradedRegion
Step::next(
"Should be the region migration end",
None,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_meta::distributed_time_constants::MAILBOX_RTT_SECS;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::key::datanode_table::RegionInfo;
use common_meta::RegionIdent;
use common_procedure::Status;
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;

use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;

/// Timeout passed to the mailbox when sending the close-region instruction,
/// set to `MAILBOX_RTT_SECS` seconds.
const CLOSE_DOWNGRADED_REGION_TIMEOUT: Duration = Duration::from_secs(MAILBOX_RTT_SECS);

/// Region migration state that asks the `from_peer` datanode to close the
/// downgraded leader region and then moves on to `RegionMigrationEnd`.
#[derive(Debug, Serialize, Deserialize)]
pub struct CloseDowngradedRegion;

#[async_trait::async_trait]
#[typetag::serde]
impl State for CloseDowngradedRegion {
    /// Tries to close the downgraded leader region and then finishes the
    /// migration.
    ///
    /// Closing is best-effort: a failure is only logged as a warning, and the
    /// state always transitions to [`RegionMigrationEnd`] with
    /// `Status::done()`.
    async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
        if let Err(err) = self.close_downgraded_leader_region(ctx).await {
            let downgrade_leader_datanode = &ctx.persistent_ctx.from_peer;
            let region_id = ctx.region_id();
            warn!(err; "Failed to close downgraded leader region: {region_id} on datanode {:?}", downgrade_leader_datanode);
        }

        Ok((Box::new(RegionMigrationEnd), Status::done()))
    }

    /// Returns `self` as [`Any`] so callers can downcast to the concrete state.
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl CloseDowngradedRegion {
    /// Builds close region instruction.
    ///
    /// The instruction targets the migrating region on the `from_peer`
    /// datanode and carries the engine recorded in that datanode's table
    /// value.
    ///
    /// Abort(non-retry):
    /// - Datanode Table is not found.
    async fn build_close_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
        let pc = &ctx.persistent_ctx;
        let downgrade_leader_datanode_id = pc.from_peer.id;
        let cluster_id = pc.cluster_id;
        let table_id = pc.region_id.table_id();
        let region_number = pc.region_id.region_number();
        let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;

        // Only the engine is needed, so borrow the region info and clone just
        // that field instead of cloning the whole `RegionInfo`.
        let RegionInfo { engine, .. } = &datanode_table_value.region_info;

        Ok(Instruction::CloseRegion(RegionIdent {
            cluster_id,
            datanode_id: downgrade_leader_datanode_id,
            table_id,
            region_number,
            engine: engine.clone(),
        }))
    }

    /// Closes the downgraded leader region.
    ///
    /// Sends a close-region instruction to the `from_peer` datanode through
    /// the mailbox, using [`CLOSE_DOWNGRADED_REGION_TIMEOUT`], and waits for
    /// the reply.
    ///
    /// # Errors
    ///
    /// Returns an error if the instruction cannot be built or serialized, if
    /// sending or receiving through the mailbox fails, if the reply is not a
    /// close-region reply, or if the datanode reports that closing failed.
    async fn close_downgraded_leader_region(&self, ctx: &mut Context) -> Result<()> {
        let close_instruction = self.build_close_region_instruction(ctx).await?;
        let region_id = ctx.region_id();
        let pc = &ctx.persistent_ctx;
        let downgrade_leader_datanode = &pc.from_peer;
        let msg = MailboxMessage::json_message(
            &format!("Close downgraded region: {}", region_id),
            &format!("Meta@{}", ctx.server_addr()),
            &format!(
                "Datanode-{}@{}",
                downgrade_leader_datanode.id, downgrade_leader_datanode.addr
            ),
            common_time::util::current_time_millis(),
            &close_instruction,
        )
        .with_context(|_| error::SerializeToJsonSnafu {
            input: close_instruction.to_string(),
        })?;

        let ch = Channel::Datanode(downgrade_leader_datanode.id);
        let receiver = ctx
            .mailbox
            .send(&ch, msg, CLOSE_DOWNGRADED_REGION_TIMEOUT)
            .await?;

        // The first `?` unwraps the receiver's own result, the second the
        // result carried by the reply.
        let msg = receiver.await??;
        let reply = HeartbeatMailbox::json_reply(&msg)?;
        info!(
            "Received close downgraded leader region reply: {:?}, region: {}",
            reply, region_id
        );
        let InstructionReply::CloseRegion(SimpleReply { result, error }) = reply else {
            return error::UnexpectedInstructionReplySnafu {
                mailbox_message: msg.to_string(),
                reason: "expect close region reply",
            }
            .fail();
        };

        if result {
            Ok(())
        } else {
            error::UnexpectedSnafu {
                violated: format!(
                    "Failed to close downgraded leader region: {region_id} on datanode {:?}, error: {error:?}",
                    downgrade_leader_datanode,
                ),
            }
            .fail()
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;

use super::migration_abort::RegionMigrationAbort;
use super::migration_end::RegionMigrationEnd;
use super::open_candidate_region::OpenCandidateRegion;
use super::update_metadata::UpdateMetadata;
use crate::error::{self, Result};
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{Context, State};

/// The behaviors:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};

use super::update_metadata::UpdateMetadata;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;

Expand Down Expand Up @@ -145,7 +145,10 @@ impl OpenCandidateRegion {
match receiver.await? {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!("Received open region reply: {:?}", reply);
info!(
"Received open region reply: {:?}, region: {}",
reply, region_id
);
let InstructionReply::OpenRegion(SimpleReply { result, error }) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
Expand Down
20 changes: 15 additions & 5 deletions src/meta-srv/src/procedure/region_migration/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,21 @@ use store_api::storage::RegionId;
use table::metadata::RawTableInfo;
use tokio::sync::mpsc::{Receiver, Sender};

use super::manager::RegionMigrationProcedureTracker;
use super::migration_abort::RegionMigrationAbort;
use super::upgrade_candidate_region::UpgradeCandidateRegion;
use super::{Context, ContextFactory, DefaultContextFactory, State, VolatileContext};
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::error::{self, Error, Result};
use crate::handler::{HeartbeatMailbox, Pusher, Pushers};
use crate::metasrv::MetasrvInfo;
use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::manager::RegionMigrationProcedureTracker;
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::PersistentContext;
use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion;
use crate::procedure::region_migration::{
Context, ContextFactory, DefaultContextFactory, PersistentContext, State, VolatileContext,
};
use crate::service::mailbox::{Channel, MailboxRef};

pub type MockHeartbeatReceiver = Receiver<std::result::Result<HeartbeatResponse, tonic::Status>>;
Expand Down Expand Up @@ -569,6 +571,14 @@ pub(crate) fn assert_region_migration_end(next: &dyn State) {
let _ = next.as_any().downcast_ref::<RegionMigrationEnd>().unwrap();
}

/// Asserts the [State] should be [CloseDowngradedRegion].
///
/// Panics if `next` cannot be downcast to [CloseDowngradedRegion].
pub(crate) fn assert_close_downgraded_region(next: &dyn State) {
    let any_state = next.as_any();
    let _: &CloseDowngradedRegion = any_state.downcast_ref().unwrap();
}

/// Asserts the [State] should be [RegionMigrationAbort].
pub(crate) fn assert_region_migration_abort(next: &dyn State) {
let _ = next
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ use common_procedure::Status;
use common_telemetry::warn;
use serde::{Deserialize, Serialize};

use super::migration_abort::RegionMigrationAbort;
use super::migration_end::RegionMigrationEnd;
use crate::error::Result;
use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::{Context, State};

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -58,7 +58,7 @@ impl State for UpdateMetadata {
if let Err(err) = ctx.invalidate_table_cache().await {
warn!("Failed to broadcast the invalidate table cache message during the upgrade candidate, error: {err:?}");
};
Ok((Box::new(RegionMigrationEnd), Status::done()))
Ok((Box::new(CloseDowngradedRegion), Status::executing(false)))
}
UpdateMetadata::Rollback => {
self.rollback_downgraded_region(ctx).await?;
Expand Down
Loading
Loading