Skip to content

Commit

Permalink
refactor: per review
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed Jan 8, 2025
1 parent 2337644 commit 4ff32f3
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 15 deletions.
51 changes: 51 additions & 0 deletions src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,57 @@ mod tests {
check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
}

#[test]
fn test_fill_impure_columns_err() {
let rows = Rows {
schema: vec![new_column_schema(
"k0",
ColumnDataType::Int64,
SemanticType::Tag,
)],
rows: vec![Row {
values: vec![i64_value(1)],
}],
};
let metadata = {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
builder
.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_default_constraint(Some(ColumnDefaultConstraint::Function(
"now()".to_string(),
)))
.unwrap(),
semantic_type: SemanticType::Timestamp,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"k0",
ConcreteDataType::int64_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 2,
})
.primary_key(vec![2]);
builder.build().unwrap()
};

let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
assert!(err.is_fill_default());
assert!(request
.fill_missing_columns(&metadata)
.unwrap_err()
.to_string()
.contains("Unexpected impure default value with region_id"));
}

#[test]
fn test_fill_missing_columns() {
let rows = Rows {
Expand Down
22 changes: 9 additions & 13 deletions src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,7 @@ impl Inserter {
.convert(requests)
.await?;

// Fill impure default values in the request
let inserts = fill_reqs_with_impure_default(&table_infos, inserts)?;

self.do_request(inserts, &ctx).await
self.do_request(inserts, &table_infos, &ctx).await
}

/// Handles row inserts request with metric engine.
Expand Down Expand Up @@ -266,9 +263,7 @@ impl Inserter {
.convert(requests)
.await?;

// Fill impure default values in the request
let inserts = fill_reqs_with_impure_default(&table_infos, inserts)?;
self.do_request(inserts, &ctx).await
self.do_request(inserts, &table_infos, &ctx).await
}

pub async fn handle_table_insert(
Expand All @@ -291,9 +286,8 @@ impl Inserter {

let table_infos =
HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
// Fill impure default values in the request
let inserts = fill_reqs_with_impure_default(&table_infos, inserts)?;
self.do_request(inserts, &ctx).await

self.do_request(inserts, &table_infos, &ctx).await
}

pub async fn handle_statement_insert(
Expand All @@ -308,19 +302,21 @@ impl Inserter {

let table_infos =
HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
// Fill impure default values in the request
let inserts = fill_reqs_with_impure_default(&table_infos, inserts)?;

self.do_request(inserts, ctx).await
self.do_request(inserts, &table_infos, ctx).await
}
}

impl Inserter {
async fn do_request(
&self,
requests: InstantAndNormalInsertRequests,
table_infos: &HashMap<TableId, Arc<TableInfo>>,
ctx: &QueryContextRef,
) -> Result<Output> {
// Fill impure default values in the request
let requests = fill_reqs_with_impure_default(table_infos, requests)?;

let write_cost = write_meter!(
ctx.current_catalog(),
ctx.current_schema(),
Expand Down
8 changes: 6 additions & 2 deletions src/operator/src/req_convert/insert/fill_impure_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct ImpureDefaultFiller {
impl ImpureDefaultFiller {
pub fn new(table_info: TableInfoRef) -> Result<Self> {
let impure_column_list = find_all_impure_columns(&table_info);
let pks = table_info.meta.primary_key_indices.clone();
let pks = &table_info.meta.primary_key_indices;
let pk_names = pks
.iter()
.map(|&i| table_info.meta.schema.column_name_by_index(i).to_string())
Expand Down Expand Up @@ -93,6 +93,10 @@ impl ImpureDefaultFiller {
})
.collect();

if self.impure_columns.len() == impure_columns_in_reqs.len() {
return;
}

let (schema_append, row_append): (Vec<_>, Vec<_>) = self
.impure_columns
.iter()
Expand All @@ -107,7 +111,7 @@ impl ImpureDefaultFiller {

rows.schema.extend(schema_append);
for row in rows.rows.iter_mut() {
row.values.extend(row_append.clone());
row.values.extend_from_slice(row_append.as_slice());
}
}
}
Expand Down

0 comments on commit 4ff32f3

Please sign in to comment.