Skip to content

Commit

Permalink
refactor: compare with origin bytes during the transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Oct 8, 2023
1 parent 0679178 commit be4c6fa
Show file tree
Hide file tree
Showing 18 changed files with 320 additions and 121 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ impl CatalogManager for KvBackendCatalogManager {
.get(table_id)
.await
.context(TableMetadataManagerSnafu)?
.map(|v| v.into_inner())
else {
return Ok(None);
};
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ api = { workspace = true }
arrow-flight.workspace = true
async-stream.workspace = true
async-trait.workspace = true
bytes = "1.4"
common-catalog = { workspace = true }
common-error = { workspace = true }
common-grpc-expr.workspace = true
Expand Down
16 changes: 9 additions & 7 deletions src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use crate::error::{
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::metrics;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders};
Expand All @@ -63,7 +64,7 @@ impl AlterTableProcedure {
pub fn new(
cluster_id: u64,
task: AlterTableTask,
table_info_value: TableInfoValue,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
context: DdlContext,
) -> Result<Self> {
let alter_kind = task
Expand All @@ -74,7 +75,7 @@ impl AlterTableProcedure {
err_msg: "'kind' is absent",
})?;
let (kind, next_column_id) =
create_proto_alter_kind(&table_info_value.table_info, alter_kind)?;
create_proto_alter_kind(&table_info_value.inner.table_info, alter_kind)?;

debug!(
"New AlterTableProcedure, kind: {:?}, next_column_id: {:?}",
Expand All @@ -100,7 +101,7 @@ impl AlterTableProcedure {
})
.map_err(ProcedureError::external)?;
let (kind, next_column_id) =
create_proto_alter_kind(&data.table_info_value.table_info, alter_kind)
create_proto_alter_kind(&data.table_info_value.inner.table_info, alter_kind)
.map_err(ProcedureError::external)?;
assert_eq!(data.next_column_id, next_column_id);

Expand Down Expand Up @@ -191,7 +192,8 @@ impl AlterTableProcedure {
.await?
.with_context(|| TableRouteNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
})?
.into_inner();

let leaders = find_leaders(&region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());
Expand Down Expand Up @@ -413,7 +415,7 @@ pub struct AlterTableData {
state: AlterTableState,
task: AlterTableTask,
/// Table info value before alteration.
table_info_value: TableInfoValue,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
cluster_id: u64,
/// Next column id of the table if the task adds columns to the table.
next_column_id: Option<ColumnId>,
Expand All @@ -422,7 +424,7 @@ pub struct AlterTableData {
impl AlterTableData {
pub fn new(
task: AlterTableTask,
table_info_value: TableInfoValue,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
cluster_id: u64,
next_column_id: Option<ColumnId>,
) -> Self {
Expand All @@ -444,7 +446,7 @@ impl AlterTableData {
}

fn table_info(&self) -> &RawTableInfo {
&self.table_info_value.table_info
&self.table_info_value.inner.table_info
}
}

Expand Down
17 changes: 9 additions & 8 deletions src/common/meta/src/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::error::{self, Result};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::metrics;
use crate::rpc::ddl::DropTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
Expand All @@ -55,8 +56,8 @@ impl DropTableProcedure {
pub fn new(
cluster_id: u64,
task: DropTableTask,
table_route_value: TableRouteValue,
table_info_value: TableInfoValue,
table_route_value: DeserializedValueWithBytes<TableRouteValue>,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
context: DdlContext,
) -> Self {
Self {
Expand Down Expand Up @@ -231,16 +232,16 @@ pub struct DropTableData {
pub state: DropTableState,
pub cluster_id: u64,
pub task: DropTableTask,
pub table_route_value: TableRouteValue,
pub table_info_value: TableInfoValue,
pub table_route_value: DeserializedValueWithBytes<TableRouteValue>,
pub table_info_value: DeserializedValueWithBytes<TableInfoValue>,
}

impl DropTableData {
pub fn new(
cluster_id: u64,
task: DropTableTask,
table_route_value: TableRouteValue,
table_info_value: TableInfoValue,
table_route_value: DeserializedValueWithBytes<TableRouteValue>,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
) -> Self {
Self {
state: DropTableState::Prepare,
Expand All @@ -256,11 +257,11 @@ impl DropTableData {
}

fn region_routes(&self) -> &Vec<RegionRoute> {
&self.table_route_value.region_routes
&self.table_route_value.inner.region_routes
}

fn table_info(&self) -> &RawTableInfo {
&self.table_info_value.table_info
&self.table_info_value.inner.table_info
}

fn table_id(&self) -> TableId {
Expand Down
9 changes: 5 additions & 4 deletions src/common/meta/src/ddl/truncate_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::ddl::DdlContext;
use crate::error::{Result, TableNotFoundSnafu};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::DeserializedValueWithBytes;
use crate::metrics;
use crate::rpc::ddl::TruncateTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
Expand Down Expand Up @@ -90,7 +91,7 @@ impl TruncateTableProcedure {
pub(crate) fn new(
cluster_id: u64,
task: TruncateTableTask,
table_info_value: TableInfoValue,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
region_routes: Vec<RegionRoute>,
context: DdlContext,
) -> Self {
Expand Down Expand Up @@ -188,15 +189,15 @@ pub struct TruncateTableData {
state: TruncateTableState,
cluster_id: u64,
task: TruncateTableTask,
table_info_value: TableInfoValue,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
region_routes: Vec<RegionRoute>,
}

impl TruncateTableData {
pub fn new(
cluster_id: u64,
task: TruncateTableTask,
table_info_value: TableInfoValue,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
region_routes: Vec<RegionRoute>,
) -> Self {
Self {
Expand All @@ -217,7 +218,7 @@ impl TruncateTableData {
}

fn table_info(&self) -> &RawTableInfo {
&self.table_info_value.table_info
&self.table_info_value.inner.table_info
}

fn table_id(&self) -> TableId {
Expand Down
12 changes: 6 additions & 6 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::error::{
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::rpc::ddl::DdlTask::{AlterTable, CreateTable, DropTable, TruncateTable};
use crate::rpc::ddl::{
AlterTableTask, CreateTableTask, DropTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse,
Expand Down Expand Up @@ -144,7 +144,7 @@ impl DdlManager {
&self,
cluster_id: u64,
alter_table_task: AlterTableTask,
table_info_value: TableInfoValue,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
) -> Result<ProcedureId> {
let context = self.create_context();

Expand Down Expand Up @@ -176,8 +176,8 @@ impl DdlManager {
&self,
cluster_id: u64,
drop_table_task: DropTableTask,
table_info_value: TableInfoValue,
table_route_value: TableRouteValue,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
table_route_value: DeserializedValueWithBytes<TableRouteValue>,
) -> Result<ProcedureId> {
let context = self.create_context();

Expand All @@ -198,7 +198,7 @@ impl DdlManager {
&self,
cluster_id: u64,
truncate_table_task: TruncateTableTask,
table_info_value: TableInfoValue,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
region_routes: Vec<RegionRoute>,
) -> Result<ProcedureId> {
let context = self.create_context();
Expand Down Expand Up @@ -252,7 +252,7 @@ async fn handle_truncate_table_task(
table_name: table_ref.to_string(),
})?;

let table_route = table_route_value.region_routes;
let table_route = table_route_value.inner.region_routes;

let id = ddl_manager
.submit_truncate_table_task(
Expand Down
Loading

0 comments on commit be4c6fa

Please sign in to comment.