Skip to content

Commit

Permalink
Refactor reshare
Browse files Browse the repository at this point in the history
  • Loading branch information
JesseAbram committed Aug 8, 2024
1 parent 9dac61d commit 4becd43
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 88 deletions.
40 changes: 18 additions & 22 deletions crates/protocol/src/execute_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ pub async fn execute_protocol_generic<Res: synedrion::ProtocolResult>(
mut chans: Channels,
session: Session<Res, sr25519::Signature, PairWrapper, PartyId>,
session_id_hash: [u8; 32],
) -> Result<(Res::Success, mpsc::Receiver<ProtocolMessage>), GenericProtocolError<Res>> {
) -> Result<(Res::Success, Broadcaster, mpsc::Receiver<ProtocolMessage>), GenericProtocolError<Res>>
{
let session_id = synedrion::SessionId::from_seed(&session_id_hash);
let tx = &chans.0;
let rx = &mut chans.1;
Expand Down Expand Up @@ -143,7 +144,7 @@ pub async fn execute_protocol_generic<Res: synedrion::ProtocolResult>(
}

match session.finalize_round(&mut OsRng, accum)? {
FinalizeOutcome::Success(res) => break Ok((res, chans.1)),
FinalizeOutcome::Success(res) => break Ok((res, chans.0, chans.1)),
FinalizeOutcome::AnotherRound {
session: new_session,
cached_messages: new_cached_messages,
Expand Down Expand Up @@ -208,7 +209,6 @@ pub async fn execute_dkg(
threshold: usize,
) -> Result<KeyShareWithAuxInfo, ProtocolExecutionErr> {
tracing::debug!("Executing DKG");
let broadcaster = chans.0.clone();

let party_ids: BTreeSet<PartyId> =
threshold_accounts.iter().cloned().map(PartyId::new).collect();
Expand All @@ -231,7 +231,8 @@ pub async fn execute_dkg(
)
.map_err(ProtocolExecutionErr::SessionCreation)?;

let (init_keyshare, rx) = execute_protocol_generic(chans, session, session_id_hash).await?;
let (init_keyshare, broadcaster, rx) =
execute_protocol_generic(chans, session, session_id_hash).await?;

tracing::info!("Finished key init protocol");
// Setup channels for the next session
Expand Down Expand Up @@ -259,6 +260,7 @@ pub async fn execute_dkg(
} else {
// Wait to receive verifying_key
let mut rx = chans.1;
let broadcaster = chans.0;
let message = rx.recv().await.ok_or_else(|| {
ProtocolExecutionErr::IncomingStream("Waiting for validating key".to_string())
})?;
Expand Down Expand Up @@ -302,7 +304,7 @@ pub async fn execute_dkg(
inputs,
)
.map_err(ProtocolExecutionErr::SessionCreation)?;
let (new_key_share_option, rx) =
let (new_key_share_option, broadcaster, rx) =
execute_protocol_generic(chans, session, session_id_hash).await?;
let new_key_share =
new_key_share_option.ok_or(ProtocolExecutionErr::NoOutputFromReshareProtocol)?;
Expand Down Expand Up @@ -337,29 +339,22 @@ pub async fn execute_proactive_refresh(
chans: Channels,
threshold_pair: &sr25519::Pair,
threshold_accounts: Vec<AccountId32>,
old_key: ThresholdKeyShare<KeyParams, PartyId>,
) -> Result<ThresholdKeyShare<KeyParams, PartyId>, ProtocolExecutionErr> {
verifying_key: VerifyingKey,
threshold: usize,
inputs: KeyResharingInputs<KeyParams, PartyId>,
) -> Result<
(ThresholdKeyShare<KeyParams, PartyId>, Broadcaster, mpsc::Receiver<ProtocolMessage>),
ProtocolExecutionErr,
> {
tracing::debug!("Executing proactive refresh");
tracing::debug!("Signing with {:?}", &threshold_pair.public());

let party_ids: BTreeSet<PartyId> =
threshold_accounts.iter().cloned().map(PartyId::new).collect();
let pair = PairWrapper(threshold_pair.clone());
let verifying_key = old_key.verifying_key();

let threshold = old_key.threshold();

let session_id_hash = session_id.blake2(None)?;
let inputs = KeyResharingInputs {
old_holder: Some(OldHolder { key_share: old_key }),
new_holder: Some(NewHolder {
verifying_key,
old_threshold: party_ids.len(),
old_holders: party_ids.clone(),
}),
new_holders: party_ids.clone(),
new_threshold: threshold,
};

let session = make_key_resharing_session(
&mut OsRng,
SynedrionSessionId::from_seed(session_id_hash.as_slice()),
Expand All @@ -369,9 +364,10 @@ pub async fn execute_proactive_refresh(
)
.map_err(ProtocolExecutionErr::SessionCreation)?;

let new_key_share = execute_protocol_generic(chans, session, session_id_hash).await?.0;
let (new_key_share, brodcaster, rx) =
execute_protocol_generic(chans, session, session_id_hash).await?;

new_key_share.ok_or(ProtocolExecutionErr::NoOutputFromReshareProtocol)
Ok((new_key_share.ok_or(ProtocolExecutionErr::NoOutputFromReshareProtocol)?, brodcaster, rx))
}

/// Psuedo-randomly select a subset of the parties of size `threshold`
Expand Down
95 changes: 69 additions & 26 deletions crates/threshold-signature-server/src/signing_client/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use std::time::Duration;

use axum::{
body::Bytes,
extract::{
Expand All @@ -27,9 +25,12 @@ use axum::{
use blake2::{Blake2s256, Digest};
use entropy_protocol::{
execute_protocol::{execute_proactive_refresh, Channels},
KeyParams, KeyShareWithAuxInfo, Listener, PartyId, SessionId, ValidatorInfo,
protocol_transport::Broadcaster,
KeyParams, KeyShareWithAuxInfo, Listener, PartyId, ProtocolMessage, SessionId, ValidatorInfo,
};
use parity_scale_codec::Encode;
use std::{collections::BTreeSet, time::Duration};
use tokio::sync::mpsc;

use entropy_kvdb::kv_manager::{
helpers::{deserialize, serialize as key_serialize},
Expand All @@ -45,7 +46,7 @@ use subxt::{
utils::{AccountId32 as SubxtAccountId32, Static},
OnlineClient,
};
use synedrion::{AuxInfo, ThresholdKeyShare};
use synedrion::{AuxInfo, KeyResharingInputs, NewHolder, OldHolder, ThresholdKeyShare};
use tokio::time::timeout;
use x25519_dalek::StaticSecret;

Expand Down Expand Up @@ -102,7 +103,7 @@ pub async fn proactive_refresh(
) = deserialize(&old_key_share)
.ok_or_else(|| ProtocolErr::Deserialization("Failed to load KeyShare".into()))?;

let new_key_share = do_proactive_refresh(
let (new_key_share, _broadcaster, _rx) = do_proactive_refresh(
&ocw_data.validators_info,
&signer,
&x25519_secret_key,
Expand Down Expand Up @@ -165,7 +166,10 @@ pub async fn do_proactive_refresh(
verifying_key: Vec<u8>,
old_key: ThresholdKeyShare<KeyParams, PartyId>,
block_number: u32,
) -> Result<ThresholdKeyShare<KeyParams, PartyId>, ProtocolErr> {
) -> Result<
(ThresholdKeyShare<KeyParams, PartyId>, Broadcaster, mpsc::Receiver<ProtocolMessage>),
ProtocolErr,
> {
tracing::debug!("Preparing to perform proactive refresh");
tracing::debug!("Signing with {:?}", &signer.signer().public());

Expand All @@ -189,31 +193,39 @@ pub async fn do_proactive_refresh(
tss_accounts.push(tss_account);
}

// subscribe to all other participating parties. Listener waits for other subscribers.
let (rx_ready, rx_from_others, listener) =
Listener::new(converted_validator_info.clone(), &account_id);
state
.listeners
.lock()
.map_err(|_| ProtocolErr::SessionError("Error getting lock".to_string()))?
.insert(session_id.clone(), listener);
let party_ids: BTreeSet<PartyId> = tss_accounts.iter().cloned().map(PartyId::new).collect();

open_protocol_connections(
&converted_validator_info,
&session_id,
signer.signer(),
let inputs = KeyResharingInputs {
old_holder: Some(OldHolder { key_share: old_key.clone() }),
new_holder: Some(NewHolder {
verifying_key: old_key.verifying_key(),
old_threshold: party_ids.len(),
old_holders: party_ids.clone(),
}),
new_holders: party_ids.clone(),
new_threshold: old_key.threshold(),
};

let channels = get_channels(
state,
converted_validator_info,
account_id,
&session_id,
signer,
x25519_secret_key,
)
.await?;
let channels = {
let ready = timeout(Duration::from_secs(SETUP_TIMEOUT_SECONDS), rx_ready).await?;
let broadcast_out = ready??;
Channels(broadcast_out, rx_from_others)
};
let result =
execute_proactive_refresh(session_id, channels, signer.signer(), tss_accounts, old_key)
.await?;

let result = execute_proactive_refresh(
session_id,
channels,
signer.signer(),
tss_accounts,
old_key.verifying_key(),
old_key.threshold(),
inputs,
)
.await?;
Ok(result)
}

Expand Down Expand Up @@ -273,3 +285,34 @@ pub async fn validate_proactive_refresh(
kv_manager.kv().put(reservation, latest_block_number.to_be_bytes().to_vec()).await?;
Ok(())
}

pub async fn get_channels(
state: &ListenerState,
converted_validator_info: Vec<ValidatorInfo>,
account_id: SubxtAccountId32,
session_id: &SessionId,
signer: &PairSigner<EntropyConfig, sr25519::Pair>,
x25519_secret_key: &StaticSecret,
) -> Result<Channels, ProtocolErr> {
// subscribe to all other participating parties. Listener waits for other subscribers.
let (rx_ready, rx_from_others, listener) =
Listener::new(converted_validator_info.clone(), &account_id);
state
.listeners
.lock()
.map_err(|_| ProtocolErr::SessionError("Error getting lock".to_string()))?
.insert(session_id.clone(), listener);

open_protocol_connections(
&converted_validator_info,
session_id,
signer.signer(),
state,
x25519_secret_key,
)
.await?;

let ready = timeout(Duration::from_secs(SETUP_TIMEOUT_SECONDS), rx_ready).await?;
let broadcast_out = ready??;
Ok(Channels(broadcast_out, rx_from_others))
}
64 changes: 24 additions & 40 deletions crates/threshold-signature-server/src/validator/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use crate::{
launch::{FORBIDDEN_KEYS, LATEST_BLOCK_NUMBER_RESHARE},
substrate::{get_stash_address, get_validators_info, query_chain, submit_transaction},
},
signing_client::{protocol_transport::open_protocol_connections, ProtocolErr},
signing_client::{
api::get_channels, protocol_transport::open_protocol_connections, ProtocolErr,
},
validator::errors::ValidatorErr,
AppState,
};
Expand All @@ -33,7 +35,9 @@ use entropy_protocol::Subsession;
pub use entropy_protocol::{
decode_verifying_key,
errors::ProtocolExecutionErr,
execute_protocol::{execute_protocol_generic, Channels, PairWrapper},
execute_protocol::{
execute_proactive_refresh, execute_protocol_generic, Channels, PairWrapper,
},
KeyParams, KeyShareWithAuxInfo, Listener, PartyId, SessionId, ValidatorInfo,
};
use entropy_shared::{OcwMessageReshare, NETWORK_PARENT_KEY, SETUP_TIMEOUT_SECONDS};
Expand Down Expand Up @@ -62,7 +66,7 @@ pub async fn new_reshare(
encoded_data: Bytes,
) -> Result<StatusCode, ValidatorErr> {
let data = OcwMessageReshare::decode(&mut encoded_data.as_ref())?;
// TODO: validate message came from chain (check reshare block # against current block number) see #941

let api = get_api(&app_state.configuration.endpoint).await?;
let rpc = get_rpc(&app_state.configuration.endpoint).await?;
validate_new_reshare(&api, &rpc, &data, &app_state.kv_store).await?;
Expand Down Expand Up @@ -151,8 +155,6 @@ pub async fn new_reshare(

let session_id = SessionId::Reshare { verifying_key, block_number: data.block_number };
let account_id = AccountId32(signer.signer().public().0);
let session_id_hash = session_id.blake2(Some(Subsession::Reshare))?;
let pair = PairWrapper(signer.signer().clone());

let mut converted_validator_info = vec![];
let mut tss_accounts = vec![];
Expand All @@ -166,54 +168,36 @@ pub async fn new_reshare(
tss_accounts.push(validator_info.tss_account.clone());
}

let (rx_ready, rx_from_others, listener) =
Listener::new(converted_validator_info.clone(), &account_id);
app_state
.listener_state
.listeners
.lock()
.map_err(|_| ValidatorErr::SessionError("Error getting lock".to_string()))?
.insert(session_id.clone(), listener);

open_protocol_connections(
&converted_validator_info,
&session_id,
signer.signer(),
let channels = get_channels(
&app_state.listener_state,
converted_validator_info,
account_id,
&session_id,
&signer,
&x25519_secret_key,
)
.await?;

let (channels, broadcaster) = {
let ready = timeout(Duration::from_secs(SETUP_TIMEOUT_SECONDS), rx_ready).await?;
let broadcast_out = ready??;
(Channels(broadcast_out.clone(), rx_from_others), broadcast_out)
};

let session = make_key_resharing_session(
&mut OsRng,
SynedrionSessionId::from_seed(session_id_hash.as_slice()),
pair.clone(),
&party_ids,
let (new_key_share, brodcaster, rx) = execute_proactive_refresh(
session_id.clone(),
channels,
signer.signer(),
tss_accounts,
decoded_verifying_key,
threshold as usize,
inputs,
)
.map_err(ProtocolExecutionErr::SessionCreation)?;

let (new_key_share_option, rx) = execute_protocol_generic(channels, session, session_id_hash)
.await
.map_err(|_| ValidatorErr::ProtocolError("Error executing protocol".to_string()))?;

let new_key_share = new_key_share_option.ok_or(ValidatorErr::NoOutputFromReshareProtocol)?;
.await?;

// Setup channels for the next session
let channels = Channels(broadcaster, rx);
// // Setup channels for the next session
let channels = Channels(brodcaster, rx);

// Now run an aux gen session
// // Now run an aux gen session
let session_id_hash = session_id.blake2(Some(Subsession::AuxGen))?;
let session = make_aux_gen_session(
&mut OsRng,
SynedrionSessionId::from_seed(session_id_hash.as_slice()),
pair,
PairWrapper(signer.signer().clone()),
&party_ids,
)
.map_err(ProtocolExecutionErr::SessionCreation)?;
Expand Down

0 comments on commit 4becd43

Please sign in to comment.