Skip to content

Commit

Permalink
reorganize code
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Dec 30, 2024
1 parent afe7b1d commit 45ab48e
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 137 deletions.
1 change: 1 addition & 0 deletions src/frontend/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use thiserror::Error;

use crate::error::{ErrorCode, Result, RwError};
pub(crate) mod catalog_service;
mod purify;

pub(crate) mod connection_catalog;
pub(crate) mod database_catalog;
Expand Down
130 changes: 130 additions & 0 deletions src/frontend/src/catalog/purify.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ColumnId};
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_sqlparser::ast::*;

use crate::error::Result;
use crate::utils::data_type::DataTypeToAst as _;

/// Try to restore missing column definitions and constraints in the persisted table definition,
/// if the schema of the table is derived from external systems (like schema registry) or it's
/// created by `CREATE TABLE AS`.
///
/// Returns error if restoring failed, or called on non-`TableType::Table`, or the persisted
/// definition is invalid.
pub fn try_purify_table_create_sql_ast(
mut base: Statement,
columns: &[ColumnCatalog],
row_id_index: Option<usize>,
pk_column_ids: &[ColumnId],
) -> Result<Statement> {
let Statement::CreateTable {
columns: column_defs,
constraints,
wildcard_idx,
..
} = &mut base
else {
bail!("expect `CREATE TABLE` statement, found: `{:?}`", base);
};

// Filter out columns that are not defined by users in SQL.
let defined_columns = columns.iter().filter(|c| c.is_user_defined());

// If all columns are defined...
// - either the schema is fully specified by the user,
// - the persisted definition is already purified.
// No need to proceed.
if !column_defs.is_empty() && wildcard_idx.is_none() {
let defined_columns_len = defined_columns.count();
if column_defs.len() != defined_columns_len {
bail /* unlikely */ !(
"column count mismatch: defined {} columns, but {} columns in the definition",
defined_columns_len,
column_defs.len()
);
}

return Ok(base);
}

// Schema inferred. Now derive the missing columns and constraints.
// First, remove the wildcard from the definition.
*wildcard_idx = None;

// Derive `ColumnDef` from `ColumnCatalog`.
let mut purified_column_defs = Vec::new();
for column in defined_columns {
// If the column is already defined in the persisted definition, keep it.
if let Some(existing) = column_defs
.iter()
.find(|c| c.name.real_value() == column.name())
{
purified_column_defs.push(existing.clone());
continue;
}

if let Some(c) = &column.column_desc.generated_or_default_column {
match c {
GeneratedOrDefaultColumn::GeneratedColumn(_) => {
unreachable!("generated column must not be inferred");
}
GeneratedOrDefaultColumn::DefaultColumn(_) => {
// TODO: convert `ExprNode` back to ast can be a bit tricky.
// Fortunately, this case is rare as inferring default values is not
// widely supported.
bail /* unlikely */ !("purifying default value is not supported yet");
}
}
}

let column_def = ColumnDef {
name: column.name().into(),
data_type: Some(column.data_type().to_ast()?),
collation: None,
options: Vec::new(), // pk will be specified with table constraints
};
purified_column_defs.push(column_def);
}
*column_defs = purified_column_defs;

if row_id_index.is_none() {
// User-defined primary key.
let mut pk_columns = Vec::new();

for &id in pk_column_ids {
let column = columns.iter().find(|c| c.column_id() == id).unwrap();
if !column.is_user_defined() {
bail /* unlikely */ !(
"primary key column \"{}\" is not user-defined",
column.name()
);
}
pk_columns.push(column.name().into());
}

let pk_constraint = TableConstraint::Unique {
name: None,
columns: pk_columns,
is_primary: true,
};
// We don't support table constraints other than `PRIMARY KEY`, thus simply overwrite.
*constraints = vec![pk_constraint];
}

Ok(base)
}
154 changes: 17 additions & 137 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::collections::{HashMap, HashSet};
use anyhow::Context as _;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{
ColumnCatalog, ConflictBehavior, CreateType, Engine, Field, Schema, StreamJobStatus, TableDesc,
TableId, TableVersionId,
Expand All @@ -36,13 +35,13 @@ use risingwave_sqlparser::ast;
use risingwave_sqlparser::parser::Parser;
use thiserror_ext::AsReport as _;

use super::purify::try_purify_table_create_sql_ast;
use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId, SinkId};
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::ExprImpl;
use crate::optimizer::property::Cardinality;
use crate::session::current::notice_to_user;
use crate::user::UserId;
use crate::utils::data_type::DataTypeToAst;

/// `TableCatalog` Includes full information about a table.
///
Expand Down Expand Up @@ -289,8 +288,22 @@ impl TableCatalog {
///
/// Returns error if it's invalid.
pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
// Purification is only applicable to tables.
if let TableType::Table = self.table_type() {
match self.try_purify_table_create_sql_ast() {
let base = if self.definition.is_empty() {
// Created by `CREATE TABLE AS`, create a skeleton `CREATE TABLE` statement.
let name = ast::ObjectName(vec![self.name.as_str().into()]);
ast::Statement::default_create_table(name)
} else {
self.create_sql_ast()?
};

match try_purify_table_create_sql_ast(
base,
self.columns(),
self.row_id_index,
&self.pk_column_ids(),
) {
Ok(stmt) => return Ok(stmt),
Err(e) => notice_to_user(format!(
"error occurred while purifying definition for table \"{}\", \
Expand All @@ -300,141 +313,8 @@ impl TableCatalog {
)),
}
}
self.create_sql_ast()
}

/// Try to restore missing column definitions and constraints in the persisted table definition,
/// if the schema of the table is derived from external systems (like schema registry) or it's
/// created by `CREATE TABLE AS`.
///
/// Returns error if restoring failed, or called on non-`TableType::Table`, or the persisted
/// definition is invalid.
fn try_purify_table_create_sql_ast(&self) -> Result<ast::Statement> {
use ast::*;

let mut base = if self.definition.is_empty() {
// Created by `CREATE TABLE AS`, create a skeleton `CREATE TABLE` statement.
let name = ObjectName(vec![self.name.as_str().into()]);

Statement::CreateTable {
name,
or_replace: false,
temporary: false,
if_not_exists: false,
columns: Vec::new(),
wildcard_idx: None,
constraints: Vec::new(),
with_options: Vec::new(),
format_encode: None,
source_watermarks: Vec::new(),
append_only: false,
on_conflict: None,
with_version_column: None,
query: None,
cdc_table_info: None,
include_column_options: Vec::new(),
webhook_info: None,
engine: Engine::Hummock,
}
} else {
self.create_sql_ast()?
};

let Statement::CreateTable {
columns: column_defs,
constraints,
wildcard_idx,
..
} = &mut base
else {
bail!("expect `CREATE TABLE` statement, found: `{:?}`", base);
};

// Filter out columns that are not defined by users in SQL.
let defined_columns = self.columns.iter().filter(|c| c.is_user_defined());

// If all columns are defined...
// - either the schema is fully specified by the user,
// - the persisted definition is already purified.
// No need to proceed.
if !column_defs.is_empty() && wildcard_idx.is_none() {
let defined_columns_len = defined_columns.count();
if column_defs.len() != defined_columns_len {
bail /* unlikely */ !(
"column count mismatch: defined {} columns, but {} columns in the definition",
defined_columns_len,
column_defs.len()
);
}

return Ok(base);
}

// Schema inferred. Now derive the missing columns and constraints.
// First, remove the wildcard from the definition.
*wildcard_idx = None;

// Derive `ColumnDef` from `ColumnCatalog`.
let mut purified_column_defs = Vec::new();
for column in defined_columns {
// If the column is already defined in the persisted definition, keep it.
if let Some(existing) = column_defs
.iter()
.find(|c| c.name.real_value() == column.name())
{
purified_column_defs.push(existing.clone());
continue;
}

if let Some(c) = &column.column_desc.generated_or_default_column {
match c {
GeneratedOrDefaultColumn::GeneratedColumn(_) => {
unreachable!("generated column must not be inferred");
}
GeneratedOrDefaultColumn::DefaultColumn(_) => {
// TODO: convert `ExprNode` back to ast can be a bit tricky.
// Fortunately, this case is rare as inferring default values is not
// widely supported.
bail /* unlikely */ !("purifying default value is not supported yet");
}
}
}

let column_def = ColumnDef {
name: column.name().into(),
data_type: Some(column.data_type().to_ast()?),
collation: None,
options: Vec::new(), // pk will be specified with table constraints
};
purified_column_defs.push(column_def);
}
*column_defs = purified_column_defs;

if self.row_id_index.is_none() {
// User-defined primary key.
let mut pk_columns = Vec::new();

for id in self.pk_column_ids() {
let column = self.columns.iter().find(|c| c.column_id() == id).unwrap();
if !column.is_user_defined() {
bail /* unlikely */ !(
"primary key column \"{}\" is not user-defined",
column.name()
);
}
pk_columns.push(column.name().into());
}

let pk_constraint = TableConstraint::Unique {
name: None,
columns: pk_columns,
is_primary: true,
};
// We don't support table constraints other than `PRIMARY KEY`, thus simply overwrite.
*constraints = vec![pk_constraint];
}

Ok(base)
self.create_sql_ast()
}
}

Expand Down
24 changes: 24 additions & 0 deletions src/sqlparser/src/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3383,6 +3383,30 @@ impl Statement {
pub fn to_redacted_string(&self, keywords: RedactSqlOptionKeywordsRef) -> String {
REDACT_SQL_OPTION_KEYWORDS.sync_scope(keywords, || self.to_string())
}

/// Build a skeleton `CREATE TABLE` statement for `name`.
///
/// Every flag is `false`, every optional clause is `None`, every list is empty, and the
/// engine is `Engine::Hummock`.
pub fn default_create_table(name: ObjectName) -> Self {
    Self::CreateTable {
        name,
        engine: Engine::Hummock,

        // Boolean modifiers: all disabled.
        or_replace: false,
        temporary: false,
        if_not_exists: false,
        append_only: false,

        // Optional clauses: all absent.
        wildcard_idx: None,
        format_encode: None,
        on_conflict: None,
        with_version_column: None,
        query: None,
        cdc_table_info: None,
        webhook_info: None,

        // List-valued clauses: all empty.
        columns: vec![],
        constraints: vec![],
        with_options: vec![],
        source_watermarks: vec![],
        include_column_options: vec![],
    }
}
}

#[cfg(test)]
Expand Down

0 comments on commit 45ab48e

Please sign in to comment.