ser_bridge: Opposite of par_bridge #858
Comments
I think I would prefer a name like …
@nikomatsakis Thanks for the review. I am of course open to a different name. I would propose …
Could the implementation use the Rayon thread pool? I'm really uncomfortable with each …
Hmm, I admit I hadn't read the code that closely. I'm thinking a bit more about this now. I had expected to use a channel for this, yes, but I'm wondering if there isn't a kind of hazard where using this on a Rayon worker thread would compose poorly and create deadlock hazards. (As far as naming, I would definitely not want to implement ….)
It would be cleaner to have block-handling algorithms in Rayon. It needs some discussion, though.
See also #210. I would like to solve this issue, but I'm really wary of how this will compose. It's not too hard to write something that will work with specific assumptions, but we need to be generally robust with what we add to Rayon's API. Even …
@cuviper I agree that we should be very careful not to introduce deadlocks. This is why the proof-of-concept implementation unconditionally spawns a new thread, which guarantees deadlock-free operation at the cost of, well, unconditionally spawning a new thread. I would hope that a better implementation is possible without significant changes to Rayon's internals, but it's far from obvious (to me) how to write one. I'm willing to invest time into it, though, so if you have some pointers on where to look, they'd be appreciated!

Thanks for the pointer to #210; I had missed that issue previously. The discussion there is mostly about whether the implementation should preserve order. I think it shouldn't, because an order-preserving parallel-to-serial bridge is both harder to implement and requires performance tradeoffs (if it can even be made to avoid both unbounded allocation and the possibility of deadlock). I think the proof-of-concept implementation in this issue is better than the brief one given in a comment there, because this one terminates iteration when the sequential iterator is dropped.

This issue is technically a duplicate of #210. If you want, we can keep this one open and make it specifically about the implementation strategy for the non-order-preserving sequential iterator, keeping the semantics pegged to those of the proof-of-concept (and fixing the name). On the other hand, if you'd prefer to keep the design possibilities open, then this issue should be closed as a duplicate of #210.
Well...
This leaves a deadlock hazard in that the serial side must not call into rayon in any other way. If the entire thread pool is blocked on the channel, waiting for the …
Rust can't generally rely on drop for safety, which is why …
It's not obvious to me either, or I probably would have attempted it already. 😉
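For concreteness, here is a hypothetical sketch (not from the discussion above; `rx` and the surrounding setup are assumed) of how that hazard could play out with a bounded channel fed from the thread pool:

```rust
use rayon::prelude::*;

// Assumed setup: `rx` is the receiving half of the bridge's bounded channel,
// and the pool's workers are blocked in `send()` on the other side.
fn consume(rx: crossbeam_channel::Receiver<u64>) {
    for item in rx.iter() {
        // Nested use of the same (global) pool: if every worker is parked in
        // `send()`, this job may never be picked up, and we never get back to
        // `rx.iter()` to drain the channel: a potential deadlock.
        let _sum: u64 = (0..item).into_par_iter().sum();
    }
}
```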
Good point, I didn't think of that. Perhaps we could avoid it by replacing the blocking send with something like this:

```rust
let par_iter = par_iterable.into_par_iter();
let (tx, rx) = crossbeam_channel::bounded(0);
std::thread::spawn(move || {
    let _ = par_iter.try_for_each(|mut item| {
        loop {
            match tx.try_send(item) {
                Err(TrySendError::Full(returned_item)) => item = returned_item,
                other => return other, // success or disconnect
            }
            rayon::yield(); // do other Rayon business here (hypothetical yield)
        }
    });
});
```

Something like `rayon::yield()` …
That's clear, though the linked article is mostly about memory safety. Here by "correctly" I meant that the code handles an explicit drop by actually noticing when the channel is dropped and terminating the iteration by returning an error from `try_for_each`.

If the serial iterator is neither dropped nor exhausted, our options are somewhat limited. It would be nice to be able to apply backpressure in a clean way, but barring that, the option of calling … Do you think the above approach could work? In that case it might actually be ok to offload the work using …
It doesn't, but there's a note in the …

It's not a total panacea, though. If there's no other work available, then it becomes a busy-loop burning CPU. Or else we could try some sleep heuristic, since we don't have a precise way to wake up for both external (crossbeam) events and internal work-available events.

I also fear that work-stealing might end up quickly stealing from the same set of jobs from this iterator, getting them all recursively stacked on one thread instead of spread out across all threads. That's also a hazard if we yielded in …
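As a purely illustrative sketch of that sleep-heuristic idea (nothing here is an existing Rayon API; `rayon_yield` stands in for the hypothetical yield-to-other-work call discussed above, and the backoff constants are arbitrary):

```rust
use std::time::Duration;
use crossbeam_channel::{Sender, TrySendError};

// Keep trying to hand `item` to the consumer; prefer doing other Rayon work
// while the channel is full, and fall back to progressively longer sleeps
// when no such work exists, so a full channel is not a pure busy-loop.
fn send_with_backoff<T>(
    tx: &Sender<T>,
    mut item: T,
    rayon_yield: impl Fn() -> bool, // true if some other work was executed
) -> Result<(), TrySendError<T>> {
    let mut idle_spins: u32 = 0;
    loop {
        match tx.try_send(item) {
            Ok(()) => return Ok(()),
            Err(TrySendError::Disconnected(it)) => return Err(TrySendError::Disconnected(it)),
            Err(TrySendError::Full(returned)) => item = returned,
        }
        if rayon_yield() {
            idle_spins = 0;
        } else {
            idle_spins += 1;
            std::thread::sleep(Duration::from_micros(1u64 << idle_spins.min(10)));
        }
    }
}
```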
Oh, both of those unfortunately require …

Compromise upon compromise -- but I'm glad to have you exploring this!
True, and such an issue might be lurking behind #795 in …

I can't tell how big an issue recursive stealing is.
It never occurred to me that it might not be acceptable to require …
Requiring …
That makes sense. We already have the callback-oriented interface of sorts, …

Maybe this option should be offered separately, after issues with the ordinary iterator (requiring …
My inclination right now is not to support this -- I suspect that gathering into a …
@nikomatsakis The trouble is that some iterators produce a huge number of items that don't fit into available memory, and that you have to process as they are being generated. Of course, batching results into intermediate …
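To make the batching alternative concrete, here is a rough sketch (illustrative only; the function, the `BATCH` size, and the `String` payload are invented) of processing in fixed-size chunks so that only one batch of results lives in memory at a time:

```rust
use rayon::prelude::*;

const BATCH: usize = 10_000;

// Collect one batch of inputs, map it in parallel, then hand the results to a
// single-threaded consumer before starting the next batch.
fn process_in_batches(items: impl Iterator<Item = u64>, mut consume: impl FnMut(String)) {
    let mut buf = Vec::with_capacity(BATCH);
    for item in items {
        buf.push(item);
        if buf.len() == BATCH {
            let batch = std::mem::take(&mut buf);
            let results: Vec<String> = batch.into_par_iter().map(|n| n.to_string()).collect();
            results.into_iter().for_each(&mut consume);
        }
    }
    // Final, possibly partial batch.
    let results: Vec<String> = buf.into_par_iter().map(|n| n.to_string()).collect();
    results.into_iter().for_each(consume);
}
```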
Can you …
I don't think so. In one case we …
…tor. This relies on a proposed serial bridge as per rayon-rs/rayon#858. As that project is licensed the same as Ruffle itself and the submitter intended it to be included in Rayon, I believe it's legal to copy this code.
I can concur with @hniksic's assertions here. I'm currently using Rayon in a PR to Ruffle's scanner binary (which you may know about, because GitHub decided my commit message should get tagged in here three times). The proposed serial bridge impl (which I copied into my PR, assuming that's OK) makes it very easy to report partial results in case, say, the scan is cancelled halfway through, or something irrecoverably panics the host process. I eventually implemented multiprocess scanning to deal with unrecoverable panics, so that alone isn't as much of a concern, but being able to terminate the scan and still get partial data out of it is still useful. Batching everything up into a …

My untested opinion about performance/thread-scalability, at least in my use case, is that the internal channel lock this uses will probably be less contended than, say, if I had …

However, if this use case is considered too niche, it might make more sense to package this code up as a library crate.
Similar use case. I currently need to process 400-500 GB of blockchain data. All the file reading and decoding can be done in parallel, but certain steps of the computation must follow chronological order. So it would help if I could do something like:

```rust
(0..end)
    .into_par_iter()
    .map(|h| read_block_number(h)) // <- parallel
    .into_sync_ser_iter()
    .map(|block| do_something(block)) // <~ sequential
```

Or even something like (chained):

```rust
(0..end)
    .into_par_iter()
    .map(|h| read_block_number(h)) // <- parallel
    .into_sync_ser_iter()
    .map(|block| do_something(block)) // <~ sequential
    .into_par_iter()
    .map(|h| read_block_number(h)) // <- parallel
    .into_sync_ser_iter()
    .map(|block| do_something(block)) // <~ sequential
    .into_par_iter()
    .map(|h| read_block_number(h)) // <- parallel
    .into_sync_ser_iter()
    .map(|block| do_something(block)) // <~ sequential
```

I implemented a tiny crate for my own usage: https://github.com/Congyuwang/Synced-Parallel-Iterator. I assign each worker a synced channel to buffer its output, … So, when I consume this iterator, I can consult this … I cannot seem to get rid of a Mutex lock guarding the upstream iterator …
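A very rough, purely illustrative sketch of that per-worker-buffer idea (the real crate is linked above; every name below is invented, and plain threads stand in for the Rayon pool). Each worker owns a bounded output channel, task hand-out is serialized by the Mutex around the upstream iterator, and an order channel records which worker took each task so the results can be consumed in the original order:

```rust
use std::sync::{Arc, Mutex};
use crossbeam_channel::{bounded, Receiver};

fn ordered_pipeline<I, T, R>(
    upstream: I,
    n_workers: usize,
    f: impl Fn(T) -> R + Send + Clone + 'static,
) -> impl Iterator<Item = R>
where
    I: Iterator<Item = T> + Send + 'static,
    T: Send + 'static,
    R: Send + 'static,
{
    let upstream = Arc::new(Mutex::new(upstream));
    // Records, in hand-out order, which worker took each task.
    let (order_tx, order_rx) = bounded::<usize>(n_workers * 4);
    let mut outputs: Vec<Receiver<R>> = Vec::new();

    for worker_id in 0..n_workers {
        let (tx, rx) = bounded::<R>(4); // per-worker output buffer
        outputs.push(rx);
        let upstream = Arc::clone(&upstream);
        let order_tx = order_tx.clone();
        let f = f.clone();
        std::thread::spawn(move || loop {
            // The Mutex guarding the upstream iterator serializes task
            // hand-out; the hand-out order is recorded under the same lock.
            let task = {
                let mut guard = upstream.lock().unwrap();
                match guard.next() {
                    Some(task) => {
                        if order_tx.send(worker_id).is_err() {
                            break; // consumer went away
                        }
                        task
                    }
                    None => break, // upstream exhausted
                }
            };
            if tx.send(f(task)).is_err() {
                break; // consumer went away
            }
        });
    }
    drop(order_tx); // the order channel closes once all workers exit

    // Each worker produces its results in the order it took tasks, so reading
    // from the recorded worker for each position restores the original order.
    order_rx
        .into_iter()
        .filter_map(move |worker_id| outputs[worker_id].recv().ok())
}
```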
I described various solutions to this problem here: #1070
Here is my solution: #1071
@safinaskar FWIW I don't think #1071 resolves this. This issue is about funneling a parallel iterator into a serial one, the inverse of `par_bridge()`.
Now I think the solution is https://crates.io/crates/pariter (https://dpc.pw/adding-parallelism-to-your-rust-iterators).
In summary, I would like to propose adding `ser_bridge()`, a bridge that converts a `ParallelIterator` into an ordinary `Iterator`, sort of the opposite of `par_bridge()`. In this issue I present a toy implementation and would like to inquire about the feasibility of providing a production-ready one.

Rayon already provides `par_bridge()`, an extension to `Iterator` that converts any `Iterator` into a `ParallelIterator`. In most cases you can then proceed to transform the parallel iterator, ending with a consuming operation like `fold()`, `sum()` or `for_each()`. But in some cases you want to funnel the values produced by a parallel iterator back into an ordinary iterator, e.g. in order to feed them to a single-threaded resource. This is expressed in this StackOverflow question (which also proposes the name `ser_bridge()`), but also in my own usage of Rayon.

The goal would be to enable writing code like this:
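The example that originally followed here did not survive extraction; below is a reconstruction of the kind of pipeline the surrounding text describes, using the proposed `ser_bridge()` name. The `Sink` type and `expensive_computation` are illustrative, and the code will not compile against current Rayon, since `ser_bridge()` is exactly the proposed addition:

```rust
use rayon::prelude::*;

// Illustrative single-threaded resource: writing requires &mut self.
struct Sink;
impl Sink {
    fn write(&mut self, _line: String) {
        // e.g. append to a file or an output buffer
    }
}

fn expensive_computation(n: u64) -> String {
    format!("{}", n * n)
}

fn run(sink: &mut Sink) {
    (0..1_000_000u64)
        .into_par_iter()
        .map(expensive_computation)         // parallel work
        .ser_bridge()                        // proposed: back to an ordinary Iterator
        .for_each(|line| sink.write(line)); // sequential, single-threaded consumption
}
```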
Note that `sink.write()` requires mutable, and therefore single-threaded, access. Although one could imagine this particular case being solved by putting the sink in an `Arc<Mutex>` or a channel (the latter being the resolution of the SO question), in many cases it is much more convenient to obtain an iterator. If nothing else, it allows us to pass the result of the processing to a function that expects an iterator, such as those from the `itertools` crate.

If this is considered useful, I would like to submit this toy implementation for review:
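The toy implementation itself was also lost in extraction. The sketch below reconstructs the approach described in the surrounding comments (an unconditionally spawned thread, a bounded crossbeam channel, and a `try_for_each()` that stops once the receiver is dropped); the exact signatures and the channel capacity are assumptions:

```rust
use rayon::prelude::*;

pub struct SerBridgeImpl<T> {
    rx: crossbeam_channel::Receiver<T>,
}

pub fn ser_bridge<I, T>(par_iterable: I) -> SerBridgeImpl<T>
where
    I: IntoParallelIterator<Item = T>,
    I::Iter: 'static,
    T: Send + 'static,
{
    let par_iter = par_iterable.into_par_iter();
    // The capacity is arbitrary; it only bounds how far producers run ahead.
    let (tx, rx) = crossbeam_channel::bounded(16);
    std::thread::spawn(move || {
        // When SerBridgeImpl (and thus `rx`) is dropped, `send` fails and
        // try_for_each returns an error, terminating the parallel iteration.
        let _ = par_iter.try_for_each(|item| tx.send(item));
    });
    SerBridgeImpl { rx }
}

impl<T> Iterator for SerBridgeImpl<T> {
    type Item = T;
    fn next(&mut self) -> Option<T> {
        self.rx.recv().ok()
    }
}
```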
This implementation has the following nice properties: … Dropping the `SerBridgeImpl` will also drop the receiver and cause `ParallelIterator::try_for_each` to return an error, terminating the parallel iteration.

The downside, and the reason I consider it a toy implementation, is that it creates a whole new thread for the bridging.
My questions are: … `crossbeam_channel`?