Merge pull request #1 from svenkreiss/v0.2
V0.2
svenkreiss committed May 18, 2015
2 parents dd0e633 + 2d10665 commit 56d594a
Showing 13 changed files with 731 additions and 301 deletions.
32 changes: 25 additions & 7 deletions README.rst
@@ -13,7 +13,7 @@ pysparkling
endpoint. You need the same pre-processing pipeline for a single
document per API call. This does not have to be done in parallel, but there
should be only a small overhead in initialization and preferably no
dependency on the JVM. This is where ``pysparkling`` shines.
dependency on the JVM. This is what ``pysparkling`` is for.

.. image:: https://travis-ci.org/svenkreiss/pysparkling.png?branch=master
:target: https://travis-ci.org/svenkreiss/pysparkling
@@ -67,23 +67,32 @@ API
Context
-------

* ``__init__(pool=None)``: takes a pool object (an object that has a ``map()``
method, e.g. a multiprocessing.Pool) to parallelize all ``map()`` and
``foreach()`` methods.
* ``__init__(pool=None, serializer=None, deserializer=None, data_serializer=None, data_deserializer=None)``:
takes a pool object
(an object that has a ``map()`` method, e.g. a ``multiprocessing.Pool``) to
parallelize methods. To support functions and lambda functions, specify custom
serializers and deserializers,
e.g. ``serializer=dill.dumps, deserializer=dill.loads``. A usage sketch follows this list.
* ``broadcast(var)``: returns an instance of ``Broadcast()``; its value is
accessed with ``value``.
* ``newRddId()``: incrementing number
* ``textFile(filename)``: load every line of a text file into an RDD.
``filename`` can contain a comma-separated list of many files, ``?`` and
``*`` wildcards, file paths on S3 (``s3n://bucket_name/filename.txt``) and
local file paths (``relative/path/my_text.txt``, ``/absolute/path/my_text.txt``
or ``file:///absolute/file/path.txt``). If the filename points to a folder
containing ``part*`` files, those are resolved.
* ``broadcast(var)``: returns an instance of ``Broadcast()``; its value is
accessed with ``value``.
* ``version``: the version of pysparkling
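
A minimal usage sketch combining the options above; ``dill`` is a third-party
serializer and ``multiprocessing.Pool`` is only one possible pool (anything
with a ``map()`` method works)::

    import multiprocessing
    import dill
    import pysparkling

    # lambdas are not picklable by default, so pass dill for the
    # function serializer/deserializer when using a process pool
    sc = pysparkling.Context(
        pool=multiprocessing.Pool(4),
        serializer=dill.dumps, deserializer=dill.loads,
        data_serializer=dill.dumps, data_deserializer=dill.loads,
    )

    numbers = sc.broadcast(list(range(100)))
    rdd = sc.parallelize(numbers.value, numPartitions=4)
    print(rdd.map(lambda x: x + 1).collect())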


RDD
---

* ``aggregate(zeroValue, seqOp, combOp)``: aggregate the values of each partition
with ``seqOp`` and combine the partition results with ``combOp`` (see the sketch after this list)
* ``aggregateByKey(zeroValue, seqFunc, combFunc)``: aggregate by key
* ``cache()``: execute previous steps and cache result
* ``cartesian(other)``: cartesian product
* ``coalesce()``: do nothing
* ``collect()``: return the underlying list
* ``count()``: get length of internal list
@@ -100,6 +109,8 @@ RDD
* ``foldByKey(zeroValue, op)``: aggregate elements by key
* ``foreach(func)``: apply func to every element in place
* ``foreachPartition(func)``: same as ``foreach()``
* ``getNumPartitions()``: number of partitions
* ``getPartitions()``: returns an iterator over the partitions
* ``groupBy(func)``: group by the output of func
* ``groupByKey()``: group by key where the RDD is of type [(key, value), ...]
* ``histogram(buckets)``: buckets can be a list or an int
@@ -112,6 +123,7 @@ RDD
* ``leftOuterJoin(other)``: left outer join
* ``lookup(key)``: return list of values for this key
* ``map(func)``: apply func to every element and return a new RDD
* ``mapPartitions(func)``: apply ``func`` to entire partitions
* ``mapValues(func)``: apply func to value in (key, value) pairs and return a new RDD
* ``max()``: get the maximum element
* ``mean()``: mean
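
A short sketch of a few of the methods above, assuming Spark-compatible
semantics for ``aggregate()``::

    import pysparkling

    sc = pysparkling.Context()
    rdd = sc.parallelize([1, 2, 3, 4], 2)

    # seqOp folds elements into a per-partition (sum, count) accumulator;
    # combOp merges the accumulators of the two partitions
    sum_count = rdd.aggregate(
        (0, 0),
        lambda acc, x: (acc[0] + x, acc[1] + 1),
        lambda a, b: (a[0] + b[0], a[1] + b[1]),
    )
    # sum_count == (10, 4); the same mean is available as rdd.mean() == 2.5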
@@ -138,7 +150,13 @@ Broadcast
Changelog
=========

* `master <https://github.com/svenkreiss/pysparkling/compare/v0.1.1...master>`_
* `master <https://github.com/svenkreiss/pysparkling/compare/v0.2.0...master>`_
* `0.2.0 <https://github.com/svenkreiss/pysparkling/compare/v0.1.1...v0.2.0>`_ (2015-05-17)
* proper handling of partitions
* custom serializers, deserializers (for functions and data separately)
* more tests for parallelization options
* distributed jobs now execute a chain of ``map()`` operations on the workers without sending intermediate results back to the master
* a few more methods for RDDs implemented
* `0.1.1 <https://github.com/svenkreiss/pysparkling/compare/v0.1.0...v0.1.1>`_ (2015-05-12)
* implemented a few more RDD methods
* changed handling of context in RDD
2 changes: 1 addition & 1 deletion pysparkling/__init__.py
@@ -1,6 +1,6 @@
"""pytld module."""

__version__ = '0.1.1'
__version__ = '0.2.0'

from .context import Context
from .rdd import RDD
86 changes: 82 additions & 4 deletions pysparkling/context.py
@@ -1,28 +1,100 @@
"""Imitates SparkContext."""
"""Context."""

from __future__ import division, print_function

import boto
import glob
import math
import fnmatch
import logging
import functools

from .rdd import RDD
from .broadcast import Broadcast
from .utils import Tokenizer
from .partition import Partition
from .task_context import TaskContext
from . import __version__ as PYSPARKLING_VERSION

log = logging.getLogger(__name__)


def unit_fn(arg):
    """Used as dummy serializer and deserializer."""
    return arg


def runJob_map(i):
    deserializer, data_serializer, data_deserializer, serialized, serialized_data = i
    func, rdd = deserializer(serialized)
    partition = data_deserializer(serialized_data)
    log.debug('Worker function {0} is about to get executed with {1}'
              ''.format(func, partition))

    task_context = TaskContext(stage_id=0, partition_id=partition.index)
    return data_serializer(func(task_context, rdd.compute(partition, task_context)))


class Context(object):
    def __init__(self, pool=None):
    def __init__(self, pool=None, serializer=None, deserializer=None,
                 data_serializer=None, data_deserializer=None):
        if not pool:
            pool = DummyPool()
        if not serializer:
            serializer = unit_fn
        if not deserializer:
            deserializer = unit_fn
        if not data_serializer:
            data_serializer = unit_fn
        if not data_deserializer:
            data_deserializer = unit_fn

        self._pool = pool
        self._serializer = serializer
        self._deserializer = deserializer
        self._data_serializer = data_serializer
        self._data_deserializer = data_deserializer
        self._s3_conn = None
        self._last_rdd_id = 0

    def parallelize(self, x, numPartitions=None):
        return RDD(x, self)
        self.version = PYSPARKLING_VERSION

    def broadcast(self, x):
        return Broadcast(x)

    def newRddId(self):
        self._last_rdd_id += 1
        return self._last_rdd_id

    def parallelize(self, x, numPartitions=None):
        if not numPartitions:
            return RDD([Partition(x, 0)], self)

        stride_size = int(math.ceil(len(x)/numPartitions))
        def partitioned():
            for i in range(numPartitions):
                yield Partition(x[i*stride_size:(i+1)*stride_size], i)

        return RDD(partitioned(), self)
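    # Worked example (illustrative): parallelize(range(10), numPartitions=3)
    # gives stride_size = int(ceil(10 / 3)) = 4, so the partitions hold
    # [0, 1, 2, 3], [4, 5, 6, 7] and [8, 9], with indices 0, 1 and 2.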

    def runJob(self, rdd, func, partitions=None, allowLocal=False, resultHandler=None):
        """func is of the form func(TaskContext, Iterator over elements)"""
        # TODO: this is the place to insert proper schedulers
        map_result = (self._data_deserializer(d) for d in self._pool.map(runJob_map, [
            (self._deserializer,
             self._data_serializer,
             self._data_deserializer,
             self._serializer((func, rdd)),
             self._data_serializer(p),
             )
            for p in rdd.partitions()
        ]))
        log.info('Map jobs generated.')

        if resultHandler is not None:
            return resultHandler(map_result)
        return map_result
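    # Illustrative sketch of the data flow above: each partition is serialized
    # together with (func, rdd) and handed to the pool; the module-level
    # runJob_map() deserializes both, computes the partition with a fresh
    # TaskContext, serializes the result, and it is deserialized again here.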

    def textFile(self, filename):
        lines = []
        for f_name in self._resolve_filenames(filename):
@@ -47,6 +119,11 @@ def textFile(self, filename):
        rdd._name = filename
        return rdd

    def union(self, rdds):
        return self.parallelize(
            (x for rdd in rdds for x in rdd.collect())
        )

    def _get_s3_conn(self):
        if not self._s3_conn:
            self._s3_conn = boto.connect_s3()
@@ -85,3 +162,4 @@ def __init__(self):

    def map(self, f, input_list):
        return (f(x) for x in input_list)

20 changes: 20 additions & 0 deletions pysparkling/partition.py
@@ -0,0 +1,20 @@
import itertools


class Partition(object):
    def __init__(self, x, idx=None):
        self.index = idx
        self._x = x

    def x(self):
        self._x, r = itertools.tee(self._x)
        return r

    def hashCode(self):
        return self.index

    def __getstate__(self):
        return {
            'index': self.index,
            '_x': list(self.x())
        }
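    # Illustrative notes: x() can be consumed repeatedly because itertools.tee
    # re-splits the stored iterator on every call, and __getstate__ materializes
    # the remaining elements into a list so that a Partition can be pickled and
    # shipped to a worker.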