From e76243b08b86156b43628ec969447a91d38a1d98 Mon Sep 17 00:00:00 2001 From: yellowhatter <104833606+yellowhatter@users.noreply.github.com> Date: Thu, 6 Mar 2025 11:49:30 +0300 Subject: [PATCH] Migrate SHM (#1798) * WIP on SHM migration * posix implementation * tests for shm api abstraction * - move advisory locking functionality to shm segment class - some good refactoring * fix lints * fix workspace clippy * handle flock errors more precisely * use advisory-lock-based persistency suppression only on linux, mac has (and always had) some issues to fix * Update unix.rs * Update unix.rs * Update unix.rs * remove unsupported index for shm segment * Update shm.rs * rough windows support for shm * Update unix.rs * - format fix - disable some shm tests only for macos * add missing win_sys * fix errors on win * Update unix.rs and windows.rs * - get rid of some unwrap - fix clippy - take new elem_count() behavior into account (elem_count may be bigger then the one requested upon segment creation) * Update shm.rs * Update shm.rs * Update array.rs and shm.rs * Update array.rs * Update array.rs * Get real segment size for created POSIX segment * fix bug with SHM protocol handshaking * Update unix.rs and windows.rs * fix posix segment tests * Update Cargo.lock, Cargo.toml, and 2 more files... * Update windows.rs * fix posix shm provider tests * Update windows.rs * Update posix_shm_provider_backend.rs, windows.rs, and posix_shm_provider.rs * fix allocator test * WIP on mac persistency problem (flock() doesn't always work on mac tmpfs, so need other bsd-specific mechanism) * Make BSD and non-BSD implementation for shared memory cleanup * Add config aliases to properly detect BSD systems * - fix CI - add comments * Update unix.rs * Apply advisory locking on non-tmpfs for BSD * fix issues on mac * Update unix.rs * Update unix.rs * Update windows.rs * Update unix.rs and windows.rs * - review fixes - fix segment collision from different runs * Update shm.rs * Update shm.rs * Review fixes --- Cargo.lock | 56 +--- Cargo.toml | 3 +- commons/zenoh-shm/Cargo.toml | 12 +- commons/zenoh-shm/build.rs | 49 ++++ .../posix/posix_shm_provider_backend.rs | 13 +- .../posix/posix_shm_segment.rs | 6 +- commons/zenoh-shm/src/lib.rs | 1 + commons/zenoh-shm/src/metadata/segment.rs | 8 +- commons/zenoh-shm/src/posix_shm/array.rs | 39 ++- commons/zenoh-shm/src/posix_shm/cleanup.rs | 17 +- commons/zenoh-shm/src/posix_shm/mod.rs | 2 - commons/zenoh-shm/src/posix_shm/segment.rs | 130 +++------- .../zenoh-shm/src/posix_shm/segment_lock.rs | 135 ---------- .../zenoh-shm/src/posix_shm/struct_in_shm.rs | 31 +-- commons/zenoh-shm/src/shm/mod.rs | 79 ++++++ commons/zenoh-shm/src/shm/unix.rs | 241 ++++++++++++++++++ commons/zenoh-shm/src/shm/windows.rs | 149 +++++++++++ commons/zenoh-shm/tests/common/mod.rs | 2 - commons/zenoh-shm/tests/posix_array.rs | 26 +- commons/zenoh-shm/tests/posix_segment.rs | 81 ++---- commons/zenoh-shm/tests/posix_shm_provider.rs | 35 ++- commons/zenoh-shm/tests/shm.rs | 95 +++++++ .../src/unicast/establishment/ext/shm.rs | 19 +- 23 files changed, 795 insertions(+), 434 deletions(-) create mode 100644 commons/zenoh-shm/build.rs delete mode 100644 commons/zenoh-shm/src/posix_shm/segment_lock.rs create mode 100644 commons/zenoh-shm/src/shm/mod.rs create mode 100644 commons/zenoh-shm/src/shm/unix.rs create mode 100644 commons/zenoh-shm/src/shm/windows.rs create mode 100644 commons/zenoh-shm/tests/shm.rs diff --git a/Cargo.lock b/Cargo.lock index a111a92bca..fc640213e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -966,21 +966,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcb25d077389e53838a8158c8e99174c5a9d902dee4904320db714f3c653ffba" -[[package]] -name = "crc" -version = "3.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69e6e4d7b33a94f0991c26729976b10ebde1d34c3ee82408fb536164fa10d636" -dependencies = [ - "crc-catalog", -] - -[[package]] -name = "crc-catalog" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" - [[package]] name = "criterion" version = "0.5.1" @@ -2164,15 +2149,6 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" -[[package]] -name = "memoffset" -version = "0.6.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce" -dependencies = [ - "autocfg", -] - [[package]] name = "memoffset" version = "0.7.1" @@ -2258,19 +2234,6 @@ dependencies = [ "getrandom 0.2.15", ] -[[package]] -name = "nix" -version = "0.23.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f3790c00a0150112de0f4cd161e3d7fc4b2d8a5542ffc35f099a2562aecb35c" -dependencies = [ - "bitflags 1.3.2", - "cc", - "cfg-if 1.0.0", - "libc", - "memoffset 0.6.5", -] - [[package]] name = "nix" version = "0.26.4" @@ -3732,19 +3695,6 @@ dependencies = [ "lazy_static", ] -[[package]] -name = "shared_memory" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba8593196da75d9dc4f69349682bd4c2099f8cde114257d1ef7ef1b33d1aba54" -dependencies = [ - "cfg-if 1.0.0", - "libc", - "nix 0.23.2", - "rand 0.8.5", - "win-sys", -] - [[package]] name = "shellexpand" version = "3.1.0" @@ -5737,18 +5687,20 @@ version = "1.2.1" dependencies = [ "advisory-lock", "async-trait", - "crc", + "cfg_aliases 0.2.1", "crossbeam-queue", "libc", + "nix 0.29.0", "num-traits", "num_cpus", "rand 0.8.5", - "shared_memory", "stabby", "static_init", "thread-priority", "tokio", "tracing", + "win-sys", + "winapi", "zenoh-buffers", "zenoh-core", "zenoh-macros", diff --git a/Cargo.toml b/Cargo.toml index 2b93d472d6..828a2dd711 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,7 +90,6 @@ bytes = "1.7.1" clap = { version = "4.5.17", features = ["derive"] } console-subscriber = "0.4.0" const_format = "0.2.33" -crc = "3.2.1" criterion = "0.5" crossbeam-utils = "0.8.20" crossbeam-queue = "0.3.12" @@ -163,7 +162,6 @@ serde_yaml = "0.9.34" static_init = "1.0.3" stabby = "36.1.1" sha3 = "0.10.8" -shared_memory = "0.12.4" shellexpand = "3.1.0" socket2 = { version = "0.5.7", features = ["all"] } stop-token = "0.7.0" @@ -188,6 +186,7 @@ uuid = { version = "1.10.0", default-features = false, features = [ validated_struct = "2.1.0" vec_map = "0.8.2" webpki-roots = "0.26.5" +win-sys = "0.3" winapi = { version = "0.3.9", features = ["iphlpapi", "winerror"] } x509-parser = "0.16.0" z-serial = "0.3.1" diff --git a/commons/zenoh-shm/Cargo.toml b/commons/zenoh-shm/Cargo.toml index 9039f7cf0c..297a9c8aa4 100644 --- a/commons/zenoh-shm/Cargo.toml +++ b/commons/zenoh-shm/Cargo.toml @@ -33,15 +33,13 @@ test = ["num_cpus"] [dependencies] async-trait = { workspace = true } -crc = { workspace = true } tracing = { workspace = true } -shared_memory = { workspace = true } tokio = { workspace = true } zenoh-result = { workspace = true } zenoh-core = { workspace = true } zenoh-macros = { workspace = true } zenoh-buffers = { workspace = true } -rand = { workspace = true } +rand = { workspace = true, features = ["std", "std_rng"] } static_init = { workspace = true } num-traits = { workspace = true } num_cpus = { workspace = true, optional = true } @@ -51,6 +49,14 @@ crossbeam-queue = { workspace = true } [target.'cfg(unix)'.dependencies] advisory-lock = { workspace = true } +nix = { workspace = true, features = ["fs", "mman"] } + +[target.'cfg(windows)'.dependencies] +win-sys = { workspace = true } +winapi = { workspace = true } [dev-dependencies] libc = { workspace = true } + +[build-dependencies] +cfg_aliases = "0.2.1" \ No newline at end of file diff --git a/commons/zenoh-shm/build.rs b/commons/zenoh-shm/build.rs new file mode 100644 index 0000000000..2502b031c0 --- /dev/null +++ b/commons/zenoh-shm/build.rs @@ -0,0 +1,49 @@ +// +// Copyright (c) 2025 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use cfg_aliases::cfg_aliases; + +fn main() { + // these aliases should at least be included in the same aliases of Nix crate: + // ___________________ + // | | + // | Nix aliases | + // | ___________ | + // | | Our | | + // | | aliases | | + // | |_________| | + // |_________________| + cfg_aliases! { + dragonfly: { target_os = "dragonfly" }, + ios: { target_os = "ios" }, + freebsd: { target_os = "freebsd" }, + macos: { target_os = "macos" }, + netbsd: { target_os = "netbsd" }, + openbsd: { target_os = "openbsd" }, + watchos: { target_os = "watchos" }, + tvos: { target_os = "tvos" }, + visionos: { target_os = "visionos" }, + + apple_targets: { any(ios, macos, watchos, tvos, visionos) }, + bsd: { any(freebsd, dragonfly, netbsd, openbsd, apple_targets) }, + + // we use this alias to detect platforms that + // don't support advisory file locking on tmpfs + shm_external_lockfile: { any(bsd, target_os = "redox") }, + } + + println!("cargo:rustc-check-cfg=cfg(apple_targets)"); + println!("cargo:rustc-check-cfg=cfg(bsd)"); + println!("cargo:rustc-check-cfg=cfg(shm_external_lockfile)"); +} diff --git a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend.rs b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend.rs index bee2510997..5e20835f03 100644 --- a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend.rs +++ b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend.rs @@ -133,21 +133,28 @@ impl PosixShmProviderBackend { fn new(layout: &MemoryLayout) -> ZResult { let segment = PosixShmSegment::create(layout.size())?; + // because of platform specific, our shm segment is >= requested size, so in order to utilize + // additional memory we re-layout the size + let real_size = segment.segment.elem_count().get(); + let aligned_size = (real_size + - (real_size % layout.alignment().get_alignment_value().get())) + .try_into()?; + let mut free_list = BinaryHeap::new(); let root_chunk = Chunk { offset: 0, - size: layout.size(), + size: aligned_size, }; free_list.push(root_chunk); tracing::trace!( - "Created PosixShmProviderBackend id {}, layout {:?}", + "Created PosixShmProviderBackend id {}, layout {:?}, aligned size {aligned_size}", segment.segment.id(), layout ); Ok(Self { - available: AtomicUsize::new(layout.size().get()), + available: AtomicUsize::new(aligned_size.get()), segment, free_list: Mutex::new(free_list), alignment: layout.alignment(), diff --git a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_segment.rs b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_segment.rs index 3a08d2be55..9051852583 100644 --- a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_segment.rs +++ b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_segment.rs @@ -24,8 +24,6 @@ use crate::{ posix_shm::array::ArrayInSHM, }; -const POSIX_SHM_SEGMENT_PREFIX: &str = "posix_shm_provider_segment"; - #[derive(Debug)] pub(crate) struct PosixShmSegment { pub(crate) segment: ArrayInSHM, @@ -33,12 +31,12 @@ pub(crate) struct PosixShmSegment { impl PosixShmSegment { pub(crate) fn create(alloc_size: NonZeroUsize) -> ZResult { - let segment = ArrayInSHM::create(alloc_size.get(), POSIX_SHM_SEGMENT_PREFIX)?; + let segment = ArrayInSHM::create(alloc_size)?; Ok(Self { segment }) } pub(crate) fn open(id: SegmentID) -> ZResult { - let segment = ArrayInSHM::open(id, POSIX_SHM_SEGMENT_PREFIX)?; + let segment = ArrayInSHM::open(id)?; Ok(Self { segment }) } } diff --git a/commons/zenoh-shm/src/lib.rs b/commons/zenoh-shm/src/lib.rs index b2ef6989a9..4dc0c0e872 100644 --- a/commons/zenoh-shm/src/lib.rs +++ b/commons/zenoh-shm/src/lib.rs @@ -60,6 +60,7 @@ pub mod posix_shm; pub mod reader; pub mod version; pub mod watchdog; +tested_crate_module!(shm); /// Information about a [`ShmBufInner`]. /// diff --git a/commons/zenoh-shm/src/metadata/segment.rs b/commons/zenoh-shm/src/metadata/segment.rs index a138a7a634..65d14d1d75 100644 --- a/commons/zenoh-shm/src/metadata/segment.rs +++ b/commons/zenoh-shm/src/metadata/segment.rs @@ -19,12 +19,10 @@ use zenoh_result::ZResult; use super::descriptor::{MetadataIndex, MetadataSegmentID}; use crate::{header::chunk_header::ChunkHeaderType, posix_shm::struct_in_shm::StructInSHM}; -const METADATA_SEGMENT_PREFIX: &str = "metadata"; - #[stabby::stabby] pub struct Metadata { headers: [ChunkHeaderType; S], - watchdogs: [AtomicU64; S], // todo: replace with (S + 63) / 64 when Rust supports it + watchdogs: [AtomicU64; S], // TODO: replace with (S + 63) / 64 when Rust supports it } impl Metadata { @@ -59,12 +57,12 @@ pub struct MetadataSegment { impl MetadataSegment { pub fn create() -> ZResult { - let data = StructInSHM::create(METADATA_SEGMENT_PREFIX)?; + let data = StructInSHM::create()?; Ok(Self { data }) } pub fn open(id: MetadataSegmentID) -> ZResult { - let data = StructInSHM::open(id, METADATA_SEGMENT_PREFIX)?; + let data = StructInSHM::open(id)?; Ok(Self { data }) } } diff --git a/commons/zenoh-shm/src/posix_shm/array.rs b/commons/zenoh-shm/src/posix_shm/array.rs index d092c579b5..23008eba49 100644 --- a/commons/zenoh-shm/src/posix_shm/array.rs +++ b/commons/zenoh-shm/src/posix_shm/array.rs @@ -12,20 +12,21 @@ // ZettaScale Zenoh Team, // -use std::{fmt::Display, marker::PhantomData, mem::size_of}; +use std::{marker::PhantomData, mem::size_of, num::NonZeroUsize}; use num_traits::{AsPrimitive, PrimInt, Unsigned}; use stabby::IStable; use zenoh_result::{bail, ZResult}; use super::segment::Segment; +use crate::shm; /// An SHM segment that is intended to be an array of elements of some certain type #[derive(Debug)] pub struct ArrayInSHM where rand::distributions::Standard: rand::distributions::Distribution, - ID: Clone + Display, + ID: shm::SegmentID, { inner: Segment, _phantom: PhantomData<(Elem, ElemIndex)>, @@ -34,20 +35,20 @@ where unsafe impl Sync for ArrayInSHM where rand::distributions::Standard: rand::distributions::Distribution, - ID: Clone + Display, + ID: shm::SegmentID, { } unsafe impl Send for ArrayInSHM where rand::distributions::Standard: rand::distributions::Distribution, - ID: Clone + Display, + ID: shm::SegmentID, { } impl ArrayInSHM where rand::distributions::Standard: rand::distributions::Distribution, - ID: Clone + Display, + ID: shm::SegmentID, ElemIndex: Unsigned + PrimInt + 'static + AsPrimitive, Elem: IStable, isize: AsPrimitive, @@ -57,26 +58,22 @@ where panic!("Elem is a ZST. ZSTs are not allowed as ArrayInSHM generic"); }; - pub fn create(elem_count: usize, file_prefix: &str) -> ZResult { - if elem_count == 0 { - bail!("Unable to create SHM array segment of 0 elements") - } - + pub fn create(elem_count: NonZeroUsize) -> ZResult { let max: usize = ElemIndex::max_value().as_(); - if elem_count - 1 > max { + if elem_count.get() - 1 > max { bail!("Unable to create SHM array segment of {elem_count} elements: out of range for ElemIndex!") } - let alloc_size = elem_count * size_of::(); - let inner = Segment::create(alloc_size, file_prefix)?; + let alloc_size = NonZeroUsize::try_from(elem_count.get() * size_of::())?; + let inner = Segment::create(alloc_size)?; Ok(Self { inner, _phantom: PhantomData, }) } - pub fn open(id: ID, file_prefix: &str) -> ZResult { - let inner = Segment::open(id, file_prefix)?; + pub fn open(id: ID) -> ZResult { + let inner = Segment::open(id)?; Ok(Self { inner, _phantom: PhantomData, @@ -87,8 +84,10 @@ where self.inner.id() } - pub fn elem_count(&self) -> usize { - self.inner.len() / size_of::() + pub fn elem_count(&self) -> NonZeroUsize { + let max: usize = ElemIndex::max_value().as_(); + let actual = self.inner.len().get() / size_of::(); + unsafe { NonZeroUsize::new_unchecked(std::cmp::min(max.saturating_add(1), actual)) } } /// # Safety @@ -96,7 +95,7 @@ where /// Additional assert to check the index validity is added for "test" feature pub unsafe fn elem(&self, index: ElemIndex) -> *const Elem { #[cfg(feature = "test")] - assert!(self.inner.len() > index.as_() * size_of::()); + assert!(self.inner.len().get() > index.as_() * size_of::()); (self.inner.as_ptr() as *const Elem).add(index.as_()) } @@ -105,7 +104,7 @@ where /// Additional assert to check the index validity is added for "test" feature pub unsafe fn elem_mut(&self, index: ElemIndex) -> *mut Elem { #[cfg(feature = "test")] - assert!(self.inner.len() > index.as_() * size_of::()); + assert!(self.inner.len().get() > index.as_() * size_of::()); (self.inner.as_ptr() as *mut Elem).add(index.as_()) } @@ -117,7 +116,7 @@ where #[cfg(feature = "test")] { assert!(index >= 0); - assert!(self.inner.len() > index as usize * size_of::()); + assert!(self.inner.len().get() > index as usize * size_of::()); } index.as_() } diff --git a/commons/zenoh-shm/src/posix_shm/cleanup.rs b/commons/zenoh-shm/src/posix_shm/cleanup.rs index f7f6256c78..0f97efa864 100644 --- a/commons/zenoh-shm/src/posix_shm/cleanup.rs +++ b/commons/zenoh-shm/src/posix_shm/cleanup.rs @@ -25,7 +25,7 @@ mod platform { use zenoh_result::ZResult; - use crate::posix_shm::segment_lock::unix::ExclusiveShmLock; + use crate::shm; pub(crate) fn cleanup_orphaned_segments() { if let Err(e) = cleanup_orphaned_segments_inner() { @@ -34,6 +34,9 @@ mod platform { } fn cleanup_orphaned_segments_inner() -> ZResult<()> { + #[cfg(shm_external_lockfile)] + let shm_files = fs::read_dir(std::env::temp_dir())?; + #[cfg(not(shm_external_lockfile))] let shm_files = fs::read_dir("/dev/shm")?; for segment_file in shm_files.filter_map(Result::ok).filter(|f| { @@ -42,10 +45,14 @@ mod platform { } false }) { - let os_id = segment_file.file_name(); - if let Ok(lock) = ExclusiveShmLock::open_exclusive(&os_id) { - let _ = std::fs::remove_file(segment_file.path()); - drop(lock); + if let Some(Some(id_str)) = segment_file + .path() + .file_stem() + .map(|os_str| os_str.to_str()) + { + if let Ok(id) = id_str.parse::() { + shm::Segment::ensure_not_persistent(id); + } } } diff --git a/commons/zenoh-shm/src/posix_shm/mod.rs b/commons/zenoh-shm/src/posix_shm/mod.rs index bc553d008c..cb103f03d1 100644 --- a/commons/zenoh-shm/src/posix_shm/mod.rs +++ b/commons/zenoh-shm/src/posix_shm/mod.rs @@ -13,8 +13,6 @@ // pub mod array; -#[cfg(target_os = "linux")] -pub(crate) mod segment_lock; pub mod struct_in_shm; tested_crate_module!(segment); pub(crate) mod cleanup; diff --git a/commons/zenoh-shm/src/posix_shm/segment.rs b/commons/zenoh-shm/src/posix_shm/segment.rs index 9eeff20dbd..76090c02c9 100644 --- a/commons/zenoh-shm/src/posix_shm/segment.rs +++ b/commons/zenoh-shm/src/posix_shm/segment.rs @@ -12,57 +12,33 @@ // ZettaScale Zenoh Team, // -use std::fmt::{Debug, Display}; +use std::{fmt::Debug, num::NonZeroUsize}; use rand::Rng; -use shared_memory::{Shmem, ShmemConf, ShmemError}; -use zenoh_result::{bail, zerror, ZResult}; +use zenoh_result::{bail, ZResult}; -#[cfg(target_os = "linux")] -use super::segment_lock::unix::{ExclusiveShmLock, ShmLock}; -use crate::cleanup::CLEANUP; +use crate::{cleanup::CLEANUP, shm}; const SEGMENT_DEDICATE_TRIES: usize = 100; -const ECMA: crc::Crc = crc::Crc::::new(&crc::CRC_64_ECMA_182); /// Segment of shared memory identified by an ID pub struct Segment where rand::distributions::Standard: rand::distributions::Distribution, - ID: Clone + Display, + ID: shm::SegmentID, { - shmem: Shmem, // <-------------| - id: ID, // | - #[cfg(target_os = "linux")] // | location of these two fields matters! - _lock: Option, // <---| -} - -#[cfg(target_os = "linux")] -impl Drop for Segment -where - rand::distributions::Standard: rand::distributions::Distribution, - ID: Clone + Display, -{ - fn drop(&mut self) { - if let Some(lock) = self._lock.take() { - if let Ok(_exclusive) = std::convert::TryInto::::try_into(lock) { - // in case if we are the last holder of this segment make ourselves owner to unlink it - self.shmem.set_owner(true); - } - } - } + shmem: shm::Segment, } impl Debug for Segment where ID: Debug, rand::distributions::Standard: rand::distributions::Distribution, - ID: Clone + Display, + ID: shm::SegmentID, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Segment") .field("shmem", &self.shmem.as_ptr()) - .field("id", &self.id) .finish() } } @@ -70,92 +46,52 @@ where impl Segment where rand::distributions::Standard: rand::distributions::Distribution, - ID: Clone + Display, + ID: shm::SegmentID, { // Automatically generate free id and create a new segment identified by this id - pub fn create(alloc_size: usize, id_prefix: &str) -> ZResult { + pub fn create(len: NonZeroUsize) -> ZResult { for _ in 0..SEGMENT_DEDICATE_TRIES { // Generate random id let id: ID = rand::thread_rng().gen(); - let os_id = Self::os_id(id.clone(), id_prefix); - - #[cfg(target_os = "linux")] - // Create lock to indicate that segment is managed - let lock = { - match ShmLock::create(&os_id) { - Ok(lock) => lock, - Err(_) => continue, - } - }; // Register cleanup routine to make sure Segment will be unlinked on exit - let c_os_id = os_id.clone(); CLEANUP.read().register_cleanup(Box::new(move || { - if let Ok(mut shmem) = ShmemConf::new().os_id(c_os_id).open() { - shmem.set_owner(true); - drop(shmem); - } + shm::Segment::ensure_not_persistent(id); })); // Try to create a new segment identified by prefix and generated id. // If creation fails because segment already exists for this id, // the creation attempt will be repeated with another id - match ShmemConf::new().size(alloc_size).os_id(os_id).create() { + match shm::Segment::create(id, len) { Ok(shmem) => { - tracing::debug!( - "Created SHM segment, size: {alloc_size}, prefix: {id_prefix}, id: {id}" - ); - #[cfg(target_os = "linux")] - let shmem = { - let mut shmem = shmem; - shmem.set_owner(false); - shmem - }; - return Ok(Segment { - shmem, - id, - #[cfg(target_os = "linux")] - _lock: Some(lock), - }); + tracing::debug!("Created SHM segment, len: {len}, id: {id}"); + return Ok(Segment { shmem }); + } + Err(shm::SegmentCreateError::SegmentExists) => {} + Err(shm::SegmentCreateError::OsError(e)) => { + bail!("Unable to create POSIX shm segment: {}", e) } - Err(ShmemError::LinkExists) => {} - Err(ShmemError::MappingIdExists) => {} - Err(e) => bail!("Unable to create POSIX shm segment: {}", e), } } bail!("Unable to dedicate POSIX shm segment file after {SEGMENT_DEDICATE_TRIES} tries!"); } // Open an existing segment identified by id - pub fn open(id: ID, id_prefix: &str) -> ZResult { - let os_id = Self::os_id(id.clone(), id_prefix); - - #[cfg(target_os = "linux")] - // Open lock to indicate that segment is managed - let lock = ShmLock::open(&os_id)?; - + pub fn open(id: ID) -> ZResult { // Open SHM segment - let shmem = ShmemConf::new().os_id(os_id).open().map_err(|e| { - zerror!( - "Error opening POSIX shm segment id {id}, prefix: {id_prefix}: {}", - e - ) - })?; - - tracing::debug!("Opened SHM segment, prefix: {id_prefix}, id: {id}"); + let shmem = match shm::Segment::open(id) { + Ok(val) => val, + Err(shm::SegmentOpenError::InvalidatedSegment) => { + bail!("Unable to open POSIX shm segment: segment is invalid!"); + } + Err(shm::SegmentOpenError::OsError(e)) => { + bail!("Unable to create POSIX shm segment: {}", e); + } + }; - Ok(Self { - shmem, - id, - #[cfg(target_os = "linux")] - _lock: Some(lock), - }) - } + tracing::debug!("Opened SHM segment, id: {id}"); - fn os_id(id: ID, id_prefix: &str) -> String { - let os_id_str = format!("{id_prefix}_{id}"); - let crc_os_id_str = ECMA.checksum(os_id_str.as_bytes()); - format!("{:x}.zenoh", crc_os_id_str) + Ok(Self { shmem }) } pub fn as_ptr(&self) -> *mut u8 { @@ -166,17 +102,11 @@ where /// NOTE: one some platforms (at least windows) the returned len will be the actual length of an shm segment /// (a required len rounded up to the nearest multiply of page size), on other (at least linux and macos) this /// returns a value requested upon segment creation - pub fn len(&self) -> usize { + pub fn len(&self) -> NonZeroUsize { self.shmem.len() } - // TODO: dead code warning occurs because of `tested_crate_module!()` macro when feature `test` is not enabled. Better to fix that - #[allow(dead_code)] - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - pub fn id(&self) -> ID { - self.id.clone() + self.shmem.id() } } diff --git a/commons/zenoh-shm/src/posix_shm/segment_lock.rs b/commons/zenoh-shm/src/posix_shm/segment_lock.rs deleted file mode 100644 index ea5fcdd495..0000000000 --- a/commons/zenoh-shm/src/posix_shm/segment_lock.rs +++ /dev/null @@ -1,135 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -#[cfg(unix)] -pub(crate) mod unix { - use std::{ - ffi::OsStr, - fs::{File, OpenOptions}, - path::PathBuf, - }; - - use advisory_lock::{AdvisoryFileLock, FileLockMode}; - use zenoh_result::ZResult; - - #[repr(transparent)] - pub(crate) struct ShmLock(LockInner); - - impl Drop for ShmLock { - fn drop(&mut self) { - if AdvisoryFileLock::try_lock(&self.0._tempfile, FileLockMode::Exclusive).is_ok() { - let _ = std::fs::remove_file(self.0.path.clone()); - } - } - } - - impl ShmLock { - pub(crate) fn create(os_id: &T) -> ZResult - where - T: ?Sized + AsRef, - { - // calculate tempfile path - let path = tmp_file_path(os_id); - - // create tempfile just to lock it - let tempfile = OpenOptions::new() - .write(true) - .create_new(true) - .open(path.clone())?; - - // lock tempfile with shared lock to indicate that file is managed - AdvisoryFileLock::try_lock(&tempfile, FileLockMode::Shared)?; - - Ok(Self(LockInner { - path, - _tempfile: tempfile, - })) - } - - pub(crate) fn open(os_id: &T) -> ZResult - where - T: ?Sized + AsRef, - { - // calculate tempfile path - let path = tmp_file_path(os_id); - - // open tempfile just to lock it - let tempfile = OpenOptions::new().read(true).open(path.clone())?; - - // lock tempfile with shared lock to indicate that file is managed - AdvisoryFileLock::try_lock(&tempfile, FileLockMode::Shared)?; - - Ok(Self(LockInner { - path, - _tempfile: tempfile, - })) - } - } - - #[repr(transparent)] - pub(crate) struct ExclusiveShmLock(LockInner); - - impl ExclusiveShmLock { - pub(crate) fn open_exclusive(os_id: &T) -> ZResult - where - T: ?Sized + AsRef, - { - // calculate tempfile path - let path = tmp_file_path(os_id); - - // create or open tempfile just to lock it - let tempfile = OpenOptions::new() - .write(true) - .truncate(false) - .create(true) - .open(path.clone())?; - - // lock tempfile with exclusive lock to guarantee that the file is unmanaged - AdvisoryFileLock::try_lock(&tempfile, FileLockMode::Exclusive)?; - - Ok(Self(LockInner { - path, - _tempfile: tempfile, - })) - } - } - - impl Drop for ExclusiveShmLock { - fn drop(&mut self) { - let _ = std::fs::remove_file(self.0.path.clone()); - } - } - - impl TryFrom for ExclusiveShmLock { - type Error = (); - - fn try_from(value: ShmLock) -> Result { - if AdvisoryFileLock::try_lock(&value.0._tempfile, FileLockMode::Exclusive).is_ok() { - return Ok(unsafe { core::mem::transmute::(value) }); - } - Err(()) - } - } - - struct LockInner { - path: PathBuf, - _tempfile: File, - } - - fn tmp_file_path(os_id: &T) -> PathBuf - where - T: ?Sized + AsRef, - { - std::env::temp_dir().join(PathBuf::from(os_id)) - } -} diff --git a/commons/zenoh-shm/src/posix_shm/struct_in_shm.rs b/commons/zenoh-shm/src/posix_shm/struct_in_shm.rs index 009bac8e0e..39a99d6714 100644 --- a/commons/zenoh-shm/src/posix_shm/struct_in_shm.rs +++ b/commons/zenoh-shm/src/posix_shm/struct_in_shm.rs @@ -13,9 +13,9 @@ // use std::{ - fmt::Display, marker::PhantomData, mem::size_of, + num::NonZeroUsize, ops::{Deref, DerefMut}, }; @@ -23,13 +23,14 @@ use std::{ use zenoh_result::ZResult; use super::segment::Segment; +use crate::shm; /// An SHM segment that contains data structure #[derive(Debug)] pub struct StructInSHM where rand::distributions::Standard: rand::distributions::Distribution, - ID: Clone + Display, + ID: shm::SegmentID, { inner: Segment, _phantom: PhantomData, @@ -38,38 +39,38 @@ where unsafe impl Sync for StructInSHM where rand::distributions::Standard: rand::distributions::Distribution, - ID: Clone + Display, + ID: shm::SegmentID, { } unsafe impl Send for StructInSHM where rand::distributions::Standard: rand::distributions::Distribution, - ID: Clone + Display, + ID: shm::SegmentID, { } impl StructInSHM where rand::distributions::Standard: rand::distributions::Distribution, - // Elem: IStable, // todo: stabby does not support IStable for big arrays - ID: Clone + Display, + // Elem: IStable, // TODO: stabby does not support IStable for big arrays + ID: shm::SegmentID, { // Perform compile time check that Elem is not a ZST const _S: () = if size_of::() == 0 { panic!("Elem is a ZST. ZSTs are not allowed"); }; - pub fn create(file_prefix: &str) -> ZResult { - let alloc_size = size_of::(); - let inner = Segment::create(alloc_size, file_prefix)?; + pub fn create() -> ZResult { + let alloc_size = NonZeroUsize::try_from(size_of::())?; + let inner = Segment::create(alloc_size)?; Ok(Self { inner, _phantom: PhantomData, }) } - pub fn open(id: ID, file_prefix: &str) -> ZResult { - let inner = Segment::open(id, file_prefix)?; + pub fn open(id: ID) -> ZResult { + let inner = Segment::open(id)?; Ok(Self { inner, _phantom: PhantomData, @@ -89,8 +90,8 @@ where impl Deref for StructInSHM where rand::distributions::Standard: rand::distributions::Distribution, - // Elem: IStable, // todo: stabby does not support IStable for big arrays - ID: Clone + Display, + // Elem: IStable, // TODO: stabby does not support IStable for big arrays + ID: shm::SegmentID, { type Target = Elem; @@ -102,8 +103,8 @@ where impl DerefMut for StructInSHM where rand::distributions::Standard: rand::distributions::Distribution, - // Elem: IStable, // todo: stabby does not support IStable for big arrays - ID: Clone + Display, + // Elem: IStable, // TODO: stabby does not support IStable for big arrays + ID: shm::SegmentID, { fn deref_mut(&mut self) -> &mut Self::Target { unsafe { &mut *(self.inner.as_ptr() as *mut Elem) } diff --git a/commons/zenoh-shm/src/shm/mod.rs b/commons/zenoh-shm/src/shm/mod.rs new file mode 100644 index 0000000000..f56fa8fb38 --- /dev/null +++ b/commons/zenoh-shm/src/shm/mod.rs @@ -0,0 +1,79 @@ +// +// Copyright (c) 2025 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{fmt::Display, num::NonZeroUsize, ops::Deref}; + +use num_traits::Unsigned; + +#[cfg(target_os = "windows")] +mod windows; +#[cfg(target_os = "windows")] +use windows as platform; +#[cfg(any(target_os = "freebsd", target_os = "linux", target_os = "macos"))] +pub mod unix; +#[cfg(any(target_os = "freebsd", target_os = "linux", target_os = "macos"))] +pub use unix as platform; +#[cfg(all( + not(target_os = "windows"), + not(any(target_os = "freebsd", target_os = "linux", target_os = "macos")) +))] +compile_error!("shared_memory isnt implemented for this platform..."); + +#[derive(Debug)] +pub enum SegmentCreateError { + SegmentExists, + OsError(u32), +} + +#[derive(Debug)] +pub enum SegmentOpenError { + OsError(u32), + InvalidatedSegment, +} + +pub type ShmCreateResult = core::result::Result; +pub type ShmOpenResult = core::result::Result; + +pub trait SegmentID: Unsigned + Display + Copy + Send + 'static {} +impl SegmentID for T {} + +pub struct Segment { + inner: platform::SegmentImpl, +} + +impl Segment { + pub fn create(id: ID, len: NonZeroUsize) -> ShmCreateResult { + let inner = platform::SegmentImpl::create(id, len)?; + Ok(Self { inner }) + } + + pub fn open(id: ID) -> ShmOpenResult { + let inner = platform::SegmentImpl::open(id)?; + Ok(Self { inner }) + } + + pub fn ensure_not_persistent(id: ID) { + if let Err(err) = platform::SegmentImpl::open(id) { + tracing::trace!("Error cleaning up segment: {:?}", err); + } + } +} + +impl Deref for Segment { + type Target = platform::SegmentImpl; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} diff --git a/commons/zenoh-shm/src/shm/unix.rs b/commons/zenoh-shm/src/shm/unix.rs new file mode 100644 index 0000000000..baa03d85b0 --- /dev/null +++ b/commons/zenoh-shm/src/shm/unix.rs @@ -0,0 +1,241 @@ +// +// Copyright (c) 2025 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#[cfg(shm_external_lockfile)] +use std::os::fd::FromRawFd; +use std::{ + ffi::c_void, + num::NonZeroUsize, + os::fd::{AsRawFd, OwnedFd}, + ptr::NonNull, +}; + +use advisory_lock::{AdvisoryFileLock, FileLockMode}; +#[cfg(shm_external_lockfile)] +use nix::fcntl::open; +use nix::{ + fcntl::OFlag, + sys::{ + mman::{mmap, munmap, shm_open, shm_unlink, MapFlags, ProtFlags}, + stat::{fstat, Mode}, + }, + unistd::ftruncate, +}; + +use super::{SegmentCreateError, SegmentID, SegmentOpenError, ShmCreateResult, ShmOpenResult}; + +pub struct SegmentImpl { + lock_fd: OwnedFd, + len: NonZeroUsize, + data_ptr: NonNull, + id: ID, +} + +// PUBLIC +impl SegmentImpl { + pub fn create(id: ID, len: NonZeroUsize) -> ShmCreateResult { + // we use separate lockfile on non-tmpfs for bsd + #[cfg(shm_external_lockfile)] + let lock_fd = unsafe { + OwnedFd::from_raw_fd({ + let lockpath = std::env::temp_dir().join(Self::id_str(id)); + let flags = OFlag::O_CREAT | OFlag::O_EXCL | OFlag::O_RDWR; + let mode = Mode::S_IRUSR | Mode::S_IWUSR; + open(&lockpath, flags, mode).map_err(|e| match e { + nix::Error::EEXIST => SegmentCreateError::SegmentExists, + e => SegmentCreateError::OsError(e as u32), + }) + }?) + }; + + // create unique shm fd + let fd = { + let id = Self::id_str(id); + let flags = OFlag::O_CREAT | OFlag::O_EXCL | OFlag::O_RDWR; + + // TODO: these flags probably can be exposed to the config + let mode = Mode::S_IRUSR | Mode::S_IWUSR; + + tracing::trace!("shm_open(name={}, flag={:?}, mode={:?})", id, flags, mode); + match shm_open(id.as_str(), flags, mode) { + Ok(v) => v, + Err(nix::Error::EEXIST) => return Err(SegmentCreateError::SegmentExists), + Err(e) => return Err(SegmentCreateError::OsError(e as u32)), + } + }; + + // on non-bsd we use our SHM file also for locking + #[cfg(not(shm_external_lockfile))] + let lock_fd = fd; + #[cfg(not(shm_external_lockfile))] + let fd = &lock_fd; + + #[cfg(shm_external_lockfile)] + let fd = &fd; + + // put shared advisory lock on lock_fd + lock_fd + .as_raw_fd() + .try_lock(FileLockMode::Shared) + .map_err(|e| match e { + advisory_lock::FileLockError::AlreadyLocked => SegmentCreateError::SegmentExists, + advisory_lock::FileLockError::Io(e) => { + SegmentCreateError::OsError(e.raw_os_error().unwrap_or(0) as _) + } + })?; + + // resize shm segment to requested size + tracing::trace!("ftruncate(fd={}, len={})", fd.as_raw_fd(), len); + ftruncate(fd, len.get() as _).map_err(|e| SegmentCreateError::OsError(e as u32))?; + + // get real segment size + let len = { + let stat = fstat(fd.as_raw_fd()).map_err(|e| SegmentCreateError::OsError(e as u32))?; + NonZeroUsize::new(stat.st_size as usize).ok_or(SegmentCreateError::OsError(0))? + }; + + // map segment into our address space + let data_ptr = Self::map(len, fd).map_err(|e| SegmentCreateError::OsError(e as _))?; + + Ok(Self { + lock_fd, + len, + data_ptr, + id, + }) + } + + pub fn open(id: ID) -> ShmOpenResult { + // we use separate lockfile on non-tmpfs for bsd + #[cfg(shm_external_lockfile)] + let lock_fd = unsafe { + OwnedFd::from_raw_fd({ + let lockpath = std::env::temp_dir().join(Self::id_str(id)); + let flags = OFlag::O_RDWR; + let mode = Mode::S_IRUSR | Mode::S_IWUSR; + open(&lockpath, flags, mode).map_err(|e| SegmentOpenError::OsError(e as _)) + }?) + }; + + // open shm fd + let fd = { + let id = Self::id_str(id); + let flags = OFlag::O_RDWR; + + // TODO: these flags probably can be exposed to the config + let mode = Mode::S_IRUSR | Mode::S_IWUSR; + + tracing::trace!("shm_open(name={}, flag={:?}, mode={:?})", id, flags, mode); + match shm_open(id.as_str(), flags, mode) { + Ok(v) => v, + Err(e) => return Err(SegmentOpenError::OsError(e as u32)), + } + }; + + // on non-bsd we use our SHM file also for locking + #[cfg(not(shm_external_lockfile))] + let lock_fd = fd; + #[cfg(not(shm_external_lockfile))] + let fd = &lock_fd; + + #[cfg(shm_external_lockfile)] + let fd = &fd; + + // put shared advisory lock on lock_fd + lock_fd + .as_raw_fd() + .try_lock(FileLockMode::Shared) + .map_err(|e| match e { + advisory_lock::FileLockError::AlreadyLocked => SegmentOpenError::InvalidatedSegment, + advisory_lock::FileLockError::Io(e) => { + SegmentOpenError::OsError(e.raw_os_error().unwrap_or(0) as _) + } + })?; + + // get real segment size + let len = { + let stat = fstat(fd.as_raw_fd()).map_err(|e| SegmentOpenError::OsError(e as u32))?; + NonZeroUsize::new(stat.st_size as usize).ok_or(SegmentOpenError::InvalidatedSegment)? + }; + + // map segment into our address space + let data_ptr = Self::map(len, fd).map_err(|e| SegmentOpenError::OsError(e as _))?; + + Ok(Self { + lock_fd, + len, + data_ptr, + id, + }) + } + + pub fn id(&self) -> ID { + self.id + } + + pub fn len(&self) -> NonZeroUsize { + self.len + } + + pub fn as_ptr(&self) -> *mut u8 { + self.data_ptr.as_ptr() as _ + } +} + +// PRIVATE +impl SegmentImpl { + fn id_str(id: ID) -> String { + format!("{id}.zenoh") + } + + fn map(len: NonZeroUsize, fd: &OwnedFd) -> nix::Result> { + let prot = ProtFlags::PROT_READ | ProtFlags::PROT_WRITE; + let flags = MapFlags::MAP_SHARED; + + tracing::trace!( + "mmap(addr=NULL, length={}, prot={:X}, flags={:X}, f={}, offset=0)", + len, + prot, + flags, + fd.as_raw_fd() + ); + + unsafe { mmap(None, len, prot, flags, fd, 0) } + } +} + +impl Drop for SegmentImpl { + fn drop(&mut self) { + tracing::trace!("munmap(addr={:p},len={})", self.data_ptr, self.len); + if let Err(e) = unsafe { munmap(self.data_ptr, self.len.get()) } { + tracing::debug!("munmap() failed : {}", e); + }; + + if self + .lock_fd + .as_raw_fd() + .try_lock(FileLockMode::Exclusive) + .is_ok() + { + let id = Self::id_str(self.id); + tracing::trace!("shm_unlink(name={})", id); + let _ = shm_unlink(id.as_str()); + #[cfg(shm_external_lockfile)] + { + let lockpath = std::env::temp_dir().join(id); + let _ = std::fs::remove_file(lockpath); + } + } + } +} diff --git a/commons/zenoh-shm/src/shm/windows.rs b/commons/zenoh-shm/src/shm/windows.rs new file mode 100644 index 0000000000..0b4d3f5bab --- /dev/null +++ b/commons/zenoh-shm/src/shm/windows.rs @@ -0,0 +1,149 @@ +// +// Copyright (c) 2025 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::num::NonZeroUsize; + +use win_sys::*; +use winapi::um::errhandlingapi::GetLastError; + +use super::{SegmentCreateError, SegmentID, SegmentOpenError, ShmCreateResult, ShmOpenResult}; + +pub struct SegmentImpl { + _fd: FileMapping, + len: NonZeroUsize, + data_ptr: ViewOfFile, + id: ID, +} + +// PUBLIC +impl SegmentImpl { + pub fn create(id: ID, len: NonZeroUsize) -> ShmCreateResult { + let fd = { + let id = Self::id_str(id); + let high_size = ((len.get() as u64 & 0xFFFF_FFFF_0000_0000_u64) >> 32) as _; + let low_size = (len.get() as u64 & 0xFFFF_FFFF_u64) as _; + tracing::trace!( + "CreateFileMapping({:?}, NULL, {:X}, {}, {}, '{}')", + INVALID_HANDLE_VALUE, + PAGE_READWRITE.0, + high_size, + low_size, + id, + ); + + // If the mapping already exists, GetLastError() will return ERROR_ALREADY_EXISTS, + // and you'll receive a handle to the existing mapping instead of creating a new one. + let fd = CreateFileMapping( + INVALID_HANDLE_VALUE, + None, + PAGE_READWRITE, + high_size, + low_size, + id.as_str(), + ) + .map_err(|e| match e.win32_error().unwrap() { + ERROR_ALREADY_EXISTS => SegmentCreateError::SegmentExists, + err_code => SegmentCreateError::OsError(err_code.0 as _), + })?; + + // check error + if unsafe { GetLastError() } == ERROR_ALREADY_EXISTS.0 { + return Err(SegmentCreateError::SegmentExists); + } + + fd + }; + + let (data_ptr, len) = + Self::map(&fd).map_err(|e| SegmentCreateError::OsError(e.win32_error().unwrap().0))?; + + let len = len + .try_into() + .map_err(|_e| SegmentCreateError::OsError(0))?; + + Ok(Self { + _fd: fd, + len, + data_ptr, + id, + }) + } + + pub fn open(id: ID) -> ShmOpenResult { + let fd = { + let id = Self::id_str(id); + tracing::trace!( + "OpenFileMappingW({:?}, {}, '{}')", + FILE_MAP_ALL_ACCESS, + false, + id, + ); + + OpenFileMapping(FILE_MAP_ALL_ACCESS, false, id.as_str()) + .map_err(|e| SegmentOpenError::OsError(e.win32_error().unwrap().0)) + }?; + + let (data_ptr, len) = + Self::map(&fd).map_err(|e| SegmentOpenError::OsError(e.win32_error().unwrap().0))?; + + let len = len + .try_into() + .map_err(|_| SegmentOpenError::InvalidatedSegment)?; + + Ok(Self { + _fd: fd, + len, + data_ptr, + id, + }) + } + + pub fn id(&self) -> ID { + self.id + } + + pub fn len(&self) -> NonZeroUsize { + self.len + } + + pub fn as_ptr(&self) -> *mut u8 { + self.data_ptr.as_mut_ptr() as _ + } +} + +// PRIVATE +impl SegmentImpl { + fn id_str(id: ID) -> String { + format!("{}.zenoh", id) + } + + fn map(fd: &FileMapping) -> Result<(ViewOfFile, usize), Error> { + let data_ptr = { + tracing::trace!( + "MapViewOfFile(0x{:X}, {:X}, 0, 0, 0)", + fd, + (FILE_MAP_READ | FILE_MAP_WRITE).0, + ); + MapViewOfFile(fd.as_handle(), FILE_MAP_READ | FILE_MAP_WRITE, 0, 0, 0) + }?; + + let len = { + let mut info = MEMORY_BASIC_INFORMATION::default(); + VirtualQuery(data_ptr.as_mut_ptr(), &mut info)?; + info.RegionSize + }; + + Ok((data_ptr, len)) + } +} diff --git a/commons/zenoh-shm/tests/common/mod.rs b/commons/zenoh-shm/tests/common/mod.rs index e1d4222b0e..73ebef4fca 100644 --- a/commons/zenoh-shm/tests/common/mod.rs +++ b/commons/zenoh-shm/tests/common/mod.rs @@ -19,8 +19,6 @@ use std::{ use zenoh_result::ZResult; -pub const TEST_SEGMENT_PREFIX: &str = "test"; - pub fn validate_memory(mem1: &mut [u8], mem2: &[u8]) { assert!(mem1.len() == mem2.len()); for cycle in 0..255u8 { diff --git a/commons/zenoh-shm/tests/posix_array.rs b/commons/zenoh-shm/tests/posix_array.rs index 2837f44f79..47736b846a 100644 --- a/commons/zenoh-shm/tests/posix_array.rs +++ b/commons/zenoh-shm/tests/posix_array.rs @@ -12,13 +12,12 @@ // ZettaScale Zenoh Team, // -use std::{fmt::Debug, mem::size_of}; +use std::{fmt::Debug, mem::size_of, num::NonZeroUsize}; use num_traits::{AsPrimitive, PrimInt, Unsigned}; use zenoh_shm::posix_shm::array::ArrayInSHM; pub mod common; -use common::TEST_SEGMENT_PREFIX; type TestSegmentID = u32; @@ -43,20 +42,21 @@ impl TestElem { fn validate_array( created_array: &mut ArrayInSHM, opened_array: &ArrayInSHM, - expected_elem_count: usize, + expected_elem_count: NonZeroUsize, ) where ElemIndex: Unsigned + PrimInt + 'static + AsPrimitive, isize: AsPrimitive, usize: AsPrimitive, { - assert!(created_array.elem_count() == expected_elem_count); + assert!(created_array.elem_count() >= expected_elem_count); assert!(opened_array.elem_count() >= expected_elem_count); + assert!(opened_array.elem_count() == created_array.elem_count()); let mut fill_ctr = 0; let mut validate_ctr = 0; // first of all, fill and validate elements sequentially - for i in 0..expected_elem_count { + for i in 0..created_array.elem_count().get() { unsafe { let elem1 = &mut *created_array.elem_mut(i.as_()); let elem2 = &*opened_array.elem(i.as_()); @@ -67,7 +67,7 @@ fn validate_array( } // then fill all the elements... - for i in 0..expected_elem_count { + for i in 0..created_array.elem_count().get() { unsafe { let elem1 = &mut *created_array.elem_mut(i.as_()); elem1.fill(&mut fill_ctr); @@ -75,7 +75,7 @@ fn validate_array( } // ...and validate all the elements - for i in 0..expected_elem_count { + for i in 0..opened_array.elem_count().get() { unsafe { let elem2 = &*opened_array.elem(i.as_()); elem2.validate(&mut validate_ctr); @@ -92,18 +92,19 @@ where // Estimate elem count to test // NOTE: for index sizes <= 16 bit we use the whole index range to test, // and for bigger indexes we use limited index range - let elem_count = { + let elem_count = NonZeroUsize::new({ match size_of::() > size_of::() { true => 100, false => ElemIndex::max_value().as_() + 1, } - }; + }) + .unwrap(); let mut new_arr: ArrayInSHM = - ArrayInSHM::create(elem_count, TEST_SEGMENT_PREFIX).expect("error creating new array!"); + ArrayInSHM::create(elem_count).expect("error creating new array!"); let opened_arr: ArrayInSHM<_, TestElem, ElemIndex> = - ArrayInSHM::open(new_arr.id(), TEST_SEGMENT_PREFIX).expect("error opening existing array!"); + ArrayInSHM::open(new_arr.id()).expect("error opening existing array!"); validate_array(&mut new_arr, &opened_arr, elem_count); } @@ -135,8 +136,7 @@ where let invalid_elem_count = ElemIndex::max_value().as_() + 2; let _ = ArrayInSHM::::create( - invalid_elem_count, - TEST_SEGMENT_PREFIX, + invalid_elem_count.try_into().unwrap(), ) .expect_err( format!("must fail: element count {invalid_elem_count} is out of range for ElemIndex!") diff --git a/commons/zenoh-shm/tests/posix_segment.rs b/commons/zenoh-shm/tests/posix_segment.rs index 879fccf298..274bdd4917 100644 --- a/commons/zenoh-shm/tests/posix_segment.rs +++ b/commons/zenoh-shm/tests/posix_segment.rs @@ -12,50 +12,49 @@ // ZettaScale Zenoh Team, // #![cfg(feature = "test")] -use std::{fmt::Display, slice}; +use std::{num::NonZeroUsize, slice}; -use zenoh_shm::posix_shm::segment::Segment; +use zenoh_shm::{posix_shm::segment::Segment, shm::SegmentID}; pub mod common; -use common::{validate_memory, TEST_SEGMENT_PREFIX}; +use common::validate_memory; -fn validate_segment( +fn validate_segment( created_segment: &Segment, opened_segment: &Segment, - expected_elem_count: usize, + expected_elem_count: NonZeroUsize, ) where rand::distributions::Standard: rand::distributions::Distribution, - ID: Clone + Display, { - assert!(created_segment.len() == expected_elem_count); + assert!(created_segment.len() >= expected_elem_count); assert!(opened_segment.len() >= expected_elem_count); + assert!(opened_segment.len() == created_segment.len()); let ptr1 = created_segment.as_ptr(); let ptr2 = opened_segment.as_ptr(); - let slice1 = unsafe { slice::from_raw_parts_mut(ptr1, expected_elem_count) }; - let slice2 = unsafe { slice::from_raw_parts(ptr2, expected_elem_count) }; + let slice1 = unsafe { slice::from_raw_parts_mut(ptr1, created_segment.len().get()) }; + let slice2 = unsafe { slice::from_raw_parts(ptr2, opened_segment.len().get()) }; validate_memory(slice1, slice2); } -fn test_segment() +fn test_segment() where rand::distributions::Standard: rand::distributions::Distribution, - ID: Copy + Clone + Display, { - let elem_count = 900; + let elem_count = 900.try_into().unwrap(); let created_segment: Segment = - Segment::create(elem_count, TEST_SEGMENT_PREFIX).expect("error creating new segment"); + Segment::create(elem_count).expect("error creating new segment"); - let opened_segment_instance_1 = Segment::open(created_segment.id(), TEST_SEGMENT_PREFIX) - .expect("error opening existing segment!"); + let opened_segment_instance_1 = + Segment::open(created_segment.id()).expect("error opening existing segment!"); validate_segment(&created_segment, &opened_segment_instance_1, elem_count); - let opened_segment_instance_2 = Segment::open(created_segment.id(), TEST_SEGMENT_PREFIX) - .expect("error opening existing segment!"); + let opened_segment_instance_2 = + Segment::open(created_segment.id()).expect("error opening existing segment!"); validate_segment(&created_segment, &opened_segment_instance_1, elem_count); validate_segment(&created_segment, &opened_segment_instance_2, elem_count); @@ -86,57 +85,31 @@ fn segment_u64_id() { test_segment::() } -#[test] -fn segment_u128_id() { - test_segment::() -} - -/// SIGNED /// - -#[test] -fn segment_i8_id() { - test_segment::() -} - -#[test] -fn segment_i16_id() { - test_segment::() -} - -#[test] -fn segment_i32_id() { - test_segment::() -} - -#[test] -fn segment_i64_id() { - test_segment::() -} - -#[test] -fn segment_i128_id() { - test_segment::() -} +// TODO: this is not yet supported (produces too long shm name for Mac), +// but we don't really need this +//#[test] +//fn segment_u128_id() { +// test_segment::() +//} /// Behaviour checks /// #[test] fn segment_open() { let created_segment: Segment = - Segment::create(900, TEST_SEGMENT_PREFIX).expect("error creating new segment"); + Segment::create(900.try_into().unwrap()).expect("error creating new segment"); - let _opened_segment = Segment::open(created_segment.id(), TEST_SEGMENT_PREFIX) - .expect("error opening existing segment!"); + let _opened_segment = + Segment::open(created_segment.id()).expect("error opening existing segment!"); } #[test] fn segment_open_error() { let id = { let created_segment: Segment = - Segment::create(900, TEST_SEGMENT_PREFIX).expect("error creating new segment"); + Segment::create(900.try_into().unwrap()).expect("error creating new segment"); created_segment.id() }; - let _opened_segment = Segment::open(id, TEST_SEGMENT_PREFIX) - .expect_err("must fail: opened not existing segment!"); + let _opened_segment = Segment::open(id).expect_err("must fail: opened not existing segment!"); } diff --git a/commons/zenoh-shm/tests/posix_shm_provider.rs b/commons/zenoh-shm/tests/posix_shm_provider.rs index 77a95f9d9f..25af979980 100644 --- a/commons/zenoh-shm/tests/posix_shm_provider.rs +++ b/commons/zenoh-shm/tests/posix_shm_provider.rs @@ -25,15 +25,17 @@ use zenoh_shm::api::{ }; static BUFFER_NUM: usize = 100; -static BUFFER_SIZE: usize = 1024; +static BUFFER_SIZE: usize = 1000; #[test] fn posix_shm_provider_create() { - let _backend = PosixShmProviderBackend::builder() - .with_size(1024) + let size = 1024; + let backend = PosixShmProviderBackend::builder() + .with_size(size) .expect("Error creating Layout!") .wait() .expect("Error creating PosixShmProviderBackend!"); + assert!(backend.available() >= size); } #[test] @@ -74,26 +76,41 @@ fn posix_shm_provider_open() { #[test] fn posix_shm_provider_allocator() { + // size to allocate in the provider + let size_to_alloc = BUFFER_SIZE * BUFFER_NUM; + let backend = PosixShmProviderBackend::builder() - .with_size(BUFFER_SIZE * BUFFER_NUM) + .with_size(size_to_alloc) .expect("Error creating Layout!") .wait() .expect("Error creating PosixShmProviderBackend!"); + // the real size of memory available in the provider + let real_size = backend.available(); + assert!(real_size >= size_to_alloc); + + // the real number of buffers allocatable in the provider + let real_num = real_size / BUFFER_SIZE; + assert!(real_num >= BUFFER_NUM); + + // the remainder in the provider + let remainder = real_size - real_num * BUFFER_SIZE; + assert!(remainder < BUFFER_SIZE); + let layout = MemoryLayout::new(BUFFER_SIZE, AllocAlignment::default()).unwrap(); // exhaust memory by allocating it all let mut buffers = vec![]; - for _ in 0..BUFFER_NUM { + for _ in 0..real_num { let buf = backend .alloc(&layout) .expect("PosixShmProviderBackend: error allocating buffer"); buffers.push(buf); } - for _ in 0..BUFFER_NUM { + for _ in 0..real_num { // there is nothing to allocate at this point - assert_eq!(backend.available(), 0); + assert_eq!(backend.available(), remainder); assert!(backend.alloc(&layout).is_err()); // free buffer @@ -107,11 +124,11 @@ fn posix_shm_provider_allocator() { buffers.push(buf); } - // free everything + // free buffers while let Some(buffer) = buffers.pop() { backend.free(&buffer.descriptor); } // confirm that allocator is free - assert_eq!(backend.available(), BUFFER_NUM * BUFFER_SIZE); + assert_eq!(backend.available(), real_size); } diff --git a/commons/zenoh-shm/tests/shm.rs b/commons/zenoh-shm/tests/shm.rs new file mode 100644 index 0000000000..2bc01488a0 --- /dev/null +++ b/commons/zenoh-shm/tests/shm.rs @@ -0,0 +1,95 @@ +// +// Copyright (c) 2025 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#![cfg(feature = "test")] +use zenoh_shm::shm::Segment; + +#[test] +fn create() { + let id = (line!() as u64) + ((std::process::id() as u64) << 32); + let len = 1024.try_into().unwrap(); + let created_segment = Segment::create(id, len).unwrap(); + assert!(created_segment.len() >= len); +} + +#[test] +fn create_concurrent() { + let id = (line!() as u64) + ((std::process::id() as u64) << 32); + let len = 1024.try_into().unwrap(); + let created_segment = Segment::create(id, len).unwrap(); + assert!(created_segment.len() >= len); + assert!(Segment::create(id, len).is_err()); +} + +#[test] +fn create_and_open() { + let id = (line!() as u64) + ((std::process::id() as u64) << 32); + let len = 1024.try_into().unwrap(); + let created_segment = Segment::create(id, len).unwrap(); + let opened_segment = Segment::open(id).unwrap(); + assert!(created_segment.len() >= len); + assert!(opened_segment.len() >= len); +} + +#[test] +fn create_and_open_and_reopen() { + let id = (line!() as u64) + ((std::process::id() as u64) << 32); + let len = 1024.try_into().unwrap(); + let created_segment = Segment::create(id, len).unwrap(); + let opened_segment = Segment::open(id).unwrap(); + let opened_segment2 = Segment::open(id).unwrap(); + assert!(created_segment.len() >= len); + assert!(opened_segment.len() >= len); + assert!(opened_segment2.len() >= len); +} + +#[test] +fn create_and_open_and_reopen_and_open_closed() { + let id = (line!() as u64) + ((std::process::id() as u64) << 32); + let len = 1024.try_into().unwrap(); + let created_segment = Segment::create(id, len).unwrap(); + let opened_segment = Segment::open(id).unwrap(); + assert!(created_segment.len() >= len); + assert!(opened_segment.len() >= len); + + drop(created_segment); + + let opened_segment2 = Segment::open(id).unwrap(); + assert!(opened_segment2.len() >= len); +} + +#[test] +fn no_persistency() { + let id = (line!() as u64) + ((std::process::id() as u64) << 32); + let len = 1024.try_into().unwrap(); + let created_segment = Segment::create(id, len).unwrap(); + assert!(created_segment.len() >= len); + drop(created_segment); + + assert!(Segment::open(id).is_err()); +} + +#[test] +fn recreate_many_times() { + let id = (line!() as u64) + ((std::process::id() as u64) << 32); + let len = 1024.try_into().unwrap(); + + for _ in 0..100 { + let created_segment = Segment::create(id, len).unwrap(); + let opened_segment = Segment::open(id).unwrap(); + assert!(created_segment.len() >= len); + assert!(opened_segment.len() >= len); + } + + assert!(Segment::open(id).is_err()); +} diff --git a/io/zenoh-transport/src/unicast/establishment/ext/shm.rs b/io/zenoh-transport/src/unicast/establishment/ext/shm.rs index 0d0d8f0bb7..3d3df7e941 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/shm.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/shm.rs @@ -33,8 +33,6 @@ use crate::unicast::establishment::{AcceptFsm, OpenFsm}; /*************************************/ /* Segment */ /*************************************/ -const AUTH_SEGMENT_PREFIX: &str = "auth"; - pub(crate) type AuthSegmentID = u32; pub(crate) type AuthChallenge = u64; @@ -51,8 +49,7 @@ pub struct AuthSegment { impl AuthSegment { pub fn create(challenge: AuthChallenge, shm_protocols: &[ProtocolID]) -> ZResult { let array = ArrayInSHM::::create( - ID_START_INDEX + shm_protocols.len(), - AUTH_SEGMENT_PREFIX, + (ID_START_INDEX + shm_protocols.len()).try_into()?, )?; unsafe { (*array.elem_mut(LEN_INDEX)) = shm_protocols.len() as AuthChallenge; @@ -60,18 +57,19 @@ impl AuthSegment { // SHM implementation and the old one (*array.elem_mut(CHALLENGE_INDEX)) = !challenge; (*array.elem_mut(VERSION_INDEX)) = SHM_VERSION; - for elem in ID_START_INDEX..array.elem_count() { - (*array.elem_mut(elem)) = shm_protocols[elem - ID_START_INDEX] as u64; + #[allow(clippy::needless_range_loop)] + for elem_index in 0..shm_protocols.len() { + (*array.elem_mut(ID_START_INDEX + elem_index)) = shm_protocols[elem_index] as u64; } }; Ok(Self { array }) } pub fn open(id: AuthSegmentID) -> ZResult { - let array = ArrayInSHM::open(id, AUTH_SEGMENT_PREFIX)?; + let array = ArrayInSHM::open(id)?; // validate minimal array length - if array.elem_count() < ID_START_INDEX { + if array.elem_count().get() < ID_START_INDEX { bail!("SHM auth segment is too small, maybe the other side is using an incompatible SHM version?") } @@ -112,8 +110,9 @@ impl AuthSegment { pub fn protocols(&self) -> Vec { let mut result = vec![]; - for elem in ID_START_INDEX..self.array.elem_count() { - result.push(unsafe { *self.array.elem(elem) as u32 }); + let len = unsafe { (*self.array.elem(LEN_INDEX)) as usize }; + for elem in ID_START_INDEX..ID_START_INDEX + len { + result.push(unsafe { *self.array.elem(elem) as ProtocolID }); } result }