From 6b481578bd6661b0d30a531ef92ba86eaa1c3524 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 21 Feb 2024 13:38:46 +0900 Subject: [PATCH] refactor: allocate table id in the procedure (#3271) * refactor: replace TableMetadataManager with TableNameManager * refactor: allocate table id in the procedure * refactor: refactor client logic of handling retries * feat(test_util): add TestCreateTableExprBuilder * feat(test_util): add MockDatanodeManager * feat(test_util): add new_ddl_context * feat(test_util): add build_raw_table_info_from_expr * feat(test_util): add MockDatanodeManager::new * feat(procedure): add downcast_output_ref to Status * test(create_table): add tests for CreateTableProcedure on_prepare * refactor(ddl): rename handle_operate_region_error to add_peer_context_if_need * test(create_table): add tests for CreateTableProcedure on_datanode_create_regions * test(create_table): add tests for CreateTableProcedure on_create_metadata * refactor(meta): use CreateTableExprBuilder * feat(create_table): ensure number of partitions is greater than 0 * refactor: rename to add_peer_context_if_needed * feat: add context for panic * refactor: simplify the should_retry * refactor: use Option<&T> instead of &Option * refactor: move downcast_output_ref under cfg(test) * chore: fmt toml --- Cargo.lock | 1 + src/client/src/error.rs | 11 +- src/cmd/src/standalone.rs | 10 +- src/common/meta/Cargo.toml | 1 + src/common/meta/src/ddl.rs | 6 + src/common/meta/src/ddl/alter_table.rs | 4 +- .../meta/src/ddl/create_logical_tables.rs | 4 +- src/common/meta/src/ddl/create_table.rs | 142 +++++--- src/common/meta/src/ddl/drop_table.rs | 4 +- src/common/meta/src/ddl/table_meta.rs | 26 +- src/common/meta/src/ddl/test_util.rs | 19 + .../meta/src/ddl/test_util/create_table.rs | 165 +++++++++ src/common/meta/src/ddl/tests.rs | 15 + src/common/meta/src/ddl/tests/create_table.rs | 328 ++++++++++++++++++ src/common/meta/src/ddl/truncate_table.rs | 4 +- src/common/meta/src/ddl/utils.rs | 14 +- 
src/common/meta/src/ddl_manager.rs | 58 +--- src/common/meta/src/key/table_name.rs | 1 + src/common/meta/src/lib.rs | 2 + src/common/meta/src/test_util.rs | 105 ++++++ src/common/procedure/src/procedure.rs | 16 + src/meta-srv/src/metasrv/builder.rs | 14 +- src/meta-srv/src/procedure/tests.rs | 127 ++++--- src/meta-srv/src/procedure/utils.rs | 10 +- tests-integration/src/standalone.rs | 6 +- 25 files changed, 902 insertions(+), 191 deletions(-) create mode 100644 src/common/meta/src/ddl/test_util.rs create mode 100644 src/common/meta/src/ddl/test_util/create_table.rs create mode 100644 src/common/meta/src/ddl/tests.rs create mode 100644 src/common/meta/src/ddl/tests/create_table.rs create mode 100644 src/common/meta/src/test_util.rs diff --git a/Cargo.lock b/Cargo.lock index 522c9790bec2..a3214306b920 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1919,6 +1919,7 @@ dependencies = [ "common-grpc-expr", "common-macro", "common-procedure", + "common-procedure-test", "common-recordbatch", "common-runtime", "common-telemetry", diff --git a/src/client/src/error.rs b/src/client/src/error.rs index cac8ebb0b513..aa0b66558fd2 100644 --- a/src/client/src/error.rs +++ b/src/client/src/error.rs @@ -134,10 +134,17 @@ impl From for Error { impl Error { pub fn should_retry(&self) -> bool { - !matches!( + // TODO(weny): figure out each case of these codes. + matches!( self, Self::RegionServer { - code: Code::InvalidArgument, + code: Code::Cancelled, + .. + } | Self::RegionServer { + code: Code::DeadlineExceeded, + .. + } | Self::RegionServer { + code: Code::Unavailable, .. 
} ) diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index ea963e497d1f..944f8623e017 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -21,7 +21,7 @@ use common_catalog::consts::MIN_USER_TABLE_ID; use common_config::{metadata_store_dir, KvBackendConfig}; use common_meta::cache_invalidator::DummyCacheInvalidator; use common_meta::datanode_manager::DatanodeManagerRef; -use common_meta::ddl::table_meta::TableMetadataAllocator; +use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; use common_meta::ddl::DdlTaskExecutorRef; use common_meta::ddl_manager::DdlManager; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; @@ -419,11 +419,11 @@ impl StartCommand { let table_metadata_manager = Self::create_table_metadata_manager(kv_backend.clone()).await?; - let table_meta_allocator = TableMetadataAllocator::new( + let table_meta_allocator = Arc::new(TableMetadataAllocator::new( table_id_sequence, wal_options_allocator.clone(), - table_metadata_manager.clone(), - ); + table_metadata_manager.table_name_manager().clone(), + )); let ddl_task_executor = Self::create_ddl_task_executor( table_metadata_manager, @@ -458,7 +458,7 @@ impl StartCommand { table_metadata_manager: TableMetadataManagerRef, procedure_manager: ProcedureManagerRef, datanode_manager: DatanodeManagerRef, - table_meta_allocator: TableMetadataAllocator, + table_meta_allocator: TableMetadataAllocatorRef, ) -> Result { let ddl_task_executor: DdlTaskExecutorRef = Arc::new( DdlManager::try_new( diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 8a6942ba12ca..c6feeddacb87 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -20,6 +20,7 @@ common-error.workspace = true common-grpc-expr.workspace = true common-macro.workspace = true common-procedure.workspace = true +common-procedure-test.workspace = true common-recordbatch.workspace = true common-runtime.workspace = true 
common-telemetry.workspace = true diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 8ac4bb22b945..4a8335ef3087 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use common_telemetry::tracing_context::W3cTrace; use store_api::storage::{RegionNumber, TableId}; +use self::table_meta::TableMetadataAllocatorRef; use crate::cache_invalidator::CacheInvalidatorRef; use crate::datanode_manager::DatanodeManagerRef; use crate::error::Result; @@ -32,6 +33,10 @@ pub mod create_table; mod create_table_template; pub mod drop_table; pub mod table_meta; +#[cfg(any(test, feature = "testing"))] +pub mod test_util; +#[cfg(test)] +mod tests; pub mod truncate_table; pub mod utils; @@ -73,4 +78,5 @@ pub struct DdlContext { pub cache_invalidator: CacheInvalidatorRef, pub table_metadata_manager: TableMetadataManagerRef, pub memory_region_keeper: MemoryRegionKeeperRef, + pub table_metadata_allocator: TableMetadataAllocatorRef, } diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index 0c3f0930961f..adffa5258602 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -40,7 +40,7 @@ use table::requests::AlterKind; use table::table_reference::TableReference; use crate::cache_invalidator::Context; -use crate::ddl::utils::handle_operate_region_error; +use crate::ddl::utils::add_peer_context_if_needed; use crate::ddl::DdlContext; use crate::error::{self, ConvertAlterTableRequestSnafu, Error, InvalidProtoMsgSnafu, Result}; use crate::key::table_info::TableInfoValue; @@ -226,7 +226,7 @@ impl AlterTableProcedure { // The engine will throw this code when the schema version not match. // As this procedure has locked the table, the only reason for this error // is procedure is succeeded before and is retrying. 
- return Err(handle_operate_region_error(datanode)(err)); + return Err(add_peer_context_if_needed(datanode)(err)); } } Ok(()) diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index a93283cfdd2c..beda7bbc20bd 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -31,7 +31,7 @@ use strum::AsRefStr; use table::metadata::{RawTableInfo, TableId}; use crate::ddl::create_table_template::{build_template, CreateRequestBuilder}; -use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path}; +use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, region_storage_path}; use crate::ddl::DdlContext; use crate::error::{Result, TableAlreadyExistsSnafu}; use crate::key::table_name::TableNameKey; @@ -239,7 +239,7 @@ impl CreateLogicalTablesProcedure { }; create_region_tasks.push(async move { if let Err(err) = requester.handle(request).await { - return Err(handle_operate_region_error(datanode)(err)); + return Err(add_peer_context_if_needed(datanode)(err)); } Ok(()) }); diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index c1c45c30911f..ec5e35767ebd 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -33,8 +33,8 @@ use table::metadata::{RawTableInfo, TableId}; use table::table_reference::TableReference; use crate::ddl::create_table_template::{build_template, CreateRequestBuilder}; -use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path}; -use crate::ddl::DdlContext; +use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, region_storage_path}; +use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext}; use crate::error::{self, Result, TableRouteNotFoundSnafu}; use crate::key::table_name::TableNameKey; use 
crate::key::table_route::TableRouteValue; @@ -45,7 +45,6 @@ use crate::rpc::router::{ find_leader_regions, find_leaders, operating_leader_regions, RegionRoute, }; use crate::{metrics, ClusterId}; - pub struct CreateTableProcedure { pub context: DdlContext, pub creator: TableCreator, @@ -54,16 +53,10 @@ pub struct CreateTableProcedure { impl CreateTableProcedure { pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable"; - pub fn new( - cluster_id: ClusterId, - task: CreateTableTask, - table_route: TableRouteValue, - region_wal_options: HashMap, - context: DdlContext, - ) -> Self { + pub fn new(cluster_id: ClusterId, task: CreateTableTask, context: DdlContext) -> Self { Self { context, - creator: TableCreator::new(cluster_id, task, table_route, region_wal_options), + creator: TableCreator::new(cluster_id, task), } } @@ -75,7 +68,8 @@ impl CreateTableProcedure { opening_regions: vec![], }; - if let TableRouteValue::Physical(x) = &creator.data.table_route { + // Only registers regions if the table route is allocated. + if let Some(TableRouteValue::Physical(x)) = &creator.data.table_route { creator.opening_regions = creator .register_opening_regions(&context, &x.region_routes) .map_err(BoxedError::new) @@ -85,20 +79,41 @@ impl CreateTableProcedure { Ok(CreateTableProcedure { context, creator }) } - pub fn table_info(&self) -> &RawTableInfo { + fn table_info(&self) -> &RawTableInfo { &self.creator.data.task.table_info } - fn table_id(&self) -> TableId { + pub(crate) fn table_id(&self) -> TableId { self.table_info().ident.table_id } - pub fn region_wal_options(&self) -> &HashMap { - &self.creator.data.region_wal_options + fn region_wal_options(&self) -> Option<&HashMap> { + self.creator.data.region_wal_options.as_ref() } - /// Checks whether the table exists. 
- async fn on_prepare(&mut self) -> Result { + fn table_route(&self) -> Option<&TableRouteValue> { + self.creator.data.table_route.as_ref() + } + + #[cfg(any(test, feature = "testing"))] + pub fn set_allocated_metadata( + &mut self, + table_id: TableId, + table_route: TableRouteValue, + region_wal_options: HashMap, + ) { + self.creator + .set_allocated_metadata(table_id, table_route, region_wal_options) + } + + /// On the prepare step, it performs: + /// - Checks whether the table exists. + /// - Allocates the table id. + /// + /// Abort(non-retry): + /// - TableName exists and `create_if_not_exists` is false. + /// - Failed to allocate [TableMetadata]. + pub(crate) async fn on_prepare(&mut self) -> Result { let expr = &self.creator.data.task.create_table; let table_name_value = self .context @@ -124,6 +139,22 @@ impl CreateTableProcedure { } self.creator.data.state = CreateTableState::DatanodeCreateRegions; + let TableMetadata { + table_id, + table_route, + region_wal_options, + } = self + .context + .table_metadata_allocator + .create( + &TableMetadataAllocatorContext { + cluster_id: self.creator.data.cluster_id, + }, + &self.creator.data.task, + ) + .await?; + self.creator + .set_allocated_metadata(table_id, table_route, region_wal_options); Ok(Status::executing(true)) } @@ -137,8 +168,20 @@ impl CreateTableProcedure { Ok(CreateRequestBuilder::new(template, physical_table_id)) } + /// Creates regions on datanodes + /// + /// Abort(non-retry): + /// - Failed to create [CreateRequestBuilder]. + /// - Failed to get the table route of physical table (for logical table). 
+ /// + /// Retry: + /// - If the underlying servers returns one of the following [Code](tonic::status::Code): + /// - [Code::Cancelled](tonic::status::Code::Cancelled) + /// - [Code::DeadlineExceeded](tonic::status::Code::DeadlineExceeded) + /// - [Code::Unavailable](tonic::status::Code::Unavailable) pub async fn on_datanode_create_regions(&mut self) -> Result { - match &self.creator.data.table_route { + // Safety: the table route must be allocated. + match &self.creator.data.table_route.clone().unwrap() { TableRouteValue::Physical(x) => { let region_routes = x.region_routes.clone(); let request_builder = self.new_region_request_builder(None)?; @@ -170,7 +213,8 @@ impl CreateTableProcedure { region_routes: &[RegionRoute], request_builder: CreateRequestBuilder, ) -> Result { - if self.creator.data.table_route.is_physical() { + // Safety: the table_route must be allocated. + if self.table_route().unwrap().is_physical() { // Registers opening regions let guards = self .creator @@ -181,13 +225,12 @@ impl CreateTableProcedure { } let create_table_data = &self.creator.data; - let region_wal_options = &create_table_data.region_wal_options; - + // Safety: the region_wal_options must be allocated + let region_wal_options = self.region_wal_options().unwrap(); let create_table_expr = &create_table_data.task.create_table; let catalog = &create_table_expr.catalog_name; let schema = &create_table_expr.schema_name; let storage_path = region_storage_path(catalog, schema); - let leaders = find_leaders(region_routes); let mut create_region_tasks = Vec::with_capacity(leaders.len()); @@ -203,7 +246,6 @@ impl CreateTableProcedure { storage_path.clone(), region_wal_options, )?; - requests.push(PbRegionRequest::Create(create_region_request)); } @@ -218,12 +260,11 @@ impl CreateTableProcedure { let datanode = datanode.clone(); let requester = requester.clone(); - create_region_tasks.push(async move { - if let Err(err) = requester.handle(request).await { - return 
Err(handle_operate_region_error(datanode)(err)); - } - Ok(()) + requester + .handle(request) + .await + .map_err(add_peer_context_if_needed(datanode)) }); } } @@ -240,18 +281,21 @@ impl CreateTableProcedure { Ok(Status::executing(false)) } + /// Creates table metadata + /// + /// Abort(not-retry): + /// - Failed to create table metadata. async fn on_create_metadata(&self) -> Result { let table_id = self.table_id(); let manager = &self.context.table_metadata_manager; let raw_table_info = self.table_info().clone(); - let region_wal_options = self.region_wal_options().clone(); + // Safety: the region_wal_options must be allocated. + let region_wal_options = self.region_wal_options().unwrap().clone(); + // Safety: the table_route must be allocated. + let table_route = self.table_route().unwrap().clone(); manager - .create_table_metadata( - raw_table_info, - self.creator.data.table_route.clone(), - region_wal_options, - ) + .create_table_metadata(raw_table_info, table_route, region_wal_options) .await?; info!("Created table metadata for table {table_id}"); @@ -303,19 +347,14 @@ pub struct TableCreator { } impl TableCreator { - pub fn new( - cluster_id: u64, - task: CreateTableTask, - table_route: TableRouteValue, - region_wal_options: HashMap, - ) -> Self { + pub fn new(cluster_id: u64, task: CreateTableTask) -> Self { Self { data: CreateTableData { state: CreateTableState::Prepare, cluster_id, task, - table_route, - region_wal_options, + table_route: None, + region_wal_options: None, }, opening_regions: vec![], } @@ -347,6 +386,17 @@ impl TableCreator { } Ok(opening_region_guards) } + + fn set_allocated_metadata( + &mut self, + table_id: TableId, + table_route: TableRouteValue, + region_wal_options: HashMap, + ) { + self.data.task.table_info.ident.table_id = table_id; + self.data.table_route = Some(table_route); + self.data.region_wal_options = Some(region_wal_options); + } } #[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)] @@ -363,8 +413,10 @@ pub enum 
CreateTableState { pub struct CreateTableData { pub state: CreateTableState, pub task: CreateTableTask, - table_route: TableRouteValue, - pub region_wal_options: HashMap, + /// None stands for not allocated yet. + table_route: Option, + /// None stands for not allocated yet. + pub region_wal_options: Option>, pub cluster_id: ClusterId, } diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index ceb47193d7c7..5e89fee10ff2 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -34,7 +34,7 @@ use table::table_reference::TableReference; use super::utils::handle_retry_error; use crate::cache_invalidator::Context; -use crate::ddl::utils::handle_operate_region_error; +use crate::ddl::utils::add_peer_context_if_needed; use crate::ddl::DdlContext; use crate::error::{self, Result}; use crate::key::table_info::TableInfoValue; @@ -223,7 +223,7 @@ impl DropTableProcedure { drop_region_tasks.push(async move { if let Err(err) = requester.handle(request).await { if err.status_code() != StatusCode::RegionNotFound { - return Err(handle_operate_region_error(datanode)(err)); + return Err(add_peer_context_if_needed(datanode)(err)); } } Ok(()) diff --git a/src/common/meta/src/ddl/table_meta.rs b/src/common/meta/src/ddl/table_meta.rs index 5ade45c22f6b..f35adee5c4f7 100644 --- a/src/common/meta/src/ddl/table_meta.rs +++ b/src/common/meta/src/ddl/table_meta.rs @@ -23,21 +23,22 @@ use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY; use store_api::storage::{RegionId, RegionNumber, TableId}; use crate::ddl::{TableMetadata, TableMetadataAllocatorContext}; -use crate::error::{Result, TableNotFoundSnafu, UnsupportedSnafu}; -use crate::key::table_name::TableNameKey; +use crate::error::{self, Result, TableNotFoundSnafu, UnsupportedSnafu}; +use crate::key::table_name::{TableNameKey, TableNameManager}; use crate::key::table_route::{LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue}; -use 
crate::key::TableMetadataManagerRef; use crate::peer::Peer; use crate::rpc::ddl::CreateTableTask; use crate::rpc::router::{Region, RegionRoute}; use crate::sequence::SequenceRef; use crate::wal_options_allocator::{allocate_region_wal_options, WalOptionsAllocatorRef}; +pub type TableMetadataAllocatorRef = Arc; + #[derive(Clone)] pub struct TableMetadataAllocator { table_id_sequence: SequenceRef, wal_options_allocator: WalOptionsAllocatorRef, - table_metadata_manager: TableMetadataManagerRef, + table_name_manager: TableNameManager, peer_allocator: PeerAllocatorRef, } @@ -45,12 +46,12 @@ impl TableMetadataAllocator { pub fn new( table_id_sequence: SequenceRef, wal_options_allocator: WalOptionsAllocatorRef, - table_metadata_manager: TableMetadataManagerRef, + table_name_manager: TableNameManager, ) -> Self { Self::with_peer_allocator( table_id_sequence, wal_options_allocator, - table_metadata_manager, + table_name_manager, Arc::new(NoopPeerAllocator), ) } @@ -58,13 +59,13 @@ impl TableMetadataAllocator { pub fn with_peer_allocator( table_id_sequence: SequenceRef, wal_options_allocator: WalOptionsAllocatorRef, - table_metadata_manager: TableMetadataManagerRef, + table_name_manager: TableNameManager, peer_allocator: PeerAllocatorRef, ) -> Self { Self { table_id_sequence, wal_options_allocator, - table_metadata_manager, + table_name_manager, peer_allocator, } } @@ -123,6 +124,12 @@ impl TableMetadataAllocator { task: &CreateTableTask, ) -> Result { let regions = task.partitions.len(); + ensure!( + regions > 0, + error::UnexpectedSnafu { + err_msg: "The number of partitions must be greater than 0" + } + ); let table_route = if task.create_table.engine == METRIC_ENGINE && let Some(physical_table_name) = task @@ -131,8 +138,7 @@ impl TableMetadataAllocator { .get(LOGICAL_TABLE_METADATA_KEY) { let physical_table_id = self - .table_metadata_manager - .table_name_manager() + .table_name_manager .get(TableNameKey::new( &task.create_table.catalog_name, 
&task.create_table.schema_name, diff --git a/src/common/meta/src/ddl/test_util.rs b/src/common/meta/src/ddl/test_util.rs new file mode 100644 index 000000000000..239a655fb2cb --- /dev/null +++ b/src/common/meta/src/ddl/test_util.rs @@ -0,0 +1,19 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod create_table; + +pub use create_table::{ + TestColumnDef, TestColumnDefBuilder, TestCreateTableExpr, TestCreateTableExprBuilder, +}; diff --git a/src/common/meta/src/ddl/test_util/create_table.rs b/src/common/meta/src/ddl/test_util/create_table.rs new file mode 100644 index 000000000000..12b13d74f93c --- /dev/null +++ b/src/common/meta/src/ddl/test_util/create_table.rs @@ -0,0 +1,165 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::HashMap; + +use api::v1::column_def::try_as_column_schema; +use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType}; +use chrono::DateTime; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO2_ENGINE}; +use datatypes::schema::RawSchema; +use derive_builder::Builder; +use store_api::storage::TableId; +use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; +use table::requests::TableOptions; + +#[derive(Default, Builder)] +pub struct TestColumnDef { + #[builder(setter(into), default)] + name: String, + data_type: ColumnDataType, + #[builder(default)] + is_nullable: bool, + semantic_type: SemanticType, + #[builder(setter(into), default)] + comment: String, +} + +impl From for ColumnDef { + fn from( + TestColumnDef { + name, + data_type, + is_nullable, + semantic_type, + comment, + }: TestColumnDef, + ) -> Self { + Self { + name, + data_type: data_type as i32, + is_nullable, + default_constraint: vec![], + semantic_type: semantic_type as i32, + comment, + datatype_extension: None, + } + } +} + +#[derive(Default, Builder)] +#[builder(default)] +pub struct TestCreateTableExpr { + #[builder(setter(into), default = "DEFAULT_CATALOG_NAME.to_string()")] + catalog_name: String, + #[builder(setter(into), default = "DEFAULT_SCHEMA_NAME.to_string()")] + schema_name: String, + #[builder(setter(into))] + table_name: String, + #[builder(setter(into))] + desc: String, + #[builder(setter(into))] + column_defs: Vec, + #[builder(setter(into))] + time_index: String, + #[builder(setter(into))] + primary_keys: Vec, + create_if_not_exists: bool, + table_options: HashMap, + table_id: Option, + #[builder(setter(into), default = "MITO2_ENGINE.to_string()")] + engine: String, +} + +impl From for CreateTableExpr { + fn from( + TestCreateTableExpr { + catalog_name, + schema_name, + table_name, + desc, + column_defs, + time_index, + primary_keys, + create_if_not_exists, + table_options, + table_id, + engine, + 
}: TestCreateTableExpr, + ) -> Self { + Self { + catalog_name, + schema_name, + table_name, + desc, + column_defs, + time_index, + primary_keys, + create_if_not_exists, + table_options, + table_id: table_id.map(|id| api::v1::TableId { id }), + engine, + } + } +} + +/// Builds [RawTableInfo] from [CreateTableExpr]. +pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> RawTableInfo { + RawTableInfo { + ident: TableIdent { + table_id: expr + .table_id + .as_ref() + .map(|table_id| table_id.id) + .unwrap_or(0), + version: 1, + }, + name: expr.table_name.to_string(), + desc: Some(expr.desc.to_string()), + catalog_name: expr.catalog_name.to_string(), + schema_name: expr.schema_name.to_string(), + meta: RawTableMeta { + schema: RawSchema { + column_schemas: expr + .column_defs + .iter() + .map(|column| try_as_column_schema(column).unwrap()) + .collect(), + timestamp_index: expr + .column_defs + .iter() + .position(|column| column.semantic_type() == SemanticType::Timestamp), + version: 0, + }, + primary_key_indices: expr + .primary_keys + .iter() + .map(|key| { + expr.column_defs + .iter() + .position(|column| &column.name == key) + .unwrap() + }) + .collect(), + value_indices: vec![], + engine: expr.engine.to_string(), + next_column_id: expr.column_defs.len() as u32, + region_numbers: vec![], + options: TableOptions::default(), + created_on: DateTime::default(), + partition_key_indices: vec![], + }, + table_type: TableType::Base, + } +} diff --git a/src/common/meta/src/ddl/tests.rs b/src/common/meta/src/ddl/tests.rs new file mode 100644 index 000000000000..2b94057c8c5c --- /dev/null +++ b/src/common/meta/src/ddl/tests.rs @@ -0,0 +1,15 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod create_table; diff --git a/src/common/meta/src/ddl/tests/create_table.rs b/src/common/meta/src/ddl/tests/create_table.rs new file mode 100644 index 000000000000..d31c78479da6 --- /dev/null +++ b/src/common/meta/src/ddl/tests/create_table.rs @@ -0,0 +1,328 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::assert_matches::assert_matches; +use std::collections::HashMap; +use std::sync::Arc; + +use api::v1::meta::Partition; +use api::v1::region::{QueryRequest, RegionRequest}; +use api::v1::{ColumnDataType, SemanticType}; +use common_error::ext::{BoxedError, ErrorExt}; +use common_error::status_code::StatusCode; +use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status}; +use common_procedure_test::MockContextProvider; +use common_recordbatch::SendableRecordBatchStream; +use common_telemetry::debug; + +use crate::ddl::create_table::CreateTableProcedure; +use crate::ddl::test_util::create_table::build_raw_table_info_from_expr; +use crate::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder}; +use crate::error; +use crate::error::{Error, Result}; +use crate::key::table_route::TableRouteValue; +use crate::peer::Peer; +use crate::rpc::ddl::CreateTableTask; +use crate::test_util::{new_ddl_context, AffectedRows, MockDatanodeHandler, MockDatanodeManager}; + +#[async_trait::async_trait] +impl MockDatanodeHandler for () { + async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result { + unreachable!() + } + + async fn handle_query( + &self, + _peer: &Peer, + _request: QueryRequest, + ) -> Result { + unreachable!() + } +} + +fn test_create_table_task(name: &str) -> CreateTableTask { + let create_table = TestCreateTableExprBuilder::default() + .column_defs([ + TestColumnDefBuilder::default() + .name("ts") + .data_type(ColumnDataType::TimestampMillisecond) + .semantic_type(SemanticType::Timestamp) + .build() + .unwrap() + .into(), + TestColumnDefBuilder::default() + .name("host") + .data_type(ColumnDataType::String) + .semantic_type(SemanticType::Tag) + .build() + .unwrap() + .into(), + TestColumnDefBuilder::default() + .name("cpu") + .data_type(ColumnDataType::Float64) + .semantic_type(SemanticType::Field) + .build() + .unwrap() + .into(), + ]) + .time_index("ts") + .primary_keys(["host".into()]) + .table_name(name) + 
.build() + .unwrap() + .into(); + let table_info = build_raw_table_info_from_expr(&create_table); + CreateTableTask { + create_table, + // Single region + partitions: vec![Partition { + column_list: vec![], + value_list: vec![], + }], + table_info, + } +} + +#[tokio::test] +async fn test_on_prepare_table_exists_err() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + let task = test_create_table_task("foo"); + assert!(!task.create_table.create_if_not_exists); + // Puts a value to table name key. + ddl_context + .table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + TableRouteValue::physical(vec![]), + HashMap::new(), + ) + .await + .unwrap(); + let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context); + let err = procedure.on_prepare().await.unwrap_err(); + assert_matches!(err, Error::TableAlreadyExists { .. }); + assert_eq!(err.status_code(), StatusCode::TableAlreadyExists); +} + +#[tokio::test] +async fn test_on_prepare_with_create_if_table_exists() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + let mut task = test_create_table_task("foo"); + task.create_table.create_if_not_exists = true; + task.table_info.ident.table_id = 1024; + // Puts a value to table name key. + ddl_context + .table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + TableRouteValue::physical(vec![]), + HashMap::new(), + ) + .await + .unwrap(); + let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context); + let status = procedure.on_prepare().await.unwrap(); + assert_matches!(status, Status::Done { output: Some(..) 
}); + let table_id = *status.downcast_output_ref::().unwrap(); + assert_eq!(table_id, 1024); +} + +#[tokio::test] +async fn test_on_prepare_without_create_if_table_exists() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + let mut task = test_create_table_task("foo"); + task.create_table.create_if_not_exists = true; + let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context); + let status = procedure.on_prepare().await.unwrap(); + assert_matches!(status, Status::Executing { persist: true }); + assert_eq!(procedure.table_id(), 1024); +} + +#[tokio::test] +async fn test_on_prepare_with_no_partition_err() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + let mut task = test_create_table_task("foo"); + task.partitions = vec![]; + task.create_table.create_if_not_exists = true; + let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context); + let err = procedure.on_prepare().await.unwrap_err(); + assert_matches!(err, Error::Unexpected { .. 
}); + assert!(err + .to_string() + .contains("The number of partitions must be greater than 0"),); +} + +#[derive(Clone)] +pub struct RetryErrorDatanodeHandler; + +#[async_trait::async_trait] +impl MockDatanodeHandler for RetryErrorDatanodeHandler { + async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result { + debug!("Returning retry later for request: {request:?}, peer: {peer:?}"); + Err(Error::RetryLater { + source: BoxedError::new( + error::UnexpectedSnafu { + err_msg: "retry later", + } + .build(), + ), + }) + } + + async fn handle_query( + &self, + _peer: &Peer, + _request: QueryRequest, + ) -> Result { + unreachable!() + } +} + +#[tokio::test] +async fn test_on_datanode_create_regions_should_retry() { + common_telemetry::init_default_ut_logging(); + let datanode_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler)); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + let task = test_create_table_task("foo"); + assert!(!task.create_table.create_if_not_exists); + let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context); + procedure.on_prepare().await.unwrap(); + let ctx = ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + }; + let error = procedure.execute(&ctx).await.unwrap_err(); + assert!(error.is_retry_later()); +} + +#[derive(Clone)] +pub struct UnexpectedErrorDatanodeHandler; + +#[async_trait::async_trait] +impl MockDatanodeHandler for UnexpectedErrorDatanodeHandler { + async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result { + debug!("Returning mock error for request: {request:?}, peer: {peer:?}"); + error::UnexpectedSnafu { + err_msg: "mock error", + } + .fail() + } + + async fn handle_query( + &self, + _peer: &Peer, + _request: QueryRequest, + ) -> Result { + unreachable!() + } +} + +#[tokio::test] +async fn test_on_datanode_create_regions_should_not_retry() { + 
common_telemetry::init_default_ut_logging(); + let datanode_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler)); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + let task = test_create_table_task("foo"); + assert!(!task.create_table.create_if_not_exists); + let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context); + procedure.on_prepare().await.unwrap(); + let ctx = ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + }; + let error = procedure.execute(&ctx).await.unwrap_err(); + assert!(!error.is_retry_later()); +} + +#[derive(Clone)] +pub struct NaiveDatanodeHandler; + +#[async_trait::async_trait] +impl MockDatanodeHandler for NaiveDatanodeHandler { + async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result { + debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}"); + Ok(0) + } + + async fn handle_query( + &self, + _peer: &Peer, + _request: QueryRequest, + ) -> Result { + unreachable!() + } +} + +#[tokio::test] +async fn test_on_create_metadata_error() { + common_telemetry::init_default_ut_logging(); + let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + let task = test_create_table_task("foo"); + assert!(!task.create_table.create_if_not_exists); + let mut procedure = CreateTableProcedure::new(cluster_id, task.clone(), ddl_context.clone()); + procedure.on_prepare().await.unwrap(); + let ctx = ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + }; + procedure.execute(&ctx).await.unwrap(); + let mut task = task; + // Creates table metadata(different with the task) + task.table_info.ident.table_id = procedure.table_id(); + ddl_context + .table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + 
TableRouteValue::physical(vec![]), + HashMap::new(), + ) + .await + .unwrap(); + // Triggers procedure to create table metadata + let error = procedure.execute(&ctx).await.unwrap_err(); + assert!(!error.is_retry_later()); +} + +#[tokio::test] +async fn test_on_create_metadata() { + common_telemetry::init_default_ut_logging(); + let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + let task = test_create_table_task("foo"); + assert!(!task.create_table.create_if_not_exists); + let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context); + procedure.on_prepare().await.unwrap(); + let ctx = ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + }; + procedure.execute(&ctx).await.unwrap(); + // Triggers procedure to create table metadata + let status = procedure.execute(&ctx).await.unwrap(); + let table_id = status.downcast_output_ref::().unwrap(); + assert_eq!(*table_id, 1024); +} diff --git a/src/common/meta/src/ddl/truncate_table.rs b/src/common/meta/src/ddl/truncate_table.rs index 609feef26ffa..478efe7808e9 100644 --- a/src/common/meta/src/ddl/truncate_table.rs +++ b/src/common/meta/src/ddl/truncate_table.rs @@ -31,7 +31,7 @@ use table::metadata::{RawTableInfo, TableId}; use table::table_reference::TableReference; use super::utils::handle_retry_error; -use crate::ddl::utils::handle_operate_region_error; +use crate::ddl::utils::add_peer_context_if_needed; use crate::ddl::DdlContext; use crate::error::{Result, TableNotFoundSnafu}; use crate::key::table_info::TableInfoValue; @@ -170,7 +170,7 @@ impl TruncateTableProcedure { truncate_region_tasks.push(async move { if let Err(err) = requester.handle(request).await { - return Err(handle_operate_region_error(datanode)(err)); + return Err(add_peer_context_if_needed(datanode)(err)); } Ok(()) }); diff --git a/src/common/meta/src/ddl/utils.rs 
b/src/common/meta/src/ddl/utils.rs index f93feb9b6778..e8fc3ef5fe38 100644 --- a/src/common/meta/src/ddl/utils.rs +++ b/src/common/meta/src/ddl/utils.rs @@ -27,19 +27,17 @@ use crate::key::TableMetadataManagerRef; use crate::peer::Peer; use crate::rpc::ddl::CreateTableTask; -pub fn handle_operate_region_error(datanode: Peer) -> impl FnOnce(Error) -> Error { +/// Adds [Peer] context if the error is unretryable. +pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error { move |err| { - if matches!(err, Error::RetryLater { .. }) { - Error::RetryLater { - source: BoxedError::new(err), - } - } else { - Error::OperateDatanode { + if !err.is_retry_later() { + return Error::OperateDatanode { location: location!(), peer: datanode, source: BoxedError::new(err), - } + }; } + err } } diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index d741e109d97f..efad73dae6a3 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -12,14 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::HashMap; use std::sync::Arc; use common_procedure::{watcher, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId}; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::{info, tracing}; use snafu::{ensure, OptionExt, ResultExt}; -use store_api::storage::{RegionNumber, TableId}; +use store_api::storage::TableId; use crate::cache_invalidator::CacheInvalidatorRef; use crate::datanode_manager::DatanodeManagerRef; @@ -27,12 +26,9 @@ use crate::ddl::alter_table::AlterTableProcedure; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_table::DropTableProcedure; -use crate::ddl::table_meta::TableMetadataAllocator; +use crate::ddl::table_meta::TableMetadataAllocatorRef; use crate::ddl::truncate_table::TruncateTableProcedure; -use crate::ddl::{ - utils, DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadata, - TableMetadataAllocatorContext, -}; +use crate::ddl::{utils, DdlContext, DdlTaskExecutor, ExecutorContext}; use crate::error::{ self, EmptyCreateTableTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu, TableNotFoundSnafu, WaitProcedureSnafu, @@ -62,7 +58,7 @@ pub struct DdlManager { datanode_manager: DatanodeManagerRef, cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, - table_metadata_allocator: TableMetadataAllocator, + table_metadata_allocator: TableMetadataAllocatorRef, memory_region_keeper: MemoryRegionKeeperRef, } @@ -73,7 +69,7 @@ impl DdlManager { datanode_clients: DatanodeManagerRef, cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, - table_metadata_allocator: TableMetadataAllocator, + table_metadata_allocator: TableMetadataAllocatorRef, memory_region_keeper: MemoryRegionKeeperRef, ) -> Result { let manager = Self { @@ -100,6 +96,7 @@ impl DdlManager { cache_invalidator: 
self.cache_invalidator.clone(), table_metadata_manager: self.table_metadata_manager.clone(), memory_region_keeper: self.memory_region_keeper.clone(), + table_metadata_allocator: self.table_metadata_allocator.clone(), } } @@ -205,18 +202,10 @@ impl DdlManager { &self, cluster_id: ClusterId, create_table_task: CreateTableTask, - table_route: TableRouteValue, - region_wal_options: HashMap, ) -> Result<(ProcedureId, Option)> { let context = self.create_context(); - let procedure = CreateTableProcedure::new( - cluster_id, - create_table_task, - table_route, - region_wal_options, - context, - ); + let procedure = CreateTableProcedure::new(cluster_id, create_table_task, context); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); @@ -470,31 +459,10 @@ async fn handle_drop_table_task( async fn handle_create_table_task( ddl_manager: &DdlManager, cluster_id: ClusterId, - mut create_table_task: CreateTableTask, + create_table_task: CreateTableTask, ) -> Result { - let table_meta = ddl_manager - .table_metadata_allocator - .create( - &TableMetadataAllocatorContext { cluster_id }, - &create_table_task, - ) - .await?; - - let TableMetadata { - table_id, - table_route, - region_wal_options, - } = table_meta; - - create_table_task.table_info.ident.table_id = table_id; - let (id, output) = ddl_manager - .submit_create_table_task( - cluster_id, - create_table_task, - table_route, - region_wal_options, - ) + .submit_create_table_task(cluster_id, create_table_task) .await?; let procedure_id = id.to_string(); @@ -644,12 +612,12 @@ mod tests { procedure_manager.clone(), Arc::new(DummyDatanodeManager), Arc::new(DummyCacheInvalidator), - table_metadata_manager, - TableMetadataAllocator::new( + table_metadata_manager.clone(), + Arc::new(TableMetadataAllocator::new( Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()), Arc::new(WalOptionsAllocator::default()), - Arc::new(TableMetadataManager::new(kv_backend)), - ), + 
table_metadata_manager.table_name_manager().clone(), + )), Arc::new(MemoryRegionKeeper::default()), ); diff --git a/src/common/meta/src/key/table_name.rs b/src/common/meta/src/key/table_name.rs index eb8ca7a8c015..35bf0a5a3bb0 100644 --- a/src/common/meta/src/key/table_name.rs +++ b/src/common/meta/src/key/table_name.rs @@ -144,6 +144,7 @@ impl TableNameValue { } } +#[derive(Clone)] pub struct TableNameManager { kv_backend: KvBackendRef, } diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index b990f456217d..7f515b79cd55 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -36,6 +36,8 @@ pub mod rpc; pub mod sequence; pub mod state_store; pub mod table_name; +#[cfg(any(test, feature = "testing"))] +pub mod test_util; pub mod util; pub mod wal_options_allocator; diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs new file mode 100644 index 000000000000..20a006ebce49 --- /dev/null +++ b/src/common/meta/src/test_util.rs @@ -0,0 +1,105 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use api::v1::region::{QueryRequest, RegionRequest}; +use common_recordbatch::SendableRecordBatchStream; + +use crate::cache_invalidator::DummyCacheInvalidator; +use crate::datanode_manager::{Datanode, DatanodeManager, DatanodeManagerRef, DatanodeRef}; +use crate::ddl::table_meta::TableMetadataAllocator; +use crate::ddl::DdlContext; +use crate::error::Result; +use crate::key::TableMetadataManager; +use crate::kv_backend::memory::MemoryKvBackend; +use crate::peer::Peer; +use crate::region_keeper::MemoryRegionKeeper; +use crate::sequence::SequenceBuilder; +use crate::wal_options_allocator::WalOptionsAllocator; + +pub type AffectedRows = u64; + +#[async_trait::async_trait] +pub trait MockDatanodeHandler: Sync + Send + Clone { + async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result; + + async fn handle_query( + &self, + peer: &Peer, + request: QueryRequest, + ) -> Result; +} + +/// A mock struct implements [DatanodeManager]. +#[derive(Clone)] +pub struct MockDatanodeManager { + handler: T, +} + +impl MockDatanodeManager { + pub fn new(handler: T) -> Self { + Self { handler } + } +} + +/// A mock struct implements [Datanode]. +#[derive(Clone)] +struct MockDatanode { + peer: Peer, + handler: T, +} + +#[async_trait::async_trait] +impl Datanode for MockDatanode { + async fn handle(&self, request: RegionRequest) -> Result { + self.handler.handle(&self.peer, request).await + } + + async fn handle_query(&self, request: QueryRequest) -> Result { + self.handler.handle_query(&self.peer, request).await + } +} + +#[async_trait::async_trait] +impl DatanodeManager for MockDatanodeManager { + async fn datanode(&self, peer: &Peer) -> DatanodeRef { + Arc::new(MockDatanode { + peer: peer.clone(), + handler: self.handler.clone(), + }) + } +} + +/// Returns a test purpose [DdlContext]. 
+pub fn new_ddl_context(datanode_manager: DatanodeManagerRef) -> DdlContext { + let kv_backend = Arc::new(MemoryKvBackend::new()); + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); + + DdlContext { + datanode_manager, + cache_invalidator: Arc::new(DummyCacheInvalidator), + memory_region_keeper: Arc::new(MemoryRegionKeeper::new()), + table_metadata_allocator: Arc::new(TableMetadataAllocator::new( + Arc::new( + SequenceBuilder::new("test", kv_backend) + .initial(1024) + .build(), + ), + Arc::new(WalOptionsAllocator::default()), + table_metadata_manager.table_name_manager().clone(), + )), + table_metadata_manager, + } +} diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index 93d8711edd5a..b46428eb6860 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -57,6 +57,22 @@ impl Status { Status::Done { output: None } } + #[cfg(any(test, feature = "testing"))] + /// Downcasts [Status::Done]'s output to &T + /// #Panic: + /// - if [Status] is not the [Status::Done]. + /// - if the output is None. + pub fn downcast_output_ref(&self) -> Option<&T> { + if let Status::Done { output } = self { + output + .as_ref() + .expect("Try to downcast the output of Status::Done, but the output is None") + .downcast_ref() + } else { + panic!("Expected the Status::Done, but got: {:?}", self) + } + } + /// Returns a [Status::Done] with output. 
pub fn done_with_output(output: T) -> Status { Status::Done { diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index ed727132ea83..0a38bc3f37be 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -21,7 +21,7 @@ use common_base::Plugins; use common_catalog::consts::MIN_USER_TABLE_ID; use common_grpc::channel_manager::ChannelConfig; use common_meta::datanode_manager::DatanodeManagerRef; -use common_meta::ddl::table_meta::TableMetadataAllocator; +use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; use common_meta::ddl_manager::{DdlManager, DdlManagerRef}; use common_meta::distributed_time_constants; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; @@ -211,7 +211,7 @@ impl MetaSrvBuilder { options.wal.clone(), kv_backend.clone(), )); - let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| { + let table_metadata_allocator = Arc::new(table_metadata_allocator.unwrap_or_else(|| { let sequence = Arc::new( SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone()) .initial(MIN_USER_TABLE_ID as u64) @@ -225,10 +225,10 @@ impl MetaSrvBuilder { TableMetadataAllocator::with_peer_allocator( sequence, wal_options_allocator.clone(), - table_metadata_manager.clone(), + table_metadata_manager.table_name_manager().clone(), peer_allocator, ) - }); + })); let opening_region_keeper = Arc::new(MemoryRegionKeeper::default()); @@ -238,7 +238,7 @@ impl MetaSrvBuilder { &procedure_manager, &mailbox, &table_metadata_manager, - table_metadata_allocator, + &table_metadata_allocator, &opening_region_keeper, )?; @@ -386,7 +386,7 @@ fn build_ddl_manager( procedure_manager: &ProcedureManagerRef, mailbox: &MailboxRef, table_metadata_manager: &TableMetadataManagerRef, - table_metadata_allocator: TableMetadataAllocator, + table_metadata_allocator: &TableMetadataAllocatorRef, memory_region_keeper: &MemoryRegionKeeperRef, ) -> Result { let 
datanode_clients = datanode_clients.unwrap_or_else(|| { @@ -413,7 +413,7 @@ fn build_ddl_manager( datanode_clients, cache_invalidator, table_metadata_manager.clone(), - table_metadata_allocator, + table_metadata_allocator.clone(), memory_region_keeper.clone(), ) .context(error::InitDdlManagerSnafu)?, diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index 827c8e866271..39b984476a43 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -17,11 +17,12 @@ use std::sync::{Arc, Mutex}; use api::v1::add_column_location::LocationType; use api::v1::alter_expr::Kind; +use api::v1::meta::Partition; use api::v1::region::region_request::{self, Body as PbRegionRequest}; use api::v1::region::{CreateRequest as PbCreateRegionRequest, RegionColumnDef}; use api::v1::{ region, AddColumn, AddColumnLocation, AddColumns, AlterExpr, ColumnDataType, - ColumnDef as PbColumnDef, CreateTableExpr, DropColumn, DropColumns, SemanticType, + ColumnDef as PbColumnDef, DropColumn, DropColumns, SemanticType, }; use client::client_manager::DatanodeClients; use common_catalog::consts::MITO2_ENGINE; @@ -30,6 +31,8 @@ use common_meta::ddl::alter_table::AlterTableProcedure; use common_meta::ddl::create_logical_tables::{CreateLogicalTablesProcedure, CreateTablesState}; use common_meta::ddl::create_table::*; use common_meta::ddl::drop_table::DropTableProcedure; +use common_meta::ddl::test_util::create_table::build_raw_table_info_from_expr; +use common_meta::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder}; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; use common_meta::key::DeserializedValueWithBytes; @@ -42,68 +45,74 @@ use crate::procedure::utils::mock::EchoRegionServer; use crate::procedure::utils::test_data; fn create_table_task(table_name: Option<&str>) -> CreateTableTask { - let create_table_expr = CreateTableExpr { - catalog_name: 
"my_catalog".to_string(), - schema_name: "my_schema".to_string(), - table_name: table_name.unwrap_or("my_table").to_string(), - desc: "blabla".to_string(), - column_defs: vec![ - PbColumnDef { - name: "ts".to_string(), - data_type: ColumnDataType::TimestampMillisecond as i32, - is_nullable: false, - default_constraint: vec![], - semantic_type: SemanticType::Timestamp as i32, - comment: String::new(), - ..Default::default() - }, - PbColumnDef { - name: "my_tag1".to_string(), - data_type: ColumnDataType::String as i32, - is_nullable: true, - default_constraint: vec![], - semantic_type: SemanticType::Tag as i32, - comment: String::new(), - ..Default::default() - }, - PbColumnDef { - name: "my_tag2".to_string(), - data_type: ColumnDataType::String as i32, - is_nullable: true, - default_constraint: vec![], - semantic_type: SemanticType::Tag as i32, - comment: String::new(), - ..Default::default() - }, - PbColumnDef { - name: "my_field_column".to_string(), - data_type: ColumnDataType::Int32 as i32, - is_nullable: true, - default_constraint: vec![], - semantic_type: SemanticType::Field as i32, - comment: String::new(), - ..Default::default() - }, - ], - time_index: "ts".to_string(), - primary_keys: vec!["my_tag2".to_string(), "my_tag1".to_string()], - create_if_not_exists: false, - table_options: HashMap::new(), - table_id: None, - engine: MITO2_ENGINE.to_string(), - }; - - CreateTableTask::new(create_table_expr, vec![], test_data::new_table_info()) + let expr = TestCreateTableExprBuilder::default() + .catalog_name("my_catalog") + .schema_name("my_schema") + .table_name(table_name.unwrap_or("my_table")) + .desc("blabla") + .column_defs([ + TestColumnDefBuilder::default() + .name("ts") + .data_type(ColumnDataType::TimestampMillisecond) + .is_nullable(false) + .semantic_type(SemanticType::Timestamp) + .build() + .unwrap() + .into(), + TestColumnDefBuilder::default() + .name("my_tag1") + .data_type(ColumnDataType::String) + .is_nullable(true) + 
.semantic_type(SemanticType::Tag) + .build() + .unwrap() + .into(), + TestColumnDefBuilder::default() + .name("my_tag2") + .data_type(ColumnDataType::String) + .is_nullable(true) + .semantic_type(SemanticType::Tag) + .build() + .unwrap() + .into(), + TestColumnDefBuilder::default() + .name("my_field_column") + .data_type(ColumnDataType::Int32) + .is_nullable(true) + .semantic_type(SemanticType::Field) + .build() + .unwrap() + .into(), + ]) + .time_index("ts") + .primary_keys(vec!["my_tag2".into(), "my_tag1".into()]) + .build() + .unwrap() + .into(); + + let table_info = build_raw_table_info_from_expr(&expr); + CreateTableTask::new( + expr, + vec![Partition { + column_list: vec![], + value_list: vec![], + }], + table_info, + ) } #[test] fn test_region_request_builder() { - let procedure = CreateTableProcedure::new( + let mut procedure = CreateTableProcedure::new( 1, create_table_task(None), + test_data::new_ddl_context(Arc::new(DatanodeClients::default())), + ); + + procedure.set_allocated_metadata( + 1024, TableRouteValue::physical(test_data::new_region_routes()), HashMap::default(), - test_data::new_ddl_context(Arc::new(DatanodeClients::default())), ); let template = procedure.new_region_request_builder(None).unwrap(); @@ -192,11 +201,15 @@ async fn test_on_datanode_create_regions() { let mut procedure = CreateTableProcedure::new( 1, create_table_task(None), - TableRouteValue::physical(region_routes), - HashMap::default(), test_data::new_ddl_context(datanode_manager), ); + procedure.set_allocated_metadata( + 42, + TableRouteValue::physical(test_data::new_region_routes()), + HashMap::default(), + ); + let expected_created_regions = Arc::new(Mutex::new(HashSet::from([ RegionId::new(42, 1), RegionId::new(42, 2), diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index b8565c8dc742..0a9ace054c45 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -105,6 +105,7 @@ pub mod test_data { use 
chrono::DateTime; use common_catalog::consts::MITO2_ENGINE; use common_meta::datanode_manager::DatanodeManagerRef; + use common_meta::ddl::table_meta::TableMetadataAllocator; use common_meta::ddl::DdlContext; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; @@ -112,6 +113,7 @@ pub mod test_data { use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::rpc::router::RegionRoute; use common_meta::sequence::SequenceBuilder; + use common_meta::wal_options_allocator::WalOptionsAllocator; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; @@ -192,6 +194,7 @@ pub mod test_data { SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build(); let mailbox = HeartbeatMailbox::create(Pushers::default(), mailbox_sequence); + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); DdlContext { datanode_manager, cache_invalidator: Arc::new(MetasrvCacheInvalidator::new( @@ -200,8 +203,13 @@ pub mod test_data { server_addr: "127.0.0.1:4321".to_string(), }, )), - table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)), + table_metadata_manager: table_metadata_manager.clone(), memory_region_keeper: Arc::new(MemoryRegionKeeper::new()), + table_metadata_allocator: Arc::new(TableMetadataAllocator::new( + Arc::new(SequenceBuilder::new("test", kv_backend).build()), + Arc::new(WalOptionsAllocator::default()), + table_metadata_manager.table_name_manager().clone(), + )), } } } diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 16b78aae346e..b0749c3a23f3 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -136,11 +136,11 @@ impl GreptimeDbStandaloneBuilder { mix_options.wal_meta.clone(), kv_backend.clone(), )); - let table_meta_allocator = TableMetadataAllocator::new( + let 
table_meta_allocator = Arc::new(TableMetadataAllocator::new( table_id_sequence, wal_options_allocator.clone(), - table_metadata_manager.clone(), - ); + table_metadata_manager.table_name_manager().clone(), + )); let ddl_task_executor = Arc::new( DdlManager::try_new(