pyxtension is a pure-Python, MIT-licensed library that includes Scala-like streams, a Json class with attribute access syntax, and other commonly used utilities.
pip install pyxtension
or from GitHub:
git clone https://github.com/asuiu/pyxtension.git
cd pyxtension
python setup.py install
or
git submodule add https://github.com/asuiu/pyxtension.git
Json is a dict subclass that represents a JSON object. You should be able to use it anywhere you can use a dict. While this is probably the class you want to use, keep in mind that it is a dict under the hood.
Never again will you have to write code like this:
body = {
    'query': {
        'filtered': {
            'query': {
                'match': {'description': 'addictive'}
            },
            'filter': {
                'term': {'created_by': 'ASU'}
            }
        }
    }
}
From now on, you may simply write the following three lines:
body = Json()
body.query.filtered.query.match.description = 'addictive'
body.query.filtered.filter.term.created_by = 'ASU'
stream subclasses collections.Iterable. It is the same Python iterable, but with additional methods suited to multithreaded and multiprocess processing.
Never again will you have to write code like this:
> lst = xrange(1,6)
> reduce(lambda x, y: x * y, map(lambda _: _ * _, filter(lambda _: _ % 2 == 0, lst)))
64
From now on, you may simply write the following lines:
> the_stream = stream( xrange(1,6) )
> the_stream.\
filter(lambda _: _ % 2 == 0).\
map(lambda _: _ * _).\
reduce(lambda x, y: x * y)
64
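To show how such a fluent chain can be built on top of plain iterables, here is a minimal sketch in Python 3. `MiniStream` is a hypothetical class written for this illustration, not the library's stream; it only covers the three methods used above.

```python
from functools import reduce


class MiniStream:
    """Hypothetical, minimal fluent wrapper around an iterable."""

    def __init__(self, iterable):
        self._it = iterable

    def __iter__(self):
        return iter(self._it)

    def filter(self, f):
        # Lazily keep only the elements for which f is truthy.
        return MiniStream(x for x in self._it if f(x))

    def map(self, f):
        # Lazily apply f to every element.
        return MiniStream(f(x) for x in self._it)

    def reduce(self, f):
        # Fold the elements into a single value.
        return reduce(f, self._it)


result = (MiniStream(range(1, 6))
          .filter(lambda x: x % 2 == 0)
          .map(lambda x: x * x)
          .reduce(lambda x, y: x * y))
```

Each method returns a new wrapper, which is what makes the calls chainable.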
A Word Count Map-Reduce naive example using multiprocessing map
corpus = [
"MapReduce is a programming model and an associated implementation for processing and generating large data sets with a parallel, distributed algorithm on a cluster.",
"At Google, MapReduce was used to completely regenerate Google's index of the World Wide Web",
"Conceptually similar approaches have been very well known since 1995 with the Message Passing Interface standard having reduce and scatter operations."]
def reduceMaps(m1, m2):
    # Merge the counts from m2 into m1; items() works on both Python 2 and 3
    for k, v in m2.items():
        m1[k] = m1.get(k, 0) + v
    return m1
word_counts = stream(corpus).\
mpmap(lambda line: stream(line.lower().split(' ')).countByValue()).\
reduce(reduceMaps)
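For comparison, the same map and reduce steps can be sketched sequentially with only the standard library. The tiny corpus and the helper names `count_words` and `merge_counts` are made up for this illustration.

```python
from collections import Counter
from functools import reduce

lines = ["a b a", "b c"]  # hypothetical tiny corpus


def count_words(line):
    # Map step: one Counter of words per line.
    return Counter(line.lower().split(' '))


def merge_counts(m1, m2):
    # Reduce step: fold the counts of m2 into m1.
    for k, v in m2.items():
        m1[k] = m1.get(k, 0) + v
    return m1


word_counts = reduce(merge_counts, map(count_words, lines))
```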
Identical to the builtin map, but returns a stream.
Parallel ordered map using multiprocessing.Pool.imap().
It can replace map when computations need to be split across multiple cores and the order of results matters.
It spawns at most poolSize processes and applies the f function.
The elements in the result stream appear in the same order as in the initial iterable.
:type f: (T) -> V :rtype: `stream`
Parallel unordered map using multiprocessing.Pool.imap_unordered().
It can replace map when the order of results doesn't matter.
It spawns at most poolSize processes and applies the f function.
The elements in the result stream appear in an unpredictable order.
:type f: (T) -> V :rtype: `stream`
Parallel unordered map using a pool of threads. It can replace map when the order of results doesn't matter.
It spawns at most poolSize threads and applies the f function.
The elements in the result stream appear in an unpredictable order.
Because of the CPython GIL, it is most useful for I/O-bound work or CPU-intensive native functions, or on the Jython or IronPython interpreters.
:type f: (T) -> V :rtype: `stream`
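The difference between the ordered and unordered parallel maps can be seen with the standard library alone. This sketch uses `multiprocessing.pool.ThreadPool`, which exposes the same `imap()` and `imap_unordered()` methods as `multiprocessing.Pool`; it does not call pyxtension, and `square` and `pool_size` are illustrative names.

```python
from multiprocessing.pool import ThreadPool


def square(x):
    return x * x


pool_size = 4  # upper bound on the number of worker threads

with ThreadPool(pool_size) as pool:
    # imap() yields results in input order.
    ordered = list(pool.imap(square, range(10)))
    # imap_unordered() yields results as soon as they are ready.
    unordered = list(pool.imap_unordered(square, range(10)))
```

`ordered` always matches the input order, while `unordered` holds the same values in whatever order the workers finished.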
**flatMap(predicate=_IDENTITY_FUNC)**
:param predicate: a function that receives the elements of this collection and returns an iterable. By default predicate is the identity function.
:type predicate: (V) -> collections.Iterable[T]
:return: a stream of objects of the same type as the elements of the iterables returned by predicate()
Example:
stream([[1, 2], [3, 4], [4, 5]]).flatMap().toList() == [1, 2, 3, 4, 4, 5]
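The flattening behavior described above corresponds to `itertools.chain.from_iterable` in the standard library. The helper `flat_map` below is a hypothetical stand-in written for illustration, not the library's method.

```python
from itertools import chain


def flat_map(iterable, func=lambda x: x):
    # Apply func to each element and concatenate the resulting iterables.
    return chain.from_iterable(func(item) for item in iterable)


flattened = list(flat_map([[1, 2], [3, 4], [4, 5]]))
words = list(flat_map(["a b", "c"], str.split))
```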
Identical to the builtin filter, but returns a stream.
Returns the reversed stream.
Tests whether a predicate holds for some of the elements of this sequence.
:rtype: bool
Example:
stream([1, 2, 3]).exists(0) -> False
stream([1, 2, 3]).exists(1) -> True
**keyBy(keyfunc=_IDENTITY_FUNC)** Transforms a stream of values into a stream of (key, value) tuples.
:param keyfunc: function to map values to keys
:return: stream of Key, Value pairs
:rtype: stream[( T, V )]
Example:
stream([1, 2, 3, 4]).keyBy(lambda _:_ % 2) -> [(1, 1), (0, 2), (1, 3), (0, 4)]
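In plain Python, the same pairing can be sketched with a list comprehension. `key_by` is a hypothetical helper used only to illustrate the transformation.

```python
def key_by(iterable, keyfunc=lambda x: x):
    # Pair every value with the key computed from it.
    return [(keyfunc(value), value) for value in iterable]


pairs = key_by([1, 2, 3, 4], lambda x: x % 2)
```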
groupBy([keyfunc]) -> Make an iterator that returns consecutive keys and groups from the iterable.
The iterable does not need to be sorted on the same key function, but the key function needs to return hashable objects.
:param keyfunc: [Optional] a function computing a key value for each element.
:return: (key, sub-iterator) grouped by each value of key(value).
:rtype: stream[ ( V, slist[T] ) ]
Example:
stream([1, 2, 3, 4]).groupBy(lambda _: _ % 2) -> [(0, [2, 4]), (1, [1, 3])]
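Grouping unsorted input by a hashable key can be sketched with a dictionary of lists, which is why no prior sorting is needed (unlike `itertools.groupby`). The helper `group_by` is hypothetical, and the order of the returned groups here is first-seen order, which is an assumption of this sketch rather than documented library behavior.

```python
from collections import defaultdict


def group_by(iterable, keyfunc=lambda x: x):
    # Collect items under their key; keys must be hashable.
    groups = defaultdict(list)
    for item in iterable:
        groups[keyfunc(item)].append(item)
    return list(groups.items())


grouped = dict(group_by([1, 2, 3, 4], lambda x: x % 2))
```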
Returns a collections.Counter of values
Example:
stream(['a', 'b', 'a', 'b', 'c', 'd']).countByValue() == {'a': 2, 'b': 2, 'c': 1, 'd': 1}
Returns stream of distinct values. Values must be hashable.
stream(['a', 'b', 'a', 'b', 'c', 'd']).distinct() == {'a', 'b', 'c', 'd'}
Takes the same arguments as the builtin reduce() function.
Returns an sset() instance.
Returns an slist() instance.
Returns an sdict() instance.
Takes the same arguments as the builtin sorted().
Returns the length of the stream. Use carefully on infinite streams.
Returns a string joined by f. Provides the same functionality as the str.join() builtin method.
If f is a basestring, it is used to join the stream; otherwise f should be a callable that returns the string to be used for the join.
Identical to join(f).
Returns the first n elements of the stream.
Returns the first element of the stream.
Same behavior as itertools.izip().
Throttles processing to at most max_req elements every 'interval' seconds.
**unique(predicate=_IDENTITY_FUNC)** Returns a stream of unique (according to predicate) elements, in the same order as they appear in the original stream.
The items returned by predicate should be hashable and comparable.
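Order-preserving uniqueness by a key can be sketched with a set of already-seen keys. The generator below is a hypothetical illustration of the idea, not the library's code.

```python
def unique(iterable, keyfunc=lambda x: x):
    # Yield the first item seen for each distinct key, keeping input order.
    seen = set()
    for item in iterable:
        key = keyfunc(item)
        if key not in seen:
            seen.add(key)
            yield item


fruits = ['apple', 'avocado', 'banana', 'blueberry', 'cherry']
first_per_letter = list(unique(fruits, lambda word: word[0]))
```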
calculates the Shannon entropy of the values from stream
Calculates the population standard deviation.
returns the arithmetical mean of the values
returns the sum of elements from stream
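For reference, these statistics can be computed with the standard library as follows. The logarithm base of the library's entropy is not stated here, so base 2 is an assumption of this sketch, and `shannon_entropy` is a hypothetical helper.

```python
import math
from collections import Counter
from statistics import mean, pstdev


def shannon_entropy(values):
    # Entropy in bits (base 2 is an assumption) of the value frequencies.
    counts = Counter(values)
    total = sum(counts.values())
    return -sum((c / total) * math.log2(c / total) for c in counts.values())


data = [2, 4, 4, 4, 5, 5, 7, 9]
data_mean = mean(data)      # arithmetic mean
data_pstdev = pstdev(data)  # population standard deviation
coin_entropy = shannon_entropy(['a', 'b', 'a', 'b'])
```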
**min(key=_IDENTITY_FUNC)** Same functionality as the builtin min() function.
**min_default(default, key=_IDENTITY_FUNC)** Same functionality as min(), but returns :default: when called on an empty stream.
Same functionality as the builtin max().
**maxes(key=_IDENTITY_FUNC)** Returns a stream of the max values from the stream.
**mins(key=_IDENTITY_FUNC)** Returns a stream of the min values from the stream.
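One reading of maxes and mins is "every element that ties for the extreme key". The sketch below illustrates that reading in plain Python; it is an assumption about the intended semantics, and both helpers are hypothetical.

```python
def maxes(iterable, key=lambda x: x):
    # Return every element whose key equals the maximum key.
    items = list(iterable)
    if not items:
        return []
    best = max(key(item) for item in items)
    return [item for item in items if key(item) == best]


def mins(iterable, key=lambda x: x):
    # Return every element whose key equals the minimum key.
    items = list(iterable)
    if not items:
        return []
    best = min(key(item) for item in items)
    return [item for item in items if key(item) == best]


longest = maxes(['aa', 'b', 'cc'], key=len)
shortest = mins(['aa', 'b', 'cc'], key=len)
```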
Inherits streams.stream and the built-in list class, and keeps a list in memory, allowing faster index access.
Inherits streams.stream and the built-in set class, and keeps the whole set of values in memory.
Inherits streams.stream and the built-in dict, and keeps the dict object in memory.
Inherits streams.sdict and adds the functionality of collections.defaultdict from the stdlib.
Thread-safe time throttler that can be attached on a stream to limit the number of calls per time interval. Example:
> from pyxtension.throttler import Throttler
> throttler = Throttler(5, 10)
> stream(range(100)).map(throttler.throttle).map(print).to_list()
This throttles the stream to at most 5 calls every 10 seconds.
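To make the idea concrete, here is a minimal sliding-window throttler built on the standard library. `SimpleThrottler` is a hypothetical class written for this illustration and is not the library's Throttler; it holds its lock while sleeping, which keeps the sketch short but serializes callers.

```python
import threading
import time
from collections import deque


class SimpleThrottler:
    """Hypothetical throttler: at most max_req calls per period seconds."""

    def __init__(self, max_req, period):
        self.max_req = max_req
        self.period = period
        self._stamps = deque()  # start times of recent calls
        self._lock = threading.Lock()

    def throttle(self, item=None):
        with self._lock:
            now = time.monotonic()
            # Forget calls that have left the time window.
            while self._stamps and now - self._stamps[0] >= self.period:
                self._stamps.popleft()
            if len(self._stamps) >= self.max_req:
                # Wait until the oldest call leaves the window.
                time.sleep(self.period - (now - self._stamps[0]))
                self._stamps.popleft()
            self._stamps.append(time.monotonic())
        return item


throttler = SimpleThrottler(2, 0.1)
start = time.monotonic()
results = [throttler.throttle(i) for i in range(4)]
elapsed = time.monotonic() - start
```

With a limit of 2 calls per 0.1 seconds, the third call has to wait for the window to move, so the four calls cannot finish instantly.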
Json is a module that provides mapping objects that allow their elements to be accessed both as keys and as attributes:
> from pyxtension.Json import Json
> a = Json({'foo': 'bar'})
> a.foo
'bar'
> a['foo']
'bar'
Attribute access makes it easy to create convenient, hierarchical settings objects:
with open('settings.yaml') as fileobj:
    settings = Json(yaml.safe_load(fileobj))
cursor = connect(**settings.db.credentials).cursor()
cursor.execute("SELECT column FROM table;")
The module comes with two classes, Json and JsonList.
Json is similar to the native dict: it extends it and is a mutable mapping that allows creating, accessing, and deleting key-value pairs as attributes.
JsonList is similar to the native list: it extends it and also converts the dict objects it contains into Json instances.
> Json('{"key1": "val1", "lst1": [1,2] }')
{u'key1': u'val1', u'lst1': [1, 2]}
> Json( ('key1','val1'), ('lst1', [1,2]) )
{'key1': 'val1', 'lst1': [1, 2]}
# keep in mind that you should provide at least two tuples with key-value pairs
> Json( [('key1','val1'), ('lst1', [1,2])] )
{'key1': 'val1', 'lst1': [1, 2]}
> Json({'key1': 'val1', 'lst1': [1, 2]})
{'key1': 'val1', 'lst1': [1, 2]}
> json = Json({'key1': 'val1', 'lst1': [1, 2]})
> json.toOrig()
{'key1': 'val1', 'lst1': [1, 2]}
Any key can be used as an attribute as long as:
- The key represents a valid attribute (i.e., it is a string comprised only of alphanumeric characters and underscores that doesn't start with a number)
- The key does not shadow a class attribute (e.g., get).
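The two rules above can be approximated with a small check. `is_attribute_key` is a hypothetical helper, not part of the library, and `str.isidentifier()` is only an approximation of the first rule: it also accepts non-ASCII letters and reserved words such as `class`.

```python
def is_attribute_key(key):
    # Rule 1: a string that looks like a Python identifier.
    # Rule 2: it must not shadow an attribute of the dict class (e.g. get).
    return (isinstance(key, str)
            and key.isidentifier()
            and not hasattr(dict, key))


checks = {key: is_attribute_key(key) for key in ('foo', '1abc', 'get', 'two words')}
```

Keys that fail the check remain reachable through item access, for example `json['get']`.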
A dict accessed as an attribute is automatically converted to a Json object. This allows you to recursively access keys:
> attr = Json({'foo': {'bar': 'baz'}})
> attr.foo.bar
'baz'
Sequence types that aren't bytes, str, or unicode (e.g., lists, tuples) will automatically be converted to tuples, with any mappings converted to Json:
> attr = Json({'foo': [{'bar': 'baz'}, {'bar': 'qux'}]})
> for sub_attr in attr.foo:
>     print(sub_attr.bar)
baz
qux
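To get this recursive functionality for keys that cannot be used as attributes, use item access on the Json object:
> json = Json({1: {'two': 3}})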
> json[1].two
3
JsonList usage examples:
> json = Json('{"lst":[1,2,3]}')
> type(json.lst)
<class 'pyxtension.Json.JsonList'>
> json = Json('{"1":[1,2]}')
> json["1"][1]
2
Assignment as keys will still work:
.. code:: python

    > json = Json({'foo': {'bar': 'baz'}})
    > json['foo']['bar'] = 'baz'
    > json.foo
    {'bar': 'baz'}