From eed301b66dafec01a3200cccaf77f509ce88ef03 Mon Sep 17 00:00:00 2001 From: August Date: Tue, 21 Jan 2025 14:11:43 +0800 Subject: [PATCH 1/6] fix: fix several issues about the public schema (#20201) --- src/frontend/src/handler/show.rs | 272 ++++++++++++++++++------------- src/frontend/src/session.rs | 13 +- 2 files changed, 160 insertions(+), 125 deletions(-) diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index 48743b240c4d4..bb952249788f8 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -20,7 +20,7 @@ use pgwire::pg_protocol::truncated_fmt; use pgwire::pg_response::{PgResponse, StatementType}; use pgwire::pg_server::Session; use risingwave_common::bail_not_implemented; -use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, DEFAULT_SCHEMA_NAME}; +use risingwave_common::catalog::{ColumnCatalog, ColumnDesc}; use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD}; use risingwave_common::types::{DataType, Fields, Timestamptz}; use risingwave_common::util::addr::HostAddr; @@ -33,8 +33,11 @@ use risingwave_sqlparser::ast::{ use super::{fields_to_descriptors, RwPgResponse, RwPgResponseBuilderExt}; use crate::binder::{Binder, Relation}; +use crate::catalog::catalog_service::CatalogReadGuard; +use crate::catalog::root_catalog::SchemaPath; +use crate::catalog::schema_catalog::SchemaCatalog; use crate::catalog::{CatalogError, IndexCatalog}; -use crate::error::Result; +use crate::error::{Result, RwError}; use crate::handler::create_connection::print_connection_params; use crate::handler::HandlerArgs; use crate::session::cursor_manager::SubscriptionCursor; @@ -102,12 +105,6 @@ pub fn get_indexes_from_table( Ok(indexes) } -fn schema_or_default(schema: &Option) -> String { - schema - .as_ref() - .map_or_else(|| DEFAULT_SCHEMA_NAME.to_owned(), |s| s.real_value()) -} - fn schema_or_search_path( session: &Arc, schema: &Option, @@ -130,6 +127,28 @@ fn schema_or_search_path( } } +fn iter_schema_items( + session: &Arc, + schema: &Option, + reader: &CatalogReadGuard, + mut f: F, +) -> Vec +where + F: FnMut(&SchemaCatalog) -> Vec, +{ + let search_path = session.config().search_path(); + + schema_or_search_path(session, schema, &search_path) + .into_iter() + .filter_map(|schema| { + reader + .get_schema_by_name(&session.database(), schema.as_ref()) + .ok() + }) + .flat_map(|s| f(s).into_iter()) + .collect() +} + #[derive(Fields)] #[fields(style = "Title Case")] struct ShowObjectRow { @@ -307,78 +326,72 @@ pub async fn handle_show_object( let catalog_reader = session.env().catalog_reader(); let names = match command { - // If not include schema name, use default schema name ShowObject::Table { schema } => { - let search_path = session.config().search_path(); - let mut table_names_in_schema = vec![]; - for schema in schema_or_search_path(&session, &schema, &search_path) { - // If the schema is not found, skip it - if let Ok(schema_catalog) = catalog_reader - .read_guard() - .get_schema_by_name(&session.database(), schema.as_ref()) - { - table_names_in_schema - .extend(schema_catalog.iter_user_table().map(|t| t.name.clone())); - } - } - - table_names_in_schema + let reader = catalog_reader.read_guard(); + iter_schema_items(&session, &schema, &reader, |schema| { + schema.iter_user_table().map(|t| t.name.clone()).collect() + }) + } + ShowObject::InternalTable { schema } => { + let reader = catalog_reader.read_guard(); + iter_schema_items(&session, &schema, &reader, |schema| { + schema + .iter_internal_table() + .map(|t| t.name.clone()) + .collect() + }) } - ShowObject::InternalTable { schema } => catalog_reader - .read_guard() - .get_schema_by_name(&session.database(), &schema_or_default(&schema))? - .iter_internal_table() - .map(|t| t.name.clone()) - .collect(), ShowObject::Database => catalog_reader.read_guard().get_all_database_names(), ShowObject::Schema => catalog_reader .read_guard() .get_all_schema_names(&session.database())?, - ShowObject::View { schema } => catalog_reader - .read_guard() - .get_schema_by_name(&session.database(), &schema_or_default(&schema))? - .iter_view() - .map(|t| t.name.clone()) - .collect(), - ShowObject::MaterializedView { schema } => catalog_reader - .read_guard() - .get_schema_by_name(&session.database(), &schema_or_default(&schema))? - .iter_created_mvs() - .map(|t| t.name.clone()) - .collect(), - ShowObject::Source { schema } => catalog_reader - .read_guard() - .get_schema_by_name(&session.database(), &schema_or_default(&schema))? - .iter_source() - .map(|t| t.name.clone()) - .chain(session.temporary_source_manager().keys()) - .collect(), - ShowObject::Sink { schema } => catalog_reader - .read_guard() - .get_schema_by_name(&session.database(), &schema_or_default(&schema))? - .iter_sink() - .map(|t| t.name.clone()) - .collect(), + ShowObject::View { schema } => { + let reader = catalog_reader.read_guard(); + iter_schema_items(&session, &schema, &reader, |schema| { + schema.iter_view().map(|t| t.name.clone()).collect() + }) + } + ShowObject::MaterializedView { schema } => { + let reader = catalog_reader.read_guard(); + iter_schema_items(&session, &schema, &reader, |schema| { + schema.iter_created_mvs().map(|t| t.name.clone()).collect() + }) + } + ShowObject::Source { schema } => { + let reader = catalog_reader.read_guard(); + let mut sources = iter_schema_items(&session, &schema, &reader, |schema| { + schema.iter_source().map(|t| t.name.clone()).collect() + }); + sources.extend(session.temporary_source_manager().keys()); + sources + } + ShowObject::Sink { schema } => { + let reader = catalog_reader.read_guard(); + iter_schema_items(&session, &schema, &reader, |schema| { + schema.iter_sink().map(|t| t.name.clone()).collect() + }) + } ShowObject::Subscription { schema } => { - let rows = catalog_reader - .read_guard() - .get_schema_by_name(&session.database(), &schema_or_default(&schema))? - .iter_subscription() - .map(|t| ShowSubscriptionRow { - name: t.name.clone(), - retention_seconds: t.retention_seconds as i64, - }) - .collect_vec(); + let reader = catalog_reader.read_guard(); + let rows = iter_schema_items(&session, &schema, &reader, |schema| { + schema + .iter_subscription() + .map(|t| ShowSubscriptionRow { + name: t.name.clone(), + retention_seconds: t.retention_seconds as i64, + }) + .collect() + }); return Ok(PgResponse::builder(StatementType::SHOW_COMMAND) .rows(rows) .into()); } - ShowObject::Secret { schema } => catalog_reader - .read_guard() - .get_schema_by_name(&session.database(), &schema_or_default(&schema))? - .iter_secret() - .map(|t| t.name.clone()) - .collect(), + ShowObject::Secret { schema } => { + let reader = catalog_reader.read_guard(); + iter_schema_items(&session, &schema, &reader, |schema| { + schema.iter_secret().map(|t| t.name.clone()).collect() + }) + } ShowObject::Columns { table } => { let Ok(columns) = get_columns_from_table(&session, table.clone()) .or(get_columns_from_sink(&session, table.clone())) @@ -404,10 +417,8 @@ pub async fn handle_show_object( } ShowObject::Connection { schema } => { let reader = catalog_reader.read_guard(); - let schema = - reader.get_schema_by_name(&session.database(), &schema_or_default(&schema))?; - let rows = schema - .iter_connections() + let rows = iter_schema_items(&session, &schema, &reader, |schema| { + schema.iter_connections() .map(|c| { let name = c.name.clone(); let r#type = match &c.info { @@ -420,13 +431,13 @@ pub async fn handle_show_object( }; let source_names = schema .get_source_ids_by_connection(c.id) - .unwrap_or(Vec::new()) + .unwrap_or_default() .into_iter() .filter_map(|sid| schema.get_source_by_id(&sid).map(|catalog| catalog.name.as_str())) .collect_vec(); let sink_names = schema .get_sink_ids_by_connection(c.id) - .unwrap_or(Vec::new()) + .unwrap_or_default() .into_iter() .filter_map(|sid| schema.get_sink_by_id(&sid).map(|catalog| catalog.name.as_str())) .collect_vec(); @@ -452,23 +463,26 @@ pub async fn handle_show_object( r#type, properties, } - }); + }).collect_vec() + }); return Ok(PgResponse::builder(StatementType::SHOW_COMMAND) .rows(rows) .into()); } ShowObject::Function { schema } => { let reader = catalog_reader.read_guard(); - let rows = reader - .get_schema_by_name(&session.database(), &schema_or_default(&schema))? - .iter_function() - .map(|t| ShowFunctionRow { - name: t.name.clone(), - arguments: t.arg_types.iter().map(|t| t.to_string()).join(", "), - return_type: t.return_type.to_string(), - language: t.language.clone(), - link: t.link.clone(), - }); + let rows = iter_schema_items(&session, &schema, &reader, |schema| { + schema + .iter_function() + .map(|t| ShowFunctionRow { + name: t.name.clone(), + arguments: t.arg_types.iter().map(|t| t.to_string()).join(", "), + return_type: t.return_type.to_string(), + language: t.language.clone(), + link: t.link.clone(), + }) + .collect() + }); return Ok(PgResponse::builder(StatementType::SHOW_COMMAND) .rows(rows) .into()); @@ -628,57 +642,81 @@ pub fn handle_show_create_object( let database = session.database(); let (schema_name, object_name) = Binder::resolve_schema_qualified_name(&database, name.clone())?; - let schema_name = schema_name.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned()); - let schema = catalog_reader.get_schema_by_name(&database, &schema_name)?; - let sql = match show_create_type { + let search_path = session.config().search_path(); + let user_name = &session.user_name(); + let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); + + let (sql, schema_name) = match show_create_type { ShowCreateType::MaterializedView => { - let mv = schema - .get_created_table_by_name(&object_name) - .filter(|t| t.is_mview()) + let (mv, schema) = schema_path + .try_find(|schema_name| { + Ok::<_, RwError>( + catalog_reader + .get_schema_by_name(&database, schema_name)? + .get_created_table_by_name(&object_name) + .filter(|t| t.is_mview()), + ) + })? .ok_or_else(|| CatalogError::NotFound("materialized view", name.to_string()))?; - mv.create_sql() + (mv.create_sql(), schema) } ShowCreateType::View => { - let view = schema - .get_view_by_name(&object_name) - .ok_or_else(|| CatalogError::NotFound("view", name.to_string()))?; - view.create_sql(schema.name()) + let (view, schema) = + catalog_reader.get_view_by_name(&database, schema_path, &object_name)?; + (view.create_sql(schema.to_owned()), schema) } ShowCreateType::Table => { - let table = schema - .get_created_table_by_name(&object_name) - .filter(|t| t.is_user_table()) + let (table, schema) = schema_path + .try_find(|schema_name| { + Ok::<_, RwError>( + catalog_reader + .get_schema_by_name(&database, schema_name)? + .get_created_table_by_name(&object_name) + .filter(|t| t.is_user_table()), + ) + })? .ok_or_else(|| CatalogError::NotFound("table", name.to_string()))?; - table.create_sql_purified() + + (table.create_sql_purified(), schema) } ShowCreateType::Sink => { - let sink = schema - .get_sink_by_name(&object_name) - .ok_or_else(|| CatalogError::NotFound("sink", name.to_string()))?; - sink.create_sql() + let (sink, schema) = + catalog_reader.get_sink_by_name(&database, schema_path, &object_name)?; + (sink.create_sql(), schema) } ShowCreateType::Source => { - let source = schema - .get_source_by_name(&object_name) - .filter(|s| s.associated_table_id.is_none()) + let (source, schema) = schema_path + .try_find(|schema_name| { + Ok::<_, RwError>( + catalog_reader + .get_schema_by_name(&database, schema_name)? + .get_source_by_name(&object_name) + .filter(|s| s.associated_table_id.is_none()), + ) + })? .ok_or_else(|| CatalogError::NotFound("source", name.to_string()))?; - source.create_sql_purified() + (source.create_sql_purified(), schema) } ShowCreateType::Index => { - let index = schema - .get_created_table_by_name(&object_name) - .filter(|t| t.is_index()) + let (index, schema) = schema_path + .try_find(|schema_name| { + Ok::<_, RwError>( + catalog_reader + .get_schema_by_name(&database, schema_name)? + .get_created_table_by_name(&object_name) + .filter(|t| t.is_index()), + ) + })? .ok_or_else(|| CatalogError::NotFound("index", name.to_string()))?; - index.create_sql() + (index.create_sql(), schema) } ShowCreateType::Function => { bail_not_implemented!("show create on: {}", show_create_type); } ShowCreateType::Subscription => { - let subscription = schema - .get_subscription_by_name(&object_name) - .ok_or_else(|| CatalogError::NotFound("subscription", name.to_string()))?; - subscription.create_sql() + let (subscription, schema) = + catalog_reader.get_subscription_by_name(&database, schema_path, &object_name)?; + (subscription.create_sql(), schema) } }; let name = format!("{}.{}", schema_name, object_name); diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 7f99d91d6e0d5..dbc9f4f91a010 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -42,7 +42,6 @@ use risingwave_batch::worker_manager::worker_node_manager::{ WorkerNodeManager, WorkerNodeManagerRef, }; use risingwave_common::acl::AclMode; -use risingwave_common::catalog::DEFAULT_SCHEMA_NAME; #[cfg(test)] use risingwave_common::catalog::{ DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID, @@ -1017,13 +1016,11 @@ impl SessionImpl { }; check_schema_writable(&schema.name())?; - if schema.name() != DEFAULT_SCHEMA_NAME { - self.check_privileges(&[ObjectCheckItem::new( - schema.owner(), - AclMode::Create, - Object::SchemaId(schema.id()), - )])?; - } + self.check_privileges(&[ObjectCheckItem::new( + schema.owner(), + AclMode::Create, + Object::SchemaId(schema.id()), + )])?; let db_id = catalog_reader.get_database_by_name(db_name)?.id(); Ok((db_id, schema.id())) From 5874059dda1ed9100269bfa6c8ce9ac1e95a4431 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 21 Jan 2025 14:45:12 +0800 Subject: [PATCH 2/6] fix(risedev): set `CONNECTOR_LIBS_PATH` for `risedev playground` (#20224) Signed-off-by: Bugen Zhao --- Makefile.toml | 4 ++++ src/jni_core/src/jvm_runtime.rs | 9 ++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/Makefile.toml b/Makefile.toml index 98ccb13e3eaaf..d088108964225 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -688,6 +688,10 @@ if [[ $ENABLE_COREDUMP == "true" ]]; then ulimit -c unlimited fi +if [[ $ENABLE_BUILD_RW_CONNECTOR == "true" ]]; then + export CONNECTOR_LIBS_PATH="${PREFIX_BIN}/connector-node/libs/" +fi + set -x target/${RISEDEV_BUILD_TARGET_DIR}${BUILD_MODE_DIR}/risingwave playground $@ ''' diff --git a/src/jni_core/src/jvm_runtime.rs b/src/jni_core/src/jvm_runtime.rs index 63bf7b4fbef26..3345ba0c6c0c1 100644 --- a/src/jni_core/src/jvm_runtime.rs +++ b/src/jni_core/src/jvm_runtime.rs @@ -78,7 +78,14 @@ impl JavaVmWrapper { let mut class_vec = vec![]; - let entries = fs::read_dir(&libs_path).context("failed to read connector libs")?; + let entries = fs::read_dir(&libs_path).context(if cfg!(debug_assertions) { + "failed to read connector libs; \ + for RiseDev users, please check if ENABLE_BUILD_RW_CONNECTOR is set with `risedev configure` + " + } else { + "failed to read connector libs, \ + please check if env var CONNECTOR_LIBS_PATH is correctly configured" + })?; for entry in entries.flatten() { let entry_path = entry.path(); if entry_path.file_name().is_some() { From 1b74a95f2d507c0f310f789693c04ea697d21fd3 Mon Sep 17 00:00:00 2001 From: August Date: Tue, 21 Jan 2025 16:18:59 +0800 Subject: [PATCH 3/6] feat: support real system func pg_table_is_visible (#20234) --- proto/expr.proto | 1 + .../tests/testdata/output/subquery.yaml | 26 ++++--- .../binder/expr/function/builtin_scalar.rs | 2 +- src/frontend/src/catalog/schema_catalog.rs | 11 +++ .../src/catalog/system_catalog/mod.rs | 4 ++ src/frontend/src/expr/function_impl/mod.rs | 1 + .../expr/function_impl/pg_table_is_visible.rs | 72 +++++++++++++++++++ src/frontend/src/expr/pure.rs | 3 +- .../src/optimizer/plan_expr_visitor/strong.rs | 1 + 9 files changed, 108 insertions(+), 13 deletions(-) create mode 100644 src/frontend/src/expr/function_impl/pg_table_is_visible.rs diff --git a/proto/expr.proto b/proto/expr.proto index 5bd2809f7d314..57f49a1b3d7fe 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -328,6 +328,7 @@ message ExprNode { PG_IS_IN_RECOVERY = 2411; RW_RECOVERY_STATUS = 2412; RW_EPOCH_TO_TS = 2413; + PG_TABLE_IS_VISIBLE = 2414; // EXTERNAL ICEBERG_TRANSFORM = 2201; diff --git a/src/frontend/planner_test/tests/testdata/output/subquery.yaml b/src/frontend/planner_test/tests/testdata/output/subquery.yaml index 6d92ab8947741..1e3935a11dfa1 100644 --- a/src/frontend/planner_test/tests/testdata/output/subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/output/subquery.yaml @@ -224,7 +224,7 @@ ORDER BY 1,2; logical_plan: |- LogicalProject { exprs: [rw_schemas.name, rw_tables.name, Case(($expr1 = 'r':Varchar), 'table':Varchar, ($expr1 = 'v':Varchar), 'view':Varchar, ($expr1 = 'm':Varchar), 'materialized view':Varchar, ($expr1 = 'i':Varchar), 'index':Varchar, ($expr1 = 'S':Varchar), 'sequence':Varchar, ($expr1 = 's':Varchar), 'special':Varchar, ($expr1 = 't':Varchar), 'TOAST table':Varchar, ($expr1 = 'f':Varchar), 'foreign table':Varchar, ($expr1 = 'p':Varchar), 'partitioned table':Varchar, ($expr1 = 'I':Varchar), 'partitioned index':Varchar) as $expr4, PgGetUserbyid(rw_tables.owner) as $expr5] } - └─LogicalFilter { predicate: In($expr1, 'r':Varchar, 'p':Varchar, 'v':Varchar, 'm':Varchar, 'S':Varchar, 'f':Varchar, '':Varchar) AND (rw_schemas.name <> 'pg_catalog':Varchar) AND Not(RegexpEq(rw_schemas.name, '^pg_toast':Varchar)) AND (rw_schemas.name <> 'information_schema':Varchar) } + └─LogicalFilter { predicate: In($expr1, 'r':Varchar, 'p':Varchar, 'v':Varchar, 'm':Varchar, 'S':Varchar, 'f':Varchar, '':Varchar) AND (rw_schemas.name <> 'pg_catalog':Varchar) AND Not(RegexpEq(rw_schemas.name, '^pg_toast':Varchar)) AND (rw_schemas.name <> 'information_schema':Varchar) AND PgTableIsVisible(rw_tables.id) } └─LogicalJoin { type: LeftOuter, on: (rw_schemas.id = rw_tables.schema_id), output: all } ├─LogicalShare { id: 18 } │ └─LogicalProject { exprs: [rw_tables.id, rw_tables.name, rw_tables.schema_id, rw_tables.owner, 'p':Varchar, Case(('table':Varchar = 'table':Varchar), 'r':Varchar, ('table':Varchar = 'system table':Varchar), 'r':Varchar, ('table':Varchar = 'index':Varchar), 'i':Varchar, ('table':Varchar = 'view':Varchar), 'v':Varchar, ('table':Varchar = 'materialized view':Varchar), 'm':Varchar) as $expr1, 0:Int32::Int16 as $expr2, 0:Int32, 0:Int32, Array as $expr3, false:Boolean, null:Varchar] } @@ -264,24 +264,28 @@ ├─BatchExchange { order: [], dist: HashShard(rw_tables.schema_id) } │ └─BatchUnion { all: true } │ ├─BatchProject { exprs: [rw_tables.name, 'table':Varchar, rw_tables.schema_id, rw_tables.owner] } - │ │ └─BatchScan { table: rw_tables, columns: [rw_tables.name, rw_tables.schema_id, rw_tables.owner], distribution: Single } + │ │ └─BatchFilter { predicate: PgTableIsVisible(rw_tables.id) } + │ │ └─BatchScan { table: rw_tables, columns: [rw_tables.name, rw_tables.schema_id, rw_tables.owner, rw_tables.id], distribution: Single } │ ├─BatchProject { exprs: [rw_system_tables.name, 'system table':Varchar, rw_system_tables.schema_id, rw_system_tables.owner] } - │ │ └─BatchScan { table: rw_system_tables, columns: [rw_system_tables.name, rw_system_tables.schema_id, rw_system_tables.owner], distribution: Single } + │ │ └─BatchFilter { predicate: PgTableIsVisible(rw_system_tables.id) } + │ │ └─BatchScan { table: rw_system_tables, columns: [rw_system_tables.name, rw_system_tables.schema_id, rw_system_tables.owner, rw_system_tables.id], distribution: Single } │ ├─BatchProject { exprs: [rw_sources.name, 'source':Varchar, rw_sources.schema_id, rw_sources.owner] } - │ │ └─BatchFilter { predicate: null:Boolean } - │ │ └─BatchScan { table: rw_sources, columns: [rw_sources.name, rw_sources.schema_id, rw_sources.owner], distribution: Single } + │ │ └─BatchFilter { predicate: null:Boolean AND PgTableIsVisible(rw_sources.id) } + │ │ └─BatchScan { table: rw_sources, columns: [rw_sources.name, rw_sources.schema_id, rw_sources.owner, rw_sources.id], distribution: Single } │ ├─BatchProject { exprs: [rw_indexes.name, 'index':Varchar, rw_indexes.schema_id, rw_indexes.owner] } │ │ └─BatchValues { rows: [] } │ ├─BatchProject { exprs: [rw_sinks.name, 'sink':Varchar, rw_sinks.schema_id, rw_sinks.owner] } - │ │ └─BatchFilter { predicate: null:Boolean } - │ │ └─BatchScan { table: rw_sinks, columns: [rw_sinks.name, rw_sinks.schema_id, rw_sinks.owner], distribution: Single } + │ │ └─BatchFilter { predicate: null:Boolean AND PgTableIsVisible(rw_sinks.id) } + │ │ └─BatchScan { table: rw_sinks, columns: [rw_sinks.name, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.id], distribution: Single } │ ├─BatchProject { exprs: [rw_subscriptions.name, 'subscription':Varchar, rw_subscriptions.schema_id, rw_subscriptions.owner] } - │ │ └─BatchFilter { predicate: null:Boolean } - │ │ └─BatchScan { table: rw_subscriptions, columns: [rw_subscriptions.name, rw_subscriptions.schema_id, rw_subscriptions.owner], distribution: Single } + │ │ └─BatchFilter { predicate: null:Boolean AND PgTableIsVisible(rw_subscriptions.id) } + │ │ └─BatchScan { table: rw_subscriptions, columns: [rw_subscriptions.name, rw_subscriptions.schema_id, rw_subscriptions.owner, rw_subscriptions.id], distribution: Single } │ ├─BatchProject { exprs: [rw_materialized_views.name, 'materialized view':Varchar, rw_materialized_views.schema_id, rw_materialized_views.owner] } - │ │ └─BatchScan { table: rw_materialized_views, columns: [rw_materialized_views.name, rw_materialized_views.schema_id, rw_materialized_views.owner], distribution: Single } + │ │ └─BatchFilter { predicate: PgTableIsVisible(rw_materialized_views.id) } + │ │ └─BatchScan { table: rw_materialized_views, columns: [rw_materialized_views.name, rw_materialized_views.schema_id, rw_materialized_views.owner, rw_materialized_views.id], distribution: Single } │ └─BatchProject { exprs: [rw_views.name, 'view':Varchar, rw_views.schema_id, rw_views.owner] } - │ └─BatchScan { table: rw_views, columns: [rw_views.name, rw_views.schema_id, rw_views.owner], distribution: Single } + │ └─BatchFilter { predicate: PgTableIsVisible(rw_views.id) } + │ └─BatchScan { table: rw_views, columns: [rw_views.name, rw_views.schema_id, rw_views.owner, rw_views.id], distribution: Single } └─BatchExchange { order: [], dist: HashShard(rw_schemas.id) } └─BatchFilter { predicate: (rw_schemas.name <> 'pg_catalog':Varchar) AND Not(RegexpEq(rw_schemas.name, '^pg_toast':Varchar)) AND (rw_schemas.name <> 'information_schema':Varchar) } └─BatchScan { table: rw_schemas, columns: [rw_schemas.id, rw_schemas.name], distribution: Single } diff --git a/src/frontend/src/binder/expr/function/builtin_scalar.rs b/src/frontend/src/binder/expr/function/builtin_scalar.rs index 7b75689895c41..2797104012e49 100644 --- a/src/frontend/src/binder/expr/function/builtin_scalar.rs +++ b/src/frontend/src/binder/expr/function/builtin_scalar.rs @@ -598,7 +598,7 @@ impl Binder { Ok(ExprImpl::literal_varchar(new_value.to_string())) }))), ("format_type", raw_call(ExprType::FormatType)), - ("pg_table_is_visible", raw_literal(ExprImpl::literal_bool(true))), + ("pg_table_is_visible", raw_call(ExprType::PgTableIsVisible)), ("pg_type_is_visible", raw_literal(ExprImpl::literal_bool(true))), ("pg_get_constraintdef", raw_literal(ExprImpl::literal_null(DataType::Varchar))), ("pg_get_partkeydef", raw_literal(ExprImpl::literal_null(DataType::Varchar))), diff --git a/src/frontend/src/catalog/schema_catalog.rs b/src/frontend/src/catalog/schema_catalog.rs index 129105c551114..2dd489fcb0b92 100644 --- a/src/frontend/src/catalog/schema_catalog.rs +++ b/src/frontend/src/catalog/schema_catalog.rs @@ -824,6 +824,17 @@ impl SchemaCatalog { } } + pub fn contains_object(&self, oid: u32) -> bool { + self.table_by_id.contains_key(&TableId::new(oid)) + || self.index_by_id.contains_key(&IndexId::new(oid)) + || self.source_by_id.contains_key(&oid) + || self.sink_by_id.contains_key(&oid) + || self.view_by_id.contains_key(&oid) + || self.function_by_id.contains_key(&FunctionId::new(oid)) + || self.subscription_by_id.contains_key(&oid) + || self.connection_by_id.contains_key(&oid) + } + pub fn id(&self) -> SchemaId { self.id } diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs index d3162d7c95fd2..2e1d0d3618987 100644 --- a/src/frontend/src/catalog/system_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/mod.rs @@ -314,6 +314,10 @@ pub fn get_sys_views_in_schema(schema_name: &str) -> Vec> { .collect() } +pub fn is_system_catalog(oid: u32) -> bool { + oid >= SYS_CATALOG_START_ID as u32 +} + /// The global registry of all builtin catalogs. pub static SYS_CATALOGS: LazyLock = LazyLock::new(|| { tracing::info!("found {} catalogs", SYS_CATALOGS_SLICE.len()); diff --git a/src/frontend/src/expr/function_impl/mod.rs b/src/frontend/src/expr/function_impl/mod.rs index 25d74cc7b023b..2f194cc5a9505 100644 --- a/src/frontend/src/expr/function_impl/mod.rs +++ b/src/frontend/src/expr/function_impl/mod.rs @@ -22,5 +22,6 @@ mod pg_get_viewdef; mod pg_index_column_has_property; mod pg_indexes_size; mod pg_relation_size; +mod pg_table_is_visible; mod rw_epoch_to_ts; mod rw_recovery_status; diff --git a/src/frontend/src/expr/function_impl/pg_table_is_visible.rs b/src/frontend/src/expr/function_impl/pg_table_is_visible.rs new file mode 100644 index 0000000000000..dcb8cda1b708b --- /dev/null +++ b/src/frontend/src/expr/function_impl/pg_table_is_visible.rs @@ -0,0 +1,72 @@ +// Copyright 2025 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::acl::AclMode; +use risingwave_common::session_config::SearchPath; +use risingwave_expr::{capture_context, function, Result}; +use risingwave_pb::user::grant_privilege::Object as GrantObject; + +use super::context::{AUTH_CONTEXT, CATALOG_READER, DB_NAME, SEARCH_PATH, USER_INFO_READER}; +use crate::catalog::system_catalog::is_system_catalog; +use crate::catalog::CatalogReader; +use crate::expr::function_impl::has_privilege::user_not_found_err; +use crate::session::AuthContext; +use crate::user::user_service::UserInfoReader; + +#[function("pg_table_is_visible(int4) -> boolean")] +fn pg_table_is_visible(oid: i32) -> Result> { + pg_table_is_visible_impl_captured(oid) +} + +#[capture_context(CATALOG_READER, USER_INFO_READER, AUTH_CONTEXT, SEARCH_PATH, DB_NAME)] +fn pg_table_is_visible_impl( + catalog: &CatalogReader, + user_info: &UserInfoReader, + auth_context: &AuthContext, + search_path: &SearchPath, + db_name: &str, + oid: i32, +) -> Result> { + // To maintain consistency with PostgreSQL, we ensure that system catalogs are always visible. + if is_system_catalog(oid as u32) { + return Ok(Some(true)); + } + + let catalog_reader = catalog.read_guard(); + let user_reader = user_info.read_guard(); + let user_info = user_reader + .get_user_by_name(&auth_context.user_name) + .ok_or(user_not_found_err( + format!("User {} not found", auth_context.user_name).as_str(), + ))?; + // Return true only if: + // 1. The schema of the object exists in the search path. + // 2. User have `USAGE` privilege on the schema. + for schema in search_path.path() { + if let Ok(schema) = catalog_reader.get_schema_by_name(db_name, schema) { + if schema.contains_object(oid as u32) { + return if user_info.is_super + || user_info + .check_privilege(&GrantObject::SchemaId(schema.id()), AclMode::Usage) + { + Ok(Some(true)) + } else { + Ok(Some(false)) + }; + } + } + } + + Ok(None) +} diff --git a/src/frontend/src/expr/pure.rs b/src/frontend/src/expr/pure.rs index 013f9f1d7ddb1..74fa79c476b60 100644 --- a/src/frontend/src/expr/pure.rs +++ b/src/frontend/src/expr/pure.rs @@ -294,7 +294,8 @@ impl ExprVisitor for ImpureAnalyzer { | Type::HasSchemaPrivilege | Type::MakeTimestamptz | Type::PgIsInRecovery - | Type::RwRecoveryStatus => self.impure = true, + | Type::RwRecoveryStatus + | Type::PgTableIsVisible => self.impure = true, } } } diff --git a/src/frontend/src/optimizer/plan_expr_visitor/strong.rs b/src/frontend/src/optimizer/plan_expr_visitor/strong.rs index 6a0ac319c1db0..53c6f61a9511a 100644 --- a/src/frontend/src/optimizer/plan_expr_visitor/strong.rs +++ b/src/frontend/src/optimizer/plan_expr_visitor/strong.rs @@ -322,6 +322,7 @@ impl Strong { | ExprType::PgGetSerialSequence | ExprType::PgIndexColumnHasProperty | ExprType::PgIsInRecovery + | ExprType::PgTableIsVisible | ExprType::RwRecoveryStatus | ExprType::IcebergTransform | ExprType::HasTablePrivilege From 509ec7a7b196ed1e4fe5290655cf93c1dd785b29 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Tue, 21 Jan 2025 17:10:29 +0800 Subject: [PATCH 4/6] feat: add `specific_resource_group` to `rw_streaming_jobs` (#20207) Signed-off-by: Shanicky Chen --- proto/meta.proto | 12 +++-- .../src/catalog/system_catalog/mod.rs | 4 +- .../catalog/system_catalog/rw_catalog/mod.rs | 1 + .../rw_catalog/rw_actor_infos.rs | 2 +- .../rw_catalog/rw_streaming_jobs.rs | 50 +++++++++++++++++++ .../rw_catalog/rw_table_fragments.rs | 2 +- src/frontend/src/meta_client.rs | 8 +-- src/frontend/src/test_utils.rs | 4 +- src/meta/service/src/stream_service.rs | 14 ++++-- src/meta/src/controller/catalog/get_op.rs | 9 +--- src/meta/src/controller/fragment.rs | 20 ++++++-- src/rpc_client/src/meta_client.rs | 8 +-- .../integration_tests/scale/resource_group.rs | 29 +++++++++++ 13 files changed, 128 insertions(+), 35 deletions(-) create mode 100644 src/frontend/src/catalog/system_catalog/rw_catalog/rw_streaming_jobs.rs diff --git a/proto/meta.proto b/proto/meta.proto index 5aea8ccef8233..975798de90da9 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -232,16 +232,18 @@ message ListTableFragmentsResponse { map table_fragments = 1; } -message ListTableFragmentStatesRequest {} +message ListStreamingJobStatesRequest {} -message ListTableFragmentStatesResponse { - message TableFragmentState { +message ListStreamingJobStatesResponse { + message StreamingJobState { uint32 table_id = 1; TableFragments.State state = 2; TableParallelism parallelism = 3; uint32 max_parallelism = 4; + string name = 5; + string resource_group = 6; } - repeated TableFragmentState states = 1; + repeated StreamingJobState states = 1; } message ListFragmentDistributionRequest {} @@ -333,7 +335,7 @@ service StreamManagerService { rpc Resume(ResumeRequest) returns (ResumeResponse); rpc CancelCreatingJobs(CancelCreatingJobsRequest) returns (CancelCreatingJobsResponse); rpc ListTableFragments(ListTableFragmentsRequest) returns (ListTableFragmentsResponse); - rpc ListTableFragmentStates(ListTableFragmentStatesRequest) returns (ListTableFragmentStatesResponse); + rpc ListStreamingJobStates(ListStreamingJobStatesRequest) returns (ListStreamingJobStatesResponse); rpc ListFragmentDistribution(ListFragmentDistributionRequest) returns (ListFragmentDistributionResponse); rpc ListActorStates(ListActorStatesRequest) returns (ListActorStatesResponse); rpc ListActorSplits(ListActorSplitsRequest) returns (ListActorSplitsResponse); diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs index 2e1d0d3618987..8e7f7ed1db221 100644 --- a/src/frontend/src/catalog/system_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/mod.rs @@ -32,7 +32,7 @@ use risingwave_common::error::BoxedError; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_common::types::DataType; -use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState; +use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState; use risingwave_pb::meta::table_parallelism::{PbFixedParallelism, PbParallelism}; use risingwave_pb::user::grant_privilege::Object; @@ -216,7 +216,7 @@ pub fn infer_dummy_view_sql(columns: &[SystemCatalogColumnsDef<'_>]) -> String { ) } -fn extract_parallelism_from_table_state(state: &TableFragmentState) -> String { +fn extract_parallelism_from_table_state(state: &StreamingJobState) -> String { match state .parallelism .as_ref() diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs index c3b11e69fcb6a..77345f508f045 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs @@ -62,4 +62,5 @@ mod rw_actor_id_to_ddl; mod rw_actor_splits; mod rw_fragment_id_to_ddl; mod rw_internal_table_info; +mod rw_streaming_jobs; mod rw_worker_actor_count; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_actor_infos.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_actor_infos.rs index 4e66f011a5c9b..a658539ead2b7 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_actor_infos.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_actor_infos.rs @@ -33,7 +33,7 @@ struct RwActorInfo { async fn read_rw_actors(reader: &SysCatalogReaderImpl) -> Result> { let table_ids = reader .meta_client - .list_table_fragment_states() + .list_streaming_job_states() .await? .into_iter() .map(|fragment| fragment.table_id) diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_streaming_jobs.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_streaming_jobs.rs new file mode 100644 index 0000000000000..4557b296f371e --- /dev/null +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_streaming_jobs.rs @@ -0,0 +1,50 @@ +// Copyright 2025 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::types::Fields; +use risingwave_frontend_macro::system_catalog; + +use crate::catalog::system_catalog::{extract_parallelism_from_table_state, SysCatalogReaderImpl}; +use crate::error::Result; + +#[derive(Fields)] +struct RwStreamingJob { + #[primary_key] + job: i32, + name: String, + status: String, + parallelism: String, + max_parallelism: i32, + resource_group: String, +} + +#[system_catalog(table, "rw_catalog.rw_streaming_jobs")] +async fn read_rw_streaming_jobs(reader: &SysCatalogReaderImpl) -> Result> { + let states = reader.meta_client.list_streaming_job_states().await?; + + Ok(states + .into_iter() + .map(|state| { + let parallelism = extract_parallelism_from_table_state(&state); + RwStreamingJob { + job: state.table_id as i32, + status: state.state().as_str_name().into(), + name: state.name, + parallelism: parallelism.to_uppercase(), + max_parallelism: state.max_parallelism as i32, + resource_group: state.resource_group, + } + }) + .collect()) +} diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_table_fragments.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_table_fragments.rs index 7458afb1b43ae..f6da43879a755 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_table_fragments.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_table_fragments.rs @@ -31,7 +31,7 @@ struct RwTableFragment { async fn read_rw_table_fragments_info( reader: &SysCatalogReaderImpl, ) -> Result> { - let states = reader.meta_client.list_table_fragment_states().await?; + let states = reader.meta_client.list_streaming_job_states().await?; Ok(states .into_iter() diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index ecf37b7a4d710..02b9ac06836d9 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -34,7 +34,7 @@ use risingwave_pb::meta::list_actor_states_response::ActorState; use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution; use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies; use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo; -use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState; +use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState; use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo; use risingwave_pb::meta::{EventLog, PbThrottleTarget, RecoveryStatus}; use risingwave_rpc_client::error::Result; @@ -64,7 +64,7 @@ pub trait FrontendMetaClient: Send + Sync { table_ids: &[u32], ) -> Result>; - async fn list_table_fragment_states(&self) -> Result>; + async fn list_streaming_job_states(&self) -> Result>; async fn list_fragment_distribution(&self) -> Result>; @@ -163,8 +163,8 @@ impl FrontendMetaClient for FrontendMetaClientImpl { self.0.list_table_fragments(table_ids).await } - async fn list_table_fragment_states(&self) -> Result> { - self.0.list_table_fragment_states().await + async fn list_streaming_job_states(&self) -> Result> { + self.0.list_streaming_job_states().await } async fn list_fragment_distribution(&self) -> Result> { diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index ef4c2ee2207c4..2ec64cd91e8fd 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -59,7 +59,7 @@ use risingwave_pb::meta::list_actor_states_response::ActorState; use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution; use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies; use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo; -use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState; +use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState; use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo; use risingwave_pb::meta::{ EventLog, PbTableParallelism, PbThrottleTarget, RecoveryStatus, SystemParams, @@ -990,7 +990,7 @@ impl FrontendMetaClient for MockFrontendMetaClient { Ok(HashMap::default()) } - async fn list_table_fragment_states(&self) -> RpcResult> { + async fn list_streaming_job_states(&self) -> RpcResult> { Ok(vec![]) } diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index 6ed247903ef81..648d7f12367da 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -243,10 +243,10 @@ impl StreamManagerService for StreamServiceImpl { } #[cfg_attr(coverage, coverage(off))] - async fn list_table_fragment_states( + async fn list_streaming_job_states( &self, - _request: Request, - ) -> Result, Status> { + _request: Request, + ) -> Result, Status> { let job_infos = self .metadata_manager .catalog_controller @@ -258,8 +258,10 @@ impl StreamManagerService for StreamServiceImpl { |StreamingJobInfo { job_id, job_status, + name, parallelism, max_parallelism, + resource_group, .. }| { let parallelism = match parallelism { @@ -268,17 +270,19 @@ impl StreamManagerService for StreamServiceImpl { StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _), }; - list_table_fragment_states_response::TableFragmentState { + list_streaming_job_states_response::StreamingJobState { table_id: job_id as _, + name, state: PbState::from(job_status) as _, parallelism: Some(parallelism.into()), max_parallelism: max_parallelism as _, + resource_group, } }, ) .collect_vec(); - Ok(Response::new(ListTableFragmentStatesResponse { states })) + Ok(Response::new(ListStreamingJobStatesResponse { states })) } #[cfg_attr(coverage, coverage(off))] diff --git a/src/meta/src/controller/catalog/get_op.rs b/src/meta/src/controller/catalog/get_op.rs index a925315a7bfc5..eab5438d765e5 100644 --- a/src/meta/src/controller/catalog/get_op.rs +++ b/src/meta/src/controller/catalog/get_op.rs @@ -300,12 +300,9 @@ impl CatalogController { streaming_job_ids: Vec, ) -> MetaResult> { let inner = self.inner.read().await; - let txn = inner.db.begin().await?; - let mut resource_groups = HashMap::new(); - for job_id in streaming_job_ids { - let resource_group = get_existing_job_resource_group(&txn, job_id).await?; + let resource_group = get_existing_job_resource_group(&inner.db, job_id).await?; resource_groups.insert(job_id, resource_group); } @@ -317,14 +314,12 @@ impl CatalogController { streaming_job_id: ObjectId, ) -> MetaResult { let inner = self.inner.read().await; - let txn = inner.db.begin().await?; - let database_id: ObjectId = StreamingJob::find_by_id(streaming_job_id) .select_only() .join(JoinType::InnerJoin, streaming_job::Relation::Object.def()) .column(object::Column::DatabaseId) .into_tuple() - .one(&txn) + .one(&inner.db) .await? .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?; diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index d7d82c5a3088a..f2c95b0f59e3b 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -26,10 +26,10 @@ use risingwave_meta_model::fragment::DistributionType; use risingwave_meta_model::object::ObjectType; use risingwave_meta_model::prelude::{Actor, Fragment, Sink, StreamingJob}; use risingwave_meta_model::{ - actor, actor_dispatcher, fragment, object, sink, source, streaming_job, table, ActorId, - ActorUpstreamActors, ConnectorSplits, DatabaseId, ExprContext, FragmentId, I32Array, JobStatus, - ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId, VnodeBitmap, - WorkerId, + actor, actor_dispatcher, database, fragment, object, sink, source, streaming_job, table, + ActorId, ActorUpstreamActors, ConnectorSplits, DatabaseId, ExprContext, FragmentId, I32Array, + JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId, + VnodeBitmap, WorkerId, }; use risingwave_meta_model_migration::{Alias, SelectStatement}; use risingwave_pb::common::PbActorLocation; @@ -90,6 +90,7 @@ pub struct StreamingJobInfo { pub job_status: JobStatus, pub parallelism: StreamingParallelism, pub max_parallelism: i32, + pub resource_group: String, } impl CatalogControllerInner { @@ -728,6 +729,7 @@ impl CatalogController { .select_only() .column(streaming_job::Column::JobId) .join(JoinType::InnerJoin, streaming_job::Relation::Object.def()) + .join(JoinType::InnerJoin, object::Relation::Database2.def()) .column(object::Column::ObjType) .join(JoinType::LeftJoin, table::Relation::Object1.def().rev()) .join(JoinType::LeftJoin, source::Relation::Object.def().rev()) @@ -750,6 +752,16 @@ impl CatalogController { streaming_job::Column::Parallelism, streaming_job::Column::MaxParallelism, ]) + .column_as( + Expr::if_null( + Expr::col(( + streaming_job::Entity, + streaming_job::Column::SpecificResourceGroup, + )), + Expr::col((database::Entity, database::Column::ResourceGroup)), + ), + "resource_group", + ) .into_model() .all(&inner.db) .await?; diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index bde6074b6c70a..40c07e0acad8e 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -78,7 +78,7 @@ use risingwave_pb::meta::list_actor_splits_response::ActorSplit; use risingwave_pb::meta::list_actor_states_response::ActorState; use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution; use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies; -use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState; +use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState; use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo; use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient; use risingwave_pb::meta::notification_service_client::NotificationServiceClient; @@ -997,10 +997,10 @@ impl MetaClient { Ok(resp.table_fragments) } - pub async fn list_table_fragment_states(&self) -> Result> { + pub async fn list_streaming_job_states(&self) -> Result> { let resp = self .inner - .list_table_fragment_states(ListTableFragmentStatesRequest {}) + .list_streaming_job_states(ListStreamingJobStatesRequest {}) .await?; Ok(resp.states) } @@ -2097,7 +2097,7 @@ macro_rules! for_all_meta_rpc { ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse } ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse } ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse } - ,{ stream_client, list_table_fragment_states, ListTableFragmentStatesRequest, ListTableFragmentStatesResponse } + ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse } ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse } ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse } ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse } diff --git a/src/tests/simulation/tests/integration_tests/scale/resource_group.rs b/src/tests/simulation/tests/integration_tests/scale/resource_group.rs index c2acf421a2e9f..43633aee46bbc 100644 --- a/src/tests/simulation/tests/integration_tests/scale/resource_group.rs +++ b/src/tests/simulation/tests/integration_tests/scale/resource_group.rs @@ -20,6 +20,7 @@ use risingwave_common::config::default; use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP; use risingwave_simulation::cluster::{Cluster, Configuration}; use risingwave_simulation::ctl_ext::predicate::{identity_contains, no_identity_contains}; +use risingwave_simulation::utils::AssertResult; use tokio::time::sleep; #[tokio::test] @@ -107,5 +108,33 @@ async fn test_resource_group() -> Result<()> { assert_eq!(union_fragment.inner.actors.len(), 2); assert_eq!(mat_fragment.inner.actors.len(), 4); + session + .run("select resource_group from rw_streaming_jobs where name = 'm'") + .await? + .assert_result_eq("test"); + + let _ = session + .run("alter materialized view m reset resource_group;") + .await?; + + session + .run("select resource_group from rw_streaming_jobs where name = 'm'") + .await? + .assert_result_eq(DEFAULT_RESOURCE_GROUP); + + let union_fragment = cluster + .locate_one_fragment([identity_contains("union")]) + .await?; + + let mat_fragment = cluster + .locate_one_fragment([ + identity_contains("materialize"), + no_identity_contains("union"), + ]) + .await?; + + assert_eq!(union_fragment.inner.actors.len(), 2); + assert_eq!(mat_fragment.inner.actors.len(), 2); + Ok(()) } From 33bf9ba598f3c0b045a385972f7aa9db39cc5048 Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Tue, 21 Jan 2025 18:37:55 +0800 Subject: [PATCH 5/6] fix(meta): fix receiving scheduled command on blocked database (#20241) --- src/meta/src/barrier/schedule.rs | 34 +++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index 13653a382d4ea..299f34bf59aa1 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -64,6 +64,12 @@ enum QueueStatus { Blocked(String), } +impl QueueStatus { + fn is_blocked(&self) -> bool { + matches!(self, Self::Blocked(_)) + } +} + struct ScheduledQueueItem { command: Command, notifiers: Vec, @@ -94,8 +100,10 @@ impl StatusQueue { self.status = QueueStatus::Blocked(reason); } - fn mark_ready(&mut self) { + fn mark_ready(&mut self) -> bool { + let prev_blocked = self.status.is_blocked(); self.status = QueueStatus::Ready; + prev_blocked } fn validate_item(&mut self, scheduled: &ScheduledQueueItem) -> MetaResult<()> { @@ -395,7 +403,13 @@ impl ScheduledBarriers { let mut rx = self.inner.changed_tx.subscribe(); { let mut queue = self.inner.queue.lock(); + if queue.status.is_blocked() { + continue; + } for (database_id, queue) in &mut queue.queue { + if queue.status.is_blocked() { + continue; + } if let Some(item) = queue.queue.pop_front() { item.send_latency_timer.observe_duration(); break 'outer Scheduled { @@ -452,16 +466,26 @@ impl ScheduledBarriers { pub(super) fn mark_ready(&self, database_id: Option) { let mut queue = self.inner.queue.lock(); if let Some(database_id) = database_id { - queue + let database_queue = queue .queue .entry(database_id) - .or_insert_with(DatabaseScheduledQueue::new) - .mark_ready(); + .or_insert_with(DatabaseScheduledQueue::new); + if database_queue.mark_ready() && !database_queue.queue.is_empty() { + self.inner.changed_tx.send(()).ok(); + } } else { - queue.mark_ready(); + let prev_blocked = queue.mark_ready(); for queue in queue.queue.values_mut() { queue.mark_ready(); } + if prev_blocked + && queue + .queue + .values() + .any(|database_queue| !database_queue.queue.is_empty()) + { + self.inner.changed_tx.send(()).ok(); + } } } From c0bea545731feb2800989b81ccbccb1d421fb953 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Tue, 21 Jan 2025 18:39:45 +0800 Subject: [PATCH 6/6] fix: Add read guard on reschedule_lock for `drop_streaming_jobs` (#20240) Signed-off-by: Shanicky Chen --- src/meta/src/rpc/ddl_controller.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 976206477f8c9..5212a73ccd0c8 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1435,6 +1435,8 @@ impl DdlController { drop_mode: DropMode, target_replace_info: Option, ) -> MetaResult { + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; + let (object_id, object_type) = match job_id { StreamingJobId::MaterializedView(id) => (id as _, ObjectType::Table), StreamingJobId::Sink(id) => (id as _, ObjectType::Sink),