Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: json conversion #4893

Merged
merged 13 commits into from
Oct 29, 2024
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", r
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
jsonb = { git = "https://github.com/datafuselabs/jsonb.git", rev = "46ad50fc71cf75afbf98eec455f7892a6387c1fc", default-features = false }
jsonb = { git = "https://github.com/databendlabs/jsonb.git", rev = "46ad50fc71cf75afbf98eec455f7892a6387c1fc", default-features = false }
lazy_static = "1.4"
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "a10facb353b41460eeb98578868ebf19c2084fac" }
mockall = "0.11.4"
Expand Down
10 changes: 9 additions & 1 deletion src/datatypes/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Invalid JSON text: {}", value))]
InvalidJson {
value: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Value exceeds the precision {} bound", precision))]
ValueExceedsPrecision {
precision: u8,
Expand Down Expand Up @@ -222,7 +229,8 @@ impl ErrorExt for Error {
| DefaultValueType { .. }
| DuplicateMeta { .. }
| InvalidTimestampPrecision { .. }
| InvalidPrecisionOrScale { .. } => StatusCode::InvalidArguments,
| InvalidPrecisionOrScale { .. }
| InvalidJson { .. } => StatusCode::InvalidArguments,

ValueExceedsPrecision { .. }
| CastType { .. }
Expand Down
1 change: 1 addition & 0 deletions src/datatypes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#![feature(let_chains)]
#![feature(assert_matches)]

pub mod arrow_array;
pub mod data_type;
Expand Down
80 changes: 80 additions & 0 deletions src/datatypes/src/vectors/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,36 @@ impl BinaryVector {
pub(crate) fn as_arrow(&self) -> &dyn Array {
&self.array
}

/// Creates a new binary vector of JSONB from a binary vector.
/// The binary vector must contain valid JSON strings.
pub fn convert_binary_to_json(&self) -> Result<BinaryVector> {
let arrow_array = self.to_arrow_array();
let mut vector = vec![];
for binary in arrow_array
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap()
.iter()
{
let jsonb = if let Some(binary) = binary {
match jsonb::from_slice(binary) {
Ok(jsonb) => Some(jsonb.to_vec()),
Err(_) => {
let s = String::from_utf8_lossy(binary);
return error::InvalidJsonSnafu {
value: s.to_string(),
}
.fail();
}
}
} else {
None
};
vector.push(jsonb);
}
Ok(BinaryVector::from(vector))
}
}

impl From<BinaryArray> for BinaryVector {
Expand Down Expand Up @@ -233,6 +263,8 @@ vectors::impl_try_from_arrow_array_for_vector!(BinaryArray, BinaryVector);

#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;

use arrow::datatypes::DataType as ArrowDataType;
use common_base::bytes::Bytes;
use serde_json;
Expand Down Expand Up @@ -383,4 +415,52 @@ mod tests {
assert_eq!(b"four", vector.get_data(3).unwrap());
assert_eq!(builder.len(), 4);
}

/// Checks `convert_binary_to_json` on JSON text, on already-encoded JSONB,
/// and on two kinds of invalid input.
#[test]
fn test_binary_json_conversion() {
    // Asserts that every element of `actual` equals the matching expected bytes.
    let assert_same = |actual: &BinaryVector, expected: &[Vec<u8>]| {
        for (idx, want) in expected.iter().enumerate() {
            assert_eq!(
                actual.get_ref(idx).as_binary().unwrap().unwrap(),
                want.as_slice()
            );
        }
    };

    // json strings
    let texts = vec![
        b"{\"hello\": \"world\"}".to_vec(),
        b"{\"foo\": 1}".to_vec(),
        b"123".to_vec(),
    ];
    let expected: Vec<Vec<u8>> = texts
        .iter()
        .map(|text| jsonb::parse_value(text).unwrap().to_vec())
        .collect();
    let from_text = BinaryVector::from(texts)
        .convert_binary_to_json()
        .unwrap();
    assert_same(&from_text, &expected);

    // jsonb
    let from_jsonb = BinaryVector::from(expected.clone())
        .convert_binary_to_json()
        .unwrap();
    assert_same(&from_jsonb, &expected);

    // binary with jsonb header (0x80, 0x40, 0x20)
    let bogus_jsonb: Vec<u8> = vec![0x80, 0x23, 0x40, 0x22];
    let err = BinaryVector::from(vec![bogus_jsonb])
        .convert_binary_to_json()
        .unwrap_err();
    assert_matches!(err, error::Error::InvalidJson { .. });

    // invalid json string
    let truncated = vec![b"{\"hello\": \"world\"".to_vec()];
    let err = BinaryVector::from(truncated)
        .convert_binary_to_json()
        .unwrap_err();
    assert_matches!(err, error::Error::InvalidJson { .. });
}
}
8 changes: 8 additions & 0 deletions src/datatypes/src/vectors/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ mod find_unique;
mod replicate;
mod take;

use std::sync::Arc;

use common_base::BitVec;

use crate::error::{self, Result};
Expand Down Expand Up @@ -89,6 +91,12 @@ macro_rules! impl_scalar_vector_op {
}

/// Casts this vector to `to_type`.
///
/// When the target is the JSON data type and this vector is a
/// `BinaryVector`, the result comes from `convert_binary_to_json`.
/// Every other combination is delegated to `cast::cast_non_constant!`.
fn cast(&self, to_type: &ConcreteDataType) -> Result<VectorRef> {
    let targets_json = to_type == &ConcreteDataType::json_datatype();
    let binary_source = self.as_any().downcast_ref::<BinaryVector>();

    // Both conditions must hold; otherwise fall through to the generic cast.
    if let (true, Some(binary)) = (targets_json, binary_source) {
        let converted: VectorRef = Arc::new(binary.convert_binary_to_json()?);
        return Ok(converted);
    }

    cast::cast_non_constant!(self, to_type)
}

Expand Down
4 changes: 2 additions & 2 deletions src/servers/src/postgres/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ pub(super) fn parameters_to_scalar_values(
if let Some(server_type) = &server_type {
match server_type {
ConcreteDataType::Binary(_) => {
ScalarValue::Binary(data.map(|d| jsonb::Value::from(d).to_vec()))
ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
}
_ => {
return Err(invalid_parameter_error(
Expand All @@ -971,7 +971,7 @@ pub(super) fn parameters_to_scalar_values(
}
}
} else {
ScalarValue::Binary(data.map(|d| jsonb::Value::from(d).to_vec()))
ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
}
}
_ => Err(invalid_parameter_error(
Expand Down
35 changes: 30 additions & 5 deletions tests-integration/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ pub async fn test_mysql_crud(store_type: StorageType) {
.unwrap();

sqlx::query(
"create table demo(i bigint, ts timestamp time index default current_timestamp, d date default null, dt datetime default null, b blob default null)",
"create table demo(i bigint, ts timestamp time index default current_timestamp, d date default null, dt datetime default null, b blob default null, j json default null)",
)
.execute(&pool)
.await
Expand All @@ -158,18 +158,30 @@ pub async fn test_mysql_crud(store_type: StorageType) {
let d = NaiveDate::from_yo_opt(2015, 100).unwrap();
let hello = format!("hello{i}");
let bytes = hello.as_bytes();
sqlx::query("insert into demo values(?, ?, ?, ?, ?)")
let jsons = serde_json::json!({
"code": i,
"success": true,
"payload": {
"features": [
"serde",
"json"
],
"homepage": null
}
});
sqlx::query("insert into demo values(?, ?, ?, ?, ?, ?)")
.bind(i)
.bind(i)
.bind(d)
.bind(dt)
.bind(bytes)
.bind(jsons)
.execute(&pool)
.await
.unwrap();
}

let rows = sqlx::query("select i, d, dt, b from demo")
let rows = sqlx::query("select i, d, dt, b, j from demo")
.fetch_all(&pool)
.await
.unwrap();
Expand All @@ -180,6 +192,7 @@ pub async fn test_mysql_crud(store_type: StorageType) {
let d: NaiveDate = row.get("d");
let dt: DateTime<Utc> = row.get("dt");
let bytes: Vec<u8> = row.get("b");
let json: serde_json::Value = row.get("j");
assert_eq!(ret, i as i64);
let expected_d = NaiveDate::from_yo_opt(2015, 100).unwrap();
assert_eq!(expected_d, d);
Expand All @@ -194,6 +207,18 @@ pub async fn test_mysql_crud(store_type: StorageType) {
format!("{}", dt.format("%Y-%m-%d %H:%M:%S"))
);
assert_eq!(format!("hello{i}"), String::from_utf8_lossy(&bytes));
let expected_j = serde_json::json!({
"code": i,
"success": true,
"payload": {
"features": [
"serde",
"json"
],
"homepage": null
}
});
assert_eq!(json, expected_j);
}

let rows = sqlx::query("select i from demo where i=?")
Expand Down Expand Up @@ -396,7 +421,7 @@ pub async fn test_postgres_crud(store_type: StorageType) {
let dt = d.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp_millis();
let bytes = "hello".as_bytes();
let json = serde_json::json!({
"code": 200,
"code": i,
"success": true,
"payload": {
"features": [
Expand Down Expand Up @@ -444,7 +469,7 @@ pub async fn test_postgres_crud(store_type: StorageType) {
assert_eq!("hello".as_bytes(), bytes);

let expected_j = serde_json::json!({
"code": 200,
"code": i,
"success": true,
"payload": {
"features": [
Expand Down