Support multithreading in seq_join/parallel_join #873
Conversation
Support is currently behind a feature flag that is not enabled by default. We use userspace concurrency to drive many futures in parallel by spawning tasks inside the executor. This model is not ideal for performance because memory loads will happen across thread boundaries and NUMA cores, but it already gives 50% more throughput for the OPRF version and 200% for the old IPA.
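As a rough illustration of that model, here is a hedged sketch (not the PR's actual implementation; the real code uses a scope so that futures need not be `'static`):

```rust
use std::future::Future;

// Sketch: hand every future to the multi-threaded executor as its own task,
// instead of polling them all from a single task's collection of futures.
async fn parallel_join_sketch<F, O>(futures: Vec<F>) -> Vec<O>
where
    F: Future<Output = O> + Send + 'static,
    O: Send + 'static,
{
    // Each spawned task can be polled on any worker thread.
    let handles: Vec<_> = futures.into_iter().map(tokio::spawn).collect();
    let mut results = Vec::with_capacity(handles.len());
    for handle in handles {
        // Awaiting in spawn order keeps the output in FIFO order.
        results.push(handle.await.expect("task panicked"));
    }
    results
}
```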
ipa-core/src/seq_join.rs
```rust
// SAFETY: scope object does not escape this function. All futures are driven to
// completion inside it or cancelled if a panic occurs.
let mut scope = unsafe { create_spawner() };
```
The async stuff makes it harder to reason about "escapes this function". If you view the function as being active until the boxed future completes, then yes, I think that's true. But if anyone calls `mem::forget` on the boxed future before it completes, then there could be a memory safety violation.

It is my understanding, though, that as long as you don't `mem::forget` the future (and thus instead always either drive it to completion or drop it), there can't be a memory safety violation. Maybe it would make sense to put `mem::forget` on the disallowed function list, like `join_all`?

I also thought about whether we could use the safe `spawn_and_block` API. That would mean consuming a thread per active query, which seems tolerable. But if we want to avoid calling `spawn_and_block` from a regular tokio executor, and instead call it via `thread::spawn` or `tokio::task::spawn_blocking`, that brings us right back to the `'static` problem. The only way I can see it working is if we spawn some number of tasks that receive futures over channels in `query::executor::do_query` (which is where the query lifetime is realized -- it is already calling `tokio::spawn`).
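To make the hazard concrete, here is a hedged sketch, assuming the `create_spawner` helper quoted above returns a scope that implements `Stream` (as `TokioScope` does). It is intentionally unsound code, showing how `mem::forget` defeats the "scope does not escape" argument:

```rust
use futures::StreamExt;

async fn forget_hazard() {
    let data = vec![1, 2, 3];
    let mut fut = Box::pin(async {
        // SAFETY (claimed): scope object does not escape this function.
        let mut scope = unsafe { create_spawner() };
        // The spawned task borrows `data` from a non-'static stack frame.
        scope.spawn(async { assert_eq!(data, vec![1, 2, 3]) });
        // Drive all spawned tasks to completion.
        while scope.next().await.is_some() {}
    });
    // Poll once, so the task is actually handed to the executor...
    let _ = futures::poll!(fut.as_mut());
    // ...then leak the driver. The scope's Drop (which would cancel or join
    // the tasks) never runs, so the task can outlive `data`: use-after-free.
    std::mem::forget(fut);
}
```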
Or maybe `thread::scope` helps us here?
You are right - I mistakenly assumed that not leaking futures created within `Scope` makes it safe. It does not, and I wrote a test to confirm that.

> It is my understanding though, that as long as you don't mem::forget the future (and thus instead always either drive it to completion or drop it), then there can't be a memory safety violation. Maybe it would make sense to put mem::forget on the disallowed function list like join_all?

That's my plan. Restricting `mem::forget` won't be enough; we also need to monitor usages of `Box::leak` and `ManuallyDrop`. I wouldn't go down that path if we were a library, but for IPA as an application this could be an acceptable mitigation, along with sanitizer runs.

> I also thought about whether we could use the safe spawn_and_block API.

I am going to play with it more, but I don't feel that we can make it work. We combine the use of `seq_join` with `parallel_join` inside it, and blocking the thread may lead to some weird deadlocks. My current thinking is that it is probably not worth trying to make it safe, given that `seq_join` is inherently unsafe and cannot be made safe.
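For concreteness, a minimal sketch (not from the PR) of why `Box::leak` and `ManuallyDrop` need the same treatment as `mem::forget`: each one suppresses the destructor that a scoped-spawn safety argument relies on.

```rust
use std::mem::ManuallyDrop;

fn leak_vectors() {
    // Box::leak: converts the box into a &'static mut; the destructor never runs.
    let boxed = Box::new(async { /* scoped work would live here */ });
    let _leaked = Box::leak(boxed);

    // ManuallyDrop: the wrapped value's destructor is skipped on scope exit.
    let _manual = ManuallyDrop::new(async { /* scoped work would live here */ });
}
```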
ipa-core/src/seq_join.rs
```rust
async move {
    b1.wait().await;
    for _ in 0..100 {
        if borrowed != &vec![1, 2, 3] {
```
Can you deal with this differently, for instance by passing in a weak reference that should remain live while the outside is operating? That would be less prone to problems of scope.
Makes sense, changed the test to work with weak references instead.
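For reference, a hedged sketch of what a `Weak`-based check might look like (the actual test in the PR may differ; names here are illustrative):

```rust
use std::sync::{Arc, Weak};

// Spawned-task body: upgrade the Weak on every iteration instead of holding a
// borrow into the outer stack frame.
async fn check_loop(weak: Weak<Vec<i32>>) {
    for _ in 0..100 {
        match weak.upgrade() {
            // Data is still alive: safe to read through the Arc.
            Some(data) => assert_eq!(*data, vec![1, 2, 3]),
            // Owner dropped the data; bail out instead of touching freed memory.
            None => return,
        }
        tokio::task::yield_now().await;
    }
}

#[tokio::test]
async fn weak_reference_check() {
    let data = Arc::new(vec![1, 2, 3]);
    // The task only ever sees a Weak, so dropping `data` can never leave it
    // with a dangling reference -- upgrade() just starts returning None.
    let handle = tokio::spawn(check_loop(Arc::downgrade(&data)));
    drop(data);
    handle.await.unwrap();
}
```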
The version that we need hasn't been published yet. I want to unblock our efforts to deploy multi-threading.
Codecov Report

```
@@           Coverage Diff            @@
##            main     #873     +/-  ##
=========================================
- Coverage   89.33%   88.82%   -0.52%
=========================================
  Files         180      181       +1
  Lines       24230    24297      +67
=========================================
- Hits        21646    21581      -65
- Misses       2584     2716     +132
```
seq_join won't be happy with them anymore
They fail with the too-many-steps issue, so we likely can't make it work for the whole protocol.
We need this fix: tokio-rs/tokio#6179
Miri does not support some operations that we currently use. We don't do IO inside in-memory tests, so that should be fine. Miri progress on supporting these is tracked [here](rust-lang/miri#2057)
The beating will continue until morale improves
It is ready for review now. I added Miri tests. @martinthomson @andyleiserson, please take a look if you have time.
This is pretty good stuff.

However, I have some reservations about the ultimate performance profile of this. That's not enough to say not to land this, but it's enough to suggest that maybe there is some new work in our futures (no pun intended).

If I understand how async-scoped works, spawning tasks will put them into a pool somewhere. Everything in that pool is polled, according to whatever scheduler logic applies.

It seems like the stream on the `TokioScope` ensures that spawned tasks resolve in the order in which they were added, which makes our `parallel_join` logic easier. That's the change you managed to arrange for async-scoped, which is good.

I'm interested in whether we need that sort of polling logic for a sequential join or whether something a whole lot simpler might also work.

For a sequential join, the core problem we have is that the source stream is not polled reliably. For that, we need just a single task to be spawned, one that drives the stream to completion. The trick there is that there is no way to get the items from that stream back to the current task for polling. But decoupling could use a single `mpsc::channel` to allow that to happen (I want single producer, single consumer, but apparently that's not a thing).

Basically, you would create a channel, pass the sender to a spawned task that reads from the source stream and sends the results. Then the current task would read from the receiver as we do in the single-threaded (and therefore deadlock-y) variant.

The benefit of that sort of approach is that you would be able to keep the source stream fed with polling from the spawned task, but you don't need to engage most of the nasty async-scoped stuff. We might still need async-scoped so that we can hide the lifetime and panic horrors that are associated; after all, we would be no less exposed to the drop issues in this version than the last.

I'm trying to remember now why I previously thought that maybe that sort of approach wasn't wise, but looking into it now, I can't see why.

The primary reason I suggest this is that while the overhead from `FuturesOrdered` is probably minimal, we're going to be loading those up pretty hard, and maybe we can avoid needing multiple collections of futures that hold significant numbers of entries. The channel approach also greatly reduces the number of futures on hand overall, which might be a good trade.
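A minimal sketch of the channel-based decoupling described above (illustrative, not the PR's code; shown with `'static` bounds for simplicity, which is exactly what async-scoped would otherwise hide; uses the futures crate's bounded mpsc):

```rust
use futures::channel::mpsc;
use futures::{SinkExt, Stream, StreamExt};

fn channel_seq_join<S>(source: S, capacity: usize) -> impl Stream<Item = S::Item>
where
    S: Stream + Send + 'static,
    S::Item: Send + 'static,
{
    let (mut tx, rx) = mpsc::channel(capacity);
    // A single spawned task keeps the source stream fed, independent of how
    // the consuming task gets scheduled.
    tokio::spawn(async move {
        futures::pin_mut!(source);
        while let Some(item) = source.next().await {
            // Bounded channel: once full, this task stops polling the source
            // until the consumer catches up.
            if tx.send(item).await.is_err() {
                break; // consumer dropped the receiver
            }
        }
    });
    // The current task reads items in order, as in the single-threaded variant.
    rx
}
```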
Thanks for the thorough review! Co-authored-by: Martin Thomson <[email protected]>
I agree, we are not doing anything differently here.
Here are some numbers that steered me towards using more than a single thread for `seq_join`. So I think we should use the Tokio executor for it.
One for local spawn and one for multi-threading
Re: @martinthomson's proposal to spawn a task that polls the `seq_join` input stream and sends to a channel: the issue I see here is managing/controlling how far forward that task can get. If the channel has unlimited capacity, then it potentially uses as much memory as a non-streaming implementation. If the channel has limited capacity, then the question is how big the channel needs to be to prevent deadlocks -- once the channel is full, the task will stop polling the input stream, same as now. To me it feels pretty similar to the buffering solution.
This is just a draft for folks to take a look and provide feedback. A few things are not working and need to be fixed before pushing this to main:

- `async_scoped` may resolve futures out of order. Under the hood, it uses `FuturesUnordered` and streams elements out of it when collected. This is not ideal for our `seq_join`, where we expect things to be in FIFO order. I am planning to raise a PR and suggest using `FuturesOrdered`. If that does not go through, a bit of work is required on our side to annotate futures with an index and keep a buffer (sketched after this list).
- `shuttle` flags our IPA tests, saying they exceeded the limit of 100k steps. It is a bit suspicious, as it may (or may not) indicate a cycle. I need to look deeper into it.
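A minimal sketch (not from the PR) of that index-plus-buffer fallback, assuming plain `FuturesUnordered` from the futures crate:

```rust
use std::collections::BTreeMap;
use std::future::Future;
use futures::stream::{FuturesUnordered, StreamExt};

// Run futures unordered, but release results strictly in submission order.
async fn join_in_order<F: Future>(futures: Vec<F>) -> Vec<F::Output> {
    let mut unordered = FuturesUnordered::new();
    for (idx, fut) in futures.into_iter().enumerate() {
        // Annotate each future with its original index.
        unordered.push(async move { (idx, fut.await) });
    }
    let mut buffer = BTreeMap::new();
    let mut next = 0;
    let mut out = Vec::new();
    while let Some((idx, item)) = unordered.next().await {
        buffer.insert(idx, item);
        // Drain the buffer only while the next expected index is present.
        while let Some(item) = buffer.remove(&next) {
            out.push(item);
            next += 1;
        }
    }
    out
}
```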