diff --git a/src/metric-engine/src/data_region.rs b/src/metric-engine/src/data_region.rs index 0ed19db600be..de6083200bbc 100644 --- a/src/metric-engine/src/data_region.rs +++ b/src/metric-engine/src/data_region.rs @@ -150,6 +150,7 @@ impl DataRegion { }) .collect::>()?; + info!("Adding (Column id assigned) columns {new_columns:?} to region {region_id:?}"); // assemble alter request let alter_request = RegionRequest::Alter(RegionAlterRequest { schema_version: version, diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index 76066ab97a3b..0a321dc8cbd8 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use common_telemetry::{error, info}; use snafu::{OptionExt, ResultExt}; @@ -92,18 +92,27 @@ impl MetricEngineInner { let metadata_region_id = to_metadata_region_id(physical_region_id); let mut columns_to_add = vec![]; + // columns that already exist in physical region + let mut existing_columns = vec![]; + + let pre_existing_physical_columns = self + .data_region + .physical_columns(physical_region_id) + .await?; + + let pre_exist_names = pre_existing_physical_columns + .iter() + .map(|col| col.column_schema.name.clone()) + .collect::>(); + + // check pre-existing physical columns so if any columns to add is already exist, + // we can skip it in physical alter operation + // (but still need to update them in logical alter operation) for col in &columns { - if self - .metadata_region - .column_semantic_type( - metadata_region_id, - logical_region_id, - &col.column_metadata.column_schema.name, - ) - .await? - .is_none() - { + if !pre_exist_names.contains(&col.column_metadata.column_schema.name) { columns_to_add.push(col.column_metadata.clone()); + } else { + existing_columns.push(col.column_metadata.clone()); } } @@ -111,17 +120,36 @@ impl MetricEngineInner { let data_region_id = to_data_region_id(physical_region_id); self.add_columns_to_physical_data_region( data_region_id, - metadata_region_id, logical_region_id, - columns_to_add, + &mut columns_to_add, ) .await?; + // check physical region id again, since it may have been altered and column ids might be different than + // the one we have + let after_alter_phy_table = self + .data_region + .physical_columns(physical_region_id) + .await?; + let after_alter_cols = after_alter_phy_table + .iter() + .map(|col| (col.column_schema.name.clone(), col.clone())) + .collect::>(); + // register columns to logical region + // we need to use modfied column metadata from physical region, since it may have been altered(especialy column id) for col in columns { - self.metadata_region - .add_column(metadata_region_id, logical_region_id, &col.column_metadata) - .await?; + if let Some(metadata) = after_alter_cols.get(&col.column_metadata.column_schema.name) { + self.metadata_region + .add_column(metadata_region_id, logical_region_id, metadata) + .await?; + } else { + // TODO(discord9): consider make this a hard error? + error!( + "Column {} is not found in physical region", + col.column_metadata.column_schema.name + ); + } } // invalid logical column cache diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index 9ca89248dcac..75cefcefc0a1 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -212,11 +212,17 @@ impl MetricEngineInner { self.add_columns_to_physical_data_region( data_region_id, - metadata_region_id, logical_region_id, - new_columns, + &mut new_columns, ) .await?; + + // register columns to metadata region + for col in &new_columns { + self.metadata_region + .add_column(metadata_region_id, logical_region_id, col) + .await?; + } } // register logical region to metadata region @@ -260,27 +266,25 @@ impl MetricEngineInner { Ok(data_region_id) } - /// Execute corresponding alter requests to mito region. New added columns' [ColumnMetadata] will be - /// cloned into `added_columns`. + /// Execute corresponding alter requests to mito region. After calling this, `new_columns` will be assign a new column id + /// which should be correct if the following requirements are met: + /// + /// # NOTE + /// + /// `new_columns` MUST NOT pre-exist in the physical region. Or the results will be wrong column id for the new columns. + /// + /// TODO(discord9): change this to actually return the actually added physical columns pub(crate) async fn add_columns_to_physical_data_region( &self, data_region_id: RegionId, - metadata_region_id: RegionId, logical_region_id: RegionId, - mut new_columns: Vec, + mut new_columns: &mut [ColumnMetadata], ) -> Result<()> { // alter data region self.data_region .add_columns(data_region_id, &mut new_columns) .await?; - // register columns to metadata region - for col in &new_columns { - self.metadata_region - .add_column(metadata_region_id, logical_region_id, col) - .await?; - } - // safety: previous step has checked this self.state.write().unwrap().add_physical_columns( data_region_id, @@ -291,6 +295,23 @@ impl MetricEngineInner { info!("Create region {logical_region_id} leads to adding columns {new_columns:?} to physical region {data_region_id}"); PHYSICAL_COLUMN_COUNT.add(new_columns.len() as _); + // correct the column id + let after_alter_physical_schema = self.data_region.physical_columns(data_region_id).await?; + let after_alter_physical_schema_map = after_alter_physical_schema + .into_iter() + .map(|metadata| (metadata.column_schema.name.clone(), metadata)) + .collect::>(); + + for col in new_columns.iter_mut() { + let column_metadata = after_alter_physical_schema_map + .get(&col.column_schema.name) + .with_context(|| ColumnNotFoundSnafu { + name: &col.column_schema.name, + region_id: data_region_id, + })?; + *col = column_metadata.clone(); + } + Ok(()) } diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index b02fa3de517b..2ecce12e17d6 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -211,6 +211,7 @@ impl MetadataRegion { } /// Check if the given column exists. Return the semantic type if exists. + #[allow(unused)] pub async fn column_semantic_type( &self, physical_region_id: RegionId, diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index f295a2c9b3bb..3e916502e1d2 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -18,7 +18,7 @@ mod error; mod planner; use std::any::Any; -use std::collections::HashMap; +use std::collections::{BTreeSet, HashMap}; use std::sync::Arc; use async_trait::async_trait; @@ -35,6 +35,7 @@ use common_telemetry::tracing; use datafusion::physical_plan::analyze::AnalyzeExec; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::ExecutionPlan; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::ResolvedTableReference; use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlan, WriteOp}; use datatypes::prelude::VectorRef; @@ -289,6 +290,45 @@ impl DatafusionQueryEngine { { analyzed_plan.clone() } else { + struct TableSourceSchema {} + impl datafusion_common::tree_node::TreeNodeVisitor<'_> for TableSourceSchema { + type Node = LogicalPlan; + fn f_down( + &mut self, + node: &Self::Node, + ) -> datafusion_common::Result + { + if let LogicalPlan::TableScan(table_scan) = node { + let schema = table_scan.source.schema(); + + // found field in outter schema but not in inner schema + let outer_fields: BTreeSet<_> = + table_scan.projected_schema.fields().into_iter().collect(); + let inner_fields = schema.fields().iter().collect::>(); + + let diff = outer_fields + .difference(&inner_fields) + .collect::>(); + if !diff.is_empty() { + common_telemetry::error!("TableScan.source.schema: {:?}", &schema); + common_telemetry::error!( + "Projected==table_source?: {:?}", + schema.as_ref() == table_scan.projected_schema.as_arrow() + ); + common_telemetry::error!("logical - phy: {:?}", diff); + common_telemetry::error!( + "phy - logical: {:?}", + inner_fields + .difference(&outer_fields) + .collect::>() + ); + } + } + Ok(TreeNodeRecursion::Continue) + } + } + let mut table_source_schema = TableSourceSchema {}; + analyzed_plan.visit(&mut table_source_schema).unwrap(); state .optimizer() .optimize(analyzed_plan, state, |_, _| {}) diff --git a/tests/cases/standalone/common/alter/alter_table.result b/tests/cases/standalone/common/alter/alter_table.result index bd503fe866c8..0b8af61c9800 100644 --- a/tests/cases/standalone/common/alter/alter_table.result +++ b/tests/cases/standalone/common/alter/alter_table.result @@ -1,4 +1,9 @@ -CREATE TABLE test_alt_table(h INTEGER, i INTEGER, j TIMESTAMP TIME INDEX, PRIMARY KEY (h, i)); +CREATE TABLE test_alt_table( + h INTEGER, + i INTEGER, + j TIMESTAMP TIME INDEX, + PRIMARY KEY (h, i) +); Affected Rows: 0 @@ -12,13 +17,20 @@ DESC TABLE test_alt_table; | j | TimestampMillisecond | PRI | NO | | TIMESTAMP | +--------+----------------------+-----+------+---------+---------------+ -INSERT INTO test_alt_table VALUES (1, 1, 0), (2, 2, 1); +INSERT INTO + test_alt_table +VALUES + (1, 1, 0), + (2, 2, 1); Affected Rows: 2 -- TODO: It may result in an error if `k` is with type INTEGER. -- Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected Int32 but found Utf8 at column index 3 -ALTER TABLE test_alt_table ADD COLUMN k STRING PRIMARY KEY; +ALTER TABLE + test_alt_table +ADD + COLUMN k STRING PRIMARY KEY; Affected Rows: 0 @@ -33,7 +45,10 @@ DESC TABLE test_alt_table; | k | String | PRI | YES | | TAG | +--------+----------------------+-----+------+---------+---------------+ -SELECT * FROM test_alt_table; +SELECT + * +FROM + test_alt_table; +---+---+-------------------------+---+ | h | i | j | k | @@ -42,7 +57,12 @@ SELECT * FROM test_alt_table; | 2 | 2 | 1970-01-01T00:00:00.001 | | +---+---+-------------------------+---+ -SELECT * FROM test_alt_table WHERE i = 1; +SELECT + * +FROM + test_alt_table +WHERE + i = 1; +---+---+---------------------+---+ | h | i | j | k | @@ -51,7 +71,10 @@ SELECT * FROM test_alt_table WHERE i = 1; +---+---+---------------------+---+ -- SQLNESS ARG restart=true -ALTER TABLE test_alt_table ADD COLUMN m INTEGER; +ALTER TABLE + test_alt_table +ADD + COLUMN m INTEGER; Affected Rows: 0 @@ -71,3 +94,102 @@ DROP TABLE test_alt_table; Affected Rows: 0 +-- to test if same name column can be added +CREATE TABLE phy (ts timestamp time index, val double) engine = metric with ("physical_metric_table" = ""); + +Affected Rows: 0 + +CREATE TABLE t1 ( + ts timestamp time index, + val double, + host string primary key +) engine = metric with ("on_physical_table" = "phy"); + +Affected Rows: 0 + +INSERT INTO + t1 +VALUES + ('host1', 0, 1), + ('host2', 1, 0,); + +Affected Rows: 2 + +SELECT + * +FROM + t1; + ++-------+-------------------------+-----+ +| host | ts | val | ++-------+-------------------------+-----+ +| host2 | 1970-01-01T00:00:00.001 | 0.0 | +| host1 | 1970-01-01T00:00:00 | 1.0 | ++-------+-------------------------+-----+ + +CREATE TABLE t2 ( + ts timestamp time index, + job string primary key, + val double +) engine = metric with ("on_physical_table" = "phy"); + +Affected Rows: 0 + +ALTER TABLE + t1 +ADD + COLUMN `at` STRING; + +Affected Rows: 0 + +ALTER TABLE + t2 +ADD + COLUMN at3 STRING; + +Affected Rows: 0 + +ALTER TABLE + t2 +ADD + COLUMN `at` STRING; + +Affected Rows: 0 + +ALTER TABLE + t2 +ADD + COLUMN at2 STRING; + +Affected Rows: 0 + +INSERT INTO + t2 +VALUES + ("loc_1", "loc_2", "loc_3", 'job1', 0, 1); + +Affected Rows: 1 + +SELECT + * +FROM + t2; + ++-------+-------+-------+------+---------------------+-----+ +| at | at2 | at3 | job | ts | val | ++-------+-------+-------+------+---------------------+-----+ +| loc_1 | loc_2 | loc_3 | job1 | 1970-01-01T00:00:00 | 1.0 | ++-------+-------+-------+------+---------------------+-----+ + +DROP TABLE t1; + +Affected Rows: 0 + +DROP TABLE t2; + +Affected Rows: 0 + +DROP TABLE phy; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/alter/alter_table.sql b/tests/cases/standalone/common/alter/alter_table.sql index d77e66cc4570..d560e2f208cf 100644 --- a/tests/cases/standalone/common/alter/alter_table.sql +++ b/tests/cases/standalone/common/alter/alter_table.sql @@ -1,22 +1,107 @@ -CREATE TABLE test_alt_table(h INTEGER, i INTEGER, j TIMESTAMP TIME INDEX, PRIMARY KEY (h, i)); +CREATE TABLE test_alt_table( + h INTEGER, + i INTEGER, + j TIMESTAMP TIME INDEX, + PRIMARY KEY (h, i) +); DESC TABLE test_alt_table; -INSERT INTO test_alt_table VALUES (1, 1, 0), (2, 2, 1); +INSERT INTO + test_alt_table +VALUES + (1, 1, 0), + (2, 2, 1); -- TODO: It may result in an error if `k` is with type INTEGER. -- Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected Int32 but found Utf8 at column index 3 -ALTER TABLE test_alt_table ADD COLUMN k STRING PRIMARY KEY; +ALTER TABLE + test_alt_table +ADD + COLUMN k STRING PRIMARY KEY; DESC TABLE test_alt_table; -SELECT * FROM test_alt_table; +SELECT + * +FROM + test_alt_table; -SELECT * FROM test_alt_table WHERE i = 1; +SELECT + * +FROM + test_alt_table +WHERE + i = 1; -- SQLNESS ARG restart=true -ALTER TABLE test_alt_table ADD COLUMN m INTEGER; +ALTER TABLE + test_alt_table +ADD + COLUMN m INTEGER; DESC TABLE test_alt_table; DROP TABLE test_alt_table; + +-- to test if same name column can be added +CREATE TABLE phy (ts timestamp time index, val double) engine = metric with ("physical_metric_table" = ""); + +CREATE TABLE t1 ( + ts timestamp time index, + val double, + host string primary key +) engine = metric with ("on_physical_table" = "phy"); + +INSERT INTO + t1 +VALUES + ('host1', 0, 1), + ('host2', 1, 0,); + +SELECT + * +FROM + t1; + +CREATE TABLE t2 ( + ts timestamp time index, + job string primary key, + val double +) engine = metric with ("on_physical_table" = "phy"); + +ALTER TABLE + t1 +ADD + COLUMN `at` STRING; + +ALTER TABLE + t2 +ADD + COLUMN at3 STRING; + +ALTER TABLE + t2 +ADD + COLUMN `at` STRING; + +ALTER TABLE + t2 +ADD + COLUMN at2 STRING; + +INSERT INTO + t2 +VALUES + ("loc_1", "loc_2", "loc_3", 'job1', 0, 1); + +SELECT + * +FROM + t2; + +DROP TABLE t1; + +DROP TABLE t2; + +DROP TABLE phy; \ No newline at end of file