diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index dd6cf9fb0768a8..854779fe462415 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -72,6 +72,7 @@ use { }, solana_streamer::{ packet, + quic::DEFAULT_QUIC_ENDPOINTS, socket::SocketAddrSpace, streamer::{PacketBatchReceiver, PacketBatchSender}, }, @@ -2897,11 +2898,10 @@ pub struct NodeConfig { pub public_tpu_forwards_addr: Option, /// The number of TVU sockets to create pub num_tvu_sockets: NonZeroUsize, + /// The number of QUIC tpu endpoints + pub num_quic_endpoints: NonZeroUsize, } -// This will be adjusted and parameterized in follow-on PRs. -const QUIC_ENDPOINTS: usize = 1; - #[derive(Debug)] pub struct Node { pub info: ContactInfo, @@ -2913,7 +2913,15 @@ impl Node { let pubkey = solana_sdk::pubkey::new_rand(); Self::new_localhost_with_pubkey(&pubkey) } + pub fn new_localhost_with_pubkey(pubkey: &Pubkey) -> Self { + Self::new_localhost_with_pubkey_and_quic_endpoints(pubkey, DEFAULT_QUIC_ENDPOINTS) + } + + pub fn new_localhost_with_pubkey_and_quic_endpoints( + pubkey: &Pubkey, + num_quic_endpoints: usize, + ) -> Self { let localhost_ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST); let localhost_bind_addr = format!("{localhost_ip_addr:?}:0"); let unspecified_bind_addr = format!("{:?}:0", IpAddr::V4(Ipv4Addr::UNSPECIFIED)); @@ -2931,7 +2939,7 @@ impl Node { ) .unwrap(); let tpu_quic = - bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap(); + bind_more_with_config(tpu_quic, num_quic_endpoints, quic_config.clone()).unwrap(); let (gossip_port, (gossip, ip_echo)) = bind_common_in_range(localhost_ip_addr, port_range).unwrap(); let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port); @@ -2947,7 +2955,7 @@ impl Node { ) .unwrap(); let tpu_forwards_quic = - bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config).unwrap(); + bind_more_with_config(tpu_forwards_quic, num_quic_endpoints, quic_config).unwrap(); let tpu_vote = UdpSocket::bind(&localhost_bind_addr).unwrap(); let repair = UdpSocket::bind(&localhost_bind_addr).unwrap(); let rpc_port = find_available_port_in_range(localhost_ip_addr, port_range).unwrap(); @@ -3068,7 +3076,7 @@ impl Node { ) .unwrap(); let tpu_quic = - bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap(); + bind_more_with_config(tpu_quic, DEFAULT_QUIC_ENDPOINTS, quic_config.clone()).unwrap(); let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) = bind_two_in_range_with_offset_and_config( bind_ip_addr, @@ -3079,7 +3087,7 @@ impl Node { ) .unwrap(); let tpu_forwards_quic = - bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config).unwrap(); + bind_more_with_config(tpu_forwards_quic, DEFAULT_QUIC_ENDPOINTS, quic_config).unwrap(); let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range); let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range); let (_, repair) = Self::bind(bind_ip_addr, port_range); @@ -3151,6 +3159,7 @@ impl Node { public_tpu_addr, public_tpu_forwards_addr, num_tvu_sockets, + num_quic_endpoints, } = config; let (gossip_port, (gossip, ip_echo)) = @@ -3170,7 +3179,7 @@ impl Node { quic_config.clone(), ); let tpu_quic = - bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap(); + bind_more_with_config(tpu_quic, num_quic_endpoints.get(), quic_config.clone()).unwrap(); let (tpu_forwards_port, tpu_forwards_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind"); @@ -3183,8 +3192,12 @@ impl Node { ), quic_config.clone(), ); - let tpu_forwards_quic = - bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap(); + let tpu_forwards_quic = bind_more_with_config( + tpu_forwards_quic, + num_quic_endpoints.get(), + quic_config.clone(), + ) + .unwrap(); let (tpu_vote_port, tpu_vote_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind"); @@ -3399,6 +3412,8 @@ mod tests { sync::Arc, }, }; + const DEFAULT_NUM_QUIC_ENDPOINTS: NonZeroUsize = + unsafe { NonZeroUsize::new_unchecked(DEFAULT_QUIC_ENDPOINTS) }; #[test] fn test_gossip_node() { @@ -3813,6 +3828,7 @@ mod tests { public_tpu_addr: None, public_tpu_forwards_addr: None, num_tvu_sockets: MINIMUM_NUM_TVU_SOCKETS, + num_quic_endpoints: DEFAULT_NUM_QUIC_ENDPOINTS, }; let node = Node::new_with_external_ip(&solana_sdk::pubkey::new_rand(), config); @@ -3835,6 +3851,7 @@ mod tests { public_tpu_addr: None, public_tpu_forwards_addr: None, num_tvu_sockets: MINIMUM_NUM_TVU_SOCKETS, + num_quic_endpoints: DEFAULT_NUM_QUIC_ENDPOINTS, }; let node = Node::new_with_external_ip(&solana_sdk::pubkey::new_rand(), config); diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 7871622e234071..0604973526fb55 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -266,6 +266,9 @@ async fn run_server( max_unstaked_connections, max_streams_per_ms, )); + stats + .quic_endpoints_count + .store(incoming.len(), Ordering::Relaxed); let staked_connection_table: Arc> = Arc::new(Mutex::new(ConnectionTable::new())); let (sender, receiver) = async_unbounded(); diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index a9309cca925829..08bcf7cd3f7f00 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -28,6 +28,9 @@ use { pub const MAX_STAKED_CONNECTIONS: usize = 2000; pub const MAX_UNSTAKED_CONNECTIONS: usize = 500; +// This will be adjusted and parameterized in follow-on PRs. +pub const DEFAULT_QUIC_ENDPOINTS: usize = 1; + pub struct SkipClientVerification; impl SkipClientVerification { @@ -197,6 +200,7 @@ pub struct StreamerStats { pub(crate) connection_rate_limiter_length: AtomicUsize, pub(crate) outstanding_incoming_connection_attempts: AtomicUsize, pub(crate) total_incoming_connection_attempts: AtomicUsize, + pub(crate) quic_endpoints_count: AtomicUsize, } impl StreamerStats { @@ -537,6 +541,11 @@ impl StreamerStats { .load(Ordering::Relaxed), i64 ), + ( + "quic_endpoints_count", + self.quic_endpoints_count.load(Ordering::Relaxed), + i64 + ), ); } } diff --git a/validator/src/cli.rs b/validator/src/cli.rs index acdedaca0799a1..0f0b4170392631 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -47,6 +47,7 @@ use { solana_send_transaction_service::send_transaction_service::{ self, MAX_BATCH_SEND_RATE_MS, MAX_TRANSACTION_BATCH_SIZE, }, + solana_streamer::quic::DEFAULT_QUIC_ENDPOINTS, solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE, solana_unified_scheduler_pool::DefaultSchedulerPool, std::{path::PathBuf, str::FromStr}, @@ -903,6 +904,17 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .hidden(hidden_unless_forced()) .help("Controls the rate of the clients connections per IpAddr per minute."), ) + .arg( + Arg::with_name("num_quic_endpoints") + .long("num-quic-endpoints") + .takes_value(true) + .default_value(&default_args.num_quic_endpoints) + .validator(is_parsable::) + .hidden(hidden_unless_forced()) + .help("The number of QUIC endpoints used for TPU and TPU-Forward. It can be increased to \ + increase network ingest throughput, at the expense of higher CPU and general \ + validator load."), + ) .arg( Arg::with_name("staked_nodes_overrides") .long("staked-nodes-overrides") @@ -2213,6 +2225,7 @@ pub struct DefaultArgs { pub accounts_shrink_ratio: String, pub tpu_connection_pool_size: String, pub tpu_max_connections_per_ipaddr_per_minute: String, + pub num_quic_endpoints: String, // Exit subcommand pub exit_min_idle_time: String, @@ -2304,6 +2317,7 @@ impl DefaultArgs { tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE.to_string(), tpu_max_connections_per_ipaddr_per_minute: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE.to_string(), + num_quic_endpoints: DEFAULT_QUIC_ENDPOINTS.to_string(), rpc_max_request_body_size: MAX_REQUEST_BODY_SIZE.to_string(), exit_min_idle_time: "10".to_string(), exit_max_delinquent_stake: "5".to_string(), diff --git a/validator/src/main.rs b/validator/src/main.rs index 4248f28c514572..45c35a43ef5392 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1946,6 +1946,7 @@ pub fn main() { }) }); + let num_quic_endpoints = value_t_or_exit!(matches, "num_quic_endpoints", NonZeroUsize); let node_config = NodeConfig { gossip_addr, port_range: dynamic_port_range, @@ -1953,6 +1954,7 @@ pub fn main() { public_tpu_addr, public_tpu_forwards_addr, num_tvu_sockets: tvu_receive_threads, + num_quic_endpoints, }; let cluster_entrypoints = entrypoint_addrs