Skip to content

Commit

Permalink
Porting on zenoh-link-unixsocket_stream
Browse files Browse the repository at this point in the history
  • Loading branch information
YuanYuYuan committed Jan 5, 2024
1 parent 84356a4 commit 2ecd8b5
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 97 deletions.
16 changes: 9 additions & 7 deletions io/zenoh-links/zenoh-link-unixsock_stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ version = { workspace = true }
repository = { workspace = true }
homepage = { workspace = true }
authors = [
"kydos <[email protected]>",
"Julien Enoch <[email protected]>",
"Olivier Hécart <[email protected]>",
"Luca Cominardi <[email protected]>",
"Pierre Avital <[email protected]>",
"Gabriele Baldoni <[email protected]>"
"kydos <[email protected]>",
"Julien Enoch <[email protected]>",
"Olivier Hécart <[email protected]>",
"Luca Cominardi <[email protected]>",
"Pierre Avital <[email protected]>",
"Gabriele Baldoni <[email protected]>"
]
edition = { workspace = true }
license = { workspace = true }
Expand All @@ -32,14 +32,16 @@ description = "Internal crate for zenoh."
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-std = { workspace = true }
async-trait = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
nix = { workspace = true }
tokio = { workspace = true, features = ["io-std", "macros", "net", "rt-multi-thread", "time"] }
tokio-util = { workspace = true }
uuid = { workspace = true, features = ["default"] }
zenoh-core = { workspace = true }
zenoh-link-commons = { workspace = true }
zenoh-protocol = { workspace = true }
zenoh-result = { workspace = true }
zenoh-runtime = { workspace = true }
zenoh-sync = { workspace = true }
181 changes: 91 additions & 90 deletions io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,92 +12,98 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use super::UNIXSOCKSTREAM_ACCEPT_THROTTLE_TIME;
use async_std::os::unix::net::{UnixListener, UnixStream};
use async_std::path::PathBuf;
use async_std::prelude::FutureExt;
use async_std::task;
use async_std::task::JoinHandle;
use async_trait::async_trait;
use futures::io::AsyncReadExt;
use futures::io::AsyncWriteExt;
use std::cell::UnsafeCell;
use std::collections::HashMap;
use std::fmt;
use std::fs::remove_file;
use std::net::Shutdown;
use std::os::unix::io::RawFd;
use std::sync::atomic::{AtomicBool, Ordering};
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{UnixListener, UnixStream};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use uuid::Uuid;
use zenoh_core::{zread, zwrite};
use zenoh_link_commons::{
LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender,
};
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_result::{zerror, ZResult};
use zenoh_sync::Signal;

use super::{get_unix_path_as_string, UNIXSOCKSTREAM_DEFAULT_MTU, UNIXSOCKSTREAM_LOCATOR_PREFIX};

pub struct LinkUnicastUnixSocketStream {
// The underlying socket as returned from the async-std library
socket: UnixStream,
// The underlying socket as returned from the tokio library
socket: UnsafeCell<UnixStream>,
// The Unix domain socket source path
src_locator: Locator,
// The Unix domain socker destination path (random UUIDv4)
dst_locator: Locator,
}

unsafe impl Sync for LinkUnicastUnixSocketStream {}

impl LinkUnicastUnixSocketStream {
fn new(socket: UnixStream, src_path: &str, dst_path: &str) -> LinkUnicastUnixSocketStream {
LinkUnicastUnixSocketStream {
socket,
socket: UnsafeCell::new(socket),
src_locator: Locator::new(UNIXSOCKSTREAM_LOCATOR_PREFIX, src_path, "").unwrap(),
dst_locator: Locator::new(UNIXSOCKSTREAM_LOCATOR_PREFIX, dst_path, "").unwrap(),
}
}

#[allow(clippy::mut_from_ref)]
fn get_mut_socket(&self) -> &mut UnixStream {
unsafe { &mut *self.socket.get() }
}
}

#[async_trait]
impl LinkUnicastTrait for LinkUnicastUnixSocketStream {
async fn close(&self) -> ZResult<()> {
log::trace!("Closing UnixSocketStream link: {}", self);
// Close the underlying UnixSocketStream socket
let res = self.socket.shutdown(Shutdown::Both);
let res = self.get_mut_socket().shutdown().await;
log::trace!("UnixSocketStream link shutdown {}: {:?}", self, res);
res.map_err(|e| zerror!(e).into())
}

async fn write(&self, buffer: &[u8]) -> ZResult<usize> {
(&self.socket).write(buffer).await.map_err(|e| {
self.get_mut_socket().write(buffer).await.map_err(|e| {
let e = zerror!("Write error on UnixSocketStream link {}: {}", self, e);
log::trace!("{}", e);
e.into()
})
}

async fn write_all(&self, buffer: &[u8]) -> ZResult<()> {
(&self.socket).write_all(buffer).await.map_err(|e| {
self.get_mut_socket().write_all(buffer).await.map_err(|e| {
let e = zerror!("Write error on UnixSocketStream link {}: {}", self, e);
log::trace!("{}", e);
e.into()
})
}

async fn read(&self, buffer: &mut [u8]) -> ZResult<usize> {
(&self.socket).read(buffer).await.map_err(|e| {
self.get_mut_socket().read(buffer).await.map_err(|e| {
let e = zerror!("Read error on UnixSocketStream link {}: {}", self, e);
log::trace!("{}", e);
e.into()
})
}

async fn read_exact(&self, buffer: &mut [u8]) -> ZResult<()> {
(&self.socket).read_exact(buffer).await.map_err(|e| {
let e = zerror!("Read error on UnixSocketStream link {}: {}", self, e);
log::trace!("{}", e);
e.into()
})
self.get_mut_socket()
.read_exact(buffer)
.await
.map(|_len| ())
.map_err(|e| {
let e = zerror!("Read error on UnixSocketStream link {}: {}", self, e);
log::trace!("{}", e);
e.into()
})
}

#[inline(always)]
Expand Down Expand Up @@ -129,7 +135,7 @@ impl LinkUnicastTrait for LinkUnicastUnixSocketStream {
impl Drop for LinkUnicastUnixSocketStream {
fn drop(&mut self) {
// Close the underlying UnixSocketStream socket
let _ = self.socket.shutdown(Shutdown::Both);
let _ = self.get_mut_socket().shutdown();
}
}

Expand All @@ -154,28 +160,31 @@ impl fmt::Debug for LinkUnicastUnixSocketStream {
/*************************************/
struct ListenerUnixSocketStream {
endpoint: EndPoint,
active: Arc<AtomicBool>,
signal: Signal,
handle: JoinHandle<ZResult<()>>,
token: CancellationToken,
tracker: TaskTracker,
lock_fd: RawFd,
}

impl ListenerUnixSocketStream {
fn new(
endpoint: EndPoint,
active: Arc<AtomicBool>,
signal: Signal,
handle: JoinHandle<ZResult<()>>,
token: CancellationToken,
tracker: TaskTracker,
lock_fd: RawFd,
) -> ListenerUnixSocketStream {
ListenerUnixSocketStream {
endpoint,
active,
signal,
handle,
token,
tracker,
lock_fd,
}
}

async fn stop(&self) {
self.token.cancel();
self.tracker.close();
self.tracker.wait().await;
}
}

pub struct LinkManagerUnicastUnixSocketStream {
Expand Down Expand Up @@ -319,7 +328,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUnixSocketStream {
let _ = remove_file(path.clone());

// Bind the Unix socket
let socket = UnixListener::bind(&path).await.map_err(|e| {
let socket = UnixListener::bind(&path).map_err(|e| {
let e = zerror!(
"Can not create a new UnixSocketStream listener on {}: {}",
path,
Expand Down Expand Up @@ -360,24 +369,25 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUnixSocketStream {
)?;

// Spawn the accept loop for the listener
let active = Arc::new(AtomicBool::new(true));
let signal = Signal::new();
let token = CancellationToken::new();
let c_token = token.clone();
let mut listeners = zwrite!(self.listeners);

let c_active = active.clone();
let c_signal = signal.clone();
let c_manager = self.manager.clone();
let c_listeners = self.listeners.clone();
let c_path = local_path_str.to_owned();
let handle = task::spawn(async move {

let tracker = TaskTracker::new();
let task = async move {
// Wait for the accept loop to terminate
let res = accept_task(socket, c_active, c_signal, c_manager).await;
let res = accept_task(socket, c_token, c_manager).await;
zwrite!(c_listeners).remove(&c_path);
res
});
};
tracker.spawn_on(task, &zenoh_runtime::ZRuntime::TX);

let locator = endpoint.to_locator();
let listener = ListenerUnixSocketStream::new(endpoint, active, signal, handle, lock_fd);
let listener = ListenerUnixSocketStream::new(endpoint, token, tracker, lock_fd);
listeners.insert(local_path_str.to_owned(), listener);

Ok(locator)
Expand All @@ -397,9 +407,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUnixSocketStream {
})?;

// Send the stop signal
listener.active.store(false, Ordering::Release);
listener.signal.trigger();
let res = listener.handle.await;
listener.stop().await;

//Release the lock
let _ = nix::fcntl::flock(listener.lock_fd, nix::fcntl::FlockArg::UnlockNonblock);
Expand All @@ -410,7 +418,8 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUnixSocketStream {
let lock_file_path = format!("{path}.lock");
let tmp = remove_file(lock_file_path);
log::trace!("UnixSocketStream Domain Socket removal result: {:?}", tmp);
res

Ok(())
}

fn get_listeners(&self) -> Vec<EndPoint> {
Expand All @@ -430,23 +439,12 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUnixSocketStream {

async fn accept_task(
socket: UnixListener,
active: Arc<AtomicBool>,
signal: Signal,
token: CancellationToken,
manager: NewLinkChannelSender,
) -> ZResult<()> {
enum Action {
Accept(UnixStream),
Stop,
}

async fn accept(socket: &UnixListener) -> ZResult<Action> {
async fn accept(socket: &UnixListener) -> ZResult<UnixStream> {
let (stream, _) = socket.accept().await.map_err(|e| zerror!(e))?;
Ok(Action::Accept(stream))
}

async fn stop(signal: Signal) -> ZResult<Action> {
signal.wait().await;
Ok(Action::Stop)
Ok(stream)
}

let src_addr = socket.local_addr().map_err(|e| {
Expand Down Expand Up @@ -478,38 +476,41 @@ async fn accept_task(
"Ready to accept UnixSocketStream connections on: {}",
src_path
);
while active.load(Ordering::Acquire) {
// Wait for incoming connections
let stream = match accept(&socket).race(stop(signal.clone())).await {
Ok(action) => match action {
Action::Accept(stream) => stream,
Action::Stop => break,
},
Err(e) => {
log::warn!("{}. Hint: increase the system open file limit.", e);
// Throttle the accept loop upon an error
// NOTE: This might be due to various factors. However, the most common case is that
// the process has reached the maximum number of open files in the system. On
// Linux systems this limit can be changed by using the "ulimit" command line
// tool. In case of systemd-based systems, this can be changed by using the
// "sysctl" command line tool.
task::sleep(Duration::from_micros(*UNIXSOCKSTREAM_ACCEPT_THROTTLE_TIME)).await;
continue;
}
};

let dst_path = format!("{}", Uuid::new_v4());

log::debug!("Accepted UnixSocketStream connection on: {:?}", src_addr,);

// Create the new link object
let link = Arc::new(LinkUnicastUnixSocketStream::new(
stream, src_path, &dst_path,
));

// Communicate the new link to the initial transport manager
if let Err(e) = manager.send_async(LinkUnicast(link)).await {
log::error!("{}-{}: {}", file!(), line!(), e)
loop {
tokio::select! {
_ = token.cancelled() => break,

res = accept(&socket) => {
match res {
Ok(stream) => {
let dst_path = format!("{}", Uuid::new_v4());

log::debug!("Accepted UnixSocketStream connection on: {:?}", src_addr,);

// Create the new link object
let link = Arc::new(LinkUnicastUnixSocketStream::new(
stream, src_path, &dst_path,
));

// Communicate the new link to the initial transport manager
if let Err(e) = manager.send_async(LinkUnicast(link)).await {
log::error!("{}-{}: {}", file!(), line!(), e)
}

}
Err(e) => {
log::warn!("{}. Hint: increase the system open file limit.", e);
// Throttle the accept loop upon an error
// NOTE: This might be due to various factors. However, the most common case is that
// the process has reached the maximum number of open files in the system. On
// Linux systems this limit can be changed by using the "ulimit" command line
// tool. In case of systemd-based systems, this can be changed by using the
// "sysctl" command line tool.
tokio::time::sleep(Duration::from_micros(*UNIXSOCKSTREAM_ACCEPT_THROTTLE_TIME)).await;
}
}
}
}
}

Expand Down

0 comments on commit 2ecd8b5

Please sign in to comment.