Skip to content

Commit

Permalink
feat: Support async lambda and udf
Browse files Browse the repository at this point in the history
  • Loading branch information
chubei committed Nov 27, 2023
1 parent 34e3e39 commit b330585
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 154 deletions.
1 change: 0 additions & 1 deletion dozer-cli/src/simple/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,6 @@ impl SimpleOrchestrator {
})
.collect();
let runtime = dozer_lambda::JsRuntime::new(
self.runtime.clone(),
app_url(&self.config.api.app_grpc),
lambda_modules,
Default::default(),
Expand Down
3 changes: 3 additions & 0 deletions dozer-deno/src/runtime/exception.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export default function () {
throw new Error("exception from javascript");
}
5 changes: 5 additions & 0 deletions dozer-deno/src/runtime/fetch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export default async function () {
const response = await fetch('https://api.github.com/repos/getdozer/dozer/commits?per_page=1');
const json = await response.json();
return json;
}
5 changes: 5 additions & 0 deletions dozer-deno/src/runtime/fetch_exception.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export default async function () {
const response = await fetch("https://github.com/getdozer/dozer/commits");
const json = await response.json();
return json;
}
247 changes: 110 additions & 137 deletions dozer-deno/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,22 @@
//! Here we implement a `Runtime` struct that runs `JsRuntime` in a dedicated thread.
//! By sending work to the worker thread, `Runtime` is `Send + Sync`.
use std::{
collections::HashMap,
fs::canonicalize,
future::poll_fn,
num::NonZeroI32,
ops::ControlFlow,
sync::Arc,
task::{Context, Poll},
};
use std::{collections::HashMap, fs::canonicalize, num::NonZeroI32, thread::JoinHandle};

use deno_runtime::{
deno_core::{anyhow::Context as _, error::AnyError, JsRuntime, ModuleSpecifier},
deno_core::{
anyhow::{bail, Context as _},
error::AnyError,
JsRuntime, ModuleSpecifier,
},
deno_napi::v8::{self, undefined, Function, Global, Local},
};
use dozer_types::{
log::{error, info},
serde_json::Value,
thiserror,
};
use tokio::{
sync::{
mpsc::{self, error::TryRecvError},
oneshot,
},
task::{JoinHandle, LocalSet},
};
use tokio::sync::{mpsc, oneshot};

use self::conversion::{from_v8, to_v8};

Expand Down Expand Up @@ -57,62 +47,42 @@ pub enum Error {

impl Runtime {
/// Returns `Runtime` and the ids of the exported functions.
pub async fn new(
tokio_runtime: Arc<tokio::runtime::Runtime>,
modules: Vec<String>,
) -> Result<(Self, Vec<NonZeroI32>), Error> {
pub async fn new(modules: Vec<String>) -> Result<(Self, Vec<NonZeroI32>), Error> {
let (init_sender, init_receiver) = oneshot::channel();
let (work_sender, work_receiver) = mpsc::channel(10);
let handle = tokio_runtime.clone().spawn_blocking(move || {
let mut js_runtime = match js_runtime::new() {
Ok(js_runtime) => js_runtime,
Err(e) => {
let _ = init_sender.send(Err(Error::CreateJsRuntime(e)));
return;
}
};
let local_set = LocalSet::new();
let functions = match local_set
.block_on(&tokio_runtime, load_functions(&mut js_runtime, modules))
{
Ok(functions) => {
if init_sender
.send(Ok(functions.iter().map(|(id, _)| *id).collect::<Vec<_>>()))
.is_err()
{
return;
}
functions
}
let handle = std::thread::spawn(move || {
let worker = match Worker::new(modules) {
Ok(worker) => worker,
Err(e) => {
let _ = init_sender.send(Err(e));
return;
}
};
let functions = functions.into_iter().collect();
local_set.block_on(
&tokio_runtime,
worker_loop(js_runtime, work_receiver, functions),
);
if init_sender
.send(Ok(worker.functions.iter().map(|(id, _)| *id).collect()))
.is_err()
{
return;
}
let functions = worker.functions.into_iter().collect();
worker
.tokio_runtime
.block_on(worker_loop(worker.js_runtime, work_receiver, functions));
});

let mut this = Self {
work_sender,
handle: Some(handle),
};
let functions = match init_receiver.await {
Ok(Ok(functions)) => functions,
Ok(Err(e)) => return Err(e),
Err(_) => {
// Propagate the panic.
handle.await.unwrap();
unreachable!("we should have panicked");
this.propagate_panic();
}
};

Ok((
Self {
work_sender,
handle: Some(handle),
},
functions,
))
Ok((this, functions))
}

pub async fn call_function(
Expand All @@ -131,60 +101,81 @@ impl Runtime {
.await
.is_err()
{
return self.propagate_panic().await;
self.propagate_panic();
}
let Ok(result) = return_receiver.await else {
return self.propagate_panic().await;
self.propagate_panic();
};
result
}

// Return type is actually `!`
async fn propagate_panic(&mut self) -> Result<Value, AnyError> {
fn propagate_panic(&mut self) -> ! {
self.handle
.take()
.expect("runtime panicked before and cannot be used again")
.await
.join()
.unwrap();
unreachable!("we should have panicked");
}
}

async fn load_functions(
runtime: &mut JsRuntime,
modules: Vec<String>,
) -> Result<Vec<(NonZeroI32, Global<Function>)>, Error> {
let mut result = vec![];
for module in modules {
let path = canonicalize(&module).map_err(|e| Error::CanonicalizePath(module.clone(), e))?;
let module_specifier =
ModuleSpecifier::from_file_path(path).expect("we just canonicalized it");
info!("loading module {}", module_specifier);
let module_id = runtime
.load_side_module(&module_specifier, None)
.await
.map_err(|e| Error::LoadModule(module.clone(), e))?;
js_runtime::evaluate_module(runtime, module_id)
.await
.map_err(|e| Error::EvaluateModule(module.clone(), e))?;
let namespace = runtime
.get_module_namespace(module_id)
.map_err(|e| Error::GetModuleNamespace(module.clone(), e))?;
let scope = &mut runtime.handle_scope();
let namespace = v8::Local::new(scope, namespace);
let default_key = v8::String::new_external_onebyte_static(scope, b"default")
.unwrap()
.into();
let default_export = namespace
.get(scope, default_key)
.ok_or_else(|| Error::ModuleNoDefaultExport(module.clone()))?;
let function: Local<Function> = default_export
.try_into()
.map_err(|e| Error::ModuleDefaultExportNotFunction(module.clone(), e))?;
let id = function.get_identity_hash();
result.push((id, Global::new(scope, function)));
struct Worker {
tokio_runtime: tokio::runtime::Runtime,
js_runtime: JsRuntime,
functions: Vec<(NonZeroI32, Global<Function>)>,
}

impl Worker {
fn new(modules: Vec<String>) -> Result<Self, Error> {
let tokio_runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(Error::CreateJsRuntime)?;
let mut js_runtime = js_runtime::new().map_err(Error::CreateJsRuntime)?;

let mut functions = vec![];
for module in modules {
functions.push(tokio_runtime.block_on(load_function(&mut js_runtime, module))?);
}

Ok(Self {
tokio_runtime,
js_runtime,
functions,
})
}
Ok(result)
}

async fn load_function(
runtime: &mut JsRuntime,
module: String,
) -> Result<(NonZeroI32, Global<Function>), Error> {
let path = canonicalize(&module).map_err(|e| Error::CanonicalizePath(module.clone(), e))?;
let module_specifier = ModuleSpecifier::from_file_path(path).expect("we just canonicalized it");
info!("loading module {}", module_specifier);
let module_id = runtime
.load_side_module(&module_specifier, None)
.await
.map_err(|e| Error::LoadModule(module.clone(), e))?;
js_runtime::evaluate_module(runtime, module_id)
.await
.map_err(|e| Error::EvaluateModule(module.clone(), e))?;
let namespace = runtime
.get_module_namespace(module_id)
.map_err(|e| Error::GetModuleNamespace(module.clone(), e))?;
let scope = &mut runtime.handle_scope();
let namespace = v8::Local::new(scope, namespace);
let default_key = v8::String::new_external_onebyte_static(scope, b"default")
.unwrap()
.into();
let default_export = namespace
.get(scope, default_key)
.ok_or_else(|| Error::ModuleNoDefaultExport(module.clone()))?;
let function: Local<Function> = default_export
.try_into()
.map_err(|e| Error::ModuleDefaultExportNotFunction(module.clone(), e))?;
let id = function.get_identity_hash();
Ok((id, Global::new(scope, function)))
}

#[derive(Debug)]
Expand All @@ -201,56 +192,29 @@ async fn worker_loop(
mut work_receiver: mpsc::Receiver<Work>,
functions: HashMap<NonZeroI32, Global<Function>>,
) {
loop {
match poll_fn(|cx| {
poll_work_and_event_loop(&mut runtime, &mut work_receiver, &functions, cx)
})
.await
{
ControlFlow::Continue(Ok(())) => {}
ControlFlow::Continue(Err(e)) => {
error!("JavaScript runtime error: {}", e);
}
ControlFlow::Break(()) => {
break;
}
}
while let Some(work) = work_receiver.recv().await {
do_work(&mut runtime, work, &functions).await;
}
}

fn poll_work_and_event_loop(
async fn do_work(
runtime: &mut JsRuntime,
work_receiver: &mut mpsc::Receiver<Work>,
work: Work,
functions: &HashMap<NonZeroI32, Global<Function>>,
cx: &mut Context,
) -> Poll<ControlFlow<(), Result<(), AnyError>>> {
match work_receiver.try_recv() {
Ok(work) => {
do_work(runtime, work, functions);
}
Err(TryRecvError::Empty) => (),
Err(TryRecvError::Disconnected) => return Poll::Ready(ControlFlow::Break(())),
}

runtime
.poll_event_loop(cx, false)
.map(ControlFlow::Continue)
}

fn do_work(runtime: &mut JsRuntime, work: Work, functions: &HashMap<NonZeroI32, Global<Function>>) {
) {
match work {
Work::CallFunction {
id,
args,
return_sender,
} => {
// Ignore error if receiver is closed.
let _ = return_sender.send(call_function(runtime, id, args, functions));
let _ = return_sender.send(call_function(runtime, id, args, functions).await);
}
}
}

fn call_function(
async fn call_function(
runtime: &mut JsRuntime,
function: NonZeroI32,
args: Vec<Value>,
Expand All @@ -259,17 +223,26 @@ fn call_function(
let function = functions
.get(&function)
.context(format!("function {} not found", function))?;
let scope = &mut runtime.handle_scope();
let recv = undefined(scope);
let mut scope = runtime.handle_scope();
let recv = undefined(&mut scope);
let args = args
.into_iter()
.map(|arg| to_v8(scope, arg))
.map(|arg| to_v8(&mut scope, arg))
.collect::<Result<Vec<_>, _>>()?;
let result = Local::new(scope, function).call(scope, recv.into(), &args);
result
.map(|value| from_v8(scope, value))
.unwrap_or(Ok(Value::Null))
let Some(promise) = Local::new(&mut scope, function).call(&mut scope, recv.into(), &args)
else {
// Deno doesn't expose a way to get the exception.
bail!("uncaught javascript exception");
};
let promise = Global::new(&mut scope, promise);
drop(scope);
let result = runtime.resolve_value(promise).await?;
let scope = &mut runtime.handle_scope();
let result = Local::new(scope, result);
from_v8(scope, result)
}

mod conversion;
mod js_runtime;
#[cfg(test)]
mod tests;
3 changes: 3 additions & 0 deletions dozer-deno/src/runtime/square.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export default function (input) {
return input * input;
}
Loading

0 comments on commit b330585

Please sign in to comment.