feat: support to insert json data via grpc protocol (#4908)
* feat: support to insert json data via grpc protocol

* chore: handle error

* feat: introduce `prepare_rows`

* chore: fmt toml

* test: add row deletion test

* test: fix unit test

* chore: remove log

* chore: apply suggestions from CR
WenyXu authored Nov 4, 2024
1 parent 191755f commit 4ab6dc2
Showing 9 changed files with 248 additions and 75 deletions.
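
For orientation, here is a hedged client-side sketch of what this commit enables: a row-protocol insert can declare a column as `ColumnDataType::Json` and carry the value as a plain JSON string; the frontend then rewrites the column to `Binary` with a `JsonTypeExtension::JsonBinary` extension and encodes the string as JSONB before routing. The snippet only assembles the request with the `api::v1` types that appear in this diff; the table name, column names, timestamp, and payload are illustrative assumptions, not part of the commit.

```rust
use api::v1::value::ValueData;
use api::v1::{
    ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType,
    Value,
};

/// Builds a row insert whose `attrs` column is declared as JSON but sent as a
/// plain string. The new `preprocess_row_insert_requests` on the frontend is
/// what turns the column into `Binary` + `JsonTypeExtension::JsonBinary` and
/// the string into JSONB bytes before the data reaches the datanodes.
fn build_json_insert() -> RowInsertRequests {
    let schema = vec![
        ColumnSchema {
            column_name: "ts".to_string(),
            datatype: ColumnDataType::TimestampMillisecond.into(),
            semantic_type: SemanticType::Timestamp.into(),
            ..Default::default()
        },
        ColumnSchema {
            column_name: "attrs".to_string(),
            datatype: ColumnDataType::Json.into(),
            semantic_type: SemanticType::Field.into(),
            ..Default::default()
        },
    ];
    let row = Row {
        values: vec![
            Value {
                value_data: Some(ValueData::TimestampMillisecondValue(1_730_000_000_000)),
            },
            Value {
                value_data: Some(ValueData::StringValue(
                    r#"{"host":"a","load":0.7}"#.to_string(),
                )),
            },
        ],
    };
    RowInsertRequests {
        inserts: vec![RowInsertRequest {
            table_name: "json_demo".to_string(),
            rows: Some(Rows {
                schema,
                rows: vec![row],
            }),
            ..Default::default()
        }],
    }
}
```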
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

33 changes: 28 additions & 5 deletions src/common/grpc-expr/src/util.rs
@@ -14,19 +14,21 @@

use std::collections::HashSet;

use api::v1::column_data_type_extension::TypeExt;
use api::v1::column_def::contains_fulltext;
use api::v1::{
AddColumn, AddColumns, Column, ColumnDataType, ColumnDataTypeExtension, ColumnDef,
ColumnOptions, ColumnSchema, CreateTableExpr, SemanticType,
ColumnOptions, ColumnSchema, CreateTableExpr, JsonTypeExtension, SemanticType,
};
use datatypes::schema::Schema;
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableId;
use table::table_reference::TableReference;

use crate::error::{
DuplicatedColumnNameSnafu, DuplicatedTimestampColumnSnafu, InvalidFulltextColumnTypeSnafu,
MissingTimestampColumnSnafu, Result, UnknownColumnDataTypeSnafu,
self, DuplicatedColumnNameSnafu, DuplicatedTimestampColumnSnafu,
InvalidFulltextColumnTypeSnafu, MissingTimestampColumnSnafu, Result,
UnknownColumnDataTypeSnafu,
};
pub struct ColumnExpr<'a> {
pub column_name: &'a str,
@@ -72,6 +74,28 @@ impl<'a> From<&'a ColumnSchema> for ColumnExpr<'a> {
}
}

fn infer_column_datatype(
datatype: i32,
datatype_extension: &Option<ColumnDataTypeExtension>,
) -> Result<ColumnDataType> {
let column_type =
ColumnDataType::try_from(datatype).context(UnknownColumnDataTypeSnafu { datatype })?;

if matches!(&column_type, ColumnDataType::Binary) {
if let Some(ext) = datatype_extension {
let type_ext = ext
.type_ext
.as_ref()
.context(error::MissingFieldSnafu { field: "type_ext" })?;
if *type_ext == TypeExt::JsonType(JsonTypeExtension::JsonBinary.into()) {
return Ok(ColumnDataType::Json);
}
}
}

Ok(column_type)
}

pub fn build_create_table_expr(
table_id: Option<TableId>,
table_name: &TableReference<'_>,
@@ -124,8 +148,7 @@ pub fn build_create_table_expr(
_ => {}
}

let column_type =
ColumnDataType::try_from(datatype).context(UnknownColumnDataTypeSnafu { datatype })?;
let column_type = infer_column_datatype(datatype, datatype_extension)?;

ensure!(
!contains_fulltext(options) || column_type == ColumnDataType::String,
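
A hedged, test-style sketch of the inference rule added to `util.rs` above: a `Binary` datatype paired with a `JsonTypeExtension::JsonBinary` extension is reported as `Json`, while a bare `Binary` is left untouched. Since `infer_column_datatype` is private, such a check would live in the file's own test module alongside the imports shown in this hunk; the test name is hypothetical.

```rust
#[test]
fn test_infer_json_from_binary_extension() {
    // Binary + JsonBinary extension should be reported as Json.
    let ext = Some(ColumnDataTypeExtension {
        type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
    });
    let inferred = infer_column_datatype(ColumnDataType::Binary.into(), &ext).unwrap();
    assert_eq!(inferred, ColumnDataType::Json);

    // A bare Binary column without an extension stays Binary.
    let inferred = infer_column_datatype(ColumnDataType::Binary.into(), &None).unwrap();
    assert_eq!(inferred, ColumnDataType::Binary);
}
```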
2 changes: 1 addition & 1 deletion src/frontend/src/instance/grpc.rs
@@ -19,7 +19,7 @@ use api::v1::{DeleteRequests, DropFlowExpr, InsertRequests, RowDeleteRequests, R
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_query::Output;
use common_telemetry::tracing;
use common_telemetry::tracing::{self};
use query::parser::PromQuery;
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
use servers::query_handler::grpc::GrpcQueryHandler;
1 change: 1 addition & 0 deletions src/operator/Cargo.toml
@@ -36,6 +36,7 @@ datatypes.workspace = true
file-engine.workspace = true
futures = "0.3"
futures-util.workspace = true
jsonb.workspace = true
lazy_static.workspace = true
meta-client.workspace = true
meter-core.workspace = true
2 changes: 2 additions & 0 deletions src/operator/src/delete.rs
@@ -35,6 +35,7 @@ use crate::error::{
MissingTimeIndexColumnSnafu, RequestDeletesSnafu, Result, TableNotFoundSnafu,
};
use crate::region_req_factory::RegionRequestFactory;
use crate::req_convert::common::preprocess_row_delete_requests;
use crate::req_convert::delete::{ColumnToRow, RowToRegion, TableToRegion};

pub struct Deleter {
@@ -72,6 +73,7 @@ impl Deleter {
mut requests: RowDeleteRequests,
ctx: QueryContextRef,
) -> Result<Output> {
preprocess_row_delete_requests(&mut requests.deletes)?;
// remove empty requests
requests.deletes.retain(|req| {
req.rows
10 changes: 9 additions & 1 deletion src/operator/src/error.rs
@@ -770,6 +770,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Invalid json text: {}", json))]
InvalidJsonFormat {
#[snafu(implicit)]
location: Location,
json: String,
},
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -808,7 +815,8 @@ impl ErrorExt for Error {
| Error::BuildAdminFunctionArgs { .. }
| Error::FunctionArityMismatch { .. }
| Error::InvalidPartition { .. }
| Error::PhysicalExpr { .. } => StatusCode::InvalidArguments,
| Error::PhysicalExpr { .. }
| Error::InvalidJsonFormat { .. } => StatusCode::InvalidArguments,

Error::TableAlreadyExists { .. } | Error::ViewAlreadyExists { .. } => {
StatusCode::TableAlreadyExists
4 changes: 3 additions & 1 deletion src/operator/src/insert.rs
@@ -54,6 +54,7 @@ use crate::error::{
};
use crate::expr_factory::CreateExprFactory;
use crate::region_req_factory::RegionRequestFactory;
use crate::req_convert::common::preprocess_row_insert_requests;
use crate::req_convert::insert::{ColumnToRow, RowToRegion, StatementToRegion, TableToRegion};
use crate::statement::StatementExecutor;

@@ -119,10 +120,11 @@ impl Inserter {
/// Handles row inserts request and creates a physical table on demand.
pub async fn handle_row_inserts(
&self,
requests: RowInsertRequests,
mut requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> {
preprocess_row_insert_requests(&mut requests.inserts)?;
self.handle_row_inserts_with_create_type(
requests,
ctx,
77 changes: 74 additions & 3 deletions src/operator/src/req_convert/common.rs
@@ -17,20 +17,91 @@ pub(crate) mod partitioner;
use std::collections::HashMap;

use api::helper::ColumnDataTypeWrapper;
use api::v1::column_data_type_extension::TypeExt;
use api::v1::column_def::options_from_column_schema;
use api::v1::value::ValueData;
use api::v1::{Column, ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
use api::v1::{
Column, ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
RowDeleteRequest, RowInsertRequest, Rows, SemanticType, Value,
};
use common_base::BitVec;
use datatypes::vectors::VectorRef;
use snafu::prelude::*;
use snafu::ResultExt;
use table::metadata::TableInfo;

use crate::error::{
ColumnDataTypeSnafu, ColumnNotFoundSnafu, InvalidInsertRequestSnafu,
MissingTimeIndexColumnSnafu, Result,
ColumnDataTypeSnafu, ColumnNotFoundSnafu, InvalidInsertRequestSnafu, InvalidJsonFormatSnafu,
MissingTimeIndexColumnSnafu, Result, UnexpectedSnafu,
};

/// Encodes a string value as JSONB binary data if the value is of `StringValue` type.
fn encode_string_to_jsonb_binary(value_data: ValueData) -> Result<ValueData> {
if let ValueData::StringValue(json) = &value_data {
let binary = jsonb::parse_value(json.as_bytes())
.map_err(|_| InvalidJsonFormatSnafu { json }.build())
.map(|jsonb| jsonb.to_vec())?;
Ok(ValueData::BinaryValue(binary))
} else {
UnexpectedSnafu {
violated: "Expected to value data to be a string.",
}
.fail()
}
}

/// Prepares row insertion requests by converting any JSON values to binary JSONB format.
pub fn preprocess_row_insert_requests(requests: &mut Vec<RowInsertRequest>) -> Result<()> {
for request in requests {
prepare_rows(&mut request.rows)?;
}

Ok(())
}

/// Prepares row deletion requests by converting any JSON values to binary JSONB format.
pub fn preprocess_row_delete_requests(requests: &mut Vec<RowDeleteRequest>) -> Result<()> {
for request in requests {
prepare_rows(&mut request.rows)?;
}

Ok(())
}

fn prepare_rows(rows: &mut Option<Rows>) -> Result<()> {
if let Some(rows) = rows {
let indexes = rows
.schema
.iter()
.enumerate()
.filter_map(|(idx, schema)| {
if schema.datatype() == ColumnDataType::Json {
Some(idx)
} else {
None
}
})
.collect::<Vec<_>>();
for idx in &indexes {
let column = &mut rows.schema[*idx];
column.datatype_extension = Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
});
column.datatype = ColumnDataType::Binary.into();
}

for idx in &indexes {
for row in &mut rows.rows {
if let Some(value_data) = row.values[*idx].value_data.take() {
row.values[*idx].value_data = Some(encode_string_to_jsonb_binary(value_data)?);
}
}
}
}

Ok(())
}

pub fn columns_to_rows(columns: Vec<Column>, row_count: u32) -> Result<Rows> {
let row_count = row_count as usize;
let column_count = columns.len();
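
To make the new conversion concrete, a minimal standalone sketch of what `encode_string_to_jsonb_binary` does, using the same two `jsonb` crate calls as the diff (`jsonb::parse_value` plus `to_vec`); the helper name, error type, and sample document are illustrative and not part of the commit. The `jsonb` dependency is the one added to `src/operator/Cargo.toml` above.

```rust
/// A standalone stand-in for `encode_string_to_jsonb_binary`: parse the JSON
/// text and keep its JSONB byte encoding, rejecting malformed input the way
/// the new `InvalidJsonFormat` error path does.
fn to_jsonb(json: &str) -> Result<Vec<u8>, String> {
    jsonb::parse_value(json.as_bytes())
        .map(|value| value.to_vec())
        .map_err(|_| format!("invalid json text: {json}"))
}

fn main() {
    // Well-formed JSON becomes a non-empty JSONB buffer.
    let bytes = to_jsonb(r#"{"host":"a","load":0.7}"#).expect("valid json");
    assert!(!bytes.is_empty());

    // Malformed JSON is rejected instead of being stored as-is.
    assert!(to_jsonb("{not json").is_err());
}
```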