From 21d8a0ae9652fc5eb6613c7ef5b1627b43472a71 Mon Sep 17 00:00:00 2001 From: Heng Pan Date: Fri, 27 Oct 2023 23:26:50 +0100 Subject: [PATCH] rename and update --- .../common/{retrier.py => retry_invoker.py} | 135 +++++++++--------- ...{retrier_test.py => retry_invoker_test.py} | 38 ++--- 2 files changed, 84 insertions(+), 89 deletions(-) rename src/py/flwr/common/{retrier.py => retry_invoker.py} (68%) rename src/py/flwr/common/{retrier_test.py => retry_invoker_test.py} (83%) diff --git a/src/py/flwr/common/retrier.py b/src/py/flwr/common/retry_invoker.py similarity index 68% rename from src/py/flwr/common/retrier.py rename to src/py/flwr/common/retry_invoker.py index e08402b30420..7096e891384a 100644 --- a/src/py/flwr/common/retrier.py +++ b/src/py/flwr/common/retry_invoker.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== -"""`Retrier` to augment other callables with error handling and retries.""" +"""`RetryInvoker` to augment other callables with error handling and retries.""" import itertools @@ -25,10 +25,12 @@ Dict, Generator, Iterable, + List, Optional, Tuple, Type, Union, + cast, ) @@ -71,55 +73,41 @@ def constant( yield from interval -def random_jitter(value: float) -> float: - """Jitter the value by a random number of milliseconds. +def full_jitter(max_value: float) -> float: + """Randomize a float between 0 and the given maximum value. - This adds up to 1 second of additional time to the original value. - Prior to backoff version 1.2, this was the default jitter behavior. + This function implements the "Full Jitter" algorithm as described in the + AWS article discussing the efficacy of different jitter algorithms. + Reference: https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ Parameters ---------- - value : float - The original, unjittered backoff value. + max_value : float + The upper limit for the randomized value. """ - return value + random.random() - - -def full_jitter(value: float) -> float: - """Jitter the value across the full range (0 to value). - - This corresponds to the "Full Jitter" algorithm specified in the - AWS blog post on the performance of various jitter algorithms. - See: http://www.awsarchitectureblog.com/2015/03/backoff.html - - Parameters - ---------- - value : float - The original, unjittered backoff value. - """ - return random.uniform(0, value) + return random.uniform(0, max_value) @dataclass class RetryState: - """State for event handlers in Retrier.""" + """State for callbacks in RetryInvoker.""" target: Callable[..., Any] args: Tuple[Any, ...] kwargs: Dict[str, Any] tries: int - elapsed: float + elapsed_time: float exception: Optional[Exception] = None - wait: Optional[float] = None + actual_wait: Optional[float] = None # pylint: disable-next=too-many-instance-attributes -class Retrier: +class RetryInvoker: """Wrapper class for retry (with backoff) triggered by exceptions. Parameters ---------- - wait: Generator[float, None, None] + wait_strategy: Generator[float, None, None] A generator yielding successive wait times in seconds. If the generator is finite, the giveup event will be triggered when the generator raises `StopIteration`. @@ -134,12 +122,12 @@ class Retrier: has expired, this method won't be interrupted immediately, but the exception will be allowed to escape. If set to None, there is no limit to the total time. jitter: Optional[Callable[[float], float]] (default: full_jitter) - A function of the value yielded by `wait_gen` returning the actual time + A function of the value yielded by `wait_strategy` returning the actual time to wait. This function helps distribute wait times stochastically to avoid timing collisions across concurrent clients. Wait times are jittered by default using the `full_jitter` function. To disable jittering, pass `jitter=None`. - giveup_condition: Optional[Callable[[Exception], bool]] (default: None) + should_giveup: Optional[Callable[[Exception], bool]] (default: None) A function accepting an exception instance, returning whether or not to give up prematurely before other give-up conditions are evaluated. If set to None, the strategy is to never give up prematurely. @@ -151,46 +139,47 @@ class Retrier: data class object detailing the invocation. on_giveup: Optional[Callable[[RetryState], None]] (default: None) A callable to be executed in the event that `max_tries` or `max_time` is - exceeded, `giveup_condition` returns True, or `wait` generator raises + exceeded, `should_giveup` returns True, or `wait_strategy` generator raises `StopInteration`. The parameter is a data class object detailing the invocation. Examples -------- - Initialize a `Retrier` with exponential backoff and call a function: + Initialize a `RetryInvoker` with exponential backoff and call a function: - >>> retrier = Retrier( + >>> invoker = RetryInvoker( >>> exponential(), >>> grpc.RpcError, >>> max_tries=3, >>> max_time=None, >>> ) - >>> retrier.invoke(my_func, arg1, arg2, kw1=kwarg1) + >>> invoker.invoke(my_func, arg1, arg2, kw1=kwarg1) """ def __init__( self, - wait: Generator[float, None, None], - exception: Union[Type[Exception], Tuple[Type[Exception], ...]], + wait_strategy: Generator[float, None, None], + recoverable_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]], max_tries: Optional[int], max_time: Optional[float], *, - jitter: Optional[Callable[[float], float]] = full_jitter, - giveup_condition: Optional[Callable[[Exception], bool]] = None, on_success: Optional[Callable[[RetryState], None]] = None, on_backoff: Optional[Callable[[RetryState], None]] = None, on_giveup: Optional[Callable[[RetryState], None]] = None, + jitter: Optional[Callable[[float], float]] = full_jitter, + should_giveup: Optional[Callable[[Exception], bool]] = None, ) -> None: - self.wait = wait - self.exception = exception + self.wait_strategy = wait_strategy + self.recoverable_exceptions = recoverable_exceptions self.max_tries = max_tries self.max_time = max_time - self.jitter = jitter - self.giveup_condition = giveup_condition self.on_success = on_success self.on_backoff = on_backoff self.on_giveup = on_giveup + self.jitter = jitter + self.should_giveup = should_giveup + # pylint: disable-next=too-many-locals def invoke( self, target: Callable[..., Any], @@ -200,7 +189,7 @@ def invoke( """Safely invoke the provided callable with retry mechanisms. This method attempts to invoke the given callable, and in the event of - a specified exception, employs a retry mechanism that considers + a recoverable exception, employs a retry mechanism that considers wait times, jitter, maximum attempts, and maximum time. During the retry process, various callbacks (`on_backoff`, `on_success`, and `on_giveup`) can be triggered based on the outcome. @@ -223,70 +212,76 @@ def invoke( ------ Exception If the number of tries exceeds `max_tries`, if the total time - exceeds `max_time`, if `wait` generator raises `StopInteration`, - or if the `giveup_condition` returns True for a raised exception. + exceeds `max_time`, if `wait_strategy` generator raises `StopInteration`, + or if the `should_giveup` returns True for a raised exception. Notes ----- - The time between retries is determined by the provided `wait` generator - and can optionally be jittered using the `jitter` function. The exact - exceptions that trigger a retry, as well as conditions to stop retries, - are also determined by the class's initialization parameters. + The time between retries is determined by the provided `wait_strategy` + generator and can optionally be jittered using the `jitter` function. + The recoverable exceptions that trigger a retry, as well as conditions to + stop retries, are also determined by the class's initialization parameters. """ def try_call_event_handler( - handler: Optional[Callable[[RetryState], None]], details: RetryState + handler: Optional[Callable[[RetryState], None]] ) -> None: if handler is not None: - handler(details) + handler(cast(RetryState, ref_state[0])) - tries = 0 + try_cnt = 0 start = time.time() + ref_state: List[Optional[RetryState]] = [None] while True: - tries += 1 - elapsed = time.time() - start - details = RetryState( - target=target, args=args, kwargs=kwargs, tries=tries, elapsed=elapsed + try_cnt += 1 + elapsed_time = time.time() - start + state = RetryState( + target=target, + args=args, + kwargs=kwargs, + tries=try_cnt, + elapsed_time=elapsed_time, ) + ref_state[0] = state try: ret = target(*args, **kwargs) - except self.exception as err: + except self.recoverable_exceptions as err: # Check if giveup event should be triggered - max_tries_exceeded = tries == self.max_tries + max_tries_exceeded = try_cnt == self.max_tries max_time_exceeded = ( - self.max_time is not None and elapsed >= self.max_time + self.max_time is not None and elapsed_time >= self.max_time ) def giveup_check(_exception: Exception) -> bool: - if self.giveup_condition is None: + if self.should_giveup is None: return False - return self.giveup_condition(_exception) + return self.should_giveup(_exception) if giveup_check(err) or max_tries_exceeded or max_time_exceeded: # Trigger giveup event - try_call_event_handler(self.on_giveup, details) + try_call_event_handler(self.on_giveup) raise try: - seconds = next(self.wait) + wait_time = next(self.wait_strategy) if self.jitter is not None: - seconds = self.jitter(seconds) + wait_time = self.jitter(wait_time) if self.max_time is not None: - seconds = min(seconds, self.max_time - elapsed) - details.wait = seconds + wait_time = min(wait_time, self.max_time - elapsed_time) + state.actual_wait = wait_time except StopIteration: # Trigger giveup event - try_call_event_handler(self.on_giveup, details) + try_call_event_handler(self.on_giveup) raise err from None # Trigger backoff event - try_call_event_handler(self.on_backoff, details) + try_call_event_handler(self.on_backoff) # Sleep - time.sleep(seconds) + time.sleep(wait_time) else: # Trigger success event - try_call_event_handler(self.on_success, details) + try_call_event_handler(self.on_success) return ret diff --git a/src/py/flwr/common/retrier_test.py b/src/py/flwr/common/retry_invoker_test.py similarity index 83% rename from src/py/flwr/common/retrier_test.py rename to src/py/flwr/common/retry_invoker_test.py index 8e6752b2cc3c..5f6dab49ce1c 100644 --- a/src/py/flwr/common/retrier_test.py +++ b/src/py/flwr/common/retry_invoker_test.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== -"""Tests for `Retrier`.""" +"""Tests for `RetryInvoker`.""" from typing import Generator @@ -20,7 +20,7 @@ import pytest -from flwr.common.retrier import Retrier, constant +from flwr.common.retry_invoker import RetryInvoker, constant def successful_function() -> str: @@ -53,7 +53,7 @@ def test_successful_invocation() -> None: success_handler = Mock() backoff_handler = Mock() giveup_handler = Mock() - retrier = Retrier( + invoker = RetryInvoker( constant(0.1), ValueError, max_tries=None, @@ -64,7 +64,7 @@ def test_successful_invocation() -> None: ) # Execute - result = retrier.invoke(successful_function) + result = invoker.invoke(successful_function) # Assert assert result == "success" @@ -77,23 +77,23 @@ def test_failure() -> None: """Check termination when unexpected exception is raised.""" # Prepare # `constant([0.1])` generator will raise `StopIteration` after one iteration. - retrier = Retrier(constant(0.1), TypeError, None, None) + invoker = RetryInvoker(constant(0.1), TypeError, None, None) # Execute and Assert with pytest.raises(ValueError): - retrier.invoke(failing_function) + invoker.invoke(failing_function) def test_failure_two_exceptions(mock_sleep: MagicMock) -> None: """Verify one retry on a specified iterable of exceptions.""" # Prepare - retrier = Retrier( + invoker = RetryInvoker( constant(0.1), (TypeError, ValueError), max_tries=2, max_time=None, jitter=None ) # Execute and Assert with pytest.raises(ValueError): - retrier.invoke(failing_function) + invoker.invoke(failing_function) mock_sleep.assert_called_once_with(0.1) @@ -101,11 +101,11 @@ def test_backoff_on_failure(mock_sleep: MagicMock) -> None: """Verify one retry on specified exception.""" # Prepare # `constant([0.1])` generator will raise `StopIteration` after one iteration. - retrier = Retrier(constant([0.1]), ValueError, None, None, jitter=None) + invoker = RetryInvoker(constant([0.1]), ValueError, None, None, jitter=None) # Execute and Assert with pytest.raises(ValueError): - retrier.invoke(failing_function) + invoker.invoke(failing_function) mock_sleep.assert_called_once_with(0.1) @@ -113,13 +113,13 @@ def test_max_tries(mock_sleep: MagicMock) -> None: """Check termination after `max_tries`.""" # Prepare # Disable `jitter` to ensure 0.1s wait time. - retrier = Retrier( + invoker = RetryInvoker( constant(0.1), ValueError, max_tries=2, max_time=None, jitter=None ) # Execute and Assert with pytest.raises(ValueError): - retrier.invoke(failing_function) + invoker.invoke(failing_function) # Assert 1 sleep call due to the max_tries being set to 2 mock_sleep.assert_called_once_with(0.1) @@ -132,11 +132,11 @@ def test_max_time(mock_time: MagicMock, mock_sleep: MagicMock) -> None: 0.0, 3.0, ] - retrier = Retrier(constant(2), ValueError, max_tries=None, max_time=2.5) + invoker = RetryInvoker(constant(2), ValueError, max_tries=None, max_time=2.5) # Execute and Assert with pytest.raises(ValueError): - retrier.invoke(failing_function) + invoker.invoke(failing_function) # Assert no wait because `max_time` is exceeded before the first retry. mock_sleep.assert_not_called() @@ -147,7 +147,7 @@ def test_event_handlers() -> None: success_handler = Mock() backoff_handler = Mock() giveup_handler = Mock() - retrier = Retrier( + invoker = RetryInvoker( constant(0.1), ValueError, max_tries=2, @@ -159,7 +159,7 @@ def test_event_handlers() -> None: # Execute and Assert with pytest.raises(ValueError): - retrier.invoke(failing_function) + invoker.invoke(failing_function) backoff_handler.assert_called_once() giveup_handler.assert_called_once() success_handler.assert_not_called() @@ -172,10 +172,10 @@ def test_giveup_condition() -> None: def should_give_up(exc: Exception) -> bool: return isinstance(exc, ValueError) - retrier = Retrier( - constant(0.1), ValueError, None, None, giveup_condition=should_give_up + invoker = RetryInvoker( + constant(0.1), ValueError, None, None, should_giveup=should_give_up ) # Execute and Assert with pytest.raises(ValueError): - retrier.invoke(failing_function) + invoker.invoke(failing_function)