Skip to content

Commit

Permalink
feat: group requests by peer (#3619)
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun authored Apr 1, 2024
1 parent 4a5bb69 commit 0eb023b
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 23 deletions.
24 changes: 9 additions & 15 deletions src/common/meta/src/ddl/alter_logical_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders};
use crate::rpc::router::find_leaders;
use crate::{cache_invalidator, metrics, ClusterId};

pub struct AlterLogicalTablesProcedure {
Expand Down Expand Up @@ -118,20 +118,14 @@ impl AlterLogicalTablesProcedure {

for peer in leaders {
let requester = self.context.datanode_manager.datanode(&peer).await;
let region_numbers = find_leader_regions(&physical_table_route.region_routes, &peer);

for region_number in region_numbers {
let request = self.make_request(region_number)?;
let peer = peer.clone();
let requester = requester.clone();

alter_region_tasks.push(async move {
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(peer))
});
}
let request = self.make_request(&peer, &physical_table_route.region_routes)?;

alter_region_tasks.push(async move {
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(peer))
});
}

// Collects responses from all the alter region tasks.
Expand Down
30 changes: 22 additions & 8 deletions src/common/meta/src/ddl/alter_logical_tables/region_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,22 @@ use api::v1::region::{
RegionColumnDef, RegionRequest, RegionRequestHeader,
};
use common_telemetry::tracing_context::TracingContext;
use store_api::storage::{RegionId, RegionNumber};
use store_api::storage::RegionId;

use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::error::Result;
use crate::key::table_info::TableInfoValue;
use crate::peer::Peer;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{find_leader_regions, RegionRoute};

impl AlterLogicalTablesProcedure {
pub(crate) fn make_request(&self, region_number: RegionNumber) -> Result<RegionRequest> {
let alter_requests = self.make_alter_region_requests(region_number)?;
pub(crate) fn make_request(
&self,
peer: &Peer,
region_routes: &[RegionRoute],
) -> Result<RegionRequest> {
let alter_requests = self.make_alter_region_requests(peer, region_routes)?;
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
Expand All @@ -40,17 +46,25 @@ impl AlterLogicalTablesProcedure {
Ok(request)
}

fn make_alter_region_requests(&self, region_number: RegionNumber) -> Result<AlterRequests> {
let mut requests = Vec::with_capacity(self.data.tasks.len());
fn make_alter_region_requests(
&self,
peer: &Peer,
region_routes: &[RegionRoute],
) -> Result<AlterRequests> {
let tasks = &self.data.tasks;
let regions_on_this_peer = find_leader_regions(region_routes, peer);
let mut requests = Vec::with_capacity(tasks.len() * regions_on_this_peer.len());
for (task, table) in self
.data
.tasks
.iter()
.zip(self.data.table_info_values.iter())
{
let region_id = RegionId::new(table.table_info.ident.table_id, region_number);
let request = self.make_alter_region_request(region_id, task, table)?;
requests.push(request);
for region_number in &regions_on_this_peer {
let region_id = RegionId::new(table.table_info.ident.table_id, *region_number);
let request = self.make_alter_region_request(region_id, task, table)?;
requests.push(request);
}
}

Ok(AlterRequests { requests })
Expand Down

0 comments on commit 0eb023b

Please sign in to comment.