Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alternative SHM segment cleanup #1445

Merged
merged 15 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions commons/zenoh-shm/src/api/cleanup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use crate::posix_shm::cleanup::cleanup_orphaned_segments;

/// Trigger cleanup of orphaned SHM segments.
///
/// If the process that created a named SHM segment crashes or is terminated by a signal, the segment
/// persists in the system regardless of whether it is still used by other Zenoh processes. This is a
/// property of the POSIX shared memory specification that is hard to bypass. To deal with it, zenoh
/// provides a cleanup routine that enumerates all zenoh segments and tries to find processes that are
/// using them. If no such process is found, the segment is removed.
///
/// There is no ideal moment to trigger this cleanup, so by default zenoh triggers it at the following
/// moments:
/// - first POSIX SHM segment creation
/// - process exit via an `exit()` call or return from the main function
///
/// It is OK to additionally call this function at any time, but be aware that it can be costly.
///
/// The cleanup is only implemented for Unix targets other than macOS; on every other platform this
/// function currently does nothing.
#[zenoh_macros::unstable_doc]
pub fn cleanup_orphaned_shm_segments() {
cleanup_orphaned_segments();
}
1 change: 1 addition & 0 deletions commons/zenoh-shm/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//

pub mod buffer;
pub mod cleanup;
pub mod client;
pub mod client_storage;
pub mod common;
Expand Down
16 changes: 13 additions & 3 deletions commons/zenoh-shm/src/cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use static_init::dynamic;

use crate::posix_shm::cleanup::cleanup_orphaned_segments;

/// A global cleanup, that is guaranteed to be dropped at normal program exit and that will
/// execute all registered cleanup routines at this moment
#[dynamic(lazy, drop)]
Expand All @@ -26,6 +28,8 @@ pub(crate) struct Cleanup {

impl Cleanup {
fn new() -> Self {
// on first cleanup subsystem touch we perform zenoh segment cleanup
cleanup_orphaned_segments();
Self {
cleanups: Default::default(),
}
Expand All @@ -34,14 +38,20 @@ impl Cleanup {
pub(crate) fn register_cleanup(&self, cleanup_fn: Box<dyn FnOnce() + Send>) {
self.cleanups.push(Some(cleanup_fn));
}
}

impl Drop for Cleanup {
fn drop(&mut self) {
fn cleanup(&self) {
while let Some(cleanup) = self.cleanups.pop() {
if let Some(f) = cleanup {
f();
}
}
}
}

// Finalization of the global cleanup object: the orphaned-segment scan runs first,
// then every routine that was registered through `register_cleanup` is executed.
impl Drop for Cleanup {
fn drop(&mut self) {
// on finalization stage we perform zenoh segment cleanup
cleanup_orphaned_segments();
self.cleanup();
}
}
134 changes: 134 additions & 0 deletions commons/zenoh-shm/src/posix_shm/cleanup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

pub(crate) use platform::cleanup_orphaned_segments;

#[cfg(not(all(unix, not(target_os = "macos"))))]
mod platform {
/// Stub used on targets selected by this module's `cfg`: orphaned-segment cleanup is a no-op here.
pub(crate) fn cleanup_orphaned_segments() {}
yellowhatter marked this conversation as resolved.
Show resolved Hide resolved
}

#[cfg(all(unix, not(target_os = "macos")))]
mod platform {
use std::{borrow::Borrow, collections::HashSet, fs, path::PathBuf};

use zenoh_result::ZResult;

#[derive(PartialEq, Eq, Hash)]
struct ProcFdDir(PathBuf);

impl ProcFdDir {
fn enumerate_fds(&self) -> ZResult<HashSet<FdFile>> {
let fds = self.0.read_dir()?;
let fd_map: HashSet<FdFile> = fds
.filter_map(Result::ok)
.map(|f| std::convert::Into::<FdFile>::into(f.path()))
.collect();
Ok(fd_map)
}
}

impl From<PathBuf> for ProcFdDir {
fn from(value: PathBuf) -> Self {
Self(value)
}
}

/// Path of a single entry found inside a process' fd directory.
#[derive(PartialEq, Eq, Hash)]
struct FdFile(PathBuf);

impl From<PathBuf> for FdFile {
    fn from(path: PathBuf) -> Self {
        FdFile(path)
    }
}

/// Path of a candidate shared-memory segment file.
#[derive(PartialEq, Eq, Hash)]
struct ShmFile(PathBuf);

impl ShmFile {
    /// Consumes the entry and deletes the file it points to.
    ///
    /// A failed removal is deliberately ignored.
    fn cleanup_file(self) {
        let ShmFile(path) = self;
        let _ = std::fs::remove_file(path);
    }
}

// Borrowing as `PathBuf` lets a `HashSet<ShmFile>` be queried with a plain path.
impl Borrow<PathBuf> for ShmFile {
    fn borrow(&self) -> &PathBuf {
        let ShmFile(path) = self;
        path
    }
}

impl From<PathBuf> for ShmFile {
    fn from(path: PathBuf) -> Self {
        ShmFile(path)
    }
}

pub(crate) fn cleanup_orphaned_segments() {
if let Err(e) = cleanup_orphaned_segments_inner() {
tracing::error!("Error performing orphaned SHM segments cleanup: {e}")
}
}

fn enumerate_shm_files() -> ZResult<HashSet<ShmFile>> {
let shm_files = fs::read_dir("/dev/shm")?;
Ok(shm_files
.filter_map(Result::ok)
.filter_map(|f| {
if let Some(ext) = f.path().extension() {
if ext == "zenoh" {
return Some(std::convert::Into::<ShmFile>::into(f.path()));
}
}
None
})
.collect())
}

fn enumerate_proc_dirs() -> ZResult<HashSet<ProcFdDir>> {
let proc_dirs = fs::read_dir("/proc")?;
yellowhatter marked this conversation as resolved.
Show resolved Hide resolved
Ok(proc_dirs
.filter_map(Result::ok)
.map(|f| std::convert::Into::<ProcFdDir>::into(f.path().join("fd")))
.collect())
}

/// Gathers the fd entries of all enumerated process directories into a single set.
///
/// Directories whose content cannot be listed are skipped.
fn enumerate_proc_fds() -> ZResult<HashSet<FdFile>> {
    let all_fds: HashSet<FdFile> = enumerate_proc_dirs()?
        .into_iter()
        .filter_map(|proc_dir| proc_dir.enumerate_fds().ok())
        .flatten()
        .collect();
    Ok(all_fds)
}

/// Removes every enumerated SHM segment file that no enumerated file descriptor points to.
///
/// The candidate segments are listed first and the open descriptors afterwards. With this
/// order a segment created while the scan is running is either absent from the candidate
/// list or already held open by its creator when descriptors are inspected, so it is not
/// mistaken for an orphan.
///
/// Returns an error when the segment directory or the process list cannot be enumerated.
/// Failures to resolve a single descriptor or to remove a single file are ignored.
fn cleanup_orphaned_segments_inner() -> ZResult<()> {
    // Candidates must be snapshotted BEFORE the descriptors (see the function docs).
    let mut shm_map = enumerate_shm_files()?;
    if shm_map.is_empty() {
        // Nothing to clean up: skip the expensive scan of all process descriptors.
        return Ok(());
    }

    for fd_file in enumerate_proc_fds()? {
        if let Ok(resolved_link) = fd_file.0.read_link() {
            // The segment is open in some process, hence not orphaned.
            shm_map.remove(&resolved_link);
            if shm_map.is_empty() {
                // Every candidate turned out to be in use; no need to resolve more links.
                return Ok(());
            }
        }
    }

    // Whatever is left is not referenced by any descriptor we could inspect.
    for shm_file_to_cleanup in shm_map {
        shm_file_to_cleanup.cleanup_file();
    }

    Ok(())
}
}
1 change: 1 addition & 0 deletions commons/zenoh-shm/src/posix_shm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@

pub mod array;
tested_crate_module!(segment);
pub(crate) mod cleanup;
2 changes: 1 addition & 1 deletion commons/zenoh-shm/src/posix_shm/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ where
fn os_id(id: ID, id_prefix: &str) -> String {
let os_id_str = format!("{id_prefix}_{id}");
let crc_os_id_str = ECMA.checksum(os_id_str.as_bytes());
format!("{:x}", crc_os_id_str)
format!("{:x}.zenoh", crc_os_id_str)
}

pub fn as_ptr(&self) -> *mut u8 {
Expand Down
1 change: 1 addition & 0 deletions zenoh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ pub mod shm {
zshm::{zshm, ZShm},
zshmmut::{zshmmut, ZShmMut},
},
cleanup::cleanup_orphaned_shm_segments,
client::{shm_client::ShmClient, shm_segment::ShmSegment},
client_storage::{ShmClientStorage, GLOBAL_CLIENT_STORAGE},
common::types::{ChunkID, ProtocolID, SegmentID},
Expand Down