From 64002016abd15e95a5e582141d232071d80636db Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 23 Feb 2024 14:32:41 +0100 Subject: [PATCH] Add binding to the interface --- commons/zenoh-util/src/std_only/net/mod.rs | 25 +++++++++----- io/zenoh-link-commons/src/lib.rs | 3 ++ io/zenoh-link-commons/src/listener.rs | 11 +++++-- io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 34 ++++++++++++-------- io/zenoh-links/zenoh-link-ws/src/unicast.rs | 4 +-- 5 files changed, 51 insertions(+), 26 deletions(-) diff --git a/commons/zenoh-util/src/std_only/net/mod.rs b/commons/zenoh-util/src/std_only/net/mod.rs index 67e732d3ee..810e4428f8 100644 --- a/commons/zenoh-util/src/std_only/net/mod.rs +++ b/commons/zenoh-util/src/std_only/net/mod.rs @@ -210,12 +210,19 @@ pub fn get_multicast_interfaces() -> Vec { } } -pub fn get_local_addresses() -> ZResult> { +pub fn get_local_addresses(interface: Option) -> ZResult> { #[cfg(unix)] { Ok(pnet_datalink::interfaces() .into_iter() - .filter(|iface| iface.is_up() && iface.is_running()) + .filter(|iface| { + if let Some(interface) = interface.clone() { + if iface.name != interface { + return false; + } + } + iface.is_up() && iface.is_running() + }) .flat_map(|iface| iface.ips) .map(|ipnet| ipnet.ip()) .collect()) @@ -412,25 +419,26 @@ pub fn get_interface_names_by_addr(addr: IpAddr) -> ZResult> { } } -pub fn get_ipv4_ipaddrs() -> Vec { - get_local_addresses() +pub fn get_ipv4_ipaddrs(interface: Option) -> Vec { + get_local_addresses(interface) .unwrap_or_else(|_| vec![]) .drain(..) .filter_map(|x| match x { IpAddr::V4(a) => Some(a), IpAddr::V6(_) => None, }) - .filter(|x| !x.is_loopback() && !x.is_multicast()) + .filter(|x| !x.is_multicast()) + // .filter(|x| !x.is_loopback() && !x.is_multicast()) // TODO(sashacmc): Why we exclude loopback? .map(IpAddr::V4) .collect() } -pub fn get_ipv6_ipaddrs() -> Vec { +pub fn get_ipv6_ipaddrs(interface: Option) -> Vec { const fn is_unicast_link_local(addr: &Ipv6Addr) -> bool { (addr.segments()[0] & 0xffc0) == 0xfe80 } - let ipaddrs = get_local_addresses().unwrap_or_else(|_| vec![]); + let ipaddrs = get_local_addresses(interface).unwrap_or_else(|_| vec![]); // Get first all IPv4 addresses let ipv4_iter = ipaddrs @@ -440,7 +448,8 @@ pub fn get_ipv6_ipaddrs() -> Vec { IpAddr::V6(_) => None, }) .filter(|x| { - !x.is_loopback() && !x.is_link_local() && !x.is_multicast() && !x.is_broadcast() + // !x.is_loopback() && !x.is_link_local() && !x.is_multicast() && !x.is_broadcast() // TODO(sashacmc): Why we exclude loopback? + !x.is_multicast() && !x.is_broadcast() }); // Get next all IPv6 addresses diff --git a/io/zenoh-link-commons/src/lib.rs b/io/zenoh-link-commons/src/lib.rs index 2ee28c3f08..0a43aac3d9 100644 --- a/io/zenoh-link-commons/src/lib.rs +++ b/io/zenoh-link-commons/src/lib.rs @@ -36,6 +36,9 @@ use zenoh_result::ZResult; /*************************************/ /* GENERAL */ /*************************************/ + +pub const BIND_INTERFACE: &str = "iface"; + #[derive(Clone, Debug, Serialize, Hash, PartialEq, Eq)] pub struct Link { pub src: Locator, diff --git a/io/zenoh-link-commons/src/listener.rs b/io/zenoh-link-commons/src/listener.rs index 1d5d7bb172..89b8d9e918 100644 --- a/io/zenoh-link-commons/src/listener.rs +++ b/io/zenoh-link-commons/src/listener.rs @@ -23,6 +23,8 @@ use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{zerror, ZResult}; use zenoh_sync::Signal; +use crate::BIND_INTERFACE; + pub struct ListenerUnicastIP { endpoint: EndPoint, active: Arc, @@ -109,12 +111,17 @@ impl ListenersUnicastIP { let guard = zread!(self.listeners); for (key, value) in guard.iter() { let (kip, kpt) = (key.ip(), key.port()); + let iface = value + .endpoint + .config() + .get(BIND_INTERFACE) + .map(|s| s.to_string()); // Either ipv4/0.0.0.0 or ipv6/[::] if kip.is_unspecified() { let mut addrs = match kip { - IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(), - IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(), + IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(iface), + IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(iface), }; let iter = addrs.drain(..).map(|x| { Locator::new( diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 175d3eb9d7..cfad120d84 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use std::time::Duration; use zenoh_link_commons::{ get_ip_interface_names, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, - ListenersUnicastIP, NewLinkChannelSender, + ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, }; use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; @@ -216,22 +216,28 @@ impl LinkManagerUnicastTcp { Ok((stream, src_addr, dst_addr)) } - async fn new_listener_inner(&self, addr: &SocketAddr) -> ZResult<(TcpListener, SocketAddr)> { + async fn new_listener_inner( + &self, + addr: &SocketAddr, + iface: &Option, + ) -> ZResult<(TcpListener, SocketAddr)> { // Bind the TCP socket let socket = TcpListener::bind(addr) .await .map_err(|e| zerror!("{}: {}", addr, e))?; - //let iface = "wg0"; - let iface = "lo"; - unsafe { - libc::setsockopt( - socket.as_raw_fd(), - libc::SOL_SOCKET, - libc::SO_BINDTODEVICE, - iface.as_ptr() as *const std::os::raw::c_void, - iface.len() as libc::socklen_t, - ); + if let Some(iface) = iface { + // @TODO: switch to bind_device after tokio porting + log::debug!("Listen at the interface: {}", iface); + unsafe { + libc::setsockopt( + socket.as_raw_fd(), + libc::SOL_SOCKET, + libc::SO_BINDTODEVICE, + iface.as_ptr() as *const std::os::raw::c_void, + iface.len() as libc::socklen_t, + ); + } } let local_addr = socket @@ -273,10 +279,10 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { async fn new_listener(&self, mut endpoint: EndPoint) -> ZResult { let addrs = get_tcp_addrs(endpoint.address()).await?; - + let iface = endpoint.config().get(BIND_INTERFACE).map(|s| s.to_string()); let mut errs: Vec = vec![]; for da in addrs { - match self.new_listener_inner(&da).await { + match self.new_listener_inner(&da, &iface).await { Ok((socket, local_addr)) => { // Update the endpoint locator address endpoint = EndPoint::new( diff --git a/io/zenoh-links/zenoh-link-ws/src/unicast.rs b/io/zenoh-links/zenoh-link-ws/src/unicast.rs index 4276e2bfaf..0ff1b1ab46 100644 --- a/io/zenoh-links/zenoh-link-ws/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-ws/src/unicast.rs @@ -416,7 +416,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastWs { for (key, value) in guard.iter() { let listener_locator = value.endpoint.to_locator(); if key.ip() == default_ipv4 { - match zenoh_util::net::get_local_addresses() { + match zenoh_util::net::get_local_addresses(None) { Ok(ipaddrs) => { for ipaddr in ipaddrs { if !ipaddr.is_loopback() && !ipaddr.is_multicast() && ipaddr.is_ipv4() { @@ -433,7 +433,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastWs { Err(err) => log::error!("Unable to get local addresses: {}", err), } } else if key.ip() == default_ipv6 { - match zenoh_util::net::get_local_addresses() { + match zenoh_util::net::get_local_addresses(None) { Ok(ipaddrs) => { for ipaddr in ipaddrs { if !ipaddr.is_loopback() && !ipaddr.is_multicast() && ipaddr.is_ipv6() {