Skip to content

Commit

Permalink
Much better!
Browse files Browse the repository at this point in the history
  • Loading branch information
djordon committed Nov 20, 2024
1 parent 5963fff commit b1d0873
Show file tree
Hide file tree
Showing 8 changed files with 18 additions and 141 deletions.
24 changes: 1 addition & 23 deletions signer/src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::bitcoin::BitcoinInteract;
use crate::config::Settings;
use crate::emily_client::EmilyInteract;
use crate::error::Error;
use crate::network::MessageTransfer;
use crate::stacks::api::StacksInteract;
use crate::storage::DbRead;
use crate::storage::DbWrite;
Expand Down Expand Up @@ -59,16 +58,14 @@ pub trait Context: Clone + Sync + Send {
/// later return `Some(_)`. But if [`StreamExt::next`] yields `None`
/// three times then the stream is "fused" and will return `None`
/// forever after.
fn as_signal_stream<F, M>(&self, network: &M, predicate: F) -> ReceiverStream<SignerSignal>
fn as_signal_stream<F>(&self, predicate: F) -> ReceiverStream<SignerSignal>
where
M: MessageTransfer,
F: Fn(&SignerSignal) -> bool + Send + Sync + 'static,
{
let (sender, receiver) = tokio::sync::mpsc::channel(SIGNER_CHANNEL_CAPACITY);

let mut watch_receiver = self.get_termination_handle();
let mut signal_stream = self.get_signal_receiver();
let mut network_stream = network.as_receiver();

tokio::spawn(async move {
loop {
Expand Down Expand Up @@ -101,25 +98,6 @@ pub trait Context: Clone + Sync + Send {
}
}
}
item = network_stream.recv() => {
match item {
Some(msg) => {
let signal = P2PEvent::MessageReceived(msg).into();
if predicate(&signal) {
// See comment above, we can bail.
if sender.send(signal).await.is_err() {
break;
}
}
}
// This means the channel has been closed.
// Since this is the network stream we can bail
// here, we're probably on our way down.
None => {
break;
}
}
}
}
}
});
Expand Down
28 changes: 2 additions & 26 deletions signer/src/network/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use std::{
sync::{atomic::AtomicU16, Arc},
};

use tokio::sync::mpsc::Receiver;
use tokio::sync::{broadcast, Mutex};
use tokio::sync::broadcast;
use tokio::sync::Mutex;

use crate::error::Error;

Expand Down Expand Up @@ -103,30 +103,6 @@ impl super::MessageTransfer for MpmcBroadcaster {

Ok(msg)
}

fn as_receiver(&self) -> Receiver<super::Msg> {
let (sender, receiver) = tokio::sync::mpsc::channel(1000);
let mut signal_rx = self.receiver.resubscribe();
let recently_sent = self.recently_sent.clone();
tokio::spawn(async move {
loop {
match signal_rx.recv().await {
Ok(mut msg) => {
while Some(&msg.id()) == recently_sent.lock().await.front() {
recently_sent.lock().await.pop_front();
msg = signal_rx.recv().await.map_err(Error::ChannelReceive)?;
}
let _ = sender.send(msg).await;
}
Err(error) => {
tracing::error!(%error, "got a receive error");
return Err::<(), _>(Error::SignerShutdown);
}
}
}
});
receiver
}
}

#[cfg(test)]
Expand Down
24 changes: 0 additions & 24 deletions signer/src/network/in_memory2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::time::Duration;

use futures::StreamExt;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc::Receiver;
use tokio_stream::wrappers::BroadcastStream;

use crate::error::Error;
Expand Down Expand Up @@ -155,29 +154,6 @@ impl MessageTransfer for SignerNetworkInstance {
interval.tick().await;
}
}

fn as_receiver(&self) -> Receiver<Msg> {
let (sender, receiver) = tokio::sync::mpsc::channel(DEFAULT_SIGNER_CAPACITY);
let mut signal_rx = self.instance_rx.resubscribe();

tokio::spawn(async move {
// If we get an error that means that all senders have been
// dropped and the channel has been closed, or the channel is
// full. We bail in both cases because we can, this code is for
// tests anyway.
while let Ok(msg) = signal_rx.recv().await {
// Because there could only be one receiver, an error from
// Sender::send means the channel is closed and cannot be
// re-opened. So we bail on these errors too.
if let Err(error) = sender.send(msg).await {
tracing::error!(%error, "could not send message over local stream");
break;
}
}
tracing::warn!("the instance stream is closed or lagging, bailing");
});
receiver
}
}

#[cfg(test)]
Expand Down
52 changes: 0 additions & 52 deletions signer/src/network/libp2p/network.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! MessageTransfer implementation for the application signalling channel
//! together with LibP2P.
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use tokio::sync::broadcast::Sender;

Expand All @@ -14,7 +13,6 @@ use crate::context::TerminationHandle;
use crate::error::Error;
use crate::network::MessageTransfer;
use crate::network::Msg;
use crate::SIGNER_CHANNEL_CAPACITY;

/// MessageTransfer interface for the application signalling channel.
pub struct P2PNetwork {
Expand Down Expand Up @@ -102,56 +100,6 @@ impl MessageTransfer for P2PNetwork {
}
}
}

fn as_receiver(&self) -> tokio::sync::mpsc::Receiver<Msg> {
// This is the same capacity of `signal_tx` in the SignerContext,
// which is typically used to create this P2PNetwork.
let (sender, receiver) = tokio::sync::mpsc::channel(SIGNER_CHANNEL_CAPACITY);
let mut rx = self.signal_rx.resubscribe();

tokio::spawn(async move {
loop {
// Let's drain the stream first and then wait for a messages
while let Ok(msg) = rx.try_recv() {
if let SignerSignal::Event(SignerEvent::P2P(P2PEvent::MessageReceived(msg))) =
msg
{
// Because there could only be one receiver, an error from
// Sender::send means the channel is closed and cannot be
// re-opened. So we bail on these errors too.
if sender.send(msg).await.is_err() {
tracing::debug!("could not send message, receivers dropped, bailing");
return;
}
}
}
match rx.recv().await {
Ok(SignerSignal::Event(SignerEvent::P2P(P2PEvent::MessageReceived(msg)))) => {
// Because there could only be one receiver, an error from
// Sender::send means the channel is closed and cannot be
// re-opened. So we bail on these errors too.
if sender.send(msg).await.is_err() {
tracing::debug!("could not send message, receivers dropped, bailing");
return;
}
}
// The receiver has been dropped. This is normal
// behavior so nothing to worry about
Err(RecvError::Closed) => {
tracing::warn!("the instance p2p stream is closed, this is bad, bailing");
return;
}
// If we are lagging in the stream, we could always
// catch up, we'll just have lost messages.
Err(error @ RecvError::Lagged(_)) => {
tracing::warn!(%error, "stream lagging behind")
}
_ => continue,
}
}
});
receiver
}
}

#[cfg(test)]
Expand Down
5 changes: 0 additions & 5 deletions signer/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use crate::message;
#[cfg(any(test, feature = "testing"))]
pub use in_memory::InMemoryNetwork;
pub use libp2p::P2PNetwork;
use tokio::sync::mpsc::Receiver;

/// The supported message type of the signer network
pub type Msg = ecdsa::Signed<message::SignerMessage>;
Expand All @@ -35,10 +34,6 @@ pub trait MessageTransfer: Clone {
fn broadcast(&mut self, msg: Msg) -> impl Future<Output = Result<(), Error>> + Send;
/// Receive a message from the network
fn receive(&mut self) -> impl Future<Output = Result<Msg, Error>> + Send;
/// Return a stream of the same messages from the
/// [`MessageTransfer::receive`]. The inner type of each
/// [`SignerSignal`] should be a [`P2PEvent::MessageReceived`].
fn as_receiver(&self) -> Receiver<Msg>;
}

impl std::fmt::Display for Msg {
Expand Down
4 changes: 1 addition & 3 deletions signer/src/request_decider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ where
return Err(error);
};

let mut signal_stream = self
.context
.as_signal_stream(&self.network, run_loop_message_filter);
let mut signal_stream = self.context.as_signal_stream(run_loop_message_filter);

while let Some(message) = signal_stream.next().await {
match message {
Expand Down
18 changes: 13 additions & 5 deletions signer/src/transaction_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,16 @@ fn run_loop_message_filter(signal: &SignerSignal) -> bool {
)
}

/// During DKG or message signing, we only need the following message
/// types, so we construct a stream with only these messages.
fn signed_message_filter(event: &SignerSignal) -> bool {
matches!(
event,
SignerSignal::Event(SignerEvent::TxSigner(TxSignerEvent::MessageGenerated(_)))
| SignerSignal::Event(SignerEvent::P2P(P2PEvent::MessageReceived(_)))
)
}

impl<C, N> TxCoordinatorEventLoop<C, N>
where
C: Context,
Expand All @@ -181,9 +191,7 @@ where
#[tracing::instrument(skip_all, name = "tx-coordinator")]
pub async fn run(mut self) -> Result<(), Error> {
tracing::info!("starting transaction coordinator event loop");
let mut signal_stream = self
.context
.as_signal_stream(&self.network, run_loop_message_filter);
let mut signal_stream = self.context.as_signal_stream(run_loop_message_filter);

loop {
match signal_stream.next().await {
Expand Down Expand Up @@ -686,7 +694,7 @@ where
let max_duration = self.signing_round_max_duration;
let signal_stream = self
.context
.as_signal_stream(&self.network, |_| true)
.as_signal_stream(signed_message_filter)
.filter_map(Self::to_signed_message);

tokio::pin!(signal_stream);
Expand Down Expand Up @@ -933,7 +941,7 @@ where

let signal_stream = self
.context
.as_signal_stream(&self.network, |_| true)
.as_signal_stream(signed_message_filter)
.filter_map(Self::to_signed_message);

tokio::pin!(signal_stream);
Expand Down
4 changes: 1 addition & 3 deletions signer/src/transaction_signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,7 @@ where
tracing::error!(%error, "error signalling event loop start");
return Err(error);
};
let mut signal_stream = self
.context
.as_signal_stream(&self.network, run_loop_message_filter);
let mut signal_stream = self.context.as_signal_stream(run_loop_message_filter);

loop {
match signal_stream.next().await {
Expand Down

0 comments on commit b1d0873

Please sign in to comment.