Skip to content

Commit

Permalink
Add UDP support, fix windows/macos build
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Feb 23, 2024
1 parent 6400201 commit cb88fb9
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 21 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions io/zenoh-link-commons/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ async-trait = { workspace = true }
flume = { workspace = true }
lz4_flex = { workspace = true }
log = { workspace = true }
libc = { workspace = true }
serde = { workspace = true, features = ["default"] }
typenum = { workspace = true }
zenoh-buffers = { workspace = true }
Expand Down
24 changes: 24 additions & 0 deletions io/zenoh-link-commons/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,27 @@ pub fn get_ip_interface_names(addr: &SocketAddr) -> Vec<String> {
}
}
}

pub fn set_bind_to_device(socket: std::os::raw::c_int, iface: &Option<String>) {
#[cfg(target_os = "linux")]
{
if let Some(iface) = iface {
// @TODO: switch to bind_device after tokio porting
log::debug!("Listen at the interface: {}", iface);
unsafe {
libc::setsockopt(
socket,
libc::SOL_SOCKET,
libc::SO_BINDTODEVICE,
iface.as_ptr() as *const std::os::raw::c_void,
iface.len() as libc::socklen_t,
);
}
}
}

#[cfg(any(target_os = "macos", target_os = "windows"))]
{
log::warn!("Listen at the interface is not supported for this platform");
}
}
1 change: 0 additions & 1 deletion io/zenoh-links/zenoh-link-tcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ description = "Internal crate for zenoh."
async-std = { workspace = true }
async-trait = { workspace = true }
log = { workspace = true }
libc = { workspace = true }
zenoh-core = { workspace = true }
zenoh-link-commons = { workspace = true }
zenoh-protocol = { workspace = true }
Expand Down
18 changes: 3 additions & 15 deletions io/zenoh-links/zenoh-link-tcp/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use zenoh_link_commons::{
get_ip_interface_names, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait,
ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE,
get_ip_interface_names, set_bind_to_device, LinkManagerUnicastTrait, LinkUnicast,
LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE,
};
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_result::{bail, zerror, Error as ZError, ZResult};
Expand Down Expand Up @@ -226,19 +226,7 @@ impl LinkManagerUnicastTcp {
.await
.map_err(|e| zerror!("{}: {}", addr, e))?;

if let Some(iface) = iface {
// @TODO: switch to bind_device after tokio porting
log::debug!("Listen at the interface: {}", iface);
unsafe {
libc::setsockopt(
socket.as_raw_fd(),
libc::SOL_SOCKET,
libc::SO_BINDTODEVICE,
iface.as_ptr() as *const std::os::raw::c_void,
iface.len() as libc::socklen_t,
);
}
}
set_bind_to_device(socket.as_raw_fd(), iface);

let local_addr = socket
.local_addr()
Expand Down
17 changes: 13 additions & 4 deletions io/zenoh-links/zenoh-link-udp/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ use async_std::task;
use async_trait::async_trait;
use std::collections::HashMap;
use std::fmt;
use std::os::fd::AsRawFd;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::time::Duration;
use zenoh_core::{zasynclock, zlock};
use zenoh_link_commons::{
get_ip_interface_names, ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast,
LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender,
get_ip_interface_names, set_bind_to_device, ConstructibleLinkManagerUnicast,
LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, ListenersUnicastIP,
NewLinkChannelSender, BIND_INTERFACE,
};
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_result::{bail, zerror, Error as ZError, ZResult};
Expand Down Expand Up @@ -301,14 +303,20 @@ impl LinkManagerUnicastUdp {
Ok((socket, src_addr, dst_addr))
}

async fn new_listener_inner(&self, addr: &SocketAddr) -> ZResult<(UdpSocket, SocketAddr)> {
async fn new_listener_inner(
&self,
addr: &SocketAddr,
iface: &Option<String>,
) -> ZResult<(UdpSocket, SocketAddr)> {
// Bind the UDP socket
let socket = UdpSocket::bind(addr).await.map_err(|e| {
let e = zerror!("Can not create a new UDP listener on {}: {}", addr, e);
log::warn!("{}", e);
e
})?;

set_bind_to_device(socket.as_raw_fd(), iface);

let local_addr = socket.local_addr().map_err(|e| {
let e = zerror!("Can not create a new UDP listener on {}: {}", addr, e);
log::warn!("{}", e);
Expand Down Expand Up @@ -362,10 +370,11 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUdp {
let addrs = get_udp_addrs(endpoint.address())
.await?
.filter(|a| !a.ip().is_multicast());
let iface = endpoint.config().get(BIND_INTERFACE).map(|s| s.to_string());

let mut errs: Vec<ZError> = vec![];
for da in addrs {
match self.new_listener_inner(&da).await {
match self.new_listener_inner(&da, &iface).await {
Ok((socket, local_addr)) => {
// Update the endpoint locator address
endpoint = EndPoint::new(
Expand Down

0 comments on commit cb88fb9

Please sign in to comment.