Skip to content

Commit

Permalink
Merge branch 'release/0.1.8'
Browse files Browse the repository at this point in the history
  • Loading branch information
singulared committed May 9, 2017
2 parents ae4c761 + 3830035 commit 8b07ffe
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 12 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 0.1.8 (2017-05-09)

Add:

- Quorum and timeout arguments into base operations.

## 0.1.7 (2017-02-13)

Fix:
Expand Down
27 changes: 21 additions & 6 deletions aioriak/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,17 +375,32 @@ async def get(self, robj):

return await self._transport.get(robj)

async def put(self, robj, return_body):
async def put(self, robj, w=None, dw=None, pw=None, return_body=None,
if_none_match=None, timeout=None):
'''
Stores an object in the Riak cluster.
:param return_body: whether to return the resulting object
after the write
:type return_body: boolean
:param robj: the object to store
:type robj: RiakObject
'''
return await self._transport.put(robj, return_body=return_body)
:param w: the write quorum
:type w: integer, string, None
:param dw: the durable write quorum
:type dw: integer, string, None
:param pw: the primary write quorum
:type pw: integer, string, None
:param return_body: whether to return the resulting object
after the write
:type return_body: boolean
:param if_none_match: whether to fail the write if the object
exists
:type if_none_match: boolean
:param timeout: a timeout value in milliseconds
:type timeout: int
'''
return await self._transport.put(robj, w=w, dw=dw, pw=pw,
return_body=return_body,
if_none_match=if_none_match,
timeout=timeout)

async def delete(self, robj):
'''
Expand Down
8 changes: 5 additions & 3 deletions aioriak/riak_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ async def reload(self):
await self.client.get(self)
return self

async def store(self, return_body=True):
async def store(self, w=None, dw=None, pw=None, return_body=True,
if_none_match=False, timeout=None):
'''
Store the object in Riak. When this operation completes, the
object could contain new metadata and possibly new data if Riak
Expand All @@ -102,8 +103,9 @@ async def store(self, return_body=True):
raise ConflictError("Attempting to store an invalid object, "
"resolve the siblings first")

await self.client.put(self, return_body)

await self.client.put(self, w=w, dw=dw, pw=pw,
return_body=return_body,
if_none_match=if_none_match, timeout=timeout)
return self

async def delete(self):
Expand Down
37 changes: 35 additions & 2 deletions aioriak/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@
logger = logging.getLogger('aioriak.transport')


def _validate_timeout(timeout):
"""
Raises an exception if the given timeout is an invalid value.
"""
if not (timeout is None or
(type(timeout) == int and timeout > 0)): # noqa
raise ValueError("timeout must be a positive integer")


async def create_transport(host='localhost', port=8087, loop=None):
reader, writer = await asyncio.open_connection(
host, port, loop=loop)
Expand Down Expand Up @@ -870,13 +879,26 @@ def _decode_link(self, link):

return (bucket, key, tag)

async def get(self, robj):
async def get(self, robj, r=None, pr=None, timeout=None, basic_quorum=None,
notfound_ok=None):
'''
Serialize get request and deserialize response
'''
bucket = robj.bucket

req = riak_kv_pb2.RpbGetReq()
if r:
req.r = self._encode_quorum(r)
if pr:
req.pr = self._encode_quorum(pr)
if basic_quorum is not None:
req.basic_quorum = basic_quorum
if notfound_ok is not None:
req.notfound_ok = notfound_ok
if timeout:
req.timeout = timeout
req.deletedvclock = True

req.bucket = bucket.name.encode()
self._add_bucket_type(req, bucket.bucket_type)
req.key = robj.key.encode()
Expand Down Expand Up @@ -917,13 +939,24 @@ async def get_index(self, bucket, index, startkey, endkey=None,
else:
return results, None

async def put(self, robj, return_body=True):
async def put(self, robj, w=None, dw=None, pw=None, return_body=True,
if_none_match=False, timeout=None):
bucket = robj.bucket

req = riak_kv_pb2.RpbPutReq()
if w:
req.w = self._encode_quorum(w)
if dw:
req.dw = self._encode_quorum(dw)
if pw:
req.pw = self._encode_quorum(pw)

if return_body:
req.return_body = 1
if if_none_match:
req.if_none_match = 1
if timeout:
req.timeout = timeout

req.bucket = str_to_bytes(bucket.name)
self._add_bucket_type(req, bucket.bucket_type)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def read(*parts):

setup(
name='aioriak',
version='0.1.7',
version='0.1.8',
description='Async implementation of Riak DB python client',
long_description=read("README.rst"),
author='Makc Belousov',
Expand Down

0 comments on commit 8b07ffe

Please sign in to comment.