-
Notifications
You must be signed in to change notification settings - Fork 599
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(frontend): use purified definition for replacing table #20131
Changes from all commits
ae5a320
165e30b
514f615
9551f1b
374bf0e
93975ac
442512c
9490a5d
abe84cc
8aacd71
4014b51
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,7 +28,7 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody; | |
use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph}; | ||
use risingwave_sqlparser::ast::{AlterTableOperation, ColumnOption, ObjectName, Statement}; | ||
|
||
use super::create_source::schema_has_schema_registry; | ||
use super::create_source::{schema_has_schema_registry, SqlColumnStrategy}; | ||
use super::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator}; | ||
use super::util::SourceSchemaCompatExt; | ||
use super::{HandlerArgs, RwPgResponse}; | ||
|
@@ -90,6 +90,7 @@ pub async fn get_replace_table_plan( | |
table_name: ObjectName, | ||
new_definition: Statement, | ||
old_catalog: &Arc<TableCatalog>, | ||
sql_column_strategy: SqlColumnStrategy, | ||
) -> Result<( | ||
Option<Source>, | ||
Table, | ||
|
@@ -144,6 +145,7 @@ pub async fn get_replace_table_plan( | |
cdc_table_info, | ||
include_column_options, | ||
engine, | ||
sql_column_strategy, | ||
) | ||
.await?; | ||
|
||
|
@@ -257,7 +259,7 @@ pub async fn handle_alter_table_column( | |
} | ||
|
||
// Retrieve the original table definition and parse it to AST. | ||
let mut definition = original_catalog.create_sql_ast()?; | ||
let mut definition = original_catalog.create_sql_ast_purified()?; | ||
let Statement::CreateTable { | ||
columns, | ||
format_encode, | ||
|
@@ -275,6 +277,7 @@ pub async fn handle_alter_table_column( | |
if let Some(format_encode) = &format_encode | ||
&& schema_has_schema_registry(format_encode) | ||
{ | ||
// TODO(purify): we may support this. | ||
Err(ErrorCode::NotSupported( | ||
"alter table with schema registry".to_owned(), | ||
"try `ALTER TABLE .. FORMAT .. ENCODE .. (...)` instead".to_owned(), | ||
|
@@ -284,13 +287,6 @@ pub async fn handle_alter_table_column( | |
} | ||
}; | ||
|
||
if columns.is_empty() { | ||
Err(ErrorCode::NotSupported( | ||
"alter a table with empty column definitions".to_owned(), | ||
"Please recreate the table with column definitions.".to_owned(), | ||
))? | ||
} | ||
|
||
Comment on lines
-287
to
-293
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Previously there were two cases where
For case 1, as we always use a purified definition here, altering it is feasible. For case 2, we should have allowed it. |
||
if !original_catalog.incoming_sinks.is_empty() | ||
&& matches!(operation, AlterTableOperation::DropColumn { .. }) | ||
{ | ||
|
@@ -390,8 +386,14 @@ pub async fn handle_alter_table_column( | |
_ => unreachable!(), | ||
}; | ||
|
||
let (source, table, graph, col_index_mapping, job_type) = | ||
get_replace_table_plan(&session, table_name, definition, &original_catalog).await?; | ||
let (source, table, graph, col_index_mapping, job_type) = get_replace_table_plan( | ||
&session, | ||
table_name, | ||
definition, | ||
&original_catalog, | ||
SqlColumnStrategy::Follow, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Strictly follow the (altered) columns defined in SQL and avoid refreshing. |
||
) | ||
.await?; | ||
|
||
let catalog_writer = session.catalog_writer()?; | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,12 +17,10 @@ use fancy_regex::Regex; | |
use pgwire::pg_response::{PgResponse, StatementType}; | ||
use risingwave_common::bail_not_implemented; | ||
use risingwave_sqlparser::ast::{FormatEncodeOptions, ObjectName, Statement}; | ||
use risingwave_sqlparser::parser::Parser; | ||
use thiserror_ext::AsReport; | ||
|
||
use super::alter_source_with_sr::alter_definition_format_encode; | ||
use super::alter_table_column::fetch_table_catalog_for_alter; | ||
use super::create_source::schema_has_schema_registry; | ||
use super::create_source::{schema_has_schema_registry, SqlColumnStrategy}; | ||
use super::util::SourceSchemaCompatExt; | ||
use super::{get_replace_table_plan, HandlerArgs, RwPgResponse}; | ||
use crate::error::{ErrorCode, Result}; | ||
|
@@ -47,35 +45,31 @@ pub async fn handle_refresh_schema( | |
bail_not_implemented!("alter table with incoming sinks"); | ||
} | ||
|
||
let format_encode = { | ||
let format_encode = get_format_encode_from_table(&original_table)?; | ||
if !format_encode | ||
.as_ref() | ||
.is_some_and(schema_has_schema_registry) | ||
{ | ||
return Err(ErrorCode::NotSupported( | ||
"tables without schema registry cannot refreshed".to_owned(), | ||
"try `ALTER TABLE .. ADD/DROP COLUMN ...` instead".to_owned(), | ||
) | ||
.into()); | ||
} | ||
format_encode.unwrap() | ||
}; | ||
|
||
// NOTE(st1page): since we have not implemented alter format encode for table, it is actually no use. | ||
let definition = alter_definition_format_encode( | ||
&original_table.definition, | ||
format_encode.row_options.clone(), | ||
)?; | ||
Comment on lines
-65
to
-69
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is no-op. Don't get the point. Removed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes.. I used to find it in a refator but forget to delete it. |
||
let format_encode = get_format_encode_from_table(&original_table)?; | ||
if !format_encode | ||
.as_ref() | ||
.is_some_and(schema_has_schema_registry) | ||
{ | ||
return Err(ErrorCode::NotSupported( | ||
"tables without schema registry cannot be refreshed".to_owned(), | ||
"try `ALTER TABLE .. ADD/DROP COLUMN ...` instead".to_owned(), | ||
) | ||
.into()); | ||
} | ||
|
||
let [definition]: [_; 1] = Parser::parse_sql(&definition) | ||
.context("unable to parse original table definition")? | ||
.try_into() | ||
.unwrap(); | ||
let definition = original_table | ||
.create_sql_ast_purified() | ||
.context("unable to parse original table definition")?; | ||
|
||
let (source, table, graph, col_index_mapping, job_type) = { | ||
let result = | ||
get_replace_table_plan(&session, table_name, definition, &original_table).await; | ||
let result = get_replace_table_plan( | ||
&session, | ||
table_name, | ||
definition, | ||
&original_table, | ||
SqlColumnStrategy::Ignore, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Resolve columns from schema registry again and fully replace the current definition. |
||
) | ||
.await; | ||
match result { | ||
Ok((source, table, graph, col_index_mapping, job_type)) => { | ||
Ok((source, table, graph, col_index_mapping, job_type)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,7 +44,7 @@ use risingwave_sqlparser::ast::{ | |
}; | ||
|
||
use super::create_mv::get_column_names; | ||
use super::create_source::UPSTREAM_SOURCE_KEY; | ||
use super::create_source::{SqlColumnStrategy, UPSTREAM_SOURCE_KEY}; | ||
use super::util::gen_query_from_table_name; | ||
use super::RwPgResponse; | ||
use crate::binder::Binder; | ||
|
@@ -502,7 +502,7 @@ pub(crate) async fn reparse_table_for_sink( | |
table_catalog: &Arc<TableCatalog>, | ||
) -> Result<(StreamFragmentGraph, Table, Option<PbSource>)> { | ||
// Retrieve the original table definition and parse it to AST. | ||
let definition = table_catalog.create_sql_ast()?; | ||
let definition = table_catalog.create_sql_ast_purified()?; | ||
let Statement::CreateTable { | ||
name, | ||
format_encode, | ||
|
@@ -558,6 +558,7 @@ pub(crate) async fn reparse_table_for_sink( | |
None, | ||
include_column_options, | ||
engine, | ||
SqlColumnStrategy::Follow, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Strictly follow the columns defined in SQL and avoid refreshing. Creating a new sink into a table should not alter the table's schema. |
||
) | ||
.await?; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Showing that
This will panic the compute node in the current release.