Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support schedule/reschedule resource group #19955

Merged
merged 16 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ redundant_explicit_links = "allow"
lto = "off"
# use parallel frontend to speed up build
# TODO: may consider applying to release/production profile as well
rustflags = ["-Z", "threads=8"]
#rustflags = ["-Z", "threads=8"]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will be reverted once approved


[profile.release]
debug = "full"
Expand Down
1 change: 1 addition & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ message Database {
uint32 id = 1;
string name = 2;
uint32 owner = 3;
string resource_group = 4;
}

message Comment {
Expand Down
9 changes: 6 additions & 3 deletions proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,14 @@ message WorkerNode {
bool is_unschedulable = 3;
// This is used for frontend node to register its rpc address
string internal_rpc_host_addr = 4;
// Meta may assign labels to worker nodes to partition workload by label.
// This is used for serverless backfilling of materialized views.
optional string node_label = 5;

reserved 5;
reserved "node_label";

uint32 parallelism = 6;

// resource group for scheduling
optional string resource_group = 7;
}
message Resource {
string rw_version = 1;
Expand Down
12 changes: 12 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ message CreateMaterializedViewRequest {

// The list of object IDs that this materialized view depends on.
repeated uint32 dependencies = 4;

// The specific resource group to use for the materialized view. If not set, the database resource group is used.
optional string specific_resource_group = 5;
}

message CreateMaterializedViewResponse {
Expand Down Expand Up @@ -270,6 +273,14 @@ message AlterParallelismRequest {

message AlterParallelismResponse {}

message AlterResourceGroupRequest {
uint32 table_id = 1;
optional string resource_group = 2;
bool deferred = 3;
}

message AlterResourceGroupResponse {}

message AlterOwnerResponse {
common.Status status = 1;
WaitVersion version = 2;
Expand Down Expand Up @@ -549,6 +560,7 @@ service DdlService {
rpc AlterOwner(AlterOwnerRequest) returns (AlterOwnerResponse);
rpc AlterSetSchema(AlterSetSchemaRequest) returns (AlterSetSchemaResponse);
rpc AlterParallelism(AlterParallelismRequest) returns (AlterParallelismResponse);
rpc AlterResourceGroup(AlterResourceGroupRequest) returns (AlterResourceGroupResponse);
rpc DropTable(DropTableRequest) returns (DropTableResponse);
rpc RisectlListStateTables(RisectlListStateTablesRequest) returns (RisectlListStateTablesResponse);
rpc CreateView(CreateViewRequest) returns (CreateViewResponse);
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/util/worker_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@

pub type WorkerNodeId = u32;

pub const DEFAULT_COMPUTE_NODE_LABEL: &str = "default";
pub const DEFAULT_RESOURCE_GROUP: &str = "default";
12 changes: 6 additions & 6 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_common::util::worker_util::DEFAULT_COMPUTE_NODE_LABEL;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use serde::{Deserialize, Serialize};

/// If `total_memory_bytes` is not specified, the default memory limit will be set to
Expand Down Expand Up @@ -105,9 +105,9 @@ pub struct ComputeNodeOpts {
#[override_opts(if_absent, path = streaming.actor_runtime_worker_threads_num)]
pub parallelism: usize,

/// The parallelism that the compute node will register to the scheduler of the meta service.
#[clap(long, env = "RW_NODE_LABEL", default_value_t = default_node_label())]
pub node_label: String,
/// Resource group for scheduling, default value is "default"
#[clap(long, env = "RW_RESOURCE_GROUP", default_value_t = default_resource_group())]
pub resource_group: String,

/// Decides whether the compute node can be used for streaming and serving.
#[clap(long, env = "RW_COMPUTE_NODE_ROLE", value_enum, default_value_t = default_role())]
Expand Down Expand Up @@ -254,8 +254,8 @@ pub fn default_parallelism() -> usize {
total_cpu_available().ceil() as usize
}

pub fn default_node_label() -> String {
DEFAULT_COMPUTE_NODE_LABEL.to_owned()
pub fn default_resource_group() -> String {
DEFAULT_RESOURCE_GROUP.to_owned()
}

pub fn default_role() -> Role {
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ pub async fn compute_node_serve(
is_serving: opts.role.for_serving(),
is_unschedulable: false,
internal_rpc_host_addr: "".to_owned(),
node_label: Some(opts.node_label.clone()),
resource_group: Some(opts.resource_group.clone()),
},
&config.meta,
)
Expand Down
40 changes: 37 additions & 3 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,12 @@ impl CatalogReader {
/// [observer](`crate::observer::FrontendObserverNode`).
#[async_trait::async_trait]
pub trait CatalogWriter: Send + Sync {
async fn create_database(&self, db_name: &str, owner: UserId) -> Result<()>;
async fn create_database(
&self,
db_name: &str,
owner: UserId,
resource_group: &str,
) -> Result<()>;

async fn create_schema(
&self,
Expand All @@ -81,6 +86,7 @@ pub trait CatalogWriter: Send + Sync {
table: PbTable,
graph: StreamFragmentGraph,
dependencies: HashSet<ObjectId>,
specific_resource_group: Option<String>,
) -> Result<()>;

async fn create_table(
Expand Down Expand Up @@ -214,6 +220,13 @@ pub trait CatalogWriter: Send + Sync {
deferred: bool,
) -> Result<()>;

async fn alter_resource_group(
&self,
table_id: u32,
resource_group: Option<String>,
deferred: bool,
) -> Result<()>;

async fn alter_set_schema(
&self,
object: alter_set_schema_request::Object,
Expand All @@ -232,13 +245,19 @@ pub struct CatalogWriterImpl {

#[async_trait::async_trait]
impl CatalogWriter for CatalogWriterImpl {
async fn create_database(&self, db_name: &str, owner: UserId) -> Result<()> {
async fn create_database(
&self,
db_name: &str,
owner: UserId,
resource_group: &str,
) -> Result<()> {
let version = self
.meta_client
.create_database(PbDatabase {
name: db_name.to_owned(),
id: 0,
owner,
resource_group: resource_group.to_owned(),
})
.await?;
self.wait_version(version).await
Expand Down Expand Up @@ -268,11 +287,12 @@ impl CatalogWriter for CatalogWriterImpl {
table: PbTable,
graph: StreamFragmentGraph,
dependencies: HashSet<ObjectId>,
specific_resource_group: Option<String>,
) -> Result<()> {
let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
let version = self
.meta_client
.create_materialized_view(table, graph, dependencies)
.create_materialized_view(table, graph, dependencies, specific_resource_group)
.await?;
if matches!(create_type, PbCreateType::Foreground) {
self.wait_version(version).await?
Expand Down Expand Up @@ -579,6 +599,20 @@ impl CatalogWriter for CatalogWriterImpl {
.await?;
self.wait_version(version).await
}

async fn alter_resource_group(
&self,
table_id: u32,
resource_group: Option<String>,
deferred: bool,
) -> Result<()> {
self.meta_client
.alter_resource_group(table_id, resource_group, deferred)
.await
.map_err(|e| anyhow!(e))?;

Ok(())
}
}

impl CatalogWriterImpl {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ struct RwWorkerNode {
system_total_memory_bytes: Option<i64>,
system_total_cpu_cores: Option<i64>,
started_at: Option<Timestamptz>,
label: Option<String>,
resource_group: Option<String>,
}

#[system_catalog(table, "rw_catalog.rw_worker_nodes")]
Expand Down Expand Up @@ -82,8 +82,8 @@ async fn read_rw_worker_nodes_info(reader: &SysCatalogReaderImpl) -> Result<Vec<
started_at: worker
.started_at
.map(|ts| Timestamptz::from_secs(ts as i64).unwrap()),
label: if is_compute {
property.and_then(|p| p.node_label.clone())
resource_group: if is_compute {
property.and_then(|p| p.resource_group.clone())
} else {
None
},
Expand Down
99 changes: 99 additions & 0 deletions src/frontend/src/handler/alter_resource_group.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2024 RisingWave Labs
shanicky marked this conversation as resolved.
Show resolved Hide resolved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pgwire::pg_response::StatementType;
use risingwave_common::bail;
use risingwave_sqlparser::ast::{ObjectName, SetVariableValue, SetVariableValueSingle, Value};

use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result};
use crate::Binder;

pub async fn handle_alter_resource_group(
handler_args: HandlerArgs,
obj_name: ObjectName,
resource_group: Option<SetVariableValue>,
stmt_type: StatementType,
deferred: bool,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
let (schema_name, real_table_name) =
Binder::resolve_schema_qualified_name(db_name, obj_name.clone())?;
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;
let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

let table_id = {
let reader = session.env().catalog_reader().read_guard();

match stmt_type {
StatementType::ALTER_MATERIALIZED_VIEW => {
let (table, schema_name) =
reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;

match (table.table_type(), stmt_type) {
(TableType::MaterializedView, StatementType::ALTER_MATERIALIZED_VIEW) => {}
_ => {
return Err(ErrorCode::InvalidInputSyntax(format!(
"cannot alter resource group of {} {} by {}",
table.table_type().to_prost().as_str_name(),
table.name(),
stmt_type,
))
.into());
}
}

session.check_privilege_for_drop_alter(schema_name, &**table)?;
table.id.table_id()
}
_ => bail!(
"invalid statement type for alter resource group: {:?}",
stmt_type
),
}
};

let resource_group = match resource_group {
None => None,
Some(SetVariableValue::Single(SetVariableValueSingle::Ident(ident))) => {
Some(ident.real_value())
}
Some(SetVariableValue::Single(SetVariableValueSingle::Literal(
Value::SingleQuotedString(v),
))) => Some(v),
_ => {
return Err(ErrorCode::InvalidInputSyntax(
"target parallelism must be a valid number or adaptive".to_owned(),
)
.into());
}
};

let mut builder = RwPgResponse::builder(stmt_type);

let catalog_writer = session.catalog_writer()?;
catalog_writer
.alter_resource_group(table_id, resource_group, deferred)
.await?;

if deferred {
builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_owned());
}

Ok(builder.into())
}
4 changes: 3 additions & 1 deletion src/frontend/src/handler/create_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_sqlparser::ast::ObjectName;

use super::RwPgResponse;
Expand Down Expand Up @@ -73,7 +74,8 @@ pub async fn handle_create_database(

let catalog_writer = session.catalog_writer()?;
catalog_writer
.create_database(&database_name, database_owner)
// TODO: add support for create database with resource_group
.create_database(&database_name, database_owner, DEFAULT_RESOURCE_GROUP)
.await?;

Ok(PgResponse::empty_result(StatementType::CREATE_DATABASE))
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ It only indicates the physical clustering of the data, which may improve the per
let session = session.clone();
let catalog_writer = session.catalog_writer()?;
catalog_writer
.create_materialized_view(table, graph, dependencies)
.create_materialized_view(table, graph, dependencies, None)
.await?;

Ok(PgResponse::empty_result(
Expand Down
19 changes: 19 additions & 0 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use crate::utils::WithOptions;
mod alter_owner;
mod alter_parallelism;
mod alter_rename;
mod alter_resource_group;
mod alter_secret;
mod alter_set_schema;
mod alter_source_column;
Expand Down Expand Up @@ -822,6 +823,24 @@ pub async fn handle(
)
.await
}
Statement::AlterView {
materialized,
name,
operation:
AlterViewOperation::SetResourceGroup {
resource_group,
deferred,
},
} if materialized => {
alter_resource_group::handle_alter_resource_group(
handler_args,
name,
resource_group,
StatementType::ALTER_MATERIALIZED_VIEW,
deferred,
)
.await
}
Statement::AlterView {
materialized,
name,
Expand Down
Loading
Loading