Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): lossy "purify" column default value #19993

Merged
merged 7 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 59 additions & 32 deletions src/frontend/src/catalog/purify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Context;
use itertools::Itertools;
use prost::Message as _;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ColumnId};
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
Expand Down Expand Up @@ -53,65 +56,89 @@ pub fn try_purify_table_source_create_sql_ast(
// Filter out columns that are not defined by users in SQL.
let defined_columns = columns.iter().filter(|c| c.is_user_defined());

// If all columns are defined...
// - either the schema is fully specified by the user,
// - the persisted definition is already purified.
// No need to proceed.
// If all columns are defined, check if the count matches.
if !column_defs.is_empty() && wildcard_idx.is_none() {
let defined_columns_len = defined_columns.count();
let defined_columns_len = defined_columns.clone().count();
if column_defs.len() != defined_columns_len {
bail /* unlikely */ !(
"column count mismatch: defined {} columns, but {} columns in the definition",
defined_columns_len,
column_defs.len()
);
}

return Ok(base);
Comment on lines -69 to -70
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously we short-circuited the procedure once we find all columns are present. Now we should always go through the step for filling the default value.

}

// Schema inferred. Now derive the missing columns and constraints.
// Now derive the missing columns and constraints.

// First, remove the wildcard from the definition.
*wildcard_idx = None;

// Derive `ColumnDef` from `ColumnCatalog`.
let mut purified_column_defs = Vec::new();
for column in defined_columns {
// If the column is already defined in the persisted definition, keep it.
if let Some(existing) = column_defs
let mut column_def = if let Some(existing) = column_defs
.iter()
.find(|c| c.name.real_value() == column.name())
{
purified_column_defs.push(existing.clone());
continue;
}
// If the column is already defined in the persisted definition, retrieve it.
existing.clone()
} else {
assert!(
!column.is_generated(),
"generated column must not be inferred"
);

if let Some(c) = &column.column_desc.generated_or_default_column {
match c {
GeneratedOrDefaultColumn::GeneratedColumn(_) => {
unreachable!("generated column must not be inferred");
}
GeneratedOrDefaultColumn::DefaultColumn(_) => {
// TODO: convert `ExprNode` back to ast can be a bit tricky.
// Fortunately, this case is rare as inferring default values is not
// widely supported.
bail /* unlikely */ !("purifying default value is not supported yet");
}
// Generate a new `ColumnDef` from the catalog.
ColumnDef {
name: column.name().into(),
data_type: Some(column.data_type().to_ast()),
collation: None,
options: Vec::new(), // pk will be specified with table constraints
}
};

// Fill in the persisted default value desc.
if let Some(c) = &column.column_desc.generated_or_default_column
&& let GeneratedOrDefaultColumn::DefaultColumn(desc) = c
{
let persisted = desc.encode_to_vec().into_boxed_slice();

let default_value_option = column_def
.options
.extract_if(|o| {
matches!(
o.option,
ColumnOption::DefaultValue { .. }
| ColumnOption::DefaultValueInternal { .. }
)
})
.at_most_one()
.ok()
.context("multiple default value options found")?;

let expr = default_value_option.and_then(|o| match o.option {
ColumnOption::DefaultValue(expr) => Some(expr),
ColumnOption::DefaultValueInternal { expr, .. } => expr,
_ => unreachable!(),
});

column_def.options.push(ColumnOptionDef {
name: None,
option: ColumnOption::DefaultValueInternal { persisted, expr },
});
}

let column_def = ColumnDef {
name: column.name().into(),
data_type: Some(column.data_type().to_ast()),
collation: None,
options: Vec::new(), // pk will be specified with table constraints
};
purified_column_defs.push(column_def);
}
*column_defs = purified_column_defs;

if row_id_index.is_none() {
// User-defined primary key.
// Specify user-defined primary key in table constraints.
let has_pk_column_constraint = column_defs.iter().any(|c| {
c.options
.iter()
.any(|o| matches!(o.option, ColumnOption::Unique { is_primary: true }))
});
if !has_pk_column_constraint && row_id_index.is_none() {
let mut pk_columns = Vec::new();

for &id in pk_column_ids {
Expand Down
37 changes: 29 additions & 8 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use either::Either;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use prost::Message as _;
use risingwave_common::catalog::{
CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, Engine, FieldLike, TableId,
TableVersionId, DEFAULT_SCHEMA_NAME, INITIAL_TABLE_VERSION_ID, RISINGWAVE_ICEBERG_ROW_ID,
Expand Down Expand Up @@ -69,7 +70,7 @@ use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::table_catalog::{TableVersion, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX};
use crate::catalog::{check_valid_column_name, ColumnId, DatabaseId, SchemaId};
use crate::error::{ErrorCode, Result, RwError};
use crate::error::{bail_bind_error, ErrorCode, Result, RwError};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::handler::create_source::{
bind_connector_props, bind_create_source_or_table_with_connector, bind_source_watermark,
Expand Down Expand Up @@ -179,7 +180,8 @@ fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
for option_def in &c.options {
match option_def.option {
ColumnOption::GeneratedColumns(_) => {}
ColumnOption::DefaultColumns(_) => {}
ColumnOption::DefaultValue(_) => {}
ColumnOption::DefaultValueInternal { .. } => {}
ColumnOption::Unique { is_primary: true } => {}
_ => bail_not_implemented!("column constraints \"{}\"", option_def),
}
Expand Down Expand Up @@ -323,12 +325,13 @@ pub fn bind_sql_column_constraints(
binder.bind_columns_to_context(table_name.clone(), column_catalogs)?;

for column in columns {
let idx = binder.get_column_binding_index(table_name.clone(), &column.name.real_value())?;

for option_def in column.options {
match option_def.option {
ColumnOption::GeneratedColumns(expr) => {
binder.set_clause(Some(Clause::GeneratedColumn));
let idx = binder
.get_column_binding_index(table_name.clone(), &column.name.real_value())?;

let expr_impl = binder.bind_expr(expr).with_context(|| {
format!(
"fail to bind expression in generated column \"{}\"",
Expand All @@ -352,9 +355,7 @@ pub fn bind_sql_column_constraints(
);
binder.set_clause(None);
}
ColumnOption::DefaultColumns(expr) => {
let idx = binder
.get_column_binding_index(table_name.clone(), &column.name.real_value())?;
ColumnOption::DefaultValue(expr) => {
let expr_impl = binder
.bind_expr(expr)?
.cast_assign(column_catalogs[idx].data_type().clone())?;
Expand Down Expand Up @@ -389,6 +390,24 @@ pub fn bind_sql_column_constraints(
.into());
}
}
ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
// When a `DEFAULT INTERNAL` is used internally for schema change, the persisted value
// should already be set during purifcation. So if we encounter an empty value here, it
// means the user has specified it explicitly in the SQL statement, typically by
// directly copying the result of `SHOW CREATE TABLE` and executing it.
if persisted.is_empty() {
bail_bind_error!(
"DEFAULT INTERNAL is only used for internal purposes, \
please specify a concrete default value"
);
}

let desc = DefaultColumnDesc::decode(&*persisted)
.expect("failed to decode persisted `DefaultColumnDesc`");

column_catalogs[idx].column_desc.generated_or_default_column =
Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
}
_ => {}
}
}
Expand Down Expand Up @@ -1128,7 +1147,9 @@ pub(super) async fn handle_create_table_plan(
None => {
for column_def in &column_defs {
for option_def in &column_def.options {
if let ColumnOption::DefaultColumns(_) = option_def.option {
if let ColumnOption::DefaultValue(_)
| ColumnOption::DefaultValueInternal { .. } = option_def.option
{
return Err(ErrorCode::NotSupported(
"Default value for columns defined on the table created from a CDC source".into(),
"Remove the default value expression in the column definitions".into(),
Expand Down
21 changes: 19 additions & 2 deletions src/sqlparser/src/ast/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,17 @@ pub enum ColumnOption {
/// `NOT NULL`
NotNull,
/// `DEFAULT <restricted-expr>`
DefaultColumns(Expr),
DefaultValue(Expr),
/// Default value from previous bound `DefaultColumnDesc`. Used internally
/// for schema change and should not be specified by users.
DefaultValueInternal {
/// Protobuf encoded `DefaultColumnDesc`.
persisted: Box<[u8]>,
/// Optional AST for unparsing. If `None`, the default value will be
/// shown as `DEFAULT INTERNAL` which is for demonstrating and should
/// not be specified by users.
expr: Option<Expr>,
},
/// `{ PRIMARY KEY | UNIQUE }`
Unique { is_primary: bool },
/// A referential integrity constraint (`[FOREIGN KEY REFERENCES
Expand Down Expand Up @@ -765,7 +775,14 @@ impl fmt::Display for ColumnOption {
match self {
Null => write!(f, "NULL"),
NotNull => write!(f, "NOT NULL"),
DefaultColumns(expr) => write!(f, "DEFAULT {}", expr),
DefaultValue(expr) => write!(f, "DEFAULT {}", expr),
DefaultValueInternal { persisted: _, expr } => {
if let Some(expr) = expr {
write!(f, "DEFAULT {}", expr)
} else {
write!(f, "DEFAULT INTERNAL")
}
}
Unique { is_primary } => {
write!(f, "{}", if *is_primary { "PRIMARY KEY" } else { "UNIQUE" })
}
Expand Down
10 changes: 9 additions & 1 deletion src/sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2794,7 +2794,15 @@ impl Parser<'_> {
} else if self.parse_keyword(Keyword::NULL) {
Ok(Some(ColumnOption::Null))
} else if self.parse_keyword(Keyword::DEFAULT) {
Ok(Some(ColumnOption::DefaultColumns(self.parse_expr()?)))
if self.parse_keyword(Keyword::INTERNAL) {
Ok(Some(ColumnOption::DefaultValueInternal {
// Placeholder. Will fill during definition purification for schema change.
persisted: Default::default(),
expr: None,
}))
} else {
Ok(Some(ColumnOption::DefaultValue(self.parse_expr()?)))
}
} else if self.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
Ok(Some(ColumnOption::Unique { is_primary: true }))
} else if self.parse_keyword(Keyword::UNIQUE) {
Expand Down
8 changes: 4 additions & 4 deletions src/sqlparser/tests/sqlparser_postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ fn parse_create_table_with_defaults() {
None,
vec![ColumnOptionDef {
name: None,
option: ColumnOption::DefaultColumns(verified_expr(
option: ColumnOption::DefaultValue(verified_expr(
"nextval(public.customer_customer_id_seq)"
))
}],
Expand Down Expand Up @@ -100,7 +100,7 @@ fn parse_create_table_with_defaults() {
vec![
ColumnOptionDef {
name: None,
option: ColumnOption::DefaultColumns(Expr::Value(Value::Boolean(
option: ColumnOption::DefaultValue(Expr::Value(Value::Boolean(
true
))),
},
Expand All @@ -117,7 +117,7 @@ fn parse_create_table_with_defaults() {
vec![
ColumnOptionDef {
name: None,
option: ColumnOption::DefaultColumns(verified_expr(
option: ColumnOption::DefaultValue(verified_expr(
"CAST(now() AS TEXT)"
))
},
Expand All @@ -134,7 +134,7 @@ fn parse_create_table_with_defaults() {
vec![
ColumnOptionDef {
name: None,
option: ColumnOption::DefaultColumns(verified_expr("now()")),
option: ColumnOption::DefaultValue(verified_expr("now()")),
},
ColumnOptionDef {
name: None,
Expand Down
Loading