From 623c930736e66320127480014a46ade7cf049a90 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 27 Mar 2024 14:29:54 +0800 Subject: [PATCH] refactor: refactor drop table executor (#3589) * refactor: refactor drop table executor * chore: apply suggestions from CR --- src/cmd/src/cli/bench/metadata.rs | 8 +++- .../meta/src/ddl/drop_database/cursor.rs | 41 ++++++++++--------- .../meta/src/ddl/drop_database/executor.rs | 25 ++++------- src/common/meta/src/ddl/drop_table.rs | 11 ++--- .../meta/src/ddl/drop_table/executor.rs | 15 +++---- src/common/meta/src/key.rs | 33 ++++++++------- src/common/meta/src/key/table_route.rs | 2 +- 7 files changed, 64 insertions(+), 71 deletions(-) diff --git a/src/cmd/src/cli/bench/metadata.rs b/src/cmd/src/cli/bench/metadata.rs index 6eedc18eac18..fc337d1ac466 100644 --- a/src/cmd/src/cli/bench/metadata.rs +++ b/src/cmd/src/cli/bench/metadata.rs @@ -106,9 +106,15 @@ impl TableMetadataBencher { .await .unwrap(); let start = Instant::now(); + let table_info = table_info.unwrap(); + let table_id = table_info.table_info.ident.table_id; let _ = self .table_metadata_manager - .delete_table_metadata(&table_info.unwrap(), &table_route.unwrap()) + .delete_table_metadata( + table_id, + &table_info.table_name(), + table_route.unwrap().region_routes().unwrap(), + ) .await; start.elapsed() }, diff --git a/src/common/meta/src/ddl/drop_database/cursor.rs b/src/common/meta/src/ddl/drop_database/cursor.rs index afc5b152afc0..5ea7a19585fa 100644 --- a/src/common/meta/src/ddl/drop_database/cursor.rs +++ b/src/common/meta/src/ddl/drop_database/cursor.rs @@ -15,7 +15,6 @@ use common_procedure::Status; use futures::TryStreamExt; use serde::{Deserialize, Serialize}; -use snafu::OptionExt; use table::metadata::TableId; use super::executor::DropDatabaseExecutor; @@ -23,9 +22,8 @@ use super::metadata::DropDatabaseRemoveMetadata; use super::DropTableTarget; use crate::ddl::drop_database::{DropDatabaseContext, State}; use crate::ddl::DdlContext; -use crate::error::{self, Result}; +use crate::error::Result; use crate::key::table_route::TableRouteValue; -use crate::key::DeserializedValueWithBytes; use crate::table_name::TableName; #[derive(Debug, Serialize, Deserialize)] @@ -66,31 +64,36 @@ impl DropDatabaseCursor { ctx: &mut DropDatabaseContext, table_name: String, table_id: TableId, - table_route_value: DeserializedValueWithBytes, + table_route_value: TableRouteValue, ) -> Result<(Box, Status)> { - match (self.target, table_route_value.get_inner_ref()) { - (DropTableTarget::Logical, TableRouteValue::Logical(_)) - | (DropTableTarget::Physical, TableRouteValue::Physical(_)) => { - // TODO(weny): Maybe we can drop the table without fetching the `TableInfoValue` - let table_info_value = ddl_ctx + match (self.target, table_route_value) { + (DropTableTarget::Logical, TableRouteValue::Logical(route)) => { + let table_id = route.physical_table_id(); + + let (_, table_route) = ddl_ctx .table_metadata_manager - .table_info_manager() - .get(table_id) - .await? - .context(error::TableNotFoundSnafu { - table_name: &table_name, - })?; + .table_route_manager() + .get_physical_table_route(table_id) + .await?; Ok(( Box::new(DropDatabaseExecutor::new( - TableName::new(&ctx.catalog, &ctx.schema, &table_name), table_id, - table_info_value, - table_route_value, + TableName::new(&ctx.catalog, &ctx.schema, &table_name), + table_route.region_routes, self.target, )), Status::executing(true), )) } + (DropTableTarget::Physical, TableRouteValue::Physical(table_route)) => Ok(( + Box::new(DropDatabaseExecutor::new( + table_id, + TableName::new(&ctx.catalog, &ctx.schema, &table_name), + table_route.region_routes, + self.target, + )), + Status::executing(true), + )), _ => Ok(( Box::new(DropDatabaseCursor::new(self.target)), Status::executing(false), @@ -122,7 +125,7 @@ impl State for DropDatabaseCursor { .table_metadata_manager .table_route_manager() .table_route_storage() - .get_raw(table_id) + .get(table_id) .await? { Some(table_route_value) => { diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs index 096493b9ce43..0bbdc2271955 100644 --- a/src/common/meta/src/ddl/drop_database/executor.rs +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -24,19 +24,15 @@ use crate::ddl::drop_database::State; use crate::ddl::drop_table::executor::DropTableExecutor; use crate::ddl::DdlContext; use crate::error::{self, Result}; -use crate::key::table_info::TableInfoValue; -use crate::key::table_route::TableRouteValue; -use crate::key::DeserializedValueWithBytes; use crate::region_keeper::OperatingRegionGuard; -use crate::rpc::router::operating_leader_regions; +use crate::rpc::router::{operating_leader_regions, RegionRoute}; use crate::table_name::TableName; #[derive(Debug, Serialize, Deserialize)] pub struct DropDatabaseExecutor { - table_name: TableName, table_id: TableId, - table_info_value: DeserializedValueWithBytes, - table_route_value: DeserializedValueWithBytes, + table_name: TableName, + region_routes: Vec, target: DropTableTarget, #[serde(skip)] dropping_regions: Vec, @@ -45,17 +41,15 @@ pub struct DropDatabaseExecutor { impl DropDatabaseExecutor { /// Returns a new [DropDatabaseExecutor]. pub fn new( - table_name: TableName, table_id: TableId, - table_info_value: DeserializedValueWithBytes, - table_route_value: DeserializedValueWithBytes, + table_name: TableName, + region_routes: Vec, target: DropTableTarget, ) -> Self { Self { table_name, table_id, - table_info_value, - table_route_value, + region_routes, target, dropping_regions: vec![], } @@ -64,8 +58,7 @@ impl DropDatabaseExecutor { impl DropDatabaseExecutor { fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> { - let region_routes = self.table_route_value.region_routes()?; - let dropping_regions = operating_leader_regions(region_routes); + let dropping_regions = operating_leader_regions(&self.region_routes); let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len()); for (region_id, datanode_id) in dropping_regions { let guard = ddl_ctx @@ -93,11 +86,11 @@ impl State for DropDatabaseExecutor { self.register_dropping_regions(ddl_ctx)?; let executor = DropTableExecutor::new(self.table_name.clone(), self.table_id, true); executor - .on_remove_metadata(ddl_ctx, &self.table_info_value, &self.table_route_value) + .on_remove_metadata(ddl_ctx, &self.region_routes) .await?; executor.invalidate_table_cache(ddl_ctx).await?; executor - .on_drop_regions(ddl_ctx, &self.table_route_value) + .on_drop_regions(ddl_ctx, &self.region_routes) .await?; info!("Table: {}({}) is dropped", self.table_name, self.table_id); diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index aa60caaaeb86..d2ca94590351 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -27,7 +27,7 @@ use table::metadata::{RawTableInfo, TableId}; use table::table_reference::TableReference; use self::executor::DropTableExecutor; -use super::utils::handle_retry_error; +use crate::ddl::utils::handle_retry_error; use crate::ddl::DdlContext; use crate::error::{self, Result}; use crate::key::table_info::TableInfoValue; @@ -121,11 +121,7 @@ impl DropTableProcedure { // TODO(weny): Considers introducing a RegionStatus to indicate the region is dropping. let table_id = self.data.table_id(); executor - .on_remove_metadata( - &self.context, - &self.data.table_info_value, - &self.data.table_route_value, - ) + .on_remove_metadata(&self.context, self.data.region_routes()?) .await?; info!("Deleted table metadata for table {table_id}"); self.data.state = DropTableState::InvalidateTableCache; @@ -142,7 +138,7 @@ impl DropTableProcedure { pub async fn on_datanode_drop_regions(&self, executor: &DropTableExecutor) -> Result { executor - .on_drop_regions(&self.context, &self.data.table_route_value) + .on_drop_regions(&self.context, self.data.region_routes()?) .await?; Ok(Status::done()) } @@ -192,6 +188,7 @@ impl Procedure for DropTableProcedure { } #[derive(Debug, Serialize, Deserialize)] +/// TODO(weny): simplify the table data. pub struct DropTableData { pub state: DropTableState, pub cluster_id: u64, diff --git a/src/common/meta/src/ddl/drop_table/executor.rs b/src/common/meta/src/ddl/drop_table/executor.rs index d869af7c90d2..37ca7c20c4e8 100644 --- a/src/common/meta/src/ddl/drop_table/executor.rs +++ b/src/common/meta/src/ddl/drop_table/executor.rs @@ -29,11 +29,8 @@ use crate::ddl::utils::add_peer_context_if_needed; use crate::ddl::DdlContext; use crate::error::{self, Result}; use crate::instruction::CacheIdent; -use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; -use crate::key::table_route::TableRouteValue; -use crate::key::DeserializedValueWithBytes; -use crate::rpc::router::{find_leader_regions, find_leaders}; +use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; use crate::table_name::TableName; /// [Control] indicated to the caller whether to go to the next step. @@ -106,11 +103,10 @@ impl DropTableExecutor { pub async fn on_remove_metadata( &self, ctx: &DdlContext, - table_info_value: &DeserializedValueWithBytes, - table_route_value: &DeserializedValueWithBytes, + region_routes: &[RegionRoute], ) -> Result<()> { ctx.table_metadata_manager - .delete_table_metadata(table_info_value, table_route_value) + .delete_table_metadata(self.table_id, &self.table, region_routes) .await } @@ -138,10 +134,8 @@ impl DropTableExecutor { pub async fn on_drop_regions( &self, ctx: &DdlContext, - table_route_value: &DeserializedValueWithBytes, + region_routes: &[RegionRoute], ) -> Result<()> { - // The `table_route_value` always be the physical table route. - let region_routes = table_route_value.region_routes()?; let leaders = find_leaders(region_routes); let mut drop_region_tasks = Vec::with_capacity(leaders.len()); let table_id = self.table_id; @@ -202,6 +196,7 @@ mod tests { use crate::ddl::test_util::create_table::{ build_raw_table_info_from_expr, TestCreateTableExprBuilder, }; + use crate::key::table_route::TableRouteValue; use crate::table_name::TableName; use crate::test_util::{new_ddl_context, MockDatanodeManager}; diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index c6fc331a2be8..f1c7d1bfa86d 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -88,6 +88,7 @@ use crate::error::{self, Result, SerdeJsonSnafu}; use crate::kv_backend::txn::{Txn, TxnOpResponse}; use crate::kv_backend::KvBackendRef; use crate::rpc::router::{region_distribution, RegionRoute, RegionStatus}; +use crate::table_name::TableName; use crate::DatanodeId; pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.]*"; @@ -552,17 +553,15 @@ impl TableMetadataManager { /// The caller MUST ensure it has the exclusive access to `TableNameKey`. pub async fn delete_table_metadata( &self, - table_info_value: &DeserializedValueWithBytes, - table_route_value: &DeserializedValueWithBytes, + table_id: TableId, + table_name: &TableName, + region_routes: &[RegionRoute], ) -> Result<()> { - let table_info = &table_info_value.table_info; - let table_id = table_info.ident.table_id; - // Deletes table name. let table_name = TableNameKey::new( - &table_info.catalog_name, - &table_info.schema_name, - &table_info.name, + &table_name.catalog_name, + &table_name.schema_name, + &table_name.table_name, ); let delete_table_name_txn = self.table_name_manager().build_delete_txn(&table_name)?; @@ -571,7 +570,7 @@ impl TableMetadataManager { let delete_table_info_txn = self.table_info_manager().build_delete_txn(table_id)?; // Deletes datanode table key value pairs. - let distribution = region_distribution(table_route_value.region_routes()?); + let distribution = region_distribution(region_routes); let delete_datanode_txn = self .datanode_table_manager() .build_delete_txn(table_id, distribution)?; @@ -929,6 +928,7 @@ mod tests { use crate::kv_backend::memory::MemoryKvBackend; use crate::peer::Peer; use crate::rpc::router::{region_distribution, Region, RegionRoute, RegionStatus}; + use crate::table_name::TableName; #[test] fn test_deserialized_value_with_bytes() { @@ -1144,9 +1144,6 @@ mod tests { new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); let table_id = table_info.ident.table_id; let datanode_id = 2; - let table_route_value = DeserializedValueWithBytes::from_inner(TableRouteValue::physical( - region_routes.clone(), - )); // creates metadata. create_physical_table_metadata( @@ -1157,18 +1154,20 @@ mod tests { .await .unwrap(); - let table_info_value = - DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone())); - + let table_name = TableName::new( + table_info.catalog_name, + table_info.schema_name, + table_info.name, + ); // deletes metadata. table_metadata_manager - .delete_table_metadata(&table_info_value, &table_route_value) + .delete_table_metadata(table_id, &table_name, region_routes) .await .unwrap(); // if metadata was already deleted, it should be ok. table_metadata_manager - .delete_table_metadata(&table_info_value, &table_route_value) + .delete_table_metadata(table_id, &table_name, region_routes) .await .unwrap(); diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 3b2e643176d9..82a5a4e1f6f6 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -147,7 +147,7 @@ impl TableRouteValue { /// /// # Panic /// If it is not the [`PhysicalTableRouteValue`]. - fn into_physical_table_route(self) -> PhysicalTableRouteValue { + pub fn into_physical_table_route(self) -> PhysicalTableRouteValue { match self { TableRouteValue::Physical(x) => x, _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),