Skip to content

Commit

Permalink
use column name for primary key
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jan 15, 2025
1 parent 6bfa7ea commit f95d993
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 4 deletions.
27 changes: 24 additions & 3 deletions src/frontend/src/catalog/purify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,27 @@ use risingwave_sqlparser::ast::*;
use crate::error::Result;
use crate::utils::data_type::DataTypeToAst as _;

mod pk_column {
use super::*;
// Identifies a primary key column...
pub trait PkColumn {
fn is(&self, column: &ColumnCatalog) -> bool;
}
// ...by column name.
impl PkColumn for &str {
fn is(&self, column: &ColumnCatalog) -> bool {
column.name() == *self
}
}
// ...by column ID.
impl PkColumn for ColumnId {
fn is(&self, column: &ColumnCatalog) -> bool {
column.column_id() == *self
}
}
}
use pk_column::PkColumn;

/// Try to restore missing column definitions and constraints in the persisted table (or source)
/// definition, if the schema is derived from external systems (like schema registry) or it's
/// created by `CREATE TABLE AS`.
Expand All @@ -32,7 +53,7 @@ pub fn try_purify_table_source_create_sql_ast(
mut base: Statement,
columns: &[ColumnCatalog],
row_id_index: Option<usize>,
pk_column_ids: &[ColumnId],
pk_column_ids: &[impl PkColumn],
) -> Result<Statement> {
let (Statement::CreateTable {
columns: column_defs,
Expand Down Expand Up @@ -141,10 +162,10 @@ pub fn try_purify_table_source_create_sql_ast(
if !has_pk_column_constraint && row_id_index.is_none() {
let mut pk_columns = Vec::new();

for &id in pk_column_ids {
for id in pk_column_ids {
let column = columns
.iter()
.find(|c| c.column_id() == id)
.find(|c| id.is(c))
.context("primary key column not found")?;
if !column.is_user_defined() {
bail /* unlikely */ !(
Expand Down
8 changes: 8 additions & 0 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,14 @@ impl TableCatalog {
.collect()
}

/// Get the column names of the primary key.
pub fn pk_column_names(&self) -> Vec<&str> {
self.pk
.iter()
.map(|x| self.columns[x.column_index].name())
.collect()
}

/// Get a [`TableDesc`] of the table.
///
/// Note: this must be called on existing tables, otherwise it will fail to get the vnode count
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ pub async fn get_new_table_definition_for_cdc_table(
definition,
new_columns,
None,
&original_catalog.pk_column_ids(),
// The IDs of `new_columns` may not be consistently maintained at this point.
// So we use the column names to identify the primary key columns.
&original_catalog.pk_column_names(),
)?;

Ok((new_definition, original_catalog))
Expand Down

0 comments on commit f95d993

Please sign in to comment.