diff --git a/pysparkling/rdd.py b/pysparkling/rdd.py
index 453627e8d..c0c926921 100644
--- a/pysparkling/rdd.py
+++ b/pysparkling/rdd.py
@@ -255,8 +255,62 @@ def coalesce(self, numPartitions, shuffle=False):
         >>> from pysparkling import Context
         >>> Context().parallelize([1, 2, 3], 2).coalesce(1).getNumPartitions()
         1
+        >>> from pysparkling import Context
+        >>> Context().parallelize([1, 2, 3], 2).coalesce(4).getNumPartitions()
+        2
+        >>> from pysparkling import Context
+        >>> rdd = Context().parallelize([1, 2, 3, 4, 5, 6, 7, 8], 5)
+        >>> rdd.foreachPartition(lambda x: print(list(x)))
+        [1]
+        [2, 3]
+        [4]
+        [5, 6]
+        [7, 8]
+        >>> rdd.coalesce(4).foreachPartition(lambda x: print(list(x)))
+        [1, 2, 3]
+        [4]
+        [5, 6]
+        [7, 8]
+        >>> rdd.coalesce(4).coalesce(3).foreachPartition(lambda x: print(list(x)))
+        [1, 2, 3, 4]
+        [5, 6]
+        [7, 8]
+        >>> rdd.coalesce(3).foreachPartition(lambda x: print(list(x)))
+        [1, 2, 3]
+        [4, 5, 6]
+        [7, 8]
         """
-        return self.context.parallelize(self.toLocalIterator(), numPartitions)
+        if shuffle:
+            return self.context.parallelize(self.toLocalIterator(), numPartitions)
+
+        current_num_partitions = self.getNumPartitions()
+        new_num_partitions = min(numPartitions, current_num_partitions)
+
+        # Group partitions that will be coalesced together.
+        # Note: as new_num_partitions may not divide current_num_partitions,
+        # some groups are bigger (contain more partitions) than others.
+        small_group_size = current_num_partitions // new_num_partitions
+        big_group_size = small_group_size + 1
+
+        number_of_big_groups = current_num_partitions % new_num_partitions
+        number_of_small_groups = new_num_partitions - number_of_big_groups
+
+        def slice_partition_content(partitions, start, end):
+            return itertools.chain(*(p.x() for p in partitions[start:end]))
+
+        def partitioned():
+            start = 0
+            for _ in range(number_of_big_groups):
+                end = start + big_group_size
+                yield slice_partition_content(self._p, start, end)
+                start = end
+            for _ in range(number_of_small_groups):
+                end = start + small_group_size
+                yield slice_partition_content(self._p, start, end)
+                start = end
+
+        # noinspection PyProtectedMember
+        return self.context._parallelize_partitions(partitioned())
 
     def cogroup(self, other, numPartitions=None):
         """Groups keys from both RDDs together. Values are nested iterators.
@@ -1098,8 +1152,17 @@ def repartition(self, numPartitions):
         .. note::
             Creating the new RDD is currently implemented as a local
             operation.
+
+        Example:
+
+        >>> from pysparkling import Context
+        >>> Context().parallelize([1, 2, 3], 2).repartition(1).getNumPartitions()
+        1
+        >>> from pysparkling import Context
+        >>> Context().parallelize([1, 2, 3], 2).repartition(4).getNumPartitions()
+        4
         """
-        return self.context.parallelize(self.toLocalIterator(), numPartitions)
+        return self.coalesce(numPartitions, shuffle=True)
 
     def repartitionAndSortWithinPartitions(
         self, numPartitions=None, partitionFunc=None,
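
A note on the grouping arithmetic in partitioned() above: when new_num_partitions does not evenly divide current_num_partitions, the remainder is absorbed by the first current % new groups, which each take one extra partition. The standalone sketch below reproduces that split outside the RDD machinery; the helper name group_bounds and the sample data are illustrative only, not part of the patch.

    import itertools

    def group_bounds(current_num_partitions, new_num_partitions):
        """Yield (start, end) partition-index ranges, mirroring partitioned().

        The first `current % new` groups are "big" (one extra partition);
        the rest are "small".
        """
        new_num_partitions = min(new_num_partitions, current_num_partitions)
        small_group_size = current_num_partitions // new_num_partitions
        number_of_big_groups = current_num_partitions % new_num_partitions
        start = 0
        for i in range(new_num_partitions):
            end = start + small_group_size + (1 if i < number_of_big_groups else 0)
            yield start, end
            start = end

    # Reproduces the coalesce(3) doctest above: 5 partitions -> 3 groups.
    partitions = [[1], [2, 3], [4], [5, 6], [7, 8]]
    for start, end in group_bounds(len(partitions), 3):
        print(list(itertools.chain.from_iterable(partitions[start:end])))
    # [1, 2, 3]
    # [4, 5, 6]
    # [7, 8]

Because the groups are contiguous index ranges over the existing partitions, element order is preserved and no data is shuffled, which is what lets repartition(n) delegate to coalesce(n, shuffle=True) when a full redistribution is actually wanted.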