diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs
index 16ad8e427eebe..1aa5b9a2297a3 100644
--- a/src/frontend/src/catalog/catalog_service.rs
+++ b/src/frontend/src/catalog/catalog_service.rs
@@ -349,6 +349,11 @@ impl CatalogWriter for CatalogWriterImpl {
         mapping: ColIndexMapping,
         drop_table_associated_source_id: u32,
     ) -> Result<()> {
+        tracing::info!(
+            "replace table drop table associated source: {:?}, table id {}",
+            drop_table_associated_source_id,
+            table.id
+        );
         let version = self
             .meta_client
             .replace_job(
diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs
index 8bdd0b4da3bec..1a1618878cc17 100644
--- a/src/meta/service/src/ddl_service.rs
+++ b/src/meta/service/src/ddl_service.rs
@@ -116,6 +116,12 @@ impl DdlServiceImpl {
                     table,
                 },
             ) => {
+                tracing::info!(
+                    "replace table drop table associated source: {:?}, table id {}",
+                    associated_source_id,
+                    table.as_ref().unwrap().id
+                );
+
                 drop_table_associated_source_id = Some(associated_source_id);
                 StreamingJob::Table(None, table.unwrap(), TableJobType::General)
             }
diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index 5f8aecd19a433..09c7aa618352c 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -46,7 +46,6 @@ use super::info::{CommandFragmentChanges, InflightStreamingJobInfo};
 use crate::barrier::info::BarrierInfo;
 use crate::barrier::utils::collect_resp_info;
 use crate::barrier::InflightSubscriptionInfo;
-use crate::controller::catalog::ReleaseContext;
 use crate::controller::fragment::InflightFragmentInfo;
 use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
 use crate::manager::{StreamingJob, StreamingJobType};
@@ -109,8 +108,6 @@ pub struct ReplaceStreamJobPlan {
     pub streaming_job: StreamingJob,
     /// The temporary dummy job fragments id of new table fragment
     pub tmp_id: u32,
-    // Objects to be dropped after replace job succeeds. Used in drop table associated source.
-    pub release_ctx: Option<ReleaseContext>,
 }
 
 impl ReplaceStreamJobPlan {
diff --git a/src/meta/src/barrier/context/context_impl.rs b/src/meta/src/barrier/context/context_impl.rs
index 001088566e141..ea479e50ce73d 100644
--- a/src/meta/src/barrier/context/context_impl.rs
+++ b/src/meta/src/barrier/context/context_impl.rs
@@ -201,7 +201,6 @@ impl CommandContext {
                         new_fragments.actor_ids(),
                         dispatchers.clone(),
                         init_split_assignment,
-                        None,
                     )
                     .await?;
                 barrier_manager_context
@@ -254,7 +253,6 @@ impl CommandContext {
                         stream_job_fragments.actor_ids(),
                         dispatchers.clone(),
                         init_split_assignment,
-                        None,
                     )
                     .await?;
 
@@ -287,7 +285,6 @@ impl CommandContext {
                     new_fragments,
                     dispatchers,
                     init_split_assignment,
-                    release_ctx,
                     ..
                 },
             ) => {
@@ -300,7 +297,6 @@ impl CommandContext {
                         new_fragments.actor_ids(),
                         dispatchers.clone(),
                         init_split_assignment,
-                        release_ctx.as_ref(),
                     )
                     .await?;
 
@@ -314,20 +310,6 @@ impl CommandContext {
                         replace_plan.fragment_replacements(),
                     )
                     .await;
-
-                // handle release ctx: drop the objects after replace job succeeds. For drop table associated source.
-                if let Some(release_ctx) = release_ctx {
-                    barrier_manager_context
-                        .source_manager
-                        .apply_source_change(SourceChange::DropSource {
-                            dropped_source_ids: release_ctx
-                                .removed_source_ids
-                                .iter()
-                                .map(|id| *id as _)
-                                .collect(),
-                        })
-                        .await;
-                }
             }
 
             Command::CreateSubscription {
diff --git a/src/meta/src/controller/catalog/drop_op.rs b/src/meta/src/controller/catalog/drop_op.rs
index f6b002631ba6b..a5c0c62a28dae 100644
--- a/src/meta/src/controller/catalog/drop_op.rs
+++ b/src/meta/src/controller/catalog/drop_op.rs
@@ -34,10 +34,7 @@ impl CatalogController {
                 object::Column::SchemaId,
                 object::Column::DatabaseId,
             ])
-            .join(JoinType::InnerJoin, object::Relation::Table.def())
-            .filter(
-                table::Column::BelongsToJobId.is_in(release_ctx.removed_state_table_ids.clone()),
-            )
+            .filter(object::Column::Oid.is_in(release_ctx.removed_state_table_ids.clone()))
             .into_partial_model()
             .all(txn)
             .await?;
@@ -55,6 +52,11 @@ impl CatalogController {
             .all(txn)
             .await?;
 
+        tracing::info!(
+            "drop_table_associated_source: to_drop_objects: {:?}",
+            to_drop_objects
+        );
+
         // delete all in to_drop_objects.
         let res = Object::delete_many()
             .filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index 16b339a0a9af9..40951dd596a3d 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -624,18 +624,10 @@ impl CatalogController {
         actor_ids: Vec<ActorId>,
         new_actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
         split_assignment: &SplitAssignment,
-        release_ctx: Option<&ReleaseContext>,
     ) -> MetaResult<()> {
         let inner = self.inner.write().await;
         let txn = inner.db.begin().await?;
 
-        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
-        // apply release ctx: drop the objects after replace job succeeds. For drop table associated source.
-        if let Some(release_ctx) = release_ctx {
-            tracing::info!("apply release ctx: {:?}", release_ctx);
-            notification_objs = Some(self.drop_table_associated_source(&txn, release_ctx).await?);
-        }
-
         Actor::update_many()
             .col_expr(
                 actor::Column::Status,
@@ -688,16 +680,6 @@
             .await?;
 
         txn.commit().await?;
-
-        if let Some((user_infos, to_drop_objects)) = notification_objs {
-            self.notify_users_update(user_infos).await;
-            self.notify_frontend(
-                NotificationOperation::Delete,
-                build_relation_group_for_delete(to_drop_objects),
-            )
-            .await;
-        }
-
         Ok(())
     }
 
@@ -994,6 +976,7 @@ impl CatalogController {
         merge_updates: Vec<PbMergeUpdate>,
         col_index_mapping: Option<ColIndexMapping>,
         sink_into_table_context: SinkIntoTableContext,
+        release_ctx: Option<&ReleaseContext>,
     ) -> MetaResult<NotificationVersion> {
         let inner = self.inner.write().await;
         let txn = inner.db.begin().await?;
@@ -1008,6 +991,11 @@
             )
             .await?;
 
+        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
+        if let Some(release_ctx) = release_ctx {
+            notification_objs = Some(self.drop_table_associated_source(&txn, release_ctx).await?);
+        }
+
         txn.commit().await?;
 
         // FIXME: Do not notify frontend currently, because frontend nodes might refer to old table
@@ -1017,13 +1005,23 @@
         //     .await;
         self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
             .await;
-        let version = self
+        let mut version = self
             .notify_frontend(
                 NotificationOperation::Update,
                 NotificationInfo::RelationGroup(PbRelationGroup { relations }),
             )
             .await;
+        if let Some((user_infos, to_drop_objects)) = notification_objs {
+            self.notify_users_update(user_infos).await;
+            version = self
+                .notify_frontend(
+                    NotificationOperation::Delete,
+                    build_relation_group_for_delete(to_drop_objects),
+                )
+                .await;
+        }
+
         Ok(version)
     }
 
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 6691bc0ba0ab3..ec2574552f5a5 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -1252,6 +1252,7 @@ impl DdlController {
                     dropping_sink_id: Some(sink_id),
                     updated_sink_catalogs: vec![],
                 },
+                None, // release_ctx is None because the objects have already been dropped and no atomic drop is required when replacing the table.
             )
             .await?;
         Ok(version)
@@ -1373,6 +1374,7 @@ impl DdlController {
         tracing::debug!(id = job_id, "building replace streaming job");
 
         let mut updated_sink_catalogs = vec![];
+        let mut release_ctx = None;
         let result: MetaResult<Vec<PbMergeUpdate>> = try {
             let (mut ctx, mut stream_job_fragments) = self
                 .build_replace_job(
@@ -1384,6 +1386,7 @@
                     drop_table_associated_source_id,
                 )
                 .await?;
+            release_ctx = ctx.release_ctx.clone();
 
             if let StreamingJob::Table(_, table, ..) = &streaming_job {
                 let catalogs = self
@@ -1446,8 +1449,28 @@
                         dropping_sink_id: None,
                         updated_sink_catalogs,
                     },
+                    release_ctx.as_ref(),
                 )
                 .await?;
+                if let Some(release_ctx) = &release_ctx {
+                    // Drop table associated source: the catalog objects were already dropped above; run the commands to clean up the source splits and hummock state tables.
+                    self.source_manager
+                        .apply_source_change(SourceChange::DropSource {
+                            dropped_source_ids: release_ctx.removed_source_ids.clone(),
+                        })
+                        .await;
+                    self.stream_manager
+                        .drop_streaming_jobs(
+                            risingwave_common::catalog::DatabaseId::from(
+                                release_ctx.database_id as u32,
+                            ),
+                            vec![],
+                            vec![],
+                            release_ctx.removed_state_table_ids.clone(),
+                            HashSet::new(),
+                        )
+                        .await;
+                }
                 Ok(version)
             }
             Err(err) => {
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index 524d6383c5368..ef9117daf0f52 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -373,7 +373,6 @@ impl GlobalStreamManager {
                 init_split_assignment,
                 streaming_job,
                 tmp_id: tmp_table_id.table_id,
-                release_ctx: None, /* release_ctx is None because we already drop the objects and not require atomic drop things when replacing table. */
             });
         }
 
@@ -456,7 +455,6 @@ impl GlobalStreamManager {
             dispatchers,
             tmp_id,
             streaming_job,
-            release_ctx,
             ..
         }: ReplaceStreamJobContext,
     ) -> MetaResult<()> {
@@ -468,6 +466,10 @@
         } else {
             self.source_manager.allocate_splits(&tmp_table_id).await?
         };
+        tracing::info!(
+            "replace_stream_job - allocate split: {:?}",
+            init_split_assignment
+        );
 
         self.barrier_scheduler
             .run_config_change_command_with_pause(
@@ -480,7 +482,6 @@
                     init_split_assignment,
                     streaming_job,
                     tmp_id,
-                    release_ctx,
                 }),
             )
             .await?;
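
For orientation, the control flow that the ddl_controller.rs hunks introduce boils down to the sketch below: once finish_replace_streaming_job has committed the catalog changes (including dropping the associated source objects), the ReleaseContext returned from building the replace job drives the remaining runtime cleanup. This is a minimal illustrative sketch, not code from the patch: only the ReleaseContext field names (database_id, removed_source_ids, removed_state_table_ids) are taken from the diff, while SourceManagerStub, StreamManagerStub, and post_replace_cleanup are simplified stand-ins for the real meta-service types.

// Illustrative sketch (assumptions noted above); compiles standalone.
use std::collections::HashSet;

/// Simplified mirror of the ReleaseContext fields referenced in the diff.
struct ReleaseContext {
    database_id: i32,
    removed_source_ids: Vec<u32>,
    removed_state_table_ids: Vec<u32>,
}

/// Stand-in for the source manager: stops split discovery for dropped sources.
struct SourceManagerStub;
impl SourceManagerStub {
    fn drop_sources(&self, dropped_source_ids: Vec<u32>) {
        println!("unregistering dropped sources: {dropped_source_ids:?}");
    }
}

/// Stand-in for the stream manager: issues the command that unregisters state tables.
struct StreamManagerStub;
impl StreamManagerStub {
    fn drop_streaming_jobs(
        &self,
        database_id: u32,
        state_table_ids: Vec<u32>,
        _removed_actors: HashSet<u32>,
    ) {
        println!("cleaning up state tables {state_table_ids:?} in database {database_id}");
    }
}

/// The catalog objects are already deleted inside the replace transaction, so the work
/// here is purely runtime cleanup driven by the ReleaseContext, mirroring the new
/// post-replace branch in ddl_controller.rs.
fn post_replace_cleanup(
    release_ctx: Option<&ReleaseContext>,
    source_mgr: &SourceManagerStub,
    stream_mgr: &StreamManagerStub,
) {
    if let Some(ctx) = release_ctx {
        source_mgr.drop_sources(ctx.removed_source_ids.clone());
        stream_mgr.drop_streaming_jobs(
            ctx.database_id as u32,
            ctx.removed_state_table_ids.clone(),
            HashSet::new(),
        );
    }
}

fn main() {
    // Usage example with hypothetical ids.
    let ctx = ReleaseContext {
        database_id: 1,
        removed_source_ids: vec![1001],
        removed_state_table_ids: vec![1002, 1003],
    };
    post_replace_cleanup(Some(&ctx), &SourceManagerStub, &StreamManagerStub);
}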