From dedf7924b2f4c798a775a2c0cc23cc1930ad5451 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Thu, 18 Jul 2024 15:34:50 +0300 Subject: [PATCH 1/8] Do not trigger transport error in case of SHM buffer invalidation --- io/zenoh-transport/src/shm.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/io/zenoh-transport/src/shm.rs b/io/zenoh-transport/src/shm.rs index 8450ad878e..a50e602157 100644 --- a/io/zenoh-transport/src/shm.rs +++ b/io/zenoh-transport/src/shm.rs @@ -13,6 +13,7 @@ // use std::collections::HashSet; +use tracing::error; use zenoh_buffers::{reader::HasReader, writer::HasWriter, ZBuf, ZSlice, ZSliceKind}; use zenoh_codec::{RCodec, WCodec, Zenoh080}; use zenoh_core::zerror; @@ -330,12 +331,17 @@ pub fn map_zslice_to_shmbuf(zslice: &mut ZSlice, shmr: &ShmReader) -> ZResult<() // Deserialize the shminfo let shmbinfo: ShmBufInfo = codec.read(&mut reader).map_err(|e| zerror!("{:?}", e))?; - // Mount shmbuf - let smb = shmr.read_shmbuf(&shmbinfo)?; - - // Replace the content of the slice - let zs: ZSlice = smb.into(); - *zslice = zs; + // Try to mount shmbuf and replace the content of the slice with mounted buf + // NOTE: SHM buffer read error is not a hard error becuse we do not want to + // loose all the data in the whole ZBuf above. In case of error we just + // replace current ZSlice with an empty one + *zslice = match shmr.read_shmbuf(&shmbinfo) { + Ok(val) => val.into(), + Result::Err(e) => { + error!("{e}"); + vec![].into() + } + }; Ok(()) } From 0c998d2df63e5a519f8977ca42f549bc411cc293 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Thu, 18 Jul 2024 15:43:51 +0300 Subject: [PATCH 2/8] Fix spelling --- io/zenoh-transport/src/shm.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/zenoh-transport/src/shm.rs b/io/zenoh-transport/src/shm.rs index a50e602157..fc3c75b6a8 100644 --- a/io/zenoh-transport/src/shm.rs +++ b/io/zenoh-transport/src/shm.rs @@ -332,7 +332,7 @@ pub fn map_zslice_to_shmbuf(zslice: &mut ZSlice, shmr: &ShmReader) -> ZResult<() let shmbinfo: ShmBufInfo = codec.read(&mut reader).map_err(|e| zerror!("{:?}", e))?; // Try to mount shmbuf and replace the content of the slice with mounted buf - // NOTE: SHM buffer read error is not a hard error becuse we do not want to + // NOTE: SHM buffer read error is not a hard error because we do not want to // loose all the data in the whole ZBuf above. In case of error we just // replace current ZSlice with an empty one *zslice = match shmr.read_shmbuf(&shmbinfo) { From 839ffe3017fdef97a33745619558dfea692774ca Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Thu, 18 Jul 2024 17:13:28 +0300 Subject: [PATCH 3/8] Drop the whole ZBuf in case of SHM error! --- io/zenoh-transport/src/multicast/rx.rs | 7 ++++++- io/zenoh-transport/src/shm.rs | 17 +++++------------ io/zenoh-transport/src/unicast/lowlatency/rx.rs | 7 ++++++- io/zenoh-transport/src/unicast/universal/rx.rs | 7 ++++++- 4 files changed, 23 insertions(+), 15 deletions(-) diff --git a/io/zenoh-transport/src/multicast/rx.rs b/io/zenoh-transport/src/multicast/rx.rs index 9a6cdb0d4d..d8a6aaeb02 100644 --- a/io/zenoh-transport/src/multicast/rx.rs +++ b/io/zenoh-transport/src/multicast/rx.rs @@ -13,6 +13,8 @@ // use std::sync::MutexGuard; +#[cfg(feature = "shared-memory")] +use tracing::error; use zenoh_core::{zlock, zread}; use zenoh_protocol::{ core::{Locator, Priority, Reliability}, @@ -44,7 +46,10 @@ impl TransportMulticastInner { #[cfg(feature = "shared-memory")] { if self.manager.config.multicast.is_shm { - crate::shm::map_zmsg_to_shmbuf(&mut msg, &self.manager.shmr)?; + if let Err(e) = crate::shm::map_zmsg_to_shmbuf(&mut msg, &self.manager.shmr) { + error!("Error receiving SHM buffer: {e}"); + return Ok(()); + } } } diff --git a/io/zenoh-transport/src/shm.rs b/io/zenoh-transport/src/shm.rs index fc3c75b6a8..c562e47135 100644 --- a/io/zenoh-transport/src/shm.rs +++ b/io/zenoh-transport/src/shm.rs @@ -13,7 +13,6 @@ // use std::collections::HashSet; -use tracing::error; use zenoh_buffers::{reader::HasReader, writer::HasWriter, ZBuf, ZSlice, ZSliceKind}; use zenoh_codec::{RCodec, WCodec, Zenoh080}; use zenoh_core::zerror; @@ -331,17 +330,11 @@ pub fn map_zslice_to_shmbuf(zslice: &mut ZSlice, shmr: &ShmReader) -> ZResult<() // Deserialize the shminfo let shmbinfo: ShmBufInfo = codec.read(&mut reader).map_err(|e| zerror!("{:?}", e))?; - // Try to mount shmbuf and replace the content of the slice with mounted buf - // NOTE: SHM buffer read error is not a hard error because we do not want to - // loose all the data in the whole ZBuf above. In case of error we just - // replace current ZSlice with an empty one - *zslice = match shmr.read_shmbuf(&shmbinfo) { - Ok(val) => val.into(), - Result::Err(e) => { - error!("{e}"); - vec![].into() - } - }; + // Mount shmbuf + let smb = shmr.read_shmbuf(&shmbinfo)?; + + // Replace the content of the slice + *zslice = smb.into(); Ok(()) } diff --git a/io/zenoh-transport/src/unicast/lowlatency/rx.rs b/io/zenoh-transport/src/unicast/lowlatency/rx.rs index c82e172c7b..0484a4a028 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/rx.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/rx.rs @@ -11,6 +11,8 @@ // Contributors: // ZettaScale Zenoh Team, // +#[cfg(feature = "shared-memory")] +use tracing::error; use zenoh_buffers::{ reader::{HasReader, Reader}, ZSlice, @@ -37,7 +39,10 @@ impl TransportUnicastLowlatency { #[cfg(feature = "shared-memory")] { if self.config.shm.is_some() { - crate::shm::map_zmsg_to_shmbuf(&mut msg, &self.manager.shmr)?; + if let Err(e) = crate::shm::map_zmsg_to_shmbuf(&mut msg, &self.manager.shmr) { + error!("Error receiving SHM buffer: {e}"); + return Ok(()); + } } } callback.handle_message(msg) diff --git a/io/zenoh-transport/src/unicast/universal/rx.rs b/io/zenoh-transport/src/unicast/universal/rx.rs index f97f29b0c7..71e674bdb8 100644 --- a/io/zenoh-transport/src/unicast/universal/rx.rs +++ b/io/zenoh-transport/src/unicast/universal/rx.rs @@ -13,6 +13,8 @@ // use std::sync::MutexGuard; +#[cfg(feature = "shared-memory")] +use tracing::error; use zenoh_core::{zlock, zread}; use zenoh_link::Link; use zenoh_protocol::{ @@ -45,7 +47,10 @@ impl TransportUnicastUniversal { #[cfg(feature = "shared-memory")] { if self.config.shm.is_some() { - crate::shm::map_zmsg_to_shmbuf(&mut msg, &self.manager.shmr)?; + if let Err(e) = crate::shm::map_zmsg_to_shmbuf(&mut msg, &self.manager.shmr) { + error!("Error receiving SHM buffer: {e}"); + return Ok(()); + } } } callback.handle_message(msg) From 4189c5c5ce1e53f1b7600cd1644df56bb3d200f6 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Sun, 21 Jul 2024 17:56:03 +0300 Subject: [PATCH 4/8] fix clippy errors --- io/zenoh-transport/src/multicast/rx.rs | 2 -- io/zenoh-transport/src/unicast/lowlatency/rx.rs | 2 -- io/zenoh-transport/src/unicast/universal/rx.rs | 2 -- 3 files changed, 6 deletions(-) diff --git a/io/zenoh-transport/src/multicast/rx.rs b/io/zenoh-transport/src/multicast/rx.rs index 0f2cc720a5..93dc3c727a 100644 --- a/io/zenoh-transport/src/multicast/rx.rs +++ b/io/zenoh-transport/src/multicast/rx.rs @@ -13,8 +13,6 @@ // use std::sync::MutexGuard; -#[cfg(feature = "shared-memory")] -use tracing::error; use zenoh_core::{zlock, zread}; use zenoh_protocol::{ core::{Locator, Priority, Reliability}, diff --git a/io/zenoh-transport/src/unicast/lowlatency/rx.rs b/io/zenoh-transport/src/unicast/lowlatency/rx.rs index 833a868e9f..3dd499000d 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/rx.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/rx.rs @@ -11,8 +11,6 @@ // Contributors: // ZettaScale Zenoh Team, // -#[cfg(feature = "shared-memory")] -use tracing::error; use zenoh_buffers::{ reader::{HasReader, Reader}, ZSlice, diff --git a/io/zenoh-transport/src/unicast/universal/rx.rs b/io/zenoh-transport/src/unicast/universal/rx.rs index 6834cd5f57..afd8e114d7 100644 --- a/io/zenoh-transport/src/unicast/universal/rx.rs +++ b/io/zenoh-transport/src/unicast/universal/rx.rs @@ -13,8 +13,6 @@ // use std::sync::MutexGuard; -#[cfg(feature = "shared-memory")] -use tracing::error; use zenoh_core::{zlock, zread}; use zenoh_link::Link; use zenoh_protocol::{ From 4c8575d571b28892bbf6ed3f7490ed6487cb6e2a Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Sun, 21 Jul 2024 17:57:01 +0300 Subject: [PATCH 5/8] Fix misaligned memory access bug (affects non-64 bit ARM) --- commons/zenoh-shm/src/posix_shm/segment.rs | 22 ++++++++----------- .../src/unicast/establishment/ext/shm.rs | 17 +++++++++----- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/commons/zenoh-shm/src/posix_shm/segment.rs b/commons/zenoh-shm/src/posix_shm/segment.rs index 5458ab3e3e..657976ece1 100644 --- a/commons/zenoh-shm/src/posix_shm/segment.rs +++ b/commons/zenoh-shm/src/posix_shm/segment.rs @@ -12,10 +12,7 @@ // ZettaScale Zenoh Team, // -use std::{ - fmt::{Debug, Display}, - mem::size_of, -}; +use std::fmt::{Debug, Display}; use rand::Rng; use shared_memory::{Shmem, ShmemConf, ShmemError}; @@ -63,7 +60,7 @@ where // If creation fails because segment already exists for this id, // the creation attempt will be repeated with another id match ShmemConf::new() - .size(alloc_size + size_of::()) + .size(alloc_size) .os_id(Self::os_id(id.clone(), id_prefix)) .create() { @@ -71,7 +68,6 @@ where tracing::debug!( "Created SHM segment, size: {alloc_size}, prefix: {id_prefix}, id: {id}" ); - unsafe { *(shmem.as_ptr() as *mut usize) = alloc_size }; return Ok(Segment { shmem, id }); } Err(ShmemError::LinkExists) => {} @@ -94,10 +90,6 @@ where ) })?; - if shmem.len() <= size_of::() { - bail!("SHM segment too small") - } - tracing::debug!("Opened SHM segment, prefix: {id_prefix}, id: {id}"); Ok(Self { shmem, id }) @@ -110,17 +102,21 @@ where } pub fn as_ptr(&self) -> *mut u8 { - unsafe { self.shmem.as_ptr().add(size_of::()) } + self.shmem.as_ptr() } + /// Returns the length of this [`Segment`]. + /// NOTE: one some platforms (at least windows) the returned len will be the actual length of an shm segment + /// (a required len rounded up to the nearest multiply of page size), on other (at least linux and macos) this + /// returns a value requested upon segment creation pub fn len(&self) -> usize { - unsafe { *(self.shmem.as_ptr() as *mut usize) } + self.shmem.len() } // TODO: dead code warning occurs because of `tested_crate_module!()` macro when feature `test` is not enabled. Better to fix that #[allow(dead_code)] pub fn is_empty(&self) -> bool { - unsafe { *(self.shmem.as_ptr() as *mut usize) == 0 } + self.len() == 0 } pub fn id(&self) -> ID { diff --git a/io/zenoh-transport/src/unicast/establishment/ext/shm.rs b/io/zenoh-transport/src/unicast/establishment/ext/shm.rs index e2068af94a..025aaaef44 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/shm.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/shm.rs @@ -36,6 +36,10 @@ const AUTH_SEGMENT_PREFIX: &str = "auth"; pub(crate) type AuthSegmentID = u32; pub(crate) type AuthChallenge = u64; +const LEN_INDEX: usize = 0; +const CHALLENGE_INDEX: usize = 1; +const ID_START_INDEX: usize = 2; + #[derive(Debug)] pub struct AuthSegment { array: ArrayInSHM, @@ -44,13 +48,14 @@ pub struct AuthSegment { impl AuthSegment { pub fn create(challenge: AuthChallenge, shm_protocols: &[ProtocolID]) -> ZResult { let array = ArrayInSHM::::create( - 1 + shm_protocols.len(), + ID_START_INDEX + shm_protocols.len(), AUTH_SEGMENT_PREFIX, )?; unsafe { - (*array.elem_mut(0)) = challenge; - for elem in 1..array.elem_count() { - (*array.elem_mut(elem)) = shm_protocols[elem - 1] as u64; + (*array.elem_mut(LEN_INDEX)) = shm_protocols.len() as AuthChallenge; + (*array.elem_mut(CHALLENGE_INDEX)) = challenge; + for elem in ID_START_INDEX..array.elem_count() { + (*array.elem_mut(elem)) = shm_protocols[elem - ID_START_INDEX] as u64; } }; Ok(Self { array }) @@ -62,12 +67,12 @@ impl AuthSegment { } pub fn challenge(&self) -> AuthChallenge { - unsafe { *self.array.elem(0) } + unsafe { *self.array.elem(CHALLENGE_INDEX) } } pub fn protocols(&self) -> Vec { let mut result = vec![]; - for elem in 1..self.array.elem_count() { + for elem in ID_START_INDEX..self.array.elem_count() { result.push(unsafe { *self.array.elem(elem) as u32 }); } result From 1cbf809b1f1a6c5dc78f18cb6a51ef3eafe96efb Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Wed, 24 Jul 2024 11:44:31 +0300 Subject: [PATCH 6/8] fix tests to be platform-agnostic --- commons/zenoh-shm/tests/posix_array.rs | 22 ++++++------- commons/zenoh-shm/tests/posix_segment.rs | 40 +++++++++++++----------- 2 files changed, 33 insertions(+), 29 deletions(-) diff --git a/commons/zenoh-shm/tests/posix_array.rs b/commons/zenoh-shm/tests/posix_array.rs index 562102ea17..83fdad88fb 100644 --- a/commons/zenoh-shm/tests/posix_array.rs +++ b/commons/zenoh-shm/tests/posix_array.rs @@ -41,25 +41,25 @@ impl TestElem { } fn validate_array( - array1: &mut ArrayInSHM, - array2: &ArrayInSHM, + created_array: &mut ArrayInSHM, + opened_array: &ArrayInSHM, expected_elem_count: usize, ) where ElemIndex: Unsigned + PrimInt + 'static + AsPrimitive, isize: AsPrimitive, usize: AsPrimitive, { - assert!(array1.elem_count() == expected_elem_count); - assert!(array2.elem_count() == expected_elem_count); + assert!(created_array.elem_count() == expected_elem_count); + assert!(opened_array.elem_count() >= expected_elem_count); let mut fill_ctr = 0; let mut validate_ctr = 0; // first of all, fill and validate elements sequentially - for i in 0..array1.elem_count() { + for i in 0..expected_elem_count { unsafe { - let elem1 = &mut *array1.elem_mut(i.as_()); - let elem2 = &*array2.elem(i.as_()); + let elem1 = &mut *created_array.elem_mut(i.as_()); + let elem2 = &*opened_array.elem(i.as_()); elem1.fill(&mut fill_ctr); elem2.validate(&mut validate_ctr); @@ -67,17 +67,17 @@ fn validate_array( } // then fill all the elements... - for i in 0..array1.elem_count() { + for i in 0..expected_elem_count { unsafe { - let elem1 = &mut *array1.elem_mut(i.as_()); + let elem1 = &mut *created_array.elem_mut(i.as_()); elem1.fill(&mut fill_ctr); } } // ...and validate all the elements - for i in 0..array2.elem_count() { + for i in 0..expected_elem_count { unsafe { - let elem2 = &*array2.elem(i.as_()); + let elem2 = &*opened_array.elem(i.as_()); elem2.validate(&mut validate_ctr); } } diff --git a/commons/zenoh-shm/tests/posix_segment.rs b/commons/zenoh-shm/tests/posix_segment.rs index 094ae40a85..63bbe663cd 100644 --- a/commons/zenoh-shm/tests/posix_segment.rs +++ b/commons/zenoh-shm/tests/posix_segment.rs @@ -19,18 +19,22 @@ use zenoh_shm::posix_shm::segment::Segment; pub mod common; use common::{validate_memory, TEST_SEGMENT_PREFIX}; -fn validate_segment(segment1: &Segment, segment2: &Segment) -where +fn validate_segment( + created_segment: &Segment, + opened_segment: &Segment, + expected_elem_count: usize, +) where rand::distributions::Standard: rand::distributions::Distribution, ID: Clone + Display, { - assert!(segment1.len() == segment2.len()); + assert!(created_segment.len() == expected_elem_count); + assert!(opened_segment.len() >= expected_elem_count); - let ptr1 = segment1.as_ptr(); - let ptr2 = segment2.as_ptr(); + let ptr1 = created_segment.as_ptr(); + let ptr2 = opened_segment.as_ptr(); - let slice1 = unsafe { slice::from_raw_parts_mut(ptr1, segment1.len()) }; - let slice2 = unsafe { slice::from_raw_parts(ptr2, segment2.len()) }; + let slice1 = unsafe { slice::from_raw_parts_mut(ptr1, expected_elem_count) }; + let slice2 = unsafe { slice::from_raw_parts(ptr2, expected_elem_count) }; validate_memory(slice1, slice2); } @@ -40,22 +44,22 @@ where rand::distributions::Standard: rand::distributions::Distribution, ID: Copy + Clone + Display, { - let new_segment: Segment = + let created_segment: Segment = Segment::create(900, TEST_SEGMENT_PREFIX).expect("error creating new segment"); - let opened_segment_instance_1 = Segment::open(new_segment.id(), TEST_SEGMENT_PREFIX) + let opened_segment_instance_1 = Segment::open(created_segment.id(), TEST_SEGMENT_PREFIX) .expect("error opening existing segment!"); - validate_segment(&new_segment, &opened_segment_instance_1); + validate_segment(&created_segment, &opened_segment_instance_1); - let opened_segment_instance_2 = Segment::open(new_segment.id(), TEST_SEGMENT_PREFIX) + let opened_segment_instance_2 = Segment::open(created_segment.id(), TEST_SEGMENT_PREFIX) .expect("error opening existing segment!"); - validate_segment(&new_segment, &opened_segment_instance_1); - validate_segment(&new_segment, &opened_segment_instance_2); + validate_segment(&created_segment, &opened_segment_instance_1); + validate_segment(&created_segment, &opened_segment_instance_2); drop(opened_segment_instance_1); - validate_segment(&new_segment, &opened_segment_instance_2); + validate_segment(&created_segment, &opened_segment_instance_2); } /// UNSIGNED /// @@ -116,19 +120,19 @@ fn segment_i128_id() { #[test] fn segment_open() { - let new_segment: Segment = + let created_segment: Segment = Segment::create(900, TEST_SEGMENT_PREFIX).expect("error creating new segment"); - let _opened_segment = Segment::open(new_segment.id(), TEST_SEGMENT_PREFIX) + let _opened_segment = Segment::open(created_segment.id(), TEST_SEGMENT_PREFIX) .expect("error opening existing segment!"); } #[test] fn segment_open_error() { let id = { - let new_segment: Segment = + let created_segment: Segment = Segment::create(900, TEST_SEGMENT_PREFIX).expect("error creating new segment"); - new_segment.id() + created_segment.id() }; let _opened_segment = Segment::open(id, TEST_SEGMENT_PREFIX) From b545f72df0f20cecac4382ef508d6aa91e658af5 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Wed, 24 Jul 2024 11:49:53 +0300 Subject: [PATCH 7/8] Update posix_segment.rs --- commons/zenoh-shm/tests/posix_segment.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/commons/zenoh-shm/tests/posix_segment.rs b/commons/zenoh-shm/tests/posix_segment.rs index 63bbe663cd..cc3a92e9e5 100644 --- a/commons/zenoh-shm/tests/posix_segment.rs +++ b/commons/zenoh-shm/tests/posix_segment.rs @@ -44,19 +44,21 @@ where rand::distributions::Standard: rand::distributions::Distribution, ID: Copy + Clone + Display, { + let elem_count = 900; + let created_segment: Segment = - Segment::create(900, TEST_SEGMENT_PREFIX).expect("error creating new segment"); + Segment::create(elem_count, TEST_SEGMENT_PREFIX).expect("error creating new segment"); let opened_segment_instance_1 = Segment::open(created_segment.id(), TEST_SEGMENT_PREFIX) .expect("error opening existing segment!"); - validate_segment(&created_segment, &opened_segment_instance_1); + validate_segment(&created_segment, &opened_segment_instance_1, elem_count); let opened_segment_instance_2 = Segment::open(created_segment.id(), TEST_SEGMENT_PREFIX) .expect("error opening existing segment!"); - validate_segment(&created_segment, &opened_segment_instance_1); - validate_segment(&created_segment, &opened_segment_instance_2); + validate_segment(&created_segment, &opened_segment_instance_1, elem_count); + validate_segment(&created_segment, &opened_segment_instance_2, elem_count); drop(opened_segment_instance_1); validate_segment(&created_segment, &opened_segment_instance_2); From 0564fd8dedc364b47b364d2b8b0569a286b2de40 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Wed, 24 Jul 2024 11:51:46 +0300 Subject: [PATCH 8/8] Update posix_segment.rs --- commons/zenoh-shm/tests/posix_segment.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/commons/zenoh-shm/tests/posix_segment.rs b/commons/zenoh-shm/tests/posix_segment.rs index cc3a92e9e5..879fccf298 100644 --- a/commons/zenoh-shm/tests/posix_segment.rs +++ b/commons/zenoh-shm/tests/posix_segment.rs @@ -61,7 +61,7 @@ where validate_segment(&created_segment, &opened_segment_instance_2, elem_count); drop(opened_segment_instance_1); - validate_segment(&created_segment, &opened_segment_instance_2); + validate_segment(&created_segment, &opened_segment_instance_2, elem_count); } /// UNSIGNED ///