diff --git a/benchmarks/publish-subscribe/src/main.rs b/benchmarks/publish-subscribe/src/main.rs index 83f136a3d..8f3236749 100644 --- a/benchmarks/publish-subscribe/src/main.rs +++ b/benchmarks/publish-subscribe/src/main.rs @@ -61,7 +61,7 @@ fn perform_benchmark(args: &Args) -> Result<(), Box(args: &Args) -> Result<(), Box [!CAUTION] +> Every payload you transmit with iceoryx2 must be compatible with shared +> memory. Specifically, it must: +> +> * be self contained, no heap, no pointers to external sources +> * have a uniform memory representation, ensuring that shared structs have the +> same data layout +> * not use pointers to manage their internal structure +> +> Data types like `std::string` or `std::vector` will cause undefined behavior +> and may result in segmentation faults. We provide alternative data types +> that are compatible with shared memory. See the +> [complex data type example](../complex_data_types) for guidance on how to +> use them. + To achieve this, we create a copy of the global configuration, modify the setting `config.global.prefix` using the user-provided CLI argument, and then set up the example accordingly. diff --git a/examples/cxx/event_based_communication/README.md b/examples/cxx/event_based_communication/README.md index 868c4151e..2a3d09d68 100644 --- a/examples/cxx/event_based_communication/README.md +++ b/examples/cxx/event_based_communication/README.md @@ -2,6 +2,21 @@ ## Running The Example +> [!CAUTION] +> Every payload you transmit with iceoryx2 must be compatible with shared +> memory. Specifically, it must: +> +> * be self contained, no heap, no pointers to external sources +> * have a uniform memory representation, ensuring that shared structs have the +> same data layout +> * not use pointers to manage their internal structure +> +> Data types like `std::string` or `std::vector` will cause undefined behavior +> and may result in segmentation faults. We provide alternative data types +> that are compatible with shared memory. See the +> [complex data type example](../complex_data_types) for guidance on how to +> use them. + This example demonstrates iceoryx2's event multiplexing mechanism in a more complex setup. The iceoryx2 `Publisher` and `Subscriber` are integrated into custom `ExamplePublisher` and `ExampleSubscriber` classes, which also diff --git a/examples/cxx/publish_subscribe/README.md b/examples/cxx/publish_subscribe/README.md index de71f34a9..272495244 100644 --- a/examples/cxx/publish_subscribe/README.md +++ b/examples/cxx/publish_subscribe/README.md @@ -10,7 +10,8 @@ instructions in the [C++ Examples Readme](../README.md). > memory. Specifically, it must: > > * be self contained, no heap, no pointers to external sources -> * have a uniform memory representation -> `#[repr(C)]` +> * have a uniform memory representation, ensuring that shared structs have the +> same data layout > * not use pointers to manage their internal structure > > Data types like `std::string` or `std::vector` will cause undefined behavior diff --git a/examples/cxx/publish_subscribe_dynamic_data/README.md b/examples/cxx/publish_subscribe_dynamic_data/README.md new file mode 100644 index 000000000..958c45a9b --- /dev/null +++ b/examples/cxx/publish_subscribe_dynamic_data/README.md @@ -0,0 +1,68 @@ +# Publish-Subscribe With Dynamic Data (Slice Of Shared Memory Compatible Types) + +This example demonstrates how to send data when the maximum data size cannot +be predetermined and needs to be adjusted dynamically during the service's +runtime. iceoryx2 enables the reallocation of the publisher's data segment, +allowing users to send samples of arbitrary sizes. + +## Running The Example + +> [!CAUTION] +> Every payload you transmit with iceoryx2 must be compatible with shared +> memory. Specifically, it must: +> +> * be self contained, no heap, no pointers to external sources +> * have a uniform memory representation, ensuring that shared structs have the +> same data layout +> * not use pointers to manage their internal structure +> +> Data types like `std::string` or `std::vector` will cause undefined behavior +> and may result in segmentation faults. We provide alternative data types +> that are compatible with shared memory. See the +> [complex data type example](../complex_data_types) for guidance on how to +> use them. + +This example demonstrates a robust publisher-subscriber communication pattern +between two separate processes. A service with the payload type of an `u8` slice +is created, and every publisher can define a slice length hint they support +for communication with `initial_max_slice_len`. The publisher sends a message with +increasing size every second containing a piece of dynamic data. On the receiving +end, the subscriber checks for new data every second. + +The subscriber is printing the sample on the console whenever new data arrives. + +The `initial_max_slice_len` hint and the `AllocationStrategy` set by the +publisher will define how memory is reallocated when [`Publisher::loan_slice()`] +or [`Publisher::loan_slice_uninit()`] request more memory than it is available. + +First you have to build the C++ examples: + +```sh +cmake -S . -B target/ffi/build -DBUILD_EXAMPLES=ON +cmake --build target/ffi/build +``` + +To observe this dynamic communication in action, open two separate terminals and +execute the following commands: + +### Terminal 1 + +```sh +./target/ffi/build/examples/cxx/publish_subscribe_dynamic_data/example_cxx_publish_subscribe_dyn_subscriber +``` + +### Terminal 2 + +```sh +./target/ffi/build/examples/cxx/publish_subscribe_dynamic_data/example_cxx_publish_subscribe_dyn_publisher +``` + +Feel free to run multiple instances of publisher or subscriber processes +simultaneously to explore how iceoryx2 handles publisher-subscriber +communication efficiently. + +You may hit the maximum supported number of ports when too many publisher or +subscriber processes run. Take a look at the [iceoryx2 config](../../../config) +to set the limits globally or at the +[API of the Service builder](https://docs.rs/iceoryx2/latest/iceoryx2/service/index.html) +to set them for a single service. diff --git a/examples/cxx/publish_subscribe_dynamic_data/src/publisher.cpp b/examples/cxx/publish_subscribe_dynamic_data/src/publisher.cpp index e8c34ea3d..63c6af62a 100644 --- a/examples/cxx/publish_subscribe_dynamic_data/src/publisher.cpp +++ b/examples/cxx/publish_subscribe_dynamic_data/src/publisher.cpp @@ -34,14 +34,24 @@ auto main() -> int { // Since the payload type is uint8_t, this number is the same as the number of bytes in the payload. // For other types, number of bytes used by the payload will be max_slice_len * sizeof(Payload::ValueType) - const uint64_t maximum_elements = 1024; // NOLINT - auto publisher = - service.publisher_builder().max_slice_len(maximum_elements).create().expect("successful publisher creation"); + constexpr uint64_t INITIAL_SIZE_HINT = 16; + auto publisher = service + .publisher_builder() + // We guess that the samples are at most 16 bytes in size. + // This is just a hint to the underlying allocator and is purely optional + // The better the guess is the less reallocations will be performed + .initial_max_slice_len(INITIAL_SIZE_HINT) + // The underlying sample size will be increased with a power of two strategy + // when [`Publisher::loan_slice()`] or [`Publisher::loan_slice_uninit()`] require more + // memory than available. + .allocation_strategy(AllocationStrategy::PowerOfTwo) + .create() + .expect("successful publisher creation"); auto counter = 0; while (node.wait(CYCLE_TIME).has_value()) { - const auto required_memory_size = (counter % 16) + 1; // NOLINT + const auto required_memory_size = (counter + 1) * (counter + 1); // NOLINT auto sample = publisher.loan_slice_uninit(required_memory_size).expect("acquire sample"); sample.write_from_fn([&](auto byte_idx) { return (byte_idx + counter) % 255; }); // NOLINT diff --git a/examples/cxx/publish_subscribe_dynamic_data/src/subscriber.cpp b/examples/cxx/publish_subscribe_dynamic_data/src/subscriber.cpp index 4a789d4d3..f61bf500c 100644 --- a/examples/cxx/publish_subscribe_dynamic_data/src/subscriber.cpp +++ b/examples/cxx/publish_subscribe_dynamic_data/src/subscriber.cpp @@ -17,7 +17,6 @@ #include "iox2/service_type.hpp" #include -#include #include constexpr iox::units::Duration CYCLE_TIME = iox::units::Duration::fromSeconds(1); @@ -37,11 +36,8 @@ auto main() -> int { auto sample = subscriber.receive().expect("receive succeeds"); while (sample.has_value()) { auto payload = sample->payload(); - std::cout << "received " << std::dec << static_cast(payload.number_of_bytes()) << " bytes: "; - for (auto byte : payload) { - std::cout << std::setw(2) << std::setfill('0') << std::hex << static_cast(byte) << " "; - } - std::cout << std::endl; + std::cout << "received " << std::dec << static_cast(payload.number_of_bytes()) << " bytes" + << std::endl; sample = subscriber.receive().expect("receive succeeds"); } } diff --git a/examples/cxx/publish_subscribe_with_user_header/README.md b/examples/cxx/publish_subscribe_with_user_header/README.md index 5a64eecb4..604991891 100644 --- a/examples/cxx/publish_subscribe_with_user_header/README.md +++ b/examples/cxx/publish_subscribe_with_user_header/README.md @@ -10,7 +10,8 @@ instructions in the [C++ Examples Readme](../README.md). > memory. Specifically, it must: > > * be self contained, no heap, no pointers to external sources -> * have a uniform memory representation -> `#[repr(C)]` +> * have a uniform memory representation, ensuring that shared structs have the +> same data layout > * not use pointers to manage their internal structure > > Data types like `std::string` or `std::vector` will cause undefined behavior diff --git a/examples/rust/publish_subscribe_dynamic_data/README.md b/examples/rust/publish_subscribe_dynamic_data/README.md index 637fd9e15..45f9f9ea0 100644 --- a/examples/rust/publish_subscribe_dynamic_data/README.md +++ b/examples/rust/publish_subscribe_dynamic_data/README.md @@ -1,5 +1,10 @@ # Publish-Subscribe With Dynamic Data (Slice Of Shared Memory Compatible Types) +This example demonstrates how to send data when the maximum data size cannot +be predetermined and needs to be adjusted dynamically during the service's +runtime. iceoryx2 enables the reallocation of the publisher's data segment, +allowing users to send samples of arbitrary sizes. + ## Running The Example > [!CAUTION] @@ -18,13 +23,17 @@ This example demonstrates a robust publisher-subscriber communication pattern between two separate processes. A service with the payload type of an `u8` slice -is created, and every publisher can define the largest slice length they support -for communication with `max_slice_len`. The publisher sends a message every -second containing a piece of dynamic data. On the receiving end, the subscriber -checks for new data every second. +is created, and every publisher can define a slice length hint they support +for communication with `initial_max_slice_len`. The publisher sends a message with +increasing size every second containing a piece of dynamic data. On the receiving +end, the subscriber checks for new data every second. The subscriber is printing the sample on the console whenever new data arrives. +The `initial_max_slice_len` hint and the `AllocationStrategy` set by the +publisher will define how memory is reallocated when [`Publisher::loan_slice()`] +or [`Publisher::loan_slice_uninit()`] request more memory than it is available. + To observe this dynamic communication in action, open two separate terminals and execute the following commands: diff --git a/examples/rust/publish_subscribe_dynamic_data/publisher.rs b/examples/rust/publish_subscribe_dynamic_data/publisher.rs index 8baa001c1..916bbcdb3 100644 --- a/examples/rust/publish_subscribe_dynamic_data/publisher.rs +++ b/examples/rust/publish_subscribe_dynamic_data/publisher.rs @@ -23,16 +23,22 @@ fn main() -> Result<(), Box> { .publish_subscribe::<[u8]>() .open_or_create()?; - let maximum_elements = 1024; let publisher = service .publisher_builder() - .max_slice_len(maximum_elements) + // We guess that the samples are at most 16 bytes in size. + // This is just a hint to the underlying allocator and is purely optional + // The better the guess is the less reallocations will be performed + .initial_max_slice_len(16) + // The underlying sample size will be increased with a power of two strategy + // when [`Publisher::loan_slice()`] or [`Publisher::loan_slice_uninit()`] require more + // memory than available. + .allocation_strategy(AllocationStrategy::PowerOfTwo) .create()?; let mut counter = 0; while node.wait(CYCLE_TIME).is_ok() { - let required_memory_size = (counter % 16) + 1; + let required_memory_size = (counter + 1) * (counter + 1); let sample = publisher.loan_slice_uninit(required_memory_size)?; let sample = sample.write_from_fn(|byte_idx| ((byte_idx + counter) % 255) as u8); diff --git a/examples/rust/publish_subscribe_dynamic_data/subscriber.rs b/examples/rust/publish_subscribe_dynamic_data/subscriber.rs index fdc81b4da..734b34327 100644 --- a/examples/rust/publish_subscribe_dynamic_data/subscriber.rs +++ b/examples/rust/publish_subscribe_dynamic_data/subscriber.rs @@ -27,11 +27,7 @@ fn main() -> Result<(), Box> { while node.wait(CYCLE_TIME).is_ok() { while let Some(sample) = subscriber.receive()? { - print!("received {} bytes: ", sample.payload().len()); - for byte in sample.payload() { - print!("{:02x} ", byte); - } - println!(""); + println!("received {} bytes", sample.payload().len()); } } diff --git a/iceoryx2-bb/memory/src/pool_allocator.rs b/iceoryx2-bb/memory/src/pool_allocator.rs index c9f4c6e5b..0259512e0 100644 --- a/iceoryx2-bb/memory/src/pool_allocator.rs +++ b/iceoryx2-bb/memory/src/pool_allocator.rs @@ -100,6 +100,20 @@ impl PoolAllocator { self.bucket_alignment } + /// Releases an previously allocated bucket of memory. + /// + /// # Safety + /// + /// * `ptr` must be allocated previously with [`PoolAllocator::allocate()`] or + /// [`PoolAllocator::allocate_zeroed()`] + /// + pub unsafe fn deallocate_bucket(&self, ptr: NonNull) { + self.verify_init("deallocate"); + + self.buckets + .release_raw_index(self.get_index(ptr), ReleaseMode::Default); + } + /// # Safety /// /// * `ptr` must point to a piece of memory of length `size` @@ -204,10 +218,7 @@ impl BaseAllocator for PoolAllocator { } unsafe fn deallocate(&self, ptr: NonNull, _layout: Layout) { - self.verify_init("deallocate"); - - self.buckets - .release_raw_index(self.get_index(ptr), ReleaseMode::Default); + self.deallocate_bucket(ptr); } } diff --git a/iceoryx2-cal/src/resizable_shared_memory/dynamic.rs b/iceoryx2-cal/src/resizable_shared_memory/dynamic.rs index 2b2a228e3..dc9a28194 100644 --- a/iceoryx2-cal/src/resizable_shared_memory/dynamic.rs +++ b/iceoryx2-cal/src/resizable_shared_memory/dynamic.rs @@ -16,11 +16,14 @@ use std::sync::atomic::Ordering; use std::time::Duration; use std::{fmt::Debug, marker::PhantomData}; -use crate::shared_memory::{AllocationStrategy, SegmentId, ShmPointer}; +use crate::shared_memory::{ + AllocationStrategy, SegmentId, SharedMemoryForPoolAllocator, ShmPointer, +}; use crate::shared_memory::{ PointerOffset, SharedMemory, SharedMemoryBuilder, SharedMemoryCreateError, SharedMemoryOpenError, ShmAllocator, }; +use crate::shm_allocator::pool_allocator::PoolAllocator; use crate::shm_allocator::ShmAllocationError; use iceoryx2_bb_container::semantic_string::SemanticString; use iceoryx2_bb_container::slotmap::{SlotMap, SlotMapKey}; @@ -34,7 +37,8 @@ use iceoryx2_pal_concurrency_sync::iox_atomic::{IoxAtomicU64, IoxAtomicUsize}; use super::{ NamedConcept, NamedConceptBuilder, NamedConceptDoesExistError, NamedConceptListError, NamedConceptMgmt, NamedConceptRemoveError, ResizableSharedMemory, ResizableSharedMemoryBuilder, - ResizableSharedMemoryView, ResizableSharedMemoryViewBuilder, ResizableShmAllocationError, + ResizableSharedMemoryForPoolAllocator, ResizableSharedMemoryView, + ResizableSharedMemoryViewBuilder, ResizableShmAllocationError, }; const MAX_NUMBER_OF_REALLOCATIONS: usize = SegmentId::max_segment_id() as usize + 1; @@ -298,6 +302,27 @@ pub struct DynamicView> { _data: PhantomData, } +impl> DynamicView +where + Shm::Builder: Debug, +{ + fn release_old_unused_segments( + shared_memory_map: &mut SlotMap>, + old_idx: usize, + ) { + if old_idx == INVALID_KEY { + return; + } + + let old_key = SlotMapKey::new(old_idx); + if let Some(shm) = shared_memory_map.get(old_key) { + if shm.chunk_count.load(Ordering::Relaxed) == 0 { + shared_memory_map.remove(old_key); + } + } + } +} + impl> ResizableSharedMemoryView for DynamicView where @@ -322,7 +347,11 @@ where let entry = ShmEntry::new(shm); entry.register_offset(); shared_memory_map.insert_at(key, entry); - self.current_idx.store(key.value(), Ordering::Relaxed); + Self::release_old_unused_segments( + shared_memory_map, + self.current_idx.swap(key.value(), Ordering::Relaxed), + ); + payload_start_address } Some(entry) => { @@ -341,7 +370,8 @@ where match shared_memory_map.get(key) { Some(entry) => { - if entry.unregister_offset() == ShmEntryState::Empty + let state = entry.unregister_offset(); + if state == ShmEntryState::Empty && self.current_idx.load(Ordering::Relaxed) != key.value() { shared_memory_map.remove(key); @@ -639,6 +669,47 @@ where fail!(from self, with e.into(), "{msg} due to {:?}.", e); } } + + unsafe fn perform_deallocation)>( + &self, + offset: PointerOffset, + mut deallocation_call: F, + ) { + let segment_id = SlotMapKey::new(offset.segment_id().value() as usize); + let state = self.state_mut(); + match state.shared_memory_map.get(segment_id) { + Some(entry) => { + deallocation_call(entry); + if entry.unregister_offset() == ShmEntryState::Empty + && segment_id != state.current_idx + { + state.shared_memory_map.remove(segment_id); + } + } + None => fatal_panic!(from self, + "This should never happen! Unable to deallocate {:?} since the corresponding shared memory segment is not available!", offset), + } + } +} + +impl ResizableSharedMemoryForPoolAllocator + for DynamicMemory +where + Shm::Builder: Debug, +{ + unsafe fn deallocate_bucket(&self, offset: PointerOffset) { + self.perform_deallocation(offset, |entry| entry.shm.deallocate_bucket(offset)); + } + + fn bucket_size(&self, segment_id: SegmentId) -> usize { + let segment_id_key = SlotMapKey::new(segment_id.value() as usize); + match self.state_mut().shared_memory_map.get(segment_id_key) { + Some(entry) => entry.shm.bucket_size(), + None => fatal_panic!(from self, + "This should never happen! Unable to acquire bucket size since the segment {:?} does not exist.", + segment_id), + } + } } impl> ResizableSharedMemory @@ -680,19 +751,6 @@ where } unsafe fn deallocate(&self, offset: PointerOffset, layout: Layout) { - let segment_id = SlotMapKey::new(offset.segment_id().value() as usize); - let state = self.state_mut(); - match state.shared_memory_map.get(segment_id) { - Some(entry) => { - entry.shm.deallocate(offset, layout); - if entry.unregister_offset() == ShmEntryState::Empty - && segment_id != state.current_idx - { - state.shared_memory_map.remove(segment_id); - } - } - None => fatal_panic!(from self, - "This should never happen! Unable to deallocate {:?} since the corresponding shared memory segment is not available!", offset), - } + self.perform_deallocation(offset, |entry| entry.shm.deallocate(offset, layout)); } } diff --git a/iceoryx2-cal/src/resizable_shared_memory/mod.rs b/iceoryx2-cal/src/resizable_shared_memory/mod.rs index 710ebb8ae..ea0fee75f 100644 --- a/iceoryx2-cal/src/resizable_shared_memory/mod.rs +++ b/iceoryx2-cal/src/resizable_shared_memory/mod.rs @@ -88,7 +88,7 @@ pub mod dynamic; -pub use crate::shm_allocator::AllocationStrategy; +pub use crate::shm_allocator::{pool_allocator::PoolAllocator, AllocationStrategy}; use std::alloc::Layout; use std::fmt::Debug; @@ -98,7 +98,7 @@ use iceoryx2_bb_elementary::enum_gen; use crate::named_concept::*; use crate::shared_memory::{ - SharedMemory, SharedMemoryCreateError, SharedMemoryOpenError, ShmPointer, + SegmentId, SharedMemory, SharedMemoryCreateError, SharedMemoryOpenError, ShmPointer, }; use crate::shm_allocator::{PointerOffset, ShmAllocationError, ShmAllocator}; @@ -124,7 +124,7 @@ pub trait ResizableSharedMemoryViewBuilder< Shm: SharedMemory, ResizableShm: ResizableSharedMemory, ResizableShmView: ResizableSharedMemoryView, ->: NamedConceptBuilder +>: NamedConceptBuilder + Debug { /// The timeout defines how long the /// [`SharedMemoryBuilder`](crate::shared_memory::SharedMemoryBuilder) should wait for @@ -146,7 +146,7 @@ pub trait ResizableSharedMemoryBuilder< Allocator: ShmAllocator, Shm: SharedMemory, ResizableShm: ResizableSharedMemory, ->: NamedConceptBuilder +>: NamedConceptBuilder + Debug { /// Provides an initial hint to the underlying [`ShmAllocator`] on how large the largest chunk /// will be. If the chunk exceeds the hinted [`Layout`] a new [`SharedMemory`] segment is @@ -167,7 +167,9 @@ pub trait ResizableSharedMemoryBuilder< } /// A read-only view to a [`ResizableSharedMemory`]. Can be created by arbitrary many processes. -pub trait ResizableSharedMemoryView> { +pub trait ResizableSharedMemoryView>: + Debug +{ /// Registers a received [`PointerOffset`] at the [`ResizableSharedMemoryView`] and returns the /// absolut pointer to the data. If the segment of the received [`PointerOffset`] was not yet /// mapped into the processes space, it will be opened and mapped. If this fails a @@ -238,3 +240,18 @@ pub trait ResizableSharedMemory>: + ResizableSharedMemory +{ + /// Release previously allocated memory + /// + /// # Safety + /// + /// * the offset must be acquired with [`SharedMemory::allocate()`] - extracted from the + /// [`ShmPointer`] + unsafe fn deallocate_bucket(&self, offset: PointerOffset); + + /// Returns the bucket size of the corresponding [`PoolAllocator`] + fn bucket_size(&self, segment_id: SegmentId) -> usize; +} diff --git a/iceoryx2-cal/src/shared_memory/common.rs b/iceoryx2-cal/src/shared_memory/common.rs index 959bbde5f..a8532989a 100644 --- a/iceoryx2-cal/src/shared_memory/common.rs +++ b/iceoryx2-cal/src/shared_memory/common.rs @@ -30,6 +30,7 @@ use crate::static_storage::file::{ #[doc(hidden)] pub mod details { use iceoryx2_bb_memory::bump_allocator::BumpAllocator; + use pool_allocator::PoolAllocator; use super::*; @@ -443,4 +444,20 @@ pub mod details { self.payload_start_address } } + + impl>> SharedMemoryForPoolAllocator + for Memory + { + unsafe fn deallocate_bucket(&self, offset: PointerOffset) { + self.storage + .get() + .allocator + .assume_init_ref() + .deallocate_bucket(offset); + } + + fn bucket_size(&self) -> usize { + unsafe { self.storage.get().allocator.assume_init_ref().bucket_size() } + } + } } diff --git a/iceoryx2-cal/src/shared_memory/mod.rs b/iceoryx2-cal/src/shared_memory/mod.rs index f5ec16ffd..112823ded 100644 --- a/iceoryx2-cal/src/shared_memory/mod.rs +++ b/iceoryx2-cal/src/shared_memory/mod.rs @@ -63,6 +63,7 @@ use std::{fmt::Debug, time::Duration}; pub use crate::shm_allocator::*; use crate::static_storage::file::{NamedConcept, NamedConceptBuilder, NamedConceptMgmt}; use iceoryx2_bb_system_types::file_name::*; +use pool_allocator::PoolAllocator; /// Failure returned by [`SharedMemoryBuilder::create()`] #[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)] @@ -184,3 +185,16 @@ pub trait SharedMemory: unsafe { FileName::new_unchecked(b".shm") } } } + +pub trait SharedMemoryForPoolAllocator: SharedMemory { + /// Release previously allocated memory + /// + /// # Safety + /// + /// * the offset must be acquired with [`SharedMemory::allocate()`] - extracted from the + /// [`ShmPointer`] + unsafe fn deallocate_bucket(&self, offset: PointerOffset); + + /// Returns the bucket size of the [`PoolAllocator`] + fn bucket_size(&self) -> usize; +} diff --git a/iceoryx2-cal/src/shm_allocator/pool_allocator.rs b/iceoryx2-cal/src/shm_allocator/pool_allocator.rs index 0f5b679b6..8511b9884 100644 --- a/iceoryx2-cal/src/shm_allocator/pool_allocator.rs +++ b/iceoryx2-cal/src/shm_allocator/pool_allocator.rs @@ -56,6 +56,16 @@ impl PoolAllocator { pub fn number_of_buckets(&self) -> u32 { self.allocator.number_of_buckets() } + + /// # Safety + /// + /// * provided [`PointerOffset`] must be allocated with [`PoolAllocator::allocate()`] + pub unsafe fn deallocate_bucket(&self, offset: PointerOffset) { + self.number_of_used_buckets.fetch_sub(1, Ordering::Relaxed); + self.allocator.deallocate_bucket(NonNull::new_unchecked( + (offset.offset() + self.allocator.start_address()) as *mut u8, + )); + } } impl ShmAllocator for PoolAllocator { @@ -199,11 +209,7 @@ impl ShmAllocator for PoolAllocator { )) } - unsafe fn deallocate(&self, offset: PointerOffset, layout: Layout) { - self.number_of_used_buckets.fetch_sub(1, Ordering::Relaxed); - self.allocator.deallocate( - NonNull::new_unchecked((offset.offset() + self.allocator.start_address()) as *mut u8), - layout, - ); + unsafe fn deallocate(&self, offset: PointerOffset, _layout: Layout) { + self.deallocate_bucket(offset); } } diff --git a/iceoryx2-cal/src/zero_copy_connection/common.rs b/iceoryx2-cal/src/zero_copy_connection/common.rs index 1cf607f98..825853f8c 100644 --- a/iceoryx2-cal/src/zero_copy_connection/common.rs +++ b/iceoryx2-cal/src/zero_copy_connection/common.rs @@ -12,7 +12,8 @@ #[doc(hidden)] pub mod details { - use iceoryx2_pal_concurrency_sync::iox_atomic::{IoxAtomicU64, IoxAtomicU8}; + use iceoryx2_bb_elementary::allocator::{AllocationError, BaseAllocator}; + use iceoryx2_pal_concurrency_sync::iox_atomic::{IoxAtomicU64, IoxAtomicU8, IoxAtomicUsize}; use std::cell::UnsafeCell; use std::fmt::Debug; use std::marker::PhantomData; @@ -150,14 +151,36 @@ pub mod details { } } + #[derive(Debug)] + struct SegmentDetails { + used_chunk_list: RelocatableUsedChunkList, + sample_size: IoxAtomicUsize, + } + + impl SegmentDetails { + fn new_uninit(number_of_samples: usize) -> Self { + Self { + used_chunk_list: unsafe { RelocatableUsedChunkList::new_uninit(number_of_samples) }, + sample_size: IoxAtomicUsize::new(0), + } + } + + const fn const_memory_size(number_of_samples: usize) -> usize { + RelocatableUsedChunkList::const_memory_size(number_of_samples) + } + + unsafe fn init(&mut self, allocator: &T) -> Result<(), AllocationError> { + self.used_chunk_list.init(allocator) + } + } + #[derive(Debug)] #[repr(C)] pub struct SharedManagementData { submission_channel: RelocatableSafelyOverflowingIndexQueue, completion_channel: RelocatableIndexQueue, - used_chunk_list: RelocatableVec, + segment_details: RelocatableVec, max_borrowed_samples: usize, - sample_size: usize, number_of_samples_per_segment: usize, number_of_segments: u8, state: IoxAtomicU8, @@ -171,7 +194,6 @@ pub mod details { completion_channel_buffer_capacity: usize, enable_safe_overflow: bool, max_borrowed_samples: usize, - sample_size: usize, number_of_samples_per_segment: usize, number_of_segments: u8, ) -> Self { @@ -184,11 +206,10 @@ pub mod details { completion_channel: unsafe { RelocatableIndexQueue::new_uninit(completion_channel_buffer_capacity) }, - used_chunk_list: unsafe { RelocatableVec::new_uninit(number_of_segments as usize) }, + segment_details: unsafe { RelocatableVec::new_uninit(number_of_segments as usize) }, state: IoxAtomicU8::new(State::None.value()), init_state: IoxAtomicU64::new(0), enable_safe_overflow, - sample_size, max_borrowed_samples, number_of_samples_per_segment, number_of_segments, @@ -206,9 +227,8 @@ pub mod details { + RelocatableSafelyOverflowingIndexQueue::const_memory_size( submission_channel_buffer_capacity, ) - + RelocatableUsedChunkList::const_memory_size(number_of_samples) - * number_of_segments - + RelocatableVec::::const_memory_size(number_of_segments) + + SegmentDetails::const_memory_size(number_of_samples) * number_of_segments + + RelocatableVec::::const_memory_size(number_of_segments) } } @@ -218,7 +238,6 @@ pub mod details { buffer_size: usize, enable_safe_overflow: bool, max_borrowed_samples: usize, - sample_size: usize, number_of_samples_per_segment: usize, number_of_segments: u8, timeout: Duration, @@ -254,20 +273,20 @@ pub mod details { "{} since the receive channel allocation failed. - This is an implementation bug!", msg); fatal_panic!(from self, when unsafe { data.completion_channel.init(allocator) }, "{} since the retrieve channel allocation failed. - This is an implementation bug!", msg); - fatal_panic!(from self, when unsafe { data.used_chunk_list.init(allocator) }, + fatal_panic!(from self, when unsafe { data.segment_details.init(allocator) }, "{} since the used chunk list vector allocation failed. - This is an implementation bug!", msg); for _ in 0..self.number_of_segments { if !unsafe { - data.used_chunk_list.push(RelocatableUsedChunkList::new_uninit(self.number_of_samples_per_segment)) + data.segment_details.push(SegmentDetails::new_uninit(self.number_of_samples_per_segment)) } { fatal_panic!(from self, "{} since the used chunk list could not be added. - This is an implementation bug!", msg); } } - for (n, used_chunk_list) in data.used_chunk_list.iter_mut().enumerate() { - fatal_panic!(from self, when unsafe { used_chunk_list.init(allocator) }, + for (n, details) in data.segment_details.iter_mut().enumerate() { + fatal_panic!(from self, when unsafe { details.init(allocator) }, "{} since the used chunk list for segment id {} failed to allocate memory. - This is an implementation bug!", msg, n); } @@ -280,7 +299,6 @@ pub mod details { self.completion_channel_size(), self.enable_safe_overflow, self.max_borrowed_samples, - self.sample_size, self.number_of_samples_per_segment, self.number_of_segments ) @@ -335,12 +353,6 @@ pub mod details { msg, storage.get().enable_safe_overflow, self.enable_safe_overflow); } - if storage.get().sample_size != self.sample_size { - fail!(from self, with ZeroCopyCreationError::IncompatibleSampleSize, - "{} since the requested sample size is set to {} but should be set to {}.", - msg, self.sample_size, storage.get().sample_size); - } - if storage.get().number_of_samples_per_segment != self.number_of_samples_per_segment { fail!(from self, with ZeroCopyCreationError::IncompatibleNumberOfSamples, @@ -353,6 +365,12 @@ pub mod details { "{} since the requested number of segments is set to {} but should be set to {}.", msg, self.number_of_segments, storage.get().number_of_segments); } + + if storage.get().number_of_segments != self.number_of_segments { + fail!(from self, with ZeroCopyCreationError::IncompatibleNumberOfSegments, + "{} since the requested number of segments is set to {} but should be set to {}.", + msg, self.number_of_segments, storage.get().number_of_segments); + } } Ok(storage) @@ -400,7 +418,6 @@ pub mod details { buffer_size: DEFAULT_BUFFER_SIZE, enable_safe_overflow: DEFAULT_ENABLE_SAFE_OVERFLOW, max_borrowed_samples: DEFAULT_MAX_BORROWED_SAMPLES, - sample_size: 0, number_of_samples_per_segment: 0, number_of_segments: DEFAULT_MAX_SUPPORTED_SHARED_MEMORY_SEGMENTS, config: Configuration::default(), @@ -448,12 +465,9 @@ pub mod details { } fn create_sender( - mut self, - sample_size: usize, + self, ) -> Result< as ZeroCopyConnection>::Sender, ZeroCopyCreationError> { - self.sample_size = sample_size; - let msg = "Unable to create sender"; let storage = fail!(from self, when self.create_or_open_shm(), "{} since the corresponding connection could not be created or opened", msg); @@ -467,12 +481,9 @@ pub mod details { } fn create_receiver( - mut self, - sample_size: usize, + self, ) -> Result< as ZeroCopyConnection>::Receiver, ZeroCopyCreationError> { - self.sample_size = sample_size; - let msg = "Unable to create receiver"; let storage = fail!(from self, when self.create_or_open_shm(), "{} since the corresponding connection could not be created or opened", msg); @@ -529,7 +540,11 @@ pub mod details { } impl> ZeroCopySender for Sender { - fn try_send(&self, ptr: PointerOffset) -> Result, ZeroCopySendError> { + fn try_send( + &self, + ptr: PointerOffset, + sample_size: usize, + ) -> Result, ZeroCopySendError> { let msg = "Unable to send sample"; let storage = self.storage.get(); @@ -539,22 +554,33 @@ pub mod details { } let segment_id = ptr.segment_id().value() as usize; - let sample_size = storage.sample_size; + let segment_details = &storage.segment_details[segment_id]; + segment_details + .sample_size + .store(sample_size, Ordering::Relaxed); debug_assert!(ptr.offset() % sample_size == 0); let index = ptr.offset() / sample_size; debug_assert!(segment_id < storage.number_of_segments as usize); - let did_not_send_same_offset_twice = storage.used_chunk_list[segment_id].insert(index); + let did_not_send_same_offset_twice = segment_details.used_chunk_list.insert(index); debug_assert!(did_not_send_same_offset_twice); match unsafe { storage.submission_channel.push(ptr.as_value()) } { Some(v) => { let pointer_offset = PointerOffset::from_value(v); - debug_assert!(pointer_offset.offset() % sample_size == 0); - if !storage.used_chunk_list[pointer_offset.segment_id().value() as usize] - .remove(pointer_offset.offset() / sample_size) - { + let segment_id = pointer_offset.segment_id().value() as usize; + + let segment_details = &storage.segment_details[segment_id]; + debug_assert!( + pointer_offset.offset() + % segment_details.sample_size.load(Ordering::Relaxed) + == 0 + ); + let index = pointer_offset.offset() + / segment_details.sample_size.load(Ordering::Relaxed); + + if !segment_details.used_chunk_list.remove(index) { fail!(from self, with ZeroCopySendError::ConnectionCorrupted, "{} since the invalid offset {:?} was returned on overflow.", msg, pointer_offset); } @@ -568,6 +594,7 @@ pub mod details { fn blocking_send( &self, ptr: PointerOffset, + sample_size: usize, ) -> Result, ZeroCopySendError> { if !self.storage.get().enable_safe_overflow { AdaptiveWaitBuilder::new() @@ -577,7 +604,7 @@ pub mod details { .unwrap(); } - self.try_send(ptr) + self.try_send(ptr, sample_size) } fn reclaim(&self) -> Result, ZeroCopyReclaimError> { @@ -592,16 +619,22 @@ pub mod details { debug_assert!(segment_id < storage.number_of_segments as usize); - if segment_id >= storage.used_chunk_list.len() { + if segment_id >= storage.segment_details.len() { fail!(from self, with ZeroCopyReclaimError::ReceiverReturnedCorruptedPointerOffset, "{} since the receiver returned a non-existing segment id {:?}.", msg, pointer_offset); } - debug_assert!(pointer_offset.offset() % storage.sample_size == 0); - if !storage.used_chunk_list[segment_id] - .remove(pointer_offset.offset() / storage.sample_size) - { + let segment_details = &storage.segment_details[segment_id]; + debug_assert!( + pointer_offset.offset() + % segment_details.sample_size.load(Ordering::Relaxed) + == 0 + ); + let index = pointer_offset.offset() + / segment_details.sample_size.load(Ordering::Relaxed); + + if !segment_details.used_chunk_list.remove(index) { fail!(from self, with ZeroCopyReclaimError::ReceiverReturnedCorruptedPointerOffset, "{} since the receiver returned a corrupted offset {:?}.", msg, pointer_offset); @@ -612,11 +645,10 @@ pub mod details { } unsafe fn acquire_used_offsets(&self, mut callback: F) { - let sample_size = self.storage.get().sample_size; - for (n, used_chunk_list) in self.storage.get().used_chunk_list.iter().enumerate() { - used_chunk_list.remove_all(|index| { + for (n, segment_details) in self.storage.get().segment_details.iter().enumerate() { + segment_details.used_chunk_list.remove_all(|index| { callback(PointerOffset::from_offset_and_segment_id( - index * sample_size, + index * segment_details.sample_size.load(Ordering::Relaxed), SegmentId::new(n as u8), )) }); diff --git a/iceoryx2-cal/src/zero_copy_connection/mod.rs b/iceoryx2-cal/src/zero_copy_connection/mod.rs index d3900b23b..5b3bcaf78 100644 --- a/iceoryx2-cal/src/zero_copy_connection/mod.rs +++ b/iceoryx2-cal/src/zero_copy_connection/mod.rs @@ -35,7 +35,6 @@ pub enum ZeroCopyCreationError { IncompatibleBufferSize, IncompatibleMaxBorrowedSampleSetting, IncompatibleOverflowSetting, - IncompatibleSampleSize, IncompatibleNumberOfSamples, IncompatibleNumberOfSegments, } @@ -120,8 +119,8 @@ pub trait ZeroCopyConnectionBuilder: NamedConceptBuilder< /// By default it is set to [`Duration::ZERO`] for no timeout. fn timeout(self, value: Duration) -> Self; - fn create_sender(self, sample_size: usize) -> Result; - fn create_receiver(self, sample_size: usize) -> Result; + fn create_sender(self) -> Result; + fn create_receiver(self) -> Result; } pub trait ZeroCopyPortDetails { @@ -133,10 +132,17 @@ pub trait ZeroCopyPortDetails { } pub trait ZeroCopySender: Debug + ZeroCopyPortDetails + NamedConcept { - fn try_send(&self, ptr: PointerOffset) -> Result, ZeroCopySendError>; - - fn blocking_send(&self, ptr: PointerOffset) - -> Result, ZeroCopySendError>; + fn try_send( + &self, + ptr: PointerOffset, + sample_size: usize, + ) -> Result, ZeroCopySendError>; + + fn blocking_send( + &self, + ptr: PointerOffset, + sample_size: usize, + ) -> Result, ZeroCopySendError>; fn reclaim(&self) -> Result, ZeroCopyReclaimError>; diff --git a/iceoryx2-cal/tests/resizable_shared_memory_tests.rs b/iceoryx2-cal/tests/resizable_shared_memory_tests.rs index 0a1c3ce37..d804ec9aa 100644 --- a/iceoryx2-cal/tests/resizable_shared_memory_tests.rs +++ b/iceoryx2-cal/tests/resizable_shared_memory_tests.rs @@ -946,6 +946,44 @@ mod resizable_shared_memory { assert_that!(sut_viewer.number_of_active_segments(), eq 1); } + #[test] + fn register_segment_that_was_resized_on_the_first_allocation_leads_to_a_unmap_of_the_old_segment_on_viewer_side< + Shm: SharedMemory, + Sut: ResizableSharedMemory, + >() { + let config = generate_isolated_config::(); + let storage_name = generate_name(); + + let sut = Sut::MemoryBuilder::new(&storage_name) + .config(&config) + .max_chunk_layout_hint(Layout::new::()) + .max_number_of_chunks_hint(1) + .allocation_strategy(AllocationStrategy::BestFit) + .create() + .unwrap(); + + let sut_viewer = Sut::ViewBuilder::new(&storage_name) + .config(&config) + .open() + .unwrap(); + + let chunk_small = sut.allocate(Layout::new::()).unwrap().offset; + unsafe { + sut_viewer + .register_and_translate_offset(chunk_small) + .unwrap() + }; + unsafe { sut_viewer.unregister_offset(chunk_small) }; + + let chunk = sut.allocate(Layout::new::()).unwrap().offset; + + // this shall release the old `u8` segment and map the new `u64` segment, therefore + // leading to 1 active segment + unsafe { sut_viewer.register_and_translate_offset(chunk).unwrap() }; + + assert_that!(sut_viewer.number_of_active_segments(), eq 1); + } + #[instantiate_tests(, resizable_shared_memory::dynamic::DynamicMemory>>)] mod posix {} diff --git a/iceoryx2-cal/tests/zero_copy_connection_posix_shared_memory_tests.rs b/iceoryx2-cal/tests/zero_copy_connection_posix_shared_memory_tests.rs index 63bdcffda..b06db7f35 100644 --- a/iceoryx2-cal/tests/zero_copy_connection_posix_shared_memory_tests.rs +++ b/iceoryx2-cal/tests/zero_copy_connection_posix_shared_memory_tests.rs @@ -51,7 +51,7 @@ mod zero_copy_connection_posix_shared_memory_tests { .timeout(TIMEOUT) .number_of_samples_per_segment(1) .receiver_max_borrowed_samples(1) - .create_sender(1); + .create_sender(); assert_that!(sut, is_err); assert_that!(sut.err().unwrap(), eq ZeroCopyCreationError::InitializationNotYetFinalized); diff --git a/iceoryx2-cal/tests/zero_copy_connection_trait_tests.rs b/iceoryx2-cal/tests/zero_copy_connection_trait_tests.rs index 0413acd50..ad5ad3fc2 100644 --- a/iceoryx2-cal/tests/zero_copy_connection_trait_tests.rs +++ b/iceoryx2-cal/tests/zero_copy_connection_trait_tests.rs @@ -20,6 +20,7 @@ mod zero_copy_connection { use iceoryx2_bb_posix::barrier::*; use iceoryx2_bb_system_types::file_name::FileName; use iceoryx2_bb_testing::assert_that; + use iceoryx2_bb_testing::watchdog::Watchdog; use iceoryx2_cal::named_concept::*; use iceoryx2_cal::named_concept::{NamedConceptBuilder, NamedConceptMgmt}; use iceoryx2_cal::shm_allocator::{PointerOffset, SegmentId}; @@ -40,14 +41,14 @@ mod zero_copy_connection { Sut::Builder::new(&name) .config(&config) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) - .create_receiver(SAMPLE_SIZE), + .create_receiver(), is_ok ); assert_that!( Sut::Builder::new(&name) .config(&config) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) - .create_sender(SAMPLE_SIZE), + .create_sender(), is_ok ); } @@ -60,14 +61,14 @@ mod zero_copy_connection { let sut_sender = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); assert_that!(!sut_sender.is_connected(), eq true); let sut_receiver = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); assert_that!(sut_receiver.is_connected(), eq true); assert_that!(sut_sender.is_connected(), eq true); @@ -84,7 +85,7 @@ mod zero_copy_connection { let sut_sender = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); assert_that!(sut_sender.buffer_size(), eq DEFAULT_BUFFER_SIZE); assert_that!( @@ -99,7 +100,7 @@ mod zero_copy_connection { let sut_receiver = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); assert_that!(sut_receiver.buffer_size(), eq DEFAULT_BUFFER_SIZE); assert_that!( @@ -120,18 +121,18 @@ mod zero_copy_connection { let _sut_sender = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let _sut_receiver = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); let sut_sender = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_sender(SAMPLE_SIZE); + .create_sender(); assert_that!(sut_sender, is_err); assert_that!( sut_sender.err().unwrap(), eq @@ -141,7 +142,7 @@ mod zero_copy_connection { let sut_receiver = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_receiver(SAMPLE_SIZE); + .create_receiver(); assert_that!(sut_receiver, is_err); assert_that!( sut_receiver.err().unwrap(), eq @@ -157,19 +158,19 @@ mod zero_copy_connection { let _sut_sender = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let _sut_receiver = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); drop(_sut_sender); let sut_sender = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_sender(SAMPLE_SIZE); + .create_sender(); assert_that!(sut_sender, is_ok); } @@ -181,19 +182,19 @@ mod zero_copy_connection { let _sut_sender = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let _sut_receiver = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); drop(_sut_receiver); let sut_receiver = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_receiver(SAMPLE_SIZE); + .create_receiver(); assert_that!(sut_receiver, is_ok); } @@ -205,12 +206,12 @@ mod zero_copy_connection { let sut_sender = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let sut_receiver = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); drop(sut_sender); @@ -228,14 +229,14 @@ mod zero_copy_connection { .buffer_size(12) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let sut_receiver = Sut::Builder::new(&name) .buffer_size(16) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_receiver(SAMPLE_SIZE); + .create_receiver(); assert_that!(sut_receiver, is_err); } @@ -249,14 +250,14 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(2) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let sut_receiver = Sut::Builder::new(&name) .receiver_max_borrowed_samples(4) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_receiver(SAMPLE_SIZE); + .create_receiver(); assert_that!(sut_receiver, is_err); } @@ -270,13 +271,13 @@ mod zero_copy_connection { .enable_safe_overflow(true) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let sut_receiver = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_receiver(SAMPLE_SIZE); + .create_receiver(); assert_that!(sut_receiver, is_err); assert_that!( @@ -285,29 +286,6 @@ mod zero_copy_connection { ); } - #[test] - fn connecting_with_incompatible_sample_size_fails() { - let name = generate_name(); - let config = generate_isolated_config::(); - - let _sut_sender = Sut::Builder::new(&name) - .number_of_samples_per_segment(NUMBER_OF_SAMPLES) - .config(&config) - .create_sender(SAMPLE_SIZE) - .unwrap(); - - let sut_receiver = Sut::Builder::new(&name) - .number_of_samples_per_segment(NUMBER_OF_SAMPLES) - .config(&config) - .create_receiver(2 * SAMPLE_SIZE); - - assert_that!(sut_receiver, is_err); - assert_that!( - sut_receiver.err().unwrap(), eq - ZeroCopyCreationError::IncompatibleSampleSize - ); - } - #[test] fn connecting_with_incompatible_number_of_samples_fails() { let name = generate_name(); @@ -316,13 +294,13 @@ mod zero_copy_connection { let _sut_sender = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let sut_receiver = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES * 2) .config(&config) - .create_receiver(SAMPLE_SIZE); + .create_receiver(); assert_that!(sut_receiver, is_err); } @@ -335,17 +313,17 @@ mod zero_copy_connection { let sut_sender = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let sut_receiver = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); let sample_offset = SAMPLE_SIZE * 2; assert_that!( - sut_sender.try_send(PointerOffset::new(sample_offset)), + sut_sender.try_send(PointerOffset::new(sample_offset), SAMPLE_SIZE), is_ok ); let sample = sut_receiver.receive().unwrap(); @@ -369,18 +347,18 @@ mod zero_copy_connection { let sut_sender = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let sut_receiver = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); let sample_offset = SAMPLE_SIZE * 2; assert_that!(sut_receiver.has_data(), eq false); assert_that!( - sut_sender.try_send(PointerOffset::new(sample_offset)), + sut_sender.try_send(PointerOffset::new(sample_offset), SAMPLE_SIZE), is_ok ); @@ -397,18 +375,18 @@ mod zero_copy_connection { .buffer_size(BUFFER_SIZE) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); for i in 0..BUFFER_SIZE { let sample_offset = SAMPLE_SIZE * i; assert_that!( - sut_sender.try_send(PointerOffset::new(sample_offset)), + sut_sender.try_send(PointerOffset::new(sample_offset), SAMPLE_SIZE), is_ok ); } - let result = sut_sender.try_send(PointerOffset::new(9)); + let result = sut_sender.try_send(PointerOffset::new(9), SAMPLE_SIZE); assert_that!(result, is_err); assert_that!(result.err().unwrap(), eq ZeroCopySendError::ReceiveBufferFull); } @@ -424,13 +402,13 @@ mod zero_copy_connection { .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .enable_safe_overflow(true) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); for i in 0..BUFFER_SIZE { let sample_offset = SAMPLE_SIZE * i; assert_that!( - sut_sender.try_send(PointerOffset::new(sample_offset)), + sut_sender.try_send(PointerOffset::new(sample_offset), SAMPLE_SIZE), is_ok ); } @@ -438,7 +416,7 @@ mod zero_copy_connection { for i in 0..BUFFER_SIZE { let overflow_sample_offset = SAMPLE_SIZE * i; let sample_offset = SAMPLE_SIZE * (BUFFER_SIZE + i); - let result = sut_sender.try_send(PointerOffset::new(sample_offset)); + let result = sut_sender.try_send(PointerOffset::new(sample_offset), SAMPLE_SIZE); assert_that!(result, is_ok); assert_that!(result.ok().unwrap().unwrap().offset(), eq overflow_sample_offset); } @@ -455,13 +433,13 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); for i in 0..BUFFER_SIZE { let sample_offset = SAMPLE_SIZE * i; assert_that!( - sut_sender.try_send(PointerOffset::new(sample_offset)), + sut_sender.try_send(PointerOffset::new(sample_offset), SAMPLE_SIZE), is_ok ); } @@ -471,7 +449,7 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); for i in 0..BUFFER_SIZE { let sample = receiver.receive(); @@ -489,7 +467,7 @@ mod zero_copy_connection { let receiver = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); let sample = receiver.receive().unwrap(); @@ -509,7 +487,7 @@ mod zero_copy_connection { .enable_safe_overflow(true) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let sut_receiver = Sut::Builder::new(&name) .buffer_size(BUFFER_SIZE) @@ -517,7 +495,7 @@ mod zero_copy_connection { .enable_safe_overflow(true) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); let mut sample_offset = SAMPLE_SIZE; @@ -525,7 +503,7 @@ mod zero_copy_connection { for _ in 0..BUFFER_SIZE { sample_offset += SAMPLE_SIZE; assert_that!( - sut_sender.try_send(PointerOffset::new(sample_offset)), + sut_sender.try_send(PointerOffset::new(sample_offset), SAMPLE_SIZE), is_ok ); } @@ -553,6 +531,7 @@ mod zero_copy_connection { #[test] fn blocking_send_blocks() { + let _watchdog = Watchdog::new(); let name = generate_name(); let config = Mutex::new(generate_isolated_config::()); @@ -560,7 +539,7 @@ mod zero_copy_connection { .buffer_size(1) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config.lock().unwrap()) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let handle = BarrierHandle::new(); @@ -575,7 +554,7 @@ mod zero_copy_connection { .buffer_size(1) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config.lock().unwrap()) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); let receive_sample = || loop { @@ -598,11 +577,11 @@ mod zero_copy_connection { let now = Instant::now(); assert_that!( - sut_sender.blocking_send(PointerOffset::new(sample_offset_1)), + sut_sender.blocking_send(PointerOffset::new(sample_offset_1), SAMPLE_SIZE), is_ok ); assert_that!( - sut_sender.blocking_send(PointerOffset::new(sample_offset_2)), + sut_sender.blocking_send(PointerOffset::new(sample_offset_2), SAMPLE_SIZE), is_ok ); assert_that!(now.elapsed(), time_at_least TIMEOUT); @@ -619,13 +598,13 @@ mod zero_copy_connection { .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .buffer_size(BUFFER_SIZE) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let _sut_receiver = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .buffer_size(BUFFER_SIZE) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); let mut offsets = HashSet::new(); @@ -634,7 +613,7 @@ mod zero_copy_connection { let sample_offset = SAMPLE_SIZE * i; offsets.insert(sample_offset); assert_that!( - sut_sender.try_send(PointerOffset::new(sample_offset)), + sut_sender.try_send(PointerOffset::new(sample_offset), SAMPLE_SIZE), is_ok ); } @@ -660,7 +639,7 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let _sut_receiver = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) @@ -668,13 +647,13 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); for i in 0..BUFFER_SIZE { let sample_offset = SAMPLE_SIZE * i; assert_that!( - sut_sender.try_send(PointerOffset::new(sample_offset)), + sut_sender.try_send(PointerOffset::new(sample_offset), SAMPLE_SIZE), is_ok ); } @@ -684,7 +663,7 @@ mod zero_copy_connection { let sample_offset = SAMPLE_SIZE * (i + BUFFER_SIZE); offsets.insert(sample_offset); assert_that!( - sut_sender.try_send(PointerOffset::new(sample_offset)), + sut_sender.try_send(PointerOffset::new(sample_offset), SAMPLE_SIZE), is_ok ); } @@ -710,7 +689,7 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let sut_receiver = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) @@ -718,13 +697,13 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); for i in 0..BUFFER_SIZE { let sample_offset = SAMPLE_SIZE * i; assert_that!( - sut_sender.try_send(PointerOffset::new(sample_offset)), + sut_sender.try_send(PointerOffset::new(sample_offset), SAMPLE_SIZE), is_ok ); } @@ -755,7 +734,7 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let sut_receiver = Sut::Builder::new(&name) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) @@ -763,7 +742,7 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); let mut offsets = HashSet::new(); @@ -771,7 +750,7 @@ mod zero_copy_connection { let sample_offset = SAMPLE_SIZE * (i + BUFFER_SIZE); offsets.insert(sample_offset); assert_that!( - sut_sender.try_send(PointerOffset::new(sample_offset)), + sut_sender.try_send(PointerOffset::new(sample_offset), SAMPLE_SIZE), is_ok ); } @@ -805,7 +784,7 @@ mod zero_copy_connection { Sut::Builder::new(&sut_names[i]) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(), ); assert_that!(::does_exist_cfg(&sut_names[i], &config), eq Ok(true)); @@ -854,7 +833,7 @@ mod zero_copy_connection { let sut_1 = Sut::Builder::new(&sut_name) .config(&config_1) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); assert_that!(::does_exist_cfg(&sut_name, &config_1), eq Ok(true)); @@ -865,7 +844,7 @@ mod zero_copy_connection { let sut_2 = Sut::Builder::new(&sut_name) .config(&config_2) .number_of_samples_per_segment(NUMBER_OF_SAMPLES) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); assert_that!(::does_exist_cfg(&sut_name, &config_1), eq Ok(true)); @@ -910,7 +889,7 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let create_receiver = |number_of_segments: u8| { @@ -921,7 +900,7 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() }; let sut_receiver = create_receiver(NUMBER_OF_SEGMENTS - 1); @@ -950,15 +929,18 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); // shall panic sut_sender - .try_send(PointerOffset::from_offset_and_segment_id( - 0, - SegmentId::new(NUMBER_OF_SEGMENTS + 1), - )) + .try_send( + PointerOffset::from_offset_and_segment_id( + 0, + SegmentId::new(NUMBER_OF_SEGMENTS + 1), + ), + SAMPLE_SIZE, + ) .unwrap(); } @@ -978,7 +960,7 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); // shall panic @@ -1005,7 +987,7 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let sut_receiver = Sut::Builder::new(&name) @@ -1015,7 +997,7 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); sut_receiver @@ -1043,7 +1025,7 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); assert_that!(sut_sender.max_supported_shared_memory_segments(), eq 1); @@ -1065,7 +1047,7 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let sut_receiver = Sut::Builder::new(&name) @@ -1075,16 +1057,19 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); for offset in 0..2 { for n in 0..BUFFER_SIZE { sut_sender - .try_send(PointerOffset::from_offset_and_segment_id( - offset * SAMPLE_SIZE, - SegmentId::new(n as u8), - )) + .try_send( + PointerOffset::from_offset_and_segment_id( + offset * SAMPLE_SIZE, + SegmentId::new(n as u8), + ), + SAMPLE_SIZE, + ) .unwrap(); } } @@ -1118,7 +1103,7 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let sut_receiver = Sut::Builder::new(&name) @@ -1128,16 +1113,19 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); for k in 0..2 { for n in 0..BUFFER_SIZE { sut_sender - .try_send(PointerOffset::from_offset_and_segment_id( - k * SAMPLE_SIZE, - SegmentId::new(n as u8), - )) + .try_send( + PointerOffset::from_offset_and_segment_id( + k * SAMPLE_SIZE, + SegmentId::new(n as u8), + ), + SAMPLE_SIZE, + ) .unwrap(); } } @@ -1168,7 +1156,7 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let sut_receiver = Sut::Builder::new(&name) @@ -1178,16 +1166,19 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); for k in 0..2 { for n in 0..BUFFER_SIZE { sut_sender - .try_send(PointerOffset::from_offset_and_segment_id( - k * SAMPLE_SIZE, - SegmentId::new(n as u8), - )) + .try_send( + PointerOffset::from_offset_and_segment_id( + k * SAMPLE_SIZE, + SegmentId::new(n as u8), + ), + SAMPLE_SIZE, + ) .unwrap(); } } @@ -1224,7 +1215,7 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let _sut_receiver = Sut::Builder::new(&name) @@ -1234,7 +1225,7 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); let mut offsets = vec![]; @@ -1244,7 +1235,7 @@ mod zero_copy_connection { k * SAMPLE_SIZE, SegmentId::new(n as u8), ); - sut_sender.try_send(offset).unwrap(); + sut_sender.try_send(offset, SAMPLE_SIZE).unwrap(); offsets.push(offset); } } @@ -1272,7 +1263,7 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let _sut_receiver = Sut::Builder::new(&name) @@ -1282,16 +1273,16 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(BUFFER_SIZE) .enable_safe_overflow(true) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); let offset = PointerOffset::from_offset_and_segment_id(SAMPLE_SIZE, SegmentId::new(1 as u8)); - assert_that!(sut_sender.try_send(offset), is_ok); + assert_that!(sut_sender.try_send(offset, SAMPLE_SIZE), is_ok); // panics here - sut_sender.try_send(offset).unwrap(); + sut_sender.try_send(offset, SAMPLE_SIZE).unwrap(); } #[test] @@ -1307,7 +1298,7 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(1) .enable_safe_overflow(true) .config(&config) - .create_sender(SAMPLE_SIZE) + .create_sender() .unwrap(); let _sut_receiver = Sut::Builder::new(&name) @@ -1317,18 +1308,18 @@ mod zero_copy_connection { .receiver_max_borrowed_samples(1) .enable_safe_overflow(true) .config(&config) - .create_receiver(SAMPLE_SIZE) + .create_receiver() .unwrap(); let overflow_sample = PointerOffset::from_offset_and_segment_id(11 * SAMPLE_SIZE, SegmentId::new(73 as u8)); - sut_sender.try_send(overflow_sample).unwrap(); + sut_sender.try_send(overflow_sample, SAMPLE_SIZE).unwrap(); let returned_sample = sut_sender - .try_send(PointerOffset::from_offset_and_segment_id( + .try_send( + PointerOffset::from_offset_and_segment_id(SAMPLE_SIZE, SegmentId::new(1 as u8)), SAMPLE_SIZE, - SegmentId::new(1 as u8), - )) + ) .unwrap(); assert_that!(returned_sample, eq Some(overflow_sample)); diff --git a/iceoryx2-ffi/cxx/include/iox2/allocation_strategy.hpp b/iceoryx2-ffi/cxx/include/iox2/allocation_strategy.hpp new file mode 100644 index 000000000..f2ffef981 --- /dev/null +++ b/iceoryx2-ffi/cxx/include/iox2/allocation_strategy.hpp @@ -0,0 +1,33 @@ +// Copyright (c) 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +#ifndef IOX2_ALLOCATION_STRATEGY_HPP +#define IOX2_ALLOCATION_STRATEGY_HPP + +#include + +namespace iox2 { +/// Describes generically an [`AllocationStrategy`], meaning how the memory is increased when the +/// available memory is insufficient. +enum class AllocationStrategy : uint8_t { + /// Increases the memory so that it perfectly fits the new size requirements. This may lead + /// to a lot of reallocations but has the benefit that no byte is wasted. + BestFit, + /// Increases the memory by rounding the increased memory size up to the next power of two. + /// Reduces reallocations a lot at the cost of increased memory usage. + PowerOfTwo, + /// The memory is not increased. This may lead to an out-of-memory error when allocating. + Static +}; +} // namespace iox2 + +#endif diff --git a/iceoryx2-ffi/cxx/include/iox2/enum_translation.hpp b/iceoryx2-ffi/cxx/include/iox2/enum_translation.hpp index ae274c188..a002182a4 100644 --- a/iceoryx2-ffi/cxx/include/iox2/enum_translation.hpp +++ b/iceoryx2-ffi/cxx/include/iox2/enum_translation.hpp @@ -15,6 +15,7 @@ #include "iox/assertions.hpp" #include "iox/into.hpp" +#include "iox2/allocation_strategy.hpp" #include "iox2/callback_progression.hpp" #include "iox2/config_creation_error.hpp" #include "iox2/connection_failure.hpp" @@ -1459,6 +1460,21 @@ constexpr auto from(const int value) noexcept -> IOX_UNREACHABLE(); } + +template <> +constexpr auto from(const iox2::AllocationStrategy value) noexcept + -> iox2_allocation_strategy_e { + switch (value) { + case iox2::AllocationStrategy::BestFit: + return iox2_allocation_strategy_e_BEST_FIT; + case iox2::AllocationStrategy::PowerOfTwo: + return iox2_allocation_strategy_e_POWER_OF_TWO; + case iox2::AllocationStrategy::Static: + return iox2_allocation_strategy_e_STATIC; + } + + IOX_UNREACHABLE(); +} } // namespace iox #endif diff --git a/iceoryx2-ffi/cxx/include/iox2/port_factory_publisher.hpp b/iceoryx2-ffi/cxx/include/iox2/port_factory_publisher.hpp index 587fcad4a..95e557f35 100644 --- a/iceoryx2-ffi/cxx/include/iox2/port_factory_publisher.hpp +++ b/iceoryx2-ffi/cxx/include/iox2/port_factory_publisher.hpp @@ -13,11 +13,10 @@ #ifndef IOX2_PORTFACTORY_PUBLISHER_HPP #define IOX2_PORTFACTORY_PUBLISHER_HPP -#include "iox/assertions_addendum.hpp" #include "iox/builder_addendum.hpp" #include "iox/expected.hpp" +#include "iox2/allocation_strategy.hpp" #include "iox2/internal/iceoryx2.hpp" -#include "iox2/payload_info.hpp" #include "iox2/publisher.hpp" #include "iox2/service_type.hpp" #include "iox2/unable_to_deliver_strategy.hpp" @@ -46,7 +45,14 @@ class PortFactoryPublisher { /// Sets the maximum slice length that a user can allocate with /// [`Publisher::loan_slice()`] or [`Publisher::loan_slice_uninit()`]. template ::VALUE, void>> - auto max_slice_len(uint64_t value) && -> PortFactoryPublisher&&; + auto initial_max_slice_len(uint64_t value) && -> PortFactoryPublisher&&; + + /// Defines the allocation strategy that is used when the provided + /// [`PortFactoryPublisher::initial_max_slice_len()`] is exhausted. This happens when the user + /// acquires a more than max slice len in [`Publisher::loan_slice()`] or + /// [`Publisher::loan_slice_uninit()`]. + template ::VALUE, void>> + auto allocation_strategy(AllocationStrategy value) && -> PortFactoryPublisher&&; /// Creates a new [`Publisher`] or returns a [`PublisherCreateError`] on failure. auto create() && -> iox::expected, PublisherCreateError>; @@ -59,6 +65,7 @@ class PortFactoryPublisher { iox2_port_factory_publisher_builder_h m_handle; iox::optional m_max_slice_len; + iox::optional m_allocation_strategy; }; template @@ -68,11 +75,20 @@ inline PortFactoryPublisher::PortFactoryPublisher(iox2_p template template -inline auto PortFactoryPublisher::max_slice_len(uint64_t value) && -> PortFactoryPublisher&& { +inline auto +PortFactoryPublisher::initial_max_slice_len(uint64_t value) && -> PortFactoryPublisher&& { m_max_slice_len.emplace(value); return std::move(*this); } +template +template +inline auto PortFactoryPublisher::allocation_strategy( + AllocationStrategy value) && -> PortFactoryPublisher&& { + m_allocation_strategy.emplace(value); + return std::move(*this); +} + template inline auto PortFactoryPublisher::create() && -> iox::expected, @@ -82,10 +98,14 @@ PortFactoryPublisher::create() && -> iox::expected(iox::into(value))); }); m_max_slice_len - .and_then([&](auto value) { iox2_port_factory_publisher_builder_set_max_slice_len(&m_handle, value); }) - .or_else([&]() { iox2_port_factory_publisher_builder_set_max_slice_len(&m_handle, 1); }); + .and_then([&](auto value) { iox2_port_factory_publisher_builder_set_initial_max_slice_len(&m_handle, value); }) + .or_else([&]() { iox2_port_factory_publisher_builder_set_initial_max_slice_len(&m_handle, 1); }); m_max_loaned_samples.and_then( [&](auto value) { iox2_port_factory_publisher_builder_set_max_loaned_samples(&m_handle, value); }); + m_allocation_strategy.and_then([&](auto value) { + iox2_port_factory_publisher_builder_set_allocation_strategy(&m_handle, + iox::into(value)); + }); iox2_publisher_h pub_handle {}; diff --git a/iceoryx2-ffi/cxx/include/iox2/publisher.hpp b/iceoryx2-ffi/cxx/include/iox2/publisher.hpp index 3ac562139..f75414577 100644 --- a/iceoryx2-ffi/cxx/include/iox2/publisher.hpp +++ b/iceoryx2-ffi/cxx/include/iox2/publisher.hpp @@ -51,7 +51,7 @@ class Publisher { /// Returns the maximum number of elements that can be loaned in a slice. template ::VALUE, void>> - auto max_slice_len() const -> uint64_t; + auto initial_max_slice_len() const -> uint64_t; /// Copies the input `value` into a [`SampleMut`] and delivers it. /// On success it returns the number of [`Subscriber`]s that received @@ -154,8 +154,8 @@ inline auto Publisher::unable_to_deliver_strategy() cons template template -inline auto Publisher::max_slice_len() const -> uint64_t { - return iox2_publisher_max_slice_len(&m_handle); +inline auto Publisher::initial_max_slice_len() const -> uint64_t { + return iox2_publisher_initial_max_slice_len(&m_handle); } template diff --git a/iceoryx2-ffi/cxx/tests/src/service_publish_subscribe_tests.cpp b/iceoryx2-ffi/cxx/tests/src/service_publish_subscribe_tests.cpp index 5d69ef46b..7ea2097dd 100644 --- a/iceoryx2-ffi/cxx/tests/src/service_publish_subscribe_tests.cpp +++ b/iceoryx2-ffi/cxx/tests/src/service_publish_subscribe_tests.cpp @@ -239,7 +239,7 @@ TYPED_TEST(ServicePublishSubscribeTest, slice_copy_send_receive_works) { auto service = node.service_builder(service_name).template publish_subscribe>().create().expect(""); - auto sut_publisher = service.publisher_builder().max_slice_len(SLICE_MAX_LENGTH).create().expect(""); + auto sut_publisher = service.publisher_builder().initial_max_slice_len(SLICE_MAX_LENGTH).create().expect(""); auto sut_subscriber = service.subscriber_builder().create().expect(""); iox::UninitializedArray elements; @@ -279,7 +279,7 @@ TYPED_TEST(ServicePublishSubscribeTest, loan_slice_send_receive_works) { .create() .expect(""); - auto sut_publisher = service.publisher_builder().max_slice_len(SLICE_MAX_LENGTH).create().expect(""); + auto sut_publisher = service.publisher_builder().initial_max_slice_len(SLICE_MAX_LENGTH).create().expect(""); auto sut_subscriber = service.subscriber_builder().create().expect(""); auto send_sample = sut_publisher.loan_slice(SLICE_MAX_LENGTH).expect(""); @@ -316,7 +316,7 @@ TYPED_TEST(ServicePublishSubscribeTest, loan_slice_uninit_send_receive_works) { .create() .expect(""); - auto sut_publisher = service.publisher_builder().max_slice_len(SLICE_MAX_LENGTH).create().expect(""); + auto sut_publisher = service.publisher_builder().initial_max_slice_len(SLICE_MAX_LENGTH).create().expect(""); auto sut_subscriber = service.subscriber_builder().create().expect(""); auto send_sample = sut_publisher.loan_slice_uninit(SLICE_MAX_LENGTH).expect(""); @@ -358,7 +358,7 @@ TYPED_TEST(ServicePublishSubscribeTest, loan_slice_uninit_with_bytes_send_receiv .create() .expect(""); - auto sut_publisher = service.publisher_builder().max_slice_len(sizeof(DummyData)).create().expect(""); + auto sut_publisher = service.publisher_builder().initial_max_slice_len(sizeof(DummyData)).create().expect(""); auto sut_subscriber = service.subscriber_builder().create().expect(""); auto send_sample = sut_publisher.loan_slice_uninit(sizeof(DummyData)).expect(""); @@ -388,7 +388,7 @@ TYPED_TEST(ServicePublishSubscribeTest, write_from_fn_send_receive_works) { auto service = node.service_builder(service_name).template publish_subscribe>().create().expect(""); - auto sut_publisher = service.publisher_builder().max_slice_len(SLICE_MAX_LENGTH).create().expect(""); + auto sut_publisher = service.publisher_builder().initial_max_slice_len(SLICE_MAX_LENGTH).create().expect(""); auto sut_subscriber = service.subscriber_builder().create().expect(""); auto send_sample = sut_publisher.loan_slice_uninit(SLICE_MAX_LENGTH).expect(""); @@ -422,7 +422,7 @@ TYPED_TEST(ServicePublishSubscribeTest, write_from_slice_send_receive_works) { auto service = node.service_builder(service_name).template publish_subscribe>().create().expect(""); - auto sut_publisher = service.publisher_builder().max_slice_len(SLICE_MAX_LENGTH).create().expect(""); + auto sut_publisher = service.publisher_builder().initial_max_slice_len(SLICE_MAX_LENGTH).create().expect(""); auto sut_subscriber = service.subscriber_builder().create().expect(""); iox::UninitializedArray elements; @@ -606,9 +606,9 @@ TYPED_TEST(ServicePublishSubscribeTest, publisher_applies_max_slice_len) { auto service = node.service_builder(service_name).template publish_subscribe>().create().expect(""); - auto sut = service.publisher_builder().max_slice_len(DESIRED_MAX_SLICE_LEN).create().expect(""); + auto sut = service.publisher_builder().initial_max_slice_len(DESIRED_MAX_SLICE_LEN).create().expect(""); - ASSERT_THAT(sut.max_slice_len(), Eq(DESIRED_MAX_SLICE_LEN)); + ASSERT_THAT(sut.initial_max_slice_len(), Eq(DESIRED_MAX_SLICE_LEN)); } TYPED_TEST(ServicePublishSubscribeTest, send_receive_with_user_header_works) { @@ -758,4 +758,63 @@ TYPED_TEST(ServicePublishSubscribeTest, service_can_be_opened_when_there_is_a_su } } +TYPED_TEST(ServicePublishSubscribeTest, publisher_reallocates_memory_when_allocation_strategy_is_set) { + constexpr ServiceType SERVICE_TYPE = TestFixture::TYPE; + using ValueType = uint8_t; + constexpr uint64_t INITIAL_SIZE = 128; + + const auto service_name = iox2_testing::generate_service_name(); + auto node = NodeBuilder().create().expect(""); + auto service = + node.service_builder(service_name).template publish_subscribe>().create().expect(""); + + auto publisher = service.publisher_builder() + .initial_max_slice_len(INITIAL_SIZE) + .allocation_strategy(AllocationStrategy::BestFit) + .create() + .expect(""); + + { + auto sample = publisher.loan_slice(INITIAL_SIZE); + ASSERT_THAT(sample.has_value(), Eq(true)); + } + + { + auto sample = publisher.loan_slice(INITIAL_SIZE * INITIAL_SIZE); + ASSERT_THAT(sample.has_value(), Eq(true)); + } + + { + auto sample = publisher.loan_slice(INITIAL_SIZE * INITIAL_SIZE * INITIAL_SIZE); + ASSERT_THAT(sample.has_value(), Eq(true)); + } +} + +TYPED_TEST(ServicePublishSubscribeTest, publisher_does_not_reallocate_when_allocation_strategy_is_static) { + constexpr ServiceType SERVICE_TYPE = TestFixture::TYPE; + using ValueType = uint8_t; + constexpr uint64_t INITIAL_SIZE = 128; + + const auto service_name = iox2_testing::generate_service_name(); + auto node = NodeBuilder().create().expect(""); + auto service = + node.service_builder(service_name).template publish_subscribe>().create().expect(""); + + auto publisher = service.publisher_builder() + .initial_max_slice_len(INITIAL_SIZE) + .allocation_strategy(AllocationStrategy::Static) + .create() + .expect(""); + + auto sample_1 = publisher.loan_slice(INITIAL_SIZE); + ASSERT_THAT(sample_1.has_value(), Eq(true)); + + auto sample_2 = publisher.loan_slice(INITIAL_SIZE * INITIAL_SIZE); + ASSERT_THAT(sample_2.has_value(), Eq(false)); + ASSERT_THAT(sample_2.error(), Eq(PublisherLoanError::ExceedsMaxLoanSize)); + + auto sample_3 = publisher.loan_slice(INITIAL_SIZE * INITIAL_SIZE * INITIAL_SIZE); + ASSERT_THAT(sample_3.has_value(), Eq(false)); + ASSERT_THAT(sample_3.error(), Eq(PublisherLoanError::ExceedsMaxLoanSize)); +} } // namespace diff --git a/iceoryx2-ffi/ffi/src/api/port_factory_publisher_builder.rs b/iceoryx2-ffi/ffi/src/api/port_factory_publisher_builder.rs index 79bc069ba..e4c5215ce 100644 --- a/iceoryx2-ffi/ffi/src/api/port_factory_publisher_builder.rs +++ b/iceoryx2-ffi/ffi/src/api/port_factory_publisher_builder.rs @@ -30,6 +30,31 @@ use core::mem::ManuallyDrop; // BEGIN types definition +/// Describes generically an allocation strategy, meaning how the memory is increased when the +/// available memory is insufficient. +#[repr(C)] +#[derive(Copy, Clone, StringLiteral)] +pub enum iox2_allocation_strategy_e { + /// Increases the memory so that it perfectly fits the new size requirements. This may lead + /// to a lot of reallocations but has the benefit that no byte is wasted. + BEST_FIT, + /// Increases the memory by rounding the increased memory size up to the next power of two. + /// Reduces reallocations a lot at the cost of increased memory usage. + POWER_OF_TWO, + /// The memory is not increased. This may lead to an out-of-memory error when allocating. + STATIC, +} + +impl From for AllocationStrategy { + fn from(value: iox2_allocation_strategy_e) -> Self { + match value { + iox2_allocation_strategy_e::STATIC => AllocationStrategy::Static, + iox2_allocation_strategy_e::BEST_FIT => AllocationStrategy::BestFit, + iox2_allocation_strategy_e::POWER_OF_TWO => AllocationStrategy::PowerOfTwo, + } + } +} + #[repr(C)] #[derive(Copy, Clone, StringLiteral)] pub enum iox2_publisher_create_error_e { @@ -196,6 +221,43 @@ pub unsafe extern "C" fn iox2_publisher_create_error_string( error.as_str_literal().as_ptr() as *const c_char } +/// Sets the [`iox2_allocation_strategy_e`] for the publisher +/// +/// # Arguments +/// +/// * `port_factory_handle` - Must be a valid [`iox2_port_factory_publisher_builder_h_ref`] +/// obtained by [`iox2_port_factory_pub_sub_publisher_builder`](crate::iox2_port_factory_pub_sub_publisher_builder). +/// * `value` - The value to set max slice length to +/// +/// # Safety +/// +/// * `port_factory_handle` must be valid handles +#[no_mangle] +pub unsafe extern "C" fn iox2_port_factory_publisher_builder_set_allocation_strategy( + port_factory_handle: iox2_port_factory_publisher_builder_h_ref, + value: iox2_allocation_strategy_e, +) { + port_factory_handle.assert_non_null(); + + let port_factory_struct = unsafe { &mut *port_factory_handle.as_type() }; + match port_factory_struct.service_type { + iox2_service_type_e::IPC => { + let port_factory = ManuallyDrop::take(&mut port_factory_struct.value.as_mut().ipc); + + port_factory_struct.set(PortFactoryPublisherBuilderUnion::new_ipc( + port_factory.allocation_strategy(value.into()), + )); + } + iox2_service_type_e::LOCAL => { + let port_factory = ManuallyDrop::take(&mut port_factory_struct.value.as_mut().local); + + port_factory_struct.set(PortFactoryPublisherBuilderUnion::new_local( + port_factory.allocation_strategy(value.into()), + )); + } + } +} + /// Sets the max slice length for the publisher /// /// # Arguments @@ -208,7 +270,7 @@ pub unsafe extern "C" fn iox2_publisher_create_error_string( /// /// * `port_factory_handle` must be valid handles #[no_mangle] -pub unsafe extern "C" fn iox2_port_factory_publisher_builder_set_max_slice_len( +pub unsafe extern "C" fn iox2_port_factory_publisher_builder_set_initial_max_slice_len( port_factory_handle: iox2_port_factory_publisher_builder_h_ref, value: c_size_t, ) { @@ -220,14 +282,14 @@ pub unsafe extern "C" fn iox2_port_factory_publisher_builder_set_max_slice_len( let port_factory = ManuallyDrop::take(&mut port_factory_struct.value.as_mut().ipc); port_factory_struct.set(PortFactoryPublisherBuilderUnion::new_ipc( - port_factory.max_slice_len(value), + port_factory.initial_max_slice_len(value), )); } iox2_service_type_e::LOCAL => { let port_factory = ManuallyDrop::take(&mut port_factory_struct.value.as_mut().local); port_factory_struct.set(PortFactoryPublisherBuilderUnion::new_local( - port_factory.max_slice_len(value), + port_factory.initial_max_slice_len(value), )); } } diff --git a/iceoryx2-ffi/ffi/src/api/publisher.rs b/iceoryx2-ffi/ffi/src/api/publisher.rs index f70b07167..1dac4e0dd 100644 --- a/iceoryx2-ffi/ffi/src/api/publisher.rs +++ b/iceoryx2-ffi/ffi/src/api/publisher.rs @@ -337,15 +337,17 @@ pub unsafe extern "C" fn iox2_publisher_unable_to_deliver_strategy( /// /// * `publisher_handle` is valid and non-null #[no_mangle] -pub unsafe extern "C" fn iox2_publisher_max_slice_len( +pub unsafe extern "C" fn iox2_publisher_initial_max_slice_len( publisher_handle: iox2_publisher_h_ref, ) -> c_int { publisher_handle.assert_non_null(); let publisher = &mut *publisher_handle.as_type(); match publisher.service_type { - iox2_service_type_e::IPC => publisher.value.as_mut().ipc.max_slice_len() as c_int, - iox2_service_type_e::LOCAL => publisher.value.as_mut().local.max_slice_len() as c_int, + iox2_service_type_e::IPC => publisher.value.as_mut().ipc.initial_max_slice_len() as c_int, + iox2_service_type_e::LOCAL => { + publisher.value.as_mut().local.initial_max_slice_len() as c_int + } } } diff --git a/iceoryx2-ffi/ffi/src/api/sample_mut.rs b/iceoryx2-ffi/ffi/src/api/sample_mut.rs index ee623f913..211d90bae 100644 --- a/iceoryx2-ffi/ffi/src/api/sample_mut.rs +++ b/iceoryx2-ffi/ffi/src/api/sample_mut.rs @@ -54,7 +54,7 @@ impl SampleMutUninitUnion { #[repr(C)] #[repr(align(8))] // alignment of Option pub struct iox2_sample_mut_storage_t { - internal: [u8; 56], // magic number obtained with size_of::>() + internal: [u8; 64], // magic number obtained with size_of::>() } #[repr(C)] diff --git a/iceoryx2/src/port/details/data_segment.rs b/iceoryx2/src/port/details/data_segment.rs new file mode 100644 index 000000000..2a1a5342b --- /dev/null +++ b/iceoryx2/src/port/details/data_segment.rs @@ -0,0 +1,245 @@ +// Copyright (c) 2023 - 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +use std::alloc::Layout; + +use iceoryx2_bb_log::fail; +use iceoryx2_cal::{ + event::NamedConceptBuilder, + resizable_shared_memory::*, + shared_memory::{ + SharedMemory, SharedMemoryBuilder, SharedMemoryCreateError, SharedMemoryForPoolAllocator, + SharedMemoryOpenError, ShmPointer, + }, + shm_allocator::{ + self, pool_allocator::PoolAllocator, AllocationError, AllocationStrategy, PointerOffset, + SegmentId, ShmAllocationError, + }, +}; + +use crate::{ + config, + service::{ + self, + config_scheme::{data_segment_config, resizable_data_segment_config}, + dynamic_config::publish_subscribe::PublisherDetails, + naming_scheme::data_segment_name, + }, +}; + +#[repr(C)] +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub(crate) enum DataSegmentType { + Dynamic, + Static, +} + +impl DataSegmentType { + pub(crate) fn new_from_allocation_strategy(v: AllocationStrategy) -> Self { + match v { + AllocationStrategy::Static => DataSegmentType::Static, + _ => DataSegmentType::Dynamic, + } + } +} + +#[derive(Debug)] +enum MemoryType { + Static(Service::SharedMemory), + Dynamic(Service::ResizableSharedMemory), +} + +#[derive(Debug)] +pub(crate) struct DataSegment { + memory: MemoryType, +} + +impl DataSegment { + pub(crate) fn create( + details: &PublisherDetails, + global_config: &config::Config, + sample_layout: Layout, + allocation_strategy: AllocationStrategy, + ) -> Result { + let allocator_config = shm_allocator::pool_allocator::Config { + bucket_layout: sample_layout, + }; + let msg = "Unable to create the data segment since the underlying shared memory could not be created."; + let origin = "DataSegment::create()"; + + let segment_name = data_segment_name(&details.publisher_id); + let memory = match details.data_segment_type { + DataSegmentType::Static => { + let segment_config = data_segment_config::(global_config); + let memory = fail!(from origin, + when <>::Builder as NamedConceptBuilder< + Service::SharedMemory, + >>::new(&segment_name) + .config(&segment_config) + .size(sample_layout.size() * details.number_of_samples + sample_layout.align() - 1) + .create(&allocator_config), + "{msg}"); + MemoryType::Static(memory) + } + DataSegmentType::Dynamic => { + let segment_config = resizable_data_segment_config::(global_config); + let memory = fail!(from origin, + when <>::MemoryBuilder as NamedConceptBuilder>::new( + &segment_name, + ) + .config(&segment_config) + .max_number_of_chunks_hint(details.number_of_samples) + .max_chunk_layout_hint(sample_layout) + .allocation_strategy(allocation_strategy) + .create(), + "{msg}"); + MemoryType::Dynamic(memory) + } + }; + + Ok(Self { memory }) + } + + pub(crate) fn allocate(&self, layout: Layout) -> Result { + let msg = "Unable to allocate memory from the data segment"; + match &self.memory { + MemoryType::Static(memory) => Ok(fail!(from self, when memory.allocate(layout), + "{msg}.")), + MemoryType::Dynamic(memory) => match memory.allocate(layout) { + Ok(ptr) => Ok(ptr), + Err(ResizableShmAllocationError::ShmAllocationError(e)) => { + fail!(from self, with e, + "{msg} caused by {:?}.", e); + } + Err(ResizableShmAllocationError::MaxReallocationsReached) => { + fail!(from self, + with ShmAllocationError::AllocationError(AllocationError::OutOfMemory), + "{msg} since the maxmimum number of reallocations was reached. Try to provide initial_max_slice_len({}) as hint when creating the publisher to have a more fitting initial setup.", layout.size()); + } + Err(ResizableShmAllocationError::SharedMemoryCreateError(e)) => { + fail!(from self, + with ShmAllocationError::AllocationError(AllocationError::InternalError), + "{msg} since the shared memory segment creation failed while resizing the memory due to ({:?}).", e); + } + }, + } + } + + pub(crate) unsafe fn deallocate_bucket(&self, offset: PointerOffset) { + match &self.memory { + MemoryType::Static(memory) => memory.deallocate_bucket(offset), + MemoryType::Dynamic(memory) => memory.deallocate_bucket(offset), + } + } + + pub(crate) fn bucket_size(&self, segment_id: SegmentId) -> usize { + match &self.memory { + MemoryType::Static(memory) => memory.bucket_size(), + MemoryType::Dynamic(memory) => memory.bucket_size(segment_id), + } + } + + pub(crate) fn max_number_of_segments(data_segment_type: DataSegmentType) -> u8 { + match data_segment_type { + DataSegmentType::Static => 1, + DataSegmentType::Dynamic => { + (Service::ResizableSharedMemory::max_number_of_reallocations() - 1) as u8 + } + } + } +} + +#[derive(Debug)] +enum MemoryViewType { + Static(Service::SharedMemory), + Dynamic( + >::View, + ), +} + +#[derive(Debug)] +pub(crate) struct DataSegmentView { + memory: MemoryViewType, +} + +impl DataSegmentView { + pub(crate) fn open( + details: &PublisherDetails, + global_config: &config::Config, + ) -> Result { + let segment_name = data_segment_name(&details.publisher_id); + let origin = "DataSegment::open()"; + let msg = + "Unable to open data segment since the underlying shared memory could not be opened."; + + let memory = match details.data_segment_type { + DataSegmentType::Static => { + let segment_config = data_segment_config::(global_config); + let memory = fail!(from origin, + when >:: + Builder::new(&segment_name) + .config(&segment_config) + .timeout(global_config.global.service.creation_timeout) + .open(), + "{msg}"); + MemoryViewType::Static(memory) + } + DataSegmentType::Dynamic => { + let segment_config = resizable_data_segment_config::(global_config); + let memory = fail!(from origin, + when <>::ViewBuilder as NamedConceptBuilder>::new( + &segment_name, + ) + .config(&segment_config) + .open(), + "{msg}"); + MemoryViewType::Dynamic(memory) + } + }; + + Ok(Self { memory }) + } + + pub(crate) fn register_and_translate_offset( + &self, + offset: PointerOffset, + ) -> Result { + match &self.memory { + MemoryViewType::Static(memory) => Ok(offset.offset() + memory.payload_start_address()), + MemoryViewType::Dynamic(memory) => unsafe { + match memory.register_and_translate_offset(offset) { + Ok(ptr) => Ok(ptr as usize), + Err(e) => { + fail!(from self, with e, + "Failed to register and translate pointer due to a failure while opening the corresponding shared memory segment ({:?}).", + e); + } + } + }, + } + } + + pub(crate) unsafe fn unregister_offset(&self, offset: PointerOffset) { + if let MemoryViewType::Dynamic(memory) = &self.memory { + memory.unregister_offset(offset); + } + } +} diff --git a/iceoryx2/src/port/details/mod.rs b/iceoryx2/src/port/details/mod.rs index 1cf265a4c..aca86cae8 100644 --- a/iceoryx2/src/port/details/mod.rs +++ b/iceoryx2/src/port/details/mod.rs @@ -10,5 +10,6 @@ // // SPDX-License-Identifier: Apache-2.0 OR MIT +pub(crate) mod data_segment; pub(crate) mod publisher_connections; pub(crate) mod subscriber_connections; diff --git a/iceoryx2/src/port/details/publisher_connections.rs b/iceoryx2/src/port/details/publisher_connections.rs index 507aae352..26f345810 100644 --- a/iceoryx2/src/port/details/publisher_connections.rs +++ b/iceoryx2/src/port/details/publisher_connections.rs @@ -15,27 +15,23 @@ use std::{cell::UnsafeCell, sync::Arc}; use crate::{ port::port_identifiers::{UniquePublisherId, UniqueSubscriberId}, service::{ - self, - config_scheme::{connection_config, data_segment_config}, - dynamic_config::publish_subscribe::PublisherDetails, - naming_scheme::{connection_name, data_segment_name}, - static_config::publish_subscribe::StaticConfig, - ServiceState, + self, config_scheme::connection_config, + dynamic_config::publish_subscribe::PublisherDetails, naming_scheme::connection_name, + static_config::publish_subscribe::StaticConfig, ServiceState, }, }; use crate::port::update_connections::ConnectionFailure; use iceoryx2_bb_log::fail; use iceoryx2_cal::named_concept::NamedConceptBuilder; -use iceoryx2_cal::{ - shared_memory::SharedMemory, shared_memory::SharedMemoryBuilder, - shm_allocator::pool_allocator::PoolAllocator, zero_copy_connection::*, -}; +use iceoryx2_cal::zero_copy_connection::*; + +use super::data_segment::DataSegmentView; #[derive(Debug)] pub(crate) struct Connection { pub(crate) receiver: ::Receiver, - pub(crate) data_segment: Service::SharedMemory, + pub(crate) data_segment: DataSegmentView, pub(crate) publisher_id: UniquePublisherId, } @@ -49,24 +45,22 @@ impl Connection { details.publisher_id, this.subscriber_id ); + let global_config = this.service_state.shared_node.config(); let receiver = fail!(from this, when :: Builder::new( &connection_name(details.publisher_id, this.subscriber_id)) - .config(&connection_config::(this.service_state.shared_node.config())) + .config(&connection_config::(global_config)) .buffer_size(this.buffer_size) .receiver_max_borrowed_samples(this.static_config.subscriber_max_borrowed_samples) .enable_safe_overflow(this.static_config.enable_safe_overflow) .number_of_samples_per_segment(details.number_of_samples) - .timeout(this.service_state.shared_node.config().global.service.creation_timeout) - .create_receiver(this.static_config.message_type_details().sample_layout(details.max_slice_len).size()), + .max_supported_shared_memory_segments(details.max_number_of_segments) + .timeout(global_config.global.service.creation_timeout) + .create_receiver(), "{} since the zero copy connection could not be established.", msg); let data_segment = fail!(from this, - when >:: - Builder::new(&data_segment_name(&details.publisher_id)) - .config(&data_segment_config::(this.service_state.shared_node.config())) - .timeout(this.service_state.shared_node.config().global.service.creation_timeout) - .open(), + when DataSegmentView::open(details, global_config), "{} since the publishers data segment could not be opened.", msg); Ok(Self { diff --git a/iceoryx2/src/port/details/subscriber_connections.rs b/iceoryx2/src/port/details/subscriber_connections.rs index e33815689..45ef52ebb 100644 --- a/iceoryx2/src/port/details/subscriber_connections.rs +++ b/iceoryx2/src/port/details/subscriber_connections.rs @@ -39,7 +39,6 @@ impl Connection { this: &SubscriberConnections, subscriber_details: SubscriberDetails, number_of_samples: usize, - max_slice_len: usize, ) -> Result { let msg = format!( "Unable to establish connection to subscriber {:?} from publisher {:?}", @@ -58,8 +57,9 @@ impl Connection { .receiver_max_borrowed_samples(this.static_config.subscriber_max_borrowed_samples) .enable_safe_overflow(this.static_config.enable_safe_overflow) .number_of_samples_per_segment(number_of_samples) + .max_supported_shared_memory_segments(this.max_number_of_segments) .timeout(this.shared_node.config().global.service.creation_timeout) - .create_sender(this.static_config.message_type_details().sample_layout(max_slice_len).size()), + .create_sender(), "{}.", msg); Ok(Self { @@ -76,6 +76,7 @@ pub(crate) struct SubscriberConnections { shared_node: Arc>, pub(crate) static_config: StaticConfig, number_of_samples: usize, + max_number_of_segments: u8, } impl SubscriberConnections { @@ -85,6 +86,7 @@ impl SubscriberConnections { port_id: UniquePublisherId, static_config: &StaticConfig, number_of_samples: usize, + max_number_of_segments: u8, ) -> Self { Self { connections: (0..capacity).map(|_| UnsafeCell::new(None)).collect(), @@ -92,6 +94,7 @@ impl SubscriberConnections { port_id, static_config: static_config.clone(), number_of_samples, + max_number_of_segments, } } @@ -116,13 +119,11 @@ impl SubscriberConnections { &self, index: usize, subscriber_details: SubscriberDetails, - max_slice_len: usize, ) -> Result<(), ZeroCopyCreationError> { *self.get_mut(index) = Some(Connection::new( self, subscriber_details, self.number_of_samples, - max_slice_len, )?); Ok(()) diff --git a/iceoryx2/src/port/publisher.rs b/iceoryx2/src/port/publisher.rs index c8a88ae91..b0703da32 100644 --- a/iceoryx2/src/port/publisher.rs +++ b/iceoryx2/src/port/publisher.rs @@ -69,7 +69,7 @@ //! let publisher = service //! .publisher_builder() //! // defines the maximum length of a slice -//! .max_slice_len(128) +//! .initial_max_slice_len(128) //! // defines how many samples can be loaned in parallel //! .max_loaned_samples(5) //! // defines behavior when subscriber queue is full in an non-overflowing service @@ -101,6 +101,7 @@ //! # } //! ``` +use super::details::data_segment::{DataSegment, DataSegmentType}; use super::port_identifiers::UniquePublisherId; use super::UniqueSubscriberId; use crate::port::details::subscriber_connections::*; @@ -128,14 +129,9 @@ use iceoryx2_bb_log::{debug, error, fail, fatal_panic, warn}; use iceoryx2_bb_system_types::file_name::FileName; use iceoryx2_cal::dynamic_storage::DynamicStorage; use iceoryx2_cal::event::NamedConceptMgmt; -use iceoryx2_cal::named_concept::{ - NamedConceptBuilder, NamedConceptListError, NamedConceptRemoveError, -}; -use iceoryx2_cal::shared_memory::{ - SharedMemory, SharedMemoryBuilder, SharedMemoryCreateError, ShmPointer, -}; -use iceoryx2_cal::shm_allocator::pool_allocator::PoolAllocator; -use iceoryx2_cal::shm_allocator::{self, PointerOffset, ShmAllocationError}; +use iceoryx2_cal::named_concept::{NamedConceptListError, NamedConceptRemoveError}; +use iceoryx2_cal::shared_memory::ShmPointer; +use iceoryx2_cal::shm_allocator::{AllocationStrategy, PointerOffset, ShmAllocationError}; use iceoryx2_cal::zero_copy_connection::{ ZeroCopyConnection, ZeroCopyCreationError, ZeroCopySendError, ZeroCopySender, }; @@ -180,7 +176,7 @@ pub enum PublisherLoanError { ExceedsMaxLoanedSamples, /// The provided slice size exceeds the configured max slice size of the [`Publisher`]. /// To send a [`SampleMut`] with this size a new [`Publisher`] has to be created with - /// a [`crate::service::port_factory::publisher::PortFactoryPublisher::max_slice_len()`] + /// a [`crate::service::port_factory::publisher::PortFactoryPublisher::initial_max_slice_len()`] /// greater or equal to the required len. ExceedsMaxLoanSize, /// Errors that indicate either an implementation issue or a wrongly configured system. @@ -238,57 +234,111 @@ pub(crate) enum RemovePubSubPortFromAllConnectionsError { } #[derive(Debug)] -pub(crate) struct DataSegment { +struct SegmentState { sample_reference_counter: Vec, - memory: Service::SharedMemory, - payload_size: usize, - payload_type_layout: Layout, + payload_size: IoxAtomicUsize, +} + +impl SegmentState { + fn new(number_of_samples: usize) -> Self { + let mut sample_reference_counter = Vec::with_capacity(number_of_samples); + for _ in 0..number_of_samples { + sample_reference_counter.push(IoxAtomicU64::new(0)); + } + + Self { + sample_reference_counter, + payload_size: IoxAtomicUsize::new(0), + } + } + + fn set_payload_size(&self, value: usize) { + self.payload_size.store(value, Ordering::Relaxed); + } + + fn payload_size(&self) -> usize { + self.payload_size.load(Ordering::Relaxed) + } + + fn sample_index(&self, distance_to_chunk: usize) -> usize { + debug_assert!(distance_to_chunk % self.payload_size() == 0); + distance_to_chunk / self.payload_size() + } + + fn borrow_sample(&self, distance_to_chunk: usize) -> u64 { + self.sample_reference_counter[self.sample_index(distance_to_chunk)] + .fetch_add(1, Ordering::Relaxed) + } + + fn release_sample(&self, distance_to_chunk: usize) -> u64 { + self.sample_reference_counter[self.sample_index(distance_to_chunk)] + .fetch_sub(1, Ordering::Relaxed) + } +} + +#[derive(Debug, Clone, Copy)] +struct OffsetAndSize { + offset: u64, + size: usize, +} + +#[derive(Debug)] +struct AllocationPair { + shm_pointer: ShmPointer, + sample_size: usize, +} + +#[derive(Debug)] +pub(crate) struct PublisherBackend { + segment_states: Vec, + data_segment: DataSegment, port_id: UniquePublisherId, config: LocalPublisherConfig, service_state: Arc>, subscriber_connections: SubscriberConnections, subscriber_list_state: UnsafeCell>, - history: Option>>, + history: Option>>, static_config: crate::service::static_config::StaticConfig, loan_counter: IoxAtomicUsize, is_active: IoxAtomicBool, } -impl DataSegment { - fn sample_index(&self, distance_to_chunk: usize) -> usize { - distance_to_chunk / self.payload_size - } - - fn allocate(&self, layout: Layout) -> Result { +impl PublisherBackend { + fn allocate(&self, layout: Layout) -> Result { self.retrieve_returned_samples(); let msg = "Unable to allocate Sample"; - let ptr = self.memory.allocate(layout)?; - if self.sample_reference_counter[self.sample_index(ptr.offset.offset())] - .fetch_add(1, Ordering::Relaxed) - != 0 - { + let shm_pointer = self.data_segment.allocate(layout)?; + let (ref_count, sample_size) = self.borrow_sample(shm_pointer.offset); + if ref_count != 0 { fatal_panic!(from self, "{} since the allocated sample is already in use! This should never happen!", msg); } - Ok(ptr) + Ok(AllocationPair { + shm_pointer, + sample_size, + }) } - fn borrow_sample(&self, distance_to_chunk: usize) { - self.sample_reference_counter[self.sample_index(distance_to_chunk)] - .fetch_add(1, Ordering::Relaxed); + fn borrow_sample(&self, offset: PointerOffset) -> (u64, usize) { + let segment_id = offset.segment_id(); + let segment_state = &self.segment_states[segment_id.value() as usize]; + let mut payload_size = segment_state.payload_size(); + if segment_state.payload_size() == 0 { + payload_size = self.data_segment.bucket_size(segment_id); + segment_state.set_payload_size(payload_size); + } + (segment_state.borrow_sample(offset.offset()), payload_size) } - fn release_sample(&self, distance_to_chunk: PointerOffset) { - if self.sample_reference_counter[self.sample_index(distance_to_chunk.offset())] - .fetch_sub(1, Ordering::Relaxed) + fn release_sample(&self, offset: PointerOffset) { + if self.segment_states[offset.segment_id().value() as usize].release_sample(offset.offset()) == 1 { unsafe { - self.memory - .deallocate(distance_to_chunk, self.payload_type_layout); + self.data_segment.deallocate_bucket(offset); } } } @@ -330,21 +380,28 @@ impl DataSegment { self.loan_counter.fetch_sub(1, Ordering::Relaxed); } - fn add_sample_to_history(&self, address_to_chunk: usize) { + fn add_sample_to_history(&self, offset: PointerOffset, sample_size: usize) { match &self.history { None => (), Some(history) => { let history = unsafe { &mut *history.get() }; - self.borrow_sample(address_to_chunk); - match history.push_with_overflow(address_to_chunk) { + self.borrow_sample(offset); + match history.push_with_overflow(OffsetAndSize { + offset: offset.as_value(), + size: sample_size, + }) { None => (), - Some(old) => self.release_sample(PointerOffset::new(old)), + Some(old) => self.release_sample(PointerOffset::from_value(old.offset)), } } } } - fn deliver_sample(&self, address_to_chunk: usize) -> Result { + fn deliver_sample( + &self, + offset: PointerOffset, + sample_size: usize, + ) -> Result { self.retrieve_returned_samples(); let deliver_call = match self.config.unable_to_deliver_strategy { @@ -359,7 +416,7 @@ impl DataSegment { let mut number_of_recipients = 0; for i in 0..self.subscriber_connections.len() { if let Some(ref connection) = self.subscriber_connections.get(i) { - match deliver_call(&connection.sender, PointerOffset::new(address_to_chunk)) { + match deliver_call(&connection.sender, offset, sample_size) { Err(ZeroCopySendError::ReceiveBufferFull) | Err(ZeroCopySendError::UsedChunkListFull) => { /* causes no problem @@ -378,23 +435,23 @@ impl DataSegment { DegrationAction::Warn => { error!(from self, "While delivering the sample: {:?} a corrupted connection was detected with subscriber {:?}.", - address_to_chunk, connection.subscriber_id); + offset, connection.subscriber_id); } DegrationAction::Fail => { fail!(from self, with PublisherSendError::ConnectionCorrupted, "While delivering the sample: {:?} a corrupted connection was detected with subscriber {:?}.", - address_to_chunk, connection.subscriber_id); + offset, connection.subscriber_id); } }, None => { error!(from self, "While delivering the sample: {:?} a corrupted connection was detected with subscriber {:?}.", - address_to_chunk, connection.subscriber_id); + offset, connection.subscriber_id); } } } Ok(overflow) => { - self.borrow_sample(address_to_chunk); + self.borrow_sample(offset); number_of_recipients += 1; if let Some(old) = overflow { @@ -434,11 +491,7 @@ impl DataSegment { }; if create_connection { - match self.subscriber_connections.create( - i, - *subscriber_details, - self.config.max_slice_len, - ) { + match self.subscriber_connections.create(i, *subscriber_details) { Ok(()) => match &self.subscriber_connections.get(i) { Some(connection) => self.deliver_sample_history(connection), None => { @@ -501,10 +554,13 @@ impl DataSegment { Some(history) => { let history = unsafe { &mut *history.get() }; for i in 0..history.len() { - let ptr_distance = unsafe { history.get_unchecked(i) }; + let old_sample = unsafe { history.get_unchecked(i) }; - match connection.sender.try_send(PointerOffset::new(ptr_distance)) { - Ok(_) => self.borrow_sample(ptr_distance), + let offset = PointerOffset::from_value(old_sample.offset); + match connection.sender.try_send(offset, old_sample.size) { + Ok(_) => { + self.borrow_sample(offset); + } Err(e) => { warn!(from self, "Failed to deliver history to new subscriber via {:?} due to {:?}", connection, e); } @@ -514,7 +570,11 @@ impl DataSegment { } } - pub(crate) fn send_sample(&self, address_to_chunk: usize) -> Result { + pub(crate) fn send_sample( + &self, + offset: PointerOffset, + sample_size: usize, + ) -> Result { let msg = "Unable to send sample"; if !self.is_active.load(Ordering::Relaxed) { fail!(from self, with PublisherSendError::ConnectionBrokenSincePublisherNoLongerExists, @@ -524,8 +584,8 @@ impl DataSegment { fail!(from self, when self.update_connections(), "{} since the connections could not be updated.", msg); - self.add_sample_to_history(address_to_chunk); - self.deliver_sample(address_to_chunk) + self.add_sample_to_history(offset, sample_size); + self.deliver_sample(offset, sample_size) } } @@ -536,7 +596,7 @@ pub struct Publisher< Payload: Debug + ?Sized + 'static, UserHeader: Debug, > { - pub(crate) data_segment: Arc>, + pub(crate) backend: Arc>, dynamic_publisher_handle: Option, payload_size: usize, _payload: PhantomData, @@ -548,7 +608,7 @@ impl Drop { fn drop(&mut self) { if let Some(handle) = self.dynamic_publisher_handle { - self.data_segment + self.backend .service_state .dynamic_storage .get() @@ -582,26 +642,38 @@ impl .messaging_pattern .required_amount_of_samples_per_data_segment(config.max_loaned_samples); + let data_segment_type = + DataSegmentType::new_from_allocation_strategy(config.allocation_strategy); + + let sample_layout = static_config + .message_type_details + .sample_layout(config.initial_max_slice_len); + + let max_slice_len = config.initial_max_slice_len; + let max_number_of_segments = + DataSegment::::max_number_of_segments(data_segment_type); + let publisher_details = PublisherDetails { + data_segment_type, + publisher_id: port_id, + number_of_samples, + max_slice_len, + node_id: *service.__internal_state().shared_node.id(), + max_number_of_segments, + }; + let global_config = service.__internal_state().shared_node.config(); + let data_segment = fail!(from origin, - when Self::create_data_segment(&port_id, service.__internal_state().shared_node.config(), number_of_samples, static_config, &config), + when DataSegment::create(&publisher_details, global_config, sample_layout, config.allocation_strategy), with PublisherCreateError::UnableToCreateDataSegment, "{} since the data segment could not be acquired.", msg); - let max_slice_len = config.max_slice_len; - let data_segment = Arc::new(DataSegment { + let backend = Arc::new(PublisherBackend { is_active: IoxAtomicBool::new(true), - memory: data_segment, - payload_size: static_config - .message_type_details() - .sample_layout(config.max_slice_len) - .size(), - payload_type_layout: static_config - .message_type_details() - .payload_layout(config.max_slice_len), - sample_reference_counter: { - let mut v = Vec::with_capacity(number_of_samples); - for _ in 0..number_of_samples { - v.push(IoxAtomicU64::new(0)); + data_segment, + segment_states: { + let mut v: Vec = Vec::with_capacity(max_number_of_segments as usize); + for _ in 0..max_number_of_segments { + v.push(SegmentState::new(number_of_samples)) } v }, @@ -613,6 +685,7 @@ impl port_id, static_config, number_of_samples, + max_number_of_segments, ), config, subscriber_list_state: unsafe { UnsafeCell::new(subscriber_list.get_state()) }, @@ -624,7 +697,7 @@ impl loan_counter: IoxAtomicUsize::new(0), }); - let payload_size = data_segment + let payload_size = backend .subscriber_connections .static_config .message_type_details @@ -632,14 +705,14 @@ impl .size; let mut new_self = Self { - data_segment, + backend, dynamic_publisher_handle: None, payload_size, _payload: PhantomData, _user_header: PhantomData, }; - if let Err(e) = new_self.data_segment.populate_subscriber_channels() { + if let Err(e) = new_self.backend.populate_subscriber_channels() { warn!(from new_self, "The new Publisher port is unable to connect to every Subscriber port, caused by {:?}.", e); } @@ -652,12 +725,8 @@ impl .dynamic_storage .get() .publish_subscribe() - .add_publisher_id(PublisherDetails { - publisher_id: port_id, - number_of_samples, - max_slice_len, - node_id: *service.__internal_state().shared_node.id(), - }) { + .add_publisher_id(publisher_details) + { Some(unique_index) => unique_index, None => { fail!(from origin, with PublisherCreateError::ExceedsMaxSupportedPublishers, @@ -671,60 +740,36 @@ impl Ok(new_self) } - fn create_data_segment( - port_id: &UniquePublisherId, - global_config: &config::Config, - number_of_samples: usize, - static_config: &publish_subscribe::StaticConfig, - config: &LocalPublisherConfig, - ) -> Result { - let l = static_config - .message_type_details - .sample_layout(config.max_slice_len); - let allocator_config = shm_allocator::pool_allocator::Config { bucket_layout: l }; - - Ok(fail!(from "Publisher::create_data_segment()", - when <>::Builder as NamedConceptBuilder< - Service::SharedMemory, - >>::new(&data_segment_name(port_id)) - .config(&data_segment_config::(global_config)) - .size(l.size() * number_of_samples + l.align() - 1) - .create(&allocator_config), - "Unable to create the data segment.")) - } - /// Returns the [`UniquePublisherId`] of the [`Publisher`] pub fn id(&self) -> UniquePublisherId { - self.data_segment.port_id + self.backend.port_id } /// Returns the strategy the [`Publisher`] follows when a [`SampleMut`] cannot be delivered /// since the [`Subscriber`](crate::port::subscriber::Subscriber)s buffer is full. pub fn unable_to_deliver_strategy(&self) -> UnableToDeliverStrategy { - self.data_segment.config.unable_to_deliver_strategy + self.backend.config.unable_to_deliver_strategy } /// Returns the maximum slice length configured for this [`Publisher`]. - pub fn max_slice_len(&self) -> usize { - self.data_segment.config.max_slice_len + pub fn initial_max_slice_len(&self) -> usize { + self.backend.config.initial_max_slice_len } - fn allocate(&self, layout: Layout) -> Result { + fn allocate(&self, layout: Layout) -> Result { let msg = "Unable to allocate Sample with"; - if self.data_segment.loan_counter.load(Ordering::Relaxed) - >= self.data_segment.config.max_loaned_samples + if self.backend.loan_counter.load(Ordering::Relaxed) + >= self.backend.config.max_loaned_samples { fail!(from self, with PublisherLoanError::ExceedsMaxLoanedSamples, "{} {:?} since already {} samples were loaned and it would exceed the maximum of parallel loans of {}. Release or send a loaned sample to loan another sample.", - msg, layout, self.data_segment.loan_counter.load(Ordering::Relaxed), self.data_segment.config.max_loaned_samples); + msg, layout, self.backend.loan_counter.load(Ordering::Relaxed), self.backend.config.max_loaned_samples); } - match self.data_segment.allocate(layout) { + match self.backend.allocate(layout) { Ok(chunk) => { - self.data_segment - .loan_counter - .fetch_add(1, Ordering::Relaxed); + self.backend.loan_counter.fetch_add(1, Ordering::Relaxed); Ok(chunk) } Err(ShmAllocationError::AllocationError(AllocationError::OutOfMemory)) => { @@ -743,7 +788,7 @@ impl } fn sample_layout(&self, number_of_elements: usize) -> Layout { - self.data_segment + self.backend .subscriber_connections .static_config .message_type_details @@ -751,7 +796,7 @@ impl } fn user_header_ptr(&self, header: *const Header) -> *const u8 { - self.data_segment + self.backend .subscriber_connections .static_config .message_type_details @@ -760,7 +805,7 @@ impl } fn payload_ptr(&self, header: *const Header) -> *const u8 { - self.data_segment + self.backend .subscriber_connections .static_config .message_type_details @@ -769,7 +814,7 @@ impl } fn payload_type_variant(&self) -> TypeVariant { - self.data_segment + self.backend .subscriber_connections .static_config .message_type_details @@ -846,19 +891,19 @@ impl ) -> Result, UserHeader>, PublisherLoanError> { let chunk = self.allocate(self.sample_layout(1))?; - let header_ptr = chunk.data_ptr as *mut Header; + let header_ptr = chunk.shm_pointer.data_ptr as *mut Header; let user_header_ptr = self.user_header_ptr(header_ptr) as *mut UserHeader; let payload_ptr = self.payload_ptr(header_ptr) as *mut MaybeUninit; - - unsafe { header_ptr.write(Header::new(self.data_segment.port_id, 1)) }; + unsafe { header_ptr.write(Header::new(self.backend.port_id, 1)) }; let sample = unsafe { RawSampleMut::new_unchecked(header_ptr, user_header_ptr, payload_ptr) }; Ok( SampleMutUninit::, UserHeader>::new( - &self.data_segment, + &self.backend, sample, - chunk.offset, + chunk.shm_pointer.offset, + chunk.sample_size, ), ) } @@ -927,7 +972,7 @@ impl /// # .open_or_create()?; /// # /// # let publisher = service.publisher_builder() - /// .max_slice_len(120) + /// .initial_max_slice_len(120) /// .create()?; /// /// let slice_length = 5; @@ -968,7 +1013,7 @@ impl /// # .open_or_create()?; /// # /// # let publisher = service.publisher_builder() - /// .max_slice_len(120) + /// .initial_max_slice_len(120) /// .create()?; /// /// let slice_length = 5; @@ -995,8 +1040,10 @@ impl underlying_number_of_slice_elements: usize, ) -> Result], UserHeader>, PublisherLoanError> { - let max_slice_len = self.data_segment.config.max_slice_len; - if max_slice_len < slice_len { + let max_slice_len = self.backend.config.initial_max_slice_len; + if self.backend.config.allocation_strategy == AllocationStrategy::Static + && max_slice_len < slice_len + { fail!(from self, with PublisherLoanError::ExceedsMaxLoanSize, "Unable to loan slice with {} elements since it would exceed the max supported slice length of {}.", slice_len, max_slice_len); @@ -1004,11 +1051,10 @@ impl let sample_layout = self.sample_layout(slice_len); let chunk = self.allocate(sample_layout)?; - let header_ptr = chunk.data_ptr as *mut Header; + let header_ptr = chunk.shm_pointer.data_ptr as *mut Header; let user_header_ptr = self.user_header_ptr(header_ptr) as *mut UserHeader; let payload_ptr = self.payload_ptr(header_ptr) as *mut MaybeUninit; - - unsafe { header_ptr.write(Header::new(self.data_segment.port_id, slice_len as _)) }; + unsafe { header_ptr.write(Header::new(self.backend.port_id, slice_len as _)) }; let sample = unsafe { RawSampleMut::new_unchecked( @@ -1020,9 +1066,10 @@ impl Ok( SampleMutUninit::], UserHeader>::new( - &self.data_segment, + &self.backend, sample, - chunk.offset, + chunk.shm_pointer.offset, + chunk.sample_size, ), ) } @@ -1060,7 +1107,7 @@ impl Upda for Publisher { fn update_connections(&self) -> Result<(), ConnectionFailure> { - self.data_segment.update_connections() + self.backend.update_connections() } } diff --git a/iceoryx2/src/port/subscriber.rs b/iceoryx2/src/port/subscriber.rs index b894b9eb5..0598c0580 100644 --- a/iceoryx2/src/port/subscriber.rs +++ b/iceoryx2/src/port/subscriber.rs @@ -43,7 +43,7 @@ use iceoryx2_bb_elementary::CallbackProgression; use iceoryx2_bb_lock_free::mpmc::container::{ContainerHandle, ContainerState}; use iceoryx2_bb_log::{fail, warn}; use iceoryx2_cal::dynamic_storage::DynamicStorage; -use iceoryx2_cal::{shared_memory::*, zero_copy_connection::*}; +use iceoryx2_cal::zero_copy_connection::*; use crate::port::DegrationAction; use crate::sample::SampleDetails; @@ -305,16 +305,25 @@ impl Ok(data) => match data { None => Ok(None), Some(offset) => { - let absolute_address = - offset.offset() + connection.data_segment.payload_start_address(); - let details = SampleDetails { publisher_connection: connection.clone(), offset, origin: connection.publisher_id, }; - Ok(Some((details, absolute_address))) + let offset = match connection + .data_segment + .register_and_translate_offset(offset) + { + Ok(offset) => offset, + Err(e) => { + fail!(from self, with SubscriberReceiveError::ConnectionFailure(ConnectionFailure::UnableToMapPublishersDataSegment(e)), + "Unable to register and translate offset from publisher {:?} since the received offset {:?} could not be registered and translated.", + connection.publisher_id, offset); + } + }; + + Ok(Some((details, offset))) } }, Err(ZeroCopyReceiveError::ReceiveWouldExceedMaxBorrowValue) => { diff --git a/iceoryx2/src/prelude.rs b/iceoryx2/src/prelude.rs index 20d86153c..c515bcb84 100644 --- a/iceoryx2/src/prelude.rs +++ b/iceoryx2/src/prelude.rs @@ -29,3 +29,4 @@ pub use iceoryx2_bb_log::set_log_level; pub use iceoryx2_bb_log::LogLevel; pub use iceoryx2_bb_posix::file_descriptor::{FileDescriptor, FileDescriptorBased}; pub use iceoryx2_bb_posix::file_descriptor_set::SynchronousMultiplexing; +pub use iceoryx2_cal::shm_allocator::AllocationStrategy; diff --git a/iceoryx2/src/sample.rs b/iceoryx2/src/sample.rs index 653555f94..8f198e5ec 100644 --- a/iceoryx2/src/sample.rs +++ b/iceoryx2/src/sample.rs @@ -84,6 +84,13 @@ impl Drop for Sample { fn drop(&mut self) { + unsafe { + self.details + .publisher_connection + .data_segment + .unregister_offset(self.details.offset) + }; + match self .details .publisher_connection diff --git a/iceoryx2/src/sample_mut.rs b/iceoryx2/src/sample_mut.rs index de4b29404..563148c6e 100644 --- a/iceoryx2/src/sample_mut.rs +++ b/iceoryx2/src/sample_mut.rs @@ -48,7 +48,7 @@ //! # .publish_subscribe::<[usize]>() //! # .create()?; //! # -//! # let publisher = service.publisher_builder().max_slice_len(16).create()?; +//! # let publisher = service.publisher_builder().initial_max_slice_len(16).create()?; //! //! let slice_length = 12; //! // initializes every element of the slice with `Default::default()` @@ -64,7 +64,7 @@ //! ``` use crate::{ - port::publisher::{DataSegment, PublisherSendError}, + port::publisher::{PublisherBackend, PublisherSendError}, raw_sample::RawSampleMut, service::header::publish_subscribe::Header, }; @@ -87,9 +87,10 @@ use std::{ /// Does not implement [`Send`] since it releases unsent samples in the [`crate::port::publisher::Publisher`] and the /// [`crate::port::publisher::Publisher`] is not thread-safe! pub struct SampleMut { - pub(crate) data_segment: Arc>, + pub(crate) publisher_backend: Arc>, pub(crate) ptr: RawSampleMut, pub(crate) offset_to_chunk: PointerOffset, + pub(crate) sample_size: usize, } impl Debug @@ -98,12 +99,13 @@ impl Debu fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "SampleMut<{}, {}, {}> {{ data_segment: {:?}, offset_to_chunk: {:?} }}", + "SampleMut<{}, {}, {}> {{ publisher_backend: {:?}, offset_to_chunk: {:?}, sample_size: {} }}", core::any::type_name::(), core::any::type_name::(), core::any::type_name::(), - self.data_segment, - self.offset_to_chunk + self.publisher_backend, + self.offset_to_chunk, + self.sample_size ) } } @@ -112,7 +114,8 @@ impl Drop for SampleMut { fn drop(&mut self) { - self.data_segment.return_loaned_sample(self.offset_to_chunk); + self.publisher_backend + .return_loaned_sample(self.offset_to_chunk); } } @@ -288,6 +291,7 @@ impl< /// # } /// ``` pub fn send(self) -> Result { - self.data_segment.send_sample(self.offset_to_chunk.offset()) + self.publisher_backend + .send_sample(self.offset_to_chunk, self.sample_size) } } diff --git a/iceoryx2/src/sample_mut_uninit.rs b/iceoryx2/src/sample_mut_uninit.rs index e4d23950c..c9bede061 100644 --- a/iceoryx2/src/sample_mut_uninit.rs +++ b/iceoryx2/src/sample_mut_uninit.rs @@ -49,7 +49,7 @@ //! # .publish_subscribe::<[usize]>() //! # .create()?; //! # -//! # let publisher = service.publisher_builder().max_slice_len(16).create()?; +//! # let publisher = service.publisher_builder().initial_max_slice_len(16).create()?; //! //! let slice_length = 12; //! let sample = publisher.loan_slice_uninit(slice_length)?; @@ -76,7 +76,7 @@ //! # .publish_subscribe::<[usize]>() //! # .create()?; //! # -//! # let publisher = service.publisher_builder().max_slice_len(16).create()?; +//! # let publisher = service.publisher_builder().initial_max_slice_len(16).create()?; //! //! let slice_length = 4; //! let sample = publisher.loan_slice_uninit(slice_length)?; @@ -95,7 +95,7 @@ use std::{fmt::Debug, mem::MaybeUninit, sync::Arc}; use iceoryx2_cal::shm_allocator::PointerOffset; use crate::{ - port::publisher::DataSegment, raw_sample::RawSampleMut, sample_mut::SampleMut, + port::publisher::PublisherBackend, raw_sample::RawSampleMut, sample_mut::SampleMut, service::header::publish_subscribe::Header, }; @@ -261,15 +261,17 @@ impl SampleMutUninit, UserHeader> { pub(crate) fn new( - data_segment: &Arc>, + publisher_backend: &Arc>, ptr: RawSampleMut>, offset_to_chunk: PointerOffset, + sample_size: usize, ) -> Self { Self { sample: SampleMut { - data_segment: Arc::clone(data_segment), + publisher_backend: Arc::clone(publisher_backend), ptr, offset_to_chunk, + sample_size, }, } } @@ -341,15 +343,17 @@ impl SampleMutUninit], UserHeader> { pub(crate) fn new( - data_segment: &Arc>, + publisher_backend: &Arc>, ptr: RawSampleMut]>, offset_to_chunk: PointerOffset, + sample_size: usize, ) -> Self { Self { sample: SampleMut { - data_segment: Arc::clone(data_segment), + publisher_backend: Arc::clone(publisher_backend), ptr, offset_to_chunk, + sample_size, }, } } @@ -374,7 +378,7 @@ impl /// # .publish_subscribe::<[usize]>() /// # .open_or_create()?; /// # - /// # let publisher = service.publisher_builder().max_slice_len(32).create()?; + /// # let publisher = service.publisher_builder().initial_max_slice_len(32).create()?; /// /// let slice_length = 10; /// let mut sample = publisher.loan_slice_uninit(slice_length)?; @@ -408,7 +412,7 @@ impl /// # .publish_subscribe::<[usize]>() /// # .open_or_create()?; /// # - /// # let publisher = service.publisher_builder().max_slice_len(16).create()?; + /// # let publisher = service.publisher_builder().initial_max_slice_len(16).create()?; /// /// let slice_length = 12; /// let sample = publisher.loan_slice_uninit(slice_length)?; @@ -448,7 +452,7 @@ impl /// # .publish_subscribe::<[usize]>() /// # .open_or_create()?; /// # - /// # let publisher = service.publisher_builder().max_slice_len(16).create()?; + /// # let publisher = service.publisher_builder().initial_max_slice_len(16).create()?; /// /// let slice_length = 3; /// let sample = publisher.loan_slice_uninit(slice_length)?; diff --git a/iceoryx2/src/service/config_scheme.rs b/iceoryx2/src/service/config_scheme.rs index 287b26134..fce12bb49 100644 --- a/iceoryx2/src/service/config_scheme.rs +++ b/iceoryx2/src/service/config_scheme.rs @@ -66,6 +66,15 @@ pub(crate) fn data_segment_config( .path_hint(global_config.global.root_path()) } +pub(crate) fn resizable_data_segment_config( + global_config: &config::Config, +) -> ::Configuration { + <::Configuration>::default() + .prefix(&global_config.global.prefix) + .suffix(&global_config.global.service.publisher_data_segment_suffix) + .path_hint(global_config.global.root_path()) +} + pub(crate) fn node_monitoring_config( global_config: &config::Config, ) -> ::Configuration { diff --git a/iceoryx2/src/service/dynamic_config/event.rs b/iceoryx2/src/service/dynamic_config/event.rs index 0da479fa8..61e8037a7 100644 --- a/iceoryx2/src/service/dynamic_config/event.rs +++ b/iceoryx2/src/service/dynamic_config/event.rs @@ -38,6 +38,7 @@ use crate::{ use super::PortCleanupAction; +#[repr(C)] #[derive(Debug, Clone, Copy)] pub(crate) struct DynamicConfigSettings { pub number_of_listeners: usize, @@ -46,18 +47,21 @@ pub(crate) struct DynamicConfigSettings { /// The dynamic configuration of an [`crate::service::messaging_pattern::MessagingPattern::Event`] /// based service. Contains dynamic parameters like the connected endpoints etc.. +#[repr(C)] #[derive(Debug)] pub struct DynamicConfig { pub(crate) listeners: Container, pub(crate) notifiers: Container, } +#[repr(C)] #[derive(Debug, Clone, Copy)] pub(crate) struct ListenerDetails { pub(crate) listener_id: UniqueListenerId, pub(crate) node_id: NodeId, } +#[repr(C)] #[derive(Debug, Clone, Copy)] pub(crate) struct NotifierDetails { pub(crate) notifier_id: UniqueNotifierId, diff --git a/iceoryx2/src/service/dynamic_config/publish_subscribe.rs b/iceoryx2/src/service/dynamic_config/publish_subscribe.rs index 9fae84e35..bc74446de 100644 --- a/iceoryx2/src/service/dynamic_config/publish_subscribe.rs +++ b/iceoryx2/src/service/dynamic_config/publish_subscribe.rs @@ -33,25 +33,33 @@ use iceoryx2_bb_memory::bump_allocator::BumpAllocator; use crate::{ node::NodeId, - port::port_identifiers::{UniquePortId, UniquePublisherId, UniqueSubscriberId}, + port::{ + details::data_segment::DataSegmentType, + port_identifiers::{UniquePortId, UniquePublisherId, UniqueSubscriberId}, + }, }; use super::PortCleanupAction; +#[repr(C)] #[derive(Debug, Clone, Copy)] pub(crate) struct DynamicConfigSettings { pub number_of_subscribers: usize, pub number_of_publishers: usize, } +#[repr(C)] #[derive(Debug, Clone, Copy)] pub(crate) struct PublisherDetails { pub(crate) publisher_id: UniquePublisherId, pub(crate) node_id: NodeId, pub(crate) number_of_samples: usize, pub(crate) max_slice_len: usize, + pub(crate) data_segment_type: DataSegmentType, + pub(crate) max_number_of_segments: u8, } +#[repr(C)] #[derive(Debug, Copy, Clone)] pub(crate) struct SubscriberDetails { pub(crate) subscriber_id: UniqueSubscriberId, @@ -61,6 +69,7 @@ pub(crate) struct SubscriberDetails { /// The dynamic configuration of an [`crate::service::messaging_pattern::MessagingPattern::Event`] /// based service. Contains dynamic parameters like the connected endpoints etc.. +#[repr(C)] #[derive(Debug)] pub struct DynamicConfig { pub(crate) subscribers: Container, diff --git a/iceoryx2/src/service/ipc.rs b/iceoryx2/src/service/ipc.rs index 22d2d3cf0..2bac04b65 100644 --- a/iceoryx2/src/service/ipc.rs +++ b/iceoryx2/src/service/ipc.rs @@ -52,6 +52,8 @@ impl crate::service::Service for Service { type DynamicStorage = dynamic_storage::posix_shared_memory::Storage; type ServiceNameHasher = hash::sha1::Sha1; type SharedMemory = shared_memory::posix::Memory; + type ResizableSharedMemory = + resizable_shared_memory::dynamic::DynamicMemory; type Connection = zero_copy_connection::posix_shared_memory::Connection; type Event = event::unix_datagram_socket::EventImpl; type Monitoring = monitoring::file_lock::FileLockMonitoring; diff --git a/iceoryx2/src/service/local.rs b/iceoryx2/src/service/local.rs index ef0801fa7..facc73151 100644 --- a/iceoryx2/src/service/local.rs +++ b/iceoryx2/src/service/local.rs @@ -52,6 +52,8 @@ impl crate::service::Service for Service { type DynamicStorage = dynamic_storage::process_local::Storage; type ServiceNameHasher = hash::sha1::Sha1; type SharedMemory = shared_memory::process_local::Memory; + type ResizableSharedMemory = + resizable_shared_memory::dynamic::DynamicMemory; type Connection = zero_copy_connection::process_local::Connection; //type Event = event::process_local::EventImpl; type Event = event::unix_datagram_socket::EventImpl; diff --git a/iceoryx2/src/service/mod.rs b/iceoryx2/src/service/mod.rs index 8ad26cc87..d97b78ce5 100644 --- a/iceoryx2/src/service/mod.rs +++ b/iceoryx2/src/service/mod.rs @@ -186,9 +186,9 @@ use iceoryx2_cal::monitoring::Monitoring; use iceoryx2_cal::named_concept::NamedConceptListError; use iceoryx2_cal::named_concept::*; use iceoryx2_cal::reactor::Reactor; +use iceoryx2_cal::resizable_shared_memory::ResizableSharedMemoryForPoolAllocator; use iceoryx2_cal::serialize::Serialize; -use iceoryx2_cal::shared_memory::SharedMemory; -use iceoryx2_cal::shm_allocator::pool_allocator::PoolAllocator; +use iceoryx2_cal::shared_memory::SharedMemoryForPoolAllocator; use iceoryx2_cal::static_storage::*; use iceoryx2_cal::zero_copy_connection::ZeroCopyConnection; use service_id::ServiceId; @@ -468,7 +468,10 @@ pub trait Service: Debug + Sized + internal::ServiceInternal { type DynamicStorage: DynamicStorage; /// The memory used to store the payload. - type SharedMemory: SharedMemory; + type SharedMemory: SharedMemoryForPoolAllocator; + + /// The dynamic memory used to store dynamic payload + type ResizableSharedMemory: ResizableSharedMemoryForPoolAllocator; /// The connection used to exchange pointers to the payload type Connection: ZeroCopyConnection; diff --git a/iceoryx2/src/service/port_factory/publisher.rs b/iceoryx2/src/service/port_factory/publisher.rs index ab7be51b4..8e377cdfb 100644 --- a/iceoryx2/src/service/port_factory/publisher.rs +++ b/iceoryx2/src/service/port_factory/publisher.rs @@ -45,7 +45,7 @@ //! //! let publisher = pubsub.publisher_builder() //! // allows to call Publisher::loan_slice() with up to 128 elements -//! .max_slice_len(128) +//! .initial_max_slice_len(128) //! .create()?; //! //! let sample = publisher.loan_slice(50)?; @@ -57,6 +57,7 @@ use std::fmt::Debug; use iceoryx2_bb_log::fail; +use iceoryx2_cal::shm_allocator::AllocationStrategy; use serde::{de::Visitor, Deserialize, Serialize}; use super::publish_subscribe::PortFactory; @@ -130,7 +131,8 @@ pub(crate) struct LocalPublisherConfig { pub(crate) max_loaned_samples: usize, pub(crate) unable_to_deliver_strategy: UnableToDeliverStrategy, pub(crate) degration_callback: Option>, - pub(crate) max_slice_len: usize, + pub(crate) initial_max_slice_len: usize, + pub(crate) allocation_strategy: AllocationStrategy, } /// Factory to create a new [`Publisher`] port/endpoint for @@ -153,8 +155,9 @@ impl<'factory, Service: service::Service, Payload: Debug + ?Sized, UserHeader: D pub(crate) fn new(factory: &'factory PortFactory) -> Self { Self { config: LocalPublisherConfig { + allocation_strategy: AllocationStrategy::Static, degration_callback: None, - max_slice_len: 1, + initial_max_slice_len: 1, max_loaned_samples: factory .service .__internal_state() @@ -227,8 +230,17 @@ impl { /// Sets the maximum slice length that a user can allocate with /// [`Publisher::loan_slice()`] or [`Publisher::loan_slice_uninit()`]. - pub fn max_slice_len(mut self, value: usize) -> Self { - self.config.max_slice_len = value; + pub fn initial_max_slice_len(mut self, value: usize) -> Self { + self.config.initial_max_slice_len = value; + self + } + + /// Defines the allocation strategy that is used when the provided + /// [`PortFactoryPublisher::initial_max_slice_len()`] is exhausted. This happens when the user + /// acquires a more than max slice len in [`Publisher::loan_slice()`] or + /// [`Publisher::loan_slice_uninit()`]. + pub fn allocation_strategy(mut self, value: AllocationStrategy) -> Self { + self.config.allocation_strategy = value; self } } diff --git a/iceoryx2/src/service/static_config/message_type_details.rs b/iceoryx2/src/service/static_config/message_type_details.rs index 57924140a..1c7b702fd 100644 --- a/iceoryx2/src/service/static_config/message_type_details.rs +++ b/iceoryx2/src/service/static_config/message_type_details.rs @@ -116,15 +116,6 @@ impl MessageTypeDetails { } } - pub(crate) fn payload_layout(&self, number_of_elements: usize) -> Layout { - unsafe { - Layout::from_size_align_unchecked( - self.payload.size * number_of_elements, - self.payload.alignment, - ) - } - } - pub(crate) fn is_compatible_to(&self, rhs: &Self) -> bool { self.header == rhs.header && self.user_header.type_name == rhs.user_header.type_name @@ -267,37 +258,6 @@ mod tests { assert_that!(sut, eq demo.payload); } - #[test] - fn test_payload_layout() { - let details = MessageTypeDetails::from::(TypeVariant::FixedSize); - let sut = details.payload_layout(0); - assert_that!(sut.size(), eq 0); - let sut = details.payload_layout(5); - assert_that!(sut.size(), eq 40); - - #[repr(C)] - struct Demo { - _b: bool, - _i16: i16, - _i64: i64, - } - - let details = MessageTypeDetails::from::(TypeVariant::FixedSize); - let sut = details.payload_layout(1); - #[cfg(target_pointer_width = "32")] - let expected = 12; - #[cfg(target_pointer_width = "64")] - let expected = 16; - assert_that!(sut.size(), eq expected); - - #[cfg(target_pointer_width = "32")] - let expected = 36; - #[cfg(target_pointer_width = "64")] - let expected = 48; - let sut = details.payload_layout(3); - assert_that!(sut.size(), eq expected); - } - #[test] // test_sample_layout tests the sample layout for combinations of different types. fn test_sample_layout() { diff --git a/iceoryx2/tests/publisher_tests.rs b/iceoryx2/tests/publisher_tests.rs index db55b141b..19acf683d 100644 --- a/iceoryx2/tests/publisher_tests.rs +++ b/iceoryx2/tests/publisher_tests.rs @@ -105,7 +105,7 @@ mod publisher { let publisher = service .publisher_builder() - .max_slice_len(NUMBER_OF_ELEMENTS) + .initial_max_slice_len(NUMBER_OF_ELEMENTS) .create()?; let sut = publisher.loan_slice(NUMBER_OF_ELEMENTS)?; @@ -129,7 +129,7 @@ mod publisher { let publisher = service .publisher_builder() - .max_slice_len(NUMBER_OF_ELEMENTS) + .initial_max_slice_len(NUMBER_OF_ELEMENTS) .create()?; for i in 0..NUMBER_OF_ELEMENTS { @@ -153,7 +153,7 @@ mod publisher { let publisher = service .publisher_builder() - .max_slice_len(NUMBER_OF_ELEMENTS) + .initial_max_slice_len(NUMBER_OF_ELEMENTS) .create()?; let sut = publisher.loan_slice(NUMBER_OF_ELEMENTS + 1); diff --git a/iceoryx2/tests/service_publish_subscribe_tests.rs b/iceoryx2/tests/service_publish_subscribe_tests.rs index 355594b92..a48ad311d 100644 --- a/iceoryx2/tests/service_publish_subscribe_tests.rs +++ b/iceoryx2/tests/service_publish_subscribe_tests.rs @@ -20,7 +20,7 @@ mod service_publish_subscribe { use iceoryx2::port::publisher::{PublisherCreateError, PublisherLoanError}; use iceoryx2::port::subscriber::SubscriberCreateError; use iceoryx2::port::update_connections::UpdateConnections; - use iceoryx2::prelude::*; + use iceoryx2::prelude::{AllocationStrategy, *}; use iceoryx2::service::builder::publish_subscribe::PublishSubscribeCreateError; use iceoryx2::service::builder::publish_subscribe::PublishSubscribeOpenError; use iceoryx2::service::builder::publish_subscribe::{CustomHeaderMarker, CustomPayloadMarker}; @@ -2474,7 +2474,7 @@ mod service_publish_subscribe { let publisher = sut .publisher_builder() - .max_slice_len(MAX_ELEMENTS) + .initial_max_slice_len(MAX_ELEMENTS) .create() .unwrap(); let subscriber = sut.subscriber_builder().create().unwrap(); @@ -2517,7 +2517,7 @@ mod service_publish_subscribe { let publisher = service_pub .publisher_builder() - .max_slice_len(MAX_ELEMENTS) + .initial_max_slice_len(MAX_ELEMENTS) .create() .unwrap(); let subscriber = service_sub.subscriber_builder().create().unwrap(); @@ -2958,7 +2958,7 @@ mod service_publish_subscribe { let publisher = sut .publisher_builder() - .max_slice_len(NUMBER_OF_ELEMENTS) + .initial_max_slice_len(NUMBER_OF_ELEMENTS) .create() .unwrap(); let subscriber = sut.subscriber_builder().create().unwrap(); @@ -2976,6 +2976,230 @@ mod service_publish_subscribe { assert_that!(sample.header().number_of_elements(), eq NUMBER_OF_ELEMENTS as u64); } + #[test] + fn send_increasing_samples_with_static_allocation_strategy_fails() { + const SLICE_SIZE: usize = 1024; + let service_name = generate_name(); + let config = generate_isolated_config(); + let node = NodeBuilder::new().config(&config).create::().unwrap(); + + let service_pub = node + .service_builder(&service_name) + .publish_subscribe::<[u8]>() + .create() + .unwrap(); + + let publisher = service_pub + .publisher_builder() + .initial_max_slice_len(SLICE_SIZE) + .allocation_strategy(AllocationStrategy::Static) + .create() + .unwrap(); + + let sample = publisher.loan_slice(SLICE_SIZE - 1); + assert_that!(sample, is_ok); + + let sample = publisher.loan_slice(SLICE_SIZE); + assert_that!(sample, is_ok); + + let sample = publisher.loan_slice(SLICE_SIZE + 1); + assert_that!(sample, is_err); + assert_that!(sample.err(), eq Some(PublisherLoanError::ExceedsMaxLoanSize)); + } + + fn send_and_receives_increasing_samples_works( + allocation_strategy: AllocationStrategy, + ) { + const ITERATIONS: usize = 128; + let service_name = generate_name(); + let config = generate_isolated_config(); + let node = NodeBuilder::new().config(&config).create::().unwrap(); + + let service_pub = node + .service_builder(&service_name) + .publish_subscribe::<[u8]>() + .create() + .unwrap(); + + let service_sub = node + .service_builder(&service_name) + .publish_subscribe::<[u8]>() + .open() + .unwrap(); + + let publisher = service_pub + .publisher_builder() + .initial_max_slice_len(1) + .allocation_strategy(allocation_strategy) + .create() + .unwrap(); + + let subscriber = service_sub.subscriber_builder().create().unwrap(); + + for n in 0..ITERATIONS { + let sample_size = (n + 1) * 32; + let mut sample = publisher.loan_slice(sample_size).unwrap(); + for byte in sample.payload_mut() { + *byte = n as u8; + } + + sample.send().unwrap(); + + let sample = subscriber.receive().unwrap().unwrap(); + assert_that!(sample.payload(), len sample_size); + for byte in sample.payload() { + assert_that!(*byte, eq n as u8); + } + } + } + + #[test] + fn send_and_receives_increasing_samples_works_for_best_fit_allocation_strategy() { + send_and_receives_increasing_samples_works::(AllocationStrategy::BestFit); + } + + #[test] + fn send_and_receives_increasing_samples_works_for_power_of_two_allocation_strategy< + Sut: Service, + >() { + send_and_receives_increasing_samples_works::(AllocationStrategy::PowerOfTwo); + } + + fn send_and_receives_increasing_samples_with_overflow_works( + allocation_strategy: AllocationStrategy, + ) { + const SUBSCRIBER_MAX_BUFFER_SIZE: usize = 5; + const ITERATIONS: usize = 128; + const REPETITIONS: usize = 13; + let service_name = generate_name(); + let config = generate_isolated_config(); + let node = NodeBuilder::new().config(&config).create::().unwrap(); + + let service_pub = node + .service_builder(&service_name) + .publish_subscribe::<[u8]>() + .subscriber_max_buffer_size(SUBSCRIBER_MAX_BUFFER_SIZE) + .create() + .unwrap(); + + let service_sub = node + .service_builder(&service_name) + .publish_subscribe::<[u8]>() + .open() + .unwrap(); + + let publisher = service_pub + .publisher_builder() + .initial_max_slice_len(1) + .allocation_strategy(allocation_strategy) + .create() + .unwrap(); + + let subscriber = service_sub.subscriber_builder().create().unwrap(); + + for n in 0..ITERATIONS { + let sample_size = (n + 1) * 32; + for _ in 0..REPETITIONS { + let mut sample = publisher.loan_slice(sample_size).unwrap(); + for byte in sample.payload_mut() { + *byte = n as u8; + } + + sample.send().unwrap(); + } + + let sample = subscriber.receive().unwrap().unwrap(); + assert_that!(sample.payload(), len sample_size); + for byte in sample.payload() { + assert_that!(*byte, eq n as u8); + } + } + } + + #[test] + fn send_and_receives_increasing_samples_with_overflow_for_best_fit_allocation_strategy< + Sut: Service, + >() { + send_and_receives_increasing_samples_with_overflow_works::( + AllocationStrategy::BestFit, + ); + } + + #[test] + fn send_and_receives_increasing_samples_with_overflow_for_power_of_two_allocation_strategy< + Sut: Service, + >() { + send_and_receives_increasing_samples_with_overflow_works::( + AllocationStrategy::PowerOfTwo, + ); + } + + fn deliver_history_with_increasing_samples_works( + allocation_strategy: AllocationStrategy, + ) { + const SUBSCRIBER_MAX_BUFFER_SIZE: usize = 12; + let service_name = generate_name(); + let config = generate_isolated_config(); + let node = NodeBuilder::new().config(&config).create::().unwrap(); + + let service_pub = node + .service_builder(&service_name) + .publish_subscribe::<[u8]>() + .subscriber_max_buffer_size(SUBSCRIBER_MAX_BUFFER_SIZE) + .history_size(SUBSCRIBER_MAX_BUFFER_SIZE) + .create() + .unwrap(); + + let service_sub = node + .service_builder(&service_name) + .publish_subscribe::<[u8]>() + .open() + .unwrap(); + + let publisher = service_pub + .publisher_builder() + .initial_max_slice_len(1) + .allocation_strategy(allocation_strategy) + .create() + .unwrap(); + + for n in 0..SUBSCRIBER_MAX_BUFFER_SIZE { + let sample_size = (n + 1) * 32; + let mut sample = publisher.loan_slice(sample_size).unwrap(); + for byte in sample.payload_mut() { + *byte = n as u8; + } + + sample.send().unwrap(); + } + + let subscriber = service_sub.subscriber_builder().create().unwrap(); + publisher.update_connections().unwrap(); + + for n in 0..SUBSCRIBER_MAX_BUFFER_SIZE { + let sample_size = (n + 1) * 32; + let sample = subscriber.receive().unwrap().unwrap(); + assert_that!(sample.payload(), len sample_size); + for byte in sample.payload() { + assert_that!(*byte, eq n as u8); + } + } + } + + #[test] + fn deliver_history_with_increasing_samples_works_for_best_fit_allocation_strategy< + Sut: Service, + >() { + deliver_history_with_increasing_samples_works::(AllocationStrategy::BestFit); + } + + #[test] + fn deliver_history_with_increasing_samples_works_for_power_of_two_allocation_strategy< + Sut: Service, + >() { + deliver_history_with_increasing_samples_works::(AllocationStrategy::PowerOfTwo); + } + #[instantiate_tests()] mod ipc {}