Skip to content

Commit

Permalink
Merge pull request #1445 from ZettaScaleLabs/fix_memory_leak
Browse files Browse the repository at this point in the history
Alternative SHM segment cleanup
  • Loading branch information
yellowhatter authored Sep 27, 2024
2 parents 21d2d4d + 0d0b32a commit cdb869b
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 4 deletions.
31 changes: 31 additions & 0 deletions commons/zenoh-shm/src/api/cleanup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use crate::posix_shm::cleanup::cleanup_orphaned_segments;

/// Linux: Trigger cleanup for orphaned SHM segments
/// If process that created named SHM segment crashes or exits by a signal, the segment persists in the system
/// disregarding if it is used by other Zenoh processes or not. This is the detail of POSIX specification for
/// shared memory that is hard to bypass. To deal with this we developed a cleanup routine that enumerates all
/// segments and tries to find processes that are using it. If no such process found, segment will be removed.
/// There is no ideal signal to trigger this cleanup, so by default, zenoh triggers it in the following moments:
/// - first POSIX SHM segment creation
/// - process exit via exit() call or return from maint function
/// It is OK to additionally trigger this function at any time, but be aware that this can be costly.
///
/// For non-linux platforms this function currently does nothing
#[zenoh_macros::unstable_doc]
pub fn cleanup_orphaned_shm_segments() {
cleanup_orphaned_segments();
}
1 change: 1 addition & 0 deletions commons/zenoh-shm/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//

pub mod buffer;
pub mod cleanup;
pub mod client;
pub mod client_storage;
pub mod common;
Expand Down
16 changes: 13 additions & 3 deletions commons/zenoh-shm/src/cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use static_init::dynamic;

use crate::posix_shm::cleanup::cleanup_orphaned_segments;

/// A global cleanup, that is guaranteed to be dropped at normal program exit and that will
/// execute all registered cleanup routines at this moment
#[dynamic(lazy, drop)]
Expand All @@ -26,6 +28,8 @@ pub(crate) struct Cleanup {

impl Cleanup {
fn new() -> Self {
// on first cleanup subsystem touch we perform zenoh segment cleanup
cleanup_orphaned_segments();
Self {
cleanups: Default::default(),
}
Expand All @@ -34,14 +38,20 @@ impl Cleanup {
pub(crate) fn register_cleanup(&self, cleanup_fn: Box<dyn FnOnce() + Send>) {
self.cleanups.push(Some(cleanup_fn));
}
}

impl Drop for Cleanup {
fn drop(&mut self) {
fn cleanup(&self) {
while let Some(cleanup) = self.cleanups.pop() {
if let Some(f) = cleanup {
f();
}
}
}
}

impl Drop for Cleanup {
fn drop(&mut self) {
// on finalization stage we perform zenoh segment cleanup
cleanup_orphaned_segments();
self.cleanup();
}
}
134 changes: 134 additions & 0 deletions commons/zenoh-shm/src/posix_shm/cleanup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

pub(crate) use platform::cleanup_orphaned_segments;

#[cfg(not(all(unix, not(target_os = "macos"))))]
mod platform {
pub(crate) fn cleanup_orphaned_segments() {}
}

#[cfg(all(unix, not(target_os = "macos")))]
mod platform {
use std::{borrow::Borrow, collections::HashSet, fs, path::PathBuf};

use zenoh_result::ZResult;

#[derive(PartialEq, Eq, Hash)]
struct ProcFdDir(PathBuf);

impl ProcFdDir {
fn enumerate_fds(&self) -> ZResult<HashSet<FdFile>> {
let fds = self.0.read_dir()?;
let fd_map: HashSet<FdFile> = fds
.filter_map(Result::ok)
.map(|f| std::convert::Into::<FdFile>::into(f.path()))
.collect();
Ok(fd_map)
}
}

impl From<PathBuf> for ProcFdDir {
fn from(value: PathBuf) -> Self {
Self(value)
}
}

#[derive(PartialEq, Eq, Hash)]
struct FdFile(PathBuf);

impl From<PathBuf> for FdFile {
fn from(value: PathBuf) -> Self {
Self(value)
}
}

#[derive(PartialEq, Eq, Hash)]
struct ShmFile(PathBuf);

impl ShmFile {
fn cleanup_file(self) {
let _ = std::fs::remove_file(self.0);
}
}

impl Borrow<PathBuf> for ShmFile {
fn borrow(&self) -> &PathBuf {
&self.0
}
}

impl From<PathBuf> for ShmFile {
fn from(value: PathBuf) -> Self {
Self(value)
}
}

pub(crate) fn cleanup_orphaned_segments() {
if let Err(e) = cleanup_orphaned_segments_inner() {
tracing::error!("Error performing orphaned SHM segments cleanup: {e}")
}
}

fn enumerate_shm_files() -> ZResult<HashSet<ShmFile>> {
let shm_files = fs::read_dir("/dev/shm")?;
Ok(shm_files
.filter_map(Result::ok)
.filter_map(|f| {
if let Some(ext) = f.path().extension() {
if ext == "zenoh" {
return Some(std::convert::Into::<ShmFile>::into(f.path()));
}
}
None
})
.collect())
}

fn enumerate_proc_dirs() -> ZResult<HashSet<ProcFdDir>> {
let proc_dirs = fs::read_dir("/proc")?;
Ok(proc_dirs
.filter_map(Result::ok)
.map(|f| std::convert::Into::<ProcFdDir>::into(f.path().join("fd")))
.collect())
}

fn enumerate_proc_fds() -> ZResult<HashSet<FdFile>> {
let mut fds = HashSet::default();
let dirs = enumerate_proc_dirs()?;
for dir in dirs {
if let Ok(dir_fds) = dir.enumerate_fds() {
fds.extend(dir_fds);
}
}
Ok(fds)
}

fn cleanup_orphaned_segments_inner() -> ZResult<()> {
let fd_map = enumerate_proc_fds()?;
let mut shm_map = enumerate_shm_files()?;

for fd_file in fd_map {
if let Ok(resolved_link) = fd_file.0.read_link() {
shm_map.remove(&resolved_link);
}
}

for shm_file_to_cleanup in shm_map {
shm_file_to_cleanup.cleanup_file();
}

Ok(())
}
}
1 change: 1 addition & 0 deletions commons/zenoh-shm/src/posix_shm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@

pub mod array;
tested_crate_module!(segment);
pub(crate) mod cleanup;
2 changes: 1 addition & 1 deletion commons/zenoh-shm/src/posix_shm/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ where
fn os_id(id: ID, id_prefix: &str) -> String {
let os_id_str = format!("{id_prefix}_{id}");
let crc_os_id_str = ECMA.checksum(os_id_str.as_bytes());
format!("{:x}", crc_os_id_str)
format!("{:x}.zenoh", crc_os_id_str)
}

pub fn as_ptr(&self) -> *mut u8 {
Expand Down
1 change: 1 addition & 0 deletions zenoh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ pub mod shm {
zshm::{zshm, ZShm},
zshmmut::{zshmmut, ZShmMut},
},
cleanup::cleanup_orphaned_shm_segments,
client::{shm_client::ShmClient, shm_segment::ShmSegment},
client_storage::{ShmClientStorage, GLOBAL_CLIENT_STORAGE},
common::types::{ChunkID, ProtocolID, SegmentID},
Expand Down

0 comments on commit cdb869b

Please sign in to comment.