Skip to content

Commit a39dd82

Browse files
authored
Merge branch 'master' into ps_change_ssl_check_hostname_default_value_to_true
2 parents f092532 + 8faac60 commit a39dd82

File tree

11 files changed

+283
-153
lines changed

11 files changed

+283
-153
lines changed

docs/clustering.rst

+7-4
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,8 @@ When a ClusterPubSub instance is created without specifying a node, a
187187
single node will be transparently chosen for the pubsub connection on
188188
the first command execution. The node will be determined by: 1. Hashing
189189
the channel name in the request to find its keyslot 2. Selecting a node
190-
that handles the keyslot: If read_from_replicas is set to true, a
191-
replica can be selected.
190+
that handles the keyslot: If read_from_replicas is set to true or
191+
load_balancing_strategy is provided, a replica can be selected.
192192
193193
Known PubSub Limitations
194194
------------------------
@@ -216,9 +216,12 @@ By default, Redis Cluster always returns MOVE redirection response on
216216
accessing a replica node. You can overcome this limitation and scale
217217
read commands by triggering READONLY mode.
218218
219-
To enable READONLY mode pass read_from_replicas=True to RedisCluster
220-
constructor. When set to true, read commands will be assigned between
219+
To enable READONLY mode pass read_from_replicas=True or define
220+
a load_balancing_strategy to RedisCluster constructor.
221+
When read_from_replicas is set to true read commands will be assigned between
221222
the primary and its replications in a Round-Robin manner.
223+
With load_balancing_strategy you can define a custom strategy for
224+
assigning read commands to the replicas and primary nodes.
222225
223226
READONLY mode can be set at runtime by calling the readonly() method
224227
with target_nodes=‘replicas’, and read-write access can be restored by

docs/retry.rst

+23-20
Original file line numberDiff line numberDiff line change
@@ -13,25 +13,25 @@ Retry in Redis Standalone
1313
>>> from redis.client import Redis
1414
>>> from redis.exceptions import (
1515
>>> BusyLoadingError,
16-
>>> ConnectionError,
17-
>>> TimeoutError
16+
>>> RedisError,
1817
>>> )
1918
>>>
2019
>>> # Run 3 retries with exponential backoff strategy
2120
>>> retry = Retry(ExponentialBackoff(), 3)
22-
>>> # Redis client with retries on custom errors
23-
>>> r = Redis(host='localhost', port=6379, retry=retry, retry_on_error=[BusyLoadingError, ConnectionError, TimeoutError])
24-
>>> # Redis client with retries on TimeoutError only
25-
>>> r_only_timeout = Redis(host='localhost', port=6379, retry=retry, retry_on_timeout=True)
21+
>>> # Redis client with retries on custom errors in addition to the errors
22+
>>> # that are already retried by default
23+
>>> r = Redis(host='localhost', port=6379, retry=retry, retry_on_error=[BusyLoadingError, RedisError])
2624

27-
As you can see from the example above, Redis client supports 3 parameters to configure the retry behaviour:
25+
As you can see from the example above, Redis client supports 2 parameters to configure the retry behaviour:
2826

2927
* ``retry``: :class:`~.Retry` instance with a :ref:`backoff-label` strategy and the max number of retries
30-
* ``retry_on_error``: list of :ref:`exceptions-label` to retry on
31-
* ``retry_on_timeout``: if ``True``, retry on :class:`~.TimeoutError` only
28+
* The :class:`~.Retry` instance has default set of :ref:`exceptions-label` to retry on,
29+
which can be overridden by passing a tuple with :ref:`exceptions-label` to the ``supported_errors`` parameter.
30+
* ``retry_on_error``: list of additional :ref:`exceptions-label` to retry on
3231

33-
If either ``retry_on_error`` or ``retry_on_timeout`` are passed and no ``retry`` is given,
34-
by default it uses a ``Retry(NoBackoff(), 1)`` (meaning 1 retry right after the first failure).
32+
33+
If no ``retry`` is provided, a default one is created with :class:`~.ExponentialWithJitterBackoff` as backoff strategy
34+
and 3 retries.
3535

3636

3737
Retry in Redis Cluster
@@ -44,27 +44,30 @@ Retry in Redis Cluster
4444
>>> # Run 3 retries with exponential backoff strategy
4545
>>> retry = Retry(ExponentialBackoff(), 3)
4646
>>> # Redis Cluster client with retries
47-
>>> rc = RedisCluster(host='localhost', port=6379, retry=retry, cluster_error_retry_attempts=2)
47+
>>> rc = RedisCluster(host='localhost', port=6379, retry=retry)
4848

4949
Retry behaviour in Redis Cluster is a little bit different from Standalone:
5050

51-
* ``retry``: :class:`~.Retry` instance with a :ref:`backoff-label` strategy and the max number of retries, default value is ``Retry(NoBackoff(), 0)``
52-
* ``cluster_error_retry_attempts``: number of times to retry before raising an error when :class:`~.TimeoutError` or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered, default value is ``3``
51+
* ``retry``: :class:`~.Retry` instance with a :ref:`backoff-label` strategy and the max number of retries, default value is ``Retry(ExponentialWithJitterBackoff(base=1, cap=10), cluster_error_retry_attempts)``
52+
* ``cluster_error_retry_attempts``: number of times to retry before raising an error when :class:`~.TimeoutError`, :class:`~.ConnectionError`, :class:`~.ClusterDownError` or :class:`~.SlotNotCoveredError` are encountered, default value is ``3``
53+
* This argument is deprecated - it is used to initialize the number of retries for the retry object,
54+
only in the case when the ``retry`` object is not provided.
55+
When the ``retry`` argument is provided, the ``cluster_error_retry_attempts`` argument is ignored!
56+
57+
* The retry object is not yet fully utilized in the cluster client.
58+
The retry object is used only to determine the number of retries for the cluster level calls.
5359

5460
Let's consider the following example:
5561

5662
>>> from redis.backoff import ExponentialBackoff
5763
>>> from redis.retry import Retry
5864
>>> from redis.cluster import RedisCluster
5965
>>>
60-
>>> rc = RedisCluster(host='localhost', port=6379, retry=Retry(ExponentialBackoff(), 6), cluster_error_retry_attempts=1)
66+
>>> rc = RedisCluster(host='localhost', port=6379, retry=Retry(ExponentialBackoff(), 6))
6167
>>> rc.set('foo', 'bar')
6268

6369
#. the client library calculates the hash slot for key 'foo'.
6470
#. given the hash slot, it then determines which node to connect to, in order to execute the command.
6571
#. during the connection, a :class:`~.ConnectionError` is raised.
66-
#. because we set ``retry=Retry(ExponentialBackoff(), 6)``, the client tries to reconnect to the node up to 6 times, with an exponential backoff between each attempt.
67-
#. even after 6 retries, the client is still unable to connect.
68-
#. because we set ``cluster_error_retry_attempts=1``, before giving up, the client starts a cluster update, removes the failed node from the startup nodes, and re-initializes the cluster.
69-
#. after the cluster has been re-initialized, it starts a new cycle of retries, up to 6 retries, with an exponential backoff.
70-
#. if the client can connect, we're good. Otherwise, the exception is finally raised to the caller, because we've run out of attempts.
72+
#. because we set ``retry=Retry(ExponentialBackoff(), 6)``, the cluster client starts a cluster update, removes the failed node from the startup nodes, and re-initializes the cluster.
73+
#. the cluster client retries the command until it either succeeds or the max number of retries is reached.

redis/asyncio/cluster.py

+55-42
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from redis.asyncio.lock import Lock
3030
from redis.asyncio.retry import Retry
3131
from redis.auth.token import TokenInterface
32-
from redis.backoff import default_backoff
32+
from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
3333
from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis
3434
from redis.cluster import (
3535
PIPELINE_BLOCKED_COMMANDS,
@@ -143,19 +143,23 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
143143
To avoid reinitializing the cluster on moved errors, set reinitialize_steps to
144144
0.
145145
:param cluster_error_retry_attempts:
146-
| Number of times to retry before raising an error when :class:`~.TimeoutError`
147-
or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered
148-
:param connection_error_retry_attempts:
149-
| Number of times to retry before reinitializing when :class:`~.TimeoutError`
150-
or :class:`~.ConnectionError` are encountered.
151-
The default backoff strategy will be set if Retry object is not passed (see
152-
default_backoff in backoff.py). To change it, pass a custom Retry object
153-
using the "retry" keyword.
146+
| @deprecated - Please configure the 'retry' object instead
147+
In case 'retry' object is set - this argument is ignored!
148+
149+
Number of times to retry before raising an error when :class:`~.TimeoutError`,
150+
:class:`~.ConnectionError`, :class:`~.SlotNotCoveredError`
151+
or :class:`~.ClusterDownError` are encountered
152+
:param retry:
153+
| A retry object that defines the retry strategy and the number of
154+
retries for the cluster client.
155+
In current implementation for the cluster client (starting form redis-py version 6.0.0)
156+
the retry object is not yet fully utilized, instead it is used just to determine
157+
the number of retries for the cluster client.
158+
In the future releases the retry object will be used to handle the cluster client retries!
154159
:param max_connections:
155160
| Maximum number of connections per node. If there are no free connections & the
156161
maximum number of connections are already created, a
157-
:class:`~.MaxConnectionsError` is raised. This error may be retried as defined
158-
by :attr:`connection_error_retry_attempts`
162+
:class:`~.MaxConnectionsError` is raised.
159163
:param address_remap:
160164
| An optional callable which, when provided with an internal network
161165
address of a node, e.g. a `(host, port)` tuple, will return the address
@@ -211,10 +215,9 @@ def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
211215
__slots__ = (
212216
"_initialize",
213217
"_lock",
214-
"cluster_error_retry_attempts",
218+
"retry",
215219
"command_flags",
216220
"commands_parser",
217-
"connection_error_retry_attempts",
218221
"connection_kwargs",
219222
"encoder",
220223
"node_flags",
@@ -231,6 +234,13 @@ def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
231234
reason="Please configure the 'load_balancing_strategy' instead",
232235
version="5.3.0",
233236
)
237+
@deprecated_args(
238+
args_to_warn=[
239+
"cluster_error_retry_attempts",
240+
],
241+
reason="Please configure the 'retry' object instead",
242+
version="6.0.0",
243+
)
234244
def __init__(
235245
self,
236246
host: Optional[str] = None,
@@ -242,8 +252,9 @@ def __init__(
242252
load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
243253
reinitialize_steps: int = 5,
244254
cluster_error_retry_attempts: int = 3,
245-
connection_error_retry_attempts: int = 3,
246255
max_connections: int = 2**31,
256+
retry: Optional["Retry"] = None,
257+
retry_on_error: Optional[List[Type[Exception]]] = None,
247258
# Client related kwargs
248259
db: Union[str, int] = 0,
249260
path: Optional[str] = None,
@@ -263,8 +274,6 @@ def __init__(
263274
socket_keepalive: bool = False,
264275
socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
265276
socket_timeout: Optional[float] = None,
266-
retry: Optional["Retry"] = None,
267-
retry_on_error: Optional[List[Type[Exception]]] = None,
268277
# SSL related kwargs
269278
ssl: bool = False,
270279
ssl_ca_certs: Optional[str] = None,
@@ -318,7 +327,6 @@ def __init__(
318327
"socket_keepalive": socket_keepalive,
319328
"socket_keepalive_options": socket_keepalive_options,
320329
"socket_timeout": socket_timeout,
321-
"retry": retry,
322330
"protocol": protocol,
323331
}
324332

@@ -342,17 +350,15 @@ def __init__(
342350
# Call our on_connect function to configure READONLY mode
343351
kwargs["redis_connect_func"] = self.on_connect
344352

345-
self.retry = retry
346-
if retry or retry_on_error or connection_error_retry_attempts > 0:
347-
# Set a retry object for all cluster nodes
348-
self.retry = retry or Retry(
349-
default_backoff(), connection_error_retry_attempts
353+
if retry:
354+
self.retry = retry
355+
else:
356+
self.retry = Retry(
357+
backoff=ExponentialWithJitterBackoff(base=1, cap=10),
358+
retries=cluster_error_retry_attempts,
350359
)
351-
if not retry_on_error:
352-
# Default errors for retrying
353-
retry_on_error = [ConnectionError, TimeoutError]
360+
if retry_on_error:
354361
self.retry.update_supported_errors(retry_on_error)
355-
kwargs.update({"retry": self.retry})
356362

357363
kwargs["response_callbacks"] = _RedisCallbacks.copy()
358364
if kwargs.get("protocol") in ["3", 3]:
@@ -389,8 +395,6 @@ def __init__(
389395
self.read_from_replicas = read_from_replicas
390396
self.load_balancing_strategy = load_balancing_strategy
391397
self.reinitialize_steps = reinitialize_steps
392-
self.cluster_error_retry_attempts = cluster_error_retry_attempts
393-
self.connection_error_retry_attempts = connection_error_retry_attempts
394398
self.reinitialize_counter = 0
395399
self.commands_parser = AsyncCommandsParser()
396400
self.node_flags = self.__class__.NODE_FLAGS.copy()
@@ -561,15 +565,8 @@ def get_connection_kwargs(self) -> Dict[str, Optional[Any]]:
561565
"""Get the kwargs passed to :class:`~redis.asyncio.connection.Connection`."""
562566
return self.connection_kwargs
563567

564-
def get_retry(self) -> Optional["Retry"]:
565-
return self.retry
566-
567-
def set_retry(self, retry: "Retry") -> None:
568+
def set_retry(self, retry: Retry) -> None:
568569
self.retry = retry
569-
for node in self.get_nodes():
570-
node.connection_kwargs.update({"retry": retry})
571-
for conn in node._connections:
572-
conn.retry = retry
573570

574571
def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None:
575572
"""Set a custom response callback."""
@@ -688,8 +685,8 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
688685
"""
689686
Execute a raw command on the appropriate cluster node or target_nodes.
690687
691-
It will retry the command as specified by :attr:`cluster_error_retry_attempts` &
692-
then raise an exception.
688+
It will retry the command as specified by the retries property of
689+
the :attr:`retry` & then raise an exception.
693690
694691
:param args:
695692
| Raw command args
@@ -705,7 +702,7 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
705702
command = args[0]
706703
target_nodes = []
707704
target_nodes_specified = False
708-
retry_attempts = self.cluster_error_retry_attempts
705+
retry_attempts = self.retry.get_retries()
709706

710707
passed_targets = kwargs.pop("target_nodes", None)
711708
if passed_targets and not self._is_node_flag(passed_targets):
@@ -1048,7 +1045,23 @@ def acquire_connection(self) -> Connection:
10481045
return self._free.popleft()
10491046
except IndexError:
10501047
if len(self._connections) < self.max_connections:
1051-
connection = self.connection_class(**self.connection_kwargs)
1048+
# We are configuring the connection pool not to retry
1049+
# connections on lower level clients to avoid retrying
1050+
# connections to nodes that are not reachable
1051+
# and to avoid blocking the connection pool.
1052+
# The only error that will have some handling in the lower
1053+
# level clients is ConnectionError which will trigger disconnection
1054+
# of the socket.
1055+
# The retries will be handled on cluster client level
1056+
# where we will have proper handling of the cluster topology
1057+
retry = Retry(
1058+
backoff=NoBackoff(),
1059+
retries=0,
1060+
supported_errors=(ConnectionError,),
1061+
)
1062+
connection_kwargs = self.connection_kwargs.copy()
1063+
connection_kwargs["retry"] = retry
1064+
connection = self.connection_class(**connection_kwargs)
10521065
self._connections.append(connection)
10531066
return connection
10541067

@@ -1544,7 +1557,7 @@ async def execute(
15441557
"""
15451558
Execute the pipeline.
15461559
1547-
It will retry the commands as specified by :attr:`cluster_error_retry_attempts`
1560+
It will retry the commands as specified by retries specified in :attr:`retry`
15481561
& then raise an exception.
15491562
15501563
:param raise_on_error:
@@ -1560,7 +1573,7 @@ async def execute(
15601573
return []
15611574

15621575
try:
1563-
retry_attempts = self._client.cluster_error_retry_attempts
1576+
retry_attempts = self._client.retry.get_retries()
15641577
while True:
15651578
try:
15661579
if self._client._initialize:

redis/asyncio/retry.py

+12
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,18 @@ def update_supported_errors(self, specified_errors: list):
4343
set(self._supported_errors + tuple(specified_errors))
4444
)
4545

46+
def get_retries(self) -> int:
47+
"""
48+
Get the number of retries.
49+
"""
50+
return self._retries
51+
52+
def update_retries(self, value: int) -> None:
53+
"""
54+
Set the number of retries.
55+
"""
56+
self._retries = value
57+
4658
async def call_with_retry(
4759
self, do: Callable[[], Awaitable[T]], fail: Callable[[RedisError], Any]
4860
) -> T:

0 commit comments

Comments
 (0)