
feat(frontend): allow specifying (partial) schema when creating table with derived schema #20203

Open
wants to merge 12 commits into base: main
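
For context, a rough illustration of the feature (column names and options taken from the e2e tests added below; this snippet itself is not part of the diff): the column list may now name only a subset of the fields in the registered Avro schema. Declared columns are checked against the resolved schema, and the remaining columns are still derived from it.

  # Only `bar` is declared; `foo` is still derived from the registry schema,
  # and the declared type of `bar` is checked against it.
  create table t2 (bar int, gen_col int as bar + 1)
  WITH (
    ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
    topic = 'avro_partial_schema_test'
  )
  FORMAT PLAIN ENCODE AVRO (
    schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'
  );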
2 changes: 1 addition & 1 deletion e2e_test/source_inline/connection/ddl.slt
@@ -81,7 +81,7 @@ create sink sink_kafka from data_table with (

sleep 3s

query IT rowsort
query IT rowsort retry 3 backoff 5s
select a, b from t1;
----
1 a
12 changes: 0 additions & 12 deletions e2e_test/source_inline/kafka/avro/alter_table.slt
@@ -128,18 +128,6 @@ Caused by these errors (recent errors listed first):
4: Item not found: Invalid column: bar


# Can't drop non-generated column
# TODO(purify): may support it.
statement error
ALTER TABLE t DROP COLUMN foo;
----
db error: ERROR: Failed to run the query

Caused by:
Not supported: alter table with schema registry
HINT: try `ALTER TABLE .. FORMAT .. ENCODE .. (...)` instead


# Drop generated column
statement ok
ALTER TABLE t DROP COLUMN gen_col;
199 changes: 199 additions & 0 deletions e2e_test/source_inline/kafka/avro/partial_schema.slt
@@ -0,0 +1,199 @@
control substitution on

# cleanup
system ok
rpk topic delete 'avro_partial_schema_test' || true; \
(rpk sr subject delete 'avro_partial_schema_test-value' && rpk sr subject delete 'avro_partial_schema_test-value' --permanent) || true;

# create topic and sr subject
system ok
rpk topic create 'avro_partial_schema_test'

# create a schema
system ok
sr_register avro_partial_schema_test-value AVRO <<< '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}'

# Specify schema
statement ok
create table t1 (foo varchar, bar int)
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'avro_partial_schema_test'
)
FORMAT PLAIN ENCODE AVRO (
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'
);

# Specify partial schema
statement ok
create table t2 (bar int, gen_col int as bar + 1)
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'avro_partial_schema_test'
)
FORMAT PLAIN ENCODE AVRO (
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'
);

# Specify incorrect schema
statement error
create table t (bar int, foo varchar, baz int)
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'avro_partial_schema_test'
)
FORMAT PLAIN ENCODE AVRO (
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'
);
----
db error: ERROR: Failed to run the query

Caused by:
Protocol error: Column "baz" is defined in SQL but not found in the source


statement error
create table t (bar double)
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'avro_partial_schema_test'
)
FORMAT PLAIN ENCODE AVRO (
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'
);
----
db error: ERROR: Failed to run the query

Caused by:
Protocol error: Data type mismatch for column "bar". Defined in SQL as "double precision", but found in the source as "integer"


# Resolve schema
statement ok
create table tstar (*, gen_col int as bar + 1)
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'avro_partial_schema_test'
)
FORMAT PLAIN ENCODE AVRO (
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'
);

# Omitting the wildcard is interpreted as `*`, for backward compatibility of the syntax
statement ok
create table tstar2 (gen_col int as bar + 1)
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'avro_partial_schema_test'
)
FORMAT PLAIN ENCODE AVRO (
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'
);


# Demonstrate purified definition
query TT rowsort
SELECT name, SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_tables WHERE name LIKE 't%';
----
t1 CREATE TABLE t1 (foo CHARACTER VARYING, bar INT)
t2 CREATE TABLE t2 (bar INT, gen_col INT AS bar + 1)
tstar CREATE TABLE tstar (bar INT, foo CHARACTER VARYING, gen_col INT AS bar + 1)
tstar2 CREATE TABLE tstar2 (bar INT, foo CHARACTER VARYING, gen_col INT AS bar + 1)

# create a new schema
system ok
sr_register avro_partial_schema_test-value AVRO <<< '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}, {"name":"baz", "type":"double", "default":0}]}'
Contributor:

Can you add one more test

  1. declare schema (a int, b varchar)
  2. create table with generated column (a int, b varchar, c int as a + 1)
  3. create a newer version in schema registry (a int, b varchar, c int)
  4. what does the purified SQL look like at the moment?

Member Author:

Step 3 will not affect the purified SQL, because we don't access (and then check against) the external schema unless we are going to perform a schema change.

However, this is still an interesting example, as I'm not sure about the behavior if we call REFRESH SCHEMA or ADD COLUMN afterwards. I'm afraid this has long been undefined behavior, since we allow defining a generated column on a source. Will test it.
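
A rough slt-style sketch of the test requested above (hypothetical topic/subject and column names; topic creation and cleanup elided; not part of this diff):

  # 1. Register schema v1: (a int, b varchar)
  system ok
  sr_register avro_gen_col_test-value AVRO <<< '{"type":"record","name":"Root","fields":[{"name":"a","type":"int"},{"name":"b","type":"string"}]}'

  # 2. Create the table with a generated column
  statement ok
  create table tg (a int, b varchar, c int as a + 1)
  WITH (
    ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
    topic = 'avro_gen_col_test'
  )
  FORMAT PLAIN ENCODE AVRO (
    schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'
  );

  # 3. Register schema v2 with a physical `c` that collides with the generated column
  system ok
  sr_register avro_gen_col_test-value AVRO <<< '{"type":"record","name":"Root","fields":[{"name":"a","type":"int"},{"name":"b","type":"string"},{"name":"c","type":"int","default":0}]}'

  # 4. Per the reply above, the purified definition is expected to stay unchanged until a
  #    schema change (e.g. REFRESH SCHEMA or ADD COLUMN) is attempted; the behavior at that
  #    point is the open question.
  query TT
  SELECT name, SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_tables WHERE name = 'tg';
  ----
  tg CREATE TABLE tg (a INT, b CHARACTER VARYING, c INT AS a + 1)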


# Can perform `[ADD | DROP] COLUMN` no matter whether the schema was resolved from the registry.
# However, the schema will still be checked against the resolved schema.
statement ok
alter table t1 drop column foo;

statement error
alter table t2 add column baz int;
----
db error: ERROR: Failed to run the query

Caused by:
Protocol error: Data type mismatch for column "baz". Defined in SQL as "integer", but found in the source as "double precision"
Comment on lines +112 to +118

Contributor:

does it mean we always keep track of the latest version in the schema registry and prevent possible future name/type collisions?

Member Author:

Yes



statement ok
alter table t2 add column baz double;

statement error
alter table t2 add column bbaazz double;
----
db error: ERROR: Failed to run the query

Caused by:
Protocol error: Column "bbaazz" is defined in SQL but not found in the source


statement ok
alter table tstar drop column foo;

statement error
alter table tstar add column baz int;
----
db error: ERROR: Failed to run the query

Caused by:
Protocol error: Data type mismatch for column "baz". Defined in SQL as "integer", but found in the source as "double precision"


statement ok
alter table tstar add column baz double;

statement error
alter table tstar add column bbaazz double;
----
db error: ERROR: Failed to run the query

Caused by:
Protocol error: Column "bbaazz" is defined in SQL but not found in the source


# Demonstrate purified definition
query TT rowsort
SELECT name, SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_tables WHERE name LIKE 't%';
----
t1 CREATE TABLE t1 (bar INT)
t2 CREATE TABLE t2 (bar INT, gen_col INT AS bar + 1, baz DOUBLE)
tstar CREATE TABLE tstar (bar INT, gen_col INT AS bar + 1, baz DOUBLE)
tstar2 CREATE TABLE tstar2 (bar INT, foo CHARACTER VARYING, gen_col INT AS bar + 1)

# Can refresh schema no matter whether the schema was resolved from the registry
statement ok
alter table t1 refresh schema;

statement ok
alter table t2 refresh schema;

statement ok
alter table tstar refresh schema;

statement ok
alter table tstar2 refresh schema;

# Demonstrate purified definition
query TT rowsort
SELECT name, SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_tables WHERE name LIKE 't%';
----
t1 CREATE TABLE t1 (bar INT, foo CHARACTER VARYING, baz DOUBLE)
t2 CREATE TABLE t2 (bar INT, foo CHARACTER VARYING, baz DOUBLE, gen_col INT AS bar + 1)
tstar CREATE TABLE tstar (bar INT, foo CHARACTER VARYING, baz DOUBLE, gen_col INT AS bar + 1)
tstar2 CREATE TABLE tstar2 (bar INT, foo CHARACTER VARYING, baz DOUBLE, gen_col INT AS bar + 1)

# Cleanup
statement ok
DROP TABLE t1;

statement ok
DROP TABLE t2;

statement ok
DROP TABLE tstar;

statement ok
DROP TABLE tstar2;
59 changes: 25 additions & 34 deletions src/frontend/src/handler/alter_table_column.rs
@@ -28,7 +28,7 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
use risingwave_sqlparser::ast::{AlterTableOperation, ColumnOption, ObjectName, Statement};

use super::create_source::{schema_has_schema_registry, SqlColumnStrategy};
use super::create_source::SqlColumnStrategy;
use super::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator};
use super::util::SourceSchemaCompatExt;
use super::{HandlerArgs, RwPgResponse};
@@ -260,33 +260,10 @@ pub async fn handle_alter_table_column(

// Retrieve the original table definition and parse it to AST.
let mut definition = original_catalog.create_sql_ast_purified()?;
let Statement::CreateTable {
columns,
format_encode,
..
} = &mut definition
else {
let Statement::CreateTable { columns, .. } = &mut definition else {
panic!("unexpected statement: {:?}", definition);
};

let format_encode = format_encode
.clone()
.map(|format_encode| format_encode.into_v2_with_warning());

let fail_if_has_schema_registry = || {
if let Some(format_encode) = &format_encode
&& schema_has_schema_registry(format_encode)
{
// TODO(purify): we may support this.
Err(ErrorCode::NotSupported(
"alter table with schema registry".to_owned(),
"try `ALTER TABLE .. FORMAT .. ENCODE .. (...)` instead".to_owned(),
))
} else {
Ok(())
}
};

if !original_catalog.incoming_sinks.is_empty()
&& matches!(operation, AlterTableOperation::DropColumn { .. })
{
@@ -295,12 +272,27 @@
))?;
}

match operation {
// The `sql_column_strategy` will be `FollowChecked` if the operation is `AddColumn`, and
// `FollowUnchecked` if the operation is `DropColumn`.
//
// Consider the following example:
// - There was a column `foo` and a generated column `gen` that references `foo`.
// - The external schema is updated to remove `foo`.
// - The user tries to drop `foo` from the table.
//
// Dropping `foo` directly will fail because `gen` references `foo`. However, dropping `gen`
// first will also be rejected because `foo` does not exist any more. Also, executing
// `REFRESH SCHEMA` will not help because it keeps the generated column. The user gets stuck.
//
// `FollowUnchecked` works around this issue. There are also some alternatives:
// - Allow dropping multiple columns at once.
// - Check against the persisted schema, instead of resolving again.
//
// Applied only to tables with schema registry.
let sql_column_strategy = match operation {
AlterTableOperation::AddColumn {
column_def: new_column,
} => {
fail_if_has_schema_registry()?;

// Duplicated names can actually be checked by `StreamMaterialize`. We do here for
// better error reporting.
let new_column_name = new_column.name.real_value();
Expand All @@ -325,6 +317,8 @@ pub async fn handle_alter_table_column(

// Add the new column to the table definition if it is not created by `create table (*)` syntax.
columns.push(new_column);

SqlColumnStrategy::FollowChecked
}

AlterTableOperation::DropColumn {
@@ -338,10 +332,6 @@

// Check if the column to drop is referenced by any generated columns.
for column in original_catalog.columns() {
if column_name.real_value() == column.name() && !column.is_generated() {
fail_if_has_schema_registry()?;
}

if let Some(expr) = column.generated_expr() {
let expr = ExprImpl::from_expr_proto(expr)?;
let refs = expr.collect_input_refs(original_catalog.columns().len());
@@ -381,17 +371,18 @@
column_name, table_name
)))?
}

SqlColumnStrategy::FollowUnchecked
}

_ => unreachable!(),
};

let (source, table, graph, col_index_mapping, job_type) = get_replace_table_plan(
&session,
table_name,
definition,
&original_catalog,
SqlColumnStrategy::Follow,
sql_column_strategy,
)
.await?;

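To make the `foo`/`gen` scenario from the comment above concrete, a hedged SQL sketch (hypothetical table and column names; it restates the behavior the comment describes rather than anything verified here):

  -- The table was created while the external schema still contained `foo`:
  --   create table t (foo int, gen int as foo + 1) ... FORMAT PLAIN ENCODE AVRO (schema.registry = ...);
  -- After the external schema drops `foo`:

  alter table t drop column foo;  -- rejected: generated column `gen` references `foo`
  alter table t drop column gen;  -- under a checked strategy, also rejected: `foo` is defined in SQL
                                  -- but no longer found in the source

  -- With `FollowUnchecked` for DROP COLUMN, the second statement succeeds, after which
  -- `foo` itself can be dropped as well.
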
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_sink.rs
@@ -558,7 +558,7 @@ pub(crate) async fn reparse_table_for_sink(
None,
include_column_options,
engine,
SqlColumnStrategy::Follow,
SqlColumnStrategy::FollowUnchecked,
)
.await?;
