Skip to content

Commit

Permalink
[#525] Test multi segment usage in zero copy connection
Browse files Browse the repository at this point in the history
  • Loading branch information
elfenpiff committed Nov 25, 2024
1 parent b538316 commit 5f51073
Show file tree
Hide file tree
Showing 6 changed files with 398 additions and 24 deletions.
3 changes: 2 additions & 1 deletion iceoryx2-cal/src/dynamic_storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ pub trait DynamicStorageBuilder<'builder, T: Send + Sync, D: DynamicStorage<T>>:
/// Defines if a newly created [`DynamicStorage`] owns the underlying resources
fn has_ownership(self, value: bool) -> Self;

/// Sets the size of the supplementary data
/// Sets the size of the supplementary data. Only relevant when it is newly created otherwise
/// the already initialized [`DynamicStorage`] with the full size is used.
fn supplementary_size(self, value: usize) -> Self;

/// The timeout defines how long the [`DynamicStorageBuilder`] should wait for
Expand Down
8 changes: 1 addition & 7 deletions iceoryx2-cal/src/dynamic_storage/posix_shared_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl<'builder, T: Send + Sync + Debug> NamedConceptBuilder<Storage<T>> for Build

impl<'builder, T: Send + Sync + Debug> Builder<'builder, T> {
fn open_impl(&self) -> Result<Storage<T>, DynamicStorageOpenError> {
let msg = "Failed to open ";
let msg = "Failed to open posix_shared_memory::DynamicStorage";

let full_name = self.config.path_for(&self.storage_name).file_name();
let mut wait_for_read_write_access = fail!(from self, when AdaptiveWaitBuilder::new().create(),
Expand Down Expand Up @@ -207,12 +207,6 @@ impl<'builder, T: Send + Sync + Debug> Builder<'builder, T> {
"{} since the adaptive wait call failed.", msg);
};

let required_size = std::mem::size_of::<Data<T>>() + self.supplementary_size;
if shm.size() < required_size {
fail!(from self, with DynamicStorageOpenError::InternalError,
"{} since the actual size {} does not match the required size of {}.", msg, shm.size(), required_size);
}

let init_state = shm.base_address().as_ptr() as *const Data<T>;

loop {
Expand Down
4 changes: 3 additions & 1 deletion iceoryx2-cal/src/shared_memory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ pub trait SharedMemoryBuilder<Allocator: ShmAllocator, Shm: SharedMemory<Allocat
/// Defines if a newly created [`SharedMemory`] owns the underlying resources
fn has_ownership(self, value: bool) -> Self;

/// Sets the size of the [`SharedMemory`]
/// Sets the size of the [`SharedMemory`]. Only relevant when the [`SharedMemory`] is created
/// otherwise the already initialized [`SharedMemory`] is completely mapped into the process
/// space.
fn size(self, value: usize) -> Self;

/// The timeout defines how long the [`SharedMemoryBuilder`] should wait for
Expand Down
37 changes: 24 additions & 13 deletions iceoryx2-cal/src/zero_copy_connection/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,12 @@ pub mod details {
"{} since the requested number of samples is set to {} but should be set to {}.",
msg, self.number_of_samples, storage.get().number_of_samples);
}

if storage.get().number_of_segments != self.number_of_segments {
fail!(from self, with ZeroCopyCreationError::IncompatibleNumberOfSegments,
"{} since the requested number of segments is set to {} but should be set to {}.",
msg, self.number_of_segments, storage.get().number_of_segments);
}
}

Ok(storage)
Expand Down Expand Up @@ -524,27 +530,28 @@ pub mod details {
impl<Storage: DynamicStorage<SharedManagementData>> ZeroCopySender for Sender<Storage> {
fn try_send(&self, ptr: PointerOffset) -> Result<Option<PointerOffset>, ZeroCopySendError> {
let msg = "Unable to send sample";
let storage = self.storage.get();

if !self.storage.get().enable_safe_overflow
&& self.storage.get().submission_channel.is_full()
{
if !storage.enable_safe_overflow && storage.submission_channel.is_full() {
fail!(from self, with ZeroCopySendError::ReceiveBufferFull,
"{} since the receive buffer is full.", msg);
}

let segment_id = ptr.segment_id().value() as usize;
let sample_size = self.storage.get().sample_size;
let sample_size = storage.sample_size;
let index = ptr.offset() / sample_size;

if !self.storage.get().used_chunk_list[segment_id].insert(index) {
debug_assert!(segment_id < storage.number_of_segments as usize);

if !storage.used_chunk_list[segment_id].insert(index) {
fail!(from self, with ZeroCopySendError::UsedChunkListFull,
"{} since the used chunk list is full.", msg);
}

match unsafe { self.storage.get().submission_channel.push(ptr.as_value()) } {
match unsafe { storage.submission_channel.push(ptr.as_value()) } {
Some(v) => {
let pointer_offset = PointerOffset::from_value(v as u64);
if !self.storage.get().used_chunk_list[segment_id]
if !storage.used_chunk_list[segment_id]
.remove(pointer_offset.offset() / sample_size)
{
fail!(from self, with ZeroCopySendError::ConnectionCorrupted,
Expand Down Expand Up @@ -574,22 +581,26 @@ pub mod details {

fn reclaim(&self) -> Result<Option<PointerOffset>, ZeroCopyReclaimError> {
let msg = "Unable to reclaim sample";
match unsafe { self.storage.get().completion_channel.pop() } {

let storage = self.storage.get();
match unsafe { storage.completion_channel.pop() } {
None => Ok(None),
Some(v) => {
let pointer_offset = PointerOffset::from_value(v as u64);
let segment_id = pointer_offset.segment_id().value() as usize;

if segment_id >= self.storage.get().used_chunk_list.len() {
fail!(from self, with ZeroCopyReclaimError::ReceiverReturnedCorruptedOffset,
debug_assert!(segment_id < storage.number_of_segments as usize);

if segment_id >= storage.used_chunk_list.len() {
fail!(from self, with ZeroCopyReclaimError::ReceiverReturnedCorruptedPointerOffset,
"{} since the receiver returned a non-existing segment id {:?}.",
msg, pointer_offset);
}

if !self.storage.get().used_chunk_list[segment_id]
.remove(pointer_offset.offset() / self.storage.get().sample_size)
if !storage.used_chunk_list[segment_id]
.remove(pointer_offset.offset() / storage.sample_size)
{
fail!(from self, with ZeroCopyReclaimError::ReceiverReturnedCorruptedOffset,
fail!(from self, with ZeroCopyReclaimError::ReceiverReturnedCorruptedPointerOffset,
"{} since the receiver returned a corrupted offset {:?}.",
msg, pointer_offset);
}
Expand Down
3 changes: 2 additions & 1 deletion iceoryx2-cal/src/zero_copy_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub enum ZeroCopyCreationError {
IncompatibleOverflowSetting,
IncompatibleSampleSize,
IncompatibleNumberOfSamples,
IncompatibleNumberOfSegments,
}

impl std::fmt::Display for ZeroCopyCreationError {
Expand Down Expand Up @@ -77,7 +78,7 @@ impl std::error::Error for ZeroCopyReceiveError {}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ZeroCopyReclaimError {
ReceiverReturnedCorruptedOffset,
ReceiverReturnedCorruptedPointerOffset,
}

impl std::fmt::Display for ZeroCopyReclaimError {
Expand Down
Loading

0 comments on commit 5f51073

Please sign in to comment.