From 839ffe3017fdef97a33745619558dfea692774ca Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Thu, 18 Jul 2024 17:13:28 +0300 Subject: [PATCH] Drop the whole ZBuf in case of SHM error! --- io/zenoh-transport/src/multicast/rx.rs | 7 ++++++- io/zenoh-transport/src/shm.rs | 17 +++++------------ io/zenoh-transport/src/unicast/lowlatency/rx.rs | 7 ++++++- io/zenoh-transport/src/unicast/universal/rx.rs | 7 ++++++- 4 files changed, 23 insertions(+), 15 deletions(-) diff --git a/io/zenoh-transport/src/multicast/rx.rs b/io/zenoh-transport/src/multicast/rx.rs index 9a6cdb0d4d..d8a6aaeb02 100644 --- a/io/zenoh-transport/src/multicast/rx.rs +++ b/io/zenoh-transport/src/multicast/rx.rs @@ -13,6 +13,8 @@ // use std::sync::MutexGuard; +#[cfg(feature = "shared-memory")] +use tracing::error; use zenoh_core::{zlock, zread}; use zenoh_protocol::{ core::{Locator, Priority, Reliability}, @@ -44,7 +46,10 @@ impl TransportMulticastInner { #[cfg(feature = "shared-memory")] { if self.manager.config.multicast.is_shm { - crate::shm::map_zmsg_to_shmbuf(&mut msg, &self.manager.shmr)?; + if let Err(e) = crate::shm::map_zmsg_to_shmbuf(&mut msg, &self.manager.shmr) { + error!("Error receiving SHM buffer: {e}"); + return Ok(()); + } } } diff --git a/io/zenoh-transport/src/shm.rs b/io/zenoh-transport/src/shm.rs index fc3c75b6a8..c562e47135 100644 --- a/io/zenoh-transport/src/shm.rs +++ b/io/zenoh-transport/src/shm.rs @@ -13,7 +13,6 @@ // use std::collections::HashSet; -use tracing::error; use zenoh_buffers::{reader::HasReader, writer::HasWriter, ZBuf, ZSlice, ZSliceKind}; use zenoh_codec::{RCodec, WCodec, Zenoh080}; use zenoh_core::zerror; @@ -331,17 +330,11 @@ pub fn map_zslice_to_shmbuf(zslice: &mut ZSlice, shmr: &ShmReader) -> ZResult<() // Deserialize the shminfo let shmbinfo: ShmBufInfo = codec.read(&mut reader).map_err(|e| zerror!("{:?}", e))?; - // Try to mount shmbuf and replace the content of the slice with mounted buf - // NOTE: SHM buffer read error is not a hard error because we do not want to - // loose all the data in the whole ZBuf above. In case of error we just - // replace current ZSlice with an empty one - *zslice = match shmr.read_shmbuf(&shmbinfo) { - Ok(val) => val.into(), - Result::Err(e) => { - error!("{e}"); - vec![].into() - } - }; + // Mount shmbuf + let smb = shmr.read_shmbuf(&shmbinfo)?; + + // Replace the content of the slice + *zslice = smb.into(); Ok(()) } diff --git a/io/zenoh-transport/src/unicast/lowlatency/rx.rs b/io/zenoh-transport/src/unicast/lowlatency/rx.rs index c82e172c7b..0484a4a028 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/rx.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/rx.rs @@ -11,6 +11,8 @@ // Contributors: // ZettaScale Zenoh Team, // +#[cfg(feature = "shared-memory")] +use tracing::error; use zenoh_buffers::{ reader::{HasReader, Reader}, ZSlice, @@ -37,7 +39,10 @@ impl TransportUnicastLowlatency { #[cfg(feature = "shared-memory")] { if self.config.shm.is_some() { - crate::shm::map_zmsg_to_shmbuf(&mut msg, &self.manager.shmr)?; + if let Err(e) = crate::shm::map_zmsg_to_shmbuf(&mut msg, &self.manager.shmr) { + error!("Error receiving SHM buffer: {e}"); + return Ok(()); + } } } callback.handle_message(msg) diff --git a/io/zenoh-transport/src/unicast/universal/rx.rs b/io/zenoh-transport/src/unicast/universal/rx.rs index f97f29b0c7..71e674bdb8 100644 --- a/io/zenoh-transport/src/unicast/universal/rx.rs +++ b/io/zenoh-transport/src/unicast/universal/rx.rs @@ -13,6 +13,8 @@ // use std::sync::MutexGuard; +#[cfg(feature = "shared-memory")] +use tracing::error; use zenoh_core::{zlock, zread}; use zenoh_link::Link; use zenoh_protocol::{ @@ -45,7 +47,10 @@ impl TransportUnicastUniversal { #[cfg(feature = "shared-memory")] { if self.config.shm.is_some() { - crate::shm::map_zmsg_to_shmbuf(&mut msg, &self.manager.shmr)?; + if let Err(e) = crate::shm::map_zmsg_to_shmbuf(&mut msg, &self.manager.shmr) { + error!("Error receiving SHM buffer: {e}"); + return Ok(()); + } } } callback.handle_message(msg)