Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report unstable peers from TSS #1228

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/protocol/tests/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ async fn open_protocol_connections(
// Check the response as to whether they accepted our SubscribeMessage
let response_message = encrypted_connection.recv().await?;
let subscribe_response: Result<(), String> = bincode::deserialize(&response_message)?;

if let Err(error_message) = subscribe_response {
return Err(anyhow!(error_message));
}
Expand Down
1 change: 1 addition & 0 deletions crates/threshold-signature-server/src/helpers/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ pub async fn do_dkg(
x25519_secret_key,
)
.await?;

let channels = {
let ready = timeout(Duration::from_secs(SETUP_TIMEOUT_SECONDS), rx_ready).await?;
let broadcast_out = ready??;
Expand Down
2 changes: 2 additions & 0 deletions crates/threshold-signature-server/src/signing_client/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ async fn handle_socket_result(socket: WebSocket, app_state: AppState) {
if let Err(err) = handle_socket(socket, app_state).await {
tracing::warn!("Websocket connection closed unexpectedly {:?}", err);
// TODO here we should inform the chain that signing failed
//
// TODO (Nando): Report error up here?
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the answer here is no since this would be bubbled up when we try and make a (failed) ws/ connection anyways

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't totally thought through but seems reasonable. The only errors we could catch here but not elsewhere are relating to incoming connections which do not relate to an existing listener. Eg: connections from random people or with a bad 'subscribe' message. I think we can just ignore these.

};
}

Expand Down
20 changes: 12 additions & 8 deletions crates/threshold-signature-server/src/signing_client/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use axum::{
};
use entropy_kvdb::kv_manager::error::InnerKvError;
use entropy_protocol::errors::ProtocolExecutionErr;
use subxt::utils::AccountId32;
use thiserror::Error;
use tokio::sync::oneshot::error::RecvError;

Expand All @@ -46,8 +47,8 @@ pub enum ProtocolErr {
Deserialization(String),
#[error("Oneshot timeout error: {0}")]
OneshotTimeout(#[from] RecvError),
#[error("Subscribe API error: {0}")]
Subscribe(#[from] SubscribeErr),
#[error("Subscribe API error: {source} by TSS Account `{account_id:?}`")]
Subscribe { source: SubscribeErr, account_id: AccountId32 },
#[error("reqwest error: {0}")]
Reqwest(#[from] reqwest::Error),
#[error("Utf8Error: {0:?}")]
Expand All @@ -70,18 +71,21 @@ pub enum ProtocolErr {
UserError(String),
#[error("Validation Error: {0}")]
ValidationErr(#[from] crate::validation::errors::ValidationErr),
#[error("Subscribe message rejected: {0}")]
BadSubscribeMessage(String),
#[error("Subscribe message rejected: {message} by TSS Account `{account_id:?}`")]
BadSubscribeMessage { message: String, account_id: AccountId32 },
#[error("From Hex Error: {0}")]
FromHex(#[from] hex::FromHexError),
#[error("Conversion Error: {0}")]
Conversion(&'static str),
#[error("Could not open ws connection: {0}")]
ConnectionError(#[from] tokio_tungstenite::tungstenite::Error),
#[error("Could not open ws connection: {source} with the TSS Account `{account_id:?}`")]
ConnectionError { source: tokio_tungstenite::tungstenite::Error, account_id: AccountId32 },
#[error("Timed out waiting for remote party")]
Timeout(#[from] tokio::time::error::Elapsed),
#[error("Encrypted connection error {0}")]
EncryptedConnection(String),
#[error("Encrypted connection error {source:?} with the TSS Account `{account_id:?}`")]
EncryptedConnection {
source: entropy_protocol::protocol_transport::errors::EncryptedConnectionErr,
account_id: AccountId32,
},
#[error("Program error: {0}")]
ProgramError(#[from] crate::user::errors::ProgramError),
#[error("Invalid length for converting address")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ pub async fn open_protocol_connections(
.map(|validator_info| async move {
// Open a ws connection
let ws_endpoint = format!("ws://{}/ws", validator_info.ip_address);
let (ws_stream, _response) = connect_async(ws_endpoint).await?;
let (ws_stream, _response) =
connect_async(ws_endpoint).await.map_err(|e| ProtocolErr::ConnectionError {
source: e,
account_id: validator_info.tss_account.clone(),
})?;

// Send a SubscribeMessage in the payload of the final handshake message
let subscribe_message_vec =
Expand All @@ -75,32 +79,50 @@ pub async fn open_protocol_connections(
subscribe_message_vec,
)
.await
.map_err(|e| ProtocolErr::EncryptedConnection(e.to_string()))?;
.map_err(|e| ProtocolErr::EncryptedConnection {
source: e,
account_id: validator_info.tss_account.clone(),
})?;

// Check the response as to whether they accepted our SubscribeMessage
let response_message = encrypted_connection
.recv()
.await
.map_err(|e| ProtocolErr::EncryptedConnection(e.to_string()))?;
let response_message = encrypted_connection.recv().await.map_err(|e| {
ProtocolErr::EncryptedConnection {
source: e,
account_id: validator_info.tss_account.clone(),
}
})?;

let subscribe_response: Result<(), String> = bincode::deserialize(&response_message)?;
if let Err(error_message) = subscribe_response {
// In future versions, we can check here if the error is VersionTooNew(version)
// and if possible the downgrade protocol messages used to be backward compatible
return Err(ProtocolErr::BadSubscribeMessage(error_message));
return Err(ProtocolErr::BadSubscribeMessage {
message: error_message,
account_id: validator_info.tss_account.clone(),
});
}

// Setup channels
let ws_channels = get_ws_channels(state, session_id, &validator_info.tss_account)?;
let ws_channels = get_ws_channels(state, session_id, &validator_info.tss_account)
.map_err(|e| ProtocolErr::Subscribe {
source: e,
account_id: validator_info.tss_account.clone(),
})?;

let remote_party_id = PartyId::new(validator_info.tss_account.clone());
let account_id = validator_info.tss_account.clone();

// Handle protocol messages
tokio::spawn(async move {
if let Err(err) =
ws_to_channels(encrypted_connection, ws_channels, remote_party_id).await
{
tracing::warn!("{:?}", err);
};
ws_to_channels(encrypted_connection, ws_channels, remote_party_id).await.map_err(
|err| {
tracing::warn!("{:?}", err);
Err::<(), ProtocolErr>(ProtocolErr::EncryptedConnection {
source: err.into(),
account_id,
})
},
)
});

Ok::<_, ProtocolErr>(())
Expand Down
40 changes: 34 additions & 6 deletions crates/threshold-signature-server/src/user/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use x25519_dalek::StaticSecret;

use super::UserErr;
use crate::chain_api::entropy::runtime_types::pallet_registry::pallet::RegisteredInfo;
use crate::signing_client::ProtocolErr;
use crate::{
chain_api::{entropy, get_api, get_rpc, EntropyConfig},
helpers::{
Expand Down Expand Up @@ -351,14 +352,41 @@ pub async fn sign_tx(
request_limit,
derivation_path,
)
.await
.map(|signature| {
(
.await;

let signing_protocol_output = match signing_protocol_output {
Ok(signature) => Ok((
BASE64_STANDARD.encode(signature.to_rsv_bytes()),
signer.signer().sign(&signature.to_rsv_bytes()),
)
})
.map_err(|error| error.to_string());
)),
Err(e)
if matches!(
e,
ProtocolErr::ConnectionError { .. }
| ProtocolErr::EncryptedConnection { .. }
| ProtocolErr::BadSubscribeMessage { .. }
| ProtocolErr::Subscribe { .. }
) =>
{
let account_id = match e {
ProtocolErr::ConnectionError { ref account_id, .. } => account_id,
ProtocolErr::EncryptedConnection { ref account_id, .. } => account_id,
ProtocolErr::BadSubscribeMessage { ref account_id, .. } => account_id,
ProtocolErr::Subscribe { ref account_id, .. } => account_id,
_ => unreachable!(),
}
.clone();

let report_unstable_peer_tx =
entropy::tx().staking_extension().report_unstable_peer(account_id);
submit_transaction(&api, &rpc, &signer, &report_unstable_peer_tx, None)
.await
.expect("TODO");

Err(e.to_string())
},
Err(e) => Err(e.to_string()),
};

// This response chunk is sent later with the result of the signing protocol
if response_tx.try_send(serde_json::to_string(&signing_protocol_output)).is_err() {
Expand Down
Loading