From d0255d64d287f09b59fc22a2ad40ff789d54fa23 Mon Sep 17 00:00:00 2001 From: greg Date: Wed, 13 Nov 2024 22:33:33 +0000 Subject: [PATCH 1/6] update socket config and create builder pattern --- bench-streamer/src/main.rs | 5 +- connection-cache/Cargo.toml | 1 + connection-cache/src/connection_cache.rs | 15 +- gossip/src/cluster_info.rs | 183 ++++++++++------ net-utils/src/lib.rs | 244 +++++++++++++++++---- programs/sbf/Cargo.lock | 1 + quic-client/src/nonblocking/quic_client.rs | 6 +- svm/examples/Cargo.lock | 1 + udp-client/src/lib.rs | 8 +- udp-client/src/nonblocking/udp_client.rs | 9 +- 10 files changed, 361 insertions(+), 112 deletions(-) diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index cba5486fc5bcb9..da203e752752eb 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -3,7 +3,7 @@ use { clap::{crate_description, crate_name, Arg, Command}, crossbeam_channel::unbounded, - solana_net_utils::bind_to_unspecified, + solana_net_utils::{bind_to_unspecified, SocketConfig}, solana_streamer::{ packet::{Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE}, streamer::{receiver, PacketBatchReceiver, StreamerReceiveStats}, @@ -95,9 +95,10 @@ fn main() -> Result<()> { let mut read_channels = Vec::new(); let mut read_threads = Vec::new(); let recycler = PacketBatchRecycler::default(); - let (_port, read_sockets) = solana_net_utils::multi_bind_in_range( + let (_port, read_sockets) = solana_net_utils::multi_bind_in_range_with_config( ip_addr, (port, port + num_sockets as u16), + SocketConfig::default().reuseport(true), num_sockets, ) .unwrap(); diff --git a/connection-cache/Cargo.toml b/connection-cache/Cargo.toml index c246526e16d559..a1733af0773e99 100644 --- a/connection-cache/Cargo.toml +++ b/connection-cache/Cargo.toml @@ -22,6 +22,7 @@ rayon = { workspace = true } solana-keypair = { workspace = true } solana-measure = { workspace = true } solana-metrics = { workspace = true } +solana-net-utils = { workspace = true } solana-time-utils = { workspace = true } solana-transaction-error = { workspace = true } thiserror = { workspace = true } diff --git a/connection-cache/src/connection_cache.rs b/connection-cache/src/connection_cache.rs index 8e4363117d361b..0e6e51c4b8d49a 100644 --- a/connection-cache/src/connection_cache.rs +++ b/connection-cache/src/connection_cache.rs @@ -514,6 +514,7 @@ mod tests { async_trait::async_trait, rand::{Rng, SeedableRng}, rand_chacha::ChaChaRng, + solana_net_utils::SocketConfig, solana_transaction_error::TransportResult, std::{ net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, @@ -571,8 +572,11 @@ mod tests { fn default() -> Self { Self { udp_socket: Arc::new( - solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) - .expect("Unable to bind to UDP socket"), + solana_net_utils::bind_with_any_port_with_config( + IpAddr::V4(Ipv4Addr::UNSPECIFIED), + SocketConfig::default(), + ) + .expect("Unable to bind to UDP socket"), ), } } @@ -582,8 +586,11 @@ mod tests { fn new() -> Result { Ok(Self { udp_socket: Arc::new( - solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) - .map_err(Into::::into)?, + solana_net_utils::bind_with_any_port_with_config( + IpAddr::V4(Ipv4Addr::UNSPECIFIED), + SocketConfig::default(), + ) + .map_err(Into::::into)?, ), }) } diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index a1ec2f21915d90..24939053beed49 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -54,10 +54,10 @@ use { solana_ledger::shred::Shred, solana_measure::measure::Measure, solana_net_utils::{ - bind_common, bind_common_in_range, bind_in_range, bind_in_range_with_config, - bind_more_with_config, bind_to_localhost, bind_to_unspecified, + bind_common_in_range_with_config, bind_common_with_config, bind_in_range, + bind_in_range_with_config, bind_more_with_config, bind_to_localhost, bind_to_unspecified, bind_two_in_range_with_offset_and_config, find_available_port_in_range, - multi_bind_in_range, PortRange, SocketConfig, VALIDATOR_PORT_RANGE, + multi_bind_in_range_with_config, PortRange, SocketConfig, VALIDATOR_PORT_RANGE, }, solana_perf::{ data_budget::DataBudget, @@ -2612,8 +2612,8 @@ impl Node { let localhost_ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST); let port_range = (1024, 65535); - let udp_config = SocketConfig { reuseport: false }; - let quic_config = SocketConfig { reuseport: true }; + let udp_config = SocketConfig::default(); + let quic_config = SocketConfig::default().reuseport(true); let ((_tpu_port, tpu), (_tpu_quic_port, tpu_quic)) = bind_two_in_range_with_offset_and_config( localhost_ip_addr, @@ -2626,7 +2626,8 @@ impl Node { let tpu_quic = bind_more_with_config(tpu_quic, num_quic_endpoints, quic_config.clone()).unwrap(); let (gossip_port, (gossip, ip_echo)) = - bind_common_in_range(localhost_ip_addr, port_range).unwrap(); + bind_common_in_range_with_config(localhost_ip_addr, port_range, udp_config.clone()) + .unwrap(); let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port); let tvu = bind_to_localhost().unwrap(); let tvu_quic = bind_to_localhost().unwrap(); @@ -2730,21 +2731,19 @@ impl Node { port_range: PortRange, bind_ip_addr: IpAddr, ) -> (u16, (UdpSocket, TcpListener)) { + let config = SocketConfig::default(); if gossip_addr.port() != 0 { ( gossip_addr.port(), - bind_common(bind_ip_addr, gossip_addr.port()).unwrap_or_else(|e| { - panic!("gossip_addr bind_to port {}: {}", gossip_addr.port(), e) - }), + bind_common_with_config(bind_ip_addr, gossip_addr.port(), config).unwrap_or_else( + |e| panic!("gossip_addr bind_to port {}: {}", gossip_addr.port(), e), + ), ) } else { - bind_common_in_range(bind_ip_addr, port_range).expect("Failed to bind") + bind_common_in_range_with_config(bind_ip_addr, port_range, config) + .expect("Failed to bind") } } - fn bind(bind_ip_addr: IpAddr, port_range: PortRange) -> (u16, UdpSocket) { - let config = SocketConfig { reuseport: false }; - Self::bind_with_config(bind_ip_addr, port_range, config) - } fn bind_with_config( bind_ip_addr: IpAddr, @@ -2762,49 +2761,74 @@ impl Node { ) -> Self { let (gossip_port, (gossip, ip_echo)) = Self::get_gossip_port(gossip_addr, port_range, bind_ip_addr); - let (tvu_port, tvu) = Self::bind(bind_ip_addr, port_range); - let (tvu_quic_port, tvu_quic) = Self::bind(bind_ip_addr, port_range); - let udp_config = SocketConfig { reuseport: false }; - let quic_config = SocketConfig { reuseport: true }; + + let read_write_socket_config = SocketConfig::default(); + let (tvu_port, tvu) = + Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); + let (tvu_quic_port, tvu_quic) = + Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); + let tpu_udp_config = SocketConfig::default(); + let tpu_quic_config = SocketConfig::default().reuseport(true); let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) = bind_two_in_range_with_offset_and_config( bind_ip_addr, port_range, QUIC_PORT_OFFSET, - udp_config.clone(), - quic_config.clone(), + tpu_udp_config.clone(), + tpu_quic_config.clone(), ) .unwrap(); let tpu_quic: Vec = - bind_more_with_config(tpu_quic, DEFAULT_QUIC_ENDPOINTS, quic_config.clone()).unwrap(); + bind_more_with_config(tpu_quic, DEFAULT_QUIC_ENDPOINTS, tpu_quic_config.clone()) + .unwrap(); + + let tpu_forwards_udp_config = SocketConfig::default(); + let tpu_forwards_quic_config = SocketConfig::default().reuseport(true); let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) = bind_two_in_range_with_offset_and_config( bind_ip_addr, port_range, QUIC_PORT_OFFSET, - udp_config, - quic_config.clone(), + tpu_forwards_udp_config, + tpu_forwards_quic_config.clone(), ) .unwrap(); let tpu_forwards_quic = bind_more_with_config( tpu_forwards_quic, DEFAULT_QUIC_ENDPOINTS, - quic_config.clone(), + tpu_forwards_quic_config.clone(), ) .unwrap(); - let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range); - let (tpu_vote_quic_port, tpu_vote_quic) = Self::bind(bind_ip_addr, port_range); + + let tpu_vote_udp_config = SocketConfig::default(); + let tpu_vote_quic_config = SocketConfig::default().reuseport(true); + let (tpu_vote_port, tpu_vote) = + Self::bind_with_config(bind_ip_addr, port_range, tpu_vote_udp_config.clone()); + // using udp port for quic really because we need to reusport set to false, since Self::bind() defaults to false + let (tpu_vote_quic_port, tpu_vote_quic) = + Self::bind_with_config(bind_ip_addr, port_range, tpu_vote_udp_config); let tpu_vote_quic: Vec = - bind_more_with_config(tpu_vote_quic, DEFAULT_QUIC_ENDPOINTS, quic_config).unwrap(); + bind_more_with_config(tpu_vote_quic, DEFAULT_QUIC_ENDPOINTS, tpu_vote_quic_config) + .unwrap(); - let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range); - let (_, repair) = Self::bind(bind_ip_addr, port_range); - let (_, repair_quic) = Self::bind(bind_ip_addr, port_range); - let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range); - let (serve_repair_quic_port, serve_repair_quic) = Self::bind(bind_ip_addr, port_range); - let (_, broadcast) = Self::bind(bind_ip_addr, port_range); - let (_, ancestor_hashes_requests) = Self::bind(bind_ip_addr, port_range); - let (_, ancestor_hashes_requests_quic) = Self::bind(bind_ip_addr, port_range); + let write_only_socket_config = SocketConfig::default(); + + let (_, retransmit_socket) = + Self::bind_with_config(bind_ip_addr, port_range, write_only_socket_config.clone()); + let (_, repair) = + Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); + let (_, repair_quic) = + Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); + let (serve_repair_port, serve_repair) = + Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); + let (serve_repair_quic_port, serve_repair_quic) = + Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); + let (_, broadcast) = + Self::bind_with_config(bind_ip_addr, port_range, write_only_socket_config); + let (_, ancestor_hashes_requests) = + Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); + let (_, ancestor_hashes_requests_quic) = + Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); let rpc_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap(); let rpc_pubsub_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap(); @@ -2880,24 +2904,40 @@ impl Node { let (gossip_port, (gossip, ip_echo)) = Self::get_gossip_port(&gossip_addr, port_range, bind_ip_addr); - let (tvu_port, tvu_sockets) = - multi_bind_in_range(bind_ip_addr, port_range, num_tvu_sockets.get()) - .expect("tvu multi_bind"); - let (tvu_quic_port, tvu_quic) = Self::bind(bind_ip_addr, port_range); + let tvu_config = SocketConfig::default().reuseport(true); + let (tvu_port, tvu_sockets) = multi_bind_in_range_with_config( + bind_ip_addr, + port_range, + tvu_config.clone(), + num_tvu_sockets.get(), + ) + .expect("tvu multi_bind"); + + let tvu_config = SocketConfig::default().reuseport(false); + let (tvu_quic_port, tvu_quic) = + Self::bind_with_config(bind_ip_addr, port_range, tvu_config); + + let tpu_config = SocketConfig::default().reuseport(true); let (tpu_port, tpu_sockets) = - multi_bind_in_range(bind_ip_addr, port_range, 32).expect("tpu multi_bind"); + multi_bind_in_range_with_config(bind_ip_addr, port_range, tpu_config.clone(), 32) + .expect("tpu multi_bind"); - let quic_config = SocketConfig { reuseport: true }; let (_tpu_port_quic, tpu_quic) = Self::bind_with_config( bind_ip_addr, (tpu_port + QUIC_PORT_OFFSET, tpu_port + QUIC_PORT_OFFSET + 1), - quic_config.clone(), + tpu_config.clone(), ); let tpu_quic = - bind_more_with_config(tpu_quic, num_quic_endpoints.get(), quic_config.clone()).unwrap(); + bind_more_with_config(tpu_quic, num_quic_endpoints.get(), tpu_config.clone()).unwrap(); - let (tpu_forwards_port, tpu_forwards_sockets) = - multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind"); + let tpu_forwards_config = SocketConfig::default().reuseport(true); + let (tpu_forwards_port, tpu_forwards_sockets) = multi_bind_in_range_with_config( + bind_ip_addr, + port_range, + tpu_forwards_config.clone(), + 8, + ) + .expect("tpu_forwards multi_bind"); let (_tpu_forwards_port_quic, tpu_forwards_quic) = Self::bind_with_config( bind_ip_addr, @@ -2905,37 +2945,58 @@ impl Node { tpu_forwards_port + QUIC_PORT_OFFSET, tpu_forwards_port + QUIC_PORT_OFFSET + 1, ), - quic_config.clone(), + tpu_forwards_config.clone(), ); let tpu_forwards_quic = bind_more_with_config( tpu_forwards_quic, num_quic_endpoints.get(), - quic_config.clone(), + tpu_forwards_config.clone(), ) .unwrap(); + let tpu_vote_config = SocketConfig::default().reuseport(true); let (tpu_vote_port, tpu_vote_sockets) = - multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind"); + multi_bind_in_range_with_config(bind_ip_addr, port_range, tpu_vote_config.clone(), 1) + .expect("tpu_vote multi_bind"); - let (tpu_vote_quic_port, tpu_vote_quic) = Self::bind(bind_ip_addr, port_range); + let tpu_vote_config = SocketConfig::default(); + let (tpu_vote_quic_port, tpu_vote_quic) = + Self::bind_with_config(bind_ip_addr, port_range, tpu_vote_config.clone()); - let tpu_vote_quic = - bind_more_with_config(tpu_vote_quic, num_quic_endpoints.get(), quic_config.clone()) - .unwrap(); + let tpu_vote_config = SocketConfig::default().reuseport(true); + let tpu_vote_quic = bind_more_with_config( + tpu_vote_quic, + num_quic_endpoints.get(), + tpu_vote_config.clone(), + ) + .unwrap(); + let retransmit_config = SocketConfig::default().reuseport(true); let (_, retransmit_sockets) = - multi_bind_in_range(bind_ip_addr, port_range, 8).expect("retransmit multi_bind"); + multi_bind_in_range_with_config(bind_ip_addr, port_range, retransmit_config, 8) + .expect("retransmit multi_bind"); - let (_, repair) = Self::bind(bind_ip_addr, port_range); - let (_, repair_quic) = Self::bind(bind_ip_addr, port_range); - let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range); - let (serve_repair_quic_port, serve_repair_quic) = Self::bind(bind_ip_addr, port_range); + let repair_config = SocketConfig::default(); + let (_, repair) = Self::bind_with_config(bind_ip_addr, port_range, repair_config.clone()); + let (_, repair_quic) = + Self::bind_with_config(bind_ip_addr, port_range, repair_config.clone()); - let (_, broadcast) = - multi_bind_in_range(bind_ip_addr, port_range, 4).expect("broadcast multi_bind"); + let serve_repair_config = SocketConfig::default(); + let (serve_repair_port, serve_repair) = + Self::bind_with_config(bind_ip_addr, port_range, serve_repair_config.clone()); + let (serve_repair_quic_port, serve_repair_quic) = + Self::bind_with_config(bind_ip_addr, port_range, serve_repair_config); - let (_, ancestor_hashes_requests) = Self::bind(bind_ip_addr, port_range); - let (_, ancestor_hashes_requests_quic) = Self::bind(bind_ip_addr, port_range); + let broadcast_config = SocketConfig::default().reuseport(true); + let (_, broadcast) = + multi_bind_in_range_with_config(bind_ip_addr, port_range, broadcast_config, 4) + .expect("broadcast multi_bind"); + + let ancestor_hashes_config = SocketConfig::default(); + let (_, ancestor_hashes_requests) = + Self::bind_with_config(bind_ip_addr, port_range, ancestor_hashes_config.clone()); + let (_, ancestor_hashes_requests_quic) = + Self::bind_with_config(bind_ip_addr, port_range, ancestor_hashes_config); let mut info = ContactInfo::new( *pubkey, diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 6099d5460af403..8c1d2aeeec6dab 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -37,9 +37,16 @@ pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000); pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 17; // VALIDATOR_PORT_RANGE must be at least this wide #[cfg(not(any(windows, target_os = "ios")))] -const DEFAULT_RECV_BUFFER_SIZE: usize = 64 * 1024 * 1024; // 64 MB - Doubled to 128MB by the kernel +const DEFAULT_RECV_BUFFER_SIZE: usize = 64 * 1024 * 1024; // 64MB - Doubled to 128MB by the kernel #[cfg(not(any(windows, target_os = "ios")))] -const DEFAULT_SEND_BUFFER_SIZE: usize = 64 * 1024 * 1024; // 64 MB - Doubled to 128MB by the kernel +const DEFAULT_SEND_BUFFER_SIZE: usize = 64 * 1024 * 1024; // 64MB - Doubled to 128MB by the kernel + +#[derive(Clone, Debug)] +pub enum SocketUsage { + ReadOnly, + WriteOnly, + ReadWrite, +} pub(crate) const HEADER_LENGTH: usize = 4; pub(crate) const IP_ECHO_SERVER_RESPONSE_LENGTH: usize = HEADER_LENGTH + 23; @@ -392,21 +399,67 @@ pub fn is_host_port(string: String) -> Result<(), String> { } #[derive(Clone, Debug)] +#[cfg_attr(any(windows, target_os = "ios"), derive(Default))] pub struct SocketConfig { - pub reuseport: bool, + reuseport: bool, + #[cfg(not(any(windows, target_os = "ios")))] + usage: SocketUsage, + #[cfg(not(any(windows, target_os = "ios")))] + recv_buffer_size: usize, + #[cfg(not(any(windows, target_os = "ios")))] + send_buffer_size: usize, } +#[cfg(not(any(windows, target_os = "ios")))] impl Default for SocketConfig { - #[allow(clippy::derivable_impls)] fn default() -> Self { - Self { reuseport: false } + Self { + reuseport: false, + #[cfg(not(any(windows, target_os = "ios")))] + usage: SocketUsage::ReadWrite, + #[cfg(not(any(windows, target_os = "ios")))] + recv_buffer_size: DEFAULT_RECV_BUFFER_SIZE, + #[cfg(not(any(windows, target_os = "ios")))] + send_buffer_size: DEFAULT_SEND_BUFFER_SIZE, + } } } -#[cfg(any(windows, target_os = "ios"))] -fn udp_socket(_reuseaddr: bool) -> io::Result { - let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?; - Ok(sock) +impl SocketConfig { + pub fn reuseport(mut self, reuseport: bool) -> Self { + self.reuseport = reuseport; + self + } + + // allow here to supress unused warnings from windows/ios builds + #[allow(unused_mut, unused_variables)] + pub fn usage(mut self, usage: SocketUsage) -> Self { + #[cfg(not(any(windows, target_os = "ios")))] + { + self.usage = usage; + } + self + } + + // allow here to supress unused warnings from windows/ios builds + #[allow(unused_mut, unused_variables)] + pub fn recv_buffer_size(mut self, size: usize) -> Self { + #[cfg(not(any(windows, target_os = "ios")))] + { + self.recv_buffer_size = size; + } + self + } + + // allow here to supress unused warnings from windows/ios builds + #[allow(unused_mut, unused_variables)] + pub fn send_buffer_size(mut self, size: usize) -> Self { + #[cfg(not(any(windows, target_os = "ios")))] + { + self.send_buffer_size = size; + } + self + } } #[cfg(any(windows, target_os = "ios"))] @@ -415,22 +468,21 @@ fn udp_socket_with_config(_config: SocketConfig) -> io::Result { Ok(sock) } -#[cfg(not(any(windows, target_os = "ios")))] -fn udp_socket(reuseport: bool) -> io::Result { - let config = SocketConfig { reuseport }; - udp_socket_with_config(config) -} - #[cfg(not(any(windows, target_os = "ios")))] fn udp_socket_with_config(config: SocketConfig) -> io::Result { use nix::sys::socket::{setsockopt, sockopt::ReusePort}; - let SocketConfig { reuseport } = config; + let SocketConfig { + reuseport, + usage: _, + recv_buffer_size, + send_buffer_size, + } = config; let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?; - // Set recv and send buffer sizes to 128MB - sock.set_recv_buffer_size(DEFAULT_RECV_BUFFER_SIZE)?; - sock.set_send_buffer_size(DEFAULT_SEND_BUFFER_SIZE)?; + // Set buffer sizes + sock.set_recv_buffer_size(recv_buffer_size)?; + sock.set_send_buffer_size(send_buffer_size)?; if reuseport { setsockopt(&sock, ReusePort, &true).ok(); @@ -439,7 +491,29 @@ fn udp_socket_with_config(config: SocketConfig) -> io::Result { Ok(sock) } +// Find a port in the given range with a socket config that is available for both TCP and UDP +pub fn bind_common_in_range_with_config( + ip_addr: IpAddr, + range: PortRange, + config: SocketConfig, +) -> io::Result<(u16, (UdpSocket, TcpListener))> { + for port in range.0..range.1 { + if let Ok((sock, listener)) = bind_common_with_config(ip_addr, port, config.clone()) { + return Result::Ok((sock.local_addr().unwrap().port(), (sock, listener))); + } + } + + Err(io::Error::new( + io::ErrorKind::Other, + format!("No available TCP/UDP ports in {range:?}"), + )) +} + // Find a port in the given range that is available for both TCP and UDP +#[deprecated( + since = "2.1.5", + note = "use `bind_common_in_range_with_config` instead" +)] pub fn bind_common_in_range( ip_addr: IpAddr, range: PortRange, @@ -483,8 +557,24 @@ pub fn bind_in_range_with_config( )) } +pub fn bind_with_any_port_with_config( + ip_addr: IpAddr, + config: SocketConfig, +) -> io::Result { + let sock = udp_socket_with_config(config)?; + let addr = SocketAddr::new(ip_addr, 0); + match sock.bind(&SockAddr::from(addr)) { + Ok(_) => Result::Ok(sock.into()), + Err(err) => Err(io::Error::new( + io::ErrorKind::Other, + format!("No available UDP port: {err}"), + )), + } +} + +#[deprecated(since = "2.1.5", note = "use `bind_with_any_port_with_config` instead")] pub fn bind_with_any_port(ip_addr: IpAddr) -> io::Result { - let sock = udp_socket(false)?; + let sock = udp_socket_with_config(SocketConfig::default())?; let addr = SocketAddr::new(ip_addr, 0); match sock.bind(&SockAddr::from(addr)) { Ok(_) => Result::Ok(sock.into()), @@ -495,7 +585,64 @@ pub fn bind_with_any_port(ip_addr: IpAddr) -> io::Result { } } +// binds many sockets to the same port in a range with config +pub fn multi_bind_in_range_with_config( + ip_addr: IpAddr, + range: PortRange, + config: SocketConfig, + mut num: usize, +) -> io::Result<(u16, Vec)> { + if !config.reuseport { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "SocketConfig.reuseport must be true for multi_bind_in_range_with_config", + )); + } + if cfg!(windows) && num != 1 { + // See https://github.com/solana-labs/solana/issues/4607 + warn!( + "multi_bind_in_range_with_config() only supports 1 socket in windows ({} requested)", + num + ); + num = 1; + } + let mut sockets = Vec::with_capacity(num); + + const NUM_TRIES: usize = 100; + let mut port = 0; + let mut error = None; + for _ in 0..NUM_TRIES { + port = { + let (port, _) = bind_in_range(ip_addr, range)?; + port + }; // drop the probe, port should be available... briefly. + + for _ in 0..num { + let sock = bind_to_with_config(ip_addr, port, config.clone()); + if let Ok(sock) = sock { + sockets.push(sock); + } else { + error = Some(sock); + break; + } + } + if sockets.len() == num { + break; + } else { + sockets.clear(); + } + } + if sockets.len() != num { + error.unwrap()?; + } + Ok((port, sockets)) +} + // binds many sockets to the same port in a range +#[deprecated( + since = "2.1.5", + note = "use `multi_bind_in_range_with_config` instead" +)] pub fn multi_bind_in_range( ip_addr: IpAddr, range: PortRange, @@ -520,7 +667,7 @@ pub fn multi_bind_in_range( port }; // drop the probe, port should be available... briefly. - let config = SocketConfig { reuseport: true }; + let config = SocketConfig::default().reuseport(true); for _ in 0..num { let sock = bind_to_with_config(ip_addr, port, config.clone()); if let Ok(sock) = sock { @@ -543,7 +690,7 @@ pub fn multi_bind_in_range( } pub fn bind_to(ip_addr: IpAddr, port: u16, reuseport: bool) -> io::Result { - let config = SocketConfig { reuseport }; + let config = SocketConfig::default().reuseport(reuseport); bind_to_with_config(ip_addr, port, config) } @@ -553,7 +700,7 @@ pub async fn bind_to_async( port: u16, reuseport: bool, ) -> io::Result { - let config = SocketConfig { reuseport }; + let config = SocketConfig::default().reuseport(reuseport); let socket = bind_to_with_config_non_blocking(ip_addr, port, config)?; TokioUdpSocket::from_std(socket) } @@ -622,7 +769,7 @@ pub fn bind_to_with_config_non_blocking( // binds both a UdpSocket and a TcpListener pub fn bind_common(ip_addr: IpAddr, port: u16) -> io::Result<(UdpSocket, TcpListener)> { - let config = SocketConfig { reuseport: false }; + let config = SocketConfig::default(); bind_common_with_config(ip_addr, port, config) } @@ -824,9 +971,9 @@ mod tests { let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); assert_eq!(bind_in_range(ip_addr, (2000, 2001)).unwrap().0, 2000); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let config = SocketConfig { reuseport: true }; + let config = SocketConfig::default().reuseport(true); let x = bind_to_with_config(ip_addr, 2002, config.clone()).unwrap(); - let y = bind_to_with_config(ip_addr, 2002, config).unwrap(); + let y = bind_to_with_config(ip_addr, 2002, config.clone()).unwrap(); assert_eq!( x.local_addr().unwrap().port(), y.local_addr().unwrap().port() @@ -834,7 +981,8 @@ mod tests { bind_to(ip_addr, 2002, false).unwrap_err(); bind_in_range(ip_addr, (2002, 2003)).unwrap_err(); - let (port, v) = multi_bind_in_range(ip_addr, (2010, 2110), 10).unwrap(); + let (port, v) = + multi_bind_in_range_with_config(ip_addr, (2010, 2110), config.clone(), 10).unwrap(); for sock in &v { assert_eq!(port, sock.local_addr().unwrap().port()); } @@ -843,8 +991,9 @@ mod tests { #[test] fn test_bind_with_any_port() { let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let x = bind_with_any_port(ip_addr).unwrap(); - let y = bind_with_any_port(ip_addr).unwrap(); + let config = SocketConfig::default(); + let x = bind_with_any_port_with_config(ip_addr, config.clone()).unwrap(); + let y = bind_with_any_port_with_config(ip_addr, config.clone()).unwrap(); assert_ne!( x.local_addr().unwrap().port(), y.local_addr().unwrap().port() @@ -875,18 +1024,21 @@ mod tests { #[test] fn test_bind_common_in_range() { let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let (port, _sockets) = bind_common_in_range(ip_addr, (3100, 3150)).unwrap(); + let config = SocketConfig::default(); + let (port, _sockets) = + bind_common_in_range_with_config(ip_addr, (3100, 3150), config.clone()).unwrap(); assert!((3100..3150).contains(&port)); - bind_common_in_range(ip_addr, (port, port + 1)).unwrap_err(); + bind_common_in_range_with_config(ip_addr, (port, port + 1), config.clone()).unwrap_err(); } #[test] fn test_get_public_ip_addr_none() { solana_logger::setup(); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); + let config = SocketConfig::default(); let (_server_port, (server_udp_socket, server_tcp_listener)) = - bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); + bind_common_in_range_with_config(ip_addr, (3200, 3250), config).unwrap(); let _runtime = ip_echo_server( server_tcp_listener, @@ -907,10 +1059,11 @@ mod tests { fn test_get_public_ip_addr_reachable() { solana_logger::setup(); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); + let config = SocketConfig::default(); let (_server_port, (server_udp_socket, server_tcp_listener)) = - bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); + bind_common_in_range_with_config(ip_addr, (3200, 3250), config.clone()).unwrap(); let (client_port, (client_udp_socket, client_tcp_listener)) = - bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); + bind_common_in_range_with_config(ip_addr, (3200, 3250), config).unwrap(); let _runtime = ip_echo_server( server_tcp_listener, @@ -935,15 +1088,16 @@ mod tests { fn test_get_public_ip_addr_tcp_unreachable() { solana_logger::setup(); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); + let config = SocketConfig::default(); let (_server_port, (server_udp_socket, _server_tcp_listener)) = - bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); + bind_common_in_range_with_config(ip_addr, (3200, 3250), config.clone()).unwrap(); // make the socket unreachable by not running the ip echo server! let server_ip_echo_addr = server_udp_socket.local_addr().unwrap(); let (correct_client_port, (_client_udp_socket, client_tcp_listener)) = - bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); + bind_common_in_range_with_config(ip_addr, (3200, 3250), config).unwrap(); assert!(!do_verify_reachable_ports( &server_ip_echo_addr, @@ -958,15 +1112,16 @@ mod tests { fn test_get_public_ip_addr_udp_unreachable() { solana_logger::setup(); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); + let config = SocketConfig::default(); let (_server_port, (server_udp_socket, _server_tcp_listener)) = - bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); + bind_common_in_range_with_config(ip_addr, (3200, 3250), config.clone()).unwrap(); // make the socket unreachable by not running the ip echo server! let server_ip_echo_addr = server_udp_socket.local_addr().unwrap(); let (_correct_client_port, (client_udp_socket, _client_tcp_listener)) = - bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); + bind_common_in_range_with_config(ip_addr, (3200, 3250), config).unwrap(); assert!(!do_verify_reachable_ports( &server_ip_echo_addr, @@ -995,4 +1150,17 @@ mod tests { } assert!(bind_two_in_range_with_offset(ip_addr, (1024, 1044), offset).is_err()); } + + #[test] + fn test_multi_bind_in_range_with_config_reuseport_disabled() { + let ip_addr: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST); + let config = SocketConfig::default(); //reuseport is false by default + + let result = multi_bind_in_range_with_config(ip_addr, (2010, 2110), config, 2); + + assert!( + result.is_err(), + "Expected an error when reuseport is not set to true" + ); + } } diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 75f3192dea3ccf..ff97c73a61056e 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5423,6 +5423,7 @@ dependencies = [ "solana-keypair", "solana-measure", "solana-metrics", + "solana-net-utils", "solana-time-utils", "solana-transaction-error", "thiserror 2.0.6", diff --git a/quic-client/src/nonblocking/quic_client.rs b/quic-client/src/nonblocking/quic_client.rs index 2c9020bf846835..7214cd61790782 100644 --- a/quic-client/src/nonblocking/quic_client.rs +++ b/quic-client/src/nonblocking/quic_client.rs @@ -17,7 +17,7 @@ use { }, solana_keypair::Keypair, solana_measure::measure::Measure, - solana_net_utils::VALIDATOR_PORT_RANGE, + solana_net_utils::{SocketConfig, VALIDATOR_PORT_RANGE}, solana_quic_definitions::{ QUIC_CONNECTION_HANDSHAKE_TIMEOUT, QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT, }, @@ -75,9 +75,11 @@ impl QuicLazyInitializedEndpoint { let mut endpoint = if let Some(endpoint) = &self.client_endpoint { endpoint.clone() } else { - let client_socket = solana_net_utils::bind_in_range( + let config = SocketConfig::default(); + let client_socket = solana_net_utils::bind_in_range_with_config( IpAddr::V4(Ipv4Addr::UNSPECIFIED), VALIDATOR_PORT_RANGE, + config, ) .expect("QuicLazyInitializedEndpoint::create_endpoint bind_in_range") .1; diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index 901d23235388e7..3093ceaf6ddcef 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -5274,6 +5274,7 @@ dependencies = [ "solana-keypair", "solana-measure", "solana-metrics", + "solana-net-utils", "solana-time-utils", "solana-transaction-error", "thiserror 2.0.6", diff --git a/udp-client/src/lib.rs b/udp-client/src/lib.rs index cecb0664ae3217..db53617553dd67 100644 --- a/udp-client/src/lib.rs +++ b/udp-client/src/lib.rs @@ -16,6 +16,7 @@ use { connection_cache_stats::ConnectionCacheStats, }, solana_keypair::Keypair, + solana_net_utils::SocketConfig, std::{ net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, sync::Arc, @@ -62,8 +63,11 @@ pub struct UdpConfig { impl NewConnectionConfig for UdpConfig { fn new() -> Result { - let socket = solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) - .map_err(Into::::into)?; + let socket = solana_net_utils::bind_with_any_port_with_config( + IpAddr::V4(Ipv4Addr::UNSPECIFIED), + SocketConfig::default(), + ) + .map_err(Into::::into)?; Ok(Self { udp_socket: Arc::new(socket), }) diff --git a/udp-client/src/nonblocking/udp_client.rs b/udp-client/src/nonblocking/udp_client.rs index f688f23e0ba9ab..62d55c1c486dee 100644 --- a/udp-client/src/nonblocking/udp_client.rs +++ b/udp-client/src/nonblocking/udp_client.rs @@ -46,7 +46,7 @@ impl ClientConnection for UdpClientConnection { mod tests { use { super::*, - solana_net_utils::bind_to_async, + solana_net_utils::{bind_to_async, SocketConfig}, solana_packet::{Packet, PACKET_DATA_SIZE}, solana_streamer::nonblocking::recvmmsg::recv_mmsg, std::net::{IpAddr, Ipv4Addr}, @@ -73,8 +73,11 @@ mod tests { async fn test_send_from_addr() { let addr_str = "0.0.0.0:50100"; let addr = addr_str.parse().unwrap(); - let socket = - solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED)).unwrap(); + let socket = solana_net_utils::bind_with_any_port_with_config( + IpAddr::V4(Ipv4Addr::UNSPECIFIED), + SocketConfig::default(), + ) + .unwrap(); let connection = UdpClientConnection::new_from_addr(socket, addr); let reader = bind_to_async( addr.ip(), From c373d6fd05b957aa85dbad34e8216f69338bf722 Mon Sep 17 00:00:00 2001 From: greg Date: Fri, 6 Dec 2024 18:58:57 +0000 Subject: [PATCH 2/6] reuse new code for deprecated methods --- net-utils/src/lib.rs | 70 ++++++-------------------------------------- 1 file changed, 9 insertions(+), 61 deletions(-) diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 8c1d2aeeec6dab..85d0991c354671 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -511,23 +511,14 @@ pub fn bind_common_in_range_with_config( // Find a port in the given range that is available for both TCP and UDP #[deprecated( - since = "2.1.5", + since = "2.2.0", note = "use `bind_common_in_range_with_config` instead" )] pub fn bind_common_in_range( ip_addr: IpAddr, range: PortRange, ) -> io::Result<(u16, (UdpSocket, TcpListener))> { - for port in range.0..range.1 { - if let Ok((sock, listener)) = bind_common(ip_addr, port) { - return Result::Ok((sock.local_addr().unwrap().port(), (sock, listener))); - } - } - - Err(io::Error::new( - io::ErrorKind::Other, - format!("No available TCP/UDP ports in {range:?}"), - )) + bind_common_in_range_with_config(ip_addr, range, SocketConfig::default()) } pub fn bind_in_range(ip_addr: IpAddr, range: PortRange) -> io::Result<(u16, UdpSocket)> { @@ -572,17 +563,9 @@ pub fn bind_with_any_port_with_config( } } -#[deprecated(since = "2.1.5", note = "use `bind_with_any_port_with_config` instead")] +#[deprecated(since = "2.2.0", note = "use `bind_with_any_port_with_config` instead")] pub fn bind_with_any_port(ip_addr: IpAddr) -> io::Result { - let sock = udp_socket_with_config(SocketConfig::default())?; - let addr = SocketAddr::new(ip_addr, 0); - match sock.bind(&SockAddr::from(addr)) { - Ok(_) => Result::Ok(sock.into()), - Err(err) => Err(io::Error::new( - io::ErrorKind::Other, - format!("No available UDP port: {err}"), - )), - } + bind_with_any_port_with_config(ip_addr, SocketConfig::default()) } // binds many sockets to the same port in a range with config @@ -639,54 +622,19 @@ pub fn multi_bind_in_range_with_config( } // binds many sockets to the same port in a range +// Note: The `mut` modifier for `num` is unused but kept for compatibility with the public API. #[deprecated( - since = "2.1.5", + since = "2.2.0", note = "use `multi_bind_in_range_with_config` instead" )] +#[allow(unused_mut)] pub fn multi_bind_in_range( ip_addr: IpAddr, range: PortRange, mut num: usize, ) -> io::Result<(u16, Vec)> { - if cfg!(windows) && num != 1 { - // See https://github.com/solana-labs/solana/issues/4607 - warn!( - "multi_bind_in_range() only supports 1 socket in windows ({} requested)", - num - ); - num = 1; - } - let mut sockets = Vec::with_capacity(num); - - const NUM_TRIES: usize = 100; - let mut port = 0; - let mut error = None; - for _ in 0..NUM_TRIES { - port = { - let (port, _) = bind_in_range(ip_addr, range)?; - port - }; // drop the probe, port should be available... briefly. - - let config = SocketConfig::default().reuseport(true); - for _ in 0..num { - let sock = bind_to_with_config(ip_addr, port, config.clone()); - if let Ok(sock) = sock { - sockets.push(sock); - } else { - error = Some(sock); - break; - } - } - if sockets.len() == num { - break; - } else { - sockets.clear(); - } - } - if sockets.len() != num { - error.unwrap()?; - } - Ok((port, sockets)) + let config = SocketConfig::default().reuseport(true); + multi_bind_in_range_with_config(ip_addr, range, config, num) } pub fn bind_to(ip_addr: IpAddr, port: u16, reuseport: bool) -> io::Result { From cd98ecf93091b0625197bc6f0aaccd5dde0006e9 Mon Sep 17 00:00:00 2001 From: greg Date: Wed, 11 Dec 2024 05:54:49 +0000 Subject: [PATCH 3/6] add default config for readwrite socket --- bench-streamer/src/main.rs | 2 +- connection-cache/src/connection_cache.rs | 4 +- gossip/src/cluster_info.rs | 46 +++++++++--------- net-utils/src/lib.rs | 56 ++++++++-------------- quic-client/src/nonblocking/quic_client.rs | 2 +- udp-client/src/lib.rs | 2 +- udp-client/src/nonblocking/udp_client.rs | 2 +- 7 files changed, 48 insertions(+), 66 deletions(-) diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index da203e752752eb..c67379e4bcb593 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -98,7 +98,7 @@ fn main() -> Result<()> { let (_port, read_sockets) = solana_net_utils::multi_bind_in_range_with_config( ip_addr, (port, port + num_sockets as u16), - SocketConfig::default().reuseport(true), + SocketConfig::default_rw().reuseport(true), num_sockets, ) .unwrap(); diff --git a/connection-cache/src/connection_cache.rs b/connection-cache/src/connection_cache.rs index 0e6e51c4b8d49a..2d6daab4dc5457 100644 --- a/connection-cache/src/connection_cache.rs +++ b/connection-cache/src/connection_cache.rs @@ -574,7 +574,7 @@ mod tests { udp_socket: Arc::new( solana_net_utils::bind_with_any_port_with_config( IpAddr::V4(Ipv4Addr::UNSPECIFIED), - SocketConfig::default(), + SocketConfig::default_rw(), ) .expect("Unable to bind to UDP socket"), ), @@ -588,7 +588,7 @@ mod tests { udp_socket: Arc::new( solana_net_utils::bind_with_any_port_with_config( IpAddr::V4(Ipv4Addr::UNSPECIFIED), - SocketConfig::default(), + SocketConfig::default_rw(), ) .map_err(Into::::into)?, ), diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 24939053beed49..a50686ad9da84b 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2612,8 +2612,8 @@ impl Node { let localhost_ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST); let port_range = (1024, 65535); - let udp_config = SocketConfig::default(); - let quic_config = SocketConfig::default().reuseport(true); + let udp_config = SocketConfig::default_rw(); + let quic_config = SocketConfig::default_rw().reuseport(true); let ((_tpu_port, tpu), (_tpu_quic_port, tpu_quic)) = bind_two_in_range_with_offset_and_config( localhost_ip_addr, @@ -2731,7 +2731,7 @@ impl Node { port_range: PortRange, bind_ip_addr: IpAddr, ) -> (u16, (UdpSocket, TcpListener)) { - let config = SocketConfig::default(); + let config = SocketConfig::default_rw(); if gossip_addr.port() != 0 { ( gossip_addr.port(), @@ -2762,13 +2762,13 @@ impl Node { let (gossip_port, (gossip, ip_echo)) = Self::get_gossip_port(gossip_addr, port_range, bind_ip_addr); - let read_write_socket_config = SocketConfig::default(); + let read_write_socket_config = SocketConfig::default_rw(); let (tvu_port, tvu) = Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); let (tvu_quic_port, tvu_quic) = Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); - let tpu_udp_config = SocketConfig::default(); - let tpu_quic_config = SocketConfig::default().reuseport(true); + let tpu_udp_config = SocketConfig::default_rw(); + let tpu_quic_config = SocketConfig::default_rw().reuseport(true); let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) = bind_two_in_range_with_offset_and_config( bind_ip_addr, @@ -2782,8 +2782,8 @@ impl Node { bind_more_with_config(tpu_quic, DEFAULT_QUIC_ENDPOINTS, tpu_quic_config.clone()) .unwrap(); - let tpu_forwards_udp_config = SocketConfig::default(); - let tpu_forwards_quic_config = SocketConfig::default().reuseport(true); + let tpu_forwards_udp_config = SocketConfig::default_rw(); + let tpu_forwards_quic_config = SocketConfig::default_rw().reuseport(true); let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) = bind_two_in_range_with_offset_and_config( bind_ip_addr, @@ -2800,8 +2800,8 @@ impl Node { ) .unwrap(); - let tpu_vote_udp_config = SocketConfig::default(); - let tpu_vote_quic_config = SocketConfig::default().reuseport(true); + let tpu_vote_udp_config = SocketConfig::default_rw(); + let tpu_vote_quic_config = SocketConfig::default_rw().reuseport(true); let (tpu_vote_port, tpu_vote) = Self::bind_with_config(bind_ip_addr, port_range, tpu_vote_udp_config.clone()); // using udp port for quic really because we need to reusport set to false, since Self::bind() defaults to false @@ -2811,7 +2811,7 @@ impl Node { bind_more_with_config(tpu_vote_quic, DEFAULT_QUIC_ENDPOINTS, tpu_vote_quic_config) .unwrap(); - let write_only_socket_config = SocketConfig::default(); + let write_only_socket_config = SocketConfig::default_rw(); let (_, retransmit_socket) = Self::bind_with_config(bind_ip_addr, port_range, write_only_socket_config.clone()); @@ -2904,7 +2904,7 @@ impl Node { let (gossip_port, (gossip, ip_echo)) = Self::get_gossip_port(&gossip_addr, port_range, bind_ip_addr); - let tvu_config = SocketConfig::default().reuseport(true); + let tvu_config = SocketConfig::default_rw().reuseport(true); let (tvu_port, tvu_sockets) = multi_bind_in_range_with_config( bind_ip_addr, port_range, @@ -2913,11 +2913,11 @@ impl Node { ) .expect("tvu multi_bind"); - let tvu_config = SocketConfig::default().reuseport(false); + let tvu_config = SocketConfig::default_rw().reuseport(false); let (tvu_quic_port, tvu_quic) = Self::bind_with_config(bind_ip_addr, port_range, tvu_config); - let tpu_config = SocketConfig::default().reuseport(true); + let tpu_config = SocketConfig::default_rw().reuseport(true); let (tpu_port, tpu_sockets) = multi_bind_in_range_with_config(bind_ip_addr, port_range, tpu_config.clone(), 32) .expect("tpu multi_bind"); @@ -2930,7 +2930,7 @@ impl Node { let tpu_quic = bind_more_with_config(tpu_quic, num_quic_endpoints.get(), tpu_config.clone()).unwrap(); - let tpu_forwards_config = SocketConfig::default().reuseport(true); + let tpu_forwards_config = SocketConfig::default_rw().reuseport(true); let (tpu_forwards_port, tpu_forwards_sockets) = multi_bind_in_range_with_config( bind_ip_addr, port_range, @@ -2954,16 +2954,16 @@ impl Node { ) .unwrap(); - let tpu_vote_config = SocketConfig::default().reuseport(true); + let tpu_vote_config = SocketConfig::default_rw().reuseport(true); let (tpu_vote_port, tpu_vote_sockets) = multi_bind_in_range_with_config(bind_ip_addr, port_range, tpu_vote_config.clone(), 1) .expect("tpu_vote multi_bind"); - let tpu_vote_config = SocketConfig::default(); + let tpu_vote_config = SocketConfig::default_rw(); let (tpu_vote_quic_port, tpu_vote_quic) = Self::bind_with_config(bind_ip_addr, port_range, tpu_vote_config.clone()); - let tpu_vote_config = SocketConfig::default().reuseport(true); + let tpu_vote_config = SocketConfig::default_rw().reuseport(true); let tpu_vote_quic = bind_more_with_config( tpu_vote_quic, num_quic_endpoints.get(), @@ -2971,28 +2971,28 @@ impl Node { ) .unwrap(); - let retransmit_config = SocketConfig::default().reuseport(true); + let retransmit_config = SocketConfig::default_rw().reuseport(true); let (_, retransmit_sockets) = multi_bind_in_range_with_config(bind_ip_addr, port_range, retransmit_config, 8) .expect("retransmit multi_bind"); - let repair_config = SocketConfig::default(); + let repair_config = SocketConfig::default_rw(); let (_, repair) = Self::bind_with_config(bind_ip_addr, port_range, repair_config.clone()); let (_, repair_quic) = Self::bind_with_config(bind_ip_addr, port_range, repair_config.clone()); - let serve_repair_config = SocketConfig::default(); + let serve_repair_config = SocketConfig::default_rw(); let (serve_repair_port, serve_repair) = Self::bind_with_config(bind_ip_addr, port_range, serve_repair_config.clone()); let (serve_repair_quic_port, serve_repair_quic) = Self::bind_with_config(bind_ip_addr, port_range, serve_repair_config); - let broadcast_config = SocketConfig::default().reuseport(true); + let broadcast_config = SocketConfig::default_rw().reuseport(true); let (_, broadcast) = multi_bind_in_range_with_config(bind_ip_addr, port_range, broadcast_config, 4) .expect("broadcast multi_bind"); - let ancestor_hashes_config = SocketConfig::default(); + let ancestor_hashes_config = SocketConfig::default_rw(); let (_, ancestor_hashes_requests) = Self::bind_with_config(bind_ip_addr, port_range, ancestor_hashes_config.clone()); let (_, ancestor_hashes_requests_quic) = diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 85d0991c354671..16a02c8f9d71bd 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -403,44 +403,27 @@ pub fn is_host_port(string: String) -> Result<(), String> { pub struct SocketConfig { reuseport: bool, #[cfg(not(any(windows, target_os = "ios")))] - usage: SocketUsage, - #[cfg(not(any(windows, target_os = "ios")))] recv_buffer_size: usize, #[cfg(not(any(windows, target_os = "ios")))] send_buffer_size: usize, } -#[cfg(not(any(windows, target_os = "ios")))] -impl Default for SocketConfig { - fn default() -> Self { +impl SocketConfig { + pub fn default_rw() -> Self { Self { reuseport: false, #[cfg(not(any(windows, target_os = "ios")))] - usage: SocketUsage::ReadWrite, - #[cfg(not(any(windows, target_os = "ios")))] recv_buffer_size: DEFAULT_RECV_BUFFER_SIZE, #[cfg(not(any(windows, target_os = "ios")))] send_buffer_size: DEFAULT_SEND_BUFFER_SIZE, } } -} -impl SocketConfig { pub fn reuseport(mut self, reuseport: bool) -> Self { self.reuseport = reuseport; self } - // allow here to supress unused warnings from windows/ios builds - #[allow(unused_mut, unused_variables)] - pub fn usage(mut self, usage: SocketUsage) -> Self { - #[cfg(not(any(windows, target_os = "ios")))] - { - self.usage = usage; - } - self - } - // allow here to supress unused warnings from windows/ios builds #[allow(unused_mut, unused_variables)] pub fn recv_buffer_size(mut self, size: usize) -> Self { @@ -473,7 +456,6 @@ fn udp_socket_with_config(config: SocketConfig) -> io::Result { use nix::sys::socket::{setsockopt, sockopt::ReusePort}; let SocketConfig { reuseport, - usage: _, recv_buffer_size, send_buffer_size, } = config; @@ -518,11 +500,11 @@ pub fn bind_common_in_range( ip_addr: IpAddr, range: PortRange, ) -> io::Result<(u16, (UdpSocket, TcpListener))> { - bind_common_in_range_with_config(ip_addr, range, SocketConfig::default()) + bind_common_in_range_with_config(ip_addr, range, SocketConfig::default_rw()) } pub fn bind_in_range(ip_addr: IpAddr, range: PortRange) -> io::Result<(u16, UdpSocket)> { - let config = SocketConfig::default(); + let config = SocketConfig::default_rw(); bind_in_range_with_config(ip_addr, range, config) } @@ -565,7 +547,7 @@ pub fn bind_with_any_port_with_config( #[deprecated(since = "2.2.0", note = "use `bind_with_any_port_with_config` instead")] pub fn bind_with_any_port(ip_addr: IpAddr) -> io::Result { - bind_with_any_port_with_config(ip_addr, SocketConfig::default()) + bind_with_any_port_with_config(ip_addr, SocketConfig::default_rw()) } // binds many sockets to the same port in a range with config @@ -633,12 +615,12 @@ pub fn multi_bind_in_range( range: PortRange, mut num: usize, ) -> io::Result<(u16, Vec)> { - let config = SocketConfig::default().reuseport(true); + let config = SocketConfig::default_rw().reuseport(true); multi_bind_in_range_with_config(ip_addr, range, config, num) } pub fn bind_to(ip_addr: IpAddr, port: u16, reuseport: bool) -> io::Result { - let config = SocketConfig::default().reuseport(reuseport); + let config = SocketConfig::default_rw().reuseport(reuseport); bind_to_with_config(ip_addr, port, config) } @@ -648,7 +630,7 @@ pub async fn bind_to_async( port: u16, reuseport: bool, ) -> io::Result { - let config = SocketConfig::default().reuseport(reuseport); + let config = SocketConfig::default_rw().reuseport(reuseport); let socket = bind_to_with_config_non_blocking(ip_addr, port, config)?; TokioUdpSocket::from_std(socket) } @@ -717,7 +699,7 @@ pub fn bind_to_with_config_non_blocking( // binds both a UdpSocket and a TcpListener pub fn bind_common(ip_addr: IpAddr, port: u16) -> io::Result<(UdpSocket, TcpListener)> { - let config = SocketConfig::default(); + let config = SocketConfig::default_rw(); bind_common_with_config(ip_addr, port, config) } @@ -740,8 +722,8 @@ pub fn bind_two_in_range_with_offset( range: PortRange, offset: u16, ) -> io::Result<((u16, UdpSocket), (u16, UdpSocket))> { - let sock1_config = SocketConfig::default(); - let sock2_config = SocketConfig::default(); + let sock1_config = SocketConfig::default_rw(); + let sock2_config = SocketConfig::default_rw(); bind_two_in_range_with_offset_and_config(ip_addr, range, offset, sock1_config, sock2_config) } @@ -919,7 +901,7 @@ mod tests { let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); assert_eq!(bind_in_range(ip_addr, (2000, 2001)).unwrap().0, 2000); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let config = SocketConfig::default().reuseport(true); + let config = SocketConfig::default_rw().reuseport(true); let x = bind_to_with_config(ip_addr, 2002, config.clone()).unwrap(); let y = bind_to_with_config(ip_addr, 2002, config.clone()).unwrap(); assert_eq!( @@ -939,7 +921,7 @@ mod tests { #[test] fn test_bind_with_any_port() { let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let config = SocketConfig::default(); + let config = SocketConfig::default_rw(); let x = bind_with_any_port_with_config(ip_addr, config.clone()).unwrap(); let y = bind_with_any_port_with_config(ip_addr, config.clone()).unwrap(); assert_ne!( @@ -972,7 +954,7 @@ mod tests { #[test] fn test_bind_common_in_range() { let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let config = SocketConfig::default(); + let config = SocketConfig::default_rw(); let (port, _sockets) = bind_common_in_range_with_config(ip_addr, (3100, 3150), config.clone()).unwrap(); assert!((3100..3150).contains(&port)); @@ -984,7 +966,7 @@ mod tests { fn test_get_public_ip_addr_none() { solana_logger::setup(); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let config = SocketConfig::default(); + let config = SocketConfig::default_rw(); let (_server_port, (server_udp_socket, server_tcp_listener)) = bind_common_in_range_with_config(ip_addr, (3200, 3250), config).unwrap(); @@ -1007,7 +989,7 @@ mod tests { fn test_get_public_ip_addr_reachable() { solana_logger::setup(); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let config = SocketConfig::default(); + let config = SocketConfig::default_rw(); let (_server_port, (server_udp_socket, server_tcp_listener)) = bind_common_in_range_with_config(ip_addr, (3200, 3250), config.clone()).unwrap(); let (client_port, (client_udp_socket, client_tcp_listener)) = @@ -1036,7 +1018,7 @@ mod tests { fn test_get_public_ip_addr_tcp_unreachable() { solana_logger::setup(); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let config = SocketConfig::default(); + let config = SocketConfig::default_rw(); let (_server_port, (server_udp_socket, _server_tcp_listener)) = bind_common_in_range_with_config(ip_addr, (3200, 3250), config.clone()).unwrap(); @@ -1060,7 +1042,7 @@ mod tests { fn test_get_public_ip_addr_udp_unreachable() { solana_logger::setup(); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let config = SocketConfig::default(); + let config = SocketConfig::default_rw(); let (_server_port, (server_udp_socket, _server_tcp_listener)) = bind_common_in_range_with_config(ip_addr, (3200, 3250), config.clone()).unwrap(); @@ -1102,7 +1084,7 @@ mod tests { #[test] fn test_multi_bind_in_range_with_config_reuseport_disabled() { let ip_addr: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST); - let config = SocketConfig::default(); //reuseport is false by default + let config = SocketConfig::default_rw(); //reuseport is false by default let result = multi_bind_in_range_with_config(ip_addr, (2010, 2110), config, 2); diff --git a/quic-client/src/nonblocking/quic_client.rs b/quic-client/src/nonblocking/quic_client.rs index 7214cd61790782..883e7485150e2b 100644 --- a/quic-client/src/nonblocking/quic_client.rs +++ b/quic-client/src/nonblocking/quic_client.rs @@ -75,7 +75,7 @@ impl QuicLazyInitializedEndpoint { let mut endpoint = if let Some(endpoint) = &self.client_endpoint { endpoint.clone() } else { - let config = SocketConfig::default(); + let config = SocketConfig::default_rw(); let client_socket = solana_net_utils::bind_in_range_with_config( IpAddr::V4(Ipv4Addr::UNSPECIFIED), VALIDATOR_PORT_RANGE, diff --git a/udp-client/src/lib.rs b/udp-client/src/lib.rs index db53617553dd67..5cbc0f4c8d6762 100644 --- a/udp-client/src/lib.rs +++ b/udp-client/src/lib.rs @@ -65,7 +65,7 @@ impl NewConnectionConfig for UdpConfig { fn new() -> Result { let socket = solana_net_utils::bind_with_any_port_with_config( IpAddr::V4(Ipv4Addr::UNSPECIFIED), - SocketConfig::default(), + SocketConfig::default_rw(), ) .map_err(Into::::into)?; Ok(Self { diff --git a/udp-client/src/nonblocking/udp_client.rs b/udp-client/src/nonblocking/udp_client.rs index 62d55c1c486dee..86bf4f0f1f5420 100644 --- a/udp-client/src/nonblocking/udp_client.rs +++ b/udp-client/src/nonblocking/udp_client.rs @@ -75,7 +75,7 @@ mod tests { let addr = addr_str.parse().unwrap(); let socket = solana_net_utils::bind_with_any_port_with_config( IpAddr::V4(Ipv4Addr::UNSPECIFIED), - SocketConfig::default(), + SocketConfig::default_rw(), ) .unwrap(); let connection = UdpClientConnection::new_from_addr(socket, addr); From ba21675fb26fc990aa39e741f18a16c07b00965f Mon Sep 17 00:00:00 2001 From: greg Date: Thu, 12 Dec 2024 21:04:14 +0000 Subject: [PATCH 4/6] leave default buffer size to OS --- bench-streamer/src/main.rs | 2 +- connection-cache/src/connection_cache.rs | 4 +- gossip/src/cluster_info.rs | 46 ++++++------ net-utils/src/lib.rs | 81 +++++++++++----------- quic-client/src/nonblocking/quic_client.rs | 2 +- udp-client/src/lib.rs | 2 +- udp-client/src/nonblocking/udp_client.rs | 2 +- 7 files changed, 70 insertions(+), 69 deletions(-) diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index c67379e4bcb593..da203e752752eb 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -98,7 +98,7 @@ fn main() -> Result<()> { let (_port, read_sockets) = solana_net_utils::multi_bind_in_range_with_config( ip_addr, (port, port + num_sockets as u16), - SocketConfig::default_rw().reuseport(true), + SocketConfig::default().reuseport(true), num_sockets, ) .unwrap(); diff --git a/connection-cache/src/connection_cache.rs b/connection-cache/src/connection_cache.rs index 2d6daab4dc5457..0e6e51c4b8d49a 100644 --- a/connection-cache/src/connection_cache.rs +++ b/connection-cache/src/connection_cache.rs @@ -574,7 +574,7 @@ mod tests { udp_socket: Arc::new( solana_net_utils::bind_with_any_port_with_config( IpAddr::V4(Ipv4Addr::UNSPECIFIED), - SocketConfig::default_rw(), + SocketConfig::default(), ) .expect("Unable to bind to UDP socket"), ), @@ -588,7 +588,7 @@ mod tests { udp_socket: Arc::new( solana_net_utils::bind_with_any_port_with_config( IpAddr::V4(Ipv4Addr::UNSPECIFIED), - SocketConfig::default_rw(), + SocketConfig::default(), ) .map_err(Into::::into)?, ), diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index a50686ad9da84b..24939053beed49 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2612,8 +2612,8 @@ impl Node { let localhost_ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST); let port_range = (1024, 65535); - let udp_config = SocketConfig::default_rw(); - let quic_config = SocketConfig::default_rw().reuseport(true); + let udp_config = SocketConfig::default(); + let quic_config = SocketConfig::default().reuseport(true); let ((_tpu_port, tpu), (_tpu_quic_port, tpu_quic)) = bind_two_in_range_with_offset_and_config( localhost_ip_addr, @@ -2731,7 +2731,7 @@ impl Node { port_range: PortRange, bind_ip_addr: IpAddr, ) -> (u16, (UdpSocket, TcpListener)) { - let config = SocketConfig::default_rw(); + let config = SocketConfig::default(); if gossip_addr.port() != 0 { ( gossip_addr.port(), @@ -2762,13 +2762,13 @@ impl Node { let (gossip_port, (gossip, ip_echo)) = Self::get_gossip_port(gossip_addr, port_range, bind_ip_addr); - let read_write_socket_config = SocketConfig::default_rw(); + let read_write_socket_config = SocketConfig::default(); let (tvu_port, tvu) = Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); let (tvu_quic_port, tvu_quic) = Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); - let tpu_udp_config = SocketConfig::default_rw(); - let tpu_quic_config = SocketConfig::default_rw().reuseport(true); + let tpu_udp_config = SocketConfig::default(); + let tpu_quic_config = SocketConfig::default().reuseport(true); let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) = bind_two_in_range_with_offset_and_config( bind_ip_addr, @@ -2782,8 +2782,8 @@ impl Node { bind_more_with_config(tpu_quic, DEFAULT_QUIC_ENDPOINTS, tpu_quic_config.clone()) .unwrap(); - let tpu_forwards_udp_config = SocketConfig::default_rw(); - let tpu_forwards_quic_config = SocketConfig::default_rw().reuseport(true); + let tpu_forwards_udp_config = SocketConfig::default(); + let tpu_forwards_quic_config = SocketConfig::default().reuseport(true); let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) = bind_two_in_range_with_offset_and_config( bind_ip_addr, @@ -2800,8 +2800,8 @@ impl Node { ) .unwrap(); - let tpu_vote_udp_config = SocketConfig::default_rw(); - let tpu_vote_quic_config = SocketConfig::default_rw().reuseport(true); + let tpu_vote_udp_config = SocketConfig::default(); + let tpu_vote_quic_config = SocketConfig::default().reuseport(true); let (tpu_vote_port, tpu_vote) = Self::bind_with_config(bind_ip_addr, port_range, tpu_vote_udp_config.clone()); // using udp port for quic really because we need to reusport set to false, since Self::bind() defaults to false @@ -2811,7 +2811,7 @@ impl Node { bind_more_with_config(tpu_vote_quic, DEFAULT_QUIC_ENDPOINTS, tpu_vote_quic_config) .unwrap(); - let write_only_socket_config = SocketConfig::default_rw(); + let write_only_socket_config = SocketConfig::default(); let (_, retransmit_socket) = Self::bind_with_config(bind_ip_addr, port_range, write_only_socket_config.clone()); @@ -2904,7 +2904,7 @@ impl Node { let (gossip_port, (gossip, ip_echo)) = Self::get_gossip_port(&gossip_addr, port_range, bind_ip_addr); - let tvu_config = SocketConfig::default_rw().reuseport(true); + let tvu_config = SocketConfig::default().reuseport(true); let (tvu_port, tvu_sockets) = multi_bind_in_range_with_config( bind_ip_addr, port_range, @@ -2913,11 +2913,11 @@ impl Node { ) .expect("tvu multi_bind"); - let tvu_config = SocketConfig::default_rw().reuseport(false); + let tvu_config = SocketConfig::default().reuseport(false); let (tvu_quic_port, tvu_quic) = Self::bind_with_config(bind_ip_addr, port_range, tvu_config); - let tpu_config = SocketConfig::default_rw().reuseport(true); + let tpu_config = SocketConfig::default().reuseport(true); let (tpu_port, tpu_sockets) = multi_bind_in_range_with_config(bind_ip_addr, port_range, tpu_config.clone(), 32) .expect("tpu multi_bind"); @@ -2930,7 +2930,7 @@ impl Node { let tpu_quic = bind_more_with_config(tpu_quic, num_quic_endpoints.get(), tpu_config.clone()).unwrap(); - let tpu_forwards_config = SocketConfig::default_rw().reuseport(true); + let tpu_forwards_config = SocketConfig::default().reuseport(true); let (tpu_forwards_port, tpu_forwards_sockets) = multi_bind_in_range_with_config( bind_ip_addr, port_range, @@ -2954,16 +2954,16 @@ impl Node { ) .unwrap(); - let tpu_vote_config = SocketConfig::default_rw().reuseport(true); + let tpu_vote_config = SocketConfig::default().reuseport(true); let (tpu_vote_port, tpu_vote_sockets) = multi_bind_in_range_with_config(bind_ip_addr, port_range, tpu_vote_config.clone(), 1) .expect("tpu_vote multi_bind"); - let tpu_vote_config = SocketConfig::default_rw(); + let tpu_vote_config = SocketConfig::default(); let (tpu_vote_quic_port, tpu_vote_quic) = Self::bind_with_config(bind_ip_addr, port_range, tpu_vote_config.clone()); - let tpu_vote_config = SocketConfig::default_rw().reuseport(true); + let tpu_vote_config = SocketConfig::default().reuseport(true); let tpu_vote_quic = bind_more_with_config( tpu_vote_quic, num_quic_endpoints.get(), @@ -2971,28 +2971,28 @@ impl Node { ) .unwrap(); - let retransmit_config = SocketConfig::default_rw().reuseport(true); + let retransmit_config = SocketConfig::default().reuseport(true); let (_, retransmit_sockets) = multi_bind_in_range_with_config(bind_ip_addr, port_range, retransmit_config, 8) .expect("retransmit multi_bind"); - let repair_config = SocketConfig::default_rw(); + let repair_config = SocketConfig::default(); let (_, repair) = Self::bind_with_config(bind_ip_addr, port_range, repair_config.clone()); let (_, repair_quic) = Self::bind_with_config(bind_ip_addr, port_range, repair_config.clone()); - let serve_repair_config = SocketConfig::default_rw(); + let serve_repair_config = SocketConfig::default(); let (serve_repair_port, serve_repair) = Self::bind_with_config(bind_ip_addr, port_range, serve_repair_config.clone()); let (serve_repair_quic_port, serve_repair_quic) = Self::bind_with_config(bind_ip_addr, port_range, serve_repair_config); - let broadcast_config = SocketConfig::default_rw().reuseport(true); + let broadcast_config = SocketConfig::default().reuseport(true); let (_, broadcast) = multi_bind_in_range_with_config(bind_ip_addr, port_range, broadcast_config, 4) .expect("broadcast multi_bind"); - let ancestor_hashes_config = SocketConfig::default_rw(); + let ancestor_hashes_config = SocketConfig::default(); let (_, ancestor_hashes_requests) = Self::bind_with_config(bind_ip_addr, port_range, ancestor_hashes_config.clone()); let (_, ancestor_hashes_requests_quic) = diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 16a02c8f9d71bd..93c826f747c17b 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -36,11 +36,6 @@ pub type PortRange = (u16, u16); pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000); pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 17; // VALIDATOR_PORT_RANGE must be at least this wide -#[cfg(not(any(windows, target_os = "ios")))] -const DEFAULT_RECV_BUFFER_SIZE: usize = 64 * 1024 * 1024; // 64MB - Doubled to 128MB by the kernel -#[cfg(not(any(windows, target_os = "ios")))] -const DEFAULT_SEND_BUFFER_SIZE: usize = 64 * 1024 * 1024; // 64MB - Doubled to 128MB by the kernel - #[derive(Clone, Debug)] pub enum SocketUsage { ReadOnly, @@ -398,48 +393,49 @@ pub fn is_host_port(string: String) -> Result<(), String> { parse_host_port(&string).map(|_| ()) } -#[derive(Clone, Debug)] -#[cfg_attr(any(windows, target_os = "ios"), derive(Default))] +#[derive(Clone, Debug, Default)] pub struct SocketConfig { reuseport: bool, #[cfg(not(any(windows, target_os = "ios")))] - recv_buffer_size: usize, + recv_buffer_size: Option, #[cfg(not(any(windows, target_os = "ios")))] - send_buffer_size: usize, + send_buffer_size: Option, } impl SocketConfig { - pub fn default_rw() -> Self { - Self { - reuseport: false, - #[cfg(not(any(windows, target_os = "ios")))] - recv_buffer_size: DEFAULT_RECV_BUFFER_SIZE, - #[cfg(not(any(windows, target_os = "ios")))] - send_buffer_size: DEFAULT_SEND_BUFFER_SIZE, - } - } - pub fn reuseport(mut self, reuseport: bool) -> Self { self.reuseport = reuseport; self } + /// Sets the receive buffer size for the socket. + /// + /// **Note:** On Linux the kernel will double the value you specify. + /// For example, if you specify `16MB`, the kernel will configure the + /// socket to use `32MB`. + /// See: https://man7.org/linux/man-pages/man7/socket.7.html: SO_RCVBUF // allow here to supress unused warnings from windows/ios builds #[allow(unused_mut, unused_variables)] pub fn recv_buffer_size(mut self, size: usize) -> Self { #[cfg(not(any(windows, target_os = "ios")))] { - self.recv_buffer_size = size; + self.recv_buffer_size = Some(size); } self } + /// Sets the send buffer size for the socket. + /// + /// **Note:** On Linux the kernel will double the value you specify. + /// For example, if you specify `16MB`, the kernel will configure the + /// socket to use `32MB`. + /// See: https://man7.org/linux/man-pages/man7/socket.7.html: SO_SNDBUF // allow here to supress unused warnings from windows/ios builds #[allow(unused_mut, unused_variables)] pub fn send_buffer_size(mut self, size: usize) -> Self { #[cfg(not(any(windows, target_os = "ios")))] { - self.send_buffer_size = size; + self.send_buffer_size = Some(size); } self } @@ -463,8 +459,13 @@ fn udp_socket_with_config(config: SocketConfig) -> io::Result { let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?; // Set buffer sizes - sock.set_recv_buffer_size(recv_buffer_size)?; - sock.set_send_buffer_size(send_buffer_size)?; + if let Some(recv_buffer_size) = recv_buffer_size { + sock.set_recv_buffer_size(recv_buffer_size)?; + } + + if let Some(send_buffer_size) = send_buffer_size { + sock.set_send_buffer_size(send_buffer_size)?; + } if reuseport { setsockopt(&sock, ReusePort, &true).ok(); @@ -500,11 +501,11 @@ pub fn bind_common_in_range( ip_addr: IpAddr, range: PortRange, ) -> io::Result<(u16, (UdpSocket, TcpListener))> { - bind_common_in_range_with_config(ip_addr, range, SocketConfig::default_rw()) + bind_common_in_range_with_config(ip_addr, range, SocketConfig::default()) } pub fn bind_in_range(ip_addr: IpAddr, range: PortRange) -> io::Result<(u16, UdpSocket)> { - let config = SocketConfig::default_rw(); + let config = SocketConfig::default(); bind_in_range_with_config(ip_addr, range, config) } @@ -547,7 +548,7 @@ pub fn bind_with_any_port_with_config( #[deprecated(since = "2.2.0", note = "use `bind_with_any_port_with_config` instead")] pub fn bind_with_any_port(ip_addr: IpAddr) -> io::Result { - bind_with_any_port_with_config(ip_addr, SocketConfig::default_rw()) + bind_with_any_port_with_config(ip_addr, SocketConfig::default()) } // binds many sockets to the same port in a range with config @@ -615,12 +616,12 @@ pub fn multi_bind_in_range( range: PortRange, mut num: usize, ) -> io::Result<(u16, Vec)> { - let config = SocketConfig::default_rw().reuseport(true); + let config = SocketConfig::default().reuseport(true); multi_bind_in_range_with_config(ip_addr, range, config, num) } pub fn bind_to(ip_addr: IpAddr, port: u16, reuseport: bool) -> io::Result { - let config = SocketConfig::default_rw().reuseport(reuseport); + let config = SocketConfig::default().reuseport(reuseport); bind_to_with_config(ip_addr, port, config) } @@ -630,7 +631,7 @@ pub async fn bind_to_async( port: u16, reuseport: bool, ) -> io::Result { - let config = SocketConfig::default_rw().reuseport(reuseport); + let config = SocketConfig::default().reuseport(reuseport); let socket = bind_to_with_config_non_blocking(ip_addr, port, config)?; TokioUdpSocket::from_std(socket) } @@ -699,7 +700,7 @@ pub fn bind_to_with_config_non_blocking( // binds both a UdpSocket and a TcpListener pub fn bind_common(ip_addr: IpAddr, port: u16) -> io::Result<(UdpSocket, TcpListener)> { - let config = SocketConfig::default_rw(); + let config = SocketConfig::default(); bind_common_with_config(ip_addr, port, config) } @@ -722,8 +723,8 @@ pub fn bind_two_in_range_with_offset( range: PortRange, offset: u16, ) -> io::Result<((u16, UdpSocket), (u16, UdpSocket))> { - let sock1_config = SocketConfig::default_rw(); - let sock2_config = SocketConfig::default_rw(); + let sock1_config = SocketConfig::default(); + let sock2_config = SocketConfig::default(); bind_two_in_range_with_offset_and_config(ip_addr, range, offset, sock1_config, sock2_config) } @@ -901,7 +902,7 @@ mod tests { let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); assert_eq!(bind_in_range(ip_addr, (2000, 2001)).unwrap().0, 2000); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let config = SocketConfig::default_rw().reuseport(true); + let config = SocketConfig::default().reuseport(true); let x = bind_to_with_config(ip_addr, 2002, config.clone()).unwrap(); let y = bind_to_with_config(ip_addr, 2002, config.clone()).unwrap(); assert_eq!( @@ -921,7 +922,7 @@ mod tests { #[test] fn test_bind_with_any_port() { let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let config = SocketConfig::default_rw(); + let config = SocketConfig::default(); let x = bind_with_any_port_with_config(ip_addr, config.clone()).unwrap(); let y = bind_with_any_port_with_config(ip_addr, config.clone()).unwrap(); assert_ne!( @@ -954,7 +955,7 @@ mod tests { #[test] fn test_bind_common_in_range() { let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let config = SocketConfig::default_rw(); + let config = SocketConfig::default(); let (port, _sockets) = bind_common_in_range_with_config(ip_addr, (3100, 3150), config.clone()).unwrap(); assert!((3100..3150).contains(&port)); @@ -966,7 +967,7 @@ mod tests { fn test_get_public_ip_addr_none() { solana_logger::setup(); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let config = SocketConfig::default_rw(); + let config = SocketConfig::default(); let (_server_port, (server_udp_socket, server_tcp_listener)) = bind_common_in_range_with_config(ip_addr, (3200, 3250), config).unwrap(); @@ -989,7 +990,7 @@ mod tests { fn test_get_public_ip_addr_reachable() { solana_logger::setup(); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let config = SocketConfig::default_rw(); + let config = SocketConfig::default(); let (_server_port, (server_udp_socket, server_tcp_listener)) = bind_common_in_range_with_config(ip_addr, (3200, 3250), config.clone()).unwrap(); let (client_port, (client_udp_socket, client_tcp_listener)) = @@ -1018,7 +1019,7 @@ mod tests { fn test_get_public_ip_addr_tcp_unreachable() { solana_logger::setup(); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let config = SocketConfig::default_rw(); + let config = SocketConfig::default(); let (_server_port, (server_udp_socket, _server_tcp_listener)) = bind_common_in_range_with_config(ip_addr, (3200, 3250), config.clone()).unwrap(); @@ -1042,7 +1043,7 @@ mod tests { fn test_get_public_ip_addr_udp_unreachable() { solana_logger::setup(); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let config = SocketConfig::default_rw(); + let config = SocketConfig::default(); let (_server_port, (server_udp_socket, _server_tcp_listener)) = bind_common_in_range_with_config(ip_addr, (3200, 3250), config.clone()).unwrap(); @@ -1084,7 +1085,7 @@ mod tests { #[test] fn test_multi_bind_in_range_with_config_reuseport_disabled() { let ip_addr: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST); - let config = SocketConfig::default_rw(); //reuseport is false by default + let config = SocketConfig::default(); //reuseport is false by default let result = multi_bind_in_range_with_config(ip_addr, (2010, 2110), config, 2); diff --git a/quic-client/src/nonblocking/quic_client.rs b/quic-client/src/nonblocking/quic_client.rs index 883e7485150e2b..7214cd61790782 100644 --- a/quic-client/src/nonblocking/quic_client.rs +++ b/quic-client/src/nonblocking/quic_client.rs @@ -75,7 +75,7 @@ impl QuicLazyInitializedEndpoint { let mut endpoint = if let Some(endpoint) = &self.client_endpoint { endpoint.clone() } else { - let config = SocketConfig::default_rw(); + let config = SocketConfig::default(); let client_socket = solana_net_utils::bind_in_range_with_config( IpAddr::V4(Ipv4Addr::UNSPECIFIED), VALIDATOR_PORT_RANGE, diff --git a/udp-client/src/lib.rs b/udp-client/src/lib.rs index 5cbc0f4c8d6762..db53617553dd67 100644 --- a/udp-client/src/lib.rs +++ b/udp-client/src/lib.rs @@ -65,7 +65,7 @@ impl NewConnectionConfig for UdpConfig { fn new() -> Result { let socket = solana_net_utils::bind_with_any_port_with_config( IpAddr::V4(Ipv4Addr::UNSPECIFIED), - SocketConfig::default_rw(), + SocketConfig::default(), ) .map_err(Into::::into)?; Ok(Self { diff --git a/udp-client/src/nonblocking/udp_client.rs b/udp-client/src/nonblocking/udp_client.rs index 86bf4f0f1f5420..62d55c1c486dee 100644 --- a/udp-client/src/nonblocking/udp_client.rs +++ b/udp-client/src/nonblocking/udp_client.rs @@ -75,7 +75,7 @@ mod tests { let addr = addr_str.parse().unwrap(); let socket = solana_net_utils::bind_with_any_port_with_config( IpAddr::V4(Ipv4Addr::UNSPECIFIED), - SocketConfig::default_rw(), + SocketConfig::default(), ) .unwrap(); let connection = UdpClientConnection::new_from_addr(socket, addr); From 296b463f3eac82e4c619e4cd60cd5c22f06d1f34 Mon Sep 17 00:00:00 2001 From: greg Date: Fri, 13 Dec 2024 17:58:30 +0000 Subject: [PATCH 5/6] switch to unified socket config by reuseport --- gossip/src/cluster_info.rs | 156 +++++++++++++++++++------------------ net-utils/src/lib.rs | 7 -- 2 files changed, 82 insertions(+), 81 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 24939053beed49..8e0acf18e6930a 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2762,73 +2762,70 @@ impl Node { let (gossip_port, (gossip, ip_echo)) = Self::get_gossip_port(gossip_addr, port_range, bind_ip_addr); - let read_write_socket_config = SocketConfig::default(); + let socket_config = SocketConfig::default(); + let socket_config_reuseport = SocketConfig::default().reuseport(true); let (tvu_port, tvu) = - Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); let (tvu_quic_port, tvu_quic) = - Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); - let tpu_udp_config = SocketConfig::default(); - let tpu_quic_config = SocketConfig::default().reuseport(true); + Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) = bind_two_in_range_with_offset_and_config( bind_ip_addr, port_range, QUIC_PORT_OFFSET, - tpu_udp_config.clone(), - tpu_quic_config.clone(), + socket_config.clone(), + socket_config_reuseport.clone(), ) .unwrap(); - let tpu_quic: Vec = - bind_more_with_config(tpu_quic, DEFAULT_QUIC_ENDPOINTS, tpu_quic_config.clone()) - .unwrap(); + let tpu_quic: Vec = bind_more_with_config( + tpu_quic, + DEFAULT_QUIC_ENDPOINTS, + socket_config_reuseport.clone(), + ) + .unwrap(); - let tpu_forwards_udp_config = SocketConfig::default(); - let tpu_forwards_quic_config = SocketConfig::default().reuseport(true); let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) = bind_two_in_range_with_offset_and_config( bind_ip_addr, port_range, QUIC_PORT_OFFSET, - tpu_forwards_udp_config, - tpu_forwards_quic_config.clone(), + socket_config.clone(), + socket_config_reuseport.clone(), ) .unwrap(); let tpu_forwards_quic = bind_more_with_config( tpu_forwards_quic, DEFAULT_QUIC_ENDPOINTS, - tpu_forwards_quic_config.clone(), + socket_config_reuseport.clone(), ) .unwrap(); - let tpu_vote_udp_config = SocketConfig::default(); - let tpu_vote_quic_config = SocketConfig::default().reuseport(true); let (tpu_vote_port, tpu_vote) = - Self::bind_with_config(bind_ip_addr, port_range, tpu_vote_udp_config.clone()); - // using udp port for quic really because we need to reusport set to false, since Self::bind() defaults to false + Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); let (tpu_vote_quic_port, tpu_vote_quic) = - Self::bind_with_config(bind_ip_addr, port_range, tpu_vote_udp_config); - let tpu_vote_quic: Vec = - bind_more_with_config(tpu_vote_quic, DEFAULT_QUIC_ENDPOINTS, tpu_vote_quic_config) - .unwrap(); - - let write_only_socket_config = SocketConfig::default(); + Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); + let tpu_vote_quic: Vec = bind_more_with_config( + tpu_vote_quic, + DEFAULT_QUIC_ENDPOINTS, + socket_config_reuseport, + ) + .unwrap(); let (_, retransmit_socket) = - Self::bind_with_config(bind_ip_addr, port_range, write_only_socket_config.clone()); - let (_, repair) = - Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); + let (_, repair) = Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); let (_, repair_quic) = - Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); let (serve_repair_port, serve_repair) = - Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); let (serve_repair_quic_port, serve_repair_quic) = - Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); let (_, broadcast) = - Self::bind_with_config(bind_ip_addr, port_range, write_only_socket_config); + Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); let (_, ancestor_hashes_requests) = - Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); let (_, ancestor_hashes_requests_quic) = - Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); let rpc_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap(); let rpc_pubsub_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap(); @@ -2904,37 +2901,44 @@ impl Node { let (gossip_port, (gossip, ip_echo)) = Self::get_gossip_port(&gossip_addr, port_range, bind_ip_addr); - let tvu_config = SocketConfig::default().reuseport(true); + let socket_config = SocketConfig::default(); + let socket_config_reuseport = SocketConfig::default().reuseport(true); + let (tvu_port, tvu_sockets) = multi_bind_in_range_with_config( bind_ip_addr, port_range, - tvu_config.clone(), + socket_config_reuseport.clone(), num_tvu_sockets.get(), ) .expect("tvu multi_bind"); - let tvu_config = SocketConfig::default().reuseport(false); let (tvu_quic_port, tvu_quic) = - Self::bind_with_config(bind_ip_addr, port_range, tvu_config); + Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); - let tpu_config = SocketConfig::default().reuseport(true); - let (tpu_port, tpu_sockets) = - multi_bind_in_range_with_config(bind_ip_addr, port_range, tpu_config.clone(), 32) - .expect("tpu multi_bind"); + let (tpu_port, tpu_sockets) = multi_bind_in_range_with_config( + bind_ip_addr, + port_range, + socket_config_reuseport.clone(), + 32, + ) + .expect("tpu multi_bind"); let (_tpu_port_quic, tpu_quic) = Self::bind_with_config( bind_ip_addr, (tpu_port + QUIC_PORT_OFFSET, tpu_port + QUIC_PORT_OFFSET + 1), - tpu_config.clone(), + socket_config_reuseport.clone(), ); - let tpu_quic = - bind_more_with_config(tpu_quic, num_quic_endpoints.get(), tpu_config.clone()).unwrap(); + let tpu_quic = bind_more_with_config( + tpu_quic, + num_quic_endpoints.get(), + socket_config_reuseport.clone(), + ) + .unwrap(); - let tpu_forwards_config = SocketConfig::default().reuseport(true); let (tpu_forwards_port, tpu_forwards_sockets) = multi_bind_in_range_with_config( bind_ip_addr, port_range, - tpu_forwards_config.clone(), + socket_config_reuseport.clone(), 8, ) .expect("tpu_forwards multi_bind"); @@ -2945,58 +2949,62 @@ impl Node { tpu_forwards_port + QUIC_PORT_OFFSET, tpu_forwards_port + QUIC_PORT_OFFSET + 1, ), - tpu_forwards_config.clone(), + socket_config_reuseport.clone(), ); let tpu_forwards_quic = bind_more_with_config( tpu_forwards_quic, num_quic_endpoints.get(), - tpu_forwards_config.clone(), + socket_config_reuseport.clone(), ) .unwrap(); - let tpu_vote_config = SocketConfig::default().reuseport(true); - let (tpu_vote_port, tpu_vote_sockets) = - multi_bind_in_range_with_config(bind_ip_addr, port_range, tpu_vote_config.clone(), 1) - .expect("tpu_vote multi_bind"); + let (tpu_vote_port, tpu_vote_sockets) = multi_bind_in_range_with_config( + bind_ip_addr, + port_range, + socket_config_reuseport.clone(), + 1, + ) + .expect("tpu_vote multi_bind"); - let tpu_vote_config = SocketConfig::default(); let (tpu_vote_quic_port, tpu_vote_quic) = - Self::bind_with_config(bind_ip_addr, port_range, tpu_vote_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); - let tpu_vote_config = SocketConfig::default().reuseport(true); let tpu_vote_quic = bind_more_with_config( tpu_vote_quic, num_quic_endpoints.get(), - tpu_vote_config.clone(), + socket_config_reuseport.clone(), ) .unwrap(); - let retransmit_config = SocketConfig::default().reuseport(true); - let (_, retransmit_sockets) = - multi_bind_in_range_with_config(bind_ip_addr, port_range, retransmit_config, 8) - .expect("retransmit multi_bind"); + let (_, retransmit_sockets) = multi_bind_in_range_with_config( + bind_ip_addr, + port_range, + socket_config_reuseport.clone(), + 8, + ) + .expect("retransmit multi_bind"); - let repair_config = SocketConfig::default(); - let (_, repair) = Self::bind_with_config(bind_ip_addr, port_range, repair_config.clone()); + let (_, repair) = Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); let (_, repair_quic) = - Self::bind_with_config(bind_ip_addr, port_range, repair_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); - let serve_repair_config = SocketConfig::default(); let (serve_repair_port, serve_repair) = - Self::bind_with_config(bind_ip_addr, port_range, serve_repair_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); let (serve_repair_quic_port, serve_repair_quic) = - Self::bind_with_config(bind_ip_addr, port_range, serve_repair_config); + Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); - let broadcast_config = SocketConfig::default().reuseport(true); - let (_, broadcast) = - multi_bind_in_range_with_config(bind_ip_addr, port_range, broadcast_config, 4) - .expect("broadcast multi_bind"); + let (_, broadcast) = multi_bind_in_range_with_config( + bind_ip_addr, + port_range, + socket_config_reuseport.clone(), + 4, + ) + .expect("broadcast multi_bind"); - let ancestor_hashes_config = SocketConfig::default(); let (_, ancestor_hashes_requests) = - Self::bind_with_config(bind_ip_addr, port_range, ancestor_hashes_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); let (_, ancestor_hashes_requests_quic) = - Self::bind_with_config(bind_ip_addr, port_range, ancestor_hashes_config); + Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); let mut info = ContactInfo::new( *pubkey, diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 93c826f747c17b..3e032c53b6fae9 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -36,13 +36,6 @@ pub type PortRange = (u16, u16); pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000); pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 17; // VALIDATOR_PORT_RANGE must be at least this wide -#[derive(Clone, Debug)] -pub enum SocketUsage { - ReadOnly, - WriteOnly, - ReadWrite, -} - pub(crate) const HEADER_LENGTH: usize = 4; pub(crate) const IP_ECHO_SERVER_RESPONSE_LENGTH: usize = HEADER_LENGTH + 23; From f5670514e5e51b356b5ea913c484edb79d6e9775 Mon Sep 17 00:00:00 2001 From: greg Date: Fri, 13 Dec 2024 22:58:50 +0000 Subject: [PATCH 6/6] remove cfg gating and add derive copy --- gossip/src/cluster_info.rs | 149 +++++++++++++++---------------------- net-utils/src/lib.rs | 55 +++++--------- 2 files changed, 78 insertions(+), 126 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 8e0acf18e6930a..5f1945b83b8465 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2619,15 +2619,13 @@ impl Node { localhost_ip_addr, port_range, QUIC_PORT_OFFSET, - udp_config.clone(), - quic_config.clone(), + udp_config, + quic_config, ) .unwrap(); - let tpu_quic = - bind_more_with_config(tpu_quic, num_quic_endpoints, quic_config.clone()).unwrap(); + let tpu_quic = bind_more_with_config(tpu_quic, num_quic_endpoints, quic_config).unwrap(); let (gossip_port, (gossip, ip_echo)) = - bind_common_in_range_with_config(localhost_ip_addr, port_range, udp_config.clone()) - .unwrap(); + bind_common_in_range_with_config(localhost_ip_addr, port_range, udp_config).unwrap(); let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port); let tvu = bind_to_localhost().unwrap(); let tvu_quic = bind_to_localhost().unwrap(); @@ -2636,13 +2634,12 @@ impl Node { localhost_ip_addr, port_range, QUIC_PORT_OFFSET, - udp_config.clone(), - quic_config.clone(), + udp_config, + quic_config, ) .unwrap(); let tpu_forwards_quic = - bind_more_with_config(tpu_forwards_quic, num_quic_endpoints, quic_config.clone()) - .unwrap(); + bind_more_with_config(tpu_forwards_quic, num_quic_endpoints, quic_config).unwrap(); let tpu_vote = bind_to_localhost().unwrap(); let tpu_vote_quic = bind_to_localhost().unwrap(); let tpu_vote_quic = @@ -2764,46 +2761,42 @@ impl Node { let socket_config = SocketConfig::default(); let socket_config_reuseport = SocketConfig::default().reuseport(true); - let (tvu_port, tvu) = - Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); + let (tvu_port, tvu) = Self::bind_with_config(bind_ip_addr, port_range, socket_config); let (tvu_quic_port, tvu_quic) = - Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config); let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) = bind_two_in_range_with_offset_and_config( bind_ip_addr, port_range, QUIC_PORT_OFFSET, - socket_config.clone(), - socket_config_reuseport.clone(), + socket_config, + socket_config_reuseport, ) .unwrap(); - let tpu_quic: Vec = bind_more_with_config( - tpu_quic, - DEFAULT_QUIC_ENDPOINTS, - socket_config_reuseport.clone(), - ) - .unwrap(); + let tpu_quic: Vec = + bind_more_with_config(tpu_quic, DEFAULT_QUIC_ENDPOINTS, socket_config_reuseport) + .unwrap(); let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) = bind_two_in_range_with_offset_and_config( bind_ip_addr, port_range, QUIC_PORT_OFFSET, - socket_config.clone(), - socket_config_reuseport.clone(), + socket_config, + socket_config_reuseport, ) .unwrap(); let tpu_forwards_quic = bind_more_with_config( tpu_forwards_quic, DEFAULT_QUIC_ENDPOINTS, - socket_config_reuseport.clone(), + socket_config_reuseport, ) .unwrap(); let (tpu_vote_port, tpu_vote) = - Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config); let (tpu_vote_quic_port, tpu_vote_quic) = - Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config); let tpu_vote_quic: Vec = bind_more_with_config( tpu_vote_quic, DEFAULT_QUIC_ENDPOINTS, @@ -2812,20 +2805,18 @@ impl Node { .unwrap(); let (_, retransmit_socket) = - Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); - let (_, repair) = Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); - let (_, repair_quic) = - Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config); + let (_, repair) = Self::bind_with_config(bind_ip_addr, port_range, socket_config); + let (_, repair_quic) = Self::bind_with_config(bind_ip_addr, port_range, socket_config); let (serve_repair_port, serve_repair) = - Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config); let (serve_repair_quic_port, serve_repair_quic) = - Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); - let (_, broadcast) = - Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config); + let (_, broadcast) = Self::bind_with_config(bind_ip_addr, port_range, socket_config); let (_, ancestor_hashes_requests) = - Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config); let (_, ancestor_hashes_requests_quic) = - Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config); let rpc_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap(); let rpc_pubsub_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap(); @@ -2907,41 +2898,30 @@ impl Node { let (tvu_port, tvu_sockets) = multi_bind_in_range_with_config( bind_ip_addr, port_range, - socket_config_reuseport.clone(), + socket_config_reuseport, num_tvu_sockets.get(), ) .expect("tvu multi_bind"); let (tvu_quic_port, tvu_quic) = - Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config); - let (tpu_port, tpu_sockets) = multi_bind_in_range_with_config( - bind_ip_addr, - port_range, - socket_config_reuseport.clone(), - 32, - ) - .expect("tpu multi_bind"); + let (tpu_port, tpu_sockets) = + multi_bind_in_range_with_config(bind_ip_addr, port_range, socket_config_reuseport, 32) + .expect("tpu multi_bind"); let (_tpu_port_quic, tpu_quic) = Self::bind_with_config( bind_ip_addr, (tpu_port + QUIC_PORT_OFFSET, tpu_port + QUIC_PORT_OFFSET + 1), - socket_config_reuseport.clone(), + socket_config_reuseport, ); - let tpu_quic = bind_more_with_config( - tpu_quic, - num_quic_endpoints.get(), - socket_config_reuseport.clone(), - ) - .unwrap(); + let tpu_quic = + bind_more_with_config(tpu_quic, num_quic_endpoints.get(), socket_config_reuseport) + .unwrap(); - let (tpu_forwards_port, tpu_forwards_sockets) = multi_bind_in_range_with_config( - bind_ip_addr, - port_range, - socket_config_reuseport.clone(), - 8, - ) - .expect("tpu_forwards multi_bind"); + let (tpu_forwards_port, tpu_forwards_sockets) = + multi_bind_in_range_with_config(bind_ip_addr, port_range, socket_config_reuseport, 8) + .expect("tpu_forwards multi_bind"); let (_tpu_forwards_port_quic, tpu_forwards_quic) = Self::bind_with_config( bind_ip_addr, @@ -2949,62 +2929,49 @@ impl Node { tpu_forwards_port + QUIC_PORT_OFFSET, tpu_forwards_port + QUIC_PORT_OFFSET + 1, ), - socket_config_reuseport.clone(), + socket_config_reuseport, ); let tpu_forwards_quic = bind_more_with_config( tpu_forwards_quic, num_quic_endpoints.get(), - socket_config_reuseport.clone(), + socket_config_reuseport, ) .unwrap(); - let (tpu_vote_port, tpu_vote_sockets) = multi_bind_in_range_with_config( - bind_ip_addr, - port_range, - socket_config_reuseport.clone(), - 1, - ) - .expect("tpu_vote multi_bind"); + let (tpu_vote_port, tpu_vote_sockets) = + multi_bind_in_range_with_config(bind_ip_addr, port_range, socket_config_reuseport, 1) + .expect("tpu_vote multi_bind"); let (tpu_vote_quic_port, tpu_vote_quic) = - Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config); let tpu_vote_quic = bind_more_with_config( tpu_vote_quic, num_quic_endpoints.get(), - socket_config_reuseport.clone(), + socket_config_reuseport, ) .unwrap(); - let (_, retransmit_sockets) = multi_bind_in_range_with_config( - bind_ip_addr, - port_range, - socket_config_reuseport.clone(), - 8, - ) - .expect("retransmit multi_bind"); + let (_, retransmit_sockets) = + multi_bind_in_range_with_config(bind_ip_addr, port_range, socket_config_reuseport, 8) + .expect("retransmit multi_bind"); - let (_, repair) = Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); - let (_, repair_quic) = - Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); + let (_, repair) = Self::bind_with_config(bind_ip_addr, port_range, socket_config); + let (_, repair_quic) = Self::bind_with_config(bind_ip_addr, port_range, socket_config); let (serve_repair_port, serve_repair) = - Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config); let (serve_repair_quic_port, serve_repair_quic) = - Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config); - let (_, broadcast) = multi_bind_in_range_with_config( - bind_ip_addr, - port_range, - socket_config_reuseport.clone(), - 4, - ) - .expect("broadcast multi_bind"); + let (_, broadcast) = + multi_bind_in_range_with_config(bind_ip_addr, port_range, socket_config_reuseport, 4) + .expect("broadcast multi_bind"); let (_, ancestor_hashes_requests) = - Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config); let (_, ancestor_hashes_requests_quic) = - Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone()); + Self::bind_with_config(bind_ip_addr, port_range, socket_config); let mut info = ContactInfo::new( *pubkey, diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 3e032c53b6fae9..d4d89abf097686 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -386,12 +386,10 @@ pub fn is_host_port(string: String) -> Result<(), String> { parse_host_port(&string).map(|_| ()) } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Copy, Debug, Default)] pub struct SocketConfig { reuseport: bool, - #[cfg(not(any(windows, target_os = "ios")))] recv_buffer_size: Option, - #[cfg(not(any(windows, target_os = "ios")))] send_buffer_size: Option, } @@ -401,35 +399,25 @@ impl SocketConfig { self } - /// Sets the receive buffer size for the socket. + /// Sets the receive buffer size for the socket (no effect on windows/ios). /// /// **Note:** On Linux the kernel will double the value you specify. /// For example, if you specify `16MB`, the kernel will configure the /// socket to use `32MB`. /// See: https://man7.org/linux/man-pages/man7/socket.7.html: SO_RCVBUF - // allow here to supress unused warnings from windows/ios builds - #[allow(unused_mut, unused_variables)] pub fn recv_buffer_size(mut self, size: usize) -> Self { - #[cfg(not(any(windows, target_os = "ios")))] - { - self.recv_buffer_size = Some(size); - } + self.recv_buffer_size = Some(size); self } - /// Sets the send buffer size for the socket. + /// Sets the send buffer size for the socket (no effect on windows/ios) /// /// **Note:** On Linux the kernel will double the value you specify. /// For example, if you specify `16MB`, the kernel will configure the /// socket to use `32MB`. /// See: https://man7.org/linux/man-pages/man7/socket.7.html: SO_SNDBUF - // allow here to supress unused warnings from windows/ios builds - #[allow(unused_mut, unused_variables)] pub fn send_buffer_size(mut self, size: usize) -> Self { - #[cfg(not(any(windows, target_os = "ios")))] - { - self.send_buffer_size = Some(size); - } + self.send_buffer_size = Some(size); self } } @@ -474,7 +462,7 @@ pub fn bind_common_in_range_with_config( config: SocketConfig, ) -> io::Result<(u16, (UdpSocket, TcpListener))> { for port in range.0..range.1 { - if let Ok((sock, listener)) = bind_common_with_config(ip_addr, port, config.clone()) { + if let Ok((sock, listener)) = bind_common_with_config(ip_addr, port, config) { return Result::Ok((sock.local_addr().unwrap().port(), (sock, listener))); } } @@ -577,7 +565,7 @@ pub fn multi_bind_in_range_with_config( }; // drop the probe, port should be available... briefly. for _ in 0..num { - let sock = bind_to_with_config(ip_addr, port, config.clone()); + let sock = bind_to_with_config(ip_addr, port, config); if let Ok(sock) = sock { sockets.push(sock); } else { @@ -735,11 +723,9 @@ pub fn bind_two_in_range_with_offset_and_config( )); } for port in range.0..range.1 { - if let Ok(first_bind) = bind_to_with_config(ip_addr, port, sock1_config.clone()) { + if let Ok(first_bind) = bind_to_with_config(ip_addr, port, sock1_config) { if range.1.saturating_sub(port) >= offset { - if let Ok(second_bind) = - bind_to_with_config(ip_addr, port + offset, sock2_config.clone()) - { + if let Ok(second_bind) = bind_to_with_config(ip_addr, port + offset, sock2_config) { return Ok(( (first_bind.local_addr().unwrap().port(), first_bind), (second_bind.local_addr().unwrap().port(), second_bind), @@ -788,7 +774,7 @@ pub fn bind_more_with_config( let ip = addr.ip(); let port = addr.port(); std::iter::once(Ok(socket)) - .chain((1..num).map(|_| bind_to_with_config(ip, port, config.clone()))) + .chain((1..num).map(|_| bind_to_with_config(ip, port, config))) .collect() } @@ -896,8 +882,8 @@ mod tests { assert_eq!(bind_in_range(ip_addr, (2000, 2001)).unwrap().0, 2000); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); let config = SocketConfig::default().reuseport(true); - let x = bind_to_with_config(ip_addr, 2002, config.clone()).unwrap(); - let y = bind_to_with_config(ip_addr, 2002, config.clone()).unwrap(); + let x = bind_to_with_config(ip_addr, 2002, config).unwrap(); + let y = bind_to_with_config(ip_addr, 2002, config).unwrap(); assert_eq!( x.local_addr().unwrap().port(), y.local_addr().unwrap().port() @@ -905,8 +891,7 @@ mod tests { bind_to(ip_addr, 2002, false).unwrap_err(); bind_in_range(ip_addr, (2002, 2003)).unwrap_err(); - let (port, v) = - multi_bind_in_range_with_config(ip_addr, (2010, 2110), config.clone(), 10).unwrap(); + let (port, v) = multi_bind_in_range_with_config(ip_addr, (2010, 2110), config, 10).unwrap(); for sock in &v { assert_eq!(port, sock.local_addr().unwrap().port()); } @@ -916,8 +901,8 @@ mod tests { fn test_bind_with_any_port() { let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); let config = SocketConfig::default(); - let x = bind_with_any_port_with_config(ip_addr, config.clone()).unwrap(); - let y = bind_with_any_port_with_config(ip_addr, config.clone()).unwrap(); + let x = bind_with_any_port_with_config(ip_addr, config).unwrap(); + let y = bind_with_any_port_with_config(ip_addr, config).unwrap(); assert_ne!( x.local_addr().unwrap().port(), y.local_addr().unwrap().port() @@ -950,10 +935,10 @@ mod tests { let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); let config = SocketConfig::default(); let (port, _sockets) = - bind_common_in_range_with_config(ip_addr, (3100, 3150), config.clone()).unwrap(); + bind_common_in_range_with_config(ip_addr, (3100, 3150), config).unwrap(); assert!((3100..3150).contains(&port)); - bind_common_in_range_with_config(ip_addr, (port, port + 1), config.clone()).unwrap_err(); + bind_common_in_range_with_config(ip_addr, (port, port + 1), config).unwrap_err(); } #[test] @@ -985,7 +970,7 @@ mod tests { let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); let config = SocketConfig::default(); let (_server_port, (server_udp_socket, server_tcp_listener)) = - bind_common_in_range_with_config(ip_addr, (3200, 3250), config.clone()).unwrap(); + bind_common_in_range_with_config(ip_addr, (3200, 3250), config).unwrap(); let (client_port, (client_udp_socket, client_tcp_listener)) = bind_common_in_range_with_config(ip_addr, (3200, 3250), config).unwrap(); @@ -1014,7 +999,7 @@ mod tests { let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); let config = SocketConfig::default(); let (_server_port, (server_udp_socket, _server_tcp_listener)) = - bind_common_in_range_with_config(ip_addr, (3200, 3250), config.clone()).unwrap(); + bind_common_in_range_with_config(ip_addr, (3200, 3250), config).unwrap(); // make the socket unreachable by not running the ip echo server! @@ -1038,7 +1023,7 @@ mod tests { let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); let config = SocketConfig::default(); let (_server_port, (server_udp_socket, _server_tcp_listener)) = - bind_common_in_range_with_config(ip_addr, (3200, 3250), config.clone()).unwrap(); + bind_common_in_range_with_config(ip_addr, (3200, 3250), config).unwrap(); // make the socket unreachable by not running the ip echo server!