Skip to content

Commit

Permalink
retry: Add StandardRetryPolicy and standard_policy mod
Browse files Browse the repository at this point in the history
This PR adds a new `standard_policy` module within the retry module
that provides a batteries included policy to be used with the retry
middleware. The policy combines the `Budget` type and generic backoff
utlities from the `backoff` module to provide an easy to use policy
with good defaults.

This PR also includes a `StandardRetryPolicyBuilder` as well as two
new traits `IsRetryable` and `CloneRequest`. These each have blanket
impls for closures. The reason that this implementation breaks these
out into two different traits is to allow `tower-http` to provide
a custom `CloneRequest` implementation that will be able to clone
some sort of `ReplayBody` and let the user pass in the retry decision
implementation.

Ref #682
  • Loading branch information
LucioFranco committed Oct 7, 2022
1 parent c5632a2 commit 70544a2
Show file tree
Hide file tree
Showing 4 changed files with 359 additions and 6 deletions.
12 changes: 7 additions & 5 deletions tower/src/retry/backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub trait MakeBackoff {
type Backoff: Backoff;

/// Constructs a new backoff type.
fn make_backoff(&mut self) -> Self::Backoff;
fn make_backoff(&self) -> Self::Backoff;
}

/// A backoff trait where a single mutable reference represents a single
Expand Down Expand Up @@ -120,7 +120,7 @@ where
{
type Backoff = ExponentialBackoff<R>;

fn make_backoff(&mut self) -> Self::Backoff {
fn make_backoff(&self) -> Self::Backoff {
ExponentialBackoff {
max: self.max,
min: self.min,
Expand Down Expand Up @@ -179,6 +179,8 @@ where

self.iterations += 1;

tracing::trace!(next_backoff_ms = %next.as_millis(), "Next backoff");

tokio::time::sleep(next)
}
}
Expand Down Expand Up @@ -217,7 +219,7 @@ mod tests {
let min = time::Duration::from_millis(min_ms);
let max = time::Duration::from_millis(max_ms);
let rng = HasherRng::default();
let mut backoff = match ExponentialBackoffMaker::new(min, max, 0.0, rng) {
let backoff = match ExponentialBackoffMaker::new(min, max, 0.0, rng) {
Err(_) => return TestResult::discard(),
Ok(backoff) => backoff,
};
Expand All @@ -231,7 +233,7 @@ mod tests {
let min = time::Duration::from_millis(min_ms);
let max = time::Duration::from_millis(max_ms);
let rng = HasherRng::default();
let mut backoff = match ExponentialBackoffMaker::new(min, max, 0.0, rng) {
let backoff = match ExponentialBackoffMaker::new(min, max, 0.0, rng) {
Err(_) => return TestResult::discard(),
Ok(backoff) => backoff,
};
Expand All @@ -246,7 +248,7 @@ mod tests {
let base = time::Duration::from_millis(base_ms);
let max = time::Duration::from_millis(max_ms);
let rng = HasherRng::default();
let mut backoff = match ExponentialBackoffMaker::new(base, max, jitter, rng) {
let backoff = match ExponentialBackoffMaker::new(base, max, jitter, rng) {
Err(_) => return TestResult::discard(),
Ok(backoff) => backoff,
};
Expand Down
14 changes: 14 additions & 0 deletions tower/src/retry/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
//! Middleware for retrying "failed" requests.
//!
//! # Batteries included features
//!
//! The [`standard_policy`] module contains a default retry [`Policy`] that can
//! be used with the [`Retry`] middleware. For more information check the module
//! docs for [`standard_policy`].
//!
//! The [`backoff`] module contains backoff utlities that can be used in a
//! custom [`Policy`].
//!
//! The [`budget`] module contains utilities to reduce the amount of concurrent
//! retries made by a tower middleware stack. The goal for this is to reduce
//! congestive collapse when downstream systems degrade.
pub mod backoff;
pub mod budget;
pub mod future;
mod layer;
mod policy;
pub mod standard_policy;

pub use self::layer::RetryLayer;
pub use self::policy::Policy;
Expand Down
297 changes: 297 additions & 0 deletions tower/src/retry/standard_policy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
//! A standard retry policy that combines many of the `retry` module's utilities
//! together in a production ready retry policy.
//!
//! # Defaults
//!
//! - [`ExponentialBackoffMaker`] is the default type for `B`.
//! - [`Budget`]'s default implementation is used.
//! - [`IsRetryable`] by default will always return `false`.
//! - [`CloneRequest`] by default will always return `None`.
//!
//! # Backoff
//!
//! The [`StandardRetryPolicy`] takes some [`MakeBackoff`] and will make a
//! [`Backoff`] for each request "session" (a request session is the initial
//! request and any subsequent requests). It will then supply the backoff future
//! to the retry middlware. Usually, this future is the [`tokio::time::Sleep`]
//! type.
//!
//! # Budget
//!
//! The [`StandardRetryPolicy`] uses the [`Budget`] type to ensure that for each
//! produced policy that this client will not overwhelm downstream services.
//! Check the docs of [`Budget`] to understand what the defaults are and why they
//! were chosen.
//!
//! # Example
//!
//! ```
//! let policy = StandardRetryPolicy::<(), (), ()>::builder()
//! .should_retry(|res: &mut Result<(), ()>| true)
//! .clone_request(|req: &()| *req)
//! .budget(Budget::default())
//! .build();
//! ```
use std::{fmt, marker::PhantomData, sync::Arc};

use super::{
backoff::{Backoff, ExponentialBackoffMaker, MakeBackoff},
budget::Budget,
};
use crate::retry::Policy;

/// A trait to determine if the request associated with the response should be
/// retried by [`StandardRetryPolicy`].
///
/// # Closure
///
/// This trait provides a blanket implementation for a closure of the type
/// `Fn(&mut Result<Res, E>) -> bool + Send + Sync + 'static`.
pub trait IsRetryable<Res, E>: Send + Sync + 'static {
/// Return `true` if the request associated with the response should be
/// retried and `false` if it should not be retried.
fn is_retryalbe(&self, response: &mut Result<Res, E>) -> bool;
}

/// A trait to clone a request for the [`StandardRetryPolicy`].
///
/// # Closure
///
/// This trait provides a blanket implementation for a closure of the type
/// `Fn(&Req) -> Option<Req> + Send + Sync + 'static`.
pub trait CloneRequest<Req>: Send + Sync + 'static {
/// Clone a request, if `None` is returned the request will not be retried.
fn clone_request(&self, request: &Req) -> Option<Req>;
}

impl<Res, E, F> IsRetryable<Res, E> for F
where
F: Fn(&mut Result<Res, E>) -> bool + Send + Sync + 'static,
{
fn is_retryalbe(&self, response: &mut Result<Res, E>) -> bool {
(self)(response)
}
}

impl<Req, F> CloneRequest<Req> for F
where
F: Fn(&Req) -> Option<Req> + Send + Sync + 'static,
{
fn clone_request(&self, request: &Req) -> Option<Req> {
(self)(request)
}
}

/// A standard retry [`Policy`] that combines a retry budget and a backoff
/// mechanism to produce a safe retry policy that prevents congestive collapse
/// and retry storms.
///
/// This type is constructed with the [`StandardRetryPolicyBuilder`].
pub struct StandardRetryPolicy<Req, Res, E, B = ExponentialBackoffMaker>
where
B: MakeBackoff,
{
is_retryable: Arc<dyn IsRetryable<Res, E>>,
clone_request: Arc<dyn CloneRequest<Req>>,
budget: Arc<Budget>,
make_backoff: B,
current_backoff: B::Backoff,
_pd: PhantomData<fn(Req, Res, E)>,
}

/// Builder for [`StandardRetryPolicy`].
///
/// This type can constructed from the `StandardRetryPolicy::builder` function.
pub struct StandardRetryPolicyBuilder<Req, Res, E, B = ExponentialBackoffMaker> {
is_retryable: Arc<dyn IsRetryable<Res, E>>,
clone_request: Arc<dyn CloneRequest<Req>>,
make_backoff: B,
budget: Arc<Budget>,
}

impl<Req, Res, E, B: MakeBackoff> StandardRetryPolicyBuilder<Req, Res, E, B> {
/// Sets the retry decision maker.
///
/// # Default
///
/// By default, this will be set to an [`IsRetryable`] implementation that
/// always returns false and thus will not retry requests.
///
/// # Example
///
/// ```
/// # use tower::retry::standard_policy::StandardRetryPolicy;
/// // Set the Req, Res, and E type to () for simplicity, replace these with
/// // your specific request/response/error types.
/// StandardRetryPolicy::<(), (), ()>::builder()
/// .should_retry(|res: &mut Result<(), ()>| true)
/// .build();
/// ```
pub fn should_retry(mut self, f: impl IsRetryable<Res, E> + 'static) -> Self {
self.is_retryable = Arc::new(f);
self
}

/// Sets the clone request handler.
///
/// # Default
///
/// By default, this will be set to a [`CloneRequest`] implementation that
/// will never clone the request and will always return `None`.
///
/// # Example
///
/// ```
/// # use tower::retry::standard_policy::StandardRetryPolicy;
/// // Set the Req, Res, and E type to () for simplicity, replace these with
/// // your specific request/response/error types.
/// StandardRetryPolicy::<(), (), ()>::builder()
/// .clone_request(|req: &()| Some(*req))
/// .build();
/// ```
pub fn clone_request(mut self, f: impl CloneRequest<Req> + 'static) -> Self {
self.clone_request = Arc::new(f);
self
}

/// Sets the backoff maker.
///
/// # Default
///
/// By default, this will be set to [`ExponentialBackoffMaker`]'s default
/// implementation.
///
/// # Example
///
/// ```
/// # use tower::retry::standard_policy::StandardRetryPolicy;
/// # use tower::retry::backoff::ExponentialBackoffMaker;
/// // Set the Req, Res, and E type to () for simplicity, replace these with
/// // your specific request/response/error types.
/// StandardRetryPolicy::<(), (), ()>::builder()
/// .make_backoff(ExponentialBackoffMaker::default())
/// .build();
/// ```
pub fn make_backoff<B2>(self, backoff: B2) -> StandardRetryPolicyBuilder<Req, Res, E, B2> {
StandardRetryPolicyBuilder {
make_backoff: backoff,
is_retryable: self.is_retryable,
clone_request: self.clone_request,
budget: self.budget,
}
}

/// Sets the budget.
pub fn budget(mut self, budget: impl Into<Arc<Budget>>) -> Self {
self.budget = budget.into();
self
}

/// Consume this builder and produce a [`StandardRetryPolicy`] with this
/// builders configured settings.
///
/// # Default
///
/// By default, this will be set to `Budget::default()`.
///
/// # Example
///
/// ```
/// # use tower::retry::standard_policy::StandardRetryPolicy;
/// // Set the Req, Res, and E type to () for simplicity, replace these with
/// // your specific request/response/error types.
/// StandardRetryPolicy::<(), (), ()>::builder()
/// .budget(Budget::default())
/// .build();
/// ```
pub fn build(self) -> StandardRetryPolicy<Req, Res, E, B> {
let current_backoff = self.make_backoff.make_backoff();

StandardRetryPolicy {
is_retryable: self.is_retryable,
clone_request: self.clone_request,
make_backoff: self.make_backoff,
current_backoff,
budget: self.budget,
_pd: PhantomData,
}
}
}

impl<Req, Res, E> StandardRetryPolicy<Req, Res, E, ExponentialBackoffMaker> {
/// Create a [`StandardRetryPolicyBuilder`].
pub fn builder() -> StandardRetryPolicyBuilder<Req, Res, E> {
StandardRetryPolicyBuilder {
is_retryable: Arc::new(|_: &mut Result<Res, E>| false),
clone_request: Arc::new(|_: &Req| None),
make_backoff: ExponentialBackoffMaker::default(),
budget: Arc::new(Budget::default()),
}
}
}

impl<Req, Res, E, B> Policy<Req, Res, E> for StandardRetryPolicy<Req, Res, E, B>
where
B: MakeBackoff,
Req: 'static,
Res: 'static,
E: 'static,
{
type Future = <B::Backoff as Backoff>::Future;

fn retry(&mut self, _req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future> {
let can_retry = self.is_retryable.is_retryalbe(result);

if !can_retry {
tracing::trace!("Recived non-retryable response");
self.budget.deposit();
return None;
}

let can_withdraw = self.budget.withdraw().is_ok();

if !can_withdraw {
tracing::trace!("Unable to withdraw from budget");
return None;
}

tracing::trace!("Withdrew from retry budget");

Some(self.current_backoff.next_backoff())
}

fn clone_request(&mut self, req: &Req) -> Option<Req> {
self.clone_request.clone_request(req)
}
}

impl<Req, Res, E, B> Clone for StandardRetryPolicy<Req, Res, E, B>
where
B: MakeBackoff + Clone,
{
fn clone(&self) -> Self {
Self {
is_retryable: self.is_retryable.clone(),
clone_request: self.clone_request.clone(),
budget: self.budget.clone(),
make_backoff: self.make_backoff.clone(),
current_backoff: self.make_backoff.make_backoff(),
_pd: PhantomData,
}
}
}

impl<Req, Res, E, B: MakeBackoff + fmt::Debug> fmt::Debug for StandardRetryPolicy<Req, Res, E, B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StandardRetryPolicy")
.field("budget", &self.budget)
.field("make_backoff", &self.make_backoff)
.finish()
}
}

impl<Req, Res, E, B: fmt::Debug> fmt::Debug for StandardRetryPolicyBuilder<Req, Res, E, B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StandardRetryPolicyBuilder").finish()
}
}
Loading

0 comments on commit 70544a2

Please sign in to comment.