From 63681f0e4df8412d4c163d0f84acef08486c9f1a Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Thu, 28 Mar 2024 19:31:14 +0800 Subject: [PATCH] refactor(table): remove unused table requests (#3603) * refactor(table): remove unused requests Signed-off-by: Zhenchi * update comments Signed-off-by: Zhenchi * fix: clippy Signed-off-by: Zhenchi * fix: compile Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- Cargo.lock | 1 + src/catalog/Cargo.toml | 1 + src/catalog/src/lib.rs | 6 +- src/datanode/src/datanode.rs | 2 +- src/frontend/src/error.rs | 7 +- src/frontend/src/script.rs | 63 ++------------- src/script/src/manager.rs | 27 ++++--- src/script/src/table.rs | 59 +++++++-------- src/table/src/requests.rs | 101 +------------------------ src/table/src/test_util/empty_table.rs | 29 +------ 10 files changed, 55 insertions(+), 241 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1e6d00e94206..db8b5ef8ce4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1236,6 +1236,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" name = "catalog" version = "0.7.1" dependencies = [ + "api", "arrow", "arrow-schema", "async-stream", diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 8d91421ebaac..1c0a9a9b170b 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -11,6 +11,7 @@ testing = [] workspace = true [dependencies] +api.workspace = true arrow.workspace = true arrow-schema.workspace = true async-stream.workspace = true diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 6864748881cd..5a98a0eb0032 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -19,10 +19,10 @@ use std::any::Any; use std::fmt::{Debug, Formatter}; use std::sync::Arc; +use api::v1::CreateTableExpr; use futures::future::BoxFuture; use futures_util::stream::BoxStream; use table::metadata::TableId; -use table::requests::CreateTableRequest; use table::TableRef; use crate::error::Result; @@ -75,9 +75,9 @@ pub type OpenSystemTableHook = /// Register system table request: /// - When system table is already created and registered, the hook will be called /// with table ref after opening the system table -/// - When system table is not exists, create and register the table by create_table_request and calls open_hook with the created table. +/// - When system table is not exists, create and register the table by `create_table_expr` and calls `open_hook` with the created table. pub struct RegisterSystemTableRequest { - pub create_table_request: CreateTableRequest, + pub create_table_expr: CreateTableExpr, pub open_hook: Option, } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 5fcd7d7af7f0..e55ac27dfad0 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -467,7 +467,7 @@ async fn open_all_regions( )); } } - info!("going to open {} regions", regions.len()); + info!("going to open {} region(s)", regions.len()); let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM)); let mut tasks = vec![]; diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 3a3547c01c92..7c4d35c674fc 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -108,9 +108,6 @@ pub enum Error { #[snafu(display("Invalid DeleteRequest, reason: {}", reason))] InvalidDeleteRequest { reason: String, location: Location }, - #[snafu(display("Invalid system table definition: {err_msg}"))] - InvalidSystemTableDef { err_msg: String, location: Location }, - #[snafu(display("Table not found: {}", table_name))] TableNotFound { table_name: String }, @@ -322,9 +319,7 @@ impl ErrorExt for Error { | Error::VectorToGrpcColumn { .. } | Error::InvalidRegionRequest { .. } => StatusCode::Internal, - Error::ContextValueNotFound { .. } | Error::InvalidSystemTableDef { .. } => { - StatusCode::Unexpected - } + Error::ContextValueNotFound { .. } => StatusCode::Unexpected, Error::TableNotFound { .. } => StatusCode::TableNotFound, diff --git a/src/frontend/src/script.rs b/src/frontend/src/script.rs index 03bd2db0a7a3..88d5edb68424 100644 --- a/src/frontend/src/script.rs +++ b/src/frontend/src/script.rs @@ -67,21 +67,19 @@ mod dummy { mod python { use api::v1::ddl_request::Expr; use api::v1::greptime_request::Request; - use api::v1::{CreateTableExpr, DdlRequest}; + use api::v1::DdlRequest; use arc_swap::ArcSwap; use catalog::RegisterSystemTableRequest; use common_error::ext::BoxedError; use common_meta::table_name::TableName; use common_telemetry::{error, info}; - use operator::expr_factory; use script::manager::ScriptManager; use servers::query_handler::grpc::GrpcQueryHandler; use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; - use table::requests::CreateTableRequest; use super::*; - use crate::error::{CatalogSnafu, InvalidSystemTableDefSnafu, TableNotFoundSnafu}; + use crate::error::{CatalogSnafu, TableNotFoundSnafu}; use crate::instance::Instance; /// A placeholder for the real gRPC handler. @@ -148,17 +146,13 @@ mod python { } let RegisterSystemTableRequest { - create_table_request: request, + create_table_expr: expr, open_hook, } = self.script_manager.create_table_request(catalog); if let Some(table) = self .catalog_manager - .table( - &request.catalog_name, - &request.schema_name, - &request.table_name, - ) + .table(&expr.catalog_name, &expr.schema_name, &expr.table_name) .await .context(CatalogSnafu)? { @@ -171,13 +165,8 @@ mod python { return Ok(()); } - let table_name = TableName::new( - &request.catalog_name, - &request.schema_name, - &request.table_name, - ); - - let expr = Self::create_table_expr(request)?; + let table_name = + TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name); let _ = self .grpc_handler @@ -217,46 +206,6 @@ mod python { Ok(()) } - fn create_table_expr(request: CreateTableRequest) -> Result { - let column_schemas = request.schema.column_schemas; - - let time_index = column_schemas - .iter() - .find_map(|x| { - if x.is_time_index() { - Some(x.name.clone()) - } else { - None - } - }) - .context(InvalidSystemTableDefSnafu { - err_msg: "Time index is not defined.", - })?; - - let primary_keys = request - .primary_key_indices - .iter() - // Indexing has to be safe because the create script table request is pre-defined. - .map(|i| column_schemas[*i].name.clone()) - .collect::>(); - - let column_defs = expr_factory::column_schemas_to_defs(column_schemas, &primary_keys)?; - - Ok(CreateTableExpr { - catalog_name: request.catalog_name, - schema_name: request.schema_name, - table_name: request.table_name, - desc: request.desc.unwrap_or_default(), - column_defs, - time_index, - primary_keys, - create_if_not_exists: request.create_if_not_exists, - table_options: (&request.table_options).into(), - table_id: None, // Should and will be assigned by Meta. - engine: request.engine, - }) - } - pub async fn insert_script( &self, query_ctx: QueryContextRef, diff --git a/src/script/src/manager.rs b/src/script/src/manager.rs index c7301ab8d3cc..111adc24771e 100644 --- a/src/script/src/manager.rs +++ b/src/script/src/manager.rs @@ -16,9 +16,10 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; +use api::v1::CreateTableExpr; use arc_swap::ArcSwap; use catalog::{OpenSystemTableHook, RegisterSystemTableRequest}; -use common_catalog::consts::{default_engine, DEFAULT_SCHEMA_NAME, SCRIPTS_TABLE_ID}; +use common_catalog::consts::{default_engine, DEFAULT_SCHEMA_NAME}; use common_error::ext::ErrorExt; use common_query::Output; use common_telemetry::logging; @@ -26,7 +27,6 @@ use futures::future::FutureExt; use query::QueryEngineRef; use servers::query_handler::grpc::GrpcQueryHandlerRef; use snafu::{OptionExt, ResultExt}; -use table::requests::{CreateTableRequest, TableOptions}; use table::TableRef; use crate::engine::{CompileContext, EvalContext, Script, ScriptEngine}; @@ -34,10 +34,7 @@ use crate::error::{ CompilePythonSnafu, ExecutePythonSnafu, Result, ScriptNotFoundSnafu, ScriptsTableNotFoundSnafu, }; use crate::python::{PyEngine, PyScript}; -use crate::table::{ - build_scripts_schema, get_primary_key_indices, ScriptsTable, ScriptsTableRef, - SCRIPTS_TABLE_NAME, -}; +use crate::table::{build_scripts_schema, ScriptsTable, ScriptsTableRef, SCRIPTS_TABLE_NAME}; pub struct ScriptManager { compiled: RwLock>>, @@ -69,19 +66,21 @@ impl ScriptManager { } pub fn create_table_request(&self, catalog: &str) -> RegisterSystemTableRequest { - let request = CreateTableRequest { - id: SCRIPTS_TABLE_ID, + let (time_index, primary_keys, column_defs) = build_scripts_schema(); + + let create_table_expr = CreateTableExpr { catalog_name: catalog.to_string(), // TODO(dennis): put the scripts table into `system` schema? // We always put the scripts table into `public` schema right now. schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: SCRIPTS_TABLE_NAME.to_string(), - desc: Some("GreptimeDB scripts table for Python".to_string()), - schema: build_scripts_schema(), - region_numbers: vec![0], - primary_key_indices: get_primary_key_indices(), + desc: "GreptimeDB scripts table for Python".to_string(), + column_defs, + time_index, + primary_keys, create_if_not_exists: true, - table_options: TableOptions::default(), + table_options: Default::default(), + table_id: None, // Should and will be assigned by Meta. engine: default_engine().to_string(), }; @@ -94,7 +93,7 @@ impl ScriptManager { }); RegisterSystemTableRequest { - create_table_request: request, + create_table_expr, open_hook: Some(hook), } } diff --git a/src/script/src/table.rs b/src/script/src/table.rs index 6620cd86fdb7..676118feb854 100644 --- a/src/script/src/table.rs +++ b/src/script/src/table.rs @@ -15,12 +15,11 @@ //! Scripts table use std::sync::Arc; -use api::helper::ColumnDataTypeWrapper; use api::v1::greptime_request::Request; use api::v1::value::ValueData; use api::v1::{ - ColumnDataType, ColumnSchema as PbColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, - SemanticType, + ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest, + RowInsertRequests, Rows, SemanticType, }; use catalog::error::CompileScriptInternalSnafu; use common_error::ext::{BoxedError, ErrorExt}; @@ -33,7 +32,6 @@ use datafusion::logical_expr::{and, col, lit}; use datafusion_common::TableReference; use datafusion_expr::LogicalPlanBuilder; use datatypes::prelude::ScalarVector; -use datatypes::schema::{ColumnSchema, RawSchema}; use datatypes::vectors::{StringVector, Vector}; use query::plan::LogicalPlan; use query::QueryEngineRef; @@ -344,38 +342,35 @@ fn query_ctx(table_info: &TableInfo) -> QueryContextRef { .build() } -/// Returns the scripts schema's primary key indices -pub fn get_primary_key_indices() -> Vec { - let mut indices = vec![]; - for (index, c) in build_insert_column_schemas().into_iter().enumerate() { - if c.semantic_type == (SemanticType::Tag as i32) { - indices.push(index); - } - } +/// Builds scripts schema, returns (time index, primary keys, column defs) +pub fn build_scripts_schema() -> (String, Vec, Vec) { + let cols = build_insert_column_schemas(); - indices -} + let time_index = cols + .iter() + .find_map(|c| { + (c.semantic_type == (SemanticType::Timestamp as i32)).then(|| c.column_name.clone()) + }) + .unwrap(); // Safety: the column always exists -/// Build scripts table -pub fn build_scripts_schema() -> RawSchema { - let cols = build_insert_column_schemas() + let primary_keys = cols + .iter() + .filter(|c| (c.semantic_type == (SemanticType::Tag as i32))) + .map(|c| c.column_name.clone()) + .collect(); + + let column_defs = cols .into_iter() - .map(|c| { - let cs = ColumnSchema::new( - c.column_name, - // Safety: the type always exists - ColumnDataTypeWrapper::try_new(c.datatype, c.datatype_extension) - .unwrap() - .into(), - false, - ); - if c.semantic_type == SemanticType::Timestamp as i32 { - cs.with_time_index(true) - } else { - cs - } + .map(|c| ColumnDef { + name: c.column_name, + data_type: c.datatype, + is_nullable: false, + default_constraint: vec![], + semantic_type: c.semantic_type, + comment: "".to_string(), + datatype_extension: None, }) .collect(); - RawSchema::new(cols) + (time_index, primary_keys, column_defs) } diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 21541218ff7c..0c1e322952ac 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -23,11 +23,10 @@ use common_datasource::object_store::s3::is_supported_in_s3; use common_query::AddColumnLocation; use common_time::range::TimestampRange; use datatypes::prelude::VectorRef; -use datatypes::schema::{ColumnSchema, RawSchema}; +use datatypes::schema::ColumnSchema; use serde::{Deserialize, Serialize}; use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY}; use store_api::mito_engine_options::is_mito_engine_option_key; -use store_api::storage::RegionNumber; use crate::error; use crate::error::ParseTableOptionSnafu; @@ -66,38 +65,6 @@ pub fn validate_table_option(key: &str) -> bool { .contains(&key) } -#[derive(Debug, Clone)] -pub struct CreateDatabaseRequest { - pub db_name: String, - pub create_if_not_exists: bool, -} - -/// Create table request -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CreateTableRequest { - pub id: TableId, - pub catalog_name: String, - pub schema_name: String, - pub table_name: String, - pub desc: Option, - pub schema: RawSchema, - pub region_numbers: Vec, - pub primary_key_indices: Vec, - pub create_if_not_exists: bool, - pub table_options: TableOptions, - pub engine: String, -} - -impl CreateTableRequest { - pub fn table_ref(&self) -> TableReference { - TableReference { - catalog: &self.catalog_name, - schema: &self.schema_name, - table: &self.table_name, - } - } -} - #[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] #[serde(default)] pub struct TableOptions { @@ -177,16 +144,6 @@ impl From<&TableOptions> for HashMap { } } -/// Open table request -#[derive(Debug, Clone)] -pub struct OpenTableRequest { - pub catalog_name: String, - pub schema_name: String, - pub table_name: String, - pub table_id: TableId, - pub region_numbers: Vec, -} - /// Alter table request #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AlterTableRequest { @@ -199,20 +156,6 @@ pub struct AlterTableRequest { pub table_version: Option, } -impl AlterTableRequest { - pub fn table_ref(&self) -> TableReference { - TableReference { - catalog: &self.catalog_name, - schema: &self.schema_name, - table: &self.table_name, - } - } - - pub fn is_rename_table(&self) -> bool { - matches!(self.alter_kind, AlterKind::RenameTable { .. }) - } -} - /// Add column request #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AddColumnRequest { @@ -228,48 +171,6 @@ pub enum AlterKind { RenameTable { new_table_name: String }, } -/// Drop table request -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct DropTableRequest { - pub catalog_name: String, - pub schema_name: String, - pub table_name: String, - pub table_id: TableId, -} - -impl DropTableRequest { - pub fn table_ref(&self) -> TableReference { - TableReference { - catalog: &self.catalog_name, - schema: &self.schema_name, - table: &self.table_name, - } - } -} - -/// Close table request -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CloseTableRequest { - pub catalog_name: String, - pub schema_name: String, - pub table_name: String, - pub table_id: TableId, - /// Do nothing if region_numbers is empty - pub region_numbers: Vec, - /// flush regions - pub flush: bool, -} - -impl CloseTableRequest { - pub fn table_ref(&self) -> TableReference { - TableReference { - catalog: &self.catalog_name, - schema: &self.schema_name, - table: &self.table_name, - } - } -} - #[derive(Debug)] pub struct InsertRequest { pub catalog_name: String, diff --git a/src/table/src/test_util/empty_table.rs b/src/table/src/test_util/empty_table.rs index 8fde98e44068..bf5d68c2bd7c 100644 --- a/src/table/src/test_util/empty_table.rs +++ b/src/table/src/test_util/empty_table.rs @@ -20,40 +20,13 @@ use datatypes::schema::SchemaRef; use store_api::data_source::DataSource; use store_api::storage::ScanRequest; -use crate::metadata::{ - FilterPushDownType, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType, -}; -use crate::requests::CreateTableRequest; +use crate::metadata::{FilterPushDownType, TableInfo}; use crate::thin_table::{ThinTable, ThinTableAdapter}; use crate::TableRef; pub struct EmptyTable; impl EmptyTable { - pub fn table(req: CreateTableRequest) -> TableRef { - let schema = Arc::new(req.schema.try_into().unwrap()); - let table_meta = TableMetaBuilder::default() - .schema(schema) - .primary_key_indices(req.primary_key_indices) - .next_column_id(0) - .options(req.table_options) - .region_numbers(req.region_numbers) - .engine(req.engine) - .build(); - let table_info = TableInfoBuilder::default() - .table_id(req.id) - .catalog_name(req.catalog_name) - .schema_name(req.schema_name) - .name(req.table_name) - .meta(table_meta.unwrap()) - .table_type(TableType::Temporary) - .desc(req.desc) - .build() - .unwrap(); - - Self::from_table_info(&table_info) - } - pub fn from_table_info(info: &TableInfo) -> TableRef { let thin_table = ThinTable::new(Arc::new(info.clone()), FilterPushDownType::Unsupported); let data_source = Arc::new(EmptyDataSource {