Skip to content

Commit

Permalink
Revert "Revert changes"
Browse files Browse the repository at this point in the history
This reverts commit f3b7db3b0f4aa92f2606c87ed6f65f4798734889.
  • Loading branch information
acerone85 committed Oct 17, 2024
1 parent 939939d commit e8c563c
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 207 deletions.
3 changes: 0 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
### Added
- [2321](https://github.com/FuelLabs/fuel-core/pull/2321): New metrics for the txpool: "The size of transactions in the txpool" (`txpool_tx_size`), "The time spent by a transaction in the txpool in seconds" (`txpool_tx_time_in_txpool_seconds`), The number of transactions in the txpool (`txpool_number_of_transactions`), "The number of transactions pending verification before entering the txpool" (`txpool_number_of_transactions_pending_verification`), "The number of executable transactions in the txpool" (`txpool_number_of_executable_transactions`), "The time it took to select transactions for inclusion in a block in nanoseconds" (`txpool_select_transaction_time_nanoseconds`), The time it took to insert a transaction in the txpool in milliseconds (`txpool_insert_transaction_time_milliseconds`).

### Changed
- [2362](https://github.com/FuelLabs/fuel-core/pull/2362): Preparation work for supporting multiple request response protocols in the P2P service. Information about request exchange protocols known by a peer is saved in the peer manager upon connection. Currently we only have `/fuel/req_res/0.0.1`, but a node will select the latest compatible version of the protocol that is known by a peer for sending messages.

## [Version 0.40.0]

### Added
Expand Down
20 changes: 8 additions & 12 deletions crates/services/p2p/src/behavior.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
codecs::{
postcard::PostcardCodecV1,
postcard::PostcardCodec,
NetworkCodec,
},
config::Config,
Expand Down Expand Up @@ -60,15 +60,12 @@ pub struct FuelBehaviour {
/// Node discovery
discovery: discovery::Behaviour,

/// RequestResponse protocol Version 1
request_response_v1: request_response::Behaviour<PostcardCodecV1>,
/// RequestResponse protocol
request_response: request_response::Behaviour<PostcardCodec>,
}

impl FuelBehaviour {
pub(crate) fn new(
p2p_config: &Config,
codec: PostcardCodecV1,
) -> anyhow::Result<Self> {
pub(crate) fn new(p2p_config: &Config, codec: PostcardCodec) -> anyhow::Result<Self> {
let local_public_key = p2p_config.keypair.public();
let local_peer_id = PeerId::from_public_key(&local_public_key);

Expand Down Expand Up @@ -122,7 +119,7 @@ impl FuelBehaviour {
.with_request_timeout(p2p_config.set_request_timeout)
.with_max_concurrent_streams(p2p_config.max_concurrent_streams);

let request_response_v1 = request_response::Behaviour::with_codec(
let request_response = request_response::Behaviour::with_codec(
codec,
req_res_protocol,
req_res_config,
Expand All @@ -133,7 +130,7 @@ impl FuelBehaviour {
discovery,
gossipsub,
peer_report,
request_response_v1,
request_response,
blocked_peer: Default::default(),
identify,
heartbeat,
Expand Down Expand Up @@ -163,16 +160,15 @@ impl FuelBehaviour {
message_request: RequestMessage,
peer_id: &PeerId,
) -> OutboundRequestId {
self.request_response_v1
.send_request(peer_id, message_request)
self.request_response.send_request(peer_id, message_request)
}

pub fn send_response_msg(
&mut self,
channel: ResponseChannel<ResponseMessage>,
message: ResponseMessage,
) -> Result<(), ResponseMessage> {
self.request_response_v1.send_response(channel, message)
self.request_response.send_response(channel, message)
}

pub fn report_message_validation_result(
Expand Down
27 changes: 12 additions & 15 deletions crates/services/p2p/src/codecs/postcard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
request_response::messages::{
RequestMessage,
ResponseMessage,
REQUEST_RESPONSE_PROTOCOL_ID_V1,
REQUEST_RESPONSE_PROTOCOL_ID,
},
};
use async_trait::async_trait;
Expand Down Expand Up @@ -40,14 +40,14 @@ fn serialize<D: Serialize>(data: &D) -> Result<Vec<u8>, io::Error> {
}

#[derive(Debug, Clone)]
pub struct PostcardCodec<const VERSION: usize> {
pub struct PostcardCodec {
/// Used for `max_size` parameter when reading Response Message
/// Necessary in order to avoid DoS attacks
/// Currently the size mostly depends on the max size of the Block
max_response_size: usize,
}

impl<const VERSION: usize> PostcardCodec<VERSION> {
impl PostcardCodec {
pub fn new(max_block_size: usize) -> Self {
assert_ne!(
max_block_size, 0,
Expand All @@ -60,8 +60,6 @@ impl<const VERSION: usize> PostcardCodec<VERSION> {
}
}

pub type PostcardCodecV1 = PostcardCodec<0>;

/// Since Postcard does not support async reads or writes out of the box
/// We prefix Request & Response Messages with the length of the data in bytes
/// We expect the substream to be properly closed when response channel is dropped.
Expand All @@ -70,8 +68,8 @@ pub type PostcardCodecV1 = PostcardCodec<0>;
/// If the substream was not properly closed when dropped, the sender would instead
/// run into a timeout waiting for the response.
#[async_trait]
impl request_response::Codec for PostcardCodecV1 {
type Protocol = MessageExchangePostcardProtocolV1;
impl request_response::Codec for PostcardCodec {
type Protocol = MessageExchangePostcardProtocol;
type Request = RequestMessage;
type Response = ResponseMessage;

Expand Down Expand Up @@ -137,8 +135,7 @@ impl request_response::Codec for PostcardCodecV1 {
}
}

// GossipsubCodec is independent of the PostcardCoded version being used
impl<const VERSION: usize> GossipsubCodec for PostcardCodec<VERSION> {
impl GossipsubCodec for PostcardCodec {
type RequestMessage = GossipsubBroadcastRequest;
type ResponseMessage = GossipsubMessage;

Expand All @@ -163,18 +160,18 @@ impl<const VERSION: usize> GossipsubCodec for PostcardCodec<VERSION> {
}
}

impl NetworkCodec for PostcardCodecV1 {
impl NetworkCodec for PostcardCodec {
fn get_req_res_protocol(&self) -> <Self as request_response::Codec>::Protocol {
MessageExchangePostcardProtocolV1 {}
MessageExchangePostcardProtocol {}
}
}

#[derive(Default, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct MessageExchangePostcardProtocolV1;
#[derive(Default, Debug, Clone)]
pub struct MessageExchangePostcardProtocol;

impl AsRef<str> for MessageExchangePostcardProtocolV1 {
impl AsRef<str> for MessageExchangePostcardProtocol {
fn as_ref(&self) -> &str {
REQUEST_RESPONSE_PROTOCOL_ID_V1
REQUEST_RESPONSE_PROTOCOL_ID
}
}

Expand Down
51 changes: 18 additions & 33 deletions crates/services/p2p/src/p2p_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
FuelBehaviourEvent,
},
codecs::{
postcard::PostcardCodecV1,
postcard::PostcardCodec,
GossipsubCodec,
},
config::{
Expand All @@ -26,16 +26,13 @@ use crate::{
Punisher,
},
peer_report::PeerReportEvent,
request_response::{
self as fuel_request_response,
messages::{
RequestError,
RequestMessage,
ResponseError,
ResponseMessage,
ResponseSendError,
ResponseSender,
},
request_response::messages::{
RequestError,
RequestMessage,
ResponseError,
ResponseMessage,
ResponseSendError,
ResponseSender,
},
TryPeerId,
};
Expand Down Expand Up @@ -125,7 +122,7 @@ pub struct FuelP2PService {
inbound_requests_table: HashMap<InboundRequestId, ResponseChannel<ResponseMessage>>,

/// NetworkCodec used as `<GossipsubCodec>` for encoding and decoding of Gossipsub messages
network_codec: PostcardCodecV1,
network_codec: PostcardCodec,

/// Stores additional p2p network info
network_metadata: NetworkMetadata,
Expand Down Expand Up @@ -214,7 +211,7 @@ impl FuelP2PService {
pub async fn new(
reserved_peers_updates: broadcast::Sender<usize>,
config: Config,
codec: PostcardCodecV1,
codec: PostcardCodec,
) -> anyhow::Result<Self> {
let metrics = config.metrics;

Expand Down Expand Up @@ -417,24 +414,14 @@ impl FuelP2PService {
}
};

let latest_compatible_request_response_protocol_version = self
.peer_manager
.get_peer_info(&peer_id)
.and_then(|peer_info| peer_info.request_response_protocol_version.as_ref())
.unwrap_or_default();

match latest_compatible_request_response_protocol_version {
fuel_request_response::ProtocolVersion::V1 => {
let request_id = self
.swarm
.behaviour_mut()
.send_request_msg(message_request, &peer_id);
let request_id = self
.swarm
.behaviour_mut()
.send_request_msg(message_request, &peer_id);

self.outbound_requests_table.insert(request_id, on_response);
self.outbound_requests_table.insert(request_id, on_response);

Ok(request_id)
}
}
Ok(request_id)
}

/// Sends ResponseMessage to a peer that requested the data
Expand Down Expand Up @@ -580,7 +567,7 @@ impl FuelP2PService {
self.handle_gossipsub_event(event)
}
FuelBehaviourEvent::PeerReport(event) => self.handle_peer_report_event(event),
FuelBehaviourEvent::RequestResponseV1(event) => {
FuelBehaviourEvent::RequestResponse(event) => {
self.handle_request_response_event(event)
}
FuelBehaviourEvent::Identify(event) => {
Expand Down Expand Up @@ -785,10 +772,9 @@ impl FuelP2PService {
identify::Event::Received { peer_id, info } => {
self.update_metrics(increment_unique_peers);

let request_response_protocol_version =
fuel_request_response::ProtocolVersion::latest_compatible_version_for_peer(&info);
let mut addresses = info.listen_addrs;
let agent_version = info.agent_version;

if addresses.len() > MAX_IDENTIFY_ADDRESSES {
let protocol_version = info.protocol_version;
debug!(
Expand All @@ -803,7 +789,6 @@ impl FuelP2PService {
&peer_id,
addresses.clone(),
agent_version,
request_response_protocol_version,
);

self.swarm
Expand Down
25 changes: 0 additions & 25 deletions crates/services/p2p/src/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ use tracing::{
use crate::{
gossipsub_config::GRAYLIST_THRESHOLD,
peer_manager::heartbeat_data::HeartbeatData,
request_response::{
self,
ProtocolVersion,
},
};

pub mod heartbeat_data;
Expand All @@ -49,9 +45,6 @@ pub struct PeerInfo {
pub client_version: Option<String>,
pub heartbeat_data: HeartbeatData,
pub score: AppScore,
/// The latest protocol version that the peer supports and that is
/// compatible with the current version of the node
pub request_response_protocol_version: Option<request_response::ProtocolVersion>,
}

impl PeerInfo {
Expand All @@ -61,7 +54,6 @@ impl PeerInfo {
client_version: None,
heartbeat_data: HeartbeatData::new(heartbeat_avg_window),
score: DEFAULT_APP_SCORE,
request_response_protocol_version: None,
}
}
}
Expand Down Expand Up @@ -143,14 +135,10 @@ impl PeerManager {
peer_id: &PeerId,
addresses: Vec<Multiaddr>,
agent_version: String,
protocol_version: Option<ProtocolVersion>,
) {
let peers = self.get_assigned_peer_table_mut(peer_id);
insert_client_version(peers, peer_id, agent_version);
insert_peer_addresses(peers, peer_id, addresses);
protocol_version.into_iter().for_each(|protocol| {
update_request_response_protocol_version(peers, peer_id, protocol)
});
}

pub fn batch_update_score_with_decay(&mut self) {
Expand Down Expand Up @@ -368,19 +356,6 @@ fn insert_client_version(
}
}

// Updates the latest request response protocol version that the peer supports
fn update_request_response_protocol_version(
peers: &mut HashMap<PeerId, PeerInfo>,
peer_id: &PeerId,
protocol: ProtocolVersion,
) {
if let Some(peer) = peers.get_mut(peer_id) {
peer.request_response_protocol_version = Some(protocol);
} else {
log_missing_peer(peer_id);
}
}

fn log_missing_peer(peer_id: &PeerId) {
debug!(target: "fuel-p2p", "Peer with PeerId: {:?} is not among the connected peers", peer_id)
}
Expand Down
4 changes: 0 additions & 4 deletions crates/services/p2p/src/request_response.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1 @@
pub mod messages;

pub mod protocols;

pub use protocols::ProtocolVersion;
2 changes: 1 addition & 1 deletion crates/services/p2p/src/request_response/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::ops::Range;
use thiserror::Error;
use tokio::sync::oneshot;

pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID_V1: &str = "/fuel/req_res/0.0.1";
pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.1";

/// Max Size in Bytes of the Request Message
#[cfg(test)]
Expand Down
Loading

0 comments on commit e8c563c

Please sign in to comment.