Commit f008d79: stash

tabversion committed Jan 7, 2025
1 parent b054958
Showing 8 changed files with 61 additions and 47 deletions.
5 changes: 5 additions & 0 deletions src/frontend/src/catalog/catalog_service.rs
@@ -349,6 +349,11 @@ impl CatalogWriter for CatalogWriterImpl {
mapping: ColIndexMapping,
drop_table_associated_source_id: u32,
) -> Result<()> {
tracing::info!(
"replace table: dropping associated source {:?}, table id {}",
drop_table_associated_source_id,
table.id
);
let version = self
.meta_client
.replace_job(
6 changes: 6 additions & 0 deletions src/meta/service/src/ddl_service.rs
@@ -116,6 +116,12 @@ impl DdlServiceImpl {
table,
},
) => {
tracing::info!(
"replace table: dropping associated source {:?}, table id {}",
associated_source_id,
table.as_ref().unwrap().id
);

drop_table_associated_source_id = Some(associated_source_id);
StreamingJob::Table(None, table.unwrap(), TableJobType::General)
}
3 changes: 0 additions & 3 deletions src/meta/src/barrier/command.rs
@@ -46,7 +46,6 @@ use super::info::{CommandFragmentChanges, InflightStreamingJobInfo};
use crate::barrier::info::BarrierInfo;
use crate::barrier::utils::collect_resp_info;
use crate::barrier::InflightSubscriptionInfo;
use crate::controller::catalog::ReleaseContext;
use crate::controller::fragment::InflightFragmentInfo;
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
@@ -109,8 +108,6 @@ pub struct ReplaceStreamJobPlan {
pub streaming_job: StreamingJob,
/// The temporary dummy job fragments id of new table fragment
pub tmp_id: u32,
// Objects to be dropped after replace job succeeds. Used in drop table associated source.
pub release_ctx: Option<ReleaseContext>,
}

impl ReplaceStreamJobPlan {
18 changes: 0 additions & 18 deletions src/meta/src/barrier/context/context_impl.rs
@@ -201,7 +201,6 @@ impl CommandContext {
new_fragments.actor_ids(),
dispatchers.clone(),
init_split_assignment,
None,
)
.await?;
barrier_manager_context
@@ -254,7 +253,6 @@ impl CommandContext {
stream_job_fragments.actor_ids(),
dispatchers.clone(),
init_split_assignment,
None,
)
.await?;

@@ -287,7 +285,6 @@ impl CommandContext {
new_fragments,
dispatchers,
init_split_assignment,
release_ctx,
..
},
) => {
@@ -300,7 +297,6 @@ impl CommandContext {
new_fragments.actor_ids(),
dispatchers.clone(),
init_split_assignment,
release_ctx.as_ref(),
)
.await?;

@@ -314,20 +310,6 @@ impl CommandContext {
replace_plan.fragment_replacements(),
)
.await;

// handle release ctx: drop the objects after replace job succeeds. For drop table associated source.
if let Some(release_ctx) = release_ctx {
barrier_manager_context
.source_manager
.apply_source_change(SourceChange::DropSource {
dropped_source_ids: release_ctx
.removed_source_ids
.iter()
.map(|id| *id as _)
.collect(),
})
.await;
}
}

Command::CreateSubscription {
10 changes: 6 additions & 4 deletions src/meta/src/controller/catalog/drop_op.rs
@@ -34,10 +34,7 @@ impl CatalogController {
object::Column::SchemaId,
object::Column::DatabaseId,
])
.join(JoinType::InnerJoin, object::Relation::Table.def())
.filter(
table::Column::BelongsToJobId.is_in(release_ctx.removed_state_table_ids.clone()),
)
.filter(object::Column::Oid.is_in(release_ctx.removed_state_table_ids.clone()))
.into_partial_model()
.all(txn)
.await?;
@@ -55,6 +52,11 @@ impl CatalogController {
.all(txn)
.await?;

tracing::info!(
"drop_table_associated_source: to_drop_objects: {:?}",
to_drop_objects
);

// delete all in to_drop_objects.
let res = Object::delete_many()
.filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
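For context on the filter change above: the state-table objects to drop are now selected directly by object id, instead of joining objects to tables and matching on the owning job. A minimal, self-contained sketch of the difference between the two predicates, written over a plain in-memory model rather than the SeaORM query itself (the struct and field names are illustrative assumptions, not the real schema):

    #[derive(Debug)]
    struct Obj {
        oid: i32,
        // Mirrors table::Column::BelongsToJobId; only table objects carry it.
        belongs_to_job_id: Option<i32>,
    }

    fn main() {
        let removed_state_table_ids = [101, 102];
        let objects = [
            Obj { oid: 101, belongs_to_job_id: Some(7) },
            Obj { oid: 102, belongs_to_job_id: Some(7) },
            Obj { oid: 200, belongs_to_job_id: Some(101) },
        ];

        // Old predicate: join object -> table and keep rows whose BelongsToJobId
        // falls in removed_state_table_ids.
        let old: Vec<i32> = objects
            .iter()
            .filter(|o| o.belongs_to_job_id.is_some_and(|j| removed_state_table_ids.contains(&j)))
            .map(|o| o.oid)
            .collect();

        // New predicate: keep rows whose Oid is in removed_state_table_ids directly.
        let new: Vec<i32> = objects
            .iter()
            .filter(|o| removed_state_table_ids.contains(&o.oid))
            .map(|o| o.oid)
            .collect();

        println!("old filter selects {old:?}, new filter selects {new:?}");
    }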
36 changes: 17 additions & 19 deletions src/meta/src/controller/streaming_job.rs
@@ -624,18 +624,10 @@ impl CatalogController {
actor_ids: Vec<crate::model::ActorId>,
new_actor_dispatchers: HashMap<crate::model::ActorId, Vec<PbDispatcher>>,
split_assignment: &SplitAssignment,
release_ctx: Option<&ReleaseContext>,
) -> MetaResult<()> {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;

let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
// apply release ctx: drop the objects after replace job succeeds. For drop table associated source.
if let Some(release_ctx) = release_ctx {
tracing::info!("apply release ctx: {:?}", release_ctx);
notification_objs = Some(self.drop_table_associated_source(&txn, release_ctx).await?);
}

Actor::update_many()
.col_expr(
actor::Column::Status,
@@ -688,16 +680,6 @@ impl CatalogController {
.await?;

txn.commit().await?;

if let Some((user_infos, to_drop_objects)) = notification_objs {
self.notify_users_update(user_infos).await;
self.notify_frontend(
NotificationOperation::Delete,
build_relation_group_for_delete(to_drop_objects),
)
.await;
}

Ok(())
}

@@ -994,6 +976,7 @@ impl CatalogController {
merge_updates: Vec<PbMergeUpdate>,
col_index_mapping: Option<ColIndexMapping>,
sink_into_table_context: SinkIntoTableContext,
release_ctx: Option<&ReleaseContext>,
) -> MetaResult<NotificationVersion> {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
@@ -1008,6 +991,11 @@ impl CatalogController {
)
.await?;

let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
if let Some(release_ctx) = release_ctx {
notification_objs = Some(self.drop_table_associated_source(&txn, release_ctx).await?);
}

txn.commit().await?;

// FIXME: Do not notify frontend currently, because frontend nodes might refer to old table
@@ -1017,13 +1005,23 @@ impl CatalogController {
// .await;
self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
.await;
let version = self
let mut version = self
.notify_frontend(
NotificationOperation::Update,
NotificationInfo::RelationGroup(PbRelationGroup { relations }),
)
.await;

if let Some((user_infos, to_drop_objects)) = notification_objs {
self.notify_users_update(user_infos).await;
version = self
.notify_frontend(
NotificationOperation::Delete,
build_relation_group_for_delete(to_drop_objects),
)
.await;
}

Ok(version)
}

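The net effect of the streaming_job.rs changes is a reordering, not new behavior: the associated-source objects are now dropped inside the same transaction as the rest of finish_replace_streaming_job, and the Delete notification for them goes out only after the commit and after the relation Update. A rough, self-contained sketch of that control flow with placeholder types (illustrative names, not the real meta-service API):

    struct Txn {
        committed: bool,
    }

    impl Txn {
        fn commit(&mut self) {
            self.committed = true;
        }
    }

    #[derive(Debug)]
    enum Notification {
        UpdateRelations,
        UpdateUsers(Vec<String>),
        DeleteObjects(Vec<u32>),
    }

    // Shape of finish_replace_streaming_job after this commit: mutate the catalog
    // (including the optional associated-source drop) in one transaction, commit,
    // then notify -- Update first, Delete last.
    fn finish_replace(mut txn: Txn, dropped_object_ids: Option<Vec<u32>>) -> Vec<Notification> {
        // 1. Regular catalog mutations for the replaced job go through `txn` here.
        // 2. If the replace drops the table's associated source, the affected user
        //    infos and object ids are collected inside the same transaction.
        txn.commit();
        assert!(txn.committed);

        // 3. Frontends first learn about the new relation versions ...
        let mut notifications = vec![Notification::UpdateRelations];
        // 4. ... and only then about the deleted source/state-table objects.
        if let Some(oids) = dropped_object_ids {
            notifications.push(Notification::UpdateUsers(vec!["affected_user".into()]));
            notifications.push(Notification::DeleteObjects(oids));
        }
        notifications
    }

    fn main() {
        let out = finish_replace(Txn { committed: false }, Some(vec![101, 102]));
        println!("{out:?}");
    }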
23 changes: 23 additions & 0 deletions src/meta/src/rpc/ddl_controller.rs
@@ -1252,6 +1252,7 @@ impl DdlController {
dropping_sink_id: Some(sink_id),
updated_sink_catalogs: vec![],
},
None, // release_ctx is None: the objects have already been dropped, and this replace-table path does not need an atomic drop.
)
.await?;
Ok(version)
@@ -1373,6 +1374,7 @@ impl DdlController {
tracing::debug!(id = job_id, "building replace streaming job");
let mut updated_sink_catalogs = vec![];

let mut release_ctx = None;
let result: MetaResult<Vec<PbMergeUpdate>> = try {
let (mut ctx, mut stream_job_fragments) = self
.build_replace_job(
@@ -1384,6 +1386,7 @@ impl DdlController {
drop_table_associated_source_id,
)
.await?;
release_ctx = ctx.release_ctx.clone();

if let StreamingJob::Table(_, table, ..) = &streaming_job {
let catalogs = self
@@ -1446,8 +1449,28 @@ impl DdlController {
dropping_sink_id: None,
updated_sink_catalogs,
},
release_ctx.as_ref(),
)
.await?;
if let Some(release_ctx) = &release_ctx {
// Drop table associated source: the catalog objects were already dropped above; run the commands here to clean up the remaining hummock state.
self.source_manager
.apply_source_change(SourceChange::DropSource {
dropped_source_ids: release_ctx.removed_source_ids.clone(),
})
.await;
self.stream_manager
.drop_streaming_jobs(
risingwave_common::catalog::DatabaseId::from(
release_ctx.database_id as u32,
),
vec![],
vec![],
release_ctx.removed_state_table_ids.clone(),
HashSet::new(),
)
.await;
}
Ok(version)
}
Err(err) => {
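With the barrier-side handling removed, the DDL controller now owns the whole teardown sequence: capture the release context when building the replace job, finish the catalog-side replace with it, and only after that succeeds drop the source and the streaming state of its internal tables. A condensed sketch of that ordering using stub functions (hypothetical mirrors of the flow above, not the actual DdlController methods):

    #[derive(Clone, Debug)]
    struct ReleaseCtx {
        database_id: u32,
        removed_source_ids: Vec<u32>,
        removed_state_table_ids: Vec<u32>,
    }

    // Stand-in for build_replace_job: the release context is Some only when the
    // replace drops the table's associated source.
    fn build_replace_job() -> Option<ReleaseCtx> {
        Some(ReleaseCtx {
            database_id: 1,
            removed_source_ids: vec![42],
            removed_state_table_ids: vec![101, 102],
        })
    }

    // Stand-in for finish_replace_streaming_job: catalog work, including the
    // associated-source drop, happens here and returns a notification version.
    fn finish_replace_streaming_job(_release_ctx: Option<&ReleaseCtx>) -> Result<u64, String> {
        Ok(7)
    }

    fn main() -> Result<(), String> {
        let release_ctx = build_replace_job();
        let version = finish_replace_streaming_job(release_ctx.as_ref())?;

        if let Some(ctx) = &release_ctx {
            // Runtime cleanup runs only after the catalog change succeeded:
            // 1. tell the source manager the source ids are gone,
            // 2. drop the streaming jobs of the removed state tables so their
            //    hummock entries get cleaned up.
            println!("drop source ids {:?}", ctx.removed_source_ids);
            println!(
                "drop state tables {:?} in database {}",
                ctx.removed_state_table_ids, ctx.database_id
            );
        }
        println!("catalog notification version: {version}");
        Ok(())
    }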
7 changes: 4 additions & 3 deletions src/meta/src/stream/stream_manager.rs
@@ -373,7 +373,6 @@ impl GlobalStreamManager {
init_split_assignment,
streaming_job,
tmp_id: tmp_table_id.table_id,
release_ctx: None, /* release_ctx is None because we already drop the objects and not require atomic drop things when replacing table. */
});
}

@@ -456,7 +455,6 @@ impl GlobalStreamManager {
dispatchers,
tmp_id,
streaming_job,
release_ctx,
..
}: ReplaceStreamJobContext,
) -> MetaResult<()> {
@@ -468,6 +466,10 @@ impl GlobalStreamManager {
} else {
self.source_manager.allocate_splits(&tmp_table_id).await?
};
tracing::info!(
"replace_stream_job - allocate split: {:?}",
init_split_assignment
);

self.barrier_scheduler
.run_config_change_command_with_pause(
@@ -480,7 +482,6 @@ impl GlobalStreamManager {
init_split_assignment,
streaming_job,
tmp_id,
release_ctx,
}),
)
.await?;
