diff --git a/crates/protocol/src/execute_protocol.rs b/crates/protocol/src/execute_protocol.rs index f6042d21e..c484002b9 100644 --- a/crates/protocol/src/execute_protocol.rs +++ b/crates/protocol/src/execute_protocol.rs @@ -73,7 +73,8 @@ pub async fn execute_protocol_generic( mut chans: Channels, session: Session, session_id_hash: [u8; 32], -) -> Result<(Res::Success, mpsc::Receiver), GenericProtocolError> { +) -> Result<(Res::Success, Broadcaster, mpsc::Receiver), GenericProtocolError> +{ let session_id = synedrion::SessionId::from_seed(&session_id_hash); let tx = &chans.0; let rx = &mut chans.1; @@ -143,7 +144,7 @@ pub async fn execute_protocol_generic( } match session.finalize_round(&mut OsRng, accum)? { - FinalizeOutcome::Success(res) => break Ok((res, chans.1)), + FinalizeOutcome::Success(res) => break Ok((res, chans.0, chans.1)), FinalizeOutcome::AnotherRound { session: new_session, cached_messages: new_cached_messages, @@ -208,7 +209,6 @@ pub async fn execute_dkg( threshold: usize, ) -> Result { tracing::debug!("Executing DKG"); - let broadcaster = chans.0.clone(); let party_ids: BTreeSet = threshold_accounts.iter().cloned().map(PartyId::new).collect(); @@ -231,7 +231,8 @@ pub async fn execute_dkg( ) .map_err(ProtocolExecutionErr::SessionCreation)?; - let (init_keyshare, rx) = execute_protocol_generic(chans, session, session_id_hash).await?; + let (init_keyshare, broadcaster, rx) = + execute_protocol_generic(chans, session, session_id_hash).await?; tracing::info!("Finished key init protocol"); // Setup channels for the next session @@ -259,6 +260,7 @@ pub async fn execute_dkg( } else { // Wait to receive verifying_key let mut rx = chans.1; + let broadcaster = chans.0; let message = rx.recv().await.ok_or_else(|| { ProtocolExecutionErr::IncomingStream("Waiting for validating key".to_string()) })?; @@ -302,7 +304,7 @@ pub async fn execute_dkg( inputs, ) .map_err(ProtocolExecutionErr::SessionCreation)?; - let (new_key_share_option, rx) = + let (new_key_share_option, broadcaster, rx) = execute_protocol_generic(chans, session, session_id_hash).await?; let new_key_share = new_key_share_option.ok_or(ProtocolExecutionErr::NoOutputFromReshareProtocol)?; @@ -337,29 +339,22 @@ pub async fn execute_proactive_refresh( chans: Channels, threshold_pair: &sr25519::Pair, threshold_accounts: Vec, - old_key: ThresholdKeyShare, -) -> Result, ProtocolExecutionErr> { + verifying_key: VerifyingKey, + threshold: usize, + inputs: KeyResharingInputs, +) -> Result< + (ThresholdKeyShare, Broadcaster, mpsc::Receiver), + ProtocolExecutionErr, +> { tracing::debug!("Executing proactive refresh"); tracing::debug!("Signing with {:?}", &threshold_pair.public()); let party_ids: BTreeSet = threshold_accounts.iter().cloned().map(PartyId::new).collect(); let pair = PairWrapper(threshold_pair.clone()); - let verifying_key = old_key.verifying_key(); - - let threshold = old_key.threshold(); let session_id_hash = session_id.blake2(None)?; - let inputs = KeyResharingInputs { - old_holder: Some(OldHolder { key_share: old_key }), - new_holder: Some(NewHolder { - verifying_key, - old_threshold: party_ids.len(), - old_holders: party_ids.clone(), - }), - new_holders: party_ids.clone(), - new_threshold: threshold, - }; + let session = make_key_resharing_session( &mut OsRng, SynedrionSessionId::from_seed(session_id_hash.as_slice()), @@ -369,9 +364,10 @@ pub async fn execute_proactive_refresh( ) .map_err(ProtocolExecutionErr::SessionCreation)?; - let new_key_share = execute_protocol_generic(chans, session, session_id_hash).await?.0; + let (new_key_share, brodcaster, rx) = + execute_protocol_generic(chans, session, session_id_hash).await?; - new_key_share.ok_or(ProtocolExecutionErr::NoOutputFromReshareProtocol) + Ok((new_key_share.ok_or(ProtocolExecutionErr::NoOutputFromReshareProtocol)?, brodcaster, rx)) } /// Psuedo-randomly select a subset of the parties of size `threshold` diff --git a/crates/threshold-signature-server/src/signing_client/api.rs b/crates/threshold-signature-server/src/signing_client/api.rs index bbea6738f..73487ee5e 100644 --- a/crates/threshold-signature-server/src/signing_client/api.rs +++ b/crates/threshold-signature-server/src/signing_client/api.rs @@ -13,8 +13,6 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::time::Duration; - use axum::{ body::Bytes, extract::{ @@ -27,9 +25,12 @@ use axum::{ use blake2::{Blake2s256, Digest}; use entropy_protocol::{ execute_protocol::{execute_proactive_refresh, Channels}, - KeyParams, KeyShareWithAuxInfo, Listener, PartyId, SessionId, ValidatorInfo, + protocol_transport::Broadcaster, + KeyParams, KeyShareWithAuxInfo, Listener, PartyId, ProtocolMessage, SessionId, ValidatorInfo, }; use parity_scale_codec::Encode; +use std::{collections::BTreeSet, time::Duration}; +use tokio::sync::mpsc; use entropy_kvdb::kv_manager::{ helpers::{deserialize, serialize as key_serialize}, @@ -45,7 +46,7 @@ use subxt::{ utils::{AccountId32 as SubxtAccountId32, Static}, OnlineClient, }; -use synedrion::{AuxInfo, ThresholdKeyShare}; +use synedrion::{AuxInfo, KeyResharingInputs, NewHolder, OldHolder, ThresholdKeyShare}; use tokio::time::timeout; use x25519_dalek::StaticSecret; @@ -102,7 +103,7 @@ pub async fn proactive_refresh( ) = deserialize(&old_key_share) .ok_or_else(|| ProtocolErr::Deserialization("Failed to load KeyShare".into()))?; - let new_key_share = do_proactive_refresh( + let (new_key_share, _broadcaster, _rx) = do_proactive_refresh( &ocw_data.validators_info, &signer, &x25519_secret_key, @@ -165,7 +166,10 @@ pub async fn do_proactive_refresh( verifying_key: Vec, old_key: ThresholdKeyShare, block_number: u32, -) -> Result, ProtocolErr> { +) -> Result< + (ThresholdKeyShare, Broadcaster, mpsc::Receiver), + ProtocolErr, +> { tracing::debug!("Preparing to perform proactive refresh"); tracing::debug!("Signing with {:?}", &signer.signer().public()); @@ -189,31 +193,39 @@ pub async fn do_proactive_refresh( tss_accounts.push(tss_account); } - // subscribe to all other participating parties. Listener waits for other subscribers. - let (rx_ready, rx_from_others, listener) = - Listener::new(converted_validator_info.clone(), &account_id); - state - .listeners - .lock() - .map_err(|_| ProtocolErr::SessionError("Error getting lock".to_string()))? - .insert(session_id.clone(), listener); + let party_ids: BTreeSet = tss_accounts.iter().cloned().map(PartyId::new).collect(); - open_protocol_connections( - &converted_validator_info, - &session_id, - signer.signer(), + let inputs = KeyResharingInputs { + old_holder: Some(OldHolder { key_share: old_key.clone() }), + new_holder: Some(NewHolder { + verifying_key: old_key.verifying_key(), + old_threshold: party_ids.len(), + old_holders: party_ids.clone(), + }), + new_holders: party_ids.clone(), + new_threshold: old_key.threshold(), + }; + + let channels = get_channels( state, + converted_validator_info, + account_id, + &session_id, + signer, x25519_secret_key, ) .await?; - let channels = { - let ready = timeout(Duration::from_secs(SETUP_TIMEOUT_SECONDS), rx_ready).await?; - let broadcast_out = ready??; - Channels(broadcast_out, rx_from_others) - }; - let result = - execute_proactive_refresh(session_id, channels, signer.signer(), tss_accounts, old_key) - .await?; + + let result = execute_proactive_refresh( + session_id, + channels, + signer.signer(), + tss_accounts, + old_key.verifying_key(), + old_key.threshold(), + inputs, + ) + .await?; Ok(result) } @@ -273,3 +285,34 @@ pub async fn validate_proactive_refresh( kv_manager.kv().put(reservation, latest_block_number.to_be_bytes().to_vec()).await?; Ok(()) } + +pub async fn get_channels( + state: &ListenerState, + converted_validator_info: Vec, + account_id: SubxtAccountId32, + session_id: &SessionId, + signer: &PairSigner, + x25519_secret_key: &StaticSecret, +) -> Result { + // subscribe to all other participating parties. Listener waits for other subscribers. + let (rx_ready, rx_from_others, listener) = + Listener::new(converted_validator_info.clone(), &account_id); + state + .listeners + .lock() + .map_err(|_| ProtocolErr::SessionError("Error getting lock".to_string()))? + .insert(session_id.clone(), listener); + + open_protocol_connections( + &converted_validator_info, + session_id, + signer.signer(), + state, + x25519_secret_key, + ) + .await?; + + let ready = timeout(Duration::from_secs(SETUP_TIMEOUT_SECONDS), rx_ready).await?; + let broadcast_out = ready??; + Ok(Channels(broadcast_out, rx_from_others)) +} diff --git a/crates/threshold-signature-server/src/validator/api.rs b/crates/threshold-signature-server/src/validator/api.rs index 7632ed87e..bc6e28de0 100644 --- a/crates/threshold-signature-server/src/validator/api.rs +++ b/crates/threshold-signature-server/src/validator/api.rs @@ -23,7 +23,9 @@ use crate::{ launch::{FORBIDDEN_KEYS, LATEST_BLOCK_NUMBER_RESHARE}, substrate::{get_stash_address, get_validators_info, query_chain, submit_transaction}, }, - signing_client::{protocol_transport::open_protocol_connections, ProtocolErr}, + signing_client::{ + api::get_channels, protocol_transport::open_protocol_connections, ProtocolErr, + }, validator::errors::ValidatorErr, AppState, }; @@ -33,7 +35,9 @@ use entropy_protocol::Subsession; pub use entropy_protocol::{ decode_verifying_key, errors::ProtocolExecutionErr, - execute_protocol::{execute_protocol_generic, Channels, PairWrapper}, + execute_protocol::{ + execute_proactive_refresh, execute_protocol_generic, Channels, PairWrapper, + }, KeyParams, KeyShareWithAuxInfo, Listener, PartyId, SessionId, ValidatorInfo, }; use entropy_shared::{OcwMessageReshare, NETWORK_PARENT_KEY, SETUP_TIMEOUT_SECONDS}; @@ -62,7 +66,7 @@ pub async fn new_reshare( encoded_data: Bytes, ) -> Result { let data = OcwMessageReshare::decode(&mut encoded_data.as_ref())?; - // TODO: validate message came from chain (check reshare block # against current block number) see #941 + let api = get_api(&app_state.configuration.endpoint).await?; let rpc = get_rpc(&app_state.configuration.endpoint).await?; validate_new_reshare(&api, &rpc, &data, &app_state.kv_store).await?; @@ -151,8 +155,6 @@ pub async fn new_reshare( let session_id = SessionId::Reshare { verifying_key, block_number: data.block_number }; let account_id = AccountId32(signer.signer().public().0); - let session_id_hash = session_id.blake2(Some(Subsession::Reshare))?; - let pair = PairWrapper(signer.signer().clone()); let mut converted_validator_info = vec![]; let mut tss_accounts = vec![]; @@ -166,54 +168,36 @@ pub async fn new_reshare( tss_accounts.push(validator_info.tss_account.clone()); } - let (rx_ready, rx_from_others, listener) = - Listener::new(converted_validator_info.clone(), &account_id); - app_state - .listener_state - .listeners - .lock() - .map_err(|_| ValidatorErr::SessionError("Error getting lock".to_string()))? - .insert(session_id.clone(), listener); - - open_protocol_connections( - &converted_validator_info, - &session_id, - signer.signer(), + let channels = get_channels( &app_state.listener_state, + converted_validator_info, + account_id, + &session_id, + &signer, &x25519_secret_key, ) .await?; - let (channels, broadcaster) = { - let ready = timeout(Duration::from_secs(SETUP_TIMEOUT_SECONDS), rx_ready).await?; - let broadcast_out = ready??; - (Channels(broadcast_out.clone(), rx_from_others), broadcast_out) - }; - - let session = make_key_resharing_session( - &mut OsRng, - SynedrionSessionId::from_seed(session_id_hash.as_slice()), - pair.clone(), - &party_ids, + let (new_key_share, brodcaster, rx) = execute_proactive_refresh( + session_id.clone(), + channels, + signer.signer(), + tss_accounts, + decoded_verifying_key, + threshold as usize, inputs, ) - .map_err(ProtocolExecutionErr::SessionCreation)?; - - let (new_key_share_option, rx) = execute_protocol_generic(channels, session, session_id_hash) - .await - .map_err(|_| ValidatorErr::ProtocolError("Error executing protocol".to_string()))?; - - let new_key_share = new_key_share_option.ok_or(ValidatorErr::NoOutputFromReshareProtocol)?; + .await?; - // Setup channels for the next session - let channels = Channels(broadcaster, rx); + // // Setup channels for the next session + let channels = Channels(brodcaster, rx); - // Now run an aux gen session + // // Now run an aux gen session let session_id_hash = session_id.blake2(Some(Subsession::AuxGen))?; let session = make_aux_gen_session( &mut OsRng, SynedrionSessionId::from_seed(session_id_hash.as_slice()), - pair, + PairWrapper(signer.signer().clone()), &party_ids, ) .map_err(ProtocolExecutionErr::SessionCreation)?;