Skip to content

Commit

Permalink
Merge pull request #89 from tools4origins/fix/coalesceNumPartitions
Browse files Browse the repository at this point in the history
Fix numPartitions in coalesce when the requested value is above the current one
  • Loading branch information
svenkreiss authored Jul 13, 2019
2 parents 5ffcf9d + e4e7745 commit 3c4c6e4
Showing 1 changed file with 65 additions and 2 deletions.
67 changes: 65 additions & 2 deletions pysparkling/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,62 @@ def coalesce(self, numPartitions, shuffle=False):
>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3], 2).coalesce(1).getNumPartitions()
1
>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3], 2).coalesce(4).getNumPartitions()
2
>>> from pysparkling import Context
>>> rdd = Context().parallelize([1, 2, 3, 4, 5, 6, 7, 8], 5)
>>> rdd.foreachPartition(lambda x: print(list(x)))
[1]
[2, 3]
[4]
[5, 6]
[7, 8]
>>> rdd.coalesce(4).foreachPartition(lambda x: print(list(x)))
[1, 2, 3]
[4]
[5, 6]
[7, 8]
>>> rdd.coalesce(4).coalesce(3).foreachPartition(lambda x: print(list(x)))
[1, 2, 3, 4]
[5, 6]
[7, 8]
>>> rdd.coalesce(3).foreachPartition(lambda x: print(list(x)))
[1, 2, 3]
[4, 5, 6]
[7, 8]
"""
return self.context.parallelize(self.toLocalIterator(), numPartitions)
if shuffle:
return self.context.parallelize(self.toLocalIterator(), numPartitions)

current_num_partitions = self.getNumPartitions()
new_num_partitions = min(numPartitions, current_num_partitions)

# Group partitions that will be coalesced together
# Note: as new_num_partitions may not evenly divide current_num_partitions,
# some groups are bigger (contain more partitions) than others
small_group_size = current_num_partitions // new_num_partitions
big_group_size = small_group_size + 1

number_of_big_groups = current_num_partitions % new_num_partitions
number_of_small_groups = new_num_partitions - number_of_big_groups

def slice_partition_content(partitions, start, end):
    """Chain together the contents of ``partitions[start:end]``.

    ``x()`` is called on every partition of the slice up front; the
    results are then exposed, in slice order, through a single
    ``itertools.chain`` iterator.
    """
    contents = [part.x() for part in partitions[start:end]]
    return itertools.chain(*contents)

def partitioned():
    """Yield the content of each coalesced group of partitions.

    The big groups come first, followed by the small ones; each group
    covers a contiguous run of ``self._p`` starting where the previous
    group ended.
    """
    group_sizes = (
        [big_group_size] * number_of_big_groups
        + [small_group_size] * number_of_small_groups
    )
    lower = 0
    for size in group_sizes:
        upper = lower + size
        yield slice_partition_content(self._p, lower, upper)
        lower = upper

# noinspection PyProtectedMember
return self.context._parallelize_partitions(partitioned())

def cogroup(self, other, numPartitions=None):
"""Groups keys from both RDDs together. Values are nested iterators.
Expand Down Expand Up @@ -1098,8 +1152,17 @@ def repartition(self, numPartitions):
.. note::
Creating the new RDD is currently implemented as a local operation.
Example:
>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3], 2).repartition(1).getNumPartitions()
1
>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3], 2).repartition(4).getNumPartitions()
4
"""
return self.context.parallelize(self.toLocalIterator(), numPartitions)
return self.coalesce(numPartitions, shuffle=True)

def repartitionAndSortWithinPartitions(
self, numPartitions=None, partitionFunc=None,
Expand Down

0 comments on commit 3c4c6e4

Please sign in to comment.