From 4b8f2a6e66ad70c5b81987d0e962f4b7ae6ba606 Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Thu, 28 Sep 2023 14:48:28 +0800 Subject: [PATCH 1/5] Add interface bound tcp link --- Cargo.lock | 1 + io/zenoh-links/zenoh-link-tcp/Cargo.toml | 1 + io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 49 +++++++++++++++++++- 3 files changed, 49 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e945a9e637..bac7a20048 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4718,6 +4718,7 @@ dependencies = [ "async-std", "async-trait", "log", + "socket2 0.5.4", "zenoh-core", "zenoh-link-commons", "zenoh-protocol", diff --git a/io/zenoh-links/zenoh-link-tcp/Cargo.toml b/io/zenoh-links/zenoh-link-tcp/Cargo.toml index 9c4725ff03..ebed708da3 100644 --- a/io/zenoh-links/zenoh-link-tcp/Cargo.toml +++ b/io/zenoh-links/zenoh-link-tcp/Cargo.toml @@ -34,3 +34,4 @@ zenoh-protocol = { workspace = true } zenoh-result = { workspace = true } zenoh-sync = { workspace = true } zenoh-util = { workspace = true } +socket2 = {workspace = true} diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 3960b91228..b4aa02ffa7 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -256,10 +256,32 @@ impl LinkManagerUnicastTcp { impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { async fn new_link(&self, endpoint: EndPoint) -> ZResult { let dst_addrs = get_tcp_addrs(endpoint.address()).await?; + let config = endpoint.config(); + let iface = config.get("iface").map(|s| s.as_bytes()); let mut errs: Vec = vec![]; + for da in dst_addrs { - match self.new_link_inner(&da).await { + let connection = { + // Since the building of async_std::net::TcpStream from socket2 is synchronous, let's separate the cases + // for the sake of the performance + if iface.is_some() { + let socket = socket2::Socket::new( + socket2::Domain::for_address(da), + socket2::Type::STREAM, + Some(socket2::Protocol::TCP), + )?; + socket.connect(&da.into())?; + socket.bind_device(iface)?; + let stream = TcpStream::from(std::net::TcpStream::from(socket)); + let src_addr = stream.local_addr().map_err(|e| zerror!("{}: {}", da, e))?; + let dst_addr = stream.peer_addr().map_err(|e| zerror!("{}: {}", da, e))?; + Ok((stream, src_addr, dst_addr)) + } else { + self.new_link_inner(&da).await + } + }; + match connection { Ok((stream, src_addr, dst_addr)) => { let link = Arc::new(LinkUnicastTcp::new(stream, src_addr, dst_addr)); return Ok(LinkUnicast(link)); @@ -283,10 +305,33 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { async fn new_listener(&self, mut endpoint: EndPoint) -> ZResult { let addrs = get_tcp_addrs(endpoint.address()).await?; + let config = endpoint.config(); + let iface = config.get("iface").map(|s| s.as_bytes()); let mut errs: Vec = vec![]; for da in addrs { - match self.new_listener_inner(&da).await { + let connection = { + // Since the building of async_std::net::TcpListener from socket2 is synchronous, let's separate the cases + // for the sake of the performance + if iface.is_some() { + let socket = socket2::Socket::new( + socket2::Domain::for_address(da), + socket2::Type::STREAM, + Some(socket2::Protocol::TCP), + )?; + socket.bind(&da.into())?; + socket.bind_device(iface)?; + socket.listen(128)?; + let listener = TcpListener::from(std::net::TcpListener::from(socket)); + let local_addr = listener + .local_addr() + .map_err(|e| zerror!("{}: {}", da, e))?; + Ok((listener, local_addr)) + } else { + self.new_listener_inner(&da).await + } + }; + match connection { Ok((socket, local_addr)) => { // Update the endpoint locator address endpoint = EndPoint::new( From 086991a833d0ed1d54107cec04c6afe35f5e1601 Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Tue, 3 Oct 2023 17:21:31 +0800 Subject: [PATCH 2/5] Set this feature to linux only for the time being --- io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index b4aa02ffa7..ad62a9ee9a 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -310,6 +310,9 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { let mut errs: Vec = vec![]; for da in addrs { + // bind_device is only supported on Android, Fushia, and Linux + // https://docs.rs/socket2/latest/x86_64-unknown-linux-gnu/socket2/struct.Socket.html#method.bind_device + #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] let connection = { // Since the building of async_std::net::TcpListener from socket2 is synchronous, let's separate the cases // for the sake of the performance @@ -331,6 +334,9 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { self.new_listener_inner(&da).await } }; + #[cfg(not(any(target_os = "android", target_os = "fuchsia", target_os = "linux")))] + let connection = self.new_listener_inner(&da).await?; + match connection { Ok((socket, local_addr)) => { // Update the endpoint locator address From 467cbe409eaeb12b80ec6805e662a3ef537b3503 Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Tue, 3 Oct 2023 17:32:38 +0800 Subject: [PATCH 3/5] Fix the wrong result type --- io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index ad62a9ee9a..dc9e210304 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -335,7 +335,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { } }; #[cfg(not(any(target_os = "android", target_os = "fuchsia", target_os = "linux")))] - let connection = self.new_listener_inner(&da).await?; + let connection = self.new_listener_inner(&da).await; match connection { Ok((socket, local_addr)) => { From 52d49fb6e02d10cdc6cfd70cfd6b3e5811bf8af4 Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Tue, 3 Oct 2023 19:53:41 +0800 Subject: [PATCH 4/5] Fix the missing part --- io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index dc9e210304..fbc074c067 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -262,6 +262,9 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { let mut errs: Vec = vec![]; for da in dst_addrs { + // bind_device is only supported on Android, Fushia, and Linux + // https://docs.rs/socket2/latest/x86_64-unknown-linux-gnu/socket2/struct.Socket.html#method.bind_device + #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] let connection = { // Since the building of async_std::net::TcpStream from socket2 is synchronous, let's separate the cases // for the sake of the performance @@ -281,6 +284,9 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { self.new_link_inner(&da).await } }; + #[cfg(not(any(target_os = "android", target_os = "fuchsia", target_os = "linux")))] + let connection = self.new_link_inner(&da).await; + match connection { Ok((stream, src_addr, dst_addr)) => { let link = Arc::new(LinkUnicastTcp::new(stream, src_addr, dst_addr)); From ec1b5a71adba0ea41be003f6746f3dc9c4bac611 Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Tue, 3 Oct 2023 21:10:40 +0800 Subject: [PATCH 5/5] Quickfix --- io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index fbc074c067..5a688f69fe 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -257,6 +257,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { async fn new_link(&self, endpoint: EndPoint) -> ZResult { let dst_addrs = get_tcp_addrs(endpoint.address()).await?; let config = endpoint.config(); + #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] let iface = config.get("iface").map(|s| s.as_bytes()); let mut errs: Vec = vec![]; @@ -312,6 +313,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { async fn new_listener(&self, mut endpoint: EndPoint) -> ZResult { let addrs = get_tcp_addrs(endpoint.address()).await?; let config = endpoint.config(); + #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] let iface = config.get("iface").map(|s| s.as_bytes()); let mut errs: Vec = vec![];