Commit b054958

stash

tabversion committed Jan 7, 2025
1 parent 546f0f3 commit b054958
Showing 8 changed files with 61 additions and 53 deletions.
10 changes: 2 additions & 8 deletions src/frontend/src/handler/create_sink.rs
@@ -55,9 +55,7 @@ use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
 use crate::handler::create_mv::parse_column_names;
 use crate::handler::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator};
 use crate::handler::privilege::resolve_query_privileges;
-use crate::handler::util::{
-    check_connector_match_connection_type, ensure_connection_type_allowed,
-};
+use crate::handler::util::{check_connector_match_connection_type, ensure_connection_type_allowed};
 use crate::handler::HandlerArgs;
 use crate::optimizer::plan_node::{generic, LogicalSource, PartitionComputeInfo, StreamProject};
 use crate::optimizer::{OptimizerContext, PlanRef, RelationCollectorVisitor};
@@ -503,11 +501,7 @@ pub(crate) async fn reparse_table_for_sink(
 ) -> Result<(StreamFragmentGraph, Table, Option<PbSource>)> {
     // Retrieve the original table definition and parse it to AST.
     let definition = table_catalog.create_sql_ast()?;
-    let Statement::CreateTable {
-        name,
-        ..
-    } = &definition
-    else {
+    let Statement::CreateTable { name, .. } = &definition else {
         panic!("unexpected statement: {:?}", definition);
     };
     let table_name = name.clone();
3 changes: 3 additions & 0 deletions src/meta/src/barrier/command.rs
@@ -46,6 +46,7 @@ use super::info::{CommandFragmentChanges, InflightStreamingJobInfo};
 use crate::barrier::info::BarrierInfo;
 use crate::barrier::utils::collect_resp_info;
 use crate::barrier::InflightSubscriptionInfo;
+use crate::controller::catalog::ReleaseContext;
 use crate::controller::fragment::InflightFragmentInfo;
 use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
 use crate::manager::{StreamingJob, StreamingJobType};
@@ -108,6 +109,8 @@ pub struct ReplaceStreamJobPlan {
     pub streaming_job: StreamingJob,
     /// The temporary dummy job fragments id of new table fragment
     pub tmp_id: u32,
+    /// Objects to be dropped after the replace job succeeds; used when dropping a table's associated source.
+    pub release_ctx: Option<ReleaseContext>,
 }

 impl ReplaceStreamJobPlan {
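The new `release_ctx` field makes the deferred-cleanup contract explicit: the replace plan carries the catalog objects to release, and nothing is dropped until the replace job is known to have succeeded. A minimal, self-contained sketch of that pattern, using stand-in types rather than the real RisingWave structs:

    // Stand-ins for ReleaseContext / ReplaceStreamJobPlan; illustrative only.
    #[derive(Clone, Debug, Default)]
    struct ReleaseContext {
        removed_source_ids: Vec<u32>,
    }

    struct ReplacePlan {
        tmp_id: u32,
        // Objects to drop only after the replace job succeeds.
        release_ctx: Option<ReleaseContext>,
    }

    fn on_replace_finished(plan: &ReplacePlan, succeeded: bool) {
        if !succeeded {
            // On failure the old catalog state is still needed to restore the
            // job, so the release context is deliberately left unapplied.
            return;
        }
        if let Some(ctx) = &plan.release_ctx {
            for id in &ctx.removed_source_ids {
                println!("dropping source {id} (tmp job {})", plan.tmp_id);
            }
        }
    }

    fn main() {
        let plan = ReplacePlan {
            tmp_id: 42,
            release_ctx: Some(ReleaseContext { removed_source_ids: vec![7] }),
        };
        on_replace_finished(&plan, true);
    }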
18 changes: 18 additions & 0 deletions src/meta/src/barrier/context/context_impl.rs
@@ -201,6 +201,7 @@ impl CommandContext {
                         new_fragments.actor_ids(),
                         dispatchers.clone(),
                         init_split_assignment,
+                        None,
                     )
                     .await?;
                 barrier_manager_context
@@ -253,6 +254,7 @@
                         stream_job_fragments.actor_ids(),
                         dispatchers.clone(),
                         init_split_assignment,
+                        None,
                     )
                     .await?;

@@ -285,6 +287,7 @@
                 new_fragments,
                 dispatchers,
                 init_split_assignment,
+                release_ctx,
                 ..
             },
         ) => {
@@ -297,6 +300,7 @@
                         new_fragments.actor_ids(),
                         dispatchers.clone(),
                         init_split_assignment,
+                        release_ctx.as_ref(),
                     )
                     .await?;

@@ -310,6 +314,20 @@
                         replace_plan.fragment_replacements(),
                     )
                     .await;
+
+                // Handle the release context: drop the objects only after the replace job succeeds (used when dropping a table's associated source).
+                if let Some(release_ctx) = release_ctx {
+                    barrier_manager_context
+                        .source_manager
+                        .apply_source_change(SourceChange::DropSource {
+                            dropped_source_ids: release_ctx
+                                .removed_source_ids
+                                .iter()
+                                .map(|id| *id as _)
+                                .collect(),
+                        })
+                        .await;
+                }
            }

            Command::CreateSubscription {
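After the barrier completes, the source manager learns about the dropped sources through a `SourceChange::DropSource` message built from the release context. A rough, synchronous sketch of that dispatch shape (the real `apply_source_change` is async; every type here is a simplified stand-in, not the actual API surface):

    // Simplified mirror of the SourceChange dispatch; illustrative only.
    #[derive(Debug)]
    enum SourceChange {
        DropSource { dropped_source_ids: Vec<u64> },
    }

    struct SourceManager;

    impl SourceManager {
        fn apply_source_change(&self, change: SourceChange) {
            match change {
                SourceChange::DropSource { dropped_source_ids } => {
                    // The real implementation would release per-source state here.
                    println!("dropping sources: {:?}", dropped_source_ids);
                }
            }
        }
    }

    fn main() {
        let removed_source_ids: Vec<u32> = vec![7, 9];
        let mgr = SourceManager;
        mgr.apply_source_change(SourceChange::DropSource {
            // `as _` lets the target integer width be inferred from the field type.
            dropped_source_ids: removed_source_ids.iter().map(|id| *id as _).collect(),
        });
    }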
28 changes: 8 additions & 20 deletions src/meta/src/controller/catalog/drop_op.rs
@@ -18,15 +18,13 @@ impl CatalogController {
     // drop table associated source is a special case of drop relation, which just remove the source object and associated state table, keeping the streaming job and fragments.
     pub async fn drop_table_associated_source(
         &self,
+        txn: &DatabaseTransaction,
         release_ctx: &ReleaseContext,
-    ) -> MetaResult<NotificationVersion> {
-        let inner = self.inner.write().await;
-        let txn = inner.db.begin().await?;
-
+    ) -> MetaResult<(Vec<PbUserInfo>, Vec<PartialObject>)> {
         let to_drop_source_objects: Vec<PartialObject> = Object::find()
             .filter(object::Column::Oid.is_in(release_ctx.removed_source_ids.clone()))
             .into_partial_model()
-            .all(&txn)
+            .all(txn)
             .await?;
         let to_drop_internal_table_objs: Vec<PartialObject> = Object::find()
             .select_only()
@@ -41,7 +39,7 @@
                 table::Column::BelongsToJobId.is_in(release_ctx.removed_state_table_ids.clone()),
             )
             .into_partial_model()
-            .all(&txn)
+            .all(txn)
             .await?;
         let to_drop_objects = to_drop_source_objects
             .into_iter()
@@ -54,33 +52,23 @@
             .column(user_privilege::Column::UserId)
             .filter(user_privilege::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
             .into_tuple()
-            .all(&txn)
+            .all(txn)
             .await?;

         // delete all in to_drop_objects.
         let res = Object::delete_many()
             .filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
-            .exec(&txn)
+            .exec(txn)
             .await?;
         if res.rows_affected == 0 {
             return Err(MetaError::catalog_id_not_found(
                 ObjectType::Source.as_str(),
                 release_ctx.removed_source_ids.first().unwrap(),
             ));
         }
-        let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?;
+        let user_infos = list_user_info_by_ids(to_update_user_ids, txn).await?;

-        txn.commit().await?;
-
-        self.notify_users_update(user_infos).await;
-        let version = self
-            .notify_frontend(
-                NotificationOperation::Delete,
-                build_relation_group_for_delete(to_drop_objects),
-            )
-            .await;
-
-        Ok(version)
+        Ok((user_infos, to_drop_objects))
     }

     pub async fn drop_relation(
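The signature change above is the heart of this refactor: `drop_table_associated_source` no longer opens, commits, and notifies on its own. It borrows the caller's transaction and hands back the notification payload, so the caller can fold the drop into a larger atomic operation. A compact sketch of the after shape, with stand-in types in place of sea-orm and the meta-store models:

    // Stand-ins for the transaction and payload types; illustrative only.
    struct Txn;
    struct UserInfo(u32);
    struct ObjRef(u32);

    // After the refactor: a pure catalog mutation inside the caller's
    // transaction, returning what the caller needs to notify with.
    fn drop_in_txn(_txn: &Txn) -> Result<(Vec<UserInfo>, Vec<ObjRef>), String> {
        // ... delete catalog rows via `_txn` ...
        Ok((vec![UserInfo(1)], vec![ObjRef(7)]))
    }

    fn caller() -> Result<(), String> {
        let txn = Txn; // begin
        let (users, objects) = drop_in_txn(&txn)?; // joins the caller's transaction
        // ... further mutations in the same transaction ...
        // commit, then notify outside the transaction:
        println!("notify {} users, delete {} objects", users.len(), objects.len());
        Ok(())
    }

    fn main() {
        caller().unwrap();
    }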
2 changes: 1 addition & 1 deletion src/meta/src/controller/catalog/mod.rs
@@ -110,7 +110,7 @@ pub struct CatalogController {
     pub(crate) inner: RwLock<CatalogControllerInner>,
 }

-#[derive(Clone, Default)]
+#[derive(Clone, Default, Debug)]
 pub struct ReleaseContext {
     pub(crate) database_id: DatabaseId,
     pub(crate) removed_streaming_job_ids: Vec<ObjectId>,
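The new `Debug` derive is what lets `ReleaseContext` be formatted with `{:?}`; the `tracing::info!("apply release ctx: {:?}", release_ctx)` call added in src/meta/src/controller/streaming_job.rs below depends on it.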
20 changes: 19 additions & 1 deletion src/meta/src/controller/streaming_job.rs
@@ -54,6 +54,7 @@ use risingwave_pb::stream_plan::update_mutation::PbMergeUpdate;
 use risingwave_pb::stream_plan::{
     PbDispatcher, PbDispatcherType, PbFragmentTypeFlag, PbStreamActor, PbStreamNode,
 };
+use risingwave_pb::user::PbUserInfo;
 use sea_orm::sea_query::{BinOper, Expr, Query, SimpleExpr};
 use sea_orm::ActiveValue::Set;
 use sea_orm::{
@@ -63,7 +64,7 @@ use sea_orm::{
 };

 use crate::barrier::{ReplaceStreamJobPlan, Reschedule};
-use crate::controller::catalog::CatalogController;
+use crate::controller::catalog::{CatalogController, ReleaseContext};
 use crate::controller::rename::ReplaceTableExprRewriter;
 use crate::controller::utils::{
     build_relation_group_for_delete, check_relation_name_duplicate, check_sink_into_table_cycle,
@@ -623,10 +624,18 @@
         actor_ids: Vec<crate::model::ActorId>,
         new_actor_dispatchers: HashMap<crate::model::ActorId, Vec<PbDispatcher>>,
         split_assignment: &SplitAssignment,
+        release_ctx: Option<&ReleaseContext>,
     ) -> MetaResult<()> {
         let inner = self.inner.write().await;
         let txn = inner.db.begin().await?;

+        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
+        // Apply the release context: drop the objects once the replace job succeeds (used when dropping a table's associated source).
+        if let Some(release_ctx) = release_ctx {
+            tracing::info!("apply release ctx: {:?}", release_ctx);
+            notification_objs = Some(self.drop_table_associated_source(&txn, release_ctx).await?);
+        }
+
         Actor::update_many()
             .col_expr(
                 actor::Column::Status,
@@ -680,6 +689,15 @@

         txn.commit().await?;

+        if let Some((user_infos, to_drop_objects)) = notification_objs {
+            self.notify_users_update(user_infos).await;
+            self.notify_frontend(
+                NotificationOperation::Delete,
+                build_relation_group_for_delete(to_drop_objects),
+            )
+            .await;
+        }
+
         Ok(())
     }

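Note the ordering in `finish_replace_streaming_job`: the notification payload is collected while the transaction is open, but users and frontends are notified only after `txn.commit()` returns. A minimal sketch of that commit-then-notify discipline (stand-in types, not the real notification manager):

    // Stand-in notification payload; illustrative only.
    struct Notification(String);

    fn finish_replace(apply_release: bool) -> Result<(), String> {
        let mut pending: Option<Vec<Notification>> = None;

        // -- inside the transaction --
        if apply_release {
            pending = Some(vec![Notification("delete source 7".into())]);
        }
        // ... remaining transactional bookkeeping ...
        // txn.commit()?; // if the commit fails, `pending` is simply dropped

        // -- only after a successful commit --
        if let Some(notes) = pending {
            for n in notes {
                println!("notify: {}", n.0);
            }
        }
        Ok(())
    }

    fn main() {
        finish_replace(true).unwrap();
    }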
30 changes: 7 additions & 23 deletions src/meta/src/rpc/ddl_controller.rs
@@ -1269,12 +1269,7 @@
                 }
             }?;
         }
-        self.apply_release_context(release_ctx).await;
-
-        Ok(version)
-    }
-
-    async fn apply_release_context(&self, release_ctx: ReleaseContext) {
+
         let ReleaseContext {
             database_id,
             removed_streaming_job_ids,
@@ -1322,6 +1317,7 @@
                 removed_fragments.iter().map(|id| *id as _).collect(),
             )
             .await;
+        Ok(version)
     }

     /// This is used for `ALTER TABLE ADD/DROP COLUMN` / `ALTER SOURCE ADD COLUMN`.
@@ -1377,9 +1373,6 @@
         tracing::debug!(id = job_id, "building replace streaming job");
         let mut updated_sink_catalogs = vec![];

-        // The release_ctx keep info to drop from catalog. But we do not drop it until the replace job succeeds.
-        // If replace job fails, we still need the old state table info to restore the job.
-        let mut release_ctx = None;
         let result: MetaResult<Vec<PbMergeUpdate>> = try {
             let (mut ctx, mut stream_job_fragments) = self
                 .build_replace_job(
@@ -1391,7 +1384,6 @@
                     drop_table_associated_source_id,
                 )
                 .await?;
-            release_ctx = ctx.release_ctx.clone();

             if let StreamingJob::Table(_, table, ..) = &streaming_job {
                 let catalogs = self
@@ -1439,9 +1431,10 @@
             merge_updates
         };

-        let version = match result {
+        match result {
             Ok(merge_updates) => {
-                self.metadata_manager
+                let version = self
+                    .metadata_manager
                     .catalog_controller
                     .finish_replace_streaming_job(
                         tmp_id,
@@ -1454,7 +1447,8 @@
                             updated_sink_catalogs,
                         },
                     )
-                    .await?
+                    .await?;
+                Ok(version)
             }
             Err(err) => {
                 tracing::error!(id = job_id, error = ?err.as_report(), "failed to replace job");
@@ -1464,19 +1458,9 @@
                 .await.inspect_err(|err| {
                     tracing::error!(id = job_id, error = ?err.as_report(), "failed to abort replacing job");
                 });
-                return Err(err);
+                Err(err)
             }
-        };
-
-        // If replace job fails, we still need the old state table info to restore the job so we wait for the replace job run command to finish.
-        if let Some(release_ctx) = release_ctx {
-            // todo: apply release will do another run command
-            // only delete the catalog after the replace job succeeds
-            self.metadata_manager.catalog_controller.drop_table_associated_source(&release_ctx).await?;
-            self.apply_release_context(release_ctx).await;
-        }
-
-        Ok(version)
+        }
     }

     async fn drop_streaming_job(
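The net effect of these hunks: the deleted two-step flow (finish the replace, then drop the source catalog in a separate call that triggered another run command) is replaced by a single transaction inside `finish_replace_streaming_job`, closing the window where a successful replace could coexist with stale source objects. A toy sketch of the single-transaction shape (stand-in types, not the meta-store API):

    // Stand-in transaction that just records its operations; illustrative only.
    struct Txn {
        ops: Vec<String>,
    }

    impl Txn {
        fn new() -> Self {
            Txn { ops: Vec::new() }
        }
        fn exec(&mut self, op: &str) {
            self.ops.push(op.to_string());
        }
        fn commit(self) -> Result<(), String> {
            println!("committed atomically: {:?}", self.ops);
            Ok(())
        }
    }

    fn finish_replace_with_release(drop_associated_source: bool) -> Result<(), String> {
        let mut txn = Txn::new();
        if drop_associated_source {
            // drop_table_associated_source(&txn, release_ctx) in the real code
            txn.exec("delete source object + state tables");
        }
        txn.exec("swap fragments, update actors"); // the replace bookkeeping
        // The stale objects and the new job state change together or not at all.
        txn.commit()
    }

    fn main() {
        finish_replace_with_release(true).unwrap();
    }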
3 changes: 3 additions & 0 deletions src/meta/src/stream/stream_manager.rs
@@ -373,6 +373,7 @@ impl GlobalStreamManager {
                 init_split_assignment,
                 streaming_job,
                 tmp_id: tmp_table_id.table_id,
+                release_ctx: None, /* `release_ctx` is None here: the objects are already dropped, and replacing a table does not require an atomic drop. */
             });
         }

@@ -455,6 +456,7 @@
             dispatchers,
             tmp_id,
             streaming_job,
+            release_ctx,
             ..
         }: ReplaceStreamJobContext,
     ) -> MetaResult<()> {
@@ -478,6 +480,7 @@
                 init_split_assignment,
                 streaming_job,
                 tmp_id,
+                release_ctx,
             }),
         )
         .await?;
