Skip to content

Commit

Permalink
Merge branch 'release/v0.1.4'
Browse files Browse the repository at this point in the history
  • Loading branch information
singulared committed Feb 3, 2017
2 parents 6c1066b + 43c16d0 commit 02814dd
Show file tree
Hide file tree
Showing 11 changed files with 513 additions and 9 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ before_install:
python:
- '3.5.0'
- '3.5.1'
- '3.5.2'
# - '3.6.0'

install:
- python setup.py develop
Expand Down
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
## 0.1.4 (2017-02-03)

Add:

- Initial MapReduce support
- initial Secondary Indexes 2i
- Add hostlist initial support

## 0.1.3 (2016-10-28)

Update:
Expand Down
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Operations timeout No
Security No
Riak Search No
MapReduce No
Tested python versions `3.5.0, 3.5.1 <travis_>`_
Tested python versions `3.5.0, 3.5.1, 3.5.2 <travis_>`_
Tested Riak versions `2.1.3, 2.1.4 <travis_>`_
================================ ==============================

Expand Down
8 changes: 6 additions & 2 deletions aioriak/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
__version__ = '0.0.1'

from .client import RiakClient # noqa
from .riak_object import RiakObject # noqa
from .client import RiakClient
from .riak_object import RiakObject
from .mapreduce import RiakMapReduce


__all__ = ('RiakClient', 'RiakObject', 'RiakMapReduce')
13 changes: 13 additions & 0 deletions aioriak/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,19 @@ async def delete(self, key, **kwargs):
'''
return await (await self.new(key)).delete(**kwargs)

async def get_index(self, index, startkey, endkey=None,
return_terms=None, max_results=None,
continuation=None, timeout=None, term_regex=None):
"""
Queries a secondary index over objects in this bucket,
returning keys or index/key pairs.
See :meth:`RiakClient.get_index()
<aioriak.client.RiakClient.get_index>` for more details.
"""
return await self._client.get_index(
self, index, startkey, endkey, return_terms, max_results,
continuation, timeout, term_regex)

def __repr__(self):
if self.bucket_type.is_default():
return '<Bucket {}>'.format(self.name)
Expand Down
88 changes: 86 additions & 2 deletions aioriak/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import json
import random
from weakref import WeakValueDictionary
from .transport import create_transport
from .bucket import BucketType, Bucket
Expand Down Expand Up @@ -56,7 +57,10 @@ class RiakClient:
or by using the methods on related objects.
'''
def __init__(self, host='localhost', port=8087, loop=None):
self._host = host
if isinstance(host, (list, tuple, set)):
self._host = random.choice(host)
else:
self._host = host
self._port = port
self._loop = loop
self._bucket_types = WeakValueDictionary()
Expand Down Expand Up @@ -153,7 +157,7 @@ async def go():
loop.run_until_complete(go())
:param host: Hostname or ip address of Riak instance
:type host: str
:type host: str, list, tuple
:param port: Port of riak instance
:type port: int
:param loop: asyncio event loop
Expand Down Expand Up @@ -393,3 +397,83 @@ async def update_datatype(self, datatype, **params):
'''

return await self._transport.update_datatype(datatype, **params)

async def get_index(self, bucket, index, startkey, *args, **kwargs):
"""
Queries a secondary index, returning matching keys.
:param bucket: the bucket whose index will be queried
:type bucket: :class:`~aioriak.bucket.Bucket`
:param index: the index to query
:type index: str
:param startkey: the sole key to query, or beginning of the query range
:type startkey: str | int
:param endkey: the end of the query range (optional if equality)
:type endkey: str | int
:param return_terms: whether to include the secondary index value
:type return_terms: bool
:param max_results: the maximum number of results to return (page size)
:type max_results: int
:param continuation: the opaque continuation returned from a
previous paginated request
:type continuation: str
:param timeout: a timeout value in milliseconds, or 'infinity'
:type timeout: int | str
:param term_regex: a regular expression used to filter index terms
:type term_regex: str
:rtype: list of keys or list of pairs (index_value, key)
"""
return await self._transport.get_index(bucket, index, startkey, *args,
**kwargs)

async def mapred(self, inputs, query, timeout=None):
"""
Executes a MapReduce query.
Example::
client = await RiakClient.create()
result = await client.mapred(
{"bucket": ["bucket_type", "bucket"]},
[{"map": {
"language": "erlang",
"module": "mr_example",
"function": "get_keys"}])
:param inputs: the input list/structure
:type inputs: list[str] | dict
:param query: the list of query phases
:type query: list[dict]
:param timeout: the query timeout
:type timeout: int | None
:rtype: mixed
"""
return await self._transport.mapred(inputs, query, timeout)

async def stream_mapred(self, inputs, query, timeout=None):
"""
Streams a MapReduce query as (phase, data) pairs.
Returns async iterator.
Example::
client = await RiakClient.create()
stream = await client.mapred(
{"bucket": ["bucket_type", "bucket"]},
[{"map": {
"language": "erlang",
"module": "mr_example",
"function": "get_keys"}])
async for phase, result in stream:
print(phase, result)
:param inputs: the input list/structure
:type inputs: list[str] | dict
:param query: the list of query phases
:type query: list[dict]
:param timeout: the query timeout
:type timeout: integer, None
:rtype: iterator
"""

return await self._transport.stream_mapred(inputs, query, timeout)
70 changes: 70 additions & 0 deletions aioriak/mapreduce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from riak.mapreduce import RiakMapReduce as _RiakMapReduce, RiakLinkPhase


class RiakMapReduce(_RiakMapReduce):
def __init__(self, client):
super().__init__(client)

async def run(self, timeout=None):
"""
Submit the map/reduce operation to Riak. Non-blocking wait for result.
Returns a list of results,
or a list of links if the last phase is a link phase.
Example::
client = await RiakClient.create()
mr = RiakMapReduce(client)
mr.add_bucket_key_data('bucket', 'key')
mr.map(['mr_example', 'get_keys'])
result = await mr.run()
:param timeout: Timeout in milliseconds
:type timeout: integer, None
:rtype: list
"""
query, link_results_flag = self._normalize_query()

result = await self._client.mapred(self._inputs, query, timeout)

# If the last phase is NOT a link phase, then return the result.
if not (link_results_flag or
isinstance(self._phases[-1], RiakLinkPhase)):
return result

# If there are no results, then return an empty list.
if result is None:
return []

# Otherwise, if the last phase IS a link phase, then convert the
# results to link tuples.
acc = []
for r in result:
if len(r) == 2:
link = (r[0], r[1], None)
elif len(r) == 3:
link = (r[0], r[1], r[2])
else:
raise ValueError('Invalid format for Link phase result')
acc.append(link)

return acc

async def stream(self, timeout=None):
"""
Streams the MapReduce query (returns an async iterator).
Example::
client = await RiakClient.create()
mr = RiakMapReduce(client)
mr.add_bucket_key_data('bucket', 'key')
mr.map(['mr_example', 'get_keys'])
async for phase, result in (await mr.stream()):
print(phase, result)
:param timeout: Timeout in milliseconds
:type timeout: integer
:rtype: async iterator that yields (phase_num, data) tuples
"""
query, link_results_flag = self._normalize_query()

return await self._client.stream_mapred(self._inputs, query, timeout)
8 changes: 8 additions & 0 deletions aioriak/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,11 @@ async def go():
self.assertEqual(non_zero_client_id,
(await self.client.get_client_id()))
self.loop.run_until_complete(go())

def test_list_of_hosts(self):
async def go():
hosts = [self.client._host, self.client._host]
client = await self.async_create_client(hosts)
self.assertTrue((await client.ping()))
self.assertIn(client._host, hosts)
self.loop.run_until_complete(go())
Loading

0 comments on commit 02814dd

Please sign in to comment.