Skip to content

Commit

Permalink
introduce column strategy
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jan 15, 2025
1 parent eedda4d commit b599ec8
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 29 deletions.
14 changes: 11 additions & 3 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
use risingwave_sqlparser::ast::{AlterTableOperation, ColumnOption, ObjectName, Statement};

use super::create_source::schema_has_schema_registry;
use super::create_source::{schema_has_schema_registry, SqlColumnStrategy};
use super::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator};
use super::util::SourceSchemaCompatExt;
use super::{HandlerArgs, RwPgResponse};
Expand Down Expand Up @@ -91,6 +91,7 @@ pub async fn get_replace_table_plan(
table_name: ObjectName,
new_definition: Statement,
old_catalog: &Arc<TableCatalog>,
sql_column_strategy: SqlColumnStrategy,
) -> Result<(
Option<Source>,
Table,
Expand Down Expand Up @@ -145,6 +146,7 @@ pub async fn get_replace_table_plan(
cdc_table_info,
include_column_options,
engine,
sql_column_strategy,
)
.await?;

Expand Down Expand Up @@ -392,8 +394,14 @@ pub async fn handle_alter_table_column(
_ => unreachable!(),
};

let (source, table, graph, col_index_mapping, job_type) =
get_replace_table_plan(&session, table_name, definition, &original_catalog).await?;
let (source, table, graph, col_index_mapping, job_type) = get_replace_table_plan(
&session,
table_name,
definition,
&original_catalog,
SqlColumnStrategy::Follow,
)
.await?;

let catalog_writer = session.catalog_writer()?;

Expand Down
12 changes: 9 additions & 3 deletions src/frontend/src/handler/alter_table_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_sqlparser::ast::{FormatEncodeOptions, ObjectName, Statement};
use thiserror_ext::AsReport;

use super::alter_table_column::fetch_table_catalog_for_alter;
use super::create_source::schema_has_schema_registry;
use super::create_source::{schema_has_schema_registry, SqlColumnStrategy};
use super::util::SourceSchemaCompatExt;
use super::{get_replace_table_plan, HandlerArgs, RwPgResponse};
use crate::error::{ErrorCode, Result};
Expand Down Expand Up @@ -64,8 +64,14 @@ pub async fn handle_refresh_schema(
.context("unable to parse original table definition")?;

let (source, table, graph, col_index_mapping, job_type) = {
let result =
get_replace_table_plan(&session, table_name, definition, &original_table).await;
let result = get_replace_table_plan(
&session,
table_name,
definition,
&original_table,
SqlColumnStrategy::Ignore,
)
.await;
match result {
Ok((source, table, graph, col_index_mapping, job_type)) => {
Ok((source, table, graph, col_index_mapping, job_type))
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use risingwave_sqlparser::ast::{
};

use super::create_mv::get_column_names;
use super::create_source::UPSTREAM_SOURCE_KEY;
use super::create_source::{SqlColumnStrategy, UPSTREAM_SOURCE_KEY};
use super::util::gen_query_from_table_name;
use super::RwPgResponse;
use crate::binder::Binder;
Expand Down Expand Up @@ -559,6 +559,7 @@ pub(crate) async fn reparse_table_for_sink(
None,
include_column_options,
engine,
SqlColumnStrategy::Follow,
)
.await?;

Expand Down
93 changes: 72 additions & 21 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,31 +198,71 @@ pub(crate) fn bind_all_columns(
cols_from_sql: Vec<ColumnCatalog>,
col_defs_from_sql: &[ColumnDef],
wildcard_idx: Option<usize>,
sql_column_strategy: SqlColumnStrategy,
) -> Result<Vec<ColumnCatalog>> {
if let Some(cols_from_source) = cols_from_source {
if cols_from_sql.is_empty() {
Ok(cols_from_source)
} else if let Some(wildcard_idx) = wildcard_idx {
if col_defs_from_sql.iter().any(|c| !c.is_generated()) {
Err(RwError::from(NotSupported(
"Only generated columns are allowed in user-defined schema from SQL".to_owned(),
"Remove the non-generated columns".to_owned(),
)))
} else {
// Replace `*` with `cols_from_source`
let mut cols_from_sql = cols_from_sql;
let mut cols_from_source = cols_from_source;
let mut cols_from_sql_r = cols_from_sql.split_off(wildcard_idx);
cols_from_sql.append(&mut cols_from_source);
cols_from_sql.append(&mut cols_from_sql_r);
Ok(cols_from_sql)
match sql_column_strategy {
// Ignore `cols_from_source`, follow `cols_from_sql` without checking.
SqlColumnStrategy::Follow => {
assert!(
wildcard_idx.is_none(),
"wildcard still exists while strategy is Follows, not correctly purified?"
);
return Ok(cols_from_sql);
}

// Will merge generated columns from `cols_from_sql` into `cols_from_source`.
SqlColumnStrategy::Ignore => {}
SqlColumnStrategy::Reject => {
// Perform extra check to see if there are non-generated columns in `cols_from_sql`
// and reject the request if there are.

// Need to check `col_defs` to see if a column is generated, as we haven't bound the
// `GeneratedColumnDesc` in `ColumnCatalog` yet.
let generated_cols_from_sql = cols_from_sql
.iter()
.filter(|c| {
col_defs_from_sql
.iter()
.find(|d| d.name.real_value() == c.name())
.unwrap()
.is_generated()
})
.cloned()
.collect_vec();

if generated_cols_from_sql.len() != cols_from_sql.len() {
if wildcard_idx.is_some() {
// (*, normal_column INT)
return Err(RwError::from(NotSupported(
"Only generated columns are allowed in user-defined schema from SQL"
.to_owned(),
"Remove the non-generated columns".to_owned(),
)));
} else {
// (normal_column INT)
// TODO(yuhao): https://github.com/risingwavelabs/risingwave/issues/12209
return Err(RwError::from(ProtocolError(format!(
"User-defined schema from SQL is not allowed with FORMAT {} ENCODE {}. \
Please refer to https://www.risingwave.dev/docs/current/sql-create-source/ \
for more information.",
format_encode.format, format_encode.row_encode
))));
}
}
}
} else {
// TODO(yuhao): https://github.com/risingwavelabs/risingwave/issues/12209
Err(RwError::from(ProtocolError(
format!("User-defined schema from SQL is not allowed with FORMAT {} ENCODE {}. \
Please refer to https://www.risingwave.dev/docs/current/sql-create-source/ for more information.", format_encode.format, format_encode.row_encode))))
}

let wildcard_idx = wildcard_idx.unwrap_or(0);

// Replace `*` with `cols_from_source`
let mut cols_from_sql = cols_from_sql;
let mut cols_from_source = cols_from_source;
let mut cols_from_sql_r = cols_from_sql.split_off(wildcard_idx);
cols_from_sql.append(&mut cols_from_source);
cols_from_sql.append(&mut cols_from_sql_r);

Ok(cols_from_sql)
} else {
if wildcard_idx.is_some() {
return Err(RwError::from(NotSupported(
Expand Down Expand Up @@ -610,6 +650,14 @@ pub fn bind_connector_props(
Ok(with_properties)
}

/// When the schema can be inferred from an external system, how should the
/// columns written in SQL (excluding generated columns) be handled?
///
/// The strategy is chosen by the statement handler and threaded down to
/// `bind_all_columns`, which is where it takes effect.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SqlColumnStrategy {
    /// Skip validation of the columns from SQL and fall through to merging
    /// them with the columns resolved from the source.
    ///
    /// Used when refreshing the schema of an existing table.
    Ignore,

    /// Use the columns from SQL as-is and disregard the columns resolved from
    /// the source, without any checking.
    ///
    /// The definition is expected to be purified already: a remaining
    /// wildcard (`*`) is treated as a bug and trips an assertion.
    Follow,

    /// Return an error if the SQL defines any non-generated column.
    ///
    /// Used when handling a fresh `CREATE SOURCE` / `CREATE TABLE`.
    Reject,
}

#[allow(clippy::too_many_arguments)]
pub async fn bind_create_source_or_table_with_connector(
handler_args: HandlerArgs,
Expand All @@ -626,6 +674,7 @@ pub async fn bind_create_source_or_table_with_connector(
col_id_gen: &mut ColumnIdGenerator,
create_source_type: CreateSourceType,
source_rate_limit: Option<u32>,
sql_column_strategy: SqlColumnStrategy,
) -> Result<(SourceCatalog, DatabaseId, SchemaId)> {
let session = &handler_args.session;
let db_name: &str = &session.database();
Expand Down Expand Up @@ -666,6 +715,7 @@ pub async fn bind_create_source_or_table_with_connector(
columns_from_sql,
sql_columns_defs,
wildcard_idx,
sql_column_strategy,
)?;

// add additional columns before bind pk, because `format upsert` requires the key column
Expand Down Expand Up @@ -854,6 +904,7 @@ pub async fn handle_create_source(
&mut col_id_gen,
create_source_type,
overwrite_options.source_rate_limit,
SqlColumnStrategy::Reject,
)
.await?;

Expand Down
7 changes: 6 additions & 1 deletion src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use risingwave_sqlparser::ast::{
use risingwave_sqlparser::parser::{IncludeOption, Parser};
use thiserror_ext::AsReport;

use super::create_source::{bind_columns_from_source, CreateSourceType};
use super::create_source::{bind_columns_from_source, CreateSourceType, SqlColumnStrategy};
use super::{create_sink, create_source, RwPgResponse};
use crate::binder::{bind_data_type, bind_struct_field, Clause, SecureCompareContext};
use crate::catalog::root_catalog::SchemaPath;
Expand Down Expand Up @@ -524,6 +524,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
mut col_id_gen: ColumnIdGenerator,
include_column_options: IncludeOption,
props: CreateTableProps,
sql_column_strategy: SqlColumnStrategy,
) -> Result<(PlanRef, Option<PbSource>, PbTable)> {
if props.append_only
&& format_encode.format != Format::Plain
Expand Down Expand Up @@ -567,6 +568,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
&mut col_id_gen,
CreateSourceType::Table,
rate_limit,
sql_column_strategy,
)
.await?;

Expand Down Expand Up @@ -1086,6 +1088,7 @@ pub(super) async fn handle_create_table_plan(
col_id_gen,
include_column_options,
props,
SqlColumnStrategy::Reject,
)
.await?,
TableJobType::General,
Expand Down Expand Up @@ -1802,6 +1805,7 @@ pub async fn generate_stream_graph_for_replace_table(
cdc_table_info: Option<CdcTableInfo>,
include_column_options: IncludeOption,
engine: Engine,
sql_column_strategy: SqlColumnStrategy,
) -> Result<(StreamFragmentGraph, Table, Option<PbSource>, TableJobType)> {
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;

Expand All @@ -1828,6 +1832,7 @@ pub async fn generate_stream_graph_for_replace_table(
col_id_gen,
include_column_options,
props,
sql_column_strategy,
)
.await?,
TableJobType::General,
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use risingwave_sqlparser::ast::ObjectName;
use tonic::{Request as RpcRequest, Response as RpcResponse, Status};

use crate::error::RwError;
use crate::handler::create_source::SqlColumnStrategy;
use crate::handler::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
use crate::session::SESSION_MANAGER;

Expand Down Expand Up @@ -106,6 +107,7 @@ async fn get_new_table_plan(
table_name,
new_table_definition,
&original_catalog,
SqlColumnStrategy::Follow, // not used
)
.await?;

Expand Down

0 comments on commit b599ec8

Please sign in to comment.