Skip to content

Commit

Permalink
feat: create sink table
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed Dec 11, 2024
1 parent 6431650 commit a1e649b
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 59 deletions.
25 changes: 22 additions & 3 deletions src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,12 @@ impl FlowWorkerManager {
let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
let ctx = Arc::new(QueryContext::with(&catalog, &schema));

let (is_ts_placeholder, proto_schema) =
self.try_fetch_or_create_table(&table_name).await?;
let (is_ts_placeholder, proto_schema) = self
.try_fetch_existing_table(&table_name)
.await?
.context(UnexpectedSnafu {
reason: format!("Table not found: {}", table_name.join(".")),
})?;
let schema_len = proto_schema.len();

trace!(
Expand Down Expand Up @@ -749,7 +753,8 @@ impl FlowWorkerManager {

debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);

// TODO(discord9): check schema against actual table schema if exists
// check schema against actual table schema if exists
// if not exist create sink table immediately
if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await? {
let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema);

Expand Down Expand Up @@ -810,7 +815,21 @@ impl FlowWorkerManager {
node_ctx.assign_table_schema(&sink_table_name, real_schema.clone())?;
} else {
// assign inferred schema to sink table
// create sink table
node_ctx.assign_table_schema(&sink_table_name, flow_plan.schema.clone())?;
let did_create = self
.create_table_from_relation(
&format!("flow-id={flow_id}"),
&sink_table_name,
&flow_plan.schema,
)
.await?;
if !did_create {
UnexpectedSnafu {
reason: format!("Failed to create table {:?}", sink_table_name),
}
.fail()?;
}
}

let _ = comment;
Expand Down
141 changes: 87 additions & 54 deletions src/flow/src/adapter/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,80 +12,113 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use api::helper::ColumnDataTypeWrapper;
use api::v1::column_def::options_from_column_schema;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, SemanticType};
use api::v1::{ColumnDataType, ColumnDataTypeExtension, CreateTableExpr, SemanticType};
use common_error::ext::BoxedError;
use common_meta::key::table_info::TableInfoValue;
use datatypes::schema::ColumnSchema;
use itertools::Itertools;
use operator::expr_factory::CreateExprFactory;
use session::context::QueryContextBuilder;
use snafu::{OptionExt, ResultExt};
use table::table_reference::TableReference;

use crate::adapter::{TableName, AUTO_CREATED_PLACEHOLDER_TS_COL};
use crate::error::{Error, ExternalSnafu, TableNotFoundSnafu};
use crate::expr::GlobalId;
use crate::error::{Error, ExternalSnafu, UnexpectedSnafu};
use crate::repr::{ColumnType, RelationDesc, RelationType};
use crate::FlowWorkerManager;

impl FlowWorkerManager {
/// Fetch table info or create table('s schema) from flow's schema if not exist
///
/// will create sink table automatically if not exist
/// TODO(discord9): impl that
pub(crate) async fn try_fetch_or_create_table(
/// Create table from given schema(will adjust to add auto column if needed), return true if table is created
pub(crate) async fn create_table_from_relation(
&self,
flow_name: &str,
table_name: &TableName,
) -> Result<(bool, Vec<api::v1::ColumnSchema>), Error> {
// TODO(discord9): instead of auto build table from request schema, actually build table
// before `create flow` to be able to assign pk and ts etc.
let (primary_keys, schema, is_ts_placeholder) =
if let Some((primary_keys, time_index, schema)) =
self.fetch_table_pk_schema(table_name).await?
{
// check if the last column is the auto created timestamp column, hence the table is auto created from
// flow's plan type
let is_auto_create = {
let correct_name = schema
.last()
.map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL)
.unwrap_or(false);
let correct_time_index = time_index == Some(schema.len() - 1);
correct_name && correct_time_index
};
(primary_keys, schema, is_auto_create)
} else {
let schema = {
let node_ctx = self.node_context.read().await;
let gid: GlobalId = node_ctx
.table_repr
.get_by_name(table_name)
.map(|x| x.1)
.unwrap();
node_ctx
.schema
.get(&gid)
.with_context(|| TableNotFoundSnafu {
name: format!("Table name = {:?}", table_name),
})?
.clone()
};
let (pks, tys, is_ts_auto) = self.adjust_auto_created_table_schema(&schema).await?;

// TODO(discord9): create sink table using pks, column types and is_ts_auto

(pks, tys, is_ts_auto)
};
let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
Ok((is_ts_placeholder, proto_schema))
relation_desc: &RelationDesc,
) -> Result<bool, Error> {
if self.fetch_table_pk_schema(table_name).await?.is_some() {
return Ok(false);
}
let (pks, tys, _) = self.adjust_auto_created_table_schema(relation_desc).await?;

//create sink table using pks, column types and is_ts_auto

let proto_schema = column_schemas_to_proto(tys.clone(), &pks)?;

// create sink table
let create_expr = CreateExprFactory {}
.create_table_expr_by_column_schemas(
&TableReference {
catalog: &table_name[0],
schema: &table_name[1],
table: &table_name[2],
},
&proto_schema,
"mito",
Some(&format!("Sink table for flow {}", flow_name)),
)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;

self.submit_create_sink_table_ddl(create_expr).await?;
Ok(true)
}

/// Create sink table using primary keys and schema
pub(crate) async fn create_sink_table(
/// Try fetch table with adjusted schema(added auto column if needed)
pub(crate) async fn try_fetch_existing_table(
&self,
table_name: &TableName,
primary_keys: &[String],
schema: &[ColumnSchema],
) -> Result<Option<(bool, Vec<api::v1::ColumnSchema>)>, Error> {
if let Some((primary_keys, time_index, schema)) =
self.fetch_table_pk_schema(table_name).await?
{
// check if the last column is the auto created timestamp column, hence the table is auto created from
// flow's plan type
let is_auto_create = {
let correct_name = schema
.last()
.map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL)
.unwrap_or(false);
let correct_time_index = time_index == Some(schema.len() - 1);
correct_name && correct_time_index
};
let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
Ok(Some((is_auto_create, proto_schema)))
} else {
Ok(None)
}
}

/// submit a create table ddl
pub(crate) async fn submit_create_sink_table_ddl(
&self,
mut create_table: CreateTableExpr,
) -> Result<(), Error> {
let stmt_exec = {
self.frontend_invoker
.read()
.await
.as_ref()
.map(|f| f.statement_executor())
}
.context(UnexpectedSnafu {
reason: "Failed to get statement executor",
})?;
let ctx = Arc::new(
QueryContextBuilder::default()
.current_catalog(create_table.catalog_name.clone())
.current_schema(create_table.schema_name.clone())
.build(),
);
stmt_exec
.create_table_inner(&mut create_table, None, ctx)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;

Ok(())
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/flow/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,4 +503,8 @@ impl FrontendInvoker {
.map_err(BoxedError::new)
.context(common_frontend::error::ExternalSnafu)
}

pub fn statement_executor(&self) -> Arc<StatementExecutor> {
self.statement_executor.clone()
}
}
3 changes: 2 additions & 1 deletion src/operator/src/expr_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,15 @@ impl CreateExprFactory {
table_name: &TableReference<'_>,
column_schemas: &[api::v1::ColumnSchema],
engine: &str,
desc: Option<&str>,
) -> Result<CreateTableExpr> {
let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
let create_expr = common_grpc_expr::util::build_create_table_expr(
None,
table_name,
column_exprs,
engine,
"Created on insertion",
desc.unwrap_or("Created on insertion"),
)
.context(BuildCreateExprOnInsertionSnafu)?;

Expand Down
2 changes: 1 addition & 1 deletion src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -864,5 +864,5 @@ fn build_create_table_expr(
request_schema: &[ColumnSchema],
engine: &str,
) -> Result<CreateTableExpr> {
CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, engine)
CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, engine, None)
}

0 comments on commit a1e649b

Please sign in to comment.