Skip to content

Commit

Permalink
reuse purification in get_new_table_definition_for_cdc_table
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jan 2, 2025
1 parent f31c80f commit 7ec1e55
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 59 deletions.
2 changes: 1 addition & 1 deletion src/frontend/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use thiserror::Error;

use crate::error::{ErrorCode, Result, RwError};
pub(crate) mod catalog_service;
mod purify;
pub mod purify;

pub(crate) mod connection_catalog;
pub(crate) mod database_catalog;
Expand Down
6 changes: 5 additions & 1 deletion src/frontend/src/catalog/purify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Context;
use prost::Message as _;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ColumnId};
Expand Down Expand Up @@ -132,7 +133,10 @@ pub fn try_purify_table_source_create_sql_ast(
let mut pk_columns = Vec::new();

for &id in pk_column_ids {
let column = columns.iter().find(|c| c.column_id() == id).unwrap();
let column = columns
.iter()
.find(|c| c.column_id() == id)
.context("primary key column not found")?;
if !column.is_user_defined() {
bail /* unlikely */ !(
"primary key column \"{}\" is not user-defined",
Expand Down
86 changes: 29 additions & 57 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::sync::Arc;

use itertools::Itertools;
Expand All @@ -26,22 +26,19 @@ use risingwave_pb::catalog::{Source, Table};
use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
use risingwave_sqlparser::ast::{
AlterTableOperation, ColumnDef, ColumnOption, Ident, ObjectName, Statement, TableConstraint,
};
use risingwave_sqlparser::ast::{AlterTableOperation, ColumnOption, ObjectName, Statement};

use super::create_source::schema_has_schema_registry;
use super::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator};
use super::util::SourceSchemaCompatExt;
use super::{HandlerArgs, RwPgResponse};
use crate::catalog::purify::try_purify_table_source_create_sql_ast;
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{Expr, ExprImpl, InputRef, Literal};
use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project};
use crate::handler::create_table::bind_table_constraints;
use crate::session::SessionImpl;
use crate::utils::data_type::DataTypeToAst;
use crate::{Binder, TableCatalog};

/// Used in auto schema change process
Expand All @@ -52,63 +49,38 @@ pub async fn get_new_table_definition_for_cdc_table(
) -> Result<(Statement, Arc<TableCatalog>)> {
let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;

// Retrieve the original table definition and parse it to AST.
let mut definition = original_catalog.create_sql_ast()?;

let Statement::CreateTable {
columns: original_columns,
format_encode,
constraints,
..
} = &mut definition
else {
panic!("unexpected statement: {:?}", definition);
};

assert!(
format_encode.is_none(),
"source schema should be None for CDC table"
assert_eq!(
original_catalog.row_id_index, None,
"primary key of cdc table must be user defined"
);

if bind_table_constraints(constraints)?.is_empty() {
// For table created by `create table t (*)` the constraint is empty, we need to
// retrieve primary key names from original table catalog if available
let pk_names: Vec<_> = original_catalog
.pk
.iter()
.map(|x| original_catalog.columns[x.column_index].name().to_owned())
.collect();

constraints.push(TableConstraint::Unique {
name: None,
columns: pk_names.iter().map(Ident::new_unchecked).collect(),
is_primary: true,
});
}
// Retrieve the original table definition.
let mut definition = original_catalog.create_sql_ast()?;

let orig_column_catalog: HashMap<String, ColumnCatalog> = HashMap::from_iter(
original_catalog
.columns()
.iter()
.map(|col| (col.name().to_owned(), col.clone())),
);
// Clear the original columns field, so that we'll follow `new_columns` to generate a
// purified definition.
{
let Statement::CreateTable {
columns,
constraints,
..
} = &mut definition
else {
panic!("unexpected statement: {:?}", definition);
};

// update the original columns with new version columns
let mut new_column_defs = vec![];
for new_col in new_columns {
// if the column exists in the original catalog, use it to construct the column definition.
// since we don't support altering the column type right now
if let Some(original_col) = orig_column_catalog.get(new_col.name()) {
let ty = original_col.data_type().to_ast();
new_column_defs.push(ColumnDef::new(original_col.name().into(), ty, None, vec![]));
} else {
let ty = new_col.data_type().to_ast();
new_column_defs.push(ColumnDef::new(new_col.name().into(), ty, None, vec![]));
}
columns.clear();
constraints.clear();
}
*original_columns = new_column_defs;

Ok((definition, original_catalog))
let new_definition = try_purify_table_source_create_sql_ast(
definition,
new_columns,
None,
&original_catalog.pk_column_ids(),
)?;

Ok((new_definition, original_catalog))
}

pub async fn get_replace_table_plan(
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ async fn get_new_table_plan(
.map(|c| c.into())
.collect_vec();
let table_name = ObjectName::from(vec![table_name.as_str().into()]);

let (new_table_definition, original_catalog) =
get_new_table_definition_for_cdc_table(&session, table_name.clone(), &new_version_columns)
.await?;
Expand Down

0 comments on commit 7ec1e55

Please sign in to comment.