Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix establish failure on shm #1242

Merged
29 changes: 28 additions & 1 deletion commons/zenoh-codec/src/core/shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::num::NonZeroUsize;

use zenoh_buffers::{
reader::{DidntRead, Reader},
writer::{DidntWrite, Writer},
Expand Down Expand Up @@ -62,6 +64,18 @@ where
}
}

impl<W> WCodec<NonZeroUsize, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: NonZeroUsize) -> Self::Output {
self.write(&mut *writer, x.get())?;
Ok(())
}
}

impl<W> WCodec<&ShmBufInfo, &mut W> for Zenoh080
where
W: Writer,
Expand All @@ -80,7 +94,7 @@ where

self.write(&mut *writer, data_descriptor)?;
self.write(&mut *writer, shm_protocol)?;
self.write(&mut *writer, data_len)?;
self.write(&mut *writer, *data_len)?;
self.write(&mut *writer, watchdog_descriptor)?;
self.write(&mut *writer, header_descriptor)?;
self.write(&mut *writer, generation)?;
Expand Down Expand Up @@ -138,6 +152,19 @@ where
}
}

impl<R> RCodec<NonZeroUsize, &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<NonZeroUsize, Self::Error> {
let size: usize = self.read(&mut *reader)?;
let size = NonZeroUsize::new(size).ok_or(DidntRead)?;
Ok(size)
}
}

impl<R> RCodec<ShmBufInfo, &mut R> for Zenoh080
where
R: Reader,
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-shm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,4 @@ lockfree = { workspace = true }
stabby = { workspace = true }

[dev-dependencies]
libc = { workspace = true }
libc = { workspace = true }
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::{
borrow::Borrow,
cmp,
collections::BinaryHeap,
num::NonZeroUsize,
sync::{
atomic::{AtomicPtr, AtomicUsize, Ordering},
Mutex,
Expand All @@ -31,7 +32,7 @@ use crate::api::{
provider::{
chunk::{AllocatedChunk, ChunkDescriptor},
shm_provider_backend::ShmProviderBackend,
types::{AllocAlignment, ChunkAllocResult, MemoryLayout, ZAllocError},
types::{AllocAlignment, ChunkAllocResult, MemoryLayout, ZAllocError, ZLayoutError},
},
};

Expand All @@ -45,7 +46,7 @@ const MIN_FREE_CHUNK_SIZE: usize = 1_024;
#[derive(Eq, Copy, Clone, Debug)]
struct Chunk {
offset: ChunkID,
size: usize,
size: NonZeroUsize,
}

impl Ord for Chunk {
Expand Down Expand Up @@ -86,7 +87,7 @@ impl PosixShmProviderBackendBuilder {
self,
size: usize,
alignment: AllocAlignment,
) -> ZResult<LayoutedPosixShmProviderBackendBuilder<MemoryLayout>> {
) -> Result<LayoutedPosixShmProviderBackendBuilder<MemoryLayout>, ZLayoutError> {
let layout = MemoryLayout::new(size, alignment)?;
Ok(LayoutedPosixShmProviderBackendBuilder { layout })
}
Expand All @@ -96,7 +97,7 @@ impl PosixShmProviderBackendBuilder {
pub fn with_size(
self,
size: usize,
) -> ZResult<LayoutedPosixShmProviderBackendBuilder<MemoryLayout>> {
) -> Result<LayoutedPosixShmProviderBackendBuilder<MemoryLayout>, ZLayoutError> {
let layout = MemoryLayout::new(size, AllocAlignment::default())?;
Ok(LayoutedPosixShmProviderBackendBuilder { layout })
}
Expand Down Expand Up @@ -149,7 +150,7 @@ impl PosixShmProviderBackend {
);

Ok(Self {
available: AtomicUsize::new(layout.size()),
available: AtomicUsize::new(layout.size().get()),
segment,
free_list: Mutex::new(free_list),
alignment: layout.alignment(),
Expand All @@ -163,7 +164,7 @@ impl ShmProviderBackend for PosixShmProviderBackend {

let required_len = layout.size();

if self.available.load(Ordering::Relaxed) < required_len {
if self.available.load(Ordering::Relaxed) < required_len.get() {
tracing::trace!( "PosixShmProviderBackend does not have sufficient free memory to allocate {:?}, try de-fragmenting!", layout);
return Err(ZAllocError::OutOfMemory);
}
Expand All @@ -176,16 +177,20 @@ impl ShmProviderBackend for PosixShmProviderBackend {
Some(mut chunk) if chunk.size >= required_len => {
// NOTE: don't loose any chunks here, as it will lead to memory leak
tracing::trace!("Allocator selected Chunk ({:?})", &chunk);
if chunk.size - required_len >= MIN_FREE_CHUNK_SIZE {
if chunk.size.get() - required_len.get() >= MIN_FREE_CHUNK_SIZE {
let free_chunk = Chunk {
offset: chunk.offset + required_len as ChunkID,
size: chunk.size - required_len,
offset: chunk.offset + required_len.get() as ChunkID,
// SAFETY: this is safe because we always operate on a leftover, which is checked above!
size: unsafe {
NonZeroUsize::new_unchecked(chunk.size.get() - required_len.get())
},
};
tracing::trace!("The allocation will leave a Free Chunk: {:?}", &free_chunk);
guard.push(free_chunk);
chunk.size = required_len;
}
self.available.fetch_sub(chunk.size, Ordering::Relaxed);
self.available
.fetch_sub(chunk.size.get(), Ordering::Relaxed);

let descriptor =
ChunkDescriptor::new(self.segment.segment.id(), chunk.offset, chunk.size);
Expand Down Expand Up @@ -219,16 +224,18 @@ impl ShmProviderBackend for PosixShmProviderBackend {
offset: chunk.chunk,
size: chunk.len,
};
self.available.fetch_add(free_chunk.size, Ordering::Relaxed);
self.available
.fetch_add(free_chunk.size.get(), Ordering::Relaxed);
zlock!(self.free_list).push(free_chunk);
}

fn defragment(&self) -> usize {
fn try_merge_adjacent_chunks(a: &Chunk, b: &Chunk) -> Option<Chunk> {
let end_offset = a.offset as usize + a.size;
let end_offset = a.offset as usize + a.size.get();
if end_offset == b.offset as usize {
Some(Chunk {
size: a.size + b.size,
// SAFETY: this is safe because we operate on non-zero sizes and it will never overflow
size: unsafe { NonZeroUsize::new_unchecked(a.size.get() + b.size.get()) },
offset: a.offset,
})
} else {
Expand Down Expand Up @@ -256,7 +263,7 @@ impl ShmProviderBackend for PosixShmProviderBackend {
match try_merge_adjacent_chunks(&current, &next) {
Some(c) => {
current = c;
largest = largest.max(current.size);
largest = largest.max(current.size.get());
if i == n {
guard.push(current)
}
Expand All @@ -279,7 +286,7 @@ impl ShmProviderBackend for PosixShmProviderBackend {
self.available.load(Ordering::Relaxed)
}

fn layout_for(&self, layout: MemoryLayout) -> ZResult<MemoryLayout> {
fn layout_for(&self, layout: MemoryLayout) -> Result<MemoryLayout, ZLayoutError> {
layout.extend(self.alignment)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//

use std::sync::atomic::AtomicPtr;
use std::{num::NonZeroUsize, sync::atomic::AtomicPtr};

use zenoh_result::ZResult;

Expand All @@ -32,8 +32,8 @@ pub(crate) struct PosixShmSegment {
}

impl PosixShmSegment {
pub(crate) fn create(alloc_size: usize) -> ZResult<Self> {
let segment = ArrayInSHM::create(alloc_size, POSIX_SHM_SEGMENT_PREFIX)?;
pub(crate) fn create(alloc_size: NonZeroUsize) -> ZResult<Self> {
let segment = ArrayInSHM::create(alloc_size.get(), POSIX_SHM_SEGMENT_PREFIX)?;
Ok(Self { segment })
}

Expand Down
6 changes: 3 additions & 3 deletions commons/zenoh-shm/src/api/provider/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//

use std::sync::atomic::AtomicPtr;
use std::{num::NonZeroUsize, sync::atomic::AtomicPtr};

use crate::api::common::types::{ChunkID, SegmentID};

Expand All @@ -22,13 +22,13 @@ use crate::api::common::types::{ChunkID, SegmentID};
pub struct ChunkDescriptor {
pub segment: SegmentID,
pub chunk: ChunkID,
pub len: usize,
pub len: NonZeroUsize,
}

impl ChunkDescriptor {
/// Create a new Chunk Descriptor
#[zenoh_macros::unstable_doc]
pub fn new(segment: SegmentID, chunk: ChunkID, len: usize) -> Self {
pub fn new(segment: SegmentID, chunk: ChunkID, len: NonZeroUsize) -> Self {
Self {
segment,
chunk,
Expand Down
20 changes: 12 additions & 8 deletions commons/zenoh-shm/src/api/provider/shm_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::{
collections::VecDeque,
future::{Future, IntoFuture},
marker::PhantomData,
num::NonZeroUsize,
pin::Pin,
sync::{atomic::Ordering, Arc, Mutex},
time::Duration,
Expand Down Expand Up @@ -159,7 +160,7 @@ where
IDSource: ProtocolIDSource,
Backend: ShmProviderBackend,
{
size: usize,
size: NonZeroUsize,
provider_layout: MemoryLayout,
provider: &'a ShmProvider<IDSource, Backend>,
}
Expand All @@ -185,6 +186,7 @@ where
// Create layout for specified arguments
let layout = MemoryLayout::new(data.size, data.alignment)
.map_err(|_| ZLayoutError::IncorrectLayoutArgs)?;
let size = layout.size();

// Obtain provider's layout for our layout
let provider_layout = data
Expand All @@ -194,7 +196,7 @@ where
.map_err(|_| ZLayoutError::ProviderIncompatibleLayout)?;

Ok(Self {
size: data.size,
size,
provider_layout,
provider: data.provider,
})
Expand Down Expand Up @@ -320,7 +322,7 @@ where
let result = InnerPolicy::alloc(layout, provider);
if let Err(ZAllocError::OutOfMemory) = result {
// try to alloc again only if GC managed to reclaim big enough chunk
if provider.garbage_collect() >= layout.size() {
if provider.garbage_collect() >= layout.size().get() {
return AltPolicy::alloc(layout, provider);
}
}
Expand Down Expand Up @@ -352,7 +354,7 @@ where
let result = InnerPolicy::alloc(layout, provider);
if let Err(ZAllocError::NeedDefragment) = result {
// try to alloc again only if big enough chunk was defragmented
if provider.defragment() >= layout.size() {
if provider.defragment() >= layout.size().get() {
return AltPolicy::alloc(layout, provider);
}
}
Expand Down Expand Up @@ -803,6 +805,8 @@ where
/// Remember that chunk's len may be >= len!
#[zenoh_macros::unstable_doc]
pub fn map(&self, chunk: AllocatedChunk, len: usize) -> ZResult<ZShmMut> {
let len = len.try_into()?;

// allocate resources for SHM buffer
let (allocated_header, allocated_watchdog, confirmed_watchdog) = Self::alloc_resources()?;

Expand Down Expand Up @@ -837,7 +841,7 @@ where
if is_free_chunk(maybe_free) {
tracing::trace!("Garbage Collecting Chunk: {:?}", maybe_free);
self.backend.free(&maybe_free.descriptor);
largest = largest.max(maybe_free.descriptor.len);
largest = largest.max(maybe_free.descriptor.len.get());
return false;
}
true
Expand Down Expand Up @@ -868,7 +872,7 @@ where
}
}

fn alloc_inner<Policy>(&self, size: usize, layout: &MemoryLayout) -> BufAllocResult
fn alloc_inner<Policy>(&self, size: NonZeroUsize, layout: &MemoryLayout) -> BufAllocResult
where
Policy: AllocPolicy,
{
Expand Down Expand Up @@ -914,7 +918,7 @@ where
fn wrap(
&self,
chunk: AllocatedChunk,
len: usize,
len: NonZeroUsize,
allocated_header: AllocatedHeaderDescriptor,
allocated_watchdog: AllocatedWatchdog,
confirmed_watchdog: ConfirmedDescriptor,
Expand Down Expand Up @@ -971,7 +975,7 @@ where
{
async fn alloc_inner_async<Policy>(
&self,
size: usize,
size: NonZeroUsize,
backend_layout: &MemoryLayout,
) -> BufAllocResult
where
Expand Down
7 changes: 2 additions & 5 deletions commons/zenoh-shm/src/api/provider/shm_provider_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,9 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use zenoh_result::ZResult;

use super::{
chunk::ChunkDescriptor,
types::{ChunkAllocResult, MemoryLayout},
types::{ChunkAllocResult, MemoryLayout, ZLayoutError},
};

/// The provider backend trait
Expand Down Expand Up @@ -48,5 +45,5 @@ pub trait ShmProviderBackend {
/// - validate, if the provided layout can be used with this backend
/// - adopt the layout for backend capabilities
#[zenoh_macros::unstable_doc]
fn layout_for(&self, layout: MemoryLayout) -> ZResult<MemoryLayout>;
fn layout_for(&self, layout: MemoryLayout) -> Result<MemoryLayout, ZLayoutError>;
}
Loading
Loading