From c9e8984d0ebd359d3152a6a79d2b03a0be8f17d7 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Fri, 3 Jan 2025 02:45:25 +0800 Subject: [PATCH 01/10] Refactor recovery, enhance error logging, adjust parallelism control in streaming processes. --- src/meta/src/barrier/context/recovery.rs | 66 ++++++++++++------------ src/meta/src/rpc/ddl_controller.rs | 10 ---- src/meta/src/stream/scale.rs | 47 +++++++++++++---- src/meta/src/stream/stream_manager.rs | 18 +++++++ 4 files changed, 87 insertions(+), 54 deletions(-) diff --git a/src/meta/src/barrier/context/recovery.rs b/src/meta/src/barrier/context/recovery.rs index 8c5fcf5610f38..378a8442611b3 100644 --- a/src/meta/src/barrier/context/recovery.rs +++ b/src/meta/src/barrier/context/recovery.rs @@ -138,6 +138,38 @@ impl GlobalBarrierWorkerContextImpl { .await .context("clean dirty streaming jobs")?; + // This is a quick path to accelerate the process of dropping and canceling streaming jobs. + let _ = self.scheduled_barriers.pre_apply_drop_cancel(None); + + let mut active_streaming_nodes = + ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()) + .await?; + + // Resolve actor info for recovery. If there's no actor to recover, most of the + // following steps will be no-op, while the compute nodes will still be reset. + // FIXME: Transactions should be used. + // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere. + let mut info = if !self.env.opts.disable_automatic_parallelism_control { + info!("trigger offline scaling"); + self.scale_actors(&active_streaming_nodes) + .await + .inspect_err(|err| { + warn!(error = %err.as_report(), "scale actors failed"); + })?; + + self.resolve_graph_info(None).await.inspect_err(|err| { + warn!(error = %err.as_report(), "resolve actor info failed"); + })? + } else { + info!("trigger actor migration"); + // Migrate actors in expired CN to newly joined one. + self.migrate_actors(&mut active_streaming_nodes) + .await + .inspect_err(|err| { + warn!(error = %err.as_report(), "migrate actors failed"); + })? + }; + // Mview progress needs to be recovered. tracing::info!("recovering mview progress"); let background_jobs = { @@ -172,13 +204,6 @@ impl GlobalBarrierWorkerContextImpl { }; tracing::info!("recovered mview progress"); - // This is a quick path to accelerate the process of dropping and canceling streaming jobs. - let _ = self.scheduled_barriers.pre_apply_drop_cancel(None); - - let mut active_streaming_nodes = - ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()) - .await?; - let background_streaming_jobs = background_jobs.keys().cloned().collect_vec(); info!( "background streaming jobs: {:?} total {}", @@ -186,33 +211,6 @@ impl GlobalBarrierWorkerContextImpl { background_streaming_jobs.len() ); - // Resolve actor info for recovery. If there's no actor to recover, most of the - // following steps will be no-op, while the compute nodes will still be reset. - // FIXME: Transactions should be used. - // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere. 
- let mut info = if !self.env.opts.disable_automatic_parallelism_control - && background_streaming_jobs.is_empty() - { - info!("trigger offline scaling"); - self.scale_actors(&active_streaming_nodes) - .await - .inspect_err(|err| { - warn!(error = %err.as_report(), "scale actors failed"); - })?; - - self.resolve_graph_info(None).await.inspect_err(|err| { - warn!(error = %err.as_report(), "resolve actor info failed"); - })? - } else { - info!("trigger actor migration"); - // Migrate actors in expired CN to newly joined one. - self.migrate_actors(&mut active_streaming_nodes) - .await - .inspect_err(|err| { - warn!(error = %err.as_report(), "migrate actors failed"); - })? - }; - if self.scheduled_barriers.pre_apply_drop_cancel(None) { info = self.resolve_graph_info(None).await.inspect_err(|err| { warn!(error = %err.as_report(), "resolve actor info failed"); diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index f67051b4a2c07..1854cb4510dfa 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -401,16 +401,6 @@ impl DdlController { deferred = true; } - if !deferred - && !self - .metadata_manager - .list_background_creating_jobs() - .await? - .is_empty() - { - bail!("The system is creating jobs in the background, please try again later") - } - self.stream_manager .alter_table_parallelism(table_id, parallelism.into(), deferred) .await diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 3451d74a7cea6..2d3c142688c16 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -2265,6 +2265,24 @@ impl ScaleController { Ok(()) } + + pub async fn resolve_related_no_shuffle_jobs( + &self, + jobs: &[TableId], + ) -> MetaResult> { + let RescheduleWorkingSet { related_jobs, .. } = self + .metadata_manager + .catalog_controller + .resolve_working_set_for_reschedule_tables( + jobs.iter().map(|id| id.table_id as _).collect(), + ) + .await?; + + Ok(related_jobs + .keys() + .map(|id| TableId::new(*id as _)) + .collect()) + } } /// At present, for table level scaling, we use the strategy `TableResizePolicy`. @@ -2357,23 +2375,31 @@ impl GlobalStreamManager { /// - `Ok(false)` if no jobs can be scaled; /// - `Ok(true)` if some jobs are scaled, and it is possible that there are more jobs can be scaled. 
async fn trigger_parallelism_control(&self) -> MetaResult { + tracing::info!("trigger parallelism control"); + + let _reschedule_job_lock = self.reschedule_lock_write_guard().await; + let background_streaming_jobs = self .metadata_manager .list_background_creating_jobs() .await?; - if !background_streaming_jobs.is_empty() { - tracing::debug!( - "skipping parallelism control due to background jobs {:?}", - background_streaming_jobs - ); - // skip if there are background creating jobs - return Ok(true); - } + let skipped_jobs = if !background_streaming_jobs.is_empty() { + let jobs = self + .scale_controller + .resolve_related_no_shuffle_jobs(&background_streaming_jobs) + .await?; - tracing::info!("trigger parallelism control"); + tracing::info!( + "skipping parallelism control of background jobs {:?} and associated jobs {:?}", + background_streaming_jobs, + jobs + ); - let _reschedule_job_lock = self.reschedule_lock_write_guard().await; + jobs + } else { + HashSet::new() + }; let table_parallelisms: HashMap<_, _> = { let streaming_parallelisms = self @@ -2384,6 +2410,7 @@ impl GlobalStreamManager { streaming_parallelisms .into_iter() + .filter(|(table_id, _)| !skipped_jobs.contains(&TableId::new(*table_id as _))) .map(|(table_id, parallelism)| { let table_parallelism = match parallelism { StreamingParallelism::Adaptive => TableParallelism::Adaptive, diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index a2898fc3fa07a..550747635b06f 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -596,6 +596,24 @@ impl GlobalStreamManager { ) -> MetaResult<()> { let _reschedule_job_lock = self.reschedule_lock_write_guard().await; + let background_jobs = self + .metadata_manager + .list_background_creating_jobs() + .await?; + + if !background_jobs.is_empty() { + let related_jobs = self + .scale_controller + .resolve_related_no_shuffle_jobs(&background_jobs) + .await?; + + for job in background_jobs { + if related_jobs.contains(&job) { + bail!("Cannot alter the job {} because the related job {} is currently being created", table_id, job.table_id); + } + } + } + let database_id = DatabaseId::new( self.metadata_manager .catalog_controller From ff3eac1f60d4190246f019d50958af92c652a687 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Mon, 13 Jan 2025 18:55:10 +0800 Subject: [PATCH 02/10] Remove actor scaling, add job checks, introduce backfill scan type retrieval Signed-off-by: Shanicky Chen --- src/meta/src/barrier/context/recovery.rs | 94 +++++++++++++++--------- src/meta/src/controller/fragment.rs | 39 +++++++++- src/meta/src/manager/metadata.rs | 13 +++- src/prost/src/lib.rs | 14 ++++ 4 files changed, 124 insertions(+), 36 deletions(-) diff --git a/src/meta/src/barrier/context/recovery.rs b/src/meta/src/barrier/context/recovery.rs index 378a8442611b3..bf8b602a02ad2 100644 --- a/src/meta/src/barrier/context/recovery.rs +++ b/src/meta/src/barrier/context/recovery.rs @@ -138,38 +138,6 @@ impl GlobalBarrierWorkerContextImpl { .await .context("clean dirty streaming jobs")?; - // This is a quick path to accelerate the process of dropping and canceling streaming jobs. - let _ = self.scheduled_barriers.pre_apply_drop_cancel(None); - - let mut active_streaming_nodes = - ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()) - .await?; - - // Resolve actor info for recovery. If there's no actor to recover, most of the - // following steps will be no-op, while the compute nodes will still be reset. 
- // FIXME: Transactions should be used. - // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere. - let mut info = if !self.env.opts.disable_automatic_parallelism_control { - info!("trigger offline scaling"); - self.scale_actors(&active_streaming_nodes) - .await - .inspect_err(|err| { - warn!(error = %err.as_report(), "scale actors failed"); - })?; - - self.resolve_graph_info(None).await.inspect_err(|err| { - warn!(error = %err.as_report(), "resolve actor info failed"); - })? - } else { - info!("trigger actor migration"); - // Migrate actors in expired CN to newly joined one. - self.migrate_actors(&mut active_streaming_nodes) - .await - .inspect_err(|err| { - warn!(error = %err.as_report(), "migrate actors failed"); - })? - }; - // Mview progress needs to be recovered. tracing::info!("recovering mview progress"); let background_jobs = { @@ -202,7 +170,6 @@ impl GlobalBarrierWorkerContextImpl { } background_jobs }; - tracing::info!("recovered mview progress"); let background_streaming_jobs = background_jobs.keys().cloned().collect_vec(); info!( @@ -211,6 +178,67 @@ impl GlobalBarrierWorkerContextImpl { background_streaming_jobs.len() ); + let unreschedulable_jobs = { + let mut unreschedulable_jobs = HashSet::new(); + + for job_id in background_streaming_jobs { + let scan_types = self + .metadata_manager + .get_job_backfill_scan_types(&job_id) + .await?; + + if scan_types + .values() + .any(|scan_type| !scan_type.is_reschedulable()) + { + unreschedulable_jobs.insert(job_id); + } + } + + unreschedulable_jobs + }; + + if !unreschedulable_jobs.is_empty() { + tracing::info!( + "unreschedulable background jobs: {:?}", + unreschedulable_jobs + ); + } + + // This is a quick path to accelerate the process of dropping and canceling streaming jobs. + let _ = self.scheduled_barriers.pre_apply_drop_cancel(None); + + let mut active_streaming_nodes = + ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()) + .await?; + + // Resolve actor info for recovery. If there's no actor to recover, most of the + // following steps will be no-op, while the compute nodes will still be reset. + // FIXME: Transactions should be used. + // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere. + let mut info = if !self.env.opts.disable_automatic_parallelism_control + && unreschedulable_jobs.is_empty() + { + info!("trigger offline scaling"); + self.scale_actors(&active_streaming_nodes) + .await + .inspect_err(|err| { + warn!(error = %err.as_report(), "scale actors failed"); + })?; + + self.resolve_graph_info(None).await.inspect_err(|err| { + warn!(error = %err.as_report(), "resolve actor info failed"); + })? + } else { + info!("trigger actor migration"); + // Migrate actors in expired CN to newly joined one. + self.migrate_actors(&mut active_streaming_nodes) + .await + .inspect_err(|err| { + warn!(error = %err.as_report(), "migrate actors failed"); + })? 
+ }; + if self.scheduled_barriers.pre_apply_drop_cancel(None) { info = self.resolve_graph_info(None).await.inspect_err(|err| { warn!(error = %err.as_report(), "resolve actor info failed"); diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 5b8a91fa44d66..9df3bf86359a0 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -47,7 +47,8 @@ use risingwave_pb::meta::{ use risingwave_pb::source::PbConnectorSplits; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ - DispatchStrategy, PbFragmentTypeFlag, PbStreamActor, PbStreamContext, + DispatchStrategy, PbFragmentTypeFlag, PbStreamActor, PbStreamContext, PbStreamScanType, + StreamScanType, }; use sea_orm::sea_query::Expr; use sea_orm::ActiveValue::Set; @@ -66,7 +67,7 @@ use crate::controller::utils::{ use crate::manager::LocalNotification; use crate::model::TableParallelism; use crate::stream::SplitAssignment; -use crate::{MetaError, MetaResult}; +use crate::{model, MetaError, MetaResult}; #[derive(Clone, Debug)] pub struct InflightFragmentInfo { @@ -721,6 +722,40 @@ impl CatalogController { ) } + pub async fn get_job_fragment_backfill_scan_type( + &self, + job_id: ObjectId, + ) -> MetaResult> { + let inner = self.inner.read().await; + let fragments: Vec<_> = Fragment::find() + .filter(fragment::Column::JobId.eq(job_id)) + .all(&inner.db) + .await?; + + let mut result = HashMap::new(); + + for fragment::Model { + fragment_id, + stream_node, + .. + } in fragments + { + let mut stream_node = stream_node.to_protobuf(); + visit_stream_node(&mut stream_node, |body| { + if let NodeBody::StreamScan(node) = body { + match node.stream_scan_type() { + StreamScanType::Unspecified => {} + scan_type => { + result.insert(fragment_id as model::FragmentId, scan_type); + } + } + } + }); + } + + Ok(result) + } + pub async fn list_streaming_job_infos(&self) -> MetaResult> { let inner = self.inner.read().await; let job_states = StreamingJob::find() diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index ca8d4515f96a8..65cca28ba6e93 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -26,7 +26,7 @@ use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType}; use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo; use risingwave_pb::meta::table_fragments::{Fragment, PbFragment}; -use risingwave_pb::stream_plan::{PbDispatchStrategy, StreamActor}; +use risingwave_pb::stream_plan::{PbDispatchStrategy, PbStreamScanType, StreamActor}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use tokio::sync::oneshot; use tokio::time::{sleep, Instant}; @@ -743,6 +743,17 @@ impl MetadataManager { let rate_limits = self.catalog_controller.list_rate_limits().await?; Ok(rate_limits) } + + pub async fn get_job_backfill_scan_types( + &self, + job_id: &TableId, + ) -> MetaResult> { + let backfill_types = self + .catalog_controller + .get_job_fragment_backfill_scan_type(job_id.table_id as _) + .await?; + Ok(backfill_types) + } } impl MetadataManager { diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs index e7ba096de023b..4a0d3db634b03 100644 --- a/src/prost/src/lib.rs +++ b/src/prost/src/lib.rs @@ -32,6 +32,7 @@ use risingwave_error::tonic::ToTonicStatus; use thiserror::Error; use crate::common::WorkerType; +use crate::stream_plan::PbStreamScanType; 
#[rustfmt::skip] #[cfg_attr(madsim, path = "sim/catalog.rs")] @@ -399,6 +400,19 @@ impl catalog::StreamSourceInfo { } } +impl stream_plan::PbStreamScanType { + pub fn is_reschedulable(&self) -> bool { + match self { + // todo: should this be true? + PbStreamScanType::UpstreamOnly => false, + PbStreamScanType::ArrangementBackfill => true, + // todo: true when stable + PbStreamScanType::SnapshotBackfill => false, + _ => false, + } + } +} + impl catalog::Sink { // TODO: remove this placeholder // creating table sink does not have an id, so we need a placeholder From 868aa2680fd07a569a867903276470e195bd38b1 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Tue, 14 Jan 2025 23:35:49 +0800 Subject: [PATCH 03/10] Refactor `recovery.rs`: reorder ops, log mview progress, preempt drop/cancel, init streaming nodes. --- src/meta/src/barrier/context/recovery.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/meta/src/barrier/context/recovery.rs b/src/meta/src/barrier/context/recovery.rs index bf8b602a02ad2..544cd6bdde824 100644 --- a/src/meta/src/barrier/context/recovery.rs +++ b/src/meta/src/barrier/context/recovery.rs @@ -171,6 +171,15 @@ impl GlobalBarrierWorkerContextImpl { background_jobs }; + tracing::info!("recovered mview progress"); + + // This is a quick path to accelerate the process of dropping and canceling streaming jobs. + let _ = self.scheduled_barriers.pre_apply_drop_cancel(None); + + let mut active_streaming_nodes = + ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()) + .await?; + let background_streaming_jobs = background_jobs.keys().cloned().collect_vec(); info!( "background streaming jobs: {:?} total {}", @@ -205,13 +214,6 @@ impl GlobalBarrierWorkerContextImpl { ); } - // This is a quick path to accelerate the process of dropping and canceling streaming jobs. - let _ = self.scheduled_barriers.pre_apply_drop_cancel(None); - - let mut active_streaming_nodes = - ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()) - .await?; - // Resolve actor info for recovery. If there's no actor to recover, most of the // following steps will be no-op, while the compute nodes will still be reset. // FIXME: Transactions should be used. From d2d27aa7a07de12dad610b7209a35b6f2cb2ff72 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Thu, 23 Jan 2025 14:36:05 +0800 Subject: [PATCH 04/10] Simplify by removing `streaming_parallelisms` conversion; clean up imports in `fragment.rs`. 
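
Note on the `streaming_parallelisms` simplification: the parallelism-control path
only needs the ids of the candidate jobs, so the `StreamingParallelism` ->
`TableParallelism` conversion is dropped and only the job ids survive the filter
against the skipped background jobs. A minimal, self-contained sketch of the
resulting shape (plain `u32` ids, std collections, and a local enum stand in for
`TableId` and the meta-model parallelism types; this is an illustration, not the
patched code itself):

```rust
use std::collections::{HashMap, HashSet};

/// Stand-in for the meta model's streaming parallelism; the value is no longer
/// converted, only the job id is kept.
#[allow(dead_code)]
enum Parallelism {
    Adaptive,
    Fixed(usize),
    Custom,
}

/// Keep only the jobs that are not skipped and drop the parallelism value.
fn candidate_jobs(
    streaming_parallelisms: HashMap<u32, Parallelism>,
    skipped_jobs: &HashSet<u32>,
) -> Vec<u32> {
    streaming_parallelisms
        .into_iter()
        .filter(|(table_id, _)| !skipped_jobs.contains(table_id))
        .map(|(table_id, _)| table_id)
        .collect()
}

fn main() {
    let parallelisms = HashMap::from([
        (1, Parallelism::Adaptive),
        (2, Parallelism::Fixed(4)),
        (3, Parallelism::Custom),
    ]);
    let skipped = HashSet::from([2]);
    let mut jobs = candidate_jobs(parallelisms, &skipped);
    jobs.sort_unstable();
    assert_eq!(jobs, vec![1, 3]);
}
```
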
--- src/meta/src/controller/fragment.rs | 4 +--- src/meta/src/stream/scale.rs | 13 ++----------- src/meta/src/stream/stream_manager.rs | 1 - 3 files changed, 3 insertions(+), 15 deletions(-) diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index a8af176e5d554..3174c8b29f4c6 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -65,11 +65,9 @@ use crate::controller::utils::{ FragmentDesc, PartialActorLocation, PartialFragmentStateTables, }; use crate::manager::LocalNotification; -use crate::model::TableParallelism; -use crate::{model, MetaError, MetaResult}; use crate::model::{StreamContext, StreamJobFragments, TableParallelism}; use crate::stream::{build_actor_split_impls, SplitAssignment}; - +use crate::{model, MetaError, MetaResult}; #[derive(Clone, Debug)] pub struct InflightFragmentInfo { diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 49f73890e79dd..d72e308426928 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -2502,21 +2502,12 @@ impl GlobalStreamManager { .catalog_controller .get_all_created_streaming_parallelisms() .await?; + streaming_parallelisms .into_iter() .filter(|(table_id, _)| !skipped_jobs.contains(&TableId::new(*table_id as _))) - .map(|(table_id, parallelism)| { - let table_parallelism = match parallelism { - StreamingParallelism::Adaptive => TableParallelism::Adaptive, - StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n), - StreamingParallelism::Custom => TableParallelism::Custom, - }; - - (table_id, table_parallelism) - }) + .map(|(table_id, _)| table_id) .collect() - - streaming_parallelisms.into_keys().collect() }; let workers = self diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 1459639165d32..9f935e594f318 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -630,7 +630,6 @@ impl GlobalStreamManager { resource_group: resource_group_change, } = target; - let database_id = DatabaseId::new( self.metadata_manager .catalog_controller From b11ddfd05126486b448959cf68877ca7f6a0c0f4 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Thu, 23 Jan 2025 20:06:32 +0800 Subject: [PATCH 05/10] Update logging, refactor method names, and align interfaces across multiple modules. 
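
Besides the rename to `get_all_streaming_parallelisms` and the new
`background_ddl` simulation test, the frontend-side `WorkerNodeManager` no longer
unwraps on the previous vnode mapping when a fragment mapping is updated: with
background DDL plus offline scaling, the previous entry may legitimately be
missing, so it is now only logged. A minimal sketch of that pattern (a plain
`HashMap<u32, String>` and `println!` stand in for the real mapping table and
`tracing::info!`; illustrative only):

```rust
use std::collections::HashMap;

/// Tolerant update: a missing previous entry is expected (e.g. offline scaling
/// with background DDL), not a bug, so it is logged instead of unwrapped.
fn update_mapping(mappings: &mut HashMap<u32, String>, fragment_id: u32, mapping: String) {
    if mappings.insert(fragment_id, mapping).is_none() {
        println!(
            "previous vnode mapping not found for fragment {fragment_id}, \
             maybe offline scaling with background ddl"
        );
    }
}

fn main() {
    let mut mappings = HashMap::new();
    update_mapping(&mut mappings, 1, "v1".into()); // logs: no previous mapping
    update_mapping(&mut mappings, 1, "v2".into()); // silent: previous mapping replaced
}
```
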
--- .../src/worker_manager/worker_node_manager.rs | 6 +- src/meta/src/barrier/context/recovery.rs | 2 +- src/meta/src/controller/catalog/get_op.rs | 4 +- src/meta/src/controller/streaming_job.rs | 2 + src/meta/src/stream/scale.rs | 24 +++--- .../integration_tests/scale/background_ddl.rs | 81 +++++++++++++++++++ .../tests/integration_tests/scale/mod.rs | 1 + 7 files changed, 103 insertions(+), 17 deletions(-) create mode 100644 src/tests/simulation/tests/integration_tests/scale/background_ddl.rs diff --git a/src/batch/src/worker_manager/worker_node_manager.rs b/src/batch/src/worker_manager/worker_node_manager.rs index db63578a0edd5..040f5fa111c90 100644 --- a/src/batch/src/worker_manager/worker_node_manager.rs +++ b/src/batch/src/worker_manager/worker_node_manager.rs @@ -206,10 +206,12 @@ impl WorkerNodeManager { vnode_mapping: WorkerSlotMapping, ) { let mut guard = self.inner.write().unwrap(); - guard + if let None = guard .streaming_fragment_vnode_mapping .insert(fragment_id, vnode_mapping) - .unwrap(); + { + tracing::info!("Previous vnode mapping for fragment {fragment_id}, maybe offline scaling with background ddl"); + } } pub fn remove_streaming_fragment_mapping(&self, fragment_id: &FragmentId) { diff --git a/src/meta/src/barrier/context/recovery.rs b/src/meta/src/barrier/context/recovery.rs index f9aac27725a08..2bec189529b16 100644 --- a/src/meta/src/barrier/context/recovery.rs +++ b/src/meta/src/barrier/context/recovery.rs @@ -611,7 +611,7 @@ impl GlobalBarrierWorkerContextImpl { let reschedule_targets: HashMap<_, _> = { let streaming_parallelisms = mgr .catalog_controller - .get_all_created_streaming_parallelisms() + .get_all_streaming_parallelisms() .await?; let mut result = HashMap::new(); diff --git a/src/meta/src/controller/catalog/get_op.rs b/src/meta/src/controller/catalog/get_op.rs index eab5438d765e5..5ebe64a57a611 100644 --- a/src/meta/src/controller/catalog/get_op.rs +++ b/src/meta/src/controller/catalog/get_op.rs @@ -171,13 +171,13 @@ impl CatalogController { .collect()) } - pub async fn get_all_created_streaming_parallelisms( + pub async fn get_all_streaming_parallelisms( &self, ) -> MetaResult> { let inner = self.inner.read().await; let job_parallelisms = StreamingJob::find() - .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created)) + // .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created)) .select_only() .columns([ streaming_job::Column::JobId, diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 29d900663443f..042b1d6b71adc 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -910,6 +910,8 @@ impl CatalogController { let fragment_mapping = get_fragment_mappings(&txn, job_id).await?; + println!("mapping {:#?}", fragment_mapping); + let replace_table_mapping_update = match replace_stream_job_info { Some(ReplaceStreamJobPlan { streaming_job, diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index d72e308426928..2bd0bb30bb51e 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -707,17 +707,17 @@ impl ScaleController { .ok_or_else(|| anyhow!("fragment {fragment_id} does not exist"))?; // Check if the reschedule is supported. 
- match fragment_state[fragment_id] { - table_fragments::State::Unspecified => unreachable!(), - state @ table_fragments::State::Initial - | state @ table_fragments::State::Creating => { - bail!( - "the materialized view of fragment {fragment_id} is in state {}", - state.as_str_name() - ) - } - table_fragments::State::Created => {} - } + // match fragment_state[fragment_id] { + // table_fragments::State::Unspecified => unreachable!(), + // state @ table_fragments::State::Initial + // | state @ table_fragments::State::Creating => { + // bail!( + // "the materialized view of fragment {fragment_id} is in state {}", + // state.as_str_name() + // ) + // } + // table_fragments::State::Created => {} + // } if no_shuffle_target_fragment_ids.contains(fragment_id) { bail!("rescheduling NoShuffle downstream fragment (maybe Chain fragment) is forbidden, please use NoShuffle upstream fragment (like Materialized fragment) to scale"); @@ -2500,7 +2500,7 @@ impl GlobalStreamManager { let streaming_parallelisms = self .metadata_manager .catalog_controller - .get_all_created_streaming_parallelisms() + .get_all_streaming_parallelisms() .await?; streaming_parallelisms diff --git a/src/tests/simulation/tests/integration_tests/scale/background_ddl.rs b/src/tests/simulation/tests/integration_tests/scale/background_ddl.rs new file mode 100644 index 0000000000000..4c296bf8af2fa --- /dev/null +++ b/src/tests/simulation/tests/integration_tests/scale/background_ddl.rs @@ -0,0 +1,81 @@ +// Copyright 2025 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::time::Duration; + +use anyhow::{anyhow, Result}; +use risingwave_simulation::cluster::{Cluster, Configuration}; +use risingwave_simulation::ctl_ext::predicate::{identity_contains, no_identity_contains}; +use tokio::time::sleep; + +#[tokio::test] +async fn test_background_arrangement_backfill_offline_scaling() -> Result<()> { + let config = Configuration::for_background_ddl(); + + let cores_per_node = config.compute_node_cores; + let node_count = config.compute_nodes; + + let mut cluster = Cluster::start(config).await?; + let mut session = cluster.start_session(); + + session + .run("SET STREAMING_USE_ARRANGEMENT_BACKFILL = true;") + .await?; + session.run("create table t (v int);").await?; + + println!("0"); + session + .run("insert into t select * from generate_series(1, 10000);") + .await?; + + println!("01"); + session.run("SET BACKGROUND_DDL=true;").await?; + session.run("SET BACKFILL_RATE_LIMIT=1;").await?; + + session + .run("create materialized view m as select * from t;") + .await?; + + let mat_fragment = cluster + .locate_one_fragment([ + identity_contains("materialize"), + no_identity_contains("union"), + ]) + .await?; + + assert_eq!(mat_fragment.inner.actors.len(), cores_per_node * node_count); + + println!("1"); + + sleep(Duration::from_secs(10)).await; + + cluster.simple_kill_nodes(["compute-2", "compute-3"]).await; + + sleep(Duration::from_secs(100)).await; + + let mat_fragment = cluster + .locate_one_fragment([ + identity_contains("materialize"), + no_identity_contains("union"), + ]) + .await?; + + + + println!("2"); + + assert_eq!(mat_fragment.inner.actors.len(), cores_per_node); + + Ok(()) +} diff --git a/src/tests/simulation/tests/integration_tests/scale/mod.rs b/src/tests/simulation/tests/integration_tests/scale/mod.rs index aa79401de47b7..38f1792c30b08 100644 --- a/src/tests/simulation/tests/integration_tests/scale/mod.rs +++ b/src/tests/simulation/tests/integration_tests/scale/mod.rs @@ -26,3 +26,4 @@ mod singleton_migration; mod sink; mod streaming_parallelism; mod table; +mod background_ddl; From 71819a03584ee45dab665f00da5a76366fe66672 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Thu, 23 Jan 2025 20:18:21 +0800 Subject: [PATCH 06/10] Refactor: clean logs & remove debug prints in Worker Node, Streaming Controller, Integration Tests --- src/batch/src/worker_manager/worker_node_manager.rs | 2 +- src/meta/src/controller/streaming_job.rs | 2 -- .../tests/integration_tests/scale/background_ddl.rs | 11 +---------- .../simulation/tests/integration_tests/scale/mod.rs | 2 +- 4 files changed, 3 insertions(+), 14 deletions(-) diff --git a/src/batch/src/worker_manager/worker_node_manager.rs b/src/batch/src/worker_manager/worker_node_manager.rs index 040f5fa111c90..19ab5e73a793b 100644 --- a/src/batch/src/worker_manager/worker_node_manager.rs +++ b/src/batch/src/worker_manager/worker_node_manager.rs @@ -210,7 +210,7 @@ impl WorkerNodeManager { .streaming_fragment_vnode_mapping .insert(fragment_id, vnode_mapping) { - tracing::info!("Previous vnode mapping for fragment {fragment_id}, maybe offline scaling with background ddl"); + tracing::info!("Previous vnode mapping not found for fragment {fragment_id}, maybe offline scaling with background ddl"); } } diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 042b1d6b71adc..29d900663443f 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -910,8 +910,6 @@ impl CatalogController { let fragment_mapping = 
get_fragment_mappings(&txn, job_id).await?; - println!("mapping {:#?}", fragment_mapping); - let replace_table_mapping_update = match replace_stream_job_info { Some(ReplaceStreamJobPlan { streaming_job, diff --git a/src/tests/simulation/tests/integration_tests/scale/background_ddl.rs b/src/tests/simulation/tests/integration_tests/scale/background_ddl.rs index 4c296bf8af2fa..9f6dac64e6653 100644 --- a/src/tests/simulation/tests/integration_tests/scale/background_ddl.rs +++ b/src/tests/simulation/tests/integration_tests/scale/background_ddl.rs @@ -14,7 +14,7 @@ use std::time::Duration; -use anyhow::{anyhow, Result}; +use anyhow::Result; use risingwave_simulation::cluster::{Cluster, Configuration}; use risingwave_simulation::ctl_ext::predicate::{identity_contains, no_identity_contains}; use tokio::time::sleep; @@ -33,13 +33,10 @@ async fn test_background_arrangement_backfill_offline_scaling() -> Result<()> { .run("SET STREAMING_USE_ARRANGEMENT_BACKFILL = true;") .await?; session.run("create table t (v int);").await?; - - println!("0"); session .run("insert into t select * from generate_series(1, 10000);") .await?; - println!("01"); session.run("SET BACKGROUND_DDL=true;").await?; session.run("SET BACKFILL_RATE_LIMIT=1;").await?; @@ -56,8 +53,6 @@ async fn test_background_arrangement_backfill_offline_scaling() -> Result<()> { assert_eq!(mat_fragment.inner.actors.len(), cores_per_node * node_count); - println!("1"); - sleep(Duration::from_secs(10)).await; cluster.simple_kill_nodes(["compute-2", "compute-3"]).await; @@ -71,10 +66,6 @@ async fn test_background_arrangement_backfill_offline_scaling() -> Result<()> { ]) .await?; - - - println!("2"); - assert_eq!(mat_fragment.inner.actors.len(), cores_per_node); Ok(()) diff --git a/src/tests/simulation/tests/integration_tests/scale/mod.rs b/src/tests/simulation/tests/integration_tests/scale/mod.rs index 38f1792c30b08..1a0c5d92c1658 100644 --- a/src/tests/simulation/tests/integration_tests/scale/mod.rs +++ b/src/tests/simulation/tests/integration_tests/scale/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. mod auto_parallelism; +mod background_ddl; mod cascade_materialized_view; mod dynamic_filter; mod nexmark_chaos; @@ -26,4 +27,3 @@ mod singleton_migration; mod sink; mod streaming_parallelism; mod table; -mod background_ddl; From 7fe388f2ab2eb51ae9c7b53675b9129e39c477cf Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Fri, 24 Jan 2025 18:42:30 +0800 Subject: [PATCH 07/10] Enhance vnode mapping logic & add MV progress tracking in `worker_node_manager.rs` & `recovery.rs`. 
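
The recovery path now also rebuilds the background-job progress map (keyed by the
job's `TableId`, duplicates treated as a bug via
`try_insert(..).expect("non-duplicate")`), and the fragment-mapping updates in
`WorkerNodeManager` check the insertion result instead of unwrapping. The sketch
below shows the non-duplicate collection contract only; it uses the stable
`Entry` API and plain `u32`/`String` stand-ins, whereas the actual code uses the
nightly `HashMap::try_insert` on the real job and fragment types:

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Collect background jobs keyed by job id; hitting the same id twice would be a
/// bug in the caller, so it panics just like `try_insert(..).expect(..)` would.
fn collect_background_jobs(jobs: Vec<(String, u32)>) -> HashMap<u32, String> {
    let mut background_jobs = HashMap::new();
    for (definition, job_id) in jobs {
        match background_jobs.entry(job_id) {
            Entry::Vacant(e) => {
                e.insert(definition);
            }
            Entry::Occupied(_) => panic!("non-duplicate"),
        }
    }
    background_jobs
}

fn main() {
    let jobs = vec![("create materialized view m as select * from t".to_owned(), 42)];
    let background_jobs = collect_background_jobs(jobs);
    assert_eq!(background_jobs.len(), 1);
}
```
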
--- .../src/worker_manager/worker_node_manager.rs | 11 ++++++++--- src/meta/src/barrier/context/recovery.rs | 17 +++++++++++++++++ .../integration_tests/scale/background_ddl.rs | 8 +++++++- 3 files changed, 32 insertions(+), 4 deletions(-) diff --git a/src/batch/src/worker_manager/worker_node_manager.rs b/src/batch/src/worker_manager/worker_node_manager.rs index 19ab5e73a793b..531ebc5f03d3c 100644 --- a/src/batch/src/worker_manager/worker_node_manager.rs +++ b/src/batch/src/worker_manager/worker_node_manager.rs @@ -192,12 +192,16 @@ impl WorkerNodeManager { fragment_id: FragmentId, vnode_mapping: WorkerSlotMapping, ) { - self.inner + if self + .inner .write() .unwrap() .streaming_fragment_vnode_mapping .try_insert(fragment_id, vnode_mapping) - .unwrap(); + .is_err() + { + tracing::info!("Previous batch vnode mapping not found for fragment {fragment_id}, maybe offline scaling with background ddl"); + } } pub fn update_streaming_fragment_mapping( @@ -206,9 +210,10 @@ impl WorkerNodeManager { vnode_mapping: WorkerSlotMapping, ) { let mut guard = self.inner.write().unwrap(); - if let None = guard + if guard .streaming_fragment_vnode_mapping .insert(fragment_id, vnode_mapping) + .is_none() { tracing::info!("Previous vnode mapping not found for fragment {fragment_id}, maybe offline scaling with background ddl"); } diff --git a/src/meta/src/barrier/context/recovery.rs b/src/meta/src/barrier/context/recovery.rs index 2bec189529b16..b2172c490436e 100644 --- a/src/meta/src/barrier/context/recovery.rs +++ b/src/meta/src/barrier/context/recovery.rs @@ -292,6 +292,23 @@ impl GlobalBarrierWorkerContextImpl { warn!(error = %err.as_report(), "update actors failed"); })?; + let background_jobs = { + let jobs = self + .list_background_mv_progress() + .await + .context("recover mview progress should not fail")?; + let mut background_jobs = HashMap::new(); + for (definition, stream_job_fragments) in jobs { + background_jobs + .try_insert( + stream_job_fragments.stream_job_id(), + (definition, stream_job_fragments), + ) + .expect("non-duplicate"); + } + background_jobs + }; + // get split assignments for all actors let source_splits = self.source_manager.list_assignments().await; Ok(BarrierWorkerRuntimeInfoSnapshot { diff --git a/src/tests/simulation/tests/integration_tests/scale/background_ddl.rs b/src/tests/simulation/tests/integration_tests/scale/background_ddl.rs index 9f6dac64e6653..e50eea435aa4e 100644 --- a/src/tests/simulation/tests/integration_tests/scale/background_ddl.rs +++ b/src/tests/simulation/tests/integration_tests/scale/background_ddl.rs @@ -17,6 +17,7 @@ use std::time::Duration; use anyhow::Result; use risingwave_simulation::cluster::{Cluster, Configuration}; use risingwave_simulation::ctl_ext::predicate::{identity_contains, no_identity_contains}; +use risingwave_simulation::utils::AssertResult; use tokio::time::sleep; #[tokio::test] @@ -34,7 +35,7 @@ async fn test_background_arrangement_backfill_offline_scaling() -> Result<()> { .await?; session.run("create table t (v int);").await?; session - .run("insert into t select * from generate_series(1, 10000);") + .run("insert into t select * from generate_series(1, 1000);") .await?; session.run("SET BACKGROUND_DDL=true;").await?; @@ -68,5 +69,10 @@ async fn test_background_arrangement_backfill_offline_scaling() -> Result<()> { assert_eq!(mat_fragment.inner.actors.len(), cores_per_node); + sleep(Duration::from_secs(2000)).await; + + // job is finished + session.run("show jobs;").await?.assert_result_eq(""); + Ok(()) } From 
284af92149ec7177b264485b473be02d48902f97 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Fri, 24 Jan 2025 19:09:41 +0800 Subject: [PATCH 08/10] Enhance `ScaleController` rescheduling with `ArrangementBackfill` node check --- src/meta/src/stream/scale.rs | 54 ++++++++++++++++++++++++++++-------- 1 file changed, 42 insertions(+), 12 deletions(-) diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 2bd0bb30bb51e..32133c58f1eaf 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -42,6 +42,7 @@ use risingwave_pb::meta::FragmentWorkerSlotMappings; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ Dispatcher, DispatcherType, FragmentTypeFlag, PbDispatcher, PbStreamActor, StreamNode, + StreamScanType, }; use thiserror_ext::AsReport; use tokio::sync::oneshot::Receiver; @@ -139,6 +140,7 @@ impl CustomFragmentInfo { } use educe::Educe; +use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont; use super::SourceChange; use crate::controller::id::IdCategory; @@ -706,18 +708,46 @@ impl ScaleController { .get(fragment_id) .ok_or_else(|| anyhow!("fragment {fragment_id} does not exist"))?; - // Check if the reschedule is supported. - // match fragment_state[fragment_id] { - // table_fragments::State::Unspecified => unreachable!(), - // state @ table_fragments::State::Initial - // | state @ table_fragments::State::Creating => { - // bail!( - // "the materialized view of fragment {fragment_id} is in state {}", - // state.as_str_name() - // ) - // } - // table_fragments::State::Created => {} - // } + // Check if the rescheduling is supported. + match fragment_state[fragment_id] { + table_fragments::State::Unspecified => unreachable!(), + state @ table_fragments::State::Initial => { + bail!( + "the materialized view of fragment {fragment_id} is in state {}", + state.as_str_name() + ) + } + state @ table_fragments::State::Creating => { + let stream_node = fragment + .actor_template + .nodes + .as_ref() + .expect("empty nodes in fragment actor template"); + + let mut is_reschedulable = true; + visit_stream_node_cont(stream_node, |body| { + if let Some(NodeBody::StreamScan(node)) = &body.node_body { + return match node.stream_scan_type() { + // Question: Is it possible to have multiple stream scans in one fragment? + StreamScanType::ArrangementBackfill => true, + _ => { + is_reschedulable = false; + false + } + }; + } + true + }); + + if !is_reschedulable { + bail!( + "the materialized view of fragment {fragment_id} is in state {}", + state.as_str_name() + ) + } + } + table_fragments::State::Created => {} + } if no_shuffle_target_fragment_ids.contains(fragment_id) { bail!("rescheduling NoShuffle downstream fragment (maybe Chain fragment) is forbidden, please use NoShuffle upstream fragment (like Materialized fragment) to scale"); From 14a699860c85f0aec5cc656d77259f5010dac4af Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Fri, 24 Jan 2025 19:19:40 +0800 Subject: [PATCH 09/10] Clean up `get_op.rs`; refactor `scale.rs` by removing `StreamScanType` enum. 
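
The creating-state branch in `ScaleController` now decides reschedulability by
walking the fragment's stream node tree with `visit_stream_node_cont` and asking
each stream scan for `is_reschedulable()`, stopping at the first negative answer.
The sketch below illustrates that fail-fast visitor contract on a simplified tree;
`Node`, `visit_cont`, and the `reschedulable` flag are stand-ins for the protobuf
`StreamNode`, `visit_stream_node_cont`, and `PbStreamScanType::is_reschedulable`,
not the real types:

```rust
/// Simplified stand-in for the stream-plan node tree.
enum Node {
    StreamScan { reschedulable: bool },
    Other(Vec<Node>),
}

/// Depth-first visit; the closure returns `false` to stop the traversal early.
fn visit_cont(node: &Node, f: &mut dyn FnMut(&Node) -> bool) -> bool {
    if !f(node) {
        return false;
    }
    if let Node::Other(children) = node {
        for child in children {
            if !visit_cont(child, f) {
                return false;
            }
        }
    }
    true
}

/// A creating fragment is only reschedulable if every stream scan in it is.
fn fragment_is_reschedulable(root: &Node) -> bool {
    let mut is_reschedulable = true;
    visit_cont(root, &mut |node| {
        if let Node::StreamScan { reschedulable } = node {
            if !*reschedulable {
                is_reschedulable = false;
                return false; // fail fast: no need to visit the rest of the fragment
            }
        }
        true // keep visiting
    });
    is_reschedulable
}

fn main() {
    let arrangement_backfill = Node::Other(vec![Node::StreamScan { reschedulable: true }]);
    let snapshot_backfill = Node::Other(vec![Node::StreamScan { reschedulable: false }]);
    assert!(fragment_is_reschedulable(&arrangement_backfill));
    assert!(!fragment_is_reschedulable(&snapshot_backfill));
}
```
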
--- src/meta/src/controller/catalog/get_op.rs | 1 - src/meta/src/stream/scale.rs | 20 +++++++++++--------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/src/meta/src/controller/catalog/get_op.rs b/src/meta/src/controller/catalog/get_op.rs index 5ebe64a57a611..13b7993e39be6 100644 --- a/src/meta/src/controller/catalog/get_op.rs +++ b/src/meta/src/controller/catalog/get_op.rs @@ -177,7 +177,6 @@ impl CatalogController { let inner = self.inner.read().await; let job_parallelisms = StreamingJob::find() - // .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created)) .select_only() .columns([ streaming_job::Column::JobId, diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 32133c58f1eaf..210414aff646f 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -42,7 +42,6 @@ use risingwave_pb::meta::FragmentWorkerSlotMappings; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ Dispatcher, DispatcherType, FragmentTypeFlag, PbDispatcher, PbStreamActor, StreamNode, - StreamScanType, }; use thiserror_ext::AsReport; use tokio::sync::oneshot::Receiver; @@ -727,15 +726,18 @@ impl ScaleController { let mut is_reschedulable = true; visit_stream_node_cont(stream_node, |body| { if let Some(NodeBody::StreamScan(node)) = &body.node_body { - return match node.stream_scan_type() { - // Question: Is it possible to have multiple stream scans in one fragment? - StreamScanType::ArrangementBackfill => true, - _ => { - is_reschedulable = false; - false - } - }; + if !node.stream_scan_type().is_reschedulable() { + is_reschedulable = false; + + // fail fast + return false; + } + + // continue visiting + return true; } + + // continue visiting true }); From c139904e10271a7da1a6939104268a5f7086e0ec Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Mon, 27 Jan 2025 16:10:21 +0800 Subject: [PATCH 10/10] Refactor: Use immutable ref for `stream_node` in CatalogController; add `visit_stream_node` import. --- src/meta/src/controller/fragment.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 04b468dc3e5e5..3ac4c3b0dda7d 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -20,7 +20,7 @@ use anyhow::Context; use itertools::Itertools; use risingwave_common::bail; use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId}; -use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut; +use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_mut}; use risingwave_meta_model::actor::ActorStatus; use risingwave_meta_model::fragment::DistributionType; use risingwave_meta_model::object::ObjectType; @@ -742,8 +742,8 @@ impl CatalogController { .. } in fragments { - let mut stream_node = stream_node.to_protobuf(); - visit_stream_node(&mut stream_node, |body| { + let stream_node = stream_node.to_protobuf(); + visit_stream_node(&stream_node, |body| { if let NodeBody::StreamScan(node) = body { match node.stream_scan_type() { StreamScanType::Unspecified => {}