Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: allocate table ids in the procedure #3293

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 82 additions & 75 deletions src/common/meta/src/ddl/create_logical_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,16 @@ impl CreateLogicalTablesProcedure {
Ok(Self { context, creator })
}

async fn on_prepare(&mut self) -> Result<Status> {
/// On the prepare step, it performs:
/// - Checks whether the physical table exists.
/// - Checks whether the logical tables exist.
/// - Allocates the table ids.
///
/// Abort(non-retry):
/// - The physical table does not exist.
/// - Failed to check whether tables exist.
/// - One of the logical tables already exists, and its table creation task does not set `create_if_not_exists`.
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
let manager = &self.context.table_metadata_manager;

// Sets physical region numbers
Expand All @@ -80,7 +89,7 @@ impl CreateLogicalTablesProcedure {
.data
.set_physical_region_numbers(physical_region_numbers);

// Checks if the tables exists
// Checks if the tables exist
let table_name_keys = self
.creator
.data
Expand All @@ -96,24 +105,9 @@ impl CreateLogicalTablesProcedure {
.map(|x| x.map(|x| x.table_id()))
.collect::<Vec<_>>();

// Sets table ids already exists
self.creator
.data
.set_table_ids_already_exists(already_exists_tables_ids);

// If all tables do not exists, we can create them directly.
if self.creator.data.is_all_tables_not_exists() {
self.creator.data.state = CreateTablesState::DatanodeCreateRegions;
return Ok(Status::executing(true));
}

// Filter out the tables that already exist.
let tasks = &self.creator.data.tasks;
let mut filtered_tasks = Vec::with_capacity(tasks.len());
for (task, table_id) in tasks
.iter()
.zip(self.creator.data.table_ids_already_exists().iter())
{
// Validates the tasks
let tasks = &mut self.creator.data.tasks;
for (task, table_id) in tasks.iter().zip(already_exists_tables_ids.iter()) {
if table_id.is_some() {
// If a table already exists, we just ignore it.
ensure!(
Expand All @@ -124,17 +118,34 @@ impl CreateLogicalTablesProcedure {
);
continue;
}
filtered_tasks.push(task.clone());
}

// Resets tasks
self.creator.data.tasks = filtered_tasks;
if self.creator.data.tasks.is_empty() {
// If all tables already exist, we can skip the `DatanodeCreateRegions` stage.
self.creator.data.state = CreateTablesState::CreateMetadata;
return Ok(Status::executing(true));
// If all tables already exist, returns the table_ids.
if already_exists_tables_ids.iter().all(Option::is_some) {
return Ok(Status::done_with_output(
already_exists_tables_ids
.into_iter()
.flatten()
.collect::<Vec<_>>(),
));
}

// Allocates table ids
for (task, table_id) in tasks.iter_mut().zip(already_exists_tables_ids.iter()) {
let table_id = if let Some(table_id) = table_id {
*table_id
} else {
self.context
.table_metadata_allocator
.allocate_table_id(task)
.await?
};
task.set_table_id(table_id);
}

self.creator
.data
.set_table_ids_already_exists(already_exists_tables_ids);
self.creator.data.state = CreateTablesState::DatanodeCreateRegions;
Ok(Status::executing(true))
}
Expand All @@ -152,17 +163,20 @@ impl CreateLogicalTablesProcedure {
self.create_regions(region_routes).await
}

/// Creates table metadata
///
/// Abort(non-retry):
/// - Failed to create table metadata.
pub async fn on_create_metadata(&self) -> Result<Status> {
let manager = &self.context.table_metadata_manager;

let physical_table_id = self.creator.data.physical_table_id();
let tables_data = self.creator.data.all_tables_data();
let num_tables = tables_data.len();
let remaining_tasks = self.creator.data.remaining_tasks();
let num_tables = remaining_tasks.len();

if num_tables > 0 {
let chunk_size = manager.max_logical_tables_per_batch();
if num_tables > chunk_size {
let chunks = tables_data
let chunks = remaining_tasks
.into_iter()
.chunks(chunk_size)
.into_iter()
Expand All @@ -172,11 +186,21 @@ impl CreateLogicalTablesProcedure {
manager.create_logical_tables_metadata(chunk).await?;
}
} else {
manager.create_logical_tables_metadata(tables_data).await?;
manager
.create_logical_tables_metadata(remaining_tasks)
.await?;
}
}

let table_ids = self.creator.data.real_table_ids();
// The `table_id`s MUST be collected after the [CreateTablesState::Prepare] step,
// which ensures that all `table_id`s have been allocated.
let table_ids = self
.creator
.data
.tasks
.iter()
.map(|task| task.table_info.ident.table_id)
.collect::<Vec<_>>();

info!("Created {num_tables} tables {table_ids:?} metadata for physical table {physical_table_id}");

Expand Down Expand Up @@ -238,10 +262,10 @@ impl CreateLogicalTablesProcedure {
body: Some(PbRegionRequest::Creates(creates)),
};
create_region_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
return Err(add_peer_context_if_needed(datanode)(err));
}
Ok(())
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(datanode))
});
}

Expand Down Expand Up @@ -310,17 +334,13 @@ impl TablesCreator {
tasks: Vec<CreateTableTask>,
physical_table_id: TableId,
) -> Self {
let table_ids_from_tasks = tasks
.iter()
.map(|task| task.table_info.ident.table_id)
.collect::<Vec<_>>();
let len = table_ids_from_tasks.len();
let len = tasks.len();

Self {
data: CreateTablesData {
cluster_id,
state: CreateTablesState::Prepare,
tasks,
table_ids_from_tasks,
table_ids_already_exists: vec![None; len],
physical_table_id,
physical_region_numbers: vec![],
Expand All @@ -334,10 +354,6 @@ pub struct CreateTablesData {
cluster_id: ClusterId,
state: CreateTablesState,
tasks: Vec<CreateTableTask>,
table_ids_from_tasks: Vec<TableId>,
// Because the table_id is allocated before entering the distributed lock,
// it needs to recheck if the table exists when creating a table.
// If it does exist, then the table_id needs to be replaced with the existing one.
table_ids_already_exists: Vec<Option<TableId>>,
physical_table_id: TableId,
physical_region_numbers: Vec<RegionNumber>,
Expand All @@ -360,43 +376,34 @@ impl CreateTablesData {
self.table_ids_already_exists = table_ids_already_exists;
}

/// Borrows the per-task list of table ids recorded as already existing.
///
/// Each entry is `Some(id)` or `None`, exactly as stored in the
/// `table_ids_already_exists` field.
fn table_ids_already_exists(&self) -> &[Option<TableId>] {
    self.table_ids_already_exists.as_slice()
}

/// Returns `true` when no entry of `table_ids_already_exists` holds a table id.
///
/// An empty list also yields `true`.
fn is_all_tables_not_exists(&self) -> bool {
    let any_recorded = self.table_ids_already_exists.iter().any(Option::is_some);
    !any_recorded
}

/// Resolves the effective table id for every task.
///
/// For each position, the id recorded in `table_ids_already_exists` wins when
/// present; otherwise the id from `table_ids_from_tasks` is used. The two lists
/// are walked in lockstep, so the result is as long as the shorter of them.
pub fn real_table_ids(&self) -> Vec<TableId> {
    let mut resolved = Vec::with_capacity(self.table_ids_from_tasks.len());
    for (from_task, existing) in self
        .table_ids_from_tasks
        .iter()
        .zip(&self.table_ids_already_exists)
    {
        let table_id = match existing {
            Some(id) => *id,
            None => *from_task,
        };
        resolved.push(table_id);
    }
    resolved
}

/// Collects a reference to the `create_table` expression of every task,
/// in task order.
fn all_create_table_exprs(&self) -> Vec<&CreateTableExpr> {
    let mut exprs = Vec::with_capacity(self.tasks.len());
    exprs.extend(self.tasks.iter().map(|task| &task.create_table));
    exprs
}

fn all_tables_data(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
/// Returns the remaining tasks.
/// The length of tasks must be greater than 0.
fn remaining_tasks(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
self.tasks
.iter()
.map(|task| {
let table_info = task.table_info.clone();
let region_ids = self
.physical_region_numbers
.iter()
.map(|region_number| RegionId::new(table_info.ident.table_id, *region_number))
.collect();
let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
(table_info, table_route)
.zip(self.table_ids_already_exists.iter())
.flat_map(|(task, table_id)| {
if table_id.is_none() {
let table_info = task.table_info.clone();
let region_ids = self
.physical_region_numbers
.iter()
.map(|region_number| {
RegionId::new(table_info.ident.table_id, *region_number)
})
.collect();
let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
Some((table_info, table_route))
} else {
None
}
})
.collect::<Vec<_>>()
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/table_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl TableMetadataAllocator {
}
}

async fn allocate_table_id(&self, task: &CreateTableTask) -> Result<TableId> {
pub(crate) async fn allocate_table_id(&self, task: &CreateTableTask) -> Result<TableId> {
let table_id = if let Some(table_id) = &task.create_table.table_id {
let table_id = table_id.id;

Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/ddl/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod create_logical_tables;
mod create_table;
Loading