Skip to content

Commit

Permalink
feat(frontend): initially introduce table def sql purification (#19949)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Jan 6, 2025
1 parent d8f9d96 commit 3f75df9
Show file tree
Hide file tree
Showing 13 changed files with 322 additions and 46 deletions.
20 changes: 20 additions & 0 deletions e2e_test/ddl/show_purify.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Test definition purification for `CREATE TABLE AS`, mainly focusing on the data types.
# The table below carries one column per type family of interest: integer, decimal,
# timestamp with and without time zone, array, nested struct, int256 and map.
statement ok
create table ctas as select
0::int as v0,
1::decimal as v1,
'2022-03-13 01:00:00'::timestamp as v2,
'2022-03-13 01:00:00Z'::timestamptz as v3,
array['foo', 'bar', 'null'] as v4,
(1, (2, 3))::STRUCT<i BIGINT, j STRUCT<a BIGINT, b VARCHAR>> as v5,
hex_to_int256('0x11') as v6,
map{'key1': 1, 'key2': 2, 'key3': 3} as v7
;

# The statement above has no column definition clause, so the column list in the
# expected output below has to come from the purified definition.
query TT
show create table ctas;
----
public.ctas CREATE TABLE ctas (v0 INT, v1 NUMERIC, v2 TIMESTAMP, v3 TIMESTAMP WITH TIME ZONE, v4 CHARACTER VARYING[], v5 STRUCT<i BIGINT, j STRUCT<a BIGINT, b CHARACTER VARYING>>, v6 rw_int256, v7 MAP(CHARACTER VARYING,INT))

# Clean up.
statement ok
drop table ctas;
12 changes: 12 additions & 0 deletions e2e_test/source_inline/kafka/avro/alter_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ FORMAT PLAIN ENCODE AVRO (
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'
);

# Demonstrate purified definition
query T
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_tables WHERE name = 't';
----
CREATE TABLE t (bar INT, foo CHARACTER VARYING, gen_col INT AS bar + 1)

sleep 4s

query ?
Expand All @@ -43,6 +49,12 @@ sr_register avro_alter_table_test-value AVRO <<< '{"type":"record","name":"Root"
statement ok
ALTER TABLE t REFRESH SCHEMA;

# Demonstrate purified definition
query T
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_tables WHERE name = 't';
----
CREATE TABLE t (bar INT, foo CHARACTER VARYING, nested STRUCT<baz INT>, gen_col INT AS bar + 1)

query ?
select * from t
----
Expand Down
6 changes: 6 additions & 0 deletions src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,12 @@ impl ColumnCatalog {
!self.is_generated() && !self.is_rw_timestamp_column()
}

/// Tells whether this column was written by the user inside the column definition
/// clause of the `CREATE TABLE` statement, i.e. it is neither hidden, nor a system
/// column, nor an additional column contributed by a connector.
pub fn is_user_defined(&self) -> bool {
    let not_from_user =
        self.is_hidden() || self.is_rw_sys_column() || self.is_connector_additional_column();
    !not_from_user
}

/// If the column is a generated column
pub fn generated_expr(&self) -> Option<&ExprNode> {
if let Some(GeneratedOrDefaultColumn::GeneratedColumn(desc)) =
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use thiserror::Error;

use crate::error::{ErrorCode, Result, RwError};
pub(crate) mod catalog_service;
mod purify;

pub(crate) mod connection_catalog;
pub(crate) mod database_catalog;
Expand Down
143 changes: 143 additions & 0 deletions src/frontend/src/catalog/purify.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ColumnId};
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_sqlparser::ast::*;

use crate::error::Result;
use crate::utils::data_type::DataTypeToAst as _;

/// Try to restore missing column definitions and constraints in the persisted table definition,
/// if the schema of the table is derived from external systems (like schema registry) or it's
/// created by `CREATE TABLE AS`.
///
/// Returns error if restoring failed, or called on non-`TableType::Table`, or the persisted
/// definition is invalid.
pub fn try_purify_table_create_sql_ast(
mut base: Statement,
columns: &[ColumnCatalog],
row_id_index: Option<usize>,
pk_column_ids: &[ColumnId],
) -> Result<Statement> {
let Statement::CreateTable {
columns: column_defs,
constraints,
wildcard_idx,
..
} = &mut base
else {
bail!("expect `CREATE TABLE` statement, found: `{:?}`", base);
};

// Filter out columns that are not defined by users in SQL.
let defined_columns = columns.iter().filter(|c| c.is_user_defined());

// If all columns are defined...
// - either the schema is fully specified by the user,
// - the persisted definition is already purified.
// No need to proceed.
if !column_defs.is_empty() && wildcard_idx.is_none() {
let defined_columns_len = defined_columns.count();
if column_defs.len() != defined_columns_len {
bail /* unlikely */ !(
"column count mismatch: defined {} columns, but {} columns in the definition",
defined_columns_len,
column_defs.len()
);
}

return Ok(base);
}

// Schema inferred. Now derive the missing columns and constraints.
// First, remove the wildcard from the definition.
*wildcard_idx = None;

// Derive `ColumnDef` from `ColumnCatalog`.
let mut purified_column_defs = Vec::new();
for column in defined_columns {
// If the column is already defined in the persisted definition, keep it.
if let Some(existing) = column_defs
.iter()
.find(|c| c.name.real_value() == column.name())
{
purified_column_defs.push(existing.clone());
continue;
}

if let Some(c) = &column.column_desc.generated_or_default_column {
match c {
GeneratedOrDefaultColumn::GeneratedColumn(_) => {
unreachable!("generated column must not be inferred");
}
GeneratedOrDefaultColumn::DefaultColumn(_) => {
// TODO: convert `ExprNode` back to ast can be a bit tricky.
// Fortunately, this case is rare as inferring default values is not
// widely supported.
bail /* unlikely */ !("purifying default value is not supported yet");
}
}
}

let column_def = ColumnDef {
name: column.name().into(),
data_type: Some(column.data_type().to_ast()),
collation: None,
options: Vec::new(), // pk will be specified with table constraints
};
purified_column_defs.push(column_def);
}
*column_defs = purified_column_defs;

if row_id_index.is_none() {
// User-defined primary key.
let mut pk_columns = Vec::new();

for &id in pk_column_ids {
let column = columns.iter().find(|c| c.column_id() == id).unwrap();
if !column.is_user_defined() {
bail /* unlikely */ !(
"primary key column \"{}\" is not user-defined",
column.name()
);
}
pk_columns.push(column.name().into());
}

let pk_constraint = TableConstraint::Unique {
name: None,
columns: pk_columns,
is_primary: true,
};

// We don't support table constraints other than `PRIMARY KEY`, thus simply overwrite.
assert!(
constraints.len() <= 1
&& constraints.iter().all(|c| matches!(
c,
TableConstraint::Unique {
is_primary: true,
..
}
)),
"unexpected table constraints: {constraints:?}",
);

*constraints = vec![pk_constraint];
}

Ok(base)
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ fn read_rw_table_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwTable>> {
name: table.name().to_owned(),
schema_id: schema.id() as i32,
owner: table.owner as i32,
definition: table.create_sql(),
definition: table.create_sql_purified(),
append_only: table.append_only,
acl: get_acl_items(
&Object::TableId(table.id.table_id),
Expand Down
47 changes: 47 additions & 0 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@ use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::DefaultColumnDesc;
use risingwave_sqlparser::ast;
use risingwave_sqlparser::parser::Parser;
use thiserror_ext::AsReport as _;

use super::purify::try_purify_table_create_sql_ast;
use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId, SinkId};
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::ExprImpl;
use crate::optimizer::property::Cardinality;
use crate::session::current::notice_to_user;
use crate::user::UserId;

/// `TableCatalog` Includes full information about a table.
Expand Down Expand Up @@ -273,6 +276,50 @@ impl TableVersion {
}
}

impl TableCatalog {
    /// Renders the SQL the table was created with, purified on a best-effort basis when
    /// this catalog describes a table. Falls back to the persisted definition when the
    /// purified AST cannot be obtained.
    pub fn create_sql_purified(&self) -> String {
        match self.create_sql_ast_purified() {
            Ok(stmt) => stmt.to_string(),
            Err(_) => self.create_sql(),
        }
    }

    /// Parses the SQL the table was created with and, for tables, purifies it on a
    /// best-effort basis. A purification failure is reported to the user as a notice and
    /// the unpurified AST is returned instead.
    ///
    /// Returns error if the persisted definition is invalid.
    pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
        // Purification is only applicable to tables; everything else uses the plain AST.
        if !matches!(self.table_type(), TableType::Table) {
            return self.create_sql_ast();
        }

        let base = if self.definition.is_empty() {
            // Created by `CREATE TABLE AS`, create a skeleton `CREATE TABLE` statement.
            let name = ast::ObjectName(vec![self.name.as_str().into()]);
            ast::Statement::default_create_table(name)
        } else {
            self.create_sql_ast()?
        };

        let purified = try_purify_table_create_sql_ast(
            base,
            self.columns(),
            self.row_id_index,
            &self.pk_column_ids(),
        );

        match purified {
            Ok(stmt) => Ok(stmt),
            Err(e) => {
                notice_to_user(format!(
                    "error occurred while purifying definition for table \"{}\", \
                     results may be inaccurate: {}",
                    self.name,
                    e.as_report()
                ));
                self.create_sql_ast()
            }
        }
    }
}

impl TableCatalog {
/// Get a reference to the table catalog's table id.
pub fn id(&self) -> TableId {
Expand Down
47 changes: 4 additions & 43 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{ColumnCatalog, Engine};
use risingwave_common::hash::VnodeCount;
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::sink::catalog::SinkCatalog;
Expand All @@ -29,8 +27,7 @@ use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
use risingwave_sqlparser::ast::{
AlterTableOperation, ColumnDef, ColumnOption, DataType as AstDataType, Ident, ObjectName,
Statement, StructField, TableConstraint,
AlterTableOperation, ColumnDef, ColumnOption, Ident, ObjectName, Statement, TableConstraint,
};

use super::create_source::schema_has_schema_registry;
Expand All @@ -44,6 +41,7 @@ use crate::expr::{Expr, ExprImpl, InputRef, Literal};
use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project};
use crate::handler::create_table::bind_table_constraints;
use crate::session::SessionImpl;
use crate::utils::data_type::DataTypeToAst;
use crate::{Binder, TableCatalog};

/// Used in auto schema change process
Expand Down Expand Up @@ -101,10 +99,10 @@ pub async fn get_new_table_definition_for_cdc_table(
// if the column exists in the original catalog, use it to construct the column definition.
// since we don't support altering the column type right now
if let Some(original_col) = orig_column_catalog.get(new_col.name()) {
let ty = to_ast_data_type(original_col.data_type())?;
let ty = original_col.data_type().to_ast();
new_column_defs.push(ColumnDef::new(original_col.name().into(), ty, None, vec![]));
} else {
let ty = to_ast_data_type(new_col.data_type())?;
let ty = new_col.data_type().to_ast();
new_column_defs.push(ColumnDef::new(new_col.name().into(), ty, None, vec![]));
}
}
Expand All @@ -113,43 +111,6 @@ pub async fn get_new_table_definition_for_cdc_table(
Ok((definition, original_catalog))
}

/// Converts a `DataType` into the corresponding `AstDataType` of the SQL parser.
///
/// List item types and struct field types are converted recursively, so an unsupported
/// type nested anywhere inside makes the whole conversion fail.
///
/// Returns an error for `DataType::Serial`, `DataType::Int256` and `DataType::Map`,
/// which have no mapping here.
fn to_ast_data_type(ty: &DataType) -> Result<AstDataType> {
match ty {
DataType::Boolean => Ok(AstDataType::Boolean),
DataType::Int16 => Ok(AstDataType::SmallInt),
DataType::Int32 => Ok(AstDataType::Int),
DataType::Int64 => Ok(AstDataType::BigInt),
DataType::Float32 => Ok(AstDataType::Real),
DataType::Float64 => Ok(AstDataType::Double),
// TODO: handle precision and scale for decimal
DataType::Decimal => Ok(AstDataType::Decimal(None, None)),
DataType::Date => Ok(AstDataType::Date),
DataType::Varchar => Ok(AstDataType::Varchar),
DataType::Time => Ok(AstDataType::Time(false)),
DataType::Timestamp => Ok(AstDataType::Timestamp(false)),
// NOTE(review): the boolean presumably selects the `WITH TIME ZONE` form — confirm
// against the parser's `DataType` definition.
DataType::Timestamptz => Ok(AstDataType::Timestamp(true)),
DataType::Interval => Ok(AstDataType::Interval),
DataType::Jsonb => Ok(AstDataType::Jsonb),
DataType::Bytea => Ok(AstDataType::Bytea),
DataType::List(item_ty) => Ok(AstDataType::Array(Box::new(to_ast_data_type(item_ty)?))),
DataType::Struct(fields) => {
// Convert every field; the first failing field aborts the collection.
let fields = fields
.iter()
.map(|(name, ty)| {
Ok::<StructField, RwError>(StructField {
name: name.into(),
data_type: to_ast_data_type(ty)?,
})
})
.try_collect()?;
Ok(AstDataType::Struct(fields))
}
// No AST mapping is provided for these types; surface an error to the caller.
DataType::Serial | DataType::Int256 | DataType::Map(_) => {
Err(anyhow!("unsupported data type: {:?}", ty).context("to_ast_data_type"))?
}
}
}

pub async fn get_replace_table_plan(
session: &Arc<SessionImpl>,
table_name: ObjectName,
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/handler/create_table_as.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ pub async fn handle_create_as(
vec![], // No watermark should be defined in for `CREATE TABLE AS`
col_id_gen.into_version(),
CreateTableProps {
definition: "".to_owned(), // TODO: empty definition means no schema change support
// Note: by providing and persisting an empty definition, querying the definition of the table
// will hit the purification logic, which will construct it based on the catalog.
definition: "".to_owned(),
append_only,
on_conflict: on_conflict.into(),
with_version_column,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ pub fn handle_show_create_object(
.get_created_table_by_name(&object_name)
.filter(|t| t.is_user_table())
.ok_or_else(|| CatalogError::NotFound("table", name.to_string()))?;
table.create_sql()
table.create_sql_purified()
}
ShowCreateType::Sink => {
let sink = schema
Expand Down
Loading

0 comments on commit 3f75df9

Please sign in to comment.