feat: Add offline scaling support for background ddl with arrangement backfill #20006

Merged · 12 commits · Feb 5, 2025
Changes from all commits
15 changes: 11 additions & 4 deletions src/batch/src/worker_manager/worker_node_manager.rs
@@ -192,12 +192,16 @@ impl WorkerNodeManager {
fragment_id: FragmentId,
vnode_mapping: WorkerSlotMapping,
) {
self.inner
if self
.inner
.write()
.unwrap()
.streaming_fragment_vnode_mapping
.try_insert(fragment_id, vnode_mapping)
.unwrap();
.is_err()
{
tracing::info!("Previous batch vnode mapping not found for fragment {fragment_id}, maybe offline scaling with background ddl");
}
}

pub fn update_streaming_fragment_mapping(
@@ -206,10 +210,13 @@
vnode_mapping: WorkerSlotMapping,
) {
let mut guard = self.inner.write().unwrap();
guard
if guard
.streaming_fragment_vnode_mapping
.insert(fragment_id, vnode_mapping)
.unwrap();
.is_none()
{
tracing::info!("Previous vnode mapping not found for fragment {fragment_id}, maybe offline scaling with background ddl");
}
}

pub fn remove_streaming_fragment_mapping(&self, fragment_id: &FragmentId) {
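The two hunks above downgrade panics to log lines: with offline scaling plus background DDL, a fragment's previous batch vnode mapping may legitimately be missing (or already present) when it is republished. A minimal stable-Rust sketch of the same tolerant-update idea; `MappingTable`, the `String` payload, and `println!` are stand-ins for `WorkerNodeManager`, `WorkerSlotMapping`, and `tracing::info!`, and `contains_key` replaces the `try_insert` call used in the real code:

```rust
use std::collections::HashMap;

type FragmentId = u32;

struct MappingTable {
    // String stands in for WorkerSlotMapping.
    streaming_fragment_vnode_mapping: HashMap<FragmentId, String>,
}

impl MappingTable {
    /// Insert a mapping; tolerate an already-existing entry instead of panicking.
    fn insert_streaming_fragment_mapping(&mut self, fragment_id: FragmentId, mapping: String) {
        if self.streaming_fragment_vnode_mapping.contains_key(&fragment_id) {
            // The old code unwrapped `try_insert` here and would panic.
            println!("mapping for fragment {fragment_id} already exists, keeping the existing one");
        } else {
            self.streaming_fragment_vnode_mapping.insert(fragment_id, mapping);
        }
    }

    /// Update a mapping; tolerate a missing previous entry instead of panicking.
    fn update_streaming_fragment_mapping(&mut self, fragment_id: FragmentId, mapping: String) {
        if self
            .streaming_fragment_vnode_mapping
            .insert(fragment_id, mapping)
            .is_none()
        {
            println!("no previous mapping for fragment {fragment_id}, maybe offline scaling with background ddl");
        }
    }
}

fn main() {
    let mut table = MappingTable {
        streaming_fragment_vnode_mapping: HashMap::new(),
    };
    // No previous entry: logged, not a panic.
    table.update_streaming_fragment_mapping(1, "mapping-v1".to_owned());
    // Entry already present: logged, existing mapping kept.
    table.insert_streaming_fragment_mapping(1, "mapping-v2".to_owned());
}
```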
49 changes: 47 additions & 2 deletions src/meta/src/barrier/context/recovery.rs
@@ -172,6 +172,7 @@ impl GlobalBarrierWorkerContextImpl {
}
background_jobs
};

tracing::info!("recovered mview progress");

// This is a quick path to accelerate the process of dropping and canceling streaming jobs.
@@ -188,12 +189,39 @@
background_streaming_jobs.len()
);

let unreschedulable_jobs = {
let mut unreschedulable_jobs = HashSet::new();

for job_id in background_streaming_jobs {
let scan_types = self
.metadata_manager
.get_job_backfill_scan_types(&job_id)
.await?;

if scan_types
.values()
.any(|scan_type| !scan_type.is_reschedulable())
{
unreschedulable_jobs.insert(job_id);
}
}

unreschedulable_jobs
};

if !unreschedulable_jobs.is_empty() {
tracing::info!(
"unreschedulable background jobs: {:?}",
unreschedulable_jobs
);
}

// Resolve actor info for recovery. If there's no actor to recover, most of the
// following steps will be no-op, while the compute nodes will still be reset.
// FIXME: Transactions should be used.
// TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
let mut info = if !self.env.opts.disable_automatic_parallelism_control
&& background_streaming_jobs.is_empty()
&& unreschedulable_jobs.is_empty()
{
info!("trigger offline scaling");
self.scale_actors(&active_streaming_nodes)
@@ -264,6 +292,23 @@
warn!(error = %err.as_report(), "update actors failed");
})?;

let background_jobs = {
let jobs = self
.list_background_mv_progress()
.await
.context("recover mview progress should not fail")?;
let mut background_jobs = HashMap::new();
for (definition, stream_job_fragments) in jobs {
background_jobs
.try_insert(
stream_job_fragments.stream_job_id(),
(definition, stream_job_fragments),
)
.expect("non-duplicate");
}
background_jobs
};

// get split assignments for all actors
let source_splits = self.source_manager.list_assignments().await;
Ok(BarrierWorkerRuntimeInfoSnapshot {
@@ -583,7 +628,7 @@ impl GlobalBarrierWorkerContextImpl {
let reschedule_targets: HashMap<_, _> = {
let streaming_parallelisms = mgr
.catalog_controller
.get_all_created_streaming_parallelisms()
.get_all_streaming_parallelisms()
.await?;

let mut result = HashMap::new();
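In the recovery path above, offline scaling is now attempted only when every background job is reschedulable, that is, when none of its backfill fragments uses a non-reschedulable stream scan. A hedged sketch of that decision; `JobId`, `FragmentId`, and the `ScanType` enum (including which variants count as reschedulable) are simplified assumptions rather than the real protobuf types:

```rust
use std::collections::{HashMap, HashSet};

type JobId = u32;
type FragmentId = u32;

// Illustrative stand-in for PbStreamScanType; the actual reschedulability rule
// lives in `is_reschedulable()` in the RisingWave codebase.
#[derive(Clone, Copy)]
enum ScanType {
    ArrangementBackfill, // assumed reschedulable in this sketch
    Backfill,            // assumed not reschedulable in this sketch
}

impl ScanType {
    fn is_reschedulable(self) -> bool {
        matches!(self, ScanType::ArrangementBackfill)
    }
}

/// A background job blocks offline scaling as soon as any of its backfill
/// fragments uses a non-reschedulable scan type.
fn unreschedulable_jobs(
    scan_types_per_job: &HashMap<JobId, HashMap<FragmentId, ScanType>>,
) -> HashSet<JobId> {
    scan_types_per_job
        .iter()
        .filter(|(_, scan_types)| scan_types.values().any(|t| !t.is_reschedulable()))
        .map(|(job_id, _)| *job_id)
        .collect()
}

fn main() {
    let mut jobs = HashMap::new();
    jobs.insert(1, HashMap::from([(10, ScanType::ArrangementBackfill)]));
    jobs.insert(2, HashMap::from([(20, ScanType::Backfill)]));
    // Only job 2 blocks offline scaling during recovery.
    assert_eq!(unreschedulable_jobs(&jobs), HashSet::from([2]));
}
```

The second added block re-collects background job progress after the actor-update step, presumably so that the runtime info snapshot is built from the post-scaling fragment state rather than the one listed before offline scaling ran.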
3 changes: 1 addition & 2 deletions src/meta/src/controller/catalog/get_op.rs
@@ -171,13 +171,12 @@ impl CatalogController {
.collect())
}

pub async fn get_all_created_streaming_parallelisms(
pub async fn get_all_streaming_parallelisms(
&self,
) -> MetaResult<HashMap<ObjectId, StreamingParallelism>> {
let inner = self.inner.read().await;

let job_parallelisms = StreamingJob::find()
.filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
.select_only()
.columns([
streaming_job::Column::JobId,
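The rename above also drops the `JobStatus::Created` filter, so parallelism settings of jobs that are still being created in the background are returned as well. A toy sketch of the semantic difference, using an in-memory slice in place of the SeaORM query over `streaming_job`; the enums here are simplified stand-ins:

```rust
use std::collections::HashMap;

type ObjectId = i32;

#[derive(Clone, Copy, PartialEq)]
enum JobStatus {
    Creating,
    Created,
}

#[derive(Clone, Copy)]
enum StreamingParallelism {
    Adaptive,
    Fixed(usize),
}

// Old behavior: only jobs that finished creation were visible to parallelism control.
fn get_all_created_streaming_parallelisms(
    jobs: &[(ObjectId, JobStatus, StreamingParallelism)],
) -> HashMap<ObjectId, StreamingParallelism> {
    jobs.iter()
        .filter(|(_, status, _)| *status == JobStatus::Created)
        .map(|(id, _, p)| (*id, *p))
        .collect()
}

// New behavior: every streaming job is visible, including background (Creating) ones.
fn get_all_streaming_parallelisms(
    jobs: &[(ObjectId, JobStatus, StreamingParallelism)],
) -> HashMap<ObjectId, StreamingParallelism> {
    jobs.iter().map(|(id, _, p)| (*id, *p)).collect()
}

fn main() {
    let jobs = [
        (1, JobStatus::Created, StreamingParallelism::Adaptive),
        (2, JobStatus::Creating, StreamingParallelism::Fixed(4)),
    ];
    assert_eq!(get_all_created_streaming_parallelisms(&jobs).len(), 1);
    assert_eq!(get_all_streaming_parallelisms(&jobs).len(), 2);
}
```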
41 changes: 38 additions & 3 deletions src/meta/src/controller/fragment.rs
@@ -20,7 +20,7 @@ use anyhow::Context;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId};
use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_mut};
use risingwave_meta_model::actor::ActorStatus;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_meta_model::object::ObjectType;
@@ -47,7 +47,8 @@ use risingwave_pb::meta::{
use risingwave_pb::source::PbConnectorSplits;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
DispatchStrategy, PbFragmentTypeFlag, PbStreamActor, PbStreamContext,
DispatchStrategy, PbFragmentTypeFlag, PbStreamActor, PbStreamContext, PbStreamScanType,
StreamScanType,
};
use sea_orm::sea_query::Expr;
use sea_orm::ActiveValue::Set;
@@ -66,7 +67,7 @@ use crate::controller::utils::{
use crate::manager::LocalNotification;
use crate::model::{StreamContext, StreamJobFragments, TableParallelism};
use crate::stream::{build_actor_split_impls, SplitAssignment};
use crate::{MetaError, MetaResult};
use crate::{model, MetaError, MetaResult};

#[derive(Clone, Debug)]
pub struct InflightFragmentInfo {
@@ -723,6 +724,40 @@ impl CatalogController {
)
}

pub async fn get_job_fragment_backfill_scan_type(
&self,
job_id: ObjectId,
) -> MetaResult<HashMap<model::FragmentId, PbStreamScanType>> {
let inner = self.inner.read().await;
let fragments: Vec<_> = Fragment::find()
.filter(fragment::Column::JobId.eq(job_id))
.all(&inner.db)
.await?;

let mut result = HashMap::new();

for fragment::Model {
fragment_id,
stream_node,
..
} in fragments
{
let stream_node = stream_node.to_protobuf();
visit_stream_node(&stream_node, |body| {
if let NodeBody::StreamScan(node) = body {
match node.stream_scan_type() {
StreamScanType::Unspecified => {}
scan_type => {
result.insert(fragment_id as model::FragmentId, scan_type);
}
}
}
});
}

Ok(result)
}

pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
let inner = self.inner.read().await;
let job_states = StreamingJob::find()
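The new `get_job_fragment_backfill_scan_type` above loads every fragment of a job and walks its stream node, recording the scan type of each `StreamScan` while skipping `Unspecified`. A much-simplified sketch of that traversal; the `Node` tree and the scan-type enum are hypothetical stand-ins for the protobuf `stream_node` / `NodeBody` structures visited by `visit_stream_node`:

```rust
use std::collections::HashMap;

type FragmentId = u32;

// Hypothetical stand-in for PbStreamScanType.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum StreamScanType {
    Unspecified,
    ArrangementBackfill,
}

// Hypothetical stand-in for a fragment's protobuf stream node tree.
enum Node {
    StreamScan(StreamScanType),
    Other(Vec<Node>),
}

/// Record the scan type of every fragment that contains a StreamScan node,
/// skipping `Unspecified`, mirroring `get_job_fragment_backfill_scan_type`.
fn backfill_scan_types(fragments: &[(FragmentId, Node)]) -> HashMap<FragmentId, StreamScanType> {
    fn visit(node: &Node, fragment_id: FragmentId, out: &mut HashMap<FragmentId, StreamScanType>) {
        match node {
            Node::StreamScan(scan_type) if *scan_type != StreamScanType::Unspecified => {
                out.insert(fragment_id, *scan_type);
            }
            Node::StreamScan(_) => {}
            Node::Other(children) => {
                for child in children {
                    visit(child, fragment_id, out);
                }
            }
        }
    }

    let mut result = HashMap::new();
    for (fragment_id, root) in fragments {
        visit(root, *fragment_id, &mut result);
    }
    result
}

fn main() {
    let fragments = vec![
        (1, Node::Other(vec![Node::StreamScan(StreamScanType::ArrangementBackfill)])),
        (2, Node::Other(vec![Node::StreamScan(StreamScanType::Unspecified)])),
    ];
    let result = backfill_scan_types(&fragments);
    assert_eq!(result.get(&1), Some(&StreamScanType::ArrangementBackfill));
    assert!(result.get(&2).is_none());
}
```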
13 changes: 12 additions & 1 deletion src/meta/src/manager/metadata.rs
@@ -26,7 +26,7 @@ use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty};
use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::table_fragments::{Fragment, PbFragment};
use risingwave_pb::stream_plan::{PbDispatchStrategy, StreamActor};
use risingwave_pb::stream_plan::{PbDispatchStrategy, PbStreamScanType, StreamActor};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::sync::oneshot;
use tokio::time::{sleep, Instant};
@@ -753,6 +753,17 @@
let rate_limits = self.catalog_controller.list_rate_limits().await?;
Ok(rate_limits)
}

pub async fn get_job_backfill_scan_types(
&self,
job_id: &TableId,
) -> MetaResult<HashMap<FragmentId, PbStreamScanType>> {
let backfill_types = self
.catalog_controller
.get_job_fragment_backfill_scan_type(job_id.table_id as _)
.await?;
Ok(backfill_types)
}
}

impl MetadataManager {
10 changes: 0 additions & 10 deletions src/meta/src/rpc/ddl_controller.rs
@@ -403,16 +403,6 @@ impl DdlController {
deferred = true;
}

if !deferred
&& !self
.metadata_manager
.list_background_creating_jobs()
.await?
.is_empty()
{
bail!("The system is creating jobs in the background, please try again later")
}

self.stream_manager
.reschedule_streaming_job(table_id, target, deferred)
.await
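For context, the deleted hunk was a blanket guard that rejected any non-deferred reschedule while background jobs existed; with the per-fragment reschedulability checks and the no-shuffle-aware skipping added in `scale.rs` below, that coarse check is no longer needed. A sketch of the removed behavior, with the enclosing function boundary and the error type simplified:

```rust
// Sketch of the guard this PR removes (simplified signature and error type).
fn old_reschedule_guard(deferred: bool, background_job_count: usize) -> Result<(), String> {
    if !deferred && background_job_count > 0 {
        // Previously every manual reschedule failed here while any background
        // job was still creating; now the request proceeds and finer-grained
        // checks decide what can actually be moved.
        return Err(
            "The system is creating jobs in the background, please try again later".to_owned(),
        );
    }
    Ok(())
}

fn main() {
    assert!(old_reschedule_guard(false, 1).is_err());
    assert!(old_reschedule_guard(true, 1).is_ok());
}
```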
105 changes: 77 additions & 28 deletions src/meta/src/stream/scale.rs
@@ -139,6 +139,7 @@ impl CustomFragmentInfo {
}

use educe::Educe;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;

use super::SourceChange;
use crate::controller::id::IdCategory;
@@ -706,16 +707,47 @@ impl ScaleController {
.get(fragment_id)
.ok_or_else(|| anyhow!("fragment {fragment_id} does not exist"))?;

// Check if the reschedule is supported.
// Check if the rescheduling is supported.
match fragment_state[fragment_id] {
table_fragments::State::Unspecified => unreachable!(),
state @ table_fragments::State::Initial
| state @ table_fragments::State::Creating => {
state @ table_fragments::State::Initial => {
bail!(
"the materialized view of fragment {fragment_id} is in state {}",
state.as_str_name()
)
}
state @ table_fragments::State::Creating => {
let stream_node = fragment
.actor_template
.nodes
.as_ref()
.expect("empty nodes in fragment actor template");

let mut is_reschedulable = true;
visit_stream_node_cont(stream_node, |body| {
if let Some(NodeBody::StreamScan(node)) = &body.node_body {
if !node.stream_scan_type().is_reschedulable() {
is_reschedulable = false;

// fail fast
return false;
}

// continue visiting
return true;
}

// continue visiting
true
});

if !is_reschedulable {
bail!(
"the materialized view of fragment {fragment_id} is in state {}",
state.as_str_name()
)
}
}
table_fragments::State::Created => {}
}

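For a job still in the `Creating` state, rescheduling is now allowed as long as every `StreamScan` in the fragment's actor template is reschedulable; `visit_stream_node_cont` lets the check bail out on the first offending node. A simplified sketch of that early-exit traversal, where `Node` (carrying a plain `reschedulable` flag) stands in for the protobuf stream node and `visit_cont` for `visit_stream_node_cont`:

```rust
// Hypothetical, much-simplified stream node: either a StreamScan with a known
// reschedulability, or any other node with children.
enum Node {
    StreamScan { reschedulable: bool },
    Other(Vec<Node>),
}

/// Continue-style visitor: the closure returns `false` to stop the walk early,
/// mirroring `visit_stream_node_cont`.
fn visit_cont(node: &Node, f: &mut impl FnMut(&Node) -> bool) -> bool {
    if !f(node) {
        return false;
    }
    if let Node::Other(children) = node {
        for child in children {
            if !visit_cont(child, f) {
                return false;
            }
        }
    }
    true
}

fn fragment_is_reschedulable(root: &Node) -> bool {
    let mut is_reschedulable = true;
    visit_cont(root, &mut |node| {
        if let Node::StreamScan { reschedulable } = node {
            if !*reschedulable {
                is_reschedulable = false;
                return false; // fail fast
            }
        }
        true // keep visiting
    });
    is_reschedulable
}

fn main() {
    let ok = Node::Other(vec![Node::StreamScan { reschedulable: true }]);
    let blocked = Node::Other(vec![Node::StreamScan { reschedulable: false }]);
    assert!(fragment_is_reschedulable(&ok));
    assert!(!fragment_is_reschedulable(&blocked));
}
```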
@@ -2331,6 +2363,24 @@

Ok(())
}

pub async fn resolve_related_no_shuffle_jobs(
&self,
jobs: &[TableId],
) -> MetaResult<HashSet<TableId>> {
let RescheduleWorkingSet { related_jobs, .. } = self
.metadata_manager
.catalog_controller
.resolve_working_set_for_reschedule_tables(
jobs.iter().map(|id| id.table_id as _).collect(),
)
.await?;

Ok(related_jobs
.keys()
.map(|id| TableId::new(*id as _))
.collect())
}
}

#[derive(Debug)]
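The new `resolve_related_no_shuffle_jobs` helper above expands a set of background jobs to every job connected to them through no-shuffle dependencies, since those must keep matching distributions and therefore have to be skipped (or rescheduled) together. A hedged sketch of that closure computation, using a plain adjacency map in place of the catalog's `resolve_working_set_for_reschedule_tables`:

```rust
use std::collections::{HashMap, HashSet, VecDeque};

type TableId = u32;

/// Expand the seed jobs with everything reachable over no-shuffle edges.
/// The adjacency map is assumed to list both directions of each edge.
fn resolve_related_no_shuffle_jobs(
    seeds: &[TableId],
    no_shuffle_edges: &HashMap<TableId, Vec<TableId>>,
) -> HashSet<TableId> {
    let mut related: HashSet<TableId> = seeds.iter().copied().collect();
    let mut queue: VecDeque<TableId> = seeds.iter().copied().collect();
    while let Some(job) = queue.pop_front() {
        for neighbor in no_shuffle_edges.get(&job).into_iter().flatten() {
            if related.insert(*neighbor) {
                queue.push_back(*neighbor);
            }
        }
    }
    related
}

fn main() {
    // Job 1 is backfilling; job 2 consumes it via no-shuffle; job 3 is unrelated.
    let edges = HashMap::from([(1, vec![2]), (2, vec![1]), (3, vec![])]);
    assert_eq!(resolve_related_no_shuffle_jobs(&[1], &edges), HashSet::from([1, 2]));
}
```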
@@ -2452,45 +2502,44 @@ impl GlobalStreamManager {
/// - `Ok(false)` if no jobs can be scaled;
/// - `Ok(true)` if some jobs are scaled, and there may be more jobs that can be scaled.
async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
tracing::info!("trigger parallelism control");

let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

let background_streaming_jobs = self
.metadata_manager
.list_background_creating_jobs()
.await?;

if !background_streaming_jobs.is_empty() {
tracing::debug!(
"skipping parallelism control due to background jobs {:?}",
background_streaming_jobs
);
// skip if there are background creating jobs
return Ok(true);
}
let skipped_jobs = if !background_streaming_jobs.is_empty() {
let jobs = self
.scale_controller
.resolve_related_no_shuffle_jobs(&background_streaming_jobs)
.await?;

tracing::info!("trigger parallelism control");
tracing::info!(
"skipping parallelism control of background jobs {:?} and associated jobs {:?}",
background_streaming_jobs,
jobs
);

let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
jobs
} else {
HashSet::new()
};

let job_ids: HashSet<_> = {
let streaming_parallelisms = self
.metadata_manager
.catalog_controller
.get_all_created_streaming_parallelisms()
.get_all_streaming_parallelisms()
.await?;

// streaming_parallelisms
// .into_iter()
// .map(|(table_id, parallelism)| {
// let table_parallelism = match parallelism {
// StreamingParallelism::Adaptive => TableParallelism::Adaptive,
// StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
// StreamingParallelism::Custom => TableParallelism::Custom,
// };
//
// (table_id, table_parallelism)
// })
// .collect()

streaming_parallelisms.into_keys().collect()
streaming_parallelisms
.into_iter()
.filter(|(table_id, _)| !skipped_jobs.contains(&TableId::new(*table_id as _)))
.map(|(table_id, _)| table_id)
.collect()
};

let workers = self
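Finally, `trigger_parallelism_control` no longer aborts the whole round when background jobs exist; it resolves their no-shuffle relatives and simply filters those job ids out of this round's work. A tiny sketch of that filtering step, with `usize` standing in for `StreamingParallelism`:

```rust
use std::collections::{HashMap, HashSet};

type TableId = u32;

/// Previously, any background job caused an early return from parallelism
/// control; now only the affected jobs (and their no-shuffle relatives) are
/// skipped, and every other job is still controlled in this round.
fn jobs_to_control(
    streaming_parallelisms: HashMap<TableId, usize>,
    skipped_jobs: &HashSet<TableId>,
) -> HashSet<TableId> {
    streaming_parallelisms
        .into_keys()
        .filter(|table_id| !skipped_jobs.contains(table_id))
        .collect()
}

fn main() {
    let parallelisms = HashMap::from([(1, 4), (2, 8), (3, 2)]);
    let skipped = HashSet::from([2]);
    assert_eq!(jobs_to_control(parallelisms, &skipped), HashSet::from([1, 3]));
}
```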