feat: show json value as strings
CookiePieWw committed Aug 27, 2024
1 parent c334919 commit b2585ac
Showing 10 changed files with 92 additions and 39 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Cargo.lock is generated; its diff is not rendered here.

4 changes: 4 additions & 0 deletions src/datatypes/src/data_type.rs
@@ -221,6 +221,10 @@ impl ConcreteDataType {
         matches!(self, ConcreteDataType::Decimal128(_))
     }
 
+    pub fn is_json(&self) -> bool {
+        matches!(self, ConcreteDataType::Json(_))
+    }
+
     pub fn numerics() -> Vec<ConcreteDataType> {
         vec![
             ConcreteDataType::int8_datatype(),
10 changes: 9 additions & 1 deletion src/datatypes/src/schema.rs
@@ -256,7 +256,15 @@ fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
         if column_schema.is_time_index() && timestamp_index.is_none() {
             timestamp_index = Some(index);
         }
-        let field = Field::try_from(column_schema)?;
+        let mut field = Field::try_from(column_schema)?;
+
+        // Json column performs the same as binary column in Arrow, so we need to mark it
+        if column_schema.data_type.is_json() {
+            field.set_metadata(HashMap::from([(
+                String::from("is_json"),
+                String::from("true"),
+            )]));
+        }
         fields.push(field);
         ensure!(
             name_to_index
5 changes: 4 additions & 1 deletion src/datatypes/src/schema/column_schema.rs
@@ -268,7 +268,10 @@ impl TryFrom<&Field> for ColumnSchema {
     type Error = Error;
 
     fn try_from(field: &Field) -> Result<ColumnSchema> {
-        let data_type = ConcreteDataType::try_from(field.data_type())?;
+        let mut data_type = ConcreteDataType::try_from(field.data_type())?;
+        if field.metadata().contains_key("is_json") {
+            data_type = ConcreteDataType::json_datatype();
+        }
         let mut metadata = field.metadata().clone();
         let default_constraint = match metadata.remove(DEFAULT_CONSTRAINT_KEY) {
             Some(json) => {
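
Aside: a minimal sketch of the round-trip these two hunks implement, written against the `arrow-schema` crate directly rather than the GreptimeDB types. JSON columns are physically binary in Arrow, so an `is_json` metadata entry on the Field is the only thing distinguishing them from plain blobs on the way back. The field name "payload" is made up for illustration.

    use std::collections::HashMap;

    use arrow_schema::{DataType, Field};

    fn mark_json(field: Field) -> Field {
        // Same idea as collect_fields: tag the Arrow field so the JSON-ness
        // survives the trip through an otherwise-binary schema.
        field.with_metadata(HashMap::from([(
            "is_json".to_string(),
            "true".to_string(),
        )]))
    }

    fn main() {
        let field = mark_json(Field::new("payload", DataType::Binary, true));
        // Arrow still reports a binary column; the metadata carries the marker,
        // which is what ColumnSchema::try_from checks to restore the JSON type.
        assert_eq!(field.data_type(), &DataType::Binary);
        assert!(field.metadata().contains_key("is_json"));
    }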
25 changes: 5 additions & 20 deletions src/query/src/dist_plan/merge_scan.rs
@@ -157,20 +157,19 @@ impl MergeScanExec {
         query_ctx: QueryContextRef,
         target_partition: usize,
     ) -> Result<Self> {
-        let arrow_schema_without_metadata = Self::arrow_schema_without_metadata(arrow_schema);
+        let arrow_schema = Arc::new(arrow_schema.clone());
         let properties = PlanProperties::new(
-            EquivalenceProperties::new(arrow_schema_without_metadata.clone()),
+            EquivalenceProperties::new(arrow_schema.clone()),
             Partitioning::UnknownPartitioning(target_partition),
             ExecutionMode::Bounded,
         );
-        let schema_without_metadata =
-            Self::arrow_schema_to_schema(arrow_schema_without_metadata.clone())?;
+        let schema = Self::arrow_schema_to_schema(arrow_schema.clone())?;
         Ok(Self {
             table,
             regions,
             plan,
-            schema: schema_without_metadata,
-            arrow_schema: arrow_schema_without_metadata,
+            schema,
+            arrow_schema,
             region_query_handler,
             metric: ExecutionPlanMetricsSet::new(),
             sub_stage_metrics: Arc::default(),
@@ -294,20 +293,6 @@ impl MergeScanExec {
         }))
     }
 
-    fn arrow_schema_without_metadata(arrow_schema: &ArrowSchema) -> ArrowSchemaRef {
-        Arc::new(ArrowSchema::new(
-            arrow_schema
-                .fields()
-                .iter()
-                .map(|field| {
-                    let field = field.as_ref().clone();
-                    let field_without_metadata = field.with_metadata(Default::default());
-                    Arc::new(field_without_metadata)
-                })
-                .collect::<Vec<_>>(),
-        ))
-    }
-
     fn arrow_schema_to_schema(arrow_schema: ArrowSchemaRef) -> Result<SchemaRef> {
         let schema = Schema::try_from(arrow_schema).context(ConvertSchemaSnafu)?;
         Ok(Arc::new(schema))
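
Aside: the deleted helper existed to strip per-field metadata before building the merge-scan schema; keeping the metadata is now load-bearing, since it is what carries `is_json` across the distributed plan. A minimal sketch of the hazard, using `arrow-schema` directly (the column name is made up):

    use std::collections::HashMap;

    use arrow_schema::{DataType, Field, Schema};

    fn main() {
        let json_field = Field::new("payload", DataType::Binary, true).with_metadata(
            HashMap::from([("is_json".to_string(), "true".to_string())]),
        );
        let schema = Schema::new(vec![json_field]);

        // What arrow_schema_without_metadata used to do: rebuild each field
        // with empty metadata.
        let stripped = Schema::new(
            schema
                .fields()
                .iter()
                .map(|f| f.as_ref().clone().with_metadata(Default::default()))
                .collect::<Vec<_>>(),
        );

        assert!(schema.field(0).metadata().contains_key("is_json"));
        // The marker is gone, so the column would decode as plain binary.
        assert!(!stripped.field(0).metadata().contains_key("is_json"));
    }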
1 change: 1 addition & 0 deletions src/servers/Cargo.toml
@@ -93,6 +93,7 @@ serde_json.workspace = true
 session.workspace = true
 snafu.workspace = true
 snap = "1"
+jsonb.workspace = true
 sql.workspace = true
 strum.workspace = true
 table.workspace = true
16 changes: 13 additions & 3 deletions src/servers/src/mysql/writer.rs
@@ -168,6 +168,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
                     &mut row_writer,
                     &record_batch,
                     query_context.clone(),
+                    &column_def,
                 )
                 .await?
             }
@@ -191,9 +192,10 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
         row_writer: &mut RowWriter<'_, W>,
         recordbatch: &RecordBatch,
         query_context: QueryContextRef,
+        column_def: &[Column],
     ) -> Result<()> {
         for row in recordbatch.rows() {
-            for value in row.into_iter() {
+            for (value, column) in row.into_iter().zip(column_def.iter()) {
                 match value {
                     Value::Null => row_writer.write_col(None::<u8>)?,
                     Value::Boolean(v) => row_writer.write_col(v as i8)?,
@@ -208,7 +210,14 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
                     Value::Float32(v) => row_writer.write_col(v.0)?,
                     Value::Float64(v) => row_writer.write_col(v.0)?,
                     Value::String(v) => row_writer.write_col(v.as_utf8())?,
-                    Value::Binary(v) | Value::Json(v) => row_writer.write_col(v.deref())?,
+                    Value::Binary(v) => match column.coltype {
+                        ColumnType::MYSQL_TYPE_JSON => {
+                            row_writer.write_col(jsonb::to_string(&v))?;
+                        }
+                        _ => {
+                            row_writer.write_col(v.deref())?;
+                        }
+                    },
                     Value::Date(v) => row_writer.write_col(v.to_chrono_date())?,
                     // convert datetime and timestamp to timezone of current connection
                     Value::DateTime(v) => row_writer.write_col(
@@ -219,7 +228,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
                     )?,
                     Value::Interval(v) => row_writer.write_col(v.to_iso8601_string())?,
                     Value::Duration(v) => row_writer.write_col(v.to_std_duration())?,
-                    Value::List(_) => {
+                    Value::List(_) | Value::Json(_) => {
                         return Err(Error::Internal {
                             err_msg: format!(
                                 "cannot write value {:?} in mysql protocol: unimplemented",
@@ -281,6 +290,7 @@ pub(crate) fn create_mysql_column(
         ConcreteDataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
         ConcreteDataType::Duration(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
         ConcreteDataType::Decimal128(_) => Ok(ColumnType::MYSQL_TYPE_DECIMAL),
+        ConcreteDataType::Json(_) => Ok(ColumnType::MYSQL_TYPE_JSON),
         _ => error::UnsupportedDataTypeSnafu {
             data_type,
             reason: "not implemented",
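
Aside: `jsonb::to_string` is what turns the stored JSONB bytes back into JSON text for MYSQL_TYPE_JSON columns. A minimal round-trip sketch with the `jsonb` crate this commit adds to Cargo.toml (the sample document is made up; `parse_value`/`to_vec` appear here only to fabricate the encoded bytes a real row would already contain):

    fn main() {
        let text = r#"{"host":"h1","metrics":[1,2,3]}"#;

        // Encode JSON text into the binary JSONB representation.
        let encoded: Vec<u8> = jsonb::parse_value(text.as_bytes())
            .expect("valid JSON")
            .to_vec();

        // Decode back to a string, as the MySQL writer now does per cell.
        // (Output is compact JSON text; formatting may differ from the input.)
        println!("{}", jsonb::to_string(&encoded));
    }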
4 changes: 2 additions & 2 deletions src/servers/src/postgres/handler.rs
@@ -144,8 +144,8 @@ where
             .map(move |row| {
                 row.and_then(|row| {
                     let mut encoder = DataRowEncoder::new(pg_schema_ref.clone());
-                    for value in row.iter() {
-                        encode_value(&query_ctx, value, &mut encoder)?;
+                    for (value, column) in row.iter().zip(schema.column_schemas()) {
+                        encode_value(&query_ctx, value, &mut encoder, &column.data_type)?;
                     }
                     encoder.finish()
                 })
63 changes: 52 additions & 11 deletions src/servers/src/postgres/types.rs
@@ -62,6 +62,7 @@ pub(super) fn encode_value(
     query_ctx: &QueryContextRef,
     value: &Value,
     builder: &mut DataRowEncoder,
+    datatype: &ConcreteDataType,
 ) -> PgWireResult<()> {
     match value {
         Value::Null => builder.encode_field(&None::<&i8>),
@@ -77,13 +78,18 @@ pub(super) fn encode_value(
         Value::Float32(v) => builder.encode_field(&v.0),
         Value::Float64(v) => builder.encode_field(&v.0),
         Value::String(v) => builder.encode_field(&v.as_utf8()),
-        Value::Binary(v) => {
-            let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
-            match *bytea_output {
-                PGByteaOutputValue::ESCAPE => builder.encode_field(&EscapeOutputBytea(v.deref())),
-                PGByteaOutputValue::HEX => builder.encode_field(&HexOutputBytea(v.deref())),
-            }
-        }
+        Value::Binary(v) => match datatype {
+            ConcreteDataType::Json(_) => builder.encode_field(&jsonb::to_string(v)),
+            _ => {
+                let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
+                match *bytea_output {
+                    PGByteaOutputValue::ESCAPE => {
+                        builder.encode_field(&EscapeOutputBytea(v.deref()))
+                    }
+                    PGByteaOutputValue::HEX => builder.encode_field(&HexOutputBytea(v.deref())),
+                }
+            }
+        },
         Value::Date(v) => {
             if let Some(date) = v.to_chrono_date() {
                 let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
@@ -125,6 +131,7 @@ pub(super) fn encode_value(
         }
         Value::Interval(v) => builder.encode_field(&PgInterval::from(*v)),
         Value::Decimal128(v) => builder.encode_field(&v.to_string()),
+        // Value of json type is represented as Value::Binary(_)
         Value::List(_) | Value::Duration(_) | Value::Json(_) => {
             Err(PgWireError::ApiError(Box::new(Error::Internal {
                 err_msg: format!(
@@ -154,10 +161,10 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
         &ConcreteDataType::Time(_) => Ok(Type::TIME),
         &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL),
         &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC),
+        &ConcreteDataType::Json(_) => Ok(Type::JSON),
         &ConcreteDataType::Duration(_)
         | &ConcreteDataType::List(_)
-        | &ConcreteDataType::Dictionary(_)
-        | &ConcreteDataType::Json(_) => server_error::UnsupportedDataTypeSnafu {
+        | &ConcreteDataType::Dictionary(_) => server_error::UnsupportedDataTypeSnafu {
             data_type: origin,
             reason: "not implemented",
         }
@@ -538,7 +545,9 @@ pub(super) fn parameters_to_scalar_values(
                 ConcreteDataType::String(_) => {
                     ScalarValue::Utf8(data.map(|d| String::from_utf8_lossy(&d).to_string()))
                 }
-                ConcreteDataType::Binary(_) => ScalarValue::Binary(data),
+                ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) => {
+                    ScalarValue::Binary(data)
+                }
                 _ => {
                     return Err(invalid_parameter_error(
                         "invalid_parameter_type",
@@ -582,6 +591,8 @@ pub(super) fn param_types_to_pg_types(
 mod test {
     use std::sync::Arc;
 
+    use common_time::interval::IntervalUnit;
+    use common_time::timestamp::TimeUnit;
     use datatypes::schema::{ColumnSchema, Schema};
     use datatypes::value::ListValue;
     use pgwire::api::results::{FieldFormat, FieldInfo};
@@ -779,6 +790,35 @@ mod test {
             ),
         ];
 
+        let datatypes = vec![
+            ConcreteDataType::null_datatype(),
+            ConcreteDataType::boolean_datatype(),
+            ConcreteDataType::uint8_datatype(),
+            ConcreteDataType::uint16_datatype(),
+            ConcreteDataType::uint32_datatype(),
+            ConcreteDataType::uint64_datatype(),
+            ConcreteDataType::int8_datatype(),
+            ConcreteDataType::int8_datatype(),
+            ConcreteDataType::int16_datatype(),
+            ConcreteDataType::int16_datatype(),
+            ConcreteDataType::int32_datatype(),
+            ConcreteDataType::int32_datatype(),
+            ConcreteDataType::int64_datatype(),
+            ConcreteDataType::int64_datatype(),
+            ConcreteDataType::float32_datatype(),
+            ConcreteDataType::float32_datatype(),
+            ConcreteDataType::float32_datatype(),
+            ConcreteDataType::float64_datatype(),
+            ConcreteDataType::float64_datatype(),
+            ConcreteDataType::float64_datatype(),
+            ConcreteDataType::string_datatype(),
+            ConcreteDataType::binary_datatype(),
+            ConcreteDataType::date_datatype(),
+            ConcreteDataType::time_datatype(TimeUnit::Second),
+            ConcreteDataType::datetime_datatype(),
+            ConcreteDataType::timestamp_datatype(TimeUnit::Second),
+            ConcreteDataType::interval_datatype(IntervalUnit::YearMonth),
+        ];
         let values = vec![
             Value::Null,
             Value::Boolean(true),
@@ -813,14 +853,15 @@ mod test {
             .build()
             .into();
         let mut builder = DataRowEncoder::new(Arc::new(schema));
-        for i in values.iter() {
-            encode_value(&query_context, i, &mut builder).unwrap();
+        for (value, datatype) in values.iter().zip(datatypes) {
+            encode_value(&query_context, value, &mut builder, &datatype).unwrap();
         }
 
         let err = encode_value(
            &query_context,
            &Value::List(ListValue::new(vec![], ConcreteDataType::int16_datatype())),
            &mut builder,
+           &ConcreteDataType::list_datatype(ConcreteDataType::int16_datatype()),
        )
        .unwrap_err();
        match err {
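
Aside: the reason encode_value grows a `datatype` parameter is that a JSON cell and a plain blob are indistinguishable at this point; both arrive as Value::Binary, and only the column's declared type says which rendering to use. A self-contained sketch of that dispatch with made-up stand-in enums (not the GreptimeDB types) and a hex fallback standing in for the bytea output modes:

    enum ColType {
        Binary,
        Json,
    }

    enum Cell {
        Binary(Vec<u8>),
    }

    fn render(cell: &Cell, coltype: &ColType) -> String {
        match cell {
            Cell::Binary(bytes) => match coltype {
                // JSON column: decode the JSONB bytes to JSON text.
                ColType::Json => jsonb::to_string(bytes),
                // Plain binary: hex output, in the spirit of bytea HEX.
                ColType::Binary => {
                    let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
                    format!("\\x{hex}")
                }
            },
        }
    }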
2 changes: 1 addition & 1 deletion tests/cases/standalone/common/insert/merge_mode.result
@@ -103,7 +103,7 @@ create table if not exists invalid_merge_mode(
 engine=mito
 with('merge_mode'='first_row');
 
-Error: 1004(InvalidArguments), Invalid options: Matching variant not found at line 1 column 25
+Error: 1004(InvalidArguments), Invalid options: Matching variant not found at line 1 column 76
 
 create table if not exists invalid_merge_mode(
 host string,
