Skip to content

Commit

Permalink
rename and update
Browse files Browse the repository at this point in the history
  • Loading branch information
panh99 committed Oct 27, 2023
1 parent c71792b commit 21d8a0a
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 89 deletions.
135 changes: 65 additions & 70 deletions src/py/flwr/common/retrier.py → src/py/flwr/common/retry_invoker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""`Retrier` to augment other callables with error handling and retries."""
"""`RetryInvoker` to augment other callables with error handling and retries."""


import itertools
Expand All @@ -25,10 +25,12 @@
Dict,
Generator,
Iterable,
List,
Optional,
Tuple,
Type,
Union,
cast,
)


Expand Down Expand Up @@ -71,55 +73,41 @@ def constant(
yield from interval


def random_jitter(value: float) -> float:
"""Jitter the value by a random number of milliseconds.
def full_jitter(max_value: float) -> float:
"""Randomize a float between 0 and the given maximum value.
This adds up to 1 second of additional time to the original value.
Prior to backoff version 1.2, this was the default jitter behavior.
This function implements the "Full Jitter" algorithm as described in the
AWS article discussing the efficacy of different jitter algorithms.
Reference: https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
Parameters
----------
value : float
The original, unjittered backoff value.
max_value : float
The upper limit for the randomized value.
"""
return value + random.random()


def full_jitter(value: float) -> float:
"""Jitter the value across the full range (0 to value).
This corresponds to the "Full Jitter" algorithm specified in the
AWS blog post on the performance of various jitter algorithms.
See: http://www.awsarchitectureblog.com/2015/03/backoff.html
Parameters
----------
value : float
The original, unjittered backoff value.
"""
return random.uniform(0, value)
return random.uniform(0, max_value)


@dataclass
class RetryState:
"""State for event handlers in Retrier."""
"""State for callbacks in RetryInvoker."""

target: Callable[..., Any]
args: Tuple[Any, ...]
kwargs: Dict[str, Any]
tries: int
elapsed: float
elapsed_time: float
exception: Optional[Exception] = None
wait: Optional[float] = None
actual_wait: Optional[float] = None


# pylint: disable-next=too-many-instance-attributes
class Retrier:
class RetryInvoker:
"""Wrapper class for retry (with backoff) triggered by exceptions.
Parameters
----------
wait: Generator[float, None, None]
wait_strategy: Generator[float, None, None]
A generator yielding successive wait times in seconds. If the generator
is finite, the giveup event will be triggered when the generator raises
`StopIteration`.
Expand All @@ -134,12 +122,12 @@ class Retrier:
has expired, this method won't be interrupted immediately, but the exception
will be allowed to escape. If set to None, there is no limit to the total time.
jitter: Optional[Callable[[float], float]] (default: full_jitter)
A function of the value yielded by `wait_gen` returning the actual time
A function of the value yielded by `wait_strategy` returning the actual time
to wait. This function helps distribute wait times stochastically to avoid
timing collisions across concurrent clients. Wait times are jittered by
default using the `full_jitter` function. To disable jittering, pass
`jitter=None`.
giveup_condition: Optional[Callable[[Exception], bool]] (default: None)
should_giveup: Optional[Callable[[Exception], bool]] (default: None)
A function accepting an exception instance, returning whether or not
to give up prematurely before other give-up conditions are evaluated.
If set to None, the strategy is to never give up prematurely.
Expand All @@ -151,46 +139,47 @@ class Retrier:
data class object detailing the invocation.
on_giveup: Optional[Callable[[RetryState], None]] (default: None)
A callable to be executed in the event that `max_tries` or `max_time` is
exceeded, `giveup_condition` returns True, or `wait` generator raises
exceeded, `should_giveup` returns True, or `wait_strategy` generator raises
`StopInteration`. The parameter is a data class object detailing the
invocation.
Examples
--------
Initialize a `Retrier` with exponential backoff and call a function:
Initialize a `RetryInvoker` with exponential backoff and call a function:
>>> retrier = Retrier(
>>> invoker = RetryInvoker(
>>> exponential(),
>>> grpc.RpcError,
>>> max_tries=3,
>>> max_time=None,
>>> )
>>> retrier.invoke(my_func, arg1, arg2, kw1=kwarg1)
>>> invoker.invoke(my_func, arg1, arg2, kw1=kwarg1)
"""

def __init__(
self,
wait: Generator[float, None, None],
exception: Union[Type[Exception], Tuple[Type[Exception], ...]],
wait_strategy: Generator[float, None, None],
recoverable_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]],
max_tries: Optional[int],
max_time: Optional[float],
*,
jitter: Optional[Callable[[float], float]] = full_jitter,
giveup_condition: Optional[Callable[[Exception], bool]] = None,
on_success: Optional[Callable[[RetryState], None]] = None,
on_backoff: Optional[Callable[[RetryState], None]] = None,
on_giveup: Optional[Callable[[RetryState], None]] = None,
jitter: Optional[Callable[[float], float]] = full_jitter,
should_giveup: Optional[Callable[[Exception], bool]] = None,
) -> None:
self.wait = wait
self.exception = exception
self.wait_strategy = wait_strategy
self.recoverable_exceptions = recoverable_exceptions
self.max_tries = max_tries
self.max_time = max_time
self.jitter = jitter
self.giveup_condition = giveup_condition
self.on_success = on_success
self.on_backoff = on_backoff
self.on_giveup = on_giveup
self.jitter = jitter
self.should_giveup = should_giveup

# pylint: disable-next=too-many-locals
def invoke(
self,
target: Callable[..., Any],
Expand All @@ -200,7 +189,7 @@ def invoke(
"""Safely invoke the provided callable with retry mechanisms.
This method attempts to invoke the given callable, and in the event of
a specified exception, employs a retry mechanism that considers
a recoverable exception, employs a retry mechanism that considers
wait times, jitter, maximum attempts, and maximum time. During the
retry process, various callbacks (`on_backoff`, `on_success`, and
`on_giveup`) can be triggered based on the outcome.
Expand All @@ -223,70 +212,76 @@ def invoke(
------
Exception
If the number of tries exceeds `max_tries`, if the total time
exceeds `max_time`, if `wait` generator raises `StopInteration`,
or if the `giveup_condition` returns True for a raised exception.
exceeds `max_time`, if `wait_strategy` generator raises `StopInteration`,
or if the `should_giveup` returns True for a raised exception.
Notes
-----
The time between retries is determined by the provided `wait` generator
and can optionally be jittered using the `jitter` function. The exact
exceptions that trigger a retry, as well as conditions to stop retries,
are also determined by the class's initialization parameters.
The time between retries is determined by the provided `wait_strategy`
generator and can optionally be jittered using the `jitter` function.
The recoverable exceptions that trigger a retry, as well as conditions to
stop retries, are also determined by the class's initialization parameters.
"""

def try_call_event_handler(
handler: Optional[Callable[[RetryState], None]], details: RetryState
handler: Optional[Callable[[RetryState], None]]
) -> None:
if handler is not None:
handler(details)
handler(cast(RetryState, ref_state[0]))

tries = 0
try_cnt = 0
start = time.time()
ref_state: List[Optional[RetryState]] = [None]

while True:
tries += 1
elapsed = time.time() - start
details = RetryState(
target=target, args=args, kwargs=kwargs, tries=tries, elapsed=elapsed
try_cnt += 1
elapsed_time = time.time() - start
state = RetryState(
target=target,
args=args,
kwargs=kwargs,
tries=try_cnt,
elapsed_time=elapsed_time,
)
ref_state[0] = state

try:
ret = target(*args, **kwargs)
except self.exception as err:
except self.recoverable_exceptions as err:
# Check if giveup event should be triggered
max_tries_exceeded = tries == self.max_tries
max_tries_exceeded = try_cnt == self.max_tries
max_time_exceeded = (
self.max_time is not None and elapsed >= self.max_time
self.max_time is not None and elapsed_time >= self.max_time
)

def giveup_check(_exception: Exception) -> bool:
if self.giveup_condition is None:
if self.should_giveup is None:
return False
return self.giveup_condition(_exception)
return self.should_giveup(_exception)

if giveup_check(err) or max_tries_exceeded or max_time_exceeded:
# Trigger giveup event
try_call_event_handler(self.on_giveup, details)
try_call_event_handler(self.on_giveup)
raise

try:
seconds = next(self.wait)
wait_time = next(self.wait_strategy)
if self.jitter is not None:
seconds = self.jitter(seconds)
wait_time = self.jitter(wait_time)
if self.max_time is not None:
seconds = min(seconds, self.max_time - elapsed)
details.wait = seconds
wait_time = min(wait_time, self.max_time - elapsed_time)
state.actual_wait = wait_time
except StopIteration:
# Trigger giveup event
try_call_event_handler(self.on_giveup, details)
try_call_event_handler(self.on_giveup)
raise err from None

# Trigger backoff event
try_call_event_handler(self.on_backoff, details)
try_call_event_handler(self.on_backoff)

# Sleep
time.sleep(seconds)
time.sleep(wait_time)
else:
# Trigger success event
try_call_event_handler(self.on_success, details)
try_call_event_handler(self.on_success)
return ret
Loading

0 comments on commit 21d8a0a

Please sign in to comment.