Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: Switch to accepting sync input closures, adjust defaults, add docs #9

Merged
merged 10 commits into from
Jan 2, 2025
26 changes: 13 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,33 +1,33 @@
[package]
name = "compute-heavy-future-executor"
name = "vacation"
version = "0.1.0"
edition = "2021"
license = "MIT"
repository = "https://github.com/jlizen/compute-heavy-future-executor"
homepage = "https://github.com/jlizen/compute-heavy-future-executor"
repository = "https://github.com/jlizen/vacation"
homepage = "https://github.com/jlizen/vacation"
rust-version = "1.70"
exclude = ["/.github", "/examples", "/scripts"]
exclude = ["/.github", "/Exampless", "/scripts"]
jlizen marked this conversation as resolved.
Show resolved Hide resolved
readme = "README.md"
description = "Additional executor patterns for handling compute-bounded, blocking futures."
categories = ["asynchronous"]
description = "Give your (runtime) aworkers a break!"
jlizen marked this conversation as resolved.
Show resolved Hide resolved
categories = ["asynchronous", "executor"]

[features]
tokio = ["tokio/rt"]
tokio_block_in_place = ["tokio", "tokio/rt-multi-thread"]
secondary_tokio_runtime = ["tokio", "tokio/rt-multi-thread", "dep:libc", "dep:num_cpus"]
default = ["tokio"]
tokio = ["tokio/rt",]

[dependencies]
libc = { version = "0.2.168", optional = true }
log = "0.4.22"
num_cpus = { version = "1.0", optional = true }
tokio = { version = "1.0", features = ["macros", "sync"] }
num_cpus = "1.0"
tokio = { version = "1.0", features = ["sync"] }

[dev-dependencies]
tokio = { version = "1.0", features = ["full"]}
tokio = { version = "1", features = ["full"]}
futures-util = "0.3.31"
rayon = "1"

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]


[lints.rust]
Expand Down
98 changes: 96 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,96 @@
# compute-heavy-future-executor
Experimental crate that adds additional executor patterns to use with frequently blocking futures.
# vacation
Vacation: give your (runtime) aworkers a break!

## Overview

Today, when library authors are write async APIs, they don't have a good way to handle long-running sync segments.
jlizen marked this conversation as resolved.
Show resolved Hide resolved

An application author can use selective handling such as `tokio::task::spawn_blocking()` along with concurrency control to delegate sync segments to blocking threads. Or, they might send the work to a `rayon` threadpool.

But, library authors generally don't have this flexibility. As, they generally want to be agnostic across runtime. Or, even if they are `tokio`-specific, they generally don't want to call `tokio::task::spawn_blocking()` as it is
suboptimal without extra configuration (concurrency control) as well as highly opinionated to send the work across threads.

This library solves this problem by providing libray authors a static, globally scoped strategy that they can delegate blocking sync work to without drawing any conclusions about handling.

And then, the applications using the library can tune handling based on their preferred approach.

## Usage - Library Authors
For library authors, it's as simple as adding a dependency enabling `vacation` (perhaps behind a feature flag).

```
[dependencies]
vacation = { version = "0.1", default-features = false }
```

And then wrap any sync work by passing it as a closure to a global `execute_sync()` call:

```
use vacation::execute_sync;

fn sync_work(input: String)-> u8 {
std::thread::sleep(std::time::Duration::from_secs(5));
println!("{input}");
5
}
pub async fn a_future_that_has_blocking_sync_work() -> u8 {
// relies on caller-specified strategy for translating execute_sync into a future that won't
// block the current worker thread
execute_sync(move || { sync_work("foo".to_string()) }).await.unwrap()
}

```

## Usage - Application owners
Application authors will need to add this library as a a direct dependency in order to customize the execution strategy.
By default, the strategy is just a non-op.

### Simple example

```
[dependencies]
// enables `tokio` feature by default => spawn_blocking strategy
vacation = { version = "0.1" }
```

And then call the `initialize_tokio()` helper that uses some sensible defaults:
```
use vacation::initialize_tokio;

#[tokio::main]
async fn main() {
initialize_tokio().unwrap();
}
```

### Rayon example
Or, you can add an alternate strategy, for instance a custom closure using Rayon.

```
[dependencies]
vacation = { version = "0.1", default-features = false }
// used for example with custom executor
rayon = "1"
```

```
use std::sync::OnceLock;
use rayon::ThreadPool;

use vacation::{
global_sync_strategy_builder, CustomExecutorSyncClosure,
};

static THREADPOOL: OnceLock<ThreadPool> = OnceLock::new();

fn initialize_strategy() {
THREADPOOL.setrayon::ThreadPoolBuilder::default().build().unwrap());

let custom_closure: CustomExecutorSyncClosure =
Box::new(|f| Box::new(async move { Ok(THREADPOOL.get().unwrap().spawn(f)) }));

global_sync_strategy_builder()
// probably no need for max concurrency as rayon already is defaulting to a thread per core
// and using a task queue
.initialize_custom_executor(custom_closure).unwrap();
}
```
45 changes: 0 additions & 45 deletions src/block_in_place.rs

This file was deleted.

18 changes: 6 additions & 12 deletions src/concurrency_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,13 @@ impl ConcurrencyLimit {
/// Internally turns errors into a no-op (`None`) and outputs log lines.
pub(crate) async fn acquire_permit(&self) -> Option<OwnedSemaphorePermit> {
match self.semaphore.clone() {
Some(semaphore) => {
match semaphore
.acquire_owned()
.await
.map_err(|err| Error::Semaphore(err))
{
Ok(permit) => Some(permit),
Err(err) => {
log::error!("failed to acquire permit: {err}");
None
}
Some(semaphore) => match semaphore.acquire_owned().await.map_err(Error::Semaphore) {
Ok(permit) => Some(permit),
Err(err) => {
log::error!("failed to acquire permit: {err}");
None
}
}
},
None => None,
}
}
Expand Down
55 changes: 0 additions & 55 deletions src/custom_executor.rs

This file was deleted.

26 changes: 16 additions & 10 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,30 @@ use core::fmt;

use crate::ExecutorStrategy;

/// An error from the custom executor
#[non_exhaustive]
#[derive(Debug)]
pub enum Error {
/// Executor has already had a global strategy configured.
AlreadyInitialized(ExecutorStrategy),
InvalidConfig(InvalidConfig),
/// Issue listening on the custom executor response channel.
RecvError(tokio::sync::oneshot::error::RecvError),
/// Error enforcing concurrency
Semaphore(tokio::sync::AcquireError),
/// Dynamic error from the custom executor closure
BoxError(Box<dyn std::error::Error + Send + Sync>),
#[cfg(feature = "tokio")]
/// Background spawn blocking task panicked
JoinError(tokio::task::JoinError),
}

#[derive(Debug)]
pub struct InvalidConfig {
pub field: &'static str,
pub received: String,
pub expected: &'static str,
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::AlreadyInitialized(strategy) => write!(
f,
"global strategy is already initialzed with strategy: {strategy:#?}"
"global strategy is already initialized with strategy: {strategy:#?}"
),
Error::InvalidConfig(err) => write!(f, "invalid config: {err:#?}"),
Error::BoxError(err) => write!(f, "custom executor error: {err}"),
Error::RecvError(err) => write!(f, "error in custom executor response channel: {err}"),
Error::Semaphore(err) => write!(
Expand All @@ -43,3 +40,12 @@ impl fmt::Display for Error {
}
}
}

impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::BoxError(err) => Some(err.source()?),
_ => None,
}
}
}
17 changes: 9 additions & 8 deletions src/current_context.rs → src/executor/current_context.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::{concurrency_limit::ConcurrencyLimit, error::Error, ComputeHeavyFutureExecutor};
use crate::{concurrency_limit::ConcurrencyLimit, error::Error};

use super::ExecuteSync;

pub(crate) struct CurrentContextExecutor {
concurrency_limit: ConcurrencyLimit,
Expand All @@ -12,16 +14,15 @@ impl CurrentContextExecutor {
}
}

impl ComputeHeavyFutureExecutor for CurrentContextExecutor {
async fn execute<F, O>(&self, fut: F) -> Result<O, Error>
impl ExecuteSync for CurrentContextExecutor {
async fn execute_sync<F, R>(&self, f: F) -> Result<R, Error>
where
F: std::future::Future<Output = O> + Send + 'static,
O: Send + 'static,
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let _permit = self.concurrency_limit.acquire_permit().await;

Ok(fut.await)

// implicit permit drop
Ok(f())
// permit implicitly drops
}
}
Loading