Skip to content

Commit

Permalink
rename to vacation, default to no-op in all cases, add enum to `execu…
Browse files Browse the repository at this point in the history
…te_sync()` to specify likeliness to block, add helper for constructing good default tokio strategy
  • Loading branch information
jlizen committed Dec 30, 2024
1 parent e859f38 commit 494a96c
Show file tree
Hide file tree
Showing 11 changed files with 160 additions and 160 deletions.
10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
[package]
name = "compute-heavy-future-executor"
name = "vacation"
version = "0.1.0"
edition = "2021"
license = "MIT"
repository = "https://github.com/jlizen/compute-heavy-future-executor"
homepage = "https://github.com/jlizen/compute-heavy-future-executor"
repository = "https://github.com/jlizen/vacation"
homepage = "https://github.com/jlizen/vacation"
rust-version = "1.70"
exclude = ["/.github", "/Exampless", "/scripts"]
readme = "README.md"
description = "Executor patterns for handling compute-bounded calls inside async contexts."
categories = ["asynchronous"]
description = "Give your (runtime) aworkers a break!"
categories = ["asynchronous", "executor"]

[features]
default = ["tokio"]
Expand Down
61 changes: 34 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# compute-heavy-future-executor
Experimental crate that adds additional executor patterns to use with frequently blocking futures.
# vacation
Vacation: give your (runtime) aworkers a break!

## Overview

Today, when library authors are write async APIs, they don't have a good way to handle long-running sync segments.

Expand All @@ -8,30 +10,22 @@ An application author can use selective handling such as `tokio::task::spawn_blo
But, library authors generally don't have this flexibility. As, they generally want to be agnostic across runtime. Or, even if they are `tokio`-specific, they generally don't want to call `tokio::task::spawn_blocking()` as it is
suboptimal without extra configuration (concurrency control) as well as highly opinionated to send the work across threads.

This library aims to solve this problem by providing libray authors a static, globally scoped strategy that they can delegate blocking sync work to without drawing any conclusions about handling.
This library solves this problem by providing libray authors a static, globally scoped strategy that they can delegate blocking sync work to without drawing any conclusions about handling.

And then, the applications using the library can either rely on the default strategy that this package provides, or tune them with their preferred approach.
And then, the applications using the library can tune handling based on their preferred approach.

## Usage - Library Authors
For library authors, it's as simple as adding a dependency enabling `compute-heavy-future-executor` (perhaps behind a feature flag).
For library authors, it's as simple as adding a dependency enabling `vacation` (perhaps behind a feature flag).

The below will default to 'current context' execution (ie non-op) unless the caller enables the tokio feature.
```
[dependencies]
compute-heavy-future-executor = { version = "0.1", default-features = false }
```

Meanwhile to be slightly more opinionated, the below will enable usage of `spawn_blocking` with concurrency control
by default unless the caller opts out:
```
[dependencies]
compute-heavy-future-executor = { version = "0.1" }
vacation = { version = "0.1", default-features = false }
```

And then wrap any sync work by passing it as a closure to a global `execute_sync()` call:

```
use compute_heavy_future_executor::execute_sync;
use vacation::execute_sync;
fn sync_work(input: String)-> u8 {
std::thread::sleep(std::time::Duration::from_secs(5));
Expand All @@ -47,35 +41,49 @@ pub async fn a_future_that_has_blocking_sync_work() -> u8 {
```

## Usage - Application owners
Application authors can benefit from this crate with no application code changes, if you are using
a library that is itself using this crate.
Application authors will need to add this library as a a direct dependency in order to customize the execution strategy.
By default, the strategy is just a non-op.

### Simple example

```
[dependencies]
// enables `tokio` feature by default => spawn_blocking strategy
vacation = { version = "0.1" }
```

And then call the `initialize_tokio()` helper that uses some sensible defaults:
```
use vacation::initialize_tokio;
#[tokio::main]
async fn main() {
initialize_tokio().unwrap();
}
```

If you want to customize the strategy beyond defaults, they can add
`compute-heavy-future-executor` to their dependencies:
### Rayon example
Or, you can add an alternate strategy, for instance a custom closure using Rayon.

```
[dependencies]
// enables tokio and therefore spawn_blocking strategy by default
compute-heavy-future-executor = { version = "0.1" }
vacation = { version = "0.1", default-features = false }
// used for example with custom executor
rayon = "1"
```

And then configure your global strategy as desired. For instance, see below for usage of rayon
instead of `spawn_blocking()`.

```
use std::sync::OnceLock;
use rayon::ThreadPool;
use compute_heavy_future_executor::{
use vacation::{
global_sync_strategy_builder, CustomExecutorSyncClosure,
};
static THREADPOOL: OnceLock<ThreadPool> = OnceLock::new();
fn initialize_strategy() {
THREADPOOL.set(|| rayon::ThreadPoolBuilder::default().build().unwrap());
THREADPOOL.setrayon::ThreadPoolBuilder::default().build().unwrap());
let custom_closure: CustomExecutorSyncClosure =
Box::new(|f| Box::new(async move { Ok(THREADPOOL.get().unwrap().spawn(f)) }));
Expand All @@ -85,5 +93,4 @@ fn initialize_strategy() {
// and using a task queue
.initialize_custom_executor(custom_closure).unwrap();
}
```
68 changes: 38 additions & 30 deletions src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::sync::OnceLock;
use current_context::CurrentContextExecutor;
use custom_executor::{CustomExecutor, CustomExecutorSyncClosure};

use crate::{Error, ExecutorStrategy, GlobalStrategy};
use crate::{global_sync_strategy_builder, Error, ExecutorStrategy, GlobalStrategy};

pub(crate) trait ExecuteSync {
/// Accepts a sync function and processes it to completion.
Expand Down Expand Up @@ -41,7 +41,7 @@ fn set_sync_strategy(strategy: SyncExecutor) -> Result<(), Error> {
/// # Examples
///
/// ```
/// use compute_heavy_future_executor::{
/// use vacation::{
/// global_sync_strategy,
/// global_sync_strategy_builder,
/// GlobalStrategy,
Expand Down Expand Up @@ -97,22 +97,10 @@ pub(crate) enum SyncExecutor {
impl Default for &SyncExecutor {
fn default() -> Self {
DEFAULT_COMPUTE_HEAVY_SYNC_EXECUTOR_STRATEGY.get_or_init(|| {
let core_count = num_cpus::get();

#[cfg(feature = "tokio")]
{
log::info!("Defaulting to SpawnBlocking strategy for compute-heavy future executor \
with max concurrency of {core_count} until a strategy is initialized");

SyncExecutor::SpawnBlocking(spawn_blocking::SpawnBlockingExecutor::new(Some(core_count)))
}

#[cfg(not(feature = "tokio"))]
{
log::warn!("Defaulting to CurrentContext (non-op) strategy for compute-heavy future executor \
with max concurrency of {core_count} until a strategy is initialized.");
SyncExecutor::CurrentContext(CurrentContextExecutor::new(Some(core_count)))
}
log::warn!(
"Defaulting to CurrentContext (non-op) strategy for compute-heavy future executor"
);
SyncExecutor::CurrentContext(CurrentContextExecutor::new(None))
})
}
}
Expand Down Expand Up @@ -143,13 +131,35 @@ impl From<&SyncExecutor> for ExecutorStrategy {
}
}

/// Initialize a set of sensible defaults for a tokio runtime:
///
/// - SpawnBlocking strategy
/// - Max concurrency equal to the cpu core count.
///
/// # Error
/// Returns an error if the global strategy is already initialized.
/// It can only be initialized once.
/// # Examples
///
/// ```
/// use vacation::initialize_tokio;
///
/// # fn run() {
/// initialize_tokio().unwrap();
/// # }
pub fn initialize_tokio() -> Result<(), Error> {
global_sync_strategy_builder()
.max_concurrency(num_cpus::get())
.initialize_spawn_blocking()
}

/// A builder to replace the default sync executor strategy
/// with a caller-provided strategy.
///
/// # Examples
///
/// ```
/// use compute_heavy_future_executor::global_sync_strategy_builder;
/// use vacation::global_sync_strategy_builder;
///
/// # fn run() {
/// global_sync_strategy_builder()
Expand All @@ -170,15 +180,15 @@ impl SyncExecutorBuilder {
/// If this number is exceeded, the executor will wait to execute the
/// input closure until a permit can be acquired.
///
/// ## Default
/// No maximum concurrency when strategies are manually built.
/// A good value tends to be the number of cpu cores on your machine.
///
/// For default strategies, the default concurrency limit will be the number of cpu cores.
/// ## Default
/// No maximum concurrency.
///
/// # Examples
///
/// ```
/// use compute_heavy_future_executor::global_sync_strategy_builder;
/// use vacation::global_sync_strategy_builder;
///
/// # fn run() {
/// global_sync_strategy_builder()
Expand All @@ -197,7 +207,7 @@ impl SyncExecutorBuilder {
/// This is effectively a non-op wrapper that adds no special handling for the sync future
/// besides optional concurrency control.
///
/// This is the default if the `tokio` feature is disabled (with concurrency equal to cpu core count).
/// This is the default strategy if nothing is initialized, with no max concurrency.
///
/// # Error
/// Returns an error if the global strategy is already initialized.
Expand All @@ -206,7 +216,7 @@ impl SyncExecutorBuilder {
/// # Examples
///
/// ```
/// use compute_heavy_future_executor::global_sync_strategy_builder;
/// use vacation::global_sync_strategy_builder;
///
/// # async fn run() {
/// global_sync_strategy_builder().initialize_current_context().unwrap();
Expand All @@ -223,16 +233,14 @@ impl SyncExecutorBuilder {
///
/// Requires `tokio` feature.
///
/// This is the default strategy if `tokio` feature is enabled, with a concurrency limit equal to the number of cpu cores.
///
/// # Error
/// Returns an error if the global strategy is already initialized.
/// It can only be initialized once.
///
/// # Examples
///
/// ```
/// use compute_heavy_future_executor::global_sync_strategy_builder;
/// use vacation::global_sync_strategy_builder;
///
/// # async fn run() {
/// // this will include no concurrency limit when explicitly initialized
Expand Down Expand Up @@ -266,8 +274,8 @@ impl SyncExecutorBuilder {
/// # Examples
///
/// ```
/// use compute_heavy_future_executor::global_sync_strategy_builder;
/// use compute_heavy_future_executor::CustomExecutorSyncClosure;
/// use vacation::global_sync_strategy_builder;
/// use vacation::CustomExecutorSyncClosure;
///
/// # async fn run() {
/// // caution: this will panic if used outside of tokio multithreaded runtime
Expand Down
Loading

0 comments on commit 494a96c

Please sign in to comment.