Skip to content

Commit

Permalink
feat: return table ids
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Jan 19, 2024
1 parent 9d38330 commit 01785fe
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 18 deletions.
82 changes: 66 additions & 16 deletions src/common/meta/src/ddl/create_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,27 @@ impl CreateLogicalTablesProcedure {
TableNameKey::new(catalog, schema, table)
})
.collect::<Vec<_>>();
let tables_name_values = manager
let already_exists_tables_ids = manager
.table_name_manager()
.batch_get(table_name_keys)
.await?;
.await?
.iter()
.map(|x| x.map(|x| x.table_id()))
.collect::<Vec<_>>();

// If all tables do not exist, we can create them.
if tables_name_values.iter().all(|x| x.is_none()) {
if already_exists_tables_ids.iter().all(|x| x.is_none()) {
self.creator
.data
.set_table_ids_already_exists(already_exists_tables_ids);
self.creator.data.state = CreateTablesState::DatanodeCreateRegions;
return Ok(Status::executing(true));
}

let tasks = &self.creator.data.tasks;
let mut filtered_tasks = Vec::with_capacity(tasks.len());
for (task, table_name_value) in tasks.iter().zip(tables_name_values) {
if table_name_value.is_some() {
for (task, table_id) in tasks.iter().zip(already_exists_tables_ids.iter()) {
if table_id.is_some() {
// If a table already exists, we just ignore it.
ensure!(
task.create_table.create_if_not_exists,
Expand All @@ -107,8 +113,18 @@ impl CreateLogicalTablesProcedure {
filtered_tasks.push(task.clone());
}

self.creator
.data
.set_table_ids_already_exists(already_exists_tables_ids);
// reset tasks
self.creator.data.tasks = filtered_tasks;

if self.creator.data.tasks.is_empty() {
// If all tables already exist, we can skip the `DatanodeCreateRegions` stage.
self.creator.data.state = CreateTablesState::CreateMetadata;
return Ok(Status::executing(true));
}

self.creator.data.state = CreateTablesState::DatanodeCreateRegions;
Ok(Status::executing(true))
}
Expand All @@ -129,17 +145,22 @@ impl CreateLogicalTablesProcedure {
pub async fn on_create_metadata(&self) -> Result<Status> {
let manager = &self.context.table_metadata_manager;

let tables_data = self.creator.data.all_tables_data();
let region_numbers = self.creator.data.regin_numbers();
let physical_table_id = self.creator.data.physical_table_id();
let tables_data = self.creator.data.all_tables_data();
let num_tables = tables_data.len();

manager
.create_logic_tables_metadata(tables_data, region_numbers)
.await?;
if num_tables > 0 {
let region_numbers = self.creator.data.regin_numbers();
manager
.create_logic_tables_metadata(tables_data, region_numbers)
.await?;
}

info!("Created {num_tables} tables metadata for physical table {physical_table_id}");

// TODO(jeremy): Returns to the client
let _table_ids = self.creator.data.real_table_ids();

Ok(Status::Done)
}

Expand Down Expand Up @@ -264,11 +285,17 @@ impl TablesCreator {
physical_table_info: (TableId, Vec<RegionNumber>),
region_wal_options: HashMap<RegionNumber, String>,
) -> Self {
let table_ids_from_tasks = tasks
.iter()
.map(|task| task.table_info.ident.table_id)
.collect::<Vec<_>>();
Self {
data: CreateTablesData {
cluster_id,
state: CreateTablesState::Prepare,
tasks,
table_ids_from_tasks,
table_ids_already_exists: Vec::new(),
physical_table_id: physical_table_info.0,
physical_region_numbers: physical_table_info.1,
region_wal_options,
Expand All @@ -279,19 +306,42 @@ impl TablesCreator {

#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTablesData {
pub cluster_id: ClusterId,
pub state: CreateTablesState,
pub tasks: Vec<CreateTableTask>,
pub physical_table_id: TableId,
pub physical_region_numbers: Vec<RegionNumber>,
pub region_wal_options: HashMap<RegionNumber, String>,
cluster_id: ClusterId,
state: CreateTablesState,
tasks: Vec<CreateTableTask>,
table_ids_from_tasks: Vec<TableId>,
// Because the table_id is allocated before entering the distributed lock,
// it needs to recheck if the table exists when creating a table.
// If it does exist, then the table_id needs to be replaced with the existing one.
table_ids_already_exists: Vec<Option<TableId>>,
physical_table_id: TableId,
physical_region_numbers: Vec<RegionNumber>,
region_wal_options: HashMap<RegionNumber, String>,
}

impl CreateTablesData {
pub fn state(&self) -> &CreateTablesState {
&self.state
}

pub fn physical_table_id(&self) -> TableId {
self.physical_table_id
}

pub fn set_table_ids_already_exists(&mut self, table_ids_already_exists: Vec<Option<TableId>>) {
self.table_ids_already_exists = table_ids_already_exists;
}

pub fn real_table_ids(&self) -> Vec<TableId> {
self.table_ids_from_tasks
.iter()
.zip(self.table_ids_already_exists.iter())
.map(|(table_id_from_task, table_id_already_exists)| {
table_id_already_exists.unwrap_or(*table_id_from_task)
})
.collect::<Vec<_>>()
}

pub fn all_create_table_exprs(&self) -> Vec<&CreateTableExpr> {
self.tasks
.iter()
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ async fn handle_create_logical_table_tasks(
mut create_table_tasks: Vec<CreateTableTask>,
) -> Result<SubmitDdlTaskResponse> {
ensure!(!create_table_tasks.is_empty(), EmptyCreateTableTasksSnafu);
// Sets table_ids on create_table_tasks and return the physical table info
let physical_table_info = ddl_manager
.table_metadata_allocator
.set_table_ids_on_logic_create(&mut create_table_tasks)
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/procedure/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ async fn test_on_datanode_create_logical_regions() {
let status = procedure.on_datanode_create_regions().await.unwrap();
assert!(matches!(status, Status::Executing { persist: false }));
assert!(matches!(
procedure.creator.data.state,
CreateTablesState::CreateMetadata
procedure.creator.data.state(),
&CreateTablesState::CreateMetadata
));

handle.await.unwrap();
Expand Down

0 comments on commit 01785fe

Please sign in to comment.