diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index 7efffaac61f9..5fefa25666d5 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -25,6 +25,7 @@ use common_telemetry::{error, info, warn}; use common_wal::options::WalOptions; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; +use store_api::storage::RegionId; use tokio::time::{sleep, Instant}; use super::update_metadata::UpdateMetadata; @@ -107,6 +108,23 @@ impl DowngradeLeaderRegion { }) } + async fn should_reject_write(ctx: &mut Context, region_id: RegionId) -> Result { + let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?; + let reject_write = if let Some(wal_option) = datanode_table_value + .region_info + .region_wal_options + .get(®ion_id.region_number()) + { + let options: WalOptions = serde_json::from_str(wal_option) + .with_context(|_| error::DeserializeFromJsonSnafu { input: wal_option })?; + matches!(options, WalOptions::RaftEngine) + } else { + true + }; + + Ok(reject_write) + } + /// Tries to downgrade a leader region. /// /// Retry: @@ -127,20 +145,7 @@ impl DowngradeLeaderRegion { .context(error::ExceededDeadlineSnafu { operation: "Downgrade region", })?; - - let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?; - let reject_write = if let Some(wal_option) = datanode_table_value - .region_info - .region_wal_options - .get(®ion_id.region_number()) - { - let options: WalOptions = serde_json::from_str(wal_option) - .with_context(|_| error::DeserializeFromJsonSnafu { input: wal_option })?; - matches!(options, WalOptions::RaftEngine) - } else { - true - }; - + let reject_write = Self::should_reject_write(ctx, region_id).await?; let downgrade_instruction = self.build_downgrade_region_instruction(ctx, operation_timeout, reject_write); @@ -263,6 +268,7 @@ mod tests { use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; + use common_wal::options::KafkaWalOptions; use store_api::storage::RegionId; use tokio::time::Instant; @@ -298,7 +304,7 @@ mod tests { assert!(!err.is_retryable()); } - async fn prepare_table_metadata(ctx: &Context) { + async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap) { let table_info = new_test_table_info(ctx.persistent_ctx.region_id.table_id(), vec![1]).into(); let region_routes = vec![RegionRoute { @@ -311,12 +317,47 @@ mod tests { .create_table_metadata( table_info, TableRouteValue::physical(region_routes), - HashMap::default(), + wal_options, ) .await .unwrap(); } + #[tokio::test] + async fn test_should_reject_writes() { + let persistent_context = new_persistent_context(); + let region_id = persistent_context.region_id; + let env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let wal_options = + HashMap::from([(1, serde_json::to_string(&WalOptions::RaftEngine).unwrap())]); + prepare_table_metadata(&ctx, wal_options).await; + + let reject_write = DowngradeLeaderRegion::should_reject_write(&mut ctx, region_id) + .await + .unwrap(); + assert!(reject_write); + + // Remote WAL + let persistent_context = new_persistent_context(); + let region_id = persistent_context.region_id; + let env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let wal_options = HashMap::from([( + 1, + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: "my_topic".to_string(), + })) + .unwrap(), + )]); + prepare_table_metadata(&ctx, wal_options).await; + + let reject_write = DowngradeLeaderRegion::should_reject_write(&mut ctx, region_id) + .await + .unwrap(); + assert!(!reject_write); + } + #[tokio::test] async fn test_pusher_dropped() { let state = DowngradeLeaderRegion::default(); @@ -325,7 +366,7 @@ mod tests { let mut env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); - prepare_table_metadata(&ctx).await; + prepare_table_metadata(&ctx, HashMap::default()).await; let mailbox_ctx = env.mailbox_context(); let (tx, rx) = tokio::sync::mpsc::channel(1); @@ -348,7 +389,7 @@ mod tests { let persistent_context = new_persistent_context(); let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); - prepare_table_metadata(&ctx).await; + prepare_table_metadata(&ctx, HashMap::default()).await; ctx.volatile_ctx.operations_elapsed = ctx.persistent_ctx.timeout + Duration::from_secs(1); let err = state.downgrade_region(&mut ctx).await.unwrap_err(); @@ -372,7 +413,7 @@ mod tests { let mut env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); - prepare_table_metadata(&ctx).await; + prepare_table_metadata(&ctx, HashMap::default()).await; let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); @@ -399,7 +440,7 @@ mod tests { let mut env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); - prepare_table_metadata(&ctx).await; + prepare_table_metadata(&ctx, HashMap::default()).await; let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); @@ -427,7 +468,7 @@ mod tests { let mut env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); - prepare_table_metadata(&ctx).await; + prepare_table_metadata(&ctx, HashMap::default()).await; let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); @@ -461,7 +502,7 @@ mod tests { let mut env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); - prepare_table_metadata(&ctx).await; + prepare_table_metadata(&ctx, HashMap::default()).await; let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); @@ -554,7 +595,7 @@ mod tests { let mut env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); - prepare_table_metadata(&ctx).await; + prepare_table_metadata(&ctx, HashMap::default()).await; let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone();