Skip to content

Commit

Permalink
feat: MetricsEngine table route (part 1) (#2952)
Browse files Browse the repository at this point in the history
* refactor: make `TableRouteValue` an enum to add the variant of MetricsEngine table route

* fix: resolve PR comments

* Update src/common/meta/src/key/table_route.rs

Co-authored-by: Weny Xu <[email protected]>

---------

Co-authored-by: Weny Xu <[email protected]>
  • Loading branch information
MichaelScofield and WenyXu authored Dec 20, 2023
1 parent 7d1724f commit 6ac47e9
Show file tree
Hide file tree
Showing 16 changed files with 93 additions and 110 deletions.
8 changes: 4 additions & 4 deletions src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use crate::error::{
};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::metrics;
use crate::rpc::ddl::AlterTableTask;
Expand Down Expand Up @@ -185,7 +184,7 @@ impl AlterTableProcedure {
let table_id = self.data.table_id();
let table_ref = self.data.table_ref();

let TableRouteValue { region_routes, .. } = self
let table_route = self
.context
.table_metadata_manager
.table_route_manager()
Expand All @@ -195,13 +194,14 @@ impl AlterTableProcedure {
table_name: table_ref.to_string(),
})?
.into_inner();
let region_routes = table_route.region_routes();

let leaders = find_leaders(&region_routes);
let leaders = find_leaders(region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());

for datanode in leaders {
let requester = self.context.datanode_manager.datanode(&datanode).await;
let regions = find_leader_regions(&region_routes, &datanode);
let regions = find_leader_regions(region_routes, &datanode);

for region in regions {
let region_id = RegionId::new(table_id, region);
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ impl DropTableData {
}

fn region_routes(&self) -> &Vec<RegionRoute> {
&self.table_route_value.region_routes
self.table_route_value.region_routes()
}

fn table_info(&self) -> &RawTableInfo {
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ async fn handle_truncate_table_task(
table_name: table_ref.to_string(),
})?;

let table_route = table_route_value.into_inner().region_routes;
let table_route = table_route_value.into_inner().region_routes().clone();

let id = ddl_manager
.submit_truncate_table_task(
Expand Down
18 changes: 9 additions & 9 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ impl TableMetadataManager {
.build_delete_txn(table_id, table_info_value)?;

// Deletes datanode table key value pairs.
let distribution = region_distribution(&table_route_value.region_routes)?;
let distribution = region_distribution(table_route_value.region_routes())?;
let delete_datanode_txn = self
.datanode_table_manager()
.build_delete_txn(table_id, distribution)?;
Expand Down Expand Up @@ -593,7 +593,7 @@ impl TableMetadataManager {
) -> Result<()> {
// Updates the datanode table key value pairs.
let current_region_distribution =
region_distribution(&current_table_route_value.region_routes)?;
region_distribution(current_table_route_value.region_routes())?;
let new_region_distribution = region_distribution(&new_region_routes)?;

let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
Expand Down Expand Up @@ -641,7 +641,7 @@ impl TableMetadataManager {
where
F: Fn(&RegionRoute) -> Option<Option<RegionStatus>>,
{
let mut new_region_routes = current_table_route_value.region_routes.clone();
let mut new_region_routes = current_table_route_value.region_routes().clone();

let mut updated = 0;
for route in &mut new_region_routes {
Expand Down Expand Up @@ -826,7 +826,7 @@ mod tests {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let region_routes = &vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
// creates metadata.
Expand Down Expand Up @@ -869,7 +869,7 @@ mod tests {
table_info
);
assert_eq!(
remote_table_route.unwrap().into_inner().region_routes,
remote_table_route.unwrap().into_inner().region_routes(),
region_routes
);
}
Expand All @@ -879,7 +879,7 @@ mod tests {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let region_routes = &vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_id = table_info.ident.table_id;
Expand Down Expand Up @@ -950,7 +950,7 @@ mod tests {
.unwrap()
.unwrap()
.into_inner();
assert_eq!(removed_table_route.region_routes, region_routes);
assert_eq!(removed_table_route.region_routes(), region_routes);
}

#[tokio::test]
Expand Down Expand Up @@ -1144,11 +1144,11 @@ mod tests {
.unwrap();

assert_eq!(
updated_route_value.region_routes[0].leader_status,
updated_route_value.region_routes()[0].leader_status,
Some(RegionStatus::Downgraded)
);
assert_eq!(
updated_route_value.region_routes[1].leader_status,
updated_route_value.region_routes()[1].leader_status,
Some(RegionStatus::Downgraded)
);
}
Expand Down
48 changes: 38 additions & 10 deletions src/common/meta/src/key/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,42 +38,70 @@ impl TableRouteKey {
}

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct TableRouteValue {
pub enum TableRouteValue {
Physical(PhysicalTableRouteValue),
Logical(LogicalTableRouteValue),
}

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct PhysicalTableRouteValue {
pub region_routes: Vec<RegionRoute>,
version: u64,
}

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct LogicalTableRouteValue {
// TODO(LFC): Add table route for MetricsEngine table.
}

impl TableRouteValue {
pub fn new(region_routes: Vec<RegionRoute>) -> Self {
Self {
Self::Physical(PhysicalTableRouteValue {
region_routes,
version: 0,
}
})
}

/// Returns a new version [TableRouteValue] with `region_routes`.
pub fn update(&self, region_routes: Vec<RegionRoute>) -> Self {
Self {
let version = self.physical_table_route().version;
Self::Physical(PhysicalTableRouteValue {
region_routes,
version: self.version + 1,
}
version: version + 1,
})
}

/// Returns the version.
///
/// For test purpose.
#[cfg(any(tets, feature = "testing"))]
#[cfg(any(test, feature = "testing"))]
pub fn version(&self) -> u64 {
self.version
self.physical_table_route().version
}

/// Returns the corresponding [RegionRoute].
pub fn region_route(&self, region_id: RegionId) -> Option<RegionRoute> {
self.region_routes
self.physical_table_route()
.region_routes
.iter()
.find(|route| route.region.id == region_id)
.cloned()
}

/// Gets the [RegionRoute]s of this [TableRouteValue::Physical].
///
/// # Panics
/// The route type is not the [TableRouteValue::Physical].
pub fn region_routes(&self) -> &Vec<RegionRoute> {
&self.physical_table_route().region_routes
}

fn physical_table_route(&self) -> &PhysicalTableRouteValue {
match self {
TableRouteValue::Physical(x) => x,
_ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
}
}
}

impl TableMetaKey for TableRouteKey {
Expand Down Expand Up @@ -269,7 +297,7 @@ impl TableRouteManager {
) -> Result<Option<RegionDistribution>> {
self.get(table_id)
.await?
.map(|table_route| region_distribution(&table_route.into_inner().region_routes))
.map(|table_route| region_distribution(table_route.region_routes()))
.transpose()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ mod tests {
.unwrap();

let should_downgraded = table_route_value
.region_routes
.region_routes()
.iter()
.find(|route| route.region.id.region_number() == failed_region.region_number)
.unwrap();
Expand Down
11 changes: 6 additions & 5 deletions src/meta-srv/src/procedure/region_failover/update_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl UpdateRegionMetadata {
.context(error::TableMetadataManagerSnafu)?
.context(TableRouteNotFoundSnafu { table_id })?;

let mut new_region_routes = table_route_value.region_routes.clone();
let mut new_region_routes = table_route_value.region_routes().clone();

for region_route in new_region_routes.iter_mut() {
if region_route.region.id.region_number() == failed_region.region_number {
Expand Down Expand Up @@ -233,7 +233,8 @@ mod tests {
.unwrap()
.unwrap()
.into_inner()
.region_routes
.region_routes()
.clone()
}

// Original region routes:
Expand Down Expand Up @@ -395,8 +396,8 @@ mod tests {
.unwrap()
.into_inner();

let peers = &extract_all_peers(&table_route_value.region_routes);
let actual = &table_route_value.region_routes;
let peers = &extract_all_peers(table_route_value.region_routes());
let actual = table_route_value.region_routes();
let expected = &vec![
new_region_route(1, peers, 2),
new_region_route(2, peers, 3),
Expand All @@ -415,7 +416,7 @@ mod tests {
.unwrap()
.into_inner();

let map = region_distribution(&table_route_value.region_routes).unwrap();
let map = region_distribution(table_route_value.region_routes()).unwrap();
assert_eq!(map.len(), 2);
assert_eq!(map.get(&2), Some(&vec![1, 3]));
assert_eq!(map.get(&3), Some(&vec![2, 4]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl RegionMigrationStart {
let table_route = ctx.get_table_route_value().await?;

let region_route = table_route
.region_routes
.region_routes()
.iter()
.find(|route| route.region.id == region_id)
.cloned()
Expand Down
15 changes: 9 additions & 6 deletions src/meta-srv/src/procedure/region_migration/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,30 +377,33 @@ impl ProcedureMigrationTestSuite {
/// Verifies table metadata after region migration.
pub(crate) async fn verify_table_metadata(&self) {
let region_id = self.context.persistent_ctx.region_id;
let region_routes = self
let table_route = self
.env
.table_metadata_manager
.table_route_manager()
.get(region_id.table_id())
.await
.unwrap()
.unwrap()
.into_inner()
.region_routes;
.into_inner();
let region_routes = table_route.region_routes();

let expected_leader_id = self.context.persistent_ctx.to_peer.id;
let removed_follower_id = self.context.persistent_ctx.from_peer.id;

let region_route = region_routes
.into_iter()
.iter()
.find(|route| route.region.id == region_id)
.unwrap();

assert!(!region_route.is_leader_downgraded());
assert_eq!(region_route.leader_peer.unwrap().id, expected_leader_id);
assert_eq!(
region_route.leader_peer.as_ref().unwrap().id,
expected_leader_id
);
assert!(!region_route
.follower_peers
.into_iter()
.iter()
.any(|route| route.id == removed_follower_id))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ mod tests {

// It should remain unchanged.
assert_eq!(latest_table_route.version(), 0);
assert!(!latest_table_route.region_routes[0].is_leader_downgraded());
assert!(!latest_table_route.region_routes()[0].is_leader_downgraded());
assert!(ctx.volatile_ctx.table_route.is_none());
}

Expand Down Expand Up @@ -253,7 +253,7 @@ mod tests {
.unwrap()
.unwrap();

assert!(latest_table_route.region_routes[0].is_leader_downgraded());
assert!(latest_table_route.region_routes()[0].is_leader_downgraded());
assert!(ctx.volatile_ctx.table_route.is_none());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,14 @@ mod tests {

state.rollback_downgraded_region(&mut ctx).await.unwrap();

let region_routes = table_metadata_manager
let table_route = table_metadata_manager
.table_route_manager()
.get(table_id)
.await
.unwrap()
.unwrap()
.into_inner()
.region_routes;
assert_eq!(expected_region_routes, region_routes);
.into_inner();
assert_eq!(&expected_region_routes, table_route.region_routes());
}

#[tokio::test]
Expand Down Expand Up @@ -229,14 +228,13 @@ mod tests {

assert!(ctx.volatile_ctx.table_route.is_none());

let region_routes = table_metadata_manager
let table_route = table_metadata_manager
.table_route_manager()
.get(table_id)
.await
.unwrap()
.unwrap()
.into_inner()
.region_routes;
assert_eq!(expected_region_routes, region_routes);
.into_inner();
assert_eq!(&expected_region_routes, table_route.region_routes());
}
}
Loading

0 comments on commit 6ac47e9

Please sign in to comment.