Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Companion for paritytech/substrate#14197 #7582

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion node/network/bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ edition.workspace = true
license.workspace = true

[dependencies]
always-assert = "0.1"
async-trait = "0.1.57"
futures = "0.3.21"
gum = { package = "tracing-gum", path = "../../gum" }
Expand Down
56 changes: 28 additions & 28 deletions node/network/bridge/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,24 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{collections::HashSet, sync::Arc};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

use async_trait::async_trait;
use futures::{prelude::*, stream::BoxStream};
use parking_lot::Mutex;

use parity_scale_codec::Encode;

use sc_network::{
config::parse_addr, multiaddr::Multiaddr, types::ProtocolName, Event as NetworkEvent,
IfDisconnected, NetworkEventStream, NetworkNotification, NetworkPeers, NetworkRequest,
NetworkService, OutboundFailure, ReputationChange, RequestFailure,
config::parse_addr, multiaddr::Multiaddr, service::traits::MessageSink, types::ProtocolName,
IfDisconnected, NetworkPeers, NetworkRequest, NetworkService, ObservedRole, OutboundFailure,
ReputationChange, RequestFailure,
};

use polkadot_node_network_protocol::{
peer_set::{PeerSet, PeerSetProtocolNames, ProtocolVersion},
peer_set::{PeerSet, ProtocolVersion},
request_response::{OutgoingRequest, Recipient, ReqProtocolNames, Requests},
PeerId,
};
Expand All @@ -45,13 +48,12 @@ const LOG_TARGET: &'static str = "parachain::network-bridge-net";
/// messages that are compatible with the passed peer set, as that is currently not enforced by
/// this function. These are messages of type `WireMessage` parameterized on the matching type.
pub(crate) fn send_message<M>(
net: &mut impl Network,
mut peers: Vec<PeerId>,
peer_set: PeerSet,
version: ProtocolVersion,
protocol_names: &PeerSetProtocolNames,
message: M,
metrics: &super::Metrics,
network_notification_sinks: &Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
) where
M: Encode + Clone,
{
Expand All @@ -61,29 +63,31 @@ pub(crate) fn send_message<M>(
encoded
};

let notification_sinks = network_notification_sinks.lock();

// optimization: avoid cloning the message for the last peer in the
// list. The message payload can be quite large. If the underlying
// network used `Bytes` this would not be necessary.
//
// peer may have gotten disconnect by the time `send_message()` is called
// at which point the the sink is not available.
let last_peer = peers.pop();
// optimization: generate the protocol name once.
let protocol_name = protocol_names.get_name(peer_set, version);
peers.into_iter().for_each(|peer| {
net.write_notification(peer, protocol_name.clone(), message.clone());
if let Some(sink) = notification_sinks.get(&(peer_set, peer)) {
sink.send_sync_notification(message.clone());
}
});

if let Some(peer) = last_peer {
net.write_notification(peer, protocol_name, message);
if let Some(sink) = notification_sinks.get(&(peer_set, peer)) {
sink.send_sync_notification(message.clone());
}
}
}

/// An abstraction over networking for the purposes of this subsystem.
#[async_trait]
pub trait Network: Clone + Send + 'static {
/// Get a stream of all events occurring on the network. This may include events unrelated
/// to the Polkadot protocol - the user of this function should filter only for events related
/// to the [`VALIDATION_PROTOCOL_NAME`](VALIDATION_PROTOCOL_NAME)
/// or [`COLLATION_PROTOCOL_NAME`](COLLATION_PROTOCOL_NAME)
fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent>;

/// Ask the network to keep a substream open with these nodes and not disconnect from them
/// until removed from the protocol's peer set.
/// Note that `out_peers` setting has no effect on this.
Expand Down Expand Up @@ -115,16 +119,12 @@ pub trait Network: Clone + Send + 'static {
/// Disconnect a given peer from the protocol specified without harming reputation.
fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName);

/// Write a notification to a peer on the given protocol.
fn write_notification(&self, who: PeerId, protocol: ProtocolName, message: Vec<u8>);
/// Get peer role.
fn peer_role(&self, who: PeerId, handshake: Vec<u8>) -> Option<ObservedRole>;
}

#[async_trait]
impl Network for Arc<NetworkService<Block, Hash>> {
fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> {
NetworkService::event_stream(self, "polkadot-network-bridge").boxed()
}

async fn set_reserved_peers(
&mut self,
protocol: ProtocolName,
Expand All @@ -149,10 +149,6 @@ impl Network for Arc<NetworkService<Block, Hash>> {
NetworkService::disconnect_peer(&**self, who, protocol);
}

fn write_notification(&self, who: PeerId, protocol: ProtocolName, message: Vec<u8>) {
NetworkService::write_notification(&**self, who, protocol, message);
}

async fn start_request<AD: AuthorityDiscovery>(
&self,
authority_discovery: &mut AD,
Expand Down Expand Up @@ -224,6 +220,10 @@ impl Network for Arc<NetworkService<Block, Hash>> {
if_disconnected,
);
}

fn peer_role(&self, who: PeerId, handshake: Vec<u8>) -> Option<ObservedRole> {
NetworkService::peer_role(self, who, handshake)
}
}

/// We assume one `peer_id` per `authority_id`.
Expand Down
Loading
Loading