Skip to content

Commit

Permalink
fix: alter table add column id alloc mismatch
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 authored and WenyXu committed Nov 12, 2024
1 parent 68b5bc0 commit e291130
Show file tree
Hide file tree
Showing 7 changed files with 340 additions and 42 deletions.
1 change: 1 addition & 0 deletions src/metric-engine/src/data_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ impl DataRegion {
})
.collect::<Result<_>>()?;

info!("Adding (Column id assigned) columns {new_columns:?} to region {region_id:?}");
// assemble alter request
let alter_request = RegionRequest::Alter(RegionAlterRequest {
schema_version: version,
Expand Down
60 changes: 44 additions & 16 deletions src/metric-engine/src/engine/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};

use common_telemetry::{error, info};
use snafu::{OptionExt, ResultExt};
Expand Down Expand Up @@ -92,36 +92,64 @@ impl MetricEngineInner {

let metadata_region_id = to_metadata_region_id(physical_region_id);
let mut columns_to_add = vec![];
// columns that already exist in physical region
let mut existing_columns = vec![];

let pre_existing_physical_columns = self
.data_region
.physical_columns(physical_region_id)
.await?;

let pre_exist_names = pre_existing_physical_columns
.iter()
.map(|col| col.column_schema.name.clone())
.collect::<HashSet<_>>();

// check pre-existing physical columns so if any columns to add is already exist,
// we can skip it in physical alter operation
// (but still need to update them in logical alter operation)
for col in &columns {
if self
.metadata_region
.column_semantic_type(
metadata_region_id,
logical_region_id,
&col.column_metadata.column_schema.name,
)
.await?
.is_none()
{
if !pre_exist_names.contains(&col.column_metadata.column_schema.name) {
columns_to_add.push(col.column_metadata.clone());
} else {
existing_columns.push(col.column_metadata.clone());
}
}

// alter data region
let data_region_id = to_data_region_id(physical_region_id);
self.add_columns_to_physical_data_region(
data_region_id,
metadata_region_id,
logical_region_id,
columns_to_add,
&mut columns_to_add,
)
.await?;

// check physical region id again, since it may have been altered and column ids might be different than
// the one we have
let after_alter_phy_table = self
.data_region
.physical_columns(physical_region_id)
.await?;
let after_alter_cols = after_alter_phy_table
.iter()
.map(|col| (col.column_schema.name.clone(), col.clone()))
.collect::<HashMap<_, _>>();

// register columns to logical region
// we need to use modfied column metadata from physical region, since it may have been altered(especialy column id)
for col in columns {
self.metadata_region
.add_column(metadata_region_id, logical_region_id, &col.column_metadata)
.await?;
if let Some(metadata) = after_alter_cols.get(&col.column_metadata.column_schema.name) {
self.metadata_region
.add_column(metadata_region_id, logical_region_id, metadata)
.await?;
} else {
// TODO(discord9): consider make this a hard error?
error!(
"Column {} is not found in physical region",
col.column_metadata.column_schema.name
);
}
}

// invalid logical column cache
Expand Down
47 changes: 34 additions & 13 deletions src/metric-engine/src/engine/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,17 @@ impl MetricEngineInner {

self.add_columns_to_physical_data_region(
data_region_id,
metadata_region_id,
logical_region_id,
new_columns,
&mut new_columns,
)
.await?;

// register columns to metadata region
for col in &new_columns {
self.metadata_region
.add_column(metadata_region_id, logical_region_id, col)
.await?;
}
}

// register logical region to metadata region
Expand Down Expand Up @@ -260,27 +266,25 @@ impl MetricEngineInner {
Ok(data_region_id)
}

/// Execute corresponding alter requests to mito region. New added columns' [ColumnMetadata] will be
/// cloned into `added_columns`.
/// Execute corresponding alter requests to mito region. After calling this, `new_columns` will be assign a new column id
/// which should be correct if the following requirements are met:
///
/// # NOTE
///
/// `new_columns` MUST NOT pre-exist in the physical region. Or the results will be wrong column id for the new columns.
///
/// TODO(discord9): change this to actually return the actually added physical columns
pub(crate) async fn add_columns_to_physical_data_region(
&self,
data_region_id: RegionId,
metadata_region_id: RegionId,
logical_region_id: RegionId,
mut new_columns: Vec<ColumnMetadata>,
mut new_columns: &mut [ColumnMetadata],
) -> Result<()> {
// alter data region
self.data_region
.add_columns(data_region_id, &mut new_columns)
.await?;

// register columns to metadata region
for col in &new_columns {
self.metadata_region
.add_column(metadata_region_id, logical_region_id, col)
.await?;
}

// safety: previous step has checked this
self.state.write().unwrap().add_physical_columns(
data_region_id,
Expand All @@ -291,6 +295,23 @@ impl MetricEngineInner {
info!("Create region {logical_region_id} leads to adding columns {new_columns:?} to physical region {data_region_id}");
PHYSICAL_COLUMN_COUNT.add(new_columns.len() as _);

// correct the column id
let after_alter_physical_schema = self.data_region.physical_columns(data_region_id).await?;
let after_alter_physical_schema_map = after_alter_physical_schema
.into_iter()
.map(|metadata| (metadata.column_schema.name.clone(), metadata))
.collect::<HashMap<_, _>>();

for col in new_columns.iter_mut() {
let column_metadata = after_alter_physical_schema_map
.get(&col.column_schema.name)
.with_context(|| ColumnNotFoundSnafu {
name: &col.column_schema.name,
region_id: data_region_id,
})?;
*col = column_metadata.clone();
}

Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions src/metric-engine/src/metadata_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ impl MetadataRegion {
}

/// Check if the given column exists. Return the semantic type if exists.
#[allow(unused)]
pub async fn column_semantic_type(
&self,
physical_region_id: RegionId,
Expand Down
42 changes: 41 additions & 1 deletion src/query/src/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ mod error;
mod planner;

use std::any::Any;
use std::collections::HashMap;
use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;

use async_trait::async_trait;
Expand All @@ -35,6 +35,7 @@ use common_telemetry::tracing;
use datafusion::physical_plan::analyze::AnalyzeExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::ResolvedTableReference;
use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlan, WriteOp};
use datatypes::prelude::VectorRef;
Expand Down Expand Up @@ -289,6 +290,45 @@ impl DatafusionQueryEngine {
{
analyzed_plan.clone()
} else {
struct TableSourceSchema {}
impl datafusion_common::tree_node::TreeNodeVisitor<'_> for TableSourceSchema {
type Node = LogicalPlan;
fn f_down(
&mut self,
node: &Self::Node,
) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
{
if let LogicalPlan::TableScan(table_scan) = node {
let schema = table_scan.source.schema();

// found field in outter schema but not in inner schema
let outer_fields: BTreeSet<_> =
table_scan.projected_schema.fields().into_iter().collect();
let inner_fields = schema.fields().iter().collect::<BTreeSet<_>>();

let diff = outer_fields
.difference(&inner_fields)
.collect::<BTreeSet<_>>();
if !diff.is_empty() {
common_telemetry::error!("TableScan.source.schema: {:?}", &schema);
common_telemetry::error!(
"Projected==table_source?: {:?}",
schema.as_ref() == table_scan.projected_schema.as_arrow()
);
common_telemetry::error!("logical - phy: {:?}", diff);
common_telemetry::error!(
"phy - logical: {:?}",
inner_fields
.difference(&outer_fields)
.collect::<BTreeSet<_>>()
);
}
}
Ok(TreeNodeRecursion::Continue)
}
}
let mut table_source_schema = TableSourceSchema {};
analyzed_plan.visit(&mut table_source_schema).unwrap();
state
.optimizer()
.optimize(analyzed_plan, state, |_, _| {})
Expand Down
Loading

0 comments on commit e291130

Please sign in to comment.