Skip to content

Commit

Permalink
fix: rollback only if dropping the metric physical table fails (#4525)
Browse files Browse the repository at this point in the history
* fix: rollback only if dropping the metric physical table fails

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored Aug 9, 2024
1 parent 8f3293d commit 27d9aa0
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 9 deletions.
5 changes: 4 additions & 1 deletion src/common/meta/src/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ impl Procedure for DropTableProcedure {
}

fn rollback_supported(&self) -> bool {
!matches!(self.data.state, DropTableState::Prepare)
!matches!(self.data.state, DropTableState::Prepare) && self.data.allow_rollback
}

async fn rollback(&mut self, _: &ProcedureContext) -> ProcedureResult<()> {
Expand Down Expand Up @@ -256,6 +256,8 @@ pub struct DropTableData {
pub task: DropTableTask,
pub physical_region_routes: Vec<RegionRoute>,
pub physical_table_id: Option<TableId>,
#[serde(default)]
pub allow_rollback: bool,
}

impl DropTableData {
Expand All @@ -266,6 +268,7 @@ impl DropTableData {
task,
physical_region_routes: vec![],
physical_table_id: None,
allow_rollback: false,
}
}

Expand Down
23 changes: 22 additions & 1 deletion src/common/meta/src/ddl/drop_table/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_catalog::format_full_table_name;
use snafu::OptionExt;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;

use crate::ddl::drop_table::DropTableProcedure;
use crate::error::Result;
use crate::error::{self, Result};

impl DropTableProcedure {
/// Fetches the table info and physical table route.
Expand All @@ -29,6 +33,23 @@ impl DropTableProcedure {
self.data.physical_region_routes = physical_table_route_value.region_routes;
self.data.physical_table_id = Some(physical_table_id);

if physical_table_id == self.data.table_id() {
let table_info_value = self
.context
.table_metadata_manager
.table_info_manager()
.get(task.table_id)
.await?
.with_context(|| error::TableInfoNotFoundSnafu {
table: format_full_table_name(&task.catalog, &task.schema, &task.table),
})?
.into_inner();

let engine = table_info_value.table_info.meta.engine;
// rollback only if dropping the metric physical table fails
self.data.allow_rollback = engine.as_str() == METRIC_ENGINE_NAME
}

Ok(())
}
}
7 changes: 7 additions & 0 deletions src/common/meta/src/ddl/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::collections::HashMap;
use api::v1::meta::Partition;
use api::v1::{ColumnDataType, SemanticType};
use common_procedure::Status;
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use table::metadata::{RawTableInfo, TableId};

use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
Expand Down Expand Up @@ -130,6 +131,11 @@ pub fn test_create_logical_table_task(name: &str) -> CreateTableTask {
.time_index("ts")
.primary_keys(["host".into()])
.table_name(name)
.engine(METRIC_ENGINE_NAME)
.table_options(HashMap::from([(
LOGICAL_TABLE_METADATA_KEY.to_string(),
"phy".to_string(),
)]))
.build()
.unwrap()
.into();
Expand Down Expand Up @@ -166,6 +172,7 @@ pub fn test_create_physical_table_task(name: &str) -> CreateTableTask {
.time_index("ts")
.primary_keys(["value".into()])
.table_name(name)
.engine(METRIC_ENGINE_NAME)
.build()
.unwrap()
.into();
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/test_util/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> RawTableInfo {
engine: expr.engine.to_string(),
next_column_id: expr.column_defs.len() as u32,
region_numbers: vec![],
options: TableOptions::default(),
options: TableOptions::try_from_iter(&expr.table_options).unwrap(),
created_on: DateTime::default(),
partition_key_indices: vec![],
},
Expand Down
11 changes: 5 additions & 6 deletions src/common/meta/src/ddl/tests/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ async fn test_on_prepare_table() {
// Drop if exists
let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone());
procedure.on_prepare().await.unwrap();
assert!(!procedure.rollback_supported());

let task = new_drop_table_task(table_name, table_id, false);
// Drop table
Expand Down Expand Up @@ -224,9 +225,12 @@ async fn test_on_rollback() {
let task = new_drop_table_task("phy_table", physical_table_id, false);
let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone());
procedure.on_prepare().await.unwrap();
assert!(procedure.rollback_supported());
procedure.on_delete_metadata().await.unwrap();
assert!(procedure.rollback_supported());
procedure.rollback(&ctx).await.unwrap();
// Rollback again
assert!(procedure.rollback_supported());
procedure.rollback(&ctx).await.unwrap();
let kvs = kv_backend.dump();
assert_eq!(kvs, expected_kvs);
Expand All @@ -236,12 +240,7 @@ async fn test_on_rollback() {
let task = new_drop_table_task("foo", table_ids[0], false);
let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone());
procedure.on_prepare().await.unwrap();
procedure.on_delete_metadata().await.unwrap();
procedure.rollback(&ctx).await.unwrap();
// Rollback again
procedure.rollback(&ctx).await.unwrap();
let kvs = kv_backend.dump();
assert_eq!(kvs, expected_kvs);
assert!(!procedure.rollback_supported());
}

fn new_drop_table_task(table_name: &str, table_id: TableId, drop_if_exists: bool) -> DropTableTask {
Expand Down

0 comments on commit 27d9aa0

Please sign in to comment.