Concurrently process messages in protocol execution loop #1136
Conversation
let artifact_tx = artifact_tx.clone();
let destination = destination.clone();
tokio::spawn(async move {
    let result = match session_arc.make_message(&mut OsRng, &destination) {
These match expressions are a bit of a mess. If anyone knows a neater way to map these let me know.
Just spitballing here, maybe Result::and_then() will be cleaner?
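A minimal standalone sketch of what that could look like (the types and functions here are made up, not the PR's): each fallible step maps its error into a shared error enum and `and_then` chains the next step, replacing one level of nested `match` per call.

```rust
// Hypothetical sketch: placeholder types, not the PR's session/message types.
#[derive(Debug)]
enum ProtocolError {
    Make(String),
    Send(String),
}

fn make_message() -> Result<String, String> {
    Ok("payload".to_string())
}

fn send_message(msg: String) -> Result<(), String> {
    println!("sending {msg}");
    Ok(())
}

// One level of nested `match` per fallible call collapses into a chain.
fn make_and_send() -> Result<(), ProtocolError> {
    make_message()
        .map_err(ProtocolError::Make)
        .and_then(|msg| send_message(msg).map_err(ProtocolError::Send))
}

fn main() {
    make_and_send().unwrap();
}
```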
match session.finalize_round(&mut OsRng, accum)? {
// Get session back out of Arc
let session_inner =
    Arc::try_unwrap(session_arc).map_err(|_| GenericProtocolError::ArcUnwrapError)?;
Here we assume there are no other references to the session at this point, which is always the case in the happy path as the protocol only finishes when all messages are processed.
@fjarri I'm not sure if there are some synedrion error cases where session.can_finalize() will return true before all received messages have been processed. If that is the case, instead of returning an error here we should wait until all other references to session are dropped. The alternative would be to put it in an Arc<RwLock<>> for the entire protocol (not just one round).
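Roughly what that alternative could look like (a sketch only - DummySession is a placeholder, not synedrion's type): processing tasks take read locks concurrently, and finalizing takes the single write lock, which only resolves once every reader is done.

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

// Placeholder session type, not synedrion's.
struct DummySession {
    round: u32,
}

// Message processing only needs shared access, so tasks take read locks.
async fn process(session: Arc<RwLock<DummySession>>, msg: u32) {
    let guard = session.read().await;
    println!("processing message {msg} in round {}", guard.round);
}

// Finalizing waits for all read guards to be released before mutating.
async fn finalize_round(session: Arc<RwLock<DummySession>>) {
    let mut guard = session.write().await;
    guard.round += 1;
}

#[tokio::main]
async fn main() {
    let session = Arc::new(RwLock::new(DummySession { round: 0 }));
    let tasks: Vec<_> =
        (0..4).map(|i| tokio::spawn(process(session.clone(), i))).collect();
    for task in tasks {
        task.await.unwrap();
    }
    finalize_round(session).await;
}
```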
Not in the current synedrion. The current manul version can exit prematurely if there are so many invalid messages already registered that it will be impossible to finalize (and for the current protocols it means if at least one invalid message is registered). Also it will be possible to finalize early in the future with the support of nodes dropping out during the protocol (entropyxyz/manul#11).
The way I envision this API to be used is that as soon as you get the definitive answer from can_finalize() (true now, or CanFinalize::Yes or Never in manul), you stop processing messages, wait for all the existing processing tasks to finish (there are no cancel points in them), and finalize the round (or terminate the session if the round can never be finalized, in manul).
Now for the waiting part, I know how I would organize it in Python, but I am not exactly sure for tokio - haven't played with async much in Rust. I just assumed that if Python can do it, it would be possible somehow in Rust too, I hope I was correct :)
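For what it's worth, one way the waiting part could look in tokio (a rough sketch with a placeholder task body, not the PR's code): keep the processing tasks in a JoinSet and drain it once can_finalize() gives a definitive answer, so every in-flight task completes before the round is finalized.

```rust
use tokio::task::JoinSet;

// Placeholder for the real message-processing work.
async fn process_one_message(i: usize) -> Result<(), String> {
    println!("processing message {i}");
    Ok(())
}

async fn process_round() -> Result<(), Box<dyn std::error::Error>> {
    let mut tasks: JoinSet<Result<(), String>> = JoinSet::new();

    // Spawn a task per incoming message until can_finalize() says yes.
    for i in 0..4 {
        tasks.spawn(process_one_message(i));
    }

    // Wait for all existing processing tasks to finish before finalizing.
    while let Some(joined) = tasks.join_next().await {
        joined??; // outer ? = panicked/aborted task, inner ? = processing error
    }

    // ... finalize the round (or terminate the session) here ...
    Ok(())
}

#[tokio::main]
async fn main() {
    process_round().await.unwrap();
}
```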
@@ -33,37 +33,40 @@ use helpers::{server, ProtocolOutput};

use std::collections::BTreeSet;

/// The maximum number of worker threads that tokio should use
const MAX_THREADS: usize = 16;
I was setting the number of tokio worker threads to num_cpus, but I had issues running with 32 worker threads on our 32-core box. The tokio documentation says 'it is advised to keep this value on the smaller side'.
What kind of problems did you run into? And perhaps why not pick a multiple of whatever num_cpu reports? Like half or a quarter?
The problem was the test just hangs and only finishes when the test harness timeout runs out. I guess we could pick half of num_cpus if num_cpus is greater than some limit. In CI I think we only have 2 CPUs, so we definitely want both. Bear in mind this is only for tests / benchmarks. I think in production we use the tokio default, which if I've understood right is actually the number of CPUs: https://docs.rs/tokio/latest/tokio/attr.main.html#multi-threaded-runtime
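For illustration, a sketch of how that kind of clamp could be written when building the test/benchmark runtime by hand (assuming the num_cpus crate; the exact policy here is just an example, not what the PR does):

```rust
/// The maximum number of worker threads that tokio should use
const MAX_THREADS: usize = 16;

// Use every CPU up to a cap of MAX_THREADS, so a 2-CPU CI runner still gets
// both cores while a 32-core box stays on the smaller side.
fn build_test_runtime() -> tokio::runtime::Runtime {
    let workers = num_cpus::get().min(MAX_THREADS);
    tokio::runtime::Builder::new_multi_thread()
        .worker_threads(workers)
        .enable_all()
        .build()
        .expect("failed to build tokio runtime")
}

fn main() {
    let runtime = build_test_runtime();
    runtime.block_on(async { println!("running on a capped runtime") });
}
```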
I had to re-run CI twice to get this to pass - the failures don't look to me like they are related to this PR. I will make an issue.
Kinda sucks that the performance numbers aren't better, but maybe the tests on a single machine aren't representative.
I'd wait until Bogdan reviews to merge since I'm not the best with async Rust.
@@ -69,11 +69,15 @@ impl RandomizedPrehashSigner<sr25519::Signature> for PairWrapper {
    }
}

pub async fn execute_protocol_generic<Res: synedrion::ProtocolResult>(
pub async fn execute_protocol_generic<Res: synedrion::ProtocolResult + 'static>(
Why do we need to go with a 'static lifetime here instead of something shorter?
This is from the error handling using the generic GenericProtocolError<Res> in a spawned task. It's probably possible to use some other error type in the spawned task and convert it to a generic error after it's been passed back out to the host task. Hopefully then we wouldn't need the 'static lifetime.
I've tried to remove the need for this 'static and can't come up with something.
Having read up about it, I am with @JesseAbram on this - I think we should not worry.
The answer here:
https://stackoverflow.com/questions/66510485/when-is-a-static-lifetime-not-appropriate
says that 'static means the value can live forever, not that it will live forever. We can see from this code that the references in the spawned tasks are only going to be around as long as the messages are still being processed.
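A small sketch of that point (placeholder types, not the actual signature): tokio::spawn requires everything the task captures to be able to outlive the caller, and handing the task an owned Arc clone satisfies that even though the task in practice ends when message processing does.

```rust
use std::sync::Arc;

// Placeholder for the session type; 'static here means "can live forever",
// not "will live forever".
async fn run_generic<S: Send + Sync + 'static>(session: Arc<S>) {
    let task_session = session.clone();
    let handle = tokio::spawn(async move {
        // The task owns its own Arc clone, so nothing borrowed escapes the caller.
        let _session = &*task_session;
    });
    handle.await.expect("task panicked");
}

#[tokio::main]
async fn main() {
    run_generic(Arc::new(42u32)).await;
}
```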
match session.finalize_round(&mut OsRng, accum)? {
// Get session back out of Arc
let session_inner =
    Arc::try_unwrap(session_arc).map_err(|_| GenericProtocolError::ArcUnwrapError)?;
What would the underlying cause for this error be though? Not actually having processed all the messages?
Asking because I'm not sure how much help we're giving the caller by just saying that we failed to unwrap an Arc.
Agree we should put a better error message. But ideally I would only leave this in if I was pretty sure this was never going to happen. We only get to this point in the code when the loop has broken because can_finalize is true - which means the protocol round is finished. If there are still some references to Session lying around in spawned tasks at that point then this will fail. But that would also mean some messages have not yet been processed, so the round should not be finished.
Since try_unwrap actually returns the Arc back in the error variant of the Result if it fails, it would be possible to loop around waiting until those references get dropped because the tasks finish. But I think it's only worth doing that if it is actually possible to get into such a state.
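For reference, a sketch of what that retry loop might look like if it ever turned out to be necessary (generic placeholder code, not the PR's):

```rust
use std::sync::Arc;
use tokio::time::{sleep, Duration};

// Arc::try_unwrap hands the Arc back in the Err variant, so we can keep
// retrying until the remaining task-held clones are dropped.
async fn unwrap_when_unique<T>(mut arc: Arc<T>) -> T {
    loop {
        match Arc::try_unwrap(arc) {
            Ok(inner) => return inner,
            Err(still_shared) => {
                arc = still_shared;
                // Another task still holds a clone; back off briefly and retry.
                sleep(Duration::from_millis(1)).await;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let session = Arc::new(String::from("session state"));
    let held_by_task = session.clone();
    tokio::spawn(async move {
        sleep(Duration::from_millis(5)).await;
        drop(held_by_task); // simulated message-processing task finishing
    });
    let inner = unwrap_when_unique(session).await;
    println!("got the session back: {inner}");
}
```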
+1 for an error type describing the problem, ArcUnwrapError.
I can't come up with an easy way to do this.
Good, that sounds promising. I was wondering if it might be possible to do it by using a deterministic RNG and running the protocol twice. On the second run all but one party replay their responses from the first run, so only one party is actually doing any computation, and we could benchmark it.
I am curious about the benchmark numbers you're seeing and if you did any profiling (by eyeballing or using some tool) on this? Where is the time spent?
<Res as synedrion::ProtocolResult>::ProvableError: std::marker::Send,
<Res as synedrion::ProtocolResult>::CorrectnessProof: std::marker::Send,
Do you really need the full path here? Isn't Send enough?
Yes, you are right, we can just do Send.
// This will happen in a host task.
accum.add_processed_message(processed)??;
for result in try_join_all(join_handles).await? {
This will abort all the process_message tasks and return immediately. Is this what we want here? It might be exactly what we want, but just checking it's what we expect.
You mean if there is an error in one of the tasks, all will be aborted because of the ? on try_join_all? If so, yes I think this is what we want. If we fail to process any message we assume we cannot recover and abort the protocol session.
Less because of the ? and more because of how try_join_all works: abort as soon as any of the tasks abort.
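A minimal sketch of that short-circuit behaviour (placeholder tasks, not the PR's code): try_join_all over JoinHandles resolves with the first JoinError (a panicked or aborted task) as soon as one occurs, without waiting for the remaining handles.

```rust
use futures::future::try_join_all;

#[tokio::main]
async fn main() {
    let join_handles: Vec<_> = (0..3)
        .map(|i| {
            tokio::spawn(async move {
                if i == 1 {
                    // A panicking task surfaces as a JoinError in try_join_all.
                    panic!("simulated failure in task {i}");
                }
                i
            })
        })
        .collect();

    // Short-circuits on the first JoinError instead of waiting for every task.
    match try_join_all(join_handles).await {
        Ok(results) => println!("all tasks finished: {results:?}"),
        Err(join_error) => println!("stopped early: {join_error}"),
    }
}
```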
tokio::spawn(async move {
    let result = session_arc.process_message(&mut OsRng, preprocessed);
    if tx.send(result).await.is_err() {
        tracing::error!("Protocol finished before message processing result sent");
Hmm, isn't this a SendError, i.e. the rx side of the channel closed or the buffer is full or some such shenanigans? Like, the log message seems to assume that the cause is that the protocol finished prematurely, but are we positive that is the case?
I am pretty sure that it is the case.
If the channel is full there will be no error, it will await until there is space for a message.
That only leaves that the receiver is closed, due to either close() being called (which we never do) or the receiver being dropped.
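A small self-contained sketch of those semantics with a bounded tokio mpsc channel (not the PR's channel setup): send waits when the buffer is full, and only returns a SendError once the receiver is gone.

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(1);

    tx.send(1).await.unwrap(); // fills the single-slot buffer

    let tx_full = tx.clone();
    let blocked_send = tokio::spawn(async move {
        // The buffer is full, so this awaits until the receiver makes room;
        // it is not an error.
        tx_full.send(2).await
    });

    assert_eq!(rx.recv().await, Some(1)); // frees a slot
    blocked_send.await.unwrap().unwrap(); // the pending send now succeeds

    // Dropping the receiver is what actually produces a SendError.
    drop(rx);
    assert!(tx.send(3).await.is_err());
    println!("send only failed after the receiver was dropped");
}
```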
@dvdplm It was back in April that I did this so things might have changed, and I didn't have any special technique other than adding a lot of log lines. I'm pretty sure I came to the conclusion that incoming message processing is the biggest time hog for the signing protocol. I didn't pay much attention to the other protocols because the signing protocol is the only one where we keep the user waiting. It would be great to look at this in more detail. Probably it makes more sense to look at synedrion's benchmarking setup, but I wanted to also do it here so that we can try to optimise the execution loop like in this PR, and also to take into account the overhead of websockets and encryption. Thanks for taking the time to review this btw.
Related issue: #641
Now that some fixes in synedrion mean that Session is Sync, and we only need read access to Session to process / create messages, message processing / creation from different peers during a round should be able to happen concurrently without needing a mutex. Things are a bit more complicated than last time I tried to do this because we have to handle messages from future sub-protocols for the DKG flow (which is now composed of 3 sub-protocols).

This PR adds concurrency both for processing incoming messages (calls to session.process_message) and outgoing messages (calls to session.make_message).

Since most of the time goes into processing incoming messages (according to my benchmarks from a while back), I was expecting this to speed things up significantly with a large number of parties. Unfortunately, it does not make too much difference (on my laptop, the signing protocol with 8 parties went from 19.7s to 17.9s). I think the problem is that I am simulating all parties on one machine, so I don't really benefit from the concurrency - CPU cores are spread across all parties so the total time taken doesn't change much. I hope that in production this won't be the case, because the other parties will be running on other machines.
Trying this on our TDX box, which has 32 cores, the signing protocol with 16 parties takes 12.94s on this branch and 15.25s on master. I was hoping for a bigger difference.
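A rough standalone sketch of the pattern described above (DummySession and the channel setup are placeholders, not the actual protocol code): the session is shared behind an Arc, each incoming message is processed in its own task through &self, and results flow back to the host task over a channel.

```rust
use std::sync::Arc;

// Placeholder for synedrion's Session; the real one is Sync and processes
// messages through &self.
struct DummySession;

impl DummySession {
    fn process_message(&self, msg: u32) -> u32 {
        msg * 2 // stand-in for the real verification work
    }
}

#[tokio::main]
async fn main() {
    let session = Arc::new(DummySession);
    let (tx, mut rx) = tokio::sync::mpsc::channel(16);

    // One task per incoming message, all sharing the session read-only.
    for msg in 0..4u32 {
        let session = session.clone();
        let tx = tx.clone();
        tokio::spawn(async move {
            let processed = session.process_message(msg);
            let _ = tx.send(processed).await;
        });
    }
    drop(tx); // the host keeps no sender, so rx ends once the tasks finish

    // The host task collects results as they arrive.
    while let Some(processed) = rx.recv().await {
        println!("processed result: {processed}");
    }
}
```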