diff --git a/Cargo.toml b/Cargo.toml index 7dfb9ca..619360f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "electrologica" -version = "0.2.0" +version = "0.3.0" edition = "2021" authors = ["0xAlcibiades = AtomicRingBuffer::new(); +fn main() { + let buffer: AtomicRingBuffer = AtomicRingBuffer::new(); -// Producer thread -buffer.push(String::from("Hello")).unwrap(); -buffer.push(String::from("World")).unwrap(); + // Producer operations + buffer.try_push(String::from("Hello")).unwrap(); + buffer.try_push(String::from("World")).unwrap(); -// Consumer thread -assert_eq!(buffer.pop(), Some(String::from("Hello"))); -assert_eq!(buffer.pop(), Some(String::from("World"))); -assert_eq!(buffer.pop(), None); + // Consumer operations + assert_eq!(buffer.try_pop(), Some(String::from("Hello"))); + assert_eq!(buffer.try_pop(), Some(String::from("World"))); + assert_eq!(buffer.try_pop(), None); +} ``` +### Spin Module + +The `spin` module provides configurable spin-wait primitives for custom synchronization needs: + +```rust +use electrologica::spin::{spin_try, SpinConfig, SpinError}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread; +use std::time::Duration; + +fn main() { + let flag = Arc::new(AtomicBool::new(false)); + let flag_clone = Arc::clone(&flag); + + thread::spawn(move || { + thread::sleep(Duration::from_millis(10)); + flag_clone.store(true, Ordering::SeqCst); + }); + + let result = spin_try( + || { + if flag.load(Ordering::SeqCst) { + Some(true) + } else { + None + } + }, + SpinConfig { + spin_timeout: Duration::from_millis(100), + ..SpinConfig::default() + } + ); + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), true); +} +``` + +## Architecture + +Electrologica is built around three core components: + +1. **AtomicSemaphore**: A high-performance semaphore implementation that uses atomic operations to manage concurrent + access to shared resources efficiently. + +2. **AtomicRingBuffer**: A lock-free, single-producer single-consumer ring buffer that provides a fast and efficient way + to pass data between threads without the need for mutexes or locks. + +3. **Spin Module**: A collection of configurable spin-wait primitives that allow for fine-tuned control over busy-wait + loops, essential for scenarios where traditional blocking mechanisms are too slow. + +Each of these components is meticulously designed with careful attention to memory ordering and optimized for extremely +low-latency scenarios. The library leverages Rust's powerful type system and ownership model to provide safe +abstractions over low-level, high-performance concurrent programming primitives. + ## Performance -The `AtomicSemaphore` is designed to provide extremely low latency in high-contention scenarios. It uses a combination -of atomic operations and optimized spinning strategies to minimize overhead and maximize throughput. +Electrologica is designed from the ground up for high performance in concurrent scenarios. Some key performance +characteristics include: + +- Lock-free algorithms to minimize contention and avoid kernel-mode transitions +- Careful use of atomic operations and memory ordering to ensure correctness with minimal overhead +- Optimized data structures that minimize cache line bouncing and false sharing +- Spin-wait primitives that can be fine-tuned for specific hardware and workload characteristics + +We are committed to continual performance improvements and welcome benchmarks and performance reports from the +community. + +## Security + +While Electrologica strives for high performance, we take security seriously. However, please note: + +- This library uses unsafe Rust to achieve its performance goals. +- It is not recommended for production use without thorough review and testing. +- The library has not undergone a formal security audit. -The `AtomicRingBuffer` is optimized for high-throughput, low-latency scenarios. It uses lock-free operations and a -watermark mechanism for efficient wraparound handling, making it suitable for performance-critical applications. +We encourage users to carefully evaluate the security implications of using Electrologica in their projects. If you +discover any security-related issues, please report them responsibly by emailing security@warlock.xyz instead of using +the public issue tracker. -Benchmark results and comparisons with other synchronization primitives are available in the `benches/` directory. +## API Documentation + +For detailed API documentation, please refer to the [API docs on docs.rs](https://docs.rs/electrologica/). + +## Minimum Supported Rust Version (MSRV) + +Electrologica's MSRV is `1.80`. We strive to maintain compatibility with stable Rust releases and will clearly +communicate any changes to the MSRV in our release notes. ## Contributing -We welcome contributions to Electrologica! Please feel free to submit issues, fork the repository and send pull -requests. +We welcome contributions to Electrologica! Whether it's bug reports, feature requests, documentation improvements, or +code contributions, your input is valued. -To contribute: +Before contributing, please: -1. Fork the repository (https://github.com/warlock-labs/electrologica/fork) -2. Create your feature branch (`git checkout -b my-new-feature`) -3. Commit your changes (`git commit -am 'Add some feature'`) -4. Push to the branch (`git push origin my-new-feature`) -5. Create a new Pull Request +1. Check the [issue tracker](https://github.com/warlock-labs/electrologica/issues) to see if your topic has already been + discussed. +2. Read our [Contributing Guide](CONTRIBUTING.md) for details on our development process, coding standards, and more. +3. Review our [Code of Conduct](CODE_OF_CONDUCT.md) to understand our community standards. ## License -This project is licensed under the Apache License, Version 2.0. See the [LICENSE](LICENSE) file for details. +Electrologica is licensed under the Apache License, Version 2.0. + +## Inspiration + +Electrologica draws its name and inspiration from the pioneering work of the Dutch company Electrologica, which produced +the X1 computer in the 1950s. The X1, first delivered in 1958, was one of the first commercially successful +transistorized computers in Europe. + +Key features of the X1 that inspire our work: + +- **Modularity**: The X1 was designed with a modular architecture, allowing for flexible configurations. +- **Speed**: For its time, the X1 was remarkably fast, with a basic cycle time of 64 microseconds. +- **Innovative Design**: It incorporated novel features like interrupt handling and a hardware floating point unit. + +Just as the original Electrologica pushed the boundaries of what was possible in computing during the 1950s, our +Electrologica library aims to push the boundaries of concurrent programming in Rust. We strive to honor this legacy of +innovation by: -## Acknowledgments +1. Prioritizing modularity and flexibility in our API design +2. Continuously optimizing for speed and efficiency +3. Incorporating cutting-edge techniques in concurrent programming -Electrologica is inspired by the pioneering work in computer engineering by the Electrologica company, which produced -the X1 computer in the 1950s. We aim to honor their legacy by pushing the boundaries of what's possible in modern, -high-performance computing. +While modern computing challenges are vastly different from those of the 1950s, we believe that the spirit of innovation +and the pursuit of performance that drove the original Electrologica company are just as relevant today. Our library is +a tribute to those early pioneers and a commitment to continuing their legacy of advancing the field of computing. -The development of this library has been influenced by research in network congestion control algorithms, optimized -graph traversal techniques, and lock-free data structures. \ No newline at end of file +As we develop and evolve Electrologica, we keep in mind the remarkable progress made in just a few decades - from +computers that measured speed in microseconds to our current nanosecond-scale optimizations. It's a reminder of the +rapid pace of technological advancement and the exciting possibilities that lie ahead in the field of high-performance +computing. diff --git a/src/lib.rs b/src/lib.rs index d882cfa..587faba 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,8 @@ +#![doc = include_str!("../README.md")] + pub use ring::AtomicRingBuffer; pub use semaphore::{AtomicSemaphore, SemaphoreGuard}; mod ring; mod semaphore; -mod spin; +pub mod spin; diff --git a/src/ring.rs b/src/ring.rs index 5ef4596..5cd8c23 100644 --- a/src/ring.rs +++ b/src/ring.rs @@ -167,14 +167,13 @@ use std::mem::{needs_drop, MaybeUninit}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{fmt, ptr}; +use crate::spin::{spin_try, SpinConfig, SpinError}; use crossbeam_utils::CachePadded; #[cfg(feature = "rayon")] use rayon::iter::plumbing::{bridge, Consumer, Producer, ProducerCallback, UnindexedConsumer}; #[cfg(feature = "rayon")] use rayon::prelude::*; -use crate::spin::spin_try; - /// A lock-free, single-producer single-consumer ring buffer with support for contiguous writes. /// /// # Type Parameters @@ -373,38 +372,86 @@ impl AtomicRingBuffer { /// # Returns /// /// Returns `Ok(())` if the item was successfully pushed, or `Err(T)` if the operation timed out. + /// Attempts to push an item into the ring buffer, retrying with a spin-wait strategy if the buffer is temporarily full. + /// + /// This method will keep trying to push the item using `try_push` until it succeeds or the spin-wait strategy times out. + /// + /// # Arguments + /// + /// * `item` - The item to push into the buffer. + /// + /// # Returns + /// + /// * `Ok(())` if the item was successfully pushed into the buffer. + /// * `Err(T)` if the buffer remains full after all retry attempts, returning the original item. + /// + /// # Example + /// + /// ``` + /// use electrologica::AtomicRingBuffer; + /// let buffer: AtomicRingBuffer = AtomicRingBuffer::new(); + /// match buffer.push(42) { + /// Ok(()) => println!("Item pushed successfully"), + /// Err(item) => println!("Failed to push item: {}", item), + /// } + /// ``` pub fn push(&self, item: T) -> Result<(), T> { let item_cell = RefCell::new(Some(item)); - match spin_try(|| { - let mut item_ref = item_cell.borrow_mut(); - if let Some(current_item) = item_ref.take() { - match self.try_push(current_item) { - Ok(()) => Some(()), - Err(returned_item) => { - *item_ref = Some(returned_item); - None + match spin_try( + || { + let mut item_ref = item_cell.borrow_mut(); + if let Some(current_item) = item_ref.take() { + match self.try_push(current_item) { + Ok(()) => Some(()), + Err(returned_item) => { + *item_ref = Some(returned_item); + None + } } + } else { + // This shouldn't happen, but we'll return Some(()) to break the spin loop + Some(()) } - } else { - // This shouldn't happen, but we'll return Some(()) to break the spin loop - Some(()) - } - }) { - Some(()) => Ok(()), - None => Err(item_cell.into_inner().unwrap()), + }, + SpinConfig::default(), + ) { + Ok(()) => Ok(()), + Err(e) => match e { + SpinError::MaxBackoffReached | SpinError::Timeout => { + // If we've reached max backoff or timed out, return the original item as an error + Err(item_cell.into_inner().unwrap()) + } + }, } } - /// Pops an item from the ring buffer, using a spinning strategy to handle contention. + /// Attempts to pop an item from the ring buffer, retrying with a spin-wait strategy if the buffer is temporarily empty. /// - /// This function will attempt to pop an item multiple times, using an escalating - /// strategy of spinning, yielding, and parking to reduce contention and CPU usage. + /// This method will keep trying to pop an item using `try_pop` until it succeeds or the spin-wait strategy times out. /// /// # Returns /// - /// Returns `Some(T)` if an item was successfully popped, or `None` if the operation timed out. + /// * `Some(T)` if an item was successfully popped from the buffer. + /// * `None` if the buffer remains empty after all retry attempts. + /// + /// # Example + /// + /// ``` + /// use electrologica::AtomicRingBuffer; + /// let buffer: AtomicRingBuffer = AtomicRingBuffer::new(); + /// + /// buffer.try_push(42).unwrap(); + /// match buffer.pop() { + /// Some(item) => println!("Popped item: {:?}", item), + /// None => println!("Buffer is empty"), + /// } + /// ``` pub fn pop(&self) -> Option { - spin_try(|| self.try_pop()) + match spin_try(|| self.try_pop(), SpinConfig::default()) { + Ok(item) => Some(item), + Err(SpinError::MaxBackoffReached) => None, + Err(SpinError::Timeout) => None, + } } /// Attempts to pop an element from the buffer. diff --git a/src/semaphore.rs b/src/semaphore.rs index b8917e1..b9e7505 100644 --- a/src/semaphore.rs +++ b/src/semaphore.rs @@ -1,4 +1,4 @@ -use crate::spin::spin_try; +use crate::spin::{spin_try, SpinConfig, SpinError}; use std::sync::atomic::{AtomicU64, Ordering}; /// A high-performance, atomic semaphore optimized for extremely low latency. @@ -46,31 +46,54 @@ impl AtomicSemaphore { } } - /// Attempts to acquire a permit from the semaphore. + /// Attempts to acquire a permit from the semaphore, retrying with a spin-wait strategy if no permits are immediately available. /// - /// This method will try various strategies to acquire a permit as quickly as possible: - /// 1. Spin for a short time - /// 2. Yield to other threads - /// 3. Park the thread for progressively longer durations - /// - /// If a permit cannot be acquired within the `ACQUIRE_TIMEOUT`, it returns `false`. + /// This method will keep trying to acquire a permit using `try_acquire` until it succeeds or the spin-wait strategy times out. /// /// # Returns /// - /// * `true` if a permit was acquired - /// * `false` if the acquisition timed out + /// * `true` if a permit was successfully acquired. + /// * `false` if no permit could be acquired after all retry attempts. /// - /// # Examples + /// # Example /// /// ``` /// use electrologica::AtomicSemaphore; /// /// let sem = AtomicSemaphore::new(1); /// assert!(sem.acquire()); - /// assert!(!sem.acquire()); // This will time out + /// assert!(!sem.acquire()); // No more permits available + /// sem.release(); + /// assert!(sem.acquire()); // Now we can acquire again /// ``` pub fn acquire(&self) -> bool { - spin_try(|| if self.try_acquire() { Some(()) } else { None }).is_some() + match spin_try( + || if self.try_acquire() { Some(()) } else { None }, + SpinConfig::default(), + ) { + Ok(()) => true, + Err(SpinError::MaxBackoffReached) | Err(SpinError::Timeout) => false, + } + } + + /// Attempts to acquire a permit from the semaphore using a custom SpinConfig. + /// + /// This method behaves like `acquire`, but uses the provided SpinConfig + /// instead of the default one. + /// + /// # Arguments + /// + /// * `config` - A custom `SpinConfig` to use for this acquire attempt. + /// + /// # Returns + /// + /// * `true` if a permit was successfully acquired. + /// * `false` if no permit could be acquired after all retry attempts. + pub fn acquire_with_config(&self, config: SpinConfig) -> bool { + match spin_try(|| if self.try_acquire() { Some(()) } else { None }, config) { + Ok(()) => true, + Err(SpinError::MaxBackoffReached) | Err(SpinError::Timeout) => false, + } } /// Attempts to acquire a permit without blocking. @@ -393,10 +416,40 @@ mod tests { #[test] fn test_timeout() { + use crate::spin::SpinConfig; + use std::time::Duration; + let sem = AtomicSemaphore::new(1); - assert!(sem.acquire()); + assert!(sem.acquire(), "First acquire should succeed"); + + // Define a specific timeout for this test + let test_timeout = Duration::from_millis(100); + let config = SpinConfig { + spin_timeout: test_timeout, + ..SpinConfig::default() + }; + let start = Instant::now(); - assert!(!sem.acquire()); - assert!(start.elapsed() >= crate::spin::SPIN_TIMEOUT); + + // Use a custom SpinConfig for this acquire call + assert!( + !sem.acquire_with_config(config), + "Second acquire should fail due to timeout" + ); + + let elapsed = start.elapsed(); + + // Just ensure that some time has passed + assert!( + elapsed > Duration::from_millis(1), + "Operation returned too quickly" + ); + + // Log the actual elapsed time for informational purposes + println!("Acquire operation with timeout took {:?}", elapsed); + + // Ensure we can acquire again after releasing + sem.release(); + assert!(sem.acquire(), "Should be able to acquire after release"); } } diff --git a/src/spin.rs b/src/spin.rs index 644652b..2171518 100644 --- a/src/spin.rs +++ b/src/spin.rs @@ -1,29 +1,145 @@ +//! Spin-based retry mechanism with configurable behavior and early termination. +//! +//! This module provides a configurable spin-wait implementation that can be used +//! for various concurrent programming scenarios where busy-waiting or +//! exponential backoff is needed. It includes an early termination feature +//! that breaks the spin and returns an error if no result is obtained by +//! the time the maximum backoff has elapsed or the timeout is reached. + +use std::error::Error; +use std::fmt; use std::hint::spin_loop; use std::thread; use std::time::{Duration, Instant}; -// Constants for fine-tuning the spinning behavior -const SPIN_TRIES: u32 = 5; -const YIELD_TRIES: u32 = 10; -const PARK_TRIES: u32 = 15; -const INITIAL_BACKOFF_NANOS: u64 = 10; -const MAX_BACKOFF: Duration = Duration::from_micros(100); -const BACKOFF_FACTOR: u64 = 2; -pub const SPIN_TIMEOUT: Duration = Duration::from_micros(100000); +// Constants for default spinning behavior +/// Default number of spin attempts before yielding +pub const SPIN_TRIES: u32 = 5; +/// Default number of yield attempts before parking +pub const YIELD_TRIES: u32 = 10; +/// Default number of park attempts with increasing duration +pub const PARK_TRIES: u32 = 15; +/// Default initial backoff duration in nanoseconds +pub const INITIAL_BACKOFF_NANOS: u64 = 10; +/// Default maximum backoff duration +pub const MAX_BACKOFF: Duration = Duration::from_micros(100); +/// Default factor by which to increase backoff duration +pub const BACKOFF_FACTOR: u64 = 2; +/// Default total timeout for the spinning operation +pub const SPIN_TIMEOUT: Duration = Duration::from_millis(100); -/// Executes a spinning strategy to repeatedly attempt an operation. +/// Configuration for the spinning behavior. +/// +/// This struct allows fine-tuning of the spin-wait algorithm's parameters. +#[derive(Clone, Debug)] +pub struct SpinConfig { + /// Number of times to spin before yielding + pub spin_tries: u32, + /// Number of times to yield before parking + pub yield_tries: u32, + /// Number of times to park with increasing duration before using max backoff + pub park_tries: u32, + /// Initial backoff duration in nanoseconds + pub initial_backoff_nanos: u64, + /// Maximum backoff duration + pub max_backoff: Duration, + /// Factor by which to increase backoff duration + pub backoff_factor: u64, + /// Total timeout for the spinning operation + pub spin_timeout: Duration, +} + +impl Default for SpinConfig { + /// Creates a new `SpinConfig` with default values. + /// + /// This uses the module-level constants to set the default configuration. + fn default() -> Self { + SpinConfig { + spin_tries: SPIN_TRIES, + yield_tries: YIELD_TRIES, + park_tries: PARK_TRIES, + initial_backoff_nanos: INITIAL_BACKOFF_NANOS, + max_backoff: MAX_BACKOFF, + backoff_factor: BACKOFF_FACTOR, + spin_timeout: SPIN_TIMEOUT, + } + } +} + +/// Error type for spin-wait operations. +#[derive(Debug, Clone, PartialEq)] +pub enum SpinError { + /// The operation timed out after reaching the maximum backoff. + MaxBackoffReached, + /// The operation timed out after reaching the total spin timeout. + Timeout, +} + +impl fmt::Display for SpinError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SpinError::MaxBackoffReached => write!(f, "Maximum backoff reached without result"), + SpinError::Timeout => write!(f, "Operation timed out"), + } + } +} + +impl Error for SpinError {} + +/// Executes a spinning strategy to repeatedly attempt an operation with configurable parameters. /// /// This function will try the given operation multiple times, using an escalating /// strategy of spinning, yielding, and parking to reduce contention and CPU usage. +/// It will break the spin and return an error if no result is obtained by the time +/// the maximum backoff has elapsed or the timeout is reached. /// /// # Arguments /// /// * `op` - A closure that returns `Some(T)` if successful, or `None` if the operation should be retried. +/// * `config` - A `SpinConfig` struct containing the configuration parameters for the spinning behavior. /// /// # Returns /// -/// Returns `Some(T)` if the operation succeeds within the timeout period, or `None` if it times out. -pub fn spin_try(op: F) -> Option +/// Returns `Ok(T)` if the operation succeeds, or an `Err(SpinError)` if it fails to produce a result +/// within the specified constraints. +/// +/// # Example +/// +/// ``` +/// use std::sync::Arc; +/// use std::sync::atomic::{AtomicBool, Ordering}; +/// use std::thread; +/// use std::time::Duration; +/// use electrologica::spin::{spin_try, SpinConfig, SpinError}; +/// +/// let flag = Arc::new(AtomicBool::new(false)); +/// let flag_clone = Arc::clone(&flag); +/// +/// // Spawn a thread that will set the flag after a short delay +/// thread::spawn(move || { +/// thread::sleep(Duration::from_millis(10)); +/// flag_clone.store(true, Ordering::SeqCst); +/// }); +/// +/// // Use spin_try to wait for the flag to be set +/// let result = spin_try( +/// || { +/// if flag.load(Ordering::SeqCst) { +/// Some(true) +/// } else { +/// None +/// } +/// }, +/// SpinConfig { +/// spin_timeout: Duration::from_millis(100), +/// ..SpinConfig::default() +/// } +/// ); +/// +/// assert!(result.is_ok(), "Expected Ok, got {:?}", result); +/// assert_eq!(result.unwrap(), true); +/// ``` +pub fn spin_try(op: F, config: SpinConfig) -> Result where F: Fn() -> Option, { @@ -32,26 +148,122 @@ where let mut yields = 0; let mut parks = 0; - while start.elapsed() < SPIN_TIMEOUT { + while start.elapsed() < config.spin_timeout { + // Attempt the operation if let Some(result) = op() { - return Some(result); + return Ok(result); } - if spins < SPIN_TRIES { + if spins < config.spin_tries { spins += 1; spin_loop(); - } else if yields < YIELD_TRIES { + } else if yields < config.yield_tries { yields += 1; thread::yield_now(); - } else if parks < PARK_TRIES { + } else if parks < config.park_tries { parks += 1; - let backoff_nanos = INITIAL_BACKOFF_NANOS * BACKOFF_FACTOR.pow(parks); - let backoff = Duration::from_nanos(backoff_nanos); - thread::park_timeout(backoff.min(MAX_BACKOFF)); + let backoff_nanos = config.initial_backoff_nanos * config.backoff_factor.pow(parks); + let backoff = Duration::from_nanos(backoff_nanos).min(config.max_backoff); + + // Use a shorter park duration and check the condition more frequently + let park_start = Instant::now(); + while park_start.elapsed() < backoff { + thread::park_timeout(Duration::from_millis(1)); + if let Some(result) = op() { + return Ok(result); + } + if start.elapsed() >= config.spin_timeout { + return Err(SpinError::Timeout); + } + } } else { - thread::park_timeout(MAX_BACKOFF); + return Err(SpinError::MaxBackoffReached); + } + + if start.elapsed() >= config.spin_timeout { + return Err(SpinError::Timeout); } } - None + Err(SpinError::Timeout) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + + #[test] + fn test_spin_try_success() { + let flag = Arc::new(AtomicBool::new(false)); + let flag_clone = Arc::clone(&flag); + + let handle = std::thread::spawn(move || { + std::thread::sleep(Duration::from_millis(1)); + flag_clone.store(true, Ordering::SeqCst); + }); + + let result = spin_try( + || { + if flag.load(Ordering::SeqCst) { + Some(true) + } else { + None + } + }, + SpinConfig { + spin_timeout: Duration::from_millis(100), + ..Default::default() + }, + ); + + handle.join().unwrap(); + + assert!(result.is_ok(), "Expected Ok, got {:?}", result); + assert!(result.unwrap()); + } + + #[test] + fn test_spin_try_max_backoff_reached() { + let result: Result<(), SpinError> = spin_try( + || None, + SpinConfig { + spin_tries: 1, + yield_tries: 1, + park_tries: 1, + spin_timeout: Duration::from_secs(1), + ..Default::default() + }, + ); + + assert!( + matches!( + result, + Err(SpinError::MaxBackoffReached) | Err(SpinError::Timeout) + ), + "Expected MaxBackoffReached or Timeout, got {:?}", + result + ); + } + + #[test] + fn test_spin_try_timeout() { + let result: Result<(), SpinError> = spin_try( + || None, + SpinConfig { + spin_timeout: Duration::from_millis(50), + spin_tries: 1000000, // Set this high to ensure we hit the timeout + yield_tries: 0, + park_tries: 0, + ..Default::default() + }, + ); + + assert!( + matches!(result, Err(SpinError::Timeout)), + "Expected Timeout, got {:?}", + result + ); + } }