Skip to content

Commit

Permalink
feat: Allow reusing same shared IpcSharedMem for transfers (#356)
Browse files Browse the repository at this point in the history
* Impl unsafe `deref_mut` on IpcSharedMemory

Signed-off-by: sagudev <[email protected]>

* bench/test

Signed-off-by: sagudev <[email protected]>

---------

Signed-off-by: sagudev <[email protected]>
  • Loading branch information
sagudev authored Nov 10, 2024
1 parent 66453c0 commit aa43418
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 1 deletion.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ harness = false
name = "ipc_receiver_set"
harness = false

[[bench]]
name = "ipc_shared_mem"
harness = false

[features]
default = []
force-inprocess = []
Expand Down
65 changes: 65 additions & 0 deletions benches/ipc_shared_mem.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use std::time::Instant;

use criterion::{criterion_group, criterion_main, Criterion};
use ipc_channel::ipc::{self, IpcSharedMemory};

#[inline]
fn on_recv<const MUT: bool>(mut ism: IpcSharedMemory) -> IpcSharedMemory {
if MUT {
let data = unsafe { ism.deref_mut() };
for d in data {
*d += 1;
}
ism
} else {
let mut data = ism.to_vec();
for d in &mut data {
*d += 1;
}
IpcSharedMemory::from_bytes(&data)
}
}

fn ping_pong_mut_shared_mem<const MUT: bool, const SIZE: usize, const COUNT: u8>(
criterion: &mut Criterion,
) {
criterion.bench_function(
&format!(
"ping_pong_shared_mem{}_{SIZE}_{COUNT}",
if MUT { "_mut" } else { "" }
),
|bencher| {
bencher.iter_custom(|_| {
let (tx1, rx1) = ipc::channel().unwrap();
let (tx2, rx2) = ipc::channel().unwrap();
let tx = tx1.clone();
let _t1 = std::thread::spawn(move || {
for _i in 0..=COUNT / 2 {
tx2.send(on_recv::<MUT>(rx1.recv().unwrap())).unwrap();
}
});
let t2 = std::thread::spawn(move || {
for _i in 0..COUNT / 2 {
tx1.send(on_recv::<MUT>(rx2.recv().unwrap())).unwrap();
}
rx2.recv().unwrap().to_vec()
});
let start = Instant::now();
tx.send(IpcSharedMemory::from_byte(0, SIZE)).unwrap();
let data = t2.join().unwrap();
let duration = start.elapsed();
assert!(data.iter().all(|d| *d == (COUNT / 2) * 2 + 1));
duration
});
},
);
}

criterion_group!(
benches,
ping_pong_mut_shared_mem<true, {4*1024*1024}, 100>,
ping_pong_mut_shared_mem<false, {4*1024*1024}, 100>,
ping_pong_mut_shared_mem<true, {4*1024*1024}, 125>,
ping_pong_mut_shared_mem<false, {4*1024*1024}, 125>,
);
criterion_main!(benches);
18 changes: 18 additions & 0 deletions src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,24 @@ impl Deref for IpcSharedMemory {
}
}

impl IpcSharedMemory {
/// Returns a mutable reference to the deref of this [`IpcSharedMemory`].
///
/// # Safety
///
/// This is safe if there is only one reader/writer on the data.
/// User can achieve this by not cloning [`IpcSharedMemory`]
/// and serializing/deserializing only once.
#[inline]
pub unsafe fn deref_mut(&mut self) -> &mut [u8] {
if let Some(os_shared_memory) = &mut self.os_shared_memory {
os_shared_memory.deref_mut()
} else {
&mut []
}
}
}

impl<'de> Deserialize<'de> for IpcSharedMemory {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
Expand Down
10 changes: 10 additions & 0 deletions src/platform/inprocess/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,16 @@ impl Deref for OsIpcSharedMemory {
}
}

impl OsIpcSharedMemory {
#[inline]
pub unsafe fn deref_mut(&mut self) -> &mut [u8] {
if self.ptr.is_null() {
panic!("attempted to access a consumed `OsIpcSharedMemory`")
}
unsafe { slice::from_raw_parts_mut(self.ptr, self.length) }
}
}

impl OsIpcSharedMemory {
pub fn from_byte(byte: u8, length: usize) -> OsIpcSharedMemory {
let mut v = Arc::new(vec![byte; length]);
Expand Down
10 changes: 10 additions & 0 deletions src/platform/macos/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,16 @@ impl Deref for OsIpcSharedMemory {
}
}

impl OsIpcSharedMemory {
#[inline]
pub unsafe fn deref_mut(&mut self) -> &mut [u8] {
if self.ptr.is_null() && self.length > 0 {
panic!("attempted to access a consumed `OsIpcSharedMemory`")
}
unsafe { slice::from_raw_parts_mut(self.ptr, self.length) }
}
}

impl OsIpcSharedMemory {
unsafe fn from_raw_parts(ptr: *mut u8, length: usize) -> OsIpcSharedMemory {
OsIpcSharedMemory { ptr, length }
Expand Down
9 changes: 8 additions & 1 deletion src/platform/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::hash::BuildHasherDefault;
use std::io;
use std::marker::PhantomData;
use std::mem;
use std::ops::{Deref, RangeFrom};
use std::ops::{Deref, DerefMut, RangeFrom};
use std::os::fd::RawFd;
use std::ptr;
use std::slice;
Expand Down Expand Up @@ -867,6 +867,13 @@ impl Deref for OsIpcSharedMemory {
}
}

impl OsIpcSharedMemory {
#[inline]
pub unsafe fn deref_mut(&mut self) -> &mut [u8] {
unsafe { slice::from_raw_parts_mut(self.ptr, self.length) }
}
}

impl OsIpcSharedMemory {
unsafe fn from_raw_parts(
ptr: *mut u8,
Expand Down
8 changes: 8 additions & 0 deletions src/platform/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1765,6 +1765,14 @@ impl Deref for OsIpcSharedMemory {
}
}

impl OsIpcSharedMemory {
#[inline]
pub unsafe fn deref_mut(&mut self) -> &mut [u8] {
assert!(!self.view_handle.Value.is_null() && self.handle.is_valid());
unsafe { slice::from_raw_parts_mut(self.view_handle.Value as _, self.length) }
}
}

impl OsIpcSharedMemory {
fn new(length: usize) -> Result<OsIpcSharedMemory, WinError> {
unsafe {
Expand Down

0 comments on commit aa43418

Please sign in to comment.