Skip to content

Commit

Permalink
Implement more graceful tokio limbo send_off (fixes kotauskas#71)
Browse files Browse the repository at this point in the history
The send_off method now properly falls back to a static tokio runtime if required.
  • Loading branch information
florian-g2 committed Sep 7, 2024
1 parent db6c36d commit 660a370
Showing 1 changed file with 109 additions and 42 deletions.
151 changes: 109 additions & 42 deletions src/os/windows/limbo/tokio.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
//! Does not use the limbo pool.
use std::ops::Deref;
use crate::{
os::windows::{winprelude::*, FileHandle},
DebugExpectExt, LOCK_POISON,
};
use std::sync::{Mutex, OnceLock};
use tokio::{
fs::File,
net::windows::named_pipe::{NamedPipeClient, NamedPipeServer},
runtime::{self, Handle as RuntimeHandle, Runtime},
sync::mpsc::{unbounded_channel, UnboundedSender},
task,
};
use tokio::{fs::File, net::windows::named_pipe::{NamedPipeClient, NamedPipeServer}, runtime::{Handle as RuntimeHandle}, sync::mpsc::{unbounded_channel, UnboundedSender}, task};
use tokio::runtime::{Builder, Runtime};

pub(crate) enum Corpse {
NpServer(NamedPipeServer),
Expand All @@ -38,26 +34,7 @@ impl AsRawHandle for Corpse {
}

type Limbo = UnboundedSender<Corpse>;
static LIMBO: OnceLock<Mutex<Limbo>> = OnceLock::new();
static LIMBO_RT: OnceLock<Runtime> = OnceLock::new();

fn static_runtime_handle() -> &'static RuntimeHandle {
LIMBO_RT
.get_or_init(|| {
runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_io()
.thread_name("Tokio limbo dispatcher")
.thread_stack_size(1024 * 1024)
.build()
.expect(
"\
failed to build Tokio limbo helper (only necessary if the first pipe to be dropped happens to go \
out of scope outside of another Tokio runtime)",
)
})
.handle()
}
static LIMBO: Mutex<Option<Limbo>> = Mutex::new(None);

fn bury(c: Corpse) {
task::spawn_blocking(move || {
Expand All @@ -66,30 +43,120 @@ fn bury(c: Corpse) {
});
}

fn create_limbo() -> Limbo {
let (tx, mut rx) = unbounded_channel();

let mut _guard = None;
fn create_limbo() -> Option<Limbo> {
if RuntimeHandle::try_current().is_err() {
_guard = Some(static_runtime_handle().enter());
return None;
}

let (tx, mut rx) = unbounded_channel();
task::spawn(async move {
while let Some(c) = rx.recv().await {
bury(c);
}
});

tx

if tx.is_closed() {
// The tokio runtime may still have a handle, but we're right in the process of the runtime shutdown.
// When tokio is shutting down, it will drop tasks directly and synchronously at task::spawn methods.
// tx.is_closed() will evaluate to true in that case, because the channel receiver is dropped along with the task.
None
} else {
Some(tx)
}
}

pub(crate) fn send_off(c: Corpse) {
let mutex = LIMBO.get_or_init(|| Mutex::new(create_limbo()));
let mut limbo = mutex.lock().expect(LOCK_POISON);
if let Err(c) = limbo.send(c) {
*limbo = create_limbo();
limbo
.send(c.0)
.ok()
.debug_expect("fresh Tokio limbo helper died immediately after being created");
if let Some(limbo) = GUARANTEED_LIMBO.get() {
limbo.send(c).debug_expect("Guaranteed limbo must always be available");
return;
}

let mut limbo_guard = LIMBO.lock().expect(LOCK_POISON);
let limbo = match limbo_guard.as_ref() {
Some(limbo) => Some(limbo),
// if no limbo exists, create one
None => {
*limbo_guard = create_limbo();
limbo_guard.as_ref()
}
};

let Some(limbo) = limbo else {
// no user tokio runtime available for limbo, sending to guaranteed limbo
drop(limbo_guard);
send_off_to_guaranteed_limbo(c);
return;
};

// try to send the corpse to the limbo
let c = match limbo.send(c) {
Ok(_) => return,
Err(c) => c.0,
};

// we lost the limbo, but maybe it ran on a different tokio runtime which has died in the meantime
// try again using a fresh limbo on the current tokio runtime

*limbo_guard = create_limbo();
let Some(limbo) = limbo_guard.as_ref() else {
// no user tokio runtime available for limbo, sending to guaranteed limbo
drop(limbo_guard);
send_off_to_guaranteed_limbo(c);
return;
};

let c = match limbo.send(c) {
Ok(_) => return,
Err(c) => c.0,
};

// we lost the limbo again, now we have no other option than to send to the guaranteed limbo
*limbo_guard = None;
drop(limbo_guard);
send_off_to_guaranteed_limbo(c);
}


// the guaranteed limbo is running on its own tokio runtime.
// it is initialized as a last resort if no other tokio runtime is available.
struct GuaranteedLimbo {
runtime: Runtime,
limbo: Limbo
}

impl Deref for GuaranteedLimbo {
type Target = Limbo;
fn deref(&self) -> &Self::Target {
&self.limbo
}
}

static GUARANTEED_LIMBO: OnceLock<GuaranteedLimbo> = OnceLock::new();

fn send_off_to_guaranteed_limbo(c: Corpse) {
let limbo = GUARANTEED_LIMBO.get_or_init(|| {
let (tx, mut rx) = unbounded_channel();

let runtime = Builder::new_multi_thread()
.worker_threads(1)
.enable_io()
.thread_name("Tokio limbo dispatcher")
.thread_stack_size(1024 * 1024)
.build()
.expect(
"\
failed to build Tokio limbo helper (only necessary if the first pipe to be dropped happens to go \
out of scope outside of another Tokio runtime)",
);

runtime.spawn(async move {
while let Some(c) = rx.recv().await {
bury(c);
}
});

GuaranteedLimbo { runtime, limbo: tx }
});

limbo.send(c).debug_expect("Guaranteed limbo must always be available");
}

0 comments on commit 660a370

Please sign in to comment.