Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(frontend): reuse def purification for assembling cdc table def when auto schema change #19997

Merged
merged 3 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/frontend/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use thiserror::Error;

use crate::error::{ErrorCode, Result, RwError};
pub(crate) mod catalog_service;
mod purify;
pub mod purify;

pub(crate) mod connection_catalog;
pub(crate) mod database_catalog;
Expand Down
30 changes: 27 additions & 3 deletions src/frontend/src/catalog/purify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,27 @@ use risingwave_sqlparser::ast::*;
use crate::error::Result;
use crate::utils::data_type::DataTypeToAst as _;

mod pk_column {
    use super::*;

    /// Abstraction over the different ways a primary key column can be
    /// identified when matching against a [`ColumnCatalog`].
    pub trait PkColumn {
        /// Returns `true` if `column` is the primary key column described by `self`.
        fn is(&self, column: &ColumnCatalog) -> bool;
    }

    /// Identify a primary key column by its name.
    impl PkColumn for &str {
        fn is(&self, column: &ColumnCatalog) -> bool {
            *self == column.name()
        }
    }

    /// Identify a primary key column by its column ID.
    impl PkColumn for ColumnId {
        fn is(&self, column: &ColumnCatalog) -> bool {
            *self == column.column_id()
        }
    }
}
use pk_column::PkColumn;

/// Try to restore missing column definitions and constraints in the persisted table (or source)
/// definition, if the schema is derived from external systems (like schema registry) or it's
/// created by `CREATE TABLE AS`.
Expand All @@ -32,7 +53,7 @@ pub fn try_purify_table_source_create_sql_ast(
mut base: Statement,
columns: &[ColumnCatalog],
row_id_index: Option<usize>,
pk_column_ids: &[ColumnId],
pk_column_ids: &[impl PkColumn],
) -> Result<Statement> {
let (Statement::CreateTable {
columns: column_defs,
Expand Down Expand Up @@ -141,8 +162,11 @@ pub fn try_purify_table_source_create_sql_ast(
if !has_pk_column_constraint && row_id_index.is_none() {
let mut pk_columns = Vec::new();

for &id in pk_column_ids {
let column = columns.iter().find(|c| c.column_id() == id).unwrap();
for id in pk_column_ids {
let column = columns
.iter()
.find(|c| id.is(c))
.context("primary key column not found")?;
if !column.is_user_defined() {
bail /* unlikely */ !(
"primary key column \"{}\" is not user-defined",
Expand Down
8 changes: 8 additions & 0 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,14 @@ impl TableCatalog {
.collect()
}

/// Get the names of the primary key columns, in primary key order.
pub fn pk_column_names(&self) -> Vec<&str> {
    let mut names = Vec::with_capacity(self.pk.len());
    for pk_col in &self.pk {
        names.push(self.columns[pk_col.column_index].name());
    }
    names
}

/// Get a [`TableDesc`] of the table.
///
/// Note: this must be called on existing tables, otherwise it will fail to get the vnode count
Expand Down
92 changes: 32 additions & 60 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::sync::Arc;

use itertools::Itertools;
Expand All @@ -26,22 +26,19 @@ use risingwave_pb::catalog::{Source, Table};
use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
use risingwave_sqlparser::ast::{
AlterTableOperation, ColumnDef, ColumnOption, Ident, ObjectName, Statement, TableConstraint,
};
use risingwave_sqlparser::ast::{AlterTableOperation, ColumnOption, ObjectName, Statement};

use super::create_source::schema_has_schema_registry;
use super::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator};
use super::util::SourceSchemaCompatExt;
use super::{HandlerArgs, RwPgResponse};
use crate::catalog::purify::try_purify_table_source_create_sql_ast;
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{Expr, ExprImpl, InputRef, Literal};
use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project};
use crate::handler::create_table::bind_table_constraints;
use crate::session::SessionImpl;
use crate::utils::data_type::DataTypeToAst;
use crate::{Binder, TableCatalog};

/// Used in auto schema change process
Expand All @@ -52,71 +49,47 @@ pub async fn get_new_table_definition_for_cdc_table(
) -> Result<(Statement, Arc<TableCatalog>)> {
let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;

// Retrieve the original table definition and parse it to AST.
let mut definition = original_catalog.create_sql_ast()?;

let Statement::CreateTable {
columns: original_columns,
format_encode,
constraints,
..
} = &mut definition
else {
panic!("unexpected statement: {:?}", definition);
};

assert!(
format_encode.is_none(),
"source schema should be None for CDC table"
assert_eq!(
original_catalog.row_id_index, None,
"primary key of cdc table must be user defined"
);

if bind_table_constraints(constraints)?.is_empty() {
// For table created by `create table t (*)` the constraint is empty, we need to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest to keep this line of comment as a side note.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will always be handled in try_purify_table_source_create_sql_ast.

// retrieve primary key names from original table catalog if available
let pk_names: Vec<_> = original_catalog
.pk
.iter()
.map(|x| original_catalog.columns[x.column_index].name().to_owned())
.collect();

constraints.push(TableConstraint::Unique {
name: None,
columns: pk_names.iter().map(Ident::new_unchecked).collect(),
is_primary: true,
});
}
// Retrieve the original table definition.
let mut definition = original_catalog.create_sql_ast()?;

let orig_column_catalog: HashMap<String, ColumnCatalog> = HashMap::from_iter(
original_catalog
.columns()
.iter()
.map(|col| (col.name().to_owned(), col.clone())),
);
// Clear the original columns field, so that we'll follow `new_columns` to generate a
// purified definition.
{
let Statement::CreateTable {
columns,
constraints,
..
} = &mut definition
else {
panic!("unexpected statement: {:?}", definition);
};

// update the original columns with new version columns
let mut new_column_defs = vec![];
for new_col in new_columns {
// if the column exists in the original catalog, use it to construct the column definition.
// since we don't support altering the column type right now
if let Some(original_col) = orig_column_catalog.get(new_col.name()) {
let ty = original_col.data_type().to_ast();
new_column_defs.push(ColumnDef::new(original_col.name().into(), ty, None, vec![]));
} else {
let ty = new_col.data_type().to_ast();
new_column_defs.push(ColumnDef::new(new_col.name().into(), ty, None, vec![]));
}
columns.clear();
constraints.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constraint is also cleared here; can try_purify_table_source_create_sql_ast rebuild the constraint from the new_columns?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, for the primary key table constraint.

}
*original_columns = new_column_defs;

Ok((definition, original_catalog))
let new_definition = try_purify_table_source_create_sql_ast(
definition,
new_columns,
None,
// The IDs of `new_columns` may not be consistently maintained at this point.
// So we use the column names to identify the primary key columns.
&original_catalog.pk_column_names(),
)?;

Ok((new_definition, original_catalog))
}

pub async fn get_replace_table_plan(
session: &Arc<SessionImpl>,
table_name: ObjectName,
new_definition: Statement,
old_catalog: &Arc<TableCatalog>,
new_version_columns: Option<Vec<ColumnCatalog>>, // only provided in auto schema change
) -> Result<(
Option<Source>,
Table,
Expand Down Expand Up @@ -169,7 +142,6 @@ pub async fn get_replace_table_plan(
on_conflict,
with_version_column,
cdc_table_info,
new_version_columns,
include_column_options,
engine,
)
Expand Down Expand Up @@ -419,7 +391,7 @@ pub async fn handle_alter_table_column(
};

let (source, table, graph, col_index_mapping, job_type) =
get_replace_table_plan(&session, table_name, definition, &original_catalog, None).await?;
get_replace_table_plan(&session, table_name, definition, &original_catalog).await?;

let catalog_writer = session.catalog_writer()?;

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/alter_table_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub async fn handle_refresh_schema(

let (source, table, graph, col_index_mapping, job_type) = {
let result =
get_replace_table_plan(&session, table_name, definition, &original_table, None).await;
get_replace_table_plan(&session, table_name, definition, &original_table).await;
match result {
Ok((source, table, graph, col_index_mapping, job_type)) => {
Ok((source, table, graph, col_index_mapping, job_type))
Expand Down
1 change: 0 additions & 1 deletion src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,6 @@ pub(crate) async fn reparse_table_for_sink(
on_conflict,
with_version_column,
None,
None,
include_column_options,
engine,
)
Expand Down
26 changes: 4 additions & 22 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1160,13 +1160,13 @@ pub(super) async fn handle_create_table_plan(
}

let (mut columns, pk_names) =
bind_cdc_table_schema(&column_defs, &constraints, None)?;
bind_cdc_table_schema(&column_defs, &constraints)?;
// read default value definition from external db
let (options, secret_refs) = cdc_with_options.clone().into_parts();
let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
.context("failed to extract external table config")?;

let table = ExternalTableImpl::connect(config)
let table: ExternalTableImpl = ExternalTableImpl::connect(config)
.await
.context("failed to auto derive table schema")?;
let external_columns: Vec<_> = table
Expand Down Expand Up @@ -1320,25 +1320,9 @@ async fn bind_cdc_table_schema_externally(
fn bind_cdc_table_schema(
column_defs: &Vec<ColumnDef>,
constraints: &Vec<TableConstraint>,
new_version_columns: Option<Vec<ColumnCatalog>>,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
let mut columns = bind_sql_columns(column_defs)?;
// If new_version_columns is provided, we are in the process of auto schema change.
// update the default value column since the default value column is not set in the
// column sql definition.
if let Some(new_version_columns) = new_version_columns {
for (col, new_version_col) in columns
.iter_mut()
.zip_eq_fast(new_version_columns.into_iter())
{
assert_eq!(col.name(), new_version_col.name());
col.column_desc.generated_or_default_column =
new_version_col.column_desc.generated_or_default_column;
}
}

let columns = bind_sql_columns(column_defs)?;
let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;

Ok((columns, pk_names))
}

Expand Down Expand Up @@ -1816,7 +1800,6 @@ pub async fn generate_stream_graph_for_replace_table(
on_conflict: Option<OnConflict>,
with_version_column: Option<String>,
cdc_table_info: Option<CdcTableInfo>,
new_version_columns: Option<Vec<ColumnCatalog>>,
include_column_options: IncludeOption,
engine: Engine,
) -> Result<(StreamFragmentGraph, Table, Option<PbSource>, TableJobType)> {
Expand Down Expand Up @@ -1872,8 +1855,7 @@ pub async fn generate_stream_graph_for_replace_table(
cdc_table.external_table_name.clone(),
)?;

let (columns, pk_names) =
bind_cdc_table_schema(&column_defs, &constraints, new_version_columns)?;
let (columns, pk_names) = bind_cdc_table_schema(&column_defs, &constraints)?;

let context: OptimizerContextRef =
OptimizerContext::new(handler_args, ExplainOptions::default()).into();
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ async fn get_new_table_plan(
.map(|c| c.into())
.collect_vec();
let table_name = ObjectName::from(vec![table_name.as_str().into()]);

let (new_table_definition, original_catalog) =
get_new_table_definition_for_cdc_table(&session, table_name.clone(), &new_version_columns)
.await?;
Expand All @@ -105,7 +106,6 @@ async fn get_new_table_plan(
table_name,
new_table_definition,
&original_catalog,
Some(new_version_columns),
)
.await?;

Expand Down
Loading