diff --git a/node/actors/sync_blocks/src/peers/events.rs b/node/actors/sync_blocks/src/peers/events.rs index 84e8eb09..97980d2a 100644 --- a/node/actors/sync_blocks/src/peers/events.rs +++ b/node/actors/sync_blocks/src/peers/events.rs @@ -4,12 +4,19 @@ use roles::{node, validator::BlockNumber}; /// Events emitted by `PeerStates` actor. Only used for tests so far. #[derive(Debug)] -#[allow(unused_tuple_struct_fields)] // Variant fields are only read in tests +#[allow(dead_code, unused_tuple_struct_fields)] // Variant fields are only read in tests pub(super) enum PeerStateEvent { /// Node has successfully downloaded the specified block. GotBlock(BlockNumber), + /// Received an invalid block from the peer. + GotInvalidBlock { + peer_key: node::PublicKey, + block_number: BlockNumber, + }, /// Peer state was updated. Includes creating a state for a newly connected peer. PeerUpdated(node::PublicKey), + /// Received invalid `SyncState` from a peer. + InvalidPeerUpdate(node::PublicKey), /// Peer was disconnected (i.e., it has dropped a request). PeerDisconnected(node::PublicKey), } diff --git a/node/actors/sync_blocks/src/peers/mod.rs b/node/actors/sync_blocks/src/peers/mod.rs index f81cf469..4485a386 100644 --- a/node/actors/sync_blocks/src/peers/mod.rs +++ b/node/actors/sync_blocks/src/peers/mod.rs @@ -156,6 +156,9 @@ impl PeerStates { Ok(block_number) => block_number, Err(err) => { tracing::warn!(%err, "Invalid `SyncState` received from peer"); + if let Some(events_sender) = &self.events_sender { + events_sender.send(PeerStateEvent::InvalidPeerUpdate(peer_key)); + } return Ok(BlockNumber(0)); // TODO: ban peer etc. } @@ -254,8 +257,15 @@ impl PeerStates { "Received invalid block #{block_number} from peer {peer_key:?}" ); // TODO: ban peer etc. + if let Some(events_sender) = &self.events_sender { + events_sender.send(PeerStateEvent::GotInvalidBlock { + peer_key, + block_number, + }); + } + } else { + return Ok(block); } - return Ok(block); } } diff --git a/node/actors/sync_blocks/src/peers/tests.rs b/node/actors/sync_blocks/src/peers/tests.rs index 390b01e9..9dcf01ad 100644 --- a/node/actors/sync_blocks/src/peers/tests.rs +++ b/node/actors/sync_blocks/src/peers/tests.rs @@ -6,7 +6,7 @@ use concurrency::time; use rand::{rngs::StdRng, seq::IteratorRandom, Rng}; use roles::validator; use std::{collections::HashSet, fmt}; -use storage::InMemoryStorage; +use storage::{BlockStore, InMemoryStorage}; use test_casing::{test_casing, Product}; const TEST_TIMEOUT: time::Duration = time::Duration::seconds(5); @@ -60,6 +60,28 @@ async fn wait_for_stored_block( Ok(()) } +#[instrument(level = "trace", skip(ctx, events_receiver))] +async fn wait_for_peer_update( + ctx: &ctx::Ctx, + events_receiver: &mut channel::UnboundedReceiver<PeerStateEvent>, + expected_peer: &node::PublicKey, +) -> ctx::OrCanceled<()> { + loop { + let peer_event = events_receiver.recv(ctx).await?; + tracing::trace!(?peer_event, "received peer event"); + match peer_event { + PeerStateEvent::PeerUpdated(key) => { + assert_eq!(key, *expected_peer); + return Ok(()); + } + PeerStateEvent::PeerDisconnected(_) | PeerStateEvent::GotBlock(_) => { + // Skip update + } + _ => panic!("Received unexpected peer event: {peer_event:?}"), + } + } +} + #[instrument(level = "trace")] async fn test_peer_states<T: Test>(test: T) { concurrency::testonly::abort_on_panic(); @@ -291,7 +313,7 @@ impl Test for DownloadingBlocksInGaps { peer_states_handle, storage, mut message_receiver, - .. + mut events_receiver, } = handles; let peer_key = rng.gen::<node::SecretKey>().public(); @@ -304,6 +326,8 @@ impl Test for DownloadingBlocksInGaps { peer_key.clone(), test_validators.sync_state(last_peer_block_number), ); + wait_for_peer_update(ctx, &mut events_receiver, &peer_key).await?; + clock.advance(BLOCK_SLEEP_INTERVAL); let expected_block_numbers = (1..Self::BLOCK_COUNT).filter(|number| !self.local_block_numbers.contains(number)); @@ -316,6 +340,9 @@ impl Test for DownloadingBlocksInGaps { peer_key.clone(), test_validators.sync_state(last_peer_block_number), ); + // Wait until the update is processed. + wait_for_peer_update(ctx, &mut events_receiver, &peer_key).await?; + clock.advance(BLOCK_SLEEP_INTERVAL); } @@ -366,6 +393,7 @@ impl Test for LimitingGetBlockConcurrency { peer_states_handle, storage, mut message_receiver, + mut events_receiver, .. } = handles; let mut storage_subscriber = storage.subscribe_to_block_writes(); @@ -375,6 +403,7 @@ impl Test for LimitingGetBlockConcurrency { peer_key.clone(), test_validators.sync_state(Self::BLOCK_COUNT - 1), ); + wait_for_peer_update(ctx, &mut events_receiver, &peer_key).await?; // The actor should request 3 new blocks it's now aware of from the only peer it's currently // aware of. Note that blocks may be queried in any order. @@ -388,7 +417,7 @@ impl Test for LimitingGetBlockConcurrency { assert_eq!(recipient, peer_key); assert!(message_responses.insert(number.0, response).is_none()); } - assert!(message_receiver.try_recv().is_none()); + assert_matches!(message_receiver.try_recv(), None); assert_eq!( message_responses.keys().copied().collect::<HashSet<_>>(), HashSet::from([1, 2, 3]) @@ -439,66 +468,86 @@ impl Test for RequestingBlocksFromTwoPeers { peer_states_handle, storage, mut message_receiver, - .. + mut events_receiver, } = handles; let first_peer = rng.gen::<node::SecretKey>().public(); peer_states_handle.update(first_peer.clone(), test_validators.sync_state(2)); + wait_for_peer_update(ctx, &mut events_receiver, &first_peer).await?; let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { recipient, - number, + number: first_peer_block_number, response: first_peer_response, }) = message_receiver.recv(ctx).await?; assert_eq!(recipient, first_peer); - assert_eq!(number, BlockNumber(1)); + assert!( + first_peer_block_number == BlockNumber(1) || first_peer_block_number == BlockNumber(2) + ); let second_peer = rng.gen::<node::SecretKey>().public(); peer_states_handle.update(second_peer.clone(), test_validators.sync_state(4)); + wait_for_peer_update(ctx, &mut events_receiver, &second_peer).await?; clock.advance(BLOCK_SLEEP_INTERVAL); let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { recipient, - number, + number: second_peer_block_number, response: second_peer_response, }) = message_receiver.recv(ctx).await?; assert_eq!(recipient, second_peer); - assert_eq!(number, BlockNumber(2)); + assert!( + second_peer_block_number == BlockNumber(1) + || second_peer_block_number == BlockNumber(2) + ); - test_validators.send_block(BlockNumber(1), first_peer_response); + test_validators.send_block(first_peer_block_number, first_peer_response); + let peer_event = events_receiver.recv(ctx).await?; + assert_matches!(peer_event, PeerStateEvent::GotBlock(num) if num == first_peer_block_number); // The node shouldn't send more requests to the first peer since it would be beyond // its known latest block number (2). clock.advance(BLOCK_SLEEP_INTERVAL); - assert!(message_receiver.try_recv().is_none()); + assert_matches!(message_receiver.try_recv(), None); peer_states_handle.update(first_peer.clone(), test_validators.sync_state(4)); + wait_for_peer_update(ctx, &mut events_receiver, &first_peer).await?; clock.advance(BLOCK_SLEEP_INTERVAL); // Now the actor can get block #3 from the peer. let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { recipient, - number, + number: first_peer_block_number, response: first_peer_response, }) = message_receiver.recv(ctx).await?; assert_eq!(recipient, first_peer); - assert_eq!(number, BlockNumber(3)); + assert!( + first_peer_block_number == BlockNumber(3) || first_peer_block_number == BlockNumber(4) + ); - test_validators.send_block(BlockNumber(3), first_peer_response); + test_validators.send_block(first_peer_block_number, first_peer_response); + let peer_event = events_receiver.recv(ctx).await?; + assert_matches!(peer_event, PeerStateEvent::GotBlock(num) if num == first_peer_block_number); clock.advance(BLOCK_SLEEP_INTERVAL); let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { recipient, - number, + number: first_peer_block_number, response: first_peer_response, }) = message_receiver.recv(ctx).await?; assert_eq!(recipient, first_peer); - assert_eq!(number, BlockNumber(4)); + assert!( + first_peer_block_number == BlockNumber(3) || first_peer_block_number == BlockNumber(4) + ); - test_validators.send_block(BlockNumber(2), second_peer_response); - test_validators.send_block(BlockNumber(4), first_peer_response); + test_validators.send_block(second_peer_block_number, second_peer_response); + let peer_event = events_receiver.recv(ctx).await?; + assert_matches!(peer_event, PeerStateEvent::GotBlock(num) if num == second_peer_block_number); + test_validators.send_block(first_peer_block_number, first_peer_response); + let peer_event = events_receiver.recv(ctx).await?; + assert_matches!(peer_event, PeerStateEvent::GotBlock(num) if num == first_peer_block_number); // No more blocks should be requested from peers. clock.advance(BLOCK_SLEEP_INTERVAL); - assert!(message_receiver.try_recv().is_none()); + assert_matches!(message_receiver.try_recv(), None); wait_for_stored_block(ctx, storage.as_ref(), BlockNumber(4)).await?; Ok(()) @@ -659,6 +708,7 @@ impl Test for RequestingBlocksFromMultiplePeers { clock.advance(BLOCK_SLEEP_INTERVAL); } PeerStateEvent::PeerDisconnected(_) => { /* Do nothing */ } + _ => panic!("Unexpected peer event: {peer_event:?}"), } } @@ -714,7 +764,7 @@ impl Test for DisconnectingPeer { // Check that no new requests are sent (there are no peers to send them to). clock.advance(BLOCK_SLEEP_INTERVAL); - assert!(message_receiver.try_recv().is_none()); + assert_matches!(message_receiver.try_recv(), None); // Re-connect the peer with an updated state. peer_states_handle.update(peer_key.clone(), test_validators.sync_state(2)); @@ -740,16 +790,16 @@ impl Test for DisconnectingPeer { // Send one of the responses and drop the other request. let response = responses.remove(&2).unwrap(); test_validators.send_block(BlockNumber(2), response); - drop(responses); let peer_event = events_receiver.recv(ctx).await?; assert_matches!(peer_event, PeerStateEvent::GotBlock(BlockNumber(2))); + drop(responses); let peer_event = events_receiver.recv(ctx).await?; assert_matches!(peer_event, PeerStateEvent::PeerDisconnected(key) if key == peer_key); // Check that no new requests are sent (there are no peers to send them to). clock.advance(BLOCK_SLEEP_INTERVAL); - assert!(message_receiver.try_recv().is_none()); + assert_matches!(message_receiver.try_recv(), None); // Re-connect the peer with the same state. peer_states_handle.update(peer_key.clone(), test_validators.sync_state(2)); @@ -772,7 +822,7 @@ impl Test for DisconnectingPeer { // Check that no new requests are sent (all blocks are downloaded). clock.advance(BLOCK_SLEEP_INTERVAL); - assert!(message_receiver.try_recv().is_none()); + assert_matches!(message_receiver.try_recv(), None); wait_for_stored_block(ctx, storage.as_ref(), BlockNumber(2)).await?; Ok(()) @@ -888,3 +938,107 @@ async fn processing_invalid_blocks() { .unwrap_err(); assert_matches!(err, BlockValidationError::Justification(_)); } + +#[derive(Debug)] +struct PeerWithFakeSyncState; + +#[async_trait] +impl Test for PeerWithFakeSyncState { + const BLOCK_COUNT: usize = 10; + + async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> { + let TestHandles { + clock, + mut rng, + test_validators, + peer_states_handle, + mut events_receiver, + .. + } = handles; + + let peer_key = rng.gen::<node::SecretKey>().public(); + let mut fake_sync_state = test_validators.sync_state(1); + fake_sync_state + .last_contiguous_stored_block + .message + .proposal + .number = BlockNumber(42); + peer_states_handle.update(peer_key.clone(), fake_sync_state); + let peer_event = events_receiver.recv(ctx).await?; + assert_matches!(peer_event, PeerStateEvent::InvalidPeerUpdate(key) if key == peer_key); + + clock.advance(BLOCK_SLEEP_INTERVAL); + assert_matches!(events_receiver.try_recv(), None); + Ok(()) + } +} + +#[tokio::test] +async fn receiving_fake_sync_state_from_peer() { + test_peer_states(PeerWithFakeSyncState).await; +} + +#[derive(Debug)] +struct PeerWithFakeBlock; + +#[async_trait] +impl Test for PeerWithFakeBlock { + const BLOCK_COUNT: usize = 10; + + async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> { + let TestHandles { + clock, + mut rng, + test_validators, + peer_states_handle, + storage, + mut message_receiver, + mut events_receiver, + } = handles; + + let peer_key = rng.gen::<node::SecretKey>().public(); + peer_states_handle.update(peer_key.clone(), test_validators.sync_state(1)); + let peer_event = events_receiver.recv(ctx).await?; + assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key); + + let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { + recipient, + number, + response, + }) = message_receiver.recv(ctx).await?; + assert_eq!(recipient, peer_key); + assert_eq!(number, BlockNumber(1)); + + let mut fake_block = test_validators.final_blocks[2].clone(); + fake_block.header.number = BlockNumber(1); + response.send(Ok(fake_block)).unwrap(); + + let peer_event = events_receiver.recv(ctx).await?; + assert_matches!( + peer_event, + PeerStateEvent::GotInvalidBlock { + block_number: BlockNumber(1), + peer_key: key, + } if key == peer_key + ); + clock.advance(BLOCK_SLEEP_INTERVAL); + + // The invalid block must not be saved. + assert_matches!(events_receiver.try_recv(), None); + assert!(storage.block(ctx, BlockNumber(1)).await?.is_none()); + + // Since we don't ban misbehaving peers, the node will send a request to the same peer again. + let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { + recipient, number, .. + }) = message_receiver.recv(ctx).await?; + assert_eq!(recipient, peer_key); + assert_eq!(number, BlockNumber(1)); + + Ok(()) + } +} + +#[tokio::test] +async fn receiving_fake_block_from_peer() { + test_peer_states(PeerWithFakeBlock).await; +}