Skip to content

Commit

Permalink
use purified sql for sr refresh
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jan 13, 2025
1 parent d9fbf84 commit ece77bb
Showing 1 changed file with 14 additions and 25 deletions.
39 changes: 14 additions & 25 deletions src/frontend/src/handler/alter_table_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,32 +48,21 @@ pub async fn handle_refresh_schema(
bail_not_implemented!("alter table with incoming sinks");
}

let format_encode = {
let format_encode = get_format_encode_from_table(&original_table)?;
if !format_encode
.as_ref()
.is_some_and(schema_has_schema_registry)
{
return Err(ErrorCode::NotSupported(
"tables without schema registry cannot refreshed".to_owned(),
"try `ALTER TABLE .. ADD/DROP COLUMN ...` instead".to_owned(),
)
.into());
}
format_encode.unwrap()
};

// NOTE(st1page): since we have not implemented alter format encode for table, it is actually no use.
// TODO(purify): use purified definition.
let definition = alter_definition_format_encode(
&original_table.definition,
format_encode.row_options.clone(),
)?;
let format_encode = get_format_encode_from_table(&original_table)?;
if !format_encode
.as_ref()
.is_some_and(schema_has_schema_registry)
{
return Err(ErrorCode::NotSupported(
"tables without schema registry cannot refreshed".to_owned(),
"try `ALTER TABLE .. ADD/DROP COLUMN ...` instead".to_owned(),
)
.into());
}

let [definition]: [_; 1] = Parser::parse_sql(&definition)
.context("unable to parse original table definition")?
.try_into()
.unwrap();
let definition = original_table
.create_sql_ast_purified()
.context("unable to parse original table definition")?;

let (source, table, graph, col_index_mapping, job_type) = {
let result =
Expand Down

0 comments on commit ece77bb

Please sign in to comment.