Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): allow schema change & sink into a CTAS table #20174

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions e2e_test/ddl/show_purify.slt
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,23 @@ show create table ctas;
----
public.ctas CREATE TABLE ctas (v0 INT, v1 NUMERIC, v2 TIMESTAMP, v3 TIMESTAMP WITH TIME ZONE, v4 CHARACTER VARYING[], v5 STRUCT<i BIGINT, j STRUCT<a BIGINT, b CHARACTER VARYING>>, v6 rw_int256, v7 MAP(CHARACTER VARYING,INT))

statement ok
alter table ctas add column v8 double;

# Show that we can add or drop columns to the CTAS table.

query TT
show create table ctas;
----
public.ctas CREATE TABLE ctas (v0 INT, v1 NUMERIC, v2 TIMESTAMP, v3 TIMESTAMP WITH TIME ZONE, v4 CHARACTER VARYING[], v5 STRUCT<i BIGINT, j STRUCT<a BIGINT, b CHARACTER VARYING>>, v6 rw_int256, v7 MAP(CHARACTER VARYING,INT), v8 DOUBLE)

statement ok
alter table ctas drop column v0;

query TT
show create table ctas;
----
public.ctas CREATE TABLE ctas (v1 NUMERIC, v2 TIMESTAMP, v3 TIMESTAMP WITH TIME ZONE, v4 CHARACTER VARYING[], v5 STRUCT<i BIGINT, j STRUCT<a BIGINT, b CHARACTER VARYING>>, v6 rw_int256, v7 MAP(CHARACTER VARYING,INT), v8 DOUBLE)

statement ok
drop table ctas;
25 changes: 25 additions & 0 deletions e2e_test/sink/sink_into_table/ctas.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table ctas as select 1 as v;

query I rowsort
select * from ctas;
----
1

statement ok
create sink sk into ctas as select 2 as v with (type = 'append-only');

query I rowsort
select * from ctas;
----
1
2

statement ok
drop sink sk;

statement ok
drop table ctas;
28 changes: 25 additions & 3 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@ impl TableVersion {
impl TableCatalog {
/// Returns the SQL definition when the table was created, purified with best effort
/// if it's a table.
///
/// See [`Self::create_sql_ast_purified`] for more details.
pub fn create_sql_purified(&self) -> String {
self.create_sql_ast_purified()
.map(|stmt| stmt.to_string())
Expand All @@ -297,7 +299,7 @@ impl TableCatalog {
let name = ast::ObjectName(vec![self.name.as_str().into()]);
ast::Statement::default_create_table(name)
} else {
self.create_sql_ast()?
self.create_sql_ast_from_persisted()?
};

match try_purify_table_source_create_sql_ast(
Expand All @@ -316,7 +318,7 @@ impl TableCatalog {
}
}

self.create_sql_ast()
self.create_sql_ast_from_persisted()
}
}

Expand Down Expand Up @@ -481,14 +483,34 @@ impl TableCatalog {
}

/// Returns the SQL definition when the table was created.
///
/// See [`Self::create_sql_ast`] for more details.
pub fn create_sql(&self) -> String {
self.definition.clone()
self.create_sql_ast()
.map(|stmt| stmt.to_string())
.unwrap_or_else(|_| self.definition.clone())
}

/// Returns the parsed SQL definition when the table was created.
///
/// Re-create the table with this statement may have different schema if the schema is derived
/// from external systems (like schema registry) or it's created by `CREATE TABLE AS`. If this
/// is not desired, use [`Self::create_sql_ast_purified`] instead.
///
/// Returns error if it's invalid.
pub fn create_sql_ast(&self) -> Result<ast::Statement> {
if let TableType::Table = self.table_type()
&& self.definition.is_empty()
{
// Always fix definition for `CREATE TABLE AS`.
self.create_sql_ast_purified()
} else {
// Directly parse the persisted definition.
self.create_sql_ast_from_persisted()
}
}

fn create_sql_ast_from_persisted(&self) -> Result<ast::Statement> {
Ok(Parser::parse_sql(&self.definition)
.context("unable to parse definition sql")?
.into_iter()
Expand Down
Loading