diff --git a/iceoryx2-ffi/cxx/include/iox2/header_publish_subscribe.hpp b/iceoryx2-ffi/cxx/include/iox2/header_publish_subscribe.hpp index ac29d5ed7..a548ea9a8 100644 --- a/iceoryx2-ffi/cxx/include/iox2/header_publish_subscribe.hpp +++ b/iceoryx2-ffi/cxx/include/iox2/header_publish_subscribe.hpp @@ -30,8 +30,8 @@ class HeaderPublishSubscribe { /// Returns the [`UniquePublisherId`] of the source [`Publisher`]. auto publisher_id() const -> UniquePublisherId; - /// Returns the [`Layout`] of the corresponding payload. - auto payload_type_layout() const -> iox::Layout; + /// Returns the number of [`Payload`] elements in the received [`Sample`]. + auto number_of_elements() const -> uint64_t; private: template diff --git a/iceoryx2-ffi/cxx/src/header_publish_subscribe.cpp b/iceoryx2-ffi/cxx/src/header_publish_subscribe.cpp index d8c0389af..3ac5932ec 100644 --- a/iceoryx2-ffi/cxx/src/header_publish_subscribe.cpp +++ b/iceoryx2-ffi/cxx/src/header_publish_subscribe.cpp @@ -51,10 +51,7 @@ auto HeaderPublishSubscribe::publisher_id() const -> UniquePublisherId { return UniquePublisherId { id_handle }; } -auto HeaderPublishSubscribe::payload_type_layout() const -> iox::Layout { - auto size = iox2_publish_subscribe_header_payload_type_size(&m_handle); - auto alignment = iox2_publish_subscribe_header_payload_type_alignment(&m_handle); - - return iox::Layout::create(size, alignment).expect("Payload layout is always valid."); +auto HeaderPublishSubscribe::number_of_elements() const -> uint64_t { + return iox2_publish_subscribe_header_number_of_elements(&m_handle); } } // namespace iox2 diff --git a/iceoryx2-ffi/ffi/src/api/publish_subscribe_header.rs b/iceoryx2-ffi/ffi/src/api/publish_subscribe_header.rs index c7b591779..fe18c5e7d 100644 --- a/iceoryx2-ffi/ffi/src/api/publish_subscribe_header.rs +++ b/iceoryx2-ffi/ffi/src/api/publish_subscribe_header.rs @@ -154,7 +154,12 @@ pub unsafe extern "C" fn iox2_publish_subscribe_header_publisher_id( *id_handle_ptr = (*storage_ptr).as_handle(); } -/// Returns the number of bytes of the payload +/// Returns the number of elements of the payload. +/// The element size is defined via this call when creating a new service +/// [`crate::iox2_service_builder_pub_sub_set_payload_type_details()`]. +/// So if the payload is defined with alignment 8 and size 16 and this function returns 5. It +/// means that the payload consists of 5 elements of size 16 and every element is 8 byte aligned. +/// Therefore, the payload pointer points to a memory region with 5 * 16 = 80 bytes. /// /// # Arguments /// @@ -165,16 +170,13 @@ pub unsafe extern "C" fn iox2_publish_subscribe_header_publisher_id( /// /// * `header_handle` is valid and non-null #[no_mangle] -pub unsafe extern "C" fn iox2_publish_subscribe_header_number_of_bytes( +pub unsafe extern "C" fn iox2_publish_subscribe_header_number_of_elements( header_handle: iox2_publish_subscribe_header_h_ref, ) -> u64 { header_handle.assert_non_null(); let header = &mut *header_handle.as_type(); - // In the typed Rust API it is the number of elements and the element is a - // CustomPayloadMarker. But this translates to the number of bytes whenever CustomPayloadMarker - // is used. header.value.as_ref().number_of_elements() } // END C API diff --git a/iceoryx2/src/port/publisher.rs b/iceoryx2/src/port/publisher.rs index 61858c9e2..d580d0443 100644 --- a/iceoryx2/src/port/publisher.rs +++ b/iceoryx2/src/port/publisher.rs @@ -116,6 +116,7 @@ use crate::service::naming_scheme::{ data_segment_name, extract_publisher_id_from_connection, extract_subscriber_id_from_connection, }; use crate::service::port_factory::publisher::{LocalPublisherConfig, UnableToDeliverStrategy}; +use crate::service::static_config::message_type_details::TypeVariant; use crate::service::static_config::publish_subscribe::{self}; use crate::service::{self, ServiceState}; use crate::{config, sample_mut::SampleMut}; @@ -139,6 +140,7 @@ use iceoryx2_cal::zero_copy_connection::{ ZeroCopyConnection, ZeroCopyCreationError, ZeroCopySendError, ZeroCopySender, }; use iceoryx2_pal_concurrency_sync::iox_atomic::{IoxAtomicBool, IoxAtomicU64, IoxAtomicUsize}; +use std::any::TypeId; use std::cell::UnsafeCell; use std::fmt::Debug; use std::sync::atomic::Ordering; @@ -529,7 +531,11 @@ impl DataSegment { /// Sending endpoint of a publish-subscriber based communication. #[derive(Debug)] -pub struct Publisher { +pub struct Publisher< + Service: service::Service, + Payload: Debug + ?Sized + 'static, + UserHeader: Debug, +> { pub(crate) data_segment: Arc>, dynamic_publisher_handle: Option, payload_size: usize, @@ -756,6 +762,15 @@ impl .payload_ptr_from_header(header.cast()) .cast() } + + fn payload_type_variant(&self) -> TypeVariant { + self.data_segment + .subscriber_connections + .static_config + .message_type_details + .payload + .variant + } } //////////////////////// @@ -962,6 +977,18 @@ impl &self, slice_len: usize, ) -> Result], UserHeader>, PublisherLoanError> + { + // required since Rust does not support generic specializations or negative traits + debug_assert!(TypeId::of::() != TypeId::of::()); + + unsafe { self.loan_slice_uninit_impl(slice_len, slice_len) } + } + + unsafe fn loan_slice_uninit_impl( + &self, + slice_len: usize, + underlying_number_of_slice_elements: usize, + ) -> Result], UserHeader>, PublisherLoanError> { let max_slice_len = self.data_segment.config.max_slice_len; if max_slice_len < slice_len { @@ -982,7 +1009,7 @@ impl RawSampleMut::new_unchecked( header_ptr, user_header_ptr, - core::slice::from_raw_parts_mut(payload_ptr, slice_len), + core::slice::from_raw_parts_mut(payload_ptr, underlying_number_of_slice_elements), ) }; @@ -999,6 +1026,13 @@ impl impl Publisher { + /// # Safety + /// + /// * slice_len != 1 only when payload TypeVariant == Dynamic + /// * The number_of_elements in the [`Header`](crate::service::header::publish_subscribe::Header) + /// is set to `slice_len` + /// * The [`SampleMutUninit`] will contain `slice_len` * `MessageTypeDetails::payload.size` + /// elements of type [`CustomPayloadMarker`]. #[doc(hidden)] pub unsafe fn loan_custom_payload( &self, @@ -1007,36 +1041,13 @@ impl SampleMutUninit], UserHeader>, PublisherLoanError, > { - let max_slice_len = self.data_segment.config.max_slice_len; - if max_slice_len < slice_len { - fail!(from self, with PublisherLoanError::ExceedsMaxLoanSize, - "Unable to loan slice with {} elements since it would exceed the max supported slice length of {}.", - slice_len, max_slice_len); - } - - let sample_layout = self.sample_layout(slice_len); - let chunk = self.allocate(sample_layout)?; - let header_ptr = chunk.data_ptr as *mut Header; - let user_header_ptr = self.user_header_ptr(header_ptr) as *mut UserHeader; - let payload_ptr = self.payload_ptr(header_ptr) as *mut MaybeUninit; - - let slice_len = self.payload_size * slice_len; - - unsafe { header_ptr.write(Header::new(self.data_segment.port_id, slice_len as _)) }; - - let sample = unsafe { - RawSampleMut::new_unchecked( - header_ptr, - user_header_ptr, - core::slice::from_raw_parts_mut(payload_ptr, slice_len), - ) - }; + // TypeVariant::Dynamic == slice and only here it makes sense to loan more than one element + debug_assert!( + slice_len == 1 + || (slice_len != 1 && self.payload_type_variant() == TypeVariant::Dynamic) + ); - Ok(SampleMutUninit::< - Service, - [MaybeUninit], - UserHeader, - >::new(&self.data_segment, sample, chunk.offset)) + self.loan_slice_uninit_impl(slice_len, self.payload_size * slice_len) } } //////////////////////// diff --git a/iceoryx2/src/port/subscriber.rs b/iceoryx2/src/port/subscriber.rs index 585ac3694..af5d1000a 100644 --- a/iceoryx2/src/port/subscriber.rs +++ b/iceoryx2/src/port/subscriber.rs @@ -31,6 +31,7 @@ //! # } //! ``` +use std::any::TypeId; use std::cell::UnsafeCell; use std::fmt::Debug; use std::marker::PhantomData; @@ -46,6 +47,7 @@ use iceoryx2_cal::{shared_memory::*, zero_copy_connection::*}; use crate::port::DegrationAction; use crate::sample::SampleDetails; +use crate::service::builder::publish_subscribe::CustomPayloadMarker; use crate::service::dynamic_config::publish_subscribe::{PublisherDetails, SubscriberDetails}; use crate::service::header::publish_subscribe::Header; use crate::service::port_factory::subscriber::SubscriberConfig; @@ -102,7 +104,11 @@ impl std::error::Error for SubscriberCreateError {} /// The receiving endpoint of a publish-subscribe communication. #[derive(Debug)] -pub struct Subscriber { +pub struct Subscriber< + Service: service::Service, + Payload: Debug + ?Sized + 'static, + UserHeader: Debug, +> { dynamic_subscriber_handle: Option, publisher_connections: PublisherConnections, to_be_removed_connections: UnsafeCell>>>, @@ -423,6 +429,8 @@ impl pub fn receive( &self, ) -> Result>, SubscriberReceiveError> { + debug_assert!(TypeId::of::() != TypeId::of::()); + Ok(self.receive_impl()?.map(|(details, absolute_address)| { let header_ptr = absolute_address as *const Header; let user_header_ptr = self.user_header_ptr(header_ptr).cast(); @@ -443,6 +451,8 @@ impl pub fn receive( &self, ) -> Result>, SubscriberReceiveError> { + debug_assert!(TypeId::of::() != TypeId::of::()); + Ok(self.receive_impl()?.map(|(details, absolute_address)| { let header_ptr = absolute_address as *const Header; let user_header_ptr = self.user_header_ptr(header_ptr).cast(); @@ -462,3 +472,45 @@ impl })) } } + +impl + Subscriber +{ + /// # Safety + /// + /// * The number_of_elements in the [`Header`](crate::service::header::publish_subscribe::Header) + /// corresponds to the payload type details that where overridden in + /// `MessageTypeDetails::payload.size`. Meaning, when the payload.size == 8 and the number + /// of elements if 5, it means that the sample will contain a slice of 8 * 5 = 40 + /// [`CustomPayloadMarker`]s. + #[doc(hidden)] + pub unsafe fn receive_custom_payload( + &self, + ) -> Result>, SubscriberReceiveError> + { + Ok(self.receive_impl()?.map(|(details, absolute_address)| { + let header_ptr = absolute_address as *const Header; + let user_header_ptr = self.user_header_ptr(header_ptr).cast(); + let payload_ptr = self.payload_ptr(header_ptr).cast(); + let number_of_elements = unsafe { (*header_ptr).number_of_elements() }; + let number_of_bytes = number_of_elements as usize + * self + .static_config + .publish_subscribe() + .message_type_details + .payload + .size; + + Sample { + details, + ptr: unsafe { + RawSample::::new_slice_unchecked( + header_ptr, + user_header_ptr, + core::slice::from_raw_parts(payload_ptr, number_of_bytes), + ) + }, + } + })) + } +} diff --git a/iceoryx2/src/sample_mut_uninit.rs b/iceoryx2/src/sample_mut_uninit.rs index e4d23950c..83003c13a 100644 --- a/iceoryx2/src/sample_mut_uninit.rs +++ b/iceoryx2/src/sample_mut_uninit.rs @@ -114,7 +114,7 @@ use crate::{ /// /// The generic parameter `Payload` is actually [`core::mem::MaybeUninit`]. pub struct SampleMutUninit { - sample: SampleMut, + pub(crate) sample: SampleMut, } impl diff --git a/iceoryx2/src/service/builder/publish_subscribe.rs b/iceoryx2/src/service/builder/publish_subscribe.rs index c2ee1143b..7eae4ff59 100644 --- a/iceoryx2/src/service/builder/publish_subscribe.rs +++ b/iceoryx2/src/service/builder/publish_subscribe.rs @@ -43,7 +43,7 @@ pub struct CustomHeaderMarker {} #[repr(C)] #[derive(Debug)] #[doc(hidden)] -pub struct CustomPayloadMarker {} +pub struct CustomPayloadMarker(u8); /// Errors that can occur when an existing [`MessagingPattern::PublishSubscribe`] [`Service`] shall be opened. #[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)] diff --git a/iceoryx2/src/service/static_config/message_type_details.rs b/iceoryx2/src/service/static_config/message_type_details.rs index 3efa1fe9f..57924140a 100644 --- a/iceoryx2/src/service/static_config/message_type_details.rs +++ b/iceoryx2/src/service/static_config/message_type_details.rs @@ -17,7 +17,7 @@ use serde::{Deserialize, Serialize}; /// Defines if the type is a slice with a runtime-size ([`TypeVariant::Dynamic`]) /// or if its a type that satisfies [`Sized`] ([`TypeVariant::FixedSize`]). -#[derive(Default, Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)] pub enum TypeVariant { #[default] /// A type notated by [`#[repr(C)]`](https://doc.rust-lang.org/reference/type-layout.html#reprc). diff --git a/iceoryx2/tests/publisher_tests.rs b/iceoryx2/tests/publisher_tests.rs index eeb92c16f..db55b141b 100644 --- a/iceoryx2/tests/publisher_tests.rs +++ b/iceoryx2/tests/publisher_tests.rs @@ -394,8 +394,7 @@ mod publisher { } #[test] - fn publisher_with_overridden_payload_details_adjusts_slice_len() -> TestResult<()> - { + fn publisher_with_custom_payload_details_adjusts_slice_len() -> TestResult<()> { const TYPE_SIZE_OVERRIDE: usize = 128; let service_name = generate_name()?; let config = generate_isolated_config(); @@ -419,6 +418,54 @@ mod publisher { Ok(()) } + #[test] + #[should_panic] + #[cfg(debug_assertions)] + fn publisher_with_custom_payload_details_panics_when_calling_loan_slice_uninit() { + const TYPE_SIZE_OVERRIDE: usize = 128; + let service_name = generate_name().unwrap(); + let config = generate_isolated_config(); + let node = NodeBuilder::new().config(&config).create::().unwrap(); + let mut type_detail = TypeDetail::__internal_new::(TypeVariant::FixedSize); + type_detail.size = TYPE_SIZE_OVERRIDE; + + let service = unsafe { + node.service_builder(&service_name) + .publish_subscribe::<[CustomPayloadMarker]>() + .__internal_set_payload_type_details(&type_detail) + .create() + .unwrap() + }; + + let sut = service.publisher_builder().create().unwrap(); + + // panics here + let _sample = sut.loan_slice_uninit(1); + } + + #[test] + #[should_panic] + #[cfg(debug_assertions)] + fn custom_fixed_size_payload_panics_when_loaning_more_than_one_element() { + set_log_level(LogLevel::Error); + let service_name = generate_name().unwrap(); + let config = generate_isolated_config(); + let node = NodeBuilder::new().config(&config).create::().unwrap(); + let type_details = TypeDetail::__internal_new::(TypeVariant::FixedSize); + + let service = unsafe { + node.service_builder(&service_name) + .publish_subscribe::<[CustomPayloadMarker]>() + .__internal_set_payload_type_details(&type_details) + .create() + .unwrap() + }; + + let sut = service.publisher_builder().create().unwrap(); + + let _sample = unsafe { sut.loan_custom_payload(2) }; + } + #[instantiate_tests()] mod ipc {} diff --git a/iceoryx2/tests/service_publish_subscribe_tests.rs b/iceoryx2/tests/service_publish_subscribe_tests.rs index 2210abf95..355594b92 100644 --- a/iceoryx2/tests/service_publish_subscribe_tests.rs +++ b/iceoryx2/tests/service_publish_subscribe_tests.rs @@ -2902,6 +2902,80 @@ mod service_publish_subscribe { assert_that!(*sample, eq 456); } + #[test] + fn communication_with_custom_payload_works() { + set_log_level(LogLevel::Error); + const NUMBER_OF_ELEMENTS: usize = 1; + let service_name = generate_name(); + let config = generate_isolated_config(); + let node = NodeBuilder::new().config(&config).create::().unwrap(); + let mut type_details = TypeDetail::__internal_new::(TypeVariant::FixedSize); + type_details.size = 1024; + type_details.alignment = 1024; + + let sut = unsafe { + node.service_builder(&service_name) + .publish_subscribe::<[CustomPayloadMarker]>() + .__internal_set_payload_type_details(&type_details) + .create() + .unwrap() + }; + + let publisher = sut.publisher_builder().create().unwrap(); + let subscriber = sut.subscriber_builder().create().unwrap(); + + let sample = unsafe { publisher.loan_custom_payload(NUMBER_OF_ELEMENTS).unwrap() }; + assert_that!(sample.payload(), len type_details.size); + assert_that!((sample.payload().as_ptr() as usize % type_details.alignment), eq 0); + assert_that!(sample.header().number_of_elements(), eq NUMBER_OF_ELEMENTS as u64); + + unsafe { sample.assume_init().send().unwrap() }; + + let sample = unsafe { subscriber.receive_custom_payload().unwrap().unwrap() }; + assert_that!(sample.payload(), len type_details.size); + assert_that!((sample.payload().as_ptr() as usize % type_details.alignment), eq 0); + assert_that!(sample.header().number_of_elements(), eq NUMBER_OF_ELEMENTS as u64); + } + + #[test] + fn communication_with_custom_slice_payload_works() { + set_log_level(LogLevel::Error); + const NUMBER_OF_ELEMENTS: usize = 7; + let service_name = generate_name(); + let config = generate_isolated_config(); + let node = NodeBuilder::new().config(&config).create::().unwrap(); + let mut type_details = TypeDetail::__internal_new::(TypeVariant::Dynamic); + type_details.size = 2048; + type_details.alignment = 2048; + + let sut = unsafe { + node.service_builder(&service_name) + .publish_subscribe::<[CustomPayloadMarker]>() + .__internal_set_payload_type_details(&type_details) + .create() + .unwrap() + }; + + let publisher = sut + .publisher_builder() + .max_slice_len(NUMBER_OF_ELEMENTS) + .create() + .unwrap(); + let subscriber = sut.subscriber_builder().create().unwrap(); + + let sample = unsafe { publisher.loan_custom_payload(NUMBER_OF_ELEMENTS).unwrap() }; + assert_that!(sample.payload(), len type_details.size * NUMBER_OF_ELEMENTS); + assert_that!((sample.payload().as_ptr() as usize % type_details.alignment), eq 0); + assert_that!(sample.header().number_of_elements(), eq NUMBER_OF_ELEMENTS as u64); + + unsafe { sample.assume_init().send().unwrap() }; + + let sample = unsafe { subscriber.receive_custom_payload().unwrap().unwrap() }; + assert_that!(sample.payload(), len type_details.size * NUMBER_OF_ELEMENTS); + assert_that!((sample.payload().as_ptr() as usize % type_details.alignment), eq 0); + assert_that!(sample.header().number_of_elements(), eq NUMBER_OF_ELEMENTS as u64); + } + #[instantiate_tests()] mod ipc {} diff --git a/iceoryx2/tests/subscriber_tests.rs b/iceoryx2/tests/subscriber_tests.rs index 273743c37..1fed1494e 100644 --- a/iceoryx2/tests/subscriber_tests.rs +++ b/iceoryx2/tests/subscriber_tests.rs @@ -12,6 +12,8 @@ #[generic_tests::define] mod subscriber { + use iceoryx2::service::builder::publish_subscribe::CustomPayloadMarker; + use iceoryx2::service::static_config::message_type_details::{TypeDetail, TypeVariant}; use std::collections::HashSet; use iceoryx2::{ @@ -69,6 +71,31 @@ mod subscriber { } } + #[test] + #[should_panic] + #[cfg(debug_assertions)] + fn subscriber_with_custom_payload_details_panics_when_calling_receive() { + const TYPE_SIZE_OVERRIDE: usize = 128; + let service_name = generate_name(); + let config = generate_isolated_config(); + let node = NodeBuilder::new().config(&config).create::().unwrap(); + let mut type_detail = TypeDetail::__internal_new::(TypeVariant::FixedSize); + type_detail.size = TYPE_SIZE_OVERRIDE; + + let service = unsafe { + node.service_builder(&service_name) + .publish_subscribe::<[CustomPayloadMarker]>() + .__internal_set_payload_type_details(&type_detail) + .create() + .unwrap() + }; + + let sut = service.subscriber_builder().create().unwrap(); + + // panics here + let _sample = sut.receive(); + } + #[instantiate_tests()] mod ipc {}