diff --git a/crates/protocol/src/execute_protocol.rs b/crates/protocol/src/execute_protocol.rs index f6042d21e..c80af4574 100644 --- a/crates/protocol/src/execute_protocol.rs +++ b/crates/protocol/src/execute_protocol.rs @@ -73,7 +73,7 @@ pub async fn execute_protocol_generic( mut chans: Channels, session: Session, session_id_hash: [u8; 32], -) -> Result<(Res::Success, mpsc::Receiver), GenericProtocolError> { +) -> Result<(Res::Success, Channels), GenericProtocolError> { let session_id = synedrion::SessionId::from_seed(&session_id_hash); let tx = &chans.0; let rx = &mut chans.1; @@ -143,7 +143,7 @@ pub async fn execute_protocol_generic( } match session.finalize_round(&mut OsRng, accum)? { - FinalizeOutcome::Success(res) => break Ok((res, chans.1)), + FinalizeOutcome::Success(res) => break Ok((res, chans)), FinalizeOutcome::AnotherRound { session: new_session, cached_messages: new_cached_messages, @@ -208,7 +208,6 @@ pub async fn execute_dkg( threshold: usize, ) -> Result { tracing::debug!("Executing DKG"); - let broadcaster = chans.0.clone(); let party_ids: BTreeSet = threshold_accounts.iter().cloned().map(PartyId::new).collect(); @@ -231,11 +230,10 @@ pub async fn execute_dkg( ) .map_err(ProtocolExecutionErr::SessionCreation)?; - let (init_keyshare, rx) = execute_protocol_generic(chans, session, session_id_hash).await?; + let (init_keyshare, chans) = + execute_protocol_generic(chans, session, session_id_hash).await?; tracing::info!("Finished key init protocol"); - // Setup channels for the next session - let chans = Channels(broadcaster.clone(), rx); // Send verifying key let verifying_key = init_keyshare.verifying_key(); @@ -259,6 +257,7 @@ pub async fn execute_dkg( } else { // Wait to receive verifying_key let mut rx = chans.1; + let broadcaster = chans.0; let message = rx.recv().await.ok_or_else(|| { ProtocolExecutionErr::IncomingStream("Waiting for validating key".to_string()) })?; @@ -302,15 +301,12 @@ pub async fn execute_dkg( inputs, ) .map_err(ProtocolExecutionErr::SessionCreation)?; - let (new_key_share_option, rx) = + let (new_key_share_option, chans) = execute_protocol_generic(chans, session, session_id_hash).await?; let new_key_share = new_key_share_option.ok_or(ProtocolExecutionErr::NoOutputFromReshareProtocol)?; tracing::info!("Finished reshare protocol"); - // Setup channels for the next session - let chans = Channels(broadcaster.clone(), rx); - // Now run the aux gen protocol to get AuxInfo let session_id_hash = session_id.blake2(Some(Subsession::AuxGen))?; let session = make_aux_gen_session( @@ -327,51 +323,56 @@ pub async fn execute_dkg( } /// Execute proactive refresh. +#[allow(clippy::type_complexity)] #[tracing::instrument( skip_all, fields(threshold_accounts, my_idx), level = tracing::Level::DEBUG )] -pub async fn execute_proactive_refresh( +pub async fn execute_reshare( session_id: SessionId, chans: Channels, threshold_pair: &sr25519::Pair, - threshold_accounts: Vec, - old_key: ThresholdKeyShare, -) -> Result, ProtocolExecutionErr> { + inputs: KeyResharingInputs, + aux_info_option: Option>, +) -> Result< + (ThresholdKeyShare, AuxInfo), + ProtocolExecutionErr, +> { tracing::debug!("Executing proactive refresh"); tracing::debug!("Signing with {:?}", &threshold_pair.public()); - let party_ids: BTreeSet = - threshold_accounts.iter().cloned().map(PartyId::new).collect(); let pair = PairWrapper(threshold_pair.clone()); - let verifying_key = old_key.verifying_key(); - - let threshold = old_key.threshold(); let session_id_hash = session_id.blake2(None)?; - let inputs = KeyResharingInputs { - old_holder: Some(OldHolder { key_share: old_key }), - new_holder: Some(NewHolder { - verifying_key, - old_threshold: party_ids.len(), - old_holders: party_ids.clone(), - }), - new_holders: party_ids.clone(), - new_threshold: threshold, - }; + let session = make_key_resharing_session( &mut OsRng, SynedrionSessionId::from_seed(session_id_hash.as_slice()), pair, - &party_ids, - inputs, + &inputs.new_holders, + inputs.clone(), ) .map_err(ProtocolExecutionErr::SessionCreation)?; - let new_key_share = execute_protocol_generic(chans, session, session_id_hash).await?.0; + let (new_key_share, chans) = execute_protocol_generic(chans, session, session_id_hash).await?; + let aux_info = if let Some(aux_info) = aux_info_option { + aux_info + } else { + // Now run an aux gen session + let session_id_hash_aux_data = session_id.blake2(Some(Subsession::AuxGen))?; + let session = make_aux_gen_session( + &mut OsRng, + SynedrionSessionId::from_seed(session_id_hash_aux_data.as_slice()), + PairWrapper(threshold_pair.clone()), + &inputs.new_holders, + ) + .map_err(ProtocolExecutionErr::SessionCreation)?; + + execute_protocol_generic(chans, session, session_id_hash_aux_data).await?.0 + }; - new_key_share.ok_or(ProtocolExecutionErr::NoOutputFromReshareProtocol) + Ok((new_key_share.ok_or(ProtocolExecutionErr::NoOutputFromReshareProtocol)?, aux_info)) } /// Psuedo-randomly select a subset of the parties of size `threshold` diff --git a/crates/protocol/tests/helpers/mod.rs b/crates/protocol/tests/helpers/mod.rs index 627d1c912..a1a0f60e9 100644 --- a/crates/protocol/tests/helpers/mod.rs +++ b/crates/protocol/tests/helpers/mod.rs @@ -16,9 +16,7 @@ //! A simple protocol server, like a mini version of entropy-tss, for benchmarking use anyhow::{anyhow, ensure}; use entropy_protocol::{ - execute_protocol::{ - execute_dkg, execute_proactive_refresh, execute_signing_protocol, Channels, - }, + execute_protocol::{execute_dkg, execute_reshare, execute_signing_protocol, Channels}, protocol_transport::{ errors::WsError, noise::{noise_handshake_initiator, noise_handshake_responder}, @@ -31,12 +29,13 @@ use entropy_shared::X25519PublicKey; use futures::future; use sp_core::{sr25519, Pair}; use std::{ + collections::BTreeSet, fmt, sync::{Arc, Mutex}, time::Duration, }; use subxt::utils::AccountId32; -use synedrion::{AuxInfo, KeyShare, ThresholdKeyShare}; +use synedrion::{AuxInfo, KeyResharingInputs, KeyShare, NewHolder, OldHolder, ThresholdKeyShare}; use tokio::{ net::{TcpListener, TcpStream}, time::timeout, @@ -131,15 +130,22 @@ pub async fn server( Ok(ProtocolOutput::Sign(RecoverableSignature { signature, recovery_id })) }, SessionId::Reshare { .. } => { - let new_keyshare = execute_proactive_refresh( - session_id, - channels, - &pair, - tss_accounts, - threshold_keyshare.unwrap(), - ) - .await?; - Ok(ProtocolOutput::Reshare(new_keyshare)) + let old_key = threshold_keyshare.unwrap(); + let party_ids: BTreeSet = + tss_accounts.iter().cloned().map(PartyId::new).collect(); + let inputs = KeyResharingInputs { + old_holder: Some(OldHolder { key_share: old_key.clone() }), + new_holder: Some(NewHolder { + verifying_key: old_key.verifying_key(), + old_threshold: party_ids.len(), + old_holders: party_ids.clone(), + }), + new_holders: party_ids.clone(), + new_threshold: old_key.threshold(), + }; + + let new_keyshare = execute_reshare(session_id, channels, &pair, inputs, None).await?; + Ok(ProtocolOutput::Reshare(new_keyshare.0)) }, SessionId::Dkg { .. } => { let keyshare_and_aux_info = diff --git a/crates/threshold-signature-server/src/signing_client/api.rs b/crates/threshold-signature-server/src/signing_client/api.rs index bbea6738f..973499eec 100644 --- a/crates/threshold-signature-server/src/signing_client/api.rs +++ b/crates/threshold-signature-server/src/signing_client/api.rs @@ -13,8 +13,6 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::time::Duration; - use axum::{ body::Bytes, extract::{ @@ -26,10 +24,11 @@ use axum::{ }; use blake2::{Blake2s256, Digest}; use entropy_protocol::{ - execute_protocol::{execute_proactive_refresh, Channels}, - KeyParams, KeyShareWithAuxInfo, Listener, PartyId, SessionId, ValidatorInfo, + execute_protocol::{execute_reshare, Channels}, + KeyParams, Listener, PartyId, SessionId, ValidatorInfo, }; use parity_scale_codec::Encode; +use std::{collections::BTreeSet, time::Duration}; use entropy_kvdb::kv_manager::{ helpers::{deserialize, serialize as key_serialize}, @@ -45,7 +44,7 @@ use subxt::{ utils::{AccountId32 as SubxtAccountId32, Static}, OnlineClient, }; -use synedrion::{AuxInfo, ThresholdKeyShare}; +use synedrion::{AuxInfo, KeyResharingInputs, NewHolder, OldHolder, ThresholdKeyShare}; use tokio::time::timeout; use x25519_dalek::StaticSecret; @@ -96,13 +95,13 @@ pub async fn proactive_refresh( let exists_result = app_state.kv_store.kv().exists(&key).await?; if exists_result { let old_key_share = app_state.kv_store.kv().get(&key).await?; - let (deserialized_old_key, _aux_info): ( + let (deserialized_old_key, aux_info): ( ThresholdKeyShare, AuxInfo, ) = deserialize(&old_key_share) .ok_or_else(|| ProtocolErr::Deserialization("Failed to load KeyShare".into()))?; - let new_key_share = do_proactive_refresh( + let (new_key_share, aux_info) = do_proactive_refresh( &ocw_data.validators_info, &signer, &x25519_secret_key, @@ -110,19 +109,10 @@ pub async fn proactive_refresh( encoded_key, deserialized_old_key, ocw_data.block_number, + aux_info, ) .await?; - // Get aux info from existing entry - let aux_info = { - let existing_entry = app_state.kv_store.kv().get(&key).await?; - let (_old_key_share, aux_info): KeyShareWithAuxInfo = deserialize(&existing_entry) - .ok_or_else(|| { - ProtocolErr::Deserialization("Failed to load KeyShare".into()) - })?; - aux_info - }; - // Since this is a refresh with the parties not changing, store the old aux_info let serialized_key_share = key_serialize(&(new_key_share, aux_info)) .map_err(|_| ProtocolErr::KvSerialize("Kv Serialize Error".to_string()))?; @@ -152,6 +142,7 @@ async fn handle_socket_result(socket: WebSocket, app_state: AppState) { }; } +#[allow(clippy::type_complexity, clippy::too_many_arguments)] #[tracing::instrument( skip_all, fields(validators_info, verifying_key, my_subgroup), @@ -165,7 +156,8 @@ pub async fn do_proactive_refresh( verifying_key: Vec, old_key: ThresholdKeyShare, block_number: u32, -) -> Result, ProtocolErr> { + aux_info: AuxInfo, +) -> Result<(ThresholdKeyShare, AuxInfo), ProtocolErr> { tracing::debug!("Preparing to perform proactive refresh"); tracing::debug!("Signing with {:?}", &signer.signer().public()); @@ -189,31 +181,31 @@ pub async fn do_proactive_refresh( tss_accounts.push(tss_account); } - // subscribe to all other participating parties. Listener waits for other subscribers. - let (rx_ready, rx_from_others, listener) = - Listener::new(converted_validator_info.clone(), &account_id); - state - .listeners - .lock() - .map_err(|_| ProtocolErr::SessionError("Error getting lock".to_string()))? - .insert(session_id.clone(), listener); + let party_ids: BTreeSet = tss_accounts.iter().cloned().map(PartyId::new).collect(); - open_protocol_connections( - &converted_validator_info, - &session_id, - signer.signer(), + let inputs = KeyResharingInputs { + old_holder: Some(OldHolder { key_share: old_key.clone() }), + new_holder: Some(NewHolder { + verifying_key: old_key.verifying_key(), + old_threshold: party_ids.len(), + old_holders: party_ids.clone(), + }), + new_holders: party_ids.clone(), + new_threshold: old_key.threshold(), + }; + + let channels = get_channels( state, + converted_validator_info, + account_id, + &session_id, + signer, x25519_secret_key, ) .await?; - let channels = { - let ready = timeout(Duration::from_secs(SETUP_TIMEOUT_SECONDS), rx_ready).await?; - let broadcast_out = ready??; - Channels(broadcast_out, rx_from_others) - }; + let result = - execute_proactive_refresh(session_id, channels, signer.signer(), tss_accounts, old_key) - .await?; + execute_reshare(session_id, channels, signer.signer(), inputs, Some(aux_info)).await?; Ok(result) } @@ -273,3 +265,34 @@ pub async fn validate_proactive_refresh( kv_manager.kv().put(reservation, latest_block_number.to_be_bytes().to_vec()).await?; Ok(()) } + +pub async fn get_channels( + state: &ListenerState, + converted_validator_info: Vec, + account_id: SubxtAccountId32, + session_id: &SessionId, + signer: &PairSigner, + x25519_secret_key: &StaticSecret, +) -> Result { + // subscribe to all other participating parties. Listener waits for other subscribers. + let (rx_ready, rx_from_others, listener) = + Listener::new(converted_validator_info.clone(), &account_id); + state + .listeners + .lock() + .map_err(|_| ProtocolErr::SessionError("Error getting lock".to_string()))? + .insert(session_id.clone(), listener); + + open_protocol_connections( + &converted_validator_info, + session_id, + signer.signer(), + state, + x25519_secret_key, + ) + .await?; + + let ready = timeout(Duration::from_secs(SETUP_TIMEOUT_SECONDS), rx_ready).await?; + let broadcast_out = ready??; + Ok(Channels(broadcast_out, rx_from_others)) +} diff --git a/crates/threshold-signature-server/src/user/tests.rs b/crates/threshold-signature-server/src/user/tests.rs index 71c2fa434..731a0558b 100644 --- a/crates/threshold-signature-server/src/user/tests.rs +++ b/crates/threshold-signature-server/src/user/tests.rs @@ -633,7 +633,7 @@ async fn test_store_share() { let mut new_verifying_key = vec![]; // wait for registered event check that key exists in kvdb - for _ in 0..45 { + for _ in 0..65 { std::thread::sleep(std::time::Duration::from_millis(1000)); let block_hash = rpc.chain_get_block_hash(None).await.unwrap(); let events = EventsClient::new(api.clone()).at(block_hash.unwrap()).await.unwrap(); diff --git a/crates/threshold-signature-server/src/validator/api.rs b/crates/threshold-signature-server/src/validator/api.rs index de912a277..810996711 100644 --- a/crates/threshold-signature-server/src/validator/api.rs +++ b/crates/threshold-signature-server/src/validator/api.rs @@ -23,33 +23,27 @@ use crate::{ launch::{FORBIDDEN_KEYS, LATEST_BLOCK_NUMBER_RESHARE}, substrate::{get_stash_address, get_validators_info, query_chain, submit_transaction}, }, - signing_client::{protocol_transport::open_protocol_connections, ProtocolErr}, + signing_client::{api::get_channels, ProtocolErr}, validator::errors::ValidatorErr, AppState, }; use axum::{body::Bytes, extract::State, http::StatusCode}; use entropy_kvdb::kv_manager::{helpers::serialize as key_serialize, KvManager}; -use entropy_protocol::Subsession; pub use entropy_protocol::{ decode_verifying_key, errors::ProtocolExecutionErr, - execute_protocol::{execute_protocol_generic, Channels, PairWrapper}, + execute_protocol::{execute_protocol_generic, execute_reshare, Channels, PairWrapper}, KeyParams, KeyShareWithAuxInfo, Listener, PartyId, SessionId, ValidatorInfo, }; -use entropy_shared::{OcwMessageReshare, NETWORK_PARENT_KEY, SETUP_TIMEOUT_SECONDS}; +use entropy_shared::{OcwMessageReshare, NETWORK_PARENT_KEY}; use parity_scale_codec::{Decode, Encode}; -use rand_core::OsRng; use sp_core::Pair; -use std::{collections::BTreeSet, str::FromStr, time::Duration}; +use std::{collections::BTreeSet, str::FromStr}; use subxt::{ backend::legacy::LegacyRpcMethods, ext::sp_core::sr25519, tx::PairSigner, utils::AccountId32, OnlineClient, }; -use synedrion::{ - make_aux_gen_session, make_key_resharing_session, sessions::SessionId as SynedrionSessionId, - AuxInfo, KeyResharingInputs, NewHolder, OldHolder, -}; -use tokio::time::timeout; +use synedrion::{KeyResharingInputs, NewHolder, OldHolder}; /// HTTP POST endpoint called by the off-chain worker (propagation pallet) during network reshare. /// @@ -62,7 +56,7 @@ pub async fn new_reshare( encoded_data: Bytes, ) -> Result { let data = OcwMessageReshare::decode(&mut encoded_data.as_ref())?; - // TODO: validate message came from chain (check reshare block # against current block number) see #941 + let api = get_api(&app_state.configuration.endpoint).await?; let rpc = get_rpc(&app_state.configuration.endpoint).await?; validate_new_reshare(&api, &rpc, &data, &app_state.kv_store).await?; @@ -154,8 +148,6 @@ pub async fn new_reshare( let session_id = SessionId::Reshare { verifying_key, block_number: data.block_number }; let account_id = AccountId32(signer.signer().public().0); - let session_id_hash = session_id.blake2(Some(Subsession::Reshare))?; - let pair = PairWrapper(signer.signer().clone()); let mut converted_validator_info = vec![]; let mut tss_accounts = vec![]; @@ -169,63 +161,18 @@ pub async fn new_reshare( tss_accounts.push(validator_info.tss_account.clone()); } - let (rx_ready, rx_from_others, listener) = - Listener::new(converted_validator_info.clone(), &account_id); - app_state - .listener_state - .listeners - .lock() - .map_err(|_| ValidatorErr::SessionError("Error getting lock".to_string()))? - .insert(session_id.clone(), listener); - - open_protocol_connections( - &converted_validator_info, - &session_id, - signer.signer(), + let channels = get_channels( &app_state.listener_state, + converted_validator_info, + account_id, + &session_id, + &signer, &x25519_secret_key, ) .await?; - let (channels, broadcaster) = { - let ready = timeout(Duration::from_secs(SETUP_TIMEOUT_SECONDS), rx_ready).await?; - let broadcast_out = ready??; - (Channels(broadcast_out.clone(), rx_from_others), broadcast_out) - }; - - let session = make_key_resharing_session( - &mut OsRng, - SynedrionSessionId::from_seed(session_id_hash.as_slice()), - pair.clone(), - &party_ids, - inputs, - ) - .map_err(ProtocolExecutionErr::SessionCreation)?; - - let (new_key_share_option, rx) = execute_protocol_generic(channels, session, session_id_hash) - .await - .map_err(|_| ValidatorErr::ProtocolError("Error executing protocol".to_string()))?; - - let new_key_share = new_key_share_option.ok_or(ValidatorErr::NoOutputFromReshareProtocol)?; - - // Setup channels for the next session - let channels = Channels(broadcaster, rx); - - // Now run an aux gen session - let session_id_hash = session_id.blake2(Some(Subsession::AuxGen))?; - let session = make_aux_gen_session( - &mut OsRng, - SynedrionSessionId::from_seed(session_id_hash.as_slice()), - pair, - &party_ids, - ) - .map_err(ProtocolExecutionErr::SessionCreation)?; - - let aux_info: AuxInfo = - execute_protocol_generic(channels, session, session_id_hash) - .await - .map_err(|_| ValidatorErr::ProtocolError("Error executing protocol".to_string()))? - .0; + let (new_key_share, aux_info) = + execute_reshare(session_id.clone(), channels, signer.signer(), inputs, None).await?; let serialized_key_share = key_serialize(&(new_key_share, aux_info)) .map_err(|_| ProtocolErr::KvSerialize("Kv Serialize Error".to_string()))?;