From a1e649b1f1361f82d1f282e562d5edf70e65812f Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 11 Dec 2024 19:57:20 +0800 Subject: [PATCH] feat: create sink table --- src/flow/src/adapter.rs | 25 +++++- src/flow/src/adapter/util.rs | 141 +++++++++++++++++++------------ src/flow/src/server.rs | 4 + src/operator/src/expr_factory.rs | 3 +- src/operator/src/insert.rs | 2 +- 5 files changed, 116 insertions(+), 59 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 5d06162948c4..871a03cd7a8f 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -259,8 +259,12 @@ impl FlowWorkerManager { let (catalog, schema) = (table_name[0].clone(), table_name[1].clone()); let ctx = Arc::new(QueryContext::with(&catalog, &schema)); - let (is_ts_placeholder, proto_schema) = - self.try_fetch_or_create_table(&table_name).await?; + let (is_ts_placeholder, proto_schema) = self + .try_fetch_existing_table(&table_name) + .await? + .context(UnexpectedSnafu { + reason: format!("Table not found: {}", table_name.join(".")), + })?; let schema_len = proto_schema.len(); trace!( @@ -749,7 +753,8 @@ impl FlowWorkerManager { debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan); - // TODO(discord9): check schema against actual table schema if exists + // check schema against actual table schema if exists + // if not exist create sink table immediately if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await? { let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema); @@ -810,7 +815,21 @@ impl FlowWorkerManager { node_ctx.assign_table_schema(&sink_table_name, real_schema.clone())?; } else { // assign inferred schema to sink table + // create sink table node_ctx.assign_table_schema(&sink_table_name, flow_plan.schema.clone())?; + let did_create = self + .create_table_from_relation( + &format!("flow-id={flow_id}"), + &sink_table_name, + &flow_plan.schema, + ) + .await?; + if !did_create { + UnexpectedSnafu { + reason: format!("Failed to create table {:?}", sink_table_name), + } + .fail()?; + } } let _ = comment; diff --git a/src/flow/src/adapter/util.rs b/src/flow/src/adapter/util.rs index ec6e824c4262..95db977951af 100644 --- a/src/flow/src/adapter/util.rs +++ b/src/flow/src/adapter/util.rs @@ -12,80 +12,113 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use api::helper::ColumnDataTypeWrapper; use api::v1::column_def::options_from_column_schema; -use api::v1::{ColumnDataType, ColumnDataTypeExtension, SemanticType}; +use api::v1::{ColumnDataType, ColumnDataTypeExtension, CreateTableExpr, SemanticType}; use common_error::ext::BoxedError; use common_meta::key::table_info::TableInfoValue; use datatypes::schema::ColumnSchema; use itertools::Itertools; +use operator::expr_factory::CreateExprFactory; +use session::context::QueryContextBuilder; use snafu::{OptionExt, ResultExt}; +use table::table_reference::TableReference; use crate::adapter::{TableName, AUTO_CREATED_PLACEHOLDER_TS_COL}; -use crate::error::{Error, ExternalSnafu, TableNotFoundSnafu}; -use crate::expr::GlobalId; +use crate::error::{Error, ExternalSnafu, UnexpectedSnafu}; use crate::repr::{ColumnType, RelationDesc, RelationType}; use crate::FlowWorkerManager; impl FlowWorkerManager { - /// Fetch table info or create table('s schema) from flow's schema if not exist - /// - /// will create sink table automatically if not exist - /// TODO(discord9): impl that - pub(crate) async fn try_fetch_or_create_table( + /// Create table from given schema(will adjust to add auto column if needed), return true if table is created + pub(crate) async fn create_table_from_relation( &self, + flow_name: &str, table_name: &TableName, - ) -> Result<(bool, Vec), Error> { - // TODO(discord9): instead of auto build table from request schema, actually build table - // before `create flow` to be able to assign pk and ts etc. - let (primary_keys, schema, is_ts_placeholder) = - if let Some((primary_keys, time_index, schema)) = - self.fetch_table_pk_schema(table_name).await? - { - // check if the last column is the auto created timestamp column, hence the table is auto created from - // flow's plan type - let is_auto_create = { - let correct_name = schema - .last() - .map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL) - .unwrap_or(false); - let correct_time_index = time_index == Some(schema.len() - 1); - correct_name && correct_time_index - }; - (primary_keys, schema, is_auto_create) - } else { - let schema = { - let node_ctx = self.node_context.read().await; - let gid: GlobalId = node_ctx - .table_repr - .get_by_name(table_name) - .map(|x| x.1) - .unwrap(); - node_ctx - .schema - .get(&gid) - .with_context(|| TableNotFoundSnafu { - name: format!("Table name = {:?}", table_name), - })? - .clone() - }; - let (pks, tys, is_ts_auto) = self.adjust_auto_created_table_schema(&schema).await?; - - // TODO(discord9): create sink table using pks, column types and is_ts_auto - - (pks, tys, is_ts_auto) - }; - let proto_schema = column_schemas_to_proto(schema, &primary_keys)?; - Ok((is_ts_placeholder, proto_schema)) + relation_desc: &RelationDesc, + ) -> Result { + if self.fetch_table_pk_schema(table_name).await?.is_some() { + return Ok(false); + } + let (pks, tys, _) = self.adjust_auto_created_table_schema(relation_desc).await?; + + //create sink table using pks, column types and is_ts_auto + + let proto_schema = column_schemas_to_proto(tys.clone(), &pks)?; + + // create sink table + let create_expr = CreateExprFactory {} + .create_table_expr_by_column_schemas( + &TableReference { + catalog: &table_name[0], + schema: &table_name[1], + table: &table_name[2], + }, + &proto_schema, + "mito", + Some(&format!("Sink table for flow {}", flow_name)), + ) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + self.submit_create_sink_table_ddl(create_expr).await?; + Ok(true) } - /// Create sink table using primary keys and schema - pub(crate) async fn create_sink_table( + /// Try fetch table with adjusted schema(added auto column if needed) + pub(crate) async fn try_fetch_existing_table( &self, table_name: &TableName, - primary_keys: &[String], - schema: &[ColumnSchema], + ) -> Result)>, Error> { + if let Some((primary_keys, time_index, schema)) = + self.fetch_table_pk_schema(table_name).await? + { + // check if the last column is the auto created timestamp column, hence the table is auto created from + // flow's plan type + let is_auto_create = { + let correct_name = schema + .last() + .map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL) + .unwrap_or(false); + let correct_time_index = time_index == Some(schema.len() - 1); + correct_name && correct_time_index + }; + let proto_schema = column_schemas_to_proto(schema, &primary_keys)?; + Ok(Some((is_auto_create, proto_schema))) + } else { + Ok(None) + } + } + + /// submit a create table ddl + pub(crate) async fn submit_create_sink_table_ddl( + &self, + mut create_table: CreateTableExpr, ) -> Result<(), Error> { + let stmt_exec = { + self.frontend_invoker + .read() + .await + .as_ref() + .map(|f| f.statement_executor()) + } + .context(UnexpectedSnafu { + reason: "Failed to get statement executor", + })?; + let ctx = Arc::new( + QueryContextBuilder::default() + .current_catalog(create_table.catalog_name.clone()) + .current_schema(create_table.schema_name.clone()) + .build(), + ); + stmt_exec + .create_table_inner(&mut create_table, None, ctx) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + Ok(()) } } diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 892c650bb444..812f7ee3fa0e 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -503,4 +503,8 @@ impl FrontendInvoker { .map_err(BoxedError::new) .context(common_frontend::error::ExternalSnafu) } + + pub fn statement_executor(&self) -> Arc { + self.statement_executor.clone() + } } diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index 59fe87a66ec3..bc50eff161b5 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -68,6 +68,7 @@ impl CreateExprFactory { table_name: &TableReference<'_>, column_schemas: &[api::v1::ColumnSchema], engine: &str, + desc: Option<&str>, ) -> Result { let column_exprs = ColumnExpr::from_column_schemas(column_schemas); let create_expr = common_grpc_expr::util::build_create_table_expr( @@ -75,7 +76,7 @@ impl CreateExprFactory { table_name, column_exprs, engine, - "Created on insertion", + desc.unwrap_or("Created on insertion"), ) .context(BuildCreateExprOnInsertionSnafu)?; diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index ec01b329457f..2386b194694a 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -864,5 +864,5 @@ fn build_create_table_expr( request_schema: &[ColumnSchema], engine: &str, ) -> Result { - CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, engine) + CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, engine, None) }