Skip to content

Commit

Permalink
Add crate for LocalSet execution
Browse files Browse the repository at this point in the history
  • Loading branch information
aumetra committed Sep 27, 2023
1 parent d8da712 commit d90af81
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 56 deletions.
108 changes: 52 additions & 56 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ members = [
"lib/just-retry",
"lib/post-process",
"lib/speedy-uuid",
"lib/tokio-run-local",
]
resolver = "2"

Expand Down
9 changes: 9 additions & 0 deletions lib/tokio-run-local/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[package]
name = "tokio-run-local"
edition.workspace = true
version.workspace = true

[dependencies]
thiserror = "1.0.49"
tokio = { version = "1.32.0", features = ["rt", "sync"] }
tracing = "0.1.37"
70 changes: 70 additions & 0 deletions lib/tokio-run-local/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use std::{any::Any, future::Future, pin::Pin, sync::OnceLock, thread};
use thiserror::Error;
use tokio::{
sync::{mpsc, oneshot},
task::LocalSet,
};

type LocalBoxFuture<'a, O> = Pin<Box<dyn Future<Output = O> + 'a>>;
type FutureProducer = Box<dyn FnOnce() -> LocalBoxFuture<'static, Box<dyn Any + Send>> + Send>;
type Task = (FutureProducer, oneshot::Sender<Box<dyn Any + Send>>);

const CHANNEL_CAPACITY: usize = 500;
static GLOBAL_SINGLE_THREADED_RUNTIME: OnceLock<mpsc::Sender<Task>> = OnceLock::new();

#[derive(Debug, Error)]
pub enum Error {
#[error(transparent)]
Oneshot(#[from] oneshot::error::RecvError),

#[error("Spawn error")]
SpawnError,
}

pub async fn run<F, Fut>(func: F) -> Result<Box<Fut::Output>, Error>
where
F: FnOnce() -> Fut + Send + 'static,
Fut: Future,
<Fut as Future>::Output: Send + 'static,
{
let runtime_handle = GLOBAL_SINGLE_THREADED_RUNTIME.get_or_init(|| {
let (sender, mut receiver) = mpsc::channel::<Task>(CHANNEL_CAPACITY);

thread::spawn(move || {
let local_set = LocalSet::new();
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

local_set.block_on(&runtime, async move {
while let Some((producer, sender)) = receiver.recv().await {
tokio::task::spawn_local(async move {
if sender.send(producer().await).is_err() {
tracing::debug!("failed to send value from single threaded executor");
}
});
}
});
});

sender
});

let (sender, receiver) = oneshot::channel();
let closure = Box::new(|| {
Box::pin(async move {
let output = func().await;
Box::new(output) as Box<dyn Any + Send>
}) as _
});

runtime_handle
.send((closure, sender))
.await
.map_err(|_| Error::SpawnError)?;

let result = receiver.await?;

Ok(result.downcast().unwrap())
}

0 comments on commit d90af81

Please sign in to comment.