[python] Balance distributed partition size more evenly #1135
Fixes #1119
Generate the partitions by splitting on obs_joinids instead of on soma chunks.
Previously, when the number of soma chunks was not evenly divisible by the partition count ("world size"), partition sizes could become drastically imbalanced.
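For illustration (the round-robin assignment and the numbers here are assumed, not taken from the issue), a sketch of the failure mode:

```python
# 5 soma chunks of 1000 rows dealt out to 4 ranks leaves rank 0 with an
# entire extra chunk's worth of rows:
chunk_sizes = [1000] * 5
world_size = 4
rows_per_rank = [sum(chunk_sizes[rank::world_size]) for rank in range(world_size)]
print(rows_per_rank)  # [2000, 1000, 1000, 1000]
```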
The new algorithm for producing shuffled, partitioned IDs is sketched below.
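A minimal numpy sketch, reconstructing the five steps from the references to "step 1" and "step 5" in the notes that follow; `shuffled_partitioned_ids`, its parameters, and the step numbering are illustrative assumptions rather than the actual implementation, and any within-chunk shuffling done at read time is omitted:

```python
import numpy as np

def shuffled_partitioned_ids(
    obs_joinids: np.ndarray,
    soma_chunk_size: int,
    world_size: int,
    rank: int,
    seed: int = 0,
) -> list[np.ndarray]:
    """Return this rank's partition of obs_joinids as a list of read chunks (sketch)."""
    rng = np.random.default_rng(seed)

    # Step 1: split obs_joinids into fixed-size soma chunks.
    soma_chunks = [
        obs_joinids[i : i + soma_chunk_size]
        for i in range(0, len(obs_joinids), soma_chunk_size)
    ]

    # Step 2: shuffle the order of the soma chunks (ids within a chunk
    # stay together, preserving contiguous on-disk reads).
    order = rng.permutation(len(soma_chunks))

    # Step 3: concatenate the shuffled chunks into one id sequence.
    ids = np.concatenate([soma_chunks[i] for i in order])

    # Step 4: split the ids across ranks; np.array_split spreads the
    # remainder, so partition sizes differ by at most 1 row.
    partition = np.array_split(ids, world_size)[rank]

    # Step 5: re-chunk this rank's partition for reading. These chunks are
    # generally not aligned with the step-1 chunks (see the notes below).
    return [
        partition[i : i + soma_chunk_size]
        for i in range(0, len(partition), soma_chunk_size)
    ]
```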
When performing partitioning without shuffling, a soma chunk from step 1 may end up split across two partitions. This introduces a minor read-performance hit, since each soma chunk that is split across partitions results in 2 read operations instead of 1.
When performing partitioning with shuffling, each soma chunk from step 1 may end up split across two chunks in step 5, since the step 5 chunks are not "aligned" with the step 1 chunks when the partition sizes are not evenly divisible by the chunk size. This introduces a 2x read-performance hit in the worst case: each partition-local chunk may be composed of two original chunks that are not stored contiguously on disk. This could be addressed by explicitly aligning the step 5 chunks with the step 1 chunks, but that is not implemented in this PR.
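To make the misalignment concrete using the hypothetical sketch above (illustrative numbers):

```python
# 20 ids in 5 soma chunks of 4 rows, split across 2 ranks: each rank gets
# a 10-row partition, re-chunked as [4, 4, 2].
chunks = shuffled_partitioned_ids(
    np.arange(20), soma_chunk_size=4, world_size=2, rank=1
)
print([len(c) for c in chunks])  # [4, 4, 2]
# Because 10 is not a multiple of 4, rank 1's first read chunk spans the
# tail of one step-1 chunk and the head of another -- two non-contiguous
# regions on disk, hence 2 reads where 1 sufficed before.
```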
Now, the worst-case imbalance is that any two partitions differ in size by at most 1 row, because `numpy.array_split()` distributes the remainder evenly across the splits it produces. Since only size-1 imbalances can occur, no partition should need rows dropped or padded.
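For example:

```python
import numpy as np

# np.array_split spreads the remainder over the leading splits, so any
# two partitions differ in size by at most 1 row:
print([len(p) for p in np.array_split(np.arange(10), 4)])  # [3, 3, 2, 2]
```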
New unit test cases are included in `test_distributed__returns_data_partition_for_rank`, which now exercises the imbalanced cases as well. I've also added `test_distributed__returns_data_partition_for_rank_globally_shuffled` to demonstrate that global shuffling is maintained.