
Concurrently process messages in protocol execution loop #1136

Merged
ameba23 merged 12 commits into master from peg/execute-protocol-concurrent on Nov 1, 2024

Conversation

ameba23 (Contributor) commented Oct 28, 2024

Related issue: #641

Now that some fixes in synedrion mean that Session is Sync, and we only need read access to Session to process and create messages, message processing and creation for different peers during a round should be able to happen concurrently without needing a mutex. Things are a bit more complicated than the last time I tried this, because we have to handle messages from future sub-protocols in the DKG flow (which is now composed of three sub-protocols).

This PR adds concurrency both for processing incoming messages (calls to session.process_message) and for creating outgoing messages (calls to session.make_message).
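
To sketch the general shape of this (illustrative only, with placeholder types rather than synedrion's actual Session API): each incoming message gets its own spawned task over a shared, read-only Arc, and results come back to the host task over a channel, so no mutex is needed.

```rust
use std::sync::Arc;
use tokio::sync::mpsc;

// Sketch: `S` stands in for the session type, `M` for a preprocessed message,
// `R` for a processing result. Only read access to the session is required.
async fn process_concurrently<S, M, R>(
    session: Arc<S>,
    messages: Vec<M>,
    process: fn(&S, M) -> R,
) -> Vec<R>
where
    S: Send + Sync + 'static,
    M: Send + 'static,
    R: Send + 'static,
{
    let (tx, mut rx) = mpsc::channel(messages.len().max(1));

    for message in messages {
        let session = Arc::clone(&session);
        let tx = tx.clone();
        tokio::spawn(async move {
            // Each task reads the shared session and sends its result home.
            let result = process(&session, message);
            let _ = tx.send(result).await;
        });
    }
    // Drop the original sender so `recv` returns None once all tasks finish.
    drop(tx);

    let mut results = Vec::new();
    while let Some(result) = rx.recv().await {
        results.push(result);
    }
    results
}
```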

Since most of the time goes into processing incoming messages (according to my benchmarks from a while back), I was expecting this to speed things up significantly with a large number of parties. Unfortunately, it does not make much difference (on my laptop, the signing protocol with 8 parties went from 19.7s to 17.9s). I think the problem is that I am simulating all parties on one machine, so I don't really benefit from the concurrency: CPU cores are spread across all parties, so the total time taken doesn't change much. I hope this won't be the case in production, because the other parties will be running on other machines.

Trying this on our TDX box, which has 32 cores, the signing protocol with 16 parties takes 12.94s on this branch and 15.25s on master. I was hoping for a bigger difference.

ameba23 marked this pull request as draft on October 28, 2024 at 12:11
let artifact_tx = artifact_tx.clone();
let destination = destination.clone();
tokio::spawn(async move {
    let result = match session_arc.make_message(&mut OsRng, &destination) {
ameba23 (Contributor Author)

These match expressions are a bit of a mess. If anyone knows a neater way to map these let me know.

Member

Just spitballing here, maybe Result::and_then() will be cleaner?
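
For example (a generic illustration, not this PR's code), a nested match over two fallible steps can often be flattened like this:

```rust
// Illustrative only: chaining with map_err / and_then instead of nesting
// match expressions over each intermediate Result.
fn parse_and_double(input: &str) -> Result<i64, String> {
    input
        .parse::<i64>()
        .map_err(|e| format!("not a number: {e}"))
        .and_then(|n| n.checked_mul(2).ok_or_else(|| "overflow".to_string()))
}
```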

match session.finalize_round(&mut OsRng, accum)? {
// Get session back out of Arc
let session_inner =
    Arc::try_unwrap(session_arc).map_err(|_| GenericProtocolError::ArcUnwrapError)?;
ameba23 (Contributor Author), Oct 30, 2024

Here we assume there are no other references to the session at this point, which is always the case in the happy path, since the protocol only finishes when all messages have been processed.

@fjarri I'm not sure whether there are some synedrion error cases where session.can_finalize() will return true before all received messages have been processed. If that is the case, instead of returning an error here we should wait until all other references to the session are dropped.

The alternative would be to put it in an Arc<RwLock<>> for the entire protocol (not just one round).

fjarri (Member), Oct 30, 2024

Not in the current synedrion. The current manul version can exit prematurely if there are so many invalid messages already registered that it will be impossible to finalize (and for the current protocols that means if at least one invalid message is registered). It will also be possible to finalize early in the future with support for nodes dropping out during the protocol (entropyxyz/manul#11).

The way I envision this API being used is that as soon as you get the definitive answer from can_finalize() (true now, or CanFinalize::Yes or Never in manul), you stop processing messages, wait for all the existing processing tasks to finish (there are no cancellation points in them), and finalize the round (or terminate the session if the round can never be finalized, in manul).

As for the waiting part, I know how I would organize it in Python, but I am not exactly sure how with tokio; I haven't played with async much in Rust. I just assumed that if Python can do it, it should be possible somehow in Rust too. I hope I was correct :)
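
One possible shape for that waiting step in tokio (a sketch under the assumption that the per-message tasks are tracked in a JoinSet; this is not the PR's code):

```rust
use tokio::task::JoinSet;

// Sketch only: once can_finalize() gives a definitive answer, stop feeding in
// new messages and drain the in-flight tasks before finalizing the round.
async fn drain_before_finalize(mut in_flight: JoinSet<Result<(), String>>) -> Result<(), String> {
    // join_next() yields tasks as they complete; None means the set is empty.
    while let Some(joined) = in_flight.join_next().await {
        // The outer Result covers panics/cancellation; the inner one is the
        // task's own processing error.
        joined.map_err(|e| e.to_string())??;
    }
    Ok(())
}
```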

@@ -33,37 +33,40 @@ use helpers::{server, ProtocolOutput};

use std::collections::BTreeSet;

/// The maximum number of worker threads that tokio should use
const MAX_THREADS: usize = 16;
ameba23 (Contributor Author), Oct 30, 2024

I was setting the number of tokio worker threads to num_cpus, but I had issues running with 32 worker threads on our 32-core box. The tokio documentation says 'it is advised to keep this value on the smaller side'.

Contributor

What kind of problems did you run into? And why not pick some fraction of whatever num_cpus reports, like half or a quarter?

ameba23 (Contributor Author)

The problem was that the test just hangs and only finishes when the test harness timeout runs out. I guess we could pick half of num_cpus when num_cpus is greater than some limit. In CI I think we only have 2 CPUs, so there we definitely want both. Bear in mind this is only for tests and benchmarks; in production I think we use the tokio default, which, if I've understood right, is actually the number of CPUs: https://docs.rs/tokio/latest/tokio/attr.main.html#multi-threaded-runtime
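
For reference, a capped test runtime along these lines might look like this (a sketch; only MAX_THREADS and the comparison against num_cpus come from the discussion above, the rest is illustrative):

```rust
/// The maximum number of worker threads that tokio should use (tests/benchmarks only).
const MAX_THREADS: usize = 16;

// Sketch of building a test/benchmark runtime with a capped worker-thread count.
fn build_test_runtime() -> std::io::Result<tokio::runtime::Runtime> {
    let workers = std::cmp::min(num_cpus::get(), MAX_THREADS);
    tokio::runtime::Builder::new_multi_thread()
        .worker_threads(workers)
        .enable_all()
        .build()
}
```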

ameba23 (Contributor Author) commented Oct 30, 2024

I had to re-run CI twice to get this to pass - the failures don't look to me like they are related to this PR. I will make an issue.

ameba23 marked this pull request as ready for review on October 30, 2024 at 13:00
HCastano (Collaborator) left a comment

Kinda sucks that the performance numbers aren't better, but maybe the tests on a single machine aren't representative.

I'd wait until Bogdan reviews to merge since I'm not the best with async Rust.

@@ -69,11 +69,15 @@ impl RandomizedPrehashSigner<sr25519::Signature> for PairWrapper {
}
}

-pub async fn execute_protocol_generic<Res: synedrion::ProtocolResult>(
+pub async fn execute_protocol_generic<Res: synedrion::ProtocolResult + 'static>(
Collaborator

Why do we need to go with a 'static lifetime here instead of something shorter?

ameba23 (Contributor Author)

This comes from the error handling using the generic GenericProtocolError<Res> in a spawned task. It's probably possible to use some other error type in the spawned task and convert it to a generic error after it has been passed back out to the host task. Hopefully then we wouldn't need the 'static lifetime.

ameba23 (Contributor Author)

I've tried to remove the need for this 'static and can't come up with anything.

Having read up about it, I am with @JesseAbram on this: I think we should not worry.

The answer here:
https://stackoverflow.com/questions/66510485/when-is-a-static-lifetime-not-appropriate

says that 'static means can live forever, not will live forever. We can see from this code that the references in the spawned tasks are only going to be around as long as messages are still being processed.
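
As a minimal illustration of why the bound appears (not this PR's code): tokio::spawn requires the spawned future to be 'static because the runtime may keep polling it after the caller's stack frame is gone, and an owned Arc satisfies that without anything actually living forever.

```rust
use std::sync::Arc;

// The 'static bound only rules out borrowed (non-owned) data inside the task;
// the Arc is dropped as soon as the task finishes.
fn spawn_reader<T: Send + Sync + 'static>(value: Arc<T>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let _shared = &*value; // read-only use of the owned Arc
    })
}
```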

match session.finalize_round(&mut OsRng, accum)? {
// Get session back out of Arc
let session_inner =
    Arc::try_unwrap(session_arc).map_err(|_| GenericProtocolError::ArcUnwrapError)?;
Collaborator

What would the underlying cause for this error be though? Not actually having processed all the messages?

Asking because I'm not sure how much help we're giving the caller by just saying that we failed to unwrap an Arc.

ameba23 (Contributor Author)

Agreed, we should add a better error message. But ideally I would only leave this in if I was pretty sure it was never going to happen. We only get to this point in the code when the loop has broken because can_finalize is true, which means the protocol round is finished. If there are still some references to Session lying around in spawned tasks at that point, this will fail. But that would also mean some messages have not yet been processed, so the round should not be finished.

Since try_unwrap actually returns the Arc back in the error variant of the Result if it fails, it would be possible to loop around, waiting until those references get dropped as the tasks finish. But I think it's only worth doing that if it is actually possible to get into such a state.
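
If that state ever turned out to be reachable, the retry could look roughly like this (a sketch, not something this PR implements):

```rust
use std::sync::Arc;

// Arc::try_unwrap hands the Arc back on failure, so we can yield to the
// runtime and retry until the remaining task-held clones are dropped.
async fn unwrap_when_unique<T>(mut arc: Arc<T>) -> T {
    loop {
        match Arc::try_unwrap(arc) {
            Ok(inner) => return inner,
            Err(still_shared) => {
                arc = still_shared;
                tokio::task::yield_now().await;
            }
        }
    }
}
```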

Contributor

+1 for an error type describing the problem, ArcUnwrapError.

fjarri (Member) commented Oct 30, 2024

> I think the problem is that I am simulating all parties on one machine, so I don't really benefit from the concurrency: CPU cores are spread across all parties, so the total time taken doesn't change much.

I can't come up with an easy way to do this. manul has a testing helper that takes a list of sessions and simulates execution in a single thread; we could extend that to optionally add parallel message processing, so that you could run the protocol with and without it in a benchmark and see a more informative difference. But that has to wait until entropyxyz/synedrion#156 is in.

ameba23 (Contributor Author) commented Oct 30, 2024

> I can't come up with an easy way to do this. manul has a testing helper that takes a list of sessions and simulates execution in a single thread; we could extend that to optionally add parallel message processing, so that you could run the protocol with and without it in a benchmark and see a more informative difference. But that has to wait until entropyxyz/synedrion#156 is in.

Good, that sounds promising. I was wondering whether it might be possible to do it with a deterministic RNG and running the protocol twice: on the second run, all but one party replays their responses from the first run, so only one party is actually doing any computation, and we can benchmark that one.

ameba23 merged commit 4528bf8 into master on Nov 1, 2024
8 checks passed
ameba23 deleted the peg/execute-protocol-concurrent branch on November 1, 2024 at 16:09
dvdplm (Contributor) left a comment

I am curious about the benchmark numbers you're seeing and whether you did any profiling (by eyeballing or using some tool) on this. Where is the time spent?

Comment on lines +79 to +80
<Res as synedrion::ProtocolResult>::ProvableError: std::marker::Send,
<Res as synedrion::ProtocolResult>::CorrectnessProof: std::marker::Send,
Contributor

Do you really need the full path here? Isn't Send enough?

ameba23 (Contributor Author)

Yes, you're right, we can just use Send.


// This will happen in a host task.
accum.add_processed_message(processed)??;
for result in try_join_all(join_handles).await? {
Contributor

This will abort all the process_message tasks and return immediately. Is this what we want here? It might be exactly what we want, but just checking it's what we expect.

ameba23 (Contributor Author)

You mean that if there is an error in one of the tasks, all will be aborted because of the ? on try_join_all? If so, yes, I think this is what we want. If we fail to process any message we assume we cannot recover and abort the protocol session.

Contributor

Less because of the ? and more because of how try_join_all works: abort as soon as any of the tasks abort.
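
For reference, a small standalone illustration of how try_join_all behaves over tokio JoinHandles (not this PR's code): it fails fast on a JoinError (panic or abort), while ordinary task errors come back as the inner per-task Results.

```rust
use futures::future::try_join_all;

#[tokio::main]
async fn main() {
    let handles = vec![
        tokio::spawn(async { Ok::<u32, String>(1) }),
        tokio::spawn(async { Err::<u32, String>("bad message".into()) }),
    ];
    match try_join_all(handles).await {
        // Outer Ok: every task ran to completion; inner Results still need checking.
        Ok(results) => println!("all tasks completed: {results:?}"),
        // Outer Err: some task panicked or was aborted.
        Err(join_error) => println!("a task panicked or was aborted: {join_error}"),
    }
}
```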

tokio::spawn(async move {
    let result = session_arc.process_message(&mut OsRng, preprocessed);
    if tx.send(result).await.is_err() {
        tracing::error!("Protocol finished before message processing result sent");
Contributor

Hmm, isn't this a SendError, i.e. the rx side of the channel closed, or the buffer is full, or some such shenanigans? The log message seems to assume the cause is the protocol finishing prematurely, but are we positive that is the case?

ameba23 (Contributor Author), Nov 4, 2024

I am pretty sure that is the case.

If the channel is full there will be no error; send will await until there is space for a message.

That only leaves the receiver being closed, due to either close() being called (which we never do) or the receiver being dropped.
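
For reference, the tokio mpsc behaviour being described, as a standalone illustration (not this PR's code):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<u32>(1);

    // A full buffer makes send().await wait; it does not error. The only way
    // to get a SendError is for the receiver to be closed or dropped.
    drop(rx);
    assert!(tx.send(42).await.is_err());
}
```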


ameba23 (Contributor Author) commented Nov 4, 2024

> I am curious about the benchmark numbers you're seeing and whether you did any profiling (by eyeballing or using some tool) on this. Where is the time spent?

@dvdplm It was back in April that I did this, so things might have changed, and I didn't have any special technique other than adding a lot of log lines with std::time::Instant::elapsed(). I was using criterion for benchmarking, but abandoned it because of strange problems where things appeared to work on my laptop but not on @HCastano's. See
#738 (comment)

I'm pretty sure I came to the conclusion that incoming message processing is the biggest time hog for the signing protocol. I didn't pay much attention to the other protocols because the signing protocol is the only one where we keep the user waiting.

It would be great to look at this in more detail. It probably makes more sense to look at synedrion's benchmarking setup, but I wanted to do it here too, so that we can try to optimise the execution loop, as in this PR, and also take into account the overhead of websockets and encryption.

Thanks for taking the time to review this, by the way.
