diff --git a/benchmarks/queue/src/main.rs b/benchmarks/queue/src/main.rs index 5c4e99759..4d19e9e35 100644 --- a/benchmarks/queue/src/main.rs +++ b/benchmarks/queue/src/main.rs @@ -22,7 +22,7 @@ const ITERATIONS: u64 = 10000000; trait PushPop: Send + Sync { fn push(&self, value: usize); - fn pop(&self) -> Option; + fn pop(&self) -> bool; } impl PushPop for Queue { @@ -30,28 +30,28 @@ impl PushPop for Queue { unsafe { self.push(&value) }; } - fn pop(&self) -> Option { - unsafe { self.pop() } + fn pop(&self) -> bool { + unsafe { self.pop().is_some() } } } impl PushPop for FixedSizeIndexQueue { fn push(&self, value: usize) { - unsafe { self.push(value) }; + unsafe { self.push(value as u64) }; } - fn pop(&self) -> Option { - unsafe { self.pop() } + fn pop(&self) -> bool { + unsafe { self.pop().is_some() } } } impl PushPop for FixedSizeSafelyOverflowingIndexQueue { fn push(&self, value: usize) { - unsafe { self.push(value) }; + unsafe { self.push(value as u64) }; } - fn pop(&self) -> Option { - unsafe { self.pop() } + fn pop(&self) -> bool { + unsafe { self.pop().is_some() } } } @@ -78,7 +78,7 @@ fn perform_benchmark( for _ in 0..args.iterations { queue_a2b.push(0); - while queue_b2a.pop().is_none() {} + while !queue_b2a.pop() {} } }); @@ -90,7 +90,7 @@ fn perform_benchmark( start_benchmark_barrier.wait(); for _ in 0..args.iterations { - while queue_a2b.pop().is_none() {} + while !queue_a2b.pop() {} queue_b2a.push(0); } diff --git a/iceoryx2-bb/container/src/queue.rs b/iceoryx2-bb/container/src/queue.rs index 29efa6757..d7f734c9d 100644 --- a/iceoryx2-bb/container/src/queue.rs +++ b/iceoryx2-bb/container/src/queue.rs @@ -333,8 +333,8 @@ pub mod details { debug_assert!( self.is_initialized .load(std::sync::atomic::Ordering::Relaxed), - "From: {}, Undefined behavior - the object was not initialized with 'init' before.", - source + "From: MetaQueue<{}>::{}, Undefined behavior - the object was not initialized with 'init' before.", + std::any::type_name::(), source ); } @@ -363,7 +363,7 @@ pub mod details { } pub(crate) unsafe fn peek_mut_impl(&mut self) -> Option<&mut T> { - self.verify_init(&format!("Queue<{}>::pop()", std::any::type_name::())); + self.verify_init("peek_mut()"); if self.is_empty() { return None; @@ -375,7 +375,7 @@ pub mod details { } pub(crate) unsafe fn peek_impl(&self) -> Option<&T> { - self.verify_init(&format!("Queue<{}>::pop()", std::any::type_name::())); + self.verify_init("peek()"); if self.is_empty() { return None; @@ -387,7 +387,7 @@ pub mod details { } pub(crate) unsafe fn pop_impl(&mut self) -> Option { - self.verify_init(&format!("Queue<{}>::pop()", std::any::type_name::())); + self.verify_init("pop()"); if self.is_empty() { return None; @@ -403,7 +403,7 @@ pub mod details { } pub(crate) unsafe fn push_impl(&mut self, value: T) -> bool { - self.verify_init(&format!("Queue<{}>::push()", std::any::type_name::())); + self.verify_init("push()"); if self.len == self.capacity { return false; @@ -414,10 +414,7 @@ pub mod details { } pub(crate) unsafe fn push_with_overflow_impl(&mut self, value: T) -> Option { - self.verify_init(&format!( - "Queue<{}>::push_with_overflow()", - std::any::type_name::() - )); + self.verify_init("push_with_overflow()"); let overridden_value = if self.len() == self.capacity() { self.pop_impl() diff --git a/iceoryx2-bb/container/src/slotmap.rs b/iceoryx2-bb/container/src/slotmap.rs index 016760910..0d2a7cd59 100644 --- a/iceoryx2-bb/container/src/slotmap.rs +++ b/iceoryx2-bb/container/src/slotmap.rs @@ -91,6 +91,8 @@ const INVALID: usize = usize::MAX; #[doc(hidden)] pub mod details { + use iceoryx2_pal_concurrency_sync::iox_atomic::IoxAtomicBool; + use super::*; /// The iterator of a [`SlotMap`], [`RelocatableSlotMap`] or [`FixedSizeSlotMap`]. @@ -123,10 +125,21 @@ pub mod details { data: MetaVec, Ptr>, data_next_free_index: MetaQueue, idx_to_data_free_list_head: usize, + is_initialized: IoxAtomicBool, len: usize, } impl MetaSlotMap { + #[inline(always)] + fn verify_init(&self, source: &str) { + debug_assert!( + self.is_initialized + .load(std::sync::atomic::Ordering::Relaxed), + "From: MetaSlotMap<{}>::{}, Undefined behavior - the object was not initialized with 'init' before.", + std::any::type_name::(), source + ); + } + fn next_available_key_after(&self, start: SlotMapKey) -> Option<(SlotMapKey, &T)> { let idx_to_data = &self.idx_to_data; @@ -160,6 +173,7 @@ pub mod details { } pub(crate) unsafe fn iter_impl(&self) -> Iter { + self.verify_init("iter()"); Iter { slotmap: self, key: SlotMapKey(0), @@ -167,10 +181,12 @@ pub mod details { } pub(crate) unsafe fn contains_impl(&self, key: SlotMapKey) -> bool { + self.verify_init("contains()"); self.idx_to_data[key.0] != INVALID } pub(crate) unsafe fn get_impl(&self, key: SlotMapKey) -> Option<&T> { + self.verify_init("get()"); match self.idx_to_data[key.0] { INVALID => None, n => Some(self.data[n].as_ref().expect( @@ -180,6 +196,7 @@ pub mod details { } pub(crate) unsafe fn get_mut_impl(&mut self, key: SlotMapKey) -> Option<&mut T> { + self.verify_init("get_mut()"); match self.idx_to_data[key.0] { INVALID => None, n => Some(self.data[n].as_mut().expect( @@ -233,6 +250,7 @@ pub mod details { } pub(crate) unsafe fn insert_impl(&mut self, value: T) -> Option { + self.verify_init("insert()"); self.acquire_next_free_index().map(|key| { let key = SlotMapKey(key); self.store_value(key, value); @@ -241,11 +259,13 @@ pub mod details { } pub(crate) unsafe fn insert_at_impl(&mut self, key: SlotMapKey, value: T) -> bool { + self.verify_init("insert_at()"); self.claim_index(key.value()); self.store_value(key, value) } pub(crate) unsafe fn store_value(&mut self, key: SlotMapKey, value: T) -> bool { + self.verify_init("store()"); if key.0 > self.capacity_impl() { return false; } @@ -264,6 +284,7 @@ pub mod details { } pub(crate) unsafe fn remove_impl(&mut self, key: SlotMapKey) -> bool { + self.verify_init("remove()"); if key.0 > self.idx_to_data.len() { return false; } @@ -283,6 +304,7 @@ pub mod details { } pub(crate) unsafe fn next_free_key_impl(&self) -> Option { + self.verify_init("next_free_key()"); if self.idx_to_data_free_list_head == INVALID { return None; } @@ -316,6 +338,7 @@ pub mod details { idx_to_data_free_list: RelocatableVec::new_uninit(capacity), data: RelocatableVec::new_uninit(capacity), data_next_free_index: RelocatableQueue::new_uninit(capacity), + is_initialized: IoxAtomicBool::new(false), } } @@ -338,6 +361,8 @@ pub mod details { "{msg} since the underlying data_next_free_index queue could not be initialized."); self.initialize_data_structures(); + self.is_initialized + .store(true, std::sync::atomic::Ordering::Relaxed); Ok(()) } @@ -356,6 +381,7 @@ pub mod details { idx_to_data_free_list: MetaVec::new(capacity), data: MetaVec::new(capacity), data_next_free_index: MetaQueue::new(capacity), + is_initialized: IoxAtomicBool::new(true), }; unsafe { new_self.initialize_data_structures() }; new_self diff --git a/iceoryx2-bb/container/src/vec.rs b/iceoryx2-bb/container/src/vec.rs index 13e1c92f6..4b9394292 100644 --- a/iceoryx2-bb/container/src/vec.rs +++ b/iceoryx2-bb/container/src/vec.rs @@ -181,14 +181,14 @@ pub mod details { type Target = [T]; fn deref(&self) -> &Self::Target { - self.verify_init(&format!("Vec<{}>::push()", std::any::type_name::())); + self.verify_init("deref()"); unsafe { core::slice::from_raw_parts((*self.data_ptr.as_ptr()).as_ptr(), self.len) } } } impl DerefMut for MetaVec { fn deref_mut(&mut self) -> &mut Self::Target { - self.verify_init(&format!("Vec<{}>::push()", std::any::type_name::())); + self.verify_init("deref_mut()"); unsafe { core::slice::from_raw_parts_mut( (*self.data_ptr.as_mut_ptr()).as_mut_ptr(), @@ -222,8 +222,8 @@ pub mod details { debug_assert!( self.is_initialized .load(std::sync::atomic::Ordering::Relaxed), - "From: {}, Undefined behavior - the object was not initialized with 'init' before.", - source + "From: MetaVec<{}>::{}, Undefined behavior - the object was not initialized with 'init' before.", + core::any::type_name::(), source ); } @@ -252,7 +252,7 @@ pub mod details { return false; } - self.verify_init(&format!("Vec<{}>::push()", std::any::type_name::())); + self.verify_init("push()"); self.push_unchecked(value); true } @@ -303,7 +303,7 @@ pub mod details { return None; } - self.verify_init(&format!("Vec<{}>::pop()", std::any::type_name::())); + self.verify_init("pop()"); Some(self.pop_unchecked()) } diff --git a/iceoryx2-bb/lock-free/src/mpmc/bit_set.rs b/iceoryx2-bb/lock-free/src/mpmc/bit_set.rs index 80057a461..48628395e 100644 --- a/iceoryx2-bb/lock-free/src/mpmc/bit_set.rs +++ b/iceoryx2-bb/lock-free/src/mpmc/bit_set.rs @@ -181,7 +181,7 @@ pub mod details { fn verify_init(&self, source: &str) { debug_assert!( self.is_memory_initialized.load(Ordering::Relaxed), - "Undefined behavior when calling \"{}\" and the object is not initialized.", + "Undefined behavior when calling BitSet::{} and the object is not initialized.", source ); } @@ -242,7 +242,7 @@ pub mod details { /// If the bit was successfully set it returns true, if the bit was already set it /// returns false. pub fn set(&self, id: usize) -> bool { - self.verify_init("set"); + self.verify_init("set()"); debug_assert!( id < self.capacity, "This should never happen. Out of bounds access with index {}.", @@ -255,7 +255,7 @@ pub mod details { /// Resets the next set bit and returns the bit index. If no bit was set it returns /// [`None`]. pub fn reset_next(&self) -> Option { - self.verify_init("reset_next"); + self.verify_init("reset_next()"); let current_position = self.reset_position.load(Ordering::Relaxed); for pos in (current_position..self.capacity).chain(0..current_position) { @@ -271,7 +271,7 @@ pub mod details { /// Reset every set bit in the BitSet and call the provided callback for every bit that /// was set. This is the most efficient way to acquire all bits that were set. pub fn reset_all(&self, mut callback: F) { - self.verify_init("reset_all"); + self.verify_init("reset_all()"); for i in 0..self.array_capacity { let value = unsafe { (*self.data_ptr.as_ptr().add(i)).swap(0, Ordering::Relaxed) }; diff --git a/iceoryx2-bb/lock-free/src/mpmc/container.rs b/iceoryx2-bb/lock-free/src/mpmc/container.rs index 90cb8f253..2d1be49f9 100644 --- a/iceoryx2-bb/lock-free/src/mpmc/container.rs +++ b/iceoryx2-bb/lock-free/src/mpmc/container.rs @@ -171,7 +171,7 @@ pub struct Container { data_ptr: RelocatablePointer>>, capacity: usize, change_counter: IoxAtomicU64, - is_memory_initialized: IoxAtomicBool, + is_initialized: IoxAtomicBool, container_id: UniqueId, // must be the last member, since it is a relocatable container as well and then the offset // calculations would again fail @@ -194,7 +194,7 @@ impl RelocatableContainer for Container { capacity, change_counter: IoxAtomicU64::new(0), index_set: UniqueIndexSet::new_uninit(capacity), - is_memory_initialized: IoxAtomicBool::new(false), + is_initialized: IoxAtomicBool::new(false), } } @@ -202,7 +202,7 @@ impl RelocatableContainer for Container { &mut self, allocator: &Allocator, ) -> Result<(), AllocationError> { - if self.is_memory_initialized.load(Ordering::Relaxed) { + if self.is_initialized.load(Ordering::Relaxed) { fatal_panic!(from self, "Memory already initialized. Initializing it twice may lead to undefined behavior."); } let msg = "Unable to initialize"; @@ -230,7 +230,7 @@ impl RelocatableContainer for Container { .add(i) .write(UnsafeCell::new(MaybeUninit::uninit())); } - self.is_memory_initialized.store(true, Ordering::Relaxed); + self.is_initialized.store(true, Ordering::Relaxed); Ok(()) } @@ -242,9 +242,10 @@ impl RelocatableContainer for Container { impl Container { #[inline(always)] - fn verify_memory_initialization(&self, source: &str) { - debug_assert!(self.is_memory_initialized.load(Ordering::Relaxed), - "Undefined behavior when calling \"{}\" and the object is not initialized with 'initialize_memory'.", source); + fn verify_init(&self, source: &str) { + debug_assert!(self.is_initialized.load(Ordering::Relaxed), + "Undefined behavior when calling Container<{}>::{} and the object is not initialized with 'init'.", + std::any::type_name::(), source); } /// Returns the required memory size of the data segment of the [`Container`]. @@ -287,7 +288,7 @@ impl Container { /// element will leak. /// pub unsafe fn add(&self, value: T) -> Result { - self.verify_memory_initialization("add"); + self.verify_init("add()"); let index = self.index_set.acquire_raw_index()?; core::ptr::copy_nonoverlapping( @@ -323,7 +324,7 @@ impl Container { /// which was allocated afterwards /// pub unsafe fn remove(&self, handle: ContainerHandle, mode: ReleaseMode) -> ReleaseState { - self.verify_memory_initialization("remove_with_handle"); + self.verify_init("remove()"); debug_assert!( handle.container_id == self.container_id.value(), "The ContainerHandle used as handle was not created by this Container instance." @@ -346,7 +347,7 @@ impl Container { /// * Ensure that [`Container::init()`] was called before calling this method /// pub unsafe fn get_state(&self) -> ContainerState { - self.verify_memory_initialization("get_state"); + self.verify_init("get_state()"); let mut state = ContainerState::new(self.container_id.value(), self.capacity); self.update_state(&mut state); diff --git a/iceoryx2-bb/lock-free/src/mpmc/unique_index_set.rs b/iceoryx2-bb/lock-free/src/mpmc/unique_index_set.rs index f6d873db9..3d850d920 100644 --- a/iceoryx2-bb/lock-free/src/mpmc/unique_index_set.rs +++ b/iceoryx2-bb/lock-free/src/mpmc/unique_index_set.rs @@ -311,7 +311,7 @@ impl UniqueIndexSet { fn verify_init(&self, source: &str) { debug_assert!( self.is_memory_initialized.load(Ordering::Relaxed), - "Undefined behavior when calling \"{}\" and the object is not initialized.", + "Undefined behavior when calling UniqueIndexSet::{} and the object is not initialized.", source ); } @@ -329,7 +329,7 @@ impl UniqueIndexSet { /// * Ensure that [`UniqueIndexSet::init()`] was called once. /// pub unsafe fn acquire(&self) -> Result, UniqueIndexSetAcquireFailure> { - self.verify_init("acquire"); + self.verify_init("acquire()"); unsafe { self.acquire_raw_index() }.map(|v| UniqueIndex { value: v, index_set: self, @@ -388,7 +388,7 @@ impl UniqueIndexSet { /// * The index must be manually released with [`UniqueIndexSet::release_raw_index()`] /// otherwise the index is leaked. pub unsafe fn acquire_raw_index(&self) -> Result { - self.verify_init("acquire_raw_index"); + self.verify_init("acquire_raw_index()"); let mut old_value = self.head.load(Ordering::Acquire); let mut old = HeadDetails::from(old_value); @@ -442,7 +442,7 @@ impl UniqueIndexSet { /// * Shall be only used when the index was acquired with /// [`UniqueIndexSet::acquire_raw_index()`] pub unsafe fn release_raw_index(&self, index: u32, mode: ReleaseMode) -> ReleaseState { - self.verify_init("release_raw_index"); + self.verify_init("release_raw_index()"); fence(Ordering::Release); let mut release_state; diff --git a/iceoryx2-bb/lock-free/src/spsc/index_queue.rs b/iceoryx2-bb/lock-free/src/spsc/index_queue.rs index bf4bb501c..1403faf89 100644 --- a/iceoryx2-bb/lock-free/src/spsc/index_queue.rs +++ b/iceoryx2-bb/lock-free/src/spsc/index_queue.rs @@ -53,19 +53,19 @@ use iceoryx2_pal_concurrency_sync::iox_atomic::{IoxAtomicBool, IoxAtomicUsize}; /// The [`Producer`] of the [`IndexQueue`]/[`FixedSizeIndexQueue`] which can add values to it /// via [`Producer::push()`]. -pub struct Producer<'a, PointerType: PointerTrait> + Debug> { +pub struct Producer<'a, PointerType: PointerTrait> + Debug> { queue: &'a details::IndexQueue, } -impl> + Debug> Producer<'_, PointerType> { +impl> + Debug> Producer<'_, PointerType> { /// Adds a new value to the [`IndexQueue`]/[`FixedSizeIndexQueue`]. If the queue is full /// it returns false, otherwise true. - pub fn push(&mut self, t: usize) -> bool { + pub fn push(&mut self, t: u64) -> bool { unsafe { self.queue.push(t) } } } -impl> + Debug> Drop for Producer<'_, PointerType> { +impl> + Debug> Drop for Producer<'_, PointerType> { fn drop(&mut self) { self.queue.has_producer.store(true, Ordering::Relaxed); } @@ -73,26 +73,26 @@ impl> + Debug> Drop for Producer<'_, /// The [`Consumer`] of the [`IndexQueue`]/[`FixedSizeIndexQueue`] which can acquire values from it /// via [`Consumer::pop()`]. -pub struct Consumer<'a, PointerType: PointerTrait> + Debug> { +pub struct Consumer<'a, PointerType: PointerTrait> + Debug> { queue: &'a details::IndexQueue, } -impl> + Debug> Consumer<'_, PointerType> { +impl> + Debug> Consumer<'_, PointerType> { /// Acquires a value from the [`IndexQueue`]/[`FixedSizeIndexQueue`]. If the queue is empty /// it returns [`None`] otherwise the value. - pub fn pop(&mut self) -> Option { + pub fn pop(&mut self) -> Option { unsafe { self.queue.pop() } } } -impl> + Debug> Drop for Consumer<'_, PointerType> { +impl> + Debug> Drop for Consumer<'_, PointerType> { fn drop(&mut self) { self.queue.has_consumer.store(true, Ordering::Relaxed); } } -pub type IndexQueue = details::IndexQueue>>; -pub type RelocatableIndexQueue = details::IndexQueue>>; +pub type IndexQueue = details::IndexQueue>>; +pub type RelocatableIndexQueue = details::IndexQueue>>; pub mod details { use std::fmt::Debug; @@ -105,7 +105,7 @@ pub mod details { /// queue is created. #[repr(C)] #[derive(Debug)] - pub struct IndexQueue>> { + pub struct IndexQueue>> { data_ptr: PointerType, capacity: usize, write_position: IoxAtomicUsize, @@ -115,12 +115,12 @@ pub mod details { is_memory_initialized: IoxAtomicBool, } - unsafe impl>> Sync for IndexQueue {} - unsafe impl>> Send for IndexQueue {} + unsafe impl>> Sync for IndexQueue {} + unsafe impl>> Send for IndexQueue {} - impl IndexQueue>> { + impl IndexQueue>> { pub fn new(capacity: usize) -> Self { - let mut data_ptr = OwningPointer::>::new_with_alloc(capacity); + let mut data_ptr = OwningPointer::>::new_with_alloc(capacity); for i in 0..capacity { unsafe { data_ptr.as_mut_ptr().add(i).write(UnsafeCell::new(0)) }; @@ -138,7 +138,7 @@ pub mod details { } } - impl RelocatableContainer for IndexQueue>> { + impl RelocatableContainer for IndexQueue>> { unsafe fn new_uninit(capacity: usize) -> Self { Self { data_ptr: RelocatablePointer::new_uninit(), @@ -180,12 +180,12 @@ pub mod details { } } - impl> + Debug> IndexQueue { + impl> + Debug> IndexQueue { #[inline(always)] fn verify_init(&self, source: &str) { debug_assert!( self.is_memory_initialized.load(Ordering::Relaxed), - "Undefined behavior when calling \"{}\" and the object is not initialized.", + "Undefined behavior when calling IndexQueue::{} and the object is not initialized.", source ); } @@ -196,7 +196,7 @@ pub mod details { unaligned_mem_size::>(capacity) } - unsafe fn at(&self, position: usize) -> *mut usize { + unsafe fn at(&self, position: usize) -> *mut u64 { (*self.data_ptr.as_ptr().add(position % self.capacity)).get() } @@ -219,7 +219,7 @@ pub mod details { /// } /// ``` pub fn acquire_producer(&self) -> Option> { - self.verify_init("acquire_producer"); + self.verify_init("acquire_producer()"); match self.has_producer.compare_exchange( true, false, @@ -251,7 +251,7 @@ pub mod details { /// } /// ``` pub fn acquire_consumer(&self) -> Option> { - self.verify_init("acquire_consumer"); + self.verify_init("acquire_consumer()"); match self.has_consumer.compare_exchange( true, false, @@ -269,7 +269,7 @@ pub mod details { /// /// * Ensure that no concurrent push occurres. Only one thread at a time is allowed to call /// push. - pub unsafe fn push(&self, value: usize) -> bool { + pub unsafe fn push(&self, value: u64) -> bool { let write_position = self.write_position.load(Ordering::Relaxed); let is_full = write_position == self.read_position.load(Ordering::Relaxed) + self.capacity; @@ -293,7 +293,7 @@ pub mod details { /// # Safety /// /// * Ensure that no concurrent pop occurres. Only one thread at a time is allowed to call pop. - pub unsafe fn pop(&self) -> Option { + pub unsafe fn pop(&self) -> Option { let read_position = self.read_position.load(Ordering::Relaxed); //////////////// // SYNC POINT @@ -395,12 +395,12 @@ impl FixedSizeIndexQueue { } /// See [`IndexQueue::acquire_producer()`] - pub fn acquire_producer(&self) -> Option>>> { + pub fn acquire_producer(&self) -> Option>>> { self.state.acquire_producer() } /// See [`IndexQueue::acquire_consumer()`] - pub fn acquire_consumer(&self) -> Option>>> { + pub fn acquire_consumer(&self) -> Option>>> { self.state.acquire_consumer() } @@ -430,7 +430,7 @@ impl FixedSizeIndexQueue { /// /// * Ensure that no concurrent push occurres. Only one thread at a time is allowed to call /// push. - pub unsafe fn push(&self, value: usize) -> bool { + pub unsafe fn push(&self, value: u64) -> bool { self.state.push(value) } @@ -439,7 +439,7 @@ impl FixedSizeIndexQueue { /// # Safety /// /// * Ensure that no concurrent pop occurres. Only one thread at a time is allowed to call pop. - pub unsafe fn pop(&self) -> Option { + pub unsafe fn pop(&self) -> Option { self.state.pop() } } diff --git a/iceoryx2-bb/lock-free/src/spsc/safely_overflowing_index_queue.rs b/iceoryx2-bb/lock-free/src/spsc/safely_overflowing_index_queue.rs index 976dd1afc..c52f05cda 100644 --- a/iceoryx2-bb/lock-free/src/spsc/safely_overflowing_index_queue.rs +++ b/iceoryx2-bb/lock-free/src/spsc/safely_overflowing_index_queue.rs @@ -55,19 +55,19 @@ use iceoryx2_bb_log::{fail, fatal_panic}; /// The [`Producer`] of the [`SafelyOverflowingIndexQueue`]/[`FixedSizeSafelyOverflowingIndexQueue`] /// which can add values to it via [`Producer::push()`]. #[derive(Debug)] -pub struct Producer<'a, PointerType: PointerTrait>> { +pub struct Producer<'a, PointerType: PointerTrait>> { queue: &'a details::SafelyOverflowingIndexQueue, } -impl> + Debug> Producer<'_, PointerType> { +impl> + Debug> Producer<'_, PointerType> { /// Adds a new value to the [`SafelyOverflowingIndexQueue`]/[`FixedSizeSafelyOverflowingIndexQueue`]. /// If the queue is full it returns false, otherwise true. - pub fn push(&mut self, t: usize) -> Option { + pub fn push(&mut self, t: u64) -> Option { unsafe { self.queue.push(t) } } } -impl>> Drop for Producer<'_, PointerType> { +impl>> Drop for Producer<'_, PointerType> { fn drop(&mut self) { self.queue.has_producer.store(true, Ordering::Relaxed); } @@ -76,19 +76,19 @@ impl>> Drop for Producer<'_, Pointer /// The [`Consumer`] of the [`SafelyOverflowingIndexQueue`]/[`FixedSizeSafelyOverflowingIndexQueue`] /// which can acquire values from it via [`Consumer::pop()`]. #[derive(Debug)] -pub struct Consumer<'a, PointerType: PointerTrait>> { +pub struct Consumer<'a, PointerType: PointerTrait>> { queue: &'a details::SafelyOverflowingIndexQueue, } -impl> + Debug> Consumer<'_, PointerType> { +impl> + Debug> Consumer<'_, PointerType> { /// Acquires a value from the [`SafelyOverflowingIndexQueue`]/[`FixedSizeSafelyOverflowingIndexQueue`]. /// If the queue is empty it returns [`None`] otherwise the value. - pub fn pop(&mut self) -> Option { + pub fn pop(&mut self) -> Option { unsafe { self.queue.pop() } } } -impl>> Drop for Consumer<'_, PointerType> { +impl>> Drop for Consumer<'_, PointerType> { fn drop(&mut self) { self.queue.has_consumer.store(true, Ordering::Relaxed); } @@ -96,11 +96,11 @@ impl>> Drop for Consumer<'_, Pointer /// Non-relocatable version of the safely overflowing index queue pub type SafelyOverflowingIndexQueue = - details::SafelyOverflowingIndexQueue>>; + details::SafelyOverflowingIndexQueue>>; /// Relocatable version of the safely overflowing index queue pub type RelocatableSafelyOverflowingIndexQueue = - details::SafelyOverflowingIndexQueue>>; + details::SafelyOverflowingIndexQueue>>; pub mod details { use iceoryx2_bb_elementary::math::unaligned_mem_size; @@ -112,7 +112,7 @@ pub mod details { /// and overridden with the newest element. #[derive(Debug)] #[repr(C)] - pub struct SafelyOverflowingIndexQueue>> { + pub struct SafelyOverflowingIndexQueue>> { data_ptr: PointerType, capacity: usize, write_position: IoxAtomicUsize, @@ -122,18 +122,18 @@ pub mod details { is_memory_initialized: IoxAtomicBool, } - unsafe impl>> Sync + unsafe impl>> Sync for SafelyOverflowingIndexQueue { } - unsafe impl>> Send + unsafe impl>> Send for SafelyOverflowingIndexQueue { } - impl SafelyOverflowingIndexQueue>> { + impl SafelyOverflowingIndexQueue>> { pub fn new(capacity: usize) -> Self { - let mut data_ptr = OwningPointer::>::new_with_alloc(capacity + 1); + let mut data_ptr = OwningPointer::>::new_with_alloc(capacity + 1); for i in 0..capacity + 1 { unsafe { data_ptr.as_mut_ptr().add(i).write(UnsafeCell::new(0)) }; @@ -151,7 +151,7 @@ pub mod details { } } - impl RelocatableContainer for SafelyOverflowingIndexQueue>> { + impl RelocatableContainer for SafelyOverflowingIndexQueue>> { unsafe fn new_uninit(capacity: usize) -> Self { Self { data_ptr: RelocatablePointer::new_uninit(), @@ -174,8 +174,8 @@ pub mod details { self.data_ptr.init(fail!(from self, when allocator .allocate( Layout::from_size_align_unchecked( - std::mem::size_of::() * (self.capacity + 1), - std::mem::align_of::())), + std::mem::size_of::() * (self.capacity + 1), + std::mem::align_of::())), "Failed to initialize since the allocation of the data memory failed.")); for i in 0..self.capacity + 1 { @@ -193,14 +193,12 @@ pub mod details { } } - impl> + Debug> - SafelyOverflowingIndexQueue - { + impl> + Debug> SafelyOverflowingIndexQueue { #[inline(always)] fn verify_init(&self, source: &str) { debug_assert!( self.is_memory_initialized.load(Ordering::Relaxed), - "Undefined behavior when calling \"{}\" and the object is not initialized.", + "Undefined behavior when calling SafelyOverflowingIndexQueue::{} and the object is not initialized.", source ); } @@ -208,10 +206,10 @@ pub mod details { /// Returns the amount of memory required to create a [`SafelyOverflowingIndexQueue`] with /// the provided capacity. pub const fn const_memory_size(capacity: usize) -> usize { - unaligned_mem_size::>(capacity + 1) + unaligned_mem_size::>(capacity + 1) } - fn at(&self, position: usize) -> *mut usize { + fn at(&self, position: usize) -> *mut u64 { unsafe { (*self.data_ptr.as_ptr().add(position % (self.capacity + 1))).get() } } /// Acquires the [`Producer`] of the [`SafelyOverflowingIndexQueue`]. This is threadsafe and @@ -235,7 +233,7 @@ pub mod details { /// } /// ``` pub fn acquire_producer(&self) -> Option> { - self.verify_init("acquire_producer"); + self.verify_init("acquire_producer()"); match self.has_producer.compare_exchange( true, false, @@ -267,7 +265,7 @@ pub mod details { /// } /// ``` pub fn acquire_consumer(&self) -> Option> { - self.verify_init("acquire_consumer"); + self.verify_init("acquire_consumer()"); match self.has_consumer.compare_exchange( true, false, @@ -288,7 +286,7 @@ pub mod details { /// to ensure that at most one thread access this method. /// * It has to be ensured that the memory is initialized with /// [`SafelyOverflowingIndexQueue::init()`]. - pub unsafe fn push(&self, value: usize) -> Option { + pub unsafe fn push(&self, value: u64) -> Option { //////////////// // SYNC POINT R //////////////// @@ -336,7 +334,7 @@ pub mod details { /// to ensure that at most one thread access this method. /// * It has to be ensured that the memory is initialized with /// [`SafelyOverflowingIndexQueue::init()`]. - pub unsafe fn pop(&self) -> Option { + pub unsafe fn pop(&self) -> Option { let mut read_position = self.read_position.load(Ordering::Relaxed); //////////////// // SYNC POINT W @@ -417,8 +415,8 @@ pub mod details { #[repr(C)] pub struct FixedSizeSafelyOverflowingIndexQueue { state: RelocatableSafelyOverflowingIndexQueue, - data: [UnsafeCell; CAPACITY], - data_plus_one: UnsafeCell, + data: [UnsafeCell; CAPACITY], + data_plus_one: UnsafeCell, } unsafe impl Sync for FixedSizeSafelyOverflowingIndexQueue {} @@ -451,12 +449,12 @@ impl FixedSizeSafelyOverflowingIndexQueue { } /// See [`SafelyOverflowingIndexQueue::acquire_producer()`] - pub fn acquire_producer(&self) -> Option>>> { + pub fn acquire_producer(&self) -> Option>>> { self.state.acquire_producer() } /// See [`SafelyOverflowingIndexQueue::acquire_consumer()`] - pub fn acquire_consumer(&self) -> Option>>> { + pub fn acquire_consumer(&self) -> Option>>> { self.state.acquire_consumer() } @@ -476,7 +474,7 @@ impl FixedSizeSafelyOverflowingIndexQueue { /// /// * It must be ensured that no other thread/process calls this method concurrently /// - pub unsafe fn push(&self, value: usize) -> Option { + pub unsafe fn push(&self, value: u64) -> Option { self.state.push(value) } @@ -486,7 +484,7 @@ impl FixedSizeSafelyOverflowingIndexQueue { /// /// * It must be ensured that no other thread/process calls this method concurrently /// - pub unsafe fn pop(&self) -> Option { + pub unsafe fn pop(&self) -> Option { self.state.pop() } diff --git a/iceoryx2-bb/lock-free/tests/spsc_index_queue_tests.rs b/iceoryx2-bb/lock-free/tests/spsc_index_queue_tests.rs index 85ecd6fa1..c53a99f70 100644 --- a/iceoryx2-bb/lock-free/tests/spsc_index_queue_tests.rs +++ b/iceoryx2-bb/lock-free/tests/spsc_index_queue_tests.rs @@ -30,7 +30,7 @@ fn spsc_index_queue_push_works_until_full() { for i in 0..CAPACITY { assert_that!(sut, len i); - assert_that!(sut_producer.push(i), eq true); + assert_that!(sut_producer.push(i as u64), eq true); } assert_that!(sut_producer.push(1234), eq false); @@ -46,7 +46,7 @@ fn spsc_index_queue_pop_works_until_empty() { let sut = FixedSizeIndexQueue::::new(); let mut sut_producer = sut.acquire_producer().unwrap(); for i in 0..CAPACITY { - assert_that!(sut_producer.push(i), eq true); + assert_that!(sut_producer.push(i as u64), eq true); } assert_that!(sut.capacity(), eq CAPACITY); @@ -59,7 +59,7 @@ fn spsc_index_queue_pop_works_until_empty() { assert_that!(sut, len CAPACITY - i); let result = sut_consumer.pop(); assert_that!(result, is_some); - assert_that!(result.unwrap(), eq i); + assert_that!(result.unwrap(), eq i as u64); } assert_that!(sut_consumer.pop(), is_none); @@ -77,10 +77,10 @@ fn spsc_index_queue_push_pop_alteration_works() { let mut sut_consumer = sut.acquire_consumer().unwrap(); for i in 0..CAPACITY - 1 { - assert_that!(sut_producer.push(i), eq true); - assert_that!(sut_producer.push(i), eq true); + assert_that!(sut_producer.push(i as u64), eq true); + assert_that!(sut_producer.push(i as u64), eq true); - assert_that!(sut_consumer.pop(), eq Some(i / 2)) + assert_that!(sut_consumer.pop(), eq Some(i as u64 / 2)) } } @@ -138,7 +138,7 @@ fn spsc_index_queue_push_pop_works_concurrently() { let mut counter: usize = 0; barrier.wait(); while counter <= LIMIT { - if sut_producer.push(counter) { + if sut_producer.push(counter as u64) { counter += 1; } } @@ -150,8 +150,8 @@ fn spsc_index_queue_push_pop_works_concurrently() { loop { match sut_consumer.pop() { Some(v) => { - guard.push(v); - if v == LIMIT { + guard.push(v as usize); + if v as usize == LIMIT { return; } } diff --git a/iceoryx2-bb/lock-free/tests/spsc_safely_overflowing_index_queue_tests.rs b/iceoryx2-bb/lock-free/tests/spsc_safely_overflowing_index_queue_tests.rs index f168e8c9f..e59d10f95 100644 --- a/iceoryx2-bb/lock-free/tests/spsc_safely_overflowing_index_queue_tests.rs +++ b/iceoryx2-bb/lock-free/tests/spsc_safely_overflowing_index_queue_tests.rs @@ -30,7 +30,7 @@ fn spsc_safely_overflowing_index_queue_push_works_until_full() { for i in 0..CAPACITY { assert_that!(sut, len i); - assert_that!(sut_producer.push(i), is_none); + assert_that!(sut_producer.push(i as u64), is_none); } let oldest = sut_producer.push(1234); assert_that!(oldest, is_some); @@ -48,7 +48,7 @@ fn spsc_safely_overflowing_index_queue_pop_works_until_empty() { let sut = FixedSizeSafelyOverflowingIndexQueue::::new(); let mut sut_producer = sut.acquire_producer().unwrap(); for i in 0..CAPACITY { - assert_that!(sut_producer.push(i), is_none); + assert_that!(sut_producer.push(i as u64), is_none); } assert_that!(sut.capacity(), eq CAPACITY); @@ -61,7 +61,7 @@ fn spsc_safely_overflowing_index_queue_pop_works_until_empty() { assert_that!(sut, len CAPACITY - i); let result = sut_consumer.pop(); assert_that!(result, is_some); - assert_that!(result.unwrap(), eq i); + assert_that!(result.unwrap(), eq i as u64); } assert_that!(sut_consumer.pop(), is_none); @@ -79,10 +79,10 @@ fn spsc_safely_overflowing_index_queue_push_pop_alteration_works() { let mut sut_consumer = sut.acquire_consumer().unwrap(); for i in 0..CAPACITY - 1 { - assert_that!(sut_producer.push(i), is_none); - assert_that!(sut_producer.push(i), is_none); + assert_that!(sut_producer.push(i as u64), is_none); + assert_that!(sut_producer.push(i as u64), is_none); - assert_that!(sut_consumer.pop(), eq Some(i / 2)) + assert_that!(sut_consumer.pop(), eq Some(i as u64 / 2)) } } @@ -120,16 +120,16 @@ fn spsc_safely_overflowing_index_queue_get_producer_after_release_succeeds() { #[test] fn spsc_safely_overflowing_index_queue_push_pop_works_concurrently() { - const LIMIT: usize = 1000000; + const LIMIT: u64 = 1000000; const CAPACITY: usize = 1024; let sut = FixedSizeSafelyOverflowingIndexQueue::::new(); let mut sut_producer = sut.acquire_producer().unwrap(); let mut sut_consumer = sut.acquire_consumer().unwrap(); - let producer_storage = Arc::new(Mutex::>::new(vec![])); + let producer_storage = Arc::new(Mutex::>::new(vec![])); let producer_storage_push = Arc::clone(&producer_storage); - let consumer_storage = Arc::new(Mutex::>::new(vec![])); + let consumer_storage = Arc::new(Mutex::>::new(vec![])); let consumer_storage_pop = Arc::clone(&consumer_storage); let handle = BarrierHandle::new(); @@ -141,7 +141,7 @@ fn spsc_safely_overflowing_index_queue_push_pop_works_concurrently() { thread::scope(|s| { s.spawn(|| { let mut guard = producer_storage_push.lock().unwrap(); - let mut counter: usize = 0; + let mut counter: u64 = 0; barrier.wait(); while counter <= LIMIT { @@ -171,15 +171,15 @@ fn spsc_safely_overflowing_index_queue_push_pop_works_concurrently() { }); let mut element_counter = vec![]; - element_counter.resize(LIMIT + 1, 0); + element_counter.resize(LIMIT as usize + 1, 0); let guard = producer_storage.lock().unwrap(); for i in &*guard { - element_counter[*i] += 1; + element_counter[*i as usize] += 1; } let guard = consumer_storage.lock().unwrap(); for i in &*guard { - element_counter[*i] += 1; + element_counter[*i as usize] += 1; } for element in element_counter { @@ -189,16 +189,16 @@ fn spsc_safely_overflowing_index_queue_push_pop_works_concurrently() { #[test] fn spsc_safely_overflowing_index_queue_push_pop_works_concurrently_with_full_queue() { - const LIMIT: usize = 1000000; + const LIMIT: u64 = 1000000; const CAPACITY: usize = 1024; let sut = FixedSizeSafelyOverflowingIndexQueue::::new(); let mut sut_producer = sut.acquire_producer().unwrap(); let mut sut_consumer = sut.acquire_consumer().unwrap(); - let producer_storage = Arc::new(Mutex::>::new(vec![])); + let producer_storage = Arc::new(Mutex::>::new(vec![])); let producer_storage_push = Arc::clone(&producer_storage); - let consumer_storage = Arc::new(Mutex::>::new(vec![])); + let consumer_storage = Arc::new(Mutex::>::new(vec![])); let consumer_storage_pop = Arc::clone(&consumer_storage); let handle = BarrierHandle::new(); @@ -208,13 +208,13 @@ fn spsc_safely_overflowing_index_queue_push_pop_works_concurrently_with_full_que .unwrap(); for i in 0..CAPACITY { - assert_that!(sut_producer.push(i), is_none); + assert_that!(sut_producer.push(i as u64), is_none); } thread::scope(|s| { s.spawn(|| { let mut guard = producer_storage_push.lock().unwrap(); - let mut counter: usize = 1024; + let mut counter: u64 = 1024; barrier.wait(); while counter <= LIMIT { @@ -244,15 +244,15 @@ fn spsc_safely_overflowing_index_queue_push_pop_works_concurrently_with_full_que }); let mut element_counter = vec![]; - element_counter.resize(LIMIT + 1, 0); + element_counter.resize(LIMIT as usize + 1, 0); let guard = producer_storage.lock().unwrap(); for i in &*guard { - element_counter[*i] += 1; + element_counter[*i as usize] += 1; } let guard = consumer_storage.lock().unwrap(); for i in &*guard { - element_counter[*i] += 1; + element_counter[*i as usize] += 1; } for element in element_counter { diff --git a/iceoryx2-cal/src/communication_channel/posix_shared_memory.rs b/iceoryx2-cal/src/communication_channel/posix_shared_memory.rs index bc5d9ef03..708de9408 100644 --- a/iceoryx2-cal/src/communication_channel/posix_shared_memory.rs +++ b/iceoryx2-cal/src/communication_channel/posix_shared_memory.rs @@ -59,7 +59,7 @@ impl NamedConceptMgmt for Channel { } } -impl CommunicationChannel for Channel { +impl CommunicationChannel for Channel { type Sender = Sender; type Receiver = Receiver; type Creator = Creator; @@ -153,7 +153,7 @@ impl NamedConceptBuilder for Creator { } } -impl CommunicationChannelCreator for Creator { +impl CommunicationChannelCreator for Creator { fn enable_safe_overflow(mut self) -> Self { self.enable_safe_overflow = true; self @@ -213,7 +213,7 @@ impl NamedConceptBuilder for Connector { } } -impl CommunicationChannelConnector for Connector { +impl CommunicationChannelConnector for Connector { fn try_open_sender(self) -> Result { let msg = "Unable to try open communication channel"; @@ -276,12 +276,12 @@ impl CommunicationChannelParticipant for Receiver { } } -impl CommunicationChannelReceiver for Receiver { +impl CommunicationChannelReceiver for Receiver { fn buffer_size(&self) -> usize { self.management().index_queue.capacity() } - fn receive(&self) -> Result, CommunicationChannelReceiveError> { + fn receive(&self) -> Result, CommunicationChannelReceiveError> { Ok(unsafe { self.management().index_queue.pop() }) } } @@ -309,8 +309,8 @@ impl NamedConcept for Sender { } } -impl CommunicationChannelSender for Sender { - fn send(&self, value: &usize) -> Result, CommunicationChannelSendError> { +impl CommunicationChannelSender for Sender { + fn send(&self, value: &u64) -> Result, CommunicationChannelSendError> { match self.try_send(value) { Err(CommunicationChannelSendError::ReceiverCacheIsFull) => { fail!(from self, with CommunicationChannelSendError::ReceiverCacheIsFull, @@ -321,7 +321,7 @@ impl CommunicationChannelSender for Sender { } } - fn try_send(&self, value: &usize) -> Result, CommunicationChannelSendError> { + fn try_send(&self, value: &u64) -> Result, CommunicationChannelSendError> { if !self.management().enable_safe_overflow && self.management().index_queue.is_full() { return Err(CommunicationChannelSendError::ReceiverCacheIsFull); } diff --git a/iceoryx2-cal/src/communication_channel/process_local.rs b/iceoryx2-cal/src/communication_channel/process_local.rs index 70e755f70..5e79f6ff7 100644 --- a/iceoryx2-cal/src/communication_channel/process_local.rs +++ b/iceoryx2-cal/src/communication_channel/process_local.rs @@ -129,7 +129,7 @@ impl NamedConceptBuilder for Creator { } } -impl CommunicationChannelCreator for Creator { +impl CommunicationChannelCreator for Creator { fn enable_safe_overflow(mut self) -> Self { self.enable_safe_overflow = true; self @@ -190,7 +190,7 @@ impl NamedConceptBuilder for Connector { } } -impl CommunicationChannelConnector for Connector { +impl CommunicationChannelConnector for Connector { fn open_sender(self) -> Result { let msg = "Failed to open sender"; let origin = format!("{:?}", self); @@ -292,8 +292,8 @@ impl NamedConcept for Duplex { } } -impl CommunicationChannelSender for Duplex { - fn send(&self, data: &usize) -> Result, CommunicationChannelSendError> { +impl CommunicationChannelSender for Duplex { + fn send(&self, data: &u64) -> Result, CommunicationChannelSendError> { let msg = "Unable to send data"; match self.try_send(data) { Err(CommunicationChannelSendError::ReceiverCacheIsFull) => { @@ -308,7 +308,7 @@ impl CommunicationChannelSender for Duplex { } } - fn try_send(&self, data: &usize) -> Result, CommunicationChannelSendError> { + fn try_send(&self, data: &u64) -> Result, CommunicationChannelSendError> { if !self.management.enable_safe_overflow && self.management.queue.is_full() { return Err(CommunicationChannelSendError::ReceiverCacheIsFull); } @@ -330,12 +330,12 @@ impl CommunicationChannelParticipant for Duplex { } } -impl CommunicationChannelReceiver for Duplex { +impl CommunicationChannelReceiver for Duplex { fn buffer_size(&self) -> usize { self.management.queue.capacity() } - fn receive(&self) -> Result, CommunicationChannelReceiveError> { + fn receive(&self) -> Result, CommunicationChannelReceiveError> { Ok(self.management.queue.acquire_consumer().unwrap().pop()) } } @@ -407,7 +407,7 @@ impl NamedConceptMgmt for Channel { } } -impl CommunicationChannel for Channel { +impl CommunicationChannel for Channel { type Sender = Duplex; type Connector = Connector; type Creator = Creator; diff --git a/iceoryx2-cal/src/dynamic_storage/mod.rs b/iceoryx2-cal/src/dynamic_storage/mod.rs index 28ac9646c..e58d9de7e 100644 --- a/iceoryx2-cal/src/dynamic_storage/mod.rs +++ b/iceoryx2-cal/src/dynamic_storage/mod.rs @@ -109,7 +109,8 @@ pub trait DynamicStorageBuilder<'builder, T: Send + Sync, D: DynamicStorage>: /// Defines if a newly created [`DynamicStorage`] owns the underlying resources fn has_ownership(self, value: bool) -> Self; - /// Sets the size of the supplementary data + /// Sets the size of the supplementary data. Only relevant when it is newly created otherwise + /// the already initialized [`DynamicStorage`] with the full size is used. fn supplementary_size(self, value: usize) -> Self; /// The timeout defines how long the [`DynamicStorageBuilder`] should wait for diff --git a/iceoryx2-cal/src/dynamic_storage/posix_shared_memory.rs b/iceoryx2-cal/src/dynamic_storage/posix_shared_memory.rs index 3101968fd..1ea2f873b 100644 --- a/iceoryx2-cal/src/dynamic_storage/posix_shared_memory.rs +++ b/iceoryx2-cal/src/dynamic_storage/posix_shared_memory.rs @@ -175,7 +175,7 @@ impl NamedConceptBuilder> for Builder<'_, T> impl Builder<'_, T> { fn open_impl(&self) -> Result, DynamicStorageOpenError> { - let msg = "Failed to open "; + let msg = "Failed to open posix_shared_memory::DynamicStorage"; let full_name = self.config.path_for(&self.storage_name).file_name(); let mut wait_for_read_write_access = fail!(from self, when AdaptiveWaitBuilder::new().create(), @@ -207,12 +207,6 @@ impl Builder<'_, T> { "{} since the adaptive wait call failed.", msg); }; - let required_size = std::mem::size_of::>() + self.supplementary_size; - if shm.size() < required_size { - fail!(from self, with DynamicStorageOpenError::InternalError, - "{} since the actual size {} does not match the required size of {}.", msg, shm.size(), required_size); - } - let init_state = shm.base_address().as_ptr() as *const Data; loop { diff --git a/iceoryx2-cal/src/shared_memory/mod.rs b/iceoryx2-cal/src/shared_memory/mod.rs index 24edf1a32..f5ec16ffd 100644 --- a/iceoryx2-cal/src/shared_memory/mod.rs +++ b/iceoryx2-cal/src/shared_memory/mod.rs @@ -111,7 +111,9 @@ pub trait SharedMemoryBuilder Self; - /// Sets the size of the [`SharedMemory`] + /// Sets the size of the [`SharedMemory`]. Only relevant when the [`SharedMemory`] is created + /// otherwise the already initialized [`SharedMemory`] is completely mapped into the process + /// space. fn size(self, value: usize) -> Self; /// The timeout defines how long the [`SharedMemoryBuilder`] should wait for diff --git a/iceoryx2-cal/src/shm_allocator/pointer_offset.rs b/iceoryx2-cal/src/shm_allocator/pointer_offset.rs index da8eb1b10..692ba38f4 100644 --- a/iceoryx2-cal/src/shm_allocator/pointer_offset.rs +++ b/iceoryx2-cal/src/shm_allocator/pointer_offset.rs @@ -48,8 +48,23 @@ pub struct PointerOffset(u64); impl PointerOffset { /// Creates a new [`PointerOffset`] from the given offset value with the [`SegmentId`] == 0. pub const fn new(offset: usize) -> PointerOffset { - const SEGMENT_ID: u64 = 0; - Self((offset as u64) << (SegmentIdUnderlyingType::BITS) | SEGMENT_ID) + const SEGMENT_ID: u8 = 0; + Self::from_offset_and_segment_id(offset, SegmentId::new(SEGMENT_ID)) + } + + /// Creates a new [`PointerOffset`] from an offset and a [`SegmentId`] + pub const fn from_offset_and_segment_id(offset: usize, segment_id: SegmentId) -> PointerOffset { + Self((offset as u64) << (SegmentIdUnderlyingType::BITS) | segment_id.value() as u64) + } + + /// Creates a new [`PointerOffset`] from a provided raw value. + pub const fn from_value(value: u64) -> PointerOffset { + Self(value) + } + + /// Returns the underlying raw value of the [`PointerOffset`] + pub const fn as_value(&self) -> u64 { + self.0 } /// Sets the [`SegmentId`] of the [`PointerOffset`]. diff --git a/iceoryx2-cal/src/zero_copy_connection/common.rs b/iceoryx2-cal/src/zero_copy_connection/common.rs index d9a151b00..1cf607f98 100644 --- a/iceoryx2-cal/src/zero_copy_connection/common.rs +++ b/iceoryx2-cal/src/zero_copy_connection/common.rs @@ -23,7 +23,9 @@ pub mod details { DynamicStorageOpenOrCreateError, }; use crate::named_concept::*; + use crate::shared_memory::SegmentId; pub use crate::zero_copy_connection::*; + use iceoryx2_bb_container::vec::RelocatableVec; use iceoryx2_bb_elementary::relocatable_container::RelocatableContainer; use iceoryx2_bb_lock_free::spsc::{ index_queue::RelocatableIndexQueue, @@ -153,10 +155,11 @@ pub mod details { pub struct SharedManagementData { submission_channel: RelocatableSafelyOverflowingIndexQueue, completion_channel: RelocatableIndexQueue, - used_chunk_list: RelocatableUsedChunkList, + used_chunk_list: RelocatableVec, max_borrowed_samples: usize, sample_size: usize, - number_of_samples: usize, + number_of_samples_per_segment: usize, + number_of_segments: u8, state: IoxAtomicU8, init_state: IoxAtomicU64, enable_safe_overflow: bool, @@ -169,7 +172,8 @@ pub mod details { enable_safe_overflow: bool, max_borrowed_samples: usize, sample_size: usize, - number_of_samples: usize, + number_of_samples_per_segment: usize, + number_of_segments: u8, ) -> Self { Self { submission_channel: unsafe { @@ -180,13 +184,14 @@ pub mod details { completion_channel: unsafe { RelocatableIndexQueue::new_uninit(completion_channel_buffer_capacity) }, - used_chunk_list: unsafe { RelocatableUsedChunkList::new_uninit(number_of_samples) }, + used_chunk_list: unsafe { RelocatableVec::new_uninit(number_of_segments as usize) }, state: IoxAtomicU8::new(State::None.value()), init_state: IoxAtomicU64::new(0), enable_safe_overflow, sample_size, max_borrowed_samples, - number_of_samples, + number_of_samples_per_segment, + number_of_segments, } } @@ -194,12 +199,16 @@ pub mod details { submission_channel_buffer_capacity: usize, completion_channel_buffer_capacity: usize, number_of_samples: usize, + number_of_segments: u8, ) -> usize { + let number_of_segments = number_of_segments as usize; RelocatableIndexQueue::const_memory_size(completion_channel_buffer_capacity) + RelocatableSafelyOverflowingIndexQueue::const_memory_size( submission_channel_buffer_capacity, ) + RelocatableUsedChunkList::const_memory_size(number_of_samples) + * number_of_segments + + RelocatableVec::::const_memory_size(number_of_segments) } } @@ -210,7 +219,8 @@ pub mod details { enable_safe_overflow: bool, max_borrowed_samples: usize, sample_size: usize, - number_of_samples: usize, + number_of_samples_per_segment: usize, + number_of_segments: u8, timeout: Duration, config: Configuration, } @@ -228,7 +238,8 @@ pub mod details { let supplementary_size = SharedManagementData::const_memory_size( self.submission_channel_size(), self.completion_channel_size(), - self.number_of_samples, + self.number_of_samples_per_segment, + self.number_of_segments, ); let msg = "Failed to acquire underlying shared memory"; @@ -244,7 +255,22 @@ pub mod details { fatal_panic!(from self, when unsafe { data.completion_channel.init(allocator) }, "{} since the retrieve channel allocation failed. - This is an implementation bug!", msg); fatal_panic!(from self, when unsafe { data.used_chunk_list.init(allocator) }, - "{} since the used chunk list allocation failed. - This is an implementation bug!", msg); + "{} since the used chunk list vector allocation failed. - This is an implementation bug!", msg); + + for _ in 0..self.number_of_segments { + if !unsafe { + data.used_chunk_list.push(RelocatableUsedChunkList::new_uninit(self.number_of_samples_per_segment)) + } { + fatal_panic!(from self, + "{} since the used chunk list could not be added. - This is an implementation bug!", msg); + } + } + + for (n, used_chunk_list) in data.used_chunk_list.iter_mut().enumerate() { + fatal_panic!(from self, when unsafe { used_chunk_list.init(allocator) }, + "{} since the used chunk list for segment id {} failed to allocate memory. - This is an implementation bug!", + msg, n); + } true }) @@ -255,7 +281,8 @@ pub mod details { self.enable_safe_overflow, self.max_borrowed_samples, self.sample_size, - self.number_of_samples, + self.number_of_samples_per_segment, + self.number_of_segments ) ); @@ -314,10 +341,17 @@ pub mod details { msg, self.sample_size, storage.get().sample_size); } - if storage.get().number_of_samples != self.number_of_samples { + if storage.get().number_of_samples_per_segment != self.number_of_samples_per_segment + { fail!(from self, with ZeroCopyCreationError::IncompatibleNumberOfSamples, "{} since the requested number of samples is set to {} but should be set to {}.", - msg, self.number_of_samples, storage.get().number_of_samples); + msg, self.number_of_samples_per_segment, storage.get().number_of_samples_per_segment); + } + + if storage.get().number_of_segments != self.number_of_segments { + fail!(from self, with ZeroCopyCreationError::IncompatibleNumberOfSegments, + "{} since the requested number of segments is set to {} but should be set to {}.", + msg, self.number_of_segments, storage.get().number_of_segments); } } @@ -367,7 +401,8 @@ pub mod details { enable_safe_overflow: DEFAULT_ENABLE_SAFE_OVERFLOW, max_borrowed_samples: DEFAULT_MAX_BORROWED_SAMPLES, sample_size: 0, - number_of_samples: 0, + number_of_samples_per_segment: 0, + number_of_segments: DEFAULT_MAX_SUPPORTED_SHARED_MEMORY_SEGMENTS, config: Configuration::default(), timeout: Duration::ZERO, } @@ -382,6 +417,11 @@ pub mod details { impl> ZeroCopyConnectionBuilder> for Builder { + fn max_supported_shared_memory_segments(mut self, value: u8) -> Self { + self.number_of_segments = value.max(1); + self + } + fn buffer_size(mut self, value: usize) -> Self { self.buffer_size = value.clamp(1, usize::MAX); self @@ -397,8 +437,8 @@ pub mod details { self } - fn number_of_samples(mut self, value: usize) -> Self { - self.number_of_samples = value; + fn number_of_samples_per_segment(mut self, value: usize) -> Self { + self.number_of_samples_per_segment = value; self } @@ -470,6 +510,10 @@ pub mod details { self.storage.get().submission_channel.capacity() } + fn max_supported_shared_memory_segments(&self) -> u8 { + self.storage.get().number_of_segments + } + fn max_borrowed_samples(&self) -> usize { self.storage.get().max_borrowed_samples } @@ -487,37 +531,35 @@ pub mod details { impl> ZeroCopySender for Sender { fn try_send(&self, ptr: PointerOffset) -> Result, ZeroCopySendError> { let msg = "Unable to send sample"; + let storage = self.storage.get(); - if !self.storage.get().enable_safe_overflow - && self.storage.get().submission_channel.is_full() - { + if !storage.enable_safe_overflow && storage.submission_channel.is_full() { fail!(from self, with ZeroCopySendError::ReceiveBufferFull, "{} since the receive buffer is full.", msg); } - if !self - .storage - .get() - .used_chunk_list - .insert(ptr.offset() / self.storage.get().sample_size) - { - fail!(from self, with ZeroCopySendError::UsedChunkListFull, - "{} since the used chunk list is full.", msg); - } + let segment_id = ptr.segment_id().value() as usize; + let sample_size = storage.sample_size; + debug_assert!(ptr.offset() % sample_size == 0); + let index = ptr.offset() / sample_size; + + debug_assert!(segment_id < storage.number_of_segments as usize); + + let did_not_send_same_offset_twice = storage.used_chunk_list[segment_id].insert(index); + debug_assert!(did_not_send_same_offset_twice); - match unsafe { self.storage.get().submission_channel.push(ptr.offset()) } { + match unsafe { storage.submission_channel.push(ptr.as_value()) } { Some(v) => { - if !self - .storage - .get() - .used_chunk_list - .remove(v / self.storage.get().sample_size) + let pointer_offset = PointerOffset::from_value(v); + debug_assert!(pointer_offset.offset() % sample_size == 0); + if !storage.used_chunk_list[pointer_offset.segment_id().value() as usize] + .remove(pointer_offset.offset() / sample_size) { fail!(from self, with ZeroCopySendError::ConnectionCorrupted, - "{} since an invalid offset was returned on overflow.", msg); + "{} since the invalid offset {:?} was returned on overflow.", msg, pointer_offset); } - Ok(Some(PointerOffset::new(v))) + Ok(Some(pointer_offset)) } None => Ok(None), } @@ -539,29 +581,46 @@ pub mod details { } fn reclaim(&self) -> Result, ZeroCopyReclaimError> { - match unsafe { self.storage.get().completion_channel.pop() } { + let msg = "Unable to reclaim sample"; + + let storage = self.storage.get(); + match unsafe { storage.completion_channel.pop() } { None => Ok(None), Some(v) => { - if !self - .storage - .get() - .used_chunk_list - .remove(v / self.storage.get().sample_size) + let pointer_offset = PointerOffset::from_value(v); + let segment_id = pointer_offset.segment_id().value() as usize; + + debug_assert!(segment_id < storage.number_of_segments as usize); + + if segment_id >= storage.used_chunk_list.len() { + fail!(from self, with ZeroCopyReclaimError::ReceiverReturnedCorruptedPointerOffset, + "{} since the receiver returned a non-existing segment id {:?}.", + msg, pointer_offset); + } + + debug_assert!(pointer_offset.offset() % storage.sample_size == 0); + if !storage.used_chunk_list[segment_id] + .remove(pointer_offset.offset() / storage.sample_size) { - fail!(from self, with ZeroCopyReclaimError::ReceiverReturnedCorruptedOffset, - "Unable to reclaim sample since the receiver returned the corrupted offset {}.", v); + fail!(from self, with ZeroCopyReclaimError::ReceiverReturnedCorruptedPointerOffset, + "{} since the receiver returned a corrupted offset {:?}.", + msg, pointer_offset); } - Ok(Some(PointerOffset::new(v))) + Ok(Some(pointer_offset)) } } } unsafe fn acquire_used_offsets(&self, mut callback: F) { let sample_size = self.storage.get().sample_size; - self.storage - .get() - .used_chunk_list - .remove_all(|index| callback(PointerOffset::new(index * sample_size))); + for (n, used_chunk_list) in self.storage.get().used_chunk_list.iter().enumerate() { + used_chunk_list.remove_all(|index| { + callback(PointerOffset::from_offset_and_segment_id( + index * sample_size, + SegmentId::new(n as u8), + )) + }); + } } } @@ -600,6 +659,10 @@ pub mod details { self.storage.get().submission_channel.capacity() } + fn max_supported_shared_memory_segments(&self) -> u8 { + self.storage.get().number_of_segments + } + fn max_borrowed_samples(&self) -> usize { self.storage.get().max_borrowed_samples } @@ -630,13 +693,13 @@ pub mod details { None => Ok(None), Some(v) => { *self.borrow_counter() += 1; - Ok(Some(PointerOffset::new(v))) + Ok(Some(PointerOffset::from_value(v))) } } } fn release(&self, ptr: PointerOffset) -> Result<(), ZeroCopyReleaseError> { - match unsafe { self.storage.get().completion_channel.push(ptr.offset()) } { + match unsafe { self.storage.get().completion_channel.push(ptr.as_value()) } { true => { *self.borrow_counter() -= 1; Ok(()) diff --git a/iceoryx2-cal/src/zero_copy_connection/mod.rs b/iceoryx2-cal/src/zero_copy_connection/mod.rs index 754df1ae0..d3900b23b 100644 --- a/iceoryx2-cal/src/zero_copy_connection/mod.rs +++ b/iceoryx2-cal/src/zero_copy_connection/mod.rs @@ -37,6 +37,7 @@ pub enum ZeroCopyCreationError { IncompatibleOverflowSetting, IncompatibleSampleSize, IncompatibleNumberOfSamples, + IncompatibleNumberOfSegments, } impl std::fmt::Display for ZeroCopyCreationError { @@ -77,7 +78,7 @@ impl std::error::Error for ZeroCopyReceiveError {} #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ZeroCopyReclaimError { - ReceiverReturnedCorruptedOffset, + ReceiverReturnedCorruptedPointerOffset, } impl std::fmt::Display for ZeroCopyReclaimError { @@ -104,12 +105,14 @@ impl std::error::Error for ZeroCopyReleaseError {} pub const DEFAULT_BUFFER_SIZE: usize = 4; pub const DEFAULT_ENABLE_SAFE_OVERFLOW: bool = false; pub const DEFAULT_MAX_BORROWED_SAMPLES: usize = 4; +pub const DEFAULT_MAX_SUPPORTED_SHARED_MEMORY_SEGMENTS: u8 = 1; pub trait ZeroCopyConnectionBuilder: NamedConceptBuilder { fn buffer_size(self, value: usize) -> Self; fn enable_safe_overflow(self, value: bool) -> Self; fn receiver_max_borrowed_samples(self, value: usize) -> Self; - fn number_of_samples(self, value: usize) -> Self; + fn max_supported_shared_memory_segments(self, value: u8) -> Self; + fn number_of_samples_per_segment(self, value: usize) -> Self; /// The timeout defines how long the [`ZeroCopyConnectionBuilder`] should wait for /// concurrent /// [`ZeroCopyConnectionBuilder::create_sender()`] or @@ -125,6 +128,7 @@ pub trait ZeroCopyPortDetails { fn buffer_size(&self) -> usize; fn has_enabled_safe_overflow(&self) -> bool; fn max_borrowed_samples(&self) -> usize; + fn max_supported_shared_memory_segments(&self) -> u8; fn is_connected(&self) -> bool; } diff --git a/iceoryx2-cal/tests/communication_channel_trait_tests.rs b/iceoryx2-cal/tests/communication_channel_trait_tests.rs index d4b3f22c6..6c944b4c7 100644 --- a/iceoryx2-cal/tests/communication_channel_trait_tests.rs +++ b/iceoryx2-cal/tests/communication_channel_trait_tests.rs @@ -22,7 +22,7 @@ mod communication_channel { use iceoryx2_cal::testing::*; #[test] - fn names_are_set_correctly>() { + fn names_are_set_correctly>() { let storage_name = generate_name(); let config = generate_isolated_config::(); @@ -40,7 +40,7 @@ mod communication_channel { } #[test] - fn buffer_size_is_by_default_at_least_provided_constant>() { + fn buffer_size_is_by_default_at_least_provided_constant>() { let storage_name = generate_name(); let config = generate_isolated_config::(); @@ -53,7 +53,7 @@ mod communication_channel { } #[test] - fn safe_overflow_is_disabled_by_default>() { + fn safe_overflow_is_disabled_by_default>() { let storage_name = generate_name(); let config = generate_isolated_config::(); @@ -71,7 +71,7 @@ mod communication_channel { } #[test] - fn create_remove_and_create_again_works>() { + fn create_remove_and_create_again_works>() { let storage_name = generate_name(); let config = generate_isolated_config::(); @@ -94,7 +94,7 @@ mod communication_channel { } #[test] - fn connecting_to_non_existing_channel_fails>() { + fn connecting_to_non_existing_channel_fails>() { let storage_name = generate_name(); let config = generate_isolated_config::(); @@ -109,7 +109,7 @@ mod communication_channel { } #[test] - fn connecting_to_receiver_works>() { + fn connecting_to_receiver_works>() { let storage_name = generate_name(); let config = generate_isolated_config::(); @@ -125,7 +125,7 @@ mod communication_channel { } #[test] - fn connecting_after_first_connection_has_dropped_works>() { + fn connecting_after_first_connection_has_dropped_works>() { let storage_name = generate_name(); let config = generate_isolated_config::(); @@ -147,7 +147,7 @@ mod communication_channel { } #[test] - fn send_and_receive_works_for_single_packets>() { + fn send_and_receive_works_for_single_packets>() { let storage_name = generate_name(); let config = generate_isolated_config::(); @@ -163,7 +163,7 @@ mod communication_channel { const MAX_NUMBER_OF_PACKETS: usize = 16; for i in 0..MAX_NUMBER_OF_PACKETS { - let data: usize = 12 * i; + let data: u64 = 12 * i as u64; assert_that!(sut_sender.send(&data), is_ok); let received = sut_receiver.receive(); @@ -175,7 +175,7 @@ mod communication_channel { } #[test] - fn send_and_receive_for_multi_packets_has_queue_behavior>() { + fn send_and_receive_for_multi_packets_has_queue_behavior>() { let storage_name = generate_name(); let config = generate_isolated_config::(); @@ -193,13 +193,13 @@ mod communication_channel { for i in 0..MAX_NUMBER_OF_PACKETS { for k in 0..sut_receiver.buffer_size() { - let data: usize = 12 * i + k; + let data: u64 = (12 * i + k) as u64; assert_that!(sut_sender.send(&data), is_ok); } for k in 0..sut_receiver.buffer_size() { - let data: usize = 12 * i + k; + let data: u64 = (12 * i + k) as u64; let received = sut_receiver.receive(); assert_that!(received, is_ok); @@ -211,7 +211,7 @@ mod communication_channel { } #[test] - fn receive_without_transmission_returns_none>() { + fn receive_without_transmission_returns_none>() { let storage_name = generate_name(); let config = generate_isolated_config::(); @@ -231,7 +231,7 @@ mod communication_channel { } #[test] - fn send_will_return_receiver_cache_full_when_cache_is_full>() { + fn send_will_return_receiver_cache_full_when_cache_is_full>() { let storage_name = generate_name(); let config = generate_isolated_config::(); @@ -245,7 +245,7 @@ mod communication_channel { .open_sender() .unwrap(); - let mut send_counter: usize = 0; + let mut send_counter: u64 = 0; loop { let result = sut_sender.send(&send_counter); if result.is_err() { @@ -258,12 +258,12 @@ mod communication_channel { send_counter += 1; - if send_counter == sut_receiver.buffer_size() { + if send_counter as usize == sut_receiver.buffer_size() { break; } } - let mut receive_counter: usize = 0; + let mut receive_counter: u64 = 0; loop { let result = sut_receiver.receive(); assert_that!(result, is_ok); @@ -276,11 +276,11 @@ mod communication_channel { } assert_that!(send_counter, eq receive_counter); - assert_that!(send_counter, ge sut_receiver.buffer_size()); + assert_that!(send_counter, ge sut_receiver.buffer_size() as u64); } #[test] - fn safe_overflow_works>() { + fn safe_overflow_works>() { if !Sut::does_support_safe_overflow() { return; } @@ -302,21 +302,21 @@ mod communication_channel { assert_that!(sut_receiver.does_enable_safe_overflow(), eq true); for i in 0..sut_receiver.buffer_size() { - assert_that!(sut_sender.send(&i), is_ok); + assert_that!(sut_sender.send(&(i as u64)), is_ok); } const NUMBER_OF_PACKETS: usize = 128; for i in sut_receiver.buffer_size()..NUMBER_OF_PACKETS { - let data = sut_sender.send(&i).unwrap(); + let data = sut_sender.send(&(i as u64)).unwrap(); assert_that!(data, is_some); - assert_that!({ data.unwrap() }, eq i - sut_receiver.buffer_size()); + assert_that!({ data.unwrap() as usize }, eq i - sut_receiver.buffer_size()); } } #[test] - fn custom_buffer_size_works>() { + fn custom_buffer_size_works>() { if !Sut::has_configurable_buffer_size() { return; } @@ -338,19 +338,19 @@ mod communication_channel { assert_that!(sut_receiver.buffer_size(), ge buffer_size); for i in 0..buffer_size { - assert_that!(sut_sender.send(&i), is_ok); + assert_that!(sut_sender.send(&(i as u64)), is_ok); } for i in 0..buffer_size { let data = sut_receiver.receive().unwrap(); assert_that!(data, is_some); - assert_that!(data.unwrap(), eq i); + assert_that!(data.unwrap(), eq i as u64); } } } #[test] - fn custom_buffer_size_and_overflow_works>() { + fn custom_buffer_size_and_overflow_works>() { if !Sut::has_configurable_buffer_size() || !Sut::does_support_safe_overflow() { return; } @@ -373,25 +373,25 @@ mod communication_channel { assert_that!(sut_receiver.buffer_size(), eq buffer_size); for i in 0..buffer_size { - assert_that!(sut_sender.send(&i), is_ok); + assert_that!(sut_sender.send(&(i as u64)), is_ok); } for i in buffer_size..buffer_size * 2 { - let result = sut_sender.send(&i).unwrap(); + let result = sut_sender.send(&(i as u64)).unwrap(); assert_that!(result, is_some); - assert_that!(result.unwrap(), eq i - buffer_size); + assert_that!(result.unwrap() as usize, eq i - buffer_size); } for i in 0..buffer_size { let data = sut_receiver.receive().unwrap(); assert_that!(data, is_some); - assert_that!(data.unwrap(), eq i + buffer_size); + assert_that!(data.unwrap() as usize, eq i + buffer_size); } } } #[test] - fn list_channels_works>() { + fn list_channels_works>() { let mut sut_names = vec![]; let mut suts = vec![]; const LIMIT: usize = 8; @@ -437,7 +437,7 @@ mod communication_channel { } #[test] - fn custom_suffix_keeps_channels_separated>() { + fn custom_suffix_keeps_channels_separated>() { let _watch_dog = Watchdog::new(); let config = generate_isolated_config::(); @@ -489,7 +489,7 @@ mod communication_channel { } #[test] - fn defaults_for_configuration_are_set_correctly>() { + fn defaults_for_configuration_are_set_correctly>() { let config = ::Configuration::default(); assert_that!(*config.get_suffix(), eq Sut::default_suffix()); assert_that!(*config.get_path_hint(), eq Sut::default_path_hint()); @@ -497,7 +497,7 @@ mod communication_channel { } //#[cfg(not(any(target_os = "windows")))] - #[instantiate_tests(>)] + #[instantiate_tests(>)] mod unix_datagram {} #[instantiate_tests()] @@ -507,6 +507,6 @@ mod communication_channel { mod process_local {} #[cfg(not(any(target_os = "windows", target_os = "macos")))] - #[instantiate_tests(>)] + #[instantiate_tests(>)] mod message_queue {} } diff --git a/iceoryx2-cal/tests/zero_copy_connection_posix_shared_memory_tests.rs b/iceoryx2-cal/tests/zero_copy_connection_posix_shared_memory_tests.rs index d946a20a2..63bdcffda 100644 --- a/iceoryx2-cal/tests/zero_copy_connection_posix_shared_memory_tests.rs +++ b/iceoryx2-cal/tests/zero_copy_connection_posix_shared_memory_tests.rs @@ -49,7 +49,7 @@ mod zero_copy_connection_posix_shared_memory_tests { let start = std::time::SystemTime::now(); let sut = ::Builder::new(&storage_name) .timeout(TIMEOUT) - .number_of_samples(1) + .number_of_samples_per_segment(1) .receiver_max_borrowed_samples(1) .create_sender(1); diff --git a/iceoryx2-cal/tests/zero_copy_connection_trait_tests.rs b/iceoryx2-cal/tests/zero_copy_connection_trait_tests.rs index 37534a54b..0413acd50 100644 --- a/iceoryx2-cal/tests/zero_copy_connection_trait_tests.rs +++ b/iceoryx2-cal/tests/zero_copy_connection_trait_tests.rs @@ -22,7 +22,7 @@ mod zero_copy_connection { use iceoryx2_bb_testing::assert_that; use iceoryx2_cal::named_concept::*; use iceoryx2_cal::named_concept::{NamedConceptBuilder, NamedConceptMgmt}; - use iceoryx2_cal::shm_allocator::PointerOffset; + use iceoryx2_cal::shm_allocator::{PointerOffset, SegmentId}; use iceoryx2_cal::testing::{generate_isolated_config, generate_name}; use iceoryx2_cal::zero_copy_connection; use iceoryx2_cal::zero_copy_connection::*; @@ -39,14 +39,14 @@ mod zero_copy_connection { assert_that!( Sut::Builder::new(&name) .config(&config) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .create_receiver(SAMPLE_SIZE), is_ok ); assert_that!( Sut::Builder::new(&name) .config(&config) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .create_sender(SAMPLE_SIZE), is_ok ); @@ -58,14 +58,14 @@ mod zero_copy_connection { let config = generate_isolated_config::(); let sut_sender = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_sender(SAMPLE_SIZE) .unwrap(); assert_that!(!sut_sender.is_connected(), eq true); let sut_receiver = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_receiver(SAMPLE_SIZE) .unwrap(); @@ -82,7 +82,7 @@ mod zero_copy_connection { let config = generate_isolated_config::(); let sut_sender = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_sender(SAMPLE_SIZE) .unwrap(); @@ -97,7 +97,7 @@ mod zero_copy_connection { ); let sut_receiver = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_receiver(SAMPLE_SIZE) .unwrap(); @@ -118,18 +118,18 @@ mod zero_copy_connection { let config = generate_isolated_config::(); let _sut_sender = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_sender(SAMPLE_SIZE) .unwrap(); let _sut_receiver = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_receiver(SAMPLE_SIZE) .unwrap(); let sut_sender = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_sender(SAMPLE_SIZE); assert_that!(sut_sender, is_err); @@ -139,7 +139,7 @@ mod zero_copy_connection { ); let sut_receiver = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_receiver(SAMPLE_SIZE); assert_that!(sut_receiver, is_err); @@ -155,19 +155,19 @@ mod zero_copy_connection { let config = generate_isolated_config::(); let _sut_sender = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_sender(SAMPLE_SIZE) .unwrap(); let _sut_receiver = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_receiver(SAMPLE_SIZE) .unwrap(); drop(_sut_sender); let sut_sender = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_sender(SAMPLE_SIZE); assert_that!(sut_sender, is_ok); @@ -179,19 +179,19 @@ mod zero_copy_connection { let config = generate_isolated_config::(); let _sut_sender = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_sender(SAMPLE_SIZE) .unwrap(); let _sut_receiver = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_receiver(SAMPLE_SIZE) .unwrap(); drop(_sut_receiver); let sut_receiver = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_receiver(SAMPLE_SIZE); assert_that!(sut_receiver, is_ok); @@ -203,12 +203,12 @@ mod zero_copy_connection { let config = generate_isolated_config::(); let sut_sender = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_sender(SAMPLE_SIZE) .unwrap(); let sut_receiver = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_receiver(SAMPLE_SIZE) .unwrap(); @@ -226,14 +226,14 @@ mod zero_copy_connection { let _sut_sender = Sut::Builder::new(&name) .buffer_size(12) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_sender(SAMPLE_SIZE) .unwrap(); let sut_receiver = Sut::Builder::new(&name) .buffer_size(16) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_receiver(SAMPLE_SIZE); @@ -247,14 +247,14 @@ mod zero_copy_connection { let _sut_sender = Sut::Builder::new(&name) .receiver_max_borrowed_samples(2) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_sender(SAMPLE_SIZE) .unwrap(); let sut_receiver = Sut::Builder::new(&name) .receiver_max_borrowed_samples(4) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_receiver(SAMPLE_SIZE); @@ -268,13 +268,13 @@ mod zero_copy_connection { let _sut_sender = Sut::Builder::new(&name) .enable_safe_overflow(true) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_sender(SAMPLE_SIZE) .unwrap(); let sut_receiver = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_receiver(SAMPLE_SIZE); @@ -291,13 +291,13 @@ mod zero_copy_connection { let config = generate_isolated_config::(); let _sut_sender = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_sender(SAMPLE_SIZE) .unwrap(); let sut_receiver = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_receiver(2 * SAMPLE_SIZE); @@ -314,13 +314,13 @@ mod zero_copy_connection { let config = generate_isolated_config::(); let _sut_sender = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_sender(SAMPLE_SIZE) .unwrap(); let sut_receiver = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES * 2) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES * 2) .config(&config) .create_receiver(SAMPLE_SIZE); @@ -333,12 +333,12 @@ mod zero_copy_connection { let config = generate_isolated_config::(); let sut_sender = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_sender(SAMPLE_SIZE) .unwrap(); let sut_receiver = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_receiver(SAMPLE_SIZE) .unwrap(); @@ -367,12 +367,12 @@ mod zero_copy_connection { let config = generate_isolated_config::(); let sut_sender = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_sender(SAMPLE_SIZE) .unwrap(); let sut_receiver = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_receiver(SAMPLE_SIZE) .unwrap(); @@ -395,7 +395,7 @@ mod zero_copy_connection { let sut_sender = Sut::Builder::new(&name) .buffer_size(BUFFER_SIZE) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_sender(SAMPLE_SIZE) .unwrap(); @@ -421,7 +421,7 @@ mod zero_copy_connection { let sut_sender = Sut::Builder::new(&name) .buffer_size(BUFFER_SIZE) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .enable_safe_overflow(true) .config(&config) .create_sender(SAMPLE_SIZE) @@ -453,7 +453,7 @@ mod zero_copy_connection { let sut_sender = Sut::Builder::new(&name) .buffer_size(BUFFER_SIZE) .receiver_max_borrowed_samples(BUFFER_SIZE) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_sender(SAMPLE_SIZE) .unwrap(); @@ -469,7 +469,7 @@ mod zero_copy_connection { let receiver = Sut::Builder::new(&name) .buffer_size(BUFFER_SIZE) .receiver_max_borrowed_samples(BUFFER_SIZE) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_receiver(SAMPLE_SIZE) .unwrap(); @@ -487,7 +487,7 @@ mod zero_copy_connection { let config = generate_isolated_config::(); let receiver = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_receiver(SAMPLE_SIZE) .unwrap(); @@ -507,7 +507,7 @@ mod zero_copy_connection { .buffer_size(BUFFER_SIZE) .receiver_max_borrowed_samples(MAX_BORROW) .enable_safe_overflow(true) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_sender(SAMPLE_SIZE) .unwrap(); @@ -515,7 +515,7 @@ mod zero_copy_connection { .buffer_size(BUFFER_SIZE) .receiver_max_borrowed_samples(MAX_BORROW) .enable_safe_overflow(true) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_receiver(SAMPLE_SIZE) .unwrap(); @@ -558,7 +558,7 @@ mod zero_copy_connection { let sut_sender = Sut::Builder::new(&name) .buffer_size(1) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config.lock().unwrap()) .create_sender(SAMPLE_SIZE) .unwrap(); @@ -573,7 +573,7 @@ mod zero_copy_connection { s.spawn(|| { let sut_receiver = Sut::Builder::new(&name) .buffer_size(1) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config.lock().unwrap()) .create_receiver(SAMPLE_SIZE) .unwrap(); @@ -616,13 +616,13 @@ mod zero_copy_connection { let config = generate_isolated_config::(); let sut_sender = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .buffer_size(BUFFER_SIZE) .config(&config) .create_sender(SAMPLE_SIZE) .unwrap(); let _sut_receiver = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .buffer_size(BUFFER_SIZE) .config(&config) .create_receiver(SAMPLE_SIZE) @@ -655,7 +655,7 @@ mod zero_copy_connection { let config = generate_isolated_config::(); let sut_sender = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .buffer_size(BUFFER_SIZE) .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) @@ -663,7 +663,7 @@ mod zero_copy_connection { .create_sender(SAMPLE_SIZE) .unwrap(); let _sut_receiver = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .buffer_size(BUFFER_SIZE) .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) @@ -705,7 +705,7 @@ mod zero_copy_connection { let config = generate_isolated_config::(); let sut_sender = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .buffer_size(BUFFER_SIZE) .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) @@ -713,7 +713,7 @@ mod zero_copy_connection { .create_sender(SAMPLE_SIZE) .unwrap(); let sut_receiver = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .buffer_size(BUFFER_SIZE) .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) @@ -750,7 +750,7 @@ mod zero_copy_connection { let config = generate_isolated_config::(); let sut_sender = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .buffer_size(BUFFER_SIZE) .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) @@ -758,7 +758,7 @@ mod zero_copy_connection { .create_sender(SAMPLE_SIZE) .unwrap(); let sut_receiver = Sut::Builder::new(&name) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .buffer_size(BUFFER_SIZE) .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) @@ -803,7 +803,7 @@ mod zero_copy_connection { assert_that!(::does_exist_cfg(&sut_names[i], &config), eq Ok(false)); std::mem::forget( Sut::Builder::new(&sut_names[i]) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) .create_sender(SAMPLE_SIZE) .unwrap(), @@ -853,7 +853,7 @@ mod zero_copy_connection { let sut_1 = Sut::Builder::new(&sut_name) .config(&config_1) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .create_sender(SAMPLE_SIZE) .unwrap(); @@ -864,7 +864,7 @@ mod zero_copy_connection { let sut_2 = Sut::Builder::new(&sut_name) .config(&config_2) - .number_of_samples(NUMBER_OF_SAMPLES) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .create_sender(SAMPLE_SIZE) .unwrap(); @@ -896,6 +896,444 @@ mod zero_copy_connection { assert_that!(*config.get_prefix(), eq Sut::default_prefix()); } + #[test] + fn sender_and_receiver_must_have_same_segment_id_requirements() { + const BUFFER_SIZE: usize = 10; + const NUMBER_OF_SEGMENTS: u8 = 123; + let name = generate_name(); + let config = generate_isolated_config::(); + + let _sut_sender = Sut::Builder::new(&name) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) + .buffer_size(BUFFER_SIZE) + .max_supported_shared_memory_segments(NUMBER_OF_SEGMENTS) + .receiver_max_borrowed_samples(BUFFER_SIZE) + .enable_safe_overflow(true) + .config(&config) + .create_sender(SAMPLE_SIZE) + .unwrap(); + + let create_receiver = |number_of_segments: u8| { + Sut::Builder::new(&name) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) + .buffer_size(BUFFER_SIZE) + .max_supported_shared_memory_segments(number_of_segments) + .receiver_max_borrowed_samples(BUFFER_SIZE) + .enable_safe_overflow(true) + .config(&config) + .create_receiver(SAMPLE_SIZE) + }; + + let sut_receiver = create_receiver(NUMBER_OF_SEGMENTS - 1); + assert_that!(sut_receiver.err(), eq Some(ZeroCopyCreationError::IncompatibleNumberOfSegments)); + + let sut_receiver = create_receiver(NUMBER_OF_SEGMENTS + 1); + assert_that!(sut_receiver.err(), eq Some(ZeroCopyCreationError::IncompatibleNumberOfSegments)); + + let sut_receiver = create_receiver(NUMBER_OF_SEGMENTS); + assert_that!(sut_receiver, is_ok); + } + + #[cfg(debug_assertions)] + #[should_panic] + #[test] + fn send_pointer_offset_with_out_of_bounds_segment_id_fails() { + const BUFFER_SIZE: usize = 10; + const NUMBER_OF_SEGMENTS: u8 = 123; + let name = generate_name(); + let config = generate_isolated_config::(); + + let sut_sender = Sut::Builder::new(&name) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) + .buffer_size(BUFFER_SIZE) + .max_supported_shared_memory_segments(NUMBER_OF_SEGMENTS) + .receiver_max_borrowed_samples(BUFFER_SIZE) + .enable_safe_overflow(true) + .config(&config) + .create_sender(SAMPLE_SIZE) + .unwrap(); + + // shall panic + sut_sender + .try_send(PointerOffset::from_offset_and_segment_id( + 0, + SegmentId::new(NUMBER_OF_SEGMENTS + 1), + )) + .unwrap(); + } + + #[cfg(debug_assertions)] + #[should_panic] + #[test] + fn release_pointer_offset_with_out_of_bounds_segment_id_fails() { + const BUFFER_SIZE: usize = 10; + const NUMBER_OF_SEGMENTS: u8 = 123; + let name = generate_name(); + let config = generate_isolated_config::(); + + let sut_receiver = Sut::Builder::new(&name) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) + .buffer_size(BUFFER_SIZE) + .max_supported_shared_memory_segments(NUMBER_OF_SEGMENTS) + .receiver_max_borrowed_samples(BUFFER_SIZE) + .enable_safe_overflow(true) + .config(&config) + .create_receiver(SAMPLE_SIZE) + .unwrap(); + + // shall panic + sut_receiver + .release(PointerOffset::from_offset_and_segment_id( + 0, + SegmentId::new(NUMBER_OF_SEGMENTS + 1), + )) + .unwrap(); + } + + #[cfg(not(debug_assertions))] + #[test] + fn receive_pointer_offset_with_out_of_bounds_segment_id_fails() { + const BUFFER_SIZE: usize = 10; + const NUMBER_OF_SEGMENTS: u8 = 123; + let name = generate_name(); + let config = generate_isolated_config::(); + + let sut_sender = Sut::Builder::new(&name) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) + .buffer_size(BUFFER_SIZE) + .max_supported_shared_memory_segments(NUMBER_OF_SEGMENTS) + .receiver_max_borrowed_samples(BUFFER_SIZE) + .enable_safe_overflow(true) + .config(&config) + .create_sender(SAMPLE_SIZE) + .unwrap(); + + let sut_receiver = Sut::Builder::new(&name) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) + .buffer_size(BUFFER_SIZE) + .max_supported_shared_memory_segments(NUMBER_OF_SEGMENTS) + .receiver_max_borrowed_samples(BUFFER_SIZE) + .enable_safe_overflow(true) + .config(&config) + .create_receiver(SAMPLE_SIZE) + .unwrap(); + + sut_receiver + .release(PointerOffset::from_offset_and_segment_id( + 0, + SegmentId::new(NUMBER_OF_SEGMENTS + 1), + )) + .unwrap(); + + assert_that!(sut_sender.reclaim().err(), eq Some(ZeroCopyReclaimError::ReceiverReturnedCorruptedPointerOffset)); + } + + #[test] + fn setting_number_of_supported_shared_memory_segments_to_zero_sets_it_to_one< + Sut: ZeroCopyConnection, + >() { + const BUFFER_SIZE: usize = 10; + let name = generate_name(); + let config = generate_isolated_config::(); + + let sut_sender = Sut::Builder::new(&name) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) + .buffer_size(BUFFER_SIZE) + .max_supported_shared_memory_segments(0) + .receiver_max_borrowed_samples(BUFFER_SIZE) + .enable_safe_overflow(true) + .config(&config) + .create_sender(SAMPLE_SIZE) + .unwrap(); + + assert_that!(sut_sender.max_supported_shared_memory_segments(), eq 1); + } + + #[test] + fn receiver_cannot_borrow_more_samples_then_set_up_for_multiple_segments< + Sut: ZeroCopyConnection, + >() { + const BUFFER_SIZE: usize = 10; + const NUMBER_OF_SEGMENTS: u8 = 10; + let name = generate_name(); + let config = generate_isolated_config::(); + + let sut_sender = Sut::Builder::new(&name) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) + .buffer_size(2 * BUFFER_SIZE) + .max_supported_shared_memory_segments(NUMBER_OF_SEGMENTS) + .receiver_max_borrowed_samples(BUFFER_SIZE) + .enable_safe_overflow(true) + .config(&config) + .create_sender(SAMPLE_SIZE) + .unwrap(); + + let sut_receiver = Sut::Builder::new(&name) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) + .buffer_size(2 * BUFFER_SIZE) + .max_supported_shared_memory_segments(NUMBER_OF_SEGMENTS) + .receiver_max_borrowed_samples(BUFFER_SIZE) + .enable_safe_overflow(true) + .config(&config) + .create_receiver(SAMPLE_SIZE) + .unwrap(); + + for offset in 0..2 { + for n in 0..BUFFER_SIZE { + sut_sender + .try_send(PointerOffset::from_offset_and_segment_id( + offset * SAMPLE_SIZE, + SegmentId::new(n as u8), + )) + .unwrap(); + } + } + + let mut offsets = vec![]; + for _ in 0..BUFFER_SIZE { + offsets.push(sut_receiver.receive().unwrap().unwrap()); + } + + assert_that!(sut_receiver.receive().err(), eq Some(ZeroCopyReceiveError::ReceiveWouldExceedMaxBorrowValue)); + + for offset in offsets { + sut_receiver.release(offset).unwrap(); + assert_that!(sut_receiver.receive().unwrap(), is_some); + } + + assert_that!(sut_receiver.receive().err(), eq Some(ZeroCopyReceiveError::ReceiveWouldExceedMaxBorrowValue)); + } + + #[test] + fn receive_with_multiple_segments_works() { + const BUFFER_SIZE: usize = 10; + const NUMBER_OF_SEGMENTS: u8 = 10; + let name = generate_name(); + let config = generate_isolated_config::(); + + let sut_sender = Sut::Builder::new(&name) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) + .buffer_size(2 * BUFFER_SIZE) + .max_supported_shared_memory_segments(NUMBER_OF_SEGMENTS) + .receiver_max_borrowed_samples(BUFFER_SIZE) + .enable_safe_overflow(true) + .config(&config) + .create_sender(SAMPLE_SIZE) + .unwrap(); + + let sut_receiver = Sut::Builder::new(&name) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) + .buffer_size(2 * BUFFER_SIZE) + .max_supported_shared_memory_segments(NUMBER_OF_SEGMENTS) + .receiver_max_borrowed_samples(BUFFER_SIZE) + .enable_safe_overflow(true) + .config(&config) + .create_receiver(SAMPLE_SIZE) + .unwrap(); + + for k in 0..2 { + for n in 0..BUFFER_SIZE { + sut_sender + .try_send(PointerOffset::from_offset_and_segment_id( + k * SAMPLE_SIZE, + SegmentId::new(n as u8), + )) + .unwrap(); + } + } + + for k in 0..2 { + for n in 0..BUFFER_SIZE { + let offset = sut_receiver.receive().unwrap().unwrap(); + assert_that!(offset, eq PointerOffset::from_offset_and_segment_id( + k * SAMPLE_SIZE, + SegmentId::new(n as u8), + )); + sut_receiver.release(offset).unwrap(); + } + } + } + + #[test] + fn reclaim_works_with_multiple_segments() { + const BUFFER_SIZE: usize = 10; + const NUMBER_OF_SEGMENTS: u8 = 10; + let name = generate_name(); + let config = generate_isolated_config::(); + + let sut_sender = Sut::Builder::new(&name) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) + .buffer_size(2 * BUFFER_SIZE) + .max_supported_shared_memory_segments(NUMBER_OF_SEGMENTS) + .receiver_max_borrowed_samples(BUFFER_SIZE) + .enable_safe_overflow(true) + .config(&config) + .create_sender(SAMPLE_SIZE) + .unwrap(); + + let sut_receiver = Sut::Builder::new(&name) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) + .buffer_size(2 * BUFFER_SIZE) + .max_supported_shared_memory_segments(NUMBER_OF_SEGMENTS) + .receiver_max_borrowed_samples(BUFFER_SIZE) + .enable_safe_overflow(true) + .config(&config) + .create_receiver(SAMPLE_SIZE) + .unwrap(); + + for k in 0..2 { + for n in 0..BUFFER_SIZE { + sut_sender + .try_send(PointerOffset::from_offset_and_segment_id( + k * SAMPLE_SIZE, + SegmentId::new(n as u8), + )) + .unwrap(); + } + } + + for _ in 0..2 { + for _ in 0..BUFFER_SIZE { + let offset = sut_receiver.receive().unwrap().unwrap(); + sut_receiver.release(offset).unwrap(); + } + } + + for k in 0..2 { + for n in 0..BUFFER_SIZE { + let offset = sut_sender.reclaim().unwrap().unwrap(); + assert_that!(offset, eq PointerOffset::from_offset_and_segment_id( + k * SAMPLE_SIZE, + SegmentId::new(n as u8), + )); + } + } + } + + #[test] + fn acquire_used_offsets_works_with_multiple_segments() { + const BUFFER_SIZE: usize = 10; + const NUMBER_OF_SEGMENTS: u8 = 10; + let name = generate_name(); + let config = generate_isolated_config::(); + + let sut_sender = Sut::Builder::new(&name) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) + .buffer_size(2 * BUFFER_SIZE) + .max_supported_shared_memory_segments(NUMBER_OF_SEGMENTS) + .receiver_max_borrowed_samples(BUFFER_SIZE) + .enable_safe_overflow(true) + .config(&config) + .create_sender(SAMPLE_SIZE) + .unwrap(); + + let _sut_receiver = Sut::Builder::new(&name) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) + .buffer_size(2 * BUFFER_SIZE) + .max_supported_shared_memory_segments(NUMBER_OF_SEGMENTS) + .receiver_max_borrowed_samples(BUFFER_SIZE) + .enable_safe_overflow(true) + .config(&config) + .create_receiver(SAMPLE_SIZE) + .unwrap(); + + let mut offsets = vec![]; + for k in 0..2 { + for n in 0..BUFFER_SIZE { + let offset = PointerOffset::from_offset_and_segment_id( + k * SAMPLE_SIZE, + SegmentId::new(n as u8), + ); + sut_sender.try_send(offset).unwrap(); + offsets.push(offset); + } + } + + unsafe { + sut_sender.acquire_used_offsets(|offset| { + assert_that!(offsets, contains offset); + }) + }; + } + + #[cfg(debug_assertions)] + #[should_panic] + #[test] + fn panic_when_same_offset_is_sent_twice() { + const BUFFER_SIZE: usize = 10; + const NUMBER_OF_SEGMENTS: u8 = 10; + let name = generate_name(); + let config = generate_isolated_config::(); + + let sut_sender = Sut::Builder::new(&name) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) + .buffer_size(2 * BUFFER_SIZE) + .max_supported_shared_memory_segments(NUMBER_OF_SEGMENTS) + .receiver_max_borrowed_samples(BUFFER_SIZE) + .enable_safe_overflow(true) + .config(&config) + .create_sender(SAMPLE_SIZE) + .unwrap(); + + let _sut_receiver = Sut::Builder::new(&name) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) + .buffer_size(2 * BUFFER_SIZE) + .max_supported_shared_memory_segments(NUMBER_OF_SEGMENTS) + .receiver_max_borrowed_samples(BUFFER_SIZE) + .enable_safe_overflow(true) + .config(&config) + .create_receiver(SAMPLE_SIZE) + .unwrap(); + + let offset = + PointerOffset::from_offset_and_segment_id(SAMPLE_SIZE, SegmentId::new(1 as u8)); + + assert_that!(sut_sender.try_send(offset), is_ok); + + // panics here + sut_sender.try_send(offset).unwrap(); + } + + #[test] + fn overflow_works_with_multiple_segments() { + const NUMBER_OF_SEGMENTS: u8 = 98; + let name = generate_name(); + let config = generate_isolated_config::(); + + let sut_sender = Sut::Builder::new(&name) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) + .buffer_size(1) + .max_supported_shared_memory_segments(NUMBER_OF_SEGMENTS) + .receiver_max_borrowed_samples(1) + .enable_safe_overflow(true) + .config(&config) + .create_sender(SAMPLE_SIZE) + .unwrap(); + + let _sut_receiver = Sut::Builder::new(&name) + .number_of_samples_per_segment(NUMBER_OF_SAMPLES) + .buffer_size(1) + .max_supported_shared_memory_segments(NUMBER_OF_SEGMENTS) + .receiver_max_borrowed_samples(1) + .enable_safe_overflow(true) + .config(&config) + .create_receiver(SAMPLE_SIZE) + .unwrap(); + + let overflow_sample = + PointerOffset::from_offset_and_segment_id(11 * SAMPLE_SIZE, SegmentId::new(73 as u8)); + sut_sender.try_send(overflow_sample).unwrap(); + + let returned_sample = sut_sender + .try_send(PointerOffset::from_offset_and_segment_id( + SAMPLE_SIZE, + SegmentId::new(1 as u8), + )) + .unwrap(); + + assert_that!(returned_sample, eq Some(overflow_sample)); + } + #[instantiate_tests()] mod posix_shared_memory {} diff --git a/iceoryx2/src/port/details/publisher_connections.rs b/iceoryx2/src/port/details/publisher_connections.rs index 081d7a2fd..507aae352 100644 --- a/iceoryx2/src/port/details/publisher_connections.rs +++ b/iceoryx2/src/port/details/publisher_connections.rs @@ -56,7 +56,7 @@ impl Connection { .buffer_size(this.buffer_size) .receiver_max_borrowed_samples(this.static_config.subscriber_max_borrowed_samples) .enable_safe_overflow(this.static_config.enable_safe_overflow) - .number_of_samples(details.number_of_samples) + .number_of_samples_per_segment(details.number_of_samples) .timeout(this.service_state.shared_node.config().global.service.creation_timeout) .create_receiver(this.static_config.message_type_details().sample_layout(details.max_slice_len).size()), "{} since the zero copy connection could not be established.", msg); diff --git a/iceoryx2/src/port/details/subscriber_connections.rs b/iceoryx2/src/port/details/subscriber_connections.rs index 852ea04da..e33815689 100644 --- a/iceoryx2/src/port/details/subscriber_connections.rs +++ b/iceoryx2/src/port/details/subscriber_connections.rs @@ -57,7 +57,7 @@ impl Connection { .buffer_size(subscriber_details.buffer_size) .receiver_max_borrowed_samples(this.static_config.subscriber_max_borrowed_samples) .enable_safe_overflow(this.static_config.enable_safe_overflow) - .number_of_samples(number_of_samples) + .number_of_samples_per_segment(number_of_samples) .timeout(this.shared_node.config().global.service.creation_timeout) .create_sender(this.static_config.message_type_details().sample_layout(max_slice_len).size()), "{}.", msg); diff --git a/iceoryx2/tests/service_tests.rs b/iceoryx2/tests/service_tests.rs index 00556648d..c72d63f01 100644 --- a/iceoryx2/tests/service_tests.rs +++ b/iceoryx2/tests/service_tests.rs @@ -253,7 +253,7 @@ mod service { Factory: SutFactory, >() { let _watch_dog = Watchdog::new(); - let number_of_threads = (SystemInfo::NumberOfCpuCores.value()).clamp(2, 1024); + let number_of_threads = (SystemInfo::NumberOfCpuCores.value()).clamp(2, 4); const NUMBER_OF_ITERATIONS: usize = 25; let test = Factory::new(); @@ -291,7 +291,7 @@ mod service { Factory: SutFactory, >() { let _watch_dog = Watchdog::new(); - let number_of_threads = (SystemInfo::NumberOfCpuCores.value()).clamp(2, 1024); + let number_of_threads = (SystemInfo::NumberOfCpuCores.value()).clamp(2, 4); const NUMBER_OF_ITERATIONS: usize = 25; let test = Factory::new(); @@ -343,7 +343,7 @@ mod service { set_log_level(LogLevel::Debug); let _watch_dog = Watchdog::new_with_timeout(Duration::from_secs(120)); const NUMBER_OF_CLOSE_THREADS: usize = 1; - let number_of_open_threads = (SystemInfo::NumberOfCpuCores.value()).clamp(2, 1024); + let number_of_open_threads = (SystemInfo::NumberOfCpuCores.value()).clamp(2, 4); let number_of_threads = NUMBER_OF_CLOSE_THREADS + number_of_open_threads; let test = Factory::new(); @@ -681,7 +681,7 @@ mod service { fn concurrent_service_creation_and_listing_works>() { let _watch_dog = Watchdog::new_with_timeout(Duration::from_secs(120)); let test = Factory::new(); - let number_of_creators = (SystemInfo::NumberOfCpuCores.value()).clamp(2, 1024); + let number_of_creators = (SystemInfo::NumberOfCpuCores.value()).clamp(2, 4); const NUMBER_OF_ITERATIONS: usize = 40; let barrier = Barrier::new(number_of_creators); @@ -726,7 +726,7 @@ mod service { >() { let _watch_dog = Watchdog::new_with_timeout(Duration::from_secs(120)); let test = Factory::new(); - let number_of_creators = (SystemInfo::NumberOfCpuCores.value()).clamp(2, 1024); + let number_of_creators = (SystemInfo::NumberOfCpuCores.value()).clamp(2, 4); const NUMBER_OF_ITERATIONS: usize = 30; let barrier = Barrier::new(number_of_creators); @@ -790,7 +790,7 @@ mod service { >() { let _watch_dog = Watchdog::new_with_timeout(Duration::from_secs(120)); let test = Factory::new(); - let number_of_creators = (SystemInfo::NumberOfCpuCores.value()).clamp(2, 1024); + let number_of_creators = (SystemInfo::NumberOfCpuCores.value()).clamp(2, 4); const NUMBER_OF_ITERATIONS: usize = 30; let barrier = Barrier::new(number_of_creators); diff --git a/internal/scripts/iceoryx2_env.sh b/internal/scripts/iceoryx2_env.sh index f3d07bc92..0195dd913 100755 --- a/internal/scripts/iceoryx2_env.sh +++ b/internal/scripts/iceoryx2_env.sh @@ -49,7 +49,7 @@ setup_docker_image() { # ubuntu/debian and derivatives if command -v apt &>/dev/null; then apt update - apt -y install sudo git fish curl vim lsb-release software-properties-common gcc libacl1-dev libclang-dev + apt -y install sudo git fish curl vim lsb-release software-properties-common gcc libacl1-dev libclang-dev zlib1g-dev clang elif command -v pacman &>/dev/null; then pacman -Syu --noconfirm fish curl git vim clang else @@ -221,7 +221,7 @@ if [[ -z $OS_VERSION ]]; then OS_VERSION=$DEFAULT_OS_VERSION fi -CONTAINER_NAME=${CONTAINER_NAME_PREFIX}$(echo ${OS_VERSION} | tr : .) +CONTAINER_NAME=${CONTAINER_NAME_PREFIX}$(echo ${OS_VERSION} | tr : . | tr \/ .) if [[ $ACTION == "start" ]]; then start_docker