diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 8e4ccd2c96860..d718864a68f1a 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -279,6 +279,8 @@ impl TableVersion { impl TableCatalog { /// Returns the SQL definition when the table was created, purified with best effort /// if it's a table. + /// + /// See [`Self::create_sql_ast_purified`] for more details. pub fn create_sql_purified(&self) -> String { self.create_sql_ast_purified() .map(|stmt| stmt.to_string()) @@ -297,7 +299,7 @@ impl TableCatalog { let name = ast::ObjectName(vec![self.name.as_str().into()]); ast::Statement::default_create_table(name) } else { - self.create_sql_ast()? + self.create_sql_ast_raw()? }; match try_purify_table_source_create_sql_ast( @@ -316,7 +318,7 @@ impl TableCatalog { } } - self.create_sql_ast() + self.create_sql_ast_raw() } } @@ -481,14 +483,33 @@ impl TableCatalog { } /// Returns the SQL definition when the table was created. + /// + /// See [`Self::create_sql_ast`] for more details. pub fn create_sql(&self) -> String { - self.definition.clone() + self.create_sql_ast() + .map(|stmt| stmt.to_string()) + .unwrap_or_else(|_| self.definition.clone()) } /// Returns the parsed SQL definition when the table was created. /// + /// Re-create the table with this statement may have different schema if the schema is derived + /// from external systems (like schema registry) or it's created by `CREATE TABLE AS`. If this + /// is not desired, use [`Self::create_sql_ast_purified`] instead. + /// /// Returns error if it's invalid. pub fn create_sql_ast(&self) -> Result { + if let TableType::Table = self.table_type() + && self.definition.is_empty() + { + // Fix `CREATE TABLE AS`. + self.create_sql_ast_purified() + } else { + self.create_sql_ast_raw() + } + } + + fn create_sql_ast_raw(&self) -> Result { Ok(Parser::parse_sql(&self.definition) .context("unable to parse definition sql")? .into_iter() diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 662a5aedcade5..b98958ee5f6e7 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -503,9 +503,7 @@ pub(crate) async fn reparse_table_for_sink( ) -> Result<(StreamFragmentGraph, Table, Option)> { // Retrieve the original table definition and parse it to AST. let definition = table_catalog.create_sql_ast_purified()?; - let raw_definition = table_catalog - .create_sql_ast() - .unwrap_or_else(|_| definition.clone() /* create table as */); + let raw_definition = table_catalog.create_sql_ast()?; let Statement::CreateTable { name,