Skip to content

Commit

Permalink
move to sync fn input in executor, ditch everything but spawn_blockin…
Browse files Browse the repository at this point in the history
…g/custom/current_context, tweak defaults to enable concurrency limiting + use spawn_blocking for tokio feature, add more docs
  • Loading branch information
jlizen committed Dec 26, 2024
1 parent be60053 commit 9ac60bd
Show file tree
Hide file tree
Showing 26 changed files with 807 additions and 1,265 deletions.
18 changes: 9 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,28 @@ license = "MIT"
repository = "https://github.com/jlizen/compute-heavy-future-executor"
homepage = "https://github.com/jlizen/compute-heavy-future-executor"
rust-version = "1.70"
exclude = ["/.github", "/examples", "/scripts"]
exclude = ["/.github", "/Exampless", "/scripts"]
readme = "README.md"
description = "Additional executor patterns for handling compute-bounded, blocking futures."
description = "Executor patterns for handling compute-bounded calls inside async contexts."
categories = ["asynchronous"]

[features]
tokio = ["tokio/rt"]
tokio_block_in_place = ["tokio", "tokio/rt-multi-thread"]
secondary_tokio_runtime = ["tokio", "tokio/rt-multi-thread", "dep:libc", "dep:num_cpus"]
default = ["tokio"]
tokio = ["tokio/rt",]

[dependencies]
libc = { version = "0.2.168", optional = true }
log = "0.4.22"
num_cpus = { version = "1.0", optional = true }
tokio = { version = "1.0", features = ["macros", "sync"] }
num_cpus = "1.0"
tokio = { version = "1.0", features = ["sync"] }

[dev-dependencies]
tokio = { version = "1.0", features = ["full"]}
tokio = { version = "1", features = ["full"]}
futures-util = "0.3.31"
rayon = "1"

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]


[lints.rust]
Expand Down
86 changes: 86 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,88 @@
# compute-heavy-future-executor
Experimental crate that adds additional executor patterns to use with frequently blocking futures.

Today, when library authors are write async APIs, they don't have a good way to handle long-running sync segments.

An application author can use selective handling such as `tokio::task::spawn_blocking()` along with concurrency control to delegate sync segments to blocking threads. Or, they might send the work to a `rayon` threadpool.

But, library authors generally don't have this flexibility. As, they generally want to be agnostic across runtime. Or, even if they are `tokio`-specific, they generally don't want to call `tokio::task::spawn_blocking()` as it is
suboptimal without extra configuration (concurrency control) as well as highly opinionated to send the work across threads.

This library aims to solve this problem by providing libray authors a static, globally scoped strategy that they can delegate blocking sync work to without drawing any conclusions about handling.

And then, the applications using the library can either rely on the default strategy that this package provides, or tune them with their preferred approach.

## Usage - Library Authors
For library authors, it's as simple as adding a dependency enabling `compute-heavy-future-executor` (perhaps behind a feature flag).

The below will default to 'current context' execution (ie non-op) unless the caller enables the tokio feature.
```
[dependencies]
compute-heavy-future-executor = { version = "0.1", default-features = false }
```

Meanwhile to be slightly more opinionated, the below will enable usage of `spawn_blocking` with concurrency control
by default unless the caller opts out:
```
[dependencies]
compute-heavy-future-executor = { version = "0.1" }
```

And then wrap any sync work by passing it as a closure to a global `execute_sync()` call:

```
use compute_heavy_future_executor::execute_sync;
fn sync_work(num: u8)-> u8 {
std::thread::sleep(std::time::Duration::from_secs(5));
num
}
pub async fn a_future_that_has_blocking_sync_work() -> u8 {
// relies on caller-specified strategy for translating execute_sync into a future that won't
// block the current worker thread
execute_sync(|| { sync_work(5) }).await.unwrap()
}
```

## Usage - Application owners
Application authors can benefit from this crate with no application code changes, if you are using
a library that is itself using this crate.

If you want to customize the strategy beyond defaults, they can add
`compute-heavy-future-executor` to their dependencies:

```
[dependencies]
// enables tokio and therefore spawn_blocking strategy by default
compute-heavy-future-executor = { version = "0.1" }
// used for example with custom executor
rayon = "1"
```

And then configure your global strategy as desired. For instance, see below for usage of rayon
instead of `spawn_blocking()`.

```
use std::sync::OnceLock;
use rayon::ThreadPool;
use compute_heavy_future_executor::{
global_sync_strategy_builder, CustomExecutorSyncClosure,
};
static THREADPOOL: OnceLock<ThreadPool> = OnceLock::new();
fn initialize_strategy() {
THREADPOOL.set(|| rayon::ThreadPoolBuilder::default().build().unwrap());
let custom_closure: CustomExecutorSyncClosure =
Box::new(|f| Box::new(async move { Ok(THREADPOOL.get().unwrap().spawn(f)) }));
global_sync_strategy_builder()
// probably no need for max concurrency as rayon already is defaulting to a thread per core
// and using a task queue
.initialize_custom_executor(custom_closure).unwrap();
}
```
45 changes: 0 additions & 45 deletions src/block_in_place.rs

This file was deleted.

18 changes: 6 additions & 12 deletions src/concurrency_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,13 @@ impl ConcurrencyLimit {
/// Internally turns errors into a no-op (`None`) and outputs log lines.
pub(crate) async fn acquire_permit(&self) -> Option<OwnedSemaphorePermit> {
match self.semaphore.clone() {
Some(semaphore) => {
match semaphore
.acquire_owned()
.await
.map_err(|err| Error::Semaphore(err))
{
Ok(permit) => Some(permit),
Err(err) => {
log::error!("failed to acquire permit: {err}");
None
}
Some(semaphore) => match semaphore.acquire_owned().await.map_err(Error::Semaphore) {
Ok(permit) => Some(permit),
Err(err) => {
log::error!("failed to acquire permit: {err}");
None
}
}
},
None => None,
}
}
Expand Down
55 changes: 0 additions & 55 deletions src/custom_executor.rs

This file was deleted.

17 changes: 7 additions & 10 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,30 @@ use core::fmt;

use crate::ExecutorStrategy;

/// An error from the custom executor
#[non_exhaustive]
#[derive(Debug)]
pub enum Error {
/// Executor has already had a global strategy configured.
AlreadyInitialized(ExecutorStrategy),
InvalidConfig(InvalidConfig),
/// Issue listening on the custom executor response channel.
RecvError(tokio::sync::oneshot::error::RecvError),
/// Error enforcing concurrency
Semaphore(tokio::sync::AcquireError),
/// Dynamic error from the custom executor closure
BoxError(Box<dyn std::error::Error + Send + Sync>),
#[cfg(feature = "tokio")]
/// Background spawn blocking task panicked
JoinError(tokio::task::JoinError),
}

#[derive(Debug)]
pub struct InvalidConfig {
pub field: &'static str,
pub received: String,
pub expected: &'static str,
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::AlreadyInitialized(strategy) => write!(
f,
"global strategy is already initialzed with strategy: {strategy:#?}"
"global strategy is already initialized with strategy: {strategy:#?}"
),
Error::InvalidConfig(err) => write!(f, "invalid config: {err:#?}"),
Error::BoxError(err) => write!(f, "custom executor error: {err}"),
Error::RecvError(err) => write!(f, "error in custom executor response channel: {err}"),
Error::Semaphore(err) => write!(
Expand Down
17 changes: 9 additions & 8 deletions src/current_context.rs → src/executor/current_context.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::{concurrency_limit::ConcurrencyLimit, error::Error, ComputeHeavyFutureExecutor};
use crate::{concurrency_limit::ConcurrencyLimit, error::Error};

use super::ExecuteSync;

pub(crate) struct CurrentContextExecutor {
concurrency_limit: ConcurrencyLimit,
Expand All @@ -12,16 +14,15 @@ impl CurrentContextExecutor {
}
}

impl ComputeHeavyFutureExecutor for CurrentContextExecutor {
async fn execute<F, O>(&self, fut: F) -> Result<O, Error>
impl ExecuteSync for CurrentContextExecutor {
async fn execute_sync<F, R>(&self, f: F) -> Result<R, Error>
where
F: std::future::Future<Output = O> + Send + 'static,
O: Send + 'static,
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let _permit = self.concurrency_limit.acquire_permit().await;

Ok(fut.await)

// implicit permit drop
Ok(f())
// permit implicitly drops
}
}
62 changes: 62 additions & 0 deletions src/executor/custom_executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use std::future::Future;

use crate::{concurrency_limit::ConcurrencyLimit, error::Error};

use super::ExecuteSync;

/// A closure that accepts an arbitrary sync function and returns a future that executes it.
/// The Custom Executor will implicitly wrap the input function in a oneshot
/// channel to erase its input/output type.
pub type CustomExecutorSyncClosure = Box<
dyn Fn(
Box<dyn FnOnce() + Send + 'static>,
) -> Box<
dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
+ Send
+ 'static,
> + Send
+ Sync,
>;

pub(crate) struct CustomExecutor {
closure: CustomExecutorSyncClosure,
concurrency_limit: ConcurrencyLimit,
}

impl CustomExecutor {
pub(crate) fn new(closure: CustomExecutorSyncClosure, max_concurrency: Option<usize>) -> Self {
Self {
closure,
concurrency_limit: ConcurrencyLimit::new(max_concurrency),
}
}
}

impl ExecuteSync for CustomExecutor {
// the compiler correctly is pointing out that the custom closure isn't guaranteed to call f.
// but, we leave that to the implementer to guarantee since we are limited by working with static signatures
#[allow(unused_variables)]
async fn execute_sync<F, R>(&self, f: F) -> Result<R, Error>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let _permit = self.concurrency_limit.acquire_permit().await;

let (tx, rx) = tokio::sync::oneshot::channel();

let wrapped_input_closure = Box::new(|| {
let res = f();
if tx.send(res).is_err() {
log::trace!("custom sync executor foreground dropped before it could receive the result of the sync closure");
}
});

Box::into_pin((self.closure)(wrapped_input_closure))
.await
.map_err(Error::BoxError)?;

rx.await.map_err(Error::RecvError)
// permit implicitly drops
}
}
Loading

0 comments on commit 9ac60bd

Please sign in to comment.