Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix race conditions in sync_blocks tests #19

Merged
merged 4 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion node/actors/sync_blocks/src/peers/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@ use roles::{node, validator::BlockNumber};

/// Events emitted by `PeerStates` actor. Only used for tests so far.
///
/// Every variant carries the data identifying what happened (a block number and/or
/// the public key of the peer involved), so that tests can wait for a specific
/// event instead of relying on timing.
#[derive(Debug)]
#[allow(unused_tuple_struct_fields)] // Variant fields are only read in tests
// NOTE(review): the `allow` above is a subset of the one below (which additionally
// lists `dead_code`), so only one of the two attributes is actually needed.
#[allow(dead_code, unused_tuple_struct_fields)] // Variant fields are only read in tests
pub(super) enum PeerStateEvent {
/// Node has successfully downloaded the specified block.
GotBlock(BlockNumber),
/// Received an invalid block from the peer.
GotInvalidBlock {
// Key of the peer that supplied the invalid block.
peer_key: node::PublicKey,
// Number of the block that was requested from that peer.
block_number: BlockNumber,
},
/// Peer state was updated. Includes creating a state for a newly connected peer.
PeerUpdated(node::PublicKey),
/// Received invalid `SyncState` from a peer.
InvalidPeerUpdate(node::PublicKey),
/// Peer was disconnected (i.e., it has dropped a request).
PeerDisconnected(node::PublicKey),
}
12 changes: 11 additions & 1 deletion node/actors/sync_blocks/src/peers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ impl PeerStates {
Ok(block_number) => block_number,
Err(err) => {
tracing::warn!(%err, "Invalid `SyncState` received from peer");
if let Some(events_sender) = &self.events_sender {
events_sender.send(PeerStateEvent::InvalidPeerUpdate(peer_key));
}
return Ok(BlockNumber(0));
// TODO: ban peer etc.
}
Expand Down Expand Up @@ -254,8 +257,15 @@ impl PeerStates {
"Received invalid block #{block_number} from peer {peer_key:?}"
);
// TODO: ban peer etc.
if let Some(events_sender) = &self.events_sender {
events_sender.send(PeerStateEvent::GotInvalidBlock {
peer_key,
block_number,
});
}
} else {
return Ok(block);
}
return Ok(block);
}
}

Expand Down
198 changes: 176 additions & 22 deletions node/actors/sync_blocks/src/peers/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use concurrency::time;
use rand::{rngs::StdRng, seq::IteratorRandom, Rng};
use roles::validator;
use std::{collections::HashSet, fmt};
use storage::InMemoryStorage;
use storage::{BlockStore, InMemoryStorage};
use test_casing::{test_casing, Product};

const TEST_TIMEOUT: time::Duration = time::Duration::seconds(5);
Expand Down Expand Up @@ -60,6 +60,28 @@ async fn wait_for_stored_block(
Ok(())
}

/// Consumes events from `events_receiver` until a `PeerStateEvent::PeerUpdated`
/// event is observed, then returns.
///
/// `PeerDisconnected` and `GotBlock` events seen before the update are ignored.
/// Receiving is done via `recv(ctx)`; its failure (presumably context
/// cancellation, given the `ctx::OrCanceled` return type) is propagated to the caller.
///
/// # Panics
///
/// - If the `PeerUpdated` event refers to a peer other than `expected_peer`.
/// - If any other kind of event (e.g. an invalid block / invalid update) is received.
#[instrument(level = "trace", skip(ctx, events_receiver))]
async fn wait_for_peer_update(
    ctx: &ctx::Ctx,
    events_receiver: &mut channel::UnboundedReceiver<PeerStateEvent>,
    expected_peer: &node::PublicKey,
) -> ctx::OrCanceled<()> {
    loop {
        let peer_event = events_receiver.recv(ctx).await?;
        tracing::trace!(?peer_event, "received peer event");

        // The awaited event: verify it concerns the right peer and stop waiting.
        if let PeerStateEvent::PeerUpdated(key) = &peer_event {
            assert_eq!(key, expected_peer);
            return Ok(());
        }

        // Disconnects and downloaded blocks may legitimately interleave with the
        // update we are waiting for; anything else fails the test.
        let is_skippable = matches!(
            peer_event,
            PeerStateEvent::PeerDisconnected(_) | PeerStateEvent::GotBlock(_)
        );
        assert!(
            is_skippable,
            "Received unexpected peer event: {peer_event:?}"
        );
    }
}

#[instrument(level = "trace")]
async fn test_peer_states<T: Test>(test: T) {
concurrency::testonly::abort_on_panic();
Expand Down Expand Up @@ -291,7 +313,7 @@ impl Test for DownloadingBlocksInGaps {
peer_states_handle,
storage,
mut message_receiver,
..
mut events_receiver,
} = handles;

let peer_key = rng.gen::<node::SecretKey>().public();
Expand All @@ -304,6 +326,8 @@ impl Test for DownloadingBlocksInGaps {
peer_key.clone(),
test_validators.sync_state(last_peer_block_number),
);
wait_for_peer_update(ctx, &mut events_receiver, &peer_key).await?;
clock.advance(BLOCK_SLEEP_INTERVAL);

let expected_block_numbers =
(1..Self::BLOCK_COUNT).filter(|number| !self.local_block_numbers.contains(number));
Expand All @@ -316,6 +340,9 @@ impl Test for DownloadingBlocksInGaps {
peer_key.clone(),
test_validators.sync_state(last_peer_block_number),
);
// Wait until the update is processed.
wait_for_peer_update(ctx, &mut events_receiver, &peer_key).await?;

clock.advance(BLOCK_SLEEP_INTERVAL);
}

Expand Down Expand Up @@ -366,6 +393,7 @@ impl Test for LimitingGetBlockConcurrency {
peer_states_handle,
storage,
mut message_receiver,
mut events_receiver,
..
} = handles;
let mut storage_subscriber = storage.subscribe_to_block_writes();
Expand All @@ -375,6 +403,7 @@ impl Test for LimitingGetBlockConcurrency {
peer_key.clone(),
test_validators.sync_state(Self::BLOCK_COUNT - 1),
);
wait_for_peer_update(ctx, &mut events_receiver, &peer_key).await?;

// The actor should request 3 new blocks it's now aware of from the only peer it's currently
// aware of. Note that blocks may be queried in any order.
Expand All @@ -388,7 +417,7 @@ impl Test for LimitingGetBlockConcurrency {
assert_eq!(recipient, peer_key);
assert!(message_responses.insert(number.0, response).is_none());
}
assert!(message_receiver.try_recv().is_none());
assert_matches!(message_receiver.try_recv(), None);
assert_eq!(
message_responses.keys().copied().collect::<HashSet<_>>(),
HashSet::from([1, 2, 3])
Expand Down Expand Up @@ -439,66 +468,86 @@ impl Test for RequestingBlocksFromTwoPeers {
peer_states_handle,
storage,
mut message_receiver,
..
mut events_receiver,
} = handles;

let first_peer = rng.gen::<node::SecretKey>().public();
peer_states_handle.update(first_peer.clone(), test_validators.sync_state(2));
wait_for_peer_update(ctx, &mut events_receiver, &first_peer).await?;

let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
recipient,
number,
number: first_peer_block_number,
response: first_peer_response,
}) = message_receiver.recv(ctx).await?;
assert_eq!(recipient, first_peer);
assert_eq!(number, BlockNumber(1));
assert!(
first_peer_block_number == BlockNumber(1) || first_peer_block_number == BlockNumber(2)
);

let second_peer = rng.gen::<node::SecretKey>().public();
peer_states_handle.update(second_peer.clone(), test_validators.sync_state(4));
wait_for_peer_update(ctx, &mut events_receiver, &second_peer).await?;
clock.advance(BLOCK_SLEEP_INTERVAL);

let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
recipient,
number,
number: second_peer_block_number,
response: second_peer_response,
}) = message_receiver.recv(ctx).await?;
assert_eq!(recipient, second_peer);
assert_eq!(number, BlockNumber(2));
assert!(
second_peer_block_number == BlockNumber(1)
|| second_peer_block_number == BlockNumber(2)
);

test_validators.send_block(BlockNumber(1), first_peer_response);
test_validators.send_block(first_peer_block_number, first_peer_response);
let peer_event = events_receiver.recv(ctx).await?;
assert_matches!(peer_event, PeerStateEvent::GotBlock(num) if num == first_peer_block_number);
// The node shouldn't send more requests to the first peer since it would be beyond
// its known latest block number (2).
clock.advance(BLOCK_SLEEP_INTERVAL);
assert!(message_receiver.try_recv().is_none());
assert_matches!(message_receiver.try_recv(), None);

peer_states_handle.update(first_peer.clone(), test_validators.sync_state(4));
wait_for_peer_update(ctx, &mut events_receiver, &first_peer).await?;
clock.advance(BLOCK_SLEEP_INTERVAL);
// Now the actor can get block #3 from the peer.

let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
recipient,
number,
number: first_peer_block_number,
response: first_peer_response,
}) = message_receiver.recv(ctx).await?;
assert_eq!(recipient, first_peer);
assert_eq!(number, BlockNumber(3));
assert!(
first_peer_block_number == BlockNumber(3) || first_peer_block_number == BlockNumber(4)
);

test_validators.send_block(BlockNumber(3), first_peer_response);
test_validators.send_block(first_peer_block_number, first_peer_response);
let peer_event = events_receiver.recv(ctx).await?;
assert_matches!(peer_event, PeerStateEvent::GotBlock(num) if num == first_peer_block_number);
clock.advance(BLOCK_SLEEP_INTERVAL);

let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
recipient,
number,
number: first_peer_block_number,
response: first_peer_response,
}) = message_receiver.recv(ctx).await?;
assert_eq!(recipient, first_peer);
assert_eq!(number, BlockNumber(4));
assert!(
first_peer_block_number == BlockNumber(3) || first_peer_block_number == BlockNumber(4)
);

test_validators.send_block(BlockNumber(2), second_peer_response);
test_validators.send_block(BlockNumber(4), first_peer_response);
test_validators.send_block(second_peer_block_number, second_peer_response);
let peer_event = events_receiver.recv(ctx).await?;
assert_matches!(peer_event, PeerStateEvent::GotBlock(num) if num == second_peer_block_number);
test_validators.send_block(first_peer_block_number, first_peer_response);
let peer_event = events_receiver.recv(ctx).await?;
assert_matches!(peer_event, PeerStateEvent::GotBlock(num) if num == first_peer_block_number);
// No more blocks should be requested from peers.
clock.advance(BLOCK_SLEEP_INTERVAL);
assert!(message_receiver.try_recv().is_none());
assert_matches!(message_receiver.try_recv(), None);

wait_for_stored_block(ctx, storage.as_ref(), BlockNumber(4)).await?;
Ok(())
Expand Down Expand Up @@ -659,6 +708,7 @@ impl Test for RequestingBlocksFromMultiplePeers {
clock.advance(BLOCK_SLEEP_INTERVAL);
}
PeerStateEvent::PeerDisconnected(_) => { /* Do nothing */ }
_ => panic!("Unexpected peer event: {peer_event:?}"),
}
}

Expand Down Expand Up @@ -714,7 +764,7 @@ impl Test for DisconnectingPeer {

// Check that no new requests are sent (there are no peers to send them to).
clock.advance(BLOCK_SLEEP_INTERVAL);
assert!(message_receiver.try_recv().is_none());
assert_matches!(message_receiver.try_recv(), None);

// Re-connect the peer with an updated state.
peer_states_handle.update(peer_key.clone(), test_validators.sync_state(2));
Expand All @@ -740,16 +790,16 @@ impl Test for DisconnectingPeer {
// Send one of the responses and drop the other request.
let response = responses.remove(&2).unwrap();
test_validators.send_block(BlockNumber(2), response);
drop(responses);

let peer_event = events_receiver.recv(ctx).await?;
assert_matches!(peer_event, PeerStateEvent::GotBlock(BlockNumber(2)));
drop(responses);
let peer_event = events_receiver.recv(ctx).await?;
assert_matches!(peer_event, PeerStateEvent::PeerDisconnected(key) if key == peer_key);

// Check that no new requests are sent (there are no peers to send them to).
clock.advance(BLOCK_SLEEP_INTERVAL);
assert!(message_receiver.try_recv().is_none());
assert_matches!(message_receiver.try_recv(), None);

// Re-connect the peer with the same state.
peer_states_handle.update(peer_key.clone(), test_validators.sync_state(2));
Expand All @@ -772,7 +822,7 @@ impl Test for DisconnectingPeer {

// Check that no new requests are sent (all blocks are downloaded).
clock.advance(BLOCK_SLEEP_INTERVAL);
assert!(message_receiver.try_recv().is_none());
assert_matches!(message_receiver.try_recv(), None);

wait_for_stored_block(ctx, storage.as_ref(), BlockNumber(2)).await?;
Ok(())
Expand Down Expand Up @@ -888,3 +938,107 @@ async fn processing_invalid_blocks() {
.unwrap_err();
assert_matches!(err, BlockValidationError::Justification(_));
}

/// Test scenario: a peer announces a `SyncState` that has been tampered with.
#[derive(Debug)]
struct PeerWithFakeSyncState;

#[async_trait]
impl Test for PeerWithFakeSyncState {
    const BLOCK_COUNT: usize = 10;

    /// Sends a forged sync state from a fresh peer and checks that the actor reports
    /// `InvalidPeerUpdate` for that peer and emits no further events afterwards.
    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
        let TestHandles {
            clock,
            mut rng,
            test_validators,
            peer_states_handle: peers,
            mut events_receiver,
            ..
        } = handles;

        let secret_key: node::SecretKey = rng.gen();
        let peer_key = secret_key.public();

        // Start from a genuine sync state for block #1 and overwrite the proposal
        // number of its last contiguous stored block with an unrelated value.
        let forged_state = {
            let mut state = test_validators.sync_state(1);
            state.last_contiguous_stored_block.message.proposal.number = BlockNumber(42);
            state
        };
        peers.update(peer_key.clone(), forged_state);

        // The forged state must be reported as an invalid update from this peer.
        let peer_event = events_receiver.recv(ctx).await?;
        assert_matches!(peer_event, PeerStateEvent::InvalidPeerUpdate(key) if key == peer_key);

        // Even after time advances, no other events may follow.
        clock.advance(BLOCK_SLEEP_INTERVAL);
        assert_matches!(events_receiver.try_recv(), None);
        Ok(())
    }
}

/// Runs the `PeerWithFakeSyncState` scenario through the `test_peer_states` harness.
#[tokio::test]
async fn receiving_fake_sync_state_from_peer() {
    let scenario = PeerWithFakeSyncState;
    test_peer_states(scenario).await;
}

/// Test scenario: a peer answers a block request with a block that does not
/// correspond to the requested number.
#[derive(Debug)]
struct PeerWithFakeBlock;

#[async_trait]
impl Test for PeerWithFakeBlock {
    const BLOCK_COUNT: usize = 10;

    /// Connects a peer, answers its first `GetBlock` request with a forged block and
    /// checks that the actor reports `GotInvalidBlock`, does not store the block, and
    /// requests the same block from the same peer again.
    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
        let TestHandles {
            clock,
            mut rng,
            test_validators,
            peer_states_handle: peers,
            storage,
            message_receiver: mut requests,
            mut events_receiver,
        } = handles;

        let secret_key: node::SecretKey = rng.gen();
        let peer_key = secret_key.public();
        peers.update(peer_key.clone(), test_validators.sync_state(1));
        let peer_event = events_receiver.recv(ctx).await?;
        assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key);

        // The actor should ask the only known peer for block #1.
        let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
            recipient: requested_from,
            number: requested_number,
            response,
        }) = requests.recv(ctx).await?;
        assert_eq!(requested_from, peer_key);
        assert_eq!(requested_number, BlockNumber(1));

        // Forge the reply: take a different final block and relabel its header as #1.
        let fake_block = {
            let mut block = test_validators.final_blocks[2].clone();
            block.header.number = BlockNumber(1);
            block
        };
        response.send(Ok(fake_block)).unwrap();

        let peer_event = events_receiver.recv(ctx).await?;
        assert_matches!(
            peer_event,
            PeerStateEvent::GotInvalidBlock {
                block_number: BlockNumber(1),
                peer_key: key,
            } if key == peer_key
        );
        clock.advance(BLOCK_SLEEP_INTERVAL);

        // The invalid block must not be saved.
        assert_matches!(events_receiver.try_recv(), None);
        assert!(storage.block(ctx, BlockNumber(1)).await?.is_none());

        // Since we don't ban misbehaving peers, the node will send a request to the same peer again.
        let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
            recipient: retried_with,
            number: retried_number,
            ..
        }) = requests.recv(ctx).await?;
        assert_eq!(retried_with, peer_key);
        assert_eq!(retried_number, BlockNumber(1));

        Ok(())
    }
}

/// Runs the `PeerWithFakeBlock` scenario through the `test_peer_states` harness.
#[tokio::test]
async fn receiving_fake_block_from_peer() {
    let scenario = PeerWithFakeBlock;
    test_peer_states(scenario).await;
}
Loading