Skip to content

Commit

Permalink
Add a builder for arbiter
Browse files Browse the repository at this point in the history
  • Loading branch information
Sytten committed May 9, 2024
1 parent 95ca8f0 commit e285ce6
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 27 deletions.
2 changes: 2 additions & 0 deletions actix-rt/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- Add `actix_rt::ArbiterBuilder` to allow user to configure the thread spawned for the arbiter.

## 2.9.0

- Add `actix_rt::System::runtime()` method to retrieve the underlying `actix_rt::Runtime` runtime.
Expand Down
140 changes: 114 additions & 26 deletions actix-rt/src/arbiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
pin::Pin,
sync::atomic::{AtomicUsize, Ordering},
task::{Context, Poll},
thread,
thread, usize,
};

use futures_core::ready;
Expand Down Expand Up @@ -80,42 +80,79 @@ impl ArbiterHandle {
}
}

/// An Arbiter represents a thread that provides an asynchronous execution environment for futures
/// and functions.
///
/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop.
#[derive(Debug)]
pub struct Arbiter {
tx: mpsc::UnboundedSender<ArbiterCommand>,
thread_handle: thread::JoinHandle<()>,
/// A builder for configuring and spawning a new [Arbiter] thread.
pub struct ArbiterBuilder {
name_factory: Option<Box<dyn Fn(usize, usize) -> String + 'static>>,
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
runtime_factory: Option<Box<dyn Fn() -> tokio::runtime::Runtime + Send + 'static>>,
}

impl Arbiter {
/// Spawn a new Arbiter thread and start its event loop.
///
/// # Panics
/// Panics if a [System] is not registered on the current thread.
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
impl ArbiterBuilder {
/// Create a new [ArbiterBuilder].
#[allow(clippy::new_without_default)]
pub fn new() -> Arbiter {
Self::with_tokio_rt(|| {
crate::runtime::default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.")
})
pub fn new() -> Self {
Self {
name_factory: None,
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
runtime_factory: None,
}
}

/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
/// Specify a factory function for generating the name of the Arbiter thread.
///
/// Defaults to `actix-rt|system:<system_id>|arbiter:<arb_id>`
///
/// # Example
///
/// ```no_run
/// let _ = actix_rt::System::new();
/// actix_rt::ArbiterBuilder::new()
/// .name(|system_id, arb_id| {
/// format!("some-prefix|system:{}|arbiter:{}", system_id, arb_id)
/// })
/// .build();
/// ```
pub fn name<N>(mut self, name_factory: N) -> Self
where
N: Fn(usize, usize) -> String + 'static,
{
self.name_factory = Some(Box::new(name_factory));
self
}

/// Specify a factory function for generating the [Tokio Runtime](tokio-runtime) used by the Arbiter.
///
/// [tokio-runtime]: tokio::runtime::Runtime
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
pub fn runtime<R>(mut self, runtime_factory: R) -> Self
where
F: Fn() -> tokio::runtime::Runtime + Send + 'static,
R: Fn() -> tokio::runtime::Runtime + Send + 'static,
{
self.runtime_factory = Some(Box::new(runtime_factory));
self
}

/// Spawn a new Arbiter thread and start its event loop.
///
/// # Panics
/// Panics if a [System] is not registered on the current thread.
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
pub fn build(self) -> Arbiter {
let sys = System::current();
let system_id = sys.id();
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);

let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
let name = self.name_factory.unwrap_or_else(|| {
Box::new(|system_id, arb_id| {
format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id)
})
})(system_id, arb_id);
let runtime_factory = self.runtime_factory.unwrap_or_else(|| {
Box::new(|| {
crate::runtime::default_tokio_runtime()
.expect("Cannot create new Arbiter's Runtime.")
})
});
let (tx, rx) = mpsc::unbounded_channel();

let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
Expand Down Expand Up @@ -160,13 +197,16 @@ impl Arbiter {
/// # Panics
/// Panics if a [System] is not registered on the current thread.
#[cfg(all(target_os = "linux", feature = "io-uring"))]
#[allow(clippy::new_without_default)]
pub fn new() -> Arbiter {
pub fn build(self) -> Arbiter {
let sys = System::current();
let system_id = sys.id();
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);

let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
let name = self.name_factory.unwrap_or_else(|| {
Box::new(|system_id, arb_id| {
format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id)
})
})(system_id, arb_id);
let (tx, rx) = mpsc::unbounded_channel();

let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
Expand Down Expand Up @@ -204,6 +244,54 @@ impl Arbiter {

Arbiter { tx, thread_handle }
}
}

/// An Arbiter represents a thread that provides an asynchronous execution environment for futures
/// and functions.
///
/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop.
#[derive(Debug)]
pub struct Arbiter {
tx: mpsc::UnboundedSender<ArbiterCommand>,
thread_handle: thread::JoinHandle<()>,
}

impl Arbiter {
/// Create an [ArbiterBuilder] to configure and spawn a new Arbiter thread.
pub fn builder() -> ArbiterBuilder {
ArbiterBuilder::new()
}

/// Spawn a new Arbiter thread and start its event loop.
///
/// # Panics
/// Panics if a [System] is not registered on the current thread.
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
#[allow(clippy::new_without_default)]
pub fn new() -> Arbiter {
ArbiterBuilder::new().build()
}

/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
///
/// [tokio-runtime]: tokio::runtime::Runtime
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
where
F: Fn() -> tokio::runtime::Runtime + Send + 'static,
{
ArbiterBuilder::new().runtime(runtime_factory).build()
}

/// Spawn a new Arbiter thread and start its event loop with `tokio-uring` runtime.
///
/// # Panics
/// Panics if a [System] is not registered on the current thread.
#[cfg(all(target_os = "linux", feature = "io-uring"))]
#[allow(clippy::new_without_default)]
pub fn new() -> Arbiter {
ArbiterBuilder::new().build()
}

/// Sets up an Arbiter runner in a new System using the environment's local set.
pub(crate) fn in_new_system() -> ArbiterHandle {
Expand Down
2 changes: 1 addition & 1 deletion actix-rt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub use tokio::pin;
use tokio::task::JoinHandle;

pub use self::{
arbiter::{Arbiter, ArbiterHandle},
arbiter::{Arbiter, ArbiterBuilder, ArbiterHandle},
runtime::Runtime,
system::{System, SystemRunner},
};
Expand Down
22 changes: 22 additions & 0 deletions actix-rt/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,28 @@ fn new_arbiter_with_tokio() {
assert!(!counter.load(Ordering::SeqCst));
}

#[test]
fn arbiter_builder_name() {
let _ = System::new();

let arbiter = Arbiter::builder()
.name(|_, _| "test_thread".to_string())
.build();

let (tx, rx) = channel::<String>();

Check failure on line 312 in actix-rt/tests/tests.rs

View workflow job for this annotation

GitHub Actions / clippy

[clippy] reported by reviewdog 🐶 error[E0425]: cannot find function `channel` in this scope --> actix-rt/tests/tests.rs:312:20 | 312 | let (tx, rx) = channel::<String>(); | ^^^^^^^ not found in this scope | help: consider importing one of these items | 1 + use std::sync::mpsc::channel; | 1 + use tokio::sync::broadcast::channel; | 1 + use tokio::sync::mpsc::channel; | 1 + use tokio::sync::oneshot::channel; | and 1 other candidate Raw Output: actix-rt/tests/tests.rs:312:20:e:error[E0425]: cannot find function `channel` in this scope --> actix-rt/tests/tests.rs:312:20 | 312 | let (tx, rx) = channel::<String>(); | ^^^^^^^ not found in this scope | help: consider importing one of these items | 1 + use std::sync::mpsc::channel; | 1 + use tokio::sync::broadcast::channel; | 1 + use tokio::sync::mpsc::channel; | 1 + use tokio::sync::oneshot::channel; | and 1 other candidate __END__

Check failure on line 312 in actix-rt/tests/tests.rs

View workflow job for this annotation

GitHub Actions / Linux / msrv

cannot find function `channel` in this scope

Check failure on line 312 in actix-rt/tests/tests.rs

View workflow job for this annotation

GitHub Actions / Linux / stable

cannot find function `channel` in this scope
arbiter.spawn(async move {
let current_thread = std::thread::current();
let thread_name = current_thread.name().unwrap().to_string();
tx.send(thread_name).unwrap();
});

let name = rx.recv().unwrap();
assert_eq!(name, "test_thread");

arbiter.stop();
arbiter.join().unwrap();
}

#[test]
#[should_panic]
fn no_system_current_panic() {
Expand Down

0 comments on commit e285ce6

Please sign in to comment.