Skip to content

Commit

Permalink
Refactor reshare (#994)
Browse files Browse the repository at this point in the history
* Refactor reshare

* clean

* changelog

* increase timeout

* Apply suggestions from code review

Co-authored-by: Hernando Castano <[email protected]>

* clean

* clean

---------

Co-authored-by: Hernando Castano <[email protected]>
  • Loading branch information
JesseAbram and HCastano authored Aug 14, 2024
1 parent 2fc472f commit 3c3fd2e
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 150 deletions.
67 changes: 34 additions & 33 deletions crates/protocol/src/execute_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub async fn execute_protocol_generic<Res: synedrion::ProtocolResult>(
mut chans: Channels,
session: Session<Res, sr25519::Signature, PairWrapper, PartyId>,
session_id_hash: [u8; 32],
) -> Result<(Res::Success, mpsc::Receiver<ProtocolMessage>), GenericProtocolError<Res>> {
) -> Result<(Res::Success, Channels), GenericProtocolError<Res>> {
let session_id = synedrion::SessionId::from_seed(&session_id_hash);
let tx = &chans.0;
let rx = &mut chans.1;
Expand Down Expand Up @@ -143,7 +143,7 @@ pub async fn execute_protocol_generic<Res: synedrion::ProtocolResult>(
}

match session.finalize_round(&mut OsRng, accum)? {
FinalizeOutcome::Success(res) => break Ok((res, chans.1)),
FinalizeOutcome::Success(res) => break Ok((res, chans)),
FinalizeOutcome::AnotherRound {
session: new_session,
cached_messages: new_cached_messages,
Expand Down Expand Up @@ -208,7 +208,6 @@ pub async fn execute_dkg(
threshold: usize,
) -> Result<KeyShareWithAuxInfo, ProtocolExecutionErr> {
tracing::debug!("Executing DKG");
let broadcaster = chans.0.clone();

let party_ids: BTreeSet<PartyId> =
threshold_accounts.iter().cloned().map(PartyId::new).collect();
Expand All @@ -231,11 +230,10 @@ pub async fn execute_dkg(
)
.map_err(ProtocolExecutionErr::SessionCreation)?;

let (init_keyshare, rx) = execute_protocol_generic(chans, session, session_id_hash).await?;
let (init_keyshare, chans) =
execute_protocol_generic(chans, session, session_id_hash).await?;

tracing::info!("Finished key init protocol");
// Setup channels for the next session
let chans = Channels(broadcaster.clone(), rx);

// Send verifying key
let verifying_key = init_keyshare.verifying_key();
Expand All @@ -259,6 +257,7 @@ pub async fn execute_dkg(
} else {
// Wait to receive verifying_key
let mut rx = chans.1;
let broadcaster = chans.0;
let message = rx.recv().await.ok_or_else(|| {
ProtocolExecutionErr::IncomingStream("Waiting for validating key".to_string())
})?;
Expand Down Expand Up @@ -302,15 +301,12 @@ pub async fn execute_dkg(
inputs,
)
.map_err(ProtocolExecutionErr::SessionCreation)?;
let (new_key_share_option, rx) =
let (new_key_share_option, chans) =
execute_protocol_generic(chans, session, session_id_hash).await?;
let new_key_share =
new_key_share_option.ok_or(ProtocolExecutionErr::NoOutputFromReshareProtocol)?;
tracing::info!("Finished reshare protocol");

// Setup channels for the next session
let chans = Channels(broadcaster.clone(), rx);

// Now run the aux gen protocol to get AuxInfo
let session_id_hash = session_id.blake2(Some(Subsession::AuxGen))?;
let session = make_aux_gen_session(
Expand All @@ -327,51 +323,56 @@ pub async fn execute_dkg(
}

/// Execute proactive refresh.
#[allow(clippy::type_complexity)]
#[tracing::instrument(
skip_all,
fields(threshold_accounts, my_idx),
level = tracing::Level::DEBUG
)]
pub async fn execute_proactive_refresh(
pub async fn execute_reshare(
session_id: SessionId,
chans: Channels,
threshold_pair: &sr25519::Pair,
threshold_accounts: Vec<AccountId32>,
old_key: ThresholdKeyShare<KeyParams, PartyId>,
) -> Result<ThresholdKeyShare<KeyParams, PartyId>, ProtocolExecutionErr> {
inputs: KeyResharingInputs<KeyParams, PartyId>,
aux_info_option: Option<AuxInfo<KeyParams, PartyId>>,
) -> Result<
(ThresholdKeyShare<KeyParams, PartyId>, AuxInfo<KeyParams, PartyId>),
ProtocolExecutionErr,
> {
tracing::debug!("Executing proactive refresh");
tracing::debug!("Signing with {:?}", &threshold_pair.public());

let party_ids: BTreeSet<PartyId> =
threshold_accounts.iter().cloned().map(PartyId::new).collect();
let pair = PairWrapper(threshold_pair.clone());
let verifying_key = old_key.verifying_key();

let threshold = old_key.threshold();

let session_id_hash = session_id.blake2(None)?;
let inputs = KeyResharingInputs {
old_holder: Some(OldHolder { key_share: old_key }),
new_holder: Some(NewHolder {
verifying_key,
old_threshold: party_ids.len(),
old_holders: party_ids.clone(),
}),
new_holders: party_ids.clone(),
new_threshold: threshold,
};

let session = make_key_resharing_session(
&mut OsRng,
SynedrionSessionId::from_seed(session_id_hash.as_slice()),
pair,
&party_ids,
inputs,
&inputs.new_holders,
inputs.clone(),
)
.map_err(ProtocolExecutionErr::SessionCreation)?;

let new_key_share = execute_protocol_generic(chans, session, session_id_hash).await?.0;
let (new_key_share, chans) = execute_protocol_generic(chans, session, session_id_hash).await?;
let aux_info = if let Some(aux_info) = aux_info_option {
aux_info
} else {
// Now run an aux gen session
let session_id_hash_aux_data = session_id.blake2(Some(Subsession::AuxGen))?;
let session = make_aux_gen_session(
&mut OsRng,
SynedrionSessionId::from_seed(session_id_hash_aux_data.as_slice()),
PairWrapper(threshold_pair.clone()),
&inputs.new_holders,
)
.map_err(ProtocolExecutionErr::SessionCreation)?;

execute_protocol_generic(chans, session, session_id_hash_aux_data).await?.0
};

new_key_share.ok_or(ProtocolExecutionErr::NoOutputFromReshareProtocol)
Ok((new_key_share.ok_or(ProtocolExecutionErr::NoOutputFromReshareProtocol)?, aux_info))
}

/// Psuedo-randomly select a subset of the parties of size `threshold`
Expand Down
32 changes: 19 additions & 13 deletions crates/protocol/tests/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
//! A simple protocol server, like a mini version of entropy-tss, for benchmarking
use anyhow::{anyhow, ensure};
use entropy_protocol::{
execute_protocol::{
execute_dkg, execute_proactive_refresh, execute_signing_protocol, Channels,
},
execute_protocol::{execute_dkg, execute_reshare, execute_signing_protocol, Channels},
protocol_transport::{
errors::WsError,
noise::{noise_handshake_initiator, noise_handshake_responder},
Expand All @@ -31,12 +29,13 @@ use entropy_shared::X25519PublicKey;
use futures::future;
use sp_core::{sr25519, Pair};
use std::{
collections::BTreeSet,
fmt,
sync::{Arc, Mutex},
time::Duration,
};
use subxt::utils::AccountId32;
use synedrion::{AuxInfo, KeyShare, ThresholdKeyShare};
use synedrion::{AuxInfo, KeyResharingInputs, KeyShare, NewHolder, OldHolder, ThresholdKeyShare};
use tokio::{
net::{TcpListener, TcpStream},
time::timeout,
Expand Down Expand Up @@ -131,15 +130,22 @@ pub async fn server(
Ok(ProtocolOutput::Sign(RecoverableSignature { signature, recovery_id }))
},
SessionId::Reshare { .. } => {
let new_keyshare = execute_proactive_refresh(
session_id,
channels,
&pair,
tss_accounts,
threshold_keyshare.unwrap(),
)
.await?;
Ok(ProtocolOutput::Reshare(new_keyshare))
let old_key = threshold_keyshare.unwrap();
let party_ids: BTreeSet<PartyId> =
tss_accounts.iter().cloned().map(PartyId::new).collect();
let inputs = KeyResharingInputs {
old_holder: Some(OldHolder { key_share: old_key.clone() }),
new_holder: Some(NewHolder {
verifying_key: old_key.verifying_key(),
old_threshold: party_ids.len(),
old_holders: party_ids.clone(),
}),
new_holders: party_ids.clone(),
new_threshold: old_key.threshold(),
};

let new_keyshare = execute_reshare(session_id, channels, &pair, inputs, None).await?;
Ok(ProtocolOutput::Reshare(new_keyshare.0))
},
SessionId::Dkg { .. } => {
let keyshare_and_aux_info =
Expand Down
97 changes: 60 additions & 37 deletions crates/threshold-signature-server/src/signing_client/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use std::time::Duration;

use axum::{
body::Bytes,
extract::{
Expand All @@ -26,10 +24,11 @@ use axum::{
};
use blake2::{Blake2s256, Digest};
use entropy_protocol::{
execute_protocol::{execute_proactive_refresh, Channels},
KeyParams, KeyShareWithAuxInfo, Listener, PartyId, SessionId, ValidatorInfo,
execute_protocol::{execute_reshare, Channels},
KeyParams, Listener, PartyId, SessionId, ValidatorInfo,
};
use parity_scale_codec::Encode;
use std::{collections::BTreeSet, time::Duration};

use entropy_kvdb::kv_manager::{
helpers::{deserialize, serialize as key_serialize},
Expand All @@ -45,7 +44,7 @@ use subxt::{
utils::{AccountId32 as SubxtAccountId32, Static},
OnlineClient,
};
use synedrion::{AuxInfo, ThresholdKeyShare};
use synedrion::{AuxInfo, KeyResharingInputs, NewHolder, OldHolder, ThresholdKeyShare};
use tokio::time::timeout;
use x25519_dalek::StaticSecret;

Expand Down Expand Up @@ -96,33 +95,24 @@ pub async fn proactive_refresh(
let exists_result = app_state.kv_store.kv().exists(&key).await?;
if exists_result {
let old_key_share = app_state.kv_store.kv().get(&key).await?;
let (deserialized_old_key, _aux_info): (
let (deserialized_old_key, aux_info): (
ThresholdKeyShare<KeyParams, PartyId>,
AuxInfo<KeyParams, PartyId>,
) = deserialize(&old_key_share)
.ok_or_else(|| ProtocolErr::Deserialization("Failed to load KeyShare".into()))?;

let new_key_share = do_proactive_refresh(
let (new_key_share, aux_info) = do_proactive_refresh(
&ocw_data.validators_info,
&signer,
&x25519_secret_key,
&app_state.listener_state,
encoded_key,
deserialized_old_key,
ocw_data.block_number,
aux_info,
)
.await?;

// Get aux info from existing entry
let aux_info = {
let existing_entry = app_state.kv_store.kv().get(&key).await?;
let (_old_key_share, aux_info): KeyShareWithAuxInfo = deserialize(&existing_entry)
.ok_or_else(|| {
ProtocolErr::Deserialization("Failed to load KeyShare".into())
})?;
aux_info
};

// Since this is a refresh with the parties not changing, store the old aux_info
let serialized_key_share = key_serialize(&(new_key_share, aux_info))
.map_err(|_| ProtocolErr::KvSerialize("Kv Serialize Error".to_string()))?;
Expand Down Expand Up @@ -152,6 +142,7 @@ async fn handle_socket_result(socket: WebSocket, app_state: AppState) {
};
}

#[allow(clippy::type_complexity, clippy::too_many_arguments)]
#[tracing::instrument(
skip_all,
fields(validators_info, verifying_key, my_subgroup),
Expand All @@ -165,7 +156,8 @@ pub async fn do_proactive_refresh(
verifying_key: Vec<u8>,
old_key: ThresholdKeyShare<KeyParams, PartyId>,
block_number: u32,
) -> Result<ThresholdKeyShare<KeyParams, PartyId>, ProtocolErr> {
aux_info: AuxInfo<KeyParams, PartyId>,
) -> Result<(ThresholdKeyShare<KeyParams, PartyId>, AuxInfo<KeyParams, PartyId>), ProtocolErr> {
tracing::debug!("Preparing to perform proactive refresh");
tracing::debug!("Signing with {:?}", &signer.signer().public());

Expand All @@ -189,31 +181,31 @@ pub async fn do_proactive_refresh(
tss_accounts.push(tss_account);
}

// subscribe to all other participating parties. Listener waits for other subscribers.
let (rx_ready, rx_from_others, listener) =
Listener::new(converted_validator_info.clone(), &account_id);
state
.listeners
.lock()
.map_err(|_| ProtocolErr::SessionError("Error getting lock".to_string()))?
.insert(session_id.clone(), listener);
let party_ids: BTreeSet<PartyId> = tss_accounts.iter().cloned().map(PartyId::new).collect();

open_protocol_connections(
&converted_validator_info,
&session_id,
signer.signer(),
let inputs = KeyResharingInputs {
old_holder: Some(OldHolder { key_share: old_key.clone() }),
new_holder: Some(NewHolder {
verifying_key: old_key.verifying_key(),
old_threshold: party_ids.len(),
old_holders: party_ids.clone(),
}),
new_holders: party_ids.clone(),
new_threshold: old_key.threshold(),
};

let channels = get_channels(
state,
converted_validator_info,
account_id,
&session_id,
signer,
x25519_secret_key,
)
.await?;
let channels = {
let ready = timeout(Duration::from_secs(SETUP_TIMEOUT_SECONDS), rx_ready).await?;
let broadcast_out = ready??;
Channels(broadcast_out, rx_from_others)
};

let result =
execute_proactive_refresh(session_id, channels, signer.signer(), tss_accounts, old_key)
.await?;
execute_reshare(session_id, channels, signer.signer(), inputs, Some(aux_info)).await?;
Ok(result)
}

Expand Down Expand Up @@ -273,3 +265,34 @@ pub async fn validate_proactive_refresh(
kv_manager.kv().put(reservation, latest_block_number.to_be_bytes().to_vec()).await?;
Ok(())
}

pub async fn get_channels(
state: &ListenerState,
converted_validator_info: Vec<ValidatorInfo>,
account_id: SubxtAccountId32,
session_id: &SessionId,
signer: &PairSigner<EntropyConfig, sr25519::Pair>,
x25519_secret_key: &StaticSecret,
) -> Result<Channels, ProtocolErr> {
// subscribe to all other participating parties. Listener waits for other subscribers.
let (rx_ready, rx_from_others, listener) =
Listener::new(converted_validator_info.clone(), &account_id);
state
.listeners
.lock()
.map_err(|_| ProtocolErr::SessionError("Error getting lock".to_string()))?
.insert(session_id.clone(), listener);

open_protocol_connections(
&converted_validator_info,
session_id,
signer.signer(),
state,
x25519_secret_key,
)
.await?;

let ready = timeout(Duration::from_secs(SETUP_TIMEOUT_SECONDS), rx_ready).await?;
let broadcast_out = ready??;
Ok(Channels(broadcast_out, rx_from_others))
}
2 changes: 1 addition & 1 deletion crates/threshold-signature-server/src/user/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ async fn test_store_share() {

let mut new_verifying_key = vec![];
// wait for registered event check that key exists in kvdb
for _ in 0..45 {
for _ in 0..65 {
std::thread::sleep(std::time::Duration::from_millis(1000));
let block_hash = rpc.chain_get_block_hash(None).await.unwrap();
let events = EventsClient::new(api.clone()).at(block_hash.unwrap()).await.unwrap();
Expand Down
Loading

0 comments on commit 3c3fd2e

Please sign in to comment.