From f50f2a84a9b47d9bc7d9ee68f23db865dd4bd18e Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Sat, 7 Oct 2023 16:17:16 +0900 Subject: [PATCH] fix: open region missing options (#2473) * fix: open region missing options * refactor: remove redundant clone * chore: apply suggestions from CR * chore: apply suggestions * chore: apply suggestions * test: add test for initialize_region_server * feat: introduce RegionInfo --- src/cmd/src/cli/upgrade.rs | 9 +- src/common/meta/src/instruction.rs | 8 +- src/common/meta/src/key.rs | 50 +++++++---- src/common/meta/src/key/datanode_table.rs | 84 +++++++++--------- src/datanode/src/datanode.rs | 87 +++++++++++++++++-- src/datanode/src/heartbeat/handler.rs | 5 +- src/datanode/src/tests.rs | 58 +++++++++++++ src/meta-srv/src/procedure/region_failover.rs | 3 +- .../region_failover/activate_region.rs | 23 +++-- .../region_failover/deactivate_region.rs | 4 +- .../region_failover/update_metadata.rs | 36 ++++++-- 11 files changed, 281 insertions(+), 86 deletions(-) diff --git a/src/cmd/src/cli/upgrade.rs b/src/cmd/src/cli/upgrade.rs index c7bf1b693cf3..5cc4fb46ce07 100644 --- a/src/cmd/src/cli/upgrade.rs +++ b/src/cmd/src/cli/upgrade.rs @@ -20,7 +20,7 @@ use client::api::v1::meta::TableRouteValue; use common_meta::ddl::utils::region_storage_path; use common_meta::error as MetaError; use common_meta::key::catalog_name::{CatalogNameKey, CatalogNameValue}; -use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue}; +use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue, RegionInfo}; use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue}; use common_meta::key::table_info::{TableInfoKey, TableInfoValue}; use common_meta::key::table_name::{TableNameKey, TableNameValue}; @@ -405,8 +405,11 @@ impl MigrateTableMetadata { DatanodeTableValue::new( table_id, regions, - engine.to_string(), - region_storage_path.clone(), + RegionInfo { + engine: engine.to_string(), + region_storage_path: region_storage_path.clone(), + region_options: (&value.table_info.meta.options).into(), + }, ), ) }) diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index 304463c59fea..860575645ffc 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::fmt::{Display, Formatter}; use serde::{Deserialize, Serialize}; @@ -73,13 +74,15 @@ impl Display for OpenRegion { pub struct OpenRegion { pub region_ident: RegionIdent, pub region_storage_path: String, + pub options: HashMap, } impl OpenRegion { - pub fn new(region_ident: RegionIdent, path: &str) -> Self { + pub fn new(region_ident: RegionIdent, path: &str, options: HashMap) -> Self { Self { region_ident, region_storage_path: path.to_string(), + options, } } } @@ -127,12 +130,13 @@ mod tests { engine: "mito2".to_string(), }, "test/foo", + HashMap::new(), )); let serialized = serde_json::to_string(&open_region).unwrap(); assert_eq!( - r#"{"OpenRegion":{"region_ident":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo"}}"#, + r#"{"OpenRegion":{"region_ident":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","options":{}}}"#, serialized ); diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index e0aa5c1ec9dc..a37d2e16f117 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -55,7 +55,7 @@ pub mod table_region; #[allow(deprecated)] pub mod table_route; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; @@ -69,6 +69,7 @@ use table_info::{TableInfoKey, TableInfoManager, TableInfoValue}; use table_name::{TableNameKey, TableNameManager, TableNameValue}; use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue}; +use self::datanode_table::RegionInfo; use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue}; use self::table_route::{TableRouteManager, TableRouteValue}; use crate::ddl::utils::region_storage_path; @@ -256,6 +257,7 @@ impl TableMetadataManager { .table_name_manager() .build_create_txn(&table_name, table_id)?; + let region_options = (&table_info.meta.options).into(); // Creates table info. let table_info_value = TableInfoValue::new(table_info); let (create_table_info_txn, on_create_table_info_failure) = self @@ -268,6 +270,7 @@ impl TableMetadataManager { table_id, &engine, ®ion_storage_path, + region_options, distribution, )?; @@ -446,10 +449,10 @@ impl TableMetadataManager { pub async fn update_table_route( &self, table_id: TableId, - engine: &str, - region_storage_path: &str, + region_info: RegionInfo, current_table_route_value: TableRouteValue, new_region_routes: Vec, + new_region_options: &HashMap, ) -> Result<()> { // Updates the datanode table key value pairs. let current_region_distribution = @@ -458,10 +461,10 @@ impl TableMetadataManager { let update_datanode_table_txn = self.datanode_table_manager().build_update_txn( table_id, - engine, - region_storage_path, + region_info, current_region_distribution, new_region_distribution, + new_region_options, )?; // Updates the table_route. @@ -553,7 +556,7 @@ impl_optional_meta_value! { #[cfg(test)] mod tests { - use std::collections::BTreeMap; + use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use datatypes::prelude::ConcreteDataType; @@ -563,6 +566,7 @@ mod tests { use super::datanode_table::DatanodeTableKey; use crate::ddl::utils::region_storage_path; + use crate::key::datanode_table::RegionInfo; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; @@ -894,10 +898,14 @@ mod tests { table_metadata_manager .update_table_route( table_id, - engine, - ®ion_storage_path, + RegionInfo { + engine: engine.to_string(), + region_storage_path: region_storage_path.to_string(), + region_options: HashMap::new(), + }, current_table_route_value.clone(), new_region_routes.clone(), + &HashMap::new(), ) .await .unwrap(); @@ -907,10 +915,14 @@ mod tests { table_metadata_manager .update_table_route( table_id, - engine, - ®ion_storage_path, + RegionInfo { + engine: engine.to_string(), + region_storage_path: region_storage_path.to_string(), + region_options: HashMap::new(), + }, current_table_route_value.clone(), new_region_routes.clone(), + &HashMap::new(), ) .await .unwrap(); @@ -921,10 +933,14 @@ mod tests { table_metadata_manager .update_table_route( table_id, - engine, - ®ion_storage_path, + RegionInfo { + engine: engine.to_string(), + region_storage_path: region_storage_path.to_string(), + region_options: HashMap::new(), + }, current_table_route_value.clone(), new_region_routes.clone(), + &HashMap::new(), ) .await .unwrap(); @@ -941,10 +957,14 @@ mod tests { assert!(table_metadata_manager .update_table_route( table_id, - engine, - ®ion_storage_path, + RegionInfo { + engine: engine.to_string(), + region_storage_path: region_storage_path.to_string(), + region_options: HashMap::new(), + }, wrong_table_route_value, - new_region_routes + new_region_routes, + &HashMap::new(), ) .await .is_err()); diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs index 64053da75274..4b7d8201abc8 100644 --- a/src/common/meta/src/key/datanode_table.rs +++ b/src/common/meta/src/key/datanode_table.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use futures::stream::BoxStream; @@ -32,6 +33,21 @@ use crate::rpc::store::RangeRequest; use crate::rpc::KeyValue; use crate::DatanodeId; +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)] +/// RegionInfo +/// For compatible reason, DON'T modify the field name. +pub struct RegionInfo { + #[serde(default)] + // The table engine, it SHOULD be immutable after created. + pub engine: String, + // The region storage path, it SHOULD be immutable after created. + #[serde(default)] + pub region_storage_path: String, + // The region options. + #[serde(default)] + pub region_options: HashMap, +} + pub struct DatanodeTableKey { datanode_id: DatanodeId, table_id: TableId, @@ -85,25 +101,17 @@ impl TableMetaKey for DatanodeTableKey { pub struct DatanodeTableValue { pub table_id: TableId, pub regions: Vec, - #[serde(default)] - pub engine: String, - #[serde(default)] - pub region_storage_path: String, + #[serde(flatten)] + pub region_info: RegionInfo, version: u64, } impl DatanodeTableValue { - pub fn new( - table_id: TableId, - regions: Vec, - engine: String, - region_storage_path: String, - ) -> Self { + pub fn new(table_id: TableId, regions: Vec, region_info: RegionInfo) -> Self { Self { table_id, regions, - engine, - region_storage_path, + region_info, version: 0, } } @@ -156,6 +164,7 @@ impl DatanodeTableManager { table_id: TableId, engine: &str, region_storage_path: &str, + region_options: HashMap, distribution: RegionDistribution, ) -> Result { let txns = distribution @@ -165,8 +174,11 @@ impl DatanodeTableManager { let val = DatanodeTableValue::new( table_id, regions, - engine.to_string(), - region_storage_path.to_string(), + RegionInfo { + engine: engine.to_string(), + region_storage_path: region_storage_path.to_string(), + region_options: region_options.clone(), + }, ); Ok(TxnOp::Put(key.as_raw_key(), val.try_as_raw_value()?)) @@ -182,10 +194,10 @@ impl DatanodeTableManager { pub(crate) fn build_update_txn( &self, table_id: TableId, - engine: &str, - region_storage_path: &str, + region_info: RegionInfo, current_region_distribution: RegionDistribution, new_region_distribution: RegionDistribution, + new_region_options: &HashMap, ) -> Result { let mut opts = Vec::new(); @@ -197,33 +209,20 @@ impl DatanodeTableManager { opts.push(TxnOp::Delete(raw_key)) } } - + let need_update_options = region_info.region_options != *new_region_options; for (datanode, regions) in new_region_distribution.into_iter() { - if let Some(current_region) = current_region_distribution.get(&datanode) { - // Updates if need. - if *current_region != regions { - let key = DatanodeTableKey::new(datanode, table_id); - let raw_key = key.as_raw_key(); - let val = DatanodeTableValue::new( - table_id, - regions, - engine.to_string(), - region_storage_path.to_string(), - ) - .try_as_raw_value()?; - opts.push(TxnOp::Put(raw_key, val)); - } - } else { - // New datanodes + let need_update = + if let Some(current_region) = current_region_distribution.get(&datanode) { + // Updates if need. + *current_region != regions || need_update_options + } else { + true + }; + if need_update { let key = DatanodeTableKey::new(datanode, table_id); let raw_key = key.as_raw_key(); - let val = DatanodeTableValue::new( - table_id, - regions, - engine.to_string(), - region_storage_path.to_string(), - ) - .try_as_raw_value()?; + let val = DatanodeTableValue::new(table_id, regions, region_info.clone()) + .try_as_raw_value()?; opts.push(TxnOp::Put(raw_key, val)); } } @@ -270,11 +269,10 @@ mod tests { let value = DatanodeTableValue { table_id: 42, regions: vec![1, 2, 3], - engine: Default::default(), - region_storage_path: Default::default(), + region_info: RegionInfo::default(), version: 1, }; - let literal = br#"{"table_id":42,"regions":[1,2,3],"engine":"","region_storage_path":"","version":1}"#; + let literal = br#"{"table_id":42,"regions":[1,2,3],"engine":"","region_storage_path":"","region_options":{},"version":1}"#; let raw_value = value.try_as_raw_value().unwrap(); assert_eq!(raw_value, literal); diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 885ed8bbddb5..387a66426f24 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -14,7 +14,6 @@ //! Datanode implementation. -use std::collections::HashMap; use std::path::Path; use std::sync::Arc; @@ -286,8 +285,9 @@ impl DatanodeBuilder { for region_number in table_value.regions { regions.push(( RegionId::new(table_value.table_id, region_number), - table_value.engine.clone(), - table_value.region_storage_path.clone(), + table_value.region_info.engine.clone(), + table_value.region_info.region_storage_path.clone(), + table_value.region_info.region_options.clone(), )); } } @@ -296,7 +296,7 @@ impl DatanodeBuilder { let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM)); let mut tasks = vec![]; - for (region_id, engine, store_path) in regions { + for (region_id, engine, store_path, options) in regions { let region_dir = region_dir(&store_path, region_id); let semaphore_moved = semaphore.clone(); tasks.push(async move { @@ -307,7 +307,7 @@ impl DatanodeBuilder { RegionRequest::Open(RegionOpenRequest { engine: engine.clone(), region_dir, - options: HashMap::new(), + options, }), ) .await?; @@ -410,3 +410,80 @@ impl DatanodeBuilder { Ok(engines) } } + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::collections::{BTreeMap, HashMap}; + use std::sync::Arc; + + use common_base::Plugins; + use common_meta::key::datanode_table::DatanodeTableManager; + use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::kv_backend::KvBackendRef; + use store_api::region_request::RegionRequest; + use store_api::storage::RegionId; + + use crate::config::DatanodeOptions; + use crate::datanode::DatanodeBuilder; + use crate::tests::{mock_region_server, MockRegionEngine}; + + async fn setup_table_datanode(kv: &KvBackendRef) { + let mgr = DatanodeTableManager::new(kv.clone()); + let txn = mgr + .build_create_txn( + 1028, + "mock", + "foo/bar/weny", + HashMap::from([("foo".to_string(), "bar".to_string())]), + BTreeMap::from([(0, vec![0, 1, 2])]), + ) + .unwrap(); + + let r = kv.txn(txn).await.unwrap(); + assert!(r.succeeded); + } + + #[tokio::test] + async fn test_initialize_region_server() { + let mut mock_region_server = mock_region_server(); + let (mock_region, mut mock_region_handler) = MockRegionEngine::new(); + + mock_region_server.register_engine(mock_region.clone()); + + let builder = DatanodeBuilder::new( + DatanodeOptions { + node_id: Some(0), + ..Default::default() + }, + None, + Arc::new(Plugins::default()), + ); + + let kv = Arc::new(MemoryKvBackend::default()) as _; + setup_table_datanode(&kv).await; + + builder + .initialize_region_server(&mock_region_server, kv.clone(), false) + .await + .unwrap(); + + for i in 0..3 { + let (region_id, req) = mock_region_handler.recv().await.unwrap(); + assert_eq!(region_id, RegionId::new(1028, i)); + if let RegionRequest::Open(req) = req { + assert_eq!( + req.options, + HashMap::from([("foo".to_string(), "bar".to_string())]) + ) + } else { + unreachable!() + } + } + + assert_matches!( + mock_region_handler.try_recv(), + Err(tokio::sync::mpsc::error::TryRecvError::Empty) + ); + } +} diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 026245259cec..c4a2d57d07ad 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; - use async_trait::async_trait; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; @@ -49,12 +47,13 @@ impl RegionHeartbeatResponseHandler { Instruction::OpenRegion(OpenRegion { region_ident, region_storage_path, + options, }) => { let region_id = Self::region_ident_to_region_id(®ion_ident); let open_region_req = RegionRequest::Open(RegionOpenRequest { engine: region_ident.engine, region_dir: region_dir(®ion_storage_path, region_id), - options: HashMap::new(), + options, }); Ok((region_id, open_region_req)) } diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 43a06b34db96..7fd8a8a98fc8 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -13,10 +13,12 @@ // limitations under the License. use std::any::Any; +use std::collections::HashMap; use std::sync::Arc; use api::v1::meta::HeartbeatResponse; use async_trait::async_trait; +use common_error::ext::BoxedError; use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_function::scalars::FunctionRef; use common_meta::heartbeat::handler::{ @@ -26,6 +28,7 @@ use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; use common_meta::instruction::{Instruction, OpenRegion, RegionIdent}; use common_query::prelude::ScalarUdf; use common_query::Output; +use common_recordbatch::SendableRecordBatchStream; use common_runtime::Runtime; use query::dataframe::DataFrame; use query::plan::LogicalPlan; @@ -33,7 +36,12 @@ use query::planner::LogicalPlanner; use query::query_engine::DescribeResult; use query::QueryEngine; use session::context::QueryContextRef; +use store_api::metadata::RegionMetadataRef; +use store_api::region_engine::RegionEngine; +use store_api::region_request::RegionRequest; +use store_api::storage::{RegionId, ScanRequest}; use table::TableRef; +use tokio::sync::mpsc::{Receiver, Sender}; use crate::event_listener::NoopRegionServerEventListener; use crate::region_server::RegionServer; @@ -79,6 +87,7 @@ fn open_region_instruction() -> Instruction { engine: "mito2".to_string(), }, "path/dir", + HashMap::new(), )) } @@ -129,3 +138,52 @@ pub fn mock_region_server() -> RegionServer { Box::new(NoopRegionServerEventListener), ) } + +pub struct MockRegionEngine { + sender: Sender<(RegionId, RegionRequest)>, +} + +impl MockRegionEngine { + pub fn new() -> (Arc, Receiver<(RegionId, RegionRequest)>) { + let (tx, rx) = tokio::sync::mpsc::channel(8); + + (Arc::new(Self { sender: tx }), rx) + } +} + +#[async_trait::async_trait] +impl RegionEngine for MockRegionEngine { + fn name(&self) -> &str { + "mock" + } + + async fn handle_request( + &self, + region_id: RegionId, + request: RegionRequest, + ) -> Result { + let _ = self.sender.send((region_id, request)).await; + + Ok(Output::AffectedRows(0)) + } + + async fn handle_query( + &self, + _region_id: RegionId, + _request: ScanRequest, + ) -> Result { + unimplemented!() + } + + async fn get_metadata(&self, _region_id: RegionId) -> Result { + unimplemented!() + } + + async fn stop(&self) -> Result<(), BoxedError> { + Ok(()) + } + + fn set_writable(&self, _region_id: RegionId, _writable: bool) -> Result<(), BoxedError> { + Ok(()) + } +} diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index e30816e6c70c..88f3e0434559 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -630,7 +630,8 @@ mod tests { Some(Payload::Json( serde_json::to_string(&Instruction::OpenRegion(OpenRegion::new( opening_region, - &path + &path, + HashMap::new(), ))) .unwrap(), )) diff --git a/src/meta-srv/src/procedure/region_failover/activate_region.rs b/src/meta-srv/src/procedure/region_failover/activate_region.rs index 9b78ecbfac3d..69158c2476a5 100644 --- a/src/meta-srv/src/procedure/region_failover/activate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/activate_region.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::time::Duration; use api::v1::meta::MailboxMessage; @@ -42,6 +43,7 @@ pub(super) struct ActivateRegion { // to prevent it from renewing the lease. remark_inactive_region: bool, region_storage_path: Option, + region_options: Option>, } impl ActivateRegion { @@ -50,6 +52,7 @@ impl ActivateRegion { candidate, remark_inactive_region: false, region_storage_path: None, + region_options: None, } } @@ -77,14 +80,15 @@ impl ActivateRegion { ..failed_region.clone() }; info!("Activating region: {candidate_ident:?}"); - + let region_options: HashMap = (&table_info.meta.options).into(); let instruction = Instruction::OpenRegion(OpenRegion::new( candidate_ident.clone(), ®ion_storage_path, + region_options.clone(), )); self.region_storage_path = Some(region_storage_path); - + self.region_options = Some(region_options); let msg = MailboxMessage::json_message( "Activate Region", &format!("Metasrv@{}", ctx.selector_ctx.server_addr), @@ -139,6 +143,11 @@ impl ActivateRegion { .context(error::UnexpectedSnafu { violated: "expected region_storage_path", })?, + self.region_options + .clone() + .context(error::UnexpectedSnafu { + violated: "expected region_options", + })?, ))) } else { // The region could be just indeed cannot be opened by the candidate, retry @@ -193,6 +202,8 @@ impl State for ActivateRegion { #[cfg(test)] mod tests { + use std::collections::HashMap; + use api::v1::meta::mailbox_message::Payload; use common_meta::instruction::SimpleReply; @@ -231,7 +242,8 @@ mod tests { datanode_id: candidate, ..failed_region.clone() }, - &env.path + &env.path, + HashMap::new(), ))) .unwrap(), )) @@ -266,7 +278,7 @@ mod tests { .unwrap(); assert_eq!( format!("{next_state:?}"), - r#"UpdateRegionMetadata { candidate: Peer { id: 2, addr: "" }, region_storage_path: "greptime/public" }"# + r#"UpdateRegionMetadata { candidate: Peer { id: 2, addr: "" }, region_storage_path: "greptime/public", region_options: {} }"# ); } @@ -300,7 +312,8 @@ mod tests { datanode_id: candidate, ..failed_region.clone() }, - &env.path + &env.path, + HashMap::new(), ))) .unwrap(), )) diff --git a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs index 12aca79f6182..c3d177cce988 100644 --- a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs @@ -226,7 +226,7 @@ mod tests { .unwrap(); assert_eq!( format!("{next_state:?}"), - r#"ActivateRegion { candidate: Peer { id: 2, addr: "" }, remark_inactive_region: false, region_storage_path: None }"# + r#"ActivateRegion { candidate: Peer { id: 2, addr: "" }, remark_inactive_region: false, region_storage_path: None, region_options: None }"# ); } @@ -268,7 +268,7 @@ mod tests { // Timeout or not, proceed to `ActivateRegion`. assert_eq!( format!("{next_state:?}"), - r#"ActivateRegion { candidate: Peer { id: 2, addr: "" }, remark_inactive_region: false, region_storage_path: None }"# + r#"ActivateRegion { candidate: Peer { id: 2, addr: "" }, remark_inactive_region: false, region_storage_path: None, region_options: None }"# ); } } diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index 6bb352849f38..98fb6ac557ac 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -12,7 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use async_trait::async_trait; +use common_meta::key::datanode_table::RegionInfo; use common_meta::key::table_route::TableRouteKey; use common_meta::peer::Peer; use common_meta::rpc::router::RegionRoute; @@ -31,13 +34,19 @@ use crate::lock::Opts; pub(super) struct UpdateRegionMetadata { candidate: Peer, region_storage_path: String, + region_options: HashMap, } impl UpdateRegionMetadata { - pub(super) fn new(candidate: Peer, region_storage_path: String) -> Self { + pub(super) fn new( + candidate: Peer, + region_storage_path: String, + region_options: HashMap, + ) -> Self { Self { candidate, region_storage_path, + region_options, } } @@ -90,10 +99,14 @@ impl UpdateRegionMetadata { ctx.table_metadata_manager .update_table_route( table_id, - engine, - &self.region_storage_path, + RegionInfo { + engine: engine.to_string(), + region_storage_path: self.region_storage_path.to_string(), + region_options: self.region_options.clone(), + }, table_route_value, new_region_routes, + &self.region_options, ) .await .context(error::UpdateTableRouteSnafu)?; @@ -174,7 +187,8 @@ mod tests { let env = TestingEnvBuilder::new().build().await; let failed_region = env.failed_region(1).await; - let mut state = UpdateRegionMetadata::new(Peer::new(2, ""), env.path.clone()); + let mut state = + UpdateRegionMetadata::new(Peer::new(2, ""), env.path.clone(), HashMap::new()); let next_state = state.next(&env.context, &failed_region).await.unwrap(); assert_eq!(format!("{next_state:?}"), "InvalidateCache"); @@ -187,7 +201,11 @@ mod tests { async fn test(env: TestingEnv, failed_region: u32, candidate: u64) -> Vec { let failed_region = env.failed_region(failed_region).await; - let state = UpdateRegionMetadata::new(Peer::new(candidate, ""), env.path.clone()); + let state = UpdateRegionMetadata::new( + Peer::new(candidate, ""), + env.path.clone(), + HashMap::new(), + ); state .update_table_route(&env.context, &failed_region) .await @@ -328,14 +346,18 @@ mod tests { let path = env.path.clone(); let _ = futures::future::join_all(vec![ tokio::spawn(async move { - let state = UpdateRegionMetadata::new(Peer::new(2, ""), path); + let state = UpdateRegionMetadata::new(Peer::new(2, ""), path, HashMap::new()); state .update_metadata(&ctx_1, &failed_region_1) .await .unwrap(); }), tokio::spawn(async move { - let state = UpdateRegionMetadata::new(Peer::new(3, ""), env.path.clone()); + let state = UpdateRegionMetadata::new( + Peer::new(3, ""), + env.path.clone(), + HashMap::new(), + ); state .update_metadata(&ctx_2, &failed_region_2) .await