From 509ec7a7b196ed1e4fe5290655cf93c1dd785b29 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Tue, 21 Jan 2025 17:10:29 +0800 Subject: [PATCH] feat: add `specific_resource_group` to `rw_streaming_jobs` (#20207) Signed-off-by: Shanicky Chen --- proto/meta.proto | 12 +++-- .../src/catalog/system_catalog/mod.rs | 4 +- .../catalog/system_catalog/rw_catalog/mod.rs | 1 + .../rw_catalog/rw_actor_infos.rs | 2 +- .../rw_catalog/rw_streaming_jobs.rs | 50 +++++++++++++++++++ .../rw_catalog/rw_table_fragments.rs | 2 +- src/frontend/src/meta_client.rs | 8 +-- src/frontend/src/test_utils.rs | 4 +- src/meta/service/src/stream_service.rs | 14 ++++-- src/meta/src/controller/catalog/get_op.rs | 9 +--- src/meta/src/controller/fragment.rs | 20 ++++++-- src/rpc_client/src/meta_client.rs | 8 +-- .../integration_tests/scale/resource_group.rs | 29 +++++++++++ 13 files changed, 128 insertions(+), 35 deletions(-) create mode 100644 src/frontend/src/catalog/system_catalog/rw_catalog/rw_streaming_jobs.rs diff --git a/proto/meta.proto b/proto/meta.proto index 5aea8ccef8233..975798de90da9 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -232,16 +232,18 @@ message ListTableFragmentsResponse { map table_fragments = 1; } -message ListTableFragmentStatesRequest {} +message ListStreamingJobStatesRequest {} -message ListTableFragmentStatesResponse { - message TableFragmentState { +message ListStreamingJobStatesResponse { + message StreamingJobState { uint32 table_id = 1; TableFragments.State state = 2; TableParallelism parallelism = 3; uint32 max_parallelism = 4; + string name = 5; + string resource_group = 6; } - repeated TableFragmentState states = 1; + repeated StreamingJobState states = 1; } message ListFragmentDistributionRequest {} @@ -333,7 +335,7 @@ service StreamManagerService { rpc Resume(ResumeRequest) returns (ResumeResponse); rpc CancelCreatingJobs(CancelCreatingJobsRequest) returns (CancelCreatingJobsResponse); rpc ListTableFragments(ListTableFragmentsRequest) returns (ListTableFragmentsResponse); - rpc ListTableFragmentStates(ListTableFragmentStatesRequest) returns (ListTableFragmentStatesResponse); + rpc ListStreamingJobStates(ListStreamingJobStatesRequest) returns (ListStreamingJobStatesResponse); rpc ListFragmentDistribution(ListFragmentDistributionRequest) returns (ListFragmentDistributionResponse); rpc ListActorStates(ListActorStatesRequest) returns (ListActorStatesResponse); rpc ListActorSplits(ListActorSplitsRequest) returns (ListActorSplitsResponse); diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs index 2e1d0d3618987..8e7f7ed1db221 100644 --- a/src/frontend/src/catalog/system_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/mod.rs @@ -32,7 +32,7 @@ use risingwave_common::error::BoxedError; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_common::types::DataType; -use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState; +use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState; use risingwave_pb::meta::table_parallelism::{PbFixedParallelism, PbParallelism}; use risingwave_pb::user::grant_privilege::Object; @@ -216,7 +216,7 @@ pub fn infer_dummy_view_sql(columns: &[SystemCatalogColumnsDef<'_>]) -> String { ) } -fn extract_parallelism_from_table_state(state: &TableFragmentState) -> String { +fn extract_parallelism_from_table_state(state: &StreamingJobState) -> String { match state .parallelism .as_ref() diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs index c3b11e69fcb6a..77345f508f045 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs @@ -62,4 +62,5 @@ mod rw_actor_id_to_ddl; mod rw_actor_splits; mod rw_fragment_id_to_ddl; mod rw_internal_table_info; +mod rw_streaming_jobs; mod rw_worker_actor_count; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_actor_infos.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_actor_infos.rs index 4e66f011a5c9b..a658539ead2b7 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_actor_infos.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_actor_infos.rs @@ -33,7 +33,7 @@ struct RwActorInfo { async fn read_rw_actors(reader: &SysCatalogReaderImpl) -> Result> { let table_ids = reader .meta_client - .list_table_fragment_states() + .list_streaming_job_states() .await? .into_iter() .map(|fragment| fragment.table_id) diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_streaming_jobs.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_streaming_jobs.rs new file mode 100644 index 0000000000000..4557b296f371e --- /dev/null +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_streaming_jobs.rs @@ -0,0 +1,50 @@ +// Copyright 2025 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::types::Fields; +use risingwave_frontend_macro::system_catalog; + +use crate::catalog::system_catalog::{extract_parallelism_from_table_state, SysCatalogReaderImpl}; +use crate::error::Result; + +#[derive(Fields)] +struct RwStreamingJob { + #[primary_key] + job: i32, + name: String, + status: String, + parallelism: String, + max_parallelism: i32, + resource_group: String, +} + +#[system_catalog(table, "rw_catalog.rw_streaming_jobs")] +async fn read_rw_streaming_jobs(reader: &SysCatalogReaderImpl) -> Result> { + let states = reader.meta_client.list_streaming_job_states().await?; + + Ok(states + .into_iter() + .map(|state| { + let parallelism = extract_parallelism_from_table_state(&state); + RwStreamingJob { + job: state.table_id as i32, + status: state.state().as_str_name().into(), + name: state.name, + parallelism: parallelism.to_uppercase(), + max_parallelism: state.max_parallelism as i32, + resource_group: state.resource_group, + } + }) + .collect()) +} diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_table_fragments.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_table_fragments.rs index 7458afb1b43ae..f6da43879a755 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_table_fragments.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_table_fragments.rs @@ -31,7 +31,7 @@ struct RwTableFragment { async fn read_rw_table_fragments_info( reader: &SysCatalogReaderImpl, ) -> Result> { - let states = reader.meta_client.list_table_fragment_states().await?; + let states = reader.meta_client.list_streaming_job_states().await?; Ok(states .into_iter() diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index ecf37b7a4d710..02b9ac06836d9 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -34,7 +34,7 @@ use risingwave_pb::meta::list_actor_states_response::ActorState; use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution; use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies; use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo; -use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState; +use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState; use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo; use risingwave_pb::meta::{EventLog, PbThrottleTarget, RecoveryStatus}; use risingwave_rpc_client::error::Result; @@ -64,7 +64,7 @@ pub trait FrontendMetaClient: Send + Sync { table_ids: &[u32], ) -> Result>; - async fn list_table_fragment_states(&self) -> Result>; + async fn list_streaming_job_states(&self) -> Result>; async fn list_fragment_distribution(&self) -> Result>; @@ -163,8 +163,8 @@ impl FrontendMetaClient for FrontendMetaClientImpl { self.0.list_table_fragments(table_ids).await } - async fn list_table_fragment_states(&self) -> Result> { - self.0.list_table_fragment_states().await + async fn list_streaming_job_states(&self) -> Result> { + self.0.list_streaming_job_states().await } async fn list_fragment_distribution(&self) -> Result> { diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index ef4c2ee2207c4..2ec64cd91e8fd 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -59,7 +59,7 @@ use risingwave_pb::meta::list_actor_states_response::ActorState; use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution; use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies; use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo; -use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState; +use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState; use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo; use risingwave_pb::meta::{ EventLog, PbTableParallelism, PbThrottleTarget, RecoveryStatus, SystemParams, @@ -990,7 +990,7 @@ impl FrontendMetaClient for MockFrontendMetaClient { Ok(HashMap::default()) } - async fn list_table_fragment_states(&self) -> RpcResult> { + async fn list_streaming_job_states(&self) -> RpcResult> { Ok(vec![]) } diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index 6ed247903ef81..648d7f12367da 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -243,10 +243,10 @@ impl StreamManagerService for StreamServiceImpl { } #[cfg_attr(coverage, coverage(off))] - async fn list_table_fragment_states( + async fn list_streaming_job_states( &self, - _request: Request, - ) -> Result, Status> { + _request: Request, + ) -> Result, Status> { let job_infos = self .metadata_manager .catalog_controller @@ -258,8 +258,10 @@ impl StreamManagerService for StreamServiceImpl { |StreamingJobInfo { job_id, job_status, + name, parallelism, max_parallelism, + resource_group, .. }| { let parallelism = match parallelism { @@ -268,17 +270,19 @@ impl StreamManagerService for StreamServiceImpl { StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _), }; - list_table_fragment_states_response::TableFragmentState { + list_streaming_job_states_response::StreamingJobState { table_id: job_id as _, + name, state: PbState::from(job_status) as _, parallelism: Some(parallelism.into()), max_parallelism: max_parallelism as _, + resource_group, } }, ) .collect_vec(); - Ok(Response::new(ListTableFragmentStatesResponse { states })) + Ok(Response::new(ListStreamingJobStatesResponse { states })) } #[cfg_attr(coverage, coverage(off))] diff --git a/src/meta/src/controller/catalog/get_op.rs b/src/meta/src/controller/catalog/get_op.rs index a925315a7bfc5..eab5438d765e5 100644 --- a/src/meta/src/controller/catalog/get_op.rs +++ b/src/meta/src/controller/catalog/get_op.rs @@ -300,12 +300,9 @@ impl CatalogController { streaming_job_ids: Vec, ) -> MetaResult> { let inner = self.inner.read().await; - let txn = inner.db.begin().await?; - let mut resource_groups = HashMap::new(); - for job_id in streaming_job_ids { - let resource_group = get_existing_job_resource_group(&txn, job_id).await?; + let resource_group = get_existing_job_resource_group(&inner.db, job_id).await?; resource_groups.insert(job_id, resource_group); } @@ -317,14 +314,12 @@ impl CatalogController { streaming_job_id: ObjectId, ) -> MetaResult { let inner = self.inner.read().await; - let txn = inner.db.begin().await?; - let database_id: ObjectId = StreamingJob::find_by_id(streaming_job_id) .select_only() .join(JoinType::InnerJoin, streaming_job::Relation::Object.def()) .column(object::Column::DatabaseId) .into_tuple() - .one(&txn) + .one(&inner.db) .await? .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?; diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index d7d82c5a3088a..f2c95b0f59e3b 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -26,10 +26,10 @@ use risingwave_meta_model::fragment::DistributionType; use risingwave_meta_model::object::ObjectType; use risingwave_meta_model::prelude::{Actor, Fragment, Sink, StreamingJob}; use risingwave_meta_model::{ - actor, actor_dispatcher, fragment, object, sink, source, streaming_job, table, ActorId, - ActorUpstreamActors, ConnectorSplits, DatabaseId, ExprContext, FragmentId, I32Array, JobStatus, - ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId, VnodeBitmap, - WorkerId, + actor, actor_dispatcher, database, fragment, object, sink, source, streaming_job, table, + ActorId, ActorUpstreamActors, ConnectorSplits, DatabaseId, ExprContext, FragmentId, I32Array, + JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId, + VnodeBitmap, WorkerId, }; use risingwave_meta_model_migration::{Alias, SelectStatement}; use risingwave_pb::common::PbActorLocation; @@ -90,6 +90,7 @@ pub struct StreamingJobInfo { pub job_status: JobStatus, pub parallelism: StreamingParallelism, pub max_parallelism: i32, + pub resource_group: String, } impl CatalogControllerInner { @@ -728,6 +729,7 @@ impl CatalogController { .select_only() .column(streaming_job::Column::JobId) .join(JoinType::InnerJoin, streaming_job::Relation::Object.def()) + .join(JoinType::InnerJoin, object::Relation::Database2.def()) .column(object::Column::ObjType) .join(JoinType::LeftJoin, table::Relation::Object1.def().rev()) .join(JoinType::LeftJoin, source::Relation::Object.def().rev()) @@ -750,6 +752,16 @@ impl CatalogController { streaming_job::Column::Parallelism, streaming_job::Column::MaxParallelism, ]) + .column_as( + Expr::if_null( + Expr::col(( + streaming_job::Entity, + streaming_job::Column::SpecificResourceGroup, + )), + Expr::col((database::Entity, database::Column::ResourceGroup)), + ), + "resource_group", + ) .into_model() .all(&inner.db) .await?; diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index bde6074b6c70a..40c07e0acad8e 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -78,7 +78,7 @@ use risingwave_pb::meta::list_actor_splits_response::ActorSplit; use risingwave_pb::meta::list_actor_states_response::ActorState; use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution; use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies; -use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState; +use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState; use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo; use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient; use risingwave_pb::meta::notification_service_client::NotificationServiceClient; @@ -997,10 +997,10 @@ impl MetaClient { Ok(resp.table_fragments) } - pub async fn list_table_fragment_states(&self) -> Result> { + pub async fn list_streaming_job_states(&self) -> Result> { let resp = self .inner - .list_table_fragment_states(ListTableFragmentStatesRequest {}) + .list_streaming_job_states(ListStreamingJobStatesRequest {}) .await?; Ok(resp.states) } @@ -2097,7 +2097,7 @@ macro_rules! for_all_meta_rpc { ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse } ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse } ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse } - ,{ stream_client, list_table_fragment_states, ListTableFragmentStatesRequest, ListTableFragmentStatesResponse } + ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse } ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse } ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse } ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse } diff --git a/src/tests/simulation/tests/integration_tests/scale/resource_group.rs b/src/tests/simulation/tests/integration_tests/scale/resource_group.rs index c2acf421a2e9f..43633aee46bbc 100644 --- a/src/tests/simulation/tests/integration_tests/scale/resource_group.rs +++ b/src/tests/simulation/tests/integration_tests/scale/resource_group.rs @@ -20,6 +20,7 @@ use risingwave_common::config::default; use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP; use risingwave_simulation::cluster::{Cluster, Configuration}; use risingwave_simulation::ctl_ext::predicate::{identity_contains, no_identity_contains}; +use risingwave_simulation::utils::AssertResult; use tokio::time::sleep; #[tokio::test] @@ -107,5 +108,33 @@ async fn test_resource_group() -> Result<()> { assert_eq!(union_fragment.inner.actors.len(), 2); assert_eq!(mat_fragment.inner.actors.len(), 4); + session + .run("select resource_group from rw_streaming_jobs where name = 'm'") + .await? + .assert_result_eq("test"); + + let _ = session + .run("alter materialized view m reset resource_group;") + .await?; + + session + .run("select resource_group from rw_streaming_jobs where name = 'm'") + .await? + .assert_result_eq(DEFAULT_RESOURCE_GROUP); + + let union_fragment = cluster + .locate_one_fragment([identity_contains("union")]) + .await?; + + let mat_fragment = cluster + .locate_one_fragment([ + identity_contains("materialize"), + no_identity_contains("union"), + ]) + .await?; + + assert_eq!(union_fragment.inner.actors.len(), 2); + assert_eq!(mat_fragment.inner.actors.len(), 2); + Ok(()) }