Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TCP buffer sizes configuration #1629

Merged
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,12 @@
// note that mTLS (client authentication) is required for a listener to disconnect a client on expiration
close_link_on_expiration: false,
},
/// Optional configuration for TCP system buffers sizes. Applies to TCP and TLS links.
///
/// Configure TCP read buffer size (bytes)
// tcp_rx_buffer: 123456,
/// Configure TCP write buffer size (bytes)
// tcp_tx_buffer: 123456,
},
/// Shared memory configuration.
/// NOTE: shared memory can be used only if zenoh is compiled with "shared-memory" feature, otherwise
Expand Down
4 changes: 4 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,10 @@ validated_struct::validator! {
UnixPipeConf {
file_access_mask: Option<u32>
},
/// Configure TCP read buffer size
pub tcp_rx_buffer: Option<u32>,
/// Configure TCP write buffer size
pub tcp_tx_buffer: Option<u32>,
},
pub shared_memory:
ShmConf {
Expand Down
3 changes: 3 additions & 0 deletions io/zenoh-link-commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ extern crate alloc;

mod listener;
mod multicast;
pub mod tcp;
#[cfg(feature = "tls")]
pub mod tls;
mod unicast;
Expand All @@ -44,6 +45,8 @@ use zenoh_result::ZResult;
/*************************************/

pub const BIND_INTERFACE: &str = "iface";
pub const TCP_TX_BUFFER_SIZE: &str = "tcp_tx_buffer";
pub const TCP_RX_BUFFER_SIZE: &str = "tcp_rx_buffer";

#[derive(Clone, Debug, Serialize, Hash, PartialEq, Eq)]
pub struct Link {
Expand Down
100 changes: 100 additions & 0 deletions io/zenoh-link-commons/src/tcp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::net::SocketAddr;

use tokio::net::{TcpListener, TcpSocket, TcpStream};
use zenoh_result::{zerror, ZResult};

pub struct TcpSocketConfig<'a> {
tx_buffer_size: Option<u32>,
rx_buffer_size: Option<u32>,
iface: Option<&'a str>,
}

impl<'a> TcpSocketConfig<'a> {
pub fn new(
tx_buffer_size: Option<u32>,
rx_buffer_size: Option<u32>,
iface: Option<&'a str>,
) -> Self {
Self {
tx_buffer_size,
rx_buffer_size,
iface,
}
}

/// Build a new TCPListener bound to `addr` with the given configuration parameters
pub fn new_listener(&self, addr: &SocketAddr) -> ZResult<(TcpListener, SocketAddr)> {
let socket = self.socket_with_config(addr)?;
// Build a TcpListener from TcpSocket
// https://docs.rs/tokio/latest/tokio/net/struct.TcpSocket.html
socket.set_reuseaddr(true)?;
socket.bind(*addr).map_err(|e| zerror!("{}: {}", addr, e))?;
// backlog (the maximum number of pending connections are queued): 1024
let listener = socket
.listen(1024)
.map_err(|e| zerror!("{}: {}", addr, e))?;

let local_addr = listener
.local_addr()
.map_err(|e| zerror!("{}: {}", addr, e))?;

Ok((listener, local_addr))
}

/// Connect to a TCP socket address at `dst_addr` with the given configuration parameters
pub async fn new_link(
&self,
dst_addr: &SocketAddr,
) -> ZResult<(TcpStream, SocketAddr, SocketAddr)> {
let socket = self.socket_with_config(dst_addr)?;
// Build a TcpStream from TcpSocket
// https://docs.rs/tokio/latest/tokio/net/struct.TcpSocket.html
let stream = socket
.connect(*dst_addr)
.await
.map_err(|e| zerror!("{}: {}", dst_addr, e))?;

let src_addr = stream
.local_addr()
.map_err(|e| zerror!("{}: {}", dst_addr, e))?;

let dst_addr = stream
.peer_addr()
.map_err(|e| zerror!("{}: {}", dst_addr, e))?;

Ok((stream, src_addr, dst_addr))
}

/// Creates a TcpSocket with the provided config
fn socket_with_config(&self, addr: &SocketAddr) -> ZResult<TcpSocket> {
let socket = match addr {
SocketAddr::V4(_) => TcpSocket::new_v4(),
SocketAddr::V6(_) => TcpSocket::new_v6(),
}?;

if let Some(iface) = self.iface {
zenoh_util::net::set_bind_to_device_tcp_socket(&socket, iface)?;
}
if let Some(size) = self.tx_buffer_size {
socket.set_send_buffer_size(size)?;
}
if let Some(size) = self.rx_buffer_size {
socket.set_recv_buffer_size(size)?;
}

Ok(socket)
}
}
13 changes: 12 additions & 1 deletion io/zenoh-link/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ use zenoh_link_serial::{LinkManagerUnicastSerial, SerialLocatorInspector, SERIAL
#[cfg(feature = "transport_tcp")]
pub use zenoh_link_tcp as tcp;
#[cfg(feature = "transport_tcp")]
use zenoh_link_tcp::{LinkManagerUnicastTcp, TcpLocatorInspector, TCP_LOCATOR_PREFIX};
use zenoh_link_tcp::{
LinkManagerUnicastTcp, TcpConfigurator, TcpLocatorInspector, TCP_LOCATOR_PREFIX,
};
#[cfg(feature = "transport_tls")]
pub use zenoh_link_tls as tls;
#[cfg(feature = "transport_tls")]
Expand Down Expand Up @@ -172,6 +174,8 @@ impl LocatorInspector {
}
#[derive(Default)]
pub struct LinkConfigurator {
#[cfg(feature = "transport_tcp")]
tcp_inspector: TcpConfigurator,
#[cfg(feature = "transport_quic")]
quic_inspector: QuicConfigurator,
#[cfg(feature = "transport_tls")]
Expand Down Expand Up @@ -199,6 +203,13 @@ impl LinkConfigurator {
errors.insert(proto, e);
}
};
#[cfg(feature = "transport_tcp")]
{
insert_config(
TCP_LOCATOR_PREFIX.into(),
self.tcp_inspector.inspect_config(config),
);
}
#[cfg(feature = "transport_quic")]
{
insert_config(
Expand Down
2 changes: 1 addition & 1 deletion io/zenoh-links/zenoh-link-tcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ socket2 = { workspace = true }
tokio = { workspace = true, features = ["net", "io-util", "rt", "time"] }
tokio-util = { workspace = true, features = ["rt"] }
tracing = {workspace = true}
zenoh-config = { workspace = true }
zenoh-core = { workspace = true }
zenoh-link-commons = { workspace = true }
zenoh-protocol = { workspace = true }
zenoh-result = { workspace = true }
zenoh-util = { workspace = true }
2 changes: 2 additions & 0 deletions io/zenoh-links/zenoh-link-tcp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ use zenoh_protocol::{
use zenoh_result::{zerror, ZResult};

mod unicast;
mod utils;
pub use unicast::*;
pub use utils::TcpConfigurator;

// Default MTU (TCP PDU) in bytes.
// NOTE: Since TCP is a byte-stream oriented transport, theoretically it has
Expand Down
92 changes: 18 additions & 74 deletions io/zenoh-links/zenoh-link-tcp/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,22 @@ use std::{cell::UnsafeCell, convert::TryInto, fmt, net::SocketAddr, sync::Arc, t
use async_trait::async_trait;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpSocket, TcpStream},
net::{TcpListener, TcpStream},
};
use tokio_util::sync::CancellationToken;
use zenoh_link_commons::{
get_ip_interface_names, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait,
ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE,
get_ip_interface_names, tcp::TcpSocketConfig, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast,
LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender,
};
use zenoh_protocol::{
core::{EndPoint, Locator},
transport::BatchSize,
};
use zenoh_result::{bail, zerror, Error as ZError, ZResult};

use super::{
get_tcp_addrs, TCP_ACCEPT_THROTTLE_TIME, TCP_DEFAULT_MTU, TCP_LINGER_TIMEOUT,
TCP_LOCATOR_PREFIX,
use crate::{
get_tcp_addrs, utils::TcpLinkConfig, TCP_ACCEPT_THROTTLE_TIME, TCP_DEFAULT_MTU,
TCP_LINGER_TIMEOUT, TCP_LOCATOR_PREFIX,
};

pub struct LinkUnicastTcp {
Expand Down Expand Up @@ -241,80 +241,22 @@ impl LinkManagerUnicastTcp {
}
}

impl LinkManagerUnicastTcp {
async fn new_link_inner(
&self,
dst_addr: &SocketAddr,
iface: Option<&str>,
) -> ZResult<(TcpStream, SocketAddr, SocketAddr)> {
let socket = match dst_addr {
SocketAddr::V4(_) => TcpSocket::new_v4(),
SocketAddr::V6(_) => TcpSocket::new_v6(),
}?;

if let Some(iface) = iface {
zenoh_util::net::set_bind_to_device_tcp_socket(&socket, iface)?;
}

// Build a TcpStream from TcpSocket
// https://docs.rs/tokio/latest/tokio/net/struct.TcpSocket.html
let stream = socket
.connect(*dst_addr)
.await
.map_err(|e| zerror!("{}: {}", dst_addr, e))?;

let src_addr = stream
.local_addr()
.map_err(|e| zerror!("{}: {}", dst_addr, e))?;

let dst_addr = stream
.peer_addr()
.map_err(|e| zerror!("{}: {}", dst_addr, e))?;

Ok((stream, src_addr, dst_addr))
}

async fn new_listener_inner(
&self,
addr: &SocketAddr,
iface: Option<&str>,
) -> ZResult<(TcpListener, SocketAddr)> {
let socket = match addr {
SocketAddr::V4(_) => TcpSocket::new_v4(),
SocketAddr::V6(_) => TcpSocket::new_v6(),
}?;

if let Some(iface) = iface {
zenoh_util::net::set_bind_to_device_tcp_socket(&socket, iface)?;
}

// Build a TcpListener from TcpSocket
// https://docs.rs/tokio/latest/tokio/net/struct.TcpSocket.html
socket.set_reuseaddr(true)?;
socket.bind(*addr).map_err(|e| zerror!("{}: {}", addr, e))?;
// backlog (the maximum number of pending connections are queued): 1024
let listener = socket
.listen(1024)
.map_err(|e| zerror!("{}: {}", addr, e))?;

let local_addr = listener
.local_addr()
.map_err(|e| zerror!("{}: {}", addr, e))?;

Ok((listener, local_addr))
}
}

#[async_trait]
impl LinkManagerUnicastTrait for LinkManagerUnicastTcp {
async fn new_link(&self, endpoint: EndPoint) -> ZResult<LinkUnicast> {
let dst_addrs = get_tcp_addrs(endpoint.address()).await?;
let config = endpoint.config();
let iface = config.get(BIND_INTERFACE);

let link_config = TcpLinkConfig::new(&config)?;
let socket_config = TcpSocketConfig::new(
link_config.tx_buffer_size,
link_config.rx_buffer_size,
link_config.bind_iface,
);

let mut errs: Vec<ZError> = vec![];
for da in dst_addrs {
match self.new_link_inner(&da, iface).await {
match socket_config.new_link(&da).await {
Ok((stream, src_addr, dst_addr)) => {
let link = Arc::new(LinkUnicastTcp::new(stream, src_addr, dst_addr));
return Ok(LinkUnicast(link));
Expand All @@ -339,11 +281,13 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp {
async fn new_listener(&self, mut endpoint: EndPoint) -> ZResult<Locator> {
let addrs = get_tcp_addrs(endpoint.address()).await?;
let config = endpoint.config();
let iface = config.get(BIND_INTERFACE);

let link_config = TcpLinkConfig::new(&config)?;
let socket_config: TcpSocketConfig<'_> = link_config.into();

let mut errs: Vec<ZError> = vec![];
for da in addrs {
match self.new_listener_inner(&da, iface).await {
match socket_config.new_listener(&da) {
Ok((socket, local_addr)) => {
// Update the endpoint locator address
endpoint = EndPoint::new(
Expand Down
Loading
Loading