diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index 5d3f0e447ce8..a48e46913173 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -45,7 +45,6 @@ use crate::error::{ }; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; -use crate::key::table_route::TableRouteValue; use crate::key::DeserializedValueWithBytes; use crate::metrics; use crate::rpc::ddl::AlterTableTask; @@ -185,7 +184,7 @@ impl AlterTableProcedure { let table_id = self.data.table_id(); let table_ref = self.data.table_ref(); - let TableRouteValue { region_routes, .. } = self + let table_route = self .context .table_metadata_manager .table_route_manager() @@ -195,13 +194,14 @@ impl AlterTableProcedure { table_name: table_ref.to_string(), })? .into_inner(); + let region_routes = table_route.region_routes(); - let leaders = find_leaders(®ion_routes); + let leaders = find_leaders(region_routes); let mut alter_region_tasks = Vec::with_capacity(leaders.len()); for datanode in leaders { let requester = self.context.datanode_manager.datanode(&datanode).await; - let regions = find_leader_regions(®ion_routes, &datanode); + let regions = find_leader_regions(region_routes, &datanode); for region in regions { let region_id = RegionId::new(table_id, region); diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index 6076e6125294..94c6cdf0a06a 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -307,7 +307,7 @@ impl DropTableData { } fn region_routes(&self) -> &Vec { - &self.table_route_value.region_routes + self.table_route_value.region_routes() } fn table_info(&self) -> &RawTableInfo { diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 873ea04a7c17..8d94b8bfcac7 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -279,7 +279,7 @@ async fn handle_truncate_table_task( table_name: table_ref.to_string(), })?; - let table_route = table_route_value.into_inner().region_routes; + let table_route = table_route_value.into_inner().region_routes().clone(); let id = ddl_manager .submit_truncate_table_task( diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index d6ed8e04c5bf..66460672de87 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -468,7 +468,7 @@ impl TableMetadataManager { .build_delete_txn(table_id, table_info_value)?; // Deletes datanode table key value pairs. - let distribution = region_distribution(&table_route_value.region_routes)?; + let distribution = region_distribution(table_route_value.region_routes())?; let delete_datanode_txn = self .datanode_table_manager() .build_delete_txn(table_id, distribution)?; @@ -593,7 +593,7 @@ impl TableMetadataManager { ) -> Result<()> { // Updates the datanode table key value pairs. let current_region_distribution = - region_distribution(¤t_table_route_value.region_routes)?; + region_distribution(current_table_route_value.region_routes())?; let new_region_distribution = region_distribution(&new_region_routes)?; let update_datanode_table_txn = self.datanode_table_manager().build_update_txn( @@ -641,7 +641,7 @@ impl TableMetadataManager { where F: Fn(&RegionRoute) -> Option>, { - let mut new_region_routes = current_table_route_value.region_routes.clone(); + let mut new_region_routes = current_table_route_value.region_routes().clone(); let mut updated = 0; for route in &mut new_region_routes { @@ -826,7 +826,7 @@ mod tests { let mem_kv = Arc::new(MemoryKvBackend::default()); let table_metadata_manager = TableMetadataManager::new(mem_kv); let region_route = new_test_region_route(); - let region_routes = vec![region_route.clone()]; + let region_routes = &vec![region_route.clone()]; let table_info: RawTableInfo = new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); // creates metadata. @@ -869,7 +869,7 @@ mod tests { table_info ); assert_eq!( - remote_table_route.unwrap().into_inner().region_routes, + remote_table_route.unwrap().into_inner().region_routes(), region_routes ); } @@ -879,7 +879,7 @@ mod tests { let mem_kv = Arc::new(MemoryKvBackend::default()); let table_metadata_manager = TableMetadataManager::new(mem_kv); let region_route = new_test_region_route(); - let region_routes = vec![region_route.clone()]; + let region_routes = &vec![region_route.clone()]; let table_info: RawTableInfo = new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); let table_id = table_info.ident.table_id; @@ -950,7 +950,7 @@ mod tests { .unwrap() .unwrap() .into_inner(); - assert_eq!(removed_table_route.region_routes, region_routes); + assert_eq!(removed_table_route.region_routes(), region_routes); } #[tokio::test] @@ -1144,11 +1144,11 @@ mod tests { .unwrap(); assert_eq!( - updated_route_value.region_routes[0].leader_status, + updated_route_value.region_routes()[0].leader_status, Some(RegionStatus::Downgraded) ); assert_eq!( - updated_route_value.region_routes[1].leader_status, + updated_route_value.region_routes()[1].leader_status, Some(RegionStatus::Downgraded) ); } diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 231c71ccba92..852c17937c34 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -38,42 +38,70 @@ impl TableRouteKey { } #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] -pub struct TableRouteValue { +pub enum TableRouteValue { + Physical(PhysicalTableRouteValue), + Logical(LogicalTableRouteValue), +} + +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +pub struct PhysicalTableRouteValue { pub region_routes: Vec, version: u64, } +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +pub struct LogicalTableRouteValue { + // TODO(LFC): Add table route for MetricsEngine table. +} + impl TableRouteValue { pub fn new(region_routes: Vec) -> Self { - Self { + Self::Physical(PhysicalTableRouteValue { region_routes, version: 0, - } + }) } /// Returns a new version [TableRouteValue] with `region_routes`. pub fn update(&self, region_routes: Vec) -> Self { - Self { + let version = self.physical_table_route().version; + Self::Physical(PhysicalTableRouteValue { region_routes, - version: self.version + 1, - } + version: version + 1, + }) } /// Returns the version. /// /// For test purpose. - #[cfg(any(tets, feature = "testing"))] + #[cfg(any(test, feature = "testing"))] pub fn version(&self) -> u64 { - self.version + self.physical_table_route().version } /// Returns the corresponding [RegionRoute]. pub fn region_route(&self, region_id: RegionId) -> Option { - self.region_routes + self.physical_table_route() + .region_routes .iter() .find(|route| route.region.id == region_id) .cloned() } + + /// Gets the [RegionRoute]s of this [TableRouteValue::Physical]. + /// + /// # Panics + /// The route type is not the [TableRouteValue::Physical]. + pub fn region_routes(&self) -> &Vec { + &self.physical_table_route().region_routes + } + + fn physical_table_route(&self) -> &PhysicalTableRouteValue { + match self { + TableRouteValue::Physical(x) => x, + _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"), + } + } } impl TableMetaKey for TableRouteKey { @@ -269,7 +297,7 @@ impl TableRouteManager { ) -> Result> { self.get(table_id) .await? - .map(|table_route| region_distribution(&table_route.into_inner().region_routes)) + .map(|table_route| region_distribution(table_route.region_routes())) .transpose() } } diff --git a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs index 84466eb19928..c2d06590aec2 100644 --- a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs @@ -207,7 +207,7 @@ mod tests { .unwrap(); let should_downgraded = table_route_value - .region_routes + .region_routes() .iter() .find(|route| route.region.id.region_number() == failed_region.region_number) .unwrap(); diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index 505f1cb55a51..23ade1a2a1fe 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -85,7 +85,7 @@ impl UpdateRegionMetadata { .context(error::TableMetadataManagerSnafu)? .context(TableRouteNotFoundSnafu { table_id })?; - let mut new_region_routes = table_route_value.region_routes.clone(); + let mut new_region_routes = table_route_value.region_routes().clone(); for region_route in new_region_routes.iter_mut() { if region_route.region.id.region_number() == failed_region.region_number { @@ -233,7 +233,8 @@ mod tests { .unwrap() .unwrap() .into_inner() - .region_routes + .region_routes() + .clone() } // Original region routes: @@ -395,8 +396,8 @@ mod tests { .unwrap() .into_inner(); - let peers = &extract_all_peers(&table_route_value.region_routes); - let actual = &table_route_value.region_routes; + let peers = &extract_all_peers(table_route_value.region_routes()); + let actual = table_route_value.region_routes(); let expected = &vec![ new_region_route(1, peers, 2), new_region_route(2, peers, 3), @@ -415,7 +416,7 @@ mod tests { .unwrap() .into_inner(); - let map = region_distribution(&table_route_value.region_routes).unwrap(); + let map = region_distribution(table_route_value.region_routes()).unwrap(); assert_eq!(map.len(), 2); assert_eq!(map.get(&2), Some(&vec![1, 3])); assert_eq!(map.get(&3), Some(&vec![2, 4])); diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index 3ef5d46c6595..cd9b5bad5a5d 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -84,7 +84,7 @@ impl RegionMigrationStart { let table_route = ctx.get_table_route_value().await?; let region_route = table_route - .region_routes + .region_routes() .iter() .find(|route| route.region.id == region_id) .cloned() diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 1c95a2d393a7..6496c18ee516 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -377,7 +377,7 @@ impl ProcedureMigrationTestSuite { /// Verifies table metadata after region migration. pub(crate) async fn verify_table_metadata(&self) { let region_id = self.context.persistent_ctx.region_id; - let region_routes = self + let table_route = self .env .table_metadata_manager .table_route_manager() @@ -385,22 +385,25 @@ impl ProcedureMigrationTestSuite { .await .unwrap() .unwrap() - .into_inner() - .region_routes; + .into_inner(); + let region_routes = table_route.region_routes(); let expected_leader_id = self.context.persistent_ctx.to_peer.id; let removed_follower_id = self.context.persistent_ctx.from_peer.id; let region_route = region_routes - .into_iter() + .iter() .find(|route| route.region.id == region_id) .unwrap(); assert!(!region_route.is_leader_downgraded()); - assert_eq!(region_route.leader_peer.unwrap().id, expected_leader_id); + assert_eq!( + region_route.leader_peer.as_ref().unwrap().id, + expected_leader_id + ); assert!(!region_route .follower_peers - .into_iter() + .iter() .any(|route| route.id == removed_follower_id)) } } diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs index 7deaddb5c27b..05dbb1935f19 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs @@ -212,7 +212,7 @@ mod tests { // It should remain unchanged. assert_eq!(latest_table_route.version(), 0); - assert!(!latest_table_route.region_routes[0].is_leader_downgraded()); + assert!(!latest_table_route.region_routes()[0].is_leader_downgraded()); assert!(ctx.volatile_ctx.table_route.is_none()); } @@ -253,7 +253,7 @@ mod tests { .unwrap() .unwrap(); - assert!(latest_table_route.region_routes[0].is_leader_downgraded()); + assert!(latest_table_route.region_routes()[0].is_leader_downgraded()); assert!(ctx.volatile_ctx.table_route.is_none()); } } diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs index 6c1a2648535a..e7fa73dedf8d 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs @@ -166,15 +166,14 @@ mod tests { state.rollback_downgraded_region(&mut ctx).await.unwrap(); - let region_routes = table_metadata_manager + let table_route = table_metadata_manager .table_route_manager() .get(table_id) .await .unwrap() .unwrap() - .into_inner() - .region_routes; - assert_eq!(expected_region_routes, region_routes); + .into_inner(); + assert_eq!(&expected_region_routes, table_route.region_routes()); } #[tokio::test] @@ -229,14 +228,13 @@ mod tests { assert!(ctx.volatile_ctx.table_route.is_none()); - let region_routes = table_metadata_manager + let table_route = table_metadata_manager .table_route_manager() .get(table_id) .await .unwrap() .unwrap() - .into_inner() - .region_routes; - assert_eq!(expected_region_routes, region_routes); + .into_inner(); + assert_eq!(&expected_region_routes, table_route.region_routes()); } } diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs index 4886df0e5af4..bb86280ba000 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs @@ -33,7 +33,7 @@ impl UpdateMetadata { let region_id = ctx.region_id(); let table_route_value = ctx.get_table_route_value().await?.clone(); - let mut region_routes = table_route_value.region_routes.clone(); + let mut region_routes = table_route_value.region_routes().clone(); let region_route = region_routes .iter_mut() .find(|route| route.region.id == region_id) @@ -81,7 +81,7 @@ impl UpdateMetadata { let region_id = ctx.region_id(); let table_route_value = ctx.get_table_route_value().await?.clone(); - let region_routes = table_route_value.region_routes.clone(); + let region_routes = table_route_value.region_routes().clone(); let region_route = region_routes .into_iter() .find(|route| route.region.id == region_id) @@ -480,14 +480,14 @@ mod tests { let _ = next.as_any().downcast_ref::().unwrap(); - let region_routes = table_metadata_manager + let table_route = table_metadata_manager .table_route_manager() .get(table_id) .await .unwrap() .unwrap() - .into_inner() - .region_routes; + .into_inner(); + let region_routes = table_route.region_routes(); assert!(ctx.volatile_ctx.table_route.is_none()); assert!(ctx.volatile_ctx.opening_region_guard.is_none()); diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index 114a48beff72..a5f5beeacd35 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -143,7 +143,7 @@ async fn get_leader_peer_ids( .context(error::TableMetadataManagerSnafu) .map(|route| { route.map_or_else(Vec::new, |route| { - find_leaders(&route.region_routes) + find_leaders(route.region_routes()) .into_iter() .map(|peer| peer.id) .collect() diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index 41b3bef065f8..ad15c62cc1dd 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -19,7 +19,7 @@ use api::v1::Rows; use common_meta::key::table_route::TableRouteManager; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; -use common_meta::rpc::router::{convert_to_region_leader_map, RegionRoutes}; +use common_meta::rpc::router::RegionRoutes; use common_query::prelude::Expr; use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator}; use datatypes::prelude::Value; @@ -76,56 +76,7 @@ impl PartitionRuleManager { .context(error::FindTableRoutesSnafu { table_id })? .into_inner(); - Ok(RegionRoutes(route.region_routes)) - } - - /// Find datanodes of corresponding regions of given table. - pub async fn find_region_datanodes( - &self, - table_id: TableId, - regions: Vec, - ) -> Result>> { - let route = self - .table_route_manager - .get(table_id) - .await - .context(error::TableRouteManagerSnafu)? - .context(error::FindTableRoutesSnafu { table_id })? - .into_inner(); - let mut datanodes = HashMap::with_capacity(regions.len()); - let region_map = convert_to_region_leader_map(&route.region_routes); - for region in regions.iter() { - let datanode = *region_map.get(region).context(error::FindDatanodeSnafu { - table_id, - region: *region, - })?; - datanodes - .entry(datanode.clone()) - .or_insert_with(Vec::new) - .push(*region); - } - Ok(datanodes) - } - - /// Find all leader peers of given table. - pub async fn find_table_region_leaders(&self, table_id: TableId) -> Result> { - let route = self - .table_route_manager - .get(table_id) - .await - .context(error::TableRouteManagerSnafu)? - .context(error::FindTableRoutesSnafu { table_id })? - .into_inner(); - let mut peers = Vec::with_capacity(route.region_routes.len()); - - for peer in &route.region_routes { - peers.push(peer.leader_peer.clone().with_context(|| FindLeaderSnafu { - region_id: peer.region.id, - table_id, - })?); - } - - Ok(peers) + Ok(RegionRoutes(route.region_routes().clone())) } pub async fn find_table_partitions(&self, table_id: TableId) -> Result> { @@ -136,13 +87,15 @@ impl PartitionRuleManager { .context(error::TableRouteManagerSnafu)? .context(error::FindTableRoutesSnafu { table_id })? .into_inner(); + let region_routes = route.region_routes(); + ensure!( - !route.region_routes.is_empty(), + !region_routes.is_empty(), error::FindTableRoutesSnafu { table_id } ); - let mut partitions = Vec::with_capacity(route.region_routes.len()); - for r in route.region_routes.iter() { + let mut partitions = Vec::with_capacity(region_routes.len()); + for r in region_routes { let partition = r .region .partition diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index e997139b5357..e9731cc336fa 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -521,7 +521,7 @@ CREATE TABLE {table_name} ( .unwrap() .into_inner(); - let region_to_dn_map = region_distribution(&table_route_value.region_routes) + let region_to_dn_map = region_distribution(table_route_value.region_routes()) .unwrap() .iter() .map(|(k, v)| (v[0], *k)) diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index ac5a2e4b3ca9..05253dc0a236 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -216,7 +216,7 @@ mod tests { .unwrap() .into_inner(); - let region_to_dn_map = region_distribution(&table_route_value.region_routes) + let region_to_dn_map = region_distribution(table_route_value.region_routes()) .unwrap() .iter() .map(|(k, v)| (v[0], *k))