
Support multithreading in seq_join/parallel_join #873

Merged

Conversation

@akoshelev (Collaborator)

Support is currently behind a feature flag that is not enabled by default. We use userspace concurrency to drive many futures in parallel by spawning tasks inside the executor. This model is not ideal for performance because memory loads happen across thread boundaries and NUMA nodes, but it already gives 50% more throughput for the OPRF version and 200% for the old IPA.

This is just a draft for folks to take a look and provide feedback. A few things are not working and need to be fixed before pushing this to main:

  • async_scoped may resolve futures out of order. Under the hood, it uses FuturesUnordered and streams elements out of it when collected. This is not ideal for our seq_join, where we expect things to resolve in FIFO order. I am planning to raise a PR suggesting the use of FuturesOrdered. If that does not go through, a bit of work is required on our side to annotate futures with an index and keep a reordering buffer. (A sketch of the ordering difference follows this list.)
  • shuttle flags our IPA tests, saying they exceeded the limit of 100k steps. That is a bit suspicious, as it may (or may not) indicate a cycle. I need to look deeper into it.
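For reference, a minimal sketch of that ordering difference, assuming the tokio and futures crates (illustrative code, not part of this PR):

```rust
use futures::stream::{FuturesOrdered, FuturesUnordered, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Each future sleeps less the later it is submitted, so they
    // complete in reverse submission order.
    let make = |i: u64| async move {
        tokio::time::sleep(Duration::from_millis(30 - 10 * i)).await;
        i
    };

    // FuturesUnordered yields in completion order: [2, 1, 0].
    let unordered: FuturesUnordered<_> = (0..3).map(make).collect();
    println!("{:?}", unordered.collect::<Vec<_>>().await);

    // FuturesOrdered yields in submission order: [0, 1, 2] --
    // the FIFO behavior seq_join expects.
    let ordered: FuturesOrdered<_> = (0..3).map(make).collect();
    println!("{:?}", ordered.collect::<Vec<_>>().await);
}
```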

@akoshelev marked this pull request as draft November 30, 2023 20:18
Comment on lines 418 to 420
// SAFETY: scope object does not escape this function. All futures are driven to
// completion inside it or cancelled if a panic occurs.
let mut scope = unsafe { create_spawner() };
Collaborator:

The async stuff makes it harder to reason about "escapes this function". If you view the function as being active until the boxed future completes, then yes, I think that's true. But if anyone calls mem::forget for the boxed future before it completes, then there could be a memory safety violation.

It is my understanding though, that as long as you don't mem::forget the future (and thus instead always either drive it to completion or drop it), then there can't be a memory safety violation. Maybe it would make sense to put mem::forget on the disallowed function list like join_all?

I also thought about whether we could use the safe spawn_and_block API. That would mean consuming a thread per active query, which seems tolerable. But if we want to avoid calling spawn_and_block from a regular tokio executor, and instead call it via thread::spawn or tokio::task::spawn_blocking, that brings us right back to the 'static problem. The only way I can see it working is if we spawn some number of tasks that receive futures over channels in query::executor::do_query (which is where the query lifetime is realized -- it is already calling tokio::spawn).

Collaborator:

Or maybe thread::scope helps us here?
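For context on why thread::scope is safe where a scoped future is not: std::thread::scope is control-flow based, so the call cannot return until every spawned thread has joined, and there is no scope value a caller could mem::forget. A minimal sketch (illustrative, not from this PR):

```rust
// std::thread::scope joins all spawned threads before returning, so
// borrows of `data` cannot outlive it -- there is no forgettable handle.
fn sum_in_parallel(data: &[i32]) -> i32 {
    std::thread::scope(|s| {
        let (left, right) = data.split_at(data.len() / 2);
        let a = s.spawn(|| left.iter().sum::<i32>());
        let b = s.spawn(|| right.iter().sum::<i32>());
        a.join().unwrap() + b.join().unwrap()
    })
}
```

The catch is that thread::scope blocks the calling thread until the scope completes, which raises the same blocking concerns as spawn_and_block inside an async executor.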

akoshelev (Collaborator, Author):

You are right; I mistakenly assumed that not leaking futures created within Scope makes it safe. It does not, and I wrote a test to confirm that.

> It is my understanding though, that as long as you don't mem::forget the future (and thus instead always either drive it to completion or drop it), then there can't be a memory safety violation. Maybe it would make sense to put mem::forget on the disallowed function list like join_all?

That's my plan. Restricting mem::forget won't be enough; we also need to monitor usages of Box::leak and ManuallyDrop. I wouldn't go down that path if we were a library, but for IPA as an application this could be an acceptable mitigation, along with sanitizer runs.
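A sketch of what that restriction could look like via Clippy's disallowed_methods lint (paths and reasons here are illustrative, not the repository's actual configuration):

```toml
# clippy.toml -- illustrative sketch; the lint only fires for configured paths
disallowed-methods = [
    { path = "std::mem::forget", reason = "leaking a scoped future can dangle its borrows" },
    { path = "std::boxed::Box::leak", reason = "same leak hazard as mem::forget" },
    { path = "core::mem::ManuallyDrop::new", reason = "suppresses the drop that scoped spawns rely on" },
]
```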

> I also thought about whether we could use the safe spawn_and_block API.

I am going to play more with it, but I don't feel that we can make it work. We combine the use of seq_join with parallel_join inside it, and blocking the thread may lead to some weird deadlocks. My current thinking is that it is probably not worth trying to make it safe, given that seq_join is inherently unsafe and cannot be made safe.

async move {
    b1.wait().await;
    for _ in 0..100 {
        if borrowed != &vec![1, 2, 3] {
Member:

Can you deal with this differently, for instance, by passing in a weak reference that should remain live while the outside is operating? That would be less prone to problems of scope.

akoshelev (Collaborator, Author):

Makes sense; changed the test to work with weak references instead.
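A minimal sketch of that shape of test, assuming a tokio test runtime (names and assertions are illustrative, not the PR's actual code):

```rust
use std::sync::{Arc, Weak};

#[tokio::test]
async fn spawned_task_sees_live_data() {
    let data = Arc::new(vec![1, 2, 3]);
    let weak: Weak<Vec<i32>> = Arc::downgrade(&data);

    // The task holds a Weak rather than a raw borrow, so staleness
    // shows up as a visible `None` instead of undefined behavior.
    let handle = tokio::spawn(async move { weak.upgrade().map(|v| v.len()) });

    // `data` is still alive here, so the upgrade must succeed.
    assert_eq!(handle.await.unwrap(), Some(3));
}
```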

Version that we need hasn't been published yet. I want to unblock our efforts to deploy multi-threading

codecov bot commented Jan 18, 2024

Codecov Report

Attention: 2 lines in your changes are missing coverage. Please review.

Comparison is base (3b9eb5c) 89.33% compared to head (28c945e) 88.82%.

| Files | Patch % | Lines |
| --- | --- | --- |
| ipa-core/src/seq_join/local.rs | 96.00% | 1 Missing ⚠️ |
| ipa-core/src/seq_join/mod.rs | 99.25% | 1 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #873      +/-   ##
==========================================
- Coverage   89.33%   88.82%   -0.52%     
==========================================
  Files         180      181       +1     
  Lines       24230    24297      +67     
==========================================
- Hits        21646    21581      -65     
- Misses       2584     2716     +132     


@akoshelev (Collaborator, Author)

It is ready for review now. I added Miri tests, and I am currently importing async_scoped from GitHub because 0.9 hasn't been released yet.

@martinthomson @andyleiserson please take a look if you have time

@akoshelev marked this pull request as ready for review January 19, 2024 21:56
@akoshelev changed the title from [DRAFT] Support multithreading in seq_join/parallel_join to Support multithreading in seq_join/parallel_join Jan 19, 2024
@martinthomson (Member) left a comment:

This is pretty good stuff.

However, I have some reservations about the ultimate performance profile of this. That's not enough to say not to land this, but it's enough to suggest that maybe there is some new work in our futures (no pun intended).

If I understand how async-scoped works, spawning tasks will put them into a pool somewhere. Everything in that pool is polled, according to whatever scheduler logic applies.

It seems like the stream on the TokioScope ensures that spawned tasks resolve in the order in which they were added, which makes our parallel_join logic easier. That's the change you managed to arrange for async-scoped, which is good.

I'm interested in whether we need that sort of polling logic for a sequential join or whether something a whole lot simpler might also work.

For a sequential join, the core problem we have is that the source stream is not polled reliably. For that, we need just a single task to be spawned, one that drives the stream to completion.

The trick there is that there is no way to get the items from that stream back to the current task for polling. But decoupling could use a single mpsc::channel to allow that to happen (I want single producer, single consumer, but apparently that's not a thing).

Basically, you would create a channel, pass the sender to a spawned task that reads from the source stream and sends the results. Then the current task would read from the receiver as we do in the single-threaded (and therefore deadlock-y) variant.
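A minimal sketch of that shape, assuming tokio and (for illustration only) a 'static source stream, which sidesteps the lifetime problem this PR actually deals with; the function name is hypothetical:

```rust
use futures::{Stream, StreamExt};

// One spawned task keeps `source` polled and feeds results through a
// bounded channel; the caller consumes them in FIFO order, just like
// the single-threaded seq_join does today.
async fn decoupled_seq_join<S>(source: S, capacity: usize) -> Vec<S::Item>
where
    S: Stream + Send + 'static,
    S::Item: Send + 'static,
{
    let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
    tokio::spawn(async move {
        futures::pin_mut!(source);
        while let Some(item) = source.next().await {
            if tx.send(item).await.is_err() {
                break; // receiver dropped; stop polling the source
            }
        }
    });
    let mut out = Vec::new();
    while let Some(item) = rx.recv().await {
        out.push(item);
    }
    out
}
```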

The benefit of that sort of approach is that you would be able to keep the source stream fed with polling from the spawned task, but you don't need to engage most of the nasty async-scoped stuff.

We might still need async-scoped so that we can hide the associated lifetime and panic horrors; after all, we would be no less exposed to the drop issues in this version than in the last.

I'm trying to remember now why I previously thought that maybe that sort of approach wasn't wise, but looking into it now, I can't see why.

The primary reason I suggest this is that while the overhead from FuturesOrdered is probably minimal, we're going to be loading those up pretty hard, and maybe we can avoid needing multiple collections of futures that hold significant numbers of entries. The channel approach also greatly reduces the number of futures on hand overall, which might be a good trade.

Resolved review threads:
  • .github/workflows/check.yml
  • ipa-core/Cargo.toml (outdated)
  • ipa-core/src/lib.rs (outdated)
  • ipa-core/src/seq_join.rs (outdated; 7 threads)
akoshelev and others added 3 commits January 22, 2024 10:50
@akoshelev (Collaborator, Author)

> The primary reason I suggest this is that while the overhead from FuturesOrdered is probably minimal, we're going to be loading those up pretty hard, and maybe we can avoid needing multiple collections of futures that hold significant numbers of entries. The channel approach also greatly reduces the number of futures on hand overall, which might be a good trade.

I agree; we are not doing anything differently here. FuturesOrdered is used inside futures::try_join_all for the same purpose, so there should be no added cost to managing the FIFO order.

@akoshelev (Collaborator, Author)

> I'm interested in whether we need that sort of polling logic for a sequential join or whether something a whole lot simpler might also work.

Here are some numbers that steered me towards using more than a single thread for seq_join:

single thread:

Running IPA for 100000 records took 1245s

multithreaded seq_join with metrics disabled:

Running IPA for 100000 records took 345.480439625s

So I think we should use the Tokio executor for seq_join. However, I agree that, long term, I would like to see parallelism applied somewhere else. We will soon hit a scaling bottleneck (if we haven't already) because of allocation thrashing and random read/write access across many threads. A much better approach, imo, would be aligning every step with its own task, making sure allocations there stay local all the way to the kernel stack (including RSS/XPS). That would be a much more involved change, but worth considering when we don't have low-hanging fruit anymore.

One for local spawn and one for multi-threading
@andyleiserson (Collaborator)

Re: @martinthomson's proposal to spawn a task that polls the seq_join input stream and sends to a channel:

The issue I see here is managing/controlling how far forward that task can get. If the channel has unlimited capacity, then it potentially uses as much memory as a non-streaming implementation. If the channel has limited capacity, then the question is how big the channel needs to be to prevent deadlocks -- once the channel is full, the task will stop polling the input stream, same as now. To me it feels pretty similar to the buffering solution.

@akoshelev merged commit 4292569 into private-attribution:main Jan 29, 2024
12 checks passed
@akoshelev deleted the parallel-join-truly-parallel branch January 29, 2024 17:37