Skip to content

Commit

Permalink
use un-purified sql for sr refresh
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jan 15, 2025
1 parent ee40fb7 commit eedda4d
Showing 1 changed file with 15 additions and 27 deletions.
42 changes: 15 additions & 27 deletions src/frontend/src/handler/alter_table_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ use fancy_regex::Regex;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail_not_implemented;
use risingwave_sqlparser::ast::{FormatEncodeOptions, ObjectName, Statement};
use risingwave_sqlparser::parser::Parser;
use thiserror_ext::AsReport;

use super::alter_source_with_sr::alter_definition_format_encode;
use super::alter_table_column::fetch_table_catalog_for_alter;
use super::create_source::schema_has_schema_registry;
use super::util::SourceSchemaCompatExt;
Expand Down Expand Up @@ -48,32 +46,22 @@ pub async fn handle_refresh_schema(
bail_not_implemented!("alter table with incoming sinks");
}

let format_encode = {
let format_encode = get_format_encode_from_table(&original_table)?;
if !format_encode
.as_ref()
.is_some_and(schema_has_schema_registry)
{
return Err(ErrorCode::NotSupported(
"tables without schema registry cannot refreshed".to_owned(),
"try `ALTER TABLE .. ADD/DROP COLUMN ...` instead".to_owned(),
)
.into());
}
format_encode.unwrap()
};

// NOTE(st1page): since we have not implemented alter format encode for table, it is actually no use.
// TODO(purify): use purified definition.
let definition = alter_definition_format_encode(
&original_table.definition,
format_encode.row_options.clone(),
)?;
let format_encode = get_format_encode_from_table(&original_table)?;
if !format_encode
.as_ref()
.is_some_and(schema_has_schema_registry)
{
return Err(ErrorCode::NotSupported(
"tables without schema registry cannot be refreshed".to_owned(),
"try `ALTER TABLE .. ADD/DROP COLUMN ...` instead".to_owned(),
)
.into());
}

let [definition]: [_; 1] = Parser::parse_sql(&definition)
.context("unable to parse original table definition")?
.try_into()
.unwrap();
// Not using the purified definition because we want to re-fetch the schema.
let definition = original_table
.create_sql_ast()
.context("unable to parse original table definition")?;

let (source, table, graph, col_index_mapping, job_type) = {
let result =
Expand Down

0 comments on commit eedda4d

Please sign in to comment.