diff --git a/Cargo.lock b/Cargo.lock index e945a9e637..bac7a20048 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4718,6 +4718,7 @@ dependencies = [ "async-std", "async-trait", "log", + "socket2 0.5.4", "zenoh-core", "zenoh-link-commons", "zenoh-protocol", diff --git a/io/zenoh-links/zenoh-link-tcp/Cargo.toml b/io/zenoh-links/zenoh-link-tcp/Cargo.toml index 9c4725ff03..ebed708da3 100644 --- a/io/zenoh-links/zenoh-link-tcp/Cargo.toml +++ b/io/zenoh-links/zenoh-link-tcp/Cargo.toml @@ -34,3 +34,4 @@ zenoh-protocol = { workspace = true } zenoh-result = { workspace = true } zenoh-sync = { workspace = true } zenoh-util = { workspace = true } +socket2 = {workspace = true} diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 3960b91228..5a688f69fe 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -256,10 +256,39 @@ impl LinkManagerUnicastTcp { impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { async fn new_link(&self, endpoint: EndPoint) -> ZResult { let dst_addrs = get_tcp_addrs(endpoint.address()).await?; + let config = endpoint.config(); + #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] + let iface = config.get("iface").map(|s| s.as_bytes()); let mut errs: Vec = vec![]; + for da in dst_addrs { - match self.new_link_inner(&da).await { + // bind_device is only supported on Android, Fushia, and Linux + // https://docs.rs/socket2/latest/x86_64-unknown-linux-gnu/socket2/struct.Socket.html#method.bind_device + #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] + let connection = { + // Since the building of async_std::net::TcpStream from socket2 is synchronous, let's separate the cases + // for the sake of the performance + if iface.is_some() { + let socket = socket2::Socket::new( + socket2::Domain::for_address(da), + socket2::Type::STREAM, + Some(socket2::Protocol::TCP), + )?; + socket.connect(&da.into())?; + socket.bind_device(iface)?; + let stream = TcpStream::from(std::net::TcpStream::from(socket)); + let src_addr = stream.local_addr().map_err(|e| zerror!("{}: {}", da, e))?; + let dst_addr = stream.peer_addr().map_err(|e| zerror!("{}: {}", da, e))?; + Ok((stream, src_addr, dst_addr)) + } else { + self.new_link_inner(&da).await + } + }; + #[cfg(not(any(target_os = "android", target_os = "fuchsia", target_os = "linux")))] + let connection = self.new_link_inner(&da).await; + + match connection { Ok((stream, src_addr, dst_addr)) => { let link = Arc::new(LinkUnicastTcp::new(stream, src_addr, dst_addr)); return Ok(LinkUnicast(link)); @@ -283,10 +312,40 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { async fn new_listener(&self, mut endpoint: EndPoint) -> ZResult { let addrs = get_tcp_addrs(endpoint.address()).await?; + let config = endpoint.config(); + #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] + let iface = config.get("iface").map(|s| s.as_bytes()); let mut errs: Vec = vec![]; for da in addrs { - match self.new_listener_inner(&da).await { + // bind_device is only supported on Android, Fushia, and Linux + // https://docs.rs/socket2/latest/x86_64-unknown-linux-gnu/socket2/struct.Socket.html#method.bind_device + #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] + let connection = { + // Since the building of async_std::net::TcpListener from socket2 is synchronous, let's separate the cases + // for the sake of the performance + if iface.is_some() { + let socket = socket2::Socket::new( + socket2::Domain::for_address(da), + socket2::Type::STREAM, + Some(socket2::Protocol::TCP), + )?; + socket.bind(&da.into())?; + socket.bind_device(iface)?; + socket.listen(128)?; + let listener = TcpListener::from(std::net::TcpListener::from(socket)); + let local_addr = listener + .local_addr() + .map_err(|e| zerror!("{}: {}", da, e))?; + Ok((listener, local_addr)) + } else { + self.new_listener_inner(&da).await + } + }; + #[cfg(not(any(target_os = "android", target_os = "fuchsia", target_os = "linux")))] + let connection = self.new_listener_inner(&da).await; + + match connection { Ok((socket, local_addr)) => { // Update the endpoint locator address endpoint = EndPoint::new(