refactor(frontend): use purified definition for altering source (#20182)
Signed-off-by: Bugen Zhao <[email protected]>
BugenZhao authored Jan 20, 2025
1 parent bf8f076 commit 04a8813
Showing 7 changed files with 45 additions and 49 deletions.
6 changes: 6 additions & 0 deletions e2e_test/source_inline/kafka/alter/add_column_shared.slt
@@ -48,6 +48,12 @@ select * from mv_before_alter;
statement ok
alter source s add column v3 varchar;

# Demonstrate definition change.
query T
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 's';
----
CREATE SOURCE s (v1 INT, v2 CHARACTER VARYING, v3 CHARACTER VARYING)

# New MV will have v3.

# Check it should still be shared source <https://github.com/risingwavelabs/risingwave/issues/19799>
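The expected output above works because `rw_sources.definition` is no longer the text the user typed: it is regenerated by `SourceCatalog::create_sql_purified()` from the catalog's current column list, so `v3 CHARACTER VARYING` shows up after the `ALTER SOURCE`. A rough sketch of the same assertion on the Rust side, assuming a `SourceCatalog` for `s` is at hand (the wrapper function and import path are illustrative; the real snapshot assertions live in the `alter_source_with_sr.rs` tests further down):

```rust
use crate::catalog::source_catalog::SourceCatalog;

/// Sketch only: mirrors the SLT query above by cutting the regenerated
/// statement at its WITH clause and checking the column list.
fn assert_purified_columns(source: &SourceCatalog) {
    let purified = source.create_sql_purified();
    let columns_part = purified
        .split(" WITH")
        .next()
        .expect("split always yields at least one piece");
    assert_eq!(
        columns_part,
        "CREATE SOURCE s (v1 INT, v2 CHARACTER VARYING, v3 CHARACTER VARYING)"
    );
}
```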
6 changes: 6 additions & 0 deletions e2e_test/source_inline/kafka/avro/alter_source.slt
@@ -52,6 +52,12 @@ No tracking issue yet. Feel free to submit a feature request at https://github.c
statement ok
alter source s format plain encode avro (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}');

# Demonstrate definition change.
query T
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 's';
----
CREATE SOURCE s (foo CHARACTER VARYING, bar INT)

query ??
select * from s
----
10 changes: 10 additions & 0 deletions e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt
@@ -20,6 +20,11 @@ FORMAT PLAIN ENCODE PROTOBUF(
message = 'test.User'
);

query T
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 'src_user';
----
CREATE SOURCE src_user (id INT, name CHARACTER VARYING, address CHARACTER VARYING, city CHARACTER VARYING, gender CHARACTER VARYING, sc STRUCT<file_name CHARACTER VARYING>) INCLUDE timestamp

statement ok
CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user;

@@ -45,6 +50,11 @@ set streaming_use_shared_source to false;
statement ok
ALTER SOURCE src_user REFRESH SCHEMA;

query T
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 'src_user';
----
CREATE SOURCE src_user (id INT, name CHARACTER VARYING, address CHARACTER VARYING, city CHARACTER VARYING, gender CHARACTER VARYING, sc STRUCT<file_name CHARACTER VARYING>, age INT) INCLUDE timestamp

# Check it should still be shared source <https://github.com/risingwavelabs/risingwave/issues/19799>
query
EXPLAIN CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user;
9 changes: 9 additions & 0 deletions src/frontend/src/catalog/source_catalog.rs
@@ -142,6 +142,15 @@ impl SourceCatalog {

self.create_sql_ast()
}

/// Fills the `definition` field with the purified SQL definition.
///
/// There's no need to call this method for correctness because we automatically purify the
    /// SQL definition at the time of querying. However, this helps to maintain a more accurate
    /// `definition` field in the catalog when it is directly inspected for debugging purposes.
pub fn fill_purified_create_sql(&mut self) {
self.definition = self.create_sql_purified();
}
}

impl From<&PbSource> for SourceCatalog {
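With `fill_purified_create_sql` in place, the catalog exposes both flavors of the definition: `create_sql_ast` parses the statement as the user originally wrote it, while `create_sql_purified` / `create_sql_ast_purified` regenerate it from the current catalog state; all of these names appear elsewhere in this commit. A hedged sketch of when each form applies (the wrapper functions and exact import paths are illustrative, not part of the codebase):

```rust
use risingwave_sqlparser::ast::Statement;

use crate::catalog::source_catalog::SourceCatalog;
use crate::error::Result;

// For text shown to users (rw_sources, rw_relation_info), prefer the purified
// form: it always lists the columns the source has right now.
fn displayed_definition(source: &SourceCatalog) -> String {
    source.create_sql_purified()
}

// For re-analyzing the original statement (e.g. reading back its FORMAT/ENCODE
// clause), parsing the stored definition is sufficient.
fn parsed_definition(source: &SourceCatalog) -> Result<Statement> {
    source.create_sql_ast()
}
```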
@@ -174,7 +174,7 @@ async fn read_relation_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwRelat
schemaname: schema.clone(),
relationname: t.name.clone(),
relationowner: t.owner as i32,
definition: t.definition.clone(),
definition: t.create_sql_purified(),
relationtype: "SOURCE".into(),
relationid: t.id as i32,
relationtimezone: timezone,
31 changes: 3 additions & 28 deletions src/frontend/src/handler/alter_source_column.rs
@@ -12,15 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::max_column_id;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct};
use risingwave_sqlparser::ast::{
AlterSourceOperation, ColumnDef, CreateSourceStatement, ObjectName, Statement,
};
use risingwave_sqlparser::parser::Parser;
use risingwave_sqlparser::ast::{AlterSourceOperation, ObjectName};

use super::create_source::generate_stream_graph_for_source;
use super::create_table::bind_sql_columns;
@@ -107,17 +103,17 @@ pub async fn handle_alter_source_column(
"column \"{new_column_name}\" of source \"{source_name}\" already exists"
)))?
}
catalog.definition =
alter_definition_add_column(&catalog.definition, column_def.clone())?;
let mut bound_column = bind_sql_columns(&[column_def])?.remove(0);
bound_column.column_desc.column_id = max_column_id(columns).next();
columns.push(bound_column);
// No need to update the definition here. It will be done by purification later.
}
_ => unreachable!(),
}

// update version
catalog.version += 1;
catalog.fill_purified_create_sql();

let catalog_writer = session.catalog_writer()?;
if catalog.info.is_shared() {
@@ -148,27 +144,6 @@ pub async fn handle_alter_source_column(
Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
}

/// `alter_definition_add_column` adds a new column to the definition of the relation.
#[inline(always)]
pub fn alter_definition_add_column(definition: &str, column: ColumnDef) -> Result<String> {
let ast = Parser::parse_sql(definition).expect("failed to parse relation definition");
let mut stmt = ast
.into_iter()
.exactly_one()
.expect("should contains only one statement");

match &mut stmt {
Statement::CreateSource {
stmt: CreateSourceStatement { columns, .. },
} => {
columns.push(column);
}
_ => unreachable!(),
}

Ok(stmt.to_string())
}

#[cfg(test)]
pub mod tests {
use std::collections::BTreeMap;
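Net effect in this handler: the string-patching path is gone. The removed helper re-parsed `catalog.definition`, pushed the user's `ColumnDef` into the AST, and serialized it back; the handler now only updates the structured column catalog and regenerates the text afterwards. A condensed recap, with both halves lifted from the hunks above (no new API):

```rust
// Before (removed): patch the SQL text directly.
// catalog.definition =
//     alter_definition_add_column(&catalog.definition, column_def.clone())?;

// After: mutate the column catalog only; the definition follows from it.
let mut bound_column = bind_sql_columns(&[column_def])?.remove(0);
bound_column.column_desc.column_id = max_column_id(columns).next();
columns.push(bound_column);

catalog.version += 1;
catalog.fill_purified_create_sql();
```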
30 changes: 10 additions & 20 deletions src/frontend/src/handler/alter_source_with_sr.rs
@@ -14,7 +14,6 @@

use std::sync::Arc;

use anyhow::Context;
use either::Either;
use itertools::Itertools;
use pgwire::pg_response::StatementType;
@@ -28,7 +27,6 @@ use risingwave_sqlparser::ast::{
CompatibleFormatEncode, CreateSourceStatement, Encode, Format, FormatEncodeOptions, ObjectName,
SqlOption, Statement,
};
use risingwave_sqlparser::parser::Parser;

use super::create_source::{
generate_stream_graph_for_source, schema_has_schema_registry, validate_compatibility,
@@ -198,10 +196,7 @@ pub async fn refresh_sr_and_get_columns_diff(
}

fn get_format_encode_from_source(source: &SourceCatalog) -> Result<FormatEncodeOptions> {
let [stmt]: [_; 1] = Parser::parse_sql(&source.definition)
.context("unable to parse original source definition")?
.try_into()
.unwrap();
let stmt = source.create_sql_ast()?;
let Statement::CreateSource {
stmt: CreateSourceStatement { format_encode, .. },
} = stmt
@@ -263,8 +258,10 @@

source.info = source_info;
source.columns.extend(added_columns);
source.definition =
alter_definition_format_encode(&source.definition, format_encode.row_options.clone())?;
source.definition = alter_definition_format_encode(
source.create_sql_ast_purified()?,
format_encode.row_options.clone(),
)?;

let (format_encode_options, format_encode_secret_ref) = resolve_secret_ref_in_with_options(
WithOptions::try_from(format_encode.row_options())?,
@@ -313,15 +310,9 @@

/// Apply the new `format_encode_options` to the source/table definition.
pub fn alter_definition_format_encode(
definition: &str,
mut stmt: Statement,
format_encode_options: Vec<SqlOption>,
) -> Result<String> {
let ast = Parser::parse_sql(definition).expect("failed to parse relation definition");
let mut stmt = ast
.into_iter()
.exactly_one()
.expect("should contain only one statement");

match &mut stmt {
Statement::CreateSource {
stmt: CreateSourceStatement { format_encode, .. },
@@ -391,6 +382,9 @@ pub mod tests {
.clone()
};

let source = get_source();
expect_test::expect!["CREATE SOURCE src (id INT, country STRUCT<address CHARACTER VARYING, city STRUCT<address CHARACTER VARYING, zipcode CHARACTER VARYING>, zipcode CHARACTER VARYING>, zipcode BIGINT, rate REAL) WITH (connector = 'kafka', topic = 'test-topic', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://')"].assert_eq(&source.create_sql_purified().replace(proto_file.path().to_str().unwrap(), ""));

let sql = format!(
r#"ALTER SOURCE src FORMAT UPSERT ENCODE PROTOBUF (
message = '.test.TestRecord',
@@ -434,10 +428,6 @@
.unwrap();
assert_eq!(name_column.column_desc.data_type, DataType::Varchar);

let altered_sql = format!(
r#"CREATE SOURCE src WITH (connector = 'kafka', topic = 'test-topic', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecordExt', schema.location = 'file://{}')"#,
proto_file.path().to_str().unwrap()
);
assert_eq!(altered_sql, altered_source.definition);
expect_test::expect!["CREATE SOURCE src (id INT, country STRUCT<address CHARACTER VARYING, city STRUCT<address CHARACTER VARYING, zipcode CHARACTER VARYING>, zipcode CHARACTER VARYING>, zipcode BIGINT, rate REAL, name CHARACTER VARYING) WITH (connector = 'kafka', topic = 'test-topic', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecordExt', schema.location = 'file://')"].assert_eq(&altered_source.create_sql_purified().replace(proto_file.path().to_str().unwrap(), ""));
}
}
