diff --git a/src/frontend/src/catalog/mod.rs b/src/frontend/src/catalog/mod.rs index 49ed5f65c67ef..af22f83f12fcc 100644 --- a/src/frontend/src/catalog/mod.rs +++ b/src/frontend/src/catalog/mod.rs @@ -26,7 +26,7 @@ use thiserror::Error; use crate::error::{ErrorCode, Result, RwError}; pub(crate) mod catalog_service; -mod purify; +pub mod purify; pub(crate) mod connection_catalog; pub(crate) mod database_catalog; diff --git a/src/frontend/src/catalog/purify.rs b/src/frontend/src/catalog/purify.rs index 7a5e031a945b2..02db182c71d87 100644 --- a/src/frontend/src/catalog/purify.rs +++ b/src/frontend/src/catalog/purify.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use anyhow::Context; use prost::Message as _; use risingwave_common::bail; use risingwave_common::catalog::{ColumnCatalog, ColumnId}; @@ -132,7 +133,10 @@ pub fn try_purify_table_source_create_sql_ast( let mut pk_columns = Vec::new(); for &id in pk_column_ids { - let column = columns.iter().find(|c| c.column_id() == id).unwrap(); + let column = columns + .iter() + .find(|c| c.column_id() == id) + .context("primary key column not found")?; if !column.is_user_defined() { bail /* unlikely */ !( "primary key column \"{}\" is not user-defined", diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index b8a819801d14b..c775f7ae4a879 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::sync::Arc; use itertools::Itertools; @@ -26,22 +26,19 @@ use risingwave_pb::catalog::{Source, Table}; use risingwave_pb::ddl_service::TableJobType; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph}; -use risingwave_sqlparser::ast::{ - AlterTableOperation, ColumnDef, ColumnOption, Ident, ObjectName, Statement, TableConstraint, -}; +use risingwave_sqlparser::ast::{AlterTableOperation, ColumnOption, ObjectName, Statement}; use super::create_source::schema_has_schema_registry; use super::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator}; use super::util::SourceSchemaCompatExt; use super::{HandlerArgs, RwPgResponse}; +use crate::catalog::purify::try_purify_table_source_create_sql_ast; use crate::catalog::root_catalog::SchemaPath; use crate::catalog::table_catalog::TableType; use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{Expr, ExprImpl, InputRef, Literal}; use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project}; -use crate::handler::create_table::bind_table_constraints; use crate::session::SessionImpl; -use crate::utils::data_type::DataTypeToAst; use crate::{Binder, TableCatalog}; /// Used in auto schema change process @@ -52,63 +49,38 @@ pub async fn get_new_table_definition_for_cdc_table( ) -> Result<(Statement, Arc)> { let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?; - // Retrieve the original table definition and parse it to AST. - let mut definition = original_catalog.create_sql_ast()?; - - let Statement::CreateTable { - columns: original_columns, - format_encode, - constraints, - .. - } = &mut definition - else { - panic!("unexpected statement: {:?}", definition); - }; - - assert!( - format_encode.is_none(), - "source schema should be None for CDC table" + assert_eq!( + original_catalog.row_id_index, None, + "primary key of cdc table must be user defined" ); - if bind_table_constraints(constraints)?.is_empty() { - // For table created by `create table t (*)` the constraint is empty, we need to - // retrieve primary key names from original table catalog if available - let pk_names: Vec<_> = original_catalog - .pk - .iter() - .map(|x| original_catalog.columns[x.column_index].name().to_owned()) - .collect(); - - constraints.push(TableConstraint::Unique { - name: None, - columns: pk_names.iter().map(Ident::new_unchecked).collect(), - is_primary: true, - }); - } + // Retrieve the original table definition. + let mut definition = original_catalog.create_sql_ast()?; - let orig_column_catalog: HashMap = HashMap::from_iter( - original_catalog - .columns() - .iter() - .map(|col| (col.name().to_owned(), col.clone())), - ); + // Clear the original columns field, so that we'll follow `new_columns` to generate a + // purified definition. + { + let Statement::CreateTable { + columns, + constraints, + .. + } = &mut definition + else { + panic!("unexpected statement: {:?}", definition); + }; - // update the original columns with new version columns - let mut new_column_defs = vec![]; - for new_col in new_columns { - // if the column exists in the original catalog, use it to construct the column definition. - // since we don't support altering the column type right now - if let Some(original_col) = orig_column_catalog.get(new_col.name()) { - let ty = original_col.data_type().to_ast(); - new_column_defs.push(ColumnDef::new(original_col.name().into(), ty, None, vec![])); - } else { - let ty = new_col.data_type().to_ast(); - new_column_defs.push(ColumnDef::new(new_col.name().into(), ty, None, vec![])); - } + columns.clear(); + constraints.clear(); } - *original_columns = new_column_defs; - Ok((definition, original_catalog)) + let new_definition = try_purify_table_source_create_sql_ast( + definition, + new_columns, + None, + &original_catalog.pk_column_ids(), + )?; + + Ok((new_definition, original_catalog)) } pub async fn get_replace_table_plan( diff --git a/src/frontend/src/rpc/mod.rs b/src/frontend/src/rpc/mod.rs index a255d34c1623b..a3000ddcb86b9 100644 --- a/src/frontend/src/rpc/mod.rs +++ b/src/frontend/src/rpc/mod.rs @@ -97,6 +97,7 @@ async fn get_new_table_plan( .map(|c| c.into()) .collect_vec(); let table_name = ObjectName::from(vec![table_name.as_str().into()]); + let (new_table_definition, original_catalog) = get_new_table_definition_for_cdc_table(&session, table_name.clone(), &new_version_columns) .await?;