From 5c6398b7f84c25b2e238afac653b4931d4ad74e8 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Tue, 20 Feb 2024 14:47:23 +0100 Subject: [PATCH 1/9] Add test binding --- Cargo.lock | 1 + io/zenoh-links/zenoh-link-tcp/Cargo.toml | 1 + io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 13 +++++++++++++ 3 files changed, 15 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index fa9de7e800..17d5d7a3df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4844,6 +4844,7 @@ version = "0.11.0-dev" dependencies = [ "async-std", "async-trait", + "libc", "log", "zenoh-core", "zenoh-link-commons", diff --git a/io/zenoh-links/zenoh-link-tcp/Cargo.toml b/io/zenoh-links/zenoh-link-tcp/Cargo.toml index 9c4725ff03..9e20f7c7c0 100644 --- a/io/zenoh-links/zenoh-link-tcp/Cargo.toml +++ b/io/zenoh-links/zenoh-link-tcp/Cargo.toml @@ -28,6 +28,7 @@ description = "Internal crate for zenoh." async-std = { workspace = true } async-trait = { workspace = true } log = { workspace = true } +libc = { workspace = true } zenoh-core = { workspace = true } zenoh-link-commons = { workspace = true } zenoh-protocol = { workspace = true } diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 551f4c8c97..175d3eb9d7 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -18,6 +18,7 @@ use async_trait::async_trait; use std::convert::TryInto; use std::fmt; use std::net::Shutdown; +use std::os::fd::AsRawFd; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -221,6 +222,18 @@ impl LinkManagerUnicastTcp { .await .map_err(|e| zerror!("{}: {}", addr, e))?; + //let iface = "wg0"; + let iface = "lo"; + unsafe { + libc::setsockopt( + socket.as_raw_fd(), + libc::SOL_SOCKET, + libc::SO_BINDTODEVICE, + iface.as_ptr() as *const std::os::raw::c_void, + iface.len() as libc::socklen_t, + ); + } + let local_addr = socket .local_addr() .map_err(|e| zerror!("{}: {}", addr, e))?; From 64002016abd15e95a5e582141d232071d80636db Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 23 Feb 2024 14:32:41 +0100 Subject: [PATCH 2/9] Add binding to the interface --- commons/zenoh-util/src/std_only/net/mod.rs | 25 +++++++++----- io/zenoh-link-commons/src/lib.rs | 3 ++ io/zenoh-link-commons/src/listener.rs | 11 +++++-- io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 34 ++++++++++++-------- io/zenoh-links/zenoh-link-ws/src/unicast.rs | 4 +-- 5 files changed, 51 insertions(+), 26 deletions(-) diff --git a/commons/zenoh-util/src/std_only/net/mod.rs b/commons/zenoh-util/src/std_only/net/mod.rs index 67e732d3ee..810e4428f8 100644 --- a/commons/zenoh-util/src/std_only/net/mod.rs +++ b/commons/zenoh-util/src/std_only/net/mod.rs @@ -210,12 +210,19 @@ pub fn get_multicast_interfaces() -> Vec { } } -pub fn get_local_addresses() -> ZResult> { +pub fn get_local_addresses(interface: Option) -> ZResult> { #[cfg(unix)] { Ok(pnet_datalink::interfaces() .into_iter() - .filter(|iface| iface.is_up() && iface.is_running()) + .filter(|iface| { + if let Some(interface) = interface.clone() { + if iface.name != interface { + return false; + } + } + iface.is_up() && iface.is_running() + }) .flat_map(|iface| iface.ips) .map(|ipnet| ipnet.ip()) .collect()) @@ -412,25 +419,26 @@ pub fn get_interface_names_by_addr(addr: IpAddr) -> ZResult> { } } -pub fn get_ipv4_ipaddrs() -> Vec { - get_local_addresses() +pub fn get_ipv4_ipaddrs(interface: Option) -> Vec { + get_local_addresses(interface) .unwrap_or_else(|_| vec![]) .drain(..) .filter_map(|x| match x { IpAddr::V4(a) => Some(a), IpAddr::V6(_) => None, }) - .filter(|x| !x.is_loopback() && !x.is_multicast()) + .filter(|x| !x.is_multicast()) + // .filter(|x| !x.is_loopback() && !x.is_multicast()) // TODO(sashacmc): Why we exclude loopback? .map(IpAddr::V4) .collect() } -pub fn get_ipv6_ipaddrs() -> Vec { +pub fn get_ipv6_ipaddrs(interface: Option) -> Vec { const fn is_unicast_link_local(addr: &Ipv6Addr) -> bool { (addr.segments()[0] & 0xffc0) == 0xfe80 } - let ipaddrs = get_local_addresses().unwrap_or_else(|_| vec![]); + let ipaddrs = get_local_addresses(interface).unwrap_or_else(|_| vec![]); // Get first all IPv4 addresses let ipv4_iter = ipaddrs @@ -440,7 +448,8 @@ pub fn get_ipv6_ipaddrs() -> Vec { IpAddr::V6(_) => None, }) .filter(|x| { - !x.is_loopback() && !x.is_link_local() && !x.is_multicast() && !x.is_broadcast() + // !x.is_loopback() && !x.is_link_local() && !x.is_multicast() && !x.is_broadcast() // TODO(sashacmc): Why we exclude loopback? + !x.is_multicast() && !x.is_broadcast() }); // Get next all IPv6 addresses diff --git a/io/zenoh-link-commons/src/lib.rs b/io/zenoh-link-commons/src/lib.rs index 2ee28c3f08..0a43aac3d9 100644 --- a/io/zenoh-link-commons/src/lib.rs +++ b/io/zenoh-link-commons/src/lib.rs @@ -36,6 +36,9 @@ use zenoh_result::ZResult; /*************************************/ /* GENERAL */ /*************************************/ + +pub const BIND_INTERFACE: &str = "iface"; + #[derive(Clone, Debug, Serialize, Hash, PartialEq, Eq)] pub struct Link { pub src: Locator, diff --git a/io/zenoh-link-commons/src/listener.rs b/io/zenoh-link-commons/src/listener.rs index 1d5d7bb172..89b8d9e918 100644 --- a/io/zenoh-link-commons/src/listener.rs +++ b/io/zenoh-link-commons/src/listener.rs @@ -23,6 +23,8 @@ use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{zerror, ZResult}; use zenoh_sync::Signal; +use crate::BIND_INTERFACE; + pub struct ListenerUnicastIP { endpoint: EndPoint, active: Arc, @@ -109,12 +111,17 @@ impl ListenersUnicastIP { let guard = zread!(self.listeners); for (key, value) in guard.iter() { let (kip, kpt) = (key.ip(), key.port()); + let iface = value + .endpoint + .config() + .get(BIND_INTERFACE) + .map(|s| s.to_string()); // Either ipv4/0.0.0.0 or ipv6/[::] if kip.is_unspecified() { let mut addrs = match kip { - IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(), - IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(), + IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(iface), + IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(iface), }; let iter = addrs.drain(..).map(|x| { Locator::new( diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 175d3eb9d7..cfad120d84 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use std::time::Duration; use zenoh_link_commons::{ get_ip_interface_names, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, - ListenersUnicastIP, NewLinkChannelSender, + ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, }; use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; @@ -216,22 +216,28 @@ impl LinkManagerUnicastTcp { Ok((stream, src_addr, dst_addr)) } - async fn new_listener_inner(&self, addr: &SocketAddr) -> ZResult<(TcpListener, SocketAddr)> { + async fn new_listener_inner( + &self, + addr: &SocketAddr, + iface: &Option, + ) -> ZResult<(TcpListener, SocketAddr)> { // Bind the TCP socket let socket = TcpListener::bind(addr) .await .map_err(|e| zerror!("{}: {}", addr, e))?; - //let iface = "wg0"; - let iface = "lo"; - unsafe { - libc::setsockopt( - socket.as_raw_fd(), - libc::SOL_SOCKET, - libc::SO_BINDTODEVICE, - iface.as_ptr() as *const std::os::raw::c_void, - iface.len() as libc::socklen_t, - ); + if let Some(iface) = iface { + // @TODO: switch to bind_device after tokio porting + log::debug!("Listen at the interface: {}", iface); + unsafe { + libc::setsockopt( + socket.as_raw_fd(), + libc::SOL_SOCKET, + libc::SO_BINDTODEVICE, + iface.as_ptr() as *const std::os::raw::c_void, + iface.len() as libc::socklen_t, + ); + } } let local_addr = socket @@ -273,10 +279,10 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { async fn new_listener(&self, mut endpoint: EndPoint) -> ZResult { let addrs = get_tcp_addrs(endpoint.address()).await?; - + let iface = endpoint.config().get(BIND_INTERFACE).map(|s| s.to_string()); let mut errs: Vec = vec![]; for da in addrs { - match self.new_listener_inner(&da).await { + match self.new_listener_inner(&da, &iface).await { Ok((socket, local_addr)) => { // Update the endpoint locator address endpoint = EndPoint::new( diff --git a/io/zenoh-links/zenoh-link-ws/src/unicast.rs b/io/zenoh-links/zenoh-link-ws/src/unicast.rs index 4276e2bfaf..0ff1b1ab46 100644 --- a/io/zenoh-links/zenoh-link-ws/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-ws/src/unicast.rs @@ -416,7 +416,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastWs { for (key, value) in guard.iter() { let listener_locator = value.endpoint.to_locator(); if key.ip() == default_ipv4 { - match zenoh_util::net::get_local_addresses() { + match zenoh_util::net::get_local_addresses(None) { Ok(ipaddrs) => { for ipaddr in ipaddrs { if !ipaddr.is_loopback() && !ipaddr.is_multicast() && ipaddr.is_ipv4() { @@ -433,7 +433,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastWs { Err(err) => log::error!("Unable to get local addresses: {}", err), } } else if key.ip() == default_ipv6 { - match zenoh_util::net::get_local_addresses() { + match zenoh_util::net::get_local_addresses(None) { Ok(ipaddrs) => { for ipaddr in ipaddrs { if !ipaddr.is_loopback() && !ipaddr.is_multicast() && ipaddr.is_ipv6() { From cb88fb9e7830be239326a55cb1ddc5e4223385c1 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 23 Feb 2024 16:47:44 +0100 Subject: [PATCH 3/9] Add UDP support, fix windows/macos build --- Cargo.lock | 2 +- io/zenoh-link-commons/Cargo.toml | 1 + io/zenoh-link-commons/src/unicast.rs | 24 ++++++++++++++++++++ io/zenoh-links/zenoh-link-tcp/Cargo.toml | 1 - io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 18 +++------------ io/zenoh-links/zenoh-link-udp/src/unicast.rs | 17 ++++++++++---- 6 files changed, 42 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 17d5d7a3df..9590364884 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4779,6 +4779,7 @@ dependencies = [ "async-std", "async-trait", "flume", + "libc", "log", "lz4_flex", "serde", @@ -4844,7 +4845,6 @@ version = "0.11.0-dev" dependencies = [ "async-std", "async-trait", - "libc", "log", "zenoh-core", "zenoh-link-commons", diff --git a/io/zenoh-link-commons/Cargo.toml b/io/zenoh-link-commons/Cargo.toml index 019067de56..a691473cb3 100644 --- a/io/zenoh-link-commons/Cargo.toml +++ b/io/zenoh-link-commons/Cargo.toml @@ -34,6 +34,7 @@ async-trait = { workspace = true } flume = { workspace = true } lz4_flex = { workspace = true } log = { workspace = true } +libc = { workspace = true } serde = { workspace = true, features = ["default"] } typenum = { workspace = true } zenoh-buffers = { workspace = true } diff --git a/io/zenoh-link-commons/src/unicast.rs b/io/zenoh-link-commons/src/unicast.rs index 1237024ca9..2287941542 100644 --- a/io/zenoh-link-commons/src/unicast.rs +++ b/io/zenoh-link-commons/src/unicast.rs @@ -114,3 +114,27 @@ pub fn get_ip_interface_names(addr: &SocketAddr) -> Vec { } } } + +pub fn set_bind_to_device(socket: std::os::raw::c_int, iface: &Option) { + #[cfg(target_os = "linux")] + { + if let Some(iface) = iface { + // @TODO: switch to bind_device after tokio porting + log::debug!("Listen at the interface: {}", iface); + unsafe { + libc::setsockopt( + socket, + libc::SOL_SOCKET, + libc::SO_BINDTODEVICE, + iface.as_ptr() as *const std::os::raw::c_void, + iface.len() as libc::socklen_t, + ); + } + } + } + + #[cfg(any(target_os = "macos", target_os = "windows"))] + { + log::warn!("Listen at the interface is not supported for this platform"); + } +} diff --git a/io/zenoh-links/zenoh-link-tcp/Cargo.toml b/io/zenoh-links/zenoh-link-tcp/Cargo.toml index 9e20f7c7c0..9c4725ff03 100644 --- a/io/zenoh-links/zenoh-link-tcp/Cargo.toml +++ b/io/zenoh-links/zenoh-link-tcp/Cargo.toml @@ -28,7 +28,6 @@ description = "Internal crate for zenoh." async-std = { workspace = true } async-trait = { workspace = true } log = { workspace = true } -libc = { workspace = true } zenoh-core = { workspace = true } zenoh-link-commons = { workspace = true } zenoh-protocol = { workspace = true } diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index cfad120d84..f421e55d08 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -23,8 +23,8 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use zenoh_link_commons::{ - get_ip_interface_names, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, - ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, + get_ip_interface_names, set_bind_to_device, LinkManagerUnicastTrait, LinkUnicast, + LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, }; use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; @@ -226,19 +226,7 @@ impl LinkManagerUnicastTcp { .await .map_err(|e| zerror!("{}: {}", addr, e))?; - if let Some(iface) = iface { - // @TODO: switch to bind_device after tokio porting - log::debug!("Listen at the interface: {}", iface); - unsafe { - libc::setsockopt( - socket.as_raw_fd(), - libc::SOL_SOCKET, - libc::SO_BINDTODEVICE, - iface.as_ptr() as *const std::os::raw::c_void, - iface.len() as libc::socklen_t, - ); - } - } + set_bind_to_device(socket.as_raw_fd(), iface); let local_addr = socket .local_addr() diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index a5bd3c7726..169558bc88 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -22,13 +22,15 @@ use async_std::task; use async_trait::async_trait; use std::collections::HashMap; use std::fmt; +use std::os::fd::AsRawFd; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, Weak}; use std::time::Duration; use zenoh_core::{zasynclock, zlock}; use zenoh_link_commons::{ - get_ip_interface_names, ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, - LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, + get_ip_interface_names, set_bind_to_device, ConstructibleLinkManagerUnicast, + LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, + NewLinkChannelSender, BIND_INTERFACE, }; use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; @@ -301,7 +303,11 @@ impl LinkManagerUnicastUdp { Ok((socket, src_addr, dst_addr)) } - async fn new_listener_inner(&self, addr: &SocketAddr) -> ZResult<(UdpSocket, SocketAddr)> { + async fn new_listener_inner( + &self, + addr: &SocketAddr, + iface: &Option, + ) -> ZResult<(UdpSocket, SocketAddr)> { // Bind the UDP socket let socket = UdpSocket::bind(addr).await.map_err(|e| { let e = zerror!("Can not create a new UDP listener on {}: {}", addr, e); @@ -309,6 +315,8 @@ impl LinkManagerUnicastUdp { e })?; + set_bind_to_device(socket.as_raw_fd(), iface); + let local_addr = socket.local_addr().map_err(|e| { let e = zerror!("Can not create a new UDP listener on {}: {}", addr, e); log::warn!("{}", e); @@ -362,10 +370,11 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUdp { let addrs = get_udp_addrs(endpoint.address()) .await? .filter(|a| !a.ip().is_multicast()); + let iface = endpoint.config().get(BIND_INTERFACE).map(|s| s.to_string()); let mut errs: Vec = vec![]; for da in addrs { - match self.new_listener_inner(&da).await { + match self.new_listener_inner(&da, &iface).await { Ok((socket, local_addr)) => { // Update the endpoint locator address endpoint = EndPoint::new( From ebc9de100ad186716d12710b48e6a2311990cd37 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 23 Feb 2024 18:58:56 +0100 Subject: [PATCH 4/9] Add tests, fix windows build --- commons/zenoh-util/src/std_only/net/mod.rs | 11 +++-- io/zenoh-link-commons/src/unicast.rs | 6 ++- io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 1 + io/zenoh-links/zenoh-link-udp/src/unicast.rs | 1 + io/zenoh-transport/tests/unicast_openclose.rs | 45 +++++++++++++++++-- 5 files changed, 56 insertions(+), 8 deletions(-) diff --git a/commons/zenoh-util/src/std_only/net/mod.rs b/commons/zenoh-util/src/std_only/net/mod.rs index 810e4428f8..3c4c5768a8 100644 --- a/commons/zenoh-util/src/std_only/net/mod.rs +++ b/commons/zenoh-util/src/std_only/net/mod.rs @@ -239,6 +239,11 @@ pub fn get_local_addresses(interface: Option) -> ZResult> { let mut result = vec![]; let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { + if let Some(interface) = interface.clone() { + if ffi::pstr_to_string(iface.AdapterName) != interface { + continue; + } + } let mut next_ucast_addr = iface.FirstUnicastAddress.as_ref(); while let Some(ucast_addr) = next_ucast_addr { if let Ok(ifaddr) = ffi::win::sockaddr_to_addr(ucast_addr.Address) { @@ -427,8 +432,7 @@ pub fn get_ipv4_ipaddrs(interface: Option) -> Vec { IpAddr::V4(a) => Some(a), IpAddr::V6(_) => None, }) - .filter(|x| !x.is_multicast()) - // .filter(|x| !x.is_loopback() && !x.is_multicast()) // TODO(sashacmc): Why we exclude loopback? + .filter(|x| !x.is_loopback() && !x.is_multicast()) .map(IpAddr::V4) .collect() } @@ -448,8 +452,7 @@ pub fn get_ipv6_ipaddrs(interface: Option) -> Vec { IpAddr::V6(_) => None, }) .filter(|x| { - // !x.is_loopback() && !x.is_link_local() && !x.is_multicast() && !x.is_broadcast() // TODO(sashacmc): Why we exclude loopback? - !x.is_multicast() && !x.is_broadcast() + !x.is_loopback() && !x.is_link_local() && !x.is_multicast() && !x.is_broadcast() }); // Get next all IPv6 addresses diff --git a/io/zenoh-link-commons/src/unicast.rs b/io/zenoh-link-commons/src/unicast.rs index 2287941542..278294b3ce 100644 --- a/io/zenoh-link-commons/src/unicast.rs +++ b/io/zenoh-link-commons/src/unicast.rs @@ -135,6 +135,10 @@ pub fn set_bind_to_device(socket: std::os::raw::c_int, iface: &Option) { #[cfg(any(target_os = "macos", target_os = "windows"))] { - log::warn!("Listen at the interface is not supported for this platform"); + log::warn!( + "Listen at the interface ({:?}, {:?}) is not supported for this platform", + socket, + iface + ); } } diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index f421e55d08..066c5556af 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -226,6 +226,7 @@ impl LinkManagerUnicastTcp { .await .map_err(|e| zerror!("{}: {}", addr, e))?; + #[cfg(unix)] set_bind_to_device(socket.as_raw_fd(), iface); let local_addr = socket diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index 169558bc88..3814d4c553 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -315,6 +315,7 @@ impl LinkManagerUnicastUdp { e })?; + #[cfg(unix)] set_bind_to_device(socket.as_raw_fd(), iface); let local_addr = socket.local_addr().map_err(|e| { diff --git a/io/zenoh-transport/tests/unicast_openclose.rs b/io/zenoh-transport/tests/unicast_openclose.rs index 76a63cc6e0..fa4b8614cf 100644 --- a/io/zenoh-transport/tests/unicast_openclose.rs +++ b/io/zenoh-transport/tests/unicast_openclose.rs @@ -23,6 +23,7 @@ use zenoh_transport::{ DummyTransportPeerEventHandler, TransportEventHandler, TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, }; +use zenoh_util::net::get_ipv4_ipaddrs; const TIMEOUT: Duration = Duration::from_secs(60); const SLEEP: Duration = Duration::from_millis(100); @@ -153,10 +154,10 @@ async fn openclose_transport(endpoint: &EndPoint, lowlatency_transport: bool) { let mut links_num = 1; println!("Transport Open Close [1c1]"); - let res = ztimeout!(client01_manager.open_transport_unicast(endpoint.clone())); + let open_res = ztimeout!(client01_manager.open_transport_unicast(endpoint.clone())); println!("Transport Open Close [1c2]: {res:?}"); - assert!(res.is_ok()); - let c_ses1 = res.unwrap(); + assert!(open_res.is_ok()); + let c_ses1 = open_res.unwrap(); println!("Transport Open Close [1d1]"); let transports = ztimeout!(client01_manager.get_transports_unicast()); println!("Transport Open Close [1d2]: {transports:?}"); @@ -786,3 +787,41 @@ R+IdLiXcyIkg0m9N8I17p0ljCSkbrgGMD3bbePRTfg== task::block_on(openclose_universal_transport(&endpoint)); } + +#[cfg(feature = "transport_tcp")] +#[cfg(unix)] +#[test] +#[should_panic(expected = "assertion failed: open_res.is_ok()")] +fn openclose_tcp_only_with_interface_restriction() { + let addrs = get_ipv4_ipaddrs(None); + + let _ = env_logger::try_init(); + task::block_on(async { + zasync_executor_init!(); + }); + + // should not connect to local interface and external address + let endpoint: EndPoint = format!("tcp/{}:{}#iface=lo", addrs[0], 13001) + .parse() + .unwrap(); + task::block_on(openclose_universal_transport(&endpoint)); +} + +#[cfg(feature = "transport_udp")] +#[cfg(unix)] +#[test] +#[should_panic(expected = "assertion failed: open_res.is_ok()")] +fn openclose_udp_only_with_interface_restriction() { + let addrs = get_ipv4_ipaddrs(None); + + let _ = env_logger::try_init(); + task::block_on(async { + zasync_executor_init!(); + }); + + // should not connect to local interface and external address + let endpoint: EndPoint = format!("udp/{}:{}#iface=lo", addrs[0], 13011) + .parse() + .unwrap(); + task::block_on(openclose_universal_transport(&endpoint)); +} From 409b8855608e954e0af38d8a5a71ed9ecce2db39 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 23 Feb 2024 19:18:59 +0100 Subject: [PATCH 5/9] Fix windows/macos builds --- io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 9 ++++++--- io/zenoh-links/zenoh-link-udp/src/unicast.rs | 10 ++++++---- io/zenoh-transport/tests/unicast_openclose.rs | 4 ++-- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 066c5556af..e16b44aadd 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -23,13 +23,16 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use zenoh_link_commons::{ - get_ip_interface_names, set_bind_to_device, LinkManagerUnicastTrait, LinkUnicast, - LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, + get_ip_interface_names, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, + ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, }; use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; use zenoh_sync::Signal; +#[cfg(unix)] +use zenoh_link_commons::set_bind_to_device; + use super::{ get_tcp_addrs, TCP_ACCEPT_THROTTLE_TIME, TCP_DEFAULT_MTU, TCP_LINGER_TIMEOUT, TCP_LOCATOR_PREFIX, @@ -219,7 +222,7 @@ impl LinkManagerUnicastTcp { async fn new_listener_inner( &self, addr: &SocketAddr, - iface: &Option, + #[warn(unused_variables)] iface: &Option, ) -> ZResult<(TcpListener, SocketAddr)> { // Bind the TCP socket let socket = TcpListener::bind(addr) diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index 3814d4c553..99577ae44b 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -28,15 +28,17 @@ use std::sync::{Arc, Mutex, Weak}; use std::time::Duration; use zenoh_core::{zasynclock, zlock}; use zenoh_link_commons::{ - get_ip_interface_names, set_bind_to_device, ConstructibleLinkManagerUnicast, - LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, - NewLinkChannelSender, BIND_INTERFACE, + get_ip_interface_names, ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, + LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, }; use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; use zenoh_sync::Mvar; use zenoh_sync::Signal; +#[cfg(unix)] +use zenoh_link_commons::set_bind_to_device; + type LinkHashMap = Arc>>>; type LinkInput = (Vec, usize); type LinkLeftOver = (Vec, usize, usize); @@ -306,7 +308,7 @@ impl LinkManagerUnicastUdp { async fn new_listener_inner( &self, addr: &SocketAddr, - iface: &Option, + #[warn(unused_variables)] iface: &Option, ) -> ZResult<(UdpSocket, SocketAddr)> { // Bind the UDP socket let socket = UdpSocket::bind(addr).await.map_err(|e| { diff --git a/io/zenoh-transport/tests/unicast_openclose.rs b/io/zenoh-transport/tests/unicast_openclose.rs index fa4b8614cf..b2526d2c7b 100644 --- a/io/zenoh-transport/tests/unicast_openclose.rs +++ b/io/zenoh-transport/tests/unicast_openclose.rs @@ -789,7 +789,7 @@ R+IdLiXcyIkg0m9N8I17p0ljCSkbrgGMD3bbePRTfg== } #[cfg(feature = "transport_tcp")] -#[cfg(unix)] +#[cfg(target_os = "linux")] #[test] #[should_panic(expected = "assertion failed: open_res.is_ok()")] fn openclose_tcp_only_with_interface_restriction() { @@ -808,7 +808,7 @@ fn openclose_tcp_only_with_interface_restriction() { } #[cfg(feature = "transport_udp")] -#[cfg(unix)] +#[cfg(target_os = "linux")] #[test] #[should_panic(expected = "assertion failed: open_res.is_ok()")] fn openclose_udp_only_with_interface_restriction() { From f252c7d331696e29669836ccb8c2efe1408d7b9f Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 23 Feb 2024 19:52:26 +0100 Subject: [PATCH 6/9] Move platfrom-related code to the zenoh-util --- commons/zenoh-util/src/std_only/net/mod.rs | 41 ++++++++++++++++++- io/zenoh-link-commons/src/unicast.rs | 28 ------------- io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 9 +--- io/zenoh-links/zenoh-link-udp/src/unicast.rs | 9 +--- io/zenoh-transport/tests/unicast_openclose.rs | 2 + 5 files changed, 46 insertions(+), 43 deletions(-) diff --git a/commons/zenoh-util/src/std_only/net/mod.rs b/commons/zenoh-util/src/std_only/net/mod.rs index 3c4c5768a8..33a8f348d9 100644 --- a/commons/zenoh-util/src/std_only/net/mod.rs +++ b/commons/zenoh-util/src/std_only/net/mod.rs @@ -11,7 +11,7 @@ // Contributors: // ZettaScale Zenoh Team, // -use async_std::net::TcpStream; +use async_std::net::{TcpListener, TcpStream, UdpSocket}; use std::net::{IpAddr, Ipv6Addr}; use std::time::Duration; use zenoh_core::zconfigurable; @@ -491,3 +491,42 @@ pub fn get_ipv6_ipaddrs(interface: Option) -> Vec { .chain(priv_ipv4_addrs) .collect() } + +#[cfg(target_os = "linux")] +fn set_bind_to_device(socket: std::os::raw::c_int, iface: &Option) { + if let Some(iface) = iface { + // @TODO: switch to bind_device after tokio porting + log::debug!("Listen at the interface: {}", iface); + unsafe { + libc::setsockopt( + socket, + libc::SOL_SOCKET, + libc::SO_BINDTODEVICE, + iface.as_ptr() as *const std::os::raw::c_void, + iface.len() as libc::socklen_t, + ); + } + } +} + +#[cfg(target_os = "linux")] +pub fn set_bind_to_device_tcp(socket: &TcpListener, iface: &Option) { + use std::os::fd::AsRawFd; + set_bind_to_device(socket.as_raw_fd(), iface); +} + +#[cfg(target_os = "linux")] +pub fn set_bind_to_device_udp(socket: &UdpSocket, iface: &Option) { + use std::os::fd::AsRawFd; + set_bind_to_device(socket.as_raw_fd(), iface); +} + +#[cfg(any(target_os = "macos", target_os = "windows"))] +pub fn set_bind_to_device_tcp(_socket: &TcpListener, _iface: &Option) { + log::warn!("Listen at the interface is not supported for this platform"); +} + +#[cfg(any(target_os = "macos", target_os = "windows"))] +pub fn set_bind_to_device_udp(_socket: &UdpSocket, _iface: &Option) { + log::warn!("Listen at the interface is not supported for this platform"); +} diff --git a/io/zenoh-link-commons/src/unicast.rs b/io/zenoh-link-commons/src/unicast.rs index 278294b3ce..1237024ca9 100644 --- a/io/zenoh-link-commons/src/unicast.rs +++ b/io/zenoh-link-commons/src/unicast.rs @@ -114,31 +114,3 @@ pub fn get_ip_interface_names(addr: &SocketAddr) -> Vec { } } } - -pub fn set_bind_to_device(socket: std::os::raw::c_int, iface: &Option) { - #[cfg(target_os = "linux")] - { - if let Some(iface) = iface { - // @TODO: switch to bind_device after tokio porting - log::debug!("Listen at the interface: {}", iface); - unsafe { - libc::setsockopt( - socket, - libc::SOL_SOCKET, - libc::SO_BINDTODEVICE, - iface.as_ptr() as *const std::os::raw::c_void, - iface.len() as libc::socklen_t, - ); - } - } - } - - #[cfg(any(target_os = "macos", target_os = "windows"))] - { - log::warn!( - "Listen at the interface ({:?}, {:?}) is not supported for this platform", - socket, - iface - ); - } -} diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index e16b44aadd..34f56be586 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -18,7 +18,6 @@ use async_trait::async_trait; use std::convert::TryInto; use std::fmt; use std::net::Shutdown; -use std::os::fd::AsRawFd; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -30,9 +29,6 @@ use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; use zenoh_sync::Signal; -#[cfg(unix)] -use zenoh_link_commons::set_bind_to_device; - use super::{ get_tcp_addrs, TCP_ACCEPT_THROTTLE_TIME, TCP_DEFAULT_MTU, TCP_LINGER_TIMEOUT, TCP_LOCATOR_PREFIX, @@ -222,15 +218,14 @@ impl LinkManagerUnicastTcp { async fn new_listener_inner( &self, addr: &SocketAddr, - #[warn(unused_variables)] iface: &Option, + iface: &Option, ) -> ZResult<(TcpListener, SocketAddr)> { // Bind the TCP socket let socket = TcpListener::bind(addr) .await .map_err(|e| zerror!("{}: {}", addr, e))?; - #[cfg(unix)] - set_bind_to_device(socket.as_raw_fd(), iface); + zenoh_util::net::set_bind_to_device_tcp(&socket, iface); let local_addr = socket .local_addr() diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index 99577ae44b..129ee9ae63 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -22,7 +22,6 @@ use async_std::task; use async_trait::async_trait; use std::collections::HashMap; use std::fmt; -use std::os::fd::AsRawFd; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, Weak}; use std::time::Duration; @@ -36,9 +35,6 @@ use zenoh_result::{bail, zerror, Error as ZError, ZResult}; use zenoh_sync::Mvar; use zenoh_sync::Signal; -#[cfg(unix)] -use zenoh_link_commons::set_bind_to_device; - type LinkHashMap = Arc>>>; type LinkInput = (Vec, usize); type LinkLeftOver = (Vec, usize, usize); @@ -308,7 +304,7 @@ impl LinkManagerUnicastUdp { async fn new_listener_inner( &self, addr: &SocketAddr, - #[warn(unused_variables)] iface: &Option, + iface: &Option, ) -> ZResult<(UdpSocket, SocketAddr)> { // Bind the UDP socket let socket = UdpSocket::bind(addr).await.map_err(|e| { @@ -317,8 +313,7 @@ impl LinkManagerUnicastUdp { e })?; - #[cfg(unix)] - set_bind_to_device(socket.as_raw_fd(), iface); + zenoh_util::net::set_bind_to_device_udp(&socket, iface); let local_addr = socket.local_addr().map_err(|e| { let e = zerror!("Can not create a new UDP listener on {}: {}", addr, e); diff --git a/io/zenoh-transport/tests/unicast_openclose.rs b/io/zenoh-transport/tests/unicast_openclose.rs index b2526d2c7b..224bf33574 100644 --- a/io/zenoh-transport/tests/unicast_openclose.rs +++ b/io/zenoh-transport/tests/unicast_openclose.rs @@ -23,6 +23,8 @@ use zenoh_transport::{ DummyTransportPeerEventHandler, TransportEventHandler, TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, }; + +#[cfg(target_os = "linux")] use zenoh_util::net::get_ipv4_ipaddrs; const TIMEOUT: Duration = Duration::from_secs(60); From b2338223b59db26c5194a99160110b032967fa6a Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 23 Feb 2024 20:18:29 +0100 Subject: [PATCH 7/9] Remove unused dependencies --- Cargo.lock | 1 - io/zenoh-link-commons/Cargo.toml | 1 - io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 1 + 3 files changed, 1 insertion(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9590364884..fa9de7e800 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4779,7 +4779,6 @@ dependencies = [ "async-std", "async-trait", "flume", - "libc", "log", "lz4_flex", "serde", diff --git a/io/zenoh-link-commons/Cargo.toml b/io/zenoh-link-commons/Cargo.toml index a691473cb3..019067de56 100644 --- a/io/zenoh-link-commons/Cargo.toml +++ b/io/zenoh-link-commons/Cargo.toml @@ -34,7 +34,6 @@ async-trait = { workspace = true } flume = { workspace = true } lz4_flex = { workspace = true } log = { workspace = true } -libc = { workspace = true } serde = { workspace = true, features = ["default"] } typenum = { workspace = true } zenoh-buffers = { workspace = true } diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 34f56be586..40f177b681 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -267,6 +267,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { async fn new_listener(&self, mut endpoint: EndPoint) -> ZResult { let addrs = get_tcp_addrs(endpoint.address()).await?; let iface = endpoint.config().get(BIND_INTERFACE).map(|s| s.to_string()); + let mut errs: Vec = vec![]; for da in addrs { match self.new_listener_inner(&da, &iface).await { From e96de43f07c09511ec96ec592e091554c9b749a8 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 23 Feb 2024 21:22:25 +0100 Subject: [PATCH 8/9] Add interface binding on connection --- commons/zenoh-util/src/std_only/net/mod.rs | 19 ++- io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 8 +- io/zenoh-links/zenoh-link-udp/src/unicast.rs | 8 +- io/zenoh-transport/tests/unicast_openclose.rs | 114 +++++++++++++++--- 4 files changed, 123 insertions(+), 26 deletions(-) diff --git a/commons/zenoh-util/src/std_only/net/mod.rs b/commons/zenoh-util/src/std_only/net/mod.rs index 33a8f348d9..cd5385309b 100644 --- a/commons/zenoh-util/src/std_only/net/mod.rs +++ b/commons/zenoh-util/src/std_only/net/mod.rs @@ -510,23 +510,34 @@ fn set_bind_to_device(socket: std::os::raw::c_int, iface: &Option) { } #[cfg(target_os = "linux")] -pub fn set_bind_to_device_tcp(socket: &TcpListener, iface: &Option) { +pub fn set_bind_to_device_tcp_listener(socket: &TcpListener, iface: &Option) { use std::os::fd::AsRawFd; set_bind_to_device(socket.as_raw_fd(), iface); } #[cfg(target_os = "linux")] -pub fn set_bind_to_device_udp(socket: &UdpSocket, iface: &Option) { +pub fn set_bind_to_device_tcp_stream(socket: &TcpStream, iface: &Option) { use std::os::fd::AsRawFd; set_bind_to_device(socket.as_raw_fd(), iface); } +#[cfg(target_os = "linux")] +pub fn set_bind_to_device_udp_socket(socket: &UdpSocket, iface: &Option) { + use std::os::fd::AsRawFd; + set_bind_to_device(socket.as_raw_fd(), iface); +} + +#[cfg(any(target_os = "macos", target_os = "windows"))] +pub fn set_bind_to_device_tcp_listener(_socket: &TcpListener, _iface: &Option) { + log::warn!("Listen at the interface is not supported for this platform"); +} + #[cfg(any(target_os = "macos", target_os = "windows"))] -pub fn set_bind_to_device_tcp(_socket: &TcpListener, _iface: &Option) { +pub fn set_bind_to_device_tcp_stream(_socket: &TcpStream, _iface: &Option) { log::warn!("Listen at the interface is not supported for this platform"); } #[cfg(any(target_os = "macos", target_os = "windows"))] -pub fn set_bind_to_device_udp(_socket: &UdpSocket, _iface: &Option) { +pub fn set_bind_to_device_udp_socket(_socket: &UdpSocket, _iface: &Option) { log::warn!("Listen at the interface is not supported for this platform"); } diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 40f177b681..25f9603995 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -199,6 +199,7 @@ impl LinkManagerUnicastTcp { async fn new_link_inner( &self, dst_addr: &SocketAddr, + iface: &Option, ) -> ZResult<(TcpStream, SocketAddr, SocketAddr)> { let stream = TcpStream::connect(dst_addr) .await @@ -212,6 +213,8 @@ impl LinkManagerUnicastTcp { .peer_addr() .map_err(|e| zerror!("{}: {}", dst_addr, e))?; + zenoh_util::net::set_bind_to_device_tcp_stream(&stream, iface); + Ok((stream, src_addr, dst_addr)) } @@ -225,7 +228,7 @@ impl LinkManagerUnicastTcp { .await .map_err(|e| zerror!("{}: {}", addr, e))?; - zenoh_util::net::set_bind_to_device_tcp(&socket, iface); + zenoh_util::net::set_bind_to_device_tcp_listener(&socket, iface); let local_addr = socket .local_addr() @@ -239,10 +242,11 @@ impl LinkManagerUnicastTcp { impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { async fn new_link(&self, endpoint: EndPoint) -> ZResult { let dst_addrs = get_tcp_addrs(endpoint.address()).await?; + let iface = endpoint.config().get(BIND_INTERFACE).map(|s| s.to_string()); let mut errs: Vec = vec![]; for da in dst_addrs { - match self.new_link_inner(&da).await { + match self.new_link_inner(&da, &iface).await { Ok((stream, src_addr, dst_addr)) => { let link = Arc::new(LinkUnicastTcp::new(stream, src_addr, dst_addr)); return Ok(LinkUnicast(link)); diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index 129ee9ae63..53bcc95c9f 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -261,6 +261,7 @@ impl LinkManagerUnicastUdp { async fn new_link_inner( &self, dst_addr: &SocketAddr, + iface: &Option, ) -> ZResult<(UdpSocket, SocketAddr, SocketAddr)> { // Establish a UDP socket let socket = UdpSocket::bind(SocketAddr::new( @@ -278,6 +279,8 @@ impl LinkManagerUnicastUdp { e })?; + zenoh_util::net::set_bind_to_device_udp_socket(&socket, iface); + // Connect the socket to the remote address socket.connect(dst_addr).await.map_err(|e| { let e = zerror!("Can not create a new UDP link bound to {}: {}", dst_addr, e); @@ -313,7 +316,7 @@ impl LinkManagerUnicastUdp { e })?; - zenoh_util::net::set_bind_to_device_udp(&socket, iface); + zenoh_util::net::set_bind_to_device_udp_socket(&socket, iface); let local_addr = socket.local_addr().map_err(|e| { let e = zerror!("Can not create a new UDP listener on {}: {}", addr, e); @@ -331,10 +334,11 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUdp { let dst_addrs = get_udp_addrs(endpoint.address()) .await? .filter(|a| !a.ip().is_multicast()); + let iface = endpoint.config().get(BIND_INTERFACE).map(|s| s.to_string()); let mut errs: Vec = vec![]; for da in dst_addrs { - match self.new_link_inner(&da).await { + match self.new_link_inner(&da, &iface).await { Ok((socket, src_addr, dst_addr)) => { // Create UDP link let link = Arc::new(LinkUnicastUdp::new( diff --git a/io/zenoh-transport/tests/unicast_openclose.rs b/io/zenoh-transport/tests/unicast_openclose.rs index 224bf33574..56b686947a 100644 --- a/io/zenoh-transport/tests/unicast_openclose.rs +++ b/io/zenoh-transport/tests/unicast_openclose.rs @@ -28,6 +28,7 @@ use zenoh_transport::{ use zenoh_util::net::get_ipv4_ipaddrs; const TIMEOUT: Duration = Duration::from_secs(60); +const TIMEOUT_EXPECTED: Duration = Duration::from_secs(5); const SLEEP: Duration = Duration::from_millis(100); macro_rules! ztimeout { @@ -36,6 +37,12 @@ macro_rules! ztimeout { }; } +macro_rules! ztimeout_expected { + ($f:expr) => { + $f.timeout(TIMEOUT_EXPECTED).await.unwrap() + }; +} + #[cfg(test)] #[derive(Default)] struct SHRouterOpenClose; @@ -83,7 +90,11 @@ impl TransportEventHandler for SHClientOpenClose { } } -async fn openclose_transport(endpoint: &EndPoint, lowlatency_transport: bool) { +async fn openclose_transport( + listen_endpoint: &EndPoint, + connect_endpoint: &EndPoint, + lowlatency_transport: bool, +) { /* [ROUTER] */ let router_id = ZenohId::try_from([1]).unwrap(); @@ -143,7 +154,7 @@ async fn openclose_transport(endpoint: &EndPoint, lowlatency_transport: bool) { /* [1] */ println!("\nTransport Open Close [1a1]"); // Add the locator on the router - let res = ztimeout!(router_manager.add_listener(endpoint.clone())); + let res = ztimeout!(router_manager.add_listener(listen_endpoint.clone())); println!("Transport Open Close [1a1]: {res:?}"); assert!(res.is_ok()); println!("Transport Open Close [1a2]"); @@ -156,7 +167,8 @@ async fn openclose_transport(endpoint: &EndPoint, lowlatency_transport: bool) { let mut links_num = 1; println!("Transport Open Close [1c1]"); - let open_res = ztimeout!(client01_manager.open_transport_unicast(endpoint.clone())); + let open_res = + ztimeout_expected!(client01_manager.open_transport_unicast(connect_endpoint.clone())); println!("Transport Open Close [1c2]: {res:?}"); assert!(open_res.is_ok()); let c_ses1 = open_res.unwrap(); @@ -198,7 +210,7 @@ async fn openclose_transport(endpoint: &EndPoint, lowlatency_transport: bool) { links_num = 2; println!("\nTransport Open Close [2a1]"); - let res = ztimeout!(client01_manager.open_transport_unicast(endpoint.clone())); + let res = ztimeout!(client01_manager.open_transport_unicast(connect_endpoint.clone())); println!("Transport Open Close [2a2]: {res:?}"); assert!(res.is_ok()); let c_ses2 = res.unwrap(); @@ -238,7 +250,7 @@ async fn openclose_transport(endpoint: &EndPoint, lowlatency_transport: bool) { // Open transport -> This should be rejected because // of the maximum limit of links per transport println!("\nTransport Open Close [3a1]"); - let res = ztimeout!(client01_manager.open_transport_unicast(endpoint.clone())); + let res = ztimeout!(client01_manager.open_transport_unicast(connect_endpoint.clone())); println!("Transport Open Close [3a2]: {res:?}"); assert!(res.is_err()); println!("Transport Open Close [3b1]"); @@ -297,7 +309,7 @@ async fn openclose_transport(endpoint: &EndPoint, lowlatency_transport: bool) { links_num = 1; println!("\nTransport Open Close [5a1]"); - let res = ztimeout!(client01_manager.open_transport_unicast(endpoint.clone())); + let res = ztimeout!(client01_manager.open_transport_unicast(connect_endpoint.clone())); println!("Transport Open Close [5a2]: {res:?}"); assert!(res.is_ok()); let c_ses3 = res.unwrap(); @@ -329,7 +341,7 @@ async fn openclose_transport(endpoint: &EndPoint, lowlatency_transport: bool) { // Open transport -> This should be rejected because // of the maximum limit of transports println!("\nTransport Open Close [6a1]"); - let res = ztimeout!(client02_manager.open_transport_unicast(endpoint.clone())); + let res = ztimeout!(client02_manager.open_transport_unicast(connect_endpoint.clone())); println!("Transport Open Close [6a2]: {res:?}"); assert!(res.is_err()); println!("Transport Open Close [6b1]"); @@ -380,7 +392,7 @@ async fn openclose_transport(endpoint: &EndPoint, lowlatency_transport: bool) { links_num = 1; println!("\nTransport Open Close [8a1]"); - let res = ztimeout!(client02_manager.open_transport_unicast(endpoint.clone())); + let res = ztimeout!(client02_manager.open_transport_unicast(connect_endpoint.clone())); println!("Transport Open Close [8a2]: {res:?}"); assert!(res.is_ok()); let c_ses4 = res.unwrap(); @@ -438,7 +450,7 @@ async fn openclose_transport(endpoint: &EndPoint, lowlatency_transport: bool) { /* [10] */ // Perform clean up of the open locators println!("\nTransport Open Close [10a1]"); - let res = ztimeout!(router_manager.del_listener(endpoint)); + let res = ztimeout!(router_manager.del_listener(listen_endpoint)); println!("Transport Open Close [10a2]: {res:?}"); assert!(res.is_ok()); @@ -460,11 +472,11 @@ async fn openclose_transport(endpoint: &EndPoint, lowlatency_transport: bool) { } async fn openclose_universal_transport(endpoint: &EndPoint) { - openclose_transport(endpoint, false).await + openclose_transport(endpoint, endpoint, false).await } async fn openclose_lowlatency_transport(endpoint: &EndPoint) { - openclose_transport(endpoint, true).await + openclose_transport(endpoint, endpoint, true).await } #[cfg(feature = "transport_tcp")] @@ -790,11 +802,37 @@ R+IdLiXcyIkg0m9N8I17p0ljCSkbrgGMD3bbePRTfg== task::block_on(openclose_universal_transport(&endpoint)); } +#[cfg(feature = "transport_tcp")] +#[cfg(target_os = "linux")] +#[test] +#[should_panic(expected = "TimeoutError")] +fn openclose_tcp_only_connect_with_interface_restriction() { + let addrs = get_ipv4_ipaddrs(None); + + let _ = env_logger::try_init(); + task::block_on(async { + zasync_executor_init!(); + }); + + let listen_endpoint: EndPoint = format!("tcp/{}:{}", addrs[0], 13001).parse().unwrap(); + + let connect_endpoint: EndPoint = format!("tcp/{}:{}#iface=lo", addrs[0], 13001) + .parse() + .unwrap(); + + // should not connect to local interface and external address + task::block_on(openclose_transport( + &listen_endpoint, + &connect_endpoint, + false, + )); +} + #[cfg(feature = "transport_tcp")] #[cfg(target_os = "linux")] #[test] #[should_panic(expected = "assertion failed: open_res.is_ok()")] -fn openclose_tcp_only_with_interface_restriction() { +fn openclose_tcp_only_listen_with_interface_restriction() { let addrs = get_ipv4_ipaddrs(None); let _ = env_logger::try_init(); @@ -802,18 +840,51 @@ fn openclose_tcp_only_with_interface_restriction() { zasync_executor_init!(); }); + let listen_endpoint: EndPoint = format!("tcp/{}:{}#iface=lo", addrs[0], 13002) + .parse() + .unwrap(); + + let connect_endpoint: EndPoint = format!("tcp/{}:{}", addrs[0], 13002).parse().unwrap(); + // should not connect to local interface and external address - let endpoint: EndPoint = format!("tcp/{}:{}#iface=lo", addrs[0], 13001) + task::block_on(openclose_transport( + &listen_endpoint, + &connect_endpoint, + false, + )); +} + +#[cfg(feature = "transport_udp")] +#[cfg(target_os = "linux")] +#[test] +#[should_panic(expected = "TimeoutError")] +fn openclose_udp_only_connect_with_interface_restriction() { + let addrs = get_ipv4_ipaddrs(None); + + let _ = env_logger::try_init(); + task::block_on(async { + zasync_executor_init!(); + }); + + let listen_endpoint: EndPoint = format!("udp/{}:{}", addrs[0], 13003).parse().unwrap(); + + let connect_endpoint: EndPoint = format!("udp/{}:{}#iface=lo", addrs[0], 13003) .parse() .unwrap(); - task::block_on(openclose_universal_transport(&endpoint)); + + // should not connect to local interface and external address + task::block_on(openclose_transport( + &listen_endpoint, + &connect_endpoint, + false, + )); } #[cfg(feature = "transport_udp")] #[cfg(target_os = "linux")] #[test] #[should_panic(expected = "assertion failed: open_res.is_ok()")] -fn openclose_udp_only_with_interface_restriction() { +fn openclose_udp_onlyi_listen_with_interface_restriction() { let addrs = get_ipv4_ipaddrs(None); let _ = env_logger::try_init(); @@ -821,9 +892,16 @@ fn openclose_udp_only_with_interface_restriction() { zasync_executor_init!(); }); - // should not connect to local interface and external address - let endpoint: EndPoint = format!("udp/{}:{}#iface=lo", addrs[0], 13011) + let listen_endpoint: EndPoint = format!("udp/{}:{}#iface=lo", addrs[0], 13004) .parse() .unwrap(); - task::block_on(openclose_universal_transport(&endpoint)); + + let connect_endpoint: EndPoint = format!("udp/{}:{}", addrs[0], 13004).parse().unwrap(); + + // should not connect to local interface and external address + task::block_on(openclose_transport( + &listen_endpoint, + &connect_endpoint, + false, + )); } From c6f81bd43a0c3dc417c26643f93b9f1e25c3d7d0 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Mon, 26 Feb 2024 19:18:03 +0100 Subject: [PATCH 9/9] String usage refactoring --- commons/zenoh-util/src/std_only/net/mod.rs | 28 ++++++++++---------- io/zenoh-link-commons/src/listener.rs | 7 ++--- io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 14 +++++----- io/zenoh-links/zenoh-link-udp/src/unicast.rs | 14 +++++----- 4 files changed, 32 insertions(+), 31 deletions(-) diff --git a/commons/zenoh-util/src/std_only/net/mod.rs b/commons/zenoh-util/src/std_only/net/mod.rs index cd5385309b..8658e24bbc 100644 --- a/commons/zenoh-util/src/std_only/net/mod.rs +++ b/commons/zenoh-util/src/std_only/net/mod.rs @@ -210,14 +210,14 @@ pub fn get_multicast_interfaces() -> Vec { } } -pub fn get_local_addresses(interface: Option) -> ZResult> { +pub fn get_local_addresses(interface: Option<&str>) -> ZResult> { #[cfg(unix)] { Ok(pnet_datalink::interfaces() .into_iter() .filter(|iface| { - if let Some(interface) = interface.clone() { - if iface.name != interface { + if let Some(interface) = interface.as_ref() { + if iface.name != *interface { return false; } } @@ -239,8 +239,8 @@ pub fn get_local_addresses(interface: Option) -> ZResult> { let mut result = vec![]; let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { - if let Some(interface) = interface.clone() { - if ffi::pstr_to_string(iface.AdapterName) != interface { + if let Some(interface) = interface.as_ref() { + if ffi::pstr_to_string(iface.AdapterName) != *interface { continue; } } @@ -424,7 +424,7 @@ pub fn get_interface_names_by_addr(addr: IpAddr) -> ZResult> { } } -pub fn get_ipv4_ipaddrs(interface: Option) -> Vec { +pub fn get_ipv4_ipaddrs(interface: Option<&str>) -> Vec { get_local_addresses(interface) .unwrap_or_else(|_| vec![]) .drain(..) @@ -437,7 +437,7 @@ pub fn get_ipv4_ipaddrs(interface: Option) -> Vec { .collect() } -pub fn get_ipv6_ipaddrs(interface: Option) -> Vec { +pub fn get_ipv6_ipaddrs(interface: Option<&str>) -> Vec { const fn is_unicast_link_local(addr: &Ipv6Addr) -> bool { (addr.segments()[0] & 0xffc0) == 0xfe80 } @@ -493,7 +493,7 @@ pub fn get_ipv6_ipaddrs(interface: Option) -> Vec { } #[cfg(target_os = "linux")] -fn set_bind_to_device(socket: std::os::raw::c_int, iface: &Option) { +fn set_bind_to_device(socket: std::os::raw::c_int, iface: Option<&str>) { if let Some(iface) = iface { // @TODO: switch to bind_device after tokio porting log::debug!("Listen at the interface: {}", iface); @@ -510,34 +510,34 @@ fn set_bind_to_device(socket: std::os::raw::c_int, iface: &Option) { } #[cfg(target_os = "linux")] -pub fn set_bind_to_device_tcp_listener(socket: &TcpListener, iface: &Option) { +pub fn set_bind_to_device_tcp_listener(socket: &TcpListener, iface: Option<&str>) { use std::os::fd::AsRawFd; set_bind_to_device(socket.as_raw_fd(), iface); } #[cfg(target_os = "linux")] -pub fn set_bind_to_device_tcp_stream(socket: &TcpStream, iface: &Option) { +pub fn set_bind_to_device_tcp_stream(socket: &TcpStream, iface: Option<&str>) { use std::os::fd::AsRawFd; set_bind_to_device(socket.as_raw_fd(), iface); } #[cfg(target_os = "linux")] -pub fn set_bind_to_device_udp_socket(socket: &UdpSocket, iface: &Option) { +pub fn set_bind_to_device_udp_socket(socket: &UdpSocket, iface: Option<&str>) { use std::os::fd::AsRawFd; set_bind_to_device(socket.as_raw_fd(), iface); } #[cfg(any(target_os = "macos", target_os = "windows"))] -pub fn set_bind_to_device_tcp_listener(_socket: &TcpListener, _iface: &Option) { +pub fn set_bind_to_device_tcp_listener(_socket: &TcpListener, _iface: Option<&str>) { log::warn!("Listen at the interface is not supported for this platform"); } #[cfg(any(target_os = "macos", target_os = "windows"))] -pub fn set_bind_to_device_tcp_stream(_socket: &TcpStream, _iface: &Option) { +pub fn set_bind_to_device_tcp_stream(_socket: &TcpStream, _iface: Option<&str>) { log::warn!("Listen at the interface is not supported for this platform"); } #[cfg(any(target_os = "macos", target_os = "windows"))] -pub fn set_bind_to_device_udp_socket(_socket: &UdpSocket, _iface: &Option) { +pub fn set_bind_to_device_udp_socket(_socket: &UdpSocket, _iface: Option<&str>) { log::warn!("Listen at the interface is not supported for this platform"); } diff --git a/io/zenoh-link-commons/src/listener.rs b/io/zenoh-link-commons/src/listener.rs index 89b8d9e918..7cf294de8a 100644 --- a/io/zenoh-link-commons/src/listener.rs +++ b/io/zenoh-link-commons/src/listener.rs @@ -111,11 +111,8 @@ impl ListenersUnicastIP { let guard = zread!(self.listeners); for (key, value) in guard.iter() { let (kip, kpt) = (key.ip(), key.port()); - let iface = value - .endpoint - .config() - .get(BIND_INTERFACE) - .map(|s| s.to_string()); + let config = value.endpoint.config(); + let iface = config.get(BIND_INTERFACE); // Either ipv4/0.0.0.0 or ipv6/[::] if kip.is_unspecified() { diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 25f9603995..b01d8be22e 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -199,7 +199,7 @@ impl LinkManagerUnicastTcp { async fn new_link_inner( &self, dst_addr: &SocketAddr, - iface: &Option, + iface: Option<&str>, ) -> ZResult<(TcpStream, SocketAddr, SocketAddr)> { let stream = TcpStream::connect(dst_addr) .await @@ -221,7 +221,7 @@ impl LinkManagerUnicastTcp { async fn new_listener_inner( &self, addr: &SocketAddr, - iface: &Option, + iface: Option<&str>, ) -> ZResult<(TcpListener, SocketAddr)> { // Bind the TCP socket let socket = TcpListener::bind(addr) @@ -242,11 +242,12 @@ impl LinkManagerUnicastTcp { impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { async fn new_link(&self, endpoint: EndPoint) -> ZResult { let dst_addrs = get_tcp_addrs(endpoint.address()).await?; - let iface = endpoint.config().get(BIND_INTERFACE).map(|s| s.to_string()); + let config = endpoint.config(); + let iface = config.get(BIND_INTERFACE); let mut errs: Vec = vec![]; for da in dst_addrs { - match self.new_link_inner(&da, &iface).await { + match self.new_link_inner(&da, iface).await { Ok((stream, src_addr, dst_addr)) => { let link = Arc::new(LinkUnicastTcp::new(stream, src_addr, dst_addr)); return Ok(LinkUnicast(link)); @@ -270,11 +271,12 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { async fn new_listener(&self, mut endpoint: EndPoint) -> ZResult { let addrs = get_tcp_addrs(endpoint.address()).await?; - let iface = endpoint.config().get(BIND_INTERFACE).map(|s| s.to_string()); + let config = endpoint.config(); + let iface = config.get(BIND_INTERFACE); let mut errs: Vec = vec![]; for da in addrs { - match self.new_listener_inner(&da, &iface).await { + match self.new_listener_inner(&da, iface).await { Ok((socket, local_addr)) => { // Update the endpoint locator address endpoint = EndPoint::new( diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index 53bcc95c9f..d5214510be 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -261,7 +261,7 @@ impl LinkManagerUnicastUdp { async fn new_link_inner( &self, dst_addr: &SocketAddr, - iface: &Option, + iface: Option<&str>, ) -> ZResult<(UdpSocket, SocketAddr, SocketAddr)> { // Establish a UDP socket let socket = UdpSocket::bind(SocketAddr::new( @@ -307,7 +307,7 @@ impl LinkManagerUnicastUdp { async fn new_listener_inner( &self, addr: &SocketAddr, - iface: &Option, + iface: Option<&str>, ) -> ZResult<(UdpSocket, SocketAddr)> { // Bind the UDP socket let socket = UdpSocket::bind(addr).await.map_err(|e| { @@ -334,11 +334,12 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUdp { let dst_addrs = get_udp_addrs(endpoint.address()) .await? .filter(|a| !a.ip().is_multicast()); - let iface = endpoint.config().get(BIND_INTERFACE).map(|s| s.to_string()); + let config = endpoint.config(); + let iface = config.get(BIND_INTERFACE); let mut errs: Vec = vec![]; for da in dst_addrs { - match self.new_link_inner(&da, &iface).await { + match self.new_link_inner(&da, iface).await { Ok((socket, src_addr, dst_addr)) => { // Create UDP link let link = Arc::new(LinkUnicastUdp::new( @@ -372,11 +373,12 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUdp { let addrs = get_udp_addrs(endpoint.address()) .await? .filter(|a| !a.ip().is_multicast()); - let iface = endpoint.config().get(BIND_INTERFACE).map(|s| s.to_string()); + let config = endpoint.config(); + let iface = config.get(BIND_INTERFACE); let mut errs: Vec = vec![]; for da in addrs { - match self.new_listener_inner(&da, &iface).await { + match self.new_listener_inner(&da, iface).await { Ok((socket, local_addr)) => { // Update the endpoint locator address endpoint = EndPoint::new(