Skip to content

Commit

Permalink
[#498] Separate the CustomPayloadType API from the normal safe API
Browse files Browse the repository at this point in the history
  • Loading branch information
elfenpiff committed Nov 1, 2024
1 parent 88e86e6 commit d50afad
Show file tree
Hide file tree
Showing 11 changed files with 259 additions and 49 deletions.
4 changes: 2 additions & 2 deletions iceoryx2-ffi/cxx/include/iox2/header_publish_subscribe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class HeaderPublishSubscribe {
/// Returns the [`UniquePublisherId`] of the source [`Publisher`].
auto publisher_id() const -> UniquePublisherId;

/// Returns the [`Layout`] of the corresponding payload.
auto payload_type_layout() const -> iox::Layout;
/// Returns the number of [`Payload`] elements in the received [`Sample`].
auto number_of_elements() const -> uint64_t;

private:
template <ServiceType, typename, typename>
Expand Down
7 changes: 2 additions & 5 deletions iceoryx2-ffi/cxx/src/header_publish_subscribe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,7 @@ auto HeaderPublishSubscribe::publisher_id() const -> UniquePublisherId {
return UniquePublisherId { id_handle };
}

auto HeaderPublishSubscribe::payload_type_layout() const -> iox::Layout {
auto size = iox2_publish_subscribe_header_payload_type_size(&m_handle);
auto alignment = iox2_publish_subscribe_header_payload_type_alignment(&m_handle);

return iox::Layout::create(size, alignment).expect("Payload layout is always valid.");
auto HeaderPublishSubscribe::number_of_elements() const -> uint64_t {
return iox2_publish_subscribe_header_number_of_elements(&m_handle);
}
} // namespace iox2
12 changes: 7 additions & 5 deletions iceoryx2-ffi/ffi/src/api/publish_subscribe_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,12 @@ pub unsafe extern "C" fn iox2_publish_subscribe_header_publisher_id(
*id_handle_ptr = (*storage_ptr).as_handle();
}

/// Returns the number of bytes of the payload
/// Returns the number of elements of the payload.
/// The element size is defined via this call when creating a new service
/// [`crate::iox2_service_builder_pub_sub_set_payload_type_details()`].
/// So if the payload is defined with alignment 8 and size 16 and this function returns 5. It
/// means that the payload consists of 5 elements of size 16 and every element is 8 byte aligned.
/// Therefore, the payload pointer points to a memory region with 5 * 16 = 80 bytes.
///
/// # Arguments
///
Expand All @@ -165,16 +170,13 @@ pub unsafe extern "C" fn iox2_publish_subscribe_header_publisher_id(
///
/// * `header_handle` is valid and non-null
#[no_mangle]
pub unsafe extern "C" fn iox2_publish_subscribe_header_number_of_bytes(
pub unsafe extern "C" fn iox2_publish_subscribe_header_number_of_elements(
header_handle: iox2_publish_subscribe_header_h_ref,
) -> u64 {
header_handle.assert_non_null();

let header = &mut *header_handle.as_type();

// In the typed Rust API it is the number of elements and the element is a
// CustomPayloadMarker. But this translates to the number of bytes whenever CustomPayloadMarker
// is used.
header.value.as_ref().number_of_elements()
}
// END C API
73 changes: 42 additions & 31 deletions iceoryx2/src/port/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ use crate::service::naming_scheme::{
data_segment_name, extract_publisher_id_from_connection, extract_subscriber_id_from_connection,
};
use crate::service::port_factory::publisher::{LocalPublisherConfig, UnableToDeliverStrategy};
use crate::service::static_config::message_type_details::TypeVariant;
use crate::service::static_config::publish_subscribe::{self};
use crate::service::{self, ServiceState};
use crate::{config, sample_mut::SampleMut};
Expand All @@ -139,6 +140,7 @@ use iceoryx2_cal::zero_copy_connection::{
ZeroCopyConnection, ZeroCopyCreationError, ZeroCopySendError, ZeroCopySender,
};
use iceoryx2_pal_concurrency_sync::iox_atomic::{IoxAtomicBool, IoxAtomicU64, IoxAtomicUsize};
use std::any::TypeId;
use std::cell::UnsafeCell;
use std::fmt::Debug;
use std::sync::atomic::Ordering;
Expand Down Expand Up @@ -529,7 +531,11 @@ impl<Service: service::Service> DataSegment<Service> {

/// Sending endpoint of a publish-subscriber based communication.
#[derive(Debug)]
pub struct Publisher<Service: service::Service, Payload: Debug + ?Sized, UserHeader: Debug> {
pub struct Publisher<
Service: service::Service,
Payload: Debug + ?Sized + 'static,
UserHeader: Debug,
> {
pub(crate) data_segment: Arc<DataSegment<Service>>,
dynamic_publisher_handle: Option<ContainerHandle>,
payload_size: usize,
Expand Down Expand Up @@ -756,6 +762,15 @@ impl<Service: service::Service, Payload: Debug + ?Sized, UserHeader: Debug>
.payload_ptr_from_header(header.cast())
.cast()
}

fn payload_type_variant(&self) -> TypeVariant {
self.data_segment
.subscriber_connections
.static_config
.message_type_details
.payload
.variant
}
}

////////////////////////
Expand Down Expand Up @@ -962,6 +977,18 @@ impl<Service: service::Service, Payload: Debug, UserHeader: Debug>
&self,
slice_len: usize,
) -> Result<SampleMutUninit<Service, [MaybeUninit<Payload>], UserHeader>, PublisherLoanError>
{
// required since Rust does not support generic specializations or negative traits
debug_assert!(TypeId::of::<Payload>() != TypeId::of::<CustomPayloadMarker>());

unsafe { self.loan_slice_uninit_impl(slice_len, slice_len) }
}

unsafe fn loan_slice_uninit_impl(
&self,
slice_len: usize,
underlying_number_of_slice_elements: usize,
) -> Result<SampleMutUninit<Service, [MaybeUninit<Payload>], UserHeader>, PublisherLoanError>
{
let max_slice_len = self.data_segment.config.max_slice_len;
if max_slice_len < slice_len {
Expand All @@ -982,7 +1009,7 @@ impl<Service: service::Service, Payload: Debug, UserHeader: Debug>
RawSampleMut::new_unchecked(
header_ptr,
user_header_ptr,
core::slice::from_raw_parts_mut(payload_ptr, slice_len),
core::slice::from_raw_parts_mut(payload_ptr, underlying_number_of_slice_elements),
)
};

Expand All @@ -999,6 +1026,13 @@ impl<Service: service::Service, Payload: Debug, UserHeader: Debug>
impl<Service: service::Service, UserHeader: Debug>
Publisher<Service, [CustomPayloadMarker], UserHeader>
{
/// # Safety
///
/// * slice_len != 1 only when payload TypeVariant == Dynamic
/// * The number_of_elements in the [`Header`](crate::service::header::publish_subscribe::Header)
/// is set to `slice_len`
/// * The [`SampleMutUninit`] will contain `slice_len` * `MessageTypeDetails::payload.size`
/// elements of type [`CustomPayloadMarker`].
#[doc(hidden)]
pub unsafe fn loan_custom_payload(
&self,
Expand All @@ -1007,36 +1041,13 @@ impl<Service: service::Service, UserHeader: Debug>
SampleMutUninit<Service, [MaybeUninit<CustomPayloadMarker>], UserHeader>,
PublisherLoanError,
> {
let max_slice_len = self.data_segment.config.max_slice_len;
if max_slice_len < slice_len {
fail!(from self, with PublisherLoanError::ExceedsMaxLoanSize,
"Unable to loan slice with {} elements since it would exceed the max supported slice length of {}.",
slice_len, max_slice_len);
}

let sample_layout = self.sample_layout(slice_len);
let chunk = self.allocate(sample_layout)?;
let header_ptr = chunk.data_ptr as *mut Header;
let user_header_ptr = self.user_header_ptr(header_ptr) as *mut UserHeader;
let payload_ptr = self.payload_ptr(header_ptr) as *mut MaybeUninit<CustomPayloadMarker>;

let slice_len = self.payload_size * slice_len;

unsafe { header_ptr.write(Header::new(self.data_segment.port_id, slice_len as _)) };

let sample = unsafe {
RawSampleMut::new_unchecked(
header_ptr,
user_header_ptr,
core::slice::from_raw_parts_mut(payload_ptr, slice_len),
)
};
// TypeVariant::Dynamic == slice and only here it makes sense to loan more than one element
debug_assert!(
slice_len == 1
|| (slice_len != 1 && self.payload_type_variant() == TypeVariant::Dynamic)
);

Ok(SampleMutUninit::<
Service,
[MaybeUninit<CustomPayloadMarker>],
UserHeader,
>::new(&self.data_segment, sample, chunk.offset))
self.loan_slice_uninit_impl(slice_len, self.payload_size * slice_len)
}
}
////////////////////////
Expand Down
54 changes: 53 additions & 1 deletion iceoryx2/src/port/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
//! # }
//! ```
use std::any::TypeId;
use std::cell::UnsafeCell;
use std::fmt::Debug;
use std::marker::PhantomData;
Expand All @@ -46,6 +47,7 @@ use iceoryx2_cal::{shared_memory::*, zero_copy_connection::*};

use crate::port::DegrationAction;
use crate::sample::SampleDetails;
use crate::service::builder::publish_subscribe::CustomPayloadMarker;
use crate::service::dynamic_config::publish_subscribe::{PublisherDetails, SubscriberDetails};
use crate::service::header::publish_subscribe::Header;
use crate::service::port_factory::subscriber::SubscriberConfig;
Expand Down Expand Up @@ -102,7 +104,11 @@ impl std::error::Error for SubscriberCreateError {}

/// The receiving endpoint of a publish-subscribe communication.
#[derive(Debug)]
pub struct Subscriber<Service: service::Service, Payload: Debug + ?Sized, UserHeader: Debug> {
pub struct Subscriber<
Service: service::Service,
Payload: Debug + ?Sized + 'static,
UserHeader: Debug,
> {
dynamic_subscriber_handle: Option<ContainerHandle>,
publisher_connections: PublisherConnections<Service>,
to_be_removed_connections: UnsafeCell<Queue<Arc<Connection<Service>>>>,
Expand Down Expand Up @@ -423,6 +429,8 @@ impl<Service: service::Service, Payload: Debug, UserHeader: Debug>
pub fn receive(
&self,
) -> Result<Option<Sample<Service, Payload, UserHeader>>, SubscriberReceiveError> {
debug_assert!(TypeId::of::<Payload>() != TypeId::of::<CustomPayloadMarker>());

Ok(self.receive_impl()?.map(|(details, absolute_address)| {
let header_ptr = absolute_address as *const Header;
let user_header_ptr = self.user_header_ptr(header_ptr).cast();
Expand All @@ -443,6 +451,8 @@ impl<Service: service::Service, Payload: Debug, UserHeader: Debug>
pub fn receive(
&self,
) -> Result<Option<Sample<Service, [Payload], UserHeader>>, SubscriberReceiveError> {
debug_assert!(TypeId::of::<Payload>() != TypeId::of::<CustomPayloadMarker>());

Ok(self.receive_impl()?.map(|(details, absolute_address)| {
let header_ptr = absolute_address as *const Header;
let user_header_ptr = self.user_header_ptr(header_ptr).cast();
Expand All @@ -462,3 +472,45 @@ impl<Service: service::Service, Payload: Debug, UserHeader: Debug>
}))
}
}

impl<Service: service::Service, UserHeader: Debug>
Subscriber<Service, [CustomPayloadMarker], UserHeader>
{
/// # Safety
///
/// * The number_of_elements in the [`Header`](crate::service::header::publish_subscribe::Header)
/// corresponds to the payload type details that where overridden in
/// `MessageTypeDetails::payload.size`. Meaning, when the payload.size == 8 and the number
/// of elements if 5, it means that the sample will contain a slice of 8 * 5 = 40
/// [`CustomPayloadMarker`]s.
#[doc(hidden)]
pub unsafe fn receive_custom_payload(
&self,
) -> Result<Option<Sample<Service, [CustomPayloadMarker], UserHeader>>, SubscriberReceiveError>
{
Ok(self.receive_impl()?.map(|(details, absolute_address)| {
let header_ptr = absolute_address as *const Header;
let user_header_ptr = self.user_header_ptr(header_ptr).cast();
let payload_ptr = self.payload_ptr(header_ptr).cast();
let number_of_elements = unsafe { (*header_ptr).number_of_elements() };
let number_of_bytes = number_of_elements as usize
* self
.static_config
.publish_subscribe()
.message_type_details
.payload
.size;

Sample {
details,
ptr: unsafe {
RawSample::<Header, UserHeader, [CustomPayloadMarker]>::new_slice_unchecked(
header_ptr,
user_header_ptr,
core::slice::from_raw_parts(payload_ptr, number_of_bytes),
)
},
}
}))
}
}
2 changes: 1 addition & 1 deletion iceoryx2/src/sample_mut_uninit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ use crate::{
///
/// The generic parameter `Payload` is actually [`core::mem::MaybeUninit<Payload>`].
pub struct SampleMutUninit<Service: crate::service::Service, Payload: Debug + ?Sized, UserHeader> {
sample: SampleMut<Service, Payload, UserHeader>,
pub(crate) sample: SampleMut<Service, Payload, UserHeader>,
}

impl<Service: crate::service::Service, Payload: Debug + ?Sized, UserHeader>
Expand Down
2 changes: 1 addition & 1 deletion iceoryx2/src/service/builder/publish_subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub struct CustomHeaderMarker {}
#[repr(C)]
#[derive(Debug)]
#[doc(hidden)]
pub struct CustomPayloadMarker {}
pub struct CustomPayloadMarker(u8);

/// Errors that can occur when an existing [`MessagingPattern::PublishSubscribe`] [`Service`] shall be opened.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
Expand Down
2 changes: 1 addition & 1 deletion iceoryx2/src/service/static_config/message_type_details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use serde::{Deserialize, Serialize};

/// Defines if the type is a slice with a runtime-size ([`TypeVariant::Dynamic`])
/// or if its a type that satisfies [`Sized`] ([`TypeVariant::FixedSize`]).
#[derive(Default, Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
#[derive(Default, Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub enum TypeVariant {
#[default]
/// A type notated by [`#[repr(C)]`](https://doc.rust-lang.org/reference/type-layout.html#reprc).
Expand Down
51 changes: 49 additions & 2 deletions iceoryx2/tests/publisher_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,7 @@ mod publisher {
}

#[test]
fn publisher_with_overridden_payload_details_adjusts_slice_len<Sut: Service>() -> TestResult<()>
{
fn publisher_with_custom_payload_details_adjusts_slice_len<Sut: Service>() -> TestResult<()> {
const TYPE_SIZE_OVERRIDE: usize = 128;
let service_name = generate_name()?;
let config = generate_isolated_config();
Expand All @@ -419,6 +418,54 @@ mod publisher {
Ok(())
}

#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn publisher_with_custom_payload_details_panics_when_calling_loan_slice_uninit<Sut: Service>() {
const TYPE_SIZE_OVERRIDE: usize = 128;
let service_name = generate_name().unwrap();
let config = generate_isolated_config();
let node = NodeBuilder::new().config(&config).create::<Sut>().unwrap();
let mut type_detail = TypeDetail::__internal_new::<u8>(TypeVariant::FixedSize);
type_detail.size = TYPE_SIZE_OVERRIDE;

let service = unsafe {
node.service_builder(&service_name)
.publish_subscribe::<[CustomPayloadMarker]>()
.__internal_set_payload_type_details(&type_detail)
.create()
.unwrap()
};

let sut = service.publisher_builder().create().unwrap();

// panics here
let _sample = sut.loan_slice_uninit(1);
}

#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn custom_fixed_size_payload_panics_when_loaning_more_than_one_element<Sut: Service>() {
set_log_level(LogLevel::Error);
let service_name = generate_name().unwrap();
let config = generate_isolated_config();
let node = NodeBuilder::new().config(&config).create::<Sut>().unwrap();
let type_details = TypeDetail::__internal_new::<u8>(TypeVariant::FixedSize);

let service = unsafe {
node.service_builder(&service_name)
.publish_subscribe::<[CustomPayloadMarker]>()
.__internal_set_payload_type_details(&type_details)
.create()
.unwrap()
};

let sut = service.publisher_builder().create().unwrap();

let _sample = unsafe { sut.loan_custom_payload(2) };
}

#[instantiate_tests(<iceoryx2::service::ipc::Service>)]
mod ipc {}

Expand Down
Loading

0 comments on commit d50afad

Please sign in to comment.