Moving collect to a better place in OPRF IPA #835
Conversation
```diff
-let collected_per_user_results = stream_of_per_user_circuits.collect::<Vec<_>>().await;
-let per_user_attribution_outputs = sh_ctx.parallel_join(collected_per_user_results).await?;
-let flattenned_stream = per_user_attribution_outputs.into_iter().flatten();
+let flattenned_stream = seq_join(sh_ctx.active_work(), stream_of_per_user_circuits)
```
Suggested change:

```diff
-let flattenned_stream = seq_join(sh_ctx.active_work(), stream_of_per_user_circuits)
+let flattenned_stream = sh_ctx.try_join(stream_of_per_user_circuits)
```
`try_join` is a sequential join?
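For context, here is a minimal sketch of the kind of bounded, in-order joining being discussed, written against the `futures` crate rather than the ipa `Context` API (the function names and types are illustrative, not the actual `seq_join`/`try_join` implementations): up to `active_work` per-user futures are polled concurrently, results come back in input order, and the collect-everything-then-`parallel_join` pattern from the removed lines is avoided.

```rust
use futures::stream::{self, StreamExt, TryStreamExt};

// Illustrative stand-in for one per-user attribution circuit.
async fn per_user_circuit(rows: Vec<u64>) -> Result<Vec<u64>, String> {
    Ok(rows)
}

// Sketch only: drive up to `active_work` per-user futures at a time,
// yield their outputs in input order, then flatten the per-user
// outputs into a single result vector.
async fn attribute_all(
    chunks: Vec<Vec<u64>>,
    active_work: usize,
) -> Result<Vec<u64>, String> {
    let per_user: Vec<Vec<u64>> = stream::iter(chunks.into_iter().map(per_user_circuit))
        .buffered(active_work)
        .try_collect()
        .await?;
    Ok(per_user.into_iter().flatten().collect())
}
```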
```diff
@@ -435,8 +440,11 @@ where
 let first_row = first_row.unwrap();
 let rows_chunked_by_user = chunk_rows_by_user(input_stream, first_row);

+let mut collected = rows_chunked_by_user.collect::<Vec<_>>().await;
+collected.sort_by(|a, b| std::cmp::Ord::cmp(&b.len(), &a.len()));
```
This sort may become a bottleneck in the future: we can't kick off processing until we receive the very last PRF shard. We should probably start thinking about how things will look with multiple shards. I would assume that some sort of consistent hashing is required to map PRF pseudonyms to shards, meaning that each shard will have to wait until the very last event is sent to it and the mapper has indicated that no more events will be sent.

In this model, the approach proposed here works, but we will have to keep all impressions and conversions in memory while receiving them from the mapper, i.e. no streaming.
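To make the sharding idea concrete, here is a hedged sketch of routing a PRF pseudonym to a shard so that all rows for the same pseudonym land on the same shard; the names and types are assumptions, not part of this PR, and a real deployment would likely want proper consistent hashing so that adding shards doesn't remap most pseudonyms.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Sketch only: route a PRF pseudonym to one of `num_shards` shards.
/// Simple modulo hashing keeps all rows for a pseudonym on one shard,
/// but (unlike true consistent hashing) remaps most keys when the
/// number of shards changes.
fn shard_for(pseudonym: u64, num_shards: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    pseudonym.hash(&mut hasher);
    hasher.finish() % num_shards
}
```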
I agree this is seriously sub-optimal. I think we should land this code, but re-evaluate this once we have the shuffling and sharding in place to see how we can deal with it.
I updated the code to filter out all users with just a single row of data and never emit them to the outbound stream, in an effort to reduce the number of futures being created.
I also moved the "collect" to after the users are "chunked" by OPRF, so that it only sorts them by number of rows, descending.
This is a hacky workaround that prevents the infra stall that's caused when a batch of 1024 rows fails to generate 1024 multiplications at depth + 1.
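Roughly, the reordering described above looks like the following sketch (illustrative, not the exact ipa code): single-row users are dropped before any futures are created, and the remaining per-user chunks are sorted largest-first so the deepest attribution circuits start earliest.

```rust
/// Sketch only: keep users that contributed more than one row and
/// process the largest per-user chunks first.
fn prepare_chunks<R>(mut chunks: Vec<Vec<R>>) -> Vec<Vec<R>> {
    // Users with a single row are filtered out, as described above,
    // to reduce the number of futures created.
    chunks.retain(|rows| rows.len() > 1);
    // Largest chunks first: the longest per-user circuits determine the
    // attribution depth, so kicking them off early keeps the pipeline busy.
    chunks.sort_by(|a, b| b.len().cmp(&a.len()));
    chunks
}
```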