From 95ec9e1f078005a46be35927dd98d95cc77227be Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 7 Feb 2024 10:29:07 +0000 Subject: [PATCH 1/6] refactor: refactor the create logical tables --- .../meta/src/ddl/create_logical_tables.rs | 142 +++++++++--------- src/common/meta/src/ddl/table_meta.rs | 2 +- src/common/meta/src/key/table_route.rs | 5 + src/common/meta/src/rpc/ddl.rs | 5 + 4 files changed, 85 insertions(+), 69 deletions(-) diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index beda7bbc20bd..18f0be9fd0a8 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -66,7 +66,16 @@ impl CreateLogicalTablesProcedure { Ok(Self { context, creator }) } - async fn on_prepare(&mut self) -> Result { + /// On the prepares step, it performs: + /// - Checks whether physical table exists. + /// - Checks whether logical tables exist. + /// - Allocates the table ids. + /// + /// Abort(non-retry): + /// - The physical table does not exist. + /// - Failed to check whether tables exist. + /// - One of logical tables has existing, and the table creation task without setting `create_if_not_exists`. + pub(crate) async fn on_prepare(&mut self) -> Result { let manager = &self.context.table_metadata_manager; // Sets physical region numbers @@ -80,7 +89,7 @@ impl CreateLogicalTablesProcedure { .data .set_physical_region_numbers(physical_region_numbers); - // Checks if the tables exists + // Checks if the tables exist let table_name_keys = self .creator .data @@ -96,24 +105,9 @@ impl CreateLogicalTablesProcedure { .map(|x| x.map(|x| x.table_id())) .collect::>(); - // Sets table ids already exists - self.creator - .data - .set_table_ids_already_exists(already_exists_tables_ids); - - // If all tables do not exists, we can create them directly. - if self.creator.data.is_all_tables_not_exists() { - self.creator.data.state = CreateTablesState::DatanodeCreateRegions; - return Ok(Status::executing(true)); - } - - // Filter out the tables that already exist. - let tasks = &self.creator.data.tasks; - let mut filtered_tasks = Vec::with_capacity(tasks.len()); - for (task, table_id) in tasks - .iter() - .zip(self.creator.data.table_ids_already_exists().iter()) - { + // Validates the tasks + let tasks = &mut self.creator.data.tasks; + for (task, table_id) in tasks.iter().zip(already_exists_tables_ids.iter()) { if table_id.is_some() { // If a table already exists, we just ignore it. ensure!( @@ -124,17 +118,34 @@ impl CreateLogicalTablesProcedure { ); continue; } - filtered_tasks.push(task.clone()); } - // Resets tasks - self.creator.data.tasks = filtered_tasks; - if self.creator.data.tasks.is_empty() { - // If all tables already exist, we can skip the `DatanodeCreateRegions` stage. - self.creator.data.state = CreateTablesState::CreateMetadata; - return Ok(Status::executing(true)); + // If all tables already exist, returns the table_ids. + if already_exists_tables_ids.iter().all(Option::is_some) { + return Ok(Status::done_with_output( + already_exists_tables_ids + .into_iter() + .flatten() + .collect::>(), + )); } + // Allocates table ids + for (task, table_id) in tasks.iter_mut().zip(already_exists_tables_ids.iter()) { + let table_id = if let Some(table_id) = table_id { + *table_id + } else { + self.context + .table_metadata_allocator + .allocate_table_id(task) + .await? + }; + task.set_table_id(table_id); + } + + self.creator + .data + .set_table_ids_already_exists(already_exists_tables_ids); self.creator.data.state = CreateTablesState::DatanodeCreateRegions; Ok(Status::executing(true)) } @@ -154,15 +165,14 @@ impl CreateLogicalTablesProcedure { pub async fn on_create_metadata(&self) -> Result { let manager = &self.context.table_metadata_manager; - let physical_table_id = self.creator.data.physical_table_id(); - let tables_data = self.creator.data.all_tables_data(); - let num_tables = tables_data.len(); + let remaining_tasks = self.creator.data.remaining_tasks(); + let num_tables = remaining_tasks.len(); if num_tables > 0 { let chunk_size = manager.max_logical_tables_per_batch(); if num_tables > chunk_size { - let chunks = tables_data + let chunks = remaining_tasks .into_iter() .chunks(chunk_size) .into_iter() @@ -172,11 +182,21 @@ impl CreateLogicalTablesProcedure { manager.create_logical_tables_metadata(chunk).await?; } } else { - manager.create_logical_tables_metadata(tables_data).await?; + manager + .create_logical_tables_metadata(remaining_tasks) + .await?; } } - let table_ids = self.creator.data.real_table_ids(); + // The `table_id` MUST be collected after the [Prepare::Prepare], + // ensures the all `table_id`s have been allocated. + let table_ids = self + .creator + .data + .tasks + .iter() + .map(|task| task.table_info.ident.table_id) + .collect::>(); info!("Created {num_tables} tables {table_ids:?} metadata for physical table {physical_table_id}"); @@ -310,17 +330,13 @@ impl TablesCreator { tasks: Vec, physical_table_id: TableId, ) -> Self { - let table_ids_from_tasks = tasks - .iter() - .map(|task| task.table_info.ident.table_id) - .collect::>(); - let len = table_ids_from_tasks.len(); + let len = tasks.len(); + Self { data: CreateTablesData { cluster_id, state: CreateTablesState::Prepare, tasks, - table_ids_from_tasks, table_ids_already_exists: vec![None; len], physical_table_id, physical_region_numbers: vec![], @@ -334,7 +350,6 @@ pub struct CreateTablesData { cluster_id: ClusterId, state: CreateTablesState, tasks: Vec, - table_ids_from_tasks: Vec, // Because the table_id is allocated before entering the distributed lock, // it needs to recheck if the table exists when creating a table. // If it does exist, then the table_id needs to be replaced with the existing one. @@ -360,24 +375,6 @@ impl CreateTablesData { self.table_ids_already_exists = table_ids_already_exists; } - fn table_ids_already_exists(&self) -> &[Option] { - &self.table_ids_already_exists - } - - fn is_all_tables_not_exists(&self) -> bool { - self.table_ids_already_exists.iter().all(Option::is_none) - } - - pub fn real_table_ids(&self) -> Vec { - self.table_ids_from_tasks - .iter() - .zip(self.table_ids_already_exists.iter()) - .map(|(table_id_from_task, table_id_already_exists)| { - table_id_already_exists.unwrap_or(*table_id_from_task) - }) - .collect::>() - } - fn all_create_table_exprs(&self) -> Vec<&CreateTableExpr> { self.tasks .iter() @@ -385,18 +382,27 @@ impl CreateTablesData { .collect::>() } - fn all_tables_data(&self) -> Vec<(RawTableInfo, TableRouteValue)> { + /// Returns the remaining tasks. + /// The length of tasks must be greater than 0. + fn remaining_tasks(&self) -> Vec<(RawTableInfo, TableRouteValue)> { self.tasks .iter() - .map(|task| { - let table_info = task.table_info.clone(); - let region_ids = self - .physical_region_numbers - .iter() - .map(|region_number| RegionId::new(table_info.ident.table_id, *region_number)) - .collect(); - let table_route = TableRouteValue::logical(self.physical_table_id, region_ids); - (table_info, table_route) + .zip(self.table_ids_already_exists.iter()) + .flat_map(|(task, table_id)| { + if table_id.is_none() { + let table_info = task.table_info.clone(); + let region_ids = self + .physical_region_numbers + .iter() + .map(|region_number| { + RegionId::new(table_info.ident.table_id, *region_number) + }) + .collect(); + let table_route = TableRouteValue::logical(self.physical_table_id, region_ids); + Some((table_info, table_route)) + } else { + None + } }) .collect::>() } diff --git a/src/common/meta/src/ddl/table_meta.rs b/src/common/meta/src/ddl/table_meta.rs index f35adee5c4f7..119395b42404 100644 --- a/src/common/meta/src/ddl/table_meta.rs +++ b/src/common/meta/src/ddl/table_meta.rs @@ -70,7 +70,7 @@ impl TableMetadataAllocator { } } - async fn allocate_table_id(&self, task: &CreateTableTask) -> Result { + pub(crate) async fn allocate_table_id(&self, task: &CreateTableTask) -> Result { let table_id = if let Some(table_id) = &task.create_table.table_id { let table_id = table_id.id; diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index b1526704531f..a932f1474000 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -325,6 +325,11 @@ impl TableRouteManager { } } + /// Returns the [TableRouteValue::Physical] of table. + /// + /// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if: + /// - the physical table(`logical_or_physical_table_id`) does not exists + /// - the corresponding physical table of the logical table(`logical_or_physical_table_id`) does not exists. pub async fn get_physical_table_route( &self, logical_or_physical_table_id: TableId, diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 957bb7e4bfe0..b88867bd3d09 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -363,6 +363,11 @@ impl CreateTableTask { table: &table.table_name, } } + + /// Sets the `table_info`'s table_id. + pub fn set_table_id(&mut self, table_id: TableId) { + self.table_info.ident.table_id = table_id; + } } impl Serialize for CreateTableTask { From 7eb010c6d83fc6c6de1d6394136eb2b7c0b82bef Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 7 Feb 2024 10:30:14 +0000 Subject: [PATCH 2/6] test(create_logical_tables): add tests for on_prepare --- src/common/meta/src/ddl/tests.rs | 1 + .../src/ddl/tests/create_logical_tables.rs | 326 ++++++++++++++++++ 2 files changed, 327 insertions(+) create mode 100644 src/common/meta/src/ddl/tests/create_logical_tables.rs diff --git a/src/common/meta/src/ddl/tests.rs b/src/common/meta/src/ddl/tests.rs index 2b94057c8c5c..5ea7d6a85803 100644 --- a/src/common/meta/src/ddl/tests.rs +++ b/src/common/meta/src/ddl/tests.rs @@ -12,4 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod create_logical_tables; mod create_table; diff --git a/src/common/meta/src/ddl/tests/create_logical_tables.rs b/src/common/meta/src/ddl/tests/create_logical_tables.rs new file mode 100644 index 000000000000..4b4e39969096 --- /dev/null +++ b/src/common/meta/src/ddl/tests/create_logical_tables.rs @@ -0,0 +1,326 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::assert_matches::assert_matches; +use std::collections::HashMap; +use std::sync::Arc; + +use api::v1::meta::Partition; +use api::v1::region::{QueryRequest, RegionRequest}; +use api::v1::{ColumnDataType, SemanticType}; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_procedure::Status; +use common_recordbatch::SendableRecordBatchStream; +use store_api::storage::RegionId; +use table::metadata::RawTableInfo; + +use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; +use crate::ddl::test_util::create_table::build_raw_table_info_from_expr; +use crate::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder}; +use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext}; +use crate::error::{Error, Result}; +use crate::key::table_route::TableRouteValue; +use crate::peer::Peer; +use crate::rpc::ddl::CreateTableTask; +use crate::test_util::{new_ddl_context, AffectedRows, MockDatanodeHandler, MockDatanodeManager}; + +// Note: this code may be duplicated with others. +// However, it's by design, ensures the tests are easy to be modified or added. +fn test_create_logical_table_task(name: &str) -> CreateTableTask { + let create_table = TestCreateTableExprBuilder::default() + .column_defs([ + TestColumnDefBuilder::default() + .name("ts") + .data_type(ColumnDataType::TimestampMillisecond) + .semantic_type(SemanticType::Timestamp) + .build() + .unwrap() + .into(), + TestColumnDefBuilder::default() + .name("host") + .data_type(ColumnDataType::String) + .semantic_type(SemanticType::Tag) + .build() + .unwrap() + .into(), + TestColumnDefBuilder::default() + .name("cpu") + .data_type(ColumnDataType::Float64) + .semantic_type(SemanticType::Field) + .build() + .unwrap() + .into(), + ]) + .time_index("ts") + .primary_keys(["host".into()]) + .table_name(name) + .build() + .unwrap() + .into(); + let table_info = build_raw_table_info_from_expr(&create_table); + CreateTableTask { + create_table, + // Single region + partitions: vec![Partition { + column_list: vec![], + value_list: vec![], + }], + table_info, + } +} + +// Note: this code may be duplicated with others. +// However, it's by design, ensures the tests are easy to be modified or added. +fn test_create_physical_table_task(name: &str) -> CreateTableTask { + let create_table = TestCreateTableExprBuilder::default() + .column_defs([ + TestColumnDefBuilder::default() + .name("ts") + .data_type(ColumnDataType::TimestampMillisecond) + .semantic_type(SemanticType::Timestamp) + .build() + .unwrap() + .into(), + TestColumnDefBuilder::default() + .name("value") + .data_type(ColumnDataType::Float64) + .semantic_type(SemanticType::Field) + .build() + .unwrap() + .into(), + ]) + .time_index("ts") + .primary_keys(["value".into()]) + .table_name(name) + .build() + .unwrap() + .into(); + let table_info = build_raw_table_info_from_expr(&create_table); + CreateTableTask { + create_table, + // Single region + partitions: vec![Partition { + column_list: vec![], + value_list: vec![], + }], + table_info, + } +} + +#[tokio::test] +async fn test_on_prepare_physical_table_not_found() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + let tasks = vec![test_create_logical_table_task("foo")]; + let physical_table_id = 1024u32; + let mut procedure = + CreateLogicalTablesProcedure::new(cluster_id, tasks, physical_table_id, ddl_context); + let err = procedure.on_prepare().await.unwrap_err(); + assert_matches!(err, Error::TableRouteNotFound { .. }); +} + +async fn create_physical_table_metadata( + ddl_context: &DdlContext, + table_info: RawTableInfo, + table_route: TableRouteValue, +) { + ddl_context + .table_metadata_manager + .create_table_metadata(table_info, table_route, HashMap::default()) + .await + .unwrap(); +} + +#[tokio::test] +async fn test_on_prepare() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + // Prepares physical table metadata. + let mut create_physical_table_task = test_create_physical_table_task("phy_table"); + let TableMetadata { + table_id, + table_route, + .. + } = ddl_context + .table_metadata_allocator + .create( + &TableMetadataAllocatorContext { cluster_id }, + &create_physical_table_task, + ) + .await + .unwrap(); + create_physical_table_task.set_table_id(table_id); + create_physical_table_metadata( + &ddl_context, + create_physical_table_task.table_info.clone(), + table_route, + ) + .await; + // The create logical table procedure. + let tasks = vec![test_create_logical_table_task("foo")]; + let physical_table_id = table_id; + let mut procedure = + CreateLogicalTablesProcedure::new(cluster_id, tasks, physical_table_id, ddl_context); + let status = procedure.on_prepare().await.unwrap(); + assert_matches!(status, Status::Executing { persist: true }); +} + +#[tokio::test] +async fn test_on_prepare_logical_table_exists_err() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + // Prepares physical table metadata. + let mut create_physical_table_task = test_create_physical_table_task("phy_table"); + let TableMetadata { + table_id, + table_route, + .. + } = ddl_context + .table_metadata_allocator + .create( + &TableMetadataAllocatorContext { cluster_id }, + &create_physical_table_task, + ) + .await + .unwrap(); + create_physical_table_task.set_table_id(table_id); + create_physical_table_metadata( + &ddl_context, + create_physical_table_task.table_info.clone(), + table_route, + ) + .await; + // Creates the logical table metadata. + let mut task = test_create_logical_table_task("foo"); + task.set_table_id(1025); + ddl_context + .table_metadata_manager + .create_logic_tables_metadata(vec![( + task.table_info.clone(), + TableRouteValue::logical(1024, vec![RegionId::new(1025, 1)]), + )]) + .await + .unwrap(); + // The create logical table procedure. + let physical_table_id = table_id; + let mut procedure = + CreateLogicalTablesProcedure::new(cluster_id, vec![task], physical_table_id, ddl_context); + let err = procedure.on_prepare().await.unwrap_err(); + assert_matches!(err, Error::TableAlreadyExists { .. }); + assert_eq!(err.status_code(), StatusCode::TableAlreadyExists); +} + +#[tokio::test] +async fn test_on_prepare_with_create_if_table_exists() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + // Prepares physical table metadata. + let mut create_physical_table_task = test_create_physical_table_task("phy_table"); + let TableMetadata { + table_id, + table_route, + .. + } = ddl_context + .table_metadata_allocator + .create( + &TableMetadataAllocatorContext { cluster_id }, + &create_physical_table_task, + ) + .await + .unwrap(); + create_physical_table_task.set_table_id(table_id); + create_physical_table_metadata( + &ddl_context, + create_physical_table_task.table_info.clone(), + table_route, + ) + .await; + // Creates the logical table metadata. + let mut task = test_create_logical_table_task("foo"); + task.set_table_id(8192); + ddl_context + .table_metadata_manager + .create_logic_tables_metadata(vec![( + task.table_info.clone(), + TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]), + )]) + .await + .unwrap(); + // The create logical table procedure. + let physical_table_id = table_id; + // Sets `create_if_not_exists` + task.create_table.create_if_not_exists = true; + let mut procedure = + CreateLogicalTablesProcedure::new(cluster_id, vec![task], physical_table_id, ddl_context); + let status = procedure.on_prepare().await.unwrap(); + let output = status.downcast_output_ref::>().unwrap(); + assert_eq!(*output, vec![8192]); +} + +#[tokio::test] +async fn test_on_prepare_part_logical_tables_exist() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + // Prepares physical table metadata. + let mut create_physical_table_task = test_create_physical_table_task("phy_table"); + let TableMetadata { + table_id, + table_route, + .. + } = ddl_context + .table_metadata_allocator + .create( + &TableMetadataAllocatorContext { cluster_id }, + &create_physical_table_task, + ) + .await + .unwrap(); + create_physical_table_task.set_table_id(table_id); + create_physical_table_metadata( + &ddl_context, + create_physical_table_task.table_info.clone(), + table_route, + ) + .await; + // Creates the logical table metadata. + let mut task = test_create_logical_table_task("exists"); + task.set_table_id(8192); + ddl_context + .table_metadata_manager + .create_logic_tables_metadata(vec![( + task.table_info.clone(), + TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]), + )]) + .await + .unwrap(); + // The create logical table procedure. + let physical_table_id = table_id; + // Sets `create_if_not_exists` + task.create_table.create_if_not_exists = true; + let non_exist_task = test_create_logical_table_task("non_exists"); + let mut procedure = CreateLogicalTablesProcedure::new( + cluster_id, + vec![task, non_exist_task], + physical_table_id, + ddl_context, + ); + let status = procedure.on_prepare().await.unwrap(); + assert_matches!(status, Status::Executing { persist: true }); +} From f9d17fd82fe367a9a29383729b864e080eb574f4 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 7 Feb 2024 10:50:51 +0000 Subject: [PATCH 3/6] test(create_logical_tables): add tests for on_create_metadata --- src/common/meta/Cargo.toml | 1 + .../meta/src/ddl/create_logical_tables.rs | 4 + .../src/ddl/tests/create_logical_tables.rs | 194 +++++++++++++++++- 3 files changed, 198 insertions(+), 1 deletion(-) diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 554b0d6d795d..c16ebc33ef05 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -22,6 +22,7 @@ common-catalog.workspace = true common-error.workspace = true common-grpc-expr.workspace = true common-macro.workspace = true +common-procedure-test.workspace = true common-procedure.workspace = true common-procedure-test.workspace = true common-recordbatch.workspace = true diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index 18f0be9fd0a8..86adfd89873f 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -163,6 +163,10 @@ impl CreateLogicalTablesProcedure { self.create_regions(region_routes).await } + /// Creates table metadata + /// + /// Abort(not-retry): + /// - Failed to create table metadata. pub async fn on_create_metadata(&self) -> Result { let manager = &self.context.table_metadata_manager; let physical_table_id = self.creator.data.physical_table_id(); diff --git a/src/common/meta/src/ddl/tests/create_logical_tables.rs b/src/common/meta/src/ddl/tests/create_logical_tables.rs index 4b4e39969096..3fbabdf081e5 100644 --- a/src/common/meta/src/ddl/tests/create_logical_tables.rs +++ b/src/common/meta/src/ddl/tests/create_logical_tables.rs @@ -21,8 +21,10 @@ use api::v1::region::{QueryRequest, RegionRequest}; use api::v1::{ColumnDataType, SemanticType}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; -use common_procedure::Status; +use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status}; +use common_procedure_test::MockContextProvider; use common_recordbatch::SendableRecordBatchStream; +use common_telemetry::debug; use store_api::storage::RegionId; use table::metadata::RawTableInfo; @@ -324,3 +326,193 @@ async fn test_on_prepare_part_logical_tables_exist() { let status = procedure.on_prepare().await.unwrap(); assert_matches!(status, Status::Executing { persist: true }); } + +#[derive(Clone)] +pub struct NaiveDatanodeHandler; + +#[async_trait::async_trait] +impl MockDatanodeHandler for NaiveDatanodeHandler { + async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result { + debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}"); + Ok(0) + } + + async fn handle_query( + &self, + _peer: &Peer, + _request: QueryRequest, + ) -> Result { + unreachable!() + } +} + +#[tokio::test] +async fn test_on_create_metadata() { + let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + // Prepares physical table metadata. + let mut create_physical_table_task = test_create_physical_table_task("phy_table"); + let TableMetadata { + table_id, + table_route, + .. + } = ddl_context + .table_metadata_allocator + .create( + &TableMetadataAllocatorContext { cluster_id }, + &create_physical_table_task, + ) + .await + .unwrap(); + create_physical_table_task.set_table_id(table_id); + create_physical_table_metadata( + &ddl_context, + create_physical_table_task.table_info.clone(), + table_route, + ) + .await; + // The create logical table procedure. + let physical_table_id = table_id; + // Creates the logical table metadata. + let task = test_create_logical_table_task("foo"); + let yet_another_task = test_create_logical_table_task("bar"); + let mut procedure = CreateLogicalTablesProcedure::new( + cluster_id, + vec![task, yet_another_task], + physical_table_id, + ddl_context, + ); + let status = procedure.on_prepare().await.unwrap(); + assert_matches!(status, Status::Executing { persist: true }); + let ctx = ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + }; + procedure.execute(&ctx).await.unwrap(); + // Triggers procedure to create table metadata + let status = procedure.execute(&ctx).await.unwrap(); + let table_ids = status.downcast_output_ref::>().unwrap(); + assert_eq!(*table_ids, vec![1025, 1026]); +} + +#[tokio::test] +async fn test_on_create_metadata_part_logical_tables_exist() { + let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + // Prepares physical table metadata. + let mut create_physical_table_task = test_create_physical_table_task("phy_table"); + let TableMetadata { + table_id, + table_route, + .. + } = ddl_context + .table_metadata_allocator + .create( + &TableMetadataAllocatorContext { cluster_id }, + &create_physical_table_task, + ) + .await + .unwrap(); + create_physical_table_task.set_table_id(table_id); + create_physical_table_metadata( + &ddl_context, + create_physical_table_task.table_info.clone(), + table_route, + ) + .await; + // Creates the logical table metadata. + let mut task = test_create_logical_table_task("exists"); + task.set_table_id(8192); + ddl_context + .table_metadata_manager + .create_logic_tables_metadata(vec![( + task.table_info.clone(), + TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]), + )]) + .await + .unwrap(); + // The create logical table procedure. + let physical_table_id = table_id; + // Sets `create_if_not_exists` + task.create_table.create_if_not_exists = true; + let non_exist_task = test_create_logical_table_task("non_exists"); + let mut procedure = CreateLogicalTablesProcedure::new( + cluster_id, + vec![task, non_exist_task], + physical_table_id, + ddl_context, + ); + let status = procedure.on_prepare().await.unwrap(); + assert_matches!(status, Status::Executing { persist: true }); + let ctx = ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + }; + procedure.execute(&ctx).await.unwrap(); + // Triggers procedure to create table metadata + let status = procedure.execute(&ctx).await.unwrap(); + let table_ids = status.downcast_output_ref::>().unwrap(); + assert_eq!(*table_ids, vec![8192, 1025]); +} + +#[tokio::test] +async fn test_on_create_metadata_err() { + let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + // Prepares physical table metadata. + let mut create_physical_table_task = test_create_physical_table_task("phy_table"); + let TableMetadata { + table_id, + table_route, + .. + } = ddl_context + .table_metadata_allocator + .create( + &TableMetadataAllocatorContext { cluster_id }, + &create_physical_table_task, + ) + .await + .unwrap(); + create_physical_table_task.set_table_id(table_id); + create_physical_table_metadata( + &ddl_context, + create_physical_table_task.table_info.clone(), + table_route, + ) + .await; + // The create logical table procedure. + let physical_table_id = table_id; + // Creates the logical table metadata. + let task = test_create_logical_table_task("foo"); + let yet_another_task = test_create_logical_table_task("bar"); + let mut procedure = CreateLogicalTablesProcedure::new( + cluster_id, + vec![task.clone(), yet_another_task], + physical_table_id, + ddl_context.clone(), + ); + let status = procedure.on_prepare().await.unwrap(); + assert_matches!(status, Status::Executing { persist: true }); + let ctx = ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + }; + procedure.execute(&ctx).await.unwrap(); + // Creates logical table metadata(different with the task) + let mut task = task.clone(); + task.table_info.ident.table_id = 1025; + ddl_context + .table_metadata_manager + .create_logic_tables_metadata(vec![( + task.table_info, + TableRouteValue::logical(512, vec![RegionId::new(1026, 1)]), + )]) + .await + .unwrap(); + // Triggers procedure to create table metadata + let error = procedure.execute(&ctx).await.unwrap_err(); + assert!(!error.is_retry_later()); +} From b812952e86f33a5698ba481f9e4e19baf1e8b60f Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 21 Feb 2024 00:06:32 +0900 Subject: [PATCH 4/6] refactor: rename to create_logical_tables_metadata --- src/common/meta/src/ddl/tests/create_logical_tables.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/common/meta/src/ddl/tests/create_logical_tables.rs b/src/common/meta/src/ddl/tests/create_logical_tables.rs index 3fbabdf081e5..e6710d7440c8 100644 --- a/src/common/meta/src/ddl/tests/create_logical_tables.rs +++ b/src/common/meta/src/ddl/tests/create_logical_tables.rs @@ -212,7 +212,7 @@ async fn test_on_prepare_logical_table_exists_err() { task.set_table_id(1025); ddl_context .table_metadata_manager - .create_logic_tables_metadata(vec![( + .create_logical_tables_metadata(vec![( task.table_info.clone(), TableRouteValue::logical(1024, vec![RegionId::new(1025, 1)]), )]) @@ -258,7 +258,7 @@ async fn test_on_prepare_with_create_if_table_exists() { task.set_table_id(8192); ddl_context .table_metadata_manager - .create_logic_tables_metadata(vec![( + .create_logical_tables_metadata(vec![( task.table_info.clone(), TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]), )]) @@ -306,7 +306,7 @@ async fn test_on_prepare_part_logical_tables_exist() { task.set_table_id(8192); ddl_context .table_metadata_manager - .create_logic_tables_metadata(vec![( + .create_logical_tables_metadata(vec![( task.table_info.clone(), TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]), )]) @@ -427,7 +427,7 @@ async fn test_on_create_metadata_part_logical_tables_exist() { task.set_table_id(8192); ddl_context .table_metadata_manager - .create_logic_tables_metadata(vec![( + .create_logical_tables_metadata(vec![( task.table_info.clone(), TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]), )]) @@ -506,7 +506,7 @@ async fn test_on_create_metadata_err() { task.table_info.ident.table_id = 1025; ddl_context .table_metadata_manager - .create_logic_tables_metadata(vec![( + .create_logical_tables_metadata(vec![( task.table_info, TableRouteValue::logical(512, vec![RegionId::new(1026, 1)]), )]) From 227cf2458e14d7ad809395aea26658b14c209fbd Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 21 Feb 2024 00:10:19 +0900 Subject: [PATCH 5/6] chore: fmt toml --- src/common/meta/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index c16ebc33ef05..554b0d6d795d 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -22,7 +22,6 @@ common-catalog.workspace = true common-error.workspace = true common-grpc-expr.workspace = true common-macro.workspace = true -common-procedure-test.workspace = true common-procedure.workspace = true common-procedure-test.workspace = true common-recordbatch.workspace = true From 627a86d503f5efa7908f60b426fe27e57f9ae741 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 22 Feb 2024 18:26:33 +0900 Subject: [PATCH 6/6] chore: apply suggestions from CR --- src/common/meta/src/ddl/create_logical_tables.rs | 11 ++++------- src/common/meta/src/ddl/truncate_table.rs | 8 ++++---- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index 86adfd89873f..16292e14bfc1 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -262,10 +262,10 @@ impl CreateLogicalTablesProcedure { body: Some(PbRegionRequest::Creates(creates)), }; create_region_tasks.push(async move { - if let Err(err) = requester.handle(request).await { - return Err(add_peer_context_if_needed(datanode)(err)); - } - Ok(()) + requester + .handle(request) + .await + .map_err(add_peer_context_if_needed(datanode)) }); } @@ -354,9 +354,6 @@ pub struct CreateTablesData { cluster_id: ClusterId, state: CreateTablesState, tasks: Vec, - // Because the table_id is allocated before entering the distributed lock, - // it needs to recheck if the table exists when creating a table. - // If it does exist, then the table_id needs to be replaced with the existing one. table_ids_already_exists: Vec>, physical_table_id: TableId, physical_region_numbers: Vec, diff --git a/src/common/meta/src/ddl/truncate_table.rs b/src/common/meta/src/ddl/truncate_table.rs index 478efe7808e9..7890f70fbf99 100644 --- a/src/common/meta/src/ddl/truncate_table.rs +++ b/src/common/meta/src/ddl/truncate_table.rs @@ -169,10 +169,10 @@ impl TruncateTableProcedure { let requester = requester.clone(); truncate_region_tasks.push(async move { - if let Err(err) = requester.handle(request).await { - return Err(add_peer_context_if_needed(datanode)(err)); - } - Ok(()) + requester + .handle(request) + .await + .map_err(add_peer_context_if_needed(datanode)) }); } }