Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sentinel support #97

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions asyncio_redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
from .exceptions import *
from .pool import *
from .protocol import *
from .sentinel import *
10 changes: 10 additions & 0 deletions asyncio_redis/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -1804,6 +1804,16 @@ def start_subscribe(self, *a) -> 'Subscription':
self._subscription = subscription
return subscription

@_query_command
def sentintelslaves(self, key:NativeType) -> ListReply:
""" Returns list of slaves for specified group """
return self._query(b'sentinel', b'slaves', self.encode_from_native(key))

@_query_command
def sentintelmaster(self, key:NativeType) -> DictReply:
""" Returns info for the specified master """
return self._query(b'sentinel', b'master', self.encode_from_native(key))

@_command
def _subscribe(self, channels:ListOf(NativeType)) -> NoneType:
""" Listen for messages published to the given channels """
Expand Down
109 changes: 109 additions & 0 deletions asyncio_redis/sentinel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import asyncio
from random import randint

from .pool import Pool
from .protocol import RedisProtocol
from .replies import DictReply
from .exceptions import Error

__all__ = ('Sentinel',)


class Sentinel:
"""
Provides connection to redis sentinel.
Returns master and slave instance.
"""
@classmethod
@asyncio.coroutine
def connect(cls, host='localhost', port=6379, *, password=None, db=0,
encoder=None, poolsize=1, auto_reconnect=True, loop=None,
protocol_class=RedisProtocol):
"""
Create a new connection to sentinel via Pool.

:param host: Address, either host or unix domain socket path
:type host: str
:param port: TCP port. If port is 0 then host assumed to be unix socket path
:type port: int
:param password: Redis database password
:type password: bytes
:param db: Redis database
:type db: int
:param encoder: Encoder to use for encoding to or decoding from redis bytes to a native type.
:type encoder: :class:`~asyncio_redis.encoders.BaseEncoder` instance.
:param poolsize: The number of parallel connections.
:type poolsize: int
:param auto_reconnect: Enable auto reconnect
:type auto_reconnect: bool
:param loop: (optional) asyncio event loop.
:type protocol_class: :class:`~asyncio_redis.RedisProtocol`
:param protocol_class: (optional) redis protocol implementation
"""
self = cls()
self._active_slave = None
self._sentinel_pool = yield from Pool.create(
host=host, port=port, password=password, db=db, encoder=encoder,
poolsize=poolsize, auto_reconnect=auto_reconnect, loop=loop,
protocol_class=protocol_class
)

return self

@asyncio.coroutine
def master(self, key):
"""
Returns a new connection to master.

:param key: master name
:return: instance of Pool
"""
master = yield from self._sentinel_pool.sentintelmaster(key)
master_params = yield from master.asdict()
connection = yield from Pool.create(
host=master_params['ip'], port=int(master_params['port'])
)

return connection

@asyncio.coroutine
def rotate_slaves(self, slaves):
"""
Randomly choose slave from list.

:param slaves: ListReply
:return: DictReply
"""
count = slaves._result.count
if count > 0:
# Round-robin slave balancer
if self._active_slave is None:
index = randint(0, count - 1)
else:
index = (self._active_slave + 1) % count

self._active_slave = index
slaves_list = yield from slaves.aslist()
slave = yield from DictReply(slaves_list[index]).asdict()

return slave
else:
raise Error('Not found slaves')

@asyncio.coroutine
def slave(self, key):
"""
Returns a new connection to slave.

:param key: master name
:return: instance of Pool
"""
slaves = yield from self._sentinel_pool.sentintelslaves(key)
slave = yield from self.rotate_slaves(slaves)
connection = yield from Pool.create(
host=slave['ip'], port=int(slave['port'])
)

return connection