Skip to content

Commit

Permalink
chore: avoid sending create table requests for already existing tables
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jan 13, 2025
1 parent f069ea0 commit 5aa7b8b
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 6 deletions.
13 changes: 11 additions & 2 deletions src/common/meta/src/ddl/create_logical_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use api::v1::CreateTableExpr;
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::warn;
use common_telemetry::{info, warn};
use futures_util::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
Expand Down Expand Up @@ -143,7 +143,10 @@ impl CreateLogicalTablesProcedure {

for peer in leaders {
let requester = self.context.node_manager.datanode(&peer).await;
let request = self.make_request(&peer, region_routes)?;
let Some(request) = self.make_request(&peer, region_routes)? else {
info!("no region request to send to datanode {}", peer);
continue;
};

create_region_tasks.push(async move {
requester
Expand All @@ -153,6 +156,12 @@ impl CreateLogicalTablesProcedure {
});
}

if create_region_tasks.is_empty() {
info!("no region request to send to datanodes");
self.data.state = CreateTablesState::CreateMetadata;
return Ok(Status::executing(true));
}

// Collects response from datanodes.
let phy_raw_schemas = join_all(create_region_tasks)
.await
Expand Down
16 changes: 12 additions & 4 deletions src/common/meta/src/ddl/create_logical_tables/region_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,15 @@ impl CreateLogicalTablesProcedure {
&self,
peer: &Peer,
region_routes: &[RegionRoute],
) -> Result<RegionRequest> {
) -> Result<Option<RegionRequest>> {
let tasks = &self.data.tasks;
let table_ids_already_exists = &self.data.table_ids_already_exists;
let regions_on_this_peer = find_leader_regions(region_routes, peer);
let mut requests = Vec::with_capacity(tasks.len() * regions_on_this_peer.len());
for task in tasks {
for (task, table_id_already_exists) in tasks.iter().zip(table_ids_already_exists) {
if table_id_already_exists.is_some() {
continue;
}
let create_table_expr = &task.create_table;
let catalog = &create_table_expr.catalog_name;
let schema = &create_table_expr.schema_name;
Expand All @@ -51,13 +55,17 @@ impl CreateLogicalTablesProcedure {
}
}

Ok(RegionRequest {
if requests.is_empty() {
return Ok(None);
}

Ok(Some(RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::Creates(CreateRequests { requests })),
})
}))
}

fn create_region_request_builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ impl CreateLogicalTablesProcedure {

pub(crate) async fn create_logical_tables_metadata(&mut self) -> Result<Vec<TableId>> {
let remaining_tasks = self.data.remaining_tasks();
if remaining_tasks.is_empty() {
info!("no logical tables to create metadata");
return Ok(vec![]);
}
let num_tables = remaining_tasks.len();

if num_tables > 0 {
Expand Down

0 comments on commit 5aa7b8b

Please sign in to comment.