-
Notifications
You must be signed in to change notification settings - Fork 599
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(frontend): reuse def purification for assembling cdc table def when auto schema change #19997
refactor(frontend): reuse def purification for assembling cdc table def when auto schema change #19997
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,7 +12,7 @@ | |
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
use std::collections::{HashMap, HashSet}; | ||
use std::collections::HashSet; | ||
use std::sync::Arc; | ||
|
||
use itertools::Itertools; | ||
|
@@ -26,22 +26,19 @@ use risingwave_pb::catalog::{Source, Table}; | |
use risingwave_pb::ddl_service::TableJobType; | ||
use risingwave_pb::stream_plan::stream_node::PbNodeBody; | ||
use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph}; | ||
use risingwave_sqlparser::ast::{ | ||
AlterTableOperation, ColumnDef, ColumnOption, Ident, ObjectName, Statement, TableConstraint, | ||
}; | ||
use risingwave_sqlparser::ast::{AlterTableOperation, ColumnOption, ObjectName, Statement}; | ||
|
||
use super::create_source::schema_has_schema_registry; | ||
use super::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator}; | ||
use super::util::SourceSchemaCompatExt; | ||
use super::{HandlerArgs, RwPgResponse}; | ||
use crate::catalog::purify::try_purify_table_source_create_sql_ast; | ||
use crate::catalog::root_catalog::SchemaPath; | ||
use crate::catalog::table_catalog::TableType; | ||
use crate::error::{ErrorCode, Result, RwError}; | ||
use crate::expr::{Expr, ExprImpl, InputRef, Literal}; | ||
use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project}; | ||
use crate::handler::create_table::bind_table_constraints; | ||
use crate::session::SessionImpl; | ||
use crate::utils::data_type::DataTypeToAst; | ||
use crate::{Binder, TableCatalog}; | ||
|
||
/// Used in auto schema change process | ||
|
@@ -52,71 +49,47 @@ pub async fn get_new_table_definition_for_cdc_table( | |
) -> Result<(Statement, Arc<TableCatalog>)> { | ||
let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?; | ||
|
||
// Retrieve the original table definition and parse it to AST. | ||
let mut definition = original_catalog.create_sql_ast()?; | ||
|
||
let Statement::CreateTable { | ||
columns: original_columns, | ||
format_encode, | ||
constraints, | ||
.. | ||
} = &mut definition | ||
else { | ||
panic!("unexpected statement: {:?}", definition); | ||
}; | ||
|
||
assert!( | ||
format_encode.is_none(), | ||
"source schema should be None for CDC table" | ||
assert_eq!( | ||
original_catalog.row_id_index, None, | ||
"primary key of cdc table must be user defined" | ||
); | ||
|
||
if bind_table_constraints(constraints)?.is_empty() { | ||
// For table created by `create table t (*)` the constraint is empty, we need to | ||
// retrieve primary key names from original table catalog if available | ||
let pk_names: Vec<_> = original_catalog | ||
.pk | ||
.iter() | ||
.map(|x| original_catalog.columns[x.column_index].name().to_owned()) | ||
.collect(); | ||
|
||
constraints.push(TableConstraint::Unique { | ||
name: None, | ||
columns: pk_names.iter().map(Ident::new_unchecked).collect(), | ||
is_primary: true, | ||
}); | ||
} | ||
// Retrieve the original table definition. | ||
let mut definition = original_catalog.create_sql_ast()?; | ||
|
||
let orig_column_catalog: HashMap<String, ColumnCatalog> = HashMap::from_iter( | ||
original_catalog | ||
.columns() | ||
.iter() | ||
.map(|col| (col.name().to_owned(), col.clone())), | ||
); | ||
// Clear the original columns field, so that we'll follow `new_columns` to generate a | ||
// purified definition. | ||
{ | ||
let Statement::CreateTable { | ||
columns, | ||
constraints, | ||
.. | ||
} = &mut definition | ||
else { | ||
panic!("unexpected statement: {:?}", definition); | ||
}; | ||
|
||
// update the original columns with new version columns | ||
let mut new_column_defs = vec![]; | ||
for new_col in new_columns { | ||
// if the column exists in the original catalog, use it to construct the column definition. | ||
// since we don't support altering the column type right now | ||
if let Some(original_col) = orig_column_catalog.get(new_col.name()) { | ||
let ty = original_col.data_type().to_ast(); | ||
new_column_defs.push(ColumnDef::new(original_col.name().into(), ty, None, vec![])); | ||
} else { | ||
let ty = new_col.data_type().to_ast(); | ||
new_column_defs.push(ColumnDef::new(new_col.name().into(), ty, None, vec![])); | ||
} | ||
columns.clear(); | ||
constraints.clear(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The constraint is also cleared here, does There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, for the primary key table constraint. |
||
} | ||
*original_columns = new_column_defs; | ||
|
||
Ok((definition, original_catalog)) | ||
let new_definition = try_purify_table_source_create_sql_ast( | ||
definition, | ||
new_columns, | ||
None, | ||
// The IDs of `new_columns` may not be consistently maintained at this point. | ||
// So we use the column names to identify the primary key columns. | ||
&original_catalog.pk_column_names(), | ||
)?; | ||
|
||
Ok((new_definition, original_catalog)) | ||
} | ||
|
||
pub async fn get_replace_table_plan( | ||
session: &Arc<SessionImpl>, | ||
table_name: ObjectName, | ||
new_definition: Statement, | ||
old_catalog: &Arc<TableCatalog>, | ||
new_version_columns: Option<Vec<ColumnCatalog>>, // only provided in auto schema change | ||
) -> Result<( | ||
Option<Source>, | ||
Table, | ||
|
@@ -169,7 +142,6 @@ pub async fn get_replace_table_plan( | |
on_conflict, | ||
with_version_column, | ||
cdc_table_info, | ||
new_version_columns, | ||
include_column_options, | ||
engine, | ||
) | ||
|
@@ -419,7 +391,7 @@ pub async fn handle_alter_table_column( | |
}; | ||
|
||
let (source, table, graph, col_index_mapping, job_type) = | ||
get_replace_table_plan(&session, table_name, definition, &original_catalog, None).await?; | ||
get_replace_table_plan(&session, table_name, definition, &original_catalog).await?; | ||
|
||
let catalog_writer = session.catalog_writer()?; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest to keep this line of comment as a side note.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will always be handled in
try_purify_table_source_create_sql_ast
.