diff --git a/e2e_test/source_inline/kafka/alter/add_column_shared.slt b/e2e_test/source_inline/kafka/alter/add_column_shared.slt
index bbb03c178fa2f..210f17bad63ab 100644
--- a/e2e_test/source_inline/kafka/alter/add_column_shared.slt
+++ b/e2e_test/source_inline/kafka/alter/add_column_shared.slt
@@ -48,6 +48,12 @@ select * from mv_before_alter;
 
 statement ok
 alter source s add column v3 varchar;
 
+# Demonstrate definition change.
+query T
+SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 's';
+----
+CREATE SOURCE s (v1 INT, v2 CHARACTER VARYING, v3 CHARACTER VARYING)
+
 # New MV will have v3.
 # Check it should still be shared source <https://github.com/risingwavelabs/risingwave/issues/19799>
diff --git a/e2e_test/source_inline/kafka/avro/alter_source.slt b/e2e_test/source_inline/kafka/avro/alter_source.slt
index 8bce7f4efd5cf..bb36e116279e4 100644
--- a/e2e_test/source_inline/kafka/avro/alter_source.slt
+++ b/e2e_test/source_inline/kafka/avro/alter_source.slt
@@ -52,6 +52,12 @@ No tracking issue yet. Feel free to submit a feature request at https://github.c
 statement ok
 alter source s format plain encode avro (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}');
 
+# Demonstrate definition change.
+query T
+SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 's';
+----
+CREATE SOURCE s (foo CHARACTER VARYING, bar INT)
+
 query ??
 select * from s
 ----
diff --git a/e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt b/e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt
index 658d4fa95c6a0..19ded681789cb 100644
--- a/e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt
+++ b/e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt
@@ -20,6 +20,11 @@ FORMAT PLAIN ENCODE PROTOBUF(
   message = 'test.User'
 );
 
+query T
+SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 'src_user';
+----
+CREATE SOURCE src_user (id INT, name CHARACTER VARYING, address CHARACTER VARYING, city CHARACTER VARYING, gender CHARACTER VARYING, sc STRUCT<file_name CHARACTER VARYING>) INCLUDE timestamp
+
 statement ok
 CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user;
 
@@ -45,6 +50,11 @@ set streaming_use_shared_source to false;
 statement ok
 ALTER SOURCE src_user REFRESH SCHEMA;
 
+query T
+SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 'src_user';
+----
+CREATE SOURCE src_user (id INT, name CHARACTER VARYING, address CHARACTER VARYING, city CHARACTER VARYING, gender CHARACTER VARYING, sc STRUCT<file_name CHARACTER VARYING>, age INT) INCLUDE timestamp
+
 # Check it should still be shared source <https://github.com/risingwavelabs/risingwave/issues/19799>
 query
 EXPLAIN CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user;
diff --git a/src/frontend/src/catalog/source_catalog.rs b/src/frontend/src/catalog/source_catalog.rs
index bf02092c1cc0a..c4856e329d605 100644
--- a/src/frontend/src/catalog/source_catalog.rs
+++ b/src/frontend/src/catalog/source_catalog.rs
@@ -142,6 +142,15 @@ impl SourceCatalog {
         self.create_sql_ast()
     }
+
+    /// Fills the `definition` field with the purified SQL definition.
+    ///
+    /// There's no need to call this method for correctness, because we automatically purify the
+    /// SQL definition at the time of querying. However, it helps to maintain a more accurate
+    /// `definition` field in the catalog when it is directly inspected for debugging purposes.
+    pub fn fill_purified_create_sql(&mut self) {
+        self.definition = self.create_sql_purified();
+    }
 }
 
 impl From<&PbSource> for SourceCatalog {
diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_relation_info.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_relation_info.rs
index e5276c4f4f6bc..a4b3eb18387d8 100644
--- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_relation_info.rs
+++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_relation_info.rs
@@ -174,7 +174,7 @@ async fn read_relation_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwRelat
                 schemaname: schema.clone(),
                 relationname: t.name.clone(),
                 relationowner: t.owner as i32,
-                definition: t.definition.clone(),
+                definition: t.create_sql_purified(),
                 relationtype: "SOURCE".into(),
                 relationid: t.id as i32,
                 relationtimezone: timezone,
diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs
index 1fbd859483b43..6e9509ebbd64c 100644
--- a/src/frontend/src/handler/alter_source_column.rs
+++ b/src/frontend/src/handler/alter_source_column.rs
@@ -12,15 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use itertools::Itertools;
 use pgwire::pg_response::{PgResponse, StatementType};
 use risingwave_common::catalog::max_column_id;
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct};
-use risingwave_sqlparser::ast::{
-    AlterSourceOperation, ColumnDef, CreateSourceStatement, ObjectName, Statement,
-};
-use risingwave_sqlparser::parser::Parser;
+use risingwave_sqlparser::ast::{AlterSourceOperation, ObjectName};
 
 use super::create_source::generate_stream_graph_for_source;
 use super::create_table::bind_sql_columns;
@@ -107,17 +103,17 @@ pub async fn handle_alter_source_column(
                     "column \"{new_column_name}\" of source \"{source_name}\" already exists"
                 )))?
             }
-            catalog.definition =
-                alter_definition_add_column(&catalog.definition, column_def.clone())?;
             let mut bound_column = bind_sql_columns(&[column_def])?.remove(0);
             bound_column.column_desc.column_id = max_column_id(columns).next();
             columns.push(bound_column);
+            // No need to update the definition here. It will be done by purification later.
         }
         _ => unreachable!(),
     }
 
     // update version
     catalog.version += 1;
+    catalog.fill_purified_create_sql();
 
     let catalog_writer = session.catalog_writer()?;
     if catalog.info.is_shared() {
@@ -148,27 +144,6 @@ pub async fn handle_alter_source_column(
     Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
 }
 
-/// `alter_definition_add_column` adds a new column to the definition of the relation.
-#[inline(always)]
-pub fn alter_definition_add_column(definition: &str, column: ColumnDef) -> Result<String> {
-    let ast = Parser::parse_sql(definition).expect("failed to parse relation definition");
-    let mut stmt = ast
-        .into_iter()
-        .exactly_one()
-        .expect("should contains only one statement");
-
-    match &mut stmt {
-        Statement::CreateSource {
-            stmt: CreateSourceStatement { columns, .. },
-        } => {
-            columns.push(column);
-        }
-        _ => unreachable!(),
-    }
-
-    Ok(stmt.to_string())
-}
-
 #[cfg(test)]
 pub mod tests {
     use std::collections::BTreeMap;
diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs
index e90a250e34bfa..472ce8772b99b 100644
--- a/src/frontend/src/handler/alter_source_with_sr.rs
+++ b/src/frontend/src/handler/alter_source_with_sr.rs
@@ -14,7 +14,6 @@
 
 use std::sync::Arc;
 
-use anyhow::Context;
 use either::Either;
 use itertools::Itertools;
 use pgwire::pg_response::StatementType;
@@ -28,7 +27,6 @@ use risingwave_sqlparser::ast::{
     CompatibleFormatEncode, CreateSourceStatement, Encode, Format, FormatEncodeOptions, ObjectName,
     SqlOption, Statement,
 };
-use risingwave_sqlparser::parser::Parser;
 
 use super::create_source::{
     generate_stream_graph_for_source, schema_has_schema_registry, validate_compatibility,
@@ -198,10 +196,7 @@ pub async fn refresh_sr_and_get_columns_diff(
 }
 
 fn get_format_encode_from_source(source: &SourceCatalog) -> Result<FormatEncodeOptions> {
-    let [stmt]: [_; 1] = Parser::parse_sql(&source.definition)
-        .context("unable to parse original source definition")?
-        .try_into()
-        .unwrap();
+    let stmt = source.create_sql_ast()?;
     let Statement::CreateSource {
         stmt: CreateSourceStatement { format_encode, .. },
     } = stmt
@@ -263,8 +258,10 @@ pub async fn handle_alter_source_with_sr(
 
     source.info = source_info;
     source.columns.extend(added_columns);
-    source.definition =
-        alter_definition_format_encode(&source.definition, format_encode.row_options.clone())?;
+    source.definition = alter_definition_format_encode(
+        source.create_sql_ast_purified()?,
+        format_encode.row_options.clone(),
+    )?;
 
     let (format_encode_options, format_encode_secret_ref) = resolve_secret_ref_in_with_options(
         WithOptions::try_from(format_encode.row_options())?,
@@ -313,15 +310,9 @@ pub async fn handle_alter_source_with_sr(
 
 /// Apply the new `format_encode_options` to the source/table definition.
 pub fn alter_definition_format_encode(
-    definition: &str,
+    mut stmt: Statement,
     format_encode_options: Vec<SqlOption>,
 ) -> Result<String> {
-    let ast = Parser::parse_sql(definition).expect("failed to parse relation definition");
-    let mut stmt = ast
-        .into_iter()
-        .exactly_one()
-        .expect("should contain only one statement");
-
     match &mut stmt {
         Statement::CreateSource {
             stmt: CreateSourceStatement { format_encode, .. },
@@ -391,6 +382,9 @@ pub mod tests {
                 .clone()
         };
 
+        let source = get_source();
+        expect_test::expect!["CREATE SOURCE src (id INT, country STRUCT<address CHARACTER VARYING, city STRUCT<address CHARACTER VARYING, zipcode CHARACTER VARYING>, zipcode CHARACTER VARYING>, zipcode BIGINT, rate REAL) WITH (connector = 'kafka', topic = 'test-topic', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://')"].assert_eq(&source.create_sql_purified().replace(proto_file.path().to_str().unwrap(), ""));
+
         let sql = format!(
             r#"ALTER SOURCE src FORMAT UPSERT ENCODE PROTOBUF (
                 message = '.test.TestRecord',
@@ -434,10 +428,6 @@ pub mod tests {
             .unwrap();
         assert_eq!(name_column.column_desc.data_type, DataType::Varchar);
 
-        let altered_sql = format!(
-            r#"CREATE SOURCE src WITH (connector = 'kafka', topic = 'test-topic', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecordExt', schema.location = 'file://{}')"#,
-            proto_file.path().to_str().unwrap()
-        );
-        assert_eq!(altered_sql, altered_source.definition);
+        expect_test::expect!["CREATE SOURCE src (id INT, country STRUCT<address CHARACTER VARYING, city STRUCT<address CHARACTER VARYING, zipcode CHARACTER VARYING>, zipcode CHARACTER VARYING>, zipcode BIGINT, rate REAL, name CHARACTER VARYING) WITH (connector = 'kafka', topic = 'test-topic', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecordExt', schema.location = 'file://')"].assert_eq(&altered_source.create_sql_purified().replace(proto_file.path().to_str().unwrap(), ""));
     }
 }
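
Reviewer note (not part of the patch): the user-visible effect of this change is easiest to see through the system catalog. Since `rw_sources` and `rw_relation_info` now serve the purified definition, the column list resolved from the schema (or schema registry) shows up in the stored `CREATE SOURCE` text even when the original statement omitted it. The slt tests above verify exactly this by trimming the definition at its WITH clause; for the first test, the query and expected output are:

    SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1)
    FROM rw_sources WHERE name = 's';
    ----
    CREATE SOURCE s (v1 INT, v2 CHARACTER VARYING, v3 CHARACTER VARYING)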