
Commit

refactor: Use JsonValue instead of serde_json::Value
chubei committed Nov 27, 2023
1 parent 34e3e39 commit 3cb2e1c
Showing 7 changed files with 90 additions and 97 deletions.
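This commit replaces serde_json::Value with dozer-types' own JsonValue across the API, Deno runtime, lambda, log-binding, and SQL expression crates. The sketch below is a minimal illustration of the construction side of the new API, assuming only what is visible in this diff (JsonObject::new, insert, the From/Into conversions used in the changed files, and the json! macro re-exported from dozer_types::json_types); the helper functions themselves are hypothetical.

```rust
use dozer_types::json_types::{json, JsonObject, JsonValue};

// Hypothetical helper: build an object explicitly with JsonObject and convert
// it into a JsonValue, instead of indexing into a serde_json::Value::Object.
fn example_record() -> JsonValue {
    let mut record = JsonObject::new();
    record.insert("id".to_string(), JsonValue::from(1.0));
    record.insert("name".to_string(), JsonValue::from("alice".to_string()));
    record.insert("active".to_string(), JsonValue::from(true));
    record.into()
}

// Hypothetical helper: the json! macro is used as before, but now yields a
// dozer_types JsonValue rather than a serde_json::Value.
fn example_status() -> JsonValue {
    json!({ "status": "SERVING" })
}
```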
17 changes: 7 additions & 10 deletions dozer-api/src/rest/api_generator.rs
@@ -18,8 +18,7 @@ use crate::sql::datafusion::SQLExecutor;
use crate::CacheEndpoint;
use crate::{auth::Access, errors::ApiError};
use dozer_types::grpc_types::health::health_check_response::ServingStatus;
use dozer_types::json_types::field_to_json_value;
use dozer_types::serde_json::{json, Value};
use dozer_types::json_types::{field_to_json_value, json, JsonValue};

use self::extractor::QueryExpressionExtractor;

@@ -90,10 +89,8 @@ pub async fn list(
}

// Generated get function for health check
pub async fn health_route() -> Result<HttpResponse, ApiError> {
let status = ServingStatus::Serving;
let resp = json!({ "status": status.as_str_name() }).to_string();
Ok(HttpResponse::Ok().body(resp))
pub async fn health_route() -> HttpResponse {
HttpResponse::Ok().json(json!({ "status": ServingStatus::Serving.as_str_name() }))
}

pub async fn count(
@@ -132,7 +129,7 @@ fn get_records_map(
access: Option<ReqData<Access>>,
cache_endpoint: ReqData<Arc<CacheEndpoint>>,
exp: &mut QueryExpression,
) -> Result<Vec<IndexMap<String, Value>>, ApiError> {
) -> Result<Vec<IndexMap<String, JsonValue>>, ApiError> {
let mut maps = vec![];
let cache_reader = &cache_endpoint.cache_reader();
let records = get_records(
@@ -153,18 +150,18 @@
fn record_to_map(
record: CacheRecord,
schema: &Schema,
) -> Result<IndexMap<String, Value>, CannotConvertF64ToJson> {
) -> Result<IndexMap<String, JsonValue>, CannotConvertF64ToJson> {
let mut map = IndexMap::new();

for (field_def, field) in schema.fields.iter().zip(record.record.values) {
let val = field_to_json_value(field);
map.insert(field_def.name.clone(), val);
}

map.insert("__dozer_record_id".to_string(), Value::from(record.id));
map.insert("__dozer_record_id".to_string(), JsonValue::from(record.id));
map.insert(
"__dozer_record_version".to_string(),
Value::from(record.version),
JsonValue::from(record.version),
);

Ok(map)
50 changes: 26 additions & 24 deletions dozer-deno/src/runtime/conversion.rs
@@ -1,39 +1,41 @@
use std::ops::Deref;

use deno_runtime::{
deno_core::{
anyhow::{bail, Context as _},
error::AnyError,
},
deno_napi::v8::{self, HandleScope, Local},
};
use dozer_types::serde_json::{self, Number};
use dozer_types::json_types::{DestructuredJson, JsonObject, JsonValue};

pub fn to_v8<'s>(
scope: &mut HandleScope<'s>,
value: serde_json::Value,
value: JsonValue,
) -> Result<Local<'s, v8::Value>, AnyError> {
match value {
serde_json::Value::Null => Ok(v8::null(scope).into()),
serde_json::Value::Bool(value) => Ok(v8::Boolean::new(scope, value).into()),
serde_json::Value::Number(value) => {
let value = value.as_f64().context("number is not a f64")?;
match value.destructure() {
DestructuredJson::Null => Ok(v8::null(scope).into()),
DestructuredJson::Bool(value) => Ok(v8::Boolean::new(scope, value).into()),
DestructuredJson::Number(value) => {
let value = value.to_f64().context("number is not a f64")?;
Ok(v8::Number::new(scope, value).into())
}
serde_json::Value::String(value) => Ok(v8::String::new(scope, &value)
.context(format!("failed to create string {}", value))?
DestructuredJson::String(value) => Ok(v8::String::new(scope, &value)
.context(format!("failed to create string {}", value.deref()))?
.into()),
serde_json::Value::Array(values) => {
DestructuredJson::Array(values) => {
let array = v8::Array::new(scope, values.len() as i32);
for (index, value) in values.into_iter().enumerate() {
let value = to_v8(scope, value)?;
array.set_index(scope, index as u32, value);
}
Ok(array.into())
}
serde_json::Value::Object(map) => {
DestructuredJson::Object(map) => {
let object = v8::Object::new(scope);
for (key, value) in map.into_iter() {
let key = v8::String::new(scope, &key)
.context(format!("failed to create key {}", key))?;
.context(format!("failed to create key {}", key.deref()))?;
let value = to_v8(scope, value)?;
object.set(scope, key.into(), value);
}
@@ -45,30 +47,30 @@ pub fn to_v8<'s>(
pub fn from_v8<'s>(
scope: &mut HandleScope<'s>,
value: Local<'s, v8::Value>,
) -> Result<serde_json::Value, AnyError> {
) -> Result<JsonValue, AnyError> {
if value.is_null_or_undefined() {
Ok(serde_json::Value::Null)
Ok(JsonValue::NULL)
} else if value.is_boolean() {
Ok(serde_json::Value::Bool(value.boolean_value(scope)))
Ok(value.boolean_value(scope).into())
} else if value.is_number() {
Ok(serde_json::Value::Number(
Number::from_f64(value.number_value(scope).context("number is not a f64")?)
.context("f64 number cannot be represented in JSON")?,
))
Ok(value
.number_value(scope)
.context("number is not a f64")?
.into())
} else if let Ok(value) = TryInto::<Local<v8::String>>::try_into(value) {
Ok(serde_json::Value::String(value.to_rust_string_lossy(scope)))
Ok(value.to_rust_string_lossy(scope).into())
} else if let Ok(value) = TryInto::<Local<v8::Array>>::try_into(value) {
let mut values = Vec::new();
for index in 0..value.length() {
let value = value.get_index(scope, index).unwrap();
let value = from_v8(scope, value)?;
values.push(value);
}
Ok(serde_json::Value::Array(values))
Ok(values.into())
} else if let Ok(value) = TryInto::<Local<v8::Object>>::try_into(value) {
let mut map = serde_json::Map::new();
let mut map = JsonObject::new();
let Some(keys) = value.get_own_property_names(scope, Default::default()) else {
return Ok(serde_json::Value::Object(map));
return Ok(map.into());
};
for index in 0..keys.length() {
let key = keys.get_index(scope, index).unwrap();
Expand All @@ -77,7 +79,7 @@ pub fn from_v8<'s>(
let value = from_v8(scope, value)?;
map.insert(key, value);
}
Ok(serde_json::Value::Object(map))
Ok(map.into())
} else {
bail!("cannot convert v8 value to JSON because its type is not supported")
}
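On the consuming side, JsonValue is not matched on directly the way serde_json::Value was: it is first destructured into DestructuredJson, and string and key payloads come back as wrapper types that deref to the underlying data. A minimal, hypothetical sketch of that pattern, using only calls that appear in this diff (destructure, to_f64, deref, len, into_iter):

```rust
use std::ops::Deref;

use dozer_types::json_types::{DestructuredJson, JsonValue};

// Hypothetical helper: summarize a JsonValue by destructuring it, mirroring
// the match arms used in to_v8 above.
fn describe(value: JsonValue) -> String {
    match value.destructure() {
        DestructuredJson::Null => "null".to_string(),
        DestructuredJson::Bool(b) => format!("bool: {}", b),
        DestructuredJson::Number(n) => format!("number: {}", n.to_f64().unwrap_or(f64::NAN)),
        DestructuredJson::String(s) => format!("string: {}", s.deref()),
        DestructuredJson::Array(values) => format!("array with {} elements", values.len()),
        DestructuredJson::Object(map) => format!("object with {} keys", map.into_iter().count()),
    }
}
```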
18 changes: 9 additions & 9 deletions dozer-deno/src/runtime/mod.rs
@@ -17,8 +17,8 @@ use deno_runtime::{
deno_napi::v8::{self, undefined, Function, Global, Local},
};
use dozer_types::{
json_types::JsonValue,
log::{error, info},
serde_json::Value,
thiserror,
};
use tokio::{
@@ -118,8 +118,8 @@ impl Runtime {
pub async fn call_function(
&mut self,
id: NonZeroI32,
args: Vec<Value>,
) -> Result<Value, AnyError> {
args: Vec<JsonValue>,
) -> Result<JsonValue, AnyError> {
let (return_sender, return_receiver) = oneshot::channel();
if self
.work_sender
@@ -140,7 +140,7 @@
}

// Return type is actually `!`
async fn propagate_panic(&mut self) -> Result<Value, AnyError> {
async fn propagate_panic(&mut self) -> Result<JsonValue, AnyError> {
self.handle
.take()
.expect("runtime panicked before and cannot be used again")
@@ -191,8 +191,8 @@ async fn load_functions(
enum Work {
CallFunction {
id: NonZeroI32,
args: Vec<Value>,
return_sender: oneshot::Sender<Result<Value, AnyError>>,
args: Vec<JsonValue>,
return_sender: oneshot::Sender<Result<JsonValue, AnyError>>,
},
}

@@ -253,9 +253,9 @@ fn do_work(runtime: &mut JsRuntime, work: Work, functions: &HashMap<NonZeroI32,
fn call_function(
runtime: &mut JsRuntime,
function: NonZeroI32,
args: Vec<Value>,
args: Vec<JsonValue>,
functions: &HashMap<NonZeroI32, Global<Function>>,
) -> Result<Value, AnyError> {
) -> Result<JsonValue, AnyError> {
let function = functions
.get(&function)
.context(format!("function {} not found", function))?;
@@ -268,7 +268,7 @@ fn call_function(
let result = Local::new(scope, function).call(scope, recv.into(), &args);
result
.map(|value| from_v8(scope, value))
.unwrap_or(Ok(Value::Null))
.unwrap_or(Ok(JsonValue::NULL))
}

mod conversion;
11 changes: 5 additions & 6 deletions dozer-lambda/src/js/worker/mod.rs
@@ -2,9 +2,8 @@ use std::{num::NonZeroI32, sync::Arc};

use dozer_log::tokio::runtime::Runtime;
use dozer_types::{
json_types::field_to_json_value,
json_types::{field_to_json_value, json, JsonObject, JsonValue},
log::error,
serde_json::{json, Value},
types::{Field, Operation},
};

@@ -46,10 +45,10 @@ impl Worker {
}
}

fn create_record_json_value(field_names: Vec<String>, values: Vec<Field>) -> Value {
let mut record = Value::Object(Default::default());
fn create_record_json_value(field_names: Vec<String>, values: Vec<Field>) -> JsonValue {
let mut record = JsonObject::new();
for (field_name, value) in field_names.into_iter().zip(values.into_iter()) {
record[field_name] = field_to_json_value(value);
record.insert(field_name, field_to_json_value(value));
}
record
record.into()
}
21 changes: 11 additions & 10 deletions dozer-log-js/src/mapper.rs
@@ -1,7 +1,8 @@
use std::ops::Deref;

use dozer_log::replication::LogOperation;
use dozer_types::{
json_types::field_to_json_value,
serde_json::Value,
json_types::{field_to_json_value, DestructuredJson, JsonValue},
types::{Field, Operation, Record, Schema},
};
use neon::{
@@ -89,21 +90,21 @@ fn map_value<'a, C: Context<'a>>(value: Field, cx: &mut C) -> JsResult<'a, JsValue
map_json_value(field_to_json_value(value), cx)
}

fn map_json_value<'a, C: Context<'a>>(value: Value, cx: &mut C) -> JsResult<'a, JsValue> {
match value {
Value::Null => Ok(cx.null().upcast()),
Value::Bool(b) => Ok(cx.boolean(b).upcast()),
Value::Number(n) => Ok(cx.number(n.as_f64().unwrap_or(f64::NAN)).upcast()),
Value::String(s) => Ok(cx.string(s).upcast()),
Value::Array(a) => {
fn map_json_value<'a, C: Context<'a>>(value: JsonValue, cx: &mut C) -> JsResult<'a, JsValue> {
match value.destructure() {
DestructuredJson::Null => Ok(cx.null().upcast()),
DestructuredJson::Bool(b) => Ok(cx.boolean(b).upcast()),
DestructuredJson::Number(n) => Ok(cx.number(n.to_f64().unwrap_or(f64::NAN)).upcast()),
DestructuredJson::String(s) => Ok(cx.string(s.deref()).upcast()),
DestructuredJson::Array(a) => {
let result = cx.empty_array();
for (i, v) in a.into_iter().enumerate() {
let v = map_json_value(v, cx)?;
result.set(cx, i as u32, v)?;
}
Ok(result.upcast())
}
Value::Object(o) => {
DestructuredJson::Object(o) => {
let result = cx.empty_object();
for (k, v) in o.into_iter() {
let v = map_json_value(v, cx)?;
9 changes: 1 addition & 8 deletions dozer-sql/expression/src/javascript/evaluate.rs
@@ -2,8 +2,6 @@ use std::{num::NonZeroI32, sync::Arc};

use dozer_deno::deno_runtime::deno_core::error::AnyError;
use dozer_types::{
errors::types::DeserializationError,
json_types::{json_value_to_serde_json, serde_json_to_json_value},
thiserror,
types::{Field, FieldType, Record, Schema, SourceDefinition},
};
@@ -34,8 +32,6 @@ pub enum Error {
CreateRuntime(#[from] dozer_deno::RuntimeError),
#[error("failed to evaluate udf: {0}")]
Evaluate(#[source] AnyError),
#[error("failed to convert json: {0}")]
JsonConversion(#[source] DeserializationError),
}

impl Udf {
@@ -105,11 +101,8 @@ async fn evaluate_impl(

let mut runtime = runtime.lock().await;
let result = runtime
.call_function(function, vec![json_value_to_serde_json(&arg)])
.call_function(function, vec![arg])
.await
.map_err(Error::Evaluate)?;
drop(runtime);

let result = serde_json_to_json_value(result).map_err(Error::JsonConversion)?;
Ok(Field::Json(result))
}
(Diff for the seventh changed file is not shown.)
