From 998073365b9de0b3bf31634894e5b387b05df0c0 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Tue, 19 Mar 2024 14:20:40 +0100 Subject: [PATCH] Implement vsock link support --- Cargo.lock | 46 ++- Cargo.toml | 2 + DEFAULT_CONFIG.json5 | 2 +- io/zenoh-link/Cargo.toml | 2 + io/zenoh-link/src/lib.rs | 13 + io/zenoh-links/zenoh-link-vsock/Cargo.toml | 42 ++ io/zenoh-links/zenoh-link-vsock/src/lib.rs | 52 +++ .../zenoh-link-vsock/src/unicast.rs | 374 ++++++++++++++++++ io/zenoh-transport/Cargo.toml | 1 + io/zenoh-transport/tests/endpoints.rs | 14 + .../tests/transport_whitelist.rs | 14 + .../tests/unicast_intermittent.rs | 8 + io/zenoh-transport/tests/unicast_multilink.rs | 9 + io/zenoh-transport/tests/unicast_openclose.rs | 8 + zenoh/Cargo.toml | 1 + zenoh/src/lib.rs | 1 + zenohd/src/main.rs | 2 + 17 files changed, 588 insertions(+), 3 deletions(-) create mode 100644 io/zenoh-links/zenoh-link-vsock/Cargo.toml create mode 100644 io/zenoh-links/zenoh-link-vsock/src/lib.rs create mode 100644 io/zenoh-links/zenoh-link-vsock/src/unicast.rs diff --git a/Cargo.lock b/Cargo.lock index ba84d51ee8..42a1783a9b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2074,6 +2074,7 @@ dependencies = [ "bitflags 2.4.0", "cfg-if 1.0.0", "libc", + "memoffset 0.9.0", ] [[package]] @@ -3819,6 +3820,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "tokio-vsock" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e336ac4b36df625d5429a735dd5847732fe5f62010e3ce0c50f3705d44730f8" +dependencies = [ + "bytes", + "futures", + "libc", + "tokio", + "vsock", +] + [[package]] name = "tower-service" version = "0.3.2" @@ -4100,6 +4114,16 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "vsock" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dfb6e7a74830912f1f4a7655227c9ded1ea4e9136676311fedf54bedb412f35" +dependencies = [ + "libc", + "nix 0.27.1", +] + [[package]] name = "waker-fn" version = "1.1.0" @@ -4666,6 +4690,7 @@ dependencies = [ "zenoh-link-udp", "zenoh-link-unixpipe", "zenoh-link-unixsock_stream", + "zenoh-link-vsock", "zenoh-link-ws", "zenoh-protocol", "zenoh-result", @@ -4679,13 +4704,11 @@ dependencies = [ "flume", "futures", "log", - "lz4_flex", "rustls 0.22.2", "rustls-webpki 0.102.2", "serde", "tokio", "tokio-util", - "typenum", "zenoh-buffers", "zenoh-codec", "zenoh-core", @@ -4848,6 +4871,25 @@ dependencies = [ "zenoh-sync", ] +[[package]] +name = "zenoh-link-vsock" +version = "0.11.0-dev" +dependencies = [ + "async-trait", + "libc", + "log", + "tokio", + "tokio-util", + "tokio-vsock", + "zenoh-core", + "zenoh-link-commons", + "zenoh-protocol", + "zenoh-result", + "zenoh-runtime", + "zenoh-sync", + "zenoh-util", +] + [[package]] name = "zenoh-link-ws" version = "0.11.0-dev" diff --git a/Cargo.toml b/Cargo.toml index d9be6e3685..b7f73b4b5d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ members = [ "io/zenoh-links/zenoh-link-unixsock_stream/", "io/zenoh-links/zenoh-link-ws/", "io/zenoh-links/zenoh-link-unixpipe/", + "io/zenoh-links/zenoh-link-vsock/", "io/zenoh-transport", "plugins/zenoh-backend-example", "plugins/zenoh-plugin-example", @@ -190,6 +191,7 @@ zenoh-link-udp = { version = "0.11.0-dev", path = "io/zenoh-links/zenoh-link-udp zenoh-link-ws = { version = "0.11.0-dev", path = "io/zenoh-links/zenoh-link-ws" } zenoh-link-unixpipe = { version = "0.11.0-dev", path = "io/zenoh-links/zenoh-link-unixpipe" } zenoh-link-serial = { version = "0.11.0-dev", path = "io/zenoh-links/zenoh-link-serial" } +zenoh-link-vsock = { version = "0.11.0-dev", path = "io/zenoh-links/zenoh-link-vsock" } zenoh-link = { version = "0.11.0-dev", path = "io/zenoh-link" } zenoh-link-commons = { version = "0.11.0-dev", path = "io/zenoh-link-commons" } zenoh = { version = "0.11.0-dev", path = "zenoh", default-features = false } diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 66352fe141..ee255e5db7 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -177,7 +177,7 @@ link: { /// An optional whitelist of protocols to be used for accepting and opening sessions. /// If not configured, all the supported protocols are automatically whitelisted. - /// The supported protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream"] + /// The supported protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream", "vsock"] /// For example, to only enable "tls" and "quic": // protocols: ["tls", "quic"], /// Configure the zenoh TX parameters of a link diff --git a/io/zenoh-link/Cargo.toml b/io/zenoh-link/Cargo.toml index 25d30903da..7a9772391f 100644 --- a/io/zenoh-link/Cargo.toml +++ b/io/zenoh-link/Cargo.toml @@ -33,6 +33,7 @@ transport_unixsock-stream = ["zenoh-link-unixsock_stream"] transport_ws = ["zenoh-link-ws"] transport_serial = ["zenoh-link-serial"] transport_unixpipe = ["zenoh-link-unixpipe", "zenoh-link-unixpipe/transport_unixpipe"] +transport_vsock = ["zenoh-link-vsock"] [dependencies] async-trait = { workspace = true } @@ -47,5 +48,6 @@ zenoh-link-udp = { workspace = true, optional = true } zenoh-link-unixsock_stream = { workspace = true, optional = true } zenoh-link-ws = { workspace = true, optional = true } zenoh-link-unixpipe = { workspace = true, optional = true } +zenoh-link-vsock = { workspace = true, optional = true } zenoh-protocol = { workspace = true } zenoh-result = { workspace = true } diff --git a/io/zenoh-link/src/lib.rs b/io/zenoh-link/src/lib.rs index 0e3e5879a8..21f26ecf1b 100644 --- a/io/zenoh-link/src/lib.rs +++ b/io/zenoh-link/src/lib.rs @@ -72,6 +72,11 @@ use zenoh_link_unixpipe::{ LinkManagerUnicastPipe, UnixPipeConfigurator, UnixPipeLocatorInspector, UNIXPIPE_LOCATOR_PREFIX, }; +#[cfg(all(feature = "transport_vsock", target_os = "linux"))] +pub use zenoh_link_vsock as vsock; +#[cfg(all(feature = "transport_vsock", target_os = "linux"))] +use zenoh_link_vsock::{LinkManagerUnicastVsock, VsockLocatorInspector, VSOCK_LOCATOR_PREFIX}; + pub use zenoh_link_commons::*; pub use zenoh_protocol::core::{EndPoint, Locator}; @@ -92,6 +97,8 @@ pub const PROTOCOLS: &[&str] = &[ serial::SERIAL_LOCATOR_PREFIX, #[cfg(feature = "transport_unixpipe")] unixpipe::UNIXPIPE_LOCATOR_PREFIX, + #[cfg(all(feature = "transport_vsock", target_os = "linux"))] + vsock::VSOCK_LOCATOR_PREFIX, ]; #[derive(Default, Clone)] @@ -112,6 +119,8 @@ pub struct LocatorInspector { serial_inspector: SerialLocatorInspector, #[cfg(feature = "transport_unixpipe")] unixpipe_inspector: UnixPipeLocatorInspector, + #[cfg(all(feature = "transport_vsock", target_os = "linux"))] + vsock_inspector: VsockLocatorInspector, } impl LocatorInspector { pub async fn is_multicast(&self, locator: &Locator) -> ZResult { @@ -137,6 +146,8 @@ impl LocatorInspector { SERIAL_LOCATOR_PREFIX => self.serial_inspector.is_multicast(locator).await, #[cfg(feature = "transport_unixpipe")] UNIXPIPE_LOCATOR_PREFIX => self.unixpipe_inspector.is_multicast(locator).await, + #[cfg(all(feature = "transport_vsock", target_os = "linux"))] + VSOCK_LOCATOR_PREFIX => self.vsock_inspector.is_multicast(locator).await, _ => bail!("Unsupported protocol: {}.", protocol), } } @@ -226,6 +237,8 @@ impl LinkManagerBuilderUnicast { UNIXPIPE_LOCATOR_PREFIX => { Ok(std::sync::Arc::new(LinkManagerUnicastPipe::new(_manager))) } + #[cfg(all(feature = "transport_vsock", target_os = "linux"))] + VSOCK_LOCATOR_PREFIX => Ok(std::sync::Arc::new(LinkManagerUnicastVsock::new(_manager))), _ => bail!("Unicast not supported for {} protocol", protocol), } } diff --git a/io/zenoh-links/zenoh-link-vsock/Cargo.toml b/io/zenoh-links/zenoh-link-vsock/Cargo.toml new file mode 100644 index 0000000000..c1bde48515 --- /dev/null +++ b/io/zenoh-links/zenoh-link-vsock/Cargo.toml @@ -0,0 +1,42 @@ +# +# Copyright (c) 2024 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# +[package] +rust-version = { workspace = true } +name = "zenoh-link-vsock" +version = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +authors = { workspace = true } +edition = { workspace = true } +license = { workspace = true } +categories = { workspace = true } +description = "Internal crate for zenoh." +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = { workspace = true } +tokio = { workspace = true, features = ["net", "io-util", "rt", "time"] } +tokio-util = { workspace = true, features = ["rt"] } +log = { workspace = true } +libc = { workspace = true } +zenoh-core = { workspace = true } +zenoh-link-commons = { workspace = true } +zenoh-protocol = { workspace = true } +zenoh-result = { workspace = true } +zenoh-sync = { workspace = true } +zenoh-util = { workspace = true } +zenoh-runtime = { workspace = true } + +[target.'cfg(target_os = "linux")'.dependencies] +tokio-vsock = "0.5.0" diff --git a/io/zenoh-links/zenoh-link-vsock/src/lib.rs b/io/zenoh-links/zenoh-link-vsock/src/lib.rs new file mode 100644 index 0000000000..875622b6aa --- /dev/null +++ b/io/zenoh-links/zenoh-link-vsock/src/lib.rs @@ -0,0 +1,52 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +//! ⚠️ WARNING ⚠️ +//! +//! This crate is intended for Zenoh's internal use. +//! +//! [Click here for Zenoh's documentation](../zenoh/index.html) +use async_trait::async_trait; +use zenoh_core::zconfigurable; +use zenoh_link_commons::LocatorInspector; +use zenoh_protocol::core::Locator; +use zenoh_result::ZResult; + +#[cfg(target_os = "linux")] +mod unicast; +#[cfg(target_os = "linux")] +pub use unicast::*; + +pub const VSOCK_LOCATOR_PREFIX: &str = "vsock"; + +#[derive(Default, Clone, Copy)] +pub struct VsockLocatorInspector; +#[async_trait] +impl LocatorInspector for VsockLocatorInspector { + fn protocol(&self) -> &str { + VSOCK_LOCATOR_PREFIX + } + + async fn is_multicast(&self, _locator: &Locator) -> ZResult { + Ok(false) + } +} + +zconfigurable! { + // Default MTU in bytes. + static ref VSOCK_DEFAULT_MTU: u16 = u16::MAX; + // Amount of time in microseconds to throttle the accept loop upon an error. + // Default set to 100 ms. + static ref VSOCK_ACCEPT_THROTTLE_TIME: u64 = 100_000; +} diff --git a/io/zenoh-links/zenoh-link-vsock/src/unicast.rs b/io/zenoh-links/zenoh-link-vsock/src/unicast.rs new file mode 100644 index 0000000000..ced7b9dc15 --- /dev/null +++ b/io/zenoh-links/zenoh-link-vsock/src/unicast.rs @@ -0,0 +1,374 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use async_trait::async_trait; +use libc::VMADDR_PORT_ANY; +use std::cell::UnsafeCell; +use std::collections::HashMap; +use std::fmt; +use std::sync::Arc; +use std::time::Duration; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::sync::RwLock as AsyncRwLock; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; +use zenoh_core::{zasyncread, zasyncwrite}; +use zenoh_link_commons::{ + LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, +}; +use zenoh_protocol::core::endpoint::Address; +use zenoh_protocol::core::{EndPoint, Locator}; +use zenoh_result::{bail, zerror, ZResult}; + +use super::{VSOCK_ACCEPT_THROTTLE_TIME, VSOCK_DEFAULT_MTU, VSOCK_LOCATOR_PREFIX}; +use tokio_vsock::{ + VsockAddr, VsockListener, VsockStream, VMADDR_CID_ANY, VMADDR_CID_HOST, VMADDR_CID_HYPERVISOR, + VMADDR_CID_LOCAL, +}; + +pub const VSOCK_VMADDR_CID_ANY: &str = "VMADDR_CID_ANY"; +pub const VSOCK_VMADDR_CID_HYPERVISOR: &str = "VMADDR_CID_HYPERVISOR"; +pub const VSOCK_VMADDR_CID_LOCAL: &str = "VMADDR_CID_LOCAL"; +pub const VSOCK_VMADDR_CID_HOST: &str = "VMADDR_CID_HOST"; + +pub const VSOCK_VMADDR_PORT_ANY: &str = "VMADDR_PORT_ANY"; + +pub fn get_vsock_addr(address: Address<'_>) -> ZResult { + let parts: Vec<&str> = address.as_str().split(':').collect(); + + if parts.len() != 2 { + bail!("Incorrect vsock address: {:?}", address); + } + + let cid = match parts[0].to_uppercase().as_str() { + VSOCK_VMADDR_CID_HYPERVISOR => VMADDR_CID_HYPERVISOR, + VSOCK_VMADDR_CID_HOST => VMADDR_CID_HOST, + VSOCK_VMADDR_CID_LOCAL => VMADDR_CID_LOCAL, + VSOCK_VMADDR_CID_ANY => VMADDR_CID_ANY, + "-1" => VMADDR_CID_ANY, + _ => { + if let Ok(cid) = parts[0].parse::() { + cid + } else { + bail!("Incorrect vsock cid: {:?}", parts[0]); + } + } + }; + + let port = match parts[1].to_uppercase().as_str() { + VSOCK_VMADDR_PORT_ANY => VMADDR_PORT_ANY, + "-1" => VMADDR_PORT_ANY, + _ => { + if let Ok(cid) = parts[1].parse::() { + cid + } else { + bail!("Incorrect vsock port: {:?}", parts[1]); + } + } + }; + + Ok(VsockAddr::new(cid, port)) +} + +pub struct LinkUnicastVsock { + // The underlying socket as returned from the async-std library + socket: UnsafeCell, + // The source socket address of this link (address used on the local host) + src_addr: VsockAddr, + src_locator: Locator, + // The destination socket address of this link (address used on the remote host) + dst_addr: VsockAddr, + dst_locator: Locator, +} + +unsafe impl Sync for LinkUnicastVsock {} + +impl LinkUnicastVsock { + fn new(socket: VsockStream, src_addr: VsockAddr, dst_addr: VsockAddr) -> LinkUnicastVsock { + // Build the vsock object + LinkUnicastVsock { + socket: UnsafeCell::new(socket), + src_addr, + src_locator: Locator::new(VSOCK_LOCATOR_PREFIX, src_addr.to_string(), "").unwrap(), + dst_addr, + dst_locator: Locator::new(VSOCK_LOCATOR_PREFIX, dst_addr.to_string(), "").unwrap(), + } + } + #[allow(clippy::mut_from_ref)] + fn get_mut_socket(&self) -> &mut VsockStream { + unsafe { &mut *self.socket.get() } + } +} + +#[async_trait] +impl LinkUnicastTrait for LinkUnicastVsock { + async fn close(&self) -> ZResult<()> { + log::trace!("Closing vsock link: {}", self); + self.get_mut_socket().shutdown().await.map_err(|e| { + let e = zerror!("vsock link shutdown {}: {:?}", self, e); + log::trace!("{}", e); + e.into() + }) + } + + async fn write(&self, buffer: &[u8]) -> ZResult { + self.get_mut_socket().write(buffer).await.map_err(|e| { + let e = zerror!("Write error on vsock link {}: {}", self, e); + log::trace!("{}", e); + e.into() + }) + } + + async fn write_all(&self, buffer: &[u8]) -> ZResult<()> { + self.get_mut_socket().write_all(buffer).await.map_err(|e| { + let e = zerror!("Write error on vsock link {}: {}", self, e); + log::trace!("{}", e); + e.into() + }) + } + + async fn read(&self, buffer: &mut [u8]) -> ZResult { + self.get_mut_socket().read(buffer).await.map_err(|e| { + let e = zerror!("Read error on vsock link {}: {}", self, e); + log::trace!("{}", e); + e.into() + }) + } + + async fn read_exact(&self, buffer: &mut [u8]) -> ZResult<()> { + let _ = self + .get_mut_socket() + .read_exact(buffer) + .await + .map_err(|e| { + let e = zerror!("Read error on vsock link {}: {}", self, e); + log::trace!("{}", e); + e + })?; + Ok(()) + } + + #[inline(always)] + fn get_src(&self) -> &Locator { + &self.src_locator + } + + #[inline(always)] + fn get_dst(&self) -> &Locator { + &self.dst_locator + } + + #[inline(always)] + fn get_mtu(&self) -> u16 { + *VSOCK_DEFAULT_MTU + } + + #[inline(always)] + fn get_interface_names(&self) -> Vec { + vec!["vsock".to_string()] + } + + #[inline(always)] + fn is_reliable(&self) -> bool { + true + } + + #[inline(always)] + fn is_streamed(&self) -> bool { + true + } +} + +impl fmt::Display for LinkUnicastVsock { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{} => {}", self.src_addr, self.dst_addr)?; + Ok(()) + } +} + +impl fmt::Debug for LinkUnicastVsock { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Vsock") + .field("src", &self.src_addr) + .field("dst", &self.dst_addr) + .finish() + } +} + +struct ListenerUnicastVsock { + endpoint: EndPoint, + token: CancellationToken, + handle: JoinHandle>, +} + +impl ListenerUnicastVsock { + fn new(endpoint: EndPoint, token: CancellationToken, handle: JoinHandle>) -> Self { + Self { + endpoint, + token, + handle, + } + } + + async fn stop(&self) { + self.token.cancel(); + } +} + +pub struct LinkManagerUnicastVsock { + manager: NewLinkChannelSender, + listeners: Arc>>, +} + +impl LinkManagerUnicastVsock { + pub fn new(manager: NewLinkChannelSender) -> Self { + Self { + manager, + listeners: Arc::new(AsyncRwLock::new(HashMap::new())), + } + } +} + +#[async_trait] +impl LinkManagerUnicastTrait for LinkManagerUnicastVsock { + async fn new_link(&self, endpoint: EndPoint) -> ZResult { + let addr = get_vsock_addr(endpoint.address())?; + if let Ok(stream) = VsockStream::connect(addr).await { + let local_addr = stream.local_addr()?; + let peer_addr = stream.peer_addr()?; + let link = Arc::new(LinkUnicastVsock::new(stream, local_addr, peer_addr)); + return Ok(LinkUnicast(link)); + } + + bail!("Can not create a new vsock link bound to {}", endpoint) + } + + async fn new_listener(&self, mut endpoint: EndPoint) -> ZResult { + let addr = get_vsock_addr(endpoint.address())?; + if let Ok(listener) = VsockListener::bind(addr) { + let local_addr = listener.local_addr()?; + // Update the endpoint locator address + endpoint = EndPoint::new( + endpoint.protocol(), + &format!("{local_addr}"), + endpoint.metadata(), + endpoint.config(), + )?; + let token = CancellationToken::new(); + let c_token = token.clone(); + + let c_manager = self.manager.clone(); + + let locator = endpoint.to_locator(); + + let mut listeners = zasyncwrite!(self.listeners); + let c_listeners = self.listeners.clone(); + let c_addr = addr; + let task = async move { + // Wait for the accept loop to terminate + let res = accept_task(listener, c_token, c_manager).await; + zasyncwrite!(c_listeners).remove(&c_addr); + res + }; + let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task); + + let listener = ListenerUnicastVsock::new(endpoint, token, handle); + // Update the list of active listeners on the manager + listeners.insert(addr, listener); + return Ok(locator); + } + + bail!("Can not create a new vsock listener bound to {}", endpoint) + } + + async fn del_listener(&self, endpoint: &EndPoint) -> ZResult<()> { + let addr = get_vsock_addr(endpoint.address())?; + + let listener = zasyncwrite!(self.listeners).remove(&addr).ok_or_else(|| { + zerror!( + "Can not delete the listener because it has not been found: {}", + addr + ) + })?; + + // Send the stop signal + listener.stop().await; + listener.handle.await? + } + + async fn get_listeners(&self) -> Vec { + zasyncread!(self.listeners) + .values() + .map(|x| x.endpoint.clone()) + .collect() + } + + async fn get_locators(&self) -> Vec { + zasyncread!(self.listeners) + .values() + .map(|x| x.endpoint.to_locator()) + .collect() + } +} + +async fn accept_task( + mut socket: VsockListener, + token: CancellationToken, + manager: NewLinkChannelSender, +) -> ZResult<()> { + async fn accept(socket: &mut VsockListener) -> ZResult<(VsockStream, VsockAddr)> { + let res = socket.accept().await.map_err(|e| zerror!(e))?; + Ok(res) + } + + let src_addr = socket.local_addr().map_err(|e| { + let e = zerror!("Can not accept vsock connections: {}", e); + log::warn!("{}", e); + e + })?; + + log::trace!("Ready to accept vsock connections on: {:?}", src_addr); + loop { + tokio::select! { + _ = token.cancelled() => break, + res = accept(&mut socket) => { + match res { + Ok((stream, dst_addr)) => { + log::debug!("Accepted vsock connection on {:?}: {:?}", src_addr, dst_addr); + // Create the new link object + let link = Arc::new(LinkUnicastVsock::new(stream, src_addr, dst_addr)); + + // Communicate the new link to the initial transport manager + if let Err(e) = manager.send_async(LinkUnicast(link)).await { + log::error!("{}-{}: {}", file!(), line!(), e) + } + }, + Err(e) => { + log::warn!("{}. Hint: increase the system open file limit.", e); + // Throttle the accept loop upon an error + // NOTE: This might be due to various factors. However, the most common case is that + // the process has reached the maximum number of open files in the system. On + // Linux systems this limit can be changed by using the "ulimit" command line + // tool. In case of systemd-based systems, this can be changed by using the + // "sysctl" command line tool. + tokio::time::sleep(Duration::from_micros(*VSOCK_ACCEPT_THROTTLE_TIME)).await; + } + + } + } + }; + } + + Ok(()) +} diff --git a/io/zenoh-transport/Cargo.toml b/io/zenoh-transport/Cargo.toml index 4cb51fc504..6f18f7cc5c 100644 --- a/io/zenoh-transport/Cargo.toml +++ b/io/zenoh-transport/Cargo.toml @@ -43,6 +43,7 @@ transport_ws = ["zenoh-link/transport_ws"] transport_serial = ["zenoh-link/transport_serial"] transport_compression = [] transport_unixpipe = ["zenoh-link/transport_unixpipe"] +transport_vsock= ["zenoh-link/transport_vsock"] stats = ["zenoh-protocol/stats"] test = [] unstable = [] diff --git a/io/zenoh-transport/tests/endpoints.rs b/io/zenoh-transport/tests/endpoints.rs index 2f4335ca31..13a605a588 100644 --- a/io/zenoh-transport/tests/endpoints.rs +++ b/io/zenoh-transport/tests/endpoints.rs @@ -408,3 +408,17 @@ AXVFFIgCSluyrolaD6CWD9MqOex4YOfJR2bNxI7lFvuK4AwjyUJzT1U1HXib17mM let endpoints = vec![endpoint]; run(&endpoints).await; } + +#[cfg(all(feature = "transport_vsock", target_os = "linux"))] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn endpoint_vsock() { + let _ = env_logger::try_init(); + // Define the locators + let endpoints: Vec = vec![ + "vsock/-1:1234".parse().unwrap(), + "vsock/VMADDR_CID_ANY:VMADDR_PORT_ANY".parse().unwrap(), + "vsock/VMADDR_CID_LOCAL:2345".parse().unwrap(), + "vsock/VMADDR_CID_LOCAL:VMADDR_PORT_ANY".parse().unwrap(), + ]; + run(&endpoints).await; +} diff --git a/io/zenoh-transport/tests/transport_whitelist.rs b/io/zenoh-transport/tests/transport_whitelist.rs index da7ec67703..ccc74e679e 100644 --- a/io/zenoh-transport/tests/transport_whitelist.rs +++ b/io/zenoh-transport/tests/transport_whitelist.rs @@ -142,3 +142,17 @@ async fn transport_whitelist_unixpipe() { // Run run(&endpoints).await; } + +#[cfg(all(feature = "transport_vsock", target_os = "linux"))] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn transport_whitelist_vsock() { + let _ = env_logger::try_init(); + + // Define the locators + let endpoints: Vec = vec![ + "vsock/VMADDR_CID_LOCAL:17000".parse().unwrap(), + "vsock/1:17001".parse().unwrap(), + ]; + // Run + run(&endpoints).await; +} diff --git a/io/zenoh-transport/tests/unicast_intermittent.rs b/io/zenoh-transport/tests/unicast_intermittent.rs index 04711e66ec..6d9f889d8c 100644 --- a/io/zenoh-transport/tests/unicast_intermittent.rs +++ b/io/zenoh-transport/tests/unicast_intermittent.rs @@ -464,3 +464,11 @@ async fn transport_unixpipe_intermittent_for_lowlatency_transport() { .unwrap(); lowlatency_transport_intermittent(&endpoint).await; } + +#[cfg(all(feature = "transport_vsock", target_os = "linux"))] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn transport_vsock_intermittent() { + let _ = env_logger::try_init(); + let endpoint: EndPoint = "vsock/VMADDR_CID_LOCAL:17000".parse().unwrap(); + universal_transport_intermittent(&endpoint).await; +} diff --git a/io/zenoh-transport/tests/unicast_multilink.rs b/io/zenoh-transport/tests/unicast_multilink.rs index 2fe73853b9..5e4499be2a 100644 --- a/io/zenoh-transport/tests/unicast_multilink.rs +++ b/io/zenoh-transport/tests/unicast_multilink.rs @@ -722,4 +722,13 @@ R+IdLiXcyIkg0m9N8I17p0ljCSkbrgGMD3bbePRTfg== multilink_transport(&endpoint).await; } + + #[cfg(all(feature = "transport_vsock", target_os = "linux"))] + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn multilink_vsock_only() { + let _ = env_logger::try_init(); + + let endpoint: EndPoint = "vsock/VMADDR_CID_LOCAL:17000".parse().unwrap(); + multilink_transport(&endpoint).await; + } } diff --git a/io/zenoh-transport/tests/unicast_openclose.rs b/io/zenoh-transport/tests/unicast_openclose.rs index dfa690c889..56e4a1b140 100644 --- a/io/zenoh-transport/tests/unicast_openclose.rs +++ b/io/zenoh-transport/tests/unicast_openclose.rs @@ -825,3 +825,11 @@ async fn openclose_udp_only_listen_with_interface_restriction() { // should not connect to local interface and external address openclose_transport(&listen_endpoint, &connect_endpoint, false).await; } + +#[cfg(all(feature = "transport_vsock", target_os = "linux"))] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn openclose_vsock() { + let _ = env_logger::try_init(); + let endpoint: EndPoint = "vsock/VMADDR_CID_LOCAL:17000".parse().unwrap(); + openclose_lowlatency_transport(&endpoint).await; +} diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index 0e28905253..955e362bc7 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -48,6 +48,7 @@ transport_tls = ["zenoh-transport/transport_tls"] transport_udp = ["zenoh-transport/transport_udp"] transport_unixsock-stream = ["zenoh-transport/transport_unixsock-stream"] transport_ws = ["zenoh-transport/transport_ws"] +transport_vsock= ["zenoh-transport/transport_vsock"] unstable = [] default = [ "auth_pubkey", diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 3ee115e293..d8820f7ad1 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -116,6 +116,7 @@ pub const FEATURES: &str = concat_enabled_features!( "transport_udp", "transport_unixsock-stream", "transport_ws", + "transport_vsock", "unstable", "default" ] diff --git a/zenohd/src/main.rs b/zenohd/src/main.rs index 7204a83612..d7cb9a52a9 100644 --- a/zenohd/src/main.rs +++ b/zenohd/src/main.rs @@ -386,6 +386,7 @@ fn test_default_features() { " zenoh/transport_udp", " zenoh/transport_unixsock-stream", " zenoh/transport_ws", + // " zenoh/transport_vsock", " zenoh/unstable", " zenoh/default", ) @@ -412,6 +413,7 @@ fn test_no_default_features() { // " zenoh/transport_udp", // " zenoh/transport_unixsock-stream", // " zenoh/transport_ws", + // " zenoh/transport_vsock", " zenoh/unstable", // " zenoh/default", )