Skip to content

Commit

Permalink
First stab at implementing retries (#421)
Browse files Browse the repository at this point in the history
  • Loading branch information
Valentin Mariette committed May 11, 2023
1 parent f10406b commit 50f8359
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 4 deletions.
3 changes: 3 additions & 0 deletions graph-error/src/graph_failure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ pub enum GraphFailure {

#[error("{0:#?}")]
ErrorMessage(#[from] ErrorMessage),

#[error("Temporary Graph API Error")]
TemporaryError,
}

impl GraphFailure {
Expand Down
1 change: 1 addition & 0 deletions graph-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ serde_urlencoded = "0.7.1"
thiserror = "1"
tokio = { version = "1.27.0", features = ["full"] }
url = { version = "2", features = ["serde"] }
backoff = { version = "0.4.0", features = ["tokio"] }

graph-error = { path = "../graph-error" }
graph-core = { path = "../graph-core" }
Expand Down
86 changes: 85 additions & 1 deletion graph-http/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,64 @@
use crate::blocking::BlockingClient;
use crate::traits::ODataQuery;

use backoff::exponential::ExponentialBackoff;
use backoff::{ExponentialBackoffBuilder, SystemClock};
use graph_error::GraphResult;
use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, USER_AGENT};
use reqwest::redirect::Policy;
use reqwest::tls::Version;
use std::env::VarError;
use std::ffi::OsStr;
use std::fmt::{Debug, Formatter};
use std::time::Duration;
use std::time::{Duration, Instant};
use url::Url;

#[derive(Debug, Clone)]
pub struct RetriesConfig {
/// The current retry interval.
pub current_interval: Duration,
/// The initial retry interval.
pub initial_interval: Duration,
/// The randomization factor to use for creating a range around the retry interval.
///
/// A randomization factor of 0.5 results in a random period ranging between 50% below and 50%
/// above the retry interval.
pub randomization_factor: f64,
/// The value to multiply the current interval with for each retry attempt.
pub multiplier: f64,
/// The maximum value of the back off period. Once the retry interval reaches this
/// value it stops increasing.
pub max_interval: Duration,
/// The system time. It is calculated when an [`ExponentialBackoff`](struct.ExponentialBackoff.html) instance is
/// created and is reset when [`retry`](../trait.Operation.html#method.retry) is called.
pub start_time: Instant,
/// The maximum elapsed time after instantiating [`ExponentialBackoff`](struct.ExponentialBackoff.html) or calling
/// [`reset`](trait.Backoff.html#method.reset) after which [`next_backoff`](../trait.Backoff.html#method.reset) returns `None`.
pub max_elapsed_time: Option<Duration>,
}

impl Default for RetriesConfig {
fn default() -> RetriesConfig {
RetriesConfig {
current_interval: Duration::from_millis(500),
initial_interval: Duration::from_millis(500),
randomization_factor: 0.0,
multiplier: 1.5,
max_interval: Duration::from_secs(450),
max_elapsed_time: Some(Duration::from_millis(450)),
start_time: Instant::now(),
}
}
}

#[derive(Clone)]
struct ClientConfiguration {
access_token: Option<String>,
headers: HeaderMap,
referer: bool,
timeout: Option<Duration>,
retries: bool,
retries_config: RetriesConfig,
connect_timeout: Option<Duration>,
connection_verbose: bool,
https_only: bool,
Expand All @@ -33,6 +75,8 @@ impl ClientConfiguration {
headers,
referer: true,
timeout: None,
retries: false,
retries_config: RetriesConfig::default(),
connect_timeout: None,
connection_verbose: false,
https_only: true,
Expand All @@ -49,6 +93,8 @@ impl Debug for ClientConfiguration {
.field("headers", &self.headers)
.field("referer", &self.referer)
.field("timeout", &self.timeout)
.field("retries", &self.retries)
.field("retries_config", &self.retries_config)
.field("connect_timeout", &self.connect_timeout)
.field("https_only", &self.https_only)
.field("min_tls_version", &self.min_tls_version)
Expand Down Expand Up @@ -112,6 +158,44 @@ impl GraphClientConfiguration {
self
}

/// Enables the client to perform retries if the request fail.
///
/// Default is no retries.
pub fn retries(mut self, retries: bool) -> GraphClientConfiguration {
self.config.retries = retries;
self
}

pub(crate) fn get_retries(&self) -> bool {
self.config.retries
}

/// Set the Retries Config to be used by the Exponential Backoff Builder
/// Has no effect if the retries are not enabled
///
/// # Note
///
/// If Graph API returns an Error 429 Too Many Requests with a Retry-After header
/// it will be preferred over the configured exponential backoff
pub fn retries_config(mut self, config: RetriesConfig) -> GraphClientConfiguration {
self.config.retries_config = config;
self
}

pub fn get_retries_config(self) -> RetriesConfig {
self.config.retries_config
}

pub(crate) fn get_retries_exponential_backoff(&self) -> ExponentialBackoff<SystemClock> {
ExponentialBackoffBuilder::new()
.with_initial_interval(self.config.retries_config.initial_interval)
.with_multiplier(self.config.retries_config.multiplier)
.with_randomization_factor(self.config.retries_config.randomization_factor)
.with_max_interval(self.config.retries_config.max_interval)
.with_max_elapsed_time(self.config.retries_config.max_elapsed_time)
.build()
}

/// Set whether connections should emit verbose logs.
///
/// Enabling this option will emit [log][] messages at the `TRACE` level
Expand Down
64 changes: 62 additions & 2 deletions graph-http/src/request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ use crate::internal::{
RequestComponents,
};
use async_stream::try_stream;
use backoff::future::retry;
use futures::Stream;
use graph_error::{ErrorMessage, GraphFailure, GraphResult};
use http::StatusCode;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue, CONTENT_TYPE};
use serde::de::DeserializeOwned;
use std::collections::VecDeque;
Expand Down Expand Up @@ -187,8 +189,66 @@ impl RequestHandler {

#[inline]
pub async fn send(self) -> GraphResult<reqwest::Response> {
let request_builder = self.build()?;
request_builder.send().await.map_err(GraphFailure::from)
match self.client_builder.get_retries() {
true => {
let backoff = self.client_builder.get_retries_exponential_backoff();
let request_builder = self.build()?;
match request_builder.try_clone() {
Some(request_builder_clone) => {
retry(backoff, || async {
match request_builder_clone.try_clone().unwrap() // we already proved we could clone
.send().await {
Ok(response) => match response.status() {
StatusCode::TOO_MANY_REQUESTS
| StatusCode::INTERNAL_SERVER_ERROR
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT => {
match response.headers().get("Retry-After") {
Some(retry_after) => {
let retry_after = match retry_after.to_str() {
Ok(ra) => match ra.parse::<u64>() {
Ok(ra) => Some(Duration::from_secs(ra)),
Err(_) => None,
},
Err(_) => None,
}; // if None will use the configured exponential backoff
Err(backoff::Error::Transient {
err: GraphFailure::TemporaryError,
retry_after,
})
}
None => Err(backoff::Error::Transient {
err: GraphFailure::TemporaryError,
retry_after: None,
}),
}
}
_ => Ok(response),
},
Err(e) => match e.status() {
Some(StatusCode::TOO_MANY_REQUESTS)
| Some(StatusCode::INTERNAL_SERVER_ERROR)
| Some(StatusCode::SERVICE_UNAVAILABLE)
| Some(StatusCode::GATEWAY_TIMEOUT) => {
Err(backoff::Error::Transient {
err: GraphFailure::TemporaryError,
retry_after: None,
})
}
_ => Err(backoff::Error::Permanent(GraphFailure::from(e))),
},
}
})
.await
}
None => request_builder.send().await.map_err(GraphFailure::from),
}
}
false => {
let request_builder = self.build()?;
request_builder.send().await.map_err(GraphFailure::from)
}
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ pub static GRAPH_URL_BETA: &str = "https://graph.microsoft.com/beta";

pub use crate::client::Graph;
pub use graph_error::{GraphFailure, GraphResult};
pub use graph_http::api_impl::{GraphClientConfiguration, ODataQuery};
pub use graph_http::api_impl::{GraphClientConfiguration, RetriesConfig, ODataQuery};

/// Reexport of graph-oauth crate.
pub mod oauth {
Expand Down

0 comments on commit 50f8359

Please sign in to comment.