Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add ChangeColumnType for AlterKind #3757

Merged
merged 10 commits into from
Apr 24, 2024
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/alter_table/update_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl AlterTableProcedure {
AlterKind::RenameTable { new_table_name } => {
new_info.name = new_table_name.to_string();
}
AlterKind::DropColumns { .. } => {}
AlterKind::DropColumns { .. } | AlterKind::ModifyColumns { .. } => {}
}

Ok(new_info)
Expand Down
80 changes: 63 additions & 17 deletions src/store-api/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::Arc;

use api::helper::ColumnDataTypeWrapper;
use api::v1::region::RegionColumnDef;
use api::v1::SemanticType;
use api::v1::{ColumnDef, SemanticType};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
Expand All @@ -33,7 +33,7 @@ use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{ensure, Location, OptionExt, ResultExt, Snafu};

use crate::region_request::{AddColumn, AddColumnLocation, AlterKind};
use crate::region_request::{AddColumn, AddColumnLocation, AlterKind, ModifyColumn};
use crate::storage::consts::is_internal_column;
use crate::storage::{ColumnId, RegionId};

Expand Down Expand Up @@ -61,18 +61,7 @@ impl fmt::Debug for ColumnMetadata {
}

impl ColumnMetadata {
/// Construct `Self` from protobuf struct [RegionColumnDef]
pub fn try_from_column_def(column_def: RegionColumnDef) -> Result<Self> {
let column_id = column_def.column_id;

let column_def = column_def
.column_def
.context(InvalidRawRegionRequestSnafu {
err: "column_def is absent",
})?;

let semantic_type = column_def.semantic_type();

fn inner_try_from_column_def(column_def: ColumnDef) -> Result<ColumnSchema> {
let default_constrain = if column_def.default_constraint.is_empty() {
None
} else {
Expand All @@ -86,9 +75,22 @@ impl ColumnMetadata {
column_def.datatype_extension.clone(),
)
.into();
let column_schema = ColumnSchema::new(column_def.name, data_type, column_def.is_nullable)
ColumnSchema::new(column_def.name, data_type, column_def.is_nullable)
.with_default_constraint(default_constrain)
.context(ConvertDatatypesSnafu)?;
.context(ConvertDatatypesSnafu)
}

/// Construct `Self` from protobuf struct [RegionColumnDef]
pub fn try_from_column_def(column_def: RegionColumnDef) -> Result<Self> {
let column_id = column_def.column_id;
let column_def = column_def
.column_def
.context(InvalidRawRegionRequestSnafu {
err: "column_def is absent",
})?;
let semantic_type = column_def.semantic_type();
let column_schema = Self::inner_try_from_column_def(column_def)?;

Ok(Self {
column_schema,
semantic_type,
Expand Down Expand Up @@ -535,6 +537,7 @@ impl RegionMetadataBuilder {
match kind {
AlterKind::AddColumns { columns } => self.add_columns(columns)?,
AlterKind::DropColumns { names } => self.drop_columns(&names),
AlterKind::ModifyColumns { columns } => self.modify_columns(columns),
}
Ok(self)
}
Expand Down Expand Up @@ -615,6 +618,25 @@ impl RegionMetadataBuilder {
self.column_metadatas
.retain(|col| !name_set.contains(&col.column_schema.name));
}

/// Modifies columns to the metadata if exist.
fn modify_columns(&mut self, columns: Vec<ModifyColumn>) {
let mut modify_map: HashMap<_, _> = columns
.into_iter()
.map(
|ModifyColumn {
column_name,
target_type,
}| (column_name, target_type),
)
.collect();

for column_meta in self.column_metadatas.iter_mut() {
if let Some(target_type) = modify_map.remove(&column_meta.column_schema.name) {
column_meta.column_schema.data_type = target_type;
}
}
}
}

/// Fields skipped in serialization.
Expand Down Expand Up @@ -707,6 +729,13 @@ pub enum MetadataError {

#[snafu(display("Time index column not found"))]
TimeIndexNotFound { location: Location },

#[snafu(display("Modify column {} not exists in region: {}", column_name, region_id))]
ModifyColumnNotFound {
column_name: String,
region_id: RegionId,
location: Location,
},
}

impl ErrorExt for MetadataError {
Expand Down Expand Up @@ -1112,7 +1141,7 @@ mod test {
let metadata = builder.build().unwrap();
check_columns(&metadata, &["a", "b", "f", "c", "d"]);

let mut builder = RegionMetadataBuilder::from_existing(metadata);
let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
builder
.alter(AlterKind::DropColumns {
names: vec!["a".to_string()],
Expand All @@ -1121,6 +1150,23 @@ mod test {
// Build returns error as the primary key contains a.
let err = builder.build().unwrap_err();
assert_eq!(StatusCode::InvalidArguments, err.status_code());

let mut builder = RegionMetadataBuilder::from_existing(metadata);
builder
.alter(AlterKind::ModifyColumns {
columns: vec![ModifyColumn {
column_name: "b".to_string(),
target_type: ConcreteDataType::string_datatype(),
}],
})
.unwrap();
let metadata = builder.build().unwrap();
check_columns(&metadata, &["a", "b", "f", "c", "d"]);
let b_type = metadata
.column_by_name("b")
.map(|column_meta| column_meta.column_schema.data_type.clone())
.unwrap();
assert_eq!(ConcreteDataType::string_datatype(), b_type);
KKould marked this conversation as resolved.
Show resolved Hide resolved
}

#[test]
Expand Down
128 changes: 127 additions & 1 deletion src/store-api/src/region_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ use api::v1::region::{
};
use api::v1::{self, Rows, SemanticType};
pub use common_base::AffectedRows;
use datatypes::data_type::ConcreteDataType;
use snafu::{ensure, OptionExt};
use strum::IntoStaticStr;

use crate::logstore::entry;
use crate::metadata::{
ColumnMetadata, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu, MetadataError,
RegionMetadata, Result,
ModifyColumnNotFoundSnafu, RegionMetadata, Result,
};
use crate::path_utils::region_dir;
use crate::storage::{ColumnId, RegionId, ScanRequest};
Expand Down Expand Up @@ -332,6 +333,11 @@ pub enum AlterKind {
/// Name of columns to drop.
names: Vec<String>,
},
/// Modify columns form the region, only fields are allowed to modify.
ModifyColumns {
/// Columns to modify.
columns: Vec<ModifyColumn>,
},
}

impl AlterKind {
Expand All @@ -350,6 +356,11 @@ impl AlterKind {
Self::validate_column_to_drop(name, metadata)?;
}
}
AlterKind::ModifyColumns { columns } => {
for col_to_modify in columns {
col_to_modify.validate(metadata)?;
}
}
}
Ok(())
}
Expand All @@ -364,6 +375,9 @@ impl AlterKind {
AlterKind::DropColumns { names } => names
.iter()
.any(|name| metadata.column_by_name(name).is_some()),
AlterKind::ModifyColumns { columns } => columns
.iter()
.any(|col_to_modify| col_to_modify.need_alter(metadata)),
}
}

Expand Down Expand Up @@ -501,6 +515,60 @@ impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
}
}

/// Modify a column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ModifyColumn {
/// Schema of the column to modify.
pub column_name: String,
/// Column will be modified to this type.
pub target_type: ConcreteDataType,
}

impl ModifyColumn {
/// Returns an error if the column to modify is invalid.
pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
let column_meta =
metadata
.column_by_name(&self.column_name)
.context(ModifyColumnNotFoundSnafu {
column_name: self.column_name.clone(),
region_id: metadata.region_id,
})?;
KKould marked this conversation as resolved.
Show resolved Hide resolved

ensure!(
!matches!(
column_meta.semantic_type,
SemanticType::Timestamp | SemanticType::Tag
),
KKould marked this conversation as resolved.
Show resolved Hide resolved
InvalidRegionRequestSnafu {
region_id: metadata.region_id,
err: "'timestamp' or 'tag' column cannot change type".to_string()
}
evenyag marked this conversation as resolved.
Show resolved Hide resolved
);
ensure!(
column_meta
.column_schema
.data_type
.can_arrow_type_cast_to(&self.target_type),
InvalidRegionRequestSnafu {
region_id: metadata.region_id,
err: format!(
"column '{}' cannot be cast automatically to type '{}'",
self.column_name, self.target_type
),
}
);

Ok(())
}

/// Returns true if no column to modify to the region.
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
debug_assert!(self.validate(metadata).is_ok());
metadata.column_by_name(&self.column_name).is_some()
}
}

#[derive(Debug, Default)]
pub struct RegionFlushRequest {
pub row_group_size: Option<usize>,
Expand Down Expand Up @@ -678,6 +746,15 @@ mod tests {
semantic_type: SemanticType::Field,
column_id: 3,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"field_1",
ConcreteDataType::boolean_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 4,
})
.primary_key(vec![2]);
builder.build().unwrap()
}
Expand Down Expand Up @@ -790,6 +867,55 @@ mod tests {
assert!(kind.need_alter(&metadata));
}

#[test]
fn test_validate_modify_column() {
let metadata = new_metadata();
AlterKind::ModifyColumns {
columns: vec![ModifyColumn {
column_name: "xxxx".to_string(),
target_type: ConcreteDataType::string_datatype(),
}],
}
.validate(&metadata)
.unwrap_err();

AlterKind::ModifyColumns {
columns: vec![ModifyColumn {
column_name: "field_1".to_string(),
target_type: ConcreteDataType::date_datatype(),
}],
}
.validate(&metadata)
.unwrap_err();

AlterKind::ModifyColumns {
columns: vec![ModifyColumn {
column_name: "ts".to_string(),
target_type: ConcreteDataType::date_datatype(),
}],
}
.validate(&metadata)
.unwrap_err();

AlterKind::ModifyColumns {
columns: vec![ModifyColumn {
column_name: "tag_0".to_string(),
target_type: ConcreteDataType::date_datatype(),
}],
}
.validate(&metadata)
.unwrap_err();

let kind = AlterKind::ModifyColumns {
columns: vec![ModifyColumn {
column_name: "field_0".to_string(),
target_type: ConcreteDataType::int32_datatype(),
}],
};
kind.validate(&metadata).unwrap();
assert!(kind.need_alter(&metadata));
}

#[test]
fn test_validate_schema_version() {
let mut metadata = new_metadata();
Expand Down
12 changes: 12 additions & 0 deletions src/table/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ pub enum Error {
location: Location,
},

#[snafu(display(
"Not allowed to modify index column '{}' from table {}",
column_name,
table_name
))]
ModifyColumnInIndex {
KKould marked this conversation as resolved.
Show resolved Hide resolved
column_name: String,
table_name: String,
location: Location,
},

#[snafu(display(
"Failed to build column descriptor for table: {}, column: {}",
table_name,
Expand Down Expand Up @@ -147,6 +158,7 @@ impl ErrorExt for Error {
| Error::SchemaConversion { .. }
| Error::TableProjection { .. } => StatusCode::EngineExecuteQuery,
Error::RemoveColumnInIndex { .. }
| Error::ModifyColumnInIndex { .. }
| Error::BuildColumnDescriptor { .. }
| Error::InvalidAlterRequest { .. } => StatusCode::InvalidArguments,
Error::TablesRecordBatch { .. } | Error::DuplicatedExecuteCall { .. } => {
Expand Down
Loading