Skip to content

Commit

Permalink
refactor(meta): deprecate dispatcher table and compose dispatcher by …
Browse files Browse the repository at this point in the history
…fragment relation
  • Loading branch information
wenym1 committed Feb 19, 2025
1 parent 80b265d commit 878575c
Show file tree
Hide file tree
Showing 15 changed files with 543 additions and 718 deletions.
8 changes: 0 additions & 8 deletions src/meta/model/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ pub struct Model {

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(has_many = "super::actor_dispatcher::Entity")]
ActorDispatcher,
#[sea_orm(
belongs_to = "super::fragment::Entity",
from = "Column::FragmentId",
Expand All @@ -77,12 +75,6 @@ pub enum Relation {
Fragment,
}

impl Related<super::actor_dispatcher::Entity> for Entity {
fn to() -> RelationDef {
Relation::ActorDispatcher.def()
}
}

impl Related<super::fragment::Entity> for Entity {
fn to() -> RelationDef {
Relation::Fragment.def()
Expand Down
80 changes: 1 addition & 79 deletions src/meta/model/src/actor_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@

use std::hash::Hash;

use risingwave_pb::stream_plan::{PbDispatcher, PbDispatcherType};
use risingwave_pb::stream_plan::PbDispatcherType;
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

use crate::{ActorId, ActorMapping, FragmentId, I32Array};

#[derive(
Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
)]
Expand Down Expand Up @@ -57,79 +55,3 @@ impl From<DispatcherType> for PbDispatcherType {
}
}
}

impl From<(u32, PbDispatcher)> for Model {
fn from((actor_id, dispatcher): (u32, PbDispatcher)) -> Self {
Self {
id: 0,
actor_id: actor_id as _,
dispatcher_type: dispatcher.r#type().into(),
dist_key_indices: dispatcher.dist_key_indices.into(),
output_indices: dispatcher.output_indices.into(),
hash_mapping: dispatcher.hash_mapping.as_ref().map(ActorMapping::from),
dispatcher_id: dispatcher.dispatcher_id as _,
downstream_actor_ids: dispatcher.downstream_actor_id.into(),
}
}
}

impl From<Model> for PbDispatcher {
fn from(model: Model) -> Self {
Self {
r#type: PbDispatcherType::from(model.dispatcher_type) as _,
dist_key_indices: model.dist_key_indices.into_u32_array(),
output_indices: model.output_indices.into_u32_array(),
hash_mapping: model.hash_mapping.map(|mapping| mapping.to_protobuf()),
dispatcher_id: model.dispatcher_id as _,
downstream_actor_id: model.downstream_actor_ids.into_u32_array(),
}
}
}

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Deserialize, Serialize)]
#[sea_orm(table_name = "actor_dispatcher")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub actor_id: ActorId,
pub dispatcher_type: DispatcherType,
pub dist_key_indices: I32Array,
pub output_indices: I32Array,
pub hash_mapping: Option<ActorMapping>,
pub dispatcher_id: FragmentId,
pub downstream_actor_ids: I32Array,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::actor::Entity",
from = "Column::ActorId",
to = "super::actor::Column::ActorId",
on_update = "NoAction",
on_delete = "Cascade"
)]
Actor,
#[sea_orm(
belongs_to = "super::fragment::Entity",
from = "Column::DispatcherId",
to = "super::fragment::Column::FragmentId",
on_update = "NoAction",
on_delete = "Cascade"
)]
Fragment,
}

impl Related<super::actor::Entity> for Entity {
fn to() -> RelationDef {
Relation::Actor.def()
}
}

impl Related<super::fragment::Entity> for Entity {
fn to() -> RelationDef {
Relation::Fragment.def()
}
}

impl ActiveModelBehavior for ActiveModel {}
8 changes: 0 additions & 8 deletions src/meta/model/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ impl From<PbFragmentDistributionType> for DistributionType {
pub enum Relation {
#[sea_orm(has_many = "super::actor::Entity")]
Actor,
#[sea_orm(has_many = "super::actor_dispatcher::Entity")]
ActorDispatcher,
#[sea_orm(
belongs_to = "super::object::Entity",
from = "Column::JobId",
Expand All @@ -84,12 +82,6 @@ impl Related<super::actor::Entity> for Entity {
}
}

impl Related<super::actor_dispatcher::Entity> for Entity {
fn to() -> RelationDef {
Relation::ActorDispatcher.def()
}
}

impl Related<super::object::Entity> for Entity {
fn to() -> RelationDef {
Relation::Object.def()
Expand Down
1 change: 0 additions & 1 deletion src/meta/model/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
pub use {Source as SourceModel, Table as TableModel};

pub use super::actor::Entity as Actor;
pub use super::actor_dispatcher::Entity as ActorDispatcher;
pub use super::catalog_version::Entity as CatalogVersion;
pub use super::cluster::Entity as Cluster;
pub use super::compaction_config::Entity as CompactionConfig;
Expand Down
3 changes: 1 addition & 2 deletions src/meta/src/backup_restore/restore_impl/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl Writer<MetadataV2> for WriterModelV2ToMetaStoreV2 {
insert_models(metadata.streaming_jobs.clone(), db).await?;
insert_models(metadata.fragments.clone(), db).await?;
insert_models(metadata.actors.clone(), db).await?;
insert_models(metadata.actor_dispatchers.clone(), db).await?;
insert_models(metadata.fragment_relation.clone(), db).await?;
insert_models(metadata.connections.clone(), db).await?;
insert_models(metadata.sources.clone(), db).await?;
insert_models(metadata.tables.clone(), db).await?;
Expand Down Expand Up @@ -175,7 +175,6 @@ macro_rules! for_all_auto_increment {
{"user", users, user_id},
{"user_privilege", user_privileges, id},
{"actor", actors, actor_id},
{"actor_dispatcher", actor_dispatchers, id},
{"fragment", fragments, fragment_id},
{"object_dependency", object_dependencies, id}
)
Expand Down
Loading

0 comments on commit 878575c

Please sign in to comment.