Skip to content

Commit

Permalink
feat: add specific_resource_group to rw_streaming_jobs (#20207)
Browse files Browse the repository at this point in the history
Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky authored Jan 21, 2025
1 parent 1b74a95 commit 509ec7a
Show file tree
Hide file tree
Showing 13 changed files with 128 additions and 35 deletions.
12 changes: 7 additions & 5 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -232,16 +232,18 @@ message ListTableFragmentsResponse {
map<uint32, TableFragmentInfo> table_fragments = 1;
}

message ListTableFragmentStatesRequest {}
message ListStreamingJobStatesRequest {}

message ListTableFragmentStatesResponse {
message TableFragmentState {
message ListStreamingJobStatesResponse {
message StreamingJobState {
uint32 table_id = 1;
TableFragments.State state = 2;
TableParallelism parallelism = 3;
uint32 max_parallelism = 4;
string name = 5;
string resource_group = 6;
}
repeated TableFragmentState states = 1;
repeated StreamingJobState states = 1;
}

message ListFragmentDistributionRequest {}
Expand Down Expand Up @@ -333,7 +335,7 @@ service StreamManagerService {
rpc Resume(ResumeRequest) returns (ResumeResponse);
rpc CancelCreatingJobs(CancelCreatingJobsRequest) returns (CancelCreatingJobsResponse);
rpc ListTableFragments(ListTableFragmentsRequest) returns (ListTableFragmentsResponse);
rpc ListTableFragmentStates(ListTableFragmentStatesRequest) returns (ListTableFragmentStatesResponse);
rpc ListStreamingJobStates(ListStreamingJobStatesRequest) returns (ListStreamingJobStatesResponse);
rpc ListFragmentDistribution(ListFragmentDistributionRequest) returns (ListFragmentDistributionResponse);
rpc ListActorStates(ListActorStatesRequest) returns (ListActorStatesResponse);
rpc ListActorSplits(ListActorSplitsRequest) returns (ListActorSplitsResponse);
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/catalog/system_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use risingwave_common::error::BoxedError;
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::types::DataType;
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::table_parallelism::{PbFixedParallelism, PbParallelism};
use risingwave_pb::user::grant_privilege::Object;

Expand Down Expand Up @@ -216,7 +216,7 @@ pub fn infer_dummy_view_sql(columns: &[SystemCatalogColumnsDef<'_>]) -> String {
)
}

fn extract_parallelism_from_table_state(state: &TableFragmentState) -> String {
fn extract_parallelism_from_table_state(state: &StreamingJobState) -> String {
match state
.parallelism
.as_ref()
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,5 @@ mod rw_actor_id_to_ddl;
mod rw_actor_splits;
mod rw_fragment_id_to_ddl;
mod rw_internal_table_info;
mod rw_streaming_jobs;
mod rw_worker_actor_count;
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ struct RwActorInfo {
async fn read_rw_actors(reader: &SysCatalogReaderImpl) -> Result<Vec<RwActorInfo>> {
let table_ids = reader
.meta_client
.list_table_fragment_states()
.list_streaming_job_states()
.await?
.into_iter()
.map(|fragment| fragment.table_id)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::Fields;
use risingwave_frontend_macro::system_catalog;

use crate::catalog::system_catalog::{extract_parallelism_from_table_state, SysCatalogReaderImpl};
use crate::error::Result;

#[derive(Fields)]
struct RwStreamingJob {
#[primary_key]
job: i32,
name: String,
status: String,
parallelism: String,
max_parallelism: i32,
resource_group: String,
}

#[system_catalog(table, "rw_catalog.rw_streaming_jobs")]
async fn read_rw_streaming_jobs(reader: &SysCatalogReaderImpl) -> Result<Vec<RwStreamingJob>> {
let states = reader.meta_client.list_streaming_job_states().await?;

Ok(states
.into_iter()
.map(|state| {
let parallelism = extract_parallelism_from_table_state(&state);
RwStreamingJob {
job: state.table_id as i32,
status: state.state().as_str_name().into(),
name: state.name,
parallelism: parallelism.to_uppercase(),
max_parallelism: state.max_parallelism as i32,
resource_group: state.resource_group,
}
})
.collect())
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ struct RwTableFragment {
async fn read_rw_table_fragments_info(
reader: &SysCatalogReaderImpl,
) -> Result<Vec<RwTableFragment>> {
let states = reader.meta_client.list_table_fragment_states().await?;
let states = reader.meta_client.list_streaming_job_states().await?;

Ok(states
.into_iter()
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{EventLog, PbThrottleTarget, RecoveryStatus};
use risingwave_rpc_client::error::Result;
Expand Down Expand Up @@ -64,7 +64,7 @@ pub trait FrontendMetaClient: Send + Sync {
table_ids: &[u32],
) -> Result<HashMap<u32, TableFragmentInfo>>;

async fn list_table_fragment_states(&self) -> Result<Vec<TableFragmentState>>;
async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;

async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;

Expand Down Expand Up @@ -163,8 +163,8 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
self.0.list_table_fragments(table_ids).await
}

async fn list_table_fragment_states(&self) -> Result<Vec<TableFragmentState>> {
self.0.list_table_fragment_states().await
async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
self.0.list_streaming_job_states().await
}

async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{
EventLog, PbTableParallelism, PbThrottleTarget, RecoveryStatus, SystemParams,
Expand Down Expand Up @@ -990,7 +990,7 @@ impl FrontendMetaClient for MockFrontendMetaClient {
Ok(HashMap::default())
}

async fn list_table_fragment_states(&self) -> RpcResult<Vec<TableFragmentState>> {
async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
Ok(vec![])
}

Expand Down
14 changes: 9 additions & 5 deletions src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,10 @@ impl StreamManagerService for StreamServiceImpl {
}

#[cfg_attr(coverage, coverage(off))]
async fn list_table_fragment_states(
async fn list_streaming_job_states(
&self,
_request: Request<ListTableFragmentStatesRequest>,
) -> Result<Response<ListTableFragmentStatesResponse>, Status> {
_request: Request<ListStreamingJobStatesRequest>,
) -> Result<Response<ListStreamingJobStatesResponse>, Status> {
let job_infos = self
.metadata_manager
.catalog_controller
Expand All @@ -258,8 +258,10 @@ impl StreamManagerService for StreamServiceImpl {
|StreamingJobInfo {
job_id,
job_status,
name,
parallelism,
max_parallelism,
resource_group,
..
}| {
let parallelism = match parallelism {
Expand All @@ -268,17 +270,19 @@ impl StreamManagerService for StreamServiceImpl {
StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
};

list_table_fragment_states_response::TableFragmentState {
list_streaming_job_states_response::StreamingJobState {
table_id: job_id as _,
name,
state: PbState::from(job_status) as _,
parallelism: Some(parallelism.into()),
max_parallelism: max_parallelism as _,
resource_group,
}
},
)
.collect_vec();

Ok(Response::new(ListTableFragmentStatesResponse { states }))
Ok(Response::new(ListStreamingJobStatesResponse { states }))
}

#[cfg_attr(coverage, coverage(off))]
Expand Down
9 changes: 2 additions & 7 deletions src/meta/src/controller/catalog/get_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,12 +300,9 @@ impl CatalogController {
streaming_job_ids: Vec<ObjectId>,
) -> MetaResult<HashMap<ObjectId, String>> {
let inner = self.inner.read().await;
let txn = inner.db.begin().await?;

let mut resource_groups = HashMap::new();

for job_id in streaming_job_ids {
let resource_group = get_existing_job_resource_group(&txn, job_id).await?;
let resource_group = get_existing_job_resource_group(&inner.db, job_id).await?;
resource_groups.insert(job_id, resource_group);
}

Expand All @@ -317,14 +314,12 @@ impl CatalogController {
streaming_job_id: ObjectId,
) -> MetaResult<String> {
let inner = self.inner.read().await;
let txn = inner.db.begin().await?;

let database_id: ObjectId = StreamingJob::find_by_id(streaming_job_id)
.select_only()
.join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
.column(object::Column::DatabaseId)
.into_tuple()
.one(&txn)
.one(&inner.db)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;

Expand Down
20 changes: 16 additions & 4 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ use risingwave_meta_model::fragment::DistributionType;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{Actor, Fragment, Sink, StreamingJob};
use risingwave_meta_model::{
actor, actor_dispatcher, fragment, object, sink, source, streaming_job, table, ActorId,
ActorUpstreamActors, ConnectorSplits, DatabaseId, ExprContext, FragmentId, I32Array, JobStatus,
ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId, VnodeBitmap,
WorkerId,
actor, actor_dispatcher, database, fragment, object, sink, source, streaming_job, table,
ActorId, ActorUpstreamActors, ConnectorSplits, DatabaseId, ExprContext, FragmentId, I32Array,
JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
VnodeBitmap, WorkerId,
};
use risingwave_meta_model_migration::{Alias, SelectStatement};
use risingwave_pb::common::PbActorLocation;
Expand Down Expand Up @@ -90,6 +90,7 @@ pub struct StreamingJobInfo {
pub job_status: JobStatus,
pub parallelism: StreamingParallelism,
pub max_parallelism: i32,
pub resource_group: String,
}

impl CatalogControllerInner {
Expand Down Expand Up @@ -728,6 +729,7 @@ impl CatalogController {
.select_only()
.column(streaming_job::Column::JobId)
.join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
.join(JoinType::InnerJoin, object::Relation::Database2.def())
.column(object::Column::ObjType)
.join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
.join(JoinType::LeftJoin, source::Relation::Object.def().rev())
Expand All @@ -750,6 +752,16 @@ impl CatalogController {
streaming_job::Column::Parallelism,
streaming_job::Column::MaxParallelism,
])
.column_as(
Expr::if_null(
Expr::col((
streaming_job::Entity,
streaming_job::Column::SpecificResourceGroup,
)),
Expr::col((database::Entity, database::Column::ResourceGroup)),
),
"resource_group",
)
.into_model()
.all(&inner.db)
.await?;
Expand Down
8 changes: 4 additions & 4 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
Expand Down Expand Up @@ -997,10 +997,10 @@ impl MetaClient {
Ok(resp.table_fragments)
}

pub async fn list_table_fragment_states(&self) -> Result<Vec<TableFragmentState>> {
pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
let resp = self
.inner
.list_table_fragment_states(ListTableFragmentStatesRequest {})
.list_streaming_job_states(ListStreamingJobStatesRequest {})
.await?;
Ok(resp.states)
}
Expand Down Expand Up @@ -2097,7 +2097,7 @@ macro_rules! for_all_meta_rpc {
,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
,{ stream_client, list_table_fragment_states, ListTableFragmentStatesRequest, ListTableFragmentStatesResponse }
,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use risingwave_common::config::default;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_simulation::cluster::{Cluster, Configuration};
use risingwave_simulation::ctl_ext::predicate::{identity_contains, no_identity_contains};
use risingwave_simulation::utils::AssertResult;
use tokio::time::sleep;

#[tokio::test]
Expand Down Expand Up @@ -107,5 +108,33 @@ async fn test_resource_group() -> Result<()> {
assert_eq!(union_fragment.inner.actors.len(), 2);
assert_eq!(mat_fragment.inner.actors.len(), 4);

session
.run("select resource_group from rw_streaming_jobs where name = 'm'")
.await?
.assert_result_eq("test");

let _ = session
.run("alter materialized view m reset resource_group;")
.await?;

session
.run("select resource_group from rw_streaming_jobs where name = 'm'")
.await?
.assert_result_eq(DEFAULT_RESOURCE_GROUP);

let union_fragment = cluster
.locate_one_fragment([identity_contains("union")])
.await?;

let mat_fragment = cluster
.locate_one_fragment([
identity_contains("materialize"),
no_identity_contains("union"),
])
.await?;

assert_eq!(union_fragment.inner.actors.len(), 2);
assert_eq!(mat_fragment.inner.actors.len(), 2);

Ok(())
}

0 comments on commit 509ec7a

Please sign in to comment.