Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): use purified definition for replacing table #20131

Merged
merged 11 commits into from
Jan 17, 2025
29 changes: 27 additions & 2 deletions e2e_test/source_inline/kafka/avro/alter_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,28 @@ select * from t
system ok
sr_register avro_alter_table_test-value AVRO <<< '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"},{"name":"nested","type":["null",{"type":"record","name":"Nested","fields":[{"name":"baz","type":"int"}]}],"default":null}]}'

# Refresh table schema should succeed
# Before refreshing schema, we create a `SINK INTO TABLE` which involves table replacement,
# showing that this will NOT accidentally refresh the schema.
Comment on lines +48 to +49
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Showing that

2. For SINK INTO TABLE, ensure schema unchanged by using the defined schema after purification.

Case 2 actually fixes a long-standing issue caused by impurity. Previously, creating a sink into a table would accidentally refresh its schema.

This will panic the compute node in the current release.

statement ok
CREATE SINK sk INTO t AS SELECT 8 AS bar, 'XYZ' AS foo WITH (type = 'append-only');

# Demonstrate purified definition
# Should not include the new field `nested`
query T
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_tables WHERE name = 't';
----
CREATE TABLE t (bar INT, foo CHARACTER VARYING, gen_col INT AS bar + 1)

query ? rowsort
select * from t
----
1 ABC 2
8 XYZ 9

statement ok
DROP SINK sk;

# Now, refresh table schema. Should reflect the new schema.
statement ok
ALTER TABLE t REFRESH SCHEMA;

Expand All @@ -55,10 +76,11 @@ SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_tab
----
CREATE TABLE t (bar INT, foo CHARACTER VARYING, nested STRUCT<baz INT>, gen_col INT AS bar + 1)

query ?
query ? rowsort
select * from t
----
1 ABC NULL 2
8 XYZ NULL 9

# Produce a new message with the new schema
system ok
Expand All @@ -71,6 +93,7 @@ select * from t
----
1 ABC NULL 2
2 DEF (2) 3
8 XYZ NULL 9

# create a new version of schema that added a new field to "nested"
system ok
Expand Down Expand Up @@ -106,6 +129,7 @@ Caused by these errors (recent errors listed first):


# Can't drop non-generated column
# TODO(purify): may support it.
statement error
ALTER TABLE t DROP COLUMN foo;
----
Expand All @@ -129,6 +153,7 @@ select * from t
----
ABC
DEF
XYZ

statement ok
drop table t;
Expand Down
4 changes: 3 additions & 1 deletion e2e_test/source_legacy/cdc_inline/alter/cdc_table_alter.slt
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,9 @@ drop table my_orders;
statement ok
create table orders_test (*) from mysql_source table 'testdb1.orders';

statement error Not supported: alter a table with empty column definitions
# Previously this was unsupported:
# statement error Not supported: alter a table with empty column definitions
statement error column "order_comment" of table "orders_test" already exists
ALTER TABLE orders_test ADD COLUMN order_comment VARCHAR;

statement ok
Expand Down
20 changes: 3 additions & 17 deletions src/frontend/src/catalog/purify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,26 +74,12 @@ pub fn try_purify_table_source_create_sql_ast(
bail!("expect `CREATE TABLE` or `CREATE SOURCE` statement, found: `{base:?}`");
};

// Filter out columns that are not defined by users in SQL.
let defined_columns = columns.iter().filter(|c| c.is_user_defined());

// If all columns are defined, check if the count matches.
if !column_defs.is_empty() && wildcard_idx.is_none() {
let defined_columns_len = defined_columns.clone().count();
if column_defs.len() != defined_columns_len {
bail /* unlikely */ !(
"column count mismatch: defined {} columns, but {} columns in the definition",
defined_columns_len,
column_defs.len()
);
}
}

// Now derive the missing columns and constraints.

// First, remove the wildcard from the definition.
*wildcard_idx = None;

// Filter out columns that are not defined by users in SQL.
let defined_columns = columns.iter().filter(|c| c.is_user_defined());

// Derive `ColumnDef` from `ColumnCatalog`.
let mut purified_column_defs = Vec::new();
for column in defined_columns {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ async fn read_relation_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwRelat
schemaname: schema.clone(),
relationname: t.name.clone(),
relationowner: t.owner as i32,
definition: t.definition.clone(),
definition: t.create_sql(),
relationtype: "MATERIALIZED VIEW".into(),
relationid: t.id.table_id as i32,
relationtimezone: fragments.get_ctx().unwrap().get_timezone().clone(),
Expand All @@ -106,7 +106,7 @@ async fn read_relation_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwRelat
schemaname: schema.clone(),
relationname: t.name.clone(),
relationowner: t.owner as i32,
definition: t.definition.clone(),
definition: t.create_sql_purified(),
relationtype: "TABLE".into(),
relationid: t.id.table_id as i32,
relationtimezone: fragments.get_ctx().unwrap().get_timezone().clone(),
Expand Down Expand Up @@ -144,7 +144,7 @@ async fn read_relation_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwRelat
schemaname: schema.clone(),
relationname: t.name.clone(),
relationowner: t.index_table.owner as i32,
definition: t.index_table.definition.clone(),
definition: t.index_table.create_sql(),
relationtype: "INDEX".into(),
relationid: t.index_table.id.table_id as i32,
relationtimezone: fragments.get_ctx().unwrap().get_timezone().clone(),
Expand Down
24 changes: 13 additions & 11 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
use risingwave_sqlparser::ast::{AlterTableOperation, ColumnOption, ObjectName, Statement};

use super::create_source::schema_has_schema_registry;
use super::create_source::{schema_has_schema_registry, SqlColumnStrategy};
use super::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator};
use super::util::SourceSchemaCompatExt;
use super::{HandlerArgs, RwPgResponse};
Expand Down Expand Up @@ -90,6 +90,7 @@ pub async fn get_replace_table_plan(
table_name: ObjectName,
new_definition: Statement,
old_catalog: &Arc<TableCatalog>,
sql_column_strategy: SqlColumnStrategy,
) -> Result<(
Option<Source>,
Table,
Expand Down Expand Up @@ -144,6 +145,7 @@ pub async fn get_replace_table_plan(
cdc_table_info,
include_column_options,
engine,
sql_column_strategy,
)
.await?;

Expand Down Expand Up @@ -257,7 +259,7 @@ pub async fn handle_alter_table_column(
}

// Retrieve the original table definition and parse it to AST.
let mut definition = original_catalog.create_sql_ast()?;
let mut definition = original_catalog.create_sql_ast_purified()?;
let Statement::CreateTable {
columns,
format_encode,
Expand All @@ -275,6 +277,7 @@ pub async fn handle_alter_table_column(
if let Some(format_encode) = &format_encode
&& schema_has_schema_registry(format_encode)
{
// TODO(purify): we may support this.
Err(ErrorCode::NotSupported(
"alter table with schema registry".to_owned(),
"try `ALTER TABLE .. FORMAT .. ENCODE .. (...)` instead".to_owned(),
Expand All @@ -284,13 +287,6 @@ pub async fn handle_alter_table_column(
}
};

if columns.is_empty() {
Err(ErrorCode::NotSupported(
"alter a table with empty column definitions".to_owned(),
"Please recreate the table with column definitions.".to_owned(),
))?
}

Comment on lines -287 to -293
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously there were two cases where columns is empty:

  1. All columns are resolved from external source instead of defined in SQL.
  2. The table indeed has zero columns.

For case 1, as we always use a purified definition here, altering it is feasible. For case 2, we should have allowed it.

if !original_catalog.incoming_sinks.is_empty()
&& matches!(operation, AlterTableOperation::DropColumn { .. })
{
Expand Down Expand Up @@ -390,8 +386,14 @@ pub async fn handle_alter_table_column(
_ => unreachable!(),
};

let (source, table, graph, col_index_mapping, job_type) =
get_replace_table_plan(&session, table_name, definition, &original_catalog).await?;
let (source, table, graph, col_index_mapping, job_type) = get_replace_table_plan(
&session,
table_name,
definition,
&original_catalog,
SqlColumnStrategy::Follow,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strictly follow the (altered) columns defined in SQL and avoid refreshing.

)
.await?;

let catalog_writer = session.catalog_writer()?;

Expand Down
52 changes: 23 additions & 29 deletions src/frontend/src/handler/alter_table_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@ use fancy_regex::Regex;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail_not_implemented;
use risingwave_sqlparser::ast::{FormatEncodeOptions, ObjectName, Statement};
use risingwave_sqlparser::parser::Parser;
use thiserror_ext::AsReport;

use super::alter_source_with_sr::alter_definition_format_encode;
use super::alter_table_column::fetch_table_catalog_for_alter;
use super::create_source::schema_has_schema_registry;
use super::create_source::{schema_has_schema_registry, SqlColumnStrategy};
use super::util::SourceSchemaCompatExt;
use super::{get_replace_table_plan, HandlerArgs, RwPgResponse};
use crate::error::{ErrorCode, Result};
Expand All @@ -47,35 +45,31 @@ pub async fn handle_refresh_schema(
bail_not_implemented!("alter table with incoming sinks");
}

let format_encode = {
let format_encode = get_format_encode_from_table(&original_table)?;
if !format_encode
.as_ref()
.is_some_and(schema_has_schema_registry)
{
return Err(ErrorCode::NotSupported(
"tables without schema registry cannot refreshed".to_owned(),
"try `ALTER TABLE .. ADD/DROP COLUMN ...` instead".to_owned(),
)
.into());
}
format_encode.unwrap()
};

// NOTE(st1page): since we have not implemented alter format encode for table, it is actually no use.
let definition = alter_definition_format_encode(
&original_table.definition,
format_encode.row_options.clone(),
)?;
Comment on lines -65 to -69
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a no-op, and I don't see the point of it, so I removed it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I noticed it during a refactor but forgot to delete it.

let format_encode = get_format_encode_from_table(&original_table)?;
if !format_encode
.as_ref()
.is_some_and(schema_has_schema_registry)
{
return Err(ErrorCode::NotSupported(
"tables without schema registry cannot be refreshed".to_owned(),
"try `ALTER TABLE .. ADD/DROP COLUMN ...` instead".to_owned(),
)
.into());
}

let [definition]: [_; 1] = Parser::parse_sql(&definition)
.context("unable to parse original table definition")?
.try_into()
.unwrap();
let definition = original_table
.create_sql_ast_purified()
.context("unable to parse original table definition")?;

let (source, table, graph, col_index_mapping, job_type) = {
let result =
get_replace_table_plan(&session, table_name, definition, &original_table).await;
let result = get_replace_table_plan(
&session,
table_name,
definition,
&original_table,
SqlColumnStrategy::Ignore,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolve columns from schema registry again and fully replace the current definition.

)
.await;
match result {
Ok((source, table, graph, col_index_mapping, job_type)) => {
Ok((source, table, graph, col_index_mapping, job_type))
Expand Down
5 changes: 3 additions & 2 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use risingwave_sqlparser::ast::{
};

use super::create_mv::get_column_names;
use super::create_source::UPSTREAM_SOURCE_KEY;
use super::create_source::{SqlColumnStrategy, UPSTREAM_SOURCE_KEY};
use super::util::gen_query_from_table_name;
use super::RwPgResponse;
use crate::binder::Binder;
Expand Down Expand Up @@ -502,7 +502,7 @@ pub(crate) async fn reparse_table_for_sink(
table_catalog: &Arc<TableCatalog>,
) -> Result<(StreamFragmentGraph, Table, Option<PbSource>)> {
// Retrieve the original table definition and parse it to AST.
let definition = table_catalog.create_sql_ast()?;
let definition = table_catalog.create_sql_ast_purified()?;
let Statement::CreateTable {
name,
format_encode,
Expand Down Expand Up @@ -558,6 +558,7 @@ pub(crate) async fn reparse_table_for_sink(
None,
include_column_options,
engine,
SqlColumnStrategy::Follow,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strictly follow the columns defined in SQL and avoid refreshing. Creating a new sink into a table should not alter the table's schema.

)
.await?;

Expand Down
Loading
Loading