From d923d83eeee9e44add13b57e51073f8a336530c6 Mon Sep 17 00:00:00 2001 From: Heng Pan Date: Mon, 16 Oct 2023 16:01:30 +0100 Subject: [PATCH 01/13] cache --- src/py/flwr/common/backoff.py | 160 ++++++++++++++++++++++++++++++++++ 1 file changed, 160 insertions(+) create mode 100644 src/py/flwr/common/backoff.py diff --git a/src/py/flwr/common/backoff.py b/src/py/flwr/common/backoff.py new file mode 100644 index 000000000000..f159be9f70bb --- /dev/null +++ b/src/py/flwr/common/backoff.py @@ -0,0 +1,160 @@ +# Copyright 2023 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Functions to enchance other functions with error handling and retries.""" + + +import random +from typing import Generator, Iterable, Union, Optional, Callable, Any +import itertools + + +def exponential( + base_delay: float = 1, + multiplier: float = 2, + max_delay: Optional[int] = None, +) -> Generator[float, None, None]: + """Wait time generator for exponential backoff strategy. + + Parameters + ---------- + base_delay: float (default: 1) + Initial delay duration before the first retry. + multiplier: float (default: 2) + Factor by which the delay is multiplied after each retry. + max_delay: Optional[float] (default: None) + The maximum delay duration between two consecutive retries. 
+ """ + delay = min(base_delay, max_delay) + while True: + yield delay + delay *= multiplier + if max_delay is not None: + delay = min(delay, max_delay) + + +def interval( + interval: Union[float, Iterable[float]] = 1, +) -> Generator[float, None, None]: + """Wait time generator for specified intervals. + + Parameters + ---------- + interval: Union[float, Iterable[float]] (default: 1) + A constant value to yield or an iterable of such values. + """ + try: + iter(interval) + except TypeError: + interval = itertools.repeat(interval) + for wait_time in interval: + yield wait_time + + +def random_jitter(value: float) -> float: + """Jitter the value by a random number of milliseconds. + + This adds up to 1 second of additional time to the original value. + Prior to backoff version 1.2, this was the default jitter behavior. + + Parameters + ---------- + value : float + The original, unjittered backoff value. + """ + + return value + random.random() + + +def full_jitter(value: float) -> float: + """Jitter the value across the full range (0 to value). + + This corresponds to the "Full Jitter" algorithm specified in the + AWS blog post on the performance of various jitter algorithms. + See: http://www.awsarchitectureblog.com/2015/03/backoff.html + + Parameters + ---------- + value : float + The original, unjittered backoff value. 
+ """ + + return random.uniform(0, value) + + +class SafeInvoker + + +def safe_invoke( + wait_gen: Generator[float, None, None], + exception: Union[Exception, Iterable[Exception]], + func: Callable[[Any], Any] + *, + max_tries: Optional[int] = None, + max_time: Optional[float] = None, + jitter: Optional[Callable[[float], float]] = full_jitter, + giveup_condition: Callable[[Exception], bool] = lambda e: False, + on_success: Optional[Callable[Exception]] = None, + on_backoff: Union[_Handler, Iterable[_Handler], None] = None, + on_giveup: Union[_Handler, Iterable[_Handler], None] = None, + logger: _MaybeLogger = 'backoff', + backoff_log_level: int = logging.INFO, + giveup_log_level: int = logging.ERROR, +) -> Callable[[_CallableT], _CallableT]: + """Returns decorator for backoff and retry triggered by exception. + + Args: + wait_gen: A generator yielding successive wait times in + seconds. + exception: An exception type (or tuple of types) which triggers + backoff. + max_tries: The maximum number of attempts to make before giving + up. Once exhausted, the exception will be allowed to escape. + The default value of None means there is no limit to the + number of tries. If a callable is passed, it will be + evaluated at runtime and its return value used. + max_time: The maximum total amount of time to try for before + giving up. Once expired, the exception will be allowed to + escape. If a callable is passed, it will be + evaluated at runtime and its return value used. + jitter: A function of the value yielded by wait_gen returning + the actual time to wait. This distributes wait times + stochastically in order to avoid timing collisions across + concurrent clients. Wait times are jittered by default + using the full_jitter function. Jittering may be disabled + altogether by passing jitter=None. + giveup: Function accepting an exception instance and + returning whether or not to give up. Optional. The default + is to always continue. 
+ on_success: Callable (or iterable of callables) with a unary + signature to be called in the event of success. The + parameter is a dict containing details about the invocation. + on_backoff: Callable (or iterable of callables) with a unary + signature to be called in the event of a backoff. The + parameter is a dict containing details about the invocation. + on_giveup: Callable (or iterable of callables) with a unary + signature to be called in the event that max_tries + is exceeded. The parameter is a dict containing details + about the invocation. + raise_on_giveup: Boolean indicating whether the registered exceptions + should be raised on giveup. Defaults to `True` + logger: Name or Logger object to log to. Defaults to 'backoff'. + backoff_log_level: log level for the backoff event. Defaults to "INFO" + giveup_log_level: log level for the give up event. Defaults to "ERROR" + **wait_gen_kwargs: Any additional keyword args specified will be + passed to wait_gen when it is initialized. Any callable + args will first be evaluated and their return values passed. + This is useful for runtime configuration. + """ + ... From 06c8e5ce4762ecc20edfa69827b19dace7d9776c Mon Sep 17 00:00:00 2001 From: Heng Pan Date: Tue, 17 Oct 2023 12:16:21 +0100 Subject: [PATCH 02/13] add backoff.py --- src/py/flwr/common/backoff.py | 259 ++++++++++++++++++++++++---------- 1 file changed, 183 insertions(+), 76 deletions(-) diff --git a/src/py/flwr/common/backoff.py b/src/py/flwr/common/backoff.py index f159be9f70bb..84687b3c98a5 100644 --- a/src/py/flwr/common/backoff.py +++ b/src/py/flwr/common/backoff.py @@ -12,12 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# ============================================================================== -"""Functions to enchance other functions with error handling and retries.""" +"""`SafeInvoker` to argument other functions with error handling and retries.""" -import random -from typing import Generator, Iterable, Union, Optional, Callable, Any import itertools +import random +import time +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Any, Callable, Dict, Generator, Iterable, Optional, Tuple, Union def exponential( @@ -36,7 +39,7 @@ def exponential( max_delay: Optional[float] (default: None) The maximum delay duration between two consecutive retries. """ - delay = min(base_delay, max_delay) + delay = base_delay if max_delay is None else min(base_delay, max_delay) while True: yield delay delay *= multiplier @@ -44,7 +47,7 @@ def exponential( delay = min(delay, max_delay) -def interval( +def constant( interval: Union[float, Iterable[float]] = 1, ) -> Generator[float, None, None]: """Wait time generator for specified intervals. @@ -54,14 +57,11 @@ def interval( interval: Union[float, Iterable[float]] (default: 1) A constant value to yield or an iterable of such values. """ - try: - iter(interval) - except TypeError: + if not isinstance(interval, Iterable): interval = itertools.repeat(interval) - for wait_time in interval: - yield wait_time - - + yield from interval + + def random_jitter(value: float) -> float: """Jitter the value by a random number of milliseconds. @@ -73,7 +73,6 @@ def random_jitter(value: float) -> float: value : float The original, unjittered backoff value. """ - return value + random.random() @@ -89,72 +88,180 @@ def full_jitter(value: float) -> float: value : float The original, unjittered backoff value. 
""" - return random.uniform(0, value) -class SafeInvoker - +@dataclass +class Details: + """Details for event handlers in SafeInvoker.""" -def safe_invoke( - wait_gen: Generator[float, None, None], - exception: Union[Exception, Iterable[Exception]], func: Callable[[Any], Any] - *, - max_tries: Optional[int] = None, - max_time: Optional[float] = None, - jitter: Optional[Callable[[float], float]] = full_jitter, - giveup_condition: Callable[[Exception], bool] = lambda e: False, - on_success: Optional[Callable[Exception]] = None, - on_backoff: Union[_Handler, Iterable[_Handler], None] = None, - on_giveup: Union[_Handler, Iterable[_Handler], None] = None, - logger: _MaybeLogger = 'backoff', - backoff_log_level: int = logging.INFO, - giveup_log_level: int = logging.ERROR, -) -> Callable[[_CallableT], _CallableT]: - """Returns decorator for backoff and retry triggered by exception. - - Args: - wait_gen: A generator yielding successive wait times in - seconds. - exception: An exception type (or tuple of types) which triggers - backoff. - max_tries: The maximum number of attempts to make before giving - up. Once exhausted, the exception will be allowed to escape. - The default value of None means there is no limit to the - number of tries. If a callable is passed, it will be - evaluated at runtime and its return value used. - max_time: The maximum total amount of time to try for before - giving up. Once expired, the exception will be allowed to - escape. If a callable is passed, it will be - evaluated at runtime and its return value used. - jitter: A function of the value yielded by wait_gen returning - the actual time to wait. This distributes wait times - stochastically in order to avoid timing collisions across - concurrent clients. Wait times are jittered by default - using the full_jitter function. Jittering may be disabled - altogether by passing jitter=None. - giveup: Function accepting an exception instance and - returning whether or not to give up. Optional. 
The default - is to always continue. - on_success: Callable (or iterable of callables) with a unary - signature to be called in the event of success. The - parameter is a dict containing details about the invocation. - on_backoff: Callable (or iterable of callables) with a unary - signature to be called in the event of a backoff. The - parameter is a dict containing details about the invocation. - on_giveup: Callable (or iterable of callables) with a unary - signature to be called in the event that max_tries - is exceeded. The parameter is a dict containing details - about the invocation. - raise_on_giveup: Boolean indicating whether the registered exceptions - should be raised on giveup. Defaults to `True` - logger: Name or Logger object to log to. Defaults to 'backoff'. - backoff_log_level: log level for the backoff event. Defaults to "INFO" - giveup_log_level: log level for the give up event. Defaults to "ERROR" - **wait_gen_kwargs: Any additional keyword args specified will be - passed to wait_gen when it is initialized. Any callable - args will first be evaluated and their return values passed. - This is useful for runtime configuration. + args: Tuple[Any, ...] + kwargs: Dict[str, Any] + tries: int + elapsed: float + exception: Optional[Exception] = None + wait: Optional[float] = None + + +# pylint: disable-next=too-many-instance-attributes +class SafeInvoker: + """Wrapper class for backoff and retry triggered by exceptions. + + Parameters + ---------- + wait: Generator[float, None, None] + A generator yielding successive wait times in seconds. If the generator + is finite, the giveup event will be triggered when the generator raises + `StopIteration`. + exception: Union[Exception, Iterable[Exception]] + An exception type (or iterable of types) that triggers backoff. + max_tries: Optional[int] (default: None) + The maximum number of attempts to make before giving up. Once exhausted, + the exception will be allowed to escape. 
If set to None, there is no limit + to the number of tries. + max_time: Optional[float] (default: None) + The maximum total amount of time to try before giving up. Once this time + has expired, the exception will be allowed to escape. + jitter: Optional[Callable[[float], float]] (default: full_jitter) + A function of the value yielded by `wait_gen` returning the actual time + to wait. This function helps distribute wait times stochastically to avoid + timing collisions across concurrent clients. Wait times are jittered by + default using the `full_jitter` function. To disable jittering, pass + `jitter=None`. + giveup_condition: Optional[Callable[[Exception], bool]] (default: None) + A function accepting an exception instance, returning whether or not + to give up. If set to None, the strategy is to always continue. + on_success: Optional[Callable[[Details], None]] (default: None) + A callable to be executed in the event of success. The parameter is a + data class object detailing the invocation. + on_backoff: Optional[Callable[[Details], None]] (default: None) + A callable to be executed in the event of a backoff. The parameter is a + data class object detailing the invocation. + on_giveup: Optional[Callable[[Details], None]] (default: None) + A callable to be executed in the event that `max_tries` or `max_time` is + exceeded. The parameter is a data class object detailing the invocation. """ - ... 
+ + def __init__( + self, + wait: Generator[float, None, None], + exception: Union[Exception, Iterable[Exception]], + *, + max_tries: Optional[int] = None, + max_time: Optional[float] = None, + jitter: Optional[Callable[[float], float]] = full_jitter, + giveup_condition: Optional[Callable[[Exception], bool]] = None, + on_success: Optional[Callable[[Details], None]] = None, + on_backoff: Optional[Callable[[Details], None]] = None, + on_giveup: Optional[Callable[[Details], None]] = None, + ) -> None: + self.wait = wait + self.exception = exception + self.max_tries = max_tries + self.max_time = max_time + self.jitter = jitter + self.giveup_condition = giveup_condition + self.on_success = on_success + self.on_backoff = on_backoff + self.on_giveup = on_giveup + + def invoke( + self, + func: Callable[[Any], Any], + *args: Tuple[Any, ...], + **kwargs: Dict[str, Any], + ) -> Any: + """Safely invoke the provided function with retry mechanisms. + + This method attempts to call the given function, and in the event of + a specified exception, employs a retry mechanism that considers + wait times, jitter, maximum attempts, and maximum time. During the + retry process, various callbacks (`on_backoff`, `on_success`, and + `on_giveup`) can be triggered based on the outcome. + + Parameters + ---------- + func: Callable[[Any], Any] + The function to be invoked. + *args: Tuple[Any, ...] + Positional arguments to pass to `func`. + **kwargs: Dict[str, Any] + Keyword arguments to pass to `func`. + + Returns + ------- + Any + The result of the function invocation. + + Raises + ------ + Exception + If the number of tries exceeds `max_tries`, if the total time + exceeds `max_time`, if `wait` generator raises `StopInteration`, + or if the `giveup_condition` returns True for a raised exception. + + Notes + ----- + The time between retries is determined by the provided `wait` generator + and can optionally be jittered using the `jitter` function. 
The exact + exceptions that trigger a retry, as well as conditions to stop retries, + are also determined by the class's initialization parameters. + """ + + def try_call_event_handler( + handler: Optional[Callable[[Details], None]], details: Details + ) -> None: + if handler is not None: + handler(details) + + tries = 0 + start = datetime.now() + + while True: + tries += 1 + elapsed = timedelta.total_seconds(datetime.now() - start) + details = Details( + func=func, args=args, kwargs=kwargs, tries=tries, elapsed=elapsed + ) + + try: + ret = func(*args, **kwargs) + except self.exception as err: # type: ignore + # Check if giveup event should be triggered + max_tries_exceeded = tries == self.max_tries + max_time_exceeded = ( + self.max_time is not None and elapsed >= self.max_time + ) + + def giveup_check(_exception: Exception) -> bool: + if self.giveup_condition is None: + return False + return self.giveup_condition(_exception) + + if giveup_check(err) or max_tries_exceeded or max_time_exceeded: + # Trigger giveup event + try_call_event_handler(self.on_giveup, details) + raise + + try: + seconds = next(self.wait) + if self.jitter is not None: + seconds = self.jitter(seconds) + if self.max_time is not None: + seconds = min(seconds, self.max_time - elapsed) + details.wait = seconds + except StopIteration: + # Trigger giveup event + try_call_event_handler(self.on_giveup, details) + raise err from None + + # Trigger backoff event + try_call_event_handler(self.on_backoff, details) + + # Sleep + time.sleep(seconds) + else: + # Trigger success event + try_call_event_handler(self.on_success, details) + return ret From 7fd96f8400e5a31d2a4ed28c78fe2063b28a3d4c Mon Sep 17 00:00:00 2001 From: Heng Pan Date: Tue, 17 Oct 2023 12:37:56 +0100 Subject: [PATCH 03/13] update docstring for --- src/py/flwr/common/backoff.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/py/flwr/common/backoff.py b/src/py/flwr/common/backoff.py index 84687b3c98a5..1156751a18ca 
100644 --- a/src/py/flwr/common/backoff.py +++ b/src/py/flwr/common/backoff.py @@ -140,7 +140,9 @@ class SafeInvoker: data class object detailing the invocation. on_giveup: Optional[Callable[[Details], None]] (default: None) A callable to be executed in the event that `max_tries` or `max_time` is - exceeded. The parameter is a data class object detailing the invocation. + exceeded, `giveup_condition` returns True, or `wait` generator raises + `StopInteration`. The parameter is a data class object detailing the + invocation. """ def __init__( From 6f2670c399b61e0526c17b0b1901bdaa91152198 Mon Sep 17 00:00:00 2001 From: Heng Pan Date: Tue, 17 Oct 2023 15:57:15 +0100 Subject: [PATCH 04/13] add unit tests for backoff --- src/py/flwr/common/backoff.py | 25 +++-- src/py/flwr/common/backoff_test.py | 163 +++++++++++++++++++++++++++++ 2 files changed, 180 insertions(+), 8 deletions(-) create mode 100644 src/py/flwr/common/backoff_test.py diff --git a/src/py/flwr/common/backoff.py b/src/py/flwr/common/backoff.py index 1156751a18ca..4445a8161894 100644 --- a/src/py/flwr/common/backoff.py +++ b/src/py/flwr/common/backoff.py @@ -19,8 +19,17 @@ import random import time from dataclasses import dataclass -from datetime import datetime, timedelta -from typing import Any, Callable, Dict, Generator, Iterable, Optional, Tuple, Union +from typing import ( + Any, + Callable, + Dict, + Generator, + Iterable, + Optional, + Tuple, + Type, + Union, +) def exponential( @@ -114,7 +123,7 @@ class SafeInvoker: A generator yielding successive wait times in seconds. If the generator is finite, the giveup event will be triggered when the generator raises `StopIteration`. - exception: Union[Exception, Iterable[Exception]] + exception: Union[Type[Exception], Iterable[Type[Exception]]] An exception type (or iterable of types) that triggers backoff. max_tries: Optional[int] (default: None) The maximum number of attempts to make before giving up. 
Once exhausted, @@ -148,7 +157,7 @@ class SafeInvoker: def __init__( self, wait: Generator[float, None, None], - exception: Union[Exception, Iterable[Exception]], + exception: Union[Type[Exception], Iterable[Type[Exception]]], *, max_tries: Optional[int] = None, max_time: Optional[float] = None, @@ -170,7 +179,7 @@ def __init__( def invoke( self, - func: Callable[[Any], Any], + func: Callable[..., Any], *args: Tuple[Any, ...], **kwargs: Dict[str, Any], ) -> Any: @@ -184,7 +193,7 @@ def invoke( Parameters ---------- - func: Callable[[Any], Any] + func: Callable[..., Any] The function to be invoked. *args: Tuple[Any, ...] Positional arguments to pass to `func`. @@ -218,11 +227,11 @@ def try_call_event_handler( handler(details) tries = 0 - start = datetime.now() + start = time.time() while True: tries += 1 - elapsed = timedelta.total_seconds(datetime.now() - start) + elapsed = time.time() - start details = Details( func=func, args=args, kwargs=kwargs, tries=tries, elapsed=elapsed ) diff --git a/src/py/flwr/common/backoff_test.py b/src/py/flwr/common/backoff_test.py new file mode 100644 index 000000000000..e82699758287 --- /dev/null +++ b/src/py/flwr/common/backoff_test.py @@ -0,0 +1,163 @@ +# Copyright 2023 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================== +"""Tests for `SafeInvoker`.""" + + +from typing import Generator +from unittest.mock import MagicMock, Mock, patch + +import pytest + +from flwr.common.backoff import SafeInvoker, constant + +# Assuming SafeInvoker and related utilities have been imported here... + + +def successful_function() -> str: + """.""" + return "success" + + +def failing_function() -> None: + """.""" + raise ValueError("failed") + + +@pytest.fixture(name="mock_time") +def fixture_mock_time() -> Generator[MagicMock, None, None]: + """Mock time.time for controlled testing.""" + with patch("time.time") as mock_time: + yield mock_time + + +@pytest.fixture(name="mock_sleep") +def fixture_mock_sleep() -> Generator[MagicMock, None, None]: + """Mock sleep to prevent actual waiting during testing.""" + with patch("time.sleep") as mock_sleep: + yield mock_sleep + + +def test_successful_invocation() -> None: + """Ensure successful function invocation.""" + # Prepare + success_handler = Mock() + backoff_handler = Mock() + giveup_handler = Mock() + invoker = SafeInvoker( + constant(0.1), + ValueError, + on_success=success_handler, + on_backoff=backoff_handler, + on_giveup=giveup_handler, + ) + + # Execute + result = invoker.invoke(successful_function) + + # Assert + assert result == "success" + success_handler.assert_called_once() + backoff_handler.assert_not_called() + giveup_handler.assert_not_called() + + +def test_failure() -> None: + """Check termination when unexpected exception is raised.""" + # Prepare + # `constant([0.1])` generator will raise `StopIteration` after one iteration. 
+ invoker = SafeInvoker(constant(0.1), TypeError) + + # Execute and Assert + with pytest.raises(ValueError): + invoker.invoke(failing_function) + + +def test_backoff_on_failure(mock_sleep: MagicMock) -> None: + """Verify one retry on specified exception.""" + # Prepare + # `constant([0.1])` generator will raise `StopIteration` after one iteration. + invoker = SafeInvoker(constant([0.1]), ValueError, jitter=None) + + # Execute and Assert + with pytest.raises(ValueError): + invoker.invoke(failing_function) + mock_sleep.assert_called_once_with(0.1) + + +def test_max_tries(mock_sleep: MagicMock) -> None: + """Check termination after `max_tries`.""" + # Prepare + # Disable `jitter` to ensure 0.1s wait time. + invoker = SafeInvoker(constant(0.1), ValueError, max_tries=2, jitter=None) + + # Execute and Assert + with pytest.raises(ValueError): + invoker.invoke(failing_function) + # Assert 1 sleep call due to the max_tries being set to 2 + mock_sleep.assert_called_once_with(0.1) + + +def test_max_time(mock_time: MagicMock, mock_sleep: MagicMock) -> None: + """Check termination after `max_time`.""" + # Prepare + # Simulate the passage of time using mock + mock_time.side_effect = [ + 0.0, + 3.0, + ] + invoker = SafeInvoker(constant(2), ValueError, max_time=2.5) + + # Execute and Assert + with pytest.raises(ValueError): + invoker.invoke(failing_function) + # Assert no wait because `max_time` is exceeded before the first retry. 
+ mock_sleep.assert_not_called() + + +def test_event_handlers() -> None: + """Test `on_backoff` and `on_giveup` triggers.""" + # Prepare + success_handler = Mock() + backoff_handler = Mock() + giveup_handler = Mock() + invoker = SafeInvoker( + constant(0.1), + ValueError, + max_tries=2, + on_success=success_handler, + on_backoff=backoff_handler, + on_giveup=giveup_handler, + ) + + # Execute and Assert + with pytest.raises(ValueError): + invoker.invoke(failing_function) + backoff_handler.assert_called_once() + giveup_handler.assert_called_once() + success_handler.assert_not_called() + + +def test_giveup_condition() -> None: + """Verify custom giveup termination.""" + + # Prepare + def should_give_up(exc: Exception) -> bool: + return isinstance(exc, ValueError) + + invoker = SafeInvoker(constant(0.1), ValueError, giveup_condition=should_give_up) + + # Execute and Assert + with pytest.raises(ValueError): + invoker.invoke(failing_function) From 9c439d0a5a0fe1095667d4bf03616a62774ebf74 Mon Sep 17 00:00:00 2001 From: Heng Pan <134433891+panh99@users.noreply.github.com> Date: Mon, 23 Oct 2023 16:59:56 +0100 Subject: [PATCH 05/13] Update src/py/flwr/common/backoff.py Co-authored-by: Daniel J. Beutel --- src/py/flwr/common/backoff.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/common/backoff.py b/src/py/flwr/common/backoff.py index 4445a8161894..82254b974182 100644 --- a/src/py/flwr/common/backoff.py +++ b/src/py/flwr/common/backoff.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# ============================================================================== -"""`SafeInvoker` to argument other functions with error handling and retries.""" +"""`SafeInvoker` to augment other functions with error handling and retries.""" import itertools From 3637d3bab23920156e939d5dee0317d603579185 Mon Sep 17 00:00:00 2001 From: Heng Pan <134433891+panh99@users.noreply.github.com> Date: Mon, 23 Oct 2023 17:00:07 +0100 Subject: [PATCH 06/13] Update src/py/flwr/common/backoff.py Co-authored-by: Daniel J. Beutel --- src/py/flwr/common/backoff.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/common/backoff.py b/src/py/flwr/common/backoff.py index 82254b974182..b008364d1b3f 100644 --- a/src/py/flwr/common/backoff.py +++ b/src/py/flwr/common/backoff.py @@ -115,7 +115,7 @@ class Details: # pylint: disable-next=too-many-instance-attributes class SafeInvoker: - """Wrapper class for backoff and retry triggered by exceptions. + """Wrapper class for retry (with backoff) triggered by exceptions. Parameters ---------- From 2ed25b6f7e4695273c5aef5761216aada76acef4 Mon Sep 17 00:00:00 2001 From: Heng Pan Date: Mon, 23 Oct 2023 21:26:06 +0100 Subject: [PATCH 07/13] update doc --- src/py/flwr/common/backoff.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/src/py/flwr/common/backoff.py b/src/py/flwr/common/backoff.py index b008364d1b3f..8aa50100b792 100644 --- a/src/py/flwr/common/backoff.py +++ b/src/py/flwr/common/backoff.py @@ -104,7 +104,7 @@ def full_jitter(value: float) -> float: class Details: """Details for event handlers in SafeInvoker.""" - func: Callable[[Any], Any] + func: Callable[..., Any] args: Tuple[Any, ...] kwargs: Dict[str, Any] tries: int @@ -123,7 +123,7 @@ class SafeInvoker: A generator yielding successive wait times in seconds. If the generator is finite, the giveup event will be triggered when the generator raises `StopIteration`. 
- exception: Union[Type[Exception], Iterable[Type[Exception]]] + exception: Union[Type[Exception], Iterable[Type[Exception]]] An exception type (or iterable of types) that triggers backoff. max_tries: Optional[int] (default: None) The maximum number of attempts to make before giving up. Once exhausted, @@ -131,7 +131,8 @@ class SafeInvoker: to the number of tries. max_time: Optional[float] (default: None) The maximum total amount of time to try before giving up. Once this time - has expired, the exception will be allowed to escape. + has expired, the function won't be interrupted immediately, but the exception + will be allowed to escape. If set to None, there is no limit to the total time. jitter: Optional[Callable[[float], float]] (default: full_jitter) A function of the value yielded by `wait_gen` returning the actual time to wait. This function helps distribute wait times stochastically to avoid @@ -140,7 +141,8 @@ class SafeInvoker: `jitter=None`. giveup_condition: Optional[Callable[[Exception], bool]] (default: None) A function accepting an exception instance, returning whether or not - to give up. If set to None, the strategy is to always continue. + to give up prematurely before other give-up conditions are evaluated. + If set to None, the strategy is to never give up prematurely. on_success: Optional[Callable[[Details], None]] (default: None) A callable to be executed in the event of success. The parameter is a data class object detailing the invocation. @@ -152,6 +154,17 @@ class SafeInvoker: exceeded, `giveup_condition` returns True, or `wait` generator raises `StopInteration`. The parameter is a data class object detailing the invocation. 
+ + Examples + -------- + Initialize a `SafeInvoker` with exponential backoff and call a function: + + >>> invoker = SafeInvoker( + >>> exponential(), + >>> grpc.RpcError, + >>> max_tries=3, + >>> ) + >>> invoker.invoke(my_func, arg1, arg2, kw1=kwarg1) """ def __init__( From 5a01bd9fbb314478781515764f61e34413b79591 Mon Sep 17 00:00:00 2001 From: Heng Pan Date: Thu, 26 Oct 2023 17:44:50 +0100 Subject: [PATCH 08/13] renaming --- src/py/flwr/common/backoff.py | 37 +++++++++++++++++++++--------- src/py/flwr/common/backoff_test.py | 27 +++++++++++++++------- 2 files changed, 45 insertions(+), 19 deletions(-) diff --git a/src/py/flwr/common/backoff.py b/src/py/flwr/common/backoff.py index 8aa50100b792..3121ec41f663 100644 --- a/src/py/flwr/common/backoff.py +++ b/src/py/flwr/common/backoff.py @@ -19,6 +19,7 @@ import random import time from dataclasses import dataclass +from logging import WARNING from typing import ( Any, Callable, @@ -31,6 +32,8 @@ Union, ) +from flwr.common.logger import log + def exponential( base_delay: float = 1, @@ -102,9 +105,9 @@ def full_jitter(value: float) -> float: @dataclass class Details: - """Details for event handlers in SafeInvoker.""" + """Details for event handlers in Retrier.""" - func: Callable[..., Any] + target: Callable[..., Any] args: Tuple[Any, ...] kwargs: Dict[str, Any] tries: int @@ -114,7 +117,7 @@ class Details: # pylint: disable-next=too-many-instance-attributes -class SafeInvoker: +class Retrier: """Wrapper class for retry (with backoff) triggered by exceptions. Parameters @@ -123,8 +126,8 @@ class SafeInvoker: A generator yielding successive wait times in seconds. If the generator is finite, the giveup event will be triggered when the generator raises `StopIteration`. - exception: Union[Type[Exception], Iterable[Type[Exception]]] - An exception type (or iterable of types) that triggers backoff. + exception: Union[Type[Exception], Tuple[Type[Exception]]] + An exception type (or tuple of types) that triggers backoff. 
max_tries: Optional[int] (default: None) The maximum number of attempts to make before giving up. Once exhausted, the exception will be allowed to escape. If set to None, there is no limit @@ -157,9 +160,9 @@ class SafeInvoker: Examples -------- - Initialize a `SafeInvoker` with exponential backoff and call a function: + Initialize a `Retrier` with exponential backoff and call a function: - >>> invoker = SafeInvoker( + >>> invoker = Retrier( >>> exponential(), >>> grpc.RpcError, >>> max_tries=3, @@ -170,7 +173,7 @@ class SafeInvoker: def __init__( self, wait: Generator[float, None, None], - exception: Union[Type[Exception], Iterable[Type[Exception]]], + exception: Union[Type[Exception], Tuple[Type[Exception], ...]], *, max_tries: Optional[int] = None, max_time: Optional[float] = None, @@ -189,10 +192,16 @@ def __init__( self.on_success = on_success self.on_backoff = on_backoff self.on_giveup = on_giveup + if max_tries is None and max_time is None: + log( + WARNING, + "Neither 'max_tries' nor 'max_time' is set. This might result in " + "the Retrier continuously retrying without any limit.", + ) def invoke( self, - func: Callable[..., Any], + target: Callable[..., Any], *args: Tuple[Any, ...], **kwargs: Dict[str, Any], ) -> Any: @@ -232,6 +241,12 @@ def invoke( exceptions that trigger a retry, as well as conditions to stop retries, are also determined by the class's initialization parameters. """ + if self.max_tries is None and self.max_time is None: + log( + WARNING, + "Neither 'max_tries' nor 'max_time' is set. 
This might result in " + "the Retrier continuously retrying without any limit.", + ) def try_call_event_handler( handler: Optional[Callable[[Details], None]], details: Details @@ -246,11 +261,11 @@ def try_call_event_handler( tries += 1 elapsed = time.time() - start details = Details( - func=func, args=args, kwargs=kwargs, tries=tries, elapsed=elapsed + target=target, args=args, kwargs=kwargs, tries=tries, elapsed=elapsed ) try: - ret = func(*args, **kwargs) + ret = target(*args, **kwargs) except self.exception as err: # type: ignore # Check if giveup event should be triggered max_tries_exceeded = tries == self.max_tries diff --git a/src/py/flwr/common/backoff_test.py b/src/py/flwr/common/backoff_test.py index e82699758287..4168f140425f 100644 --- a/src/py/flwr/common/backoff_test.py +++ b/src/py/flwr/common/backoff_test.py @@ -20,7 +20,7 @@ import pytest -from flwr.common.backoff import SafeInvoker, constant +from flwr.common.backoff import Retrier, constant # Assuming SafeInvoker and related utilities have been imported here... @@ -55,7 +55,7 @@ def test_successful_invocation() -> None: success_handler = Mock() backoff_handler = Mock() giveup_handler = Mock() - invoker = SafeInvoker( + invoker = Retrier( constant(0.1), ValueError, on_success=success_handler, @@ -77,18 +77,29 @@ def test_failure() -> None: """Check termination when unexpected exception is raised.""" # Prepare # `constant([0.1])` generator will raise `StopIteration` after one iteration. 
- invoker = SafeInvoker(constant(0.1), TypeError) + invoker = Retrier(constant(0.1), TypeError) # Execute and Assert with pytest.raises(ValueError): invoker.invoke(failing_function) +def test_failure_two_exceptions(mock_sleep: MagicMock) -> None: + """Verify one retry on a specified iterable of exceptions.""" + # Prepare + invoker = Retrier(constant(0.1), (TypeError, ValueError), max_tries=2, jitter=None) + + # Execute and Assert + with pytest.raises(ValueError): + invoker.invoke(failing_function) + mock_sleep.assert_called_once_with(0.1) + + def test_backoff_on_failure(mock_sleep: MagicMock) -> None: """Verify one retry on specified exception.""" # Prepare # `constant([0.1])` generator will raise `StopIteration` after one iteration. - invoker = SafeInvoker(constant([0.1]), ValueError, jitter=None) + invoker = Retrier(constant([0.1]), ValueError, jitter=None) # Execute and Assert with pytest.raises(ValueError): @@ -100,7 +111,7 @@ def test_max_tries(mock_sleep: MagicMock) -> None: """Check termination after `max_tries`.""" # Prepare # Disable `jitter` to ensure 0.1s wait time. 
- invoker = SafeInvoker(constant(0.1), ValueError, max_tries=2, jitter=None) + invoker = Retrier(constant(0.1), ValueError, max_tries=2, jitter=None) # Execute and Assert with pytest.raises(ValueError): @@ -117,7 +128,7 @@ def test_max_time(mock_time: MagicMock, mock_sleep: MagicMock) -> None: 0.0, 3.0, ] - invoker = SafeInvoker(constant(2), ValueError, max_time=2.5) + invoker = Retrier(constant(2), ValueError, max_time=2.5) # Execute and Assert with pytest.raises(ValueError): @@ -132,7 +143,7 @@ def test_event_handlers() -> None: success_handler = Mock() backoff_handler = Mock() giveup_handler = Mock() - invoker = SafeInvoker( + invoker = Retrier( constant(0.1), ValueError, max_tries=2, @@ -156,7 +167,7 @@ def test_giveup_condition() -> None: def should_give_up(exc: Exception) -> bool: return isinstance(exc, ValueError) - invoker = SafeInvoker(constant(0.1), ValueError, giveup_condition=should_give_up) + invoker = Retrier(constant(0.1), ValueError, giveup_condition=should_give_up) # Execute and Assert with pytest.raises(ValueError): From 36f1feeeab2f94177a346963d5b2f22519d8d218 Mon Sep 17 00:00:00 2001 From: Heng Pan Date: Thu, 26 Oct 2023 19:31:33 +0100 Subject: [PATCH 09/13] renaming --- src/py/flwr/common/backoff.py | 306 ----------------------------- src/py/flwr/common/backoff_test.py | 174 ---------------- 2 files changed, 480 deletions(-) delete mode 100644 src/py/flwr/common/backoff.py delete mode 100644 src/py/flwr/common/backoff_test.py diff --git a/src/py/flwr/common/backoff.py b/src/py/flwr/common/backoff.py deleted file mode 100644 index 3121ec41f663..000000000000 --- a/src/py/flwr/common/backoff.py +++ /dev/null @@ -1,306 +0,0 @@ -# Copyright 2023 Flower Labs GmbH. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================== -"""`SafeInvoker` to augment other functions with error handling and retries.""" - - -import itertools -import random -import time -from dataclasses import dataclass -from logging import WARNING -from typing import ( - Any, - Callable, - Dict, - Generator, - Iterable, - Optional, - Tuple, - Type, - Union, -) - -from flwr.common.logger import log - - -def exponential( - base_delay: float = 1, - multiplier: float = 2, - max_delay: Optional[int] = None, -) -> Generator[float, None, None]: - """Wait time generator for exponential backoff strategy. - - Parameters - ---------- - base_delay: float (default: 1) - Initial delay duration before the first retry. - multiplier: float (default: 2) - Factor by which the delay is multiplied after each retry. - max_delay: Optional[float] (default: None) - The maximum delay duration between two consecutive retries. - """ - delay = base_delay if max_delay is None else min(base_delay, max_delay) - while True: - yield delay - delay *= multiplier - if max_delay is not None: - delay = min(delay, max_delay) - - -def constant( - interval: Union[float, Iterable[float]] = 1, -) -> Generator[float, None, None]: - """Wait time generator for specified intervals. - - Parameters - ---------- - interval: Union[float, Iterable[float]] (default: 1) - A constant value to yield or an iterable of such values. 
- """ - if not isinstance(interval, Iterable): - interval = itertools.repeat(interval) - yield from interval - - -def random_jitter(value: float) -> float: - """Jitter the value by a random number of milliseconds. - - This adds up to 1 second of additional time to the original value. - Prior to backoff version 1.2, this was the default jitter behavior. - - Parameters - ---------- - value : float - The original, unjittered backoff value. - """ - return value + random.random() - - -def full_jitter(value: float) -> float: - """Jitter the value across the full range (0 to value). - - This corresponds to the "Full Jitter" algorithm specified in the - AWS blog post on the performance of various jitter algorithms. - See: http://www.awsarchitectureblog.com/2015/03/backoff.html - - Parameters - ---------- - value : float - The original, unjittered backoff value. - """ - return random.uniform(0, value) - - -@dataclass -class Details: - """Details for event handlers in Retrier.""" - - target: Callable[..., Any] - args: Tuple[Any, ...] - kwargs: Dict[str, Any] - tries: int - elapsed: float - exception: Optional[Exception] = None - wait: Optional[float] = None - - -# pylint: disable-next=too-many-instance-attributes -class Retrier: - """Wrapper class for retry (with backoff) triggered by exceptions. - - Parameters - ---------- - wait: Generator[float, None, None] - A generator yielding successive wait times in seconds. If the generator - is finite, the giveup event will be triggered when the generator raises - `StopIteration`. - exception: Union[Type[Exception], Tuple[Type[Exception]]] - An exception type (or tuple of types) that triggers backoff. - max_tries: Optional[int] (default: None) - The maximum number of attempts to make before giving up. Once exhausted, - the exception will be allowed to escape. If set to None, there is no limit - to the number of tries. - max_time: Optional[float] (default: None) - The maximum total amount of time to try before giving up. 
Once this time - has expired, the function won't be interrupted immediately, but the exception - will be allowed to escape. If set to None, there is no limit to the total time. - jitter: Optional[Callable[[float], float]] (default: full_jitter) - A function of the value yielded by `wait_gen` returning the actual time - to wait. This function helps distribute wait times stochastically to avoid - timing collisions across concurrent clients. Wait times are jittered by - default using the `full_jitter` function. To disable jittering, pass - `jitter=None`. - giveup_condition: Optional[Callable[[Exception], bool]] (default: None) - A function accepting an exception instance, returning whether or not - to give up prematurely before other give-up conditions are evaluated. - If set to None, the strategy is to never give up prematurely. - on_success: Optional[Callable[[Details], None]] (default: None) - A callable to be executed in the event of success. The parameter is a - data class object detailing the invocation. - on_backoff: Optional[Callable[[Details], None]] (default: None) - A callable to be executed in the event of a backoff. The parameter is a - data class object detailing the invocation. - on_giveup: Optional[Callable[[Details], None]] (default: None) - A callable to be executed in the event that `max_tries` or `max_time` is - exceeded, `giveup_condition` returns True, or `wait` generator raises - `StopInteration`. The parameter is a data class object detailing the - invocation. 
- - Examples - -------- - Initialize a `Retrier` with exponential backoff and call a function: - - >>> invoker = Retrier( - >>> exponential(), - >>> grpc.RpcError, - >>> max_tries=3, - >>> ) - >>> invoker.invoke(my_func, arg1, arg2, kw1=kwarg1) - """ - - def __init__( - self, - wait: Generator[float, None, None], - exception: Union[Type[Exception], Tuple[Type[Exception], ...]], - *, - max_tries: Optional[int] = None, - max_time: Optional[float] = None, - jitter: Optional[Callable[[float], float]] = full_jitter, - giveup_condition: Optional[Callable[[Exception], bool]] = None, - on_success: Optional[Callable[[Details], None]] = None, - on_backoff: Optional[Callable[[Details], None]] = None, - on_giveup: Optional[Callable[[Details], None]] = None, - ) -> None: - self.wait = wait - self.exception = exception - self.max_tries = max_tries - self.max_time = max_time - self.jitter = jitter - self.giveup_condition = giveup_condition - self.on_success = on_success - self.on_backoff = on_backoff - self.on_giveup = on_giveup - if max_tries is None and max_time is None: - log( - WARNING, - "Neither 'max_tries' nor 'max_time' is set. This might result in " - "the Retrier continuously retrying without any limit.", - ) - - def invoke( - self, - target: Callable[..., Any], - *args: Tuple[Any, ...], - **kwargs: Dict[str, Any], - ) -> Any: - """Safely invoke the provided function with retry mechanisms. - - This method attempts to call the given function, and in the event of - a specified exception, employs a retry mechanism that considers - wait times, jitter, maximum attempts, and maximum time. During the - retry process, various callbacks (`on_backoff`, `on_success`, and - `on_giveup`) can be triggered based on the outcome. - - Parameters - ---------- - func: Callable[..., Any] - The function to be invoked. - *args: Tuple[Any, ...] - Positional arguments to pass to `func`. - **kwargs: Dict[str, Any] - Keyword arguments to pass to `func`. 
- - Returns - ------- - Any - The result of the function invocation. - - Raises - ------ - Exception - If the number of tries exceeds `max_tries`, if the total time - exceeds `max_time`, if `wait` generator raises `StopInteration`, - or if the `giveup_condition` returns True for a raised exception. - - Notes - ----- - The time between retries is determined by the provided `wait` generator - and can optionally be jittered using the `jitter` function. The exact - exceptions that trigger a retry, as well as conditions to stop retries, - are also determined by the class's initialization parameters. - """ - if self.max_tries is None and self.max_time is None: - log( - WARNING, - "Neither 'max_tries' nor 'max_time' is set. This might result in " - "the Retrier continuously retrying without any limit.", - ) - - def try_call_event_handler( - handler: Optional[Callable[[Details], None]], details: Details - ) -> None: - if handler is not None: - handler(details) - - tries = 0 - start = time.time() - - while True: - tries += 1 - elapsed = time.time() - start - details = Details( - target=target, args=args, kwargs=kwargs, tries=tries, elapsed=elapsed - ) - - try: - ret = target(*args, **kwargs) - except self.exception as err: # type: ignore - # Check if giveup event should be triggered - max_tries_exceeded = tries == self.max_tries - max_time_exceeded = ( - self.max_time is not None and elapsed >= self.max_time - ) - - def giveup_check(_exception: Exception) -> bool: - if self.giveup_condition is None: - return False - return self.giveup_condition(_exception) - - if giveup_check(err) or max_tries_exceeded or max_time_exceeded: - # Trigger giveup event - try_call_event_handler(self.on_giveup, details) - raise - - try: - seconds = next(self.wait) - if self.jitter is not None: - seconds = self.jitter(seconds) - if self.max_time is not None: - seconds = min(seconds, self.max_time - elapsed) - details.wait = seconds - except StopIteration: - # Trigger giveup event - 
try_call_event_handler(self.on_giveup, details) - raise err from None - - # Trigger backoff event - try_call_event_handler(self.on_backoff, details) - - # Sleep - time.sleep(seconds) - else: - # Trigger success event - try_call_event_handler(self.on_success, details) - return ret diff --git a/src/py/flwr/common/backoff_test.py b/src/py/flwr/common/backoff_test.py deleted file mode 100644 index 4168f140425f..000000000000 --- a/src/py/flwr/common/backoff_test.py +++ /dev/null @@ -1,174 +0,0 @@ -# Copyright 2023 Flower Labs GmbH. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================== -"""Tests for `SafeInvoker`.""" - - -from typing import Generator -from unittest.mock import MagicMock, Mock, patch - -import pytest - -from flwr.common.backoff import Retrier, constant - -# Assuming SafeInvoker and related utilities have been imported here... 
- - -def successful_function() -> str: - """.""" - return "success" - - -def failing_function() -> None: - """.""" - raise ValueError("failed") - - -@pytest.fixture(name="mock_time") -def fixture_mock_time() -> Generator[MagicMock, None, None]: - """Mock time.time for controlled testing.""" - with patch("time.time") as mock_time: - yield mock_time - - -@pytest.fixture(name="mock_sleep") -def fixture_mock_sleep() -> Generator[MagicMock, None, None]: - """Mock sleep to prevent actual waiting during testing.""" - with patch("time.sleep") as mock_sleep: - yield mock_sleep - - -def test_successful_invocation() -> None: - """Ensure successful function invocation.""" - # Prepare - success_handler = Mock() - backoff_handler = Mock() - giveup_handler = Mock() - invoker = Retrier( - constant(0.1), - ValueError, - on_success=success_handler, - on_backoff=backoff_handler, - on_giveup=giveup_handler, - ) - - # Execute - result = invoker.invoke(successful_function) - - # Assert - assert result == "success" - success_handler.assert_called_once() - backoff_handler.assert_not_called() - giveup_handler.assert_not_called() - - -def test_failure() -> None: - """Check termination when unexpected exception is raised.""" - # Prepare - # `constant([0.1])` generator will raise `StopIteration` after one iteration. 
- invoker = Retrier(constant(0.1), TypeError) - - # Execute and Assert - with pytest.raises(ValueError): - invoker.invoke(failing_function) - - -def test_failure_two_exceptions(mock_sleep: MagicMock) -> None: - """Verify one retry on a specified iterable of exceptions.""" - # Prepare - invoker = Retrier(constant(0.1), (TypeError, ValueError), max_tries=2, jitter=None) - - # Execute and Assert - with pytest.raises(ValueError): - invoker.invoke(failing_function) - mock_sleep.assert_called_once_with(0.1) - - -def test_backoff_on_failure(mock_sleep: MagicMock) -> None: - """Verify one retry on specified exception.""" - # Prepare - # `constant([0.1])` generator will raise `StopIteration` after one iteration. - invoker = Retrier(constant([0.1]), ValueError, jitter=None) - - # Execute and Assert - with pytest.raises(ValueError): - invoker.invoke(failing_function) - mock_sleep.assert_called_once_with(0.1) - - -def test_max_tries(mock_sleep: MagicMock) -> None: - """Check termination after `max_tries`.""" - # Prepare - # Disable `jitter` to ensure 0.1s wait time. - invoker = Retrier(constant(0.1), ValueError, max_tries=2, jitter=None) - - # Execute and Assert - with pytest.raises(ValueError): - invoker.invoke(failing_function) - # Assert 1 sleep call due to the max_tries being set to 2 - mock_sleep.assert_called_once_with(0.1) - - -def test_max_time(mock_time: MagicMock, mock_sleep: MagicMock) -> None: - """Check termination after `max_time`.""" - # Prepare - # Simulate the passage of time using mock - mock_time.side_effect = [ - 0.0, - 3.0, - ] - invoker = Retrier(constant(2), ValueError, max_time=2.5) - - # Execute and Assert - with pytest.raises(ValueError): - invoker.invoke(failing_function) - # Assert no wait because `max_time` is exceeded before the first retry. 
- mock_sleep.assert_not_called() - - -def test_event_handlers() -> None: - """Test `on_backoff` and `on_giveup` triggers.""" - # Prepare - success_handler = Mock() - backoff_handler = Mock() - giveup_handler = Mock() - invoker = Retrier( - constant(0.1), - ValueError, - max_tries=2, - on_success=success_handler, - on_backoff=backoff_handler, - on_giveup=giveup_handler, - ) - - # Execute and Assert - with pytest.raises(ValueError): - invoker.invoke(failing_function) - backoff_handler.assert_called_once() - giveup_handler.assert_called_once() - success_handler.assert_not_called() - - -def test_giveup_condition() -> None: - """Verify custom giveup termination.""" - - # Prepare - def should_give_up(exc: Exception) -> bool: - return isinstance(exc, ValueError) - - invoker = Retrier(constant(0.1), ValueError, giveup_condition=should_give_up) - - # Execute and Assert - with pytest.raises(ValueError): - invoker.invoke(failing_function) From 10afb01b8db1cf7fa2803ae837d63490473e5690 Mon Sep 17 00:00:00 2001 From: Heng Pan Date: Thu, 26 Oct 2023 19:33:27 +0100 Subject: [PATCH 10/13] add files --- src/py/flwr/common/retrier.py | 292 +++++++++++++++++++++++++++++ src/py/flwr/common/retrier_test.py | 181 ++++++++++++++++++ 2 files changed, 473 insertions(+) create mode 100644 src/py/flwr/common/retrier.py create mode 100644 src/py/flwr/common/retrier_test.py diff --git a/src/py/flwr/common/retrier.py b/src/py/flwr/common/retrier.py new file mode 100644 index 000000000000..e08402b30420 --- /dev/null +++ b/src/py/flwr/common/retrier.py @@ -0,0 +1,292 @@ +# Copyright 2023 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""`Retrier` to augment other callables with error handling and retries.""" + + +import itertools +import random +import time +from dataclasses import dataclass +from typing import ( + Any, + Callable, + Dict, + Generator, + Iterable, + Optional, + Tuple, + Type, + Union, +) + + +def exponential( + base_delay: float = 1, + multiplier: float = 2, + max_delay: Optional[int] = None, +) -> Generator[float, None, None]: + """Wait time generator for exponential backoff strategy. + + Parameters + ---------- + base_delay: float (default: 1) + Initial delay duration before the first retry. + multiplier: float (default: 2) + Factor by which the delay is multiplied after each retry. + max_delay: Optional[float] (default: None) + The maximum delay duration between two consecutive retries. + """ + delay = base_delay if max_delay is None else min(base_delay, max_delay) + while True: + yield delay + delay *= multiplier + if max_delay is not None: + delay = min(delay, max_delay) + + +def constant( + interval: Union[float, Iterable[float]] = 1, +) -> Generator[float, None, None]: + """Wait time generator for specified intervals. + + Parameters + ---------- + interval: Union[float, Iterable[float]] (default: 1) + A constant value to yield or an iterable of such values. + """ + if not isinstance(interval, Iterable): + interval = itertools.repeat(interval) + yield from interval + + +def random_jitter(value: float) -> float: + """Jitter the value by a random number of milliseconds. 
+ + This adds up to 1 second of additional time to the original value. + Prior to backoff version 1.2, this was the default jitter behavior. + + Parameters + ---------- + value : float + The original, unjittered backoff value. + """ + return value + random.random() + + +def full_jitter(value: float) -> float: + """Jitter the value across the full range (0 to value). + + This corresponds to the "Full Jitter" algorithm specified in the + AWS blog post on the performance of various jitter algorithms. + See: http://www.awsarchitectureblog.com/2015/03/backoff.html + + Parameters + ---------- + value : float + The original, unjittered backoff value. + """ + return random.uniform(0, value) + + +@dataclass +class RetryState: + """State for event handlers in Retrier.""" + + target: Callable[..., Any] + args: Tuple[Any, ...] + kwargs: Dict[str, Any] + tries: int + elapsed: float + exception: Optional[Exception] = None + wait: Optional[float] = None + + +# pylint: disable-next=too-many-instance-attributes +class Retrier: + """Wrapper class for retry (with backoff) triggered by exceptions. + + Parameters + ---------- + wait: Generator[float, None, None] + A generator yielding successive wait times in seconds. If the generator + is finite, the giveup event will be triggered when the generator raises + `StopIteration`. + exception: Union[Type[Exception], Tuple[Type[Exception]]] + An exception type (or tuple of types) that triggers backoff. + max_tries: Optional[int] + The maximum number of attempts to make before giving up. Once exhausted, + the exception will be allowed to escape. If set to None, there is no limit + to the number of tries. + max_time: Optional[float] + The maximum total amount of time to try before giving up. Once this time + has expired, this method won't be interrupted immediately, but the exception + will be allowed to escape. If set to None, there is no limit to the total time. 
+ jitter: Optional[Callable[[float], float]] (default: full_jitter) + A function of the value yielded by `wait_gen` returning the actual time + to wait. This function helps distribute wait times stochastically to avoid + timing collisions across concurrent clients. Wait times are jittered by + default using the `full_jitter` function. To disable jittering, pass + `jitter=None`. + giveup_condition: Optional[Callable[[Exception], bool]] (default: None) + A function accepting an exception instance, returning whether or not + to give up prematurely before other give-up conditions are evaluated. + If set to None, the strategy is to never give up prematurely. + on_success: Optional[Callable[[RetryState], None]] (default: None) + A callable to be executed in the event of success. The parameter is a + data class object detailing the invocation. + on_backoff: Optional[Callable[[RetryState], None]] (default: None) + A callable to be executed in the event of a backoff. The parameter is a + data class object detailing the invocation. + on_giveup: Optional[Callable[[RetryState], None]] (default: None) + A callable to be executed in the event that `max_tries` or `max_time` is + exceeded, `giveup_condition` returns True, or `wait` generator raises + `StopInteration`. The parameter is a data class object detailing the + invocation. 
+ + Examples + -------- + Initialize a `Retrier` with exponential backoff and call a function: + + >>> retrier = Retrier( + >>> exponential(), + >>> grpc.RpcError, + >>> max_tries=3, + >>> max_time=None, + >>> ) + >>> retrier.invoke(my_func, arg1, arg2, kw1=kwarg1) + """ + + def __init__( + self, + wait: Generator[float, None, None], + exception: Union[Type[Exception], Tuple[Type[Exception], ...]], + max_tries: Optional[int], + max_time: Optional[float], + *, + jitter: Optional[Callable[[float], float]] = full_jitter, + giveup_condition: Optional[Callable[[Exception], bool]] = None, + on_success: Optional[Callable[[RetryState], None]] = None, + on_backoff: Optional[Callable[[RetryState], None]] = None, + on_giveup: Optional[Callable[[RetryState], None]] = None, + ) -> None: + self.wait = wait + self.exception = exception + self.max_tries = max_tries + self.max_time = max_time + self.jitter = jitter + self.giveup_condition = giveup_condition + self.on_success = on_success + self.on_backoff = on_backoff + self.on_giveup = on_giveup + + def invoke( + self, + target: Callable[..., Any], + *args: Tuple[Any, ...], + **kwargs: Dict[str, Any], + ) -> Any: + """Safely invoke the provided callable with retry mechanisms. + + This method attempts to invoke the given callable, and in the event of + a specified exception, employs a retry mechanism that considers + wait times, jitter, maximum attempts, and maximum time. During the + retry process, various callbacks (`on_backoff`, `on_success`, and + `on_giveup`) can be triggered based on the outcome. + + Parameters + ---------- + target: Callable[..., Any] + The callable to be invoked. + *args: Tuple[Any, ...] + Positional arguments to pass to `target`. + **kwargs: Dict[str, Any] + Keyword arguments to pass to `target`. + + Returns + ------- + Any + The result of the given callable invocation. 
+ + Raises + ------ + Exception + If the number of tries exceeds `max_tries`, if the total time + exceeds `max_time`, if `wait` generator raises `StopInteration`, + or if the `giveup_condition` returns True for a raised exception. + + Notes + ----- + The time between retries is determined by the provided `wait` generator + and can optionally be jittered using the `jitter` function. The exact + exceptions that trigger a retry, as well as conditions to stop retries, + are also determined by the class's initialization parameters. + """ + + def try_call_event_handler( + handler: Optional[Callable[[RetryState], None]], details: RetryState + ) -> None: + if handler is not None: + handler(details) + + tries = 0 + start = time.time() + + while True: + tries += 1 + elapsed = time.time() - start + details = RetryState( + target=target, args=args, kwargs=kwargs, tries=tries, elapsed=elapsed + ) + + try: + ret = target(*args, **kwargs) + except self.exception as err: + # Check if giveup event should be triggered + max_tries_exceeded = tries == self.max_tries + max_time_exceeded = ( + self.max_time is not None and elapsed >= self.max_time + ) + + def giveup_check(_exception: Exception) -> bool: + if self.giveup_condition is None: + return False + return self.giveup_condition(_exception) + + if giveup_check(err) or max_tries_exceeded or max_time_exceeded: + # Trigger giveup event + try_call_event_handler(self.on_giveup, details) + raise + + try: + seconds = next(self.wait) + if self.jitter is not None: + seconds = self.jitter(seconds) + if self.max_time is not None: + seconds = min(seconds, self.max_time - elapsed) + details.wait = seconds + except StopIteration: + # Trigger giveup event + try_call_event_handler(self.on_giveup, details) + raise err from None + + # Trigger backoff event + try_call_event_handler(self.on_backoff, details) + + # Sleep + time.sleep(seconds) + else: + # Trigger success event + try_call_event_handler(self.on_success, details) + return ret diff --git 
a/src/py/flwr/common/retrier_test.py b/src/py/flwr/common/retrier_test.py new file mode 100644 index 000000000000..8e6752b2cc3c --- /dev/null +++ b/src/py/flwr/common/retrier_test.py @@ -0,0 +1,181 @@ +# Copyright 2023 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Tests for `Retrier`.""" + + +from typing import Generator +from unittest.mock import MagicMock, Mock, patch + +import pytest + +from flwr.common.retrier import Retrier, constant + + +def successful_function() -> str: + """.""" + return "success" + + +def failing_function() -> None: + """.""" + raise ValueError("failed") + + +@pytest.fixture(name="mock_time") +def fixture_mock_time() -> Generator[MagicMock, None, None]: + """Mock time.time for controlled testing.""" + with patch("time.time") as mock_time: + yield mock_time + + +@pytest.fixture(name="mock_sleep") +def fixture_mock_sleep() -> Generator[MagicMock, None, None]: + """Mock sleep to prevent actual waiting during testing.""" + with patch("time.sleep") as mock_sleep: + yield mock_sleep + + +def test_successful_invocation() -> None: + """Ensure successful function invocation.""" + # Prepare + success_handler = Mock() + backoff_handler = Mock() + giveup_handler = Mock() + retrier = Retrier( + constant(0.1), + ValueError, + max_tries=None, + max_time=None, + on_success=success_handler, + on_backoff=backoff_handler, + 
on_giveup=giveup_handler, + ) + + # Execute + result = retrier.invoke(successful_function) + + # Assert + assert result == "success" + success_handler.assert_called_once() + backoff_handler.assert_not_called() + giveup_handler.assert_not_called() + + +def test_failure() -> None: + """Check termination when unexpected exception is raised.""" + # Prepare + # `constant([0.1])` generator will raise `StopIteration` after one iteration. + retrier = Retrier(constant(0.1), TypeError, None, None) + + # Execute and Assert + with pytest.raises(ValueError): + retrier.invoke(failing_function) + + +def test_failure_two_exceptions(mock_sleep: MagicMock) -> None: + """Verify one retry on a specified iterable of exceptions.""" + # Prepare + retrier = Retrier( + constant(0.1), (TypeError, ValueError), max_tries=2, max_time=None, jitter=None + ) + + # Execute and Assert + with pytest.raises(ValueError): + retrier.invoke(failing_function) + mock_sleep.assert_called_once_with(0.1) + + +def test_backoff_on_failure(mock_sleep: MagicMock) -> None: + """Verify one retry on specified exception.""" + # Prepare + # `constant([0.1])` generator will raise `StopIteration` after one iteration. + retrier = Retrier(constant([0.1]), ValueError, None, None, jitter=None) + + # Execute and Assert + with pytest.raises(ValueError): + retrier.invoke(failing_function) + mock_sleep.assert_called_once_with(0.1) + + +def test_max_tries(mock_sleep: MagicMock) -> None: + """Check termination after `max_tries`.""" + # Prepare + # Disable `jitter` to ensure 0.1s wait time. 
+ retrier = Retrier( + constant(0.1), ValueError, max_tries=2, max_time=None, jitter=None + ) + + # Execute and Assert + with pytest.raises(ValueError): + retrier.invoke(failing_function) + # Assert 1 sleep call due to the max_tries being set to 2 + mock_sleep.assert_called_once_with(0.1) + + +def test_max_time(mock_time: MagicMock, mock_sleep: MagicMock) -> None: + """Check termination after `max_time`.""" + # Prepare + # Simulate the passage of time using mock + mock_time.side_effect = [ + 0.0, + 3.0, + ] + retrier = Retrier(constant(2), ValueError, max_tries=None, max_time=2.5) + + # Execute and Assert + with pytest.raises(ValueError): + retrier.invoke(failing_function) + # Assert no wait because `max_time` is exceeded before the first retry. + mock_sleep.assert_not_called() + + +def test_event_handlers() -> None: + """Test `on_backoff` and `on_giveup` triggers.""" + # Prepare + success_handler = Mock() + backoff_handler = Mock() + giveup_handler = Mock() + retrier = Retrier( + constant(0.1), + ValueError, + max_tries=2, + max_time=None, + on_success=success_handler, + on_backoff=backoff_handler, + on_giveup=giveup_handler, + ) + + # Execute and Assert + with pytest.raises(ValueError): + retrier.invoke(failing_function) + backoff_handler.assert_called_once() + giveup_handler.assert_called_once() + success_handler.assert_not_called() + + +def test_giveup_condition() -> None: + """Verify custom giveup termination.""" + + # Prepare + def should_give_up(exc: Exception) -> bool: + return isinstance(exc, ValueError) + + retrier = Retrier( + constant(0.1), ValueError, None, None, giveup_condition=should_give_up + ) + + # Execute and Assert + with pytest.raises(ValueError): + retrier.invoke(failing_function) From 21d8a0ae9652fc5eb6613c7ef5b1627b43472a71 Mon Sep 17 00:00:00 2001 From: Heng Pan Date: Fri, 27 Oct 2023 23:26:50 +0100 Subject: [PATCH 11/13] rename and update --- .../common/{retrier.py => retry_invoker.py} | 135 +++++++++--------- ...{retrier_test.py => 
retry_invoker_test.py} | 38 ++--- 2 files changed, 84 insertions(+), 89 deletions(-) rename src/py/flwr/common/{retrier.py => retry_invoker.py} (68%) rename src/py/flwr/common/{retrier_test.py => retry_invoker_test.py} (83%) diff --git a/src/py/flwr/common/retrier.py b/src/py/flwr/common/retry_invoker.py similarity index 68% rename from src/py/flwr/common/retrier.py rename to src/py/flwr/common/retry_invoker.py index e08402b30420..7096e891384a 100644 --- a/src/py/flwr/common/retrier.py +++ b/src/py/flwr/common/retry_invoker.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== -"""`Retrier` to augment other callables with error handling and retries.""" +"""`RetryInvoker` to augment other callables with error handling and retries.""" import itertools @@ -25,10 +25,12 @@ Dict, Generator, Iterable, + List, Optional, Tuple, Type, Union, + cast, ) @@ -71,55 +73,41 @@ def constant( yield from interval -def random_jitter(value: float) -> float: - """Jitter the value by a random number of milliseconds. +def full_jitter(max_value: float) -> float: + """Randomize a float between 0 and the given maximum value. - This adds up to 1 second of additional time to the original value. - Prior to backoff version 1.2, this was the default jitter behavior. + This function implements the "Full Jitter" algorithm as described in the + AWS article discussing the efficacy of different jitter algorithms. + Reference: https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ Parameters ---------- - value : float - The original, unjittered backoff value. + max_value : float + The upper limit for the randomized value. """ - return value + random.random() - - -def full_jitter(value: float) -> float: - """Jitter the value across the full range (0 to value). 
- - This corresponds to the "Full Jitter" algorithm specified in the - AWS blog post on the performance of various jitter algorithms. - See: http://www.awsarchitectureblog.com/2015/03/backoff.html - - Parameters - ---------- - value : float - The original, unjittered backoff value. - """ - return random.uniform(0, value) + return random.uniform(0, max_value) @dataclass class RetryState: - """State for event handlers in Retrier.""" + """State for callbacks in RetryInvoker.""" target: Callable[..., Any] args: Tuple[Any, ...] kwargs: Dict[str, Any] tries: int - elapsed: float + elapsed_time: float exception: Optional[Exception] = None - wait: Optional[float] = None + actual_wait: Optional[float] = None # pylint: disable-next=too-many-instance-attributes -class Retrier: +class RetryInvoker: """Wrapper class for retry (with backoff) triggered by exceptions. Parameters ---------- - wait: Generator[float, None, None] + wait_strategy: Generator[float, None, None] A generator yielding successive wait times in seconds. If the generator is finite, the giveup event will be triggered when the generator raises `StopIteration`. @@ -134,12 +122,12 @@ class Retrier: has expired, this method won't be interrupted immediately, but the exception will be allowed to escape. If set to None, there is no limit to the total time. jitter: Optional[Callable[[float], float]] (default: full_jitter) - A function of the value yielded by `wait_gen` returning the actual time + A function of the value yielded by `wait_strategy` returning the actual time to wait. This function helps distribute wait times stochastically to avoid timing collisions across concurrent clients. Wait times are jittered by default using the `full_jitter` function. To disable jittering, pass `jitter=None`. 
- giveup_condition: Optional[Callable[[Exception], bool]] (default: None) + should_giveup: Optional[Callable[[Exception], bool]] (default: None) A function accepting an exception instance, returning whether or not to give up prematurely before other give-up conditions are evaluated. If set to None, the strategy is to never give up prematurely. @@ -151,46 +139,47 @@ class Retrier: data class object detailing the invocation. on_giveup: Optional[Callable[[RetryState], None]] (default: None) A callable to be executed in the event that `max_tries` or `max_time` is - exceeded, `giveup_condition` returns True, or `wait` generator raises + exceeded, `should_giveup` returns True, or `wait_strategy` generator raises `StopInteration`. The parameter is a data class object detailing the invocation. Examples -------- - Initialize a `Retrier` with exponential backoff and call a function: + Initialize a `RetryInvoker` with exponential backoff and call a function: - >>> retrier = Retrier( + >>> invoker = RetryInvoker( >>> exponential(), >>> grpc.RpcError, >>> max_tries=3, >>> max_time=None, >>> ) - >>> retrier.invoke(my_func, arg1, arg2, kw1=kwarg1) + >>> invoker.invoke(my_func, arg1, arg2, kw1=kwarg1) """ def __init__( self, - wait: Generator[float, None, None], - exception: Union[Type[Exception], Tuple[Type[Exception], ...]], + wait_strategy: Generator[float, None, None], + recoverable_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]], max_tries: Optional[int], max_time: Optional[float], *, - jitter: Optional[Callable[[float], float]] = full_jitter, - giveup_condition: Optional[Callable[[Exception], bool]] = None, on_success: Optional[Callable[[RetryState], None]] = None, on_backoff: Optional[Callable[[RetryState], None]] = None, on_giveup: Optional[Callable[[RetryState], None]] = None, + jitter: Optional[Callable[[float], float]] = full_jitter, + should_giveup: Optional[Callable[[Exception], bool]] = None, ) -> None: - self.wait = wait - self.exception = exception 
+ self.wait_strategy = wait_strategy + self.recoverable_exceptions = recoverable_exceptions self.max_tries = max_tries self.max_time = max_time - self.jitter = jitter - self.giveup_condition = giveup_condition self.on_success = on_success self.on_backoff = on_backoff self.on_giveup = on_giveup + self.jitter = jitter + self.should_giveup = should_giveup + # pylint: disable-next=too-many-locals def invoke( self, target: Callable[..., Any], @@ -200,7 +189,7 @@ def invoke( """Safely invoke the provided callable with retry mechanisms. This method attempts to invoke the given callable, and in the event of - a specified exception, employs a retry mechanism that considers + a recoverable exception, employs a retry mechanism that considers wait times, jitter, maximum attempts, and maximum time. During the retry process, various callbacks (`on_backoff`, `on_success`, and `on_giveup`) can be triggered based on the outcome. @@ -223,70 +212,76 @@ def invoke( ------ Exception If the number of tries exceeds `max_tries`, if the total time - exceeds `max_time`, if `wait` generator raises `StopInteration`, - or if the `giveup_condition` returns True for a raised exception. + exceeds `max_time`, if `wait_strategy` generator raises `StopIteration`, + or if the `should_giveup` returns True for a raised exception. Notes ----- - The time between retries is determined by the provided `wait` generator - and can optionally be jittered using the `jitter` function. The exact - exceptions that trigger a retry, as well as conditions to stop retries, - are also determined by the class's initialization parameters. + The time between retries is determined by the provided `wait_strategy` + generator and can optionally be jittered using the `jitter` function. + The recoverable exceptions that trigger a retry, as well as conditions to + stop retries, are also determined by the class's initialization parameters. 
""" def try_call_event_handler( - handler: Optional[Callable[[RetryState], None]], details: RetryState + handler: Optional[Callable[[RetryState], None]] ) -> None: if handler is not None: - handler(details) + handler(cast(RetryState, ref_state[0])) - tries = 0 + try_cnt = 0 start = time.time() + ref_state: List[Optional[RetryState]] = [None] while True: - tries += 1 - elapsed = time.time() - start - details = RetryState( - target=target, args=args, kwargs=kwargs, tries=tries, elapsed=elapsed + try_cnt += 1 + elapsed_time = time.time() - start + state = RetryState( + target=target, + args=args, + kwargs=kwargs, + tries=try_cnt, + elapsed_time=elapsed_time, ) + ref_state[0] = state try: ret = target(*args, **kwargs) - except self.exception as err: + except self.recoverable_exceptions as err: # Check if giveup event should be triggered - max_tries_exceeded = tries == self.max_tries + max_tries_exceeded = try_cnt == self.max_tries max_time_exceeded = ( - self.max_time is not None and elapsed >= self.max_time + self.max_time is not None and elapsed_time >= self.max_time ) def giveup_check(_exception: Exception) -> bool: - if self.giveup_condition is None: + if self.should_giveup is None: return False - return self.giveup_condition(_exception) + return self.should_giveup(_exception) if giveup_check(err) or max_tries_exceeded or max_time_exceeded: # Trigger giveup event - try_call_event_handler(self.on_giveup, details) + try_call_event_handler(self.on_giveup) raise try: - seconds = next(self.wait) + wait_time = next(self.wait_strategy) if self.jitter is not None: - seconds = self.jitter(seconds) + wait_time = self.jitter(wait_time) if self.max_time is not None: - seconds = min(seconds, self.max_time - elapsed) - details.wait = seconds + wait_time = min(wait_time, self.max_time - elapsed_time) + state.actual_wait = wait_time except StopIteration: # Trigger giveup event - try_call_event_handler(self.on_giveup, details) + try_call_event_handler(self.on_giveup) raise err from 
None # Trigger backoff event - try_call_event_handler(self.on_backoff, details) + try_call_event_handler(self.on_backoff) # Sleep - time.sleep(seconds) + time.sleep(wait_time) else: # Trigger success event - try_call_event_handler(self.on_success, details) + try_call_event_handler(self.on_success) return ret diff --git a/src/py/flwr/common/retrier_test.py b/src/py/flwr/common/retry_invoker_test.py similarity index 83% rename from src/py/flwr/common/retrier_test.py rename to src/py/flwr/common/retry_invoker_test.py index 8e6752b2cc3c..5f6dab49ce1c 100644 --- a/src/py/flwr/common/retrier_test.py +++ b/src/py/flwr/common/retry_invoker_test.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== -"""Tests for `Retrier`.""" +"""Tests for `RetryInvoker`.""" from typing import Generator @@ -20,7 +20,7 @@ import pytest -from flwr.common.retrier import Retrier, constant +from flwr.common.retry_invoker import RetryInvoker, constant def successful_function() -> str: @@ -53,7 +53,7 @@ def test_successful_invocation() -> None: success_handler = Mock() backoff_handler = Mock() giveup_handler = Mock() - retrier = Retrier( + invoker = RetryInvoker( constant(0.1), ValueError, max_tries=None, @@ -64,7 +64,7 @@ def test_successful_invocation() -> None: ) # Execute - result = retrier.invoke(successful_function) + result = invoker.invoke(successful_function) # Assert assert result == "success" @@ -77,23 +77,23 @@ def test_failure() -> None: """Check termination when unexpected exception is raised.""" # Prepare # `constant([0.1])` generator will raise `StopIteration` after one iteration. 
- retrier = Retrier(constant(0.1), TypeError, None, None) + invoker = RetryInvoker(constant(0.1), TypeError, None, None) # Execute and Assert with pytest.raises(ValueError): - retrier.invoke(failing_function) + invoker.invoke(failing_function) def test_failure_two_exceptions(mock_sleep: MagicMock) -> None: """Verify one retry on a specified iterable of exceptions.""" # Prepare - retrier = Retrier( + invoker = RetryInvoker( constant(0.1), (TypeError, ValueError), max_tries=2, max_time=None, jitter=None ) # Execute and Assert with pytest.raises(ValueError): - retrier.invoke(failing_function) + invoker.invoke(failing_function) mock_sleep.assert_called_once_with(0.1) @@ -101,11 +101,11 @@ def test_backoff_on_failure(mock_sleep: MagicMock) -> None: """Verify one retry on specified exception.""" # Prepare # `constant([0.1])` generator will raise `StopIteration` after one iteration. - retrier = Retrier(constant([0.1]), ValueError, None, None, jitter=None) + invoker = RetryInvoker(constant([0.1]), ValueError, None, None, jitter=None) # Execute and Assert with pytest.raises(ValueError): - retrier.invoke(failing_function) + invoker.invoke(failing_function) mock_sleep.assert_called_once_with(0.1) @@ -113,13 +113,13 @@ def test_max_tries(mock_sleep: MagicMock) -> None: """Check termination after `max_tries`.""" # Prepare # Disable `jitter` to ensure 0.1s wait time. 
- retrier = Retrier( + invoker = RetryInvoker( constant(0.1), ValueError, max_tries=2, max_time=None, jitter=None ) # Execute and Assert with pytest.raises(ValueError): - retrier.invoke(failing_function) + invoker.invoke(failing_function) # Assert 1 sleep call due to the max_tries being set to 2 mock_sleep.assert_called_once_with(0.1) @@ -132,11 +132,11 @@ def test_max_time(mock_time: MagicMock, mock_sleep: MagicMock) -> None: 0.0, 3.0, ] - retrier = Retrier(constant(2), ValueError, max_tries=None, max_time=2.5) + invoker = RetryInvoker(constant(2), ValueError, max_tries=None, max_time=2.5) # Execute and Assert with pytest.raises(ValueError): - retrier.invoke(failing_function) + invoker.invoke(failing_function) # Assert no wait because `max_time` is exceeded before the first retry. mock_sleep.assert_not_called() @@ -147,7 +147,7 @@ def test_event_handlers() -> None: success_handler = Mock() backoff_handler = Mock() giveup_handler = Mock() - retrier = Retrier( + invoker = RetryInvoker( constant(0.1), ValueError, max_tries=2, @@ -159,7 +159,7 @@ def test_event_handlers() -> None: # Execute and Assert with pytest.raises(ValueError): - retrier.invoke(failing_function) + invoker.invoke(failing_function) backoff_handler.assert_called_once() giveup_handler.assert_called_once() success_handler.assert_not_called() @@ -172,10 +172,10 @@ def test_giveup_condition() -> None: def should_give_up(exc: Exception) -> bool: return isinstance(exc, ValueError) - retrier = Retrier( - constant(0.1), ValueError, None, None, giveup_condition=should_give_up + invoker = RetryInvoker( + constant(0.1), ValueError, None, None, should_giveup=should_give_up ) # Execute and Assert with pytest.raises(ValueError): - retrier.invoke(failing_function) + invoker.invoke(failing_function) From 537885ececa11d60dd2392a832804798a3ef918d Mon Sep 17 00:00:00 2001 From: Heng Pan <134433891+panh99@users.noreply.github.com> Date: Mon, 30 Oct 2023 13:33:20 +0000 Subject: [PATCH 12/13] Update 
src/py/flwr/common/retry_invoker.py Co-authored-by: Daniel J. Beutel --- src/py/flwr/common/retry_invoker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/common/retry_invoker.py b/src/py/flwr/common/retry_invoker.py index 7096e891384a..7c72b5e7ca56 100644 --- a/src/py/flwr/common/retry_invoker.py +++ b/src/py/flwr/common/retry_invoker.py @@ -111,7 +111,7 @@ class RetryInvoker: A generator yielding successive wait times in seconds. If the generator is finite, the giveup event will be triggered when the generator raises `StopIteration`. - exception: Union[Type[Exception], Tuple[Type[Exception]]] + recoverable_exceptions: Union[Type[Exception], Tuple[Type[Exception]]] An exception type (or tuple of types) that triggers backoff. max_tries: Optional[int] The maximum number of attempts to make before giving up. Once exhausted, From 59ceace5a16f1fb3b783743ed0ed6897af4dc50e Mon Sep 17 00:00:00 2001 From: Heng Pan Date: Mon, 30 Oct 2023 13:46:04 +0000 Subject: [PATCH 13/13] update docstring --- src/py/flwr/common/retry_invoker.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/py/flwr/common/retry_invoker.py b/src/py/flwr/common/retry_invoker.py index 7c72b5e7ca56..a7b512e5d1a9 100644 --- a/src/py/flwr/common/retry_invoker.py +++ b/src/py/flwr/common/retry_invoker.py @@ -121,16 +121,6 @@ class RetryInvoker: The maximum total amount of time to try before giving up. Once this time has expired, this method won't be interrupted immediately, but the exception will be allowed to escape. If set to None, there is no limit to the total time. - jitter: Optional[Callable[[float], float]] (default: full_jitter) - A function of the value yielded by `wait_strategy` returning the actual time - to wait. This function helps distribute wait times stochastically to avoid - timing collisions across concurrent clients. Wait times are jittered by - default using the `full_jitter` function. 
To disable jittering, pass - `jitter=None`. - should_giveup: Optional[Callable[[Exception], bool]] (default: None) - A function accepting an exception instance, returning whether or not - to give up prematurely before other give-up conditions are evaluated. - If set to None, the strategy is to never give up prematurely. on_success: Optional[Callable[[RetryState], None]] (default: None) A callable to be executed in the event of success. The parameter is a data class object detailing the invocation. @@ -142,6 +132,16 @@ class RetryInvoker: exceeded, `should_giveup` returns True, or `wait_strategy` generator raises `StopInteration`. The parameter is a data class object detailing the invocation. + jitter: Optional[Callable[[float], float]] (default: full_jitter) + A function of the value yielded by `wait_strategy` returning the actual time + to wait. This function helps distribute wait times stochastically to avoid + timing collisions across concurrent clients. Wait times are jittered by + default using the `full_jitter` function. To disable jittering, pass + `jitter=None`. + should_giveup: Optional[Callable[[Exception], bool]] (default: None) + A function accepting an exception instance, returning whether or not + to give up prematurely before other give-up conditions are evaluated. + If set to None, the strategy is to never give up prematurely. Examples --------