Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: handle insert default value #5307

Merged
merged 14 commits into from
Jan 14, 2025
17 changes: 17 additions & 0 deletions src/datatypes/src/schema/column_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ impl ColumnSchema {
self.default_constraint.as_ref()
}

/// Check if the default constraint is a impure function.
pub fn is_default_impure(&self) -> bool {
self.default_constraint
.as_ref()
.map(|c| c.is_function())
.unwrap_or(false)
}

#[inline]
pub fn metadata(&self) -> &Metadata {
&self.metadata
Expand Down Expand Up @@ -290,6 +298,15 @@ impl ColumnSchema {
}
}

/// Creates an impure default value for this column, only if it have a impure default constraint.
/// Otherwise, returns `Ok(None)`.
pub fn create_impure_default(&self) -> Result<Option<Value>> {
match &self.default_constraint {
Some(c) => c.create_impure_default(&self.data_type),
None => Ok(None),
}
}

/// Retrieves the fulltext options for the column.
pub fn fulltext_options(&self) -> Result<Option<FulltextOptions>> {
match self.metadata.get(FULLTEXT_KEY) {
Expand Down
51 changes: 51 additions & 0 deletions src/datatypes/src/schema/constraint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,63 @@ impl ColumnDefaultConstraint {
}
}

/// Only create default vector if it's impure, i.e., it's a function.
///
/// This helps to delay creating constant default values to mito engine while also keeps impure default have consistent values
pub fn create_impure_default_vector(
&self,
data_type: &ConcreteDataType,
num_rows: usize,
) -> Result<Option<VectorRef>> {
assert!(num_rows > 0);

match self {
ColumnDefaultConstraint::Function(expr) => {
// Functions should also ensure its return value is not null when
// is_nullable is true.
match &expr[..] {
// TODO(dennis): we only supports current_timestamp right now,
// it's better to use a expression framework in future.
CURRENT_TIMESTAMP | CURRENT_TIMESTAMP_FN | NOW_FN => {
create_current_timestamp_vector(data_type, num_rows).map(Some)
}
_ => error::UnsupportedDefaultExprSnafu { expr }.fail(),
}
}
ColumnDefaultConstraint::Value(_) => Ok(None),
}
}

/// Only create default value if it's impure, i.e., it's a function.
///
/// This helps to delay creating constant default values to mito engine while also keeps impure default have consistent values
pub fn create_impure_default(&self, data_type: &ConcreteDataType) -> Result<Option<Value>> {
match self {
ColumnDefaultConstraint::Function(expr) => {
// Functions should also ensure its return value is not null when
// is_nullable is true.
match &expr[..] {
CURRENT_TIMESTAMP | CURRENT_TIMESTAMP_FN | NOW_FN => {
create_current_timestamp(data_type).map(Some)
}
_ => error::UnsupportedDefaultExprSnafu { expr }.fail(),
}
}
ColumnDefaultConstraint::Value(_) => Ok(None),
}
}

/// Returns true if this constraint might creates NULL.
fn maybe_null(&self) -> bool {
// Once we support more functions, we may return true if given function
// could return null.
matches!(self, ColumnDefaultConstraint::Value(Value::Null))
}

/// Returns true if this constraint is a function.
pub fn is_function(&self) -> bool {
matches!(self, ColumnDefaultConstraint::Function(_))
}
}

fn create_current_timestamp(data_type: &ConcreteDataType) -> Result<Value> {
Expand Down
62 changes: 51 additions & 11 deletions src/flow/src/adapter/flownode_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use common_error::ext::BoxedError;
use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu};
use common_meta::node_manager::Flownode;
use common_telemetry::{debug, trace};
use datatypes::value::Value;
use itertools::Itertools;
use snafu::{IntoError, OptionExt, ResultExt};
use store_api::storage::RegionId;
Expand Down Expand Up @@ -178,14 +179,32 @@ impl Flownode for FlowWorkerManager {
.table_from_id(&table_id)
.await
.map_err(to_meta_err(snafu::location!()))?;
let default_vals = table_schema
.default_values
.iter()
.zip(table_schema.relation_desc.typ().column_types.iter())
.map(|(v, ty)| {
v.as_ref().and_then(|v| {
match v.create_default(ty.scalar_type(), ty.nullable()) {
Ok(v) => Some(v),
Err(err) => {
common_telemetry::error!(err; "Failed to create default value");
None
}
}
})
})
.collect_vec();

let table_types = table_schema
.relation_desc
.typ()
.column_types
.clone()
.into_iter()
.map(|t| t.scalar_type)
.collect_vec();
let table_col_names = table_schema.names;
let table_col_names = table_schema.relation_desc.names;
let table_col_names = table_col_names
.iter().enumerate()
.map(|(idx,name)| match name {
Expand All @@ -202,31 +221,35 @@ impl Flownode for FlowWorkerManager {
.enumerate()
.map(|(i, name)| (&name.column_name, i)),
);
let fetch_order: Vec<usize> = table_col_names

let fetch_order: Vec<FetchFromRow> = table_col_names
.iter()
.map(|col_name| {
.zip(default_vals.into_iter())
.map(|(col_name, col_default_val)| {
name_to_col
.get(col_name)
.copied()
.map(FetchFromRow::Idx)
.or_else(|| col_default_val.clone().map(FetchFromRow::Default))
.with_context(|| UnexpectedSnafu {
err_msg: format!("Column not found: {}", col_name),
err_msg: format!(
"Column not found: {}, default_value: {:?}",
col_name, col_default_val
),
})
})
.try_collect()?;
if !fetch_order.iter().enumerate().all(|(i, &v)| i == v) {
trace!("Reordering columns: {:?}", fetch_order)
}

trace!("Reordering columns: {:?}", fetch_order);
(table_types, fetch_order)
};

// TODO(discord9): use column instead of row
let rows: Vec<DiffRow> = rows_proto
.into_iter()
.map(|r| {
let r = repr::Row::from(r);
let reordered = fetch_order
.iter()
.map(|&i| r.inner[i].clone())
.collect_vec();
let reordered = fetch_order.iter().map(|i| i.fetch(&r)).collect_vec();
repr::Row::new(reordered)
})
.map(|r| (r, now, 1))
Expand Down Expand Up @@ -258,3 +281,20 @@ impl Flownode for FlowWorkerManager {
Ok(Default::default())
}
}

/// Simple helper enum for fetching value from row with default value
#[derive(Debug, Clone)]
enum FetchFromRow {
Idx(usize),
Default(Value),
}

impl FetchFromRow {
/// Panic if idx is out of bound
fn fetch(&self, row: &repr::Row) -> Value {
match self {
FetchFromRow::Idx(idx) => row.get(*idx).unwrap().clone(),
FetchFromRow::Default(v) => v.clone(),
}
}
}
2 changes: 1 addition & 1 deletion src/flow/src/adapter/node_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ impl FlownodeContext {
name: name.join("."),
})?;
let schema = self.table_source.table(name).await?;
Ok((id, schema))
Ok((id, schema.relation_desc))
}

/// Assign a global id to a table, if already assigned, return the existing global id
Expand Down
58 changes: 45 additions & 13 deletions src/flow/src/adapter/table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
use common_error::ext::BoxedError;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameManager};
use datatypes::schema::ColumnDefaultConstraint;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;

Expand All @@ -27,18 +29,44 @@ use crate::error::{
};
use crate::repr::RelationDesc;

/// Table description, include relation desc and default values, which is the minimal information flow needed for table
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TableDesc {
pub relation_desc: RelationDesc,
pub default_values: Vec<Option<ColumnDefaultConstraint>>,
}

impl TableDesc {
pub fn new(
discord9 marked this conversation as resolved.
Show resolved Hide resolved
relation_desc: RelationDesc,
default_values: Vec<Option<ColumnDefaultConstraint>>,
) -> Self {
Self {
relation_desc,
default_values,
}
}

pub fn new_no_default(relation_desc: RelationDesc) -> Self {
Self {
relation_desc,
default_values: vec![],
}
}
}

/// Table source but for flow, provide table schema by table name/id
#[async_trait::async_trait]
pub trait FlowTableSource: Send + Sync + std::fmt::Debug {
async fn table_name_from_id(&self, table_id: &TableId) -> Result<TableName, Error>;
async fn table_id_from_name(&self, name: &TableName) -> Result<TableId, Error>;

/// Get the table schema by table name
async fn table(&self, name: &TableName) -> Result<RelationDesc, Error> {
async fn table(&self, name: &TableName) -> Result<TableDesc, Error> {
let id = self.table_id_from_name(name).await?;
self.table_from_id(&id).await
}
async fn table_from_id(&self, table_id: &TableId) -> Result<RelationDesc, Error>;
async fn table_from_id(&self, table_id: &TableId) -> Result<TableDesc, Error>;
}

/// managed table source information, query from table info manager and table name manager
Expand All @@ -51,7 +79,7 @@ pub struct ManagedTableSource {

#[async_trait::async_trait]
impl FlowTableSource for ManagedTableSource {
async fn table_from_id(&self, table_id: &TableId) -> Result<RelationDesc, Error> {
async fn table_from_id(&self, table_id: &TableId) -> Result<TableDesc, Error> {
let table_info_value = self
.get_table_info_value(table_id)
.await?
Expand Down Expand Up @@ -175,7 +203,7 @@ impl ManagedTableSource {
pub async fn get_table_name_schema(
&self,
table_id: &TableId,
) -> Result<(TableName, RelationDesc), Error> {
) -> Result<(TableName, TableDesc), Error> {
let table_info_value = self
.get_table_info_value(table_id)
.await?
Expand Down Expand Up @@ -219,7 +247,7 @@ pub(crate) mod test {
use crate::repr::{ColumnType, RelationType};

pub struct FlowDummyTableSource {
pub id_names_to_desc: Vec<(TableId, TableName, RelationDesc)>,
pub id_names_to_desc: Vec<(TableId, TableName, TableDesc)>,
id_to_idx: HashMap<TableId, usize>,
name_to_idx: HashMap<TableName, usize>,
}
Expand All @@ -234,8 +262,10 @@ pub(crate) mod test {
"public".to_string(),
"numbers".to_string(),
],
RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
.into_named(vec![Some("number".to_string())]),
TableDesc::new_no_default(
RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
.into_named(vec![Some("number".to_string())]),
),
),
(
1025,
Expand All @@ -244,11 +274,13 @@ pub(crate) mod test {
"public".to_string(),
"numbers_with_ts".to_string(),
],
RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), false),
ColumnType::new(CDT::timestamp_millisecond_datatype(), false),
])
.into_named(vec![Some("number".to_string()), Some("ts".to_string())]),
TableDesc::new_no_default(
RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), false),
ColumnType::new(CDT::timestamp_millisecond_datatype(), false),
])
.into_named(vec![Some("number".to_string()), Some("ts".to_string())]),
),
),
];
let id_to_idx = id_names_to_desc
Expand All @@ -271,7 +303,7 @@ pub(crate) mod test {

#[async_trait::async_trait]
impl FlowTableSource for FlowDummyTableSource {
async fn table_from_id(&self, table_id: &TableId) -> Result<RelationDesc, Error> {
async fn table_from_id(&self, table_id: &TableId) -> Result<TableDesc, Error> {
let idx = self.id_to_idx.get(table_id).context(TableNotFoundSnafu {
name: format!("Table id = {:?}, couldn't found table desc", table_id),
})?;
Expand Down
15 changes: 11 additions & 4 deletions src/flow/src/adapter/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use session::context::QueryContextBuilder;
use snafu::{OptionExt, ResultExt};
use table::table_reference::TableReference;

use crate::adapter::table_source::TableDesc;
use crate::adapter::{TableName, AUTO_CREATED_PLACEHOLDER_TS_COL};
use crate::error::{Error, ExternalSnafu, UnexpectedSnafu};
use crate::repr::{ColumnType, RelationDesc, RelationType};
Expand Down Expand Up @@ -126,7 +127,7 @@ impl FlowWorkerManager {

pub fn table_info_value_to_relation_desc(
table_info_value: TableInfoValue,
) -> Result<RelationDesc, Error> {
) -> Result<TableDesc, Error> {
let raw_schema = table_info_value.table_info.meta.schema;
let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema
.column_schemas
Expand All @@ -147,8 +148,7 @@ pub fn table_info_value_to_relation_desc(
let keys = vec![crate::repr::Key::from(key)];

let time_index = raw_schema.timestamp_index;

Ok(RelationDesc {
let relation_desc = RelationDesc {
typ: RelationType {
column_types,
keys,
Expand All @@ -157,7 +157,14 @@ pub fn table_info_value_to_relation_desc(
auto_columns: vec![],
},
names: col_names,
})
};
let default_values = raw_schema
.column_schemas
.iter()
.map(|c| c.default_constraint().cloned())
.collect_vec();

Ok(TableDesc::new(relation_desc, default_values))
}

pub fn from_proto_to_data_type(
Expand Down
Loading
Loading