From 1e9ff97617b10ab31237d31e8d1d36e5683d6b9c Mon Sep 17 00:00:00 2001 From: Solomon Date: Mon, 20 Nov 2023 14:43:32 +0100 Subject: [PATCH] fix: json conversion of Decimal doesn't work with arrow-json --- Cargo.lock | 4 +- dozer-api/Cargo.toml | 2 +- dozer-api/src/rest/api_generator.rs | 9 +- dozer-api/src/sql/datafusion/json.rs | 173 ++++++++++++++++++++++++++ dozer-api/src/sql/datafusion/mod.rs | 1 + dozer-api/src/sql/mod.rs | 1 + dozer-api/src/sql/pgwire.rs | 31 +---- dozer-api/src/sql/util.rs | 28 +++++ dozer-tests/src/cache_tests/filter.rs | 41 +++++- dozer-types/Cargo.toml | 2 +- dozer-types/src/json_types.rs | 44 ++++++- 11 files changed, 289 insertions(+), 47 deletions(-) create mode 100644 dozer-api/src/sql/datafusion/json.rs create mode 100644 dozer-api/src/sql/util.rs diff --git a/Cargo.lock b/Cargo.lock index e861062d57..02326aa798 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9683,9 +9683,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.107" +version = "1.0.108" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" +checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b" dependencies = [ "indexmap 2.0.0", "itoa", diff --git a/dozer-api/Cargo.toml b/dozer-api/Cargo.toml index 7839b9387c..da8490ba12 100644 --- a/dozer-api/Cargo.toml +++ b/dozer-api/Cargo.toml @@ -55,6 +55,6 @@ uuid = "1.4.1" chrono = "0.4.31" datafusion = "32.0.0" datafusion-expr = "32.0.0" -serde_json = "1.0.93" +serde_json = { version = "1.0.108", features = ["arbitrary_precision"] } pgwire = "0.16.1" tempdir = "0.3.7" diff --git a/dozer-api/src/rest/api_generator.rs b/dozer-api/src/rest/api_generator.rs index 29367f621e..067e47a6a7 100644 --- a/dozer-api/src/rest/api_generator.rs +++ b/dozer-api/src/rest/api_generator.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use actix_web::web::ReqData; use actix_web::{web, HttpResponse}; -use datafusion::error::DataFusionError; use dozer_cache::cache::expression::{QueryExpression, Skip}; use dozer_cache::cache::CacheRecord; use dozer_cache::{CacheReader, Phase}; @@ -14,6 +13,7 @@ use openapiv3::OpenAPI; use crate::api_helper::{get_record, get_records, get_records_count}; use crate::generator::oapi::generator::OpenApiGenerator; +use crate::sql::datafusion::json::record_batches_to_json_rows; use crate::sql::datafusion::SQLExecutor; use crate::CacheEndpoint; use crate::{auth::Access, errors::ApiError}; @@ -193,12 +193,7 @@ pub async fn sql( .collect() .await .map_err(ApiError::SQLQueryFailed)?; - datafusion::arrow::json::writer::record_batches_to_json_rows( - record_batches.iter().collect::>().as_slice(), - ) - .map_err(DataFusionError::ArrowError) - .map_err(ApiError::SQLQueryFailed) - .map(|result| HttpResponse::Ok().json(result)) + Ok(HttpResponse::Ok().json(record_batches_to_json_rows(&record_batches))) } mod extractor { diff --git a/dozer-api/src/sql/datafusion/json.rs b/dozer-api/src/sql/datafusion/json.rs new file mode 100644 index 0000000000..7fa47f1549 --- /dev/null +++ b/dozer-api/src/sql/datafusion/json.rs @@ -0,0 +1,173 @@ +use std::str::FromStr; + +use datafusion::arrow::{ + array::{ + as_largestring_array, as_string_array, Array, AsArray, BooleanArray, IntervalDayTimeArray, + IntervalMonthDayNanoArray, IntervalYearMonthArray, + }, + datatypes::{ + ArrowPrimitiveType, BinaryType, ByteArrayType, DataType, Decimal128Type, Decimal256Type, + DecimalType, Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, + Int8Type, IntervalUnit, LargeBinaryType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, + }, + record_batch::RecordBatch, +}; +use dozer_types::arrow_cast::display::{ArrayFormatter, FormatOptions}; +use serde_json::{map::Map, Number, Value}; + +use crate::sql::util::Iso8601Duration; + +pub fn record_batches_to_json_rows(batches: &[RecordBatch]) -> Vec> { + let mut rows = Vec::new(); + + if batches.is_empty() { + return rows; + } + + let schema = batches[0].schema(); + let schema = schema.as_ref(); + + for batch in batches { + for row_index in 0..batch.num_rows() { + let mut row = Map::new(); + for (column_index, column) in batch.columns().iter().enumerate() { + let value = field_to_json_value(&column, row_index); + let column_name = schema.field(column_index).name(); + row.insert(column_name.clone(), value); + } + rows.push(row) + } + } + + rows +} + +macro_rules! field { + ($array:tt[$index:tt], $type:tt) => { + $array + .as_any() + .downcast_ref::<$type>() + .unwrap() + .value($index) + }; +} + +fn json_number_from_primitive(array: &dyn Array, row_index: usize) -> Value +where + T: ArrowPrimitiveType, + T::Native: ToString, +{ + let array = array.as_primitive::(); + Value::Number(Number::from_str(&array.value(row_index).to_string()).unwrap()) +} + +fn json_string_from_datetime(array: &dyn Array, row_index: usize) -> Value { + let options = FormatOptions::default(); + let formatter = + ArrayFormatter::try_new(array, &options).expect("datetime types should be supported"); + Value::String(formatter.value(row_index).to_string()) +} + +fn json_string_from_utf8(array: &dyn Array, row_index: usize) -> Value { + let array = as_string_array(array); + Value::String(array.value(row_index).to_string()) +} + +fn json_string_from_largeutf8(array: &dyn Array, row_index: usize) -> Value { + let array = as_largestring_array(array); + Value::String(array.value(row_index).to_string()) +} + +fn json_string_from_binary(array: &dyn Array, row_index: usize) -> Value { + let array = array.as_bytes::(); + Value::String(format!("{:?}", array.value(row_index))) +} + +fn json_string_from_fixedsizebinary(array: &dyn Array, row_index: usize) -> Value { + let array = array.as_fixed_size_binary(); + Value::String(format!("{:?}", array.value(row_index))) +} + +fn json_number_from_decimal( + array: &dyn Array, + row_index: usize, + precision: u8, + scale: i8, +) -> Value +where + T: ArrowPrimitiveType + DecimalType, +{ + let array = array.as_primitive::(); + let value = array.value(row_index); + Value::String(::format_decimal(value, precision, scale)) +} + +fn field_to_json_value(column: &dyn Array, row_index: usize) -> Value { + let i = row_index; + let value = if column.is_null(i) { + Value::Null + } else { + match column.data_type() { + DataType::Null => Value::Null, + DataType::Boolean => Value::Bool(field!(column[i], BooleanArray)), + DataType::Int8 => json_number_from_primitive::(column, i), + DataType::Int16 => json_number_from_primitive::(column, i), + DataType::Int32 => json_number_from_primitive::(column, i), + DataType::Int64 => json_number_from_primitive::(column, i), + DataType::UInt8 => json_number_from_primitive::(column, i), + DataType::UInt16 => json_number_from_primitive::(column, i), + DataType::UInt32 => json_number_from_primitive::(column, i), + DataType::UInt64 => json_number_from_primitive::(column, i), + DataType::Float16 => json_number_from_primitive::(column, i), + DataType::Float32 => json_number_from_primitive::(column, i), + DataType::Float64 => json_number_from_primitive::(column, i), + + DataType::Date32 + | DataType::Date64 + | DataType::Timestamp(_, _) + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::Duration(_) => json_string_from_datetime(column, i), + + DataType::Interval(IntervalUnit::DayTime) => Value::String({ + let value = field!(column[i], IntervalDayTimeArray); + let (days, milliseconds) = (value as i32, (value >> 32) as i32); + Iso8601Duration::DaysMilliseconds(days, milliseconds).to_string() + }), + DataType::Interval(IntervalUnit::MonthDayNano) => Value::String({ + let value = field!(column[i], IntervalMonthDayNanoArray); + let (months, days, nanoseconds) = + (value as i32, (value >> 32) as i32, (value >> 64) as i64); + Iso8601Duration::MonthsDaysNanoseconds(months, days, nanoseconds).to_string() + }), + DataType::Interval(IntervalUnit::YearMonth) => Value::String({ + let months = field!(column[i], IntervalYearMonthArray); + Iso8601Duration::Months(months).to_string() + }), + + DataType::Binary => json_string_from_binary::(column, i), + DataType::FixedSizeBinary(_) => json_string_from_fixedsizebinary(column, i), + DataType::LargeBinary => json_string_from_binary::(column, i), + DataType::Utf8 => json_string_from_utf8(column, i), + DataType::LargeUtf8 => json_string_from_largeutf8(column, i), + + DataType::Decimal128(precision, scale) => { + json_number_from_decimal::(column, i, *precision, *scale) + } + DataType::Decimal256(precision, scale) => { + json_number_from_decimal::(column, i, *precision, *scale) + } + + DataType::List(_) + | DataType::FixedSizeList(_, _) + | DataType::LargeList(_) + | DataType::Struct(_) + | DataType::Union(_, _) + | DataType::Dictionary(_, _) + | DataType::Map(_, _) + | DataType::RunEndEncoded(_, _) => unimplemented!(), + } + }; + + value +} diff --git a/dozer-api/src/sql/datafusion/mod.rs b/dozer-api/src/sql/datafusion/mod.rs index a6c2cb6fc5..143faa7e6e 100644 --- a/dozer-api/src/sql/datafusion/mod.rs +++ b/dozer-api/src/sql/datafusion/mod.rs @@ -1,3 +1,4 @@ +pub mod json; mod predicate_pushdown; use std::{any::Any, sync::Arc}; diff --git a/dozer-api/src/sql/mod.rs b/dozer-api/src/sql/mod.rs index e0a41a0ac5..4e3878baf3 100644 --- a/dozer-api/src/sql/mod.rs +++ b/dozer-api/src/sql/mod.rs @@ -1,2 +1,3 @@ pub mod datafusion; pub mod pgwire; +mod util; diff --git a/dozer-api/src/sql/pgwire.rs b/dozer-api/src/sql/pgwire.rs index 74b2203319..b5903e7029 100644 --- a/dozer-api/src/sql/pgwire.rs +++ b/dozer-api/src/sql/pgwire.rs @@ -41,6 +41,8 @@ use crate::shutdown::ShutdownReceiver; use crate::sql::datafusion::SQLExecutor; use crate::CacheEndpoint; +use super::util::Iso8601Duration; + pub struct PgWireServer { config: SqlOptions, } @@ -478,35 +480,6 @@ fn encode_field( } } -#[derive(Debug)] -enum Iso8601Duration { - Duration(chrono::Duration), - DaysMilliseconds(i32, i32), - MonthsDaysNanoseconds(i32, i32, i64), - Months(i32), -} - -impl std::fmt::Display for Iso8601Duration { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Iso8601Duration::Duration(d) => std::fmt::Display::fmt(&d, f), - Iso8601Duration::DaysMilliseconds(days, msecs) => { - let secs = msecs.div_euclid(1_000); - let msecs = msecs.rem_euclid(1_000); - write!(f, "P{}DT{}.{:03}S", days, secs, msecs) - } - Iso8601Duration::MonthsDaysNanoseconds(months, days, nanos) => { - let secs = nanos.div_euclid(1_000_000_000); - let nanos = nanos.rem_euclid(1_000_000_000); - write!(f, "P{}M{}DT{}.{:09}S", months, days, secs, nanos) - } - Iso8601Duration::Months(months) => { - write!(f, "P{}M", months) - } - } - } -} - fn generic_error_info(err: String) -> ErrorInfo { ErrorInfo::new("ERROR".to_string(), "2F000".to_string(), err) } diff --git a/dozer-api/src/sql/util.rs b/dozer-api/src/sql/util.rs new file mode 100644 index 0000000000..13eb4ff3dd --- /dev/null +++ b/dozer-api/src/sql/util.rs @@ -0,0 +1,28 @@ +#[derive(Debug)] +pub enum Iso8601Duration { + Duration(chrono::Duration), + DaysMilliseconds(i32, i32), + MonthsDaysNanoseconds(i32, i32, i64), + Months(i32), +} + +impl std::fmt::Display for Iso8601Duration { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Iso8601Duration::Duration(d) => std::fmt::Display::fmt(&d, f), + Iso8601Duration::DaysMilliseconds(days, msecs) => { + let secs = msecs.div_euclid(1_000); + let msecs = msecs.rem_euclid(1_000); + write!(f, "P{}DT{}.{:03}S", days, secs, msecs) + } + Iso8601Duration::MonthsDaysNanoseconds(months, days, nanos) => { + let secs = nanos.div_euclid(1_000_000_000); + let nanos = nanos.rem_euclid(1_000_000_000); + write!(f, "P{}M{}DT{}.{:09}S", months, days, secs, nanos) + } + Iso8601Duration::Months(months) => { + write!(f, "P{}M", months) + } + } + } +} diff --git a/dozer-tests/src/cache_tests/filter.rs b/dozer-tests/src/cache_tests/filter.rs index c7a3b7d8d4..84ab737bb6 100644 --- a/dozer-tests/src/cache_tests/filter.rs +++ b/dozer-tests/src/cache_tests/filter.rs @@ -1,6 +1,6 @@ -use bson::{doc, Document}; +use bson::{doc, Bson, Document}; use dozer_cache::cache::expression::{FilterExpression, Operator, QueryExpression}; -use dozer_types::types::Record; +use dozer_types::{serde_json::Value, types::Record}; use futures::stream::StreamExt; use mongodb::Collection; @@ -44,13 +44,13 @@ fn convert_filter(filter: Option<&FilterExpression>) -> Document { Operator::GTE => "$gte", _ => unreachable!(), }; - document.insert(name, doc! {operator: bson::to_bson(value).unwrap()}); + document.insert(name, doc! {operator: to_bson(value).unwrap()}); } Operator::Contains => { document.insert( "$text", doc! { - "$search": bson::to_bson(value).unwrap() + "$search": to_bson(value).unwrap() }, ); } @@ -103,3 +103,36 @@ fn check_equals(film: &Film, record: &Record) { ); assert!(values.next().is_none()); } + +fn to_bson(value: &Value) -> bson::ser::Result { + // this match block's sole purpose is to properly convert `serde_json::Number` to Bson + // when `serde_json/arbitrary_precision` feature is enabled. + // `bson::to_bson()` by itself does not properly convert it. + match value { + Value::Number(number) => { + let bson_value = if let Some(n) = number.as_i64() { + Bson::Int64(n) + } else if let Some(n) = number.as_f64() { + Bson::Double(n) + } else { + bson::to_bson(value)? + }; + Ok(bson_value) + } + Value::Array(vec) => { + let mut array = Vec::with_capacity(vec.len()); + for value in vec { + array.push(to_bson(value)?) + } + Ok(array.into()) + } + Value::Object(map) => { + let mut object = bson::Document::new(); + for (key, value) in map.into_iter() { + object.insert(key, to_bson(value)?); + } + Ok(object.into()) + } + value => bson::to_bson(value), + } +} diff --git a/dozer-types/Cargo.toml b/dozer-types/Cargo.toml index 92fa6a639c..a76af96c5d 100644 --- a/dozer-types/Cargo.toml +++ b/dozer-types/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" [dependencies] chrono = { version = "0.4.23", features = ["serde"] } serde = { version = "1.0.189", features = ["derive", "rc"] } -serde_json = { version = "1.0.93", features = ["std"] } +serde_json = { version = "1.0.108", features = ["arbitrary_precision"] } ijson = "0.1.3" rust_decimal = { version = "1.32", features = ["serde-str", "db-postgres"] } bincode = { workspace = true, features = ["serde"] } diff --git a/dozer-types/src/json_types.rs b/dozer-types/src/json_types.rs index 24fde64581..e65a241d10 100644 --- a/dozer-types/src/json_types.rs +++ b/dozer-types/src/json_types.rs @@ -8,7 +8,7 @@ use prost_types::value::Kind; use prost_types::{ListValue, Struct, Value as ProstValue}; use serde_json::{Map, Value}; -use ijson::{Destructured, DestructuredRef, IArray, IObject, IValue}; +use ijson::{Destructured, DestructuredRef, IArray, INumber, IObject, IValue}; pub type JsonValue = IValue; pub type JsonObject = IObject; @@ -21,7 +21,7 @@ pub use ijson::ijson as json; pub fn json_from_str(from: &str) -> Result { let serde_value: serde_json::Value = serde_json::from_str(from).unwrap_or_else(|_| serde_json::Value::String(from.to_owned())); - ijson::to_value(serde_value).map_err(Into::into) + serde_json_to_json_value(serde_value) } pub fn parse_json_slice(bytes: &[u8]) -> Result { @@ -181,7 +181,45 @@ pub fn json_value_to_prost(val: JsonValue) -> ProstValue { } pub fn serde_json_to_json_value(value: Value) -> Result { - ijson::to_value(value).map_err(Into::into) + // this match block's sole purpose is to properly convert `serde_json::Number` to IValue + // when `serde_json/arbitrary_precision` feature is enabled. + // `ijson::to_value()` by itself does not properly convert it. + match value { + Value::Number(number) => { + fn ivalue_from_number_opt(number: &serde_json::Number) -> Option { + if let Some(n) = number.as_f64() { + if let Ok(value) = INumber::try_from(n) { + return Some(value.into()); + } + } else if let Some(n) = number.as_i64() { + return Some(INumber::from(n).into()); + } else if let Some(n) = number.as_u64() { + return Some(INumber::from(n).into()); + } + None + } + if let Some(value) = ivalue_from_number_opt(&number) { + Ok(value) + } else { + ijson::to_value(Value::Number(number)).map_err(Into::into) + } + } + Value::Array(vec) => { + let mut array = IArray::with_capacity(vec.len()); + for value in vec { + array.push(serde_json_to_json_value(value)?) + } + Ok(array.into()) + } + Value::Object(map) => { + let mut object = IObject::with_capacity(map.len()); + for (key, value) in map.into_iter() { + object.insert(key, serde_json_to_json_value(value)?); + } + Ok(object.into()) + } + value => ijson::to_value(value).map_err(Into::into), + } } #[cfg(test)]