Skip to content

Commit

Permalink
refactor: refactor drop table executor (#3589)
Browse files Browse the repository at this point in the history
* refactor: refactor drop table executor

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored Mar 27, 2024
1 parent 5fa01e7 commit 623c930
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 71 deletions.
8 changes: 7 additions & 1 deletion src/cmd/src/cli/bench/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,15 @@ impl TableMetadataBencher {
.await
.unwrap();
let start = Instant::now();
let table_info = table_info.unwrap();
let table_id = table_info.table_info.ident.table_id;
let _ = self
.table_metadata_manager
.delete_table_metadata(&table_info.unwrap(), &table_route.unwrap())
.delete_table_metadata(
table_id,
&table_info.table_name(),
table_route.unwrap().region_routes().unwrap(),
)
.await;
start.elapsed()
},
Expand Down
41 changes: 22 additions & 19 deletions src/common/meta/src/ddl/drop_database/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,15 @@
use common_procedure::Status;
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use table::metadata::TableId;

use super::executor::DropDatabaseExecutor;
use super::metadata::DropDatabaseRemoveMetadata;
use super::DropTableTarget;
use crate::ddl::drop_database::{DropDatabaseContext, State};
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::error::Result;
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::table_name::TableName;

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -66,31 +64,36 @@ impl DropDatabaseCursor {
ctx: &mut DropDatabaseContext,
table_name: String,
table_id: TableId,
table_route_value: DeserializedValueWithBytes<TableRouteValue>,
table_route_value: TableRouteValue,
) -> Result<(Box<dyn State>, Status)> {
match (self.target, table_route_value.get_inner_ref()) {
(DropTableTarget::Logical, TableRouteValue::Logical(_))
| (DropTableTarget::Physical, TableRouteValue::Physical(_)) => {
// TODO(weny): Maybe we can drop the table without fetching the `TableInfoValue`
let table_info_value = ddl_ctx
match (self.target, table_route_value) {
(DropTableTarget::Logical, TableRouteValue::Logical(route)) => {
let table_id = route.physical_table_id();

let (_, table_route) = ddl_ctx
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await?
.context(error::TableNotFoundSnafu {
table_name: &table_name,
})?;
.table_route_manager()
.get_physical_table_route(table_id)
.await?;
Ok((
Box::new(DropDatabaseExecutor::new(
TableName::new(&ctx.catalog, &ctx.schema, &table_name),
table_id,
table_info_value,
table_route_value,
TableName::new(&ctx.catalog, &ctx.schema, &table_name),
table_route.region_routes,
self.target,
)),
Status::executing(true),
))
}
(DropTableTarget::Physical, TableRouteValue::Physical(table_route)) => Ok((
Box::new(DropDatabaseExecutor::new(
table_id,
TableName::new(&ctx.catalog, &ctx.schema, &table_name),
table_route.region_routes,
self.target,
)),
Status::executing(true),
)),
_ => Ok((
Box::new(DropDatabaseCursor::new(self.target)),
Status::executing(false),
Expand Down Expand Up @@ -122,7 +125,7 @@ impl State for DropDatabaseCursor {
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_raw(table_id)
.get(table_id)
.await?
{
Some(table_route_value) => {
Expand Down
25 changes: 9 additions & 16 deletions src/common/meta/src/ddl/drop_database/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,15 @@ use crate::ddl::drop_database::State;
use crate::ddl::drop_table::executor::DropTableExecutor;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::key::table_info::TableInfoValue;
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::region_keeper::OperatingRegionGuard;
use crate::rpc::router::operating_leader_regions;
use crate::rpc::router::{operating_leader_regions, RegionRoute};
use crate::table_name::TableName;

#[derive(Debug, Serialize, Deserialize)]
pub struct DropDatabaseExecutor {
table_name: TableName,
table_id: TableId,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
table_route_value: DeserializedValueWithBytes<TableRouteValue>,
table_name: TableName,
region_routes: Vec<RegionRoute>,
target: DropTableTarget,
#[serde(skip)]
dropping_regions: Vec<OperatingRegionGuard>,
Expand All @@ -45,17 +41,15 @@ pub struct DropDatabaseExecutor {
impl DropDatabaseExecutor {
/// Returns a new [DropDatabaseExecutor].
pub fn new(
table_name: TableName,
table_id: TableId,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
table_route_value: DeserializedValueWithBytes<TableRouteValue>,
table_name: TableName,
region_routes: Vec<RegionRoute>,
target: DropTableTarget,
) -> Self {
Self {
table_name,
table_id,
table_info_value,
table_route_value,
region_routes,
target,
dropping_regions: vec![],
}
Expand All @@ -64,8 +58,7 @@ impl DropDatabaseExecutor {

impl DropDatabaseExecutor {
fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
let region_routes = self.table_route_value.region_routes()?;
let dropping_regions = operating_leader_regions(region_routes);
let dropping_regions = operating_leader_regions(&self.region_routes);
let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
for (region_id, datanode_id) in dropping_regions {
let guard = ddl_ctx
Expand Down Expand Up @@ -93,11 +86,11 @@ impl State for DropDatabaseExecutor {
self.register_dropping_regions(ddl_ctx)?;
let executor = DropTableExecutor::new(self.table_name.clone(), self.table_id, true);
executor
.on_remove_metadata(ddl_ctx, &self.table_info_value, &self.table_route_value)
.on_remove_metadata(ddl_ctx, &self.region_routes)
.await?;
executor.invalidate_table_cache(ddl_ctx).await?;
executor
.on_drop_regions(ddl_ctx, &self.table_route_value)
.on_drop_regions(ddl_ctx, &self.region_routes)
.await?;
info!("Table: {}({}) is dropped", self.table_name, self.table_id);

Expand Down
11 changes: 4 additions & 7 deletions src/common/meta/src/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use table::metadata::{RawTableInfo, TableId};
use table::table_reference::TableReference;

use self::executor::DropTableExecutor;
use super::utils::handle_retry_error;
use crate::ddl::utils::handle_retry_error;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::key::table_info::TableInfoValue;
Expand Down Expand Up @@ -121,11 +121,7 @@ impl DropTableProcedure {
// TODO(weny): Considers introducing a RegionStatus to indicate the region is dropping.
let table_id = self.data.table_id();
executor
.on_remove_metadata(
&self.context,
&self.data.table_info_value,
&self.data.table_route_value,
)
.on_remove_metadata(&self.context, self.data.region_routes()?)
.await?;
info!("Deleted table metadata for table {table_id}");
self.data.state = DropTableState::InvalidateTableCache;
Expand All @@ -142,7 +138,7 @@ impl DropTableProcedure {

pub async fn on_datanode_drop_regions(&self, executor: &DropTableExecutor) -> Result<Status> {
executor
.on_drop_regions(&self.context, &self.data.table_route_value)
.on_drop_regions(&self.context, self.data.region_routes()?)
.await?;
Ok(Status::done())
}
Expand Down Expand Up @@ -192,6 +188,7 @@ impl Procedure for DropTableProcedure {
}

#[derive(Debug, Serialize, Deserialize)]
/// TODO(weny): simplify the table data.
pub struct DropTableData {
pub state: DropTableState,
pub cluster_id: u64,
Expand Down
15 changes: 5 additions & 10 deletions src/common/meta/src/ddl/drop_table/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@ use crate::ddl::utils::add_peer_context_if_needed;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::rpc::router::{find_leader_regions, find_leaders};
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
use crate::table_name::TableName;

/// [Control] indicated to the caller whether to go to the next step.
Expand Down Expand Up @@ -106,11 +103,10 @@ impl DropTableExecutor {
pub async fn on_remove_metadata(
&self,
ctx: &DdlContext,
table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
region_routes: &[RegionRoute],
) -> Result<()> {
ctx.table_metadata_manager
.delete_table_metadata(table_info_value, table_route_value)
.delete_table_metadata(self.table_id, &self.table, region_routes)
.await
}

Expand Down Expand Up @@ -138,10 +134,8 @@ impl DropTableExecutor {
pub async fn on_drop_regions(
&self,
ctx: &DdlContext,
table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
region_routes: &[RegionRoute],
) -> Result<()> {
// The `table_route_value` always be the physical table route.
let region_routes = table_route_value.region_routes()?;
let leaders = find_leaders(region_routes);
let mut drop_region_tasks = Vec::with_capacity(leaders.len());
let table_id = self.table_id;
Expand Down Expand Up @@ -202,6 +196,7 @@ mod tests {
use crate::ddl::test_util::create_table::{
build_raw_table_info_from_expr, TestCreateTableExprBuilder,
};
use crate::key::table_route::TableRouteValue;
use crate::table_name::TableName;
use crate::test_util::{new_ddl_context, MockDatanodeManager};

Expand Down
33 changes: 16 additions & 17 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ use crate::error::{self, Result, SerdeJsonSnafu};
use crate::kv_backend::txn::{Txn, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
use crate::rpc::router::{region_distribution, RegionRoute, RegionStatus};
use crate::table_name::TableName;
use crate::DatanodeId;

pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.]*";
Expand Down Expand Up @@ -552,17 +553,15 @@ impl TableMetadataManager {
/// The caller MUST ensure it has the exclusive access to `TableNameKey`.
pub async fn delete_table_metadata(
&self,
table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
table_id: TableId,
table_name: &TableName,
region_routes: &[RegionRoute],
) -> Result<()> {
let table_info = &table_info_value.table_info;
let table_id = table_info.ident.table_id;

// Deletes table name.
let table_name = TableNameKey::new(
&table_info.catalog_name,
&table_info.schema_name,
&table_info.name,
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
);

let delete_table_name_txn = self.table_name_manager().build_delete_txn(&table_name)?;
Expand All @@ -571,7 +570,7 @@ impl TableMetadataManager {
let delete_table_info_txn = self.table_info_manager().build_delete_txn(table_id)?;

// Deletes datanode table key value pairs.
let distribution = region_distribution(table_route_value.region_routes()?);
let distribution = region_distribution(region_routes);
let delete_datanode_txn = self
.datanode_table_manager()
.build_delete_txn(table_id, distribution)?;
Expand Down Expand Up @@ -929,6 +928,7 @@ mod tests {
use crate::kv_backend::memory::MemoryKvBackend;
use crate::peer::Peer;
use crate::rpc::router::{region_distribution, Region, RegionRoute, RegionStatus};
use crate::table_name::TableName;

#[test]
fn test_deserialized_value_with_bytes() {
Expand Down Expand Up @@ -1144,9 +1144,6 @@ mod tests {
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_id = table_info.ident.table_id;
let datanode_id = 2;
let table_route_value = DeserializedValueWithBytes::from_inner(TableRouteValue::physical(
region_routes.clone(),
));

// creates metadata.
create_physical_table_metadata(
Expand All @@ -1157,18 +1154,20 @@ mod tests {
.await
.unwrap();

let table_info_value =
DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));

let table_name = TableName::new(
table_info.catalog_name,
table_info.schema_name,
table_info.name,
);
// deletes metadata.
table_metadata_manager
.delete_table_metadata(&table_info_value, &table_route_value)
.delete_table_metadata(table_id, &table_name, region_routes)
.await
.unwrap();

// if metadata was already deleted, it should be ok.
table_metadata_manager
.delete_table_metadata(&table_info_value, &table_route_value)
.delete_table_metadata(table_id, &table_name, region_routes)
.await
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/key/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl TableRouteValue {
///
/// # Panic
/// If it is not the [`PhysicalTableRouteValue`].
fn into_physical_table_route(self) -> PhysicalTableRouteValue {
pub fn into_physical_table_route(self) -> PhysicalTableRouteValue {
match self {
TableRouteValue::Physical(x) => x,
_ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
Expand Down

0 comments on commit 623c930

Please sign in to comment.