diff --git a/dozer-cli/src/simple/orchestrator.rs b/dozer-cli/src/simple/orchestrator.rs index d462f77e9f..d3f89d8e04 100644 --- a/dozer-cli/src/simple/orchestrator.rs +++ b/dozer-cli/src/simple/orchestrator.rs @@ -296,7 +296,6 @@ impl SimpleOrchestrator { }) .collect(); let runtime = dozer_lambda::JsRuntime::new( - self.runtime.clone(), app_url(&self.config.api.app_grpc), lambda_modules, Default::default(), diff --git a/dozer-deno/src/runtime/exception.js b/dozer-deno/src/runtime/exception.js new file mode 100644 index 0000000000..8f150c0f19 --- /dev/null +++ b/dozer-deno/src/runtime/exception.js @@ -0,0 +1,3 @@ +export default function () { + throw new Error("exception from javascript"); +} diff --git a/dozer-deno/src/runtime/fetch.js b/dozer-deno/src/runtime/fetch.js new file mode 100644 index 0000000000..8c500a69d5 --- /dev/null +++ b/dozer-deno/src/runtime/fetch.js @@ -0,0 +1,5 @@ +export default async function () { + const response = await fetch('https://api.github.com/repos/getdozer/dozer/commits?per_page=1'); + const json = await response.json(); + return json; +} diff --git a/dozer-deno/src/runtime/fetch_exception.js b/dozer-deno/src/runtime/fetch_exception.js new file mode 100644 index 0000000000..73d70295af --- /dev/null +++ b/dozer-deno/src/runtime/fetch_exception.js @@ -0,0 +1,5 @@ +export default async function () { + const response = await fetch("https://github.com/getdozer/dozer/commits"); + const json = await response.json(); + return json; +} diff --git a/dozer-deno/src/runtime/mod.rs b/dozer-deno/src/runtime/mod.rs index b82b5a8edd..15e93b7c81 100644 --- a/dozer-deno/src/runtime/mod.rs +++ b/dozer-deno/src/runtime/mod.rs @@ -2,18 +2,14 @@ //! Here we implement a `Runtime` struct that runs `JsRuntime` in a dedicated thread. //! By sending work to the worker thread, `Runtime` is `Send + Sync`. -use std::{ - collections::HashMap, - fs::canonicalize, - future::poll_fn, - num::NonZeroI32, - ops::ControlFlow, - sync::Arc, - task::{Context, Poll}, -}; +use std::{collections::HashMap, fs::canonicalize, num::NonZeroI32, thread::JoinHandle}; use deno_runtime::{ - deno_core::{anyhow::Context as _, error::AnyError, JsRuntime, ModuleSpecifier}, + deno_core::{ + anyhow::{bail, Context as _}, + error::AnyError, + JsRuntime, ModuleSpecifier, + }, deno_napi::v8::{self, undefined, Function, Global, Local}, }; use dozer_types::{ @@ -21,13 +17,7 @@ use dozer_types::{ serde_json::Value, thiserror, }; -use tokio::{ - sync::{ - mpsc::{self, error::TryRecvError}, - oneshot, - }, - task::{JoinHandle, LocalSet}, -}; +use tokio::sync::{mpsc, oneshot}; use self::conversion::{from_v8, to_v8}; @@ -57,62 +47,42 @@ pub enum Error { impl Runtime { /// Returns `Runtime` and the ids of the exported functions. - pub async fn new( - tokio_runtime: Arc, - modules: Vec, - ) -> Result<(Self, Vec), Error> { + pub async fn new(modules: Vec) -> Result<(Self, Vec), Error> { let (init_sender, init_receiver) = oneshot::channel(); let (work_sender, work_receiver) = mpsc::channel(10); - let handle = tokio_runtime.clone().spawn_blocking(move || { - let mut js_runtime = match js_runtime::new() { - Ok(js_runtime) => js_runtime, - Err(e) => { - let _ = init_sender.send(Err(Error::CreateJsRuntime(e))); - return; - } - }; - let local_set = LocalSet::new(); - let functions = match local_set - .block_on(&tokio_runtime, load_functions(&mut js_runtime, modules)) - { - Ok(functions) => { - if init_sender - .send(Ok(functions.iter().map(|(id, _)| *id).collect::>())) - .is_err() - { - return; - } - functions - } + let handle = std::thread::spawn(move || { + let worker = match Worker::new(modules) { + Ok(worker) => worker, Err(e) => { let _ = init_sender.send(Err(e)); return; } }; - let functions = functions.into_iter().collect(); - local_set.block_on( - &tokio_runtime, - worker_loop(js_runtime, work_receiver, functions), - ); + if init_sender + .send(Ok(worker.functions.iter().map(|(id, _)| *id).collect())) + .is_err() + { + return; + } + let functions = worker.functions.into_iter().collect(); + worker + .tokio_runtime + .block_on(worker_loop(worker.js_runtime, work_receiver, functions)); }); + let mut this = Self { + work_sender, + handle: Some(handle), + }; let functions = match init_receiver.await { Ok(Ok(functions)) => functions, Ok(Err(e)) => return Err(e), Err(_) => { - // Propagate the panic. - handle.await.unwrap(); - unreachable!("we should have panicked"); + this.propagate_panic(); } }; - Ok(( - Self { - work_sender, - handle: Some(handle), - }, - functions, - )) + Ok((this, functions)) } pub async fn call_function( @@ -131,60 +101,81 @@ impl Runtime { .await .is_err() { - return self.propagate_panic().await; + self.propagate_panic(); } let Ok(result) = return_receiver.await else { - return self.propagate_panic().await; + self.propagate_panic(); }; result } - // Return type is actually `!` - async fn propagate_panic(&mut self) -> Result { + fn propagate_panic(&mut self) -> ! { self.handle .take() .expect("runtime panicked before and cannot be used again") - .await + .join() .unwrap(); unreachable!("we should have panicked"); } } -async fn load_functions( - runtime: &mut JsRuntime, - modules: Vec, -) -> Result)>, Error> { - let mut result = vec![]; - for module in modules { - let path = canonicalize(&module).map_err(|e| Error::CanonicalizePath(module.clone(), e))?; - let module_specifier = - ModuleSpecifier::from_file_path(path).expect("we just canonicalized it"); - info!("loading module {}", module_specifier); - let module_id = runtime - .load_side_module(&module_specifier, None) - .await - .map_err(|e| Error::LoadModule(module.clone(), e))?; - js_runtime::evaluate_module(runtime, module_id) - .await - .map_err(|e| Error::EvaluateModule(module.clone(), e))?; - let namespace = runtime - .get_module_namespace(module_id) - .map_err(|e| Error::GetModuleNamespace(module.clone(), e))?; - let scope = &mut runtime.handle_scope(); - let namespace = v8::Local::new(scope, namespace); - let default_key = v8::String::new_external_onebyte_static(scope, b"default") - .unwrap() - .into(); - let default_export = namespace - .get(scope, default_key) - .ok_or_else(|| Error::ModuleNoDefaultExport(module.clone()))?; - let function: Local = default_export - .try_into() - .map_err(|e| Error::ModuleDefaultExportNotFunction(module.clone(), e))?; - let id = function.get_identity_hash(); - result.push((id, Global::new(scope, function))); +struct Worker { + tokio_runtime: tokio::runtime::Runtime, + js_runtime: JsRuntime, + functions: Vec<(NonZeroI32, Global)>, +} + +impl Worker { + fn new(modules: Vec) -> Result { + let tokio_runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(Error::CreateJsRuntime)?; + let mut js_runtime = js_runtime::new().map_err(Error::CreateJsRuntime)?; + + let mut functions = vec![]; + for module in modules { + functions.push(tokio_runtime.block_on(load_function(&mut js_runtime, module))?); + } + + Ok(Self { + tokio_runtime, + js_runtime, + functions, + }) } - Ok(result) +} + +async fn load_function( + runtime: &mut JsRuntime, + module: String, +) -> Result<(NonZeroI32, Global), Error> { + let path = canonicalize(&module).map_err(|e| Error::CanonicalizePath(module.clone(), e))?; + let module_specifier = ModuleSpecifier::from_file_path(path).expect("we just canonicalized it"); + info!("loading module {}", module_specifier); + let module_id = runtime + .load_side_module(&module_specifier, None) + .await + .map_err(|e| Error::LoadModule(module.clone(), e))?; + js_runtime::evaluate_module(runtime, module_id) + .await + .map_err(|e| Error::EvaluateModule(module.clone(), e))?; + let namespace = runtime + .get_module_namespace(module_id) + .map_err(|e| Error::GetModuleNamespace(module.clone(), e))?; + let scope = &mut runtime.handle_scope(); + let namespace = v8::Local::new(scope, namespace); + let default_key = v8::String::new_external_onebyte_static(scope, b"default") + .unwrap() + .into(); + let default_export = namespace + .get(scope, default_key) + .ok_or_else(|| Error::ModuleNoDefaultExport(module.clone()))?; + let function: Local = default_export + .try_into() + .map_err(|e| Error::ModuleDefaultExportNotFunction(module.clone(), e))?; + let id = function.get_identity_hash(); + Ok((id, Global::new(scope, function))) } #[derive(Debug)] @@ -201,43 +192,16 @@ async fn worker_loop( mut work_receiver: mpsc::Receiver, functions: HashMap>, ) { - loop { - match poll_fn(|cx| { - poll_work_and_event_loop(&mut runtime, &mut work_receiver, &functions, cx) - }) - .await - { - ControlFlow::Continue(Ok(())) => {} - ControlFlow::Continue(Err(e)) => { - error!("JavaScript runtime error: {}", e); - } - ControlFlow::Break(()) => { - break; - } - } + while let Some(work) = work_receiver.recv().await { + do_work(&mut runtime, work, &functions).await; } } -fn poll_work_and_event_loop( +async fn do_work( runtime: &mut JsRuntime, - work_receiver: &mut mpsc::Receiver, + work: Work, functions: &HashMap>, - cx: &mut Context, -) -> Poll>> { - match work_receiver.try_recv() { - Ok(work) => { - do_work(runtime, work, functions); - } - Err(TryRecvError::Empty) => (), - Err(TryRecvError::Disconnected) => return Poll::Ready(ControlFlow::Break(())), - } - - runtime - .poll_event_loop(cx, false) - .map(ControlFlow::Continue) -} - -fn do_work(runtime: &mut JsRuntime, work: Work, functions: &HashMap>) { +) { match work { Work::CallFunction { id, @@ -245,12 +209,12 @@ fn do_work(runtime: &mut JsRuntime, work: Work, functions: &HashMap { // Ignore error if receiver is closed. - let _ = return_sender.send(call_function(runtime, id, args, functions)); + let _ = return_sender.send(call_function(runtime, id, args, functions).await); } } } -fn call_function( +async fn call_function( runtime: &mut JsRuntime, function: NonZeroI32, args: Vec, @@ -259,17 +223,26 @@ fn call_function( let function = functions .get(&function) .context(format!("function {} not found", function))?; - let scope = &mut runtime.handle_scope(); - let recv = undefined(scope); + let mut scope = runtime.handle_scope(); + let recv = undefined(&mut scope); let args = args .into_iter() - .map(|arg| to_v8(scope, arg)) + .map(|arg| to_v8(&mut scope, arg)) .collect::, _>>()?; - let result = Local::new(scope, function).call(scope, recv.into(), &args); - result - .map(|value| from_v8(scope, value)) - .unwrap_or(Ok(Value::Null)) + let Some(promise) = Local::new(&mut scope, function).call(&mut scope, recv.into(), &args) + else { + // Deno doesn't expose a way to get the exception. + bail!("uncaught javascript exception"); + }; + let promise = Global::new(&mut scope, promise); + drop(scope); + let result = runtime.resolve_value(promise).await?; + let scope = &mut runtime.handle_scope(); + let result = Local::new(scope, result); + from_v8(scope, result) } mod conversion; mod js_runtime; +#[cfg(test)] +mod tests; diff --git a/dozer-deno/src/runtime/square.js b/dozer-deno/src/runtime/square.js new file mode 100644 index 0000000000..8a73c42839 --- /dev/null +++ b/dozer-deno/src/runtime/square.js @@ -0,0 +1,3 @@ +export default function (input) { + return input * input; +} diff --git a/dozer-deno/src/runtime/tests.rs b/dozer-deno/src/runtime/tests.rs new file mode 100644 index 0000000000..3c6e587866 --- /dev/null +++ b/dozer-deno/src/runtime/tests.rs @@ -0,0 +1,38 @@ +use dozer_types::serde_json::json; + +use super::*; + +async fn call_function(module: &str, args: Vec) -> Result { + let (mut runtime, functions) = Runtime::new(vec![format!("src/runtime/{module}")]).await?; + runtime.call_function(functions[0], args).await +} + +#[tokio::test] +async fn test_runtime() { + assert_eq!( + call_function("square.js", vec![json!(2.0)]).await.unwrap(), + json!(4.0) + ); +} + +#[tokio::test] +async fn test_function_call_exception() { + let error = call_function("exception.js", vec![]).await.unwrap_err(); + assert_eq!(error.to_string(), "uncaught javascript exception"); +} + +#[tokio::test] +async fn test_async_function_call() { + let Value::Array(result) = call_function("fetch.js", vec![]).await.unwrap() else { + panic!("expected array") + }; + assert!(!result.is_empty()); +} + +#[tokio::test] +async fn test_async_function_call_exception() { + let error = call_function("fetch_exception.js", vec![]) + .await + .unwrap_err(); + assert!(error.to_string().starts_with("SyntaxError: ")); +} diff --git a/dozer-lambda/src/js/mod.rs b/dozer-lambda/src/js/mod.rs index f54ee5de6b..e9b6a1d5db 100644 --- a/dozer-lambda/src/js/mod.rs +++ b/dozer-lambda/src/js/mod.rs @@ -1,10 +1,6 @@ use std::sync::Arc; -use dozer_log::{ - errors::ReaderBuilderError, - reader::LogReaderOptions, - tokio::{self, sync::Mutex}, -}; +use dozer_log::{errors::ReaderBuilderError, reader::LogReaderOptions, tokio::sync::Mutex}; use dozer_types::{ grpc_types::internal::internal_pipeline_service_client::InternalPipelineServiceClient, models::lambda_config::JavaScriptLambda, thiserror, tonic, @@ -30,7 +26,6 @@ pub enum Error { impl Runtime { pub async fn new( - runtime: Arc, app_url: String, lambda_modules: Vec, options: LogReaderOptions, @@ -40,7 +35,7 @@ impl Runtime { .iter() .map(|module| module.module.clone()) .collect(); - let (worker, lambdas) = Worker::new(runtime, modules).await?; + let (worker, lambdas) = Worker::new(modules).await?; // Create trigger. let client = InternalPipelineServiceClient::connect(app_url.clone()) diff --git a/dozer-lambda/src/js/tests.rs b/dozer-lambda/src/js/tests.rs index c03d339154..b425f8318a 100644 --- a/dozer-lambda/src/js/tests.rs +++ b/dozer-lambda/src/js/tests.rs @@ -1,6 +1,7 @@ use std::{pin::pin, time::Duration}; use dozer_deno::deno_runtime::deno_core::futures::future::{join, select, Either}; +use dozer_log::tokio; use super::*; @@ -13,16 +14,16 @@ fn test_lambda_runtime() { .build() .unwrap(), ); - tokio_runtime.block_on(test_lambda_runtime_impl(tokio_runtime.clone())); + tokio_runtime.block_on(test_lambda_runtime_impl()); } -async fn test_lambda_runtime_impl(tokio_runtime: Arc) { +async fn test_lambda_runtime_impl() { let (app_url, app_server) = mock::start_mock_internal_pipeline_server().await; let lambda_modules = vec![JavaScriptLambda { endpoint: mock::mock_endpoint(), module: "src/js/test_lambda.js".to_string(), }]; - let lambda_runtime = Runtime::new(tokio_runtime, app_url, lambda_modules, Default::default()); + let lambda_runtime = Runtime::new(app_url, lambda_modules, Default::default()); let (lambda_runtime, app_server) = match select(pin!(lambda_runtime), app_server).await { Either::Left((lambda_runtime, app_server)) => (lambda_runtime.unwrap(), app_server), Either::Right((app_server, _)) => { diff --git a/dozer-lambda/src/js/worker/mod.rs b/dozer-lambda/src/js/worker/mod.rs index 17d4f01bee..06dc7c8b55 100644 --- a/dozer-lambda/src/js/worker/mod.rs +++ b/dozer-lambda/src/js/worker/mod.rs @@ -1,6 +1,5 @@ -use std::{num::NonZeroI32, sync::Arc}; +use std::num::NonZeroI32; -use dozer_log::tokio::runtime::Runtime; use dozer_types::{ json_types::field_to_json_value, log::error, @@ -15,10 +14,9 @@ pub struct Worker { impl Worker { pub async fn new( - runtime: Arc, modules: Vec, ) -> Result<(Self, Vec), dozer_deno::RuntimeError> { - let (runtime, lambdas) = dozer_deno::Runtime::new(runtime, modules).await?; + let (runtime, lambdas) = dozer_deno::Runtime::new(modules).await?; Ok((Self { runtime }, lambdas)) } diff --git a/dozer-sql/expression/src/javascript/evaluate.rs b/dozer-sql/expression/src/javascript/evaluate.rs index a9be6761b2..157b4e12e3 100644 --- a/dozer-sql/expression/src/javascript/evaluate.rs +++ b/dozer-sql/expression/src/javascript/evaluate.rs @@ -45,8 +45,7 @@ impl Udf { module: String, arg: Expression, ) -> Result { - let (deno_runtime, functions) = - dozer_deno::Runtime::new(tokio_runtime.clone(), vec![module]).await?; + let (deno_runtime, functions) = dozer_deno::Runtime::new(vec![module]).await?; let function = functions[0]; Ok(Self { function_name,