diff --git a/tower/Cargo.toml b/tower/Cargo.toml index acfebbd77..a8ab1935a 100644 --- a/tower/Cargo.toml +++ b/tower/Cargo.toml @@ -58,7 +58,7 @@ load-shed = ["__common"] make = ["futures-util", "pin-project-lite", "tokio/io-std"] ready-cache = ["futures-core", "futures-util", "indexmap", "tokio/sync", "tracing", "pin-project-lite"] reconnect = ["make", "tokio/io-std", "tracing"] -retry = ["__common", "tokio/time", "util"] +retry = ["__common", "tokio/time", "util", "tracing"] spawn-ready = ["__common", "futures-util", "tokio/sync", "tokio/rt", "util", "tracing"] steer = [] timeout = ["pin-project-lite", "tokio/time"] diff --git a/tower/src/retry/backoff.rs b/tower/src/retry/backoff.rs index 306723eda..5f2d3bd80 100644 --- a/tower/src/retry/backoff.rs +++ b/tower/src/retry/backoff.rs @@ -23,7 +23,7 @@ pub trait MakeBackoff { type Backoff: Backoff; /// Constructs a new backoff type. - fn make_backoff(&mut self) -> Self::Backoff; + fn make_backoff(&self) -> Self::Backoff; } /// A backoff trait where a single mutable reference represents a single @@ -120,7 +120,7 @@ where { type Backoff = ExponentialBackoff; - fn make_backoff(&mut self) -> Self::Backoff { + fn make_backoff(&self) -> Self::Backoff { ExponentialBackoff { max: self.max, min: self.min, @@ -179,6 +179,8 @@ where self.iterations += 1; + tracing::trace!(next_backoff_ms = %next.as_millis(), "Next backoff"); + tokio::time::sleep(next) } } @@ -217,7 +219,7 @@ mod tests { let min = time::Duration::from_millis(min_ms); let max = time::Duration::from_millis(max_ms); let rng = HasherRng::default(); - let mut backoff = match ExponentialBackoffMaker::new(min, max, 0.0, rng) { + let backoff = match ExponentialBackoffMaker::new(min, max, 0.0, rng) { Err(_) => return TestResult::discard(), Ok(backoff) => backoff, }; @@ -231,7 +233,7 @@ mod tests { let min = time::Duration::from_millis(min_ms); let max = time::Duration::from_millis(max_ms); let rng = HasherRng::default(); - let mut backoff = match ExponentialBackoffMaker::new(min, max, 0.0, rng) { + let backoff = match ExponentialBackoffMaker::new(min, max, 0.0, rng) { Err(_) => return TestResult::discard(), Ok(backoff) => backoff, }; @@ -246,7 +248,7 @@ mod tests { let base = time::Duration::from_millis(base_ms); let max = time::Duration::from_millis(max_ms); let rng = HasherRng::default(); - let mut backoff = match ExponentialBackoffMaker::new(base, max, jitter, rng) { + let backoff = match ExponentialBackoffMaker::new(base, max, jitter, rng) { Err(_) => return TestResult::discard(), Ok(backoff) => backoff, }; diff --git a/tower/src/retry/mod.rs b/tower/src/retry/mod.rs index 3abd94fca..a800f332a 100644 --- a/tower/src/retry/mod.rs +++ b/tower/src/retry/mod.rs @@ -1,10 +1,24 @@ //! Middleware for retrying "failed" requests. +//! +//! # Batteries-Included Features +//! +//! The [`standard_policy`] module contains a default retry [`Policy`] that can +//! be used with the [`Retry`] middleware. For more information, see the module +//! docs for [`standard_policy`]. +//! +//! The [`backoff`] module contains utilities for implementing backoffs — +//! which determine how long to wait between retries — in a custom [`Policy`]. +//! +//! The [`budget`] module contains utilities to reduce the amount of concurrent +//! retries made by a tower middleware stack. The goal for this is to reduce +//! congestive collapse when downstream systems degrade. pub mod backoff; pub mod budget; pub mod future; mod layer; mod policy; +pub mod standard_policy; pub use self::layer::RetryLayer; pub use self::policy::Policy; diff --git a/tower/src/retry/standard_policy.rs b/tower/src/retry/standard_policy.rs new file mode 100644 index 000000000..a507b7c21 --- /dev/null +++ b/tower/src/retry/standard_policy.rs @@ -0,0 +1,301 @@ +//! A standard retry policy that combines many of the `retry` module's utilities +//! together in a production-ready retry policy. +//! +//! # Defaults +//! +//! - [`ExponentialBackoffMaker`] is the default type for `B`. +//! - [`Budget`]'s default implementation is used. +//! - [`IsRetryable`] by default will always return `false`. +//! - [`CloneRequest`] by default will always return `None`. +//! +//! # Backoff +//! +//! The [`StandardRetryPolicy`] takes some [`MakeBackoff`] and will make a +//! [`Backoff`] for each request "session" (a request session is the initial +//! request and any subsequent requests). It will then supply the backoff future +//! to the retry middlware. Usually, this future is the [`tokio::time::Sleep`] +//! type. +//! +//! # Budget +//! +//! The [`StandardRetryPolicy`] uses the [`Budget`] type to ensure that for each +//! produced policy that this client will not overwhelm downstream services. +//! Check the docs of [`Budget`] to understand what the defaults are and why they +//! were chosen. +//! +//! # Example +//! +//! ``` +//!# use tower::retry::standard_policy::StandardRetryPolicy; +//!# use tower::retry::budget::Budget; +//! +//! let policy = StandardRetryPolicy::<(), (), ()>::builder() +//! .should_retry(|res: &mut Result<(), ()>| true) +//! .clone_request(|req: &()| Some(*req)) +//! .budget(Budget::default()) +//! .build(); +//! ``` +use std::{fmt, marker::PhantomData, sync::Arc}; + +use super::{ + backoff::{Backoff, ExponentialBackoffMaker, MakeBackoff}, + budget::Budget, +}; +use crate::retry::Policy; + +/// A trait to determine if the request associated with the response should be +/// retried by [`StandardRetryPolicy`]. +/// +/// # Closure +/// +/// This trait provides a blanket implementation for a closure of the type +/// `Fn(&mut Result) -> bool + Send + Sync + 'static`. +pub trait IsRetryable: Send + Sync + 'static { + /// Return `true` if the request associated with the response should be + /// retried and `false` if it should not be retried. + fn is_retryable(&self, response: &mut Result) -> bool; +} + +/// A trait to clone a request for the [`StandardRetryPolicy`]. +/// +/// # Closure +/// +/// This trait provides a blanket implementation for a closure of the type +/// `Fn(&Req) -> Option + Send + Sync + 'static`. +pub trait CloneRequest: Send + Sync + 'static { + /// Clone a request, if `None` is returned the request will not be retried. + fn clone_request(&self, request: &Req) -> Option; +} + +impl IsRetryable for F +where + F: Fn(&mut Result) -> bool + Send + Sync + 'static, +{ + fn is_retryable(&self, response: &mut Result) -> bool { + (self)(response) + } +} + +impl CloneRequest for F +where + F: Fn(&Req) -> Option + Send + Sync + 'static, +{ + fn clone_request(&self, request: &Req) -> Option { + (self)(request) + } +} + +/// A standard retry [`Policy`] that combines a retry budget and a backoff +/// mechanism to produce a safe retry policy that prevents congestive collapse +/// and retry storms. +/// +/// This type is constructed with the [`StandardRetryPolicyBuilder`]. +pub struct StandardRetryPolicy +where + B: MakeBackoff, +{ + is_retryable: Arc>, + clone_request: Arc>, + budget: Arc, + make_backoff: B, + current_backoff: B::Backoff, + _pd: PhantomData, +} + +/// Builder for [`StandardRetryPolicy`]. +/// +/// This type can constructed from the `StandardRetryPolicy::builder` function. +pub struct StandardRetryPolicyBuilder { + is_retryable: Arc>, + clone_request: Arc>, + make_backoff: B, + budget: Arc, +} + +impl StandardRetryPolicyBuilder { + /// Sets the retry decision maker. + /// + /// # Default + /// + /// By default, this will be set to an [`IsRetryable`] implementation that + /// always returns false and thus will not retry requests. + /// + /// # Example + /// + /// ``` + /// # use tower::retry::standard_policy::StandardRetryPolicy; + /// // Set the Req, Res, and E type to () for simplicity, replace these with + /// // your specific request/response/error types. + /// StandardRetryPolicy::<(), (), ()>::builder() + /// .should_retry(|res: &mut Result<(), ()>| true) + /// .build(); + /// ``` + pub fn should_retry(mut self, f: impl IsRetryable + 'static) -> Self { + self.is_retryable = Arc::new(f); + self + } + + /// Sets the clone request handler. + /// + /// # Default + /// + /// By default, this will be set to a [`CloneRequest`] implementation that + /// will never clone the request and will always return `None`. + /// + /// # Example + /// + /// ``` + /// # use tower::retry::standard_policy::StandardRetryPolicy; + /// // Set the Req, Res, and E type to () for simplicity, replace these with + /// // your specific request/response/error types. + /// StandardRetryPolicy::<(), (), ()>::builder() + /// .clone_request(|req: &()| Some(*req)) + /// .build(); + /// ``` + pub fn clone_request(mut self, f: impl CloneRequest + 'static) -> Self { + self.clone_request = Arc::new(f); + self + } + + /// Sets the backoff maker. + /// + /// # Default + /// + /// By default, this will be set to [`ExponentialBackoffMaker`]'s default + /// implementation. + /// + /// # Example + /// + /// ``` + /// # use tower::retry::standard_policy::StandardRetryPolicy; + /// # use tower::retry::backoff::ExponentialBackoffMaker; + /// // Set the Req, Res, and E type to () for simplicity, replace these with + /// // your specific request/response/error types. + /// StandardRetryPolicy::<(), (), ()>::builder() + /// .make_backoff(ExponentialBackoffMaker::default()) + /// .build(); + /// ``` + pub fn make_backoff(self, backoff: B2) -> StandardRetryPolicyBuilder { + StandardRetryPolicyBuilder { + make_backoff: backoff, + is_retryable: self.is_retryable, + clone_request: self.clone_request, + budget: self.budget, + } + } + + /// Sets the budget. + pub fn budget(mut self, budget: impl Into>) -> Self { + self.budget = budget.into(); + self + } + + /// Consume this builder and produce a [`StandardRetryPolicy`] with this + /// builders configured settings. + /// + /// # Default + /// + /// By default, this will be set to `Budget::default()`. + /// + /// # Example + /// + /// ``` + /// # use tower::retry::standard_policy::StandardRetryPolicy; + /// # use tower::retry::budget::Budget; + /// // Set the Req, Res, and E type to () for simplicity, replace these with + /// // your specific request/response/error types. + /// StandardRetryPolicy::<(), (), ()>::builder() + /// .budget(Budget::default()) + /// .build(); + /// ``` + pub fn build(self) -> StandardRetryPolicy { + let current_backoff = self.make_backoff.make_backoff(); + + StandardRetryPolicy { + is_retryable: self.is_retryable, + clone_request: self.clone_request, + make_backoff: self.make_backoff, + current_backoff, + budget: self.budget, + _pd: PhantomData, + } + } +} + +impl StandardRetryPolicy { + /// Create a [`StandardRetryPolicyBuilder`]. + pub fn builder() -> StandardRetryPolicyBuilder { + StandardRetryPolicyBuilder { + is_retryable: Arc::new(|_: &mut Result| false), + clone_request: Arc::new(|_: &Req| None), + make_backoff: ExponentialBackoffMaker::default(), + budget: Arc::new(Budget::default()), + } + } +} + +impl Policy for StandardRetryPolicy +where + B: MakeBackoff, + Req: 'static, + Res: 'static, + E: 'static, +{ + type Future = ::Future; + + fn retry(&mut self, _req: &mut Req, result: &mut Result) -> Option { + let can_retry = self.is_retryable.is_retryable(result); + + if !can_retry { + tracing::trace!("Received non-retryable response"); + self.budget.deposit(); + return None; + } + + let can_withdraw = self.budget.withdraw().is_ok(); + + if !can_withdraw { + tracing::trace!("Unable to withdraw from budget"); + return None; + } + + tracing::trace!("Withdrew from retry budget"); + + Some(self.current_backoff.next_backoff()) + } + + fn clone_request(&mut self, req: &Req) -> Option { + self.clone_request.clone_request(req) + } +} + +impl Clone for StandardRetryPolicy +where + B: MakeBackoff + Clone, +{ + fn clone(&self) -> Self { + Self { + is_retryable: self.is_retryable.clone(), + clone_request: self.clone_request.clone(), + budget: self.budget.clone(), + make_backoff: self.make_backoff.clone(), + current_backoff: self.make_backoff.make_backoff(), + _pd: PhantomData, + } + } +} + +impl fmt::Debug for StandardRetryPolicy { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("StandardRetryPolicy") + .field("budget", &self.budget) + .field("make_backoff", &self.make_backoff) + .finish() + } +} + +impl fmt::Debug for StandardRetryPolicyBuilder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("StandardRetryPolicyBuilder").finish() + } +} diff --git a/tower/tests/retry/main.rs b/tower/tests/retry/main.rs index 7ce220b47..2d75bdb58 100644 --- a/tower/tests/retry/main.rs +++ b/tower/tests/retry/main.rs @@ -2,9 +2,11 @@ #[path = "../support.rs"] mod support; +use std::time::Duration; + use futures_util::future; use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok, task}; -use tower::retry::Policy; +use tower::retry::{standard_policy::StandardRetryPolicy, Policy}; use tower_test::{assert_request_eq, mock}; #[tokio::test(flavor = "current_thread")] @@ -218,6 +220,44 @@ where } } +#[tokio::test] +async fn basic() { + let _t = support::trace_init(); + + tokio::time::pause(); + + let policy = StandardRetryPolicy::builder() + .should_retry( + |r: &mut Result<&'static str, Box>| { + if let Err(e) = r { + if format!("{:?}", e).contains("retry me") { + return true; + } + } + + false + }, + ) + .clone_request(|r: &&'static str| Some(*r)) + .build(); + + let (mut svc, mut handle) = new_service(policy); + + assert_ready_ok!(svc.poll_ready()); + + let mut fut = task::spawn(svc.call("hello")); + + assert_request_eq!(handle, "hello").send_error("retry me"); + + assert_pending!(fut.poll()); + tokio::time::advance(Duration::from_secs(1)).await; + assert_pending!(fut.poll()); + + assert_request_eq!(handle, "hello").send_response("world"); + + assert_eq!(fut.await.unwrap(), "world"); +} + fn new_service + Clone>( policy: P, ) -> (mock::Spawn>, Handle) {