From 2b181e91e03d69017ca523e758019fee6d15fd11 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 2 Jan 2024 16:40:02 +0900 Subject: [PATCH] refactor: unify the injection of WAL option (#3066) * feat: add prepare_wal_option * refactor: use integer hashmap * feat: unify the injection of WAL option * fix: fix procedure_flow_upgrade_candidate_with_retry * chore: apply suggestions from CR --- src/common/meta/src/ddl/create_table.rs | 10 +-- src/common/meta/src/instruction.rs | 6 +- src/common/meta/src/key.rs | 2 +- src/common/meta/src/key/datanode_table.rs | 75 ++++++++++++++++--- src/common/meta/src/wal.rs | 13 ++++ src/datanode/src/datanode.rs | 15 ++-- src/meta-srv/src/error.rs | 8 ++ .../region_failover/activate_region.rs | 33 ++++---- .../region_failover/update_metadata.rs | 5 +- .../src/procedure/region_migration.rs | 75 ++++++++++++++----- .../region_migration/migration_start.rs | 12 +-- .../region_migration/open_candidate_region.rs | 31 ++++---- .../upgrade_candidate_region.rs | 22 ++---- 13 files changed, 204 insertions(+), 103 deletions(-) diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index c6e09006b470..b480c82acdfd 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -20,7 +20,6 @@ use api::v1::region::{ }; use api::v1::{ColumnDef, SemanticType}; use async_trait::async_trait; -use common_config::WAL_OPTIONS_KEY; use common_error::ext::BoxedError; use common_procedure::error::{ ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, @@ -48,6 +47,7 @@ use crate::rpc::ddl::CreateTableTask; use crate::rpc::router::{ find_leader_regions, find_leaders, operating_leader_regions, RegionRoute, }; +use crate::wal::prepare_wal_option; pub struct CreateTableProcedure { pub context: DdlContext, @@ -455,13 +455,7 @@ impl CreateRequestBuilder { request.region_id = region_id.as_u64(); request.path = storage_path; // Stores the encoded wal options into the request options. - region_wal_options - .get(®ion_id.region_number()) - .and_then(|wal_options| { - request - .options - .insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone()) - }); + prepare_wal_option(&mut request.options, region_id, region_wal_options); if let Some(physical_table_id) = self.physical_table_id { // Logical table has the same region numbers with physical table, and they have a one-to-one mapping. diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index 8c0ba9ecfa7a..d0f5c9a27d1a 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -92,13 +92,15 @@ impl Display for OpenRegion { } } +#[serde_with::serde_as] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct OpenRegion { pub region_ident: RegionIdent, pub region_storage_path: String, pub region_options: HashMap, #[serde(default)] - pub region_wal_options: HashMap, + #[serde_as(as = "HashMap")] + pub region_wal_options: HashMap, #[serde(default)] pub skip_wal_replay: bool, } @@ -108,7 +110,7 @@ impl OpenRegion { region_ident: RegionIdent, path: &str, region_options: HashMap, - region_wal_options: HashMap, + region_wal_options: HashMap, skip_wal_replay: bool, ) -> Self { Self { diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 57de421be202..abe116f973e5 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -604,7 +604,7 @@ impl TableMetadataManager { current_table_route_value: &DeserializedValueWithBytes, new_region_routes: Vec, new_region_options: &HashMap, - new_region_wal_options: &HashMap, + new_region_wal_options: &HashMap, ) -> Result<()> { // Updates the datanode table key value pairs. let current_region_distribution = diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs index b2e25e014bc8..3000d56a5135 100644 --- a/src/common/meta/src/key/datanode_table.rs +++ b/src/common/meta/src/key/datanode_table.rs @@ -34,6 +34,7 @@ use crate::rpc::store::RangeRequest; use crate::rpc::KeyValue; use crate::DatanodeId; +#[serde_with::serde_as] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)] /// RegionInfo /// For compatible reason, DON'T modify the field name. @@ -48,14 +49,15 @@ pub struct RegionInfo { #[serde(default)] pub region_options: HashMap, /// The per-region wal options. - /// Key: region number (in string representation). Value: the encoded wal options of the region. + /// Key: region number. Value: the encoded wal options of the region. #[serde(default)] - pub region_wal_options: HashMap, + #[serde_as(as = "HashMap")] + pub region_wal_options: HashMap, } pub struct DatanodeTableKey { - datanode_id: DatanodeId, - table_id: TableId, + pub datanode_id: DatanodeId, + pub table_id: TableId, } impl DatanodeTableKey { @@ -181,7 +183,7 @@ impl DatanodeTableManager { .filter_map(|region_number| { region_wal_options .get(region_number) - .map(|wal_options| (region_number.to_string(), wal_options.clone())) + .map(|wal_options| (*region_number, wal_options.clone())) }) .collect(); @@ -214,7 +216,7 @@ impl DatanodeTableManager { current_region_distribution: RegionDistribution, new_region_distribution: RegionDistribution, new_region_options: &HashMap, - new_region_wal_options: &HashMap, + new_region_wal_options: &HashMap, ) -> Result { let mut opts = Vec::new(); @@ -306,6 +308,61 @@ mod tests { assert!(parsed.is_ok()); } + #[derive(Debug, Serialize, Deserialize, PartialEq)] + struct StringHashMap { + inner: HashMap, + } + + #[serde_with::serde_as] + #[derive(Debug, Serialize, Deserialize, PartialEq)] + struct IntegerHashMap { + #[serde_as(as = "HashMap")] + inner: HashMap, + } + + #[test] + fn test_serde_with_integer_hash_map() { + let map = StringHashMap { + inner: HashMap::from([ + ("1".to_string(), "aaa".to_string()), + ("2".to_string(), "bbb".to_string()), + ("3".to_string(), "ccc".to_string()), + ]), + }; + let encoded = serde_json::to_string(&map).unwrap(); + let decoded: IntegerHashMap = serde_json::from_str(&encoded).unwrap(); + assert_eq!( + IntegerHashMap { + inner: HashMap::from([ + (1, "aaa".to_string()), + (2, "bbb".to_string()), + (3, "ccc".to_string()), + ]), + }, + decoded + ); + + let map = IntegerHashMap { + inner: HashMap::from([ + (1, "aaa".to_string()), + (2, "bbb".to_string()), + (3, "ccc".to_string()), + ]), + }; + let encoded = serde_json::to_string(&map).unwrap(); + let decoded: StringHashMap = serde_json::from_str(&encoded).unwrap(); + assert_eq!( + StringHashMap { + inner: HashMap::from([ + ("1".to_string(), "aaa".to_string()), + ("2".to_string(), "bbb".to_string()), + ("3".to_string(), "ccc".to_string()), + ]), + }, + decoded + ); + } + // This test intends to ensure both the `serde_json::to_string` + `serde_json::from_str` // and `serde_json::to_vec` + `serde_json::from_slice` work for `DatanodeTableValue`. // Warning: if the key of `region_wal_options` is of type non-String, this test would fail. @@ -320,9 +377,9 @@ mod tests { ("c".to_string(), "cc".to_string()), ]), region_wal_options: HashMap::from([ - ("1".to_string(), "aaa".to_string()), - ("2".to_string(), "bbb".to_string()), - ("3".to_string(), "ccc".to_string()), + (1, "aaa".to_string()), + (2, "bbb".to_string()), + (3, "ccc".to_string()), ]), }; let table_value = DatanodeTableValue { diff --git a/src/common/meta/src/wal.rs b/src/common/meta/src/wal.rs index 853c6fa5df63..c7af1d64d306 100644 --- a/src/common/meta/src/wal.rs +++ b/src/common/meta/src/wal.rs @@ -18,8 +18,11 @@ pub mod options_allocator; use std::collections::HashMap; use common_config::wal::StandaloneWalConfig; +use common_config::WAL_OPTIONS_KEY; +use common_telemetry::warn; use serde::{Deserialize, Serialize}; use serde_with::with_prefix; +use store_api::storage::{RegionId, RegionNumber}; use crate::error::Result; use crate::wal::kafka::KafkaConfig; @@ -55,6 +58,16 @@ impl From for WalConfig { } } +pub fn prepare_wal_option( + options: &mut HashMap, + region_id: RegionId, + region_wal_options: &HashMap, +) { + if let Some(wal_options) = region_wal_options.get(®ion_id.region_number()) { + options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone()); + } +} + #[cfg(test)] mod tests { use std::time::Duration; diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 0a0206eddc66..12c28fb16173 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -22,11 +22,12 @@ use std::sync::Arc; use catalog::memory::MemoryCatalogManager; use common_base::Plugins; use common_config::wal::{KafkaConfig, RaftEngineConfig}; -use common_config::{WalConfig, WAL_OPTIONS_KEY}; +use common_config::WalConfig; use common_error::ext::BoxedError; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue}; use common_meta::kv_backend::KvBackendRef; +use common_meta::wal::prepare_wal_option; pub use common_procedure::options::ProcedureConfig; use common_runtime::Runtime; use common_telemetry::{error, info, warn}; @@ -538,13 +539,11 @@ async fn open_all_regions( for region_number in table_value.regions { // Augments region options with wal options if a wal options is provided. let mut region_options = table_value.region_info.region_options.clone(); - table_value - .region_info - .region_wal_options - .get(®ion_number.to_string()) - .and_then(|wal_options| { - region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone()) - }); + prepare_wal_option( + &mut region_options, + RegionId::new(table_value.table_id, region_number), + &table_value.region_info.region_wal_options, + ); regions.push(( RegionId::new(table_value.table_id, region_number), diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 5272c3abe77a..f0c29a46df7f 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -327,6 +327,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Datanode table not found: {}, datanode: {}", table_id, datanode_id))] + DatanodeTableNotFound { + table_id: TableId, + datanode_id: DatanodeId, + location: Location, + }, + #[snafu(display("Table route corrupted, key: {}, reason: {}", key, reason))] CorruptedTableRoute { key: String, @@ -683,6 +690,7 @@ impl ErrorExt for Error { | Error::InvalidRegionKeyFromUtf8 { .. } | Error::TableRouteNotFound { .. } | Error::TableInfoNotFound { .. } + | Error::DatanodeTableNotFound { .. } | Error::CorruptedTableRoute { .. } | Error::MoveValue { .. } | Error::InvalidUtf8Value { .. } diff --git a/src/meta-srv/src/procedure/region_failover/activate_region.rs b/src/meta-srv/src/procedure/region_failover/activate_region.rs index a107ebfb0315..a2b1c8fd9303 100644 --- a/src/meta-srv/src/procedure/region_failover/activate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/activate_region.rs @@ -17,13 +17,14 @@ use std::time::Duration; use api::v1::meta::MailboxMessage; use async_trait::async_trait; -use common_meta::ddl::utils::region_storage_path; use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; +use common_meta::key::datanode_table::{DatanodeTableKey, RegionInfo}; use common_meta::peer::Peer; use common_meta::RegionIdent; use common_telemetry::{debug, info}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; +use store_api::storage::RegionNumber; use super::update_metadata::UpdateRegionMetadata; use super::{RegionFailoverContext, State}; @@ -44,7 +45,7 @@ pub(super) struct ActivateRegion { // An `None` option stands for uninitialized. region_storage_path: Option, region_options: Option>, - region_wal_options: Option>, + region_wal_options: Option>, } impl ActivateRegion { @@ -65,27 +66,31 @@ impl ActivateRegion { timeout: Duration, ) -> Result { let table_id = failed_region.table_id; - let table_info = ctx + // Retrieves the wal options from failed datanode table value. + let datanode_table_value = ctx .table_metadata_manager - .table_info_manager() - .get(table_id) + .datanode_table_manager() + .get(&DatanodeTableKey::new(failed_region.datanode_id, table_id)) .await .context(error::TableMetadataManagerSnafu)? - .context(error::TableInfoNotFoundSnafu { table_id })? - .into_inner() - .table_info; - - let region_storage_path = - region_storage_path(&table_info.catalog_name, &table_info.schema_name); + .context(error::DatanodeTableNotFoundSnafu { + table_id, + datanode_id: failed_region.datanode_id, + })?; let candidate_ident = RegionIdent { datanode_id: self.candidate.id, ..failed_region.clone() }; info!("Activating region: {candidate_ident:?}"); - let region_options: HashMap = (&table_info.meta.options).into(); - // TODO(niebayes): properly fetch or construct region wal options. - let region_wal_options = HashMap::new(); + + let RegionInfo { + region_storage_path, + region_options, + region_wal_options, + .. + } = datanode_table_value.region_info; + let instruction = Instruction::OpenRegion(OpenRegion::new( candidate_ident.clone(), ®ion_storage_path, diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index c2218c6afede..254ba3e8107d 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -23,6 +23,7 @@ use common_meta::RegionIdent; use common_telemetry::info; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; +use store_api::storage::RegionNumber; use super::invalidate_cache::InvalidateCache; use super::{RegionFailoverContext, State}; @@ -36,7 +37,7 @@ pub(super) struct UpdateRegionMetadata { region_storage_path: String, region_options: HashMap, #[serde(default)] - region_wal_options: HashMap, + region_wal_options: HashMap, } impl UpdateRegionMetadata { @@ -44,7 +45,7 @@ impl UpdateRegionMetadata { candidate: Peer, region_storage_path: String, region_options: HashMap, - region_wal_options: HashMap, + region_wal_options: HashMap, ) -> Self { Self { candidate, diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index b187a026723a..fbbf19822555 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -31,6 +31,7 @@ use std::time::Duration; use api::v1::meta::MailboxMessage; use common_meta::instruction::Instruction; +use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue}; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; @@ -90,6 +91,8 @@ pub struct VolatileContext { opening_region_guard: Option, /// `table_route` is stored via previous steps for future use. table_route: Option>, + /// `datanode_table` is stored via previous steps for future use. + from_peer_datanode_table: Option, /// `table_info` is stored via previous steps for future use. /// /// `table_info` should remain unchanged during the procedure; @@ -250,6 +253,42 @@ impl Context { Ok(table_info_value.as_ref().unwrap()) } + /// Returns the `table_info` of [VolatileContext] if any. + /// Otherwise, returns the value retrieved from remote. + /// + /// Retry: + /// - Failed to retrieve the metadata of datanode. + pub async fn get_from_peer_datanode_table_value(&mut self) -> Result<&DatanodeTableValue> { + let datanode_value = &mut self.volatile_ctx.from_peer_datanode_table; + + if datanode_value.is_none() { + let table_id = self.persistent_ctx.region_id.table_id(); + let datanode_id = self.persistent_ctx.from_peer.id; + + let datanode_table = self + .table_metadata_manager + .datanode_table_manager() + .get(&DatanodeTableKey { + datanode_id, + table_id, + }) + .await + .context(error::TableMetadataManagerSnafu) + .map_err(|e| error::Error::RetryLater { + reason: e.to_string(), + location: location!(), + })? + .context(error::DatanodeTableNotFoundSnafu { + table_id, + datanode_id, + })?; + + *datanode_value = Some(datanode_table); + } + + Ok(datanode_value.as_ref().unwrap()) + } + /// Removes the `table_info` of [VolatileContext], returns true if any. pub fn remove_table_info_value(&mut self) -> bool { let value = self.volatile_ctx.table_info.take(); @@ -889,7 +928,7 @@ mod tests { to_peer_id, Arc::new(|id| Ok(new_open_region_reply(id, false, None))), )), - Assertion::error(|error| assert!(error.is_retryable())), + Assertion::error(|error| assert!(error.is_retryable(), "err: {error:?}")), ), // OpenCandidateRegion Step::next( @@ -942,25 +981,25 @@ mod tests { None, Assertion::simple(assert_region_migration_end, assert_done), ), + // RegionMigrationStart + Step::setup( + "Sets state to RegionMigrationStart", + merge_before_test_fn(vec![ + setup_state(Arc::new(|| Box::new(RegionMigrationStart))), + Arc::new(reset_volatile_ctx), + ]), + ), + // RegionMigrationEnd + // Note: We can't run this test multiple times; + // the `peer_id`'s `DatanodeTable` will be removed after first-time migration success. + Step::next( + "Should be the region migration end(has been migrated)", + None, + Assertion::simple(assert_region_migration_end, assert_done), + ), ]; - let setup_to_latest_persisted_state = Step::setup( - "Sets state to OpenCandidateRegion", - merge_before_test_fn(vec![ - setup_state(Arc::new(|| Box::new(OpenCandidateRegion))), - Arc::new(reset_volatile_ctx), - ]), - ); - - let steps = [ - steps.clone(), - vec![setup_to_latest_persisted_state.clone()], - steps.clone()[1..].to_vec(), - vec![setup_to_latest_persisted_state], - steps.clone()[1..].to_vec(), - ] - .concat(); - + let steps = [steps.clone()].concat(); let timer = Instant::now(); // Run the table tests. diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index 68b291cb87c1..6bacc92433aa 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -52,7 +52,7 @@ impl State for RegionMigrationStart { let region_route = self.retrieve_region_route(ctx, region_id).await?; let to_peer = &ctx.persistent_ctx.to_peer; - if self.check_leader_region_on_peer(®ion_route, to_peer)? { + if self.has_migrated(®ion_route, to_peer)? { Ok((Box::new(RegionMigrationEnd), Status::Done)) } else if self.check_candidate_region_on_peer(®ion_route, to_peer) { Ok((Box::new(UpdateMetadata::Downgrade), Status::executing(true))) @@ -112,16 +112,12 @@ impl RegionMigrationStart { region_opened } - /// Checks whether the leader region on region has been opened. - /// Returns true if it's been opened. + /// Checks whether the region has been migrated. + /// Returns true if it's. /// /// Abort(non-retry): /// - Leader peer of RegionRoute is not found. - fn check_leader_region_on_peer( - &self, - region_route: &RegionRoute, - to_peer: &Peer, - ) -> Result { + fn has_migrated(&self, region_route: &RegionRoute, to_peer: &Peer) -> Result { let region_id = region_route.region.id; let region_opened = region_route diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 74b904ce0105..176db54952b3 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -13,13 +13,12 @@ // limitations under the License. use std::any::Any; -use std::collections::HashMap; use std::time::Duration; use api::v1::meta::MailboxMessage; -use common_meta::ddl::utils::region_storage_path; use common_meta::distributed_time_constants::MAILBOX_RTT_SECS; use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; +use common_meta::key::datanode_table::RegionInfo; use common_meta::RegionIdent; use common_procedure::Status; use serde::{Deserialize, Serialize}; @@ -58,26 +57,21 @@ impl OpenCandidateRegion { /// Builds open region instructions /// /// Abort(non-retry): - /// - Table Info is not found. + /// - Datanode Table is not found. async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result { let pc = &ctx.persistent_ctx; let cluster_id = pc.cluster_id; let table_id = pc.region_id.table_id(); let region_number = pc.region_id.region_number(); let candidate_id = pc.to_peer.id; + let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?; - let table_info_value = ctx.get_table_info_value().await?; - let table_info = &table_info_value.table_info; - - // The region storage path is immutable after the region is created. - // Therefore, it's safe to store it in `VolatileContext` for future use. - let region_storage_path = - region_storage_path(&table_info.catalog_name, &table_info.schema_name); - - let engine = table_info.meta.engine.clone(); - let region_options: HashMap = (&table_info.meta.options).into(); - // TODO(niebayes): properly fetch or construct region wal options. - let region_wal_options = HashMap::new(); + let RegionInfo { + region_storage_path, + region_options, + region_wal_options, + engine, + } = datanode_table_value.region_info.clone(); let open_instruction = Instruction::OpenRegion(OpenRegion::new( RegionIdent { @@ -185,6 +179,7 @@ impl OpenCandidateRegion { #[cfg(test)] mod tests { use std::assert_matches::assert_matches; + use std::collections::HashMap; use common_catalog::consts::MITO2_ENGINE; use common_meta::key::table_route::TableRouteValue; @@ -222,7 +217,7 @@ mod tests { } #[tokio::test] - async fn test_table_info_is_not_found_error() { + async fn test_datanode_table_is_not_found_error() { let state = OpenCandidateRegion; let persistent_context = new_persistent_context(); let env = TestingEnv::new(); @@ -233,7 +228,7 @@ mod tests { .await .unwrap_err(); - assert_matches!(err, Error::TableInfoNotFound { .. }); + assert_matches!(err, Error::DatanodeTableNotFound { .. }); assert!(!err.is_retryable()); } @@ -405,7 +400,7 @@ mod tests { let table_info = new_test_table_info(1024, vec![1]).into(); let region_routes = vec![RegionRoute { region: Region::new_test(persistent_context.region_id), - leader_peer: Some(Peer::empty(3)), + leader_peer: Some(Peer::empty(1)), ..Default::default() }]; diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs index 745b8487a8f3..793efa4db66e 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs @@ -12,9 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; - -use common_meta::ddl::utils::region_storage_path; use common_meta::key::datanode_table::RegionInfo; use common_meta::rpc::router::RegionRoute; use common_telemetry::{info, warn}; @@ -138,18 +135,13 @@ impl UpdateMetadata { } let region_routes = self.build_upgrade_candidate_region_metadata(ctx).await?; - let table_info_value = ctx.get_table_info_value().await?; - - let table_info = &table_info_value.table_info; - let region_storage_path = - region_storage_path(&table_info.catalog_name, &table_info.schema_name); - let engine = table_info.meta.engine.clone(); - let region_options: HashMap = (&table_info.meta.options).into(); - - // TODO(niebayes): properly fetch or construct region wal options. - let region_wal_options = HashMap::new(); - - // No remote fetch. + let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?; + let RegionInfo { + region_storage_path, + region_options, + region_wal_options, + engine, + } = datanode_table_value.region_info.clone(); let table_route_value = ctx.get_table_route_value().await?; if let Err(err) = table_metadata_manager