diff --git a/src/batch/src/worker_manager/worker_node_manager.rs b/src/batch/src/worker_manager/worker_node_manager.rs
index db63578a0edd5..531ebc5f03d3c 100644
--- a/src/batch/src/worker_manager/worker_node_manager.rs
+++ b/src/batch/src/worker_manager/worker_node_manager.rs
@@ -192,12 +192,16 @@ impl WorkerNodeManager {
         fragment_id: FragmentId,
         vnode_mapping: WorkerSlotMapping,
     ) {
-        self.inner
+        if self
+            .inner
             .write()
             .unwrap()
             .streaming_fragment_vnode_mapping
             .try_insert(fragment_id, vnode_mapping)
-            .unwrap();
+            .is_err()
+        {
+            tracing::info!("Previous batch vnode mapping already exists for fragment {fragment_id}, maybe offline scaling with background ddl");
+        }
     }
 
     pub fn update_streaming_fragment_mapping(
@@ -206,10 +210,13 @@ impl WorkerNodeManager {
         vnode_mapping: WorkerSlotMapping,
     ) {
         let mut guard = self.inner.write().unwrap();
-        guard
+        if guard
            .streaming_fragment_vnode_mapping
            .insert(fragment_id, vnode_mapping)
-            .unwrap();
+            .is_none()
+        {
+            tracing::info!("Previous vnode mapping not found for fragment {fragment_id}, maybe offline scaling with background ddl");
+        }
     }
 
     pub fn remove_streaming_fragment_mapping(&self, fragment_id: &FragmentId) {
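The hunks above replace hard `unwrap()`s with an insert-or-log pattern, so a mapping left behind (or missing) after offline scaling of a background DDL job no longer panics the batch worker node manager. A minimal stand-alone sketch of the same pattern, using std's stable entry API (`HashMap::try_insert` itself is nightly-only) and placeholder `u32`/`String` types:

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Insert a vnode mapping for `fragment_id`, logging instead of panicking when
/// an entry is already present (e.g. after offline scaling with background DDL).
fn insert_or_log(map: &mut HashMap<u32, String>, fragment_id: u32, mapping: String) {
    match map.entry(fragment_id) {
        Entry::Vacant(v) => {
            v.insert(mapping);
        }
        Entry::Occupied(_) => {
            eprintln!("vnode mapping already exists for fragment {fragment_id}, keeping the old one");
        }
    }
}

fn main() {
    let mut map = HashMap::new();
    insert_or_log(&mut map, 1, "mapping-a".into());
    insert_or_log(&mut map, 1, "mapping-b".into()); // logged, not a panic
    assert_eq!(map[&1], "mapping-a");
}
```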
diff --git a/src/meta/src/barrier/context/recovery.rs b/src/meta/src/barrier/context/recovery.rs
index c13eaabf90822..b2172c490436e 100644
--- a/src/meta/src/barrier/context/recovery.rs
+++ b/src/meta/src/barrier/context/recovery.rs
@@ -172,6 +172,7 @@ impl GlobalBarrierWorkerContextImpl {
             }
             background_jobs
         };
+        tracing::info!("recovered mview progress");
 
         // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
 
@@ -188,12 +189,39 @@ impl GlobalBarrierWorkerContextImpl {
             background_streaming_jobs.len()
         );
 
+        let unreschedulable_jobs = {
+            let mut unreschedulable_jobs = HashSet::new();
+
+            for job_id in background_streaming_jobs {
+                let scan_types = self
+                    .metadata_manager
+                    .get_job_backfill_scan_types(&job_id)
+                    .await?;
+
+                if scan_types
+                    .values()
+                    .any(|scan_type| !scan_type.is_reschedulable())
+                {
+                    unreschedulable_jobs.insert(job_id);
+                }
+            }
+
+            unreschedulable_jobs
+        };
+
+        if !unreschedulable_jobs.is_empty() {
+            tracing::info!(
+                "unreschedulable background jobs: {:?}",
+                unreschedulable_jobs
+            );
+        }
+
         // Resolve actor info for recovery. If there's no actor to recover, most of the
         // following steps will be no-op, while the compute nodes will still be reset.
         // FIXME: Transactions should be used.
         // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
         let mut info = if !self.env.opts.disable_automatic_parallelism_control
-            && background_streaming_jobs.is_empty()
+            && unreschedulable_jobs.is_empty()
         {
             info!("trigger offline scaling");
             self.scale_actors(&active_streaming_nodes)
@@ -264,6 +292,23 @@ impl GlobalBarrierWorkerContextImpl {
                     warn!(error = %err.as_report(), "update actors failed");
                 })?;
 
+            let background_jobs = {
+                let jobs = self
+                    .list_background_mv_progress()
+                    .await
+                    .context("recover mview progress should not fail")?;
+                let mut background_jobs = HashMap::new();
+                for (definition, stream_job_fragments) in jobs {
+                    background_jobs
+                        .try_insert(
+                            stream_job_fragments.stream_job_id(),
+                            (definition, stream_job_fragments),
+                        )
+                        .expect("non-duplicate");
+                }
+                background_jobs
+            };
+
             // get split assignments for all actors
             let source_splits = self.source_manager.list_assignments().await;
             Ok(BarrierWorkerRuntimeInfoSnapshot {
@@ -583,7 +628,7 @@ impl GlobalBarrierWorkerContextImpl {
         let reschedule_targets: HashMap<_, _> = {
             let streaming_parallelisms = mgr
                 .catalog_controller
-                .get_all_created_streaming_parallelisms()
+                .get_all_streaming_parallelisms()
                 .await?;
 
             let mut result = HashMap::new();
diff --git a/src/meta/src/controller/catalog/get_op.rs b/src/meta/src/controller/catalog/get_op.rs
index eab5438d765e5..13b7993e39be6 100644
--- a/src/meta/src/controller/catalog/get_op.rs
+++ b/src/meta/src/controller/catalog/get_op.rs
@@ -171,13 +171,12 @@ impl CatalogController {
             .collect())
     }
 
-    pub async fn get_all_created_streaming_parallelisms(
+    pub async fn get_all_streaming_parallelisms(
         &self,
     ) -> MetaResult<HashMap<ObjectId, StreamingParallelism>> {
         let inner = self.inner.read().await;
 
         let job_parallelisms = StreamingJob::find()
-            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
            .select_only()
            .columns([
                streaming_job::Column::JobId,
diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs
index 4a13155c547aa..3ac4c3b0dda7d 100644
--- a/src/meta/src/controller/fragment.rs
+++ b/src/meta/src/controller/fragment.rs
@@ -20,7 +20,7 @@ use anyhow::Context;
 use itertools::Itertools;
 use risingwave_common::bail;
 use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId};
-use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
+use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_mut};
 use risingwave_meta_model::actor::ActorStatus;
 use risingwave_meta_model::fragment::DistributionType;
 use risingwave_meta_model::object::ObjectType;
@@ -47,7 +47,8 @@ use risingwave_pb::meta::{
 use risingwave_pb::source::PbConnectorSplits;
 use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::{
-    DispatchStrategy, PbFragmentTypeFlag, PbStreamActor, PbStreamContext,
+    DispatchStrategy, PbFragmentTypeFlag, PbStreamActor, PbStreamContext, PbStreamScanType,
+    StreamScanType,
 };
 use sea_orm::sea_query::Expr;
 use sea_orm::ActiveValue::Set;
@@ -66,7 +67,7 @@ use crate::controller::utils::{
 use crate::manager::LocalNotification;
 use crate::model::{StreamContext, StreamJobFragments, TableParallelism};
 use crate::stream::{build_actor_split_impls, SplitAssignment};
-use crate::{MetaError, MetaResult};
+use crate::{model, MetaError, MetaResult};
 
 #[derive(Clone, Debug)]
 pub struct InflightFragmentInfo {
@@ -723,6 +724,40 @@ impl CatalogController {
         )
     }
 
+    pub async fn get_job_fragment_backfill_scan_type(
+        &self,
+        job_id: ObjectId,
+    ) -> MetaResult<HashMap<model::FragmentId, PbStreamScanType>> {
+        let inner = self.inner.read().await;
+        let fragments: Vec<_> = Fragment::find()
+            .filter(fragment::Column::JobId.eq(job_id))
+            .all(&inner.db)
+            .await?;
+
+        let mut result = HashMap::new();
+
+        for fragment::Model {
+            fragment_id,
+            stream_node,
+            ..
+        } in fragments
+        {
+            let stream_node = stream_node.to_protobuf();
+            visit_stream_node(&stream_node, |body| {
+                if let NodeBody::StreamScan(node) = body {
+                    match node.stream_scan_type() {
+                        StreamScanType::Unspecified => {}
+                        scan_type => {
+                            result.insert(fragment_id as model::FragmentId, scan_type);
+                        }
+                    }
+                }
+            });
+        }
+
+        Ok(result)
+    }
+
     pub async fn list_streaming_job_infos(&self) -> MetaResult> {
         let inner = self.inner.read().await;
         let job_states = StreamingJob::find()
diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs
index fe99a59a550ac..fbf41a78db752 100644
--- a/src/meta/src/manager/metadata.rs
+++ b/src/meta/src/manager/metadata.rs
@@ -26,7 +26,7 @@ use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty
 use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
 use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
 use risingwave_pb::meta::table_fragments::{Fragment, PbFragment};
-use risingwave_pb::stream_plan::{PbDispatchStrategy, StreamActor};
+use risingwave_pb::stream_plan::{PbDispatchStrategy, PbStreamScanType, StreamActor};
 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
 use tokio::sync::oneshot;
 use tokio::time::{sleep, Instant};
@@ -753,6 +753,17 @@ impl MetadataManager {
         let rate_limits = self.catalog_controller.list_rate_limits().await?;
         Ok(rate_limits)
     }
+
+    pub async fn get_job_backfill_scan_types(
+        &self,
+        job_id: &TableId,
+    ) -> MetaResult<HashMap<FragmentId, PbStreamScanType>> {
+        let backfill_types = self
+            .catalog_controller
+            .get_job_fragment_backfill_scan_type(job_id.table_id as _)
+            .await?;
+        Ok(backfill_types)
+    }
 }
 
 impl MetadataManager {
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 362e888a90fac..3d67a37519e97 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -403,16 +403,6 @@ impl DdlController {
             deferred = true;
         }
 
-        if !deferred
-            && !self
-                .metadata_manager
-                .list_background_creating_jobs()
-                .await?
-                .is_empty()
-        {
-            bail!("The system is creating jobs in the background, please try again later")
-        }
-
         self.stream_manager
             .reschedule_streaming_job(table_id, target, deferred)
             .await
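The new `get_job_fragment_backfill_scan_type` above walks each fragment's stream node tree and records the scan type of every `StreamScan` it finds, and `get_job_backfill_scan_types` exposes that per-fragment map on `MetadataManager`. A stand-alone sketch of the same traversal over a toy tree (the `Node` and `ScanType` types below are illustrative stand-ins for the protobuf `StreamNode`/`PbStreamScanType`, and the recursive walk stands in for the internal `visit_stream_node` helper):

```rust
use std::collections::HashMap;

/// Illustrative stand-in for `PbStreamScanType`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ScanType {
    Unspecified,
    ArrangementBackfill,
}

/// Illustrative stand-in for the protobuf stream node tree.
enum Node {
    StreamScan(ScanType),
    Other(Vec<Node>),
}

/// Record the scan type of every StreamScan under `root` for `fragment_id`,
/// skipping `Unspecified`, mirroring `get_job_fragment_backfill_scan_type`.
fn collect_scan_types(fragment_id: u32, root: &Node, out: &mut HashMap<u32, ScanType>) {
    match root {
        Node::StreamScan(ScanType::Unspecified) => {}
        Node::StreamScan(scan_type) => {
            out.insert(fragment_id, *scan_type);
        }
        Node::Other(children) => {
            for child in children {
                collect_scan_types(fragment_id, child, out);
            }
        }
    }
}

fn main() {
    let tree = Node::Other(vec![
        Node::StreamScan(ScanType::Unspecified),
        Node::StreamScan(ScanType::ArrangementBackfill),
    ]);
    let mut out = HashMap::new();
    collect_scan_types(1, &tree, &mut out);
    assert_eq!(out.get(&1), Some(&ScanType::ArrangementBackfill));
}
```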
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index bfbd226a36a96..210414aff646f 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -139,6 +139,7 @@ impl CustomFragmentInfo {
 }
 
 use educe::Educe;
+use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
 
 use super::SourceChange;
 use crate::controller::id::IdCategory;
@@ -706,16 +707,47 @@ impl ScaleController {
                 .get(fragment_id)
                 .ok_or_else(|| anyhow!("fragment {fragment_id} does not exist"))?;
 
-            // Check if the reschedule is supported.
+            // Check if the rescheduling is supported.
             match fragment_state[fragment_id] {
                 table_fragments::State::Unspecified => unreachable!(),
-                state @ table_fragments::State::Initial
-                | state @ table_fragments::State::Creating => {
+                state @ table_fragments::State::Initial => {
                     bail!(
                         "the materialized view of fragment {fragment_id} is in state {}",
                         state.as_str_name()
                     )
                 }
+                state @ table_fragments::State::Creating => {
+                    let stream_node = fragment
+                        .actor_template
+                        .nodes
+                        .as_ref()
+                        .expect("empty nodes in fragment actor template");
+
+                    let mut is_reschedulable = true;
+                    visit_stream_node_cont(stream_node, |body| {
+                        if let Some(NodeBody::StreamScan(node)) = &body.node_body {
+                            if !node.stream_scan_type().is_reschedulable() {
+                                is_reschedulable = false;
+
+                                // fail fast
+                                return false;
+                            }
+
+                            // continue visiting
+                            return true;
+                        }
+
+                        // continue visiting
+                        true
+                    });
+
+                    if !is_reschedulable {
+                        bail!(
+                            "the materialized view of fragment {fragment_id} is in state {}",
+                            state.as_str_name()
+                        )
+                    }
+                }
                 table_fragments::State::Created => {}
             }
 
@@ -2331,6 +2363,24 @@ impl ScaleController {
         Ok(())
     }
+
+    pub async fn resolve_related_no_shuffle_jobs(
+        &self,
+        jobs: &[TableId],
+    ) -> MetaResult<HashSet<TableId>> {
+        let RescheduleWorkingSet { related_jobs, .. } = self
+            .metadata_manager
+            .catalog_controller
+            .resolve_working_set_for_reschedule_tables(
+                jobs.iter().map(|id| id.table_id as _).collect(),
+            )
+            .await?;
+
+        Ok(related_jobs
+            .keys()
+            .map(|id| TableId::new(*id as _))
+            .collect())
+    }
 }
 
 #[derive(Debug)]
@@ -2452,45 +2502,44 @@ impl GlobalStreamManager {
     /// - `Ok(false)` if no jobs can be scaled;
     /// - `Ok(true)` if some jobs are scaled, and it is possible that there are more jobs can be scaled.
     async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
+        tracing::info!("trigger parallelism control");
+
+        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
+
         let background_streaming_jobs = self
             .metadata_manager
             .list_background_creating_jobs()
             .await?;
 
-        if !background_streaming_jobs.is_empty() {
-            tracing::debug!(
-                "skipping parallelism control due to background jobs {:?}",
-                background_streaming_jobs
-            );
-            // skip if there are background creating jobs
-            return Ok(true);
-        }
+        let skipped_jobs = if !background_streaming_jobs.is_empty() {
+            let jobs = self
+                .scale_controller
+                .resolve_related_no_shuffle_jobs(&background_streaming_jobs)
+                .await?;
 
-        tracing::info!("trigger parallelism control");
+            tracing::info!(
+                "skipping parallelism control of background jobs {:?} and associated jobs {:?}",
+                background_streaming_jobs,
+                jobs
+            );
 
-        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
+            jobs
+        } else {
+            HashSet::new()
+        };
 
         let job_ids: HashSet<_> = {
             let streaming_parallelisms = self
                 .metadata_manager
                 .catalog_controller
-                .get_all_created_streaming_parallelisms()
+                .get_all_streaming_parallelisms()
                 .await?;
 
-            // streaming_parallelisms
-            //     .into_iter()
-            //     .map(|(table_id, parallelism)| {
-            //         let table_parallelism = match parallelism {
-            //             StreamingParallelism::Adaptive => TableParallelism::Adaptive,
-            //             StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
-            //             StreamingParallelism::Custom => TableParallelism::Custom,
-            //         };
-            //
-            //         (table_id, table_parallelism)
-            //     })
-            //     .collect()
-
-            streaming_parallelisms.into_keys().collect()
+            streaming_parallelisms
+                .into_iter()
+                .filter(|(table_id, _)| !skipped_jobs.contains(&TableId::new(*table_id as _)))
+                .map(|(table_id, _)| table_id)
+                .collect()
         };
 
         let workers = self
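The `Creating` arm above relies on `visit_stream_node_cont`, whose callback returns `false` to stop the traversal as soon as a non-reschedulable `StreamScan` is found. A stand-alone sketch of that continue/stop traversal over a toy tree (the `Node` type and `visit_cont` helper below are illustrative, not the real proto types or the real visitor):

```rust
/// Toy node type; the real code visits protobuf `StreamNode`s.
enum Node {
    Leaf { reschedulable: bool },
    Branch(Vec<Node>),
}

/// Continuation-style visit: the callback returns `false` to stop early,
/// mirroring how `visit_stream_node_cont` is used to fail fast.
fn visit_cont(node: &Node, f: &mut dyn FnMut(&Node) -> bool) -> bool {
    if !f(node) {
        return false;
    }
    if let Node::Branch(children) = node {
        for child in children {
            if !visit_cont(child, f) {
                return false;
            }
        }
    }
    true
}

fn main() {
    let tree = Node::Branch(vec![
        Node::Leaf { reschedulable: true },
        Node::Leaf { reschedulable: false },
        Node::Leaf { reschedulable: true },
    ]);

    let mut is_reschedulable = true;
    visit_cont(&tree, &mut |node| {
        if let Node::Leaf { reschedulable: false } = node {
            is_reschedulable = false;
            return false; // fail fast
        }
        true // keep visiting
    });

    assert!(!is_reschedulable);
}
```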
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index e20165fb50078..9f935e594f318 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -607,6 +607,23 @@ impl GlobalStreamManager {
         deferred: bool,
     ) -> MetaResult<()> {
         let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
+        let background_jobs = self
+            .metadata_manager
+            .list_background_creating_jobs()
+            .await?;
+
+        if !background_jobs.is_empty() {
+            let related_jobs = self
+                .scale_controller
+                .resolve_related_no_shuffle_jobs(&background_jobs)
+                .await?;
+
+            for job in background_jobs {
+                if related_jobs.contains(&job) {
+                    bail!("Cannot alter the job {} because the related job {} is currently being created", table_id, job.table_id);
+                }
+            }
+        }
 
         let JobRescheduleTarget {
             parallelism: parallelism_change,
diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs
index b0e64dfad00a9..9eadd0d125928 100644
--- a/src/prost/src/lib.rs
+++ b/src/prost/src/lib.rs
@@ -32,6 +32,7 @@ use risingwave_error::tonic::ToTonicStatus;
 use thiserror::Error;
 
 use crate::common::WorkerType;
+use crate::stream_plan::PbStreamScanType;
 
 #[rustfmt::skip]
 #[cfg_attr(madsim, path = "sim/catalog.rs")]
@@ -405,6 +406,19 @@ impl catalog::StreamSourceInfo {
     }
 }
 
+impl stream_plan::PbStreamScanType {
+    pub fn is_reschedulable(&self) -> bool {
+        match self {
+            // todo: should this be true?
+            PbStreamScanType::UpstreamOnly => false,
+            PbStreamScanType::ArrangementBackfill => true,
+            // todo: true when stable
+            PbStreamScanType::SnapshotBackfill => false,
+            _ => false,
+        }
+    }
+}
+
 impl catalog::Sink {
     // TODO: remove this placeholder
     // creating table sink does not have an id, so we need a placeholder
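The new `PbStreamScanType::is_reschedulable` above is what both the recovery path and the scale controller consult. A small sketch of how a caller can combine it with the per-fragment map returned by `get_job_backfill_scan_types`; this mirrors the check in `recovery.rs`, and the `FragmentId = u32` alias is an assumption for illustration:

```rust
use std::collections::HashMap;

use risingwave_pb::stream_plan::PbStreamScanType;

/// Assumed u32 alias for the meta-side fragment id.
type FragmentId = u32;

/// A background job can take part in offline scaling only if every backfill
/// fragment uses a reschedulable scan type.
fn job_is_reschedulable(scan_types: &HashMap<FragmentId, PbStreamScanType>) -> bool {
    scan_types
        .values()
        .all(|scan_type| scan_type.is_reschedulable())
}
```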
diff --git a/src/tests/simulation/tests/integration_tests/scale/background_ddl.rs b/src/tests/simulation/tests/integration_tests/scale/background_ddl.rs
new file mode 100644
index 0000000000000..e50eea435aa4e
--- /dev/null
+++ b/src/tests/simulation/tests/integration_tests/scale/background_ddl.rs
@@ -0,0 +1,78 @@
+// Copyright 2025 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::time::Duration;
+
+use anyhow::Result;
+use risingwave_simulation::cluster::{Cluster, Configuration};
+use risingwave_simulation::ctl_ext::predicate::{identity_contains, no_identity_contains};
+use risingwave_simulation::utils::AssertResult;
+use tokio::time::sleep;
+
+#[tokio::test]
+async fn test_background_arrangement_backfill_offline_scaling() -> Result<()> {
+    let config = Configuration::for_background_ddl();
+
+    let cores_per_node = config.compute_node_cores;
+    let node_count = config.compute_nodes;
+
+    let mut cluster = Cluster::start(config).await?;
+    let mut session = cluster.start_session();
+
+    session
+        .run("SET STREAMING_USE_ARRANGEMENT_BACKFILL = true;")
+        .await?;
+    session.run("create table t (v int);").await?;
+    session
+        .run("insert into t select * from generate_series(1, 1000);")
+        .await?;
+
+    session.run("SET BACKGROUND_DDL=true;").await?;
+    session.run("SET BACKFILL_RATE_LIMIT=1;").await?;
+
+    session
+        .run("create materialized view m as select * from t;")
+        .await?;
+
+    let mat_fragment = cluster
+        .locate_one_fragment([
+            identity_contains("materialize"),
+            no_identity_contains("union"),
+        ])
+        .await?;
+
+    assert_eq!(mat_fragment.inner.actors.len(), cores_per_node * node_count);
+
+    sleep(Duration::from_secs(10)).await;
+
+    cluster.simple_kill_nodes(["compute-2", "compute-3"]).await;
+
+    sleep(Duration::from_secs(100)).await;
+
+    let mat_fragment = cluster
+        .locate_one_fragment([
+            identity_contains("materialize"),
+            no_identity_contains("union"),
+        ])
+        .await?;
+
+    assert_eq!(mat_fragment.inner.actors.len(), cores_per_node);
+
+    sleep(Duration::from_secs(2000)).await;
+
+    // job is finished
+    session.run("show jobs;").await?.assert_result_eq("");
+
+    Ok(())
+}
diff --git a/src/tests/simulation/tests/integration_tests/scale/mod.rs b/src/tests/simulation/tests/integration_tests/scale/mod.rs
index aa79401de47b7..1a0c5d92c1658 100644
--- a/src/tests/simulation/tests/integration_tests/scale/mod.rs
+++ b/src/tests/simulation/tests/integration_tests/scale/mod.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 mod auto_parallelism;
+mod background_ddl;
 mod cascade_materialized_view;
 mod dynamic_filter;
 mod nexmark_chaos;
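The simulation test above paces itself with fixed `sleep`s, including a 2000-second wait before asserting that `show jobs;` is empty. A hedged sketch of a polling helper that could replace that final sleep, assuming `Session::run` returns the query result as a `String` (as the `assert_result_eq("")` call above suggests) and that `Session` is importable from `risingwave_simulation::cluster`:

```rust
use std::time::Duration;

use anyhow::{bail, Result};
use risingwave_simulation::cluster::Session;
use tokio::time::sleep;

/// Poll `show jobs;` until no background job is left, instead of a single long sleep.
/// Assumes `Session::run` returns the result set as a `String`.
async fn wait_background_jobs_done(session: &mut Session, max_secs: u64) -> Result<()> {
    for _ in 0..max_secs {
        if session.run("show jobs;").await?.is_empty() {
            return Ok(());
        }
        sleep(Duration::from_secs(1)).await;
    }
    bail!("background jobs still running after {max_secs}s")
}
```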