Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async resource with hasRef to threadsafe functions #950

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions crates/neon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,21 @@ pub use neon_macros::*;
#[cfg(feature = "napi-6")]
mod lifecycle;

#[cfg(feature = "napi-8")]
#[repr(u64)]
// Note: `upper` must be non-zero or `napi_check_object_type_tag` will always return false
// https://github.com/nodejs/node/blob/5fad0b93667ffc6e4def52996b9529ac99b26319/src/js_native_api_v8.cc#L2455
pub(crate) enum UpperTypeTag {
Module = 1,
Tsfn = 2,
}

#[cfg(feature = "napi-8")]
static MODULE_TAG: once_cell::sync::Lazy<crate::sys::TypeTag> = once_cell::sync::Lazy::new(|| {
let mut lower = [0; std::mem::size_of::<u64>()];

// Generating a random module tag at runtime allows Neon builds to be reproducible. A few
// alternativeswere considered:
// alternatives were considered:
// * Generating a random value at build time; this reduces runtime dependencies but, breaks
// reproducible builds
// * A static random value; this solves the previous issues, but does not protect against ABI
Expand All @@ -123,7 +132,8 @@ static MODULE_TAG: once_cell::sync::Lazy<crate::sys::TypeTag> = once_cell::sync:
// expansion of implementation.
let lower = u64::from_ne_bytes(lower);

// Note: `upper` must be non-zero or `napi_check_object_type_tag` will always return false
// https://github.com/nodejs/node/blob/5fad0b93667ffc6e4def52996b9529ac99b26319/src/js_native_api_v8.cc#L2455
crate::sys::TypeTag { lower, upper: 1 }
crate::sys::TypeTag {
lower,
upper: UpperTypeTag::Module as u64,
}
});
11 changes: 11 additions & 0 deletions crates/neon/src/sys/bindings/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,17 @@ mod napi1 {
message: *const c_char,
message_len: usize,
);

fn wrap(
env: Env,
js_object: Value,
native_object: *mut c_void,
finalize_cb: Finalize,
finalize_hint: *mut c_void,
result: *mut Ref,
) -> Status;

fn unwrap(env: Env, js_object: Value, result: *mut *mut c_void) -> Status;
}
);
}
Expand Down
179 changes: 166 additions & 13 deletions crates/neon/src/sys/tsfn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ const BOUNDARY: FailureBoundary = FailureBoundary {
panic: "A panic occurred while executing a `neon::event::Channel::send` callback",
};

#[cfg(feature = "napi-8")]
// Identifies state stored in the async resource of a threadsafe function
static TSFN_TAG: once_cell::sync::Lazy<crate::sys::TypeTag> = once_cell::sync::Lazy::new(|| {
let mut tag = *crate::MODULE_TAG;
tag.upper = crate::UpperTypeTag::Tsfn as u64;
tag
});

#[derive(Debug)]
struct Tsfn(napi::ThreadsafeFunction);

Expand All @@ -27,10 +35,16 @@ unsafe impl Sync for Tsfn {}
/// function for scheduling tasks to execute on a JavaScript thread.
pub struct ThreadsafeFunction<T> {
tsfn: Tsfn,
is_finalized: Arc<Mutex<bool>>,
state: Arc<Mutex<State>>,
callback: fn(Option<Env>, T),
}

#[derive(Debug)]
struct State {
is_finalized: bool,
has_ref: bool,
}

#[derive(Debug)]
struct Callback<T> {
callback: fn(Option<Env>, T),
Expand All @@ -40,6 +54,138 @@ struct Callback<T> {
/// Error returned when scheduling a threadsafe function with some data
pub struct CallError;

unsafe extern "C" fn has_ref_callback(env: Env, info: napi::CallbackInfo) -> napi::Value {
let create_bool = |result| {
let mut out = MaybeUninit::uninit();
assert_eq!(
napi::get_boolean(env, result, out.as_mut_ptr()),
napi::Status::Ok
);
out.assume_init()
};

// If we hit _any_ failure condition, assume the threadsafe function is referenced
let bail = || create_bool(true);

let this = {
let mut this = MaybeUninit::uninit();

if napi::get_cb_info(
env,
info,
ptr::null_mut(),
ptr::null_mut(),
this.as_mut_ptr(),
ptr::null_mut(),
) != napi::Status::Ok
{
return bail();
}

this.assume_init()
};

#[cfg(feature = "napi-8")]
{
let mut has_tag = false;
let status =
napi::check_object_type_tag(env, this, &*TSFN_TAG as *const _, &mut has_tag as *mut _);

if status != napi::Status::Ok || !has_tag {
return bail();
}
}

let mut state = MaybeUninit::uninit();

if napi::unwrap(env, this, state.as_mut_ptr()) != napi::Status::Ok {
return bail();
}

let state = &*state.assume_init().cast::<Mutex<State>>();
let is_ref = state.lock().map(|state| state.has_ref).unwrap_or(true);

create_bool(is_ref)
}

unsafe extern "C" fn drop_state(_env: Env, data: *mut c_void, _hint: *mut c_void) {
drop(Arc::<Mutex<State>>::from_raw(data.cast()))
}

unsafe fn create_async_resource(env: Env, state: Arc<Mutex<State>>) -> napi::Value {
let has_ref_fn_name = "hasRef";

let has_ref_fn = {
let mut has_ref_fn = MaybeUninit::uninit();

assert_eq!(
napi::create_function(
env,
has_ref_fn_name.as_ptr().cast(),
has_ref_fn_name.len(),
Some(has_ref_callback),
ptr::null_mut(),
has_ref_fn.as_mut_ptr(),
),
napi::Status::Ok,
);

has_ref_fn.assume_init()
};

let resource = {
let mut resource = MaybeUninit::uninit();

assert_eq!(
napi::create_object(env, resource.as_mut_ptr()),
napi::Status::Ok
);

resource.assume_init()
};

let has_ref_key = {
let mut key = MaybeUninit::uninit();

assert_eq!(
napi::create_string_utf8(
env,
has_ref_fn_name.as_ptr().cast(),
has_ref_fn_name.len(),
key.as_mut_ptr()
),
napi::Status::Ok
);

key.assume_init()
};

assert_eq!(
napi::set_property(env, resource, has_ref_key, has_ref_fn),
napi::Status::Ok
);

assert_eq!(
napi::wrap(
env,
resource,
Arc::into_raw(state) as *mut _,
Some(drop_state),
ptr::null_mut(),
ptr::null_mut(),
),
napi::Status::Ok
);

#[cfg(feature = "napi-8")]
assert_eq!(
napi::type_tag_object(env, resource, &*TSFN_TAG),
napi::Status::Ok
);

resource
}

impl<T: Send + 'static> ThreadsafeFunction<T> {
/// Creates a new unbounded N-API Threadsafe Function
/// Safety: `Env` must be valid for the current thread
Expand All @@ -55,19 +201,22 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
callback: fn(Option<Env>, T),
) -> Self {
let mut result = MaybeUninit::uninit();
let is_finalized = Arc::new(Mutex::new(false));
let state = Arc::new(Mutex::new(State {
is_finalized: false,
has_ref: true,
}));

assert_eq!(
napi::create_threadsafe_function(
env,
std::ptr::null_mut(),
std::ptr::null_mut(),
ptr::null_mut(),
create_async_resource(env, state.clone()),
super::string(env, "neon threadsafe function"),
max_queue_size,
// Always set the reference count to 1. Prefer using
// Rust `Arc` to maintain the struct.
1,
Arc::into_raw(is_finalized.clone()) as *mut _,
Arc::into_raw(state.clone()) as *mut _,
Some(Self::finalize),
std::ptr::null_mut(),
Some(Self::callback),
Expand All @@ -78,7 +227,7 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {

Self {
tsfn: Tsfn(result.assume_init()),
is_finalized,
state,
callback,
}
}
Expand All @@ -98,10 +247,10 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {

// Hold the lock before entering `call_threadsafe_function` so that
// `finalize_cb` would never complete.
let mut is_finalized = self.is_finalized.lock().unwrap();
let mut state = self.state.lock().unwrap();

let status = {
if *is_finalized {
if state.is_finalized {
napi::Status::Closing
} else {
unsafe {
Expand All @@ -115,7 +264,7 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
} else {
// Prevent further calls to `call_threadsafe_function`
if status == napi::Status::Closing {
*is_finalized = true;
state.is_finalized = true;
}

// If the call failed, the callback won't execute
Expand All @@ -128,27 +277,31 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
/// References a threadsafe function to prevent exiting the event loop until it has been dropped. (Default)
/// Safety: `Env` must be valid for the current thread
pub unsafe fn reference(&self, env: Env) {
let mut state = self.state.lock().unwrap();
assert_eq!(
napi::ref_threadsafe_function(env, self.tsfn.0),
napi::Status::Ok,
);
state.has_ref = true;
}

/// Unreferences a threadsafe function to allow exiting the event loop before it has been dropped.
/// Safety: `Env` must be valid for the current thread
pub unsafe fn unref(&self, env: Env) {
let mut state = self.state.lock().unwrap();
assert_eq!(
napi::unref_threadsafe_function(env, self.tsfn.0),
napi::Status::Ok,
);
state.has_ref = false;
}

// Provides a C ABI wrapper for a napi callback notifying us about tsfn
// being finalized.
unsafe extern "C" fn finalize(_env: Env, data: *mut c_void, _hint: *mut c_void) {
let is_finalized = Arc::from_raw(data as *mut Mutex<bool>);
let state = Arc::from_raw(data as *mut Mutex<State>);

*is_finalized.lock().unwrap() = true;
state.lock().unwrap().is_finalized = true;
}

// Provides a C ABI wrapper for invoking the user supplied function pointer
Expand Down Expand Up @@ -179,10 +332,10 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {

impl<T> Drop for ThreadsafeFunction<T> {
fn drop(&mut self) {
let is_finalized = self.is_finalized.lock().unwrap();
let state = self.state.lock().unwrap();

// tsfn was already finalized by `Environment::CleanupHandles()` in Node.js
if *is_finalized {
if state.is_finalized {
return;
}

Expand Down