Skip to content

Commit

Permalink
Introduce TransportIdentity
Browse files Browse the repository at this point in the history
Refactor `RoleResolvingTransport` to make the use of it nicer
  • Loading branch information
akoshelev committed Mar 9, 2024
1 parent d5ea540 commit ae80516
Show file tree
Hide file tree
Showing 13 changed files with 185 additions and 133 deletions.
8 changes: 5 additions & 3 deletions ipa-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{

use thiserror::Error;

use crate::{report::InvalidReportError, task::JoinError};
use crate::{helpers::Role, report::InvalidReportError, sharding::ShardIndex, task::JoinError};

/// An error raised by the IPA protocol.
///
Expand Down Expand Up @@ -52,8 +52,10 @@ pub enum Error {
#[error("failed to parse json: {0}")]
#[cfg(feature = "enable-serde")]
Serde(#[from] serde_json::Error),
#[error("Infrastructure error: {0}")]
InfraError(#[from] crate::helpers::Error),
#[error("MPC Infrastructure error: {0}")]
MpcInfraError(#[from] crate::helpers::Error<Role>),
#[error("Shard Infrastructure error: {0}")]
ShardInfraError(#[from] crate::helpers::Error<ShardIndex>),
#[error("Value truncation error: {0}")]
FieldValueTruncation(String),
#[error("Invalid query parameter: {0}")]
Expand Down
4 changes: 2 additions & 2 deletions ipa-core/src/helpers/buffers/unordered_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use generic_array::GenericArray;
use typenum::Unsigned;

use crate::{
helpers::{Error, Message},
helpers::{Error, Message, Role},
protocol::RecordId,
sync::{Arc, Mutex},
};
Expand Down Expand Up @@ -160,7 +160,7 @@ pub enum ReceiveError<M: Message> {
#[error("Error deserializing {0:?} record: {1}")]
DeserializationError(RecordId, #[source] M::DeserializationError),
#[error(transparent)]
InfraError(#[from] Error),
InfraError(#[from] Error<Role>),
}

impl<S, C> OperatingState<S, C>
Expand Down
34 changes: 10 additions & 24 deletions ipa-core/src/helpers/error.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,29 @@
use thiserror::Error;
use tokio::sync::mpsc::error::SendError;

use crate::{
error::BoxError,
helpers::{ChannelId, HelperIdentity, Message, Role, TotalRecords},
helpers::{ChannelId, HelperChannelId, HelperIdentity, Role, TotalRecords, TransportIdentity},
protocol::{step::Gate, RecordId},
};

/// An error raised by the IPA supporting infrastructure.
#[derive(Error, Debug)]
pub enum Error {
pub enum Error<I: TransportIdentity> {
#[error("An error occurred while sending data to {channel:?}: {inner}")]
SendError {
channel: ChannelId,
channel: ChannelId<I>,

#[source]
inner: BoxError,
},
#[error("An error occurred while sending data over a reordering channel: {inner}")]
OrderedChannelError {
#[source]
inner: BoxError,
},
#[error("An error occurred while sending data to unknown helper: {inner}")]
PollSendError {
#[source]
inner: BoxError,
},
#[error("An error occurred while receiving data from {source:?}/{step}: {inner}")]
ReceiveError {
source: Role,
source: I,
step: String,
#[source]
inner: BoxError,
Expand All @@ -51,16 +45,16 @@ pub enum Error {
#[error("record ID {record_id:?} is out of range for {channel_id:?} (expected {total_records:?} records)")]
TooManyRecords {
record_id: RecordId,
channel_id: ChannelId,
channel_id: ChannelId<I>,
total_records: TotalRecords,
},
}

impl Error {
impl Error<Role> {
pub fn send_error<E: Into<Box<dyn std::error::Error + Send + Sync + 'static>>>(
channel: ChannelId,
channel: HelperChannelId,

Check warning on line 55 in ipa-core/src/helpers/error.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/helpers/error.rs#L55

Added line #L55 was not covered by tests
inner: E,
) -> Error {
) -> Self {

Check warning on line 57 in ipa-core/src/helpers/error.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/helpers/error.rs#L57

Added line #L57 was not covered by tests
Self::SendError {
channel,
inner: inner.into(),
Expand All @@ -72,7 +66,7 @@ impl Error {
record_id: RecordId,
gate: &Gate,
inner: E,
) -> Error {
) -> Self {

Check warning on line 69 in ipa-core/src/helpers/error.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/helpers/error.rs#L69

Added line #L69 was not covered by tests
Self::SerializationError {
record_id,
step: String::from(gate.as_ref()),
Expand All @@ -81,12 +75,4 @@ impl Error {
}
}

impl<M: Message> From<SendError<(usize, M)>> for Error {
fn from(_: SendError<(usize, M)>) -> Self {
Self::OrderedChannelError {
inner: "ordered string".into(),
}
}
}

pub type Result<T> = std::result::Result<T, Error>;
pub type Result<T> = std::result::Result<T, Error<Role>>;
38 changes: 27 additions & 11 deletions ipa-core/src/helpers/gateway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ pub(super) use stall_detection::InstrumentedGateway;

use crate::{
helpers::{
buffers::UnorderedReceiver,
gateway::{
receive::GatewayReceivers, send::GatewaySenders, transport::RoleResolvingTransport,
},
ChannelId, Message, Role, RoleAssignment, TotalRecords, Transport,
HelperChannelId, HelperIdentity, Message, Role, RoleAssignment, RouteId, TotalRecords,
Transport,
},
protocol::QueryId,
};
Expand All @@ -33,12 +35,13 @@ pub type TransportImpl = super::transport::InMemoryTransport;
#[cfg(feature = "real-world-infra")]
pub type TransportImpl = crate::sync::Arc<crate::net::HttpTransport>;

pub type TransportError = <TransportImpl as Transport>::Error;
pub type TransportError = <TransportImpl as Transport<HelperIdentity>>::Error;

/// Gateway into IPA Network infrastructure. It allows helpers send and receive messages.
pub struct Gateway {
config: GatewayConfig,
transport: RoleResolvingTransport,
query_id: QueryId,
#[cfg(feature = "stall-detection")]
inner: crate::sync::Arc<State>,
#[cfg(not(feature = "stall-detection"))]
Expand Down Expand Up @@ -74,20 +77,19 @@ impl Gateway {
) -> Self {
#[allow(clippy::useless_conversion)] // not useless in stall-detection build
Self {
query_id,
config,
transport: RoleResolvingTransport {
query_id,
roles,
inner: transport,
config,
},
inner: State::default().into(),
}
}

#[must_use]
pub fn role(&self) -> Role {
self.transport.role()
self.transport.identity()
}

#[must_use]
Expand All @@ -101,7 +103,7 @@ impl Gateway {
#[must_use]
pub fn get_sender<M: Message>(
&self,
channel_id: &ChannelId,
channel_id: &HelperChannelId,
total_records: TotalRecords,
) -> send::SendingEnd<M> {
let (tx, maybe_stream) = self.inner.senders.get_or_create::<M>(
Expand All @@ -113,10 +115,15 @@ impl Gateway {
tokio::spawn({
let channel_id = channel_id.clone();
let transport = self.transport.clone();
let query_id = self.query_id;
async move {
// TODO(651): In the HTTP case we probably need more robust error handling here.
transport
.send(&channel_id, stream)
.send(
channel_id.peer,
(RouteId::Records, query_id, channel_id.gate),
stream,
)
.await
.expect("{channel_id:?} receiving end should be accepted by transport");
}
Expand All @@ -127,12 +134,21 @@ impl Gateway {
}

#[must_use]
pub fn get_receiver<M: Message>(&self, channel_id: &ChannelId) -> receive::ReceivingEnd<M> {
pub fn get_receiver<M: Message>(
&self,
channel_id: &HelperChannelId,
) -> receive::ReceivingEnd<M> {
receive::ReceivingEnd::new(
channel_id.clone(),
self.inner
.receivers
.get_or_create(channel_id, || self.transport.receive(channel_id)),
self.inner.receivers.get_or_create(channel_id, || {
UnorderedReceiver::new(
Box::pin(
self.transport
.receive(channel_id.peer, (self.query_id, channel_id.gate.clone())),
),
self.config.active_work(),
)
}),
)
}
}
Expand Down
25 changes: 14 additions & 11 deletions ipa-core/src/helpers/gateway/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,33 @@ use dashmap::{mapref::entry::Entry, DashMap};
use futures::Stream;

use crate::{
helpers::{buffers::UnorderedReceiver, ChannelId, Error, Message, Transport, TransportImpl},
helpers::{
buffers::UnorderedReceiver, gateway::transport::RoleResolvingTransport, Error,
HelperChannelId, Message, Role, Transport,
},
protocol::RecordId,
};

/// Receiving end end of the gateway channel.
/// Receiving end of the gateway channel.
pub struct ReceivingEnd<M: Message> {
channel_id: ChannelId,
channel_id: HelperChannelId,
unordered_rx: UR,
_phantom: PhantomData<M>,
}

/// Receiving channels, indexed by (role, step).
#[derive(Default)]
pub(super) struct GatewayReceivers {
pub(super) inner: DashMap<ChannelId, UR>,
pub(super) inner: DashMap<HelperChannelId, UR>,
}

pub(super) type UR = UnorderedReceiver<
<TransportImpl as Transport>::RecordsStream,
<<TransportImpl as Transport>::RecordsStream as Stream>::Item,
<RoleResolvingTransport as Transport<Role>>::RecordsStream,
<<RoleResolvingTransport as Transport<Role>>::RecordsStream as Stream>::Item,
>;

impl<M: Message> ReceivingEnd<M> {
pub(super) fn new(channel_id: ChannelId, rx: UR) -> Self {
pub(super) fn new(channel_id: HelperChannelId, rx: UR) -> Self {
Self {
channel_id,
unordered_rx: rx,
Expand All @@ -44,21 +47,21 @@ impl<M: Message> ReceivingEnd<M> {
/// ## Panics
/// This will panic if message size does not fit into 8 bytes and it somehow got serialized
/// and sent to this helper.
#[tracing::instrument(level = "trace", "receive", skip_all, fields(i = %record_id, from = ?self.channel_id.role, gate = ?self.channel_id.gate.as_ref()))]
pub async fn receive(&self, record_id: RecordId) -> Result<M, Error> {
#[tracing::instrument(level = "trace", "receive", skip_all, fields(i = %record_id, from = ?self.channel_id.peer, gate = ?self.channel_id.gate.as_ref()))]
pub async fn receive(&self, record_id: RecordId) -> Result<M, Error<Role>> {
self.unordered_rx
.recv::<M, _>(record_id)
.await
.map_err(|e| Error::ReceiveError {
source: self.channel_id.role,
source: self.channel_id.peer,

Check warning on line 56 in ipa-core/src/helpers/gateway/receive.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/helpers/gateway/receive.rs#L56

Added line #L56 was not covered by tests
step: self.channel_id.gate.to_string(),
inner: Box::new(e),
})
}
}

impl GatewayReceivers {
pub fn get_or_create<F: FnOnce() -> UR>(&self, channel_id: &ChannelId, ctr: F) -> UR {
pub fn get_or_create<F: FnOnce() -> UR>(&self, channel_id: &HelperChannelId, ctr: F) -> UR {
// TODO: raw entry API if it becomes available to avoid cloning the key
match self.inner.entry(channel_id.clone()) {
Entry::Occupied(entry) => entry.get().clone(),
Expand Down
24 changes: 14 additions & 10 deletions ipa-core/src/helpers/gateway/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures::Stream;
use typenum::Unsigned;

use crate::{
helpers::{buffers::OrderingSender, ChannelId, Error, Message, Role, TotalRecords},
helpers::{buffers::OrderingSender, Error, HelperChannelId, Message, Role, TotalRecords},
protocol::RecordId,
sync::Arc,
telemetry::{
Expand All @@ -23,19 +23,19 @@ use crate::{
/// Sending end of the gateway channel.
pub struct SendingEnd<M: Message> {
sender_role: Role,
channel_id: ChannelId,
channel_id: HelperChannelId,
inner: Arc<GatewaySender>,
_phantom: PhantomData<M>,
}

/// Sending channels, indexed by (role, step).
#[derive(Default)]
pub(super) struct GatewaySenders {
pub(super) inner: DashMap<ChannelId, Arc<GatewaySender>>,
pub(super) inner: DashMap<HelperChannelId, Arc<GatewaySender>>,
}

pub(super) struct GatewaySender {
channel_id: ChannelId,
channel_id: HelperChannelId,
ordering_tx: OrderingSender,
total_records: TotalRecords,
}
Expand All @@ -45,7 +45,7 @@ pub(super) struct GatewaySendStream {
}

impl GatewaySender {
fn new(channel_id: ChannelId, tx: OrderingSender, total_records: TotalRecords) -> Self {
fn new(channel_id: HelperChannelId, tx: OrderingSender, total_records: TotalRecords) -> Self {
Self {
channel_id,
ordering_tx: tx,
Expand All @@ -57,7 +57,7 @@ impl GatewaySender {
&self,
record_id: RecordId,
msg: B,
) -> Result<(), Error> {
) -> Result<(), Error<Role>> {
debug_assert!(
self.total_records.is_specified(),
"total_records cannot be unspecified when sending"
Expand Down Expand Up @@ -95,7 +95,11 @@ impl GatewaySender {
}

impl<M: Message> SendingEnd<M> {
pub(super) fn new(sender: Arc<GatewaySender>, role: Role, channel_id: &ChannelId) -> Self {
pub(super) fn new(
sender: Arc<GatewaySender>,
role: Role,
channel_id: &HelperChannelId,
) -> Self {
Self {
sender_role: role,
channel_id: channel_id.clone(),
Expand All @@ -113,8 +117,8 @@ impl<M: Message> SendingEnd<M> {
/// call.
///
/// [`set_total_records`]: crate::protocol::context::Context::set_total_records
#[tracing::instrument(level = "trace", "send", skip_all, fields(i = %record_id, total = %self.inner.total_records, to = ?self.channel_id.role, gate = ?self.channel_id.gate.as_ref()))]
pub async fn send<B: Borrow<M>>(&self, record_id: RecordId, msg: B) -> Result<(), Error> {
#[tracing::instrument(level = "trace", "send", skip_all, fields(i = %record_id, total = %self.inner.total_records, to = ?self.channel_id.peer, gate = ?self.channel_id.gate.as_ref()))]
pub async fn send<B: Borrow<M>>(&self, record_id: RecordId, msg: B) -> Result<(), Error<Role>> {
let r = self.inner.send(record_id, msg).await;
metrics::increment_counter!(RECORDS_SENT,
STEP => self.channel_id.gate.as_ref().to_string(),
Expand All @@ -135,7 +139,7 @@ impl GatewaySenders {
/// messages to get through.
pub(crate) fn get_or_create<M: Message>(
&self,
channel_id: &ChannelId,
channel_id: &HelperChannelId,
capacity: NonZeroUsize,
total_records: TotalRecords, // TODO track children for indeterminate senders
) -> (Arc<GatewaySender>, Option<GatewaySendStream>) {
Expand Down
Loading

0 comments on commit ae80516

Please sign in to comment.