Skip to content

Commit

Permalink
feat: Support async lambda and udf (#2239)
Browse files Browse the repository at this point in the history
* feat: Support `async` lambda and udf

* Clean up deno worker abstraction a bit

---------

Co-authored-by: Jesse Bakker <[email protected]>
  • Loading branch information
chubei and Jesse-Bakker authored Dec 11, 2023
1 parent 605709c commit 30d5510
Show file tree
Hide file tree
Showing 11 changed files with 176 additions and 173 deletions.
1 change: 0 additions & 1 deletion dozer-cli/src/simple/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,6 @@ impl SimpleOrchestrator {
})
.collect();
let runtime = dozer_lambda::JsRuntime::new(
self.runtime.clone(),
app_url(&self.config.api.app_grpc),
lambda_modules,
Default::default(),
Expand Down
3 changes: 3 additions & 0 deletions dozer-deno/src/runtime/exception.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export default function () {
throw new Error("exception from javascript");
}
5 changes: 5 additions & 0 deletions dozer-deno/src/runtime/fetch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export default async function () {
const response = await fetch('https://api.github.com/repos/getdozer/dozer/commits?per_page=1');
const json = await response.json();
return json;
}
5 changes: 5 additions & 0 deletions dozer-deno/src/runtime/fetch_exception.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export default async function () {
const response = await fetch("https://github.com/getdozer/dozer/commits");
const json = await response.json();
return json;
}
254 changes: 109 additions & 145 deletions dozer-deno/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,22 @@
//! Here we implement a `Runtime` struct that runs `JsRuntime` in a dedicated thread.
//! By sending work to the worker thread, `Runtime` is `Send + Sync`.
use std::{
collections::HashMap,
fs::canonicalize,
future::poll_fn,
num::NonZeroI32,
ops::ControlFlow,
sync::Arc,
task::{Context, Poll},
};
use std::{collections::HashMap, fs::canonicalize, num::NonZeroI32, thread::JoinHandle};

use deno_runtime::{
deno_core::{anyhow::Context as _, error::AnyError, JsRuntime, ModuleSpecifier},
deno_core::{
anyhow::{bail, Context as _},
error::AnyError,
JsRuntime, ModuleSpecifier,
},
deno_napi::v8::{self, undefined, Function, Global, Local},
};
use dozer_types::{
json_types::JsonValue,
log::{error, info},
thiserror,
};
use tokio::{
sync::{
mpsc::{self, error::TryRecvError},
oneshot,
},
task::{JoinHandle, LocalSet},
};
use tokio::sync::{mpsc, oneshot};

use self::conversion::{from_v8, to_v8};

Expand Down Expand Up @@ -57,62 +47,39 @@ pub enum Error {

impl Runtime {
/// Returns `Runtime` and the ids of the exported functions.
pub async fn new(
tokio_runtime: Arc<tokio::runtime::Runtime>,
modules: Vec<String>,
) -> Result<(Self, Vec<NonZeroI32>), Error> {
pub async fn new(modules: Vec<String>) -> Result<(Self, Vec<NonZeroI32>), Error> {
let (init_sender, init_receiver) = oneshot::channel();
let (work_sender, work_receiver) = mpsc::channel(10);
let handle = tokio_runtime.clone().spawn_blocking(move || {
let mut js_runtime = match js_runtime::new() {
Ok(js_runtime) => js_runtime,
Err(e) => {
let _ = init_sender.send(Err(Error::CreateJsRuntime(e)));
return;
}
};
let local_set = LocalSet::new();
let functions = match local_set
.block_on(&tokio_runtime, load_functions(&mut js_runtime, modules))
{
Ok(functions) => {
if init_sender
.send(Ok(functions.iter().map(|(id, _)| *id).collect::<Vec<_>>()))
.is_err()
{
return;
}
functions
}
let handle = std::thread::spawn(move || {
let mut worker = match Worker::new(modules) {
Ok(worker) => worker,
Err(e) => {
let _ = init_sender.send(Err(e));
return;
}
};
let functions = functions.into_iter().collect();
local_set.block_on(
&tokio_runtime,
worker_loop(js_runtime, work_receiver, functions),
);
if init_sender
.send(Ok(worker.functions.keys().cloned().collect()))
.is_err()
{
return;
}
worker.run(work_receiver)
});

let mut this = Self {
work_sender,
handle: Some(handle),
};
let functions = match init_receiver.await {
Ok(Ok(functions)) => functions,
Ok(Err(e)) => return Err(e),
Err(_) => {
// Propagate the panic.
handle.await.unwrap();
unreachable!("we should have panicked");
this.propagate_panic();
}
};

Ok((
Self {
work_sender,
handle: Some(handle),
},
functions,
))
Ok((this, functions))
}

pub async fn call_function(
Expand All @@ -131,31 +98,102 @@ impl Runtime {
.await
.is_err()
{
return self.propagate_panic().await;
self.propagate_panic();
}
let Ok(result) = return_receiver.await else {
return self.propagate_panic().await;
self.propagate_panic();
};
result
}

// Return type is actually `!`
async fn propagate_panic(&mut self) -> Result<JsonValue, AnyError> {
fn propagate_panic(&mut self) -> ! {
self.handle
.take()
.expect("runtime panicked before and cannot be used again")
.await
.join()
.unwrap();
unreachable!("we should have panicked");
}
}

async fn load_functions(
runtime: &mut JsRuntime,
modules: Vec<String>,
) -> Result<Vec<(NonZeroI32, Global<Function>)>, Error> {
let mut result = vec![];
for module in modules {
struct Worker {
tokio_runtime: tokio::runtime::Runtime,
js_runtime: JsRuntime,
functions: HashMap<NonZeroI32, Global<Function>>,
}

impl Worker {
fn new(modules: Vec<String>) -> Result<Self, Error> {
let tokio_runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(Error::CreateJsRuntime)?;
let mut js_runtime = js_runtime::new().map_err(Error::CreateJsRuntime)?;

let mut functions = HashMap::with_capacity(modules.len());
for module in modules {
let (id, fun) = tokio_runtime.block_on(Self::load_function(&mut js_runtime, module))?;
functions.insert(id, fun);
}

Ok(Self {
tokio_runtime,
js_runtime,
functions,
})
}

async fn call_function(
runtime: &mut JsRuntime,
function: NonZeroI32,
args: Vec<JsonValue>,
functions: &HashMap<NonZeroI32, Global<Function>>,
) -> Result<JsonValue, AnyError> {
let function = functions
.get(&function)
.context(format!("function {} not found", function))?;
let mut scope = runtime.handle_scope();
let recv = undefined(&mut scope);
let args = args
.into_iter()
.map(|arg| to_v8(&mut scope, arg))
.collect::<Result<Vec<_>, _>>()?;
let Some(promise) = Local::new(&mut scope, function).call(&mut scope, recv.into(), &args)
else {
// Deno doesn't expose a way to get the exception.
bail!("uncaught javascript exception");
};
let promise = Global::new(&mut scope, promise);
drop(scope);
let result = runtime.resolve_value(promise).await?;
let scope = &mut runtime.handle_scope();
let result = Local::new(scope, result);
from_v8(scope, result)
}

fn run(&mut self, mut work_receiver: mpsc::Receiver<Work>) {
let runtime = &mut self.js_runtime;
let functions = &self.functions;
self.tokio_runtime.block_on(async {
while let Some(work) = work_receiver.recv().await {
match work {
Work::CallFunction {
id,
args,
return_sender,
} => {
// Ignore error if receiver is closed.
let _ = return_sender
.send(Self::call_function(runtime, id, args, functions).await);
}
}
}
})
}
async fn load_function(
runtime: &mut JsRuntime,
module: String,
) -> Result<(NonZeroI32, Global<Function>), Error> {
let path = canonicalize(&module).map_err(|e| Error::CanonicalizePath(module.clone(), e))?;
let module_specifier =
ModuleSpecifier::from_file_path(path).expect("we just canonicalized it");
Expand All @@ -182,9 +220,8 @@ async fn load_functions(
.try_into()
.map_err(|e| Error::ModuleDefaultExportNotFunction(module.clone(), e))?;
let id = function.get_identity_hash();
result.push((id, Global::new(scope, function)));
Ok((id, Global::new(scope, function)))
}
Ok(result)
}

#[derive(Debug)]
Expand All @@ -196,80 +233,7 @@ enum Work {
},
}

async fn worker_loop(
mut runtime: JsRuntime,
mut work_receiver: mpsc::Receiver<Work>,
functions: HashMap<NonZeroI32, Global<Function>>,
) {
loop {
match poll_fn(|cx| {
poll_work_and_event_loop(&mut runtime, &mut work_receiver, &functions, cx)
})
.await
{
ControlFlow::Continue(Ok(())) => {}
ControlFlow::Continue(Err(e)) => {
error!("JavaScript runtime error: {}", e);
}
ControlFlow::Break(()) => {
break;
}
}
}
}

fn poll_work_and_event_loop(
runtime: &mut JsRuntime,
work_receiver: &mut mpsc::Receiver<Work>,
functions: &HashMap<NonZeroI32, Global<Function>>,
cx: &mut Context,
) -> Poll<ControlFlow<(), Result<(), AnyError>>> {
match work_receiver.try_recv() {
Ok(work) => {
do_work(runtime, work, functions);
}
Err(TryRecvError::Empty) => (),
Err(TryRecvError::Disconnected) => return Poll::Ready(ControlFlow::Break(())),
}

runtime
.poll_event_loop(cx, false)
.map(ControlFlow::Continue)
}

fn do_work(runtime: &mut JsRuntime, work: Work, functions: &HashMap<NonZeroI32, Global<Function>>) {
match work {
Work::CallFunction {
id,
args,
return_sender,
} => {
// Ignore error if receiver is closed.
let _ = return_sender.send(call_function(runtime, id, args, functions));
}
}
}

fn call_function(
runtime: &mut JsRuntime,
function: NonZeroI32,
args: Vec<JsonValue>,
functions: &HashMap<NonZeroI32, Global<Function>>,
) -> Result<JsonValue, AnyError> {
let function = functions
.get(&function)
.context(format!("function {} not found", function))?;
let scope = &mut runtime.handle_scope();
let recv = undefined(scope);
let args = args
.into_iter()
.map(|arg| to_v8(scope, arg))
.collect::<Result<Vec<_>, _>>()?;
let result = Local::new(scope, function).call(scope, recv.into(), &args);
result
.map(|value| from_v8(scope, value))
.unwrap_or(Ok(JsonValue::NULL))
}

mod conversion;
mod js_runtime;
#[cfg(test)]
mod tests;
3 changes: 3 additions & 0 deletions dozer-deno/src/runtime/square.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export default function (input) {
return input * input;
}
42 changes: 42 additions & 0 deletions dozer-deno/src/runtime/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use dozer_types::json_types::json;

use super::*;

async fn call_function(module: &str, args: Vec<JsonValue>) -> Result<JsonValue, AnyError> {
let (mut runtime, functions) = Runtime::new(vec![format!("src/runtime/{module}")]).await?;
runtime.call_function(functions[0], args).await
}

#[tokio::test]
async fn test_runtime() {
assert_eq!(
call_function("square.js", vec![json!(2.0)]).await.unwrap(),
json!(4.0)
);
}

#[tokio::test]
async fn test_function_call_exception() {
let error = call_function("exception.js", vec![]).await.unwrap_err();
assert_eq!(error.to_string(), "uncaught javascript exception");
}

#[tokio::test]
async fn test_async_function_call() {
let Ok(result) = call_function("fetch.js", vec![])
.await
.unwrap()
.into_array()
else {
panic!("expected array")
};
assert!(!result.is_empty());
}

#[tokio::test]
async fn test_async_function_call_exception() {
let error = call_function("fetch_exception.js", vec![])
.await
.unwrap_err();
assert!(error.to_string().starts_with("SyntaxError: "));
}
Loading

0 comments on commit 30d5510

Please sign in to comment.