Skip to content

Commit

Permalink
Make quic endpoints count configurable (#2035)
Browse files Browse the repository at this point in the history
* Make number of quic_endpoints configurable

* Fixed unit test errors

* moved DEFAULT_QUIC_ENDPOINTS to quic::streamer

* make TPU forwards use the configured count of QUIC endpoints as well

* show quic_endpoints count in stats

* Addressed a few comments from Alessandro

* use unwrap instead of expect as windows clippy complains about it

* reverted changes to NonZeroUsize const init

* argument name for tpu_num_quic_endpoints --> num_quic_endpoints

---------

Co-authored-by: Lijun Wang <[email protected]>
  • Loading branch information
lijunwangs and Lijun Wang authored Jul 18, 2024
1 parent a71c8f7 commit 5f4a08c
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 10 deletions.
37 changes: 27 additions & 10 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ use {
},
solana_streamer::{
packet,
quic::DEFAULT_QUIC_ENDPOINTS,
socket::SocketAddrSpace,
streamer::{PacketBatchReceiver, PacketBatchSender},
},
Expand Down Expand Up @@ -2897,11 +2898,10 @@ pub struct NodeConfig {
pub public_tpu_forwards_addr: Option<SocketAddr>,
/// The number of TVU sockets to create
pub num_tvu_sockets: NonZeroUsize,
/// The number of QUIC tpu endpoints
pub num_quic_endpoints: NonZeroUsize,
}

// This will be adjusted and parameterized in follow-on PRs.
const QUIC_ENDPOINTS: usize = 1;

#[derive(Debug)]
pub struct Node {
pub info: ContactInfo,
Expand All @@ -2913,7 +2913,15 @@ impl Node {
let pubkey = solana_sdk::pubkey::new_rand();
Self::new_localhost_with_pubkey(&pubkey)
}

/// Builds a localhost node for `pubkey`, passing `DEFAULT_QUIC_ENDPOINTS` as the
/// QUIC endpoint count.
///
/// Thin convenience wrapper around
/// `new_localhost_with_pubkey_and_quic_endpoints` for callers that do not need
/// to choose the endpoint count themselves.
pub fn new_localhost_with_pubkey(pubkey: &Pubkey) -> Self {
    let num_quic_endpoints = DEFAULT_QUIC_ENDPOINTS;
    Self::new_localhost_with_pubkey_and_quic_endpoints(pubkey, num_quic_endpoints)
}

pub fn new_localhost_with_pubkey_and_quic_endpoints(
pubkey: &Pubkey,
num_quic_endpoints: usize,
) -> Self {
let localhost_ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
let localhost_bind_addr = format!("{localhost_ip_addr:?}:0");
let unspecified_bind_addr = format!("{:?}:0", IpAddr::V4(Ipv4Addr::UNSPECIFIED));
Expand All @@ -2931,7 +2939,7 @@ impl Node {
)
.unwrap();
let tpu_quic =
bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap();
bind_more_with_config(tpu_quic, num_quic_endpoints, quic_config.clone()).unwrap();
let (gossip_port, (gossip, ip_echo)) =
bind_common_in_range(localhost_ip_addr, port_range).unwrap();
let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port);
Expand All @@ -2947,7 +2955,7 @@ impl Node {
)
.unwrap();
let tpu_forwards_quic =
bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config).unwrap();
bind_more_with_config(tpu_forwards_quic, num_quic_endpoints, quic_config).unwrap();
let tpu_vote = UdpSocket::bind(&localhost_bind_addr).unwrap();
let repair = UdpSocket::bind(&localhost_bind_addr).unwrap();
let rpc_port = find_available_port_in_range(localhost_ip_addr, port_range).unwrap();
Expand Down Expand Up @@ -3068,7 +3076,7 @@ impl Node {
)
.unwrap();
let tpu_quic =
bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap();
bind_more_with_config(tpu_quic, DEFAULT_QUIC_ENDPOINTS, quic_config.clone()).unwrap();
let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) =
bind_two_in_range_with_offset_and_config(
bind_ip_addr,
Expand All @@ -3079,7 +3087,7 @@ impl Node {
)
.unwrap();
let tpu_forwards_quic =
bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config).unwrap();
bind_more_with_config(tpu_forwards_quic, DEFAULT_QUIC_ENDPOINTS, quic_config).unwrap();
let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range);
let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range);
let (_, repair) = Self::bind(bind_ip_addr, port_range);
Expand Down Expand Up @@ -3151,6 +3159,7 @@ impl Node {
public_tpu_addr,
public_tpu_forwards_addr,
num_tvu_sockets,
num_quic_endpoints,
} = config;

let (gossip_port, (gossip, ip_echo)) =
Expand All @@ -3170,7 +3179,7 @@ impl Node {
quic_config.clone(),
);
let tpu_quic =
bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap();
bind_more_with_config(tpu_quic, num_quic_endpoints.get(), quic_config.clone()).unwrap();

let (tpu_forwards_port, tpu_forwards_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind");
Expand All @@ -3183,8 +3192,12 @@ impl Node {
),
quic_config.clone(),
);
let tpu_forwards_quic =
bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap();
let tpu_forwards_quic = bind_more_with_config(
tpu_forwards_quic,
num_quic_endpoints.get(),
quic_config.clone(),
)
.unwrap();

let (tpu_vote_port, tpu_vote_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind");
Expand Down Expand Up @@ -3399,6 +3412,8 @@ mod tests {
sync::Arc,
},
};
// `NonZeroUsize` form of `DEFAULT_QUIC_ENDPOINTS` for use in `NodeConfig` test fixtures.
// Built with the checked constructor so that a zero default fails const evaluation with a
// clear message, without needing an `unsafe` unchecked conversion.
const DEFAULT_NUM_QUIC_ENDPOINTS: NonZeroUsize = match NonZeroUsize::new(DEFAULT_QUIC_ENDPOINTS) {
    Some(count) => count,
    None => panic!("DEFAULT_QUIC_ENDPOINTS must be non-zero"),
};

#[test]
fn test_gossip_node() {
Expand Down Expand Up @@ -3813,6 +3828,7 @@ mod tests {
public_tpu_addr: None,
public_tpu_forwards_addr: None,
num_tvu_sockets: MINIMUM_NUM_TVU_SOCKETS,
num_quic_endpoints: DEFAULT_NUM_QUIC_ENDPOINTS,
};

let node = Node::new_with_external_ip(&solana_sdk::pubkey::new_rand(), config);
Expand All @@ -3835,6 +3851,7 @@ mod tests {
public_tpu_addr: None,
public_tpu_forwards_addr: None,
num_tvu_sockets: MINIMUM_NUM_TVU_SOCKETS,
num_quic_endpoints: DEFAULT_NUM_QUIC_ENDPOINTS,
};

let node = Node::new_with_external_ip(&solana_sdk::pubkey::new_rand(), config);
Expand Down
3 changes: 3 additions & 0 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,9 @@ async fn run_server(
max_unstaked_connections,
max_streams_per_ms,
));
stats
.quic_endpoints_count
.store(incoming.len(), Ordering::Relaxed);
let staked_connection_table: Arc<Mutex<ConnectionTable>> =
Arc::new(Mutex::new(ConnectionTable::new()));
let (sender, receiver) = async_unbounded();
Expand Down
9 changes: 9 additions & 0 deletions streamer/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ use {
pub const MAX_STAKED_CONNECTIONS: usize = 2000;
pub const MAX_UNSTAKED_CONNECTIONS: usize = 500;

/// Default number of QUIC endpoints to bind when no explicit count is configured.
pub const DEFAULT_QUIC_ENDPOINTS: usize = 1;

pub struct SkipClientVerification;

impl SkipClientVerification {
Expand Down Expand Up @@ -197,6 +200,7 @@ pub struct StreamerStats {
pub(crate) connection_rate_limiter_length: AtomicUsize,
pub(crate) outstanding_incoming_connection_attempts: AtomicUsize,
pub(crate) total_incoming_connection_attempts: AtomicUsize,
pub(crate) quic_endpoints_count: AtomicUsize,
}

impl StreamerStats {
Expand Down Expand Up @@ -537,6 +541,11 @@ impl StreamerStats {
.load(Ordering::Relaxed),
i64
),
(
"quic_endpoints_count",
self.quic_endpoints_count.load(Ordering::Relaxed),
i64
),
);
}
}
Expand Down
14 changes: 14 additions & 0 deletions validator/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use {
solana_send_transaction_service::send_transaction_service::{
self, MAX_BATCH_SEND_RATE_MS, MAX_TRANSACTION_BATCH_SIZE,
},
solana_streamer::quic::DEFAULT_QUIC_ENDPOINTS,
solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE,
solana_unified_scheduler_pool::DefaultSchedulerPool,
std::{path::PathBuf, str::FromStr},
Expand Down Expand Up @@ -903,6 +904,17 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
.hidden(hidden_unless_forced())
.help("Controls the rate of the clients connections per IpAddr per minute."),
)
.arg(
Arg::with_name("num_quic_endpoints")
.long("num-quic-endpoints")
.takes_value(true)
.default_value(&default_args.num_quic_endpoints)
.validator(is_parsable::<usize>)
.hidden(hidden_unless_forced())
.help("The number of QUIC endpoints used for TPU and TPU-Forward. It can be increased to \
increase network ingest throughput, at the expense of higher CPU and general \
validator load."),
)
.arg(
Arg::with_name("staked_nodes_overrides")
.long("staked-nodes-overrides")
Expand Down Expand Up @@ -2213,6 +2225,7 @@ pub struct DefaultArgs {
pub accounts_shrink_ratio: String,
pub tpu_connection_pool_size: String,
pub tpu_max_connections_per_ipaddr_per_minute: String,
pub num_quic_endpoints: String,

// Exit subcommand
pub exit_min_idle_time: String,
Expand Down Expand Up @@ -2304,6 +2317,7 @@ impl DefaultArgs {
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE.to_string(),
tpu_max_connections_per_ipaddr_per_minute:
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE.to_string(),
num_quic_endpoints: DEFAULT_QUIC_ENDPOINTS.to_string(),
rpc_max_request_body_size: MAX_REQUEST_BODY_SIZE.to_string(),
exit_min_idle_time: "10".to_string(),
exit_max_delinquent_stake: "5".to_string(),
Expand Down
2 changes: 2 additions & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1946,13 +1946,15 @@ pub fn main() {
})
});

let num_quic_endpoints = value_t_or_exit!(matches, "num_quic_endpoints", NonZeroUsize);
let node_config = NodeConfig {
gossip_addr,
port_range: dynamic_port_range,
bind_ip_addr: bind_address,
public_tpu_addr,
public_tpu_forwards_addr,
num_tvu_sockets: tvu_receive_threads,
num_quic_endpoints,
};

let cluster_entrypoints = entrypoint_addrs
Expand Down

0 comments on commit 5f4a08c

Please sign in to comment.