Skip to content

Commit

Permalink
fix: json conversion of Decimal doesn't work with arrow-json
Browse files Browse the repository at this point in the history
  • Loading branch information
Solomon committed Nov 20, 2023
1 parent 7409098 commit 1e9ff97
Show file tree
Hide file tree
Showing 11 changed files with 289 additions and 47 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion dozer-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ uuid = "1.4.1"
chrono = "0.4.31"
datafusion = "32.0.0"
datafusion-expr = "32.0.0"
serde_json = "1.0.93"
serde_json = { version = "1.0.108", features = ["arbitrary_precision"] }
pgwire = "0.16.1"
tempdir = "0.3.7"
9 changes: 2 additions & 7 deletions dozer-api/src/rest/api_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::sync::Arc;

use actix_web::web::ReqData;
use actix_web::{web, HttpResponse};
use datafusion::error::DataFusionError;
use dozer_cache::cache::expression::{QueryExpression, Skip};
use dozer_cache::cache::CacheRecord;
use dozer_cache::{CacheReader, Phase};
Expand All @@ -14,6 +13,7 @@ use openapiv3::OpenAPI;

use crate::api_helper::{get_record, get_records, get_records_count};
use crate::generator::oapi::generator::OpenApiGenerator;
use crate::sql::datafusion::json::record_batches_to_json_rows;
use crate::sql::datafusion::SQLExecutor;
use crate::CacheEndpoint;
use crate::{auth::Access, errors::ApiError};
Expand Down Expand Up @@ -193,12 +193,7 @@ pub async fn sql(
.collect()
.await
.map_err(ApiError::SQLQueryFailed)?;
datafusion::arrow::json::writer::record_batches_to_json_rows(
record_batches.iter().collect::<Vec<_>>().as_slice(),
)
.map_err(DataFusionError::ArrowError)
.map_err(ApiError::SQLQueryFailed)
.map(|result| HttpResponse::Ok().json(result))
Ok(HttpResponse::Ok().json(record_batches_to_json_rows(&record_batches)))
}

mod extractor {
Expand Down
173 changes: 173 additions & 0 deletions dozer-api/src/sql/datafusion/json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
use std::str::FromStr;

use datafusion::arrow::{
array::{
as_largestring_array, as_string_array, Array, AsArray, BooleanArray, IntervalDayTimeArray,
IntervalMonthDayNanoArray, IntervalYearMonthArray,
},
datatypes::{
ArrowPrimitiveType, BinaryType, ByteArrayType, DataType, Decimal128Type, Decimal256Type,
DecimalType, Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
Int8Type, IntervalUnit, LargeBinaryType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
},
record_batch::RecordBatch,
};
use dozer_types::arrow_cast::display::{ArrayFormatter, FormatOptions};
use serde_json::{map::Map, Number, Value};

use crate::sql::util::Iso8601Duration;

/// Converts a slice of Arrow `RecordBatch`es into JSON rows, one
/// `Map<String, Value>` per row, in batch order.
///
/// Column names come from the schema of the first batch; all batches are
/// assumed to share that schema. An empty input yields an empty vector.
pub fn record_batches_to_json_rows(batches: &[RecordBatch]) -> Vec<Map<String, Value>> {
    let Some(first) = batches.first() else {
        return Vec::new();
    };
    let schema = first.schema();

    let mut rows = Vec::new();
    for batch in batches {
        for row_index in 0..batch.num_rows() {
            // Build the row map column-by-column for this row index.
            let row: Map<String, Value> = batch
                .columns()
                .iter()
                .enumerate()
                .map(|(column_index, column)| {
                    let column_name = schema.field(column_index).name().clone();
                    (column_name, field_to_json_value(column.as_ref(), row_index))
                })
                .collect();
            rows.push(row);
        }
    }

    rows
}

/// Downcasts `$array` (a `&dyn Array`) to the concrete array type `$type`
/// and returns the element at `$index`.
///
/// Panics if the downcast fails, i.e. if the concrete type does not match
/// the array's actual `DataType` — callers must keep the `DataType` match
/// arms in `field_to_json_value` in sync with the types they pass here.
macro_rules! field {
    ($array:tt[$index:tt], $type:tt) => {
        $array
            .as_any()
            .downcast_ref::<$type>()
            .unwrap()
            .value($index)
    };
}

/// Renders a primitive (integer or float) array element as a JSON number.
///
/// The value is round-tripped through its string form so that, with
/// serde_json's `arbitrary_precision` feature enabled, wide integers keep
/// full precision instead of being squeezed through `f64`.
///
/// Non-finite floats (`NaN`, `inf`) stringify to text that is not a valid
/// JSON number literal (even under `arbitrary_precision`), so they are
/// mapped to `Value::Null` instead of panicking on `unwrap`.
fn json_number_from_primitive<T>(array: &dyn Array, row_index: usize) -> Value
where
    T: ArrowPrimitiveType,
    T::Native: ToString,
{
    let array = array.as_primitive::<T>();
    let text = array.value(row_index).to_string();
    match Number::from_str(&text) {
        Ok(number) => Value::Number(number),
        // "NaN" / "inf" / "-inf" cannot be represented as a JSON number.
        Err(_) => Value::Null,
    }
}

/// Formats a temporal array element (date, time, timestamp or duration)
/// with arrow's display formatter and wraps the result in a JSON string.
fn json_string_from_datetime(array: &dyn Array, row_index: usize) -> Value {
    // The formatter borrows `options`, so it must live in its own binding.
    let options = FormatOptions::default();
    let formatter = ArrayFormatter::try_new(array, &options)
        .expect("datetime types should be supported");
    let rendered = formatter.value(row_index).to_string();
    Value::String(rendered)
}

/// Renders a `Utf8` array element as a JSON string.
fn json_string_from_utf8(array: &dyn Array, row_index: usize) -> Value {
    Value::String(as_string_array(array).value(row_index).to_owned())
}

/// Renders a `LargeUtf8` array element as a JSON string.
fn json_string_from_largeutf8(array: &dyn Array, row_index: usize) -> Value {
    Value::String(as_largestring_array(array).value(row_index).to_owned())
}

/// Renders a `Binary`/`LargeBinary` array element as a JSON string holding
/// the Debug formatting of the raw bytes (e.g. `"[1, 2, 3]"`).
fn json_string_from_binary<T: ByteArrayType>(array: &dyn Array, row_index: usize) -> Value {
    let bytes = array.as_bytes::<T>().value(row_index);
    Value::String(format!("{bytes:?}"))
}

/// Renders a `FixedSizeBinary` array element as a JSON string holding the
/// Debug formatting of the raw bytes, mirroring `json_string_from_binary`.
fn json_string_from_fixedsizebinary(array: &dyn Array, row_index: usize) -> Value {
    let bytes = array.as_fixed_size_binary().value(row_index);
    Value::String(format!("{bytes:?}"))
}

/// Renders a `Decimal128`/`Decimal256` array element as JSON using the
/// column's declared precision and scale.
///
/// NOTE(review): despite the `json_number_` name this returns
/// `Value::String` containing the scaled decimal text (e.g. "12.345") —
/// presumably to avoid the lossy f64 round-trip that broke Decimal in
/// arrow-json. Confirm that consumers expect a JSON string here rather
/// than an arbitrary-precision JSON number.
fn json_number_from_decimal<T>(
    array: &dyn Array,
    row_index: usize,
    precision: u8,
    scale: i8,
) -> Value
where
    T: ArrowPrimitiveType + DecimalType,
{
    let array = array.as_primitive::<T>();
    let value = array.value(row_index);
    Value::String(<T as DecimalType>::format_decimal(value, precision, scale))
}

/// Converts the array element at `row_index` into a `serde_json::Value`.
///
/// Numeric types become JSON numbers (via their string form, so wide
/// integers keep full precision under serde_json's `arbitrary_precision`
/// feature), temporal types become formatted strings, intervals become
/// ISO-8601 duration strings, binary becomes the Debug rendering of the
/// bytes, and decimals become their scaled decimal text.
///
/// # Panics
/// Panics (`unimplemented!`) for nested/dictionary/union types, which this
/// JSON writer does not support.
fn field_to_json_value(column: &dyn Array, row_index: usize) -> Value {
    let i = row_index;
    if column.is_null(i) {
        return Value::Null;
    }

    match column.data_type() {
        DataType::Null => Value::Null,
        DataType::Boolean => Value::Bool(field!(column[i], BooleanArray)),
        DataType::Int8 => json_number_from_primitive::<Int8Type>(column, i),
        DataType::Int16 => json_number_from_primitive::<Int16Type>(column, i),
        DataType::Int32 => json_number_from_primitive::<Int32Type>(column, i),
        DataType::Int64 => json_number_from_primitive::<Int64Type>(column, i),
        DataType::UInt8 => json_number_from_primitive::<UInt8Type>(column, i),
        DataType::UInt16 => json_number_from_primitive::<UInt16Type>(column, i),
        DataType::UInt32 => json_number_from_primitive::<UInt32Type>(column, i),
        DataType::UInt64 => json_number_from_primitive::<UInt64Type>(column, i),
        DataType::Float16 => json_number_from_primitive::<Float16Type>(column, i),
        DataType::Float32 => json_number_from_primitive::<Float32Type>(column, i),
        DataType::Float64 => json_number_from_primitive::<Float64Type>(column, i),

        DataType::Date32
        | DataType::Date64
        | DataType::Timestamp(_, _)
        | DataType::Time32(_)
        | DataType::Time64(_)
        | DataType::Duration(_) => json_string_from_datetime(column, i),

        DataType::Interval(IntervalUnit::DayTime) => Value::String({
            let value = field!(column[i], IntervalDayTimeArray);
            // arrow packs IntervalDayTime as (days << 32) | milliseconds:
            // days live in the HIGH 32 bits, milliseconds in the low 32
            // (see IntervalDayTimeType::to_parts). The previous code read
            // them swapped.
            let days = (value >> 32) as i32;
            let milliseconds = value as i32;
            Iso8601Duration::DaysMilliseconds(days, milliseconds).to_string()
        }),
        DataType::Interval(IntervalUnit::MonthDayNano) => Value::String({
            let value = field!(column[i], IntervalMonthDayNanoArray);
            // arrow packs IntervalMonthDayNano into an i128 as
            // (months << 96) | (days << 64) | nanoseconds
            // (see IntervalMonthDayNanoType::to_parts). The previous code
            // read months/days out of the nanoseconds field.
            let months = (value >> 96) as i32;
            let days = (value >> 64) as i32;
            let nanoseconds = value as i64;
            Iso8601Duration::MonthsDaysNanoseconds(months, days, nanoseconds).to_string()
        }),
        DataType::Interval(IntervalUnit::YearMonth) => Value::String({
            let months = field!(column[i], IntervalYearMonthArray);
            Iso8601Duration::Months(months).to_string()
        }),

        DataType::Binary => json_string_from_binary::<BinaryType>(column, i),
        DataType::FixedSizeBinary(_) => json_string_from_fixedsizebinary(column, i),
        DataType::LargeBinary => json_string_from_binary::<LargeBinaryType>(column, i),
        DataType::Utf8 => json_string_from_utf8(column, i),
        DataType::LargeUtf8 => json_string_from_largeutf8(column, i),

        DataType::Decimal128(precision, scale) => {
            json_number_from_decimal::<Decimal128Type>(column, i, *precision, *scale)
        }
        DataType::Decimal256(precision, scale) => {
            json_number_from_decimal::<Decimal256Type>(column, i, *precision, *scale)
        }

        DataType::List(_)
        | DataType::FixedSizeList(_, _)
        | DataType::LargeList(_)
        | DataType::Struct(_)
        | DataType::Union(_, _)
        | DataType::Dictionary(_, _)
        | DataType::Map(_, _)
        | DataType::RunEndEncoded(_, _) => unimplemented!(),
    }
}
1 change: 1 addition & 0 deletions dozer-api/src/sql/datafusion/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod json;
mod predicate_pushdown;

use std::{any::Any, sync::Arc};
Expand Down
1 change: 1 addition & 0 deletions dozer-api/src/sql/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod datafusion;
pub mod pgwire;
mod util;
31 changes: 2 additions & 29 deletions dozer-api/src/sql/pgwire.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ use crate::shutdown::ShutdownReceiver;
use crate::sql::datafusion::SQLExecutor;
use crate::CacheEndpoint;

use super::util::Iso8601Duration;

pub struct PgWireServer {
config: SqlOptions,
}
Expand Down Expand Up @@ -478,35 +480,6 @@ fn encode_field(
}
}

#[derive(Debug)]
enum Iso8601Duration {
Duration(chrono::Duration),
DaysMilliseconds(i32, i32),
MonthsDaysNanoseconds(i32, i32, i64),
Months(i32),
}

impl std::fmt::Display for Iso8601Duration {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Iso8601Duration::Duration(d) => std::fmt::Display::fmt(&d, f),
Iso8601Duration::DaysMilliseconds(days, msecs) => {
let secs = msecs.div_euclid(1_000);
let msecs = msecs.rem_euclid(1_000);
write!(f, "P{}DT{}.{:03}S", days, secs, msecs)
}
Iso8601Duration::MonthsDaysNanoseconds(months, days, nanos) => {
let secs = nanos.div_euclid(1_000_000_000);
let nanos = nanos.rem_euclid(1_000_000_000);
write!(f, "P{}M{}DT{}.{:09}S", months, days, secs, nanos)
}
Iso8601Duration::Months(months) => {
write!(f, "P{}M", months)
}
}
}
}

fn generic_error_info(err: String) -> ErrorInfo {
ErrorInfo::new("ERROR".to_string(), "2F000".to_string(), err)
}
28 changes: 28 additions & 0 deletions dozer-api/src/sql/util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/// A duration renderable in ISO-8601 textual form (`P…DT…S`), used when
/// serializing Arrow interval/duration values to text (JSON and pgwire).
#[derive(Debug)]
pub enum Iso8601Duration {
    // Exact duration backed by `chrono::Duration`; rendered with chrono's
    // own Display impl rather than the hand-rolled formats below.
    Duration(chrono::Duration),
    // Days plus milliseconds, as in Arrow's DayTime interval.
    DaysMilliseconds(i32, i32),
    // Months, days and nanoseconds, as in Arrow's MonthDayNano interval.
    MonthsDaysNanoseconds(i32, i32, i64),
    // A whole number of months, as in Arrow's YearMonth interval.
    Months(i32),
}

impl std::fmt::Display for Iso8601Duration {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Iso8601Duration::Duration(d) => std::fmt::Display::fmt(&d, f),
Iso8601Duration::DaysMilliseconds(days, msecs) => {
let secs = msecs.div_euclid(1_000);
let msecs = msecs.rem_euclid(1_000);
write!(f, "P{}DT{}.{:03}S", days, secs, msecs)
}
Iso8601Duration::MonthsDaysNanoseconds(months, days, nanos) => {
let secs = nanos.div_euclid(1_000_000_000);
let nanos = nanos.rem_euclid(1_000_000_000);
write!(f, "P{}M{}DT{}.{:09}S", months, days, secs, nanos)
}
Iso8601Duration::Months(months) => {
write!(f, "P{}M", months)
}
}
}
}
41 changes: 37 additions & 4 deletions dozer-tests/src/cache_tests/filter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use bson::{doc, Document};
use bson::{doc, Bson, Document};
use dozer_cache::cache::expression::{FilterExpression, Operator, QueryExpression};
use dozer_types::types::Record;
use dozer_types::{serde_json::Value, types::Record};
use futures::stream::StreamExt;
use mongodb::Collection;

Expand Down Expand Up @@ -44,13 +44,13 @@ fn convert_filter(filter: Option<&FilterExpression>) -> Document {
Operator::GTE => "$gte",
_ => unreachable!(),
};
document.insert(name, doc! {operator: bson::to_bson(value).unwrap()});
document.insert(name, doc! {operator: to_bson(value).unwrap()});
}
Operator::Contains => {
document.insert(
"$text",
doc! {
"$search": bson::to_bson(value).unwrap()
"$search": to_bson(value).unwrap()
},
);
}
Expand Down Expand Up @@ -103,3 +103,36 @@ fn check_equals(film: &Film, record: &Record) {
);
assert!(values.next().is_none());
}

/// Converts a `serde_json::Value` to BSON.
///
/// Exists because `bson::to_bson` does not convert `serde_json::Number`
/// properly when serde_json's `arbitrary_precision` feature is enabled, so
/// numbers are mapped by hand and containers are walked recursively to give
/// nested numbers the same treatment.
fn to_bson(value: &Value) -> bson::ser::Result<bson::Bson> {
    match value {
        Value::Number(number) => {
            if let Some(n) = number.as_i64() {
                Ok(Bson::Int64(n))
            } else if let Some(n) = number.as_f64() {
                Ok(Bson::Double(n))
            } else {
                // Numbers representable neither as i64 nor f64 fall back to
                // bson's own conversion.
                bson::to_bson(value)
            }
        }
        Value::Array(items) => items
            .iter()
            .map(to_bson)
            .collect::<bson::ser::Result<Vec<_>>>()
            .map(Bson::from),
        Value::Object(map) => {
            let mut document = bson::Document::new();
            for (key, val) in map {
                document.insert(key, to_bson(val)?);
            }
            Ok(document.into())
        }
        other => bson::to_bson(other),
    }
}
2 changes: 1 addition & 1 deletion dozer-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ edition = "2021"
[dependencies]
chrono = { version = "0.4.23", features = ["serde"] }
serde = { version = "1.0.189", features = ["derive", "rc"] }
serde_json = { version = "1.0.93", features = ["std"] }
serde_json = { version = "1.0.108", features = ["arbitrary_precision"] }
ijson = "0.1.3"
rust_decimal = { version = "1.32", features = ["serde-str", "db-postgres"] }
bincode = { workspace = true, features = ["serde"] }
Expand Down
Loading

0 comments on commit 1e9ff97

Please sign in to comment.