diff --git a/README.rst b/README.rst
index 2b74bae51..59488d2f5 100644
--- a/README.rst
+++ b/README.rst
@@ -13,7 +13,7 @@ pysparkling
   endpoint. You need the same pre-processing pipeline for a single
   document per API call. This does not have to be done in parallel, but there
   should be only a small overhead in initialization and preferably no
-  dependency on the JVM. This is where ``pysparkling`` shines.
+  dependency on the JVM. This is what ``pysparkling`` is for.
 
 .. image:: https://travis-ci.org/svenkreiss/pysparkling.png?branch=master
     :target: https://travis-ci.org/svenkreiss/pysparkling
@@ -67,23 +67,32 @@ API
 Context
 -------
 
-* ``__init__(pool=None)``: takes a pool object (an object that has a ``map()``
-  method, e.g. a multiprocessing.Pool) to parallelize all ``map()`` and
-  ``foreach()`` methods.
+* ``__init__(pool=None, serializer=None, deserializer=None, data_serializer=None, data_deserializer=None)``:
+  takes a pool object
+  (an object that has a ``map()`` method, e.g. a ``multiprocessing.Pool``) to
+  parallelize the execution of jobs. To pass lambdas and other functions to
+  workers, specify custom serializers and deserializers,
+  e.g. ``serializer=dill.dumps, deserializer=dill.loads``.
+* ``broadcast(var)``: returns an instance of ``Broadcast()``; its value is
+  accessed with ``value``.
+* ``newRddId()``: returns the next RDD id (an incrementing counter)
 * ``textFile(filename)``: load every line of a text file into a RDD.
   ``filename`` can contain a comma separated list of many files, ``?`` and
   ``*`` wildcards, file paths on S3 (``s3n://bucket_name/filename.txt``) and
   local file paths (``relative/path/my_text.txt``, ``/absolut/path/my_text.txt``
   or ``file:///absolute/file/path.txt``). If the filename points to a folder
   containing ``part*`` files, those are resolved.
-* ``broadcast(var)``: returns an instance of ``Broadcast()`` and it's values
-  are accessed with ``value``.
+* ``version``: the version of pysparkling
 
 
 RDD
 ---
 
+* ``aggregate(zeroValue, seqOp, combOp)``: aggregate the values in each
+  partition with ``seqOp`` and combine the partition results with ``combOp``
+* ``aggregateByKey(zeroValue, seqFunc, combFunc)``: aggregate values by key
 * ``cache()``: execute previous steps and cache result
+* ``cartesian(other)``: Cartesian product of this RDD and ``other``
 * ``coalesce()``: do nothing
 * ``collect()``: return the underlying list
 * ``count()``: get length of internal list
@@ -100,6 +109,8 @@ RDD
 * ``foldByKey(zeroValue, op)``: aggregate elements by key
 * ``foreach(func)``: apply func to every element in place
 * ``foreachPartition(func)``: same as ``foreach()``
+* ``getNumPartitions()``: number of partitions
+* ``getPartitions()``: returns an iterator over the partitions
 * ``groupBy(func)``: group by the output of func
 * ``groupByKey()``: group by key where the RDD is of type [(key, value), ...]
 * ``histogram(buckets)``: buckets can be a list or an int
@@ -112,6 +123,7 @@ RDD
 * ``leftOuterJoin(other)``: left outer join
 * ``lookup(key)``: return list of values for this key
 * ``map(func)``: apply func to every element and return a new RDD
+* ``mapPartitions(func)``: apply ``func`` to every partition and return a new RDD
 * ``mapValues(func)``: apply func to value in (key, value) pairs and return a new RDD
 * ``max()``: get the maximum element
 * ``mean()``: mean
@@ -138,7 +150,13 @@ Broadcast
 Changelog
 =========
 
-* `master `_
+* `master `_
+* `0.2.0 `_ (2015-05-17)
+  * proper handling of partitions
+  * custom serializers and deserializers (separately for functions and data)
+  * more tests for parallelization options
+  * distributed execution: a chain of ``map()`` operations is executed on the workers without sending intermediate results back to the master
+  * implemented a few more RDD methods
 * `0.1.1 `_ (2015-05-12)
   * implemented a few more RDD methods
   * changed handling of context in RDD
diff --git a/pysparkling/__init__.py b/pysparkling/__init__.py
index 122cf6e95..c487b3cb9 100644
--- a/pysparkling/__init__.py
+++ b/pysparkling/__init__.py
@@ -1,6 +1,6 @@
 """pytld module."""
 
-__version__ = '0.1.1'
+__version__ = '0.2.0'
 
 from .context import Context
 from .rdd import RDD
diff --git a/pysparkling/context.py b/pysparkling/context.py
index ee63e6a4d..f1aadf8f2 100644
--- a/pysparkling/context.py
+++ b/pysparkling/context.py
@@ -1,28 +1,100 @@
-"""Imitates SparkContext."""
+"""Context."""
+
+from __future__ import division, print_function
 
 import boto
 import glob
+import math
 import fnmatch
+import logging
+import functools
 
 from .rdd import RDD
 from .broadcast import Broadcast
 from .utils import Tokenizer
+from .partition import Partition
+from .task_context import TaskContext
+from . import __version__ as PYSPARKLING_VERSION
+
+log = logging.getLogger(__name__)
+
+
+def unit_fn(arg):
+    """Used as dummy serializer and deserializer."""
+    return arg
+
+
+def runJob_map(i):
+    deserializer, data_serializer, data_deserializer, serialized, serialized_data = i
+    func, rdd = deserializer(serialized)
+    partition = data_deserializer(serialized_data)
+    log.debug('Worker function {0} is about to get executed with {1}'
+              ''.format(func, partition))
+
+    task_context = TaskContext(stage_id=0, partition_id=partition.index)
+    return data_serializer(func(task_context, rdd.compute(partition, task_context)))
 
 
 class Context(object):
-    def __init__(self, pool=None):
+    def __init__(self, pool=None, serializer=None, deserializer=None,
+                 data_serializer=None, data_deserializer=None):
         if not pool:
             pool = DummyPool()
+        if not serializer:
+            serializer = unit_fn
+        if not deserializer:
+            deserializer = unit_fn
+        if not data_serializer:
+            data_serializer = unit_fn
+        if not data_deserializer:
+            data_deserializer = unit_fn
+
         self._pool = pool
+        self._serializer = serializer
+        self._deserializer = deserializer
+        self._data_serializer = data_serializer
+        self._data_deserializer = data_deserializer
         self._s3_conn = None
+        self._last_rdd_id = 0
 
-    def parallelize(self, x, numPartitions=None):
-        return RDD(x, self)
+        self.version = PYSPARKLING_VERSION
 
     def broadcast(self, x):
         return Broadcast(x)
 
+    def newRddId(self):
+        self._last_rdd_id += 1
+        return self._last_rdd_id
+
+    def parallelize(self, x, numPartitions=None):
+        if not numPartitions:
+            return RDD([Partition(x, 0)], self)
+
+        stride_size = int(math.ceil(len(x)/numPartitions))
+
+        def partitioned():
+            for i in range(numPartitions):
+                yield Partition(x[i*stride_size:(i+1)*stride_size], i)
+
+        return RDD(partitioned(), self)
+
+    def runJob(self, rdd, func, partitions=None, allowLocal=False, resultHandler=None):
+        """func is of the form func(TaskContext, iterator over elements)."""
+        # TODO: this is the place to insert proper schedulers
+        map_result = (self._data_deserializer(d) for d in self._pool.map(runJob_map, [
+            (self._deserializer,
+             self._data_serializer,
+             self._data_deserializer,
+             self._serializer((func, rdd)),
+             self._data_serializer(p),
+             )
+            for p in rdd.partitions()
+        ]))
+        log.info('Map jobs generated.')
+
+        if resultHandler is not None:
+            return resultHandler(map_result)
+        return map_result
+
     def textFile(self, filename):
         lines = []
         for f_name in self._resolve_filenames(filename):
@@ -47,6 +119,11 @@ def textFile(self, filename):
         rdd._name = filename
         return rdd
 
+    def union(self, rdds):
+        return self.parallelize(
+            (x for rdd in rdds for x in rdd.collect())
+        )
+
     def _get_s3_conn(self):
         if not self._s3_conn:
             self._s3_conn = boto.connect_s3()
@@ -85,3 +162,4 @@ def __init__(self):
 
     def map(self, f, input_list):
         return (f(x) for x in input_list)
+
diff --git a/pysparkling/partition.py b/pysparkling/partition.py
new file mode 100644
index 000000000..9f095276e
--- /dev/null
+++ b/pysparkling/partition.py
@@ -0,0 +1,20 @@
+import itertools
+
+
+class Partition(object):
+    def __init__(self, x, idx=None):
+        self.index = idx
+        self._x = x
+
+    def x(self):
+        self._x, r = itertools.tee(self._x)
+        return r
+
+    def hashCode(self):
+        return self.index
+
+    def __getstate__(self):
+        return {
+            'index': self.index,
+            '_x': list(self.x())
+        }
diff --git a/pysparkling/rdd.py b/pysparkling/rdd.py
index 551f0da47..876cbd3bc 100644
--- a/pysparkling/rdd.py
+++ b/pysparkling/rdd.py
@@ -1,88 +1,161 @@
 """RDD implementation."""
 
-from __future__ import division, absolute_import
+from __future__ import division, absolute_import, print_function
 
+import os
 import random
 import functools
 import itertools
 import subprocess
+from collections import defaultdict
 
-from .utils import Tokenizer
+from . import utils
 
 
 class RDD(object):
     """methods starting with underscore are not in the Spark interface"""
 
-    def __init__(self, x, ctx):
-        self._x = x
+    def __init__(self, partitions, ctx):
+        self._p = partitions
         self.context = ctx
         self._name = None
 
-    def x(self):
-        self._x, r = itertools.tee(self._x, 2)
+    def __getstate__(self):
+        r = dict((k, v) for k, v in self.__dict__.items())
+        r['_p'] = list(self.partitions())
+        r['context'] = None
         return r
 
-    def _flatten(self):
-        self._x = (xx for x in self.x() for xx in x)
-        return self
+    def compute(self, split, task_context):
+        """split is a partition. This function is overridden in derived RDD
+        classes to add smarter behavior for specific cases."""
+        return split.x()
 
-    def _flattenValues(self):
-        self._x = ((e[0], v) for e in self.x() for v in e[1])
-        return self
+    def partitions(self):
+        self._p, r = itertools.tee(self._p, 2)
+        return r
+
+    """
+
+    Public API
+    ----------
+    """
+
+    def aggregate(self, zeroValue, seqOp, combOp):
+        """[distributed]"""
+        return self.context.runJob(
+            self,
+            lambda tc, i: functools.reduce(seqOp, i, zeroValue),
+            resultHandler=lambda l: functools.reduce(combOp, l, zeroValue)
+        )
+
+    def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None):
+        def seqFuncByKey(tc, i):
+            r = defaultdict(zeroValue)
+            for k, v in i:
+                r[k] = seqFunc(r[k], v)
+            return r
+
+        def combFuncByKey(l):
+            r = defaultdict(zeroValue)
+            for p in l:
+                for k, v in p.items():
+                    r[k] = combFunc(r[k], v)
+            return r
+
+        return self.context.runJob(self, seqFuncByKey,
+                                   resultHandler=combFuncByKey)
 
     def cache(self):
         # This cache is not lazy, but it will guarantee that previous
         # steps are only executed once.
- self._x = list(self._x) + for p in self.partitions(): + p._x = list(p.x()) return self - def coalesce(self): - return self + def cartesian(self, other): + v1 = self.collect() + v2 = self.collect() + return self.context.parallelize([(a, b) for a in v1 for b in v2]) + + def coalesce(self, numPartitions, shuffle=False): + return self.context.parallelize(self.collect(), numPartitions) def collect(self): - return list(self.x()) + """[distributed]""" + return self.context.runJob(self, lambda tc, i: list(i), + resultHandler=lambda l: [x for p in l for x in p]) def count(self): - return sum(1 for _ in self.x()) + """[distributed]""" + return self.context.runJob(self, lambda tc, i: sum(1 for _ in i), + resultHandler=sum) def countApprox(self): return self.count() def countByKey(self): - keys = set(k for k, v in self.x()) - return dict((k, sum(v for kk, v in self.x() if kk == k)) for k in keys) + """[distributed]""" + def map_func(tc, x): + r = defaultdict(int) + for k, v in x: + r[k] += v + return r + return self.context.runJob(self, map_func, + resultHandler=utils.sum_counts_by_keys) def countByValue(self): - as_list = list(self.x()) - keys = set(as_list) - return dict((k, as_list.count(k)) for k in keys) + """[distributed]""" + def map_func(tc, x): + r = defaultdict(int) + for v in x: + r[v] += 1 + return r + return self.context.runJob(self, map_func, + resultHandler=utils.sum_counts_by_keys) def distinct(self, numPartitions=None): - return RDD(list(set(self.x())), self.context) + return self.context.parallelize(list(set(self.collect())), numPartitions) def filter(self, f): - return RDD((x for x in self.x() if f(x)), self.context) + """[distributed]""" + def map_func(tc, i, x): + return (xx for xx in x if f(xx)) + return MapPartitionsRDD(self, map_func, preservesPartitioning=True) def first(self): - return next(self.x()) + """[distributed]""" + return self.context.runJob( + self, + lambda tc, i: next(i) if tc.partition_id == 0 else None, + resultHandler=lambda l: next(l), + ) def flatMap(self, f, preservesPartitioning=False): - return self.map(f)._flatten() + """[distributed]""" + return MapPartitionsRDD( + self, + lambda tc, i, x: (e for xx in x for e in f(xx)), + preservesPartitioning=True, + ) def flatMapValues(self, f): - return self.mapValues(f)._flattenValues() + """[distributed]""" + return MapPartitionsRDD( + self, + lambda tc, i, x: ((xx[0], e) for xx in x for e in f(xx[1])), + preservesPartitioning=True, + ) def fold(self, zeroValue, op): - return functools.reduce(op, self.x(), zeroValue) + return functools.reduce(op, self.collect(), zeroValue) def foldByKey(self, zeroValue, op): - keys = set(k for k, v in self.x()) + keys = set(k for k, v in self.collect()) return dict( ( k, functools.reduce( op, - (e[1] for e in self.x() if e[0] == k), + (e[1] for e in self.collect() if e[0] == k), zeroValue ) ) @@ -90,29 +163,34 @@ def foldByKey(self, zeroValue, op): ) def foreach(self, f): - self._x = self.context._pool.map(f, self.x()) - return self + self.context.runJob(self, lambda tc, x: (f(xx) for xx in x), + resultHandler=None) def foreachPartition(self, f): - self.foreach(f) - return self + self.context.runJob(self, lambda tc, x: f(x), + resultHandler=None) - def groupBy(self, f): - as_list = list(self.x()) - f_applied = list(self.context._pool.map(f, as_list)) - keys = set(f_applied) - return RDD([ - (k, [vv for kk, vv in zip(f_applied, as_list) if kk == k]) - for k in keys - ], self.context) + def getNumPartitions(self): + return sum(1 for _ in self.partitions()) - def groupByKey(self): - 
as_list = list(self.x()) - keys = set([e[0] for e in as_list]) - return RDD([ - (k, [e[1] for e in as_list if e[0] == k]) - for k in keys - ], self.context) + def getPartitions(self): + return self.partitions() + + def groupBy(self, f, numPartitions=None): + return self.context.parallelize(( + (k, [gg[1] for gg in g]) for k, g in itertools.groupby( + sorted(self.keyBy(f).collect()), + lambda e: e[0], + ) + ), numPartitions) + + def groupByKey(self, numPartitions=None): + return self.context.parallelize(( + (k, [gg[1] for gg in g]) for k, g in itertools.groupby( + sorted(self.collect()), + lambda e: e[0], + ) + ), numPartitions) def histogram(self, buckets): if isinstance(buckets, int): @@ -122,7 +200,7 @@ def histogram(self, buckets): buckets = [min_v + float(i)*(max_v-min_v)/num_buckets for i in range(num_buckets+1)] h = [0 for _ in buckets] - for x in self.x(): + for x in self.collect(): for i, b in enumerate(zip(buckets[:-1], buckets[1:])): if x >= b[0] and x < b[1]: h[i] += 1 @@ -136,54 +214,97 @@ def id(self): return None def intersection(self, other): - return RDD(list(set(self.collect()) & set(other.collect())), - self.context) + return self.context.parallelize( + list(set(self.collect()) & set(other.collect())) + ) def isCheckpointed(self): return False def join(self, other, numPartitions=None): - d1 = dict(self.x()) - d2 = dict(other.x()) + d1 = dict(self.collect()) + d2 = dict(other.collect()) keys = set(d1.keys()) & set(d2.keys()) - return RDD(((k, (d1[k], d2[k])) for k in keys), self.context) + return self.context.parallelize(( + (k, (d1[k], d2[k])) + for k in keys + ), numPartitions) def keyBy(self, f): - return RDD(((f(e), e) for e in self.x()), self.context) + return self.map(lambda e: (f(e), e)) def keys(self): - return RDD((e[0] for e in self.x()), self.context) + return self.map(lambda e: e[0]) - def leftOuterJoin(self, other): - d1 = dict(self.x()) - d2 = dict(other.x()) - return RDD(((k, (d1[k], d2[k] if k in d2 else None)) - for k in d1.keys()), self.context) + def leftOuterJoin(self, other, numPartitions=None): + d1 = dict(self.collect()) + d2 = dict(other.collect()) + return self.context.parallelize(( + (k, (d1[k], d2[k] if k in d2 else None)) + for k in d1.keys() + ), numPartitions) def lookup(self, key): - return [e[1] for e in self.x() if e[0] == key] + """[distributed]""" + return self.context.runJob( + self, + lambda tc, x: (xx[1] for xx in x if xx[0] == key), + resultHandler=lambda l: [e for ll in l for e in ll], + ) def map(self, f): - return RDD(self.context._pool.map(f, self.x()), self.context) + """[distributed]""" + return MapPartitionsRDD( + self, + lambda tc, i, x: (f(xx) for xx in x), + preservesPartitioning=True, + ) + + def mapPartitions(self, f, preservesPartitioning=False): + """[distributed]""" + return MapPartitionsRDD( + self, + lambda tc, i, x: f(x), + preservesPartitioning=True, + ) def mapValues(self, f): - return RDD(zip( - (e[0] for e in self.x()), - self.context._pool.map(f, (e[1] for e in self.x())) - ), self.context) + """[distributed]""" + return MapPartitionsRDD( + self, + lambda tc, i, x: ((e[0], f(e[1])) for e in x), + preservesPartitioning=True, + ) def max(self): - return max(self.x()) + """[distributed]""" + return self.context.runJob( + self, + lambda tc, x: max(x), + resultHandler=max, + ) def mean(self): - summed, length = (0.0, 0) - for x in self.x(): - summed += x - length += 1 - return summed / length + """[distributed]""" + def map_func(tc, x): + summed, length = (0.0, 0) + for xx in x: + summed += xx + length += 1 + return 
(summed, length) + def reduce_func(l): + summed, length = zip(*l) + return sum(summed)/sum(length) + return self.context.runJob(self, map_func, + resultHandler=reduce_func) def min(self): - return min(self.x()) + """[distributed]""" + return self.context.runJob( + self, + lambda tc, x: min(x), + resultHandler=min, + ) def name(self): return self._name @@ -192,52 +313,95 @@ def persist(self, storageLevel=None): return self.cache() def pipe(self, command, env={}): - return RDD((subprocess.check_output( + return self.context.parallelize(subprocess.check_output( [command]+x if isinstance(x, list) else [command, x] - ) for x in self.x()), self.context) + ) for x in self.collect()) def reduce(self, f): - return functools.reduce(f, self.x()) + """[distributed] f must be a commutative and associative binary operator""" + return self.context.runJob( + self, + lambda tc, x: functools.reduce(f, x), + resultHandler=lambda x: functools.reduce(f, x), + ) def reduceByKey(self, f): return self.groupByKey().mapValues(lambda x: functools.reduce(f, x)) - def rightOuterJoin(self, other): - d1 = dict(self.x()) - d2 = dict(other.x()) - return RDD(((k, (d1[k] if k in d1 else None, d2[k])) - for k in d2.keys()), self.context) + def rightOuterJoin(self, other, numPartitions=None): + d1 = dict(self.collect()) + d2 = dict(other.collect()) + return self.context.parallelize(( + (k, (d1[k] if k in d1 else None, d2[k])) + for k in d2.keys() + ), numPartitions) def saveAsTextFile(self, path, compressionCodecClass=None): - if path.startswith('s3://') or path.startswith('s3n://'): - t = Tokenizer(path) - t.next('//') # skip scheme - bucket_name = t.next('/') - key_name = t.next() - conn = self.context._get_s3_conn() - bucket = conn.get_bucket(bucket_name, validate=False) - key = bucket.new_key(key_name) - key.set_contents_from_string('\n'.join(str(x) for x in self.x())) - else: - path_local = path - if path_local.startswith('file://'): - path_local = path_local[7:] - with open(path_local, 'w') as f: - for x in self.x(): - f.write(str(x)) - f.write('\n') + def write_file(this_path, iter_content): + if path.startswith('s3://') or path.startswith('s3n://'): + t = utils.Tokenizer(this_path) + t.next('//') # skip scheme + bucket_name = t.next('/') + key_name = t.next() + conn = self.context._get_s3_conn() + bucket = conn.get_bucket(bucket_name, validate=False) + key = bucket.new_key(key_name) + key.set_contents_from_string(str(x)+'\n' for x in iter_content) + else: + path_local = this_path + if path_local.startswith('file://'): + path_local = path_local[7:] + print('creating dir {0}/'.format(path)) + os.system('mkdir -p '+path+'/') + print('writing file {0}/'.format(path_local)) + with open(path_local, 'w') as f: + for x in iter_content: + f.write(str(x)) + f.write('\n') + self.context.runJob( + self, + lambda tc, x: write_file(path+'/part-{0:05d}'.format(tc.partitionId()), x), + resultHandler=lambda l: write_file(path+'/_SUCCESS', list(l)), + ) return self def subtract(self, other, numPartitions=None): - list_other = list(other.x()) - return RDD((x for x in self.x() if x not in list_other), self.context) + """[distributed]""" + list_other = other.collect() + return MapPartitionsRDD( + self, + lambda tc, i, x: (e for e in x if e not in list_other), + preservesPartitioning=True, + ) def sum(self): - return sum(self.x()) + """[distributed]""" + return self.context.runJob(self, lambda tc, x: sum(x), resultHandler=sum) def take(self, n): - i = self.x() - return [next(i) for _ in range(n)] + return self.collect()[:n] def 
takeSample(self, n): - return random.sample(list(self.x()), n) + return random.sample(self.collect(), n) + + +class MapPartitionsRDD(RDD): + def __init__(self, prev, f, preservesPartitioning=False): + """prev is the previous RDD. + + f is a function with the signature + (task_context, partition index, iterator over elements). + """ + RDD.__init__(self, prev.partitions(), prev.context) + + self.prev = prev + self.f = f + self.preservesPartitioning = preservesPartitioning + + def compute(self, split, task_context): + return self.f(task_context, split.index, self.prev.compute(split, task_context._create_child())) + + def partitions(self): + return self.prev.partitions() + + diff --git a/pysparkling/task_context.py b/pysparkling/task_context.py new file mode 100644 index 000000000..72c513835 --- /dev/null +++ b/pysparkling/task_context.py @@ -0,0 +1,29 @@ +import logging + +log = logging.getLogger(__name__) + + +class TaskContext(object): + def __init__(self, stage_id=0, partition_id=0): + log.info('Running stage {0} for partition {1}' + ''.format(stage_id, partition_id)) + + self.stage_id = stage_id + self.partition_id = partition_id + self.attempt_number = 0 + self.is_completed = False + self.is_running_locally = True + self.task_completion_listeners = [] + + def _create_child(self): + return TaskContext(stage_id=self.stage_id+1, + partition_id=self.partition_id) + + def attemptNumber(self): + return self.attempt_number + + def partitionId(self): + return self.partition_id + + def stageId(self): + return self.stage_id diff --git a/pysparkling/utils.py b/pysparkling/utils.py index cfc1e2ddd..7468abc94 100644 --- a/pysparkling/utils.py +++ b/pysparkling/utils.py @@ -1,3 +1,5 @@ +from collections import defaultdict + class Tokenizer(object): def __init__(self, expression): @@ -24,3 +26,11 @@ def next(self, separator=None): value = self.expression[:sep_pos] self.expression = self.expression[sep_pos+len(separator):] return value + + +def sum_counts_by_keys(list_of_pairlists): + r = defaultdict(int) # calling int results in a zero + for l in list_of_pairlists: + for key, count in l.items(): + r[key] += count + return r diff --git a/setup.py b/setup.py index 177c40981..ad98e3ae2 100644 --- a/setup.py +++ b/setup.py @@ -33,6 +33,7 @@ tests_require=[ 'nose>=1.3.4', 'futures>=3.0.1', + 'dill>=0.2.2', ], test_suite='nose.collector', diff --git a/tests/test_context_unit.py b/tests/test_context_unit.py new file mode 100644 index 000000000..c2abc398d --- /dev/null +++ b/tests/test_context_unit.py @@ -0,0 +1,32 @@ +from pysparkling import Context + + +def test_broadcast(): + b = Context().broadcast([1, 2, 3]) + assert b.value[0] == 1 + + +def test_parallelize_single_element(): + my_rdd = Context().parallelize([7], 100) + assert my_rdd.collect()[0] == 7 + + +def test_parallelize_matched_elements(): + my_rdd = Context().parallelize([1, 2, 3, 4, 5], 5) + assert my_rdd.collect()[2] == 3 and len(my_rdd.collect()) == 5 + + +def test_union(): + sc = Context() + rdd1 = sc.parallelize(['Hello']) + rdd2 = sc.parallelize(['World']) + union = sc.union([rdd1, rdd2]).collect() + assert len(union) == 2 and 'Hello' in union and 'World' in union + + +def test_version(): + assert isinstance(Context().version, str) + + +if __name__ == '__main__': + test_union() diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py index 18231fec1..b53a3b62f 100644 --- a/tests/test_multiprocessing.py +++ b/tests/test_multiprocessing.py @@ -1,21 +1,23 @@ +import dill import math -import pysparkling +import logging import 
multiprocessing from concurrent import futures +from pysparkling import Context def test_multiprocessing(): p = multiprocessing.Pool(4) - my_rdd = pysparkling.Context(pool=p).parallelize([1, 3, 4]) - r = my_rdd.foreach(math.sqrt).collect() + my_rdd = Context(pool=p, serializer=dill.dumps, deserializer=dill.loads).parallelize([1, 3, 4]) + r = my_rdd.map(lambda x: x*x).collect() print(r) - assert 2 in r + assert 16 in r def test_concurrent(): - with futures.ProcessPoolExecutor(4) as p: - my_rdd = pysparkling.Context(pool=p).parallelize([1, 3, 4]) - r = my_rdd.foreach(math.sqrt).collect() + with futures.ThreadPoolExecutor(4) as p: + my_rdd = Context(pool=p).parallelize([1, 3, 4]) + r = my_rdd.map(math.sqrt).collect() print(r) assert 2 in r @@ -30,7 +32,7 @@ def indent_line(l): def test_lazy_execution(): - r = pysparkling.Context().textFile('tests/test_simple.py') + r = Context().textFile('tests/test_multiprocessing.py') r = r.map(indent_line) r.foreach(indent_line) exec_before_collect = INDENT_WAS_EXECUTED @@ -42,17 +44,18 @@ def test_lazy_execution(): def test_lazy_execution_threadpool(): with futures.ThreadPoolExecutor(4) as p: - r = pysparkling.Context(pool=p).textFile('tests/test_simple.py') + r = Context(pool=p).textFile('tests/test_multiprocessing.py') + r = r.map(indent_line) r = r.map(indent_line) - r.foreach(indent_line) r = r.collect() # ThreadPool is not lazy although it returns generators. print(r) - assert '--- --- import pysparkling' in r + assert '--- --- from pysparkling import Context' in r if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) # test_multiprocessing() # test_concurrent() - test_lazy_execution() - # test_lazy_execution_threadpool() + # test_lazy_execution() + test_lazy_execution_threadpool() diff --git a/tests/test_rdd_unit.py b/tests/test_rdd_unit.py new file mode 100644 index 000000000..93183a919 --- /dev/null +++ b/tests/test_rdd_unit.py @@ -0,0 +1,242 @@ +from __future__ import print_function + +import logging +import tempfile +from pysparkling import Context + + +def test_aggregate(): + seqOp = (lambda x, y: (x[0] + y, x[1] + 1)) + combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1])) + r = Context().parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp) + assert r[0] == 10 and r[1] == 4 + + +def test_aggregateByKey(): + seqOp = (lambda x, y: x + y) + combOp = (lambda x, y: x + y) + r = Context().parallelize([('a', 1), ('b', 2), ('a', 3), ('c', 4)]).aggregateByKey(int, seqOp, combOp) + assert r['a'] == 4 and r['b'] == 2 + +def test_cartesian(): + rdd = Context().parallelize([1, 2]) + r = sorted(rdd.cartesian(rdd).collect()) + print(r) + assert r[0][0] == 1 and r[2][0] == 2 and len(r) == 4 and len(r[0]) == 2 + +def test_coalesce(): + my_rdd = Context().parallelize([1, 2, 3], 2).coalesce(1) + assert my_rdd.getNumPartitions() == 1 + + +def test_collect(): + my_rdd = Context().parallelize([1, 2, 3]) + assert my_rdd.collect()[0] == 1 + + +def test_count(): + my_rdd = Context().parallelize([1, 2, 3]) + assert my_rdd.count() == 3 + + +def test_count_partitions(): + my_rdd = Context().parallelize([1, 2, 3], 2) + print(my_rdd.collect()) + my_rdd.foreach(print) + assert my_rdd.count() == 3 + + +def test_countByKey(): + my_rdd = Context().parallelize([('a', 1), ('b', 2), ('b', 2)]) + assert my_rdd.countByKey()['b'] == 4 + + +def test_countByValue(): + my_rdd = Context().parallelize([1, 2, 2, 4, 1]) + assert my_rdd.countByValue()[2] == 2 + + +def test_distinct(): + my_rdd = Context().parallelize([1, 2, 2, 4, 1]).distinct() + assert my_rdd.count() == 
3 + + +def test_filter(): + my_rdd = Context().parallelize([1, 2, 2, 4, 1, 3, 5, 9], 3).filter(lambda x: x % 2 == 0) + print(my_rdd.collect()) + print(my_rdd.count()) + assert my_rdd.count() == 3 + + +def test_first(): + my_rdd = Context().parallelize([1, 2, 2, 4, 1, 3, 5, 9], 3) + print(my_rdd.first()) + assert my_rdd.first() == 1 + + +def test_flatMap(): + my_rdd = Context().parallelize([ + ('hello', 'world') + ]) + mapped = my_rdd.flatMap(lambda x: [['a']+list(x)]).collect() + assert mapped[0][0] == 'a' + + +def test_flatMapValues(): + my_rdd = Context().parallelize([ + ('message', ('hello', 'world')) + ]) + mapped = my_rdd.flatMapValues(lambda x: ['a']+list(x)).collect() + assert mapped[0][1][0] == 'a' + + +def test_fold(): + my_rdd = Context().parallelize([4, 7, 2]) + folded = my_rdd.fold(0, lambda a, b: a+b) + assert folded == 13 + + +def test_foldByKey(): + my_rdd = Context().parallelize([('a', 4), ('b', 7), ('a', 2)]) + folded = my_rdd.foldByKey(0, lambda a, b: a+b) + assert folded['a'] == 6 + + +def test_foreach(): + my_rdd = Context().parallelize([1, 2, 3]) + my_rdd.foreach(lambda x: x+1) + assert my_rdd.collect()[0] == 1 + + +def test_groupBy(): + my_rdd = Context().parallelize([4, 7, 2]) + grouped = my_rdd.groupBy(lambda x: x % 2).collect() + print(grouped) + assert grouped[0][1][0] == 2 + + +def test_histogram(): + my_rdd = Context().parallelize([0, 4, 7, 4, 10]) + b, h = my_rdd.histogram(10) + assert h[4] == 2 + + +def test_intersection(): + rdd1 = Context().parallelize([0, 4, 7, 4, 10]) + rdd2 = Context().parallelize([3, 4, 7, 4, 5]) + i = rdd1.intersection(rdd2) + assert i.collect()[0] == 4 + + +def test_join(): + rdd1 = Context().parallelize([(0, 1), (1, 1)]) + rdd2 = Context().parallelize([(2, 1), (1, 3)]) + j = rdd1.join(rdd2) + assert dict(j.collect())[1][1] == 3 + + +def test_keyBy(): + rdd = Context().parallelize([0, 4, 7, 4, 10]) + rdd = rdd.keyBy(lambda x: x % 2) + assert rdd.collect()[2][0] == 1 # the third element (7) is odd + + +def test_keys(): + rdd = Context().parallelize([(0, 1), (1, 1)]).keys() + assert rdd.collect()[0] == 0 + + +def test_leftOuterJoin(): + rdd1 = Context().parallelize([(0, 1), (1, 1)]) + rdd2 = Context().parallelize([(2, 1), (1, 3)]) + j = rdd1.leftOuterJoin(rdd2) + assert dict(j.collect())[1][1] == 3 + + +def test_lookup(): + rdd = Context().parallelize([(0, 1), (1, 1), (1, 3)]) + print(rdd.lookup(1)) + assert 3 in rdd.lookup(1) + + +def test_map(): + my_rdd = Context().parallelize([1, 2, 3]).map(lambda x: x+1) + assert my_rdd.collect()[0] == 2 + + +def test_mapPartitions(): + rdd = Context().parallelize([1, 2, 3, 4], 2) + def f(iterator): yield sum(iterator) + r = rdd.mapPartitions(f).collect() + assert 3 in r and 7 in r + + +def test_max(): + rdd = Context().parallelize([1, 2, 3, 4, 3, 2], 2) + assert rdd.max() == 4 + + +def test_mean(): + rdd = Context().parallelize([0, 4, 7, 4, 10]) + assert rdd.mean() == 5 + + +def test_pipe(): + rdd = Context().parallelize(['0', 'hello', 'world']) + piped = rdd.pipe('echo').collect() + print(piped) + assert b'hello\n' in piped + + +def test_reduce(): + rdd = Context().parallelize([0, 4, 7, 4, 10]) + assert rdd.reduce(lambda a, b: a+b) == 25 + + +def test_reduceByKey(): + rdd = Context().parallelize([(0, 1), (1, 1), (1, 3)]) + assert dict(rdd.reduceByKey(lambda a, b: a+b).collect())[1] == 4 + + +def test_rightOuterJoin(): + rdd1 = Context().parallelize([(0, 1), (1, 1)]) + rdd2 = Context().parallelize([(2, 1), (1, 3)]) + j = rdd1.rightOuterJoin(rdd2) + assert dict(j.collect())[1][1] == 3 + + +def 
test_saveAsTextFile(): + tempFile = tempfile.NamedTemporaryFile(delete=True) + tempFile.close() + Context().parallelize(range(10)).saveAsTextFile(tempFile.name) + with open(tempFile.name+'/part-00000', 'r') as f: + r = f.readlines() + print(r) + assert '5\n' in r + + +def test_subtract(): + rdd1 = Context().parallelize([(0, 1), (1, 1)]) + rdd2 = Context().parallelize([(1, 1), (1, 3)]) + subtracted = rdd1.subtract(rdd2).collect() + assert (0, 1) in subtracted and (1, 1) not in subtracted + + +def test_sum(): + rdd = Context().parallelize([0, 4, 7, 4, 10]) + assert rdd.sum() == 25 + + +def test_take(): + my_rdd = Context().parallelize([4, 7, 2]) + assert my_rdd.take(2)[1] == 7 + + +def test_takeSample(): + my_rdd = Context().parallelize([4, 7, 2]) + assert my_rdd.takeSample(1)[0] in [4, 7, 2] + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + test_filter() diff --git a/tests/test_resolve_filenames.py b/tests/test_resolve_filenames.py index c090788de..36ffd3869 100644 --- a/tests/test_resolve_filenames.py +++ b/tests/test_resolve_filenames.py @@ -5,14 +5,14 @@ def test_local_1(): filenames = pysparkling.Context()._resolve_filenames( 'tests/*' ) - assert 'tests/test_simple.py' in filenames + assert 'tests/test_resolve_filenames.py' in filenames def test_local_2(): filenames = pysparkling.Context()._resolve_filenames( - 'tests/test_simple.py' + 'tests/test_resolve_filenames.py' ) - assert 'tests/test_simple.py' == filenames[0] and len(filenames) == 1 + assert 'tests/test_resolve_filenames.py' == filenames[0] and len(filenames) == 1 if __name__ == '__main__': diff --git a/tests/test_simple.py b/tests/test_simple.py deleted file mode 100644 index 68e559303..000000000 --- a/tests/test_simple.py +++ /dev/null @@ -1,167 +0,0 @@ -import tempfile -import pysparkling - - -def test_collect(): - my_rdd = pysparkling.Context().parallelize([1, 2, 3]) - assert my_rdd.collect()[0] == 1 - - -def test_broadcast(): - b = pysparkling.Context().broadcast([1, 2, 3]) - assert b.value[0] == 1 - - -def test_map(): - my_rdd = pysparkling.Context().parallelize([1, 2, 3]).map(lambda x: x+1) - assert my_rdd.collect()[0] == 2 - - -def test_foreach(): - my_rdd = pysparkling.Context().parallelize([1, 2, 3]) - my_rdd.foreach(lambda x: x+1) - assert my_rdd.collect()[0] == 2 - - -def test_countByValue(): - my_rdd = pysparkling.Context().parallelize([1, 2, 2, 4, 1]) - assert my_rdd.countByValue()[2] == 2 - - -def test_countByKey(): - my_rdd = pysparkling.Context().parallelize([('a', 1), ('b', 2), ('b', 2)]) - assert my_rdd.countByKey()['b'] == 4 - - -def test_flatMapValues(): - my_rdd = pysparkling.Context().parallelize([ - ('message', ('hello', 'world')) - ]) - mapped = my_rdd.flatMapValues(lambda x: ['a']+list(x)).collect() - assert mapped[0][1][0] == 'a' - - -def test_fold(): - my_rdd = pysparkling.Context().parallelize([4, 7, 2]) - folded = my_rdd.fold(0, lambda a, b: a+b) - assert folded == 13 - - -def test_foldByKey(): - my_rdd = pysparkling.Context().parallelize([('a', 4), ('b', 7), ('a', 2)]) - folded = my_rdd.foldByKey(0, lambda a, b: a+b) - assert folded['a'] == 6 - - -def test_groupBy(): - my_rdd = pysparkling.Context().parallelize([4, 7, 2]) - grouped = my_rdd.groupBy(lambda x: x % 2).collect() - assert grouped[0][1][1] == 2 - - -def test_take(): - my_rdd = pysparkling.Context().parallelize([4, 7, 2]) - assert my_rdd.take(2)[1] == 7 - - -def test_takeSample(): - my_rdd = pysparkling.Context().parallelize([4, 7, 2]) - assert my_rdd.takeSample(1)[0] in [4, 7, 2] - - -def test_histogram(): - 
my_rdd = pysparkling.Context().parallelize([0, 4, 7, 4, 10]) - b, h = my_rdd.histogram(10) - assert h[4] == 2 - - -def test_intersection(): - rdd1 = pysparkling.Context().parallelize([0, 4, 7, 4, 10]) - rdd2 = pysparkling.Context().parallelize([3, 4, 7, 4, 5]) - i = rdd1.intersection(rdd2) - assert i.collect()[0] == 4 - - -def test_join(): - rdd1 = pysparkling.Context().parallelize([(0, 1), (1, 1)]) - rdd2 = pysparkling.Context().parallelize([(2, 1), (1, 3)]) - j = rdd1.join(rdd2) - assert dict(j.collect())[1][1] == 3 - - -def test_keyBy(): - rdd = pysparkling.Context().parallelize([0, 4, 7, 4, 10]) - rdd = rdd.keyBy(lambda x: x % 2) - assert rdd.collect()[2][0] == 1 # the third element (7) is odd - - -def test_keys(): - rdd = pysparkling.Context().parallelize([(0, 1), (1, 1)]).keys() - assert rdd.collect()[0] == 0 - - -def test_leftOuterJoin(): - rdd1 = pysparkling.Context().parallelize([(0, 1), (1, 1)]) - rdd2 = pysparkling.Context().parallelize([(2, 1), (1, 3)]) - j = rdd1.leftOuterJoin(rdd2) - assert dict(j.collect())[1][1] == 3 - - -def test_lookup(): - rdd = pysparkling.Context().parallelize([(0, 1), (1, 1), (1, 3)]) - assert 3 in rdd.lookup(1) - - -def test_mean(): - rdd = pysparkling.Context().parallelize([0, 4, 7, 4, 10]) - assert rdd.mean() == 5 - - -def test_pipe(): - rdd = pysparkling.Context().parallelize(['0', 'hello', 'world']) - piped = rdd.pipe('echo').collect() - print(piped) - assert b'hello\n' in piped - - -def test_reduce(): - rdd = pysparkling.Context().parallelize([0, 4, 7, 4, 10]) - assert rdd.reduce(lambda a, b: a+b) == 25 - - -def test_reduceByKey(): - rdd = pysparkling.Context().parallelize([(0, 1), (1, 1), (1, 3)]) - assert dict(rdd.reduceByKey(lambda a, b: a+b).collect())[1] == 4 - - -def test_rightOuterJoin(): - rdd1 = pysparkling.Context().parallelize([(0, 1), (1, 1)]) - rdd2 = pysparkling.Context().parallelize([(2, 1), (1, 3)]) - j = rdd1.rightOuterJoin(rdd2) - assert dict(j.collect())[1][1] == 3 - - -def test_saveAsTextFile(): - tempFile = tempfile.NamedTemporaryFile(delete=True) - tempFile.close() - pysparkling.Context().parallelize(range(10)).saveAsTextFile(tempFile.name) - with open(tempFile.name, 'r') as f: - r = f.readlines() - print(r) - assert '5\n' in r - - -def test_subtract(): - rdd1 = pysparkling.Context().parallelize([(0, 1), (1, 1)]) - rdd2 = pysparkling.Context().parallelize([(1, 1), (1, 3)]) - subtracted = rdd1.subtract(rdd2).collect() - assert (0, 1) in subtracted and (1, 1) not in subtracted - - -def test_sum(): - rdd = pysparkling.Context().parallelize([0, 4, 7, 4, 10]) - assert rdd.sum() == 25 - - -if __name__ == '__main__': - test_saveAsTextFile()
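
A short usage sketch of the 0.2.0 API introduced in this diff, assembled from the README section and the new tests; the inline results are what the code in this diff is expected to produce, given the partitioning behavior of ``Context.parallelize`` in ``pysparkling/context.py``::

    import multiprocessing

    import dill

    from pysparkling import Context

    if __name__ == '__main__':
        # Single-process context using the default DummyPool.
        sc = Context()
        rdd = sc.parallelize([1, 2, 3, 4, 5], 2)   # partitions [1, 2, 3] and [4, 5]
        print(rdd.getNumPartitions())              # 2
        print(rdd.map(lambda x: x * x).collect())  # [1, 4, 9, 16, 25]

        # aggregate() reduces each partition with seqOp, then combines with combOp.
        print(rdd.aggregate(0, lambda acc, x: acc + x, lambda a, b: a + b))  # 15

        # mapPartitions() receives the iterator of one partition at a time.
        print(rdd.mapPartitions(lambda p: [sum(p)]).collect())  # [6, 9]

        # Parallel execution: lambdas need the custom serializers, as in
        # tests/test_multiprocessing.py.
        sc_mp = Context(pool=multiprocessing.Pool(4),
                        serializer=dill.dumps, deserializer=dill.loads)
        print(sc_mp.parallelize([1, 3, 4], 2).map(lambda x: x * x).collect())  # [1, 9, 16]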