Skip to content

Commit

Permalink
feat: add update metadata step for rollbacking downgraded region (#2812)
Browse files Browse the repository at this point in the history
* feat: add update metadata step for rollbacking downgraded region

* feat: invalidate table cache after updating metadata

* feat: add migration abort step
  • Loading branch information
WenyXu authored Dec 1, 2023
1 parent 7e68ecc commit 781f242
Show file tree
Hide file tree
Showing 9 changed files with 396 additions and 17 deletions.
6 changes: 5 additions & 1 deletion src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ use crate::pubsub::Message;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("The region migration procedure aborted, reason: {}", reason))]
MigrationAbort { location: Location, reason: String },

#[snafu(display(
"Another procedure is opening the region: {} on peer: {}",
region_id,
Expand Down Expand Up @@ -665,7 +668,8 @@ impl ErrorExt for Error {
| Error::Txn { .. }
| Error::TableIdChanged { .. }
| Error::RegionOpeningRace { .. }
| Error::RegionRouteNotFound { .. } => StatusCode::Unexpected,
| Error::RegionRouteNotFound { .. }
| Error::MigrationAbort { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::InvalidateTableCache { source, .. } => source.status_code(),
Error::RequestDatanode { source, .. } => source.status_code(),
Expand Down
13 changes: 13 additions & 0 deletions src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,19 @@ impl HeartbeatMailbox {
serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
}

/// Parses the [Instruction] from [MailboxMessage].
#[cfg(test)]
pub(crate) fn json_instruction(msg: &MailboxMessage) -> Result<Instruction> {
let Payload::Json(payload) =
msg.payload
.as_ref()
.with_context(|| UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: format!("empty payload, msg: {msg:?}"),
})?;
serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
}

pub fn create(pushers: Pushers, sequence: Sequence) -> MailboxRef {
let mailbox = Arc::new(Self::new(pushers, sequence));

Expand Down
53 changes: 52 additions & 1 deletion src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

pub(crate) mod downgrade_leader_region;
pub(crate) mod migration_abort;
pub(crate) mod migration_end;
pub(crate) mod migration_start;
pub(crate) mod open_candidate_region;
Expand All @@ -25,6 +26,8 @@ use std::any::Any;
use std::fmt::Debug;
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_meta::instruction::Instruction;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
Expand All @@ -43,7 +46,7 @@ use self::migration_start::RegionMigrationStart;
use crate::error::{self, Error, Result};
use crate::procedure::utils::region_lock_key;
use crate::region::lease_keeper::{OpeningRegionGuard, OpeningRegionKeeperRef};
use crate::service::mailbox::MailboxRef;
use crate::service::mailbox::{BroadcastChannel, MailboxRef};

/// It's shared in each step and available even after recovering.
///
Expand Down Expand Up @@ -235,6 +238,27 @@ impl Context {
pub fn region_id(&self) -> RegionId {
self.persistent_ctx.region_id
}

/// Broadcasts the invalidate table cache message.
pub async fn invalidate_table_cache(&self) -> Result<()> {
let table_id = self.region_id().table_id();
let instruction = Instruction::InvalidateTableIdCache(table_id);

let msg = &MailboxMessage::json_message(
"Invalidate Table Cache",
&format!("Metasrv@{}", self.server_addr()),
"Frontend broadcast",
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: instruction.to_string(),
})?;

self.mailbox
.broadcast(&BroadcastChannel::Frontend, msg)
.await
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -346,7 +370,9 @@ mod tests {

use super::migration_end::RegionMigrationEnd;
use super::*;
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::test_util::TestingEnv;
use crate::service::mailbox::Channel;

fn new_persistent_context() -> PersistentContext {
PersistentContext {
Expand Down Expand Up @@ -446,4 +472,29 @@ mod tests {
assert_eq!(procedure.context.persistent_ctx.cluster_id, 2);
assert_matches!(status.unwrap(), Status::Done);
}

#[tokio::test]
async fn test_broadcast_invalidate_table_cache() {
let mut env = TestingEnv::new();
let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
let ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();

// No receivers.
ctx.invalidate_table_cache().await.unwrap();

let (tx, mut rx) = tokio::sync::mpsc::channel(1);

mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Frontend(1), tx)
.await;

ctx.invalidate_table_cache().await.unwrap();

let resp = rx.recv().await.unwrap().unwrap();
let msg = resp.mailbox_message.unwrap();

let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
assert_matches!(instruction, Instruction::InvalidateTableIdCache(1024));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ mod tests {
let (tx, rx) = tokio::sync::mpsc::channel(1);

mailbox_ctx
.insert_heartbeat_response_receiver(from_peer_id, tx)
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;

drop(rx);
Expand Down Expand Up @@ -306,7 +306,7 @@ mod tests {
let (tx, rx) = tokio::sync::mpsc::channel(1);

mailbox_ctx
.insert_heartbeat_response_receiver(from_peer_id, tx)
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;

// Sends an incorrect reply.
Expand Down Expand Up @@ -336,7 +336,7 @@ mod tests {
let (tx, rx) = tokio::sync::mpsc::channel(1);

mailbox_ctx
.insert_heartbeat_response_receiver(from_peer_id, tx)
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;

send_mock_reply(mailbox, rx, |id| {
Expand Down Expand Up @@ -367,7 +367,7 @@ mod tests {
let (tx, rx) = tokio::sync::mpsc::channel(1);

mailbox_ctx
.insert_heartbeat_response_receiver(from_peer_id, tx)
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;

send_mock_reply(mailbox, rx, |id| {
Expand Down Expand Up @@ -404,7 +404,7 @@ mod tests {
let (tx, mut rx) = tokio::sync::mpsc::channel(1);

mailbox_ctx
.insert_heartbeat_response_receiver(from_peer_id, tx)
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;

common_runtime::spawn_bg(async move {
Expand Down Expand Up @@ -453,7 +453,7 @@ mod tests {
let (tx, mut rx) = tokio::sync::mpsc::channel(1);

mailbox_ctx
.insert_heartbeat_response_receiver(from_peer_id, tx)
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;

common_runtime::spawn_bg(async move {
Expand Down Expand Up @@ -496,7 +496,7 @@ mod tests {
let (tx, rx) = tokio::sync::mpsc::channel(1);

mailbox_ctx
.insert_heartbeat_response_receiver(from_peer_id, tx)
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;

send_mock_reply(mailbox, rx, |id| {
Expand Down
54 changes: 54 additions & 0 deletions src/meta-srv/src/procedure/region_migration/migration_abort.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;

use common_procedure::Status;
use serde::{Deserialize, Serialize};

use crate::error::{self, Result};
use crate::procedure::region_migration::{Context, State};

#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationAbort {
reason: String,
}

impl RegionMigrationAbort {
/// Returns the [RegionMigrationAbort] with `reason`.
pub fn new(reason: &str) -> Self {
Self {
reason: reason.to_string(),
}
}
}

#[async_trait::async_trait]
#[typetag::serde]
impl State for RegionMigrationAbort {
async fn next(&mut self, _: &mut Context) -> Result<Box<dyn State>> {
error::MigrationAbortSnafu {
reason: &self.reason,
}
.fail()
}

fn status(&self) -> Status {
Status::Done
}

fn as_any(&self) -> &dyn Any {
self
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ mod tests {
let (tx, rx) = tokio::sync::mpsc::channel(1);

mailbox_ctx
.insert_heartbeat_response_receiver(to_peer_id, tx)
.insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
.await;

// Sends an incorrect reply.
Expand Down Expand Up @@ -337,7 +337,7 @@ mod tests {
let (tx, rx) = tokio::sync::mpsc::channel(1);

mailbox_ctx
.insert_heartbeat_response_receiver(to_peer_id, tx)
.insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
.await;

// Sends an timeout error.
Expand Down Expand Up @@ -372,7 +372,7 @@ mod tests {
let (tx, rx) = tokio::sync::mpsc::channel(1);

mailbox_ctx
.insert_heartbeat_response_receiver(to_peer_id, tx)
.insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
.await;

send_mock_reply(mailbox, rx, |id| {
Expand Down Expand Up @@ -424,7 +424,7 @@ mod tests {
let (tx, rx) = tokio::sync::mpsc::channel(1);

mailbox_ctx
.insert_heartbeat_response_receiver(to_peer_id, tx)
.insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
.await;

send_mock_reply(mailbox, rx, |id| Ok(new_open_region_reply(id, true, None)));
Expand Down
5 changes: 2 additions & 3 deletions src/meta-srv/src/procedure/region_migration/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use common_meta::sequence::Sequence;
use common_meta::DatanodeId;
use common_procedure::{Context as ProcedureContext, ProcedureId};
use common_procedure_test::MockContextProvider;
use common_time::util::current_time_millis;
Expand Down Expand Up @@ -55,10 +54,10 @@ impl MailboxContext {
/// Inserts a pusher for `datanode_id`
pub async fn insert_heartbeat_response_receiver(
&mut self,
datanode_id: DatanodeId,
channel: Channel,
tx: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
) {
let pusher_id = Channel::Datanode(datanode_id).pusher_id();
let pusher_id = channel.pusher_id();
let pusher = Pusher::new(tx, &RequestHeader::default());
let _ = self.pushers.insert(pusher_id, pusher).await;
}
Expand Down
19 changes: 18 additions & 1 deletion src/meta-srv/src/procedure/region_migration/update_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
// limitations under the License.

pub(crate) mod downgrade_leader_region;
pub(crate) mod rollback_downgraded_region;
pub(crate) mod upgrade_candidate_region;

use std::any::Any;

use common_telemetry::warn;
use serde::{Deserialize, Serialize};

use super::migration_abort::RegionMigrationAbort;
use super::migration_end::RegionMigrationEnd;
use crate::error::Result;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
Expand All @@ -31,6 +34,8 @@ pub enum UpdateMetadata {
Downgrade,
/// Upgrade the candidate region.
Upgrade,
/// Rollback the downgraded leader region.
Rollback,
}

#[async_trait::async_trait]
Expand All @@ -46,9 +51,21 @@ impl State for UpdateMetadata {
UpdateMetadata::Upgrade => {
self.upgrade_candidate_region(ctx).await?;

// TODO(weny): invalidate fe cache.
if let Err(err) = ctx.invalidate_table_cache().await {
warn!("Failed to broadcast the invalidate table cache message during the upgrade candidate, error: {err:?}");
};
Ok(Box::new(RegionMigrationEnd))
}
UpdateMetadata::Rollback => {
self.rollback_downgraded_region(ctx).await?;

if let Err(err) = ctx.invalidate_table_cache().await {
warn!("Failed to broadcast the invalidate table cache message during the rollback, error: {err:?}");
};
Ok(Box::new(RegionMigrationAbort::new(
"Failed to upgrade the candidate region.",
)))
}
}
}

Expand Down
Loading

0 comments on commit 781f242

Please sign in to comment.