
Moving collect to a better place in OPRF IPA #835

Merged (3 commits into main, Nov 8, 2023)
Conversation

@benjaminsavage (Collaborator) commented on Nov 6, 2023

I updated the code to filter out all users with just a single row of data and never emit them to the outbound stream, in an effort to reduce the number of futures being created.

I also moved the "collect" to after the users are "chunked" by OPRF, so that they can be sorted by number of rows, descending.

This is a hacky workaround that prevents the infra stall caused when a batch of 1024 rows fails to generate 1024 multiplications at depth + 1.

Running IPA for 100000 records took 238.103110663s
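A minimal sketch of the two changes described above, assuming hypothetical names (`order_user_chunks`, `UserRows`); the real code operates on the stream of rows chunked by OPRF value:

```rust
use futures::{stream, Stream};

// Hypothetical stand-in for one user's rows, chunked by matching OPRF value.
type UserRows = Vec<u64>;

fn order_user_chunks(chunks: impl Iterator<Item = UserRows>) -> impl Stream<Item = UserRows> {
    // Per the description above: users with just a single row are filtered
    // out and never emitted, reducing the number of futures created.
    let mut collected: Vec<UserRows> = chunks.filter(|rows| rows.len() > 1).collect();
    // Sort by number of rows, descending, so the largest (deepest) per-user
    // circuits start first and later multiplication batches are less likely
    // to stall waiting on a straggler.
    collected.sort_by(|a, b| b.len().cmp(&a.len()));
    stream::iter(collected)
}
```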

@benjaminsavage changed the title from "debugging stall in streaming OPRF ipa" to "Moving collect to a better place in OPRF IPA" on Nov 7, 2023
```diff
- let collected_per_user_results = stream_of_per_user_circuits.collect::<Vec<_>>().await;
- let per_user_attribution_outputs = sh_ctx.parallel_join(collected_per_user_results).await?;
- let flattenned_stream = per_user_attribution_outputs.into_iter().flatten();
+ let flattenned_stream = seq_join(sh_ctx.active_work(), stream_of_per_user_circuits)
```
Collaborator
Suggested change:

```diff
- let flattenned_stream = seq_join(sh_ctx.active_work(), stream_of_per_user_circuits)
+ let flattenned_stream = sh_ctx.try_join(stream_of_per_user_circuits)
```

benjaminsavage (Collaborator, Author)

try_join is a sequential join?
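For readers following along, the distinction at issue is between joining a stream of futures with bounded, ordered concurrency versus joining them some other way. The following is a minimal illustration using the `futures` crate's `buffered` combinator, not the project's actual `seq_join` or `try_join` implementations, whose internals are not shown here:

```rust
use std::time::Duration;

use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    // A stream of futures, analogous to the per-user circuits above.
    let work = stream::iter(0..8u32).map(|i| async move {
        tokio::time::sleep(Duration::from_millis(10)).await;
        i * 2
    });

    // `buffered(4)` drives up to 4 futures concurrently but yields their
    // outputs strictly in input order: concurrent execution with
    // sequential, ordered results ("active work" of 4).
    let results: Vec<u32> = work.buffered(4).collect().await;
    assert_eq!(results, vec![0, 2, 4, 6, 8, 10, 12, 14]);
}
```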

```diff
@@ -435,8 +440,11 @@ where
     let first_row = first_row.unwrap();
     let rows_chunked_by_user = chunk_rows_by_user(input_stream, first_row);

+    let mut collected = rows_chunked_by_user.collect::<Vec<_>>().await;
+    collected.sort_by(|a, b| std::cmp::Ord::cmp(&b.len(), &a.len()));
```
Collaborator

This sort may become a bottleneck in the future: we can't kick off processing until we receive the very last PRF shard. We should probably start thinking about what things will look like with multiple shards. I would assume that some sort of consistent hashing is required to map PRF pseudonyms to shards, meaning that each shard will have to wait until the very last event has been sent to it and the mapper has indicated that no more events will be sent.

In this model, the approach proposed here works, but we will have to keep all impressions and conversions in memory while receiving them from the mapper, i.e. no streaming.
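A minimal sketch of the kind of pseudonym-to-shard mapping described above; the function name, the `u64` pseudonym type, and the modulus scheme are all assumptions, not the project's design:

```rust
/// Hypothetical: assign a PRF pseudonym to a shard. Because the PRF output
/// is already pseudorandom, a plain modulus gives a stable, roughly uniform
/// assignment as long as every helper uses the same shard count. True
/// consistent hashing would only matter if the shard count can change
/// mid-query, since it minimizes how many pseudonyms get remapped.
fn shard_for(prf_pseudonym: u64, num_shards: u64) -> usize {
    (prf_pseudonym % num_shards) as usize
}
```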

benjaminsavage (Collaborator, Author)

I agree this is seriously sub-optimal. I think we should land this code, but re-evaluate this once we have the shuffling and sharding in place to see how we can deal with it.

@benjaminsavage merged commit 4dd9554 into main on Nov 8, 2023 (7 of 10 checks passed).
@benjaminsavage deleted the debugging_stall branch on November 8, 2023 at 02:53.