Skip to content

Commit

Permalink
Implement ability to move the spiderfire runtime from one tokio runti…
Browse files Browse the repository at this point in the history
…me to another
  • Loading branch information
Arshia001 committed Sep 20, 2024
1 parent 7ea8ee9 commit b803c1d
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 20 deletions.
37 changes: 32 additions & 5 deletions runtime/src/event_loop/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ use super::{EventLoop, EventLoopPollResult};

type FutureOutput = (Result<BoxedIntoValue, BoxedIntoValue>, TracedHeap<*mut JSObject>);

#[derive(Default)]
pub struct FutureQueue {
queue: FuturesUnordered<JoinHandle<FutureOutput>>,
queue: Option<FuturesUnordered<JoinHandle<FutureOutput>>>,
}

impl Default for FutureQueue {
fn default() -> Self {
Self { queue: Some(Default::default()) }
}
}

impl FutureQueue {
Expand All @@ -30,7 +35,8 @@ impl FutureQueue {
) -> Result<EventLoopPollResult, Option<ErrorReport>> {
let mut results = Vec::new();

while let Poll::Ready(Some(item)) = self.queue.poll_next_unpin(wcx) {
let queue = self.get_queue_mut();
while let Poll::Ready(Some(item)) = queue.poll_next_unpin(wcx) {
match item {
Ok(item) => results.push(item),
Err(error) => {
Expand Down Expand Up @@ -66,11 +72,32 @@ impl FutureQueue {
}

pub fn enqueue(&self, cx: &Context, handle: JoinHandle<FutureOutput>) {
self.queue.push(handle);
self.get_queue().push(handle);
EventLoop::from_context(cx).wake();
}

pub fn is_empty(&self) -> bool {
self.queue.is_empty()
self.get_queue().is_empty()
}

fn get_queue(&self) -> &FuturesUnordered<JoinHandle<FutureOutput>> {
self.queue.as_ref().expect("Future queue was dropped but not recreated")
}

fn get_queue_mut(&mut self) -> &mut FuturesUnordered<JoinHandle<FutureOutput>> {
self.queue.as_mut().expect("Future queue was dropped but not recreated")
}

pub fn recreate_queue(&mut self) {
if self.queue.is_none() {
self.queue = Some(Default::default());
}
}

pub fn drop_queue(&mut self) {
if self.queue.is_some() {
assert!(self.is_empty());
self.queue = None;
}
}
}
2 changes: 1 addition & 1 deletion runtime/src/event_loop/microtasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ unsafe extern "C" fn enqueue_promise_job(
_: Handle<*mut JSObject>, _: Handle<*mut JSObject>,
) -> bool {
let cx = unsafe { &Context::new_unchecked(cx) };
let event_loop = unsafe { &mut cx.get_private().event_loop };
let event_loop = cx.get_event_loop();
let microtasks = event_loop.microtasks.as_mut().unwrap();
if !job.is_null() {
microtasks.enqueue(
Expand Down
4 changes: 2 additions & 2 deletions runtime/src/event_loop/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub struct EventLoop {
impl EventLoop {
#[allow(clippy::mut_from_ref)]
pub(crate) fn from_context(cx: &Context) -> &mut Self {
unsafe { &mut cx.get_private().event_loop }
cx.get_event_loop()
}

pub(crate) fn wake(&mut self) {
Expand Down Expand Up @@ -161,7 +161,7 @@ pub(crate) unsafe extern "C" fn promise_rejection_tracker_callback(
cx: *mut JSContext, _: bool, promise: Handle<*mut JSObject>, state: PromiseRejectionHandlingState, _: *mut c_void,
) {
let cx = unsafe { &Context::new_unchecked(cx) };
let unhandled = &mut unsafe { cx.get_private() }.event_loop.unhandled_rejections;
let unhandled = &mut cx.get_event_loop().unhandled_rejections;
let promise = unsafe { Local::from_raw_handle(promise) };
match state {
PromiseRejectionHandlingState::Unhandled => unhandled.push_back(TracedHeap::from_local(&promise)),
Expand Down
2 changes: 1 addition & 1 deletion runtime/src/globals/abort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ impl AbortSignal {
});

let duration = Duration::milliseconds(time as i64);
let event_loop = unsafe { &mut cx.get_private().event_loop };
let event_loop = cx.get_event_loop();
if let Some(queue) = &mut event_loop.macrotasks {
queue.enqueue(
cx,
Expand Down
2 changes: 1 addition & 1 deletion runtime/src/globals/microtasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::event_loop::microtasks::Microtask;

#[js_fn]
fn queueMicrotask(cx: &Context, callback: Function) -> Result<()> {
let event_loop = unsafe { &mut cx.get_private().event_loop };
let event_loop = cx.get_event_loop();
if let Some(queue) = &mut event_loop.microtasks {
queue.enqueue(cx, Microtask::User(TracedHeap::new(callback.get())));
Ok(())
Expand Down
6 changes: 3 additions & 3 deletions runtime/src/globals/timers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const MINIMUM_DELAY_NESTED: i32 = 4;
fn set_timer(
cx: &Context, callback: Function, duration: Option<Clamp<i32>>, arguments: &[JSVal], repeat: bool,
) -> Result<u32> {
let event_loop = unsafe { &mut cx.get_private().event_loop };
let event_loop = cx.get_event_loop();
if let Some(queue) = &mut event_loop.macrotasks {
let minimum = if queue.nesting > 5 {
MINIMUM_DELAY_NESTED
Expand All @@ -38,7 +38,7 @@ fn set_timer(

fn clear_timer(cx: &Context, id: Option<Enforce<u32>>) -> Result<()> {
if let Some(id) = id {
let event_loop = unsafe { &mut cx.get_private().event_loop };
let event_loop = cx.get_event_loop();
if let Some(queue) = &mut event_loop.macrotasks {
queue.remove(id.0);
Ok(())
Expand Down Expand Up @@ -76,7 +76,7 @@ fn clearInterval(cx: &Context, Opt(id): Opt<Enforce<u32>>) -> Result<()> {

#[js_fn]
fn queueMacrotask(cx: &Context, callback: Function) -> Result<()> {
let event_loop = unsafe { &mut cx.get_private().event_loop };
let event_loop = cx.get_event_loop();
if let Some(queue) = &mut event_loop.macrotasks {
queue.enqueue(cx, Macrotask::User(UserMacrotask::new(callback)), None);
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion runtime/src/promise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ where
(result, heap)
});

let event_loop = unsafe { &cx.get_private().event_loop };
let event_loop = cx.get_event_loop();
event_loop.futures.as_ref().map(|futures| {
futures.enqueue(cx, handle);
promise
Expand Down
33 changes: 27 additions & 6 deletions runtime/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ use crate::module::StandardModules;

#[derive(Default)]
pub struct ContextPrivate {
pub(crate) event_loop: EventLoop,
event_loop: EventLoop,
pub app_data: Option<Box<dyn Any>>,
}

pub trait ContextExt {
#[allow(clippy::mut_from_ref)]
fn get_event_loop(&self) -> &mut EventLoop;

#[allow(clippy::mut_from_ref)]
unsafe fn get_private(&self) -> &mut ContextPrivate;

Expand All @@ -41,6 +44,10 @@ pub trait ContextExt {
}

impl ContextExt for Context {
fn get_event_loop(&self) -> &mut EventLoop {
&mut unsafe { self.get_private() }.event_loop
}

unsafe fn get_private(&self) -> &mut ContextPrivate {
unsafe { (*self.get_raw_private()).downcast_mut().unwrap() }
}
Expand All @@ -53,7 +60,6 @@ impl ContextExt for Context {
unsafe { self.get_private().app_data.as_deref_mut().unwrap() as *mut _ }
}

//
unsafe fn get_app_data<T: 'static>(&self) -> &mut T {
unsafe { (*self.get_raw_app_data()).downcast_mut().unwrap() }
}
Expand All @@ -80,20 +86,35 @@ impl<'cx> Runtime<'cx> {
}

pub async fn run_event_loop(&self) -> Result<(), Option<ErrorReport>> {
let event_loop = unsafe { &mut self.cx.get_private().event_loop };
let event_loop = self.cx.get_event_loop();
let cx = self.cx.duplicate();
event_loop.run_to_end(&cx).await
}

pub fn step_event_loop(&self, wcx: &mut std::task::Context) -> Result<(), Option<ErrorReport>> {
let event_loop = unsafe { &mut self.cx.get_private().event_loop };
let event_loop = self.cx.get_event_loop();
let cx = self.cx.duplicate();
event_loop.step(&cx, wcx)
}

pub fn event_loop_is_empty(&self) -> bool {
let event_loop = unsafe { &mut self.cx.get_private().event_loop };
event_loop.is_empty()
self.cx.get_event_loop().is_empty()
}

// This is useful when the event loop needs to be transferred from one tokio runtime to another,
// such as when pre-evaluating code in one runtime and then running the resulting module separately.
pub fn remove_from_tokio_runtime(&self) {
let event_loop = self.cx.get_event_loop();
drop(event_loop.waker.take());
if let Some(ref mut queue) = event_loop.futures {
queue.drop_queue();
}
}

pub fn install_in_tokio_runtime(&self) {
if let Some(ref mut queue) = self.cx.get_event_loop().futures {
queue.recreate_queue();
}
}
}

Expand Down

0 comments on commit b803c1d

Please sign in to comment.