diff --git a/commons/zenoh-codec/src/core/shm.rs b/commons/zenoh-codec/src/core/shm.rs index 4f272f0ed4..b67716611d 100644 --- a/commons/zenoh-codec/src/core/shm.rs +++ b/commons/zenoh-codec/src/core/shm.rs @@ -11,6 +11,8 @@ // Contributors: // ZettaScale Zenoh Team, // +use std::num::NonZeroUsize; + use zenoh_buffers::{ reader::{DidntRead, Reader}, writer::{DidntWrite, Writer}, @@ -62,6 +64,18 @@ where } } +impl WCodec for Zenoh080 +where + W: Writer, +{ + type Output = Result<(), DidntWrite>; + + fn write(self, writer: &mut W, x: NonZeroUsize) -> Self::Output { + self.write(&mut *writer, x.get())?; + Ok(()) + } +} + impl WCodec<&ShmBufInfo, &mut W> for Zenoh080 where W: Writer, @@ -80,7 +94,7 @@ where self.write(&mut *writer, data_descriptor)?; self.write(&mut *writer, shm_protocol)?; - self.write(&mut *writer, data_len)?; + self.write(&mut *writer, *data_len)?; self.write(&mut *writer, watchdog_descriptor)?; self.write(&mut *writer, header_descriptor)?; self.write(&mut *writer, generation)?; @@ -138,6 +152,19 @@ where } } +impl RCodec for Zenoh080 +where + R: Reader, +{ + type Error = DidntRead; + + fn read(self, reader: &mut R) -> Result { + let size: usize = self.read(&mut *reader)?; + let size = NonZeroUsize::new(size).ok_or(DidntRead)?; + Ok(size) + } +} + impl RCodec for Zenoh080 where R: Reader, diff --git a/commons/zenoh-shm/Cargo.toml b/commons/zenoh-shm/Cargo.toml index a76cf896d3..5e3dec390e 100644 --- a/commons/zenoh-shm/Cargo.toml +++ b/commons/zenoh-shm/Cargo.toml @@ -52,4 +52,4 @@ lockfree = { workspace = true } stabby = { workspace = true } [dev-dependencies] -libc = { workspace = true } +libc = { workspace = true } \ No newline at end of file diff --git a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend.rs b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend.rs index 7de9e9f22d..663379e034 100644 --- a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend.rs +++ b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend.rs @@ -16,6 +16,7 @@ use std::{ borrow::Borrow, cmp, collections::BinaryHeap, + num::NonZeroUsize, sync::{ atomic::{AtomicPtr, AtomicUsize, Ordering}, Mutex, @@ -31,7 +32,7 @@ use crate::api::{ provider::{ chunk::{AllocatedChunk, ChunkDescriptor}, shm_provider_backend::ShmProviderBackend, - types::{AllocAlignment, ChunkAllocResult, MemoryLayout, ZAllocError}, + types::{AllocAlignment, ChunkAllocResult, MemoryLayout, ZAllocError, ZLayoutError}, }, }; @@ -45,7 +46,7 @@ const MIN_FREE_CHUNK_SIZE: usize = 1_024; #[derive(Eq, Copy, Clone, Debug)] struct Chunk { offset: ChunkID, - size: usize, + size: NonZeroUsize, } impl Ord for Chunk { @@ -86,7 +87,7 @@ impl PosixShmProviderBackendBuilder { self, size: usize, alignment: AllocAlignment, - ) -> ZResult> { + ) -> Result, ZLayoutError> { let layout = MemoryLayout::new(size, alignment)?; Ok(LayoutedPosixShmProviderBackendBuilder { layout }) } @@ -96,7 +97,7 @@ impl PosixShmProviderBackendBuilder { pub fn with_size( self, size: usize, - ) -> ZResult> { + ) -> Result, ZLayoutError> { let layout = MemoryLayout::new(size, AllocAlignment::default())?; Ok(LayoutedPosixShmProviderBackendBuilder { layout }) } @@ -149,7 +150,7 @@ impl PosixShmProviderBackend { ); Ok(Self { - available: AtomicUsize::new(layout.size()), + available: AtomicUsize::new(layout.size().get()), segment, free_list: Mutex::new(free_list), alignment: layout.alignment(), @@ -163,7 +164,7 @@ impl ShmProviderBackend for PosixShmProviderBackend { let required_len = layout.size(); - if self.available.load(Ordering::Relaxed) < required_len { + if self.available.load(Ordering::Relaxed) < required_len.get() { tracing::trace!( "PosixShmProviderBackend does not have sufficient free memory to allocate {:?}, try de-fragmenting!", layout); return Err(ZAllocError::OutOfMemory); } @@ -176,16 +177,20 @@ impl ShmProviderBackend for PosixShmProviderBackend { Some(mut chunk) if chunk.size >= required_len => { // NOTE: don't loose any chunks here, as it will lead to memory leak tracing::trace!("Allocator selected Chunk ({:?})", &chunk); - if chunk.size - required_len >= MIN_FREE_CHUNK_SIZE { + if chunk.size.get() - required_len.get() >= MIN_FREE_CHUNK_SIZE { let free_chunk = Chunk { - offset: chunk.offset + required_len as ChunkID, - size: chunk.size - required_len, + offset: chunk.offset + required_len.get() as ChunkID, + // SAFETY: this is safe because we always operate on a leftover, which is checked above! + size: unsafe { + NonZeroUsize::new_unchecked(chunk.size.get() - required_len.get()) + }, }; tracing::trace!("The allocation will leave a Free Chunk: {:?}", &free_chunk); guard.push(free_chunk); chunk.size = required_len; } - self.available.fetch_sub(chunk.size, Ordering::Relaxed); + self.available + .fetch_sub(chunk.size.get(), Ordering::Relaxed); let descriptor = ChunkDescriptor::new(self.segment.segment.id(), chunk.offset, chunk.size); @@ -219,16 +224,18 @@ impl ShmProviderBackend for PosixShmProviderBackend { offset: chunk.chunk, size: chunk.len, }; - self.available.fetch_add(free_chunk.size, Ordering::Relaxed); + self.available + .fetch_add(free_chunk.size.get(), Ordering::Relaxed); zlock!(self.free_list).push(free_chunk); } fn defragment(&self) -> usize { fn try_merge_adjacent_chunks(a: &Chunk, b: &Chunk) -> Option { - let end_offset = a.offset as usize + a.size; + let end_offset = a.offset as usize + a.size.get(); if end_offset == b.offset as usize { Some(Chunk { - size: a.size + b.size, + // SAFETY: this is safe because we operate on non-zero sizes and it will never overflow + size: unsafe { NonZeroUsize::new_unchecked(a.size.get() + b.size.get()) }, offset: a.offset, }) } else { @@ -256,7 +263,7 @@ impl ShmProviderBackend for PosixShmProviderBackend { match try_merge_adjacent_chunks(¤t, &next) { Some(c) => { current = c; - largest = largest.max(current.size); + largest = largest.max(current.size.get()); if i == n { guard.push(current) } @@ -279,7 +286,7 @@ impl ShmProviderBackend for PosixShmProviderBackend { self.available.load(Ordering::Relaxed) } - fn layout_for(&self, layout: MemoryLayout) -> ZResult { + fn layout_for(&self, layout: MemoryLayout) -> Result { layout.extend(self.alignment) } } diff --git a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_segment.rs b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_segment.rs index dd103462e4..3a08d2be55 100644 --- a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_segment.rs +++ b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_segment.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh Team, // -use std::sync::atomic::AtomicPtr; +use std::{num::NonZeroUsize, sync::atomic::AtomicPtr}; use zenoh_result::ZResult; @@ -32,8 +32,8 @@ pub(crate) struct PosixShmSegment { } impl PosixShmSegment { - pub(crate) fn create(alloc_size: usize) -> ZResult { - let segment = ArrayInSHM::create(alloc_size, POSIX_SHM_SEGMENT_PREFIX)?; + pub(crate) fn create(alloc_size: NonZeroUsize) -> ZResult { + let segment = ArrayInSHM::create(alloc_size.get(), POSIX_SHM_SEGMENT_PREFIX)?; Ok(Self { segment }) } diff --git a/commons/zenoh-shm/src/api/provider/chunk.rs b/commons/zenoh-shm/src/api/provider/chunk.rs index 939758a345..fe7d0d5cb6 100644 --- a/commons/zenoh-shm/src/api/provider/chunk.rs +++ b/commons/zenoh-shm/src/api/provider/chunk.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh Team, // -use std::sync::atomic::AtomicPtr; +use std::{num::NonZeroUsize, sync::atomic::AtomicPtr}; use crate::api::common::types::{ChunkID, SegmentID}; @@ -22,13 +22,13 @@ use crate::api::common::types::{ChunkID, SegmentID}; pub struct ChunkDescriptor { pub segment: SegmentID, pub chunk: ChunkID, - pub len: usize, + pub len: NonZeroUsize, } impl ChunkDescriptor { /// Create a new Chunk Descriptor #[zenoh_macros::unstable_doc] - pub fn new(segment: SegmentID, chunk: ChunkID, len: usize) -> Self { + pub fn new(segment: SegmentID, chunk: ChunkID, len: NonZeroUsize) -> Self { Self { segment, chunk, diff --git a/commons/zenoh-shm/src/api/provider/shm_provider.rs b/commons/zenoh-shm/src/api/provider/shm_provider.rs index 8773498b61..1487a1ee18 100644 --- a/commons/zenoh-shm/src/api/provider/shm_provider.rs +++ b/commons/zenoh-shm/src/api/provider/shm_provider.rs @@ -16,6 +16,7 @@ use std::{ collections::VecDeque, future::{Future, IntoFuture}, marker::PhantomData, + num::NonZeroUsize, pin::Pin, sync::{atomic::Ordering, Arc, Mutex}, time::Duration, @@ -159,7 +160,7 @@ where IDSource: ProtocolIDSource, Backend: ShmProviderBackend, { - size: usize, + size: NonZeroUsize, provider_layout: MemoryLayout, provider: &'a ShmProvider, } @@ -185,6 +186,7 @@ where // Create layout for specified arguments let layout = MemoryLayout::new(data.size, data.alignment) .map_err(|_| ZLayoutError::IncorrectLayoutArgs)?; + let size = layout.size(); // Obtain provider's layout for our layout let provider_layout = data @@ -194,7 +196,7 @@ where .map_err(|_| ZLayoutError::ProviderIncompatibleLayout)?; Ok(Self { - size: data.size, + size, provider_layout, provider: data.provider, }) @@ -320,7 +322,7 @@ where let result = InnerPolicy::alloc(layout, provider); if let Err(ZAllocError::OutOfMemory) = result { // try to alloc again only if GC managed to reclaim big enough chunk - if provider.garbage_collect() >= layout.size() { + if provider.garbage_collect() >= layout.size().get() { return AltPolicy::alloc(layout, provider); } } @@ -352,7 +354,7 @@ where let result = InnerPolicy::alloc(layout, provider); if let Err(ZAllocError::NeedDefragment) = result { // try to alloc again only if big enough chunk was defragmented - if provider.defragment() >= layout.size() { + if provider.defragment() >= layout.size().get() { return AltPolicy::alloc(layout, provider); } } @@ -803,6 +805,8 @@ where /// Remember that chunk's len may be >= len! #[zenoh_macros::unstable_doc] pub fn map(&self, chunk: AllocatedChunk, len: usize) -> ZResult { + let len = len.try_into()?; + // allocate resources for SHM buffer let (allocated_header, allocated_watchdog, confirmed_watchdog) = Self::alloc_resources()?; @@ -837,7 +841,7 @@ where if is_free_chunk(maybe_free) { tracing::trace!("Garbage Collecting Chunk: {:?}", maybe_free); self.backend.free(&maybe_free.descriptor); - largest = largest.max(maybe_free.descriptor.len); + largest = largest.max(maybe_free.descriptor.len.get()); return false; } true @@ -868,7 +872,7 @@ where } } - fn alloc_inner(&self, size: usize, layout: &MemoryLayout) -> BufAllocResult + fn alloc_inner(&self, size: NonZeroUsize, layout: &MemoryLayout) -> BufAllocResult where Policy: AllocPolicy, { @@ -914,7 +918,7 @@ where fn wrap( &self, chunk: AllocatedChunk, - len: usize, + len: NonZeroUsize, allocated_header: AllocatedHeaderDescriptor, allocated_watchdog: AllocatedWatchdog, confirmed_watchdog: ConfirmedDescriptor, @@ -971,7 +975,7 @@ where { async fn alloc_inner_async( &self, - size: usize, + size: NonZeroUsize, backend_layout: &MemoryLayout, ) -> BufAllocResult where diff --git a/commons/zenoh-shm/src/api/provider/shm_provider_backend.rs b/commons/zenoh-shm/src/api/provider/shm_provider_backend.rs index 933940cac1..51795f5880 100644 --- a/commons/zenoh-shm/src/api/provider/shm_provider_backend.rs +++ b/commons/zenoh-shm/src/api/provider/shm_provider_backend.rs @@ -11,12 +11,9 @@ // Contributors: // ZettaScale Zenoh Team, // - -use zenoh_result::ZResult; - use super::{ chunk::ChunkDescriptor, - types::{ChunkAllocResult, MemoryLayout}, + types::{ChunkAllocResult, MemoryLayout, ZLayoutError}, }; /// The provider backend trait @@ -48,5 +45,5 @@ pub trait ShmProviderBackend { /// - validate, if the provided layout can be used with this backend /// - adopt the layout for backend capabilities #[zenoh_macros::unstable_doc] - fn layout_for(&self, layout: MemoryLayout) -> ZResult; + fn layout_for(&self, layout: MemoryLayout) -> Result; } diff --git a/commons/zenoh-shm/src/api/provider/types.rs b/commons/zenoh-shm/src/api/provider/types.rs index 603c4a481a..bb04dfa5fc 100644 --- a/commons/zenoh-shm/src/api/provider/types.rs +++ b/commons/zenoh-shm/src/api/provider/types.rs @@ -12,9 +12,7 @@ // ZettaScale Zenoh Team, // -use std::fmt::Display; - -use zenoh_result::{bail, ZResult}; +use std::{fmt::Display, num::NonZeroUsize}; use super::chunk::AllocatedChunk; use crate::api::buffer::zshmmut::ZShmMut; @@ -60,15 +58,24 @@ impl Default for AllocAlignment { } impl AllocAlignment { + /// Try to create a new AllocAlignment from alignment representation in powers of 2. + /// + /// # Errors + /// + /// This function will return an error if provided alignment power cannot fit into usize. #[zenoh_macros::unstable_doc] - pub fn new(pow: u8) -> Self { - Self { pow } + pub const fn new(pow: u8) -> Result { + match pow { + pow if pow < usize::BITS as u8 => Ok(Self { pow }), + _ => Err(ZLayoutError::IncorrectLayoutArgs), + } } /// Get alignment in normal units (bytes) #[zenoh_macros::unstable_doc] - pub fn get_alignment_value(&self) -> usize { - 1usize << self.pow + pub fn get_alignment_value(&self) -> NonZeroUsize { + // SAFETY: this is safe because we limit pow in new based on usize size + unsafe { NonZeroUsize::new_unchecked(1usize << self.pow) } } /// Align size according to inner alignment. @@ -78,17 +85,25 @@ impl AllocAlignment { /// ``` /// use zenoh_shm::api::provider::types::AllocAlignment; /// - /// let alignment = AllocAlignment::new(2); // 4-byte alignment - /// let initial_size: usize = 7; + /// let alignment = AllocAlignment::new(2).unwrap(); // 4-byte alignment + /// let initial_size = 7.try_into().unwrap(); /// let aligned_size = alignment.align_size(initial_size); - /// assert_eq!(aligned_size, 8); + /// assert_eq!(aligned_size.get(), 8); /// ``` #[zenoh_macros::unstable_doc] - pub fn align_size(&self, size: usize) -> usize { + pub fn align_size(&self, size: NonZeroUsize) -> NonZeroUsize { let alignment = self.get_alignment_value(); - match size % alignment { + match size.get() % alignment { 0 => size, - remainder => size + (alignment - remainder), + // SAFETY: + // This unsafe block is always safe: + // 1. 0 < remainder < alignment + // 2. because of 1, the value of (alignment.get() - remainder) is always > 0 + // 3. because of 2, we add nonzero size to nonzero (alignment.get() - remainder) and it is always positive if no overflow + // 4. we make sure that there is no overflow condition in 3 by means of alignment limitation in `new` by limiting pow value + remainder => unsafe { + NonZeroUsize::new_unchecked(size.get() + (alignment.get() - remainder)) + }, } } } @@ -97,7 +112,7 @@ impl AllocAlignment { #[zenoh_macros::unstable_doc] #[derive(Debug)] pub struct MemoryLayout { - size: usize, + size: NonZeroUsize, alignment: AllocAlignment, } @@ -111,18 +126,29 @@ impl Display for MemoryLayout { } impl MemoryLayout { - /// Try to create a new memory layout + /// Try to create a new memory layout. + /// + /// # Errors + /// + /// This function will return an error if zero size have passed or if the provided size is not the multiply of the alignment. #[zenoh_macros::unstable_doc] - pub fn new(size: usize, alignment: AllocAlignment) -> ZResult { + pub fn new(size: T, alignment: AllocAlignment) -> Result + where + T: TryInto, + { + let Ok(size) = size.try_into() else { + return Err(ZLayoutError::IncorrectLayoutArgs); + }; + // size of an allocation must be a miltiple of it's alignment! - match size % alignment.get_alignment_value() { + match size.get() % alignment.get_alignment_value() { 0 => Ok(Self { size, alignment }), - _ => bail!("size of an allocation must be a miltiple of it's alignment!"), + _ => Err(ZLayoutError::IncorrectLayoutArgs), } } #[zenoh_macros::unstable_doc] - pub fn size(&self) -> usize { + pub fn size(&self) -> NonZeroUsize { self.size } @@ -139,27 +165,23 @@ impl MemoryLayout { /// use zenoh_shm::api::provider::types::MemoryLayout; /// /// // 8 bytes with 4-byte alignment - /// let layout4b = MemoryLayout::new(8, AllocAlignment::new(2)).unwrap(); + /// let layout4b = MemoryLayout::new(8, AllocAlignment::new(2).unwrap()).unwrap(); /// /// // Try to realign with 2-byte alignment - /// let layout2b = layout4b.extend(AllocAlignment::new(1)); + /// let layout2b = layout4b.extend(AllocAlignment::new(1).unwrap()); /// assert!(layout2b.is_err()); // fails because new alignment must be >= old /// /// // Try to realign with 8-byte alignment - /// let layout8b = layout4b.extend(AllocAlignment::new(3)); + /// let layout8b = layout4b.extend(AllocAlignment::new(3).unwrap()); /// assert!(layout8b.is_ok()); // ok /// ``` #[zenoh_macros::unstable_doc] - pub fn extend(&self, new_alignment: AllocAlignment) -> ZResult { + pub fn extend(&self, new_alignment: AllocAlignment) -> Result { if self.alignment <= new_alignment { let new_size = new_alignment.align_size(self.size); return MemoryLayout::new(new_size, new_alignment); } - bail!( - "Cannot extend alignment form {} to {}: new alignment must be >= old!", - self.alignment, - new_alignment - ) + Err(ZLayoutError::IncorrectLayoutArgs) } } diff --git a/commons/zenoh-shm/src/lib.rs b/commons/zenoh-shm/src/lib.rs index 19f8a1c76f..8ec2458931 100644 --- a/commons/zenoh-shm/src/lib.rs +++ b/commons/zenoh-shm/src/lib.rs @@ -13,6 +13,7 @@ // use std::{ any::Any, + num::NonZeroUsize, sync::{ atomic::{AtomicPtr, Ordering}, Arc, @@ -62,7 +63,7 @@ pub struct ShmBufInfo { /// Actual data length /// NOTE: data_descriptor's len is >= of this len and describes the actual memory length /// dedicated in shared memory segment for this particular buffer. - pub data_len: usize, + pub data_len: NonZeroUsize, /// The watchdog descriptor pub watchdog_descriptor: Descriptor, @@ -76,7 +77,7 @@ impl ShmBufInfo { pub fn new( data_descriptor: ChunkDescriptor, shm_protocol: ProtocolID, - data_len: usize, + data_len: NonZeroUsize, watchdog_descriptor: Descriptor, header_descriptor: HeaderDescriptor, generation: u32, @@ -122,14 +123,10 @@ impl std::fmt::Debug for ShmBufInner { } impl ShmBufInner { - pub fn len(&self) -> usize { + pub fn len(&self) -> NonZeroUsize { self.info.data_len } - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - fn is_valid(&self) -> bool { self.header.header().generation.load(Ordering::SeqCst) == self.info.generation } @@ -156,7 +153,7 @@ impl ShmBufInner { fn as_slice(&self) -> &[u8] { tracing::trace!("ShmBufInner::as_slice() == len = {:?}", self.info.data_len); let bp = self.buf.load(Ordering::SeqCst); - unsafe { std::slice::from_raw_parts(bp, self.info.data_len) } + unsafe { std::slice::from_raw_parts(bp, self.info.data_len.get()) } } unsafe fn dec_ref_count(&self) { @@ -176,7 +173,7 @@ impl ShmBufInner { /// guarantee that your in applications only one process at the time will actually write. unsafe fn as_mut_slice_inner(&mut self) -> &mut [u8] { let bp = self.buf.load(Ordering::SeqCst); - std::slice::from_raw_parts_mut(bp, self.info.data_len) + std::slice::from_raw_parts_mut(bp, self.info.data_len.get()) } } diff --git a/examples/examples/z_alloc_shm.rs b/examples/examples/z_alloc_shm.rs index eceb74f35b..e96ca7dab1 100644 --- a/examples/examples/z_alloc_shm.rs +++ b/examples/examples/z_alloc_shm.rs @@ -54,7 +54,7 @@ async fn run() -> ZResult<()> { // OPTION: Allocation with custom alignment and alloc policy customization let _comprehensive = provider .alloc(512) - .with_alignment(AllocAlignment::new(2)) + .with_alignment(AllocAlignment::new(2).unwrap()) // for more examples on policies, please see allocation policy usage below (for layout allocation API) .with_policy::() .wait() @@ -63,7 +63,7 @@ async fn run() -> ZResult<()> { // OPTION: Allocation with custom alignment and async alloc policy let _async = provider .alloc(512) - .with_alignment(AllocAlignment::new(2)) + .with_alignment(AllocAlignment::new(2).unwrap()) // for more examples on policies, please see allocation policy usage below (for layout allocation API) .with_policy::>>() .await @@ -83,7 +83,7 @@ async fn run() -> ZResult<()> { // OPTION: Comprehensive configuration: let _comprehensive_layout = provider .alloc(512) - .with_alignment(AllocAlignment::new(2)) + .with_alignment(AllocAlignment::new(2).unwrap()) .into_layout() .unwrap(); diff --git a/io/zenoh-transport/src/unicast/establishment/ext/shm.rs b/io/zenoh-transport/src/unicast/establishment/ext/shm.rs index 1a6f272d42..e2068af94a 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/shm.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/shm.rs @@ -418,7 +418,13 @@ impl<'a> AcceptFsm for &'a ShmFsm<'a> { }; // Read Alice's SHM Segment - let alice_segment = AuthSegment::open(init_syn.alice_segment)?; + let alice_segment = match AuthSegment::open(init_syn.alice_segment) { + Ok(buff) => buff, + Err(e) => { + tracing::trace!("{} {}", S, e); + return Ok(None); + } + }; Ok(Some(alice_segment)) }