Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add interface bound tcp link #558

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions io/zenoh-links/zenoh-link-tcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ zenoh-protocol = { workspace = true }
zenoh-result = { workspace = true }
zenoh-sync = { workspace = true }
zenoh-util = { workspace = true }
socket2 = {workspace = true}
63 changes: 61 additions & 2 deletions io/zenoh-links/zenoh-link-tcp/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,39 @@
impl LinkManagerUnicastTrait for LinkManagerUnicastTcp {
async fn new_link(&self, endpoint: EndPoint) -> ZResult<LinkUnicast> {
let dst_addrs = get_tcp_addrs(endpoint.address()).await?;
let config = endpoint.config();

Check failure on line 259 in io/zenoh-links/zenoh-link-tcp/src/unicast.rs

View workflow job for this annotation

GitHub Actions / Run checks on macOS-latest

unused variable: `config`

Check warning on line 259 in io/zenoh-links/zenoh-link-tcp/src/unicast.rs

View workflow job for this annotation

GitHub Actions / Run tests on macOS-latest

unused variable: `config`

Check failure on line 259 in io/zenoh-links/zenoh-link-tcp/src/unicast.rs

View workflow job for this annotation

GitHub Actions / Run checks on windows-latest

unused variable: `config`

Check warning on line 259 in io/zenoh-links/zenoh-link-tcp/src/unicast.rs

View workflow job for this annotation

GitHub Actions / Run tests on windows-latest

unused variable: `config`
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
let iface = config.get("iface").map(|s| s.as_bytes());

let mut errs: Vec<ZError> = vec![];

for da in dst_addrs {
match self.new_link_inner(&da).await {
// bind_device is only supported on Android, Fushia, and Linux
// https://docs.rs/socket2/latest/x86_64-unknown-linux-gnu/socket2/struct.Socket.html#method.bind_device
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
let connection = {
// Since the building of async_std::net::TcpStream from socket2 is synchronous, let's separate the cases
// for the sake of the performance
if iface.is_some() {
let socket = socket2::Socket::new(
socket2::Domain::for_address(da),
socket2::Type::STREAM,
Some(socket2::Protocol::TCP),
)?;
socket.connect(&da.into())?;
socket.bind_device(iface)?;
let stream = TcpStream::from(std::net::TcpStream::from(socket));
let src_addr = stream.local_addr().map_err(|e| zerror!("{}: {}", da, e))?;
let dst_addr = stream.peer_addr().map_err(|e| zerror!("{}: {}", da, e))?;
Ok((stream, src_addr, dst_addr))
} else {
self.new_link_inner(&da).await
}
};
#[cfg(not(any(target_os = "android", target_os = "fuchsia", target_os = "linux")))]
let connection = self.new_link_inner(&da).await;

match connection {
Ok((stream, src_addr, dst_addr)) => {
let link = Arc::new(LinkUnicastTcp::new(stream, src_addr, dst_addr));
return Ok(LinkUnicast(link));
Expand All @@ -283,10 +312,40 @@

async fn new_listener(&self, mut endpoint: EndPoint) -> ZResult<Locator> {
let addrs = get_tcp_addrs(endpoint.address()).await?;
let config = endpoint.config();

Check failure on line 315 in io/zenoh-links/zenoh-link-tcp/src/unicast.rs

View workflow job for this annotation

GitHub Actions / Run checks on macOS-latest

unused variable: `config`

Check warning on line 315 in io/zenoh-links/zenoh-link-tcp/src/unicast.rs

View workflow job for this annotation

GitHub Actions / Run tests on macOS-latest

unused variable: `config`

Check warning on line 315 in io/zenoh-links/zenoh-link-tcp/src/unicast.rs

View workflow job for this annotation

GitHub Actions / Run tests on macOS-latest

unused variable: `config`

Check failure on line 315 in io/zenoh-links/zenoh-link-tcp/src/unicast.rs

View workflow job for this annotation

GitHub Actions / Run checks on windows-latest

unused variable: `config`

Check warning on line 315 in io/zenoh-links/zenoh-link-tcp/src/unicast.rs

View workflow job for this annotation

GitHub Actions / Run tests on windows-latest

unused variable: `config`

Check warning on line 315 in io/zenoh-links/zenoh-link-tcp/src/unicast.rs

View workflow job for this annotation

GitHub Actions / Run tests on windows-latest

unused variable: `config`
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
let iface = config.get("iface").map(|s| s.as_bytes());

let mut errs: Vec<ZError> = vec![];
for da in addrs {
match self.new_listener_inner(&da).await {
// bind_device is only supported on Android, Fushia, and Linux
// https://docs.rs/socket2/latest/x86_64-unknown-linux-gnu/socket2/struct.Socket.html#method.bind_device
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
let connection = {
// Since the building of async_std::net::TcpListener from socket2 is synchronous, let's separate the cases
// for the sake of the performance
if iface.is_some() {
let socket = socket2::Socket::new(
socket2::Domain::for_address(da),
socket2::Type::STREAM,
Some(socket2::Protocol::TCP),
)?;
socket.bind(&da.into())?;
socket.bind_device(iface)?;
socket.listen(128)?;
let listener = TcpListener::from(std::net::TcpListener::from(socket));
let local_addr = listener
.local_addr()
.map_err(|e| zerror!("{}: {}", da, e))?;
Ok((listener, local_addr))
} else {
self.new_listener_inner(&da).await
}
};
#[cfg(not(any(target_os = "android", target_os = "fuchsia", target_os = "linux")))]
let connection = self.new_listener_inner(&da).await;

match connection {
Ok((socket, local_addr)) => {
// Update the endpoint locator address
endpoint = EndPoint::new(
Expand Down
Loading