From 25f68b355c4ad319ce9fa2416f248a8f12429609 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 7 Aug 2019 12:40:49 +0200 Subject: [PATCH] Rename on_demand to light_dispatch and various minor changes (#3315) * Rename on_demand to light_server * Small docs improvement * Rename on_block_announce to update_best_number * More minor documentation * Light server -> light dispatch * is_light_rq_response -> is_light_response --- core/network/src/on_demand_layer.rs | 2 +- core/network/src/protocol.rs | 42 +-- .../{on_demand.rs => light_dispatch.rs} | 324 ++++++++++-------- core/network/src/service.rs | 16 +- 4 files changed, 206 insertions(+), 178 deletions(-) rename core/network/src/protocol/{on_demand.rs => light_dispatch.rs} (70%) diff --git a/core/network/src/on_demand_layer.rs b/core/network/src/on_demand_layer.rs index 1cbb5387d6416..818230eea54b8 100644 --- a/core/network/src/on_demand_layer.rs +++ b/core/network/src/on_demand_layer.rs @@ -16,7 +16,7 @@ //! On-demand requests service. -use crate::protocol::on_demand::RequestData; +use crate::protocol::light_dispatch::RequestData; use std::sync::Arc; use futures::{prelude::*, sync::mpsc, sync::oneshot}; use futures03::compat::{Compat01As03, Future01CompatExt as _}; diff --git a/core/network/src/protocol.rs b/core/network/src/protocol.rs index e23ec1e099745..9d4a537ed208d 100644 --- a/core/network/src/protocol.rs +++ b/core/network/src/protocol.rs @@ -34,7 +34,7 @@ use message::{BlockAttributes, Direction, FromBlock, Message, RequestId}; use message::generic::{Message as GenericMessage, ConsensusMessage}; use event::Event; use consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; -use on_demand::{OnDemandCore, OnDemandNetwork, RequestData}; +use light_dispatch::{LightDispatch, LightDispatchNetwork, RequestData}; use specialization::NetworkSpecialization; use sync::{ChainSync, SyncState}; use crate::service::{TransactionPool, ExHashT}; @@ -53,7 +53,7 @@ mod util; pub mod consensus_gossip; pub mod message; pub mod event; -pub mod on_demand; +pub mod light_dispatch; pub mod specialization; pub mod sync; @@ -96,8 +96,8 @@ pub struct Protocol, H: ExHashT> { /// Interval at which we call `propagate_extrinsics`. propagate_timeout: Box + Send>, config: ProtocolConfig, - /// Handler for on-demand requests. - on_demand_core: OnDemandCore, + /// Handler for light client requests. + light_dispatch: LightDispatch, genesis_hash: B::Hash, sync: ChainSync, specialization: S, @@ -149,12 +149,12 @@ pub struct PeerInfo { pub best_number: ::Number, } -struct OnDemandIn<'a, B: BlockT> { +struct LightDispatchIn<'a, B: BlockT> { behaviour: &'a mut CustomProto>, peerset: peerset::PeersetHandle, } -impl<'a, B: BlockT> OnDemandNetwork for OnDemandIn<'a, B> { +impl<'a, B: BlockT> LightDispatchNetwork for LightDispatchIn<'a, B> { fn report_peer(&mut self, who: &PeerId, reputation: i32) { self.peerset.report_peer(who.clone(), reputation) } @@ -373,7 +373,7 @@ impl, H: ExHashT> Protocol { peers: HashMap::new(), chain, }, - on_demand_core: OnDemandCore::new(checker), + light_dispatch: LightDispatch::new(checker), genesis_hash: info.chain.genesis_hash, sync, specialization: specialization, @@ -445,15 +445,15 @@ impl, H: ExHashT> Protocol { /// Starts a new data demand request. /// /// The parameter contains a `Sender` where the result, once received, must be sent. - pub(crate) fn add_on_demand_request(&mut self, rq: RequestData) { - self.on_demand_core.add_request(OnDemandIn { + pub(crate) fn add_light_client_request(&mut self, rq: RequestData) { + self.light_dispatch.add_request(LightDispatchIn { behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }, rq); } - fn is_on_demand_response(&self, who: &PeerId, response_id: message::RequestId) -> bool { - self.on_demand_core.is_on_demand_response(&who, response_id) + fn is_light_response(&self, who: &PeerId, response_id: message::RequestId) -> bool { + self.light_dispatch.is_light_response(&who, response_id) } fn handle_response( @@ -506,7 +506,7 @@ impl, H: ExHashT> Protocol { GenericMessage::BlockRequest(r) => self.on_block_request(who, r), GenericMessage::BlockResponse(r) => { // Note, this is safe because only `ordinary bodies` and `remote bodies` are received in this matter. - if self.is_on_demand_response(&who, r.id) { + if self.is_light_response(&who, r.id) { self.on_remote_body_response(who, r); } else { if let Some(request) = self.handle_response(who.clone(), &r) { @@ -629,7 +629,7 @@ impl, H: ExHashT> Protocol { } self.sync.peer_disconnected(peer.clone()); self.specialization.on_disconnect(&mut context, peer.clone()); - self.on_demand_core.on_disconnect(OnDemandIn { + self.light_dispatch.on_disconnect(LightDispatchIn { behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }, peer); @@ -793,7 +793,7 @@ impl, H: ExHashT> Protocol { &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle) ); self.maintain_peers(); - self.on_demand_core.maintain_peers(OnDemandIn { + self.light_dispatch.maintain_peers(LightDispatchIn { behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }); @@ -914,7 +914,7 @@ impl, H: ExHashT> Protocol { }; let info = self.context_data.peers.get(&who).expect("We just inserted above; QED").info.clone(); - self.on_demand_core.on_connect(OnDemandIn { + self.light_dispatch.on_connect(LightDispatchIn { behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }, who.clone(), status.roles, status.best_number); @@ -1053,7 +1053,7 @@ impl, H: ExHashT> Protocol { peer.known_blocks.insert(hash.clone()); } } - self.on_demand_core.on_block_announce(OnDemandIn { + self.light_dispatch.update_best_number(LightDispatchIn { behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }, who.clone(), *header.number()); @@ -1253,7 +1253,7 @@ impl, H: ExHashT> Protocol { response: message::RemoteCallResponse ) { trace!(target: "sync", "Remote call response {} from {}", response.id, who); - self.on_demand_core.on_remote_call_response(OnDemandIn { + self.light_dispatch.on_remote_call_response(LightDispatchIn { behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }, who, response); @@ -1294,7 +1294,7 @@ impl, H: ExHashT> Protocol { response: message::RemoteReadResponse ) { trace!(target: "sync", "Remote read response {} from {}", response.id, who); - self.on_demand_core.on_remote_read_response(OnDemandIn { + self.light_dispatch.on_remote_read_response(LightDispatchIn { behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }, who, response); @@ -1335,7 +1335,7 @@ impl, H: ExHashT> Protocol { response: message::RemoteHeaderResponse, ) { trace!(target: "sync", "Remote header proof response {} from {}", response.id, who); - self.on_demand_core.on_remote_header_response(OnDemandIn { + self.light_dispatch.on_remote_header_response(LightDispatchIn { behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }, who, response); @@ -1401,7 +1401,7 @@ impl, H: ExHashT> Protocol { who, response.max ); - self.on_demand_core.on_remote_changes_response(OnDemandIn { + self.light_dispatch.on_remote_changes_response(LightDispatchIn { behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }, who, response); @@ -1462,7 +1462,7 @@ impl, H: ExHashT> Protocol { peer: PeerId, response: message::BlockResponse ) { - self.on_demand_core.on_remote_body_response(OnDemandIn { + self.light_dispatch.on_remote_body_response(LightDispatchIn { behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }, peer, response); diff --git a/core/network/src/protocol/on_demand.rs b/core/network/src/protocol/light_dispatch.rs similarity index 70% rename from core/network/src/protocol/on_demand.rs rename to core/network/src/protocol/light_dispatch.rs index 9d35535e21deb..a7b327686af60 100644 --- a/core/network/src/protocol/on_demand.rs +++ b/core/network/src/protocol/light_dispatch.rs @@ -14,7 +14,10 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -//! On-demand requests service. +//! Light client requests service. +//! +//! Handles requests for data coming from our local light client and that must be answered by +//! nodes on the network. use std::collections::{HashMap, VecDeque}; use std::sync::Arc; @@ -38,8 +41,8 @@ const RETRY_COUNT: usize = 1; /// Reputation change for a peer when a request timed out. const TIMEOUT_REPUTATION_CHANGE: i32 = -(1 << 8); -/// Trait used by the `OnDemandCore` service to communicate messages back to the network. -pub trait OnDemandNetwork { +/// Trait used by the `LightDispatch` service to communicate messages back to the network. +pub trait LightDispatchNetwork { /// Adjusts the reputation of the given peer. fn report_peer(&mut self, who: &PeerId, reputation_change: i32); @@ -97,19 +100,30 @@ pub trait OnDemandNetwork { ); } -/// On-demand requests service. Dispatches requests to appropriate peers. -pub struct OnDemandCore { +/// Light client requests service. Dispatches requests to appropriate peers. +pub struct LightDispatch { + /// Verifies that responses are correct. Passed at initialization. checker: Arc>, + /// Numeric ID to assign to the next outgoing request. Used to assign responses to their + /// corresponding request. next_request_id: u64, + /// Requests that we have yet to send out on the network. pending_requests: VecDeque>, + /// List of nodes to which we have sent a request and that are yet to answer. active_peers: LinkedHashMap>, + /// List of nodes that we know of that aren't doing anything and that are available for new + /// requests. idle_peers: VecDeque, + /// Best known block for each node in `active_peers` and `idle_peers`. best_blocks: HashMap>, } struct Request { id: u64, + /// When the request got created or sent out to the network. timestamp: Instant, + /// Number of remaining attempts to fulfill this request. If it reaches 0, we interrupt the + /// attempt. retry_count: usize, data: RequestData, } @@ -196,12 +210,12 @@ impl FetchChecker for AlwaysBadChecker { } } -impl OnDemandCore where +impl LightDispatch where B::Header: HeaderT, { - /// Creates new on-demand requests processer. + /// Creates new light client requests processer. pub fn new(checker: Arc>) -> Self { - OnDemandCore { + LightDispatch { checker, next_request_id: 0, pending_requests: VecDeque::new(), @@ -212,7 +226,7 @@ impl OnDemandCore where } /// Inserts a new request in the list of requests to execute. - pub(crate) fn add_request(&mut self, network: impl OnDemandNetwork, data: RequestData) { + pub(crate) fn add_request(&mut self, network: impl LightDispatchNetwork, data: RequestData) { self.insert(RETRY_COUNT, data); self.dispatch(network); } @@ -234,7 +248,7 @@ impl OnDemandCore where fn accept_response( &mut self, rtype: &str, - mut network: impl OnDemandNetwork, + mut network: impl LightDispatchNetwork, peer: PeerId, request_id: u64, try_accept: impl FnOnce(Request, &Arc>) -> Accept @@ -284,9 +298,10 @@ impl OnDemandCore where self.dispatch(network); } + /// Call this when we connect to a node on the network. pub fn on_connect( &mut self, - network: impl OnDemandNetwork, + network: impl LightDispatchNetwork, peer: PeerId, role: Roles, best_number: NumberFor @@ -301,17 +316,20 @@ impl OnDemandCore where self.dispatch(network); } - pub fn on_block_announce(&mut self, network: impl OnDemandNetwork, peer: PeerId, best_number: NumberFor) { + /// Sets the best seen block for the given node. + pub fn update_best_number(&mut self, network: impl LightDispatchNetwork, peer: PeerId, best_number: NumberFor) { self.best_blocks.insert(peer, best_number); self.dispatch(network); } - pub fn on_disconnect(&mut self, network: impl OnDemandNetwork, peer: PeerId) { + /// Call this when we disconnect from a node. + pub fn on_disconnect(&mut self, network: impl LightDispatchNetwork, peer: PeerId) { self.remove_peer(peer); self.dispatch(network); } - pub fn maintain_peers(&mut self, mut network: impl OnDemandNetwork) { + /// Must be called periodically in order to perform maintenance. + pub fn maintain_peers(&mut self, mut network: impl LightDispatchNetwork) { let now = Instant::now(); loop { @@ -329,9 +347,10 @@ impl OnDemandCore where self.dispatch(network); } + /// Handles a remote header response message from on the network. pub fn on_remote_header_response( &mut self, - network: impl OnDemandNetwork, + network: impl LightDispatchNetwork, peer: PeerId, response: message::RemoteHeaderResponse ) { @@ -352,9 +371,10 @@ impl OnDemandCore where }) } + /// Handles a remote read response message from on the network. pub fn on_remote_read_response( &mut self, - network: impl OnDemandNetwork, + network: impl LightDispatchNetwork, peer: PeerId, response: message::RemoteReadResponse ) { @@ -387,9 +407,10 @@ impl OnDemandCore where }) } + /// Handles a remote call response message from on the network. pub fn on_remote_call_response( &mut self, - network: impl OnDemandNetwork, + network: impl LightDispatchNetwork, peer: PeerId, response: message::RemoteCallResponse ) { @@ -406,9 +427,10 @@ impl OnDemandCore where }) } + /// Handles a remote changes response message from on the network. pub fn on_remote_changes_response( &mut self, - network: impl OnDemandNetwork, + network: impl LightDispatchNetwork, peer: PeerId, response: message::RemoteChangesResponse, B::Hash> ) { @@ -431,9 +453,10 @@ impl OnDemandCore where }) } + /// Handles a remote body response message from on the network. pub fn on_remote_body_response( &mut self, - network: impl OnDemandNetwork, + network: impl LightDispatchNetwork, peer: PeerId, response: message::BlockResponse ) { @@ -466,7 +489,7 @@ impl OnDemandCore where }) } - pub fn is_on_demand_response(&self, peer: &PeerId, request_id: message::RequestId) -> bool { + pub fn is_light_response(&self, peer: &PeerId, request_id: message::RequestId) -> bool { self.active_peers.get(&peer).map_or(false, |r| r.id == request_id) } @@ -483,7 +506,10 @@ impl OnDemandCore where } } - pub fn remove_peer(&mut self, peer: PeerId) { + /// Removes a peer from the list of known peers. + /// + /// Puts back the active request that this node was performing into `pending_requests`. + fn remove_peer(&mut self, peer: PeerId) { self.best_blocks.remove(&peer); if let Some(request) = self.active_peers.remove(&peer) { @@ -497,7 +523,7 @@ impl OnDemandCore where } /// Dispatches pending requests. - fn dispatch(&mut self, mut network: impl OnDemandNetwork) { + fn dispatch(&mut self, mut network: impl LightDispatchNetwork) { let mut last_peer = self.idle_peers.back().cloned(); let mut unhandled_requests = VecDeque::new(); @@ -551,6 +577,8 @@ impl OnDemandCore where } impl Request { + /// Returns the block that the remote needs to have in order to be able to fulfill + /// this request. fn required_block(&self) -> NumberFor { match self.data { RequestData::RemoteHeader(ref data, _) => data.block, @@ -562,7 +590,7 @@ impl Request { } } - fn send_to(&self, out: &mut impl OnDemandNetwork, peer: &PeerId) { + fn send_to(&self, out: &mut impl LightDispatchNetwork, peer: &PeerId) { match self.data { RequestData::RemoteHeader(ref data, _) => out.send_header_request( @@ -645,7 +673,7 @@ pub mod tests { use crate::config::Roles; use crate::message::{self, BlockAttributes, Direction, FromBlock, RequestId}; use libp2p::PeerId; - use super::{REQUEST_TIMEOUT, OnDemandCore, OnDemandNetwork, RequestData}; + use super::{REQUEST_TIMEOUT, LightDispatch, LightDispatchNetwork, RequestData}; use test_client::runtime::{changes_trie_config, Block, Extrinsic, Header}; struct DummyFetchChecker { ok: bool } @@ -711,21 +739,21 @@ pub mod tests { } } - fn dummy(ok: bool) -> OnDemandCore { - OnDemandCore::new(Arc::new(DummyFetchChecker { ok })) + fn dummy(ok: bool) -> LightDispatch { + LightDispatch::new(Arc::new(DummyFetchChecker { ok })) } - fn total_peers(on_demand: &OnDemandCore) -> usize { - on_demand.idle_peers.len() + on_demand.active_peers.len() + fn total_peers(light_dispatch: &LightDispatch) -> usize { + light_dispatch.idle_peers.len() + light_dispatch.active_peers.len() } fn receive_call_response( - network_interface: impl OnDemandNetwork, - on_demand: &mut OnDemandCore, + network_interface: impl LightDispatchNetwork, + light_dispatch: &mut LightDispatch, peer: PeerId, id: message::RequestId ) { - on_demand.on_remote_call_response(network_interface, peer, message::RemoteCallResponse { + light_dispatch.on_remote_call_response(network_interface, peer, message::RemoteCallResponse { id: id, proof: vec![vec![2]], }); @@ -746,7 +774,7 @@ pub mod tests { disconnected_peers: HashSet, } - impl<'a, B: BlockT> OnDemandNetwork for &'a mut DummyNetwork { + impl<'a, B: BlockT> LightDispatchNetwork for &'a mut DummyNetwork { fn report_peer(&mut self, _: &PeerId, _: i32) {} fn disconnect_peer(&mut self, who: &PeerId) { self.disconnected_peers.insert(who.clone()); @@ -769,16 +797,16 @@ pub mod tests { #[test] fn knows_about_peers_roles() { let mut network_interface = DummyNetwork::default(); - let mut on_demand = dummy(true); + let mut light_dispatch = dummy(true); let peer0 = PeerId::random(); let peer1 = PeerId::random(); let peer2 = PeerId::random(); - on_demand.on_connect(&mut network_interface, peer0, Roles::LIGHT, 1000); - on_demand.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 2000); - on_demand.on_connect(&mut network_interface, peer2.clone(), Roles::AUTHORITY, 3000); - assert_eq!(vec![peer1.clone(), peer2.clone()], on_demand.idle_peers.iter().cloned().collect::>()); - assert_eq!(on_demand.best_blocks.get(&peer1), Some(&2000)); - assert_eq!(on_demand.best_blocks.get(&peer2), Some(&3000)); + light_dispatch.on_connect(&mut network_interface, peer0, Roles::LIGHT, 1000); + light_dispatch.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 2000); + light_dispatch.on_connect(&mut network_interface, peer2.clone(), Roles::AUTHORITY, 3000); + assert_eq!(vec![peer1.clone(), peer2.clone()], light_dispatch.idle_peers.iter().cloned().collect::>()); + assert_eq!(light_dispatch.best_blocks.get(&peer1), Some(&2000)); + assert_eq!(light_dispatch.best_blocks.get(&peer2), Some(&3000)); } #[test] @@ -786,69 +814,69 @@ pub mod tests { let peer0 = PeerId::random(); let mut network_interface = DummyNetwork::default(); - let mut on_demand = dummy(true); - on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 100); - assert_eq!(1, total_peers(&on_demand)); - assert!(!on_demand.best_blocks.is_empty()); - - on_demand.on_disconnect(&mut network_interface, peer0); - assert_eq!(0, total_peers(&on_demand)); - assert!(on_demand.best_blocks.is_empty()); + let mut light_dispatch = dummy(true); + light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 100); + assert_eq!(1, total_peers(&light_dispatch)); + assert!(!light_dispatch.best_blocks.is_empty()); + + light_dispatch.on_disconnect(&mut network_interface, peer0); + assert_eq!(0, total_peers(&light_dispatch)); + assert!(light_dispatch.best_blocks.is_empty()); } #[test] fn disconnects_from_timeouted_peer() { - let mut on_demand = dummy(true); + let mut light_dispatch = dummy(true); let mut network_interface = DummyNetwork::default(); let peer0 = PeerId::random(); let peer1 = PeerId::random(); - on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - on_demand.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 1000); - assert_eq!(vec![peer0.clone(), peer1.clone()], on_demand.idle_peers.iter().cloned().collect::>()); - assert!(on_demand.active_peers.is_empty()); + light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); + light_dispatch.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 1000); + assert_eq!(vec![peer0.clone(), peer1.clone()], light_dispatch.idle_peers.iter().cloned().collect::>()); + assert!(light_dispatch.active_peers.is_empty()); - on_demand.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { + light_dispatch.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { block: Default::default(), header: dummy_header(), method: "test".into(), call_data: vec![], retry_count: None, }, oneshot::channel().0)); - assert_eq!(vec![peer1.clone()], on_demand.idle_peers.iter().cloned().collect::>()); - assert_eq!(vec![peer0.clone()], on_demand.active_peers.keys().cloned().collect::>()); + assert_eq!(vec![peer1.clone()], light_dispatch.idle_peers.iter().cloned().collect::>()); + assert_eq!(vec![peer0.clone()], light_dispatch.active_peers.keys().cloned().collect::>()); - on_demand.active_peers[&peer0].timestamp = Instant::now() - REQUEST_TIMEOUT - REQUEST_TIMEOUT; - on_demand.maintain_peers(&mut network_interface); - assert!(on_demand.idle_peers.is_empty()); - assert_eq!(vec![peer1.clone()], on_demand.active_peers.keys().cloned().collect::>()); + light_dispatch.active_peers[&peer0].timestamp = Instant::now() - REQUEST_TIMEOUT - REQUEST_TIMEOUT; + light_dispatch.maintain_peers(&mut network_interface); + assert!(light_dispatch.idle_peers.is_empty()); + assert_eq!(vec![peer1.clone()], light_dispatch.active_peers.keys().cloned().collect::>()); assert_disconnected_peer(&network_interface); } #[test] fn disconnects_from_peer_on_response_with_wrong_id() { - let mut on_demand = dummy(true); + let mut light_dispatch = dummy(true); let peer0 = PeerId::random(); let mut network_interface = DummyNetwork::default(); - on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); + light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - on_demand.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { + light_dispatch.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { block: Default::default(), header: dummy_header(), method: "test".into(), call_data: vec![], retry_count: None, }, oneshot::channel().0)); - receive_call_response(&mut network_interface, &mut on_demand, peer0, 1); + receive_call_response(&mut network_interface, &mut light_dispatch, peer0, 1); assert_disconnected_peer(&network_interface); - assert_eq!(on_demand.pending_requests.len(), 1); + assert_eq!(light_dispatch.pending_requests.len(), 1); } #[test] fn disconnects_from_peer_on_incorrect_response() { - let mut on_demand = dummy(false); + let mut light_dispatch = dummy(false); let mut network_interface = DummyNetwork::default(); let peer0 = PeerId::random(); - on_demand.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { + light_dispatch.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { block: Default::default(), header: dummy_header(), method: "test".into(), @@ -856,31 +884,31 @@ pub mod tests { retry_count: Some(1), }, oneshot::channel().0)); - on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - receive_call_response(&mut network_interface, &mut on_demand, peer0.clone(), 0); + light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); + receive_call_response(&mut network_interface, &mut light_dispatch, peer0.clone(), 0); assert_disconnected_peer(&network_interface); - assert_eq!(on_demand.pending_requests.len(), 1); + assert_eq!(light_dispatch.pending_requests.len(), 1); } #[test] fn disconnects_from_peer_on_unexpected_response() { - let mut on_demand = dummy(true); + let mut light_dispatch = dummy(true); let mut network_interface = DummyNetwork::default(); let peer0 = PeerId::random(); - on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); + light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - receive_call_response(&mut network_interface, &mut on_demand, peer0, 0); + receive_call_response(&mut network_interface, &mut light_dispatch, peer0, 0); assert_disconnected_peer(&network_interface); } #[test] fn disconnects_from_peer_on_wrong_response_type() { - let mut on_demand = dummy(false); + let mut light_dispatch = dummy(false); let peer0 = PeerId::random(); let mut network_interface = DummyNetwork::default(); - on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); + light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - on_demand.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { + light_dispatch.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { block: Default::default(), header: dummy_header(), method: "test".into(), @@ -888,26 +916,26 @@ pub mod tests { retry_count: Some(1), }, oneshot::channel().0)); - on_demand.on_remote_read_response(&mut network_interface, peer0.clone(), message::RemoteReadResponse { + light_dispatch.on_remote_read_response(&mut network_interface, peer0.clone(), message::RemoteReadResponse { id: 0, proof: vec![vec![2]], }); assert_disconnected_peer(&network_interface); - assert_eq!(on_demand.pending_requests.len(), 1); + assert_eq!(light_dispatch.pending_requests.len(), 1); } #[test] fn receives_remote_failure_after_retry_count_failures() { let retry_count = 2; let peer_ids = (0 .. retry_count + 1).map(|_| PeerId::random()).collect::>(); - let mut on_demand = dummy(false); + let mut light_dispatch = dummy(false); let mut network_interface = DummyNetwork::default(); for i in 0..retry_count+1 { - on_demand.on_connect(&mut network_interface, peer_ids[i].clone(), Roles::FULL, 1000); + light_dispatch.on_connect(&mut network_interface, peer_ids[i].clone(), Roles::FULL, 1000); } let (tx, mut response) = oneshot::channel(); - on_demand.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { + light_dispatch.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { block: Default::default(), header: dummy_header(), method: "test".into(), @@ -917,7 +945,7 @@ pub mod tests { for i in 0..retry_count { assert!(response.try_recv().unwrap().is_none()); - receive_call_response(&mut network_interface, &mut on_demand, peer_ids[i].clone(), i as u64); + receive_call_response(&mut network_interface, &mut light_dispatch, peer_ids[i].clone(), i as u64); } assert!(response.try_recv().unwrap().unwrap().is_err()); @@ -925,13 +953,13 @@ pub mod tests { #[test] fn receives_remote_call_response() { - let mut on_demand = dummy(true); + let mut light_dispatch = dummy(true); let mut network_interface = DummyNetwork::default(); let peer0 = PeerId::random(); - on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); + light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); let (tx, response) = oneshot::channel(); - on_demand.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { + light_dispatch.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { block: Default::default(), header: dummy_header(), method: "test".into(), @@ -939,26 +967,26 @@ pub mod tests { retry_count: None, }, tx)); - receive_call_response(&mut network_interface, &mut on_demand, peer0.clone(), 0); + receive_call_response(&mut network_interface, &mut light_dispatch, peer0.clone(), 0); assert_eq!(response.wait().unwrap().unwrap(), vec![42]); } #[test] fn receives_remote_read_response() { - let mut on_demand = dummy(true); + let mut light_dispatch = dummy(true); let mut network_interface = DummyNetwork::default(); let peer0 = PeerId::random(); - on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); + light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); let (tx, response) = oneshot::channel(); - on_demand.add_request(&mut network_interface, RequestData::RemoteRead(RemoteReadRequest { + light_dispatch.add_request(&mut network_interface, RequestData::RemoteRead(RemoteReadRequest { header: dummy_header(), block: Default::default(), key: b":key".to_vec(), retry_count: None, }, tx)); - on_demand.on_remote_read_response(&mut network_interface, peer0.clone(), message::RemoteReadResponse { + light_dispatch.on_remote_read_response(&mut network_interface, peer0.clone(), message::RemoteReadResponse { id: 0, proof: vec![vec![2]], }); @@ -967,13 +995,13 @@ pub mod tests { #[test] fn receives_remote_read_child_response() { - let mut on_demand = dummy(true); + let mut light_dispatch = dummy(true); let mut network_interface = DummyNetwork::default(); let peer0 = PeerId::random(); - on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); + light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); let (tx, response) = oneshot::channel(); - on_demand.add_request(&mut network_interface, RequestData::RemoteReadChild(RemoteReadChildRequest { + light_dispatch.add_request(&mut network_interface, RequestData::RemoteReadChild(RemoteReadChildRequest { header: dummy_header(), block: Default::default(), storage_key: b":child_storage:sub".to_vec(), @@ -981,7 +1009,7 @@ pub mod tests { retry_count: None, }, tx)); - on_demand.on_remote_read_response(&mut network_interface, + light_dispatch.on_remote_read_response(&mut network_interface, peer0.clone(), message::RemoteReadResponse { id: 0, proof: vec![vec![2]], @@ -991,19 +1019,19 @@ pub mod tests { #[test] fn receives_remote_header_response() { - let mut on_demand = dummy(true); + let mut light_dispatch = dummy(true); let mut network_interface = DummyNetwork::default(); let peer0 = PeerId::random(); - on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); + light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); let (tx, response) = oneshot::channel(); - on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { + light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { cht_root: Default::default(), block: 1, retry_count: None, }, tx)); - on_demand.on_remote_header_response(&mut network_interface, peer0.clone(), message::RemoteHeaderResponse { + light_dispatch.on_remote_header_response(&mut network_interface, peer0.clone(), message::RemoteHeaderResponse { id: 0, header: Some(Header { parent_hash: Default::default(), @@ -1022,13 +1050,13 @@ pub mod tests { #[test] fn receives_remote_changes_response() { - let mut on_demand = dummy(true); + let mut light_dispatch = dummy(true); let mut network_interface = DummyNetwork::default(); let peer0 = PeerId::random(); - on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); + light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); let (tx, response) = oneshot::channel(); - on_demand.add_request(&mut network_interface, RequestData::RemoteChanges(RemoteChangesRequest { + light_dispatch.add_request(&mut network_interface, RequestData::RemoteChanges(RemoteChangesRequest { changes_trie_config: changes_trie_config(), first_block: (1, Default::default()), last_block: (100, Default::default()), @@ -1038,7 +1066,7 @@ pub mod tests { retry_count: None, }, tx)); - on_demand.on_remote_changes_response(&mut network_interface, peer0.clone(), message::RemoteChangesResponse { + light_dispatch.on_remote_changes_response(&mut network_interface, peer0.clone(), message::RemoteChangesResponse { id: 0, max: 1000, proof: vec![vec![2]], @@ -1050,52 +1078,52 @@ pub mod tests { #[test] fn does_not_sends_request_to_peer_who_has_no_required_block() { - let mut on_demand = dummy(true); + let mut light_dispatch = dummy(true); let mut network_interface = DummyNetwork::default(); let peer1 = PeerId::random(); let peer2 = PeerId::random(); - on_demand.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 100); + light_dispatch.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 100); - on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { + light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { cht_root: Default::default(), block: 200, retry_count: None, }, oneshot::channel().0)); - on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { + light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { cht_root: Default::default(), block: 250, retry_count: None, }, oneshot::channel().0)); - on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { + light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { cht_root: Default::default(), block: 250, retry_count: None, }, oneshot::channel().0)); - on_demand.on_connect(&mut network_interface, peer2.clone(), Roles::FULL, 150); + light_dispatch.on_connect(&mut network_interface, peer2.clone(), Roles::FULL, 150); - assert_eq!(vec![peer1.clone(), peer2.clone()], on_demand.idle_peers.iter().cloned().collect::>()); - assert_eq!(on_demand.pending_requests.len(), 3); + assert_eq!(vec![peer1.clone(), peer2.clone()], light_dispatch.idle_peers.iter().cloned().collect::>()); + assert_eq!(light_dispatch.pending_requests.len(), 3); - on_demand.on_block_announce(&mut network_interface, peer1.clone(), 250); + light_dispatch.update_best_number(&mut network_interface, peer1.clone(), 250); - assert_eq!(vec![peer2.clone()], on_demand.idle_peers.iter().cloned().collect::>()); - assert_eq!(on_demand.pending_requests.len(), 2); + assert_eq!(vec![peer2.clone()], light_dispatch.idle_peers.iter().cloned().collect::>()); + assert_eq!(light_dispatch.pending_requests.len(), 2); - on_demand.on_block_announce(&mut network_interface, peer2.clone(), 250); + light_dispatch.update_best_number(&mut network_interface, peer2.clone(), 250); - assert!(!on_demand.idle_peers.iter().any(|_| true)); - assert_eq!(on_demand.pending_requests.len(), 1); + assert!(!light_dispatch.idle_peers.iter().any(|_| true)); + assert_eq!(light_dispatch.pending_requests.len(), 1); - on_demand.on_remote_header_response(&mut network_interface, peer1.clone(), message::RemoteHeaderResponse { + light_dispatch.on_remote_header_response(&mut network_interface, peer1.clone(), message::RemoteHeaderResponse { id: 0, header: Some(dummy_header()), proof: vec![], }); - assert!(!on_demand.idle_peers.iter().any(|_| true)); - assert_eq!(on_demand.pending_requests.len(), 0); + assert!(!light_dispatch.idle_peers.iter().any(|_| true)); + assert_eq!(light_dispatch.pending_requests.len(), 0); } #[test] @@ -1103,70 +1131,70 @@ pub mod tests { // this test is a regression for a bug where the dispatch function would // loop forever after dispatching a request to the last peer, since the // last peer was not updated - let mut on_demand = dummy(true); + let mut light_dispatch = dummy(true); let mut network_interface = DummyNetwork::default(); let peer1 = PeerId::random(); let peer2 = PeerId::random(); let peer3 = PeerId::random(); - on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { + light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { cht_root: Default::default(), block: 250, retry_count: None, }, oneshot::channel().0)); - on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { + light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { cht_root: Default::default(), block: 250, retry_count: None, }, oneshot::channel().0)); - on_demand.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 200); - on_demand.on_connect(&mut network_interface, peer2.clone(), Roles::FULL, 200); - on_demand.on_connect(&mut network_interface, peer3.clone(), Roles::FULL, 250); + light_dispatch.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 200); + light_dispatch.on_connect(&mut network_interface, peer2.clone(), Roles::FULL, 200); + light_dispatch.on_connect(&mut network_interface, peer3.clone(), Roles::FULL, 250); - assert_eq!(vec![peer1.clone(), peer2.clone()], on_demand.idle_peers.iter().cloned().collect::>()); - assert_eq!(on_demand.pending_requests.len(), 1); + assert_eq!(vec![peer1.clone(), peer2.clone()], light_dispatch.idle_peers.iter().cloned().collect::>()); + assert_eq!(light_dispatch.pending_requests.len(), 1); } #[test] fn tries_to_send_all_pending_requests() { - let mut on_demand = dummy(true); + let mut light_dispatch = dummy(true); let mut network_interface = DummyNetwork::default(); let peer1 = PeerId::random(); - on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { + light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { cht_root: Default::default(), block: 300, retry_count: None, }, oneshot::channel().0)); - on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { + light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { cht_root: Default::default(), block: 250, retry_count: None, }, oneshot::channel().0)); - on_demand.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 250); + light_dispatch.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 250); - assert!(on_demand.idle_peers.iter().cloned().collect::>().is_empty()); - assert_eq!(on_demand.pending_requests.len(), 1); + assert!(light_dispatch.idle_peers.iter().cloned().collect::>().is_empty()); + assert_eq!(light_dispatch.pending_requests.len(), 1); } #[test] fn remote_body_with_one_block_body_should_succeed() { - let mut on_demand = dummy(true); + let mut light_dispatch = dummy(true); let mut network_interface = DummyNetwork::default(); let peer1 = PeerId::random(); let header = dummy_header(); - on_demand.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 250); + light_dispatch.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 250); - on_demand.add_request(&mut network_interface, RequestData::RemoteBody(RemoteBodyRequest { + light_dispatch.add_request(&mut network_interface, RequestData::RemoteBody(RemoteBodyRequest { header: header.clone(), retry_count: None, }, oneshot::channel().0)); - assert!(on_demand.pending_requests.is_empty()); - assert_eq!(on_demand.active_peers.len(), 1); + assert!(light_dispatch.pending_requests.is_empty()); + assert_eq!(light_dispatch.active_peers.len(), 1); let block = message::BlockData:: { hash: primitives::H256::random(), @@ -1182,28 +1210,28 @@ pub mod tests { blocks: vec![block], }; - on_demand.on_remote_body_response(&mut network_interface, peer1.clone(), response); + light_dispatch.on_remote_body_response(&mut network_interface, peer1.clone(), response); - assert!(on_demand.active_peers.is_empty()); - assert_eq!(on_demand.idle_peers.len(), 1); + assert!(light_dispatch.active_peers.is_empty()); + assert_eq!(light_dispatch.idle_peers.len(), 1); } #[test] fn remote_body_with_three_bodies_should_fail() { - let mut on_demand = dummy(true); + let mut light_dispatch = dummy(true); let mut network_interface = DummyNetwork::default(); let peer1 = PeerId::random(); let header = dummy_header(); - on_demand.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 250); + light_dispatch.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 250); - on_demand.add_request(&mut network_interface, RequestData::RemoteBody(RemoteBodyRequest { + light_dispatch.add_request(&mut network_interface, RequestData::RemoteBody(RemoteBodyRequest { header: header.clone(), retry_count: None, }, oneshot::channel().0)); - assert!(on_demand.pending_requests.is_empty()); - assert_eq!(on_demand.active_peers.len(), 1); + assert!(light_dispatch.pending_requests.is_empty()); + assert_eq!(light_dispatch.active_peers.len(), 1); let response = { let blocks: Vec<_> = (0..3).map(|_| message::BlockData:: { @@ -1221,8 +1249,8 @@ pub mod tests { } }; - on_demand.on_remote_body_response(&mut network_interface, peer1.clone(), response); - assert!(on_demand.active_peers.is_empty()); - assert!(on_demand.idle_peers.is_empty(), "peer should be disconnected after bad response"); + light_dispatch.on_remote_body_response(&mut network_interface, peer1.clone(), response); + assert!(light_dispatch.active_peers.is_empty()); + assert!(light_dispatch.idle_peers.is_empty(), "peer should be disconnected after bad response"); } } diff --git a/core/network/src/service.rs b/core/network/src/service.rs index acd3bbeab7b10..ff5db5f702096 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -47,7 +47,7 @@ use crate::config::{Params, TransportConfig}; use crate::error::Error; use crate::protocol::{self, Protocol, Context, CustomMessageOutcome, PeerInfo}; use crate::protocol::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; -use crate::protocol::{event::Event, on_demand::{AlwaysBadChecker, RequestData}}; +use crate::protocol::{event::Event, light_dispatch::{AlwaysBadChecker, RequestData}}; use crate::protocol::specialization::NetworkSpecialization; use crate::protocol::sync::SyncState; @@ -241,7 +241,7 @@ impl, H: ExHashT> NetworkWorker service, import_queue: params.import_queue, from_worker, - on_demand_in: params.on_demand.and_then(|od| od.extract_receiver()), + light_client_rqs: params.on_demand.and_then(|od| od.extract_receiver()), }) } @@ -585,8 +585,8 @@ pub struct NetworkWorker, H: Ex import_queue: Box>, /// Messages from the `NetworkService` and that must be processed. from_worker: mpsc::UnboundedReceiver>, - /// Receiver for queries from the on-demand that must be processed. - on_demand_in: Option>>, + /// Receiver for queries from the light client that must be processed. + light_client_rqs: Option>>, } impl, H: ExHashT> Future for NetworkWorker { @@ -602,10 +602,10 @@ impl, H: ExHashT> Future for Ne std::task::Poll::Pending::> }).compat().poll(); - // Check for new incoming on-demand requests. - if let Some(on_demand_in) = self.on_demand_in.as_mut() { - while let Ok(Async::Ready(Some(rq))) = on_demand_in.poll() { - self.network_service.user_protocol_mut().add_on_demand_request(rq); + // Check for new incoming light client requests. + if let Some(light_client_rqs) = self.light_client_rqs.as_mut() { + while let Ok(Async::Ready(Some(rq))) = light_client_rqs.poll() { + self.network_service.user_protocol_mut().add_light_client_request(rq); } }