Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First building blocks for Sharding API: ShardIndex and TransportIdentity #970

Merged
merged 8 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions ipa-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{

use thiserror::Error;

use crate::{report::InvalidReportError, task::JoinError};
use crate::{helpers::Role, report::InvalidReportError, sharding::ShardIndex, task::JoinError};

/// An error raised by the IPA protocol.
///
Expand Down Expand Up @@ -52,8 +52,10 @@ pub enum Error {
#[error("failed to parse json: {0}")]
#[cfg(feature = "enable-serde")]
Serde(#[from] serde_json::Error),
#[error("Infrastructure error: {0}")]
InfraError(#[from] crate::helpers::Error),
#[error("MPC Infrastructure error: {0}")]
MpcInfraError(#[from] crate::helpers::Error<Role>),
#[error("Shard Infrastructure error: {0}")]
ShardInfraError(#[from] crate::helpers::Error<ShardIndex>),
#[error("Value truncation error: {0}")]
FieldValueTruncation(String),
#[error("Invalid query parameter: {0}")]
Expand Down
4 changes: 2 additions & 2 deletions ipa-core/src/helpers/buffers/unordered_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use generic_array::GenericArray;
use typenum::Unsigned;

use crate::{
helpers::{Error, Message},
helpers::{Error, Message, Role},
protocol::RecordId,
sync::{Arc, Mutex},
};
Expand Down Expand Up @@ -160,7 +160,7 @@ pub enum ReceiveError<M: Message> {
#[error("Error deserializing {0:?} record: {1}")]
DeserializationError(RecordId, #[source] M::DeserializationError),
#[error(transparent)]
InfraError(#[from] Error),
InfraError(#[from] Error<Role>),
}

impl<S, C> OperatingState<S, C>
Expand Down
72 changes: 5 additions & 67 deletions ipa-core/src/helpers/error.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,17 @@
use thiserror::Error;
use tokio::sync::mpsc::error::SendError;

use crate::{
error::BoxError,
helpers::{ChannelId, HelperIdentity, Message, Role, TotalRecords},
protocol::{step::Gate, RecordId},
helpers::{ChannelId, TotalRecords, TransportIdentity},
protocol::RecordId,
};

/// An error raised by the IPA supporting infrastructure.
#[derive(Error, Debug)]
pub enum Error {
#[error("An error occurred while sending data to {channel:?}: {inner}")]
SendError {
channel: ChannelId,

#[source]
inner: BoxError,
},
#[error("An error occurred while sending data over a reordering channel: {inner}")]
OrderedChannelError {
#[source]
inner: BoxError,
},
#[error("An error occurred while sending data to unknown helper: {inner}")]
PollSendError {
#[source]
inner: BoxError,
},
pub enum Error<I: TransportIdentity> {
#[error("An error occurred while receiving data from {source:?}/{step}: {inner}")]
ReceiveError {
source: Role,
source: I,
step: String,
#[source]
inner: BoxError,
Expand All @@ -39,54 +21,10 @@ pub enum Error {
// TODO(mt): add more fields, like step and role.
record_id: RecordId,
},
#[error("An error occurred while serializing or deserializing data for {record_id:?} and step {step}: {inner}")]
SerializationError {
record_id: RecordId,
step: String,
#[source]
inner: BoxError,
},
#[error("Encountered unknown identity {0:?}")]
UnknownIdentity(HelperIdentity),
#[error("record ID {record_id:?} is out of range for {channel_id:?} (expected {total_records:?} records)")]
TooManyRecords {
record_id: RecordId,
channel_id: ChannelId,
channel_id: ChannelId<I>,
total_records: TotalRecords,
},
}

impl Error {
pub fn send_error<E: Into<Box<dyn std::error::Error + Send + Sync + 'static>>>(
channel: ChannelId,
inner: E,
) -> Error {
Self::SendError {
channel,
inner: inner.into(),
}
}

#[must_use]
pub fn serialization_error<E: Into<BoxError>>(
record_id: RecordId,
gate: &Gate,
inner: E,
) -> Error {
Self::SerializationError {
record_id,
step: String::from(gate.as_ref()),
inner: inner.into(),
}
}
}

impl<M: Message> From<SendError<(usize, M)>> for Error {
fn from(_: SendError<(usize, M)>) -> Self {
Self::OrderedChannelError {
inner: "ordered string".into(),
}
}
}

pub type Result<T> = std::result::Result<T, Error>;
40 changes: 28 additions & 12 deletions ipa-core/src/helpers/gateway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ pub(super) use stall_detection::InstrumentedGateway;

use crate::{
helpers::{
buffers::UnorderedReceiver,
gateway::{
receive::GatewayReceivers, send::GatewaySenders, transport::RoleResolvingTransport,
},
ChannelId, Message, Role, RoleAssignment, TotalRecords, Transport,
HelperChannelId, HelperIdentity, Message, Role, RoleAssignment, RouteId, TotalRecords,
Transport,
},
protocol::QueryId,
};
Expand All @@ -28,17 +30,18 @@ use crate::{
/// To avoid proliferation of type parameters, most code references this concrete type alias, rather
/// than a type parameter `T: Transport`.
#[cfg(feature = "in-memory-infra")]
pub type TransportImpl = super::transport::InMemoryTransport;
pub type TransportImpl = super::transport::InMemoryTransport<HelperIdentity>;

#[cfg(feature = "real-world-infra")]
pub type TransportImpl = crate::sync::Arc<crate::net::HttpTransport>;

pub type TransportError = <TransportImpl as Transport>::Error;
pub type TransportError = <TransportImpl as Transport<HelperIdentity>>::Error;

/// Gateway into IPA Network infrastructure. It allows helpers send and receive messages.
pub struct Gateway {
config: GatewayConfig,
transport: RoleResolvingTransport,
query_id: QueryId,
#[cfg(feature = "stall-detection")]
inner: crate::sync::Arc<State>,
#[cfg(not(feature = "stall-detection"))]
Expand Down Expand Up @@ -74,20 +77,19 @@ impl Gateway {
) -> Self {
#[allow(clippy::useless_conversion)] // not useless in stall-detection build
Self {
query_id,
config,
transport: RoleResolvingTransport {
query_id,
roles,
inner: transport,
config,
},
inner: State::default().into(),
}
}

#[must_use]
pub fn role(&self) -> Role {
self.transport.role()
self.transport.identity()
}

#[must_use]
Expand All @@ -101,7 +103,7 @@ impl Gateway {
#[must_use]
pub fn get_sender<M: Message>(
&self,
channel_id: &ChannelId,
channel_id: &HelperChannelId,
total_records: TotalRecords,
) -> send::SendingEnd<M> {
let (tx, maybe_stream) = self.inner.senders.get_or_create::<M>(
Expand All @@ -113,10 +115,15 @@ impl Gateway {
tokio::spawn({
let channel_id = channel_id.clone();
let transport = self.transport.clone();
let query_id = self.query_id;
async move {
// TODO(651): In the HTTP case we probably need more robust error handling here.
transport
.send(&channel_id, stream)
.send(
channel_id.peer,
(RouteId::Records, query_id, channel_id.gate),
stream,
)
.await
.expect("{channel_id:?} receiving end should be accepted by transport");
}
Expand All @@ -127,12 +134,21 @@ impl Gateway {
}

#[must_use]
pub fn get_receiver<M: Message>(&self, channel_id: &ChannelId) -> receive::ReceivingEnd<M> {
pub fn get_receiver<M: Message>(
&self,
channel_id: &HelperChannelId,
) -> receive::ReceivingEnd<M> {
receive::ReceivingEnd::new(
channel_id.clone(),
self.inner
.receivers
.get_or_create(channel_id, || self.transport.receive(channel_id)),
self.inner.receivers.get_or_create(channel_id, || {
UnorderedReceiver::new(
Box::pin(
self.transport
.receive(channel_id.peer, (self.query_id, channel_id.gate.clone())),
),
self.config.active_work(),
)
}),
)
}
}
Expand Down
25 changes: 14 additions & 11 deletions ipa-core/src/helpers/gateway/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,33 @@ use dashmap::{mapref::entry::Entry, DashMap};
use futures::Stream;

use crate::{
helpers::{buffers::UnorderedReceiver, ChannelId, Error, Message, Transport, TransportImpl},
helpers::{
buffers::UnorderedReceiver, gateway::transport::RoleResolvingTransport, Error,
HelperChannelId, Message, Role, Transport,
},
protocol::RecordId,
};

/// Receiving end end of the gateway channel.
/// Receiving end of the gateway channel.
pub struct ReceivingEnd<M: Message> {
channel_id: ChannelId,
channel_id: HelperChannelId,
unordered_rx: UR,
_phantom: PhantomData<M>,
}

/// Receiving channels, indexed by (role, step).
#[derive(Default)]
pub(super) struct GatewayReceivers {
pub(super) inner: DashMap<ChannelId, UR>,
pub(super) inner: DashMap<HelperChannelId, UR>,
}

pub(super) type UR = UnorderedReceiver<
<TransportImpl as Transport>::RecordsStream,
<<TransportImpl as Transport>::RecordsStream as Stream>::Item,
<RoleResolvingTransport as Transport<Role>>::RecordsStream,
<<RoleResolvingTransport as Transport<Role>>::RecordsStream as Stream>::Item,
>;

impl<M: Message> ReceivingEnd<M> {
pub(super) fn new(channel_id: ChannelId, rx: UR) -> Self {
pub(super) fn new(channel_id: HelperChannelId, rx: UR) -> Self {
Self {
channel_id,
unordered_rx: rx,
Expand All @@ -44,21 +47,21 @@ impl<M: Message> ReceivingEnd<M> {
/// ## Panics
/// This will panic if message size does not fit into 8 bytes and it somehow got serialized
/// and sent to this helper.
#[tracing::instrument(level = "trace", "receive", skip_all, fields(i = %record_id, from = ?self.channel_id.role, gate = ?self.channel_id.gate.as_ref()))]
pub async fn receive(&self, record_id: RecordId) -> Result<M, Error> {
#[tracing::instrument(level = "trace", "receive", skip_all, fields(i = %record_id, from = ?self.channel_id.peer, gate = ?self.channel_id.gate.as_ref()))]
pub async fn receive(&self, record_id: RecordId) -> Result<M, Error<Role>> {
self.unordered_rx
.recv::<M, _>(record_id)
.await
.map_err(|e| Error::ReceiveError {
source: self.channel_id.role,
source: self.channel_id.peer,
step: self.channel_id.gate.to_string(),
inner: Box::new(e),
})
}
}

impl GatewayReceivers {
pub fn get_or_create<F: FnOnce() -> UR>(&self, channel_id: &ChannelId, ctr: F) -> UR {
pub fn get_or_create<F: FnOnce() -> UR>(&self, channel_id: &HelperChannelId, ctr: F) -> UR {
// TODO: raw entry API if it becomes available to avoid cloning the key
match self.inner.entry(channel_id.clone()) {
Entry::Occupied(entry) => entry.get().clone(),
Expand Down
24 changes: 14 additions & 10 deletions ipa-core/src/helpers/gateway/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures::Stream;
use typenum::Unsigned;

use crate::{
helpers::{buffers::OrderingSender, ChannelId, Error, Message, Role, TotalRecords},
helpers::{buffers::OrderingSender, Error, HelperChannelId, Message, Role, TotalRecords},
protocol::RecordId,
sync::Arc,
telemetry::{
Expand All @@ -23,19 +23,19 @@ use crate::{
/// Sending end of the gateway channel.
pub struct SendingEnd<M: Message> {
sender_role: Role,
channel_id: ChannelId,
channel_id: HelperChannelId,
inner: Arc<GatewaySender>,
_phantom: PhantomData<M>,
}

/// Sending channels, indexed by (role, step).
#[derive(Default)]
pub(super) struct GatewaySenders {
pub(super) inner: DashMap<ChannelId, Arc<GatewaySender>>,
pub(super) inner: DashMap<HelperChannelId, Arc<GatewaySender>>,
}

pub(super) struct GatewaySender {
channel_id: ChannelId,
channel_id: HelperChannelId,
ordering_tx: OrderingSender,
total_records: TotalRecords,
}
Expand All @@ -45,7 +45,7 @@ pub(super) struct GatewaySendStream {
}

impl GatewaySender {
fn new(channel_id: ChannelId, tx: OrderingSender, total_records: TotalRecords) -> Self {
fn new(channel_id: HelperChannelId, tx: OrderingSender, total_records: TotalRecords) -> Self {
Self {
channel_id,
ordering_tx: tx,
Expand All @@ -57,7 +57,7 @@ impl GatewaySender {
&self,
record_id: RecordId,
msg: B,
) -> Result<(), Error> {
) -> Result<(), Error<Role>> {
debug_assert!(
self.total_records.is_specified(),
"total_records cannot be unspecified when sending"
Expand Down Expand Up @@ -95,7 +95,11 @@ impl GatewaySender {
}

impl<M: Message> SendingEnd<M> {
pub(super) fn new(sender: Arc<GatewaySender>, role: Role, channel_id: &ChannelId) -> Self {
pub(super) fn new(
sender: Arc<GatewaySender>,
role: Role,
channel_id: &HelperChannelId,
) -> Self {
Self {
sender_role: role,
channel_id: channel_id.clone(),
Expand All @@ -113,8 +117,8 @@ impl<M: Message> SendingEnd<M> {
/// call.
///
/// [`set_total_records`]: crate::protocol::context::Context::set_total_records
#[tracing::instrument(level = "trace", "send", skip_all, fields(i = %record_id, total = %self.inner.total_records, to = ?self.channel_id.role, gate = ?self.channel_id.gate.as_ref()))]
pub async fn send<B: Borrow<M>>(&self, record_id: RecordId, msg: B) -> Result<(), Error> {
#[tracing::instrument(level = "trace", "send", skip_all, fields(i = %record_id, total = %self.inner.total_records, to = ?self.channel_id.peer, gate = ?self.channel_id.gate.as_ref()))]
pub async fn send<B: Borrow<M>>(&self, record_id: RecordId, msg: B) -> Result<(), Error<Role>> {
let r = self.inner.send(record_id, msg).await;
metrics::increment_counter!(RECORDS_SENT,
STEP => self.channel_id.gate.as_ref().to_string(),
Expand All @@ -135,7 +139,7 @@ impl GatewaySenders {
/// messages to get through.
pub(crate) fn get_or_create<M: Message>(
&self,
channel_id: &ChannelId,
channel_id: &HelperChannelId,
capacity: NonZeroUsize,
total_records: TotalRecords, // TODO track children for indeterminate senders
) -> (Arc<GatewaySender>, Option<GatewaySendStream>) {
Expand Down
Loading