diff --git a/core/Cargo.toml b/core/Cargo.toml index 14b6f8d..69d999b 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -1,42 +1,43 @@ -[package] -name = "drasi-core" -version.workspace = true -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[features] -default = [] -parallel_solver = [] - -[dependencies] -async-trait = "0.1.68" -drasi-query-ast = { path = "../query-ast" } -drasi-query-cypher = { path = "../query-cypher" } -hashers = "1.0.1" -ordered-float = "3.7.0" -serde = { version = "1", features = ["derive"] } -serde_json = "1" -rand = { version = "0.8.5", features = ["small_rng"] } -tokio = { version = "1.29.1", features = ["full"] } -async-recursion = "1.0.4" -futures = "0.3.28" -tokio-stream = "0.1.14" -async-stream = "0.3.5" -itoa = "1.0.1" -caches = "0.2.4" -log = "0.4.20" -tracing = "0.1.37" -opentelemetry = "0.20" -chrono = { version = "0.4.31", features = ["serde"] } -dateparser = "0.2.0" -chrono-tz = "0.8.3" -regex = "1.9.5" -zoneinfo_parse = "0.1.4" -iso8601-duration = "0.2.0" -round = "0.1.2" -priority-queue = "1.3.2" -thiserror = "1.0.50" -statistical = "1.0.0" -approx = "0.5.1" -lazy_static = "1.4.0" +[package] +name = "drasi-core" +version.workspace = true +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[features] +default = [] +parallel_solver = [] + +[dependencies] +async-trait = "0.1.68" +drasi-query-ast = { path = "../query-ast" } +drasi-query-cypher = { path = "../query-cypher" } +hashers = "1.0.1" +ordered-float = "3.7.0" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +rand = { version = "0.8.5", features = ["small_rng"] } +tokio = { version = "1.29.1", features = ["full"] } +async-recursion = "1.0.4" +futures = "0.3.28" +tokio-stream = "0.1.14" +async-stream = "0.3.5" +itoa = "1.0.1" +caches = "0.2.4" +log = "0.4.20" +tracing = "0.1.37" +opentelemetry = "0.20" +chrono = { version = "0.4.31", features = ["serde"] } +dateparser = "0.2.0" +chrono-tz = "0.8.3" +regex = "1.9.5" +zoneinfo_parse = "0.1.4" +iso8601-duration = "0.2.0" +round = "0.1.2" +priority-queue = "1.3.2" +thiserror = "1.0.50" +statistical = "1.0.0" +approx = "0.5.1" +lazy_static = "1.4.0" +once_cell = "1.19.0" diff --git a/core/src/evaluation/expressions/mod.rs b/core/src/evaluation/expressions/mod.rs index 977b5c5..00696dd 100644 --- a/core/src/evaluation/expressions/mod.rs +++ b/core/src/evaluation/expressions/mod.rs @@ -8,7 +8,7 @@ use async_recursion::async_recursion; use chrono::{ Datelike, Duration, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Weekday, }; -use drasi_query_ast::ast::{self, ParentExpression}; +use drasi_query_ast::ast; use crate::evaluation::temporal_constants; use crate::evaluation::variable_value::duration::Duration as DurationStruct; @@ -21,7 +21,7 @@ use crate::interface::{ResultKey, ResultOwner}; use crate::{evaluation::variable_value::VariableValue, interface::ResultIndex}; use super::{ - context::{ExpressionEvaluationContext, QueryVariables, SideEffects}, + context::{ExpressionEvaluationContext, SideEffects}, functions::{aggregation::Accumulator, Function, FunctionRegistry}, EvaluationError, }; @@ -67,6 +67,9 @@ impl ExpressionEvaluator { ast::Expression::ObjectExpression(expression) => { self.evaluate_object_expression(context, expression).await } + ast::Expression::IteratorExpression(expression) => { + self.evaluate_iterator_expression(context, expression).await + } } } @@ -102,6 +105,7 @@ impl ExpressionEvaluator { ast::Expression::CaseExpression(_) => "case", ast::Expression::ListExpression(_) => "list", ast::Expression::ObjectExpression(_) => "object", + ast::Expression::IteratorExpression(_) => "iterator", }; Ok((alias.to_string(), value)) @@ -1297,86 +1301,6 @@ impl ExpressionEvaluator { _ => VariableValue::Bool(false), } } - ast::BinaryExpression::Iterate(_e1, _e2) => { - // This expression will be solely used in reduce(). - // It should not (and will not) be evaluated here. - todo!() - } - ast::BinaryExpression::Reduce(_e1, _e2) => { - // This expression will be solely used in reduce(). - // It should not (and will not) be evaluated here. - todo!() - } - ast::BinaryExpression::Filter(e1, e2) => { - // x in [a,b,c], where x > 0 - let in_expression = *e1.clone(); //Check - let filter_expression = *e2.clone(); - let mut result = Vec::new(); - let (variable, val_list) = self - .get_in_expression_element(context, in_expression) - .await?; - let mut context_variables = QueryVariables::new(); - let mut filter_and_map = false; - let mut condition_expression: Option = None; - let mut evaluation_expression: Option = None; - if let ast::Expression::UnaryExpression(exp) = filter_expression.clone() { - if let ast::UnaryExpression::Literal(literal) = exp { - if let ast::Literal::Expression(iterate_expression) = literal { - filter_and_map = true; - condition_expression = - Some(iterate_expression.get_children()[0].clone()); - evaluation_expression = - Some(iterate_expression.get_children()[1].clone()); - } - } - } - for element in &val_list { - // if filter_expression is a UnaryExpression::Literal::Expression (e.g. x in [a,b,c], where x > 0 | x+1 ) - if filter_and_map { - context_variables.insert(variable.to_string().into(), element.clone()); - let local_context = ExpressionEvaluationContext::new( - &context_variables, - context.get_clock(), - ); - let evaluation_result = self - .evaluate_expression( - &local_context, - &condition_expression.clone().unwrap(), - ) - .await; - - if let Ok(result_value) = evaluation_result { - if result_value == VariableValue::Bool(true) { - let evaluation_result = self - .evaluate_expression( - &local_context, - &evaluation_expression.clone().unwrap(), - ) - .await; - if let Ok(result_value) = evaluation_result { - result.push(result_value); - } - } - } - } else { - context_variables.insert(variable.to_string().into(), element.clone()); - let local_context = ExpressionEvaluationContext::new( - &context_variables, - context.get_clock(), - ); - let evaluation_result = self - .evaluate_expression(&local_context, &filter_expression.clone()) - .await; - - if let Ok(result_value) = evaluation_result { - if result_value == VariableValue::Bool(true) { - result.push(element.clone()); - } - } - } - } - VariableValue::List(result) - } ast::BinaryExpression::Index(e1, e2) => { let index_exp = self.evaluate_expression(context, e2).await?; let variable_value_list = self.evaluate_expression(context, e1).await?; @@ -1454,6 +1378,13 @@ impl ExpressionEvaluator { error: Box::new(e), })? } + Function::LazyScalar(scalar) => scalar + .call(context, expression, &expression.args) + .await + .map_err(|e| EvaluationError::FunctionError { + function_name: expression.name.to_string(), + error: Box::new(e), + })?, Function::Aggregating(aggregate) => { let mut values = Vec::new(); for arg in &expression.args { @@ -1596,37 +1527,6 @@ impl ExpressionEvaluator { ) -> Result { let mut result = Vec::new(); for e in &expression.elements { - //if e is type of filter expression, then we need to evaluate it differently - if let ast::Expression::BinaryExpression(exp) = e { - if let ast::BinaryExpression::Filter(_e1, _e2) = exp { - //println!("exp: {:?}", exp); - return self.evaluate_binary_expression(context, exp).await; - } - } else if let ast::Expression::UnaryExpression(exp) = e { - if let ast::UnaryExpression::Literal(literal) = exp { - if let ast::Literal::Expression(iterate_expression) = literal { - let in_expression = iterate_expression.get_children()[0]; - let (variable, val_list) = self - .get_in_expression_element(&context.clone(), in_expression.clone()) - .await?; - let mapping_expression = iterate_expression.get_children()[1].clone(); - let mut variables = context.clone_variables(); - for element in val_list { - variables.insert(variable.to_string().into(), element.clone()); - let local_context = - ExpressionEvaluationContext::new(&variables, context.get_clock()); - let evaluation_result = self - .evaluate_expression(&local_context, &mapping_expression.clone()) - .await; - if let Ok(result_value) = evaluation_result { - result.push(result_value.clone()); - } - } - return Ok(VariableValue::List(result)); - } - } - } - result.push(self.evaluate_expression(context, e).await?); } @@ -1649,30 +1549,126 @@ impl ExpressionEvaluator { Ok(VariableValue::Object(result)) } - async fn get_in_expression_element( + pub async fn evaluate_assignment( &self, context: &ExpressionEvaluationContext<'_>, - in_expression: ast::Expression, - ) -> Result<(String, Vec), EvaluationError> { - let variable_name = match in_expression.get_children()[0] { - // x - ast::Expression::UnaryExpression(exp) => match exp { - ast::UnaryExpression::Identifier(ident) => ident, - _ => return Err(EvaluationError::InvalidType), + expression: &ast::Expression, + ) -> Result<(Arc, VariableValue), EvaluationError> { + match expression { + ast::Expression::BinaryExpression(exp) => match exp { + ast::BinaryExpression::Eq(var, val) => { + let variable = match *var.clone() { + ast::Expression::UnaryExpression(exp) => match exp { + ast::UnaryExpression::Identifier(ident) => ident, + _ => return Err(EvaluationError::InvalidType), + }, + _ => return Err(EvaluationError::InvalidType), + }; + let value = self.evaluate_expression(context, val).await?; + Ok((variable, value)) + } + _ => Err(EvaluationError::InvalidType), }, - _ => return Err(EvaluationError::InvalidType), - }; - let list_expression = in_expression.get_children()[1]; - let list_evaluated = self - .evaluate_expression(context, &list_expression.clone()) - .await - .unwrap(); - let val_list = match list_evaluated { - VariableValue::List(list) => list, - _ => return Err(EvaluationError::InvalidType), - }; + _ => Err(EvaluationError::InvalidType), + } + } + + async fn evaluate_iterator_expression( + &self, + context: &ExpressionEvaluationContext<'_>, + expression: &ast::IteratorExpression, + ) -> Result { + let items = self + .evaluate_expression(context, &expression.list_expression) + .await?; + match items { + VariableValue::List(items) => { + let mut result = Vec::new(); + let mut variables = context.clone_variables(); + for item in items { + if let Some(filter) = &expression.filter { + variables + .insert(expression.item_identifier.to_string().into(), item.clone()); + let local_context = + ExpressionEvaluationContext::new(&variables, context.get_clock()); + if !self.evaluate_predicate(&local_context, filter).await? { + continue; + } + } - Ok((variable_name.to_string(), val_list)) + match &expression.map_expression { + Some(map_expression) => { + variables.insert( + expression.item_identifier.to_string().into(), + item.clone(), + ); + let local_context = + ExpressionEvaluationContext::new(&variables, context.get_clock()); + let item_result = self + .evaluate_expression(&local_context, map_expression) + .await?; + result.push(item_result); + } + None => { + result.push(item); + } + } + } + Ok(VariableValue::List(result)) + } + _ => Err(EvaluationError::InvalidType), + } + } + + pub async fn reduce_iterator_expression( + &self, + context: &ExpressionEvaluationContext<'_>, + expression: &ast::IteratorExpression, + accumulator_variable: Arc, + ) -> Result { + let items = self + .evaluate_expression(context, &expression.list_expression) + .await?; + match items { + VariableValue::List(items) => { + let mut result = match context.get_variable(accumulator_variable.clone()) { + Some(value) => value.clone(), + None => VariableValue::Null, + }; + let mut variables = context.clone_variables(); + for item in items { + if let Some(filter) = &expression.filter { + variables + .insert(expression.item_identifier.to_string().into(), item.clone()); + let local_context = + ExpressionEvaluationContext::new(&variables, context.get_clock()); + + if !self.evaluate_predicate(&local_context, filter).await? { + continue; + } + } + + match &expression.map_expression { + Some(map_expression) => { + variables.insert( + expression.item_identifier.to_string().into(), + item.clone(), + ); + let local_context = + ExpressionEvaluationContext::new(&variables, context.get_clock()); + result = self + .evaluate_expression(&local_context, map_expression) + .await?; + variables + .insert(accumulator_variable.to_string().into(), result.clone()); + } + None => {} + } + } + Ok(result) + } + _ => Err(EvaluationError::InvalidType), + } } } diff --git a/core/src/evaluation/expressions/tests/list_construction.rs b/core/src/evaluation/expressions/tests/list_construction.rs index fb332c2..5b9e078 100644 --- a/core/src/evaluation/expressions/tests/list_construction.rs +++ b/core/src/evaluation/expressions/tests/list_construction.rs @@ -94,6 +94,31 @@ async fn test_list_comprehension_property_with_mapping_only() { ); } +#[tokio::test] +async fn test_list_comprehension_property_with_mapping_only_non_numeric() { + let expr = "[x IN ['foo', 'bar'] | x + 'baz']"; + + let expr = drasi_query_cypher::parse_expression(expr).unwrap(); + let function_registry = Arc::new(FunctionRegistry::new()); + let ari = Arc::new(InMemoryResultIndex::new()); + let evaluator = ExpressionEvaluator::new(function_registry.clone(), ari.clone()); + + let variables = QueryVariables::new(); + let context = + ExpressionEvaluationContext::new(&variables, Arc::new(InstantQueryClock::new(0, 0))); + + assert_eq!( + evaluator + .evaluate_expression(&context, &expr) + .await + .unwrap(), + VariableValue::List(vec![ + VariableValue::String(String::from("foobaz")), + VariableValue::String(String::from("barbaz")), + ]) + ); +} + #[tokio::test] async fn test_list_comprehension_property() { let expr = "[x IN [123,342,12,34,-12] where x > 0| x - 10]"; //for each element in param1, return the value property diff --git a/core/src/evaluation/expressions/tests/list_reduce.rs b/core/src/evaluation/expressions/tests/list_reduce.rs index 5b09741..05ac220 100644 --- a/core/src/evaluation/expressions/tests/list_reduce.rs +++ b/core/src/evaluation/expressions/tests/list_reduce.rs @@ -1,196 +1,336 @@ -use crate::evaluation::context::QueryVariables; -use crate::evaluation::functions::FunctionRegistry; -use crate::evaluation::variable_value::integer::Integer; -use crate::evaluation::variable_value::VariableValue; -use crate::evaluation::{ExpressionEvaluationContext, ExpressionEvaluator, InstantQueryClock}; -use crate::in_memory_index::in_memory_result_index::InMemoryResultIndex; -use std::collections::BTreeMap; -use std::sync::Arc; - -#[tokio::test] -async fn evaluate_reduce_func() { - let expr = "reduce(acc = 0, x IN [1,2,3] | acc + x)"; - - let expr = drasi_query_cypher::parse_expression(expr).unwrap(); - - let function_registry = Arc::new(FunctionRegistry::new()); - let ari = Arc::new(InMemoryResultIndex::new()); - let evaluator = ExpressionEvaluator::new(function_registry.clone(), ari.clone()); - - let variables = QueryVariables::new(); - - let context = - ExpressionEvaluationContext::new(&variables, Arc::new(InstantQueryClock::new(0, 0))); - assert_eq!( - evaluator - .evaluate_expression(&context, &expr) - .await - .unwrap(), - VariableValue::Integer(Integer::from(6)) - ); -} - -#[tokio::test] -async fn evaluate_reduce_func_min_temp() { - let expr = "reduce ( minTemp = 0.0, sensorValVersion IN [11.2, 431, -75, 24] | - CASE WHEN sensorValVersion < minTemp THEN sensorValVersion - ELSE minTemp END)"; - - let expr = drasi_query_cypher::parse_expression(expr).unwrap(); - - let function_registry = Arc::new(FunctionRegistry::new()); - let ari = Arc::new(InMemoryResultIndex::new()); - let evaluator = ExpressionEvaluator::new(function_registry.clone(), ari.clone()); - - let variables = QueryVariables::new(); - - let context = - ExpressionEvaluationContext::new(&variables, Arc::new(InstantQueryClock::new(0, 0))); - assert_eq!( - evaluator - .evaluate_expression(&context, &expr) - .await - .unwrap(), - VariableValue::Integer(Integer::from(-75)) - ); -} - -#[tokio::test] -async fn evaluate_reduce_func_sensor_value() { - let expr = "reduce (total = 0.0, val IN $param1| total + val.value)"; - - let expr = drasi_query_cypher::parse_expression(expr).unwrap(); - - let function_registry = Arc::new(FunctionRegistry::new()); - let ari = Arc::new(InMemoryResultIndex::new()); - let evaluator = ExpressionEvaluator::new(function_registry.clone(), ari.clone()); - - let mut variables = QueryVariables::new(); - - let sensor_val_version_list = vec![ - VariableValue::Object({ - let mut map = BTreeMap::new(); - map.insert( - "value".to_string(), - VariableValue::Integer(Integer::from(11)), - ); - map - }), - VariableValue::Object({ - let mut map = BTreeMap::new(); - map.insert( - "value".to_string(), - VariableValue::Integer(Integer::from(86)), - ); - map - }), - VariableValue::Object({ - let mut map = BTreeMap::new(); - map.insert( - "value".to_string(), - VariableValue::Integer(Integer::from(3)), - ); - map - }), - VariableValue::Object({ - let mut map = BTreeMap::new(); - map.insert( - "value".to_string(), - VariableValue::Integer(Integer::from(121)), - ); - map - }), - VariableValue::Object({ - let mut map = BTreeMap::new(); - map.insert( - "value".to_string(), - VariableValue::Integer(Integer::from(-45)), - ); - map - }), - ]; - variables.insert( - "param1".to_string().into(), - VariableValue::List(sensor_val_version_list), - ); - let context = - ExpressionEvaluationContext::new(&variables, Arc::new(InstantQueryClock::new(0, 0))); - - assert_eq!( - evaluator - .evaluate_expression(&context, &expr) - .await - .unwrap(), - VariableValue::Float(176.0.into()) - ); -} - -#[tokio::test] -async fn evaluate_reduce_func_sensor_value_count() { - let expr = "reduce (count = 0, val IN $param1| count + 1)"; - - let expr = drasi_query_cypher::parse_expression(expr).unwrap(); - - let function_registry = Arc::new(FunctionRegistry::new()); - let ari = Arc::new(InMemoryResultIndex::new()); - let evaluator = ExpressionEvaluator::new(function_registry.clone(), ari.clone()); - - let mut variables = QueryVariables::new(); - - let sensor_val_version_list = vec![ - VariableValue::Object({ - let mut map = BTreeMap::new(); - map.insert( - "value".to_string(), - VariableValue::Integer(Integer::from(11)), - ); - map - }), - VariableValue::Object({ - let mut map = BTreeMap::new(); - map.insert( - "value".to_string(), - VariableValue::Integer(Integer::from(86)), - ); - map - }), - VariableValue::Object({ - let mut map = BTreeMap::new(); - map.insert( - "value".to_string(), - VariableValue::Integer(Integer::from(3)), - ); - map - }), - VariableValue::Object({ - let mut map = BTreeMap::new(); - map.insert( - "value".to_string(), - VariableValue::Integer(Integer::from(121)), - ); - map - }), - VariableValue::Object({ - let mut map = BTreeMap::new(); - map.insert( - "value".to_string(), - VariableValue::Integer(Integer::from(-45)), - ); - map - }), - ]; - variables.insert( - "param1".to_string().into(), - VariableValue::List(sensor_val_version_list), - ); - let context = - ExpressionEvaluationContext::new(&variables, Arc::new(InstantQueryClock::new(0, 0))); - - assert_eq!( - evaluator - .evaluate_expression(&context, &expr) - .await - .unwrap(), - VariableValue::Integer(5.into()) - ); -} +use crate::evaluation::context::QueryVariables; +use crate::evaluation::functions::FunctionRegistry; +use crate::evaluation::variable_value::integer::Integer; +use crate::evaluation::variable_value::VariableValue; +use crate::evaluation::{ExpressionEvaluationContext, ExpressionEvaluator, InstantQueryClock}; +use crate::in_memory_index::in_memory_result_index::InMemoryResultIndex; +use std::collections::BTreeMap; +use std::sync::Arc; + +#[tokio::test] +async fn evaluate_reduce_func() { + let expr = "reduce(acc = 0, x IN [1,2,3] | acc + x)"; + + let expr = drasi_query_cypher::parse_expression(expr).unwrap(); + + let function_registry = Arc::new(FunctionRegistry::new()); + let ari = Arc::new(InMemoryResultIndex::new()); + let evaluator = ExpressionEvaluator::new(function_registry.clone(), ari.clone()); + + let variables = QueryVariables::new(); + + let context = + ExpressionEvaluationContext::new(&variables, Arc::new(InstantQueryClock::new(0, 0))); + assert_eq!( + evaluator + .evaluate_expression(&context, &expr) + .await + .unwrap(), + VariableValue::Integer(Integer::from(6)) + ); +} + +#[tokio::test] +async fn evaluate_reduce_func_min_temp() { + let expr = "reduce ( minTemp = 0.0, sensorValVersion IN [11.2, 431, -75, 24] | + CASE WHEN sensorValVersion < minTemp THEN sensorValVersion + ELSE minTemp END)"; + + let expr = drasi_query_cypher::parse_expression(expr).unwrap(); + + let function_registry = Arc::new(FunctionRegistry::new()); + let ari = Arc::new(InMemoryResultIndex::new()); + let evaluator = ExpressionEvaluator::new(function_registry.clone(), ari.clone()); + + let variables = QueryVariables::new(); + + let context = + ExpressionEvaluationContext::new(&variables, Arc::new(InstantQueryClock::new(0, 0))); + assert_eq!( + evaluator + .evaluate_expression(&context, &expr) + .await + .unwrap(), + VariableValue::Integer(Integer::from(-75)) + ); +} + +#[tokio::test] +async fn evaluate_reduce_func_sensor_value() { + let expr = "reduce (total = 0.0, val IN $param1| total + val.value)"; + + let expr = drasi_query_cypher::parse_expression(expr).unwrap(); + + let function_registry = Arc::new(FunctionRegistry::new()); + let ari = Arc::new(InMemoryResultIndex::new()); + let evaluator = ExpressionEvaluator::new(function_registry.clone(), ari.clone()); + + let mut variables = QueryVariables::new(); + + let sensor_val_version_list = vec![ + VariableValue::Object({ + let mut map = BTreeMap::new(); + map.insert( + "value".to_string(), + VariableValue::Integer(Integer::from(11)), + ); + map + }), + VariableValue::Object({ + let mut map = BTreeMap::new(); + map.insert( + "value".to_string(), + VariableValue::Integer(Integer::from(86)), + ); + map + }), + VariableValue::Object({ + let mut map = BTreeMap::new(); + map.insert( + "value".to_string(), + VariableValue::Integer(Integer::from(3)), + ); + map + }), + VariableValue::Object({ + let mut map = BTreeMap::new(); + map.insert( + "value".to_string(), + VariableValue::Integer(Integer::from(121)), + ); + map + }), + VariableValue::Object({ + let mut map = BTreeMap::new(); + map.insert( + "value".to_string(), + VariableValue::Integer(Integer::from(-45)), + ); + map + }), + ]; + variables.insert( + "param1".to_string().into(), + VariableValue::List(sensor_val_version_list), + ); + let context = + ExpressionEvaluationContext::new(&variables, Arc::new(InstantQueryClock::new(0, 0))); + + assert_eq!( + evaluator + .evaluate_expression(&context, &expr) + .await + .unwrap(), + VariableValue::Float(176.0.into()) + ); +} + +#[tokio::test] +async fn evaluate_reduce_func_sensor_value_count() { + let expr = "reduce (count = 0, val IN $param1| count + 1)"; + + let expr = drasi_query_cypher::parse_expression(expr).unwrap(); + + let function_registry = Arc::new(FunctionRegistry::new()); + let ari = Arc::new(InMemoryResultIndex::new()); + let evaluator = ExpressionEvaluator::new(function_registry.clone(), ari.clone()); + + let mut variables = QueryVariables::new(); + + let sensor_val_version_list = vec![ + VariableValue::Object({ + let mut map = BTreeMap::new(); + map.insert( + "value".to_string(), + VariableValue::Integer(Integer::from(11)), + ); + map + }), + VariableValue::Object({ + let mut map = BTreeMap::new(); + map.insert( + "value".to_string(), + VariableValue::Integer(Integer::from(86)), + ); + map + }), + VariableValue::Object({ + let mut map = BTreeMap::new(); + map.insert( + "value".to_string(), + VariableValue::Integer(Integer::from(3)), + ); + map + }), + VariableValue::Object({ + let mut map = BTreeMap::new(); + map.insert( + "value".to_string(), + VariableValue::Integer(Integer::from(121)), + ); + map + }), + VariableValue::Object({ + let mut map = BTreeMap::new(); + map.insert( + "value".to_string(), + VariableValue::Integer(Integer::from(-45)), + ); + map + }), + ]; + variables.insert( + "param1".to_string().into(), + VariableValue::List(sensor_val_version_list), + ); + let context = + ExpressionEvaluationContext::new(&variables, Arc::new(InstantQueryClock::new(0, 0))); + + assert_eq!( + evaluator + .evaluate_expression(&context, &expr) + .await + .unwrap(), + VariableValue::Integer(5.into()) + ); +} + +#[tokio::test] +async fn evaluate_reduce_func_with_filter() { + let expr = "reduce (count = 0, val IN $param1 where val.value > 0| count + 1)"; + + let expr = drasi_query_cypher::parse_expression(expr).unwrap(); + + let function_registry = Arc::new(FunctionRegistry::new()); + let ari = Arc::new(InMemoryResultIndex::new()); + let evaluator = ExpressionEvaluator::new(function_registry.clone(), ari.clone()); + + let mut variables = QueryVariables::new(); + + let sensor_val_version_list = vec![ + VariableValue::Object({ + let mut map = BTreeMap::new(); + map.insert( + "value".to_string(), + VariableValue::Integer(Integer::from(11)), + ); + map + }), + VariableValue::Object({ + let mut map = BTreeMap::new(); + map.insert( + "value".to_string(), + VariableValue::Integer(Integer::from(86)), + ); + map + }), + VariableValue::Object({ + let mut map = BTreeMap::new(); + map.insert( + "value".to_string(), + VariableValue::Integer(Integer::from(3)), + ); + map + }), + VariableValue::Object({ + let mut map = BTreeMap::new(); + map.insert( + "value".to_string(), + VariableValue::Integer(Integer::from(121)), + ); + map + }), + VariableValue::Object({ + let mut map = BTreeMap::new(); + map.insert( + "value".to_string(), + VariableValue::Integer(Integer::from(-45)), + ); + map + }), + ]; + variables.insert( + "param1".to_string().into(), + VariableValue::List(sensor_val_version_list), + ); + let context = + ExpressionEvaluationContext::new(&variables, Arc::new(InstantQueryClock::new(0, 0))); + + assert_eq!( + evaluator + .evaluate_expression(&context, &expr) + .await + .unwrap(), + VariableValue::Integer(4.into()) + ); +} + +#[tokio::test] +async fn evaluate_reduce_func_with_filter_no_results() { + let expr = "reduce (count = 0, val IN $param1 where 1 = 0| count + 1)"; + + let expr = drasi_query_cypher::parse_expression(expr).unwrap(); + + let function_registry = Arc::new(FunctionRegistry::new()); + let ari = Arc::new(InMemoryResultIndex::new()); + let evaluator = ExpressionEvaluator::new(function_registry.clone(), ari.clone()); + + let mut variables = QueryVariables::new(); + + let sensor_val_version_list = vec![ + VariableValue::Object({ + let mut map = BTreeMap::new(); + map.insert( + "value".to_string(), + VariableValue::Integer(Integer::from(11)), + ); + map + }), + VariableValue::Object({ + let mut map = BTreeMap::new(); + map.insert( + "value".to_string(), + VariableValue::Integer(Integer::from(86)), + ); + map + }), + VariableValue::Object({ + let mut map = BTreeMap::new(); + map.insert( + "value".to_string(), + VariableValue::Integer(Integer::from(3)), + ); + map + }), + VariableValue::Object({ + let mut map = BTreeMap::new(); + map.insert( + "value".to_string(), + VariableValue::Integer(Integer::from(121)), + ); + map + }), + VariableValue::Object({ + let mut map = BTreeMap::new(); + map.insert( + "value".to_string(), + VariableValue::Integer(Integer::from(-45)), + ); + map + }), + ]; + variables.insert( + "param1".to_string().into(), + VariableValue::List(sensor_val_version_list), + ); + let context = + ExpressionEvaluationContext::new(&variables, Arc::new(InstantQueryClock::new(0, 0))); + + assert_eq!( + evaluator + .evaluate_expression(&context, &expr) + .await + .unwrap(), + VariableValue::Integer(0.into()) + ); +} diff --git a/core/src/evaluation/functions/list.rs b/core/src/evaluation/functions/list.rs index 493d6cb..85b90e0 100644 --- a/core/src/evaluation/functions/list.rs +++ b/core/src/evaluation/functions/list.rs @@ -13,7 +13,7 @@ pub trait RegisterListFunctions { impl RegisterListFunctions for FunctionRegistry { fn register_list_functions(&self) { - self.register_function("reduce", Function::Scalar(Arc::new(Reduce {}))); + self.register_function("reduce", Function::LazyScalar(Arc::new(Reduce::new()))); self.register_function("tail", Function::Scalar(Arc::new(Tail {}))); } } diff --git a/core/src/evaluation/functions/list/reduce.rs b/core/src/evaluation/functions/list/reduce.rs index f2e24ec..a9568f0 100644 --- a/core/src/evaluation/functions/list/reduce.rs +++ b/core/src/evaluation/functions/list/reduce.rs @@ -1,90 +1,69 @@ -use crate::evaluation::functions::FunctionRegistry; -use crate::in_memory_index::in_memory_result_index::InMemoryResultIndex; -use async_trait::async_trait; -use std::sync::Arc; - -use crate::evaluation::functions::ScalarFunction; -use crate::evaluation::variable_value::VariableValue; -use crate::evaluation::{EvaluationError, ExpressionEvaluationContext, ExpressionEvaluator}; -use drasi_query_ast::ast::{self, Expression, ParentExpression, UnaryExpression}; - -#[derive(Debug)] -pub struct Reduce {} - -#[async_trait] -impl ScalarFunction for Reduce { - async fn call( - &self, - context: &ExpressionEvaluationContext, - _expression: &ast::FunctionExpression, - args: Vec, - ) -> Result { - if args.len() != 2 { - return Err(EvaluationError::InvalidArgumentCount("reduce".to_string())); - } - let function_registry = Arc::new(FunctionRegistry::new()); - let ari = Arc::new(InMemoryResultIndex::new()); - let evaluator = ExpressionEvaluator::new(function_registry.clone(), ari.clone()); - - let mut query_variables = context.clone_variables(); //Retrieve the query variables from the global context - - match (&args[0], &args[1]) { - ( - VariableValue::Expression(accumulator), - VariableValue::Expression(iteration_pattern), - ) => { - let accumulator_object = evaluator - .evaluate_expression(context, &accumulator.clone()) - .await - .unwrap(); - let accumulator_map = accumulator_object.as_object().unwrap(); - - let in_expression = iteration_pattern.get_children()[0]; //a IN B | .... -> a IN B - - let expression = iteration_pattern.get_children()[1]; //a IN B | .... -> .... (actual expression) - let list_expression = in_expression.get_children()[1]; //a IN B -> B (list of literal values) - let variable = match in_expression.get_children()[0] { - Expression::UnaryExpression(exp) => match exp { - UnaryExpression::Identifier(ident) => ident, - _ => return Err(EvaluationError::InvalidType), - }, - _ => return Err(EvaluationError::InvalidType), - }; - - let list_evaluated = evaluator - .evaluate_expression(context, &list_expression.clone()) - .await - .unwrap(); - let val_list = match list_evaluated { - VariableValue::List(list) => list, - _ => return Err(EvaluationError::InvalidType), - }; - // Evaluate the list, converts Literal values into VariableValues - //vec![Litearl(1), Literal(2), Literal(3)] -> vec![Integer(1), Integer(2), Integer(3)] - - let accumulator_name = accumulator_map.keys().next().unwrap(); //acc - let accumulator_value = accumulator_map.get(accumulator_name).unwrap(); // 0 - - query_variables.insert(accumulator_name.clone().into(), accumulator_value.clone()); - - for value in &val_list { - query_variables.insert(variable.to_string().into_boxed_str(), value.clone()); - let context = - ExpressionEvaluationContext::new(&query_variables, context.get_clock()); - let result = evaluator - .evaluate_expression(&context, &expression.clone()) - .await - .unwrap(); - - query_variables.insert(accumulator_name.clone().into(), result.clone()); - } - - Ok(query_variables - .get(accumulator_name.as_str()) - .unwrap() - .clone()) - } - _ => return Err(EvaluationError::InvalidType), - } - } -} +use crate::evaluation::functions::FunctionRegistry; +use crate::in_memory_index::in_memory_result_index::InMemoryResultIndex; +use async_trait::async_trait; +use once_cell::sync::Lazy; +use std::fmt::Formatter; +use std::sync::Arc; + +use crate::evaluation::functions::LazyScalarFunction; +use crate::evaluation::variable_value::VariableValue; +use crate::evaluation::{EvaluationError, ExpressionEvaluationContext, ExpressionEvaluator}; +use drasi_query_ast::ast::{self, Expression}; + +pub struct Reduce { + evaluator: Lazy, +} + +impl Reduce { + pub fn new() -> Self { + Reduce { + evaluator: Lazy::new(|| { + let function_registry = Arc::new(FunctionRegistry::new()); + let ari = Arc::new(InMemoryResultIndex::new()); + ExpressionEvaluator::new(function_registry, ari) + }), + } + } +} + +#[async_trait] +impl LazyScalarFunction for Reduce { + async fn call( + &self, + context: &ExpressionEvaluationContext, + _expression: &ast::FunctionExpression, + args: &Vec, + ) -> Result { + if args.len() != 2 { + return Err(EvaluationError::InvalidArgumentCount("reduce".to_string())); + } + + let initializer = &args[0]; + let iterator = match &args[1] { + Expression::IteratorExpression(i) => i, + _ => return Err(EvaluationError::InvalidType), + }; + + let (accumulator_name, accumulator) = self + .evaluator + .evaluate_assignment(context, initializer) + .await?; + + let mut query_variables = context.clone_variables(); //Retrieve the query variables from the global context + query_variables.insert(accumulator_name.to_string().into(), accumulator); + + let context = ExpressionEvaluationContext::new(&query_variables, context.get_clock()); + let result = self + .evaluator + .reduce_iterator_expression(&context, iterator, accumulator_name) + .await?; + + Ok(result) + } +} + +impl std::fmt::Debug for Reduce { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Reduce") + } +} diff --git a/core/src/evaluation/functions/mod.rs b/core/src/evaluation/functions/mod.rs index 801e8a4..48062e9 100644 --- a/core/src/evaluation/functions/mod.rs +++ b/core/src/evaluation/functions/mod.rs @@ -25,7 +25,7 @@ use self::{ text::RegisterTextFunctions, }; -use super::{EvaluationError, ExpressionEvaluationContext}; +use super::{EvaluationError, ExpressionEvaluationContext, ExpressionEvaluator}; pub mod aggregation; pub mod context_mutators; @@ -42,6 +42,7 @@ pub mod text; pub enum Function { Scalar(Arc), + LazyScalar(Arc), Aggregating(Arc), ContextMutator(Arc), } @@ -56,6 +57,16 @@ pub trait ScalarFunction: Send + Sync { ) -> Result; } +#[async_trait] +pub trait LazyScalarFunction: Send + Sync { + async fn call( + &self, + context: &ExpressionEvaluationContext, + expression: &ast::FunctionExpression, + args: &Vec, + ) -> Result; +} + #[async_trait] pub trait AggregatingFunction: Debug + Send + Sync { fn initialize_accumulator( diff --git a/core/src/evaluation/parts/tests/multi_part.rs b/core/src/evaluation/parts/tests/multi_part.rs index c956570..cb62b16 100644 --- a/core/src/evaluation/parts/tests/multi_part.rs +++ b/core/src/evaluation/parts/tests/multi_part.rs @@ -413,7 +413,7 @@ async fn aggregating_part_to_aggregating_part_add_solution() { " MATCH (a) WHERE a.Value1 < 10 - WITH a.Name, a.Category, sum(a.Value1) as my_sum + WITH a.Name, sum(a.Value1) as my_sum, a.Category RETURN Category, avg(my_sum) as my_avg ", ); diff --git a/query-ast/src/ast.rs b/query-ast/src/ast.rs index e599352..6479967 100644 --- a/query-ast/src/ast.rs +++ b/query-ast/src/ast.rs @@ -227,7 +227,8 @@ pub enum Expression { FunctionExpression(FunctionExpression), CaseExpression(CaseExpression), ListExpression(ListExpression), - ObjectExpression(ObjectExpression), + ObjectExpression(ObjectExpression), //do we really need this? + IteratorExpression(IteratorExpression), } impl ParentExpression for Expression { @@ -239,6 +240,7 @@ impl ParentExpression for Expression { Expression::CaseExpression(expr) => expr.get_children(), Expression::ListExpression(expr) => expr.get_children(), Expression::ObjectExpression(expr) => expr.get_children(), + Expression::IteratorExpression(expr) => expr.get_children(), } } } @@ -379,10 +381,7 @@ pub enum BinaryExpression { Divide(Box, Box), Modulo(Box, Box), Exponent(Box, Box), - Iterate(Box, Box), - Reduce(Box, Box), HasLabel(Box, Box), - Filter(Box, Box), Index(Box, Box), } @@ -451,18 +450,6 @@ impl BinaryExpression { Expression::BinaryExpression(Self::HasLabel(Box::new(a), Box::new(b))) } - pub fn iterate(a: Expression, b: Expression) -> Expression { - Expression::BinaryExpression(Self::Iterate(Box::new(a), Box::new(b))) - } - - pub fn reduce(a: Expression, b: Expression) -> Expression { - Expression::BinaryExpression(Self::Reduce(Box::new(a), Box::new(b))) - } - - pub fn filter(a: Expression, b: Expression) -> Expression { - Expression::BinaryExpression(Self::Filter(Box::new(a), Box::new(b))) - } - pub fn index(a: Expression, b: Expression) -> Expression { Expression::BinaryExpression(Self::Index(Box::new(a), Box::new(b))) } @@ -487,9 +474,6 @@ impl ParentExpression for BinaryExpression { BinaryExpression::Modulo(a, b) => vec![a, b], BinaryExpression::Exponent(a, b) => vec![a, b], BinaryExpression::HasLabel(a, b) => vec![a, b], - BinaryExpression::Iterate(a, b) => vec![a, b], - BinaryExpression::Reduce(a, b) => vec![a, b], - BinaryExpression::Filter(a, b) => vec![a, b], BinaryExpression::Index(a, b) => vec![a, b], } } @@ -610,3 +594,73 @@ impl ParentExpression for ListExpression { children } } + +#[derive(Debug, Clone, PartialEq, Hash, Eq)] +pub struct IteratorExpression { + pub item_identifier: Arc, + pub list_expression: Box, + pub filter: Option>, + pub map_expression: Option>, +} + +impl IteratorExpression { + pub fn map( + item_identifier: Arc, + list_expression: Expression, + map_expression: Expression, + ) -> Expression { + Expression::IteratorExpression(IteratorExpression { + item_identifier, + list_expression: Box::new(list_expression), + filter: None, + map_expression: Some(Box::new(map_expression)), + }) + } + + pub fn map_with_filter( + item_identifier: Arc, + list_expression: Expression, + map_expression: Expression, + filter: Expression, + ) -> Expression { + Expression::IteratorExpression(IteratorExpression { + item_identifier, + list_expression: Box::new(list_expression), + filter: Some(Box::new(filter)), + map_expression: Some(Box::new(map_expression)), + }) + } + + pub fn iterator(item_identifier: Arc, list_expression: Expression) -> Expression { + Expression::IteratorExpression(IteratorExpression { + item_identifier, + list_expression: Box::new(list_expression), + filter: None, + map_expression: None, + }) + } + + pub fn iterator_with_filter( + item_identifier: Arc, + list_expression: Expression, + filter: Expression, + ) -> Expression { + Expression::IteratorExpression(IteratorExpression { + item_identifier, + list_expression: Box::new(list_expression), + filter: Some(Box::new(filter)), + map_expression: None, + }) + } +} + +impl ParentExpression for IteratorExpression { + fn get_children(&self) -> Vec<&Expression> { + let mut children = Vec::new(); + children.push(&*self.list_expression); + if let Some(filter) = &self.filter { + children.push(filter); + } + children + } +} diff --git a/query-cypher/src/lib.rs b/query-cypher/src/lib.rs index 4f81802..ee419f2 100644 --- a/query-cypher/src/lib.rs +++ b/query-cypher/src/lib.rs @@ -188,10 +188,6 @@ peg::parser! { rule else_expression() -> Expression = kw_else() __+ else_:expression() __+ { else_ } - // rule list_range_expression() -> Expression - // = start:expression()? __* ".." __* end:expression()? { UnaryExpression::list_range(start, end) } - // / start:expression() __* ".." __* { BinaryExpression::range(start, UnaryExpression::literal(Literal::Integer(9223372036854775807))) } - #[cache_left_rec] pub rule expression() -> Expression = precedence!{ @@ -200,12 +196,9 @@ peg::parser! { -- kw_not() __* c:(@) { UnaryExpression::not(c) } -- - v:variable() {UnaryExpression::literal(Literal::Expression(Box::new(v)))} - - - a:(@) __* "|" __* b:@ { UnaryExpression::literal(Literal::Expression(Box::new(BinaryExpression::iterate(a, b)))) } + it:iterator() { it } + -- a:(@) __* kw_in() __* b:@ { BinaryExpression::in_(a, b) } - a:(@) __* kw_where() __* b:expression() { BinaryExpression::filter(a, b) } a:(@) __* "=" __* b:@ { BinaryExpression::eq(a, b) } a:(@) __* ("<>" / "!=") __* b:@ { BinaryExpression::ne(a, b) } a:(@) __* "<" __* b:@ { BinaryExpression::lt(a, b) } @@ -241,6 +234,32 @@ peg::parser! { "[" __* c:expression() ** (__* "," __*) __* "]" { ListExpression::list(c) } } + #[cache_left_rec] + rule iterator() -> Expression + = "[" __* item:ident() __* kw_in() __* list:expression() __* kw_where() __* filter:expression() __* "|" __* map:expression()__* "]" + { IteratorExpression::map_with_filter(item, list, map, filter) } + + / "[" __* item:ident() __* kw_in() __* list:expression() __* "|" __* map:expression()__* "]" + { IteratorExpression::map(item, list, map) } + + / "[" __* item:ident() __* kw_in() __* list:expression() __* kw_where() __* filter:expression()__* "]" + { IteratorExpression::iterator_with_filter(item, list, filter) } + + / "[" __* item:ident() __* kw_in() __* list:expression()__* "]" + { IteratorExpression::iterator(item, list) } + + / item:ident() __* kw_in() __* list:expression() __* kw_where() __* filter:expression() __* "|" __* map:expression() + { IteratorExpression::map_with_filter(item, list, map, filter) } + + / item:ident() __* kw_in() __* list:expression() __* "|" __* map:expression() + { IteratorExpression::map(item, list, map) } + + / item:ident() __* kw_in() __* list:expression() __* kw_where() __* filter:expression() + { IteratorExpression::iterator_with_filter(item, list, filter) } + + / item:ident() __* kw_in() __* list:expression() + { IteratorExpression::iterator(item, list) } + // e.g. 'hello_world', 'Rust', 'HAS_PROPERTY' rule ident() -> Arc @@ -324,11 +343,6 @@ peg::parser! { SetClause { name: p.0, key: p.1, value: e } } - rule variable() -> Expression - = n:ident() _()? "=" _()? v:real() { UnaryExpression::variable(n,UnaryExpression::literal(Literal::Real(v))) } - / n:ident() _()? "=" _()? v:integer() { UnaryExpression::variable(n,UnaryExpression::literal(Literal::Integer(v))) } - - // e.g. 'DELETE a' rule delete_clause() -> Arc = kw_delete() __+ name:ident() { name } diff --git a/query-cypher/src/tests.rs b/query-cypher/src/tests.rs index d8c1be8..400f6a2 100644 --- a/query-cypher/src/tests.rs +++ b/query-cypher/src/tests.rs @@ -408,28 +408,22 @@ fn test_reduce_function() { FunctionExpression::function( "reduce".into(), vec![ - UnaryExpression::literal(Literal::Expression(Box::new( - UnaryExpression::variable( - "s".into(), - UnaryExpression::literal(Literal::Integer(0)) - ) - ))), - UnaryExpression::literal(Literal::Expression(Box::new( - BinaryExpression::iterate( - BinaryExpression::in_( - UnaryExpression::ident("x"), - ListExpression::list(vec![ - UnaryExpression::literal(Literal::Integer(1)), - UnaryExpression::literal(Literal::Integer(2)), - UnaryExpression::literal(Literal::Integer(3)), - ]) - ), - BinaryExpression::add( - UnaryExpression::ident("s"), - UnaryExpression::ident("x") - ) + BinaryExpression::eq( + UnaryExpression::ident("s"), + UnaryExpression::literal(Literal::Integer(0)) + ), + IteratorExpression::map( + "x".into(), + ListExpression::list(vec![ + UnaryExpression::literal(Literal::Integer(1)), + UnaryExpression::literal(Literal::Integer(2)), + UnaryExpression::literal(Literal::Integer(3)), + ]), + BinaryExpression::add( + UnaryExpression::ident("s"), + UnaryExpression::ident("x") ) - ))) + ) ], 17, ), @@ -640,3 +634,31 @@ fn test_reflex_query_with_comment() { ]) ); } + +#[test] +fn where_follows_with_no_alias() { + let query = + cypher::query("MATCH (a) WITH a WHERE 1 = 1 RETURN a.Field1", &TEST_CONFIG).unwrap(); + + println!("{:#?}", query.parts); + + assert_eq!(2, query.parts.len()); + + assert_eq!( + query.parts[0].return_clause, + ProjectionClause::Item(vec![UnaryExpression::ident("a".into())]) + ); + + assert_eq!( + query.parts[1].where_clauses, + vec![BinaryExpression::eq( + UnaryExpression::literal(Literal::Integer(1)), + UnaryExpression::literal(Literal::Integer(1)) + )] + ); + + assert_eq!( + query.parts[1].return_clause, + ProjectionClause::Item(vec![UnaryExpression::property("a".into(), "Field1".into())]) + ); +}