diff --git a/ipa-core/src/error.rs b/ipa-core/src/error.rs index 5fb22dfca..0bd38cda2 100644 --- a/ipa-core/src/error.rs +++ b/ipa-core/src/error.rs @@ -6,7 +6,7 @@ use std::{ use thiserror::Error; -use crate::{report::InvalidReportError, task::JoinError}; +use crate::{helpers::Role, report::InvalidReportError, sharding::ShardIndex, task::JoinError}; /// An error raised by the IPA protocol. /// @@ -52,8 +52,10 @@ pub enum Error { #[error("failed to parse json: {0}")] #[cfg(feature = "enable-serde")] Serde(#[from] serde_json::Error), - #[error("Infrastructure error: {0}")] - InfraError(#[from] crate::helpers::Error), + #[error("MPC Infrastructure error: {0}")] + MpcInfraError(#[from] crate::helpers::Error), + #[error("Shard Infrastructure error: {0}")] + ShardInfraError(#[from] crate::helpers::Error), #[error("Value truncation error: {0}")] FieldValueTruncation(String), #[error("Invalid query parameter: {0}")] diff --git a/ipa-core/src/helpers/buffers/unordered_receiver.rs b/ipa-core/src/helpers/buffers/unordered_receiver.rs index 4a236a59d..f9ba225bb 100644 --- a/ipa-core/src/helpers/buffers/unordered_receiver.rs +++ b/ipa-core/src/helpers/buffers/unordered_receiver.rs @@ -11,7 +11,7 @@ use generic_array::GenericArray; use typenum::Unsigned; use crate::{ - helpers::{Error, Message}, + helpers::{Error, Message, Role}, protocol::RecordId, sync::{Arc, Mutex}, }; @@ -160,7 +160,7 @@ pub enum ReceiveError { #[error("Error deserializing {0:?} record: {1}")] DeserializationError(RecordId, #[source] M::DeserializationError), #[error(transparent)] - InfraError(#[from] Error), + InfraError(#[from] Error), } impl OperatingState diff --git a/ipa-core/src/helpers/error.rs b/ipa-core/src/helpers/error.rs index d73c38359..9de2b24bb 100644 --- a/ipa-core/src/helpers/error.rs +++ b/ipa-core/src/helpers/error.rs @@ -1,27 +1,21 @@ use thiserror::Error; -use tokio::sync::mpsc::error::SendError; use crate::{ error::BoxError, - helpers::{ChannelId, HelperIdentity, Message, Role, TotalRecords}, + helpers::{ChannelId, HelperChannelId, HelperIdentity, Role, TotalRecords, TransportIdentity}, protocol::{step::Gate, RecordId}, }; /// An error raised by the IPA supporting infrastructure. #[derive(Error, Debug)] -pub enum Error { +pub enum Error { #[error("An error occurred while sending data to {channel:?}: {inner}")] SendError { - channel: ChannelId, + channel: ChannelId, #[source] inner: BoxError, }, - #[error("An error occurred while sending data over a reordering channel: {inner}")] - OrderedChannelError { - #[source] - inner: BoxError, - }, #[error("An error occurred while sending data to unknown helper: {inner}")] PollSendError { #[source] @@ -29,7 +23,7 @@ pub enum Error { }, #[error("An error occurred while receiving data from {source:?}/{step}: {inner}")] ReceiveError { - source: Role, + source: I, step: String, #[source] inner: BoxError, @@ -51,16 +45,16 @@ pub enum Error { #[error("record ID {record_id:?} is out of range for {channel_id:?} (expected {total_records:?} records)")] TooManyRecords { record_id: RecordId, - channel_id: ChannelId, + channel_id: ChannelId, total_records: TotalRecords, }, } -impl Error { +impl Error { pub fn send_error>>( - channel: ChannelId, + channel: HelperChannelId, inner: E, - ) -> Error { + ) -> Self { Self::SendError { channel, inner: inner.into(), @@ -72,7 +66,7 @@ impl Error { record_id: RecordId, gate: &Gate, inner: E, - ) -> Error { + ) -> Self { Self::SerializationError { record_id, step: String::from(gate.as_ref()), @@ -81,12 +75,4 @@ impl Error { } } -impl From> for Error { - fn from(_: SendError<(usize, M)>) -> Self { - Self::OrderedChannelError { - inner: "ordered string".into(), - } - } -} - -pub type Result = std::result::Result; +pub type Result = std::result::Result>; diff --git a/ipa-core/src/helpers/gateway/mod.rs b/ipa-core/src/helpers/gateway/mod.rs index 018431d62..436c3dccf 100644 --- a/ipa-core/src/helpers/gateway/mod.rs +++ b/ipa-core/src/helpers/gateway/mod.rs @@ -15,10 +15,12 @@ pub(super) use stall_detection::InstrumentedGateway; use crate::{ helpers::{ + buffers::UnorderedReceiver, gateway::{ receive::GatewayReceivers, send::GatewaySenders, transport::RoleResolvingTransport, }, - ChannelId, Message, Role, RoleAssignment, TotalRecords, Transport, + HelperChannelId, HelperIdentity, Message, Role, RoleAssignment, RouteId, TotalRecords, + Transport, }, protocol::QueryId, }; @@ -33,12 +35,13 @@ pub type TransportImpl = super::transport::InMemoryTransport; #[cfg(feature = "real-world-infra")] pub type TransportImpl = crate::sync::Arc; -pub type TransportError = ::Error; +pub type TransportError = >::Error; /// Gateway into IPA Network infrastructure. It allows helpers send and receive messages. pub struct Gateway { config: GatewayConfig, transport: RoleResolvingTransport, + query_id: QueryId, #[cfg(feature = "stall-detection")] inner: crate::sync::Arc, #[cfg(not(feature = "stall-detection"))] @@ -74,12 +77,11 @@ impl Gateway { ) -> Self { #[allow(clippy::useless_conversion)] // not useless in stall-detection build Self { + query_id, config, transport: RoleResolvingTransport { - query_id, roles, inner: transport, - config, }, inner: State::default().into(), } @@ -87,7 +89,7 @@ impl Gateway { #[must_use] pub fn role(&self) -> Role { - self.transport.role() + self.transport.identity() } #[must_use] @@ -101,7 +103,7 @@ impl Gateway { #[must_use] pub fn get_sender( &self, - channel_id: &ChannelId, + channel_id: &HelperChannelId, total_records: TotalRecords, ) -> send::SendingEnd { let (tx, maybe_stream) = self.inner.senders.get_or_create::( @@ -113,10 +115,15 @@ impl Gateway { tokio::spawn({ let channel_id = channel_id.clone(); let transport = self.transport.clone(); + let query_id = self.query_id; async move { // TODO(651): In the HTTP case we probably need more robust error handling here. transport - .send(&channel_id, stream) + .send( + channel_id.peer, + (RouteId::Records, query_id, channel_id.gate), + stream, + ) .await .expect("{channel_id:?} receiving end should be accepted by transport"); } @@ -127,12 +134,21 @@ impl Gateway { } #[must_use] - pub fn get_receiver(&self, channel_id: &ChannelId) -> receive::ReceivingEnd { + pub fn get_receiver( + &self, + channel_id: &HelperChannelId, + ) -> receive::ReceivingEnd { receive::ReceivingEnd::new( channel_id.clone(), - self.inner - .receivers - .get_or_create(channel_id, || self.transport.receive(channel_id)), + self.inner.receivers.get_or_create(channel_id, || { + UnorderedReceiver::new( + Box::pin( + self.transport + .receive(channel_id.peer, (self.query_id, channel_id.gate.clone())), + ), + self.config.active_work(), + ) + }), ) } } diff --git a/ipa-core/src/helpers/gateway/receive.rs b/ipa-core/src/helpers/gateway/receive.rs index 0b2686ff1..6326cf5c3 100644 --- a/ipa-core/src/helpers/gateway/receive.rs +++ b/ipa-core/src/helpers/gateway/receive.rs @@ -4,13 +4,16 @@ use dashmap::{mapref::entry::Entry, DashMap}; use futures::Stream; use crate::{ - helpers::{buffers::UnorderedReceiver, ChannelId, Error, Message, Transport, TransportImpl}, + helpers::{ + buffers::UnorderedReceiver, gateway::transport::RoleResolvingTransport, Error, + HelperChannelId, Message, Role, Transport, + }, protocol::RecordId, }; -/// Receiving end end of the gateway channel. +/// Receiving end of the gateway channel. pub struct ReceivingEnd { - channel_id: ChannelId, + channel_id: HelperChannelId, unordered_rx: UR, _phantom: PhantomData, } @@ -18,16 +21,16 @@ pub struct ReceivingEnd { /// Receiving channels, indexed by (role, step). #[derive(Default)] pub(super) struct GatewayReceivers { - pub(super) inner: DashMap, + pub(super) inner: DashMap, } pub(super) type UR = UnorderedReceiver< - ::RecordsStream, - <::RecordsStream as Stream>::Item, + >::RecordsStream, + <>::RecordsStream as Stream>::Item, >; impl ReceivingEnd { - pub(super) fn new(channel_id: ChannelId, rx: UR) -> Self { + pub(super) fn new(channel_id: HelperChannelId, rx: UR) -> Self { Self { channel_id, unordered_rx: rx, @@ -44,13 +47,13 @@ impl ReceivingEnd { /// ## Panics /// This will panic if message size does not fit into 8 bytes and it somehow got serialized /// and sent to this helper. - #[tracing::instrument(level = "trace", "receive", skip_all, fields(i = %record_id, from = ?self.channel_id.role, gate = ?self.channel_id.gate.as_ref()))] - pub async fn receive(&self, record_id: RecordId) -> Result { + #[tracing::instrument(level = "trace", "receive", skip_all, fields(i = %record_id, from = ?self.channel_id.peer, gate = ?self.channel_id.gate.as_ref()))] + pub async fn receive(&self, record_id: RecordId) -> Result> { self.unordered_rx .recv::(record_id) .await .map_err(|e| Error::ReceiveError { - source: self.channel_id.role, + source: self.channel_id.peer, step: self.channel_id.gate.to_string(), inner: Box::new(e), }) @@ -58,7 +61,7 @@ impl ReceivingEnd { } impl GatewayReceivers { - pub fn get_or_create UR>(&self, channel_id: &ChannelId, ctr: F) -> UR { + pub fn get_or_create UR>(&self, channel_id: &HelperChannelId, ctr: F) -> UR { // TODO: raw entry API if it becomes available to avoid cloning the key match self.inner.entry(channel_id.clone()) { Entry::Occupied(entry) => entry.get().clone(), diff --git a/ipa-core/src/helpers/gateway/send.rs b/ipa-core/src/helpers/gateway/send.rs index 00d8de096..473deb486 100644 --- a/ipa-core/src/helpers/gateway/send.rs +++ b/ipa-core/src/helpers/gateway/send.rs @@ -11,7 +11,7 @@ use futures::Stream; use typenum::Unsigned; use crate::{ - helpers::{buffers::OrderingSender, ChannelId, Error, Message, Role, TotalRecords}, + helpers::{buffers::OrderingSender, Error, HelperChannelId, Message, Role, TotalRecords}, protocol::RecordId, sync::Arc, telemetry::{ @@ -23,7 +23,7 @@ use crate::{ /// Sending end of the gateway channel. pub struct SendingEnd { sender_role: Role, - channel_id: ChannelId, + channel_id: HelperChannelId, inner: Arc, _phantom: PhantomData, } @@ -31,11 +31,11 @@ pub struct SendingEnd { /// Sending channels, indexed by (role, step). #[derive(Default)] pub(super) struct GatewaySenders { - pub(super) inner: DashMap>, + pub(super) inner: DashMap>, } pub(super) struct GatewaySender { - channel_id: ChannelId, + channel_id: HelperChannelId, ordering_tx: OrderingSender, total_records: TotalRecords, } @@ -45,7 +45,7 @@ pub(super) struct GatewaySendStream { } impl GatewaySender { - fn new(channel_id: ChannelId, tx: OrderingSender, total_records: TotalRecords) -> Self { + fn new(channel_id: HelperChannelId, tx: OrderingSender, total_records: TotalRecords) -> Self { Self { channel_id, ordering_tx: tx, @@ -57,7 +57,7 @@ impl GatewaySender { &self, record_id: RecordId, msg: B, - ) -> Result<(), Error> { + ) -> Result<(), Error> { debug_assert!( self.total_records.is_specified(), "total_records cannot be unspecified when sending" @@ -95,7 +95,11 @@ impl GatewaySender { } impl SendingEnd { - pub(super) fn new(sender: Arc, role: Role, channel_id: &ChannelId) -> Self { + pub(super) fn new( + sender: Arc, + role: Role, + channel_id: &HelperChannelId, + ) -> Self { Self { sender_role: role, channel_id: channel_id.clone(), @@ -113,8 +117,8 @@ impl SendingEnd { /// call. /// /// [`set_total_records`]: crate::protocol::context::Context::set_total_records - #[tracing::instrument(level = "trace", "send", skip_all, fields(i = %record_id, total = %self.inner.total_records, to = ?self.channel_id.role, gate = ?self.channel_id.gate.as_ref()))] - pub async fn send>(&self, record_id: RecordId, msg: B) -> Result<(), Error> { + #[tracing::instrument(level = "trace", "send", skip_all, fields(i = %record_id, total = %self.inner.total_records, to = ?self.channel_id.peer, gate = ?self.channel_id.gate.as_ref()))] + pub async fn send>(&self, record_id: RecordId, msg: B) -> Result<(), Error> { let r = self.inner.send(record_id, msg).await; metrics::increment_counter!(RECORDS_SENT, STEP => self.channel_id.gate.as_ref().to_string(), @@ -135,7 +139,7 @@ impl GatewaySenders { /// messages to get through. pub(crate) fn get_or_create( &self, - channel_id: &ChannelId, + channel_id: &HelperChannelId, capacity: NonZeroUsize, total_records: TotalRecords, // TODO track children for indeterminate senders ) -> (Arc, Option) { diff --git a/ipa-core/src/helpers/gateway/stall_detection.rs b/ipa-core/src/helpers/gateway/stall_detection.rs index 9a1b28732..654fbb11c 100644 --- a/ipa-core/src/helpers/gateway/stall_detection.rs +++ b/ipa-core/src/helpers/gateway/stall_detection.rs @@ -74,8 +74,8 @@ mod gateway { use crate::{ helpers::{ gateway::{Gateway, State}, - ChannelId, GatewayConfig, Message, ReceivingEnd, Role, RoleAssignment, SendingEnd, - TotalRecords, TransportImpl, + GatewayConfig, HelperChannelId, Message, ReceivingEnd, Role, RoleAssignment, + SendingEnd, TotalRecords, TransportImpl, }, protocol::QueryId, sync::Arc, @@ -149,7 +149,7 @@ mod gateway { #[must_use] pub fn get_sender( &self, - channel_id: &ChannelId, + channel_id: &HelperChannelId, total_records: TotalRecords, ) -> SendingEnd { Observed::wrap( @@ -159,7 +159,7 @@ mod gateway { } #[must_use] - pub fn get_receiver(&self, channel_id: &ChannelId) -> ReceivingEnd { + pub fn get_receiver(&self, channel_id: &HelperChannelId) -> ReceivingEnd { Observed::wrap( Weak::clone(self.get_sn()), self.inner().gateway.get_receiver(channel_id), @@ -221,7 +221,7 @@ mod receive { helpers::{ error::Error, gateway::{receive::GatewayReceivers, ReceivingEnd}, - ChannelId, Message, + HelperChannelId, Message, Role, }, protocol::RecordId, }; @@ -230,12 +230,12 @@ mod receive { delegate::delegate! { to { self.advance(); self.inner() } { #[inline] - pub async fn receive(&self, record_id: RecordId) -> Result; + pub async fn receive(&self, record_id: RecordId) -> Result>; } } } - pub struct WaitingTasks(BTreeMap>); + pub struct WaitingTasks(BTreeMap>); impl Debug for WaitingTasks { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { @@ -243,7 +243,7 @@ mod receive { write!( f, "\n\"{:?}\", from={:?}. Waiting to receive records {:?}.", - channel.gate, channel.role, records + channel.gate, channel.peer, records )?; } @@ -280,7 +280,7 @@ mod send { helpers::{ error::Error, gateway::send::{GatewaySender, GatewaySenders}, - ChannelId, Message, TotalRecords, + HelperChannelId, Message, Role, TotalRecords, }, protocol::RecordId, }; @@ -289,12 +289,12 @@ mod send { delegate::delegate! { to { self.advance(); self.inner() } { #[inline] - pub async fn send>(&self, record_id: RecordId, msg: B) -> Result<(), Error>; + pub async fn send>(&self, record_id: RecordId, msg: B) -> Result<(), Error>; } } } - pub struct WaitingTasks(BTreeMap)>); + pub struct WaitingTasks(BTreeMap)>); impl Debug for WaitingTasks { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { @@ -302,7 +302,7 @@ mod send { write!( f, "\n\"{:?}\", to={:?}. Waiting to send records {:?} out of {total:?}.", - channel.gate, channel.role, records + channel.gate, channel.peer, records )?; } diff --git a/ipa-core/src/helpers/gateway/transport.rs b/ipa-core/src/helpers/gateway/transport.rs index 8c90a29ee..efbc90970 100644 --- a/ipa-core/src/helpers/gateway/transport.rs +++ b/ipa-core/src/helpers/gateway/transport.rs @@ -1,64 +1,94 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use async_trait::async_trait; +use futures::Stream; + use crate::{ helpers::{ - buffers::UnorderedReceiver, - gateway::{receive::UR, send::GatewaySendStream}, - ChannelId, GatewayConfig, Role, RoleAssignment, RouteId, Transport, TransportImpl, + HelperIdentity, NoResourceIdentifier, QueryIdBinding, Role, RoleAssignment, RouteId, + RouteParams, StepBinding, Transport, TransportImpl, }, - protocol::QueryId, + protocol::{step::Gate, QueryId}, }; +#[derive(Debug, thiserror::Error)] +#[error("Failed to send to {0:?}: {1:?}")] +pub struct SendToRoleError(Role, >::Error); + +/// This struct exists to hide the generic type used to index streams internally. +#[pin_project::pin_project] +pub struct RoleRecordsStream(#[pin] >::RecordsStream); + /// Transport adapter that resolves [`Role`] -> [`HelperIdentity`] mapping. As gateways created /// per query, it is not ambiguous. /// /// [`HelperIdentity`]: crate::helpers::HelperIdentity #[derive(Clone)] -pub(super) struct RoleResolvingTransport { - pub query_id: QueryId, - pub roles: RoleAssignment, - pub config: GatewayConfig, - pub inner: TransportImpl, +pub struct RoleResolvingTransport { + pub(super) roles: RoleAssignment, + pub(super) inner: TransportImpl, +} + +impl Stream for RoleRecordsStream { + type Item = Vec; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().0.poll_next(cx) + } } -impl RoleResolvingTransport { - pub(crate) async fn send( +#[async_trait] +impl Transport for RoleResolvingTransport { + type RecordsStream = RoleRecordsStream; + type Error = SendToRoleError; + + fn identity(&self) -> Role { + let helper_identity = self.inner.identity(); + self.roles.role(helper_identity) + } + + async fn send< + D: Stream> + Send + 'static, + Q: QueryIdBinding, + S: StepBinding, + R: RouteParams, + >( &self, - channel_id: &ChannelId, - data: GatewaySendStream, - ) -> Result<(), ::Error> { - let dest_identity = self.roles.identity(channel_id.role); + dest: Role, + route: R, + data: D, + ) -> Result<(), Self::Error> + where + Option: From, + Option: From, + { + let dest_helper = self.roles.identity(dest); assert_ne!( - dest_identity, + dest_helper, self.inner.identity(), "can't send message to itself" ); - self.inner - .send( - dest_identity, - (RouteId::Records, self.query_id, channel_id.gate.clone()), - data, - ) + .send(dest_helper, route, data) .await + .map_err(|e| SendToRoleError(dest, e)) } - pub(crate) fn receive(&self, channel_id: &ChannelId) -> UR { - let peer = self.roles.identity(channel_id.role); + fn receive>( + &self, + from: Role, + route: R, + ) -> Self::RecordsStream { + let origin_helper = self.roles.identity(from); assert_ne!( - peer, + origin_helper, self.inner.identity(), "can't receive message from itself" ); - UnorderedReceiver::new( - Box::pin( - self.inner - .receive(peer, (self.query_id, channel_id.gate.clone())), - ), - self.config.active_work(), - ) - } - - pub(crate) fn role(&self) -> Role { - self.roles.role(self.inner.identity()) + RoleRecordsStream(self.inner.receive(origin_helper, route)) } } diff --git a/ipa-core/src/helpers/mod.rs b/ipa-core/src/helpers/mod.rs index b2b15f305..5a1105a05 100644 --- a/ipa-core/src/helpers/mod.rs +++ b/ipa-core/src/helpers/mod.rs @@ -49,9 +49,10 @@ pub use prss_protocol::negotiate as negotiate_prss; #[cfg(feature = "web-app")] pub use transport::WrappedAxumBodyStream; pub use transport::{ - callbacks::*, query, BodyStream, BytesStream, LengthDelimitedStream, LogErrors, - NoResourceIdentifier, QueryIdBinding, ReceiveRecords, RecordsStream, RouteId, RouteParams, - StepBinding, StreamCollection, StreamKey, Transport, WrappedBoxBodyStream, + callbacks::*, query, BodyStream, BytesStream, Identity as TransportIdentity, + LengthDelimitedStream, LogErrors, NoResourceIdentifier, QueryIdBinding, ReceiveRecords, + RecordsStream, RouteId, RouteParams, StepBinding, StreamCollection, StreamKey, Transport, + WrappedBoxBodyStream, }; #[cfg(feature = "in-memory-infra")] pub use transport::{InMemoryNetwork, InMemoryTransport}; @@ -405,23 +406,25 @@ impl TryFrom<[Role; 3]> for RoleAssignment { /// Combination of helper role and step that uniquely identifies a single channel of communication /// between two helpers. #[derive(Clone, Eq, PartialEq, Hash, Ord, PartialOrd)] -pub struct ChannelId { - pub role: Role, +pub struct ChannelId { + pub peer: I, // TODO: step could be either reference or owned value. references are convenient to use inside // gateway , owned values can be used inside lookup tables. pub gate: Gate, } -impl ChannelId { +pub type HelperChannelId = ChannelId; + +impl ChannelId { #[must_use] - pub fn new(role: Role, gate: Gate) -> Self { - Self { role, gate } + pub fn new(peer: I, gate: Gate) -> Self { + Self { peer, gate } } } -impl Debug for ChannelId { +impl Debug for ChannelId { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "channel[{:?},{:?}]", self.role, self.gate.as_ref()) + write!(f, "channel[{:?},{:?}]", self.peer, self.gate.as_ref()) } } diff --git a/ipa-core/src/helpers/prss_protocol.rs b/ipa-core/src/helpers/prss_protocol.rs index 4dddd21fb..348a36596 100644 --- a/ipa-core/src/helpers/prss_protocol.rs +++ b/ipa-core/src/helpers/prss_protocol.rs @@ -3,7 +3,7 @@ use rand_core::{CryptoRng, RngCore}; use x25519_dalek::PublicKey; use crate::{ - helpers::{ChannelId, Direction, Error, Gateway, TotalRecords}, + helpers::{ChannelId, Direction, Error, Gateway, Role, TotalRecords}, protocol::{ prss, step::{Gate, Step, StepNarrow}, @@ -28,7 +28,7 @@ pub async fn negotiate( gateway: &Gateway, gate: &Gate, rng: &mut R, -) -> Result { +) -> Result> { // setup protocol to exchange prss public keys. This protocol sends one message per peer. // Each message contains this helper's public key. At the end of this protocol, all helpers // have completed key exchange and each of them have established a shared secret with each peer. diff --git a/ipa-core/src/helpers/transport/in_memory/transport.rs b/ipa-core/src/helpers/transport/in_memory/transport.rs index f23d586bc..ab88e24e1 100644 --- a/ipa-core/src/helpers/transport/in_memory/transport.rs +++ b/ipa-core/src/helpers/transport/in_memory/transport.rs @@ -151,7 +151,7 @@ impl InMemoryTransport { } #[async_trait] -impl Transport for Weak { +impl Transport for Weak { type RecordsStream = ReceiveRecords; type Error = Error; diff --git a/ipa-core/src/helpers/transport/mod.rs b/ipa-core/src/helpers/transport/mod.rs index acbbb8e8e..2bdcb5ace 100644 --- a/ipa-core/src/helpers/transport/mod.rs +++ b/ipa-core/src/helpers/transport/mod.rs @@ -1,4 +1,4 @@ -use std::borrow::Borrow; +use std::{borrow::Borrow, fmt::Debug, hash::Hash}; use async_trait::async_trait; use futures::Stream; @@ -25,6 +25,19 @@ pub use stream::{ WrappedBoxBodyStream, }; +use crate::{helpers::Role, sharding::ShardIndex}; + +/// An identity of a peer that can be communicated with using [`Transport`]. There are currently two +/// types of peers - helpers and shards. +pub trait Identity: Copy + Clone + Debug + PartialEq + Eq + Hash + Send + Sync + 'static {} + +impl Identity for ShardIndex {} +impl Identity for HelperIdentity {} + +/// Role is an identifier of helper peer, only valid within a given query. For every query, there +/// exists a static mapping from role to helper identity. +impl Identity for Role {} + pub trait ResourceIdentifier: Sized {} pub trait QueryIdBinding: Sized where @@ -125,21 +138,16 @@ impl RouteParams for (RouteId, QueryId, Gate) { /// Transport that supports per-query,per-step channels #[async_trait] -pub trait Transport: Clone + Send + Sync + 'static { +pub trait Transport: Clone + Send + Sync + 'static { type RecordsStream: Stream> + Send + Unpin; type Error: std::fmt::Debug; - fn identity(&self) -> HelperIdentity; + fn identity(&self) -> I; /// Sends a new request to the given destination helper party. /// Depending on the specific request, it may or may not require acknowledgment by the remote /// party - async fn send( - &self, - dest: HelperIdentity, - route: R, - data: D, - ) -> Result<(), Self::Error> + async fn send(&self, dest: I, route: R, data: D) -> Result<(), Self::Error> where Option: From, Option: From, @@ -152,7 +160,7 @@ pub trait Transport: Clone + Send + Sync + 'static { /// and step fn receive>( &self, - from: HelperIdentity, + from: I, route: R, ) -> Self::RecordsStream; diff --git a/ipa-core/src/net/transport.rs b/ipa-core/src/net/transport.rs index fcdc63c33..048430a8a 100644 --- a/ipa-core/src/net/transport.rs +++ b/ipa-core/src/net/transport.rs @@ -123,7 +123,7 @@ impl HttpTransport { } #[async_trait] -impl Transport for Arc { +impl Transport for Arc { type RecordsStream = ReceiveRecords; type Error = Error;