diff --git a/asyncio_redis/__init__.py b/asyncio_redis/__init__.py index 1f46792..f08fcc4 100644 --- a/asyncio_redis/__init__.py +++ b/asyncio_redis/__init__.py @@ -5,3 +5,4 @@ from .exceptions import * from .pool import * from .protocol import * +from .sentinel import * diff --git a/asyncio_redis/protocol.py b/asyncio_redis/protocol.py index 5a95ec7..9801e58 100644 --- a/asyncio_redis/protocol.py +++ b/asyncio_redis/protocol.py @@ -1804,6 +1804,16 @@ def start_subscribe(self, *a) -> 'Subscription': self._subscription = subscription return subscription + @_query_command + def sentintelslaves(self, key:NativeType) -> ListReply: + """ Returns list of slaves for specified group """ + return self._query(b'sentinel', b'slaves', self.encode_from_native(key)) + + @_query_command + def sentintelmaster(self, key:NativeType) -> DictReply: + """ Returns info for the specified master """ + return self._query(b'sentinel', b'master', self.encode_from_native(key)) + @_command def _subscribe(self, channels:ListOf(NativeType)) -> NoneType: """ Listen for messages published to the given channels """ diff --git a/asyncio_redis/sentinel.py b/asyncio_redis/sentinel.py new file mode 100644 index 0000000..74a6203 --- /dev/null +++ b/asyncio_redis/sentinel.py @@ -0,0 +1,109 @@ +import asyncio +from random import randint + +from .pool import Pool +from .protocol import RedisProtocol +from .replies import DictReply +from .exceptions import Error + +__all__ = ('Sentinel',) + + +class Sentinel: + """ + Provides connection to redis sentinel. + Returns master and slave instance. + """ + @classmethod + @asyncio.coroutine + def connect(cls, host='localhost', port=6379, *, password=None, db=0, + encoder=None, poolsize=1, auto_reconnect=True, loop=None, + protocol_class=RedisProtocol): + """ + Create a new connection to sentinel via Pool. + + :param host: Address, either host or unix domain socket path + :type host: str + :param port: TCP port. If port is 0 then host assumed to be unix socket path + :type port: int + :param password: Redis database password + :type password: bytes + :param db: Redis database + :type db: int + :param encoder: Encoder to use for encoding to or decoding from redis bytes to a native type. + :type encoder: :class:`~asyncio_redis.encoders.BaseEncoder` instance. + :param poolsize: The number of parallel connections. + :type poolsize: int + :param auto_reconnect: Enable auto reconnect + :type auto_reconnect: bool + :param loop: (optional) asyncio event loop. + :type protocol_class: :class:`~asyncio_redis.RedisProtocol` + :param protocol_class: (optional) redis protocol implementation + """ + self = cls() + self._active_slave = None + self._sentinel_pool = yield from Pool.create( + host=host, port=port, password=password, db=db, encoder=encoder, + poolsize=poolsize, auto_reconnect=auto_reconnect, loop=loop, + protocol_class=protocol_class + ) + + return self + + @asyncio.coroutine + def master(self, key): + """ + Returns a new connection to master. + + :param key: master name + :return: instance of Pool + """ + master = yield from self._sentinel_pool.sentintelmaster(key) + master_params = yield from master.asdict() + connection = yield from Pool.create( + host=master_params['ip'], port=int(master_params['port']) + ) + + return connection + + @asyncio.coroutine + def rotate_slaves(self, slaves): + """ + Randomly choose slave from list. + + :param slaves: ListReply + :return: DictReply + """ + count = slaves._result.count + if count > 0: + # Round-robin slave balancer + if self._active_slave is None: + index = randint(0, count - 1) + else: + index = (self._active_slave + 1) % count + + self._active_slave = index + slaves_list = yield from slaves.aslist() + slave = yield from DictReply(slaves_list[index]).asdict() + + return slave + else: + raise Error('Not found slaves') + + @asyncio.coroutine + def slave(self, key): + """ + Returns a new connection to slave. + + :param key: master name + :return: instance of Pool + """ + slaves = yield from self._sentinel_pool.sentintelslaves(key) + slave = yield from self.rotate_slaves(slaves) + connection = yield from Pool.create( + host=slave['ip'], port=int(slave['port']) + ) + + return connection + +