From eedda4dbaa26851a9936fb1125e2d8be221f4adf Mon Sep 17 00:00:00 2001 From: Bugen Zhao <i@bugenzhao.com> Date: Tue, 14 Jan 2025 15:00:46 +0800 Subject: [PATCH] use un-purified sql for sr refresh Signed-off-by: Bugen Zhao <i@bugenzhao.com> --- .../src/handler/alter_table_with_sr.rs | 42 +++++++------------ 1 file changed, 15 insertions(+), 27 deletions(-) diff --git a/src/frontend/src/handler/alter_table_with_sr.rs b/src/frontend/src/handler/alter_table_with_sr.rs index bb2d320f294e7..e39b72f96ae93 100644 --- a/src/frontend/src/handler/alter_table_with_sr.rs +++ b/src/frontend/src/handler/alter_table_with_sr.rs @@ -17,10 +17,8 @@ use fancy_regex::Regex; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::bail_not_implemented; use risingwave_sqlparser::ast::{FormatEncodeOptions, ObjectName, Statement}; -use risingwave_sqlparser::parser::Parser; use thiserror_ext::AsReport; -use super::alter_source_with_sr::alter_definition_format_encode; use super::alter_table_column::fetch_table_catalog_for_alter; use super::create_source::schema_has_schema_registry; use super::util::SourceSchemaCompatExt; @@ -48,32 +46,22 @@ pub async fn handle_refresh_schema( bail_not_implemented!("alter table with incoming sinks"); } - let format_encode = { - let format_encode = get_format_encode_from_table(&original_table)?; - if !format_encode - .as_ref() - .is_some_and(schema_has_schema_registry) - { - return Err(ErrorCode::NotSupported( - "tables without schema registry cannot refreshed".to_owned(), - "try `ALTER TABLE .. ADD/DROP COLUMN ...` instead".to_owned(), - ) - .into()); - } - format_encode.unwrap() - }; - - // NOTE(st1page): since we have not implemented alter format encode for table, it is actually no use. - // TODO(purify): use purified definition. - let definition = alter_definition_format_encode( - &original_table.definition, - format_encode.row_options.clone(), - )?; + let format_encode = get_format_encode_from_table(&original_table)?; + if !format_encode + .as_ref() + .is_some_and(schema_has_schema_registry) + { + return Err(ErrorCode::NotSupported( + "tables without schema registry cannot be refreshed".to_owned(), + "try `ALTER TABLE .. ADD/DROP COLUMN ...` instead".to_owned(), + ) + .into()); + } - let [definition]: [_; 1] = Parser::parse_sql(&definition) - .context("unable to parse original table definition")? - .try_into() - .unwrap(); + // Not using the purified definition because we want to re-fetch the schema. + let definition = original_table + .create_sql_ast() + .context("unable to parse original table definition")?; let (source, table, graph, col_index_mapping, job_type) = { let result =