diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto
index 3963e3d03e4c3..91197c34fbb8d 100644
--- a/proto/stream_plan.proto
+++ b/proto/stream_plan.proto
@@ -550,7 +550,7 @@ message MergeNode {
   //
   // `upstream_actor_id` stored in the plan node in `Fragment` meta model cannot be directly used.
   // See `compose_fragment`.
-  repeated uint32 upstream_actor_id = 1;
+  repeated uint32 upstream_actor_id = 1 [deprecated = true];
   uint32 upstream_fragment_id = 2;
   // Type of the upstream dispatcher. If there's always one upstream according to this
   // type, the compute node may use the `ReceiverExecutor` as an optimization.
@@ -972,7 +972,7 @@ message StreamActor {
   // Note that upstream actor ids are also stored in the proto of merge nodes.
   // It is painstaking to traverse through the node tree and get upstream actor id from the root StreamNode.
   // We duplicate the information here to ease the parsing logic in stream manager.
-  repeated uint32 upstream_actor_id = 6;
+  repeated uint32 upstream_actor_id = 6 [deprecated = true];
   // Vnodes that the executors in this actor own.
   // If the fragment is a singleton, this field will not be set and leave a `None`.
   common.Buffer vnode_bitmap = 8;
diff --git a/proto/stream_service.proto b/proto/stream_service.proto
index 7a5f9a3711f6a..949e0be84e23e 100644
--- a/proto/stream_service.proto
+++ b/proto/stream_service.proto
@@ -17,8 +17,17 @@ message InjectBarrierRequest {
   repeated uint32 table_ids_to_sync = 5;
   uint32 partial_graph_id = 6;
 
+  message BuildActorInfo {
+    message UpstreamActors {
+      repeated uint32 actors = 1;
+    }
+
+    stream_plan.StreamActor actor = 1;
+    map<uint32, UpstreamActors> upstreams = 2;
+  }
+
   repeated common.ActorInfo broadcast_info = 8;
-  repeated stream_plan.StreamActor actors_to_build = 9;
+  repeated BuildActorInfo actors_to_build = 9;
   repeated stream_plan.SubscriptionUpstreamInfo subscriptions_to_add = 10;
   repeated stream_plan.SubscriptionUpstreamInfo subscriptions_to_remove = 11;
 }
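With `upstream_actor_id` deprecated on both `MergeNode` and `StreamActor`, upstream wiring now travels per barrier in the `upstreams` map of `BuildActorInfo`, keyed by upstream fragment id. A minimal consumer-side sketch, assuming only the prost-generated types of the messages above (the helper itself is hypothetical):

use std::collections::HashMap;

use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;

/// Resolve the upstream actor ids a merge executor should connect to,
/// given the per-actor `upstreams` map delivered with the barrier.
fn upstream_actors_of(
    initial_upstream_actors: &HashMap<u32, UpstreamActors>,
    upstream_fragment_id: u32,
) -> Vec<u32> {
    initial_upstream_actors
        .get(&upstream_fragment_id)
        .map(|upstreams| upstreams.actors.clone())
        .unwrap_or_default()
}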
diff --git a/src/ctl/src/cmd_impl/meta/cluster_info.rs b/src/ctl/src/cmd_impl/meta/cluster_info.rs
index 9b6780da35f01..ac58b94cf4aba 100644
--- a/src/ctl/src/cmd_impl/meta/cluster_info.rs
+++ b/src/ctl/src/cmd_impl/meta/cluster_info.rs
@@ -115,7 +115,7 @@ pub async fn source_split_info(context: &CtlContext, ignore_id: bool) -> anyhow:
         for actor in &fragment.actors {
             if let Some((split_count, splits)) = actor_splits_map.get(&actor.actor_id) {
                 println!(
-                    "\t\tActor{} ({} splits): [{}]{}",
+                    "\t\tActor{} ({} splits): [{}]",
                     if ignore_id {
                         "".to_owned()
                     } else {
@@ -123,24 +123,6 @@ pub async fn source_split_info(context: &CtlContext, ignore_id: bool) -> anyhow:
                     },
                     split_count,
                    splits,
-                    if !actor.upstream_actor_id.is_empty() {
-                        let upstream_splits = actor
-                            .upstream_actor_id
-                            .iter()
-                            .find_map(|id| actor_splits_map.get(id))
-                            .expect("should have one upstream source actor");
-                        format!(
-                            " <- Upstream Actor{}: [{}]",
-                            if ignore_id {
-                                "".to_owned()
-                            } else {
-                                format!(" #{}", actor.upstream_actor_id[0])
-                            },
-                            upstream_splits.1
-                        )
-                    } else {
-                        "".to_owned()
-                    }
                 );
             } else {
                 println!(
diff --git a/src/meta/src/barrier/checkpoint/creating_job/status.rs b/src/meta/src/barrier/checkpoint/creating_job/status.rs
index 6c33f0a00abfd..18a74aee570cf 100644
--- a/src/meta/src/barrier/checkpoint/creating_job/status.rs
+++ b/src/meta/src/barrier/checkpoint/creating_job/status.rs
@@ -21,7 +21,6 @@ use risingwave_common::util::epoch::Epoch;
 use risingwave_meta_model::WorkerId;
 use risingwave_pb::hummock::HummockVersionStats;
 use risingwave_pb::stream_plan::barrier_mutation::Mutation;
-use risingwave_pb::stream_plan::StreamActor;
 use risingwave_pb::stream_service::barrier_complete_response::{
     CreateMviewProgress, PbCreateMviewProgress,
 };
@@ -29,6 +28,7 @@ use tracing::warn;
 
 use crate::barrier::progress::CreateMviewProgressTracker;
 use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};
+use crate::model::StreamActorWithUpstreams;
 
 #[derive(Debug)]
 pub(super) struct CreateMviewLogStoreProgressTracker {
@@ -110,7 +110,7 @@ pub(super) enum CreatingStreamingJobStatus {
         pending_non_checkpoint_barriers: Vec<u64>,
         /// Info of the first barrier: (`actors_to_create`, `mutation`)
         /// Take the mutation out when injecting the first barrier
-        initial_barrier_info: Option<(HashMap<WorkerId, Vec<StreamActor>>, Mutation)>,
+        initial_barrier_info: Option<(HashMap<WorkerId, Vec<StreamActorWithUpstreams>>, Mutation)>,
     },
     /// The creating job is consuming log store.
     ///
@@ -126,7 +126,7 @@ pub(super) enum CreatingStreamingJobStatus {
 
 pub(super) struct CreatingJobInjectBarrierInfo {
     pub barrier_info: BarrierInfo,
-    pub new_actors: Option<HashMap<WorkerId, Vec<StreamActor>>>,
+    pub new_actors: Option<HashMap<WorkerId, Vec<StreamActorWithUpstreams>>>,
     pub mutation: Option<Mutation>,
 }
 
@@ -252,7 +252,10 @@ impl CreatingStreamingJobStatus {
     pub(super) fn new_fake_barrier(
         prev_epoch_fake_physical_time: &mut u64,
         pending_non_checkpoint_barriers: &mut Vec<u64>,
-        initial_barrier_info: &mut Option<(HashMap<WorkerId, Vec<StreamActorWithUpstreams>>, Mutation)>,
+        initial_barrier_info: &mut Option<(
+            HashMap<WorkerId, Vec<StreamActorWithUpstreams>>,
+            Mutation,
+        )>,
         is_checkpoint: bool,
     ) -> CreatingJobInjectBarrierInfo {
         {
diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index f1abe6e65dd02..86e6b9101734a 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -37,7 +37,7 @@ use risingwave_pb::stream_plan::update_mutation::*;
 use risingwave_pb::stream_plan::{
     AddMutation, BarrierMutation, CombinedMutation, Dispatcher, Dispatchers,
     DropSubscriptionsMutation, PauseMutation, ResumeMutation, SourceChangeSplitMutation,
-    StopMutation, StreamActor, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
+    StopMutation, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
 };
 use risingwave_pb::stream_service::BarrierCompleteResponse;
 use tracing::warn;
@@ -49,7 +49,9 @@ use crate::barrier::InflightSubscriptionInfo;
 use crate::controller::fragment::InflightFragmentInfo;
 use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
 use crate::manager::{StreamingJob, StreamingJobType};
-use crate::model::{ActorId, DispatcherId, FragmentId, StreamJobFragments};
+use crate::model::{
+    ActorId, DispatcherId, FragmentId, StreamActorWithUpstreams, StreamJobFragments,
+};
 use crate::stream::{
     build_actor_connector_splits, JobReschedulePostUpdates, SplitAssignment, ThrottleConfig,
 };
@@ -83,7 +85,7 @@ pub struct Reschedule {
     /// `Source` and `SourceBackfill` are handled together here.
     pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
 
-    pub newly_created_actors: Vec<(StreamActor, PbActorStatus)>,
+    pub newly_created_actors: Vec<(StreamActorWithUpstreams, PbActorStatus)>,
 }
 
 /// Replacing an old job with a new one. All actors in the job will be rebuilt.
@@ -952,7 +954,7 @@ impl Command {
         mutation
     }
 
-    pub fn actors_to_create(&self) -> Option<HashMap<WorkerId, Vec<StreamActor>>> {
+    pub fn actors_to_create(&self) -> Option<HashMap<WorkerId, Vec<StreamActorWithUpstreams>>> {
         match self {
             Command::CreateStreamingJob { info, job_type } => {
                 let mut map = match job_type {
diff --git a/src/meta/src/barrier/context/recovery.rs b/src/meta/src/barrier/context/recovery.rs
index c13eaabf90822..ad0bd7c64955b 100644
--- a/src/meta/src/barrier/context/recovery.rs
+++ b/src/meta/src/barrier/context/recovery.rs
@@ -23,7 +23,6 @@ use risingwave_common::catalog::{DatabaseId, TableId};
 use risingwave_common::config::DefaultParallelism;
 use risingwave_common::hash::WorkerSlotId;
 use risingwave_meta_model::StreamingParallelism;
-use risingwave_pb::stream_plan::StreamActor;
 use thiserror_ext::AsReport;
 use tokio::time::Instant;
 use tracing::{debug, info, warn};
@@ -34,7 +33,7 @@ use crate::barrier::info::InflightDatabaseInfo;
 use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
 use crate::controller::fragment::InflightFragmentInfo;
 use crate::manager::ActiveStreamingWorkerNodes;
-use crate::model::{ActorId, StreamJobFragments, TableParallelism};
+use crate::model::{ActorId, StreamActorWithUpstreams, StreamJobFragments, TableParallelism};
 use crate::stream::{
     JobParallelismTarget, JobReschedulePolicy, JobRescheduleTarget, JobResourceGroupTarget,
     RescheduleOptions, SourceChange,
@@ -724,7 +723,7 @@ impl GlobalBarrierWorkerContextImpl {
     }
 
     /// Update all actors in compute nodes.
-    async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
+    async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActorWithUpstreams>> {
         self.metadata_manager.all_active_actors().await
     }
 }
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index ac05a5fa9c12e..da024591c229b 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -20,13 +20,12 @@ use risingwave_connector::source::SplitImpl;
 use risingwave_pb::ddl_service::DdlProgress;
 use risingwave_pb::hummock::HummockVersionStats;
 use risingwave_pb::meta::PbRecoveryStatus;
-use risingwave_pb::stream_plan::StreamActor;
 use tokio::sync::oneshot::Sender;
 
 use self::notifier::Notifier;
 use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo};
 use crate::manager::ActiveStreamingWorkerNodes;
-use crate::model::{ActorId, StreamJobFragments};
+use crate::model::{ActorId, StreamActorWithUpstreams, StreamJobFragments};
 use crate::{MetaError, MetaResult};
 
 mod checkpoint;
@@ -104,7 +103,7 @@ struct BarrierWorkerRuntimeInfoSnapshot {
     database_fragment_infos: HashMap<DatabaseId, InflightDatabaseInfo>,
     state_table_committed_epochs: HashMap<TableId, u64>,
     subscription_infos: HashMap<DatabaseId, InflightSubscriptionInfo>,
-    stream_actors: HashMap<ActorId, StreamActor>,
+    stream_actors: HashMap<ActorId, StreamActorWithUpstreams>,
     source_splits: HashMap<ActorId, Vec<SplitImpl>>,
     background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
     hummock_version_stats: HummockVersionStats,
@@ -115,7 +114,7 @@ impl BarrierWorkerRuntimeInfoSnapshot {
         database_id: DatabaseId,
         database_info: &InflightDatabaseInfo,
         active_streaming_nodes: &ActiveStreamingWorkerNodes,
-        stream_actors: &HashMap<ActorId, StreamActor>,
+        stream_actors: &HashMap<ActorId, StreamActorWithUpstreams>,
         state_table_committed_epochs: &HashMap<TableId, u64>,
     ) -> MetaResult<()> {
         {
@@ -190,7 +189,7 @@ struct DatabaseRuntimeInfoSnapshot {
     database_fragment_info: InflightDatabaseInfo,
     state_table_committed_epochs: HashMap<TableId, u64>,
     subscription_info: InflightSubscriptionInfo,
-    stream_actors: HashMap<ActorId, StreamActor>,
+    stream_actors: HashMap<ActorId, StreamActorWithUpstreams>,
     source_splits: HashMap<ActorId, Vec<SplitImpl>>,
     background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
 }
diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs
index 3240e2c1b5b97..94a9ffecc922f 100644
--- a/src/meta/src/barrier/rpc.rs
+++ b/src/meta/src/barrier/rpc.rs
@@ -32,9 +32,9 @@ use risingwave_pb::common::{ActorInfo, WorkerNode};
 use risingwave_pb::hummock::HummockVersionStats;
 use risingwave_pb::meta::PausedReason;
 use risingwave_pb::stream_plan::barrier_mutation::Mutation;
-use risingwave_pb::stream_plan::{
-    AddMutation, Barrier, BarrierMutation, StreamActor, SubscriptionUpstreamInfo,
-};
+use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation, SubscriptionUpstreamInfo};
+use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
+use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
 use risingwave_pb::stream_service::streaming_control_stream_request::{
     CreatePartialGraphRequest, PbDatabaseInitialPartialGraph, PbInitRequest, PbInitialPartialGraph,
     RemovePartialGraphRequest, ResetDatabaseRequest,
@@ -57,7 +57,7 @@ use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo};
 use crate::barrier::progress::CreateMviewProgressTracker;
 use crate::controller::fragment::InflightFragmentInfo;
 use crate::manager::MetaSrvEnv;
-use crate::model::{ActorId, StreamJobFragments};
+use crate::model::{ActorId, StreamActorWithUpstreams, StreamJobFragments};
 use crate::stream::build_actor_connector_splits;
 use crate::{MetaError, MetaResult};
 
@@ -320,7 +320,7 @@ impl ControlStreamManager {
         database_id: DatabaseId,
         info: InflightDatabaseInfo,
         state_table_committed_epochs: &mut HashMap<TableId, u64>,
-        stream_actors: &mut HashMap<ActorId, StreamActor>,
+        stream_actors: &mut HashMap<ActorId, StreamActorWithUpstreams>,
         source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
         background_jobs: &mut HashMap<TableId, (String, StreamJobFragments)>,
         subscription_info: InflightSubscriptionInfo,
@@ -455,7 +455,7 @@ impl ControlStreamManager {
         barrier_info: &BarrierInfo,
         pre_applied_graph_info: impl IntoIterator<Item = &InflightFragmentInfo>,
         applied_graph_info: impl IntoIterator<Item = &'a InflightFragmentInfo> + 'a,
-        mut new_actors: Option<HashMap<WorkerId, Vec<StreamActor>>>,
+        mut new_actors: Option<HashMap<WorkerId, Vec<StreamActorWithUpstreams>>>,
         subscriptions_to_add: Vec<SubscriptionUpstreamInfo>,
         subscriptions_to_remove: Vec<SubscriptionUpstreamInfo>,
     ) -> MetaResult<HashSet<WorkerId>> {
@@ -484,7 +484,7 @@ impl ControlStreamManager {
                 .flatten()
                 .flat_map(|(worker_id, actor_infos)| {
                     actor_infos.iter().map(|actor_info| ActorInfo {
-                        actor_id: actor_info.actor_id,
+                        actor_id: actor_info.0.actor_id,
                        host: self
                            .nodes
                            .get(worker_id)
@@ -541,6 +541,22 @@ impl ControlStreamManager {
                                     .into_iter()
                                     .flatten()
                                     .flatten()
+                                    .map(|(actor, upstreams)| BuildActorInfo {
+                                        actor: Some(actor),
+                                        upstreams: upstreams
+                                            .into_iter()
+                                            .map(|(fragment_id, upstreams)| {
+                                                (
+                                                    fragment_id,
+                                                    UpstreamActors {
+                                                        actors: upstreams
+                                                            .into_iter()
+                                                            .collect(),
+                                                    },
+                                                )
+                                            })
+                                            .collect(),
+                                    })
                                     .collect(),
                                 subscriptions_to_add: subscriptions_to_add.clone(),
                                 subscriptions_to_remove: subscriptions_to_remove.clone(),
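The hunk above is where each `(StreamActor, ActorUpstreams)` pair becomes a `BuildActorInfo` on the wire. A simplified stand-alone version of that conversion, assuming pb-level ids are plain `u32` (the helper name is made up):

use std::collections::{BTreeMap, HashSet};

use risingwave_pb::stream_plan::StreamActor;
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;

fn to_build_actor_info(
    actor: StreamActor,
    upstreams: BTreeMap<u32, HashSet<u32>>,
) -> BuildActorInfo {
    BuildActorInfo {
        actor: Some(actor),
        // One `UpstreamActors` entry per upstream fragment, mirroring the
        // `map<uint32, UpstreamActors>` field of the proto.
        upstreams: upstreams
            .into_iter()
            .map(|(fragment_id, actors)| {
                (
                    fragment_id,
                    UpstreamActors {
                        actors: actors.into_iter().collect(),
                    },
                )
            })
            .collect(),
    }
}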
diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs
index 4a13155c547aa..5e032474a31db 100644
--- a/src/meta/src/controller/fragment.rs
+++ b/src/meta/src/controller/fragment.rs
@@ -14,13 +14,13 @@
 use std::collections::hash_map::Entry;
 use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
-use std::mem::swap;
 
 use anyhow::Context;
 use itertools::Itertools;
 use risingwave_common::bail;
 use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId};
 use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
+use risingwave_connector::source::SplitImpl;
 use risingwave_meta_model::actor::ActorStatus;
 use risingwave_meta_model::fragment::DistributionType;
 use risingwave_meta_model::object::ObjectType;
@@ -41,10 +41,8 @@ use risingwave_pb::meta::table_fragments::fragment::{
     FragmentDistributionType, PbFragmentDistributionType,
 };
 use risingwave_pb::meta::table_fragments::{PbActorStatus, PbFragment, PbState};
-use risingwave_pb::meta::{
-    FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping, PbTableFragments,
-};
-use risingwave_pb::source::PbConnectorSplits;
+use risingwave_pb::meta::{FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping};
+use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
 use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::{
     DispatchStrategy, PbFragmentTypeFlag, PbStreamActor, PbStreamContext,
@@ -64,7 +62,10 @@ use crate::controller::utils::{
     FragmentDesc, PartialActorLocation, PartialFragmentStateTables,
 };
 use crate::manager::LocalNotification;
-use crate::model::{StreamContext, StreamJobFragments, TableParallelism};
+use crate::model::{
+    ActorUpstreams, FragmentActorUpstreams, StreamActorWithUpstreams, StreamContext,
+    StreamJobFragments, TableParallelism,
+};
 use crate::stream::{build_actor_split_impls, SplitAssignment};
 use crate::{MetaError, MetaResult};
 
@@ -166,13 +167,7 @@ impl CatalogController {
     #[allow(clippy::type_complexity)]
     pub fn extract_fragment_and_actors_from_fragments(
-        PbTableFragments {
-            table_id,
-            fragments,
-            actor_status,
-            actor_splits,
-            ..
-        }: PbTableFragments,
+        stream_job_fragments: &StreamJobFragments,
     ) -> MetaResult<
         Vec<(
             fragment::Model,
             Vec<actor::Model>,
             HashMap<ActorId, Vec<actor_dispatcher::Model>>,
         )>,
     > {
         let mut result = vec![];
 
-        let fragments: BTreeMap<_, _> = fragments.into_iter().collect();
-
-        for (_, fragment) in fragments {
+        for fragment in stream_job_fragments.fragments.values() {
             let (fragment, actors, dispatchers) = Self::extract_fragment_and_actors(
-                table_id as _,
+                stream_job_fragments.stream_job_id.table_id as _,
                 fragment,
-                &actor_status,
-                &actor_splits,
+                stream_job_fragments
+                    .actor_upstreams
+                    .get(&fragment.fragment_id),
+                &stream_job_fragments.actor_status,
+                &stream_job_fragments.actor_splits,
             )?;
 
             result.push((fragment, actors, dispatchers));
@@ -198,18 +194,19 @@ impl CatalogController {
         Ok(result)
     }
 
-    #[allow(clippy::type_complexity)]
+    #[expect(clippy::type_complexity)]
     pub fn extract_fragment_and_actors(
         job_id: ObjectId,
-        pb_fragment: PbFragment,
-        pb_actor_status: &HashMap<u32, PbActorStatus>,
-        pb_actor_splits: &HashMap<u32, PbConnectorSplits>,
+        fragment: &PbFragment,
+        fragment_actor_upstreams: Option<&FragmentActorUpstreams>,
+        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
+        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
     ) -> MetaResult<(
         fragment::Model,
         Vec<actor::Model>,
         HashMap<ActorId, Vec<actor_dispatcher::Model>>,
     )> {
-        let vnode_count = pb_fragment.vnode_count();
+        let vnode_count = fragment.vnode_count();
 
         let PbFragment {
             fragment_id: pb_fragment_id,
             fragment_type_mask: pb_fragment_type_mask,
             actors: pb_actors,
             state_table_ids: pb_state_table_ids,
             upstream_fragment_ids: pb_upstream_fragment_ids,
             ..
-        } = pb_fragment;
+        } = fragment;
 
-        let state_table_ids = pb_state_table_ids.into();
+        let state_table_ids = pb_state_table_ids.clone().into();
 
         assert!(!pb_actors.is_empty());
 
@@ -228,6 +225,7 @@ impl CatalogController {
         let actor_template = pb_actors.first().cloned().unwrap();
         let mut stream_node = actor_template.nodes.unwrap();
         visit_stream_node_mut(&mut stream_node, |body| {
+            #[expect(deprecated)]
             if let NodeBody::Merge(m) = body {
                 m.upstream_actor_id = vec![];
             }
@@ -239,37 +237,39 @@ impl CatalogController {
         let mut actors = vec![];
         let mut actor_dispatchers = HashMap::new();
 
-        for mut actor in pb_actors {
+        for actor in pb_actors {
             let mut upstream_actors = BTreeMap::new();
 
-            let node = actor.nodes.as_mut().context("nodes are empty")?;
-
-            visit_stream_node_mut(node, |body| {
-                if let NodeBody::Merge(m) = body {
-                    let mut upstream_actor_ids = vec![];
-                    swap(&mut m.upstream_actor_id, &mut upstream_actor_ids);
-                    assert!(
-                        upstream_actors
-                            .insert(m.upstream_fragment_id, upstream_actor_ids)
-                            .is_none(),
-                        "There should only be one link between two fragments"
-                    );
+            if let Some(actor_upstreams) = fragment_actor_upstreams
+                .and_then(|actor_upstreams| actor_upstreams.get(&actor.actor_id))
+            {
+                for (upstream_fragment_id, actor_upstream_actors) in actor_upstreams {
+                    upstream_actors
+                        .try_insert(
+                            *upstream_fragment_id,
+                            actor_upstream_actors.iter().cloned().collect_vec(),
+                        )
+                        .expect("There should only be one link between two fragments");
                 }
-            });
+            }
 
             let PbStreamActor {
                 actor_id,
                 fragment_id,
                 nodes: _,
                 dispatcher: pb_dispatcher,
-                upstream_actor_id: pb_upstream_actor_id,
                 vnode_bitmap: pb_vnode_bitmap,
                 mview_definition: _,
                 expr_context: pb_expr_context,
+                ..
             } = actor;
 
-            let splits = pb_actor_splits.get(&actor_id).map(ConnectorSplits::from);
-            let status = pb_actor_status.get(&actor_id).cloned().ok_or_else(|| {
+            let splits = actor_splits.get(actor_id).map(|splits| {
+                ConnectorSplits::from(&PbConnectorSplits {
+                    splits: splits.iter().map(ConnectorSplit::from).collect(),
+                })
+            });
+            let status = actor_status.get(actor_id).cloned().ok_or_else(|| {
                 anyhow::anyhow!(
                     "actor {} in fragment {} has no actor_status",
                     actor_id,
@@ -279,50 +279,41 @@ impl CatalogController {
 
             let worker_id = status.worker_id() as _;
 
-            assert_eq!(
-                pb_upstream_actor_id
-                    .iter()
-                    .cloned()
-                    .collect::<BTreeSet<_>>(),
-                upstream_actors
-                    .values()
-                    .flatten()
-                    .cloned()
-                    .collect::<BTreeSet<_>>()
-            );
-            let pb_expr_context = pb_expr_context.expect("no expression context found");
+            let pb_expr_context = pb_expr_context
+                .as_ref()
+                .expect("no expression context found");
 
             actors.push(actor::Model {
-                actor_id: actor_id as _,
-                fragment_id: fragment_id as _,
+                actor_id: *actor_id as _,
+                fragment_id: *fragment_id as _,
                 status: status.get_state().unwrap().into(),
                 splits,
                 worker_id,
                 upstream_actor_ids: upstream_actors.into(),
                 vnode_bitmap: pb_vnode_bitmap.as_ref().map(VnodeBitmap::from),
-                expr_context: ExprContext::from(&pb_expr_context),
+                expr_context: ExprContext::from(pb_expr_context),
             });
 
             actor_dispatchers.insert(
-                actor_id as ActorId,
+                *actor_id as _,
                 pb_dispatcher
-                    .into_iter()
-                    .map(|dispatcher| (actor_id, dispatcher).into())
+                    .iter()
+                    .map(|dispatcher| (*actor_id, dispatcher.clone()).into())
                     .collect(),
             );
         }
 
-        let upstream_fragment_id = pb_upstream_fragment_ids.into();
+        let upstream_fragment_id = pb_upstream_fragment_ids.clone().into();
 
         let stream_node = StreamNode::from(&stream_node);
 
-        let distribution_type = PbFragmentDistributionType::try_from(pb_distribution_type)
+        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
             .unwrap()
             .into();
 
         let fragment = fragment::Model {
-            fragment_id: pb_fragment_id as _,
+            fragment_id: *pb_fragment_id as _,
             job_id,
-            fragment_type_mask: pb_fragment_type_mask as _,
+            fragment_type_mask: *pb_fragment_type_mask as _,
             distribution_type,
             stream_node,
             state_table_ids,
@@ -349,11 +340,15 @@ impl CatalogController {
         let mut pb_fragments = BTreeMap::new();
         let mut pb_actor_splits = HashMap::new();
         let mut pb_actor_status = BTreeMap::new();
+        let mut fragment_actor_upstreams = BTreeMap::new();
 
         for (fragment, actors, actor_dispatcher) in fragments {
-            let (fragment, fragment_actor_status, fragment_actor_splits) =
+            let (fragment, actor_upstreams, fragment_actor_status, fragment_actor_splits) =
                 Self::compose_fragment(fragment, actors, actor_dispatcher)?;
 
+            fragment_actor_upstreams
+                .try_insert(fragment.fragment_id, actor_upstreams)
+                .expect("non-duplicate");
             pb_fragments.insert(fragment.fragment_id, fragment);
 
             pb_actor_splits.extend(build_actor_split_impls(&fragment_actor_splits));
@@ -364,6 +359,7 @@ impl CatalogController {
             stream_job_id: table_id.into(),
             state: state as _,
             fragments: pb_fragments,
+            actor_upstreams: fragment_actor_upstreams,
             actor_status: pb_actor_status,
             actor_splits: pb_actor_splits,
             ctx: ctx
@@ -388,8 +384,9 @@ impl CatalogController {
         mut actor_dispatcher: HashMap<ActorId, Vec<actor_dispatcher::Model>>,
     ) -> MetaResult<(
         PbFragment,
-        HashMap<crate::model::ActorId, PbActorStatus>,
-        HashMap<crate::model::ActorId, PbConnectorSplits>,
+        FragmentActorUpstreams,
+        HashMap<crate::model::ActorId, PbActorStatus>,
+        HashMap<crate::model::ActorId, PbConnectorSplits>,
     )> {
         let fragment::Model {
             fragment_id,
@@ -406,6 +403,7 @@ impl CatalogController {
 
         let mut pb_actors = vec![];
 
+        let mut actor_upstreams: HashMap<_, ActorUpstreams> = HashMap::new();
         let mut pb_actor_status = HashMap::new();
         let mut pb_actor_splits = HashMap::new();
 
@@ -440,8 +438,14 @@ impl CatalogController {
                     && let Some(upstream_actor_ids) =
                         upstream_fragment_actors.get(&(m.upstream_fragment_id as _))
                 {
-                    m.upstream_actor_id =
-                        upstream_actor_ids.iter().map(|id| *id as _).collect();
+                    actor_upstreams
+                        .entry(actor_id as _)
+                        .or_default()
+                        .try_insert(
+                            m.upstream_fragment_id,
+                            upstream_actor_ids.iter().map(|id| *id as _).collect(),
+                        )
+                        .expect("non-duplicate");
                 }
             });
 
@@ -451,12 +455,6 @@ impl CatalogController {
             let pb_vnode_bitmap = vnode_bitmap.map(|vnode_bitmap| vnode_bitmap.to_protobuf());
             let pb_expr_context = Some(expr_context.to_protobuf());
 
-            let pb_upstream_actor_id = upstream_fragment_actors
-                .values()
-                .flatten()
-                .map(|&id| id as _)
-                .collect();
-
             let pb_dispatcher = actor_dispatcher
                 .remove(&actor_id)
                 .unwrap_or_default()
@@ -476,12 +474,13 @@ impl CatalogController {
                 pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
             }
 
+            #[expect(deprecated)]
             pb_actors.push(PbStreamActor {
                 actor_id: actor_id as _,
                 fragment_id: fragment_id as _,
                 nodes: pb_nodes,
                 dispatcher: pb_dispatcher,
-                upstream_actor_id: pb_upstream_actor_id,
+                upstream_actor_id: vec![],
                 vnode_bitmap: pb_vnode_bitmap,
                 mview_definition: "".to_owned(),
                 expr_context: pb_expr_context,
@@ -501,7 +500,12 @@ impl CatalogController {
             maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
         };
 
-        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
+        Ok((
+            pb_fragment,
+            actor_upstreams,
+            pb_actor_status,
+            pb_actor_splits,
+        ))
     }
 
     pub async fn running_fragment_parallelisms(
@@ -1205,7 +1209,7 @@ impl CatalogController {
     pub async fn all_node_actors(
         &self,
         include_inactive: bool,
-    ) -> MetaResult<HashMap<WorkerId, Vec<PbStreamActor>>> {
+    ) -> MetaResult<HashMap<WorkerId, Vec<StreamActorWithUpstreams>>> {
         let inner = self.inner.read().await;
         let fragment_actors = if include_inactive {
             Fragment::find()
@@ -1238,14 +1242,15 @@ impl CatalogController {
             }
         }
 
-            let (table_fragments, actor_status, _) =
+            let (table_fragments, mut actor_upstreams, actor_status, _) =
                 Self::compose_fragment(fragment, actors, dispatcher_info)?;
             for actor in table_fragments.actors {
                 let node_id = actor_status[&actor.actor_id].worker_id() as WorkerId;
-                node_actors
-                    .entry(node_id)
-                    .or_insert_with(Vec::new)
-                    .push(actor);
+                node_actors.entry(node_id).or_insert_with(Vec::new).push({
+                    let actor_upstreams =
+                        actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
+                    (actor, actor_upstreams)
+                });
             }
         }
 
@@ -1620,12 +1625,12 @@ impl CatalogController {
 
 #[cfg(test)]
 mod tests {
-    use std::collections::{BTreeMap, HashMap};
+    use std::collections::{BTreeMap, HashMap, HashSet};
 
     use itertools::Itertools;
     use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
     use risingwave_common::util::iter_util::ZipEqDebug;
-    use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_mut};
+    use risingwave_common::util::stream_graph_visitor::visit_stream_node;
     use risingwave_meta_model::actor::ActorStatus;
     use risingwave_meta_model::fragment::DistributionType;
     use risingwave_meta_model::{
@@ -1638,13 +1643,14 @@ mod tests {
     use risingwave_pb::meta::table_fragments::{PbActorStatus, PbFragment};
     use risingwave_pb::plan_common::PbExprContext;
     use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
-    use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
+    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
     use risingwave_pb::stream_plan::{
         Dispatcher, MergeNode, PbDispatcher, PbDispatcherType, PbFragmentTypeFlag, PbStreamActor,
         PbStreamNode, PbUnionNode,
     };
 
     use crate::controller::catalog::CatalogController;
+    use crate::model::{ActorUpstreams, FragmentActorUpstreams};
     use crate::MetaResult;
 
     const TEST_FRAGMENT_ID: FragmentId = 1;
@@ -1664,24 +1670,24 @@ mod tests {
         }]
     }
 
-    fn generate_upstream_actor_ids_for_actor(actor_id: u32) -> BTreeMap<FragmentId, Vec<ActorId>> {
+    fn generate_upstream_actor_ids_for_actor(actor_id: u32) -> ActorUpstreams {
         let mut upstream_actor_ids = BTreeMap::new();
-        upstream_actor_ids.insert(TEST_UPSTREAM_FRAGMENT_ID, vec![(actor_id + 100) as ActorId]);
         upstream_actor_ids.insert(
-            TEST_UPSTREAM_FRAGMENT_ID + 1,
-            vec![(actor_id + 200) as ActorId],
+            TEST_UPSTREAM_FRAGMENT_ID as crate::model::FragmentId,
+            HashSet::from_iter([(actor_id + 100)]),
+        );
+        upstream_actor_ids.insert(
+            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
+            HashSet::from_iter([(actor_id + 200)]),
         );
         upstream_actor_ids
     }
 
-    fn generate_merger_stream_node(
-        actor_upstream_actor_ids: &BTreeMap<FragmentId, Vec<ActorId>>,
-    ) -> PbStreamNode {
+    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
         let mut input = vec![];
-        for (upstream_fragment_id, upstream_actor_ids) in actor_upstream_actor_ids {
+        for upstream_fragment_id in actor_upstream_actor_ids.keys() {
             input.push(PbStreamNode {
                 node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
-                    upstream_actor_id: upstream_actor_ids.iter().map(|id| *id as _).collect(),
                     upstream_fragment_id: *upstream_fragment_id as _,
                     ..Default::default()
                 }))),
@@ -1699,8 +1705,7 @@ mod tests {
     #[tokio::test]
     async fn test_extract_fragment() -> MetaResult<()> {
         let actor_count = 3u32;
-        let upstream_actor_ids: HashMap<ActorId, BTreeMap<FragmentId, Vec<ActorId>>> = (0
-            ..actor_count)
+        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
             .map(|actor_id| {
                 (
                     actor_id as _,
@@ -1726,11 +1731,6 @@ mod tests {
                     fragment_id: TEST_FRAGMENT_ID as _,
                     nodes: Some(stream_node),
                     dispatcher: generate_dispatchers_for_actor(actor_id),
-                    upstream_actor_id: actor_upstream_actor_ids
-                        .values()
-                        .flatten()
-                        .map(|id| *id as _)
-                        .collect(),
                     vnode_bitmap: actor_bitmaps
                         .get(&actor_id)
                         .cloned()
@@ -1740,6 +1740,7 @@ mod tests {
                         time_zone: String::from("America/New_York"),
                         strict_mode: false,
                     }),
+                    ..Default::default()
                 }
             })
             .collect_vec();
@@ -1773,42 +1774,31 @@ mod tests {
         let (fragment, actors, actor_dispatchers) = CatalogController::extract_fragment_and_actors(
             TEST_JOB_ID,
-            pb_fragment.clone(),
+            &pb_fragment,
+            Some(&upstream_actor_ids),
             &pb_actor_status,
             &pb_actor_splits,
         )?;
 
-        check_fragment_template(fragment.clone(), pb_actors.clone(), &upstream_actor_ids);
+        check_fragment_template(fragment.clone(), pb_actors.clone());
         check_fragment(fragment, pb_fragment);
-        check_actors(actors, actor_dispatchers, pb_actors, pb_actor_splits);
+        check_actors(
+            actors,
+            &upstream_actor_ids,
+            actor_dispatchers,
+            pb_actors,
+            Default::default(),
+        );
 
         Ok(())
     }
 
-    fn check_fragment_template(
-        fragment: fragment::Model,
-        actors: Vec<PbStreamActor>,
-        upstream_actor_ids: &HashMap<ActorId, BTreeMap<FragmentId, Vec<ActorId>>>,
-    ) {
+    fn check_fragment_template(fragment: fragment::Model, actors: Vec<PbStreamActor>) {
         let stream_node_template = fragment.stream_node.to_protobuf();
 
-        for PbStreamActor {
-            nodes, actor_id, ..
-        } in actors
-        {
-            let mut template_node = stream_node_template.clone();
+        for PbStreamActor { nodes, .. } in actors {
+            let template_node = stream_node_template.clone();
             let nodes = nodes.unwrap();
-            let actor_upstream_actor_ids =
-                upstream_actor_ids.get(&(actor_id as _)).cloned().unwrap();
-            visit_stream_node_mut(&mut template_node, |body| {
-                if let NodeBody::Merge(m) = body {
-                    m.upstream_actor_id = actor_upstream_actor_ids
-                        .get(&(m.upstream_fragment_id as _))
-                        .map(|actors| actors.iter().map(|id| *id as _).collect())
-                        .unwrap();
-                }
-            });
-
             assert_eq!(nodes, template_node);
         }
     }
@@ -1817,8 +1807,7 @@ mod tests {
     async fn test_compose_fragment() -> MetaResult<()> {
         let actor_count = 3u32;
 
-        let upstream_actor_ids: HashMap<ActorId, BTreeMap<FragmentId, Vec<ActorId>>> = (0
-            ..actor_count)
+        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
@@ -1851,7 +1840,20 @@ mod tests {
                     status: ActorStatus::Running,
                     splits: actor_splits,
                     worker_id: 0,
-                    upstream_actor_ids: ActorUpstreamActors(actor_upstream_actor_ids),
+                    upstream_actor_ids: ActorUpstreamActors(
+                        actor_upstream_actor_ids
+                            .into_iter()
+                            .map(|(fragment_id, actors_ids)| {
+                                (
+                                    fragment_id as _,
+                                    actors_ids
+                                        .into_iter()
+                                        .map(|actor_id| actor_id as _)
+                                        .collect(),
+                                )
+                            })
+                            .collect(),
+                    ),
                     vnode_bitmap: actor_bitmaps
                         .remove(&actor_id)
                         .map(|bitmap| bitmap.to_protobuf())
@@ -1885,7 +1887,7 @@ mod tests {
                     .upstream_actor_ids
                     .into_inner()
                     .into_keys()
-                    .map(|k| (k, vec![]))
+                    .map(|k| (k as _, Default::default()))
                     .collect();
 
                 generate_merger_stream_node(&template_upstream_actor_ids)
@@ -1902,27 +1904,35 @@ mod tests {
             vnode_count: VirtualNode::COUNT_FOR_TEST as _,
         };
 
-        let (pb_fragment, pb_actor_status, pb_actor_splits) = CatalogController::compose_fragment(
-            fragment.clone(),
-            actors.clone(),
-            actor_dispatchers.clone(),
-        )
-        .unwrap();
+        let (pb_fragment, actor_upstreams, pb_actor_status, pb_actor_splits) =
+            CatalogController::compose_fragment(
+                fragment.clone(),
+                actors.clone(),
+                actor_dispatchers.clone(),
+            )
+            .unwrap();
 
         assert_eq!(pb_actor_status.len(), actor_count as usize);
         assert_eq!(pb_actor_splits.len(), actor_count as usize);
 
         let pb_actors = pb_fragment.actors.clone();
-        check_fragment_template(fragment.clone(), pb_actors.clone(), &upstream_actor_ids);
+        check_fragment_template(fragment.clone(), pb_actors.clone());
         check_fragment(fragment, pb_fragment);
-        check_actors(actors, actor_dispatchers, pb_actors, pb_actor_splits);
+        check_actors(
+            actors,
+            &actor_upstreams,
+            actor_dispatchers,
+            pb_actors,
+            pb_actor_splits,
+        );
 
         Ok(())
     }
 
     fn check_actors(
         actors: Vec<actor::Model>,
+        actor_upstreams: &FragmentActorUpstreams,
         mut actor_dispatchers: HashMap<ActorId, Vec<actor_dispatcher::Model>>,
         pb_actors: Vec<PbStreamActor>,
         pb_actor_splits: HashMap<u32, PbConnectorSplits>,
@@ -1943,10 +1953,10 @@ mod tests {
                 fragment_id: pb_fragment_id,
                 nodes: pb_nodes,
                 dispatcher: pb_dispatcher,
-                upstream_actor_id: pb_upstream_actor_id,
                 vnode_bitmap: pb_vnode_bitmap,
                 mview_definition,
                 expr_context: pb_expr_context,
+                ..
             },
         ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
         {
@@ -1954,15 +1964,6 @@ mod tests {
             assert_eq!(fragment_id, pb_fragment_id as FragmentId);
             let upstream_actor_ids = upstream_actor_ids.into_inner();
 
-            assert_eq!(
-                upstream_actor_ids
-                    .values()
-                    .flatten()
-                    .map(|id| *id as u32)
-                    .collect_vec(),
-                pb_upstream_actor_id
-            );
-
             let actor_dispatcher: Vec<PbDispatcher> = actor_dispatchers
                 .remove(&actor_id)
                 .unwrap()
@@ -1980,11 +1981,19 @@ mod tests {
 
             visit_stream_node(pb_nodes.as_ref().unwrap(), |body| {
                 if let PbNodeBody::Merge(m) = body {
-                    let upstream_actor_ids = upstream_actor_ids
+                    let expected_upstream_actor_ids = upstream_actor_ids
                         .get(&(m.upstream_fragment_id as _))
                         .map(|actors| actors.iter().map(|id| *id as u32).collect_vec())
                         .unwrap();
-                    assert_eq!(upstream_actor_ids, m.upstream_actor_id);
+                    let upstream_actor_ids = actor_upstreams
+                        .get(&(actor_id as _))
+                        .unwrap()
+                        .get(&m.upstream_fragment_id)
+                        .unwrap()
+                        .iter()
+                        .cloned()
+                        .collect_vec();
+                    assert_eq!(expected_upstream_actor_ids, upstream_actor_ids);
                 }
             });
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index 669c6494667b7..6dccd81dc3719 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -429,7 +429,7 @@ impl CatalogController {
         for_replace: bool,
     ) -> MetaResult<()> {
         let fragment_actors =
-            Self::extract_fragment_and_actors_from_fragments(stream_job_fragments.to_protobuf())?;
+            Self::extract_fragment_and_actors_from_fragments(stream_job_fragments)?;
         let all_tables = stream_job_fragments.all_tables();
         let inner = self.inner.write().await;
         let txn = inner.db.begin().await?;
@@ -1117,10 +1117,7 @@ impl CatalogController {
             .map(|update| {
                 (
                     update.upstream_fragment_id,
-                    (
-                        update.new_upstream_fragment_id.unwrap(),
-                        update.added_upstream_actor_id.clone(),
-                    ),
+                    update.new_upstream_fragment_id.unwrap(),
                 )
             })
             .collect();
@@ -1185,15 +1182,13 @@ impl CatalogController {
             .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
         visit_stream_node_mut(&mut stream_node, |body| {
             if let PbNodeBody::Merge(m) = body
-                && let Some((new_fragment_id, new_actor_ids)) =
-                    fragment_replace_map.get(&m.upstream_fragment_id)
+                && let Some(new_fragment_id) = fragment_replace_map.get(&m.upstream_fragment_id)
             {
                 m.upstream_fragment_id = *new_fragment_id;
-                m.upstream_actor_id.clone_from(new_actor_ids);
             }
         });
         for fragment_id in &mut upstream_fragment_id.0 {
-            if let Some((new_fragment_id, _)) = fragment_replace_map.get(&(*fragment_id as _)) {
+            if let Some(new_fragment_id) = fragment_replace_map.get(&(*fragment_id as _)) {
                 *fragment_id = *new_fragment_id as _;
             }
         }
@@ -1632,31 +1627,28 @@ impl CatalogController {
 
         // add new actors
         for (
-            PbStreamActor {
-                actor_id,
-                fragment_id,
-                nodes,
-                dispatcher,
-                upstream_actor_id,
-                vnode_bitmap,
-                expr_context,
-                ..
-            },
+            (
+                PbStreamActor {
+                    actor_id,
+                    fragment_id,
+                    dispatcher,
+                    vnode_bitmap,
+                    expr_context,
+                    ..
+                },
+                node_actor_upstreams,
+            ),
             actor_status,
         ) in newly_created_actors
         {
             let mut actor_upstreams = BTreeMap::<FragmentId, BTreeSet<ActorId>>::new();
             let mut new_actor_dispatchers = vec![];
 
-            if let Some(nodes) = &nodes {
-                visit_stream_node(nodes, |node| {
-                    if let PbNodeBody::Merge(node) = node {
-                        actor_upstreams
-                            .entry(node.upstream_fragment_id as FragmentId)
-                            .or_default()
-                            .extend(node.upstream_actor_id.iter().map(|id| *id as ActorId));
-                    }
-                });
+            for (fragment_id, upstream_actor_ids) in node_actor_upstreams {
+                actor_upstreams
+                    .entry(fragment_id as FragmentId)
+                    .or_default()
+                    .extend(upstream_actor_ids.iter().map(|id| *id as ActorId));
             }
 
             let actor_upstreams: BTreeMap<FragmentId, Vec<ActorId>> = actor_upstreams
@@ -1664,20 +1656,6 @@ impl CatalogController {
                 .map(|(k, v)| (k, v.into_iter().collect()))
                 .collect();
 
-            debug_assert_eq!(
-                actor_upstreams
-                    .values()
-                    .flatten()
-                    .cloned()
-                    .sorted()
-                    .collect_vec(),
-                upstream_actor_id
-                    .iter()
-                    .map(|actor_id| *actor_id as i32)
-                    .sorted()
-                    .collect_vec()
-            );
-
             let actor_upstreams = ActorUpstreamActors(actor_upstreams);
 
             let splits = actor_splits
diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs
index fe99a59a550ac..9841888d535ca 100644
--- a/src/meta/src/manager/metadata.rs
+++ b/src/meta/src/manager/metadata.rs
@@ -26,7 +26,7 @@ use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty
 use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
 use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
 use risingwave_pb::meta::table_fragments::{Fragment, PbFragment};
-use risingwave_pb::stream_plan::{PbDispatchStrategy, StreamActor};
+use risingwave_pb::stream_plan::PbDispatchStrategy;
 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
 use tokio::sync::oneshot;
 use tokio::time::{sleep, Instant};
@@ -37,7 +37,9 @@ use crate::controller::catalog::CatalogControllerRef;
 use crate::controller::cluster::{ClusterControllerRef, StreamingClusterInfo, WorkerExtraInfo};
 use crate::controller::fragment::FragmentParallelismInfo;
 use crate::manager::{LocalNotification, NotificationVersion};
-use crate::model::{ActorId, ClusterId, FragmentId, StreamJobFragments, SubscriptionId};
+use crate::model::{
+    ActorId, ClusterId, FragmentId, StreamActorWithUpstreams, StreamJobFragments, SubscriptionId,
+};
 use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
 use crate::telemetry::MetaTelemetryJobDesc;
 use crate::{MetaError, MetaResult};
@@ -586,13 +588,15 @@ impl MetadataManager {
         Ok(table_fragments)
     }
 
-    pub async fn all_active_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
+    pub async fn all_active_actors(
+        &self,
+    ) -> MetaResult<HashMap<ActorId, StreamActorWithUpstreams>> {
         let table_fragments = self.catalog_controller.table_fragments().await?;
         let mut actor_maps = HashMap::new();
         for (_, tf) in table_fragments {
             for actor in tf.active_actors() {
                 actor_maps
-                    .try_insert(actor.actor_id, actor)
+                    .try_insert(actor.0.actor_id, actor)
                     .expect("non duplicate");
             }
         }
diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs
index e11dfb5d27117..bacf07358f3ca 100644
--- a/src/meta/src/model/stream.rs
+++ b/src/meta/src/model/stream.rs
@@ -105,6 +105,10 @@ impl From<TableParallelism> for StreamingParallelism {
     }
 }
 
+pub type ActorUpstreams = BTreeMap<FragmentId, HashSet<ActorId>>;
+pub type FragmentActorUpstreams = HashMap<ActorId, ActorUpstreams>;
+pub type StreamActorWithUpstreams = (StreamActor, ActorUpstreams);
+
 /// Fragments of a streaming job. Corresponds to [`PbTableFragments`].
 /// (It was previously called `TableFragments` due to historical reasons.)
 ///
@@ -120,6 +124,7 @@ pub struct StreamJobFragments {
     /// The table fragments.
     pub fragments: BTreeMap<FragmentId, Fragment>,
 
+    pub actor_upstreams: BTreeMap<FragmentId, FragmentActorUpstreams>,
     /// The status of actors
     pub actor_status: BTreeMap<ActorId, ActorStatus>,
 
@@ -199,9 +204,11 @@ impl StreamJobFragments {
 impl StreamJobFragments {
     /// Create a new `TableFragments` with state of `Initial`, with other fields empty.
     pub fn for_test(table_id: TableId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
+        let actor_upstreams = BTreeMap::new();
         Self::new(
             table_id,
             fragments,
+            actor_upstreams,
             &BTreeMap::new(),
             StreamContext::default(),
             TableParallelism::Adaptive,
@@ -214,6 +221,7 @@ impl StreamJobFragments {
     pub fn new(
         stream_job_id: TableId,
         fragments: BTreeMap<FragmentId, Fragment>,
+        actor_upstreams: BTreeMap<FragmentId, FragmentActorUpstreams>,
         actor_locations: &BTreeMap<ActorId, ActorStatus>,
         ctx: StreamContext,
         table_parallelism: TableParallelism,
@@ -236,6 +244,7 @@ impl StreamJobFragments {
             stream_job_id,
             state: State::Initial,
             fragments,
+            actor_upstreams,
             actor_status,
             actor_splits: HashMap::default(),
             ctx,
@@ -455,7 +464,7 @@ impl StreamJobFragments {
 
     /// Find the table job's `Union` fragment.
     /// Panics if not found.
-    pub fn union_fragment_for_table(&mut self) -> &mut Fragment {
+    pub fn union_fragment_for_table(&mut self) -> (&mut Fragment, &mut FragmentActorUpstreams) {
         let mut union_fragment_id = None;
         for (fragment_id, fragment) in &self.fragments {
             for actor in &fragment.actors {
@@ -480,7 +489,10 @@ impl StreamJobFragments {
             .fragments
             .get_mut(&union_fragment_id)
             .unwrap_or_else(|| panic!("fragment {} not found", union_fragment_id));
-        union_fragment
+        (
+            union_fragment,
+            self.actor_upstreams.entry(union_fragment_id).or_default(),
+        )
    }

    /// Resolve dependent table
@@ -534,31 +546,50 @@ impl StreamJobFragments {
     }
 
     /// Returns the status of actors group by worker id.
-    pub fn active_actors(&self) -> Vec<StreamActor> {
+    pub fn active_actors(&self) -> Vec<StreamActorWithUpstreams> {
         let mut actors = vec![];
         for fragment in self.fragments.values() {
             for actor in &fragment.actors {
                 if self.actor_status[&actor.actor_id].state == ActorState::Inactive as i32 {
                     continue;
                 }
-                actors.push(actor.clone());
+                actors.push((
+                    actor.clone(),
+                    self.actor_upstreams
+                        .get(&fragment.fragment_id)
+                        .and_then(|actor_upstreams| actor_upstreams.get(&actor.actor_id))
+                        .cloned()
+                        .unwrap_or_default(),
+                ));
             }
         }
         actors
     }
 
-    pub fn actors_to_create(&self) -> HashMap<WorkerId, Vec<StreamActor>> {
+    pub fn actors_to_create(&self) -> HashMap<WorkerId, Vec<StreamActorWithUpstreams>> {
         let mut actor_map: HashMap<_, Vec<_>> = HashMap::new();
         self.fragments
             .values()
-            .flat_map(|fragment| fragment.actors.iter())
-            .for_each(|actor| {
+            .flat_map(|fragment| {
+                let actor_upstreams = self.actor_upstreams.get(&fragment.fragment_id);
+                fragment.actors.iter().map(move |actor| {
+                    (
+                        actor,
+                        actor_upstreams
+                            .and_then(|actor_upstreams| actor_upstreams.get(&actor.actor_id)),
+                    )
+                })
+            })
+            .for_each(|(actor, actor_upstream)| {
                 let worker_id = self
                     .actor_status
                     .get(&actor.actor_id)
                     .expect("should exist")
                     .worker_id() as WorkerId;
-                actor_map.entry(worker_id).or_default().push(actor.clone());
+                actor_map
+                    .entry(worker_id)
+                    .or_default()
+                    .push((actor.clone(), actor_upstream.cloned().unwrap_or_default()));
             });
         actor_map
     }
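For orientation, the nesting the new aliases describe is fragment → actor → upstream fragment → upstream actor set. A toy construction with std-only stand-ins (the real aliases use `crate::model::{ActorId, FragmentId}`; all ids below are made up):

use std::collections::{BTreeMap, HashMap, HashSet};

type ActorId = u32;
type FragmentId = u32;
type ActorUpstreams = BTreeMap<FragmentId, HashSet<ActorId>>;
type FragmentActorUpstreams = HashMap<ActorId, ActorUpstreams>;

fn toy_actor_upstreams() -> BTreeMap<FragmentId, FragmentActorUpstreams> {
    // Actor 1 of fragment 2 reads actors 100 and 101 of upstream fragment 7.
    let per_actor: ActorUpstreams = BTreeMap::from([(7, HashSet::from([100, 101]))]);
    let per_fragment: FragmentActorUpstreams = HashMap::from([(1, per_actor)]);
    BTreeMap::from([(2, per_fragment)])
}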
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 362e888a90fac..95cee4f9f1b9d 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -69,7 +69,7 @@ use crate::manager::{
     LocalNotification, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob,
     StreamingJobType, IGNORED_NOTIFICATION_VERSION,
 };
-use crate::model::{StreamContext, StreamJobFragments, TableParallelism};
+use crate::model::{FragmentActorUpstreams, StreamContext, StreamJobFragments, TableParallelism};
 use crate::stream::{
     create_source_worker, validate_sink, ActorGraphBuildResult, ActorGraphBuilder,
     CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
@@ -757,7 +757,17 @@ impl DdlController {
             if let Some(node) = &actor.nodes {
                 visit_stream_node(node, |node| {
                     if let NodeBody::Merge(merge_node) = node {
-                        assert!(!merge_node.upstream_actor_id.is_empty(), "All the mergers for the union should have been fully assigned beforehand.");
+                        let fragment_actor_upstreams = stream_job_fragments
+                            .actor_upstreams
+                            .get(&fragment.fragment_id)
+                            .expect("should exist");
+                        let actor_upstreams = fragment_actor_upstreams
+                            .get(&actor.actor_id)
+                            .expect("should exist");
+                        let upstreams = actor_upstreams
+                            .get(&merge_node.upstream_fragment_id)
+                            .expect("should exist");
+                        assert!(!upstreams.is_empty(), "All the mergers for the union should have been fully assigned beforehand.");
                     }
                 });
             }
@@ -772,7 +782,10 @@ impl DdlController {
         sink_fragment: &PbFragment,
         table: &Table,
         replace_table_ctx: &mut ReplaceStreamJobContext,
-        union_fragment: &mut PbFragment,
+        (union_fragment, union_fragment_actor_upstreams): (
+            &mut PbFragment,
+            &mut FragmentActorUpstreams,
+        ),
         unique_identity: Option<&str>,
     ) {
         let sink_actor_ids = sink_fragment
@@ -863,7 +876,13 @@ impl DdlController {
 
                         if let Some(NodeBody::Merge(merge_node)) =
                             &mut merge_stream_node.node_body
-                            && merge_node.upstream_actor_id.is_empty()
+                            && union_fragment_actor_upstreams
+                                .get(&actor.actor_id)
+                                .and_then(|actor_upstream| {
+                                    actor_upstream.get(&merge_node.upstream_fragment_id)
+                                })
+                                .map(|upstream_actor_ids| upstream_actor_ids.is_empty())
+                                .unwrap_or(true)
                         {
                             if let Some(sink_id) = sink_id {
                                 merge_stream_node.identity =
@@ -873,13 +892,25 @@ impl DdlController {
                                     format!("ProjectExecutor(from sink {})", sink_id);
                             }
 
-                            **merge_node = MergeNode {
-                                upstream_actor_id: sink_actor_ids.clone(),
-                                upstream_fragment_id,
-                                upstream_dispatcher_type: DispatcherType::Hash as _,
-                                fields: sink_fields.to_vec(),
+                            **merge_node = {
+                                #[expect(deprecated)]
+                                MergeNode {
+                                    upstream_actor_id: vec![],
+                                    upstream_fragment_id,
+                                    upstream_dispatcher_type: DispatcherType::Hash as _,
+                                    fields: sink_fields.to_vec(),
+                                }
                             };
 
+                            union_fragment_actor_upstreams
+                                .entry(actor.actor_id)
+                                .or_default()
+                                .try_insert(
+                                    upstream_fragment_id,
+                                    HashSet::from_iter(sink_actor_ids.iter().cloned()),
+                                )
+                                .expect("checked non-exist");
+
                             merge_stream_node.fields = sink_fields.to_vec();
 
                             return false;
@@ -892,11 +923,6 @@ impl DdlController {
             }
         }
 
-        // update downstream actors' upstream_actor_id and upstream_fragment_id
-        for actor in &mut union_fragment.actors {
-            actor.upstream_actor_id.extend(sink_actor_ids.clone());
-        }
-
         union_fragment
             .upstream_fragment_ids
             .push(upstream_fragment_id);
@@ -1596,6 +1622,7 @@ impl DdlController {
 
         let ActorGraphBuildResult {
             graph,
+            actor_upstreams,
             building_locations,
             existing_locations,
             dispatchers,
@@ -1617,6 +1644,7 @@ impl DdlController {
         let stream_job_fragments = StreamJobFragments::new(
             id.into(),
             graph,
+            actor_upstreams,
             &building_locations.actor_locations,
             stream_ctx.clone(),
             table_parallelism,
@@ -1809,6 +1837,7 @@ impl DdlController {
 
         let ActorGraphBuildResult {
             graph,
+            actor_upstreams,
             building_locations,
             existing_locations,
             dispatchers,
@@ -1829,6 +1858,7 @@ impl DdlController {
         let stream_job_fragments = StreamJobFragments::new(
             (tmp_job_id as u32).into(),
             graph,
+            actor_upstreams,
             &building_locations.actor_locations,
             stream_ctx,
             old_fragments.assigned_parallelism,
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index bfbd226a36a96..c9f82e3ede5af 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -41,7 +41,7 @@ use risingwave_pb::meta::table_fragments::{self, ActorStatus, PbFragment, State}
 use risingwave_pb::meta::FragmentWorkerSlotMappings;
 use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::{
-    Dispatcher, DispatcherType, FragmentTypeFlag, PbDispatcher, PbStreamActor, StreamNode,
+    Dispatcher, DispatcherType, FragmentTypeFlag, PbDispatcher, PbStreamActor, StreamActor,
 };
 use thiserror_ext::AsReport;
 use tokio::sync::oneshot::Receiver;
@@ -52,7 +52,7 @@ use tokio::time::{Instant, MissedTickBehavior};
 use crate::barrier::{Command, Reschedule};
 use crate::controller::scale::RescheduleWorkingSet;
 use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager};
-use crate::model::{ActorId, DispatcherId, FragmentId, TableParallelism};
+use crate::model::{ActorId, ActorUpstreams, DispatcherId, FragmentId, TableParallelism};
 use crate::serving::{
     to_deleted_fragment_worker_slot_mapping, to_fragment_worker_slot_mapping, ServingVnodeMapping,
 };
@@ -79,7 +79,6 @@ pub struct CustomActorInfo {
     pub actor_id: u32,
     pub fragment_id: u32,
     pub dispatcher: Vec<Dispatcher>,
-    pub upstream_actor_id: Vec<u32>,
     /// `None` if singleton.
     pub vnode_bitmap: Option<Bitmap>,
 }
@@ -90,7 +89,6 @@ impl From<&PbStreamActor> for CustomActorInfo {
             actor_id,
             fragment_id,
             dispatcher,
-            upstream_actor_id,
             vnode_bitmap,
             ..
         }: &PbStreamActor,
@@ -99,7 +97,6 @@ impl From<&PbStreamActor> for CustomActorInfo {
             actor_id: *actor_id,
             fragment_id: *fragment_id,
             dispatcher: dispatcher.clone(),
-            upstream_actor_id: upstream_actor_id.clone(),
             vnode_bitmap: vnode_bitmap.as_ref().map(Bitmap::from),
         }
     }
@@ -139,6 +136,7 @@ impl CustomFragmentInfo {
 }
 
 use educe::Educe;
+use risingwave_common::util::stream_graph_visitor::visit_stream_node;
 
 use super::SourceChange;
 use crate::controller::id::IdCategory;
@@ -529,9 +527,9 @@ impl ScaleController {
                     status: _,
                     splits: _,
                     worker_id,
-                    upstream_actor_ids,
                     vnode_bitmap,
                     expr_context,
+                    ..
                 },
             ) in actors
             {
@@ -546,12 +544,6 @@ impl ScaleController {
                     actor_id: actor_id as _,
                     fragment_id: fragment_id as _,
                     dispatcher: dispatchers,
-                    upstream_actor_id: upstream_actor_ids
-                        .into_inner()
-                        .values()
-                        .flatten()
-                        .map(|id| *id as _)
-                        .collect(),
                     vnode_bitmap: vnode_bitmap.map(|b| Bitmap::from(&b.to_protobuf())),
                 };
 
@@ -589,13 +581,13 @@ impl ScaleController {
                 actor_id,
                 fragment_id,
                 dispatcher,
-                upstream_actor_id,
                 vnode_bitmap,
             } = actors.first().unwrap().clone();
 
             let (related_job, job_definition) =
                 related_jobs.get(&job_id).expect("job not found");
 
+            #[expect(deprecated)]
             let fragment = CustomFragmentInfo {
                 fragment_id: fragment_id as _,
                 fragment_type_mask: fragment_type_mask as _,
@@ -607,7 +599,7 @@ impl ScaleController {
                         actor_id,
                         fragment_id: fragment_id as _,
                         dispatcher,
-                        upstream_actor_id,
+                        upstream_actor_id: vec![],
                         vnode_bitmap: vnode_bitmap.map(|b| b.to_protobuf()),
                         mview_definition: job_definition.to_owned(),
                         expr_context: expr_contexts
@@ -1197,6 +1189,7 @@ impl ScaleController {
             {
                 let new_actor_id = actor_to_create.0;
                 let mut new_actor = sample_actor.clone();
+                let mut new_actor_upstream = ActorUpstreams::new();
 
                 // This should be assigned before the `modify_actor_upstream_and_downstream` call,
                 // because we need to use the new actor id to find the upstream and
@@ -1210,7 +1203,7 @@ impl ScaleController {
                     &fragment_actor_bitmap,
                    &no_shuffle_upstream_actor_map,
                    &no_shuffle_downstream_actors_map,
-                    &mut new_actor,
+                    (&mut new_actor, &mut new_actor_upstream),
                )?;

                if let Some(bitmap) = fragment_actor_bitmap
@@ -1220,7 +1213,7 @@ impl ScaleController {
                     new_actor.vnode_bitmap = Some(bitmap.to_protobuf());
                 }
 
-                new_created_actors.insert(*new_actor_id, new_actor);
+                new_created_actors.insert(*new_actor_id, (new_actor, new_actor_upstream));
             }
         }
 
@@ -1433,11 +1426,11 @@ impl ScaleController {
         for (fragment_id, actors_to_create) in &fragment_actors_to_create {
             let mut created_actors = HashMap::new();
             for (actor_id, worker_id) in actors_to_create {
-                let actor = new_created_actors.get(actor_id).cloned().unwrap();
+                let (actor, actor_upstreams) = new_created_actors.get(actor_id).cloned().unwrap();
                 created_actors.insert(
                     *actor_id,
                     (
-                        actor,
+                        (actor, actor_upstreams),
                         ActorStatus {
                             location: PbActorLocation::from_worker(*worker_id as _),
                             state: ActorState::Inactive as i32,
@@ -1565,7 +1558,7 @@ impl ScaleController {
         fragment_actor_bitmap: &HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
         no_shuffle_upstream_actor_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
         no_shuffle_downstream_actors_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
-        new_actor: &mut PbStreamActor,
+        (new_actor, actor_upstreams): (&mut StreamActor, &mut ActorUpstreams),
     ) -> MetaResult<()> {
         let fragment = &ctx.fragment_map[&new_actor.fragment_id];
         let mut applied_upstream_fragment_actor_ids = HashMap::new();
@@ -1581,11 +1574,11 @@ impl ScaleController {
                 DispatcherType::Unspecified => unreachable!(),
                 DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => {
                     let upstream_fragment = &ctx.fragment_map[upstream_fragment_id];
-                    let mut upstream_actor_ids = upstream_fragment
+                    let mut upstream_actor_ids: HashSet<_> = upstream_fragment
                         .actors
                         .iter()
                         .map(|actor| actor.actor_id as ActorId)
-                        .collect_vec();
+                        .collect();
 
                     if let Some(upstream_actors_to_remove) =
                         fragment_actors_to_remove.get(upstream_fragment_id)
@@ -1600,10 +1593,8 @@ impl ScaleController {
                         upstream_actor_ids.extend(upstream_actors_to_create.keys().cloned());
                     }
 
-                    applied_upstream_fragment_actor_ids.insert(
-                        *upstream_fragment_id as FragmentId,
-                        upstream_actor_ids.clone(),
-                    );
+                    applied_upstream_fragment_actor_ids
+                        .insert(*upstream_fragment_id as FragmentId, upstream_actor_ids);
                 }
                 DispatcherType::NoShuffle => {
                     let no_shuffle_upstream_actor_id = *no_shuffle_upstream_actor_map
@@ -1613,36 +1604,24 @@ impl ScaleController {
 
                     applied_upstream_fragment_actor_ids.insert(
                         *upstream_fragment_id as FragmentId,
-                        vec![no_shuffle_upstream_actor_id as ActorId],
+                        HashSet::from_iter([no_shuffle_upstream_actor_id as ActorId]),
                     );
                 }
             }
         }
 
-        new_actor.upstream_actor_id = applied_upstream_fragment_actor_ids
-            .values()
-            .flatten()
-            .cloned()
-            .collect_vec();
-
-        fn replace_merge_node_upstream(
-            stream_node: &mut StreamNode,
-            applied_upstream_fragment_actor_ids: &HashMap<FragmentId, Vec<ActorId>>,
-        ) {
-            if let Some(NodeBody::Merge(s)) = stream_node.node_body.as_mut() {
-                s.upstream_actor_id = applied_upstream_fragment_actor_ids
-                    .get(&s.upstream_fragment_id)
-                    .cloned()
-                    .unwrap();
-            }
-
-            for child in &mut stream_node.input {
-                replace_merge_node_upstream(child, applied_upstream_fragment_actor_ids);
-            }
-        }
-
         if let Some(node) = new_actor.nodes.as_mut() {
-            replace_merge_node_upstream(node, &applied_upstream_fragment_actor_ids);
+            visit_stream_node(node, |node_body| {
+                if let NodeBody::Merge(s) = node_body {
+                    let upstream_actor_ids = applied_upstream_fragment_actor_ids
+                        .get(&s.upstream_fragment_id)
+                        .cloned()
+                        .unwrap();
+                    actor_upstreams
+                        .try_insert(s.upstream_fragment_id, upstream_actor_ids)
+                        .expect("non-duplicate");
+                }
+            });
         }
 
         // Update downstream actor ids
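In `modify_actor_upstream_and_downstream` above, the resolved upstream set is now recorded once per upstream fragment in the new actor's `ActorUpstreams` table instead of being written back into every merge node. A std-only toy of that bookkeeping (the patch uses the nightly `try_insert`; stable `insert` plus an assert is shown here):

use std::collections::{BTreeMap, HashSet};

type ActorId = u32;
type FragmentId = u32;

fn record_upstreams(
    actor_upstreams: &mut BTreeMap<FragmentId, HashSet<ActorId>>,
    upstream_fragment_id: FragmentId,
    resolved: HashSet<ActorId>,
) {
    // There is at most one merge edge between two fragments, hence the
    // "non-duplicate" invariant.
    assert!(
        actor_upstreams.insert(upstream_fragment_id, resolved).is_none(),
        "non-duplicate"
    );
}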
diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs
index 4ad4575672259..abf5413b5a90b 100644
--- a/src/meta/src/stream/stream_graph/actor.rs
+++ b/src/meta/src/stream/stream_graph/actor.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{BTreeMap, HashMap};
+use std::collections::{BTreeMap, HashMap, HashSet};
 use std::num::NonZeroUsize;
 use std::sync::Arc;
 
@@ -37,7 +37,9 @@ use super::id::GlobalFragmentIdsExt;
 use super::Locations;
 use crate::controller::cluster::StreamingClusterInfo;
 use crate::manager::{MetaSrvEnv, StreamingJob};
-use crate::model::{DispatcherId, FragmentId};
+use crate::model::{
+    ActorUpstreams, DispatcherId, FragmentActorUpstreams, FragmentId, StreamActorWithUpstreams,
+};
 use crate::stream::stream_graph::fragment::{
     CompleteStreamFragmentGraph, EdgeId, EitherFragment, StreamFragmentEdge,
 };
@@ -126,11 +128,18 @@ impl ActorBuilder {
     /// 1. Replace the logical `Exchange` in node's input with `Merge`, which can be executed on the
     ///    compute nodes.
     /// 2. Fill the upstream mview info of the `Merge` node under the other "leaf" nodes.
-    fn rewrite(&self) -> MetaResult<StreamNode> {
-        self.rewrite_inner(&self.nodes, 0)
+    fn rewrite(&self) -> MetaResult<(StreamNode, ActorUpstreams)> {
+        let mut actor_upstreams = ActorUpstreams::new();
+        let node = self.rewrite_inner(&self.nodes, &mut actor_upstreams, 0)?;
+        Ok((node, actor_upstreams))
     }
 
-    fn rewrite_inner(&self, stream_node: &StreamNode, depth: usize) -> MetaResult<StreamNode> {
+    fn rewrite_inner(
+        &self,
+        stream_node: &StreamNode,
+        actor_upstreams: &mut ActorUpstreams,
+        depth: usize,
+    ) -> MetaResult<StreamNode> {
         match stream_node.get_node_body()? {
             // Leaf node `Exchange`.
             NodeBody::Exchange(exchange) => {
@@ -150,12 +159,23 @@ impl ActorBuilder {
                     link_id: stream_node.get_operator_id(),
                 }];
 
+                let upstream_fragment_id = upstreams.fragment_id.as_global_id();
+                actor_upstreams
+                    .try_insert(
+                        upstream_fragment_id,
+                        HashSet::from_iter(upstreams.actors.as_global_ids()),
+                    )
+                    .expect("non-duplicate");
+
                 Ok(StreamNode {
-                    node_body: Some(NodeBody::Merge(Box::new(MergeNode {
-                        upstream_actor_id: upstreams.actors.as_global_ids(),
-                        upstream_fragment_id: upstreams.fragment_id.as_global_id(),
-                        upstream_dispatcher_type: exchange.get_strategy()?.r#type,
-                        fields: stream_node.get_fields().clone(),
+                    node_body: Some(NodeBody::Merge(Box::new({
+                        #[expect(deprecated)]
+                        MergeNode {
+                            upstream_actor_id: vec![],
+                            upstream_fragment_id,
+                            upstream_dispatcher_type: exchange.get_strategy()?.r#type,
+                            fields: stream_node.get_fields().clone(),
+                        }
                     }))),
                     identity: "MergeExecutor".to_owned(),
                     ..stream_node.clone()
@@ -194,14 +214,22 @@ impl ActorBuilder {
                     DispatcherType::NoShuffle as _
                 };
 
+                let upstream_fragment_id = upstreams.fragment_id.as_global_id();
+                actor_upstreams
+                    .try_insert(upstream_fragment_id, HashSet::from_iter(upstream_actor_id))
+                    .expect("non-duplicate");
+
                 let input = vec![
                     // Fill the merge node body with correct upstream info.
                     StreamNode {
-                        node_body: Some(NodeBody::Merge(Box::new(MergeNode {
-                            upstream_actor_id,
-                            upstream_fragment_id: upstreams.fragment_id.as_global_id(),
-                            upstream_dispatcher_type,
-                            fields: merge_node.fields.clone(),
+                        node_body: Some(NodeBody::Merge(Box::new({
+                            #[expect(deprecated)]
+                            MergeNode {
+                                upstream_actor_id: vec![],
+                                upstream_fragment_id,
+                                upstream_dispatcher_type,
+                                fields: merge_node.fields.clone(),
+                            }
                         }))),
                         ..merge_node.clone()
                     },
@@ -242,15 +270,23 @@ impl ActorBuilder {
                 // So they both should have only one upstream actor.
                 assert_eq!(upstream_actor_id.len(), 1);
 
+                let upstream_fragment_id = upstreams.fragment_id.as_global_id();
+                actor_upstreams
+                    .try_insert(upstream_fragment_id, HashSet::from_iter(upstream_actor_id))
+                    .expect("non-duplicate");
+
                 // rewrite the input
                 let input = vec![
                     // Fill the merge node body with correct upstream info.
                     StreamNode {
-                        node_body: Some(NodeBody::Merge(Box::new(MergeNode {
-                            upstream_actor_id,
-                            upstream_fragment_id: upstreams.fragment_id.as_global_id(),
-                            upstream_dispatcher_type: DispatcherType::NoShuffle as _,
-                            fields: merge_node.fields.clone(),
+                        node_body: Some(NodeBody::Merge(Box::new({
+                            #[expect(deprecated)]
+                            MergeNode {
+                                upstream_actor_id: vec![],
+                                upstream_fragment_id,
+                                upstream_dispatcher_type: DispatcherType::NoShuffle as _,
+                                fields: merge_node.fields.clone(),
+                            }
                         }))),
                         ..merge_node.clone()
                     },
@@ -269,7 +305,7 @@ impl ActorBuilder {
                     .iter()
                     .zip_eq_fast(&mut new_stream_node.input)
                 {
-                    *new_input = self.rewrite_inner(input, depth + 1)?;
+                    *new_input = self.rewrite_inner(input, actor_upstreams, depth + 1)?;
                 }
                 Ok(new_stream_node)
             }
@@ -277,31 +313,33 @@ impl ActorBuilder {
     }
 
     /// Build an actor after all the upstreams and downstreams are processed.
-    fn build(self, job: &StreamingJob, expr_context: ExprContext) -> MetaResult<StreamActor> {
-        let rewritten_nodes = self.rewrite()?;
-
-        // TODO: store each upstream separately
-        let upstream_actor_id = self
-            .upstreams
-            .into_values()
-            .flat_map(|ActorUpstream { actors, .. }| actors.as_global_ids())
-            .collect();
+    fn build(
+        self,
+        job: &StreamingJob,
+        expr_context: ExprContext,
+    ) -> MetaResult<StreamActorWithUpstreams> {
+        let (rewritten_nodes, actor_upstreams) = self.rewrite()?;
 
         // Only fill the definition when debug assertions enabled, otherwise use name instead.
         #[cfg(not(debug_assertions))]
         let mview_definition = job.name();
         #[cfg(debug_assertions)]
         let mview_definition = job.definition();
 
-        Ok(StreamActor {
-            actor_id: self.actor_id.as_global_id(),
-            fragment_id: self.fragment_id.as_global_id(),
-            nodes: Some(rewritten_nodes),
-            dispatcher: self.downstreams.into_values().collect(),
-            upstream_actor_id,
-            vnode_bitmap: self.vnode_bitmap.map(|b| b.to_protobuf()),
-            mview_definition,
-            expr_context: Some(expr_context),
-        })
+        Ok((
+            #[expect(deprecated)]
+            StreamActor {
+                actor_id: self.actor_id.as_global_id(),
+                fragment_id: self.fragment_id.as_global_id(),
+                nodes: Some(rewritten_nodes),
+                dispatcher: self.downstreams.into_values().collect(),
+                upstream_actor_id: vec![],
+                vnode_bitmap: self.vnode_bitmap.map(|b| b.to_protobuf()),
+                mview_definition,
+                expr_context: Some(expr_context),
+            },
+            actor_upstreams,
+        ))
    }
}

@@ -628,6 +666,7 @@ impl ActorGraphBuildState {
 pub struct ActorGraphBuildResult {
     /// The graph of sealed fragments, including all actors.
     pub graph: BTreeMap<FragmentId, Fragment>,
+    pub actor_upstreams: BTreeMap<FragmentId, FragmentActorUpstreams>,
 
     /// The scheduled locations of the actors to be built.
     pub building_locations: Locations,
@@ -781,28 +820,37 @@ impl ActorGraphBuilder {
         }
 
         // Serialize the graph into a map of sealed fragments.
-        let graph = {
+        let (graph, actor_upstreams) = {
             let mut actors: HashMap<GlobalFragmentId, Vec<StreamActor>> = HashMap::new();
+            let mut fragment_actor_upstreams: BTreeMap<_, FragmentActorUpstreams> = BTreeMap::new();
 
             // As all fragments are processed, we can now `build` the actors where the `Exchange`
             // and `Chain` are rewritten.
             for builder in actor_builders.into_values() {
                 let fragment_id = builder.fragment_id();
-                let actor = builder.build(job, expr_context.clone())?;
+                let (actor, actor_upstreams) = builder.build(job, expr_context.clone())?;
+                fragment_actor_upstreams
+                    .entry(fragment_id.as_global_id())
+                    .or_default()
+                    .try_insert(actor.actor_id, actor_upstreams)
+                    .expect("non-duplicate");
                 actors.entry(fragment_id).or_default().push(actor);
             }
 
-            actors
-                .into_iter()
-                .map(|(fragment_id, actors)| {
-                    let distribution = self.distributions[&fragment_id].clone();
-                    let fragment =
-                        self.fragment_graph
-                            .seal_fragment(fragment_id, actors, distribution);
-                    let fragment_id = fragment_id.as_global_id();
-                    (fragment_id, fragment)
-                })
-                .collect()
+            (
+                actors
+                    .into_iter()
+                    .map(|(fragment_id, actors)| {
+                        let distribution = self.distributions[&fragment_id].clone();
+                        let fragment =
+                            self.fragment_graph
+                                .seal_fragment(fragment_id, actors, distribution);
+                        let fragment_id = fragment_id.as_global_id();
+                        (fragment_id, fragment)
+                    })
+                    .collect(),
+                fragment_actor_upstreams,
+            )
         };
 
         // Convert the actor location map to the `Locations` struct.
@@ -847,6 +895,7 @@ impl ActorGraphBuilder {
 
         Ok(ActorGraphBuildResult {
             graph,
+            actor_upstreams,
             building_locations,
             existing_locations,
             dispatchers,
@@ -781,28 +820,37 @@ impl ActorGraphBuilder {
         }
 
         // Serialize the graph into a map of sealed fragments.
-        let graph = {
+        let (graph, actor_upstreams) = {
             let mut actors: HashMap<GlobalFragmentId, Vec<StreamActor>> = HashMap::new();
+            let mut fragment_actor_upstreams: BTreeMap<_, FragmentActorUpstreams> = BTreeMap::new();
 
             // As all fragments are processed, we can now `build` the actors where the `Exchange`
             // and `Chain` are rewritten.
             for builder in actor_builders.into_values() {
                 let fragment_id = builder.fragment_id();
-                let actor = builder.build(job, expr_context.clone())?;
+                let (actor, actor_upstreams) = builder.build(job, expr_context.clone())?;
+                fragment_actor_upstreams
+                    .entry(fragment_id.as_global_id())
+                    .or_default()
+                    .try_insert(actor.actor_id, actor_upstreams)
+                    .expect("non-duplicate");
                 actors.entry(fragment_id).or_default().push(actor);
             }
 
-            actors
-                .into_iter()
-                .map(|(fragment_id, actors)| {
-                    let distribution = self.distributions[&fragment_id].clone();
-                    let fragment =
-                        self.fragment_graph
-                            .seal_fragment(fragment_id, actors, distribution);
-                    let fragment_id = fragment_id.as_global_id();
-                    (fragment_id, fragment)
-                })
-                .collect()
+            (
+                actors
+                    .into_iter()
+                    .map(|(fragment_id, actors)| {
+                        let distribution = self.distributions[&fragment_id].clone();
+                        let fragment =
+                            self.fragment_graph
+                                .seal_fragment(fragment_id, actors, distribution);
+                        let fragment_id = fragment_id.as_global_id();
+                        (fragment_id, fragment)
+                    })
+                    .collect(),
+                fragment_actor_upstreams,
+            )
         };
 
         // Convert the actor location map to the `Locations` struct.
@@ -847,6 +895,7 @@ impl ActorGraphBuilder {
         Ok(ActorGraphBuildResult {
             graph,
+            actor_upstreams,
             building_locations,
             existing_locations,
             dispatchers,
diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs
index 500b28ba60a3a..3f281491b7d28 100644
--- a/src/meta/src/stream/test_fragmenter.rs
+++ b/src/meta/src/stream/test_fragmenter.rs
@@ -471,8 +471,11 @@ async fn test_graph_builder() -> MetaResult<()> {
         make_cluster_info(),
         NonZeroUsize::new(parallel_degree).unwrap(),
     )?;
-    let ActorGraphBuildResult { graph, .. } =
-        actor_graph_builder.generate_graph(&env, &job, expr_context)?;
+    let ActorGraphBuildResult {
+        graph,
+        actor_upstreams,
+        ..
+    } = actor_graph_builder.generate_graph(&env, &job, expr_context)?;
 
     let stream_job_fragments = StreamJobFragments::for_test(TableId::default(), graph);
     let actors = stream_job_fragments.actors();
@@ -534,8 +537,13 @@ async fn test_graph_builder() -> MetaResult<()> {
                 .unwrap()
                 .iter()
                 .collect::<HashSet<_>>(),
-            merge_node
-                .get_upstream_actor_id()
+            actor_upstreams
+                .get(&actor.fragment_id)
+                .unwrap()
+                .get(&actor.actor_id)
+                .unwrap()
+                .get(&merge_node.upstream_fragment_id)
+                .unwrap()
                 .iter()
                 .collect::<HashSet<_>>(),
         );
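The test now resolves a merge node's expected upstreams by walking the returned map: fragment id, then actor id, then upstream fragment id. A hypothetical helper showing the same lookup chain with stand-in types:

    use std::collections::{BTreeMap, HashMap, HashSet};

    type FragmentId = u32;
    type ActorId = u32;

    // fragment id -> actor id -> upstream fragment id -> upstream actor ids
    type BuiltUpstreams =
        BTreeMap<FragmentId, HashMap<ActorId, HashMap<FragmentId, HashSet<ActorId>>>>;

    fn upstreams_of_merge(
        actor_upstreams: &BuiltUpstreams,
        fragment_id: FragmentId,
        actor_id: ActorId,
        upstream_fragment_id: FragmentId,
    ) -> Option<&HashSet<ActorId>> {
        // `?` short-circuits to `None` at any missing level, where the test
        // uses `unwrap()` because the entries must exist.
        actor_upstreams
            .get(&fragment_id)?
            .get(&actor_id)?
            .get(&upstream_fragment_id)
    }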
diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs
index 027b946d40c32..e1eb4091f37f2 100644
--- a/src/stream/src/executor/actor.rs
+++ b/src/stream/src/executor/actor.rs
@@ -32,6 +32,7 @@ use risingwave_expr::expr_context::{expr_context_scope, FRAGMENT_ID, VNODE_COUNT
 use risingwave_expr::ExprError;
 use risingwave_pb::plan_common::ExprContext;
 use risingwave_pb::stream_plan::PbStreamActor;
+use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
 use risingwave_rpc_client::MetaClient;
 use thiserror_ext::AsReport;
 use tokio_stream::StreamExt;
@@ -41,7 +42,7 @@ use super::monitor::StreamingMetrics;
 use super::subtask::SubtaskHandle;
 use super::StreamConsumer;
 use crate::error::StreamResult;
-use crate::task::{ActorId, LocalBarrierManager};
+use crate::task::{ActorId, FragmentId, LocalBarrierManager};
 
 /// Shared by all operators of an actor.
 pub struct ActorContext {
@@ -61,6 +62,7 @@ pub struct ActorContext {
     pub initial_dispatch_num: usize,
     // mv_table_id to subscription id
     pub related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
+    pub initial_upstream_actors: HashMap<FragmentId, UpstreamActors>,
 
     // Meta client. currently used for auto schema change. `None` for test only
     pub meta_client: Option<MetaClient>,
@@ -84,17 +86,20 @@ impl ActorContext {
             // Set 1 for test to enable sanity check on table
             initial_dispatch_num: 1,
             related_subscriptions: HashMap::new().into(),
+            initial_upstream_actors: Default::default(),
             meta_client: None,
             streaming_config: Arc::new(StreamingConfig::default()),
         })
     }
 
+    #[expect(clippy::too_many_arguments)]
     pub fn create(
         stream_actor: &PbStreamActor,
         total_mem_val: Arc<TrAdder<i64>>,
         streaming_metrics: Arc<StreamingMetrics>,
         initial_dispatch_num: usize,
         related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
+        initial_upstream_actors: HashMap<FragmentId, UpstreamActors>,
         meta_client: Option<MetaClient>,
         streaming_config: Arc<StreamingConfig>,
     ) -> ActorContextRef {
@@ -112,6 +117,7 @@ impl ActorContext {
             streaming_metrics,
             initial_dispatch_num,
             related_subscriptions,
+            initial_upstream_actors,
             meta_client,
             streaming_config,
         })
diff --git a/src/stream/src/from_proto/merge.rs b/src/stream/src/from_proto/merge.rs
index c16b7e49e323f..cf530c85298de 100644
--- a/src/stream/src/from_proto/merge.rs
+++ b/src/stream/src/from_proto/merge.rs
@@ -33,11 +33,14 @@ impl MergeExecutorBuilder {
         node: &MergeNode,
         chunk_size: usize,
     ) -> StreamResult<Executor> {
-        let upstreams = node.get_upstream_actor_id();
         let upstream_fragment_id = node.get_upstream_fragment_id();
 
-        let inputs: Vec<_> = upstreams
-            .iter()
+        let inputs: Vec<_> = actor_context
+            .initial_upstream_actors
+            .get(&node.upstream_fragment_id)
+            .map(|actors| actors.actors.iter())
+            .into_iter()
+            .flatten()
             .map(|&upstream_actor_id| {
                 new_input(
                     &shared_context,
diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs
index dba424afbd55c..e1c78dcc597e4 100644
--- a/src/stream/src/task/barrier_manager/managed_state.rs
+++ b/src/stream/src/task/barrier_manager/managed_state.rs
@@ -762,6 +762,8 @@ impl DatabaseManagedBarrierState {
         let subscriptions =
             LazyCell::new(|| Arc::new(graph_state.mv_depended_subscriptions.clone()));
         for actor in request.actors_to_build {
+            let upstream = actor.upstreams;
+            let actor = actor.actor.unwrap();
             let actor_id = actor.actor_id;
             assert!(!is_stop_actor(actor_id));
             assert!(new_actors.insert(actor_id));
@@ -769,6 +771,7 @@ impl DatabaseManagedBarrierState {
             let (join_handle, monitor_join_handle) = self.actor_manager.spawn_actor(
                 actor,
                 (*subscriptions).clone(),
+                upstream,
                 self.current_shared_context.clone(),
                 self.local_barrier_manager.clone(),
             );
diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs
index f186ef604b949..f79d37e85949c 100644
--- a/src/stream/src/task/stream_manager.rs
+++ b/src/stream/src/task/stream_manager.rs
@@ -34,6 +34,7 @@ use risingwave_pb::plan_common::StorageTableDesc;
 use risingwave_pb::stream_plan;
 use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::{StreamActor, StreamNode, StreamScanNode, StreamScanType};
+use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
 use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest;
 use risingwave_pb::stream_service::{
     StreamingControlStreamRequest, StreamingControlStreamResponse,
@@ -603,6 +604,7 @@ impl StreamActorManager {
         actor: StreamActor,
         shared_context: Arc<SharedContext>,
         related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
+        upstreams: HashMap<FragmentId, UpstreamActors>,
         local_barrier_manager: LocalBarrierManager,
     ) -> StreamResult<Actor<DispatchExecutor>> {
         {
@@ -614,6 +616,7 @@ impl StreamActorManager {
             self.streaming_metrics.clone(),
             actor.dispatcher.len(),
             related_subscriptions,
+            upstreams,
             self.env.meta_client().clone(),
             streaming_config,
         );
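On the compute side, a merge node may find no entry for its upstream fragment in `initial_upstream_actors`; the `.map(..).into_iter().flatten()` chain above collapses that `Option` into an empty iterator instead of panicking. The same idiom in isolation (stand-in types):

    use std::collections::HashMap;

    // `Option::map` keeps the borrow alive, `into_iter` yields the inner
    // iterator zero or one times, and `flatten` splices it in, so a missing
    // fragment entry simply produces no inputs.
    fn upstream_actor_ids(
        initial_upstream_actors: &HashMap<u32, Vec<u32>>,
        upstream_fragment_id: u32,
    ) -> Vec<u32> {
        initial_upstream_actors
            .get(&upstream_fragment_id)
            .map(|actors| actors.iter())
            .into_iter()
            .flatten()
            .copied()
            .collect()
    }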
@@ -658,6 +661,7 @@ impl StreamActorManager {
         self: &Arc<Self>,
         actor: StreamActor,
         related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
+        upstreams: HashMap<FragmentId, UpstreamActors>,
         current_shared_context: Arc<SharedContext>,
         local_barrier_manager: LocalBarrierManager,
     ) -> (JoinHandle<()>, Option<JoinHandle<()>>) {
@@ -670,7 +674,7 @@ impl StreamActorManager {
             format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition);
         let barrier_manager = local_barrier_manager.clone();
         // wrap the future of `create_actor` with `boxed` to avoid stack overflow
-        let actor = self.clone().create_actor(actor, current_shared_context, related_subscriptions, barrier_manager.clone()).boxed().and_then(|actor| actor.run()).map(move |result| {
+        let actor = self.clone().create_actor(actor, current_shared_context, related_subscriptions, upstreams, barrier_manager.clone()).boxed().and_then(|actor| actor.run()).map(move |result| {
             if let Err(err) = result {
                 // TODO: check error type and panic if it's unexpected.
                 // Intentionally use `?` on the report to also include the backtrace.
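End to end, the meta node now ships each actor to build as a `BuildActorInfo` pairing the (pruned) `StreamActor` with its per-fragment upstreams, matching the message added to stream_service.proto. A sketch of how a sender could assemble one, with hand-written stand-ins for the prost-generated types:

    use std::collections::{HashMap, HashSet};

    // Stand-ins for the prost-generated messages in stream_service.proto.
    struct StreamActor; // fields elided
    struct UpstreamActors {
        actors: Vec<u32>,
    }
    struct BuildActorInfo {
        actor: Option<StreamActor>,
        upstreams: HashMap<u32, UpstreamActors>,
    }

    fn to_build_actor_info(
        actor: StreamActor,
        actor_upstreams: HashMap<u32, HashSet<u32>>,
    ) -> BuildActorInfo {
        BuildActorInfo {
            actor: Some(actor),
            // Flatten the builder's per-fragment upstream sets into the
            // repeated-field representation used on the wire.
            upstreams: actor_upstreams
                .into_iter()
                .map(|(fragment_id, actors)| {
                    (
                        fragment_id,
                        UpstreamActors {
                            actors: actors.into_iter().collect(),
                        },
                    )
                })
                .collect(),
        }
    }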