feat: support DROP SCHEMA CASCADE and some refactoring (#19702)

yezizp2012 authored Jan 16, 2025
1 parent 277fa86 commit 297a634
Showing 26 changed files with 612 additions and 748 deletions.
4 changes: 3 additions & 1 deletion dashboard/scripts/generate_proto.sh
@@ -6,11 +6,13 @@ rm -rf tmp_gen
mkdir tmp_gen
cp -a ../proto/*.proto tmp_gen

# Array in proto will conflict with JavaScript's Array, so we replace it with RwArray.
# Replace some keywords in JavaScript to avoid conflicts: Array, Object.
if [[ "$OSTYPE" == "darwin"* ]]; then
sed -i "" -e "s/Array/RwArray/" "tmp_gen/data.proto"
sed -i "" -e "s/ Object / RwObject /" "tmp_gen/meta.proto"
else
sed -i -e "s/Array/RwArray/" "tmp_gen/data.proto"
sed -i -e "s/ Object / RwObject /" "tmp_gen/meta.proto"
fi

mkdir -p proto/gen
63 changes: 63 additions & 0 deletions e2e_test/ddl/drop/drop_schema_cascade.slt
@@ -0,0 +1,63 @@
statement ok
create schema schema1;

statement ok
create schema schema2;

statement ok
create table schema1.t1 (v1 int, v2 int);

statement ok
create index idx1 on schema1.t1(v1);

statement ok
create subscription schema1.sub from schema1.t1 with(retention = '1D');

statement ok
create materialized view schema2.mv2 as select v2, v1 from schema1.t1;

statement ok
create materialized view schema1.mv1 as select v2, v1 from schema1.t1;

statement ok
CREATE SOURCE schema1.src1 (v INT) WITH (
connector = 'datagen',
fields.v.kind = 'sequence',
fields.v.start = '1',
fields.v.end = '10',
datagen.rows.per.second='15',
datagen.split.num = '1'
) FORMAT PLAIN ENCODE JSON;

statement ok
create function schema1.f1(INT) returns int language sql as 'select $1';

statement ok
create table schema1.t2 (v1 int, v2 int);

statement ok
create table schema1.t3 (v1 int primary key, v2 int);

statement ok
create sink schema1.s1 into schema1.t3 from schema1.t2;

statement ok
create table schema2.t4 (v1 int primary key, v2 int);

statement ok
create sink schema1.s2 into schema2.t4 from schema1.t2;

statement error
drop schema schema1;

statement error Found sink into table in dependency
drop schema schema1 cascade;

statement ok
drop table schema2.t4 cascade;

statement ok
drop schema schema1 cascade;

statement ok
drop schema schema2;
1 change: 1 addition & 0 deletions proto/ddl_service.proto
@@ -45,6 +45,7 @@ message CreateSchemaResponse {

message DropSchemaRequest {
uint32 schema_id = 1;
bool cascade = 2;
}

message DropSchemaResponse {
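
The DDL protocol change itself is small: DropSchemaRequest just gains a `cascade` flag. Below is a minimal sketch (illustration only, not part of this commit) of populating the prost-generated request, assuming the Rust types produced from ddl_service.proto; the real call path is `self.meta_client.drop_schema(schema_id, cascade)` in the frontend catalog writer further down.

use risingwave_pb::ddl_service::DropSchemaRequest;

// Hypothetical helper for illustration; field names follow the proto above.
fn build_drop_schema_request(schema_id: u32, cascade: bool) -> DropSchemaRequest {
    DropSchemaRequest { schema_id, cascade }
}
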
25 changes: 15 additions & 10 deletions proto/meta.proto
@@ -483,19 +483,24 @@ message MetaSnapshot {
SnapshotVersion version = 13;
}

message Relation {
oneof relation_info {
catalog.Table table = 1;
catalog.Source source = 2;
catalog.Sink sink = 3;
message Object {
oneof object_info {
catalog.Database database = 1;
catalog.Schema schema = 2;
catalog.Table table = 3;
catalog.Index index = 4;
catalog.View view = 5;
catalog.Subscription subscription = 6;
catalog.Source source = 5;
catalog.Sink sink = 6;
catalog.View view = 7;
catalog.Function function = 8;
catalog.Connection connection = 9;
catalog.Subscription subscription = 10;
catalog.Secret secret = 11;
}
}

message RelationGroup {
repeated Relation relations = 1;
message ObjectGroup {
repeated Object objects = 1;
}

message Recovery {}
@@ -526,7 +531,7 @@ message SubscribeResponse {
backup_service.MetaBackupManifestId meta_backup_manifest_id = 17;
SystemParams system_params = 19;
hummock.WriteLimits hummock_write_limits = 20;
RelationGroup relation_group = 21;
ObjectGroup object_group = 21;
catalog.Connection connection = 22;
hummock.HummockVersionStats hummock_stats = 24;
Recovery recovery = 25;
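
The former RelationGroup/Relation notification is generalized into ObjectGroup/Object, so one notification can carry any catalog object type (databases, schemas, functions, connections, secrets, and so on), which is what a cascading schema drop needs. Below is a compressed sketch (illustration only) of walking such a group, assuming the prost-generated types from the message above; the actual frontend handling is in the observer_manager.rs diff further down.

use risingwave_pb::meta::object::ObjectInfo;
use risingwave_pb::meta::ObjectGroup;

// Summarize every object carried by one grouped notification.
fn summarize(group: &ObjectGroup) -> Vec<String> {
    group
        .objects
        .iter()
        .filter_map(|object| object.object_info.as_ref())
        .map(|info| match info {
            ObjectInfo::Schema(schema) => format!("schema {}", schema.name),
            ObjectInfo::Table(table) => format!("table {}", table.name),
            ObjectInfo::Sink(sink) => format!("sink {}", sink.name),
            // Remaining variants (database, source, index, view, function,
            // connection, subscription, secret) elided for brevity.
            other => format!("{:?}", other),
        })
        .collect()
}
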
2 changes: 1 addition & 1 deletion src/common/common_service/src/observer_manager.rs
@@ -104,7 +104,7 @@ where
notification_vec.retain_mut(|notification| match notification.info.as_ref().unwrap() {
Info::Database(_)
| Info::Schema(_)
| Info::RelationGroup(_)
| Info::ObjectGroup(_)
| Info::User(_)
| Info::Connection(_)
| Info::Secret(_)
8 changes: 3 additions & 5 deletions src/frontend/src/binder/relation/mod.rs
@@ -18,7 +18,7 @@ use std::ops::Deref;
use either::Either;
use itertools::{EitherOrBoth, Itertools};
use risingwave_common::bail;
use risingwave_common::catalog::{Field, TableId, DEFAULT_SCHEMA_NAME};
use risingwave_common::catalog::{Field, TableId};
use risingwave_sqlparser::ast::{
AsOf, Expr as ParserExpr, FunctionArg, FunctionArgExpr, Ident, ObjectName, TableAlias,
TableFactor,
@@ -486,12 +486,10 @@ impl Binder {
})?
.into();

let schema = args
.get(1)
.map_or(DEFAULT_SCHEMA_NAME.to_owned(), |arg| arg.to_string());
let schema = args.get(1).map(|arg| arg.to_string());

let table_name = self.catalog.get_table_name_by_id(table_id)?;
self.bind_relation_by_name_inner(Some(&schema), &table_name, alias, None)
self.bind_relation_by_name_inner(schema.as_deref(), &table_name, alias, None)
}

pub(super) fn bind_table_factor(&mut self, table_factor: TableFactor) -> Result<Relation> {
6 changes: 3 additions & 3 deletions src/frontend/src/catalog/catalog_service.rs
@@ -176,7 +176,7 @@ pub trait CatalogWriter: Send + Sync {

async fn drop_database(&self, database_id: u32) -> Result<()>;

async fn drop_schema(&self, schema_id: u32) -> Result<()>;
async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()>;

async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;

@@ -487,8 +487,8 @@ impl CatalogWriter for CatalogWriterImpl {
self.wait_version(version).await
}

async fn drop_schema(&self, schema_id: u32) -> Result<()> {
let version = self.meta_client.drop_schema(schema_id).await?;
async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()> {
let version = self.meta_client.drop_schema(schema_id, cascade).await?;
self.wait_version(version).await
}

10 changes: 6 additions & 4 deletions src/frontend/src/handler/create_table.rs
@@ -1116,6 +1116,8 @@ pub(super) async fn handle_create_table_plan(

let session = &handler_args.session;
let db_name = &session.database();
let user_name = &session.user_name();
let search_path = session.config().search_path();
let (schema_name, resolved_table_name) =
Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
let (database_id, schema_id) =
@@ -1127,12 +1129,12 @@

let source = {
let catalog_reader = session.env().catalog_reader().read_guard();
let schema_name = format_encode
.clone()
.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
let schema_path =
SchemaPath::new(format_encode.as_deref(), &search_path, user_name);

let (source, _) = catalog_reader.get_source_by_name(
db_name,
SchemaPath::Name(schema_name.as_str()),
schema_path,
source_name.as_str(),
)?;
source.clone()
16 changes: 5 additions & 11 deletions src/frontend/src/handler/drop_schema.rs
@@ -13,7 +13,6 @@
// limitations under the License.

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::is_system_schema;
use risingwave_sqlparser::ast::{DropMode, ObjectName};

@@ -59,19 +58,14 @@ pub async fn handle_drop_schema(
}
}
};
match mode {
Some(DropMode::Restrict) | None => {
// Note: we don't check if the schema is empty here.
// The check is done in meta `ensure_schema_empty`.
}
Some(DropMode::Cascade) => {
bail_not_implemented!(issue = 6773, "drop schema with cascade mode");
}
};

// Note: we don't check if the schema is empty here when drop mode is cascade.
// The check is done in meta `ensure_schema_empty`.
let cascade = matches!(mode, Some(DropMode::Cascade));

session.check_privilege_for_drop_alter_db_schema(&schema)?;

let catalog_writer = session.catalog_writer()?;
catalog_writer.drop_schema(schema.id()).await?;
catalog_writer.drop_schema(schema.id(), cascade).await?;
Ok(PgResponse::empty_result(StatementType::DROP_SCHEMA))
}
6 changes: 3 additions & 3 deletions src/frontend/src/handler/mod.rs
@@ -462,11 +462,11 @@ pub async fn handle(
| ObjectType::Source
| ObjectType::Subscription
| ObjectType::Index
| ObjectType::Table => {
| ObjectType::Table
| ObjectType::Schema => {
cascade = true;
}
ObjectType::Schema
| ObjectType::Database
ObjectType::Database
| ObjectType::User
| ObjectType::Connection
| ObjectType::Secret => {
74 changes: 62 additions & 12 deletions src/frontend/src/observer/observer_manager.rs
@@ -27,7 +27,7 @@ use risingwave_common_service::ObserverState;
use risingwave_hummock_sdk::FrontendHummockVersion;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::{HummockVersionDeltas, HummockVersionStats};
use risingwave_pb::meta::relation::RelationInfo;
use risingwave_pb::meta::object::{ObjectInfo, PbObjectInfo};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::{FragmentWorkerSlotMapping, MetaSnapshot, SubscribeResponse};
use risingwave_rpc_client::ComputeClientPoolRef;
@@ -65,7 +65,7 @@ impl ObserverState for FrontendObserverNode {
match info.to_owned() {
Info::Database(_)
| Info::Schema(_)
| Info::RelationGroup(_)
| Info::ObjectGroup(_)
| Info::Function(_)
| Info::Connection(_) => {
self.handle_catalog_notification(resp);
@@ -261,13 +261,27 @@ impl FrontendObserverNode {
Operation::Update => catalog_guard.update_schema(schema),
_ => panic!("receive an unsupported notify {:?}", resp),
},
Info::RelationGroup(relation_group) => {
for relation in &relation_group.relations {
let Some(relation) = relation.relation_info.as_ref() else {
Info::ObjectGroup(object_group) => {
for object in &object_group.objects {
let Some(obj) = object.object_info.as_ref() else {
continue;
};
match relation {
RelationInfo::Table(table) => match resp.operation() {
match obj {
ObjectInfo::Database(db) => match resp.operation() {
Operation::Add => catalog_guard.create_database(db),
Operation::Delete => catalog_guard.drop_database(db.id),
Operation::Update => catalog_guard.update_database(db),
_ => panic!("receive an unsupported notify {:?}", resp),
},
ObjectInfo::Schema(schema) => match resp.operation() {
Operation::Add => catalog_guard.create_schema(schema),
Operation::Delete => {
catalog_guard.drop_schema(schema.database_id, schema.id)
}
Operation::Update => catalog_guard.update_schema(schema),
_ => panic!("receive an unsupported notify {:?}", resp),
},
PbObjectInfo::Table(table) => match resp.operation() {
Operation::Add => catalog_guard.create_table(table),
Operation::Delete => catalog_guard.drop_table(
table.database_id,
@@ -289,7 +303,7 @@
}
_ => panic!("receive an unsupported notify {:?}", resp),
},
RelationInfo::Source(source) => match resp.operation() {
PbObjectInfo::Source(source) => match resp.operation() {
Operation::Add => catalog_guard.create_source(source),
Operation::Delete => catalog_guard.drop_source(
source.database_id,
@@ -299,15 +313,15 @@
Operation::Update => catalog_guard.update_source(source),
_ => panic!("receive an unsupported notify {:?}", resp),
},
RelationInfo::Sink(sink) => match resp.operation() {
PbObjectInfo::Sink(sink) => match resp.operation() {
Operation::Add => catalog_guard.create_sink(sink),
Operation::Delete => {
catalog_guard.drop_sink(sink.database_id, sink.schema_id, sink.id)
}
Operation::Update => catalog_guard.update_sink(sink),
_ => panic!("receive an unsupported notify {:?}", resp),
},
RelationInfo::Subscription(subscription) => match resp.operation() {
PbObjectInfo::Subscription(subscription) => match resp.operation() {
Operation::Add => catalog_guard.create_subscription(subscription),
Operation::Delete => catalog_guard.drop_subscription(
subscription.database_id,
@@ -317,7 +331,7 @@
Operation::Update => catalog_guard.update_subscription(subscription),
_ => panic!("receive an unsupported notify {:?}", resp),
},
RelationInfo::Index(index) => match resp.operation() {
PbObjectInfo::Index(index) => match resp.operation() {
Operation::Add => catalog_guard.create_index(index),
Operation::Delete => catalog_guard.drop_index(
index.database_id,
@@ -327,14 +341,50 @@
Operation::Update => catalog_guard.update_index(index),
_ => panic!("receive an unsupported notify {:?}", resp),
},
RelationInfo::View(view) => match resp.operation() {
PbObjectInfo::View(view) => match resp.operation() {
Operation::Add => catalog_guard.create_view(view),
Operation::Delete => {
catalog_guard.drop_view(view.database_id, view.schema_id, view.id)
}
Operation::Update => catalog_guard.update_view(view),
_ => panic!("receive an unsupported notify {:?}", resp),
},
ObjectInfo::Function(function) => match resp.operation() {
Operation::Add => catalog_guard.create_function(function),
Operation::Delete => catalog_guard.drop_function(
function.database_id,
function.schema_id,
function.id.into(),
),
Operation::Update => catalog_guard.update_function(function),
_ => panic!("receive an unsupported notify {:?}", resp),
},
ObjectInfo::Connection(connection) => match resp.operation() {
Operation::Add => catalog_guard.create_connection(connection),
Operation::Delete => catalog_guard.drop_connection(
connection.database_id,
connection.schema_id,
connection.id,
),
Operation::Update => catalog_guard.update_connection(connection),
_ => panic!("receive an unsupported notify {:?}", resp),
},
ObjectInfo::Secret(secret) => {
let mut secret = secret.clone();
// The secret value should not be revealed to users. So mask it in the frontend catalog.
secret.value =
"SECRET VALUE SHOULD NOT BE REVEALED".as_bytes().to_vec();
match resp.operation() {
Operation::Add => catalog_guard.create_secret(&secret),
Operation::Delete => catalog_guard.drop_secret(
secret.database_id,
secret.schema_id,
SecretId::new(secret.id),
),
Operation::Update => catalog_guard.update_secret(&secret),
_ => panic!("receive an unsupported notify {:?}", resp),
}
}
}
}
}