Skip to content

Commit

Permalink
chore: by comment
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Jan 25, 2024
1 parent b0f1e5e commit c53f8cb
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 62 deletions.
9 changes: 1 addition & 8 deletions src/common/meta/src/ddl/create_logical_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,7 @@ impl CreateLogicalTablesProcedure {
let num_tables = tables_data.len();

if num_tables > 0 {
let region_numbers = self.creator.data.regin_numbers();
manager
.create_logic_tables_metadata(tables_data, region_numbers)
.await?;
manager.create_logic_tables_metadata(tables_data).await?;
}

info!("Created {num_tables} tables metadata for physical table {physical_table_id}");
Expand Down Expand Up @@ -387,10 +384,6 @@ impl CreateTablesData {
})
.collect::<Vec<_>>()
}

fn regin_numbers(&self) -> Vec<RegionNumber> {
self.physical_region_numbers.clone()
}
}

#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/table_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl TableMetadataAllocator {
})
}

/// Sets table ids with all tasks and return the physical table info.
/// Sets table ids with all tasks.
pub async fn set_table_ids_on_logic_create(&self, tasks: &mut [CreateTableTask]) -> Result<()> {
for task in tasks {
let table_id = self.allocate_table_id(task).await?;
Expand Down
34 changes: 27 additions & 7 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ use crate::key::table_route::TableRouteValue;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::DdlTask::{
AlterTable, CreateLogicalTables, CreateTable, DropTable, TruncateTable,
AlterLogicalTables, AlterTable, CreateLogicalTables, CreateTable, DropLogicalTables, DropTable,
TruncateTable,
};
use crate::rpc::ddl::{
AlterTableTask, CreateTableTask, DropTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse,
Expand Down Expand Up @@ -495,13 +496,20 @@ async fn handle_create_table_task(
region_wal_options,
)
.await?;
let output = output.context(error::ProcedureOutputSnafu)?;

let table_id = *(output.downcast_ref::<u32>().unwrap());
let procedure_id = id.to_string();
let output = output.context(ProcedureOutputSnafu {
procedure_id: &procedure_id,
err_msg: "empty output",
})?;
let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
procedure_id: &procedure_id,
err_msg: "downcast to `u32`",
})?);
info!("Table: {table_id} is created via procedure_id {id:?}");

Ok(SubmitDdlTaskResponse {
key: id.to_string().into(),
key: procedure_id.into(),
table_id: Some(table_id),
..Default::default()
})
Expand Down Expand Up @@ -531,11 +539,21 @@ async fn handle_create_logical_table_tasks(

info!("{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}");

let output = output.context(ProcedureOutputSnafu)?;
let table_ids = output.downcast_ref::<Vec<TableId>>().unwrap().clone();
let procedure_id = id.to_string();
let output = output.context(ProcedureOutputSnafu {
procedure_id: &procedure_id,
err_msg: "empty output",
})?;
let table_ids = output
.downcast_ref::<Vec<TableId>>()
.context(ProcedureOutputSnafu {
procedure_id: &procedure_id,
err_msg: "downcast to `Vec<TableId>`",
})?
.clone();

Ok(SubmitDdlTaskResponse {
key: id.to_string().into(),
key: procedure_id.into(),
table_ids,
..Default::default()
})
Expand Down Expand Up @@ -573,6 +591,8 @@ impl DdlTaskExecutor for DdlManager {
CreateLogicalTables(create_table_tasks) => {
handle_create_logical_table_tasks(self, cluster_id, create_table_tasks).await
}
DropLogicalTables(_) => todo!(),
AlterLogicalTables(_) => todo!(),
}
}
.trace(span)
Expand Down
10 changes: 8 additions & 2 deletions src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,14 @@ pub enum Error {
source: common_procedure::Error,
},

#[snafu(display("Failed to get procedure output"))]
ProcedureOutput { location: Location },
#[snafu(display(
"Failed to get procedure output, procedure id: {procedure_id}, error: {err_msg}"
))]
ProcedureOutput {
procedure_id: String,
err_msg: String,
location: Location,
},

#[snafu(display("Failed to convert RawTableInfo into TableInfo"))]
ConvertRawTableInfo {
Expand Down
11 changes: 4 additions & 7 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,6 @@ impl TableMetadataManager {
pub async fn create_logic_tables_metadata(
&self,
tables_data: Vec<(RawTableInfo, TableRouteValue)>,
region_numbers: Vec<RegionNumber>,
) -> Result<()> {
let len = tables_data.len();
let mut txns = Vec::with_capacity(3 * len);
Expand All @@ -477,7 +476,7 @@ impl TableMetadataManager {
}
let mut on_failures = Vec::with_capacity(len);
for (mut table_info, table_route_value) in tables_data {
table_info.meta.region_numbers = region_numbers.clone();
table_info.meta.region_numbers = table_route_value.region_numbers();
let table_id = table_info.ident.table_id;

// Creates table name.
Expand Down Expand Up @@ -1000,27 +999,25 @@ mod tests {
let table_route_value = TableRouteValue::physical(region_routes.clone());

let tables_data = vec![(table_info.clone(), table_route_value.clone())];
let region_numbers = vec![1];
// creates metadata.
table_metadata_manager
.create_logic_tables_metadata(tables_data.clone(), region_numbers.clone())
.create_logic_tables_metadata(tables_data.clone())
.await
.unwrap();

// if metadata was already created, it should be ok.
assert!(table_metadata_manager
.create_logic_tables_metadata(tables_data, region_numbers.clone())
.create_logic_tables_metadata(tables_data)
.await
.is_ok());

let mut modified_region_routes = region_routes.clone();
modified_region_routes.push(new_region_route(2, 3));
let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
let modified_region_numbers = vec![1, 2];
// if remote metadata was exists, it should return an error.
assert!(table_metadata_manager
.create_logic_tables_metadata(modified_tables_data, modified_region_numbers)
.create_logic_tables_metadata(modified_tables_data)
.await
.is_err());

Expand Down
118 changes: 81 additions & 37 deletions src/common/meta/src/rpc/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ use std::result;

use api::v1::meta::submit_ddl_task_request::Task;
use api::v1::meta::{
AlterTableTask as PbAlterTableTask, CreateTableTask as PbCreateTableTask,
CreateTableTasks as PbCreateTableTasks, DropTableTask as PbDropTableTask, Partition,
AlterTableTask as PbAlterTableTask, AlterTableTasks as PbAlterTableTasks,
CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks,
DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks, Partition,
SubmitDdlTaskRequest as PbSubmitDdlTaskRequest,
SubmitDdlTaskResponse as PbSubmitDdlTaskResponse, TruncateTableTask as PbTruncateTableTask,
};
Expand All @@ -40,6 +41,8 @@ pub enum DdlTask {
AlterTable(AlterTableTask),
TruncateTable(TruncateTableTask),
CreateLogicalTables(Vec<CreateTableTask>),
DropLogicalTables(Vec<DropTableTask>),
AlterLogicalTables(Vec<AlterTableTask>),
}

impl DdlTask {
Expand Down Expand Up @@ -123,7 +126,7 @@ impl TryFrom<Task> for DdlTask {
.map(|task| task.try_into())
.collect::<Result<Vec<_>>>()?;

Ok(DdlTask::DropTable(tasks.into_iter().next().unwrap()))
Ok(DdlTask::DropLogicalTables(tasks))
}
Task::AlterTableTasks(alter_tables) => {
let tasks = alter_tables
Expand All @@ -132,7 +135,7 @@ impl TryFrom<Task> for DdlTask {
.map(|task| task.try_into())
.collect::<Result<Vec<_>>>()?;

Ok(DdlTask::AlterTable(tasks.into_iter().next().unwrap()))
Ok(DdlTask::AlterLogicalTables(tasks))
}
}
}
Expand All @@ -148,46 +151,34 @@ impl TryFrom<SubmitDdlTaskRequest> for PbSubmitDdlTaskRequest {

fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
let task = match request.task {
DdlTask::CreateTable(task) => Task::CreateTableTask(PbCreateTableTask {
table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
create_table: Some(task.create_table),
partitions: task.partitions,
}),
DdlTask::DropTable(task) => Task::DropTableTask(PbDropTableTask {
drop_table: Some(DropTableExpr {
catalog_name: task.catalog,
schema_name: task.schema,
table_name: task.table,
table_id: Some(api::v1::TableId { id: task.table_id }),
drop_if_exists: task.drop_if_exists,
}),
}),
DdlTask::AlterTable(task) => Task::AlterTableTask(PbAlterTableTask {
alter_table: Some(task.alter_table),
}),
DdlTask::TruncateTable(task) => Task::TruncateTableTask(PbTruncateTableTask {
truncate_table: Some(TruncateTableExpr {
catalog_name: task.catalog,
schema_name: task.schema,
table_name: task.table,
table_id: Some(api::v1::TableId { id: task.table_id }),
}),
}),
DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
DdlTask::DropTable(task) => Task::DropTableTask(task.try_into()?),
DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
DdlTask::CreateLogicalTables(tasks) => {
let tasks = tasks
.into_iter()
.map(|task| {
Ok(PbCreateTableTask {
table_info: serde_json::to_vec(&task.table_info)
.context(error::SerdeJsonSnafu)?,
create_table: Some(task.create_table),
partitions: task.partitions,
})
})
.map(|task| task.try_into())
.collect::<Result<Vec<_>>>()?;

Task::CreateTableTasks(PbCreateTableTasks { tasks })
}
DdlTask::DropLogicalTables(tasks) => {
let tasks = tasks
.into_iter()
.map(|task| task.try_into())
.collect::<Result<Vec<_>>>()?;

Task::DropTableTasks(PbDropTableTasks { tasks })
}
DdlTask::AlterLogicalTables(tasks) => {
let tasks = tasks
.into_iter()
.map(|task| task.try_into())
.collect::<Result<Vec<_>>>()?;

Task::AlterTableTasks(PbAlterTableTasks { tasks })
}
};

Ok(Self {
Expand Down Expand Up @@ -284,6 +275,22 @@ impl TryFrom<PbDropTableTask> for DropTableTask {
}
}

impl TryFrom<DropTableTask> for PbDropTableTask {
type Error = error::Error;

fn try_from(task: DropTableTask) -> Result<Self> {
Ok(PbDropTableTask {
drop_table: Some(DropTableExpr {
catalog_name: task.catalog,
schema_name: task.schema,
table_name: task.table,
table_id: Some(api::v1::TableId { id: task.table_id }),
drop_if_exists: task.drop_if_exists,
}),
})
}
}

#[derive(Debug, PartialEq, Clone)]
pub struct CreateTableTask {
pub create_table: CreateTableExpr,
Expand All @@ -307,6 +314,18 @@ impl TryFrom<PbCreateTableTask> for CreateTableTask {
}
}

impl TryFrom<CreateTableTask> for PbCreateTableTask {
type Error = error::Error;

fn try_from(task: CreateTableTask) -> Result<Self> {
Ok(PbCreateTableTask {
table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
create_table: Some(task.create_table),
partitions: task.partitions,
})
}
}

impl CreateTableTask {
pub fn new(
expr: CreateTableExpr,
Expand Down Expand Up @@ -416,6 +435,16 @@ impl TryFrom<PbAlterTableTask> for AlterTableTask {
}
}

impl TryFrom<AlterTableTask> for PbAlterTableTask {
type Error = error::Error;

fn try_from(task: AlterTableTask) -> Result<Self> {
Ok(PbAlterTableTask {
alter_table: Some(task.alter_table),
})
}
}

impl Serialize for AlterTableTask {
fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
where
Expand Down Expand Up @@ -497,6 +526,21 @@ impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
}
}

impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
type Error = error::Error;

fn try_from(task: TruncateTableTask) -> Result<Self> {
Ok(PbTruncateTableTask {
truncate_table: Some(TruncateTableExpr {
catalog_name: task.catalog,
schema_name: task.schema,
table_name: task.table,
table_id: Some(api::v1::TableId { id: task.table_id }),
}),
})
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down

0 comments on commit c53f8cb

Please sign in to comment.