diff --git a/Cargo.toml b/Cargo.toml index ff2faf3..76fb1da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,33 +1,33 @@ [package] -name = "compute-heavy-future-executor" +name = "vacation" version = "0.1.0" edition = "2021" license = "MIT" -repository = "https://github.com/jlizen/compute-heavy-future-executor" -homepage = "https://github.com/jlizen/compute-heavy-future-executor" +repository = "https://github.com/jlizen/vacation" +homepage = "https://github.com/jlizen/vacation" rust-version = "1.70" exclude = ["/.github", "/examples", "/scripts"] readme = "README.md" -description = "Additional executor patterns for handling compute-bounded, blocking futures." -categories = ["asynchronous"] +description = "Give your (runtime) workers a break!" +categories = ["asynchronous", "executor"] [features] -tokio = ["tokio/rt"] -tokio_block_in_place = ["tokio", "tokio/rt-multi-thread"] -secondary_tokio_runtime = ["tokio", "tokio/rt-multi-thread", "dep:libc", "dep:num_cpus"] +default = ["tokio"] +tokio = ["tokio/rt",] [dependencies] -libc = { version = "0.2.168", optional = true } log = "0.4.22" -num_cpus = { version = "1.0", optional = true } -tokio = { version = "1.0", features = ["macros", "sync"] } +num_cpus = "1.0" +tokio = { version = "1.0", features = ["sync"] } [dev-dependencies] -tokio = { version = "1.0", features = ["full"]} +tokio = { version = "1", features = ["full"]} futures-util = "0.3.31" +rayon = "1" [package.metadata.docs.rs] all-features = true +rustdoc-args = ["--cfg", "docsrs"] [lints.rust] diff --git a/README.md b/README.md index 484c00a..f32ead5 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,89 @@ -# compute-heavy-future-executor -Experimental crate that adds additional executor patterns to use with frequently blocking futures. +# vacation + Vacation: Give your (runtime) workers a break! + +## Overview + +Today, when library authors write async APIs, they don't have a good way to handle long-running sync segments. + +An application author can use selective handling such as `tokio::task::spawn_blocking()` along with concurrency control to delegate sync segments to blocking threads. Or, they might send the work to a `rayon` threadpool. + +But, library authors generally don't have this flexibility. As, they generally want to be agnostic across runtime. Or, even if they are `tokio`-specific, they generally don't want to call `tokio::task::spawn_blocking()` as it is +suboptimal without extra configuration (concurrency control) as well as highly opinionated to send the work across threads. + +This library solves this problem by providing libray authors a static, globally scoped strategy that they can delegate blocking sync work to without drawing any conclusions about handling. + +And then, the applications using the library can tune handling based on their preferred approach. + +## Usage - Library Authors +For library authors, it's as simple as adding a dependency enabling `vacation` (perhaps behind a feature flag). + +```ignore +[dependencies] +vacation = { version = "0.1", default-features = false } +``` + +And then wrap any sync work by passing it as a closure to a global `execute()` call: + +``` +fn sync_work(input: String)-> u8 { + std::thread::sleep(std::time::Duration::from_secs(5)); + println!("{input}"); + 5 +} +pub async fn a_future_that_has_blocking_sync_work() -> u8 { + // relies on application-specified strategy for translating execute into a future that won't + // block the current worker thread + vacation::execute(move || { sync_work("foo".to_string()) }, vacation::ChanceOfBlocking::High).await.unwrap() +} + +``` + +## Usage - Application owners +Application authors will need to add this library as a a direct dependency in order to customize the execution strategy. +By default, the strategy is just a non-op. + +### Simple example + +```ignore +[dependencies] +// enables `tokio` feature by default => spawn_blocking strategy +vacation = { version = "0.1" } +``` + +And then call the `install_tokio_strategy()` helper that uses some sensible defaults: +``` +#[tokio::main] +async fn main() { + vacation::install_tokio_strategy().unwrap(); +} +``` + +### Rayon example +Or, you can add an alternate strategy, for instance a custom closure using Rayon. + +```ignore +[dependencies] +vacation = { version = "0.1", default-features = false } +// used for example with custom executor +rayon = "1" +``` + +``` +use std::sync::OnceLock; +use rayon::ThreadPool; + +static THREADPOOL: OnceLock = OnceLock::new(); + +fn initialize_strategy() { + THREADPOOL.set(rayon::ThreadPoolBuilder::default().build().unwrap()); + + let custom_closure = |f: vacation::CustomClosureInput| { + Box::new(async move { Ok(THREADPOOL.get().unwrap().spawn(f)) }) as vacation::CustomClosureOutput + }; + + vacation::init() + // probably no need for max concurrency as rayon already is defaulting to a thread per core + // and using a task queue + .custom_executor(custom_closure).install().unwrap(); +} +``` \ No newline at end of file diff --git a/src/block_in_place.rs b/src/block_in_place.rs deleted file mode 100644 index fd62710..0000000 --- a/src/block_in_place.rs +++ /dev/null @@ -1,45 +0,0 @@ -use crate::{ - concurrency_limit::ConcurrencyLimit, - error::{Error, InvalidConfig}, - ComputeHeavyFutureExecutor, -}; - -use tokio::runtime::{Handle, RuntimeFlavor}; - -pub(crate) struct BlockInPlaceExecutor { - concurrency_limit: ConcurrencyLimit, -} - -impl BlockInPlaceExecutor { - pub(crate) fn new(max_concurrency: Option) -> Result { - match Handle::current().runtime_flavor() { - RuntimeFlavor::MultiThread => Ok(()), - #[cfg(tokio_unstable)] - RuntimeFlavor::MultiThreadAlt => Ok(()), - flavor => Err(Error::InvalidConfig(InvalidConfig { - field: "current tokio runtime flavor", - received: format!("{flavor:#?}"), - expected: "MultiThread", - }))?, - }?; - - Ok(Self { - concurrency_limit: ConcurrencyLimit::new(max_concurrency), - }) - } -} - -impl ComputeHeavyFutureExecutor for BlockInPlaceExecutor { - async fn execute(&self, fut: F) -> Result - where - F: std::future::Future + Send + 'static, - O: Send + 'static, - { - let _permit = self.concurrency_limit.acquire_permit().await; - - Ok(tokio::task::block_in_place(move || { - tokio::runtime::Handle::current().block_on(async { fut.await }) - })) - // permit implicitly drops - } -} diff --git a/src/concurrency_limit.rs b/src/concurrency_limit.rs index 1b9281e..971acb2 100644 --- a/src/concurrency_limit.rs +++ b/src/concurrency_limit.rs @@ -23,19 +23,13 @@ impl ConcurrencyLimit { /// Internally turns errors into a no-op (`None`) and outputs log lines. pub(crate) async fn acquire_permit(&self) -> Option { match self.semaphore.clone() { - Some(semaphore) => { - match semaphore - .acquire_owned() - .await - .map_err(|err| Error::Semaphore(err)) - { - Ok(permit) => Some(permit), - Err(err) => { - log::error!("failed to acquire permit: {err}"); - None - } + Some(semaphore) => match semaphore.acquire_owned().await.map_err(Error::Semaphore) { + Ok(permit) => Some(permit), + Err(err) => { + log::error!("failed to acquire permit: {err}"); + None } - } + }, None => None, } } diff --git a/src/current_context.rs b/src/current_context.rs deleted file mode 100644 index 6badb65..0000000 --- a/src/current_context.rs +++ /dev/null @@ -1,27 +0,0 @@ -use crate::{concurrency_limit::ConcurrencyLimit, error::Error, ComputeHeavyFutureExecutor}; - -pub(crate) struct CurrentContextExecutor { - concurrency_limit: ConcurrencyLimit, -} - -impl CurrentContextExecutor { - pub(crate) fn new(max_concurrency: Option) -> Self { - Self { - concurrency_limit: ConcurrencyLimit::new(max_concurrency), - } - } -} - -impl ComputeHeavyFutureExecutor for CurrentContextExecutor { - async fn execute(&self, fut: F) -> Result - where - F: std::future::Future + Send + 'static, - O: Send + 'static, - { - let _permit = self.concurrency_limit.acquire_permit().await; - - Ok(fut.await) - - // implicit permit drop - } -} diff --git a/src/custom_executor.rs b/src/custom_executor.rs deleted file mode 100644 index 300c432..0000000 --- a/src/custom_executor.rs +++ /dev/null @@ -1,55 +0,0 @@ -use std::{future::Future, pin::Pin}; - -use crate::{ - concurrency_limit::ConcurrencyLimit, error::Error, make_future_cancellable, - ComputeHeavyFutureExecutor, -}; - -/// A closure that accepts an arbitrary future and polls it to completion -/// via its preferred strategy. -pub type CustomExecutorClosure = Box< - dyn Fn( - Pin + Send + 'static>>, - ) -> Box< - dyn Future>> - + Send - + 'static, - > + Send - + Sync, ->; - -pub(crate) struct CustomExecutor { - closure: CustomExecutorClosure, - concurrency_limit: ConcurrencyLimit, -} - -impl CustomExecutor { - pub(crate) fn new(closure: CustomExecutorClosure, max_concurrency: Option) -> Self { - Self { - closure, - concurrency_limit: ConcurrencyLimit::new(max_concurrency), - } - } -} - -impl ComputeHeavyFutureExecutor for CustomExecutor { - async fn execute(&self, fut: F) -> Result - where - F: Future + Send + 'static, - O: Send + 'static, - { - let _permit = self.concurrency_limit.acquire_permit().await; - - let (wrapped_future, rx) = make_future_cancellable(fut); - - // if our custom executor future resolves to an error, we know it will never send - // the response so we immediately return - if let Err(err) = Box::into_pin((self.closure)(Box::pin(wrapped_future))).await { - return Err(Error::BoxError(err)); - } - - rx.await.map_err(|err| Error::RecvError(err)) - - // permit implicitly drops - } -} diff --git a/src/error.rs b/src/error.rs index af43323..7ebf8ca 100644 --- a/src/error.rs +++ b/src/error.rs @@ -2,33 +2,30 @@ use core::fmt; use crate::ExecutorStrategy; +/// An error from the custom executor #[non_exhaustive] #[derive(Debug)] pub enum Error { + /// Executor has already had a global strategy configured. AlreadyInitialized(ExecutorStrategy), - InvalidConfig(InvalidConfig), + /// Issue listening on the custom executor response channel. RecvError(tokio::sync::oneshot::error::RecvError), + /// Error enforcing concurrency Semaphore(tokio::sync::AcquireError), + /// Dynamic error from the custom executor closure BoxError(Box), #[cfg(feature = "tokio")] + /// Background spawn blocking task panicked JoinError(tokio::task::JoinError), } -#[derive(Debug)] -pub struct InvalidConfig { - pub field: &'static str, - pub received: String, - pub expected: &'static str, -} - impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Error::AlreadyInitialized(strategy) => write!( f, - "global strategy is already initialzed with strategy: {strategy:#?}" + "global strategy is already initialized with strategy: {strategy:#?}" ), - Error::InvalidConfig(err) => write!(f, "invalid config: {err:#?}"), Error::BoxError(err) => write!(f, "custom executor error: {err}"), Error::RecvError(err) => write!(f, "error in custom executor response channel: {err}"), Error::Semaphore(err) => write!( @@ -43,3 +40,12 @@ impl fmt::Display for Error { } } } + +impl std::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Self::BoxError(err) => Some(err.source()?), + _ => None, + } + } +} diff --git a/src/executor/custom.rs b/src/executor/custom.rs new file mode 100644 index 0000000..722df11 --- /dev/null +++ b/src/executor/custom.rs @@ -0,0 +1,60 @@ +use std::future::Future; + +use crate::{concurrency_limit::ConcurrencyLimit, error::Error}; + +use super::Execute; + +/// The input for the custom closure +pub type CustomClosureInput = Box; +/// The output type for the custom closure +pub type CustomClosureOutput = + Box>> + Send + 'static>; + +/// A closure that accepts an arbitrary sync function and returns a future that executes it. +/// The Custom Executor will implicitly wrap the input function in a oneshot +/// channel to erase its input/output type. +pub(crate) type CustomClosure = + Box CustomClosureOutput + Send + Sync>; + +pub(crate) struct Custom { + closure: CustomClosure, + concurrency_limit: ConcurrencyLimit, +} + +impl Custom { + pub(crate) fn new(closure: CustomClosure, max_concurrency: Option) -> Self { + Self { + closure, + concurrency_limit: ConcurrencyLimit::new(max_concurrency), + } + } +} + +impl Execute for Custom { + // the compiler correctly is pointing out that the custom closure isn't guaranteed to call f. + // but, we leave that to the implementer to guarantee since we are limited by working with static signatures + #[allow(unused_variables)] + async fn execute(&self, f: F) -> Result + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let _permit = self.concurrency_limit.acquire_permit().await; + + let (tx, rx) = tokio::sync::oneshot::channel(); + + let wrapped_input_closure = Box::new(|| { + let res = f(); + if tx.send(res).is_err() { + log::trace!("custom sync executor foreground dropped before it could receive the result of the sync closure"); + } + }); + + Box::into_pin((self.closure)(wrapped_input_closure)) + .await + .map_err(Error::BoxError)?; + + rx.await.map_err(Error::RecvError) + // permit implicitly drops + } +} diff --git a/src/executor/execute_directly.rs b/src/executor/execute_directly.rs new file mode 100644 index 0000000..96ce5a0 --- /dev/null +++ b/src/executor/execute_directly.rs @@ -0,0 +1,28 @@ +use crate::{concurrency_limit::ConcurrencyLimit, error::Error}; + +use super::Execute; + +pub(crate) struct ExecuteDirectly { + concurrency_limit: ConcurrencyLimit, +} + +impl ExecuteDirectly { + pub(crate) fn new(max_concurrency: Option) -> Self { + Self { + concurrency_limit: ConcurrencyLimit::new(max_concurrency), + } + } +} + +impl Execute for ExecuteDirectly { + async fn execute(&self, f: F) -> Result + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let _permit = self.concurrency_limit.acquire_permit().await; + + Ok(f()) + // permit implicitly drops + } +} diff --git a/src/executor/mod.rs b/src/executor/mod.rs new file mode 100644 index 0000000..4d27350 --- /dev/null +++ b/src/executor/mod.rs @@ -0,0 +1,401 @@ +pub(crate) mod custom; +pub(crate) mod execute_directly; +#[cfg(feature = "tokio")] +pub(crate) mod spawn_blocking; + +use std::{future::Future, sync::OnceLock}; + +use custom::{Custom, CustomClosure}; +use execute_directly::ExecuteDirectly; + +use crate::{init, Error, ExecutorStrategy, GlobalStrategy}; + +pub(crate) trait Execute { + /// Accepts a sync function and processes it to completion. + async fn execute(&self, f: F) -> Result + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static; +} + +fn set_global_strategy(strategy: Executor) -> Result<(), Error> { + GLOBAL_EXECUTOR_STRATEGY + .set(strategy) + .map_err(|_| Error::AlreadyInitialized(GLOBAL_EXECUTOR_STRATEGY.get().unwrap().into()))?; + + log::info!( + "initialized vacation synchronous executor strategy - {:#?}", + global_strategy() + ); + + Ok(()) +} + +/// Get the currently initialized sync strategy, +/// or the default strategy for the current feature in case no strategy has been loaded. +/// +/// See [`ExecutorBuilder`] for details on strategies. +/// +/// # Examples +/// +/// ``` +/// use vacation::{ +/// global_strategy, +/// GlobalStrategy, +/// ExecutorStrategy +/// }; +/// +/// # fn run() { +/// +/// #[cfg(feature = "tokio")] +/// assert_eq!(global_strategy(), GlobalStrategy::Default(ExecutorStrategy::SpawnBlocking)); +/// +/// #[cfg(not(feature = "tokio"))] +/// assert_eq!(global_strategy(), GlobalStrategy::Default(ExecutorStrategy::ExecuteDirectly)); +/// +/// vacation::init() +/// .execute_directly() +/// .install() +/// .unwrap(); +/// +/// assert_eq!(global_strategy(), GlobalStrategy::Initialized(ExecutorStrategy::ExecuteDirectly)); +/// +/// # } +/// ``` +pub fn global_strategy() -> GlobalStrategy { + match GLOBAL_EXECUTOR_STRATEGY.get() { + Some(strategy) => GlobalStrategy::Initialized(strategy.into()), + None => GlobalStrategy::Default(get_default_strategy().into()), + } +} + +pub(crate) fn get_global_executor() -> &'static Executor { + GLOBAL_EXECUTOR_STRATEGY + .get() + .unwrap_or_else(|| get_default_strategy()) +} + +pub(crate) fn get_default_strategy() -> &'static Executor { + DEFAULT_GLOBAL_EXECUTOR_STRATEGY.get_or_init(|| { + log::warn!( + "Defaulting to ExecuteDirectly (non-op) strategy for vacation compute-heavy future executor" + ); + Executor::ExecuteDirectly(ExecuteDirectly::new(None)) + }) +} + +/// The stored strategy used to spawn compute-heavy futures. +static GLOBAL_EXECUTOR_STRATEGY: OnceLock = OnceLock::new(); + +/// The fallback strategy used in case no strategy is explicitly set +static DEFAULT_GLOBAL_EXECUTOR_STRATEGY: OnceLock = OnceLock::new(); + +#[non_exhaustive] +pub(crate) enum Executor { + /// A non-op strategy that runs the function in the current context + ExecuteDirectly(execute_directly::ExecuteDirectly), + /// User-provided closure + Custom(custom::Custom), + /// tokio task::spawn_blocking + #[cfg(feature = "tokio")] + SpawnBlocking(spawn_blocking::SpawnBlocking), +} + +impl Execute for Executor { + async fn execute(&self, f: F) -> Result + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + match self { + Executor::ExecuteDirectly(executor) => executor.execute(f).await, + Executor::Custom(executor) => executor.execute(f).await, + #[cfg(feature = "tokio")] + Executor::SpawnBlocking(executor) => executor.execute(f).await, + } + } +} + +impl From<&Executor> for ExecutorStrategy { + fn from(value: &Executor) -> Self { + match value { + Executor::ExecuteDirectly(_) => Self::ExecuteDirectly, + Executor::Custom(_) => Self::Custom, + #[cfg(feature = "tokio")] + Executor::SpawnBlocking(_) => Self::SpawnBlocking, + } + } +} + +/// Initialize a set of sensible defaults for a tokio runtime: +/// +/// - [`ExecutorBuilder::spawn_blocking`] strategy +/// - Max concurrency equal to the cpu core count. +/// +/// Stores the current tokio runtime to spawn tasks with. To use an alternate +/// runtime, use [`ExecutorBuilder::spawn_blocking_with_handle`]. +/// +/// Only available with the `tokio` feature. +/// +/// # Panic +/// Calling this from outside a tokio runtime will panic. +/// +/// # Errors +/// Returns an error if the global strategy is already initialized. +/// It can only be initialized once. +/// +/// # Examples +/// +/// ``` +/// # fn run() { +/// vacation::install_tokio_strategy().unwrap(); +/// # } +/// ``` +#[cfg(feature = "tokio")] +pub fn install_tokio_strategy() -> Result<(), Error> { + init() + .max_concurrency(num_cpus::get()) + .spawn_blocking() + .install() +} + +/// A builder to replace the default sync executor strategy +/// with a caller-provided strategy. +/// +/// # Examples +/// +/// ``` +/// # fn run() { +/// vacation::init() +/// .max_concurrency(10) +/// .execute_directly() +/// .install() +/// .unwrap(); +/// # } +/// ``` +#[must_use = "doesn't do anything unless used"] +#[derive(Default)] +pub struct ExecutorBuilder { + pub(crate) max_concurrency: Option, + pub(crate) strategy: Strategy, +} + +impl std::fmt::Debug for ExecutorBuilder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ExecutorBuilder") + .field("max_concurrency", &self.max_concurrency) + .field("strategy", &self.strategy) + .finish() + } +} + +#[derive(Debug)] +pub struct NeedsStrategy; +pub enum HasStrategy { + ExecuteDirectly, + #[cfg(feature = "tokio")] + SpawnBlocking(tokio::runtime::Handle), + Custom(CustomClosure), +} + +impl std::fmt::Debug for HasStrategy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::ExecuteDirectly => write!(f, "ExecuteDirectly"), + #[cfg(feature = "tokio")] + Self::SpawnBlocking(handle) => f.debug_tuple("SpawnBlocking").field(handle).finish(), + Self::Custom(_) => f.debug_tuple("Custom").finish(), + } + } +} + +impl ExecutorBuilder { + /// Set the max number of simultaneous futures processed by this executor. + /// + /// If this number is exceeded, the executor will wait to execute the + /// input closure until a permit can be acquired. + /// + /// A good value tends to be the number of cpu cores on your machine. + /// + /// ## Default + /// No maximum concurrency. + /// + /// # Examples + /// + /// ``` + /// # fn run() { + /// vacation::init() + /// .max_concurrency(10) + /// .execute_directly() + /// .install() + /// .unwrap(); + /// # } + #[must_use = "doesn't do anything unless used with a strategy"] + pub fn max_concurrency(self, max_task_concurrency: usize) -> Self { + Self { + max_concurrency: Some(max_task_concurrency), + ..self + } + } + /// Initializes a new (non-op) global strategy to wait in the current context. + /// + /// This is effectively a non-op wrapper that adds no special handling for the sync future + /// besides optional concurrency control. + /// + /// This is the default strategy if nothing is initialized, with no max concurrency. + /// + /// # Examples + /// + /// ``` + /// # async fn run() { + /// vacation::init().execute_directly().install().unwrap(); + /// # } + /// ``` + #[must_use = "doesn't do anything unless install()-ed"] + pub fn execute_directly(self) -> ExecutorBuilder { + ExecutorBuilder:: { + strategy: HasStrategy::ExecuteDirectly, + max_concurrency: self.max_concurrency, + } + } + + /// Initializes a new global strategy to execute input closures by blocking on them inside the + /// tokio blocking threadpool via Tokio's [`spawn_blocking`]. + /// + /// Stores the current tokio runtime to spawn tasks with. To use an alternate + /// runtime, use [`ExecutorBuilder::spawn_blocking_with_handle`]. + /// + /// Requires `tokio` feature. + /// + /// # Panic + /// Calling this from outside a tokio runtime will panic. + /// + /// # Examples + /// + /// ``` + /// # async fn run() { + /// // this will include no concurrency limit when explicitly initialized + /// // without a call to [`concurrency_limit()`] + /// vacation::init().spawn_blocking().install().unwrap(); + /// # } + /// ``` + /// [`spawn_blocking`]: tokio::task::spawn_blocking + /// + #[must_use = "doesn't do anything unless install()-ed"] + #[cfg(feature = "tokio")] + pub fn spawn_blocking(self) -> ExecutorBuilder { + ExecutorBuilder:: { + strategy: HasStrategy::SpawnBlocking(tokio::runtime::Handle::current()), + max_concurrency: self.max_concurrency, + } + } + + /// Initializes a new global strategy to execute input closures by blocking on them inside the + /// tokio blocking threadpool via Tokio's [`spawn_blocking`], on a specific runtime. + /// + /// Uses the provided tokio runtime handle to decide which runtime to `spawn_blocking` onto. + /// + /// Requires `tokio` feature. + /// + /// # Examples + /// + /// ``` + /// # async fn run() { + /// // this will include no concurrency limit when explicitly initialized + /// // without a call to [`concurrency_limit()`] + /// let handle = tokio::runtime::Handle::current(); + /// vacation::init().spawn_blocking_with_handle(handle).install().unwrap(); + /// # } + /// ``` + /// [`spawn_blocking`]: tokio::task::spawn_blocking + /// + #[must_use = "doesn't do anything unless install()-ed"] + #[cfg(feature = "tokio")] + pub fn spawn_blocking_with_handle( + self, + runtime_handle: tokio::runtime::Handle, + ) -> ExecutorBuilder { + ExecutorBuilder:: { + strategy: HasStrategy::SpawnBlocking(runtime_handle), + max_concurrency: self.max_concurrency, + } + } + + /// Accepts a closure that will accept an arbitrary closure and call it. The input + /// function will be implicitly wrapped in a oneshot channel to avoid input/output types. + /// + /// Intended for injecting arbitrary runtimes/strategies or customizing existing ones. + /// + /// For instance, you could delegate to a [`Rayon threadpool`] or use Tokio's [`block_in_place`]. + /// See `tests/custom_executor_strategy.rs` for a `Rayon` example. + /// + /// + /// # Examples + /// + /// ``` + /// # async fn run() { + /// // caution: this will panic if used outside of tokio multithreaded runtime + /// // this is a kind of dangerous strategy, read up on `block in place's` limitations + /// // before using this approach + /// let closure = |f: vacation::CustomClosureInput| { + /// Box::new(async move { Ok(tokio::task::block_in_place(move || f())) }) as vacation::CustomClosureOutput + /// }; + /// + /// vacation::init().custom_executor(closure).install().unwrap(); + /// # } + /// + /// ``` + /// + /// [`Rayon threadpool`]: https://docs.rs/rayon/latest/rayon/struct.ThreadPool.html + /// [`block_in_place`]: https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html + #[must_use = "doesn't do anything unless install()-ed"] + pub fn custom_executor(self, closure: Closure) -> ExecutorBuilder + where + Closure: Fn( + Box, + ) -> Box< + dyn Future>> + + Send + + 'static, + > + Send + + Sync + + 'static, + { + ExecutorBuilder:: { + strategy: HasStrategy::Custom(Box::new(closure)), + max_concurrency: self.max_concurrency, + } + } +} + +impl ExecutorBuilder { + /// Initializes the loaded configuration and stores it as a global strategy. + /// + /// # Error + /// Returns an error if the global strategy is already initialized. + /// It can only be initialized once. + /// + /// /// # Examples + /// + /// ``` + /// # async fn run() { + /// vacation::init().execute_directly().install().unwrap(); + /// # } + /// ``` + pub fn install(self) -> Result<(), Error> { + let executor = match self.strategy { + HasStrategy::ExecuteDirectly => { + Executor::ExecuteDirectly(ExecuteDirectly::new(self.max_concurrency)) + } + #[cfg(feature = "tokio")] + HasStrategy::SpawnBlocking(handle) => Executor::SpawnBlocking( + spawn_blocking::SpawnBlocking::new(handle, self.max_concurrency), + ), + HasStrategy::Custom(closure) => { + Executor::Custom(Custom::new(closure, self.max_concurrency)) + } + }; + + set_global_strategy(executor) + } +} diff --git a/src/executor/spawn_blocking.rs b/src/executor/spawn_blocking.rs new file mode 100644 index 0000000..ca42e46 --- /dev/null +++ b/src/executor/spawn_blocking.rs @@ -0,0 +1,37 @@ +use tokio::runtime::Handle; + +use crate::{concurrency_limit::ConcurrencyLimit, error::Error}; + +use super::Execute; + +pub(crate) struct SpawnBlocking { + concurrency_limit: ConcurrencyLimit, + handle: Handle, +} + +impl SpawnBlocking { + pub(crate) fn new(handle: Handle, max_concurrency: Option) -> Self { + let concurrency_limit = ConcurrencyLimit::new(max_concurrency); + + Self { + concurrency_limit, + handle, + } + } +} + +impl Execute for SpawnBlocking { + async fn execute(&self, f: F) -> Result + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let _permit = self.concurrency_limit.acquire_permit().await; + + self.handle + .spawn_blocking(f) + .await + .map_err(Error::JoinError) + // permit implicitly drops + } +} diff --git a/src/lib.rs b/src/lib.rs index db6a430..b72a2df 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,577 +1,139 @@ -#[cfg(feature = "tokio_block_in_place")] -mod block_in_place; +#![deny(missing_docs)] +#![deny(missing_debug_implementations)] +#![cfg_attr(docsrs, feature(doc_cfg))] +#![cfg_attr(test, deny(warnings))] +#![doc = include_str!("../README.md")] + mod concurrency_limit; -mod current_context; -mod custom_executor; -pub mod error; -#[cfg(feature = "secondary_tokio_runtime")] -mod secondary_tokio_runtime; -#[cfg(feature = "tokio")] -mod spawn_blocking; +mod error; +mod executor; -pub use custom_executor::CustomExecutorClosure; pub use error::Error; -#[cfg(feature = "secondary_tokio_runtime")] -pub use secondary_tokio_runtime::SecondaryTokioRuntimeStrategyBuilder; - -#[cfg(feature = "tokio_block_in_place")] -use block_in_place::BlockInPlaceExecutor; -use current_context::CurrentContextExecutor; -use custom_executor::CustomExecutor; -#[cfg(feature = "secondary_tokio_runtime")] -use secondary_tokio_runtime::SecondaryTokioRuntimeExecutor; -#[cfg(feature = "tokio")] -use spawn_blocking::SpawnBlockingExecutor; - -use std::{fmt::Debug, future::Future, sync::OnceLock}; - -use tokio::{select, sync::oneshot::Receiver}; - -// TODO: module docs, explain the point of this library, give some samples - -/// Initialize a builder to set the global compute heavy future -/// executor strategy. -#[must_use = "doesn't do anything unless used"] -pub fn global_strategy_builder() -> GlobalStrategyBuilder { - GlobalStrategyBuilder::default() -} - -/// Get the currently initialized strategy, or the default strategy for the -/// current feature and runtime type in case no strategy has been loaded. -pub fn global_strategy() -> CurrentStrategy { - match COMPUTE_HEAVY_FUTURE_EXECUTOR_STRATEGY.get() { - Some(strategy) => CurrentStrategy::Initialized(strategy.into()), - None => CurrentStrategy::Default(<&ExecutorStrategyImpl>::default().into()), - } -} - -#[must_use = "doesn't do anything unless used"] -#[derive(Default)] -pub struct GlobalStrategyBuilder { - max_concurrency: Option, -} +pub use executor::{ + custom::{CustomClosureInput, CustomClosureOutput}, + global_strategy, install_tokio_strategy, ExecutorBuilder, +}; -impl GlobalStrategyBuilder { - /// Set the max number of simultaneous futures processed by this executor. - /// - /// If this number is exceeded, the futures sent to - /// [`execute_compute_heavy_future()`] will sleep until a permit - /// can be acquired. - /// - /// ## Default - /// No maximum concurrency - /// - /// # Example - /// - /// ``` - /// use compute_heavy_future_executor::global_strategy_builder; - /// - /// # async fn run() { - /// global_strategy_builder() - /// .max_concurrency(10) - /// .initialize_current_context() - /// .unwrap(); - /// # } - pub fn max_concurrency(self, max_task_concurrency: usize) -> Self { - Self { - max_concurrency: Some(max_task_concurrency), - ..self - } - } - - /// Initializes a new global strategy to wait in the current context. - /// - /// This is effectively a non-op wrapper that adds no special handling for the future besides optional concurrency control. - /// This is the default if the `tokio` feature is disabled. - /// - /// # Cancellation - /// Yes, the future is dropped if the caller drops the returned future from - ///[`execute_compute_heavy_future()`]. - /// - /// Note that it will only be dropped across yield points in the case of long-blocking futures. - /// - /// ## Error - /// Returns an error if the global strategy is already initialized. - /// It can only be initialized once. - /// - /// # Example - /// - /// ``` - /// use compute_heavy_future_executor::global_strategy_builder; - /// use compute_heavy_future_executor::execute_compute_heavy_future; - /// - /// # async fn run() { - /// global_strategy_builder().initialize_current_context().unwrap(); - /// - /// let future = async { - /// std::thread::sleep(std::time::Duration::from_millis(50)); - /// 5 - /// }; - /// - /// let res = execute_compute_heavy_future(future).await.unwrap(); - /// assert_eq!(res, 5); - /// # } - /// ``` - pub fn initialize_current_context(self) -> Result<(), Error> { - let strategy = - ExecutorStrategyImpl::CurrentContext(CurrentContextExecutor::new(self.max_concurrency)); - set_strategy(strategy) - } +use executor::{get_global_executor, Execute, Executor, NeedsStrategy}; - /// Initializes a new global strategy to execute futures by blocking on them inside the - /// tokio blocking threadpool. This is the default strategy if none is explicitly initialized, - /// if the `tokio` feature is enabled. - /// - /// By default, tokio will spin up a blocking thread - /// per task, which may be more than your count of CPU cores, depending on runtime config. - /// - /// If you expect many concurrent cpu-heavy futures, consider limiting your blocking - /// tokio threadpool size. - /// Or, you can use a heavier weight strategy like [`initialize_secondary_tokio_runtime()`]. - /// - /// # Cancellation - /// Yes, the future is dropped if the caller drops the returned future - /// from [`execute_compute_heavy_future()`]. - /// - /// Note that it will only be dropped across yield points in the case of long-blocking futures. - /// - /// ## Error - /// Returns an error if the global strategy is already initialized. - /// It can only be initialized once. - /// - /// # Example - /// - /// ``` - /// use compute_heavy_future_executor::global_strategy_builder; - /// use compute_heavy_future_executor::execute_compute_heavy_future; - /// - /// # async fn run() { - /// global_strategy_builder().initialize_spawn_blocking().unwrap(); - /// - /// let future = async { - /// std::thread::sleep(std::time::Duration::from_millis(50)); - /// 5 - /// }; - /// - /// let res = execute_compute_heavy_future(future).await.unwrap(); - /// assert_eq!(res, 5); - /// # } - /// ``` - #[cfg(feature = "tokio")] - pub fn initialize_spawn_blocking(self) -> Result<(), Error> { - let strategy = - ExecutorStrategyImpl::SpawnBlocking(SpawnBlockingExecutor::new(self.max_concurrency)); - set_strategy(strategy) - } - - /// Initializes a new global strategy to execute futures by calling tokio::task::block_in_place - /// on the current tokio worker thread. This evicts other tasks on same worker thread to - /// avoid blocking them. - /// - /// This approach can starve your executor of worker threads if called with too many - /// concurrent cpu-heavy futures. - /// - /// If you expect many concurrent cpu-heavy futures, consider a - /// heavier weight strategy like [`initialize_secondary_tokio_runtime()`]. - /// - /// # Cancellation - /// No, this strategy does not allow futures to be cancelled. - /// - /// ## Error - /// Returns an error if called from a context besides a tokio multithreaded runtime. - /// - /// Returns an error if the global strategy is already initialized. - /// It can only be initialized once. - /// - /// # Example - /// - /// ``` - /// use compute_heavy_future_executor::global_strategy_builder; - /// use compute_heavy_future_executor::execute_compute_heavy_future; - /// - /// # async fn run() { - /// global_strategy_builder().initialize_block_in_place().unwrap(); - /// - /// let future = async { - /// std::thread::sleep(std::time::Duration::from_millis(50)); - /// 5 - /// }; - /// - /// let res = execute_compute_heavy_future(future).await.unwrap(); - /// assert_eq!(res, 5); - /// # } - /// ``` - #[cfg(feature = "tokio_block_in_place")] - pub fn initialize_block_in_place(self) -> Result<(), Error> { - let strategy = - ExecutorStrategyImpl::BlockInPlace(BlockInPlaceExecutor::new(self.max_concurrency)?); - set_strategy(strategy) - } - - /// Initializes a new global strategy that spins up a secondary background tokio runtime - /// that executes futures on lower priority worker threads. - /// - /// This uses certain defaults, listed below. To modify these defaults, - /// instead use [`secondary_tokio_runtime_builder()`] - /// - /// # Defaults - /// ## Thread niceness - /// The thread niceness for the secondary runtime's worker threads, - /// which on linux is used to increase or lower relative - /// OS scheduling priority. - /// - /// Default: 10 - /// - /// ## Thread count - /// The count of worker threads in the secondary tokio runtime. - /// - /// Default: CPU core count - /// - /// ## Channel size - /// The buffer size of the channel used to spawn tasks - /// in the background executor. - /// - /// Default: 10 - /// - /// ## Max task concurrency - /// The max number of simultaneous background tasks running - /// - /// Default: no limit - /// - /// # Cancellation - /// Yes, the future is dropped if the caller drops the returned future - /// from [`execute_compute_heavy_future()`]. - /// - /// Note that it will only be dropped across yield points in the case of long-blocking futures. - /// - /// ## Error - /// Returns an error if the global strategy is already initialized. - /// It can only be initialized once. - /// - /// # Example - /// - /// ``` - /// use compute_heavy_future_executor::global_strategy_builder; - /// use compute_heavy_future_executor::execute_compute_heavy_future; - /// - /// # async fn run() { - /// global_strategy_builder().initialize_secondary_tokio_runtime().unwrap(); - /// - /// let future = async { - /// std::thread::sleep(std::time::Duration::from_millis(50)); - /// 5 - /// }; - /// - /// let res = execute_compute_heavy_future(future).await.unwrap(); - /// assert_eq!(res, 5); - /// # } - /// ``` - #[cfg(feature = "secondary_tokio_runtime")] - pub fn initialize_secondary_tokio_runtime(self) -> Result<(), Error> { - self.secondary_tokio_runtime_builder().initialize() - } - - /// Creates a [`SecondaryTokioRuntimeStrategyBuilder`] for a customized secondary tokio runtime strategy. - /// - /// Subsequent calls on the returned builder allow modifying defaults. - /// - /// The returned builder will require calling [`SecondaryTokioRuntimeStrategyBuilder::initialize()`] to - /// ultimately load the strategy. - /// - /// # Cancellation - /// Yes, the future is dropped if the caller drops the returned future - /// from [`execute_compute_heavy_future()`]. - /// - /// Note that it will only be dropped across yield points in the case of long-blocking futures. - /// - /// # Example - /// - /// ``` - /// use compute_heavy_future_executor::global_strategy_builder; - /// use compute_heavy_future_executor::execute_compute_heavy_future; - /// - /// # async fn run() { - /// global_strategy_builder() - /// .secondary_tokio_runtime_builder() - /// .niceness(1) - /// .thread_count(2) - /// .channel_size(3) - /// .max_concurrency(4) - /// .initialize() - /// .unwrap(); - /// - /// let future = async { - /// std::thread::sleep(std::time::Duration::from_millis(50)); - /// 5 - /// }; - /// - /// let res = execute_compute_heavy_future(future).await.unwrap(); - /// assert_eq!(res, 5); - /// # } - /// ``` - #[cfg(feature = "secondary_tokio_runtime")] - #[must_use = "doesn't do anything unless used"] - pub fn secondary_tokio_runtime_builder(self) -> SecondaryTokioRuntimeStrategyBuilder { - SecondaryTokioRuntimeStrategyBuilder::new(self.max_concurrency) - } - - /// Accepts a closure that will poll an arbitrary feature to completion. - /// - /// Intended for injecting arbitrary runtimes/strategies or customizing existing ones. - /// - /// # Cancellation - /// Yes, the closure's returned future is dropped if the caller drops the returned future from [`execute_compute_heavy_future()`]. - /// Note that it will only be dropped across yield points in the case of long-blocking futures. - /// - /// ## Error - /// Returns an error if the global strategy is already initialized. - /// It can only be initialized once. - /// - /// # Example - /// - /// ``` - /// use compute_heavy_future_executor::global_strategy_builder; - /// use compute_heavy_future_executor::execute_compute_heavy_future; - /// use compute_heavy_future_executor::CustomExecutorClosure; - /// - /// // this isn't actually a good strategy, to be clear - /// # async fn run() { - /// let closure: CustomExecutorClosure = Box::new(|fut| { - /// Box::new( - /// async move { - /// tokio::task::spawn(async move { fut.await }) - /// .await - /// .map_err(|err| err.into()) - /// } - /// ) - /// }); - /// - /// global_strategy_builder().initialize_custom_executor(closure).unwrap(); - /// - /// let future = async { - /// std::thread::sleep(std::time::Duration::from_millis(50)); - /// 5 - /// }; - /// - /// let res = execute_compute_heavy_future(future).await.unwrap(); - /// assert_eq!(res, 5); - /// # } - /// - /// ``` - pub fn initialize_custom_executor(self, closure: CustomExecutorClosure) -> Result<(), Error> { - let strategy = ExecutorStrategyImpl::CustomExecutor(CustomExecutor::new( - closure, - self.max_concurrency, - )); - set_strategy(strategy) - } -} - -pub(crate) fn set_strategy(strategy: ExecutorStrategyImpl) -> Result<(), Error> { - COMPUTE_HEAVY_FUTURE_EXECUTOR_STRATEGY - .set(strategy) - .map_err(|_| { - Error::AlreadyInitialized(COMPUTE_HEAVY_FUTURE_EXECUTOR_STRATEGY.get().unwrap().into()) - })?; - - log::info!( - "initialized compute-heavy future executor strategy - {:#?}", - global_strategy() - ); - - Ok(()) -} -trait ComputeHeavyFutureExecutor { - /// Accepts a future and returns its result - async fn execute(&self, fut: F) -> Result - where - F: Future + Send + 'static, - O: Send + 'static; -} +use std::fmt::Debug; +/// The currently loaded global strategy. #[derive(Debug, Clone, Copy, PartialEq)] -pub enum CurrentStrategy { +pub enum GlobalStrategy { + /// The strategy loaded by default. Default(ExecutorStrategy), + /// Caller-specified strategy, replacing default Initialized(ExecutorStrategy), } +/// The different types of executor strategies that can be loaded. +/// See [`ExecutorBuilder`] for more detail on each strategy. +/// +/// # Examples +/// +/// ``` +/// use vacation::{ +/// global_strategy, +/// GlobalStrategy, +/// ExecutorStrategy +/// }; +/// +/// # fn run() { +/// +/// #[cfg(feature = "tokio")] +/// assert_eq!(global_strategy(), GlobalStrategy::Default(ExecutorStrategy::SpawnBlocking)); +/// +/// #[cfg(not(feature = "tokio"))] +/// assert_eq!(global_strategy(), GlobalStrategy::Default(ExecutorStrategy::ExecuteDirectly)); +/// +/// vacation::init() +/// .execute_directly() +/// .install() +/// .unwrap(); +/// +/// assert_eq!(global_strategy(), GlobalStrategy::Initialized(ExecutorStrategy::ExecuteDirectly)); +/// +/// # } +/// ``` #[non_exhaustive] #[derive(Debug, Clone, Copy, PartialEq)] pub enum ExecutorStrategy { /// A non-op strategy that awaits in the current context - CurrentContext, + ExecuteDirectly, /// User-provided closure - CustomExecutor, + Custom, /// tokio task::spawn_blocking #[cfg(feature = "tokio")] SpawnBlocking, - /// tokio task::block_in_place - #[cfg(feature = "tokio_block_in_place")] - BlockInPlace, - #[cfg(feature = "secondary_tokio_runtime")] - /// Spin up a second, lower-priority tokio runtime - /// that communicates via channels - SecondaryTokioRuntime, -} - -impl From<&ExecutorStrategyImpl> for ExecutorStrategy { - fn from(value: &ExecutorStrategyImpl) -> Self { - match value { - ExecutorStrategyImpl::CurrentContext(_) => Self::CurrentContext, - ExecutorStrategyImpl::CustomExecutor(_) => Self::CustomExecutor, - #[cfg(feature = "tokio")] - ExecutorStrategyImpl::SpawnBlocking(_) => Self::SpawnBlocking, - #[cfg(feature = "tokio_block_in_place")] - ExecutorStrategyImpl::BlockInPlace(_) => Self::BlockInPlace, - #[cfg(feature = "secondary_tokio_runtime")] - ExecutorStrategyImpl::SecondaryTokioRuntime(_) => Self::SecondaryTokioRuntime, - } - } -} - -/// The stored strategy used to spawn compute-heavy futures. -static COMPUTE_HEAVY_FUTURE_EXECUTOR_STRATEGY: OnceLock = OnceLock::new(); - -#[non_exhaustive] -enum ExecutorStrategyImpl { - /// A non-op strategy that awaits in the current context - CurrentContext(CurrentContextExecutor), - /// User-provided closure - CustomExecutor(CustomExecutor), - /// tokio task::spawn_blocking - #[cfg(feature = "tokio")] - SpawnBlocking(SpawnBlockingExecutor), - /// tokio task::block_in_place - #[cfg(feature = "tokio_block_in_place")] - BlockInPlace(BlockInPlaceExecutor), - #[cfg(feature = "secondary_tokio_runtime")] - /// Spin up a second, lower-priority tokio runtime - /// that communicates via channels - SecondaryTokioRuntime(SecondaryTokioRuntimeExecutor), } -impl ComputeHeavyFutureExecutor for ExecutorStrategyImpl { - async fn execute(&self, fut: F) -> Result - where - F: Future + Send + 'static, - O: Send + 'static, - { - match self { - ExecutorStrategyImpl::CurrentContext(executor) => executor.execute(fut).await, - ExecutorStrategyImpl::CustomExecutor(executor) => executor.execute(fut).await, - #[cfg(feature = "tokio")] - ExecutorStrategyImpl::SpawnBlocking(executor) => executor.execute(fut).await, - #[cfg(feature = "tokio_block_in_place")] - ExecutorStrategyImpl::BlockInPlace(executor) => executor.execute(fut).await, - #[cfg(feature = "secondary_tokio_runtime")] - ExecutorStrategyImpl::SecondaryTokioRuntime(executor) => executor.execute(fut).await, - } +/// Initialize a builder to set the global sync function +/// executor strategy. +/// +/// See [`ExecutorBuilder`] for details on strategies. +/// +/// # Examples +/// +/// ``` +/// # fn run() { +/// vacation::init().max_concurrency(3).spawn_blocking().install().unwrap(); +/// # } +/// ``` +#[must_use = "doesn't do anything unless used"] +pub fn init() -> ExecutorBuilder { + ExecutorBuilder { + strategy: NeedsStrategy, + max_concurrency: None, } } -/// The fallback strategy used in case no strategy is explicitly set -static DEFAULT_COMPUTE_HEAVY_FUTURE_EXECUTOR_STRATEGY: OnceLock = - OnceLock::new(); - -impl Default for &ExecutorStrategyImpl { - fn default() -> Self { - &DEFAULT_COMPUTE_HEAVY_FUTURE_EXECUTOR_STRATEGY.get_or_init(|| { - #[cfg(feature = "tokio")] - { - log::info!("Defaulting to SpawnBlocking strategy for compute-heavy future executor \ - until a strategy is initialized"); - - ExecutorStrategyImpl::SpawnBlocking(SpawnBlockingExecutor::new(None)) - } - - #[cfg(not(feature = "tokio"))] - { - log::warn!("Defaulting to CurrentContext (non-op) strategy for compute-heavy future executor \ - until a strategy is initialized."); - ExecutorStrategyImpl::CurrentContext(CurrentContextExecutor::new(None)) - } - }) - } +/// Likelihood of the provided closure blocking for a significant period of time. +/// Will eventually be used to customize strategies with more granularity. +#[derive(Debug, Clone, Copy)] +pub enum ChanceOfBlocking { + /// Very likely to block, use primary sync strategy + High, } -/// Spawn a future to the configured compute-heavy executor and wait on its output. +/// Send a sync closure to the configured or default global compute-heavy executor, and wait on its output. /// /// # Strategy selection /// -/// If no strategy is configured, this library will fall back to the following defaults: -/// - no `tokio`` feature - current context -/// - all other cases - spawn blocking -/// -/// You can override these defaults by initializing a strategy via [`global_strategy_builder()`] -/// and [`GlobalStrategyBuilder`]. +/// If no strategy is configured, this library will fall back to a non-op `ExecuteDirectly` strategy. /// -/// # Cancellation +/// You can override these defaults by initializing a strategy via [`init()`] +/// and [`ExecutorBuilder`]. /// -/// Most strategies will cancel the input future, if the caller drops the returned future, -/// with the following exception: -/// - the block in place strategy never cancels the future (until the executor is shut down) -/// -/// # Example +/// # Examples /// /// ``` /// # async fn run() { -/// use compute_heavy_future_executor::execute_compute_heavy_future; -/// -/// let future = async { -/// std::thread::sleep(std::time::Duration::from_millis(50)); +/// let closure = || { +/// std::thread::sleep(std::time::Duration::from_secs(1)); /// 5 -/// }; +/// }; /// -/// let res = execute_compute_heavy_future(future).await.unwrap(); +/// let res = vacation::execute(closure, vacation::ChanceOfBlocking::High).await.unwrap(); /// assert_eq!(res, 5); /// # } /// /// ``` /// -pub async fn execute_compute_heavy_future(fut: F) -> Result +pub async fn execute(f: F, _chance_of_blocking: ChanceOfBlocking) -> Result where - F: Future + Send + 'static, + F: FnOnce() -> R + Send + 'static, R: Send + 'static, + Error: Send + Sync + 'static, { - let executor = COMPUTE_HEAVY_FUTURE_EXECUTOR_STRATEGY - .get() - .unwrap_or_else(|| <&ExecutorStrategyImpl>::default()); + let executor = get_global_executor(); + match executor { - ExecutorStrategyImpl::CurrentContext(executor) => executor.execute(fut).await, - ExecutorStrategyImpl::CustomExecutor(executor) => executor.execute(fut).await, - #[cfg(feature = "tokio_block_in_place")] - ExecutorStrategyImpl::BlockInPlace(executor) => executor.execute(fut).await, + Executor::ExecuteDirectly(executor) => executor.execute(f).await, + Executor::Custom(executor) => executor.execute(f).await, #[cfg(feature = "tokio")] - ExecutorStrategyImpl::SpawnBlocking(executor) => executor.execute(fut).await, - #[cfg(feature = "secondary_tokio_runtime")] - ExecutorStrategyImpl::SecondaryTokioRuntime(executor) => executor.execute(fut).await, + Executor::SpawnBlocking(executor) => executor.execute(f).await, } } -pub fn make_future_cancellable(fut: F) -> (impl Future, Receiver) -where - F: std::future::Future + Send + 'static, - O: Send + 'static, -{ - let (mut tx, rx) = tokio::sync::oneshot::channel(); - let wrapped_future = async { - select! { - // if tx is closed, we always want to poll that future first, - // so we don't need to add rng - biased; - - _ = tx.closed() => { - // receiver already dropped, don't need to do anything - // cancel the background future - }, - result = fut => { - // if this fails, the receiver already dropped, so we don't need to do anything - let _ = tx.send(result); - } - } - }; - - (wrapped_future, rx) -} - // tests are in /tests/ to allow separate initialization of oncelock across processes when using default cargo test runner diff --git a/src/secondary_tokio_runtime.rs b/src/secondary_tokio_runtime.rs deleted file mode 100644 index 318418c..0000000 --- a/src/secondary_tokio_runtime.rs +++ /dev/null @@ -1,221 +0,0 @@ -use std::{future::Future, pin::Pin}; - -use tokio::sync::mpsc::Sender; - -use crate::{ - concurrency_limit::ConcurrencyLimit, - error::{Error, InvalidConfig}, - make_future_cancellable, set_strategy, ComputeHeavyFutureExecutor, ExecutorStrategyImpl, -}; - -const DEFAULT_NICENESS: i8 = 10; -const DEFAULT_CHANNEL_SIZE: usize = 10; - -fn default_thread_count() -> usize { - num_cpus::get() -} - -/// Extention of [`GlobalStrategyBuilder`] for a customized secondary tokio runtime strategy. -/// -/// Requires calling [`SecondaryTokioRuntimeStrategyBuilder::initialize()`] to -/// initialize the strategy. -/// -/// # Example -/// -/// ``` -/// use compute_heavy_future_executor::global_strategy_builder; -/// use compute_heavy_future_executor::execute_compute_heavy_future; -/// -/// # async fn run() { -/// global_strategy_builder() -/// .secondary_tokio_runtime_builder() -/// .niceness(1) -/// .thread_count(2) -/// .channel_size(3) -/// .max_concurrency(4) -/// .initialize() -/// .unwrap(); -/// # } -/// ``` -#[must_use = "doesn't do anything unless used"] -#[derive(Default)] -pub struct SecondaryTokioRuntimeStrategyBuilder { - niceness: Option, - thread_count: Option, - channel_size: Option, - // passed down from the parent `GlobalStrategy` builder, not modified internally - max_concurrency: Option, -} - -impl SecondaryTokioRuntimeStrategyBuilder { - pub(crate) fn new(max_concurrency: Option) -> Self { - Self { - max_concurrency, - ..Default::default() - } - } -} - -impl SecondaryTokioRuntimeStrategyBuilder { - /// Set the thread niceness for the secondary runtime's worker threads, - /// which on linux is used to increase or lower relative - /// OS scheduling priority. - /// - /// Allowed values are -20..=19 - /// - /// ## Default - /// - /// The default value is 10. - pub fn niceness(self, niceness: i8) -> Self { - Self { - niceness: Some(niceness), - ..self - } - } - - /// Set the count of worker threads in the secondary tokio runtime. - /// - /// ## Default - /// - /// The default value is the number of cpu cores - pub fn thread_count(self, thread_count: usize) -> Self { - Self { - thread_count: Some(thread_count), - ..self - } - } - - /// Set the buffer size of the channel used to spawn tasks - /// in the background executor. - /// - /// ## Default - /// - /// The default value is 10 - pub fn channel_size(self, channel_size: usize) -> Self { - Self { - channel_size: Some(channel_size), - ..self - } - } - - /// Set the max number of simultaneous futures processed by this executor. - /// - /// Yes, the future is dropped if the caller drops the returned future from - ///[`execute_compute_heavy_future()`]. - /// - /// ## Default - /// No maximum concurrency - pub fn max_concurrency(self, max_task_concurrency: usize) -> Self { - Self { - max_concurrency: Some(max_task_concurrency), - ..self - } - } - - pub fn initialize(self) -> Result<(), Error> { - let niceness = self.niceness.unwrap_or(DEFAULT_NICENESS); - - // please https://github.com/rust-lang/rfcs/issues/671 - if !(-20..=19).contains(&niceness) { - return Err(Error::InvalidConfig(InvalidConfig { - field: "niceness", - received: niceness.to_string(), - expected: "-20..=19", - })); - } - - let thread_count = self.thread_count.unwrap_or_else(|| default_thread_count()); - let channel_size = self.channel_size.unwrap_or(DEFAULT_CHANNEL_SIZE); - - let executor = SecondaryTokioRuntimeExecutor::new( - niceness, - thread_count, - channel_size, - self.max_concurrency, - ); - - set_strategy(ExecutorStrategyImpl::SecondaryTokioRuntime(executor)) - } -} - -type BackgroundFuture = Pin + Send + 'static>>; - -pub(crate) struct SecondaryTokioRuntimeExecutor { - tx: Sender, - concurrency_limit: ConcurrencyLimit, -} - -impl SecondaryTokioRuntimeExecutor { - pub(crate) fn new( - niceness: i8, - thread_count: usize, - channel_size: usize, - max_concurrency: Option, - ) -> Self { - // channel is only for routing work to new task::spawn so should be very quick - let (tx, mut rx) = tokio::sync::mpsc::channel(channel_size); - - std::thread::Builder::new() - .name("compute-heavy-executor".to_string()) - .spawn(move || { - let rt = tokio::runtime::Builder::new_multi_thread() - .thread_name("compute-heavy-executor-pool-thread") - .worker_threads(thread_count) - .on_thread_start(move || unsafe { - // Reduce thread pool thread niceness, so they are lower priority - // than the foreground executor and don't interfere with I/O tasks - #[cfg(target_os = "linux")] - { - *libc::__errno_location() = 0; - if libc::nice(niceness.into()) == -1 && *libc::__errno_location() != 0 { - let error = std::io::Error::last_os_error(); - log::error!("failed to set threadpool niceness of secondary compute-heavy tokio executor: {}", error); - } - } - }) - .enable_all() - .build() - .unwrap_or_else(|e| panic!("cpu heavy runtime failed_to_initialize: {}", e)); - - rt.block_on(async { - log::debug!("starting to process work on secondary compute-heavy tokio executor"); - - while let Some(work) = rx.recv().await { - tokio::task::spawn(async move { - work.await - }); - } - }); - log::warn!("exiting secondary compute heavy tokio runtime because foreground channel closed"); - }) - .unwrap_or_else(|e| panic!("secondary compute-heavy runtime thread failed_to_initialize: {}", e)); - - Self { - tx, - concurrency_limit: ConcurrencyLimit::new(max_concurrency), - } - } -} - -impl ComputeHeavyFutureExecutor for SecondaryTokioRuntimeExecutor { - async fn execute(&self, fut: F) -> Result - where - F: std::future::Future + Send + 'static, - O: Send + 'static, - { - let _permit = self.concurrency_limit.acquire_permit().await; - - let (wrapped_future, rx) = make_future_cancellable(fut); - - match self.tx.send(Box::pin(wrapped_future)).await { - Ok(_) => (), - Err(err) => { - panic!("secondary compute-heavy runtime channel cannot be reached: {err}") - } - } - - rx.await.map_err(|err| Error::RecvError(err)) - - // permit implicitly drops - } -} diff --git a/src/spawn_blocking.rs b/src/spawn_blocking.rs deleted file mode 100644 index fee12a6..0000000 --- a/src/spawn_blocking.rs +++ /dev/null @@ -1,38 +0,0 @@ -use crate::{ - concurrency_limit::ConcurrencyLimit, error::Error, make_future_cancellable, - ComputeHeavyFutureExecutor, -}; - -pub(crate) struct SpawnBlockingExecutor { - concurrency_limit: ConcurrencyLimit, -} - -impl SpawnBlockingExecutor { - pub(crate) fn new(max_concurrency: Option) -> Self { - let concurrency_limit = ConcurrencyLimit::new(max_concurrency); - - Self { concurrency_limit } - } -} - -impl ComputeHeavyFutureExecutor for SpawnBlockingExecutor { - async fn execute(&self, fut: F) -> Result - where - F: std::future::Future + Send + 'static, - O: Send + 'static, - { - let _permit = self.concurrency_limit.acquire_permit().await; - - let (wrapped_future, rx) = make_future_cancellable(fut); - - if let Err(err) = tokio::task::spawn_blocking(move || { - tokio::runtime::Handle::current().block_on(wrapped_future) - }) - .await - { - return Err(Error::JoinError(err)); - } - - rx.await.map_err(|err| Error::RecvError(err)) - } -} diff --git a/tests/block_in_place_strategy.rs b/tests/block_in_place_strategy.rs deleted file mode 100644 index c425260..0000000 --- a/tests/block_in_place_strategy.rs +++ /dev/null @@ -1,58 +0,0 @@ -#[cfg(feature = "tokio_block_in_place")] -mod test { - use std::time::Duration; - - use compute_heavy_future_executor::{ - execute_compute_heavy_future, global_strategy, global_strategy_builder, CurrentStrategy, - ExecutorStrategy, - }; - use futures_util::future::join_all; - - fn initialize() { - // we are racing all tests against the single oncelock - let _ = global_strategy_builder() - .max_concurrency(3) - .initialize_block_in_place(); - } - - #[cfg(feature = "tokio_block_in_place")] - #[tokio::test(flavor = "multi_thread")] - async fn block_in_place_strategy() { - initialize(); - - let future = async { 5 }; - - let res = execute_compute_heavy_future(future).await.unwrap(); - assert_eq!(res, 5); - - assert_eq!( - global_strategy(), - CurrentStrategy::Initialized(ExecutorStrategy::BlockInPlace) - ); - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 10)] - async fn block_in_place_concurrency() { - initialize(); - - let start = std::time::Instant::now(); - - let mut futures = Vec::new(); - - for _ in 0..5 { - let future = async move { std::thread::sleep(Duration::from_millis(15)) }; - // we need to spawn here since otherwise block in place will cancel other futures inside the same task, - // ref https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html - let future = - tokio::task::spawn(async move { execute_compute_heavy_future(future).await }); - futures.push(future); - } - - join_all(futures).await; - - let elapsed_millis = start.elapsed().as_millis(); - assert!(elapsed_millis < 50, "futures did not run concurrently"); - - assert!(elapsed_millis > 20, "futures exceeded max concurrency"); - } -} diff --git a/tests/block_in_place_wrong_runtime.rs b/tests/block_in_place_wrong_runtime.rs deleted file mode 100644 index dec7b72..0000000 --- a/tests/block_in_place_wrong_runtime.rs +++ /dev/null @@ -1,9 +0,0 @@ -#[cfg(feature = "tokio_block_in_place")] -#[tokio::test] -async fn block_in_place_wrong_runtime() { - use compute_heavy_future_executor::{global_strategy_builder, Error}; - - let res = global_strategy_builder().initialize_block_in_place(); - - assert!(matches!(res, Err(Error::InvalidConfig(_)))); -} diff --git a/tests/current_context_default.rs b/tests/current_context_default.rs deleted file mode 100644 index 3985b67..0000000 --- a/tests/current_context_default.rs +++ /dev/null @@ -1,19 +0,0 @@ -#[cfg(not(feature = "tokio"))] -#[tokio::test] -async fn default_to_current_context_tokio_single_threaded() { - use compute_heavy_future_executor::{ - execute_compute_heavy_future, global_strategy, CurrentStrategy, ExecutorStrategy, - }; - - // this is a tokio test but we haven't enabled the tokio config flag - - let future = async { 5 }; - - let res = execute_compute_heavy_future(future).await.unwrap(); - assert_eq!(res, 5); - - assert_eq!( - global_strategy(), - CurrentStrategy::Default(ExecutorStrategy::CurrentContext) - ); -} diff --git a/tests/current_context_strategy.rs b/tests/current_context_strategy.rs deleted file mode 100644 index 2669d82..0000000 --- a/tests/current_context_strategy.rs +++ /dev/null @@ -1,78 +0,0 @@ -use std::time::Duration; - -use compute_heavy_future_executor::{ - execute_compute_heavy_future, global_strategy, global_strategy_builder, CurrentStrategy, - ExecutorStrategy, -}; -use futures_util::future::join_all; -use tokio::select; - -fn initialize() { - // we are racing all tests against the single oncelock - let _ = global_strategy_builder() - .max_concurrency(3) - .initialize_current_context(); -} - -#[tokio::test] -async fn current_context_strategy() { - initialize(); - - let future = async { 5 }; - - let res = execute_compute_heavy_future(future).await.unwrap(); - assert_eq!(res, 5); - - assert_eq!( - global_strategy(), - CurrentStrategy::Initialized(ExecutorStrategy::CurrentContext) - ); -} - -#[tokio::test] -async fn current_context_cancellable() { - initialize(); - - let (tx, mut rx) = tokio::sync::oneshot::channel::<()>(); - let future = async move { - { - tokio::time::sleep(Duration::from_secs(60)).await; - let _ = tx.send(()); - } - }; - - select! { - _ = tokio::time::sleep(Duration::from_millis(4)) => { }, - _ = execute_compute_heavy_future(future) => {} - } - - tokio::time::sleep(Duration::from_millis(8)).await; - - // future should have been cancelled when spawn compute heavy future was dropped - assert_eq!( - rx.try_recv(), - Err(tokio::sync::oneshot::error::TryRecvError::Closed) - ); -} - -#[tokio::test] -async fn current_context_concurrency() { - initialize(); - - let start = std::time::Instant::now(); - - let mut futures = Vec::new(); - - for _ in 0..5 { - // can't use std::thread::sleep because this is all in the same thread - let future = async move { tokio::time::sleep(Duration::from_millis(15)).await }; - futures.push(execute_compute_heavy_future(future)); - } - - join_all(futures).await; - - let elapsed_millis = start.elapsed().as_millis(); - assert!(elapsed_millis < 50, "futures did not run concurrently"); - - assert!(elapsed_millis > 20, "futures exceeded max concurrency"); -} diff --git a/tests/custom_executor_simple.rs b/tests/custom_executor_simple.rs deleted file mode 100644 index 5999adc..0000000 --- a/tests/custom_executor_simple.rs +++ /dev/null @@ -1,28 +0,0 @@ -use compute_heavy_future_executor::{ - execute_compute_heavy_future, global_strategy, global_strategy_builder, CurrentStrategy, - CustomExecutorClosure, ExecutorStrategy, -}; - -#[tokio::test] -async fn custom_executor_simple() { - let closure: CustomExecutorClosure = Box::new(|fut| { - Box::new(async move { - let res = fut.await; - Ok(res) - }) - }); - - global_strategy_builder() - .initialize_custom_executor(closure) - .unwrap(); - - let future = async { 5 }; - - let res = execute_compute_heavy_future(future).await.unwrap(); - assert_eq!(res, 5); - - assert_eq!( - global_strategy(), - CurrentStrategy::Initialized(ExecutorStrategy::CustomExecutor) - ); -} diff --git a/tests/custom_executor_strategy.rs b/tests/custom_executor_strategy.rs deleted file mode 100644 index 210f5bf..0000000 --- a/tests/custom_executor_strategy.rs +++ /dev/null @@ -1,80 +0,0 @@ -use std::time::Duration; - -use compute_heavy_future_executor::{ - execute_compute_heavy_future, global_strategy_builder, CustomExecutorClosure, -}; -use futures_util::future::join_all; -use tokio::select; - -fn initialize() { - let closure: CustomExecutorClosure = Box::new(|fut| { - Box::new(async move { - tokio::task::spawn(async move { fut.await }) - .await - .map_err(|err| err.into()) - }) - }); - - // we are racing all tests against the single oncelock - let _ = global_strategy_builder() - .max_concurrency(3) - .initialize_custom_executor(closure); -} - -#[tokio::test] -async fn custom_executor_strategy() { - initialize(); - - let future = async { 5 }; - - let res = execute_compute_heavy_future(future).await.unwrap(); - assert_eq!(res, 5); -} - -#[tokio::test] -async fn custom_executor_concurrency() { - initialize(); - - let start = std::time::Instant::now(); - - let mut futures = Vec::new(); - - for _ in 0..5 { - // can't use std::thread::sleep because this is all in the same thread - let future = async move { tokio::time::sleep(Duration::from_millis(15)).await }; - futures.push(execute_compute_heavy_future(future)); - } - - join_all(futures).await; - - let elapsed_millis = start.elapsed().as_millis(); - assert!(elapsed_millis < 50, "futures did not run concurrently"); - - assert!(elapsed_millis > 20, "futures exceeded max concurrency"); -} - -#[tokio::test] -async fn custom_executor_cancellable() { - initialize(); - - let (tx, mut rx) = tokio::sync::oneshot::channel::<()>(); - let future = async move { - { - tokio::time::sleep(Duration::from_secs(60)).await; - let _ = tx.send(()); - } - }; - - select! { - _ = tokio::time::sleep(Duration::from_millis(4)) => { }, - _ = execute_compute_heavy_future(future) => {} - } - - tokio::time::sleep(Duration::from_millis(8)).await; - - // future should have been cancelled when spawn compute heavy future was dropped - assert_eq!( - rx.try_recv(), - Err(tokio::sync::oneshot::error::TryRecvError::Closed) - ); -} diff --git a/tests/custom_simple.rs b/tests/custom_simple.rs new file mode 100644 index 0000000..7c38faa --- /dev/null +++ b/tests/custom_simple.rs @@ -0,0 +1,31 @@ +use std::time::Duration; + +use vacation::{ + execute, global_strategy, init, ChanceOfBlocking, CustomClosureInput, CustomClosureOutput, + ExecutorStrategy, GlobalStrategy, +}; + +#[tokio::test] +async fn custom_simple() { + let custom_closure = |f: CustomClosureInput| { + Box::new(async move { + f(); + Ok(()) + }) as CustomClosureOutput + }; + + init().custom_executor(custom_closure).install().unwrap(); + + let closure = || { + std::thread::sleep(Duration::from_millis(15)); + 5 + }; + + let res = execute(closure, ChanceOfBlocking::High).await.unwrap(); + assert_eq!(res, 5); + + assert_eq!( + global_strategy(), + GlobalStrategy::Initialized(ExecutorStrategy::Custom) + ); +} diff --git a/tests/custom_strategy.rs b/tests/custom_strategy.rs new file mode 100644 index 0000000..7f94bdc --- /dev/null +++ b/tests/custom_strategy.rs @@ -0,0 +1,63 @@ +use std::{sync::OnceLock, time::Duration}; + +use futures_util::future::join_all; +use rayon::ThreadPool; +use vacation::{execute, init, ChanceOfBlocking, CustomClosureInput, CustomClosureOutput}; + +static THREADPOOL: OnceLock = OnceLock::new(); + +fn initialize() { + THREADPOOL.get_or_init(|| rayon::ThreadPoolBuilder::default().build().unwrap()); + + let custom_closure = |f: CustomClosureInput| { + Box::new(async move { + THREADPOOL.get().unwrap().spawn(f); + Ok(()) + }) as CustomClosureOutput + }; + + let _ = init() + .max_concurrency(3) + .custom_executor(custom_closure) + .install(); +} + +#[tokio::test] +async fn custom_strategy() { + initialize(); + + let closure = || { + std::thread::sleep(Duration::from_millis(15)); + 5 + }; + + let res = execute(closure, ChanceOfBlocking::High).await.unwrap(); + assert_eq!(res, 5); +} + +#[tokio::test] +async fn custom_concurrency() { + initialize(); + + let start = std::time::Instant::now(); + + let mut futures = Vec::new(); + + let closure = || { + std::thread::sleep(Duration::from_millis(15)); + 5 + }; + tokio::time::sleep(Duration::from_millis(5)).await; + + // note that we also are racing against concurrency from other tests in this module + for _ in 0..6 { + futures.push(execute(closure, ChanceOfBlocking::High)); + } + + join_all(futures).await; + + let elapsed_millis = start.elapsed().as_millis(); + assert!(elapsed_millis < 50, "futures did not run concurrently"); + + assert!(elapsed_millis > 20, "futures exceeded max concurrency"); +} diff --git a/tests/execute_directly_default.rs b/tests/execute_directly_default.rs new file mode 100644 index 0000000..c2ab22a --- /dev/null +++ b/tests/execute_directly_default.rs @@ -0,0 +1,30 @@ +#[tokio::test] +async fn default_to_execute_directly() { + use std::time::Duration; + + use vacation::{execute, global_strategy, ChanceOfBlocking, ExecutorStrategy, GlobalStrategy}; + + // this is a tokio test but we haven't enabled the tokio config flag + + let closure = || { + std::thread::sleep(Duration::from_millis(5)); + 5 + }; + + let res = execute(closure, ChanceOfBlocking::High).await.unwrap(); + assert_eq!(res, 5); + + assert_eq!( + global_strategy(), + GlobalStrategy::Default(ExecutorStrategy::ExecuteDirectly) + ); + + // make sure we can continue to call it without failures due to repeat initialization + let res = execute(closure, ChanceOfBlocking::High).await.unwrap(); + assert_eq!(res, 5); + + assert_eq!( + global_strategy(), + GlobalStrategy::Default(ExecutorStrategy::ExecuteDirectly) + ); +} diff --git a/tests/execute_directly_strategy.rs b/tests/execute_directly_strategy.rs new file mode 100644 index 0000000..d599157 --- /dev/null +++ b/tests/execute_directly_strategy.rs @@ -0,0 +1,60 @@ +use std::time::Duration; + +use futures_util::future::join_all; +use vacation::{ + execute, global_strategy, init, ChanceOfBlocking, ExecutorStrategy, GlobalStrategy, +}; + +fn initialize() { + // we are racing all tests against the single oncelock + let _ = init().max_concurrency(3).execute_directly().install(); +} + +#[tokio::test] +async fn execute_directly_strategy() { + initialize(); + + let closure = || { + std::thread::sleep(Duration::from_millis(15)); + 5 + }; + + let res = execute(closure, ChanceOfBlocking::High).await.unwrap(); + assert_eq!(res, 5); + + assert_eq!( + global_strategy(), + GlobalStrategy::Initialized(ExecutorStrategy::ExecuteDirectly) + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn execute_directly_concurrency() { + initialize(); + + let start = std::time::Instant::now(); + + let mut futures = Vec::new(); + + let closure = || { + std::thread::sleep(Duration::from_millis(15)); + 5 + }; + + // note that we also are racing against concurrency from other tests in this module + for _ in 0..6 { + // we need to spawn tasks since otherwise we'll just block the current worker thread + let future = async move { + tokio::task::spawn(async move { execute(closure, ChanceOfBlocking::High).await }).await + }; + futures.push(future); + } + tokio::time::sleep(Duration::from_millis(5)).await; + + join_all(futures).await; + + let elapsed_millis = start.elapsed().as_millis(); + assert!(elapsed_millis < 50, "futures did not run concurrently"); + + assert!(elapsed_millis > 20, "futures exceeded max concurrency"); +} diff --git a/tests/multiple_initialize_err.rs b/tests/multiple_initialize_err.rs index 5ad3e2d..55a2bf7 100644 --- a/tests/multiple_initialize_err.rs +++ b/tests/multiple_initialize_err.rs @@ -1,13 +1,11 @@ -use compute_heavy_future_executor::{error::Error, global_strategy_builder}; +use vacation::{init, Error}; #[test] fn multiple_initialize_err() { - global_strategy_builder() - .initialize_current_context() - .unwrap(); + init().execute_directly().install().unwrap(); assert!(matches!( - global_strategy_builder().initialize_current_context(), + init().execute_directly().install(), Err(Error::AlreadyInitialized(_)) )); } diff --git a/tests/multiple_initialize_err_with_secondary_runtime_builder.rs b/tests/multiple_initialize_err_with_secondary_runtime_builder.rs deleted file mode 100644 index 5ec6f1b..0000000 --- a/tests/multiple_initialize_err_with_secondary_runtime_builder.rs +++ /dev/null @@ -1,16 +0,0 @@ -#[cfg(feature = "secondary_tokio_runtime")] -#[test] -fn multiple_initialize_err_with_secondary_runtime_builder() { - use compute_heavy_future_executor::{error::Error, global_strategy_builder}; - - let builder = global_strategy_builder().secondary_tokio_runtime_builder(); // not yet initialized - - global_strategy_builder() - .initialize_current_context() - .unwrap(); - - assert!(matches!( - builder.initialize(), - Err(Error::AlreadyInitialized(_)) - )); -} diff --git a/tests/secondary_tokio_builder_allowed_config.rs b/tests/secondary_tokio_builder_allowed_config.rs deleted file mode 100644 index c04b7da..0000000 --- a/tests/secondary_tokio_builder_allowed_config.rs +++ /dev/null @@ -1,19 +0,0 @@ -#[cfg(feature = "secondary_tokio_runtime")] -#[tokio::test] -async fn secondary_tokio_runtime_builder_allowed_config() { - use compute_heavy_future_executor::{execute_compute_heavy_future, global_strategy_builder}; - - global_strategy_builder() - .max_concurrency(5) - .secondary_tokio_runtime_builder() - .channel_size(10) - .niceness(5) - .thread_count(2) - .initialize() - .unwrap(); - - let future = async { 5 }; - - let res = execute_compute_heavy_future(future).await.unwrap(); - assert_eq!(res, 5); -} diff --git a/tests/secondary_tokio_builder_disallowed_config.rs b/tests/secondary_tokio_builder_disallowed_config.rs deleted file mode 100644 index 51b512a..0000000 --- a/tests/secondary_tokio_builder_disallowed_config.rs +++ /dev/null @@ -1,14 +0,0 @@ -#[cfg(feature = "secondary_tokio_runtime")] -#[tokio::test] -#[should_panic] -async fn secondary_tokio_runtime_builder_disallowed_config() { - use compute_heavy_future_executor::{error::Error, global_strategy_builder}; - - let res = global_strategy_builder() - .secondary_tokio_runtime_builder() - .channel_size(10) - .niceness(5) - .initialize(); - - assert!(matches!(res, Err(Error::InvalidConfig(_)))); -} diff --git a/tests/secondary_tokio_strategy.rs b/tests/secondary_tokio_strategy.rs deleted file mode 100644 index c0379f5..0000000 --- a/tests/secondary_tokio_strategy.rs +++ /dev/null @@ -1,81 +0,0 @@ -#[cfg(feature = "secondary_tokio_runtime")] -mod test { - use std::time::Duration; - - use futures_util::future::join_all; - use tokio::select; - - use compute_heavy_future_executor::{ - execute_compute_heavy_future, global_strategy, global_strategy_builder, CurrentStrategy, - ExecutorStrategy, - }; - - fn initialize() { - // we are racing all tests against the single oncelock - let _ = global_strategy_builder() - .max_concurrency(3) - .initialize_secondary_tokio_runtime(); - } - - #[tokio::test] - async fn secondary_tokio_runtime_strategy() { - initialize(); - - let future = async { 5 }; - - let res = execute_compute_heavy_future(future).await.unwrap(); - assert_eq!(res, 5); - - assert_eq!( - global_strategy(), - CurrentStrategy::Initialized(ExecutorStrategy::SecondaryTokioRuntime) - ); - } - - #[tokio::test] - async fn secondary_tokio_runtime_concurrency() { - initialize(); - - let start = std::time::Instant::now(); - - let mut futures = Vec::new(); - - for _ in 0..5 { - let future = async move { std::thread::sleep(Duration::from_millis(15)) }; - futures.push(execute_compute_heavy_future(future)); - } - - join_all(futures).await; - - let elapsed_millis = start.elapsed().as_millis(); - assert!(elapsed_millis < 50, "futures did not run concurrently"); - - assert!(elapsed_millis > 20, "futures exceeded max concurrency"); - } - - #[tokio::test] - async fn secondary_tokio_runtime_strategy_cancel_safe() { - initialize(); - - let (tx, mut rx) = tokio::sync::oneshot::channel::<()>(); - let future = async move { - { - tokio::time::sleep(Duration::from_secs(60)).await; - let _ = tx.send(()); - } - }; - - select! { - _ = tokio::time::sleep(Duration::from_millis(4)) => { }, - _ = execute_compute_heavy_future(future) => {} - } - - tokio::time::sleep(Duration::from_millis(8)).await; - - // future should have been cancelled when spawn compute heavy future was dropped - assert_eq!( - rx.try_recv(), - Err(tokio::sync::oneshot::error::TryRecvError::Closed) - ); - } -} diff --git a/tests/spawn_blocking_default.rs b/tests/spawn_blocking_default.rs deleted file mode 100644 index 3214f99..0000000 --- a/tests/spawn_blocking_default.rs +++ /dev/null @@ -1,17 +0,0 @@ -#[cfg(feature = "tokio")] -#[tokio::test] -async fn spawn_blocking_strategy() { - use compute_heavy_future_executor::{ - execute_compute_heavy_future, global_strategy, CurrentStrategy, ExecutorStrategy, - }; - - let future = async { 5 }; - - let res = execute_compute_heavy_future(future).await.unwrap(); - assert_eq!(res, 5); - - assert_eq!( - global_strategy(), - CurrentStrategy::Default(ExecutorStrategy::SpawnBlocking) - ); -} diff --git a/tests/spawn_blocking_strategy.rs b/tests/spawn_blocking_strategy.rs index b44d05f..466fe11 100644 --- a/tests/spawn_blocking_strategy.rs +++ b/tests/spawn_blocking_strategy.rs @@ -3,32 +3,31 @@ mod test { use std::time::Duration; use futures_util::future::join_all; - use tokio::select; - use compute_heavy_future_executor::{ - execute_compute_heavy_future, global_strategy, global_strategy_builder, CurrentStrategy, - ExecutorStrategy, + use vacation::{ + execute, global_strategy, init, ChanceOfBlocking, ExecutorStrategy, GlobalStrategy, }; fn initialize() { // we are racing all tests against the single oncelock - let _ = global_strategy_builder() - .max_concurrency(3) - .initialize_spawn_blocking(); + let _ = init().max_concurrency(3).spawn_blocking().install(); } #[tokio::test] async fn spawn_blocking_strategy() { initialize(); - let future = async { 5 }; + let closure = || { + std::thread::sleep(Duration::from_millis(15)); + 5 + }; - let res = execute_compute_heavy_future(future).await.unwrap(); + let res = execute(closure, ChanceOfBlocking::High).await.unwrap(); assert_eq!(res, 5); assert_eq!( global_strategy(), - CurrentStrategy::Initialized(ExecutorStrategy::SpawnBlocking) + GlobalStrategy::Initialized(ExecutorStrategy::SpawnBlocking) ); } @@ -39,42 +38,24 @@ mod test { let mut futures = Vec::new(); - for _ in 0..5 { - let future = async move { std::thread::sleep(Duration::from_millis(15)) }; - futures.push(execute_compute_heavy_future(future)); + let closure = || { + std::thread::sleep(Duration::from_millis(15)); + 5 + }; + + // note that we also are racing against concurrency from other tests in this module + for _ in 0..6 { + let future = async move { execute(closure, ChanceOfBlocking::High).await }; + futures.push(future); } + tokio::time::sleep(Duration::from_millis(5)).await; - join_all(futures).await; + let res = join_all(futures).await; + println!("{res:#?}"); let elapsed_millis = start.elapsed().as_millis(); assert!(elapsed_millis < 60, "futures did not run concurrently"); assert!(elapsed_millis > 20, "futures exceeded max concurrency"); } - - #[tokio::test] - async fn spawn_blocking_strategy_cancellable() { - initialize(); - - let (tx, mut rx) = tokio::sync::oneshot::channel::<()>(); - let future = async move { - { - tokio::time::sleep(Duration::from_secs(60)).await; - let _ = tx.send(()); - } - }; - - select! { - _ = tokio::time::sleep(Duration::from_millis(4)) => { }, - _ = execute_compute_heavy_future(future) => {} - } - - tokio::time::sleep(Duration::from_millis(8)).await; - - // future should have been cancelled when spawn compute heavy future was dropped - assert_eq!( - rx.try_recv(), - Err(tokio::sync::oneshot::error::TryRecvError::Closed) - ); - } }