diff --git a/crates/papyrus_network/src/network_manager/mod.rs b/crates/papyrus_network/src/network_manager/mod.rs index 45bf0be84a..d11d0cf35f 100644 --- a/crates/papyrus_network/src/network_manager/mod.rs +++ b/crates/papyrus_network/src/network_manager/mod.rs @@ -8,7 +8,7 @@ use std::collections::HashMap; use futures::channel::mpsc::{Receiver, SendError, Sender, UnboundedReceiver, UnboundedSender}; use futures::future::{ready, Ready}; use futures::sink::With; -use futures::stream::{BoxStream, Map, SelectAll}; +use futures::stream::{self, Chain, Map, Once}; use futures::{SinkExt, StreamExt}; use libp2p::gossipsub::{SubscriptionError, TopicHash}; use libp2p::swarm::SwarmEvent; @@ -28,8 +28,6 @@ use crate::sqmr::{self, InboundSessionId, OutboundSessionId, SessionId}; use crate::utils::StreamHashMap; use crate::{gossipsub_impl, DataType, NetworkConfig, Protocol}; -type StreamCollection = SelectAll>; - #[derive(thiserror::Error, Debug)] pub enum NetworkError { #[error(transparent)] @@ -40,12 +38,12 @@ pub struct GenericNetworkManager>, // Splitting the response receivers from the query senders in order to poll all // receivers simultaneously. // Each receiver has a matching sender and vice versa (i.e the maps have the same keys). - sqmr_query_receivers: StreamHashMap>, - sqmr_response_senders: HashMap>, + sqmr_outbound_query_receivers: StreamHashMap>, + sqmr_outbound_response_senders: HashMap>, // Splitting the broadcast receivers from the broadcasted senders in order to poll all // receivers simultaneously. // Each receiver has a matching sender and vice versa (i.e the maps have the same keys). @@ -66,8 +64,8 @@ impl GenericNetworkManager self.handle_swarm_event(event), _ = self.db_executor.run() => panic!("DB executor should never finish."), - Some(res) = self.query_results_router.next() => self.handle_query_result_routing_to_other_peer(res), - Some((protocol, query)) = self.sqmr_query_receivers.next() => { + Some(res) = self.sqmr_inbound_response_receivers.next() => self.handle_response_for_inbound_query(res), + Some((protocol, query)) = self.sqmr_outbound_query_receivers.next() => { self.handle_local_sqmr_query(protocol, query) } Some((topic_hash, message)) = self.messages_to_broadcast_receivers.next() => { @@ -89,9 +87,9 @@ impl GenericNetworkManager GenericNetworkManager GenericNetworkManager Option = Some; + self.sqmr_inbound_response_receivers.insert( + inbound_session_id, + receiver.map(response_fn).chain(stream::once(ready(None))), + ); } sqmr::behaviour::ExternalEvent::ReceivedData { outbound_session_id, data, peer_id } => { trace!( @@ -320,7 +321,8 @@ impl GenericNetworkManager GenericNetworkManager)) { + let (inbound_session_id, maybe_data) = res; + match maybe_data { + Some(data) => { + let mut data_bytes = vec![]; + data.encode(&mut data_bytes).expect("failed to encode data"); + self.swarm.send_data(data_bytes, inbound_session_id).unwrap_or_else(|e| { + error!( + "Failed to send data to peer. Session id: {inbound_session_id:?} not \ + found error: {e:?}" + ); + }); + } + None => { + self.swarm.close_inbound_session(inbound_session_id).unwrap_or_else(|e| { + error!( + "Failed to close session after sending all data. Session id: \ + {inbound_session_id:?} not found error: {e:?}" + ) + }); + } + }; } fn handle_local_sqmr_query(&mut self, protocol: Protocol, query: Bytes) { @@ -531,3 +531,6 @@ pub struct BroadcastSubscriberChannels> { pub messages_to_broadcast_sender: SubscriberSender, pub broadcasted_messages_receiver: SubscriberReceiver, } + +type SqmrResponseReceiver = + Chain, fn(Response) -> Option>, Once>>>; diff --git a/crates/papyrus_network/src/utils.rs b/crates/papyrus_network/src/utils.rs index d9a9f418d9..2ac26a8c15 100644 --- a/crates/papyrus_network/src/utils.rs +++ b/crates/papyrus_network/src/utils.rs @@ -60,6 +60,7 @@ impl Stream for StreamHashMap