Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(neon): Add tokio async runtime support #1055

Merged
merged 2 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[alias]
# Neon defines mutually exclusive feature flags which prevents using `cargo clippy --all-features`
# The following aliases simplify linting the entire workspace
neon-check = " check --all --all-targets --features napi-experimental,futures,external-buffers,serde"
neon-clippy = "clippy --all --all-targets --features napi-experimental,futures,external-buffers,serde -- -A clippy::missing_safety_doc"
neon-test = " test --all --features=doc-dependencies,doc-comment,napi-experimental,futures,external-buffers,serde"
neon-doc = " rustdoc -p neon --features=doc-dependencies,napi-experimental,futures,external-buffers,sys,serde -- --cfg docsrs"
neon-check = " check --all --all-targets --features napi-experimental,external-buffers,serde,tokio"
neon-clippy = "clippy --all --all-targets --features napi-experimental,external-buffers,serde,tokio -- -A clippy::missing_safety_doc"
neon-test = " test --all --features=doc-dependencies,doc-comment,napi-experimental,external-buffers,serde,tokio"
neon-doc = " rustdoc -p neon --features=doc-dependencies,napi-experimental,external-buffers,sys,serde,tokio -- --cfg docsrs"
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 33 additions & 4 deletions crates/neon-macros/src/export/function/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ pub(crate) struct Meta {

#[derive(Default)]
pub(super) enum Kind {
Async,
AsyncFn,
#[default]
Normal,
Task,
Expand All @@ -28,7 +30,8 @@ impl Meta {

fn force_context(&mut self, meta: syn::meta::ParseNestedMeta) -> syn::Result<()> {
match self.kind {
Kind::Normal => {}
Kind::Normal | Kind::AsyncFn => {}
Kind::Async => return Err(meta.error(super::ASYNC_CX_ERROR)),
Kind::Task => return Err(meta.error(super::TASK_CX_ERROR)),
}

Expand All @@ -37,6 +40,16 @@ impl Meta {
Ok(())
}

fn make_async(&mut self, meta: syn::meta::ParseNestedMeta) -> syn::Result<()> {
if matches!(self.kind, Kind::AsyncFn) {
return Err(meta.error(super::ASYNC_FN_ERROR));
}

self.kind = Kind::Async;

Ok(())
}

fn make_task(&mut self, meta: syn::meta::ParseNestedMeta) -> syn::Result<()> {
if self.context {
return Err(meta.error(super::TASK_CX_ERROR));
Expand All @@ -48,13 +61,25 @@ impl Meta {
}
}

pub(crate) struct Parser;
pub(crate) struct Parser(syn::ItemFn);

impl Parser {
pub(crate) fn new(item: syn::ItemFn) -> Self {
Self(item)
}
}

impl syn::parse::Parser for Parser {
type Output = Meta;
type Output = (syn::ItemFn, Meta);

fn parse2(self, tokens: proc_macro2::TokenStream) -> syn::Result<Self::Output> {
let Self(item) = self;
let mut attr = Meta::default();

if item.sig.asyncness.is_some() {
attr.kind = Kind::AsyncFn;
}

let parser = syn::meta::parser(|meta| {
if meta.path.is_ident("name") {
return attr.set_name(meta);
Expand All @@ -68,6 +93,10 @@ impl syn::parse::Parser for Parser {
return attr.force_context(meta);
}

if meta.path.is_ident("async") {
return attr.make_async(meta);
kjvalencik marked this conversation as resolved.
Show resolved Hide resolved
}

if meta.path.is_ident("task") {
return attr.make_task(meta);
}
Expand All @@ -77,6 +106,6 @@ impl syn::parse::Parser for Parser {

parser.parse2(tokens)?;

Ok(attr)
Ok((item, attr))
}
}
28 changes: 21 additions & 7 deletions crates/neon-macros/src/export/function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use crate::export::function::meta::Kind;

pub(crate) mod meta;

static ASYNC_CX_ERROR: &str = "`FunctionContext` is not allowed in async functions";
static ASYNC_FN_ERROR: &str = "`async` attribute should not be used with an `async fn`";
static TASK_CX_ERROR: &str = "`FunctionContext` is not allowed with `task` attribute";

pub(super) fn export(meta: meta::Meta, input: syn::ItemFn) -> proc_macro::TokenStream {
Expand Down Expand Up @@ -40,19 +42,19 @@ pub(super) fn export(meta: meta::Meta, input: syn::ItemFn) -> proc_macro::TokenS
.unwrap_or_else(|| quote::quote!(#name))
});

// Import the value or JSON trait for conversion
let result_trait_name = if meta.json {
quote::format_ident!("NeonExportReturnJson")
// Tag whether we should JSON wrap results
let return_tag = if meta.json {
quote::format_ident!("NeonJsonTag")
} else {
quote::format_ident!("NeonExportReturnValue")
quote::format_ident!("NeonValueTag")
};

// Convert the result
// N.B.: Braces are intentionally included to avoid leaking trait to function body
let result_extract = quote::quote!({
use neon::macro_internal::#result_trait_name;
use neon::macro_internal::{ToNeonMarker, #return_tag as NeonReturnTag};

res.try_neon_export_return(&mut cx)
(&res).to_neon_marker::<NeonReturnTag>().neon_into_js(&mut cx, res)
});

// Default export name as identity unless a name is provided
Expand All @@ -63,6 +65,17 @@ pub(super) fn export(meta: meta::Meta, input: syn::ItemFn) -> proc_macro::TokenS

// Generate the call to the original function
let call_body = match meta.kind {
Kind::Async | Kind::AsyncFn => quote::quote!(
let (#(#tuple_fields,)*) = cx.args()?;
let fut = #name(#context_arg #(#args),*);
let fut = {
use neon::macro_internal::{ToNeonMarker, NeonValueTag};

(&fut).to_neon_marker::<NeonValueTag>().into_neon_result(&mut cx, fut)?
};

neon::macro_internal::spawn(&mut cx, fut, |mut cx, res| #result_extract)
),
Kind::Normal => quote::quote!(
let (#(#tuple_fields,)*) = cx.args()?;
let res = #name(#context_arg #(#args),*);
Expand Down Expand Up @@ -160,7 +173,8 @@ fn has_context_arg(meta: &meta::Meta, sig: &syn::Signature) -> syn::Result<bool>

// Context is only allowed for normal functions
match meta.kind {
Kind::Normal => {}
Kind::Normal | Kind::Async => {}
Kind::AsyncFn => return Err(syn::Error::new(first.span(), ASYNC_CX_ERROR)),
Kind::Task => return Err(syn::Error::new(first.span(), TASK_CX_ERROR)),
}

Expand Down
3 changes: 2 additions & 1 deletion crates/neon-macros/src/export/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ pub(crate) fn export(
match item {
// Export a function
syn::Item::Fn(item) => {
let meta = syn::parse_macro_input!(attr with function::meta::Parser);
let parser = function::meta::Parser::new(item);
let (item, meta) = syn::parse_macro_input!(attr with parser);

function::export(meta, item)
}
Expand Down
8 changes: 7 additions & 1 deletion crates/neon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ exclude = ["neon.jpg", "doc/**/*"]
edition = "2021"

[dev-dependencies]
itertools = "0.10.5"
semver = "1.0.20"
psd = "0.3.4" # used for a doc example
anyhow = "1.0.75" # used for a doc example
Expand Down Expand Up @@ -56,12 +57,17 @@ external-buffers = []

# Experimental Rust Futures API
# https://github.com/neon-bindings/rfcs/pull/46
futures = ["tokio"]
futures = ["dep:tokio"]

# Enable low-level system APIs. The `sys` API allows augmenting the Neon API
# from external crates.
sys = []

# Enable async runtime
tokio = ["tokio-rt-multi-thread"] # Shorter alias
tokio-rt = ["futures", "tokio/rt"]
tokio-rt-multi-thread = ["tokio-rt", "tokio/rt-multi-thread"]

# Default N-API version. Prefer to select a minimum required version.
# DEPRECATED: This is an alias that should be removed
napi-runtime = ["napi-8"]
Expand Down
2 changes: 2 additions & 0 deletions crates/neon/src/context/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub trait ContextInternal<'cx>: Sized {
}

fn default_main(mut cx: ModuleContext) -> NeonResult<()> {
#[cfg(feature = "tokio-rt-multi-thread")]
crate::executor::tokio::init(&mut cx)?;
crate::registered().export(&mut cx)
}

Expand Down
59 changes: 59 additions & 0 deletions crates/neon/src/executor/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use std::{future::Future, pin::Pin};

use crate::{context::Cx, thread::LocalKey};

#[cfg(feature = "tokio-rt")]
pub(crate) mod tokio;

type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain the need for Pin here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need the Runtime trait to be object-safe so that I can put it in a Box<dyn Runtime>; this means that the spawn(..) function can't be generic. Since it can't be generic, it takes a boxed dyn future.

Boxed futures need be Pin because of Future::poll which takes self: Pin<&mut Self>. Without the Pin, it could be moved in memory and you wouldn't be able to poll it.

This is a very common type in async Rust. The futures crate even includes a type alias for it.


pub(crate) static RUNTIME: LocalKey<Box<dyn Runtime>> = LocalKey::new();

pub trait Runtime: Send + Sync + 'static {
fn spawn(&self, fut: BoxFuture);
}

/// Register a [`Future`] executor runtime globally to the addon.
///
/// Returns `Ok(())` if a global executor has not been set and `Err(runtime)` if it has.
///
/// If the `tokio` feature flag is enabled and the addon does not provide a
/// [`#[neon::main]`](crate::main) function, a multithreaded tokio runtime will be
/// automatically registered.
///
/// **Note**: Each instance of the addon will have its own runtime. It is recommended
/// to initialize the async runtime once in a process global and share it across instances.
///
/// ```
/// # #[cfg(feature = "tokio-rt-multi-thread")]
/// # fn example() {
/// # use neon::prelude::*;
/// use once_cell::sync::OnceCell;
/// use tokio::runtime::Runtime;
///
/// static RUNTIME: OnceCell<Runtime> = OnceCell::new();
///
/// #[neon::main]
/// fn main(mut cx: ModuleContext) -> NeonResult<()> {
/// let runtime = RUNTIME
/// .get_or_try_init(Runtime::new)
/// .or_else(|err| cx.throw_error(err.to_string()))?;
///
/// let _ = neon::set_global_executor(&mut cx, runtime);
///
/// Ok(())
/// }
/// # }
/// ```
pub fn set_global_executor<R>(cx: &mut Cx, runtime: R) -> Result<(), R>
where
R: Runtime,
{
if RUNTIME.get(cx).is_some() {
return Err(runtime);
}

RUNTIME.get_or_init(cx, || Box::new(runtime));

Ok(())
}
66 changes: 66 additions & 0 deletions crates/neon/src/executor/tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use std::sync::Arc;

use super::{BoxFuture, Runtime};

impl Runtime for tokio::runtime::Runtime {
fn spawn(&self, fut: BoxFuture) {
spawn(self.handle(), fut);
}
}

impl Runtime for Arc<tokio::runtime::Runtime> {
fn spawn(&self, fut: BoxFuture) {
spawn(self.handle(), fut);
}
}

impl Runtime for &'static tokio::runtime::Runtime {
fn spawn(&self, fut: BoxFuture) {
spawn(self.handle(), fut);
}
}

impl Runtime for tokio::runtime::Handle {
fn spawn(&self, fut: BoxFuture) {
spawn(self, fut);
}
}

impl Runtime for &'static tokio::runtime::Handle {
fn spawn(&self, fut: BoxFuture) {
spawn(self, fut);
}
}

fn spawn(handle: &tokio::runtime::Handle, fut: BoxFuture) {
#[allow(clippy::let_underscore_future)]
let _ = handle.spawn(fut);
}

#[cfg(feature = "tokio-rt-multi-thread")]
pub(crate) fn init(cx: &mut crate::context::ModuleContext) -> crate::result::NeonResult<()> {
use once_cell::sync::OnceCell;
use tokio::runtime::{Builder, Runtime};

use crate::context::Context;

static RUNTIME: OnceCell<Runtime> = OnceCell::new();

super::RUNTIME.get_or_try_init(cx, |cx| {
let runtime = RUNTIME
.get_or_try_init(|| {
#[cfg(feature = "tokio-rt-multi-thread")]
let mut builder = Builder::new_multi_thread();

#[cfg(not(feature = "tokio-rt-multi-thread"))]
let mut builder = Builder::new_current_thread();

builder.enable_all().build()
})
.or_else(|err| cx.throw_error(err.to_string()))?;

Ok(Box::new(runtime))
})?;

Ok(())
}
Loading