Skip to content

Commit

Permalink
Use localhost lookup in reserve_listener()
Browse files Browse the repository at this point in the history
  • Loading branch information
slowli committed Nov 2, 2023
1 parent 470e75d commit 3be8165
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 56 deletions.
1 change: 1 addition & 0 deletions node/actors/consensus/src/testonly/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ impl Test {
pub(crate) async fn run(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
let rng = &mut ctx.rng();
let nodes: Vec<_> = network::testonly::Instance::new(rng, self.nodes.len(), 1)
.await
.into_iter()
.enumerate()
.map(|(i, net)| Node {
Expand Down
12 changes: 6 additions & 6 deletions node/actors/executor/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ pub struct FullValidatorConfig {

impl FullValidatorConfig {
/// Generates a validator config for a network with a single validator.
pub fn for_single_validator(rng: &mut impl Rng, genesis_block_payload: Payload) -> Self {
let mut net_configs = Instance::new_configs(rng, 1, 0);
pub async fn for_single_validator(rng: &mut impl Rng, genesis_block_payload: Payload) -> Self {
let mut net_configs = Instance::new_configs(rng, 1, 0).await;
assert_eq!(net_configs.len(), 1);
let net_config = net_configs.pop().unwrap();
let consensus_config = net_config.consensus.unwrap();
Expand All @@ -55,8 +55,8 @@ impl FullValidatorConfig {
}

/// Creates a new external node and configures this validator to accept incoming connections from it.
pub fn connect_external_node(&mut self, rng: &mut impl Rng) -> ExternalNodeConfig {
let external_node_config = ExternalNodeConfig::new(rng, self);
pub async fn connect_external_node(&mut self, rng: &mut impl Rng) -> ExternalNodeConfig {
let external_node_config = ExternalNodeConfig::new(rng, self).await;
self.node_config
.gossip
.static_inbound
Expand All @@ -76,9 +76,9 @@ pub struct ExternalNodeConfig {
}

impl ExternalNodeConfig {
fn new(rng: &mut impl Rng, validator: &FullValidatorConfig) -> Self {
async fn new(rng: &mut impl Rng, validator: &FullValidatorConfig) -> Self {
let node_key: node::SecretKey = rng.gen();
let external_node_addr = net::tcp::testonly::reserve_listener();
let external_node_addr = net::tcp::testonly::reserve_listener().await;
let node_config = ExecutorConfig {
server_addr: *external_node_addr,
gossip: GossipConfig {
Expand Down
6 changes: 3 additions & 3 deletions node/actors/executor/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async fn executing_single_validator() {
let ctx = &ctx::root();
let rng = &mut ctx.rng();

let validator = FullValidatorConfig::for_single_validator(rng, Payload(vec![]));
let validator = FullValidatorConfig::for_single_validator(rng, Payload(vec![])).await;
let genesis_block = &validator.node_config.genesis_block;
let storage = InMemoryStorage::new(genesis_block.clone());
let storage = Arc::new(storage);
Expand Down Expand Up @@ -90,8 +90,8 @@ async fn executing_validator_and_external_node() {
let ctx = &ctx::test_root(&ctx::AffineClock::new(20.0));
let rng = &mut ctx.rng();

let mut validator = FullValidatorConfig::for_single_validator(rng, Payload(vec![]));
let external_node = validator.connect_external_node(rng);
let mut validator = FullValidatorConfig::for_single_validator(rng, Payload(vec![])).await;
let external_node = validator.connect_external_node(rng).await;

let genesis_block = &validator.node_config.genesis_block;
let validator_storage = InMemoryStorage::new(genesis_block.clone());
Expand Down
12 changes: 6 additions & 6 deletions node/actors/network/src/consensus/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ async fn test_one_connection_per_validator() {
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();

let mut nodes = testonly::Instance::new(rng, 3, 1);
let mut nodes = testonly::Instance::new(rng, 3, 1).await;

scope::run!(ctx, |ctx,s| async {
for (i,n) in nodes.iter().enumerate() {
for (i, node) in nodes.iter().enumerate() {
let (network_pipe, _) = pipe::new();

s.spawn_bg(run_network(
ctx,
n.state.clone(),
node.state.clone(),
network_pipe
).instrument(tracing::info_span!("node", i)));
}
Expand Down Expand Up @@ -70,7 +70,7 @@ async fn test_address_change() {
let ctx = &ctx::test_root(&ctx::AffineClock::new(20.));
let rng = &mut ctx.rng();

let mut nodes = testonly::Instance::new(rng, 5, 1);
let mut nodes = testonly::Instance::new(rng, 5, 1).await;
scope::run!(ctx, |ctx, s| async {
for (i, n) in nodes.iter().enumerate().skip(1) {
let (pipe, _) = pipe::new();
Expand Down Expand Up @@ -117,7 +117,7 @@ async fn test_address_change() {
// Then it should broadcast its new address and the consensus network
// should get reconstructed.
let mut cfg = nodes[0].to_config();
cfg.server_addr = net::tcp::testonly::reserve_listener();
cfg.server_addr = net::tcp::testonly::reserve_listener().await;
cfg.consensus.as_mut().unwrap().public_addr = *cfg.server_addr;
nodes[0] = testonly::Instance::from_cfg(cfg);

Expand Down Expand Up @@ -148,7 +148,7 @@ async fn test_transmission() {
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();

let mut nodes = testonly::Instance::new(rng, 2, 1);
let mut nodes = testonly::Instance::new(rng, 2, 1).await;

scope::run!(ctx, |ctx, s| async {
let mut pipes = vec![];
Expand Down
18 changes: 9 additions & 9 deletions node/actors/network/src/gossip/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ async fn test_one_connection_per_node() {
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();

let mut nodes: Vec<_> = testonly::Instance::new(rng, 5, 2);
let mut nodes: Vec<_> = testonly::Instance::new(rng, 5, 2).await;

scope::run!(ctx, |ctx,s| async {
for n in &nodes {
for node in &nodes {
let (network_pipe, _) = pipe::new();

s.spawn_bg(run_network(
ctx,
n.state.clone(),
node.state.clone(),
network_pipe
));
}
Expand Down Expand Up @@ -236,7 +236,7 @@ async fn test_validator_addrs_propagation() {
concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(40.));
let rng = &mut ctx.rng();
let nodes: Vec<_> = testonly::Instance::new(rng, 10, 1);
let nodes: Vec<_> = testonly::Instance::new(rng, 10, 1).await;

scope::run!(ctx, |ctx, s| async {
for n in &nodes {
Expand Down Expand Up @@ -282,7 +282,7 @@ async fn syncing_blocks(node_count: usize, gossip_peers: usize) {
let ctx = &ctx::test_root(&ctx::AffineClock::new(20.0));
let ctx = &ctx.with_timeout(time::Duration::seconds(200));
let rng = &mut ctx.rng();
let mut nodes = testonly::Instance::new(rng, node_count, gossip_peers);
let mut nodes = testonly::Instance::new(rng, node_count, gossip_peers).await;
let network_state = NetworkState::new(rng, &mut nodes);

scope::run!(ctx, |ctx, s| async {
Expand Down Expand Up @@ -403,7 +403,7 @@ async fn uncoordinated_block_syncing(
let ctx = &ctx::test_root(&ctx::AffineClock::new(20.0));
let ctx = &ctx.with_timeout(time::Duration::seconds(200));
let rng = &mut ctx.rng();
let mut nodes = testonly::Instance::new(rng, node_count, gossip_peers);
let mut nodes = testonly::Instance::new(rng, node_count, gossip_peers).await;

let mut states: Vec<_> = (0..EXCHANGED_STATE_COUNT)
.map(|i| io::SyncState::gen(rng, BlockNumber(i)))
Expand Down Expand Up @@ -487,7 +487,7 @@ async fn getting_blocks_from_peers(node_count: usize, gossip_peers: usize) {
let ctx = &ctx::test_root(&ctx::RealClock).with_timeout(TEST_TIMEOUT);
let ctx = &ctx::test_with_clock(ctx, &ctx::ManualClock::new());
let rng = &mut ctx.rng();
let mut nodes = testonly::Instance::new(rng, node_count, gossip_peers);
let mut nodes = testonly::Instance::new(rng, node_count, gossip_peers).await;
for node in &mut nodes {
node.disable_gossip_pings();
}
Expand Down Expand Up @@ -607,7 +607,7 @@ async fn validator_node_restart() {
let sec = time::Duration::seconds(1);

scope::run!(ctx, |ctx, s| async {
let mut cfgs = testonly::Instance::new_configs(rng, 2, 1);
let mut cfgs = testonly::Instance::new_configs(rng, 2, 1).await;
let mut node1 = testonly::Instance::from_cfg(cfgs[1].clone());
let (pipe, _) = pipe::new();
s.spawn_bg(
Expand Down Expand Up @@ -689,7 +689,7 @@ async fn rate_limiting() {

// construct star topology.
let n = 10;
let mut cfgs = testonly::Instance::new_configs(rng, n, 0);
let mut cfgs = testonly::Instance::new_configs(rng, n, 0).await;
let want: HashMap<_, _> = cfgs
.iter()
.map(|cfg| {
Expand Down
45 changes: 23 additions & 22 deletions node/actors/network/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,29 +50,29 @@ pub struct Instance {

impl Instance {
/// Construct configs for `n` validators of the consensus.
pub fn new_configs<R: Rng>(rng: &mut R, n: usize, gossip_peers: usize) -> Vec<Config> {
pub async fn new_configs<R: Rng>(rng: &mut R, n: usize, gossip_peers: usize) -> Vec<Config> {
let keys: Vec<validator::SecretKey> = (0..n).map(|_| rng.gen()).collect();
let validators = validator::ValidatorSet::new(keys.iter().map(|k| k.public())).unwrap();
let mut cfgs: Vec<_> = (0..n)
.map(|i| {
let addr = net::tcp::testonly::reserve_listener();
Config {
server_addr: addr,
validators: validators.clone(),
consensus: Some(consensus::Config {
key: keys[i].clone(),
public_addr: *addr,
}),
gossip: gossip::Config {
key: rng.gen(),
dynamic_inbound_limit: n as u64,
static_inbound: HashSet::default(),
static_outbound: HashMap::default(),
enable_pings: true,
},
}
})
.collect();
let mut cfgs = Vec::with_capacity(n);
for key in &keys {
let addr = net::tcp::testonly::reserve_listener().await;
cfgs.push(Config {
server_addr: addr,
validators: validators.clone(),
consensus: Some(consensus::Config {
key: key.clone(),
public_addr: *addr,
}),
gossip: gossip::Config {
key: rng.gen(),
dynamic_inbound_limit: n as u64,
static_inbound: HashSet::default(),
static_outbound: HashMap::default(),
enable_pings: true,
},
});
}

for i in 0..cfgs.len() {
for j in 0..gossip_peers {
let j = (i + j + 1) % n;
Expand All @@ -95,8 +95,9 @@ impl Instance {

/// Constructs `n` node instances, configured to connect to each other.
/// Non-blocking (it doesn't run the network actors).
pub fn new<R: Rng>(rng: &mut R, n: usize, gossip_peers: usize) -> Vec<Self> {
pub async fn new<R: Rng>(rng: &mut R, n: usize, gossip_peers: usize) -> Vec<Self> {
Self::new_configs(rng, n, gossip_peers)
.await
.into_iter()
.map(Self::from_cfg)
.collect()
Expand Down
6 changes: 3 additions & 3 deletions node/actors/network/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ async fn test_metrics() {
concurrency::testonly::abort_on_panic();
let ctx = &mut ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();
let nodes = testonly::Instance::new(rng, 3, 1);
let nodes = testonly::Instance::new(rng, 3, 1).await;
scope::run!(ctx, |ctx, s| async {
for (i, n) in nodes.iter().enumerate() {
for (i, node) in nodes.iter().enumerate() {
let (network_pipe, _) = pipe::new();
s.spawn_bg(
run_network(ctx, n.state.clone(), network_pipe)
run_network(ctx, node.state.clone(), network_pipe)
.instrument(tracing::info_span!("node", i)),
);
}
Expand Down
6 changes: 3 additions & 3 deletions node/actors/sync_blocks/src/tests/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ struct GossipNetwork<H = NodeHandle> {
}

impl GossipNetwork<InitialNodeHandle> {
fn new(rng: &mut impl Rng, node_count: usize, gossip_peers: usize) -> (Self, Vec<Node>) {
async fn new(rng: &mut impl Rng, node_count: usize, gossip_peers: usize) -> (Self, Vec<Node>) {
let test_validators = TestValidators::new(4, 20, rng);
let nodes = NetworkInstance::new(rng, node_count, gossip_peers);
let nodes = NetworkInstance::new(rng, node_count, gossip_peers).await;
let (nodes, node_handles) = nodes.into_iter().map(Node::new).unzip();
let this = Self {
test_validators,
Expand All @@ -233,7 +233,7 @@ async fn test_sync_blocks<T: GossipNetworkTest>(test: T) {
let ctx = &ctx::test_root(&ctx::AffineClock::new(CLOCK_SPEEDUP as f64))
.with_timeout(TEST_TIMEOUT * CLOCK_SPEEDUP);
let (node_count, gossip_peers) = test.network_params();
let (network, nodes) = GossipNetwork::new(&mut ctx.rng(), node_count, gossip_peers);
let (network, nodes) = GossipNetwork::new(&mut ctx.rng(), node_count, gossip_peers).await;
scope::run!(ctx, |ctx, s| async {
for node in nodes {
let test_validators = network.test_validators.clone();
Expand Down
19 changes: 15 additions & 4 deletions node/libs/concurrency/src/net/tcp/testonly.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,32 @@
//! Test-only TCP utilities.
use super::{accept, connect, ListenerAddr, Stream, RESERVED_LISTENER_ADDRS};
use crate::{ctx, scope};
use std::net::SocketAddr;

/// Reserves a random port on localhost for a TCP listener.
pub fn reserve_listener() -> ListenerAddr {
let guard = tokio::net::TcpSocket::new_v6().unwrap();
pub async fn reserve_listener() -> ListenerAddr {
let mut localhost_addrs = tokio::net::lookup_host(("localhost", 0))
.await
.expect("failed `localhost` lookup");
let localhost_addr = localhost_addrs
.next()
.expect("`localhost` did not resolve to address");

let guard = match &localhost_addr {
SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4().unwrap(),
SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6().unwrap(),
};
guard.set_reuseaddr(true).unwrap();
guard.set_reuseport(true).unwrap();
guard.bind("[::1]:0".parse().unwrap()).unwrap();
guard.bind(localhost_addr).unwrap();
let addr = guard.local_addr().unwrap();
RESERVED_LISTENER_ADDRS.lock().unwrap().insert(addr, guard);
ListenerAddr(addr)
}

/// Establishes a loopback TCP connection.
pub async fn pipe(ctx: &ctx::Ctx) -> (Stream, Stream) {
let addr = reserve_listener();
let addr = reserve_listener().await;
scope::run!(ctx, |ctx, s| async {
let mut listener = addr.bind()?;
let s1 = s.spawn(async { connect(ctx, *addr).await.unwrap() });
Expand Down

0 comments on commit 3be8165

Please sign in to comment.